Java Stream 使用 Fork/Join框架的分治任务模型

AbstractTask 类

AbstractTask 的核心作用是:定义一个通用的、基于 Fork/Join 框架的分治任务模型。它封装了任务分裂、子任务管理和结果传递的通用逻辑,使得子类可以专注于实现具体的计算任务,而无需关心并行的底层细节。


 

// ... existing code ...
@SuppressWarnings("serial")
abstract class AbstractTask<P_IN, P_OUT, R,K extends AbstractTask<P_IN, P_OUT, R, K>>extends CountedCompleter<R> {
// ... existing code ...
  • extends CountedCompleter<R>: 这是整个并行机制的核心。它继承自 CountedCompleter,这是一种特殊的 ForkJoinTaskCountedCompleter 能够自动管理子任务的完成状态。当一个任务的所有子任务都执行完毕后,它的 onCompletion 方法会被自动回调,这非常适合处理分治算法中“合并”结果的阶段。
  • 泛型参数:
    • P_IN: 流管道的输入元素类型。
    • P_OUT: 流管道的输出元素类型。
    • R: 任务的中间结果或最终结果的类型。
    • K: 任务自身的类型,用于类型安全的父子、兄弟节点引用(一种典型的递归泛型模式)。

核心字段

  • helperPipelineHelper 对象,封装了从源到当前操作的所有中间操作。所有子任务共享同一个 helper
  • spliterator: 每个任务都关联一个 Spliterator,它描述了当前任务需要处理的数据源部分。
  • targetSize: 目标叶子任务大小。这是一个阈值,用于决定任务是应该继续分裂还是直接计算。
  • leftChildrightChild: 指向左右子任务的引用,用于构建任务树。
  • localResult: 存储当前任务计算出的本地结果。

compute() 方法

compute() 方法是 AbstractTask 的灵魂,它实现了通用的分治(Divide and Conquer)算法。

// ... existing code ...@Overridepublic void compute() {Spliterator<P_IN> rs = spliterator, ls; // right, left spliteratorslong sizeEstimate = rs.estimateSize();long sizeThreshold = getTargetSize(sizeEstimate);boolean forkRight = false;@SuppressWarnings("unchecked") K task = (K) this;// 1. 分裂循环 (Divide)while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {K leftChild, rightChild, taskToFork;task.leftChild  = leftChild = task.makeChild(ls);task.rightChild = rightChild = task.makeChild(rs);task.setPendingCount(1);if (forkRight) {forkRight = false;rs = ls;task = leftChild;taskToFork = rightChild;}else {forkRight = true;task = rightChild;taskToFork = leftChild;}taskToFork.fork(); // 异步执行一个子任务sizeEstimate = rs.estimateSize();}// 2. 计算叶子节点 (Conquer)task.setLocalResult(task.doLeaf());// 3. 完成task.tryComplete();}
// ... existing code ...
  1. 分裂循环 (Divide)while 循环是分裂阶段。

    • 只要当前任务的数据量(sizeEstimate)大于阈值(sizeThreshold),并且 spliterator 可以成功分裂 (trySplit()),循环就会继续。
    • 在循环内部,它通过 makeChild(一个抽象方法,由子类实现)创建左右两个子任务。
    • taskToFork.fork(): 将其中一个子任务提交到 ForkJoinPool 中异步执行。
    • 另一个子任务则由当前线程继续在 while 循环中处理(这是一种尾递归优化,避免了方法调用的深度堆栈)。
    • forkRight 变量用于交替 fork 左右子任务,以应对 Spliterator 可能产生的不均衡分裂。
  2. 计算叶子节点 (Conquer): 当任务小到无法再分裂时,循环结束。

    • task.doLeaf(): 调用 doLeaf() 这个抽象方法。这是子类需要实现的核心计算逻辑。比如,ReduceTask 会在这里执行归约操作,CollectorTask 会在这里把元素收集到 Builder 中。
    • task.setLocalResult(...): 将计算结果保存在 localResult 字段中。
  3. 完成 (Combine):

    • task.tryComplete(): 通知 CountedCompleter 框架,当前叶子任务已完成。这会触发父任务的待完成计数器减一。当计数器归零时,父任务的 onCompletion 方法会被调用,从而实现结果的合并。

抽象方法与子类职责

AbstractTask 定义了两个关键的抽象方法,将“做什么”与“怎么做”分离:

  • protected abstract K makeChild(Spliterator<P_IN> spliterator);子类必须告诉 AbstractTask 如何创建自己的同类实例。这使得分裂逻辑可以通用。
  • protected abstract R doLeaf();子类必须定义在叶子节点上执行的具体计算。这是任务的核心价值所在。

此外,子类通常会重写 onCompletion(CountedCompleter<?> caller) 方法,以定义如何将子任务的结果合并到父任务中。

AbstractShortCircuitTask

AbstractShortCircuitTask 的核心目的是为 并行流中的短路操作(Short-Circuiting Operations) 提供一个统一的、可复用的执行框架。

anyMatchallMatchnoneMatch,以及 findFirstfindAny 都属于短路操作。这些操作的共同特点是:一旦在并行处理的某个分支中找到了一个可以决定最终结果的元素,就需要一种机制来尽快地通知所有其他并行的任务“停止工作”,从而避免不必要的计算。AbstractShortCircuitTask 就是为了实现这个“通知并停止”的机制而设计的。

它继承自 AbstractTask,后者是所有 Stream 并行任务的基类,提供了任务拆分(trySplit)和子任务创建(makeChild)等基础功能。

核心设计思想与关键字段

AbstractShortCircuitTask 的设计精髓在于它如何管理和同步并行任务之间的状态。

// ... existing code ...
@SuppressWarnings("serial")
abstract class AbstractShortCircuitTask<P_IN, P_OUT, R,K extends AbstractShortCircuitTask<P_IN, P_OUT, R, K>>extends AbstractTask<P_IN, P_OUT, R, K> {/*** The result for this computation; this is shared among all tasks and set* exactly once*/protected final AtomicReference<R> sharedResult;/*** Indicates whether this task has been canceled.* ...*/protected volatile boolean canceled;
// ... existing code ...
  • protected final AtomicReference<R> sharedResult; 这是整个设计的核心。它是一个被所有相关 Fork/Join 任务(包括根任务和所有子任务)共享的原子引用。

    • 共享:子任务在创建时会直接引用父任务的 sharedResult 实例,确保大家操作的是同一个对象。
    • 原子性:使用 AtomicReference 保证了多线程环境下对结果的写入是线程安全的。
    • 作用:它的初始值为 null。任何一个子任务一旦找到了可以“短路”的最终结果,就会尝试通过 compareAndSet(null, result) 将这个结果写入 sharedResult。由于 CAS 操作的原子性,只有第一个成功写入的线程会生效。一旦 sharedResult 不再是 null,就标志着整个大任务的最终结果已经产生。
  • protected volatile boolean canceled; 这是一个任务级的取消标志。volatile 关键字确保了其在多线程之间的可见性。它主要用于实现有序短路操作,比如 findFirst

compute() - 任务执行与短路检查

compute() 方法是 Fork/Join 任务的执行入口。AbstractShortCircuitTask 重写了这个方法,在标准的任务拆分逻辑中加入了短路检查。

// ... existing code ...@Overridepublic void compute() {// ... (任务拆分准备) ...AtomicReference<R> sr = sharedResult;R result;while ((result = sr.get()) == null) { // 核心检查点if (task.taskCanceled()) { // 检查任务是否被取消result = task.getEmptyResult();break;}// ... (标准的任务拆分和执行逻辑) ...// 如果当前任务块足够小,就执行 doLeaf()if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) {result = task.doLeaf();break;}// ... (否则,拆分成左右子任务并 fork 一个) ...}task.setLocalResult(result);task.tryComplete();}protected boolean taskCanceled() {boolean cancel = canceled;if (!cancel) {for (K parent = getParent(); !cancel && parent != null; parent = parent.getParent())cancel = parent.canceled;}return cancel;}
// ... existing code ...

compute 方法的核心是一个 while 循环,其循环条件 (result = sr.get()) == null 是短路机制的关键。在每次进行任务拆分或执行叶子任务(doLeaf)之前,它都会检查共享的 sharedResult。如果发现 sharedResult 已经被其他任务写入了值,循环就会立即终止,当前任务也就不会再进行后续的计算,从而实现了短路。

shortCircuit(R result) - 设置短路结果

当一个叶子任务(在 doLeaf() 方法中)计算出了一个可以决定全局结果的值时,它会调用这个方法。

// ... existing code ...protected void shortCircuit(R result) {if (result != null)sharedResult.compareAndSet(null, result);}
// ... existing code ...

这个方法非常简单,就是尝试用 CAS 操作将结果写入 sharedResult。第一个成功的任务会“胜出”,后续所有调用 shortCircuit 的任务都会因为 sharedResult 不再是 null 而失败,这保证了结果只会被设置一次。

其它方法

getEmptyResult() - 获取默认结果

这是一个抽象方法,需要子类去实现。它定义了在没有发生短路,并且所有元素都处理完毕后,应该返回的默认结果。

  • 对于 anyMatch,如果遍历完都没找到匹配项,默认结果是 false
  • 对于 allMatch,如果遍历完所有项都匹配,默认结果是 true
  • 对于 findFirst,如果流为空,默认结果是 Optional.empty()

cancel() 和 cancelLaterNodes() - 有序操作的短路

对于像 findFirst 这样的有序操作,仅仅找到一个结果是不够的。我们必须确保找到的是 遭遇顺序(encounter order) 中最靠前的那一个。

cancelLaterNodes() 方法就是为此设计的。当一个任务(比如处理流中 0-100 号元素的任务)找到了一个结果后,它会调用 cancelLaterNodes()。这个方法会沿着任务树向上回溯,并把所有在它“右边”的兄弟任务以及父节点的右兄弟任务的 canceled 标志都设置为 true。这些被取消的任务在 compute 循环的 taskCanceled() 检查点就会提前退出。

这确保了只有遭遇顺序最靠前的任务产生的结果才会被接受,所有处理更后面元素的任务都会被取消。

子类如何使用

像我们之前分析的 MatchTask 和 FindTask 都会继承 AbstractShortCircuitTask。它们需要:

  1. 提供 getEmptyResult() 的具体实现。
  2. 实现 doLeaf() 方法。在这个方法里,它们会处理一小块数据,如果发现了短路条件,就调用 shortCircuit(result)

总结

AbstractShortCircuitTask 是 Java Stream 并行短路操作的基石。它通过一个所有任务共享的 AtomicReference (sharedResult) 作为通信和同步的媒介,构建了一个优雅而高效的并行短路框架。

  • 核心机制:任务在执行前检查 sharedResult,如果已有结果则提前退出。
  • 结果写入:叶子任务通过 shortCircuit() 方法以原子方式写入第一个有效结果。
  • 有序性支持:通过 cancel() 和 cancelLaterNodes() 机制,为 findFirst 等有序操作提供了额外的取消逻辑,保证了结果的正确性。

这个类的设计充分体现了在并行计算中,如何通过共享状态和原子操作来协调多个并发任务,以最高效率达成共同的目标。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/128800.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决本地连接服务器ollama的错误

1. 服务器 ollama 安装 $ curl -fsSL https://ollama.com/install.sh | sh >>> Cleaning up old version at /usr/local/lib/ollama >>> Installing ollama to /usr/local >>> Downloading Linux amd64 bundle ##################################…

Java 8 特性

目录 一、Java 8中Interface接口 二、Lambda表达式 三、函数式接口 Functional Interface 1.常见的内置函数式接口介绍 1.1 Supplier接口--供给型接口 1.2 Consumer接口--消费型接口 1.3 Function接口--转换型接口 1.4 Predicate接口--断言型接口 四、Stream流 1.集合…

RecyclerView 中 ViewHolder

ViewHolder 是 RecyclerView 架构中的核心组件&#xff0c;它作为视图持有者和复用单元&#xff0c;在列表性能和内存优化中扮演着至关重要的角色。一、ViewHolder 的本质与作用1. 核心定义public abstract static class ViewHolder {public final View itemView; // 持有的列…

06.【数据结构-C语言】队列(先进先出,队列实现:入队列、出队列、获取队头or队尾元素,队列实现代码,队列相关题目)

目录 1. 队列的概念及结构 2. 队列的实现&#xff08;链表&#xff09; 2.1 队列的实现方式 2.2 队列结构体声明 2.3 初始化&销毁 2.4 入队列&#xff08;尾插&#xff09; 2.5 出队列&#xff08;头删&#xff09; 2.6 取队头&取队尾 2.7 判空 2.8 获取队列…

学习Java的Day28

今天在昨天完成的留言板项目基础上&#xff0c;我进一步开发了一个酒店房型管理系统。该系统采用MVC架构&#xff0c;主要功能是对酒店房型信息进行增删改查操作。数据库设计方面&#xff0c;我创建了hotel_room_type表&#xff0c;包含以下字段&#xff1a;id&#xff1a;主键…

动态路由菜单:根据用户角色动态生成菜单栏的实践(包含子菜单)

前言在现代后台管理系统中&#xff0c;不同角色的用户通常需要访问不同的功能模块。动态路由菜单技术正是解决这一需求的关键方案。本文将介绍如何基于用户角色实现动态菜单路由&#xff0c;让每个用户登录后只能看到自己权限范围内的菜单项。核心思路实现动态路由菜单的核心流…

【DP】篮球运动

题目描述小明建造了一个篮球场&#xff0c;他请来了2行n列的人&#xff0c;想让他们进行比赛。每一个人都有一个能力值&#xff0c;第一行分别为h11&#xff0c;h12&#xff0c;…&#xff0c;h1n&#xff0c;第二行为h21&#xff0c;h22&#xff0c;…&#xff0c;h2n。现在小…

202506 电子学会青少年等级考试机器人六级器人理论真题

更多内容和历年真题请查看网站&#xff1a;【试卷中心 -----> 电子学会 ----> 机器人技术 ----> 六级】 网站链接 青少年软件编程历年真题模拟题实时更新 2025年6月 青少年等级考试机器人理论真题六级 第 1 题 在windows系统下&#xff0c;下列选项中&#xff0c…

MWFMRf2StdAlnCalClient.dll MWFMReceiverClient.dll MWFMPNACalDataClient.dll MWFMNFClient.dll MWFM

在使用电脑系统时经常会出现丢失找不到某些文件的情况&#xff0c;由于很多常用软件都是采用 Microsoft Visual Studio 编写的&#xff0c;所以这类软件的运行需要依赖微软Visual C运行库&#xff0c;比如像 QQ、迅雷、Adobe 软件等等&#xff0c;如果没有安装VC运行库或者安装…

Ruoyi-Vue-Plus 修改包名、模块名、项目名

效果对比 修改包名 选中需要修改的包名&#xff0c;右键选择重命名&#xff08;快捷键&#xff1a;ShiftfnF6&#xff09; 选择所有目录 选择重构 重构软件包耗时较长&#xff0c;请耐心等待&#xff01;如果需要快速修改多模块中的多级包名&#xff0c;可以参考&#xff1a;…

GPT OSS 双模型上线,百度百舸全面支持快速部署

GPT OSS 是 OpenAI 推出的重量级开放模型&#xff0c;专为强推理能力、智能体任务及多样化开发场景设计&#xff0c;标志着大模型在开放性与实用性上的重要突破。该系列包含两款高性能模型&#xff1a;参数规模为 117B 的 GPT‑OSS‑120B 和 21B 的 GPT‑OSS‑20B。二者皆采用 …

Godot ------ 通过鼠标对节点进行操作

Godot ------ 通过鼠标对节点进行操作 引言 正文 引言 对于一个游戏,通过鼠标对游戏对象进行操作是非常普遍的行为,本文我们将以 Control 节点进行举例,说明如何通过鼠标对 Control 节点进行移动操作。 正文 首先,我们创建一个 Contorl 节点,并将它的 Layout->Trans…

C++虚函数表实现机制以及用C语言对其进行的模拟实现(加入了自己的思考和笔记)

文章目录前言C对象的内存布局没有虚函数的对象拥有仅一个虚函数的类对象拥有多个虚函数的类对象单继承且本身不存在虚函数的继承类的内存布局本身不存在虚函数(不严谨)但存在基类虚函数覆盖的单继承类的内存布局定义了基类没有的虚函数的单继承的类对象布局多继承且存在虚函数覆…

OpenAI GPT-5 深度解析:API Key定价与ChatGPT(Free, Plus, Pro)用户的区别

前言&#xff1a;两年等待&#xff0c;只为这一跃 在科技圈长达两年的屏息期待与无尽猜想之后&#xff0c;2025年8月8日北京时间凌晨&#xff0c;OpenAI终于揭开了其新一代旗舰模型——GPT-5的神秘面纱。这不仅仅是一次常规的产品迭代&#xff0c;更被整个行业视为一块试金石&a…

【Avalonia】无开发者账号使用iOS真机调试跨平台应用

文章目录1. 要求1.1 无需Apple开发者账号1.2 最新版mac系统1.3 最新版Xcode2. 配对Mac3. 配置开发证书3.1 创建一个名为MTClient的Xcode项目3.2 找到签名证书3.3 配置签名3.4 配置标识符4. 真机调试4.1 设置应用首屏 Launch Screen4.2 设置应用图标5. 问题5.1 DI异常该问题的解…

开发手札:UnrealEngine和Unity3d坐标系问题

最近把一套网络模块和一套组件模块从u3d改造到ue4。网络模块通用性很高&#xff0c;毕竟协议都是通用网络协议&#xff0c;改造后没啥问题。但是改造组件模块的时候就遇到了问题。首先&#xff0c;unity3d的坐标系是标准左手坐标系&#xff0c;如下&#xff1a;同时自己的几何算…

Python Gradio 写的-文本情感分析小软件 (不用Html+css+js 可写出网页来)

1. 安装 Gradio首先&#xff0c;需要安装 Gradio 库。打开命令行&#xff0c;执行以下命令&#xff1a;pip install gradio注意&#xff1a;Gradio 需要 Python 3.10 或更高版本。推荐在虚拟环境中安装&#xff0c;避免依赖冲突。2. 创建情感分析应用创建一个 sentiment_analys…

强化学习笔记:从Q学习到GRPO

推荐学习huggingface的强化学习课程&#xff0c;全面了解强化学习的发展史。 以下是个人笔记&#xff0c;内容不一定完整&#xff0c;有些是个人理解。 基于值函数(value function)的强化学习 基于值函数(value function)的强化学习&#xff1a;学习的是一个值函数&#xff0…

X4000 私有 5G 实验室入门套件

私有 5G 实验室入门套件简介技术规格5G核心网5G 接入网简介 入门级紧凑型私人 5G 实验室入门套件提供了经济高效的专用 5G 小型基站&#xff0c;非常适合在安全受控的室内环境中进行 5G 培训、开发、测试和演示&#xff0c;从而实现 5G 应用和物联网设备的连接。 东枫科技 是…

Openlayers基础教程|从前端框架到GIS开发系列课程(19)地图控件和矢量图形绘制

1. 地图控件本篇教程主要介绍以下地图控件&#xff1a;视图跳转控件放大缩小控件全屏控件实现步骤1. 初始化地图上一篇已经介绍了&#xff0c;这一篇直接跳过该步骤。2. 视图跳转控件/* 视图跳转控件 */const ZoomToExtent new ol.control.ZoomToExtent({ extent: [110, 30, …
推荐文章