轩辕李的博客 轩辕李的博客
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • HTML&CSS
  • JavaScript
  • 分布式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

轩辕李

勇猛精进,星辰大海
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • HTML&CSS
  • JavaScript
  • 分布式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Java

    • 核心

    • 并发

      • Java并发-线程基础与synchronized关键字
      • Java并发-重入锁ReetrantLock的使用
      • Java并发-信号量Semaphore
      • Java并发-读写锁ReadWriteLock
      • Java并发-倒计时器CountDownLatch
      • Java并发-栅栏CyclicBarrier
      • Java并发-LockSupport线程阻塞工具类
      • Java并发-线程池ThreadPoolExecutor
      • Java并发-阻塞队列BlockingQueue
      • Java并发-以空间换时间之ThreadLocal
      • Java并发-无锁策略CAS与atomic包
      • Java并发-JDK并发容器
      • Java并发-异步调用结果之Future和CompletableFuture
      • Java并发-Fork Join框架
        • 一、介绍
          • 1. 什么是Fork/Join框架
          • 2. Fork/Join框架的特点
        • 二、基本概念
          • 1. Fork/Join任务
          • 2. Fork/Join池
          • 3. 工作窃取算法
        • 三、示例代码
          • 1. 创建Fork/Join任务
          • 2. 使用Fork/Join框架执行任务
          • 使用默认的ForkJoinPool.commonPool()作为池
        • 四、Java 8 parallelStream中对于Fork/Join的使用
          • 自定义parallelStream的线程池
        • 五、总结
      • Java并发-调试与诊断
    • 经验

    • JVM

    • 企业应用

  • Spring

  • 其他语言

  • 工具

  • 后端
  • Java
  • 并发
轩辕李
2023-08-13
目录

Java并发-Fork Join框架

# 一、介绍

# 1. 什么是Fork/Join框架

Fork/Join框架是Java并发编程中的一个重要工具,它是在Java 7中引入的。

Fork/Join框架主要用于解决一类常见的并行问题,即将一个大任务拆分成多个小任务并行执行,然后将小任务的结果合并得到最终结果。

# 2. Fork/Join框架的特点

Fork/Join框架具有以下特点:

  • 任务分治:Fork/Join框架通过将大任务划分为多个小任务,实现任务的分解与合并,充分利用多核处理器的性能。
  • 工作窃取算法:Fork/Join框架使用工作窃取算法,即当某个线程执行完自己的任务后,会从其他线程的任务队列中窃取未执行的任务,以保证线程的负载均衡。
  • 线程池支持:Fork/Join框架内部使用了线程池来管理线程的创建和销毁,可以通过配置线程池的参数来调整并发执行的效率。

Fork/Join框架是Java并发编程中一个强大的工具,可以用于解决各种需要并行计算的问题,提高程序的性能和效率。

# 二、基本概念

# 1. Fork/Join任务

Fork/Join任务是指可以被Fork/Join框架处理的可拆分任务,他的基类是ForkJoinTask。一个Fork/Join任务通常继承自ForkJoinTask的子类RecursiveTask或RecursiveAction类。RecursiveTask用于有返回结果的任务,RecursiveAction用于没有返回结果的任务。

在编写Fork/Join任务时,需要实现compute()方法,在compute()方法中定义任务的具体逻辑。如果任务可以进一步拆分为子任务,可以通过调用fork()方法来拆分任务,并返回一个子任务。拆分任务后,可以通过调用join()方法等待子任务的执行结果,并对子任务的结果进行合并得到最终结果。

# 2. Fork/Join池

Fork/Join池是Fork/Join框架的核心组件,用于管理和调度Fork/Join任务的执行。Fork/Join池内部包含了一组工作线程,这些线程用于执行任务并进行工作窃取。

Fork/Join池可以通过ForkJoinPool类来创建和配置。在创建Fork/Join池时,可以指定线程数量、线程的优先级、线程的命名等参数。Fork/Join池还提供了一些方法来执行任务,如invoke()用于执行一个Fork/Join任务并返回结果。

# 3. 工作窃取算法

工作窃取算法是Fork/Join框架中用于实现线程负载均衡的关键策略。在Fork/Join框架中,每个工作线程都有一个任务队列,用于存放待执行的任务。当一个线程执行完自己的任务后,会从其他线程的任务队列中窃取未执行的任务并执行。

通过工作窃取算法,Fork/Join框架可以实现线程的负载均衡。如果某个线程的任务执行速度较快,它可以从其他线程的任务队列中获取更多任务,以保证线程的利用率。这样可以充分利用多核处理器的性能,提高并发执行的效率。

在Fork/Join框架中,工作窃取算法是自动进行的,无需用户手动干预。框架会根据任务的拆分和合并情况,自动进行任务的调度和负载均衡。

# 三、示例代码

# 1. 创建Fork/Join任务

在Fork/Join框架中,我们需要自定义任务类来实现具体的任务逻辑。下面是一个示例代码,展示了如何创建一个Fork/Join任务:

import java.util.concurrent.RecursiveTask;

public class MyTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 10;
    private int start;
    private int end;

    public MyTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - start <= THRESHOLD) {
            // 当任务足够小,直接计算结果
            int sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            // 任务太大,需要拆分为子任务
            int mid = (start + end) / 2;
            MyTask leftTask = new MyTask(start, mid);
            MyTask rightTask = new MyTask(mid + 1, end);

            // 拆分任务并等待子任务的完成
            leftTask.fork();
            rightTask.fork();

            // 合并子任务的结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            return leftResult + rightResult;
        }
    }
}

在这个示例代码中,我们定义了一个名为MyTask的任务类,继承自RecursiveTask<Integer>。任务类中重写了compute()方法,该方法用于定义任务的具体逻辑。

当任务的大小小于等于阈值THRESHOLD时,任务直接计算结果并返回。否则,将任务拆分为两个子任务,分别计算左半部分和右半部分的结果,并将子任务的结果合并得到最终结果。

# 2. 使用Fork/Join框架执行任务

在创建了Fork/Join任务之后,我们需要使用Fork/Join框架来执行任务。下面是一个使用示例代码:

import java.util.concurrent.ForkJoinPool;

public class Main {
    public static void main(String[] args) {
        // 创建Fork/Join池
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        // 创建任务实例
        MyTask myTask = new MyTask(1, 100);

        // 执行任务
        int result = forkJoinPool.invoke(myTask);

        System.out.println("计算结果:" + result);
    }
}

在这个示例代码中,我们首先创建了一个Fork/Join池实例forkJoinPool。然后,创建了一个任务实例myTask,并通过调用forkJoinPool.invoke(myTask)来执行任务。

# 使用默认的ForkJoinPool.commonPool()作为池

我们直接调用MyTask#invoke()方法:

public class Main2 {
    public static void main(String[] args) {
        // 创建任务实例
        MyTask myTask = new MyTask(1, 100);

        // 执行任务
        int result = myTask.invoke(myTask);

        System.out.println("计算结果:" + result);
    }
}

如果直接调用MyTask#invoke()的话,会使用ForkJoinPool.commonPool()作为池。看java.util.concurrent.ForkJoinTask#fork()方法的源码:

    public final ForkJoinTask<V> fork() {
        Thread t; ForkJoinWorkerThread wt;
        ForkJoinPool p; ForkJoinPool.WorkQueue q;
        U.storeStoreFence();  // ensure safely publishable
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
            p = (wt = (ForkJoinWorkerThread)t).pool;
            q = wt.workQueue;
        }
        else
            q = (p = ForkJoinPool.common).submissionQueue(false);
        q.push(this, p, true);
        return this;
    }

这里的逻辑是,如果当前线程是ForkJoinWorkerThread,则使用当前线程所属的ForkJoinPool作为池;否则使用ForkJoinPool.commonPool()作为池。

ForkJoinPool#common的初始化逻辑在ForkJoinPool的static块中:

        ForkJoinPool p = common = (System.getSecurityManager() == null) ?
            new ForkJoinPool((byte)0) :
            AccessController.doPrivileged(new PrivilegedAction<>() {
                    public ForkJoinPool run() {
                        return new ForkJoinPool((byte)0); }});
    
    // 初始化的时候调用了ForkJoinPool(byte)构造方法                    
    private ForkJoinPool(byte forCommonPoolOnly) {
        ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory;
        UncaughtExceptionHandler handler = null;
        int maxSpares = DEFAULT_COMMON_MAX_SPARES;
        int pc = 0, preset = 0; // nonzero if size set as property
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            if (pp != null) {
                pc = Math.max(0, Integer.parseInt(pp));
                preset = PRESET_SIZE;
            }
            String ms = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
            if (ms != null)
                maxSpares = Math.max(0, Math.min(MAX_CAP, Integer.parseInt(ms)));
            String sf = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String sh = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (sf != null || sh != null) {
                ClassLoader ldr = ClassLoader.getSystemClassLoader();
                if (sf != null)
                    fac = (ForkJoinWorkerThreadFactory)
                        ldr.loadClass(sf).getConstructor().newInstance();
                if (sh != null)
                    handler = (UncaughtExceptionHandler)
                        ldr.loadClass(sh).getConstructor().newInstance();
            }
        } catch (Exception ignore) {
        }
        if (preset == 0)
            pc = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
        int p = Math.min(pc, MAX_CAP);
        int size = (p == 0) ? 1 : 1 << (33 - Integer.numberOfLeadingZeros(p-1));
        this.parallelism = p;
        ...
    }

可以看到代码中使用了一些环境变量,其中最重要的就是java.util.concurrent.ForkJoinPool.common.parallelism,这个环境变量用来设置ForkJoinPool.commonPool()的最大线程数。如果没有设置这个环境变量,则默认使用当前系统可用核心数-1个,即Runtime.getRuntime().availableProcessors() - 1。

# 四、Java 8 parallelStream中对于Fork/Join的使用

在Java 8中,引入了parallelStream方法来支持并行流操作。并行流操作可以将一个集合或数组分成多个子任务,并使用Fork/Join框架来并行执行这些子任务,从而提高处理数据的效率。

使用parallelStream方法非常简单,只需在集合或数组上调用该方法即可。下面是一个示例代码,展示了如何使用parallelStream方法:

import java.util.Arrays;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        int sum = numbers.parallelStream()
                .filter(n -> n % 2 == 0) // 过滤偶数
                .mapToInt(n -> n) // 转换为int类型
                .sum(); // 求和

        System.out.println("计算结果:" + sum);
    }
}

在这个示例代码中,我们首先创建了一个整数列表numbers。然后,通过调用parallelStream方法来创建一个并行流。接下来,我们使用流的一系列操作,如filter过滤、mapToInt转换为int类型、sum求和等。

从Java源码角度分析parallelStream()方法的实现,可以看到它是基于Fork/Join框架来实现并行处理的。

下面是parallelStream()方法的源码实现:

default Stream<E> parallelStream() {
    return StreamSupport.stream(spliterator(), true);
}

可以看到,parallelStream()方法内部调用了StreamSupport.stream()方法,并将第二个参数设置为true。这里的true表示创建的Stream是并行的。

StreamSupport.stream()方法的实现如下:

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);

    return new ReferencePipeline.Head<>(spliterator,
                                         StreamOpFlag.fromCharacteristics(spliterator),
                                         parallel);
}

在stream()方法中,创建了一个新的ReferencePipeline.Head对象,并将传入的spliterator和parallel参数传递给它。ReferencePipeline.Head是Stream API中的一个节点,它实现了Stream接口。

ReferencePipeline.Head的构造方法如下:

ReferencePipeline(Spliterator<?> source,
                  int sourceFlags, boolean parallel) {
    super(source, sourceFlags);
    if (source.hasCharacteristics(Spliterator.DISTINCT)
            && (source.hasCharacteristics(Spliterator.SORTED) || source.hasCharacteristics(Spliterator.ORDERED)))
        throw new IllegalArgumentException("Source must not have the DISTINCT characteristic if terminal operation is ordered or sorted");
    this.sourceStage = source;
    this.sourceSpliterator = source;
    this.depth = 0;
    this.sourceOrOpFlags = sourceFlags;
    this.parallel = parallel;
    this.combinedFlags = StreamOpFlag.combineOpFlags(sourceFlags, StreamOpFlag.toStreamFlags(StreamOpFlag.INITIAL_OPS_VALUE));
}

在ReferencePipeline.Head的构造方法中,将传入的spliterator和parallel参数保存到成员变量中。这里的parallel参数决定了创建的Stream是并行的。

我们以forEach方法举例,最终会调用java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)方法:

    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

在evaluate方法中,首先判断当前的Stream是否是并行的,如果是并行的,则调用terminalOp.evaluateParallel()方法来执行并行操作。

这里terminalOp的实现类是java.util.stream.ForEachOps.ForEachOp,具体逻辑:

        @Override
        public <S> Void evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<S> spliterator) {
            if (ordered)
                new ForEachOrderedTask<>(helper, spliterator, this).invoke();
            else
                new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
            return null;
        }

该方法的作用是并行地对一个数据流进行评估。方法接受两个参数:一个是类型为PipelineHelper<T>的helper对象,另一个是类型为Spliterator<S>的spliterator对象。

代码中首先进行了一个判断,如果ordered为真,则创建一个ForEachOrderedTask的实例,并调用其invoke方法。否则,创建一个ForEachTask的实例,并调用其invoke方法。

ForEachOrderedTask和ForEachTask都实现了ForkJoinTask。这里的invoke方法是ForkJoinTask的方法,用于启动任务的执行。

通过以上分析,我们可以看到,Java 8中的并行流底层操作是基于Fork/Join框架来实现的。

# 自定义parallelStream的线程池

要自定义parallelStream的线程池,可以把并行流逻辑放到自定义的ForkJoinPool中执行,示例代码如下:

List<Integer> integerList= IntStream.range(1,1000).boxed().collect(Collectors.toList());

ForkJoinPool customThreadPool = new ForkJoinPool(4);
Integer actualTotal = customThreadPool.submit(
        () -> integerList.parallelStream().reduce(0, Integer::sum)).get();
log.info("{}",actualTotal);

以上面的ForEachTask为例,他的invoke()方法最终调用了compute():

        public void compute() {
            Spliterator<S> rightSplit = spliterator, leftSplit;
            long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
            if ((sizeThreshold = targetSize) == 0L)
                targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
            boolean forkRight = false;
            Sink<S> taskSink = sink;
            ForEachTask<S, T> task = this;
            while (!isShortCircuit || !taskSink.cancellationRequested()) {
                if (sizeEstimate <= sizeThreshold ||
                    (leftSplit = rightSplit.trySplit()) == null) {
                    task.helper.copyInto(taskSink, rightSplit);
                    break;
                }
                ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
                task.addToPendingCount(1);
                ForEachTask<S, T> taskToFork;
                if (forkRight) {
                    forkRight = false;
                    rightSplit = leftSplit;
                    taskToFork = task;
                    task = leftTask;
                }
                else {
                    forkRight = true;
                    taskToFork = leftTask;
                }
                taskToFork.fork();
                sizeEstimate = rightSplit.estimateSize();
            }
            task.spliterator = null;
            task.propagateCompletion();
        }

这里只需要关注taskToFork.fork()这一行代码,它会将任务提交到ForkJoinPool中执行。

而上面我们已经分析过:如果当前线程是ForkJoinWorkerThread,则使用当前线程所属的ForkJoinPool作为池;否则使用ForkJoinPool.commonPool()作为池。

那么这里的fork()方法就是在自定义线程池中执行的了。

# 五、总结

本文以Java并发的Fork/Join框架为主题,介绍了其基本概念、特点以及使用方法,并提供了示例代码加深对其的理解。

我们还通过部分源码对ForkJoinTask进行了分析,以及分析了Java 8中的并行流底层操作是基于Fork/Join框架来实现的。

希望通过学习本文,你能够对Fork/Join框架有一个更深入的理解。

祝你变得更强!

编辑 (opens new window)
上次更新: 2023/08/16
Java并发-异步调用结果之Future和CompletableFuture
Java并发-调试与诊断

← Java并发-异步调用结果之Future和CompletableFuture Java并发-调试与诊断→

最近更新
01
Spring Boot版本新特性
09-15
02
Spring框架版本新特性
09-01
03
Spring Boot开发初体验
08-15
更多文章>
Theme by Vdoing | Copyright © 2018-2025 京ICP备2021021832号-2 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式