AbstractTask 类
AbstractTask 的核心作用是:定义一个通用的、基于 Fork/Join 框架的分治任务模型。它封装了任务分裂、子任务管理和结果传递的通用逻辑,使得子类可以专注于实现具体的计算任务,而无需关心并行的底层细节。
// ... existing code ...
@SuppressWarnings("serial")
abstract class AbstractTask<P_IN, P_OUT, R,K extends AbstractTask<P_IN, P_OUT, R, K>>extends CountedCompleter<R> {
// ... existing code ...
extends CountedCompleter<R>: 这是整个并行机制的核心。它继承自CountedCompleter,这是一种特殊的ForkJoinTask。CountedCompleter能够自动管理子任务的完成状态。当一个任务的所有子任务都执行完毕后,它的onCompletion方法会被自动回调,这非常适合处理分治算法中“合并”结果的阶段。- 泛型参数:
P_IN: 流管道的输入元素类型。P_OUT: 流管道的输出元素类型。R: 任务的中间结果或最终结果的类型。K: 任务自身的类型,用于类型安全的父子、兄弟节点引用(一种典型的递归泛型模式)。
核心字段
helper:PipelineHelper对象,封装了从源到当前操作的所有中间操作。所有子任务共享同一个helper。spliterator: 每个任务都关联一个Spliterator,它描述了当前任务需要处理的数据源部分。targetSize: 目标叶子任务大小。这是一个阈值,用于决定任务是应该继续分裂还是直接计算。leftChild,rightChild: 指向左右子任务的引用,用于构建任务树。localResult: 存储当前任务计算出的本地结果。
compute() 方法
compute() 方法是 AbstractTask 的灵魂,它实现了通用的分治(Divide and Conquer)算法。
// ... existing code ...@Overridepublic void compute() {Spliterator<P_IN> rs = spliterator, ls; // right, left spliteratorslong sizeEstimate = rs.estimateSize();long sizeThreshold = getTargetSize(sizeEstimate);boolean forkRight = false;@SuppressWarnings("unchecked") K task = (K) this;// 1. 分裂循环 (Divide)while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {K leftChild, rightChild, taskToFork;task.leftChild = leftChild = task.makeChild(ls);task.rightChild = rightChild = task.makeChild(rs);task.setPendingCount(1);if (forkRight) {forkRight = false;rs = ls;task = leftChild;taskToFork = rightChild;}else {forkRight = true;task = rightChild;taskToFork = leftChild;}taskToFork.fork(); // 异步执行一个子任务sizeEstimate = rs.estimateSize();}// 2. 计算叶子节点 (Conquer)task.setLocalResult(task.doLeaf());// 3. 完成task.tryComplete();}
// ... existing code ...
分裂循环 (Divide):
while循环是分裂阶段。- 只要当前任务的数据量(
sizeEstimate)大于阈值(sizeThreshold),并且spliterator可以成功分裂 (trySplit()),循环就会继续。 - 在循环内部,它通过
makeChild(一个抽象方法,由子类实现)创建左右两个子任务。 taskToFork.fork(): 将其中一个子任务提交到ForkJoinPool中异步执行。- 另一个子任务则由当前线程继续在
while循环中处理(这是一种尾递归优化,避免了方法调用的深度堆栈)。 forkRight变量用于交替fork左右子任务,以应对Spliterator可能产生的不均衡分裂。
- 只要当前任务的数据量(
计算叶子节点 (Conquer): 当任务小到无法再分裂时,循环结束。
task.doLeaf(): 调用doLeaf()这个抽象方法。这是子类需要实现的核心计算逻辑。比如,ReduceTask会在这里执行归约操作,CollectorTask会在这里把元素收集到Builder中。task.setLocalResult(...): 将计算结果保存在localResult字段中。
完成 (Combine):
task.tryComplete(): 通知CountedCompleter框架,当前叶子任务已完成。这会触发父任务的待完成计数器减一。当计数器归零时,父任务的onCompletion方法会被调用,从而实现结果的合并。
抽象方法与子类职责
AbstractTask 定义了两个关键的抽象方法,将“做什么”与“怎么做”分离:
protected abstract K makeChild(Spliterator<P_IN> spliterator);: 子类必须告诉AbstractTask如何创建自己的同类实例。这使得分裂逻辑可以通用。protected abstract R doLeaf();: 子类必须定义在叶子节点上执行的具体计算。这是任务的核心价值所在。
此外,子类通常会重写 onCompletion(CountedCompleter<?> caller) 方法,以定义如何将子任务的结果合并到父任务中。
AbstractShortCircuitTask
AbstractShortCircuitTask 的核心目的是为 并行流中的短路操作(Short-Circuiting Operations) 提供一个统一的、可复用的执行框架。
anyMatch, allMatch, noneMatch,以及 findFirst, findAny 都属于短路操作。这些操作的共同特点是:一旦在并行处理的某个分支中找到了一个可以决定最终结果的元素,就需要一种机制来尽快地通知所有其他并行的任务“停止工作”,从而避免不必要的计算。AbstractShortCircuitTask 就是为了实现这个“通知并停止”的机制而设计的。
它继承自 AbstractTask,后者是所有 Stream 并行任务的基类,提供了任务拆分(trySplit)和子任务创建(makeChild)等基础功能。
核心设计思想与关键字段
AbstractShortCircuitTask 的设计精髓在于它如何管理和同步并行任务之间的状态。
// ... existing code ...
@SuppressWarnings("serial")
abstract class AbstractShortCircuitTask<P_IN, P_OUT, R,K extends AbstractShortCircuitTask<P_IN, P_OUT, R, K>>extends AbstractTask<P_IN, P_OUT, R, K> {/*** The result for this computation; this is shared among all tasks and set* exactly once*/protected final AtomicReference<R> sharedResult;/*** Indicates whether this task has been canceled.* ...*/protected volatile boolean canceled;
// ... existing code ...
protected final AtomicReference<R> sharedResult;这是整个设计的核心。它是一个被所有相关 Fork/Join 任务(包括根任务和所有子任务)共享的原子引用。- 共享:子任务在创建时会直接引用父任务的
sharedResult实例,确保大家操作的是同一个对象。 - 原子性:使用
AtomicReference保证了多线程环境下对结果的写入是线程安全的。 - 作用:它的初始值为
null。任何一个子任务一旦找到了可以“短路”的最终结果,就会尝试通过compareAndSet(null, result)将这个结果写入sharedResult。由于 CAS 操作的原子性,只有第一个成功写入的线程会生效。一旦sharedResult不再是null,就标志着整个大任务的最终结果已经产生。
- 共享:子任务在创建时会直接引用父任务的
protected volatile boolean canceled;这是一个任务级的取消标志。volatile关键字确保了其在多线程之间的可见性。它主要用于实现有序短路操作,比如findFirst。
compute() - 任务执行与短路检查
compute() 方法是 Fork/Join 任务的执行入口。AbstractShortCircuitTask 重写了这个方法,在标准的任务拆分逻辑中加入了短路检查。
// ... existing code ...@Overridepublic void compute() {// ... (任务拆分准备) ...AtomicReference<R> sr = sharedResult;R result;while ((result = sr.get()) == null) { // 核心检查点if (task.taskCanceled()) { // 检查任务是否被取消result = task.getEmptyResult();break;}// ... (标准的任务拆分和执行逻辑) ...// 如果当前任务块足够小,就执行 doLeaf()if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) {result = task.doLeaf();break;}// ... (否则,拆分成左右子任务并 fork 一个) ...}task.setLocalResult(result);task.tryComplete();}protected boolean taskCanceled() {boolean cancel = canceled;if (!cancel) {for (K parent = getParent(); !cancel && parent != null; parent = parent.getParent())cancel = parent.canceled;}return cancel;}
// ... existing code ...
compute 方法的核心是一个 while 循环,其循环条件 (result = sr.get()) == null 是短路机制的关键。在每次进行任务拆分或执行叶子任务(doLeaf)之前,它都会检查共享的 sharedResult。如果发现 sharedResult 已经被其他任务写入了值,循环就会立即终止,当前任务也就不会再进行后续的计算,从而实现了短路。
shortCircuit(R result) - 设置短路结果
当一个叶子任务(在 doLeaf() 方法中)计算出了一个可以决定全局结果的值时,它会调用这个方法。
// ... existing code ...protected void shortCircuit(R result) {if (result != null)sharedResult.compareAndSet(null, result);}
// ... existing code ...
这个方法非常简单,就是尝试用 CAS 操作将结果写入 sharedResult。第一个成功的任务会“胜出”,后续所有调用 shortCircuit 的任务都会因为 sharedResult 不再是 null 而失败,这保证了结果只会被设置一次。
其它方法
getEmptyResult() - 获取默认结果
这是一个抽象方法,需要子类去实现。它定义了在没有发生短路,并且所有元素都处理完毕后,应该返回的默认结果。
- 对于
anyMatch,如果遍历完都没找到匹配项,默认结果是false。 - 对于
allMatch,如果遍历完所有项都匹配,默认结果是true。 - 对于
findFirst,如果流为空,默认结果是Optional.empty()。
cancel() 和 cancelLaterNodes() - 有序操作的短路
对于像 findFirst 这样的有序操作,仅仅找到一个结果是不够的。我们必须确保找到的是 遭遇顺序(encounter order) 中最靠前的那一个。
cancelLaterNodes() 方法就是为此设计的。当一个任务(比如处理流中 0-100 号元素的任务)找到了一个结果后,它会调用 cancelLaterNodes()。这个方法会沿着任务树向上回溯,并把所有在它“右边”的兄弟任务以及父节点的右兄弟任务的 canceled 标志都设置为 true。这些被取消的任务在 compute 循环的 taskCanceled() 检查点就会提前退出。
这确保了只有遭遇顺序最靠前的任务产生的结果才会被接受,所有处理更后面元素的任务都会被取消。
子类如何使用
像我们之前分析的 MatchTask 和 FindTask 都会继承 AbstractShortCircuitTask。它们需要:
- 提供
getEmptyResult()的具体实现。 - 实现
doLeaf()方法。在这个方法里,它们会处理一小块数据,如果发现了短路条件,就调用shortCircuit(result)。
总结
AbstractShortCircuitTask 是 Java Stream 并行短路操作的基石。它通过一个所有任务共享的 AtomicReference (sharedResult) 作为通信和同步的媒介,构建了一个优雅而高效的并行短路框架。
- 核心机制:任务在执行前检查
sharedResult,如果已有结果则提前退出。 - 结果写入:叶子任务通过
shortCircuit()方法以原子方式写入第一个有效结果。 - 有序性支持:通过
cancel()和cancelLaterNodes()机制,为findFirst等有序操作提供了额外的取消逻辑,保证了结果的正确性。
这个类的设计充分体现了在并行计算中,如何通过共享状态和原子操作来协调多个并发任务,以最高效率达成共同的目标。



















