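The fork-join framework is a new addition to the Java 7 concurrency library. It uses a divide-and-conquer (DnQ) algorithm to handle computations over large data sets. DnQ is an ideal model for fine-grained parallel computation, where the data volume is large and each unit of work runs only briefly. The data is split recursively into tasks until a piece falls below a predefined threshold, and the tasks are executed by worker threads. Modern Java virtual machines map each Java thread to a system thread or LWP (light-weight process), and the number of workers is usually set equal to the number of CPUs, so on multicore hardware the framework can make full use of the computing power of all CPUs.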
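To make the split-until-threshold idea concrete, here is a minimal sketch that sums an array. It assumes the final Java 7 API in java.util.concurrent rather than the jsr166y preview package used elsewhere in this post. The names SumTask and parallelSum and the cutoff of 10000 are hypothetical, chosen only for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums data[lo..hi), splitting the range in half until it is small enough.
class SumTask extends RecursiveTask<Long> {
    // Hypothetical cutoff chosen for illustration, not a tuned value.
    static final int THRESHOLD = 10000;

    private final int[] data;
    private final int lo; // inclusive
    private final int hi; // exclusive

    SumTask(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            // Below the threshold: do the work directly on this thread.
            long sum = 0;
            for (int i = lo; i < hi; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(data, lo, mid);
        SumTask right = new SumTask(data, mid, hi);
        left.fork();                     // schedule the left half asynchronously
        long rightSum = right.compute(); // compute the right half on this thread
        return left.join() + rightSum;   // wait for the left half and combine
    }

    // One worker per available processor, as described in the text.
    static long parallelSum(int[] data) {
        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] data = new int[1000000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i % 10;
        }
        System.out.println(parallelSum(data)); // prints 4500000
    }
}
```

Forking one half and computing the other on the current thread keeps the calling worker busy instead of leaving it idle while it waits.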
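I wrote a MergeSort test example in which the final sort of each piece uses Arrays.sort(), which ships with the Java Collections Framework. I tried it on my own dual-core machine and found the improvement not particularly noticeable. Arrays.sort is already very efficient, and the framework adds overhead for coordinating threads and managing the worker pool, so you have to choose a suitable data-size threshold. Here are the results (times in nanoseconds, as measured with System.nanoTime()):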
java -Xms64m -Xmx128m -cp C:/forkjoin/jsr166y.zip;C:/workspace/java.tij forkjoin.SortTask
Number of processor 2
=================Sequential ===================
Sorting takes 2617701971 to complete
=================ForkJoin ====================
Sorting takes 2284940405 to complete
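As a rough illustration of why the threshold matters, the sketch below replays the split rule from the SortTask listing (a range is sorted directly when hi - lo < THRESHOLD) and counts how many leaf tasks result. It also derives the speedup from the two timings printed above. The class and method names (ThresholdMath, countLeaves, speedup) are hypothetical helpers, not part of the framework.

```java
class ThresholdMath {
    // Counts the leaf tasks produced by splitting [lo, hi] (both inclusive)
    // in half until a range satisfies hi - lo < threshold.
    static int countLeaves(int lo, int hi, int threshold) {
        if (hi - lo < threshold) {
            return 1;
        }
        int mid = (lo + hi) >>> 1;
        return countLeaves(lo, mid, threshold) + countLeaves(mid + 1, hi, threshold);
    }

    // Speedup is the sequential time divided by the parallel time.
    static double speedup(long sequentialNanos, long parallelNanos) {
        return (double) sequentialNanos / parallelNanos;
    }

    public static void main(String[] args) {
        // ARRAY_LENGTH = 10000000 and THRESHOLD = 3000000 as in the listing.
        System.out.println(countLeaves(0, 10000000 - 1, 3000000)); // prints 4
        // Timings copied from the run shown above.
        System.out.println(speedup(2617701971L, 2284940405L));
    }
}
```

With these settings the array is cut into only four pieces of 2.5 million elements each, and the reported timings work out to a speedup of roughly 1.15. The top-level merge of all 10 million elements also runs on a single thread, which limits the gain further.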
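I could not find a machine with more cores; if you have one, please give it a try. The article by Brian Goetz (author of Java Concurrency in Practice) is also worth reading. His test example shows a good performance gain (up to 17x on a 32-CPU system), and a 4-core or 8-core machine typically reaches a speedup of about 3x or 5x.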
Java theory and practice: Stick a fork in it, Part 1 -
http://www.ibm.com/developerworks/java/library/j-jtp11137.html
package forkjoin;

import jsr166y.forkjoin.RecursiveAction;
import jsr166y.forkjoin.ForkJoinPool;
import java.util.Random;
import java.util.Arrays;


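public class SortTask extends RecursiveAction
{
    final static int ARRAY_LENGTH = 10000000;

    // Ranges shorter than this are sorted directly instead of being split.
    final static int THRESHOLD = 3000000;

    final int[] array;

    final int lo; // first index of the range (inclusive)

    final int hi; // last index of the range (inclusive)

    public SortTask(int[] array, int lo, int hi)
    {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }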


    // Sorts array[lo..hi] (both inclusive) in place on the current thread.
    private void sequentiallySort(int[] array, int lo, int hi)
    {
        // Arrays.sort takes an exclusive upper bound, hence hi + 1.
        Arrays.sort(array, lo, hi + 1);
    }


    // Merges the sorted runs array[lo..mid] and array[mid+1..hi] (inclusive).
    private void merge(int[] array, int lo, int mid, int hi)
    {
        int[] units = new int[hi - lo + 1];
        int i = lo;
        int j = mid + 1;
        int k = 0;

        // Take the smaller head element while both runs have elements left.
        while (i <= mid && j <= hi)
            units[k++] = (array[i] <= array[j]) ? array[i++] : array[j++];

        // Copy whatever remains of either run.
        while (i <= mid)
            units[k++] = array[i++];
        while (j <= hi)
            units[k++] = array[j++];

        // Write the merged result back into the original range.
        for (k = lo; k <= hi; k++)
            array[k] = units[k - lo];
    }


    protected void compute()
    {
        try
        {
            if (hi - lo < THRESHOLD)
            {
                // Small enough: sort this range directly.
                sequentiallySort(array, lo, hi);
            }
            else
            {
                int mid = (lo + hi) >>> 1;
                // Sort both halves as subtasks, wait for both, then merge.
                forkJoin(new SortTask(array, lo, mid), new SortTask(array, mid + 1, hi));
                merge(array, lo, mid, hi);
            }
        }
        catch (Throwable t)
        {
            t.printStackTrace();
        }
    }


    /**
     * @param args command-line arguments (unused)
     */

    public static void main(String[] args)
    {
        int[] sample = new int[ARRAY_LENGTH];

        System.out.println("Number of processor "
                + Runtime.getRuntime().availableProcessors());
        Random seed = new Random(47);

        for (int i = 0; i < sample.length; i++)
        {
            sample[i] = seed.nextInt();
        }

        // Copy the unsorted input first so both runs sort identical data.
        int[] sample2 = sample.clone();

        long start = System.nanoTime();
        Arrays.sort(sample);
        long duration = System.nanoTime() - start;

        System.out.println("===============Sequential==================");
        System.out.println("Sorting takes " + duration + " to complete");

        ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime()
                .availableProcessors());
        SortTask st = new SortTask(sample2, 0, sample2.length - 1);

        start = System.nanoTime();
        // invoke() blocks until the task finishes, so the caller does not
        // burn a core spinning on isDone() while the workers are sorting.
        pool.invoke(st);
        duration = System.nanoTime() - start;

        System.out.println("===============ForkJoin==================");
        System.out.println("Sorting takes " + duration + " to complete");
    }

}
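
The listing above targets the jsr166y preview package. For comparison, here is a minimal sketch of the same merge sort against the final Java 7 API, assuming java.util.concurrent.ForkJoinPool and RecursiveAction, with invokeAll in place of forkJoin. The class name ParallelMergeSort and the cutoff of 100000 are hypothetical. The cutoff is not a tuned value and should be chosen by measurement.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class ParallelMergeSort extends RecursiveAction {
    // Hypothetical cutoff for illustration; tune it by measurement.
    static final int THRESHOLD = 100000;

    private final int[] array;
    private final int lo; // inclusive
    private final int hi; // inclusive

    ParallelMergeSort(int[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected void compute() {
        if (hi - lo < THRESHOLD) {
            // Small range: sort in place; the upper bound is exclusive.
            Arrays.sort(array, lo, hi + 1);
            return;
        }
        int mid = (lo + hi) >>> 1;
        // Run both halves as subtasks and wait for both to finish.
        invokeAll(new ParallelMergeSort(array, lo, mid),
                  new ParallelMergeSort(array, mid + 1, hi));
        merge(mid);
    }

    // Merges array[lo..mid] and array[mid+1..hi], copying only the left run.
    private void merge(int mid) {
        int[] left = Arrays.copyOfRange(array, lo, mid + 1);
        int i = 0;
        int j = mid + 1;
        int k = lo;
        while (i < left.length && j <= hi) {
            array[k++] = (left[i] <= array[j]) ? left[i++] : array[j++];
        }
        while (i < left.length) {
            array[k++] = left[i++];
        }
        // Any remaining right-run elements are already in their final place.
    }

    static void sort(int[] array) {
        ForkJoinPool pool = new ForkJoinPool(); // defaults to one worker per processor
        try {
            pool.invoke(new ParallelMergeSort(array, 0, array.length - 1));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Random random = new Random(47);
        int[] expected = new int[1000000];
        for (int i = 0; i < expected.length; i++) {
            expected[i] = random.nextInt();
        }
        int[] actual = expected.clone();
        Arrays.sort(expected);
        sort(actual);
        System.out.println("Sorted correctly: " + Arrays.equals(expected, actual));
    }
}
```

The main method checks only correctness against Arrays.sort. It is not a benchmark, and any timing comparison would need warm-up runs and repeated measurements.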
