欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java7之forkjoin简介_动力节点Java学院整理

程序员文章站 2024-02-15 13:09:30
java7引入了fork join的概念,来更好的支持并行运算。顾名思义,fork join类似与流程语言的分支,合并的概念。也就是说java7 se原生支持了在一个主线程...

java7引入了fork join的概念,来更好的支持并行运算。顾名思义,fork join类似与流程语言的分支,合并的概念。也就是说java7 se原生支持了在一个主线程中开辟多个分支线程,并且根据分支线程的逻辑来等待(或者不等待)汇集,当然你也可以fork的某一个分支线程中再开辟fork join,这也就可以实现fork join的嵌套。

有两个核心类forkjoinpool和forkjointask。

forkjoinpool实现了executorservice接口,起到线程池的作用。所以他的用法和executor框架的使用时一样的,当然fork join本身就是executor框架的扩展。forkjoinpool有3个关键的方法,来启动线程,execute(…),invoke(…),submit(…)。具体描述如下:

<span style='font-size: 9pt;"微软雅黑","sans-serif"; color: #333333;"border-top: windowtext 1pt solid; border-right: windowtext 1pt solid; border-bottom: windowtext 1pt solid; padding-bottom: 0cm; padding-top: 0cm; padding-left: 0cm; border-left: windowtext 1pt solid; padding-right: 0cm; background-color: transparent;"> <p style="background: white; text-align: left; line-height: normal;" align=left><span style='font-size: 9pt;"微软雅黑","sans-serif"; color: #333333;

forkjointask是分支合并的执行任何,分支合并的业务逻辑使用者可以再继承了这个抽先类之后,在抽象方法exec()中实现。其中exec()的返回结果和forkjoinpool的执行调用方(execute(…),invoke(…),submit(…)),共同决定着线程是否阻塞,具体请看下面的测试用例。

首先,用户需要创建一个自己的forkjointask。代码如下:

public class myforkjointask extends forkjointask {
 
 /**
  *
  */
 private static final long serialversionuid = 1l;
 private v value;
 private boolean success = false;
 @override
 public v getrawresult() {
  return value;
 }
 @override
 protected void setrawresult(v value) {
  this.value = value;
 }
 @override
 protected boolean exec() {
  system.out.println("exec");
  return this.success;
 }
 public boolean issuccess() {
  return success;
 }
 public void setsuccess(boolean issuccess) {
  this.success = issuccess;
 }
}

测试forkjoinpool.invoke(…):

 @test
 public void testforkjoininvoke() throws interruptedexception, executionexception {
  forkjoinpool forkjoinpool = new forkjoinpool();
  myforkjointask task = new myforkjointask();
  task.setsuccess(true);
  task.setrawresult("test");
  string invokeresult = forkjoinpool.invoke(task);
  assertequals(invokeresult, "test");
 }
 @test
 public void testforkjoininvoke2() throws interruptedexception, executionexception {
  final forkjoinpool forkjoinpool = new forkjoinpool();
  final myforkjointask task = new myforkjointask();
  new thread(new runnable() {
   public void run() {
    try {
     thread.sleep(1000);
    } catch (interruptedexception e) {
    }
    task.complete("test");
   }
  }).start();
  // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)
  string result = forkjoinpool.invoke(task);
  system.out.println(result);
 }
 @test
 public void testforkjoinsubmit() throws interruptedexception, executionexception {
  final forkjoinpool forkjoinpool = new forkjoinpool();
  final myforkjointask task = new myforkjointask();
  task.setsuccess(true); // 是否在此任务运行完毕后结束阻塞
  forkjointask result = forkjoinpool.submit(task);
  result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete
 }

测试forkjoinpool.submit(…):

@test
 public void testforkjoinsubmit() throws interruptedexception, executionexception {
  final forkjoinpool forkjoinpool = new forkjoinpool();
  final myforkjointask task = new myforkjointask();
  task.setsuccess(true); // 是否在此任务运行完毕后结束阻塞
  forkjointask result = forkjoinpool.submit(task);
  result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete
 }
 @test
 public void testforkjoinsubmit2() throws interruptedexception, executionexception {
  final forkjoinpool forkjoinpool = new forkjoinpool();
  final myforkjointask task = new myforkjointask();
  forkjoinpool.submit(task);
  thread.sleep(1000);
 }
 @test
 public void testforkjoinsubmit3() throws interruptedexception, executionexception {
  final forkjoinpool forkjoinpool = new forkjoinpool();
  final myforkjointask task = new myforkjointask();
  new thread(new runnable() {
   public void run() {
    try {
     thread.sleep(1000);
    } catch (interruptedexception e) {
    }
    task.complete("test");
   }
  }).start();
  forkjointask result = forkjoinpool.submit(task);
  // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)
  result.get();
  thread.sleep(1000);
 }

测试forkjoinpool.execute(…):

 @test
 public void testforkjoinexecute() throws interruptedexception, executionexception {
  forkjoinpool forkjoinpool = new forkjoinpool();
  myforkjointask task = new myforkjointask();
  forkjoinpool.execute(task); // 异步执行,无视task.exec()返回值。
 }

在实际情况中,很多时候我们都需要面对经典的“分治”问题。要解决这类问题,主要任务通常被分解为多个任务块(分解阶段),其后每一小块任务被独立并行计算。一旦计算任务完成,每一快的结果会被合并或者解决(解决阶段)。forkjointask天然就是为了支持“分治”问题的。

分支/合并的完整过程如下: 

Java7之forkjoin简介_动力节点Java学院整理

下面列举一个分治算法的实例。

import java.util.random;
import java.util.concurrent.forkjoinpool;
import java.util.concurrent.recursivetask;
public class maximumfinder extends recursivetask<integer> {
 private static final int sequential_threshold = 5;
 private final int[] data;
 private final int start;
 private final int end;
 public maximumfinder(int[] data, int start, int end) {
 this.data = data;
 this.start = start;
 this.end = end;
 }
 public maximumfinder(int[] data) {
 this(data, 0, data.length);
 }
 @override
 protected integer compute() {
 final int length = end - start;
 if (length < sequential_threshold) {
  return computedirectly();
 }
 final int split = length / 2;
 final maximumfinder left = new maximumfinder(data, start, start + split);
 left.fork();
 final maximumfinder right = new maximumfinder(data, start + split, end);
 return math.max(right.compute(), left.join());
 }
 private integer computedirectly() {
 system.out.println(thread.currentthread() + ' computing: ' + start
      + ' to ' + end);
 int max = integer.min_value;
 for (int i = start; i < end; i++) {
  if (data[i] > max) {
  max = data[i];
  }
 }
 return max;
 }
 public static void main(string[] args) {
 // create a random data set
 final int[] data = new int[1000];
 final random random = new random();
 for (int i = 0; i < data.length; i++) {
  data[i] = random.nextint(100);
 }
 // submit the task to the pool
 final forkjoinpool pool = new forkjoinpool(4);
 final maximumfinder finder = new maximumfinder(data);
 system.out.println(pool.invoke(finder));
 }
}

以上所示是小编给大家介绍的java7之forkjoin简介_动力节点java学院整理,希望对大家有所帮助