作者codestats| 架构设计思想分享者擅长用最简单的示例复现最核心的架构设计思想 写在前面Stream API 很多人每天都在用但能说清楚它“为什么这样设计”的人不多。今天我们从零手写一个极简版Stream不依赖任何黑魔法。当你亲手实现过一遍再看源码会发现——原来就这么回事。一、先问一个问题Stream到底解决了什么痛点先看一段“朴素”代码java// 需求找出年龄18的男性用户取前10个名字 ListUser users getUsers(); ListString result new ArrayList(); int count 0; for (User user : users) { if (user.getAge() 18 男.equals(user.getGender())) { result.add(user.getName()); count; if (count 10) break; } }这段代码有什么问题问题表现 代码冗长循环、判断、计数、边界混在一起 意图不清需要3秒才能看出“过滤映射截断”的逻辑 难以扩展想加排序再加循环想并行改到崩溃 中间集合每步都产生新List内存浪费再看Stream版本javaListString result users.stream() .filter(u - u.getAge() 18 男.equals(u.getGender())) .map(User::getName) .limit(10) .collect(Collectors.toList());高下立判。但问题来了——它是怎么做到的二、核心设计思想一句话总结把“做什么”和“怎么做”分离。你只管声明操作Stream内部管理迭代、短路、求值时机。四大关键词关键词含义 函数式编程把行为Predicate、Function当参数传递 流水线模式操作不立即执行而是串成一条链 惰性求值只有终端操作forEach/collect才真正执行 内部迭代开发者不用写for循环Stream替你迭代三、最精简复现从零实现MiniStream3.1 定义函数式接口简化JDKjava// 断言过滤条件 interface MyPredicateT { boolean test(T t); } // 映射类型转换 interface MyFunctionT, R { R apply(T t); } // 消费最终操作 interface MyConsumerT { void accept(T t); }3.2 核心Stream接口javainterface MyStreamT { // 中间操作返回Stream支持链式 MyStreamT filter(MyPredicateT predicate); R MyStreamR map(MyFunctionT, R mapper); // 终端操作触发真正计算 void forEach(MyConsumerT action); }关键点filter和map返回的还是MyStream所以能链式调用。但它们没有真正执行只是在“记账”。3.3 最核心的实现类javaclass MyStreamImplT implements MyStreamT { private final ListT source; // 数据源 private final MyPipelineStageT, T stage; // 操作链 public static T MyStreamT stream(ListT list) { return new MyStreamImpl(list, null); } private MyStreamImpl(ListT source, MyPipelineStageT, T stage) { this.source source; this.stage stage; } Override public MyStreamT filter(MyPredicateT predicate) { // 记录过滤操作不执行 MyPipelineStageT, T newStage (value, sink) - { if (predicate.test(value)) sink.accept(value); }; return new MyStreamImpl(source, combine(newStage)); } Override public R MyStreamR map(MyFunctionT, R mapper) { // 记录映射操作不执行 MyPipelineStageR, T newStage (value, sink) - { R result mapper.apply(value); sink.accept(result); }; return new MyStreamImpl(source, combine(newStage)); } Override public void forEach(MyConsumerT action) { // 终端方法终于执行了 MyConsumerT finalSink action; if (stage ! null) { finalSink value - stage.process(value, action); } for (T item : source) { finalSink.accept(item); } } // 合并两个阶段构建责任链 private MyPipelineStage combine(MyPipelineStage newStage) { if (stage null) return newStage; return (value, sink) - stage.process(value, v - newStage.process(v, sink)); } } // 流水线阶段接口核心中的核心 interface MyPipelineStageT, R { void process(T input, MyConsumerR sink); }3.4 执行流程图解text┌─────────────────────────────────────────────────────────────┐ │ 【构建阶段 - 啥都没执行】 │ └─────────────────────────────────────────────────────────────┘ users.stream() ──→ new MyStreamImpl(source) │ ├─→ filter(...) ──→ 记录 PipelineStage{ if(条件) 传给下游 } │ (返回新Stream包装这个Stage) │ └─→ map(...) ──→ 记录 PipelineStage{ 转换类型传给下游 } (再次包装) 此时数据还在source里一动不动 ┌─────────────────────────────────────────────────────────────┐ │ 【触发阶段 - forEach才动真格】 │ └─────────────────────────────────────────────────────────────┘ forEach(action) 被调用 │ ├─→ 把最终的action当作sink │ ├─→ 从最外层的stage开始反向构建调用链 │ map的stage 调用 → filter的stage调用 → action │ └─→ for循环遍历source每个元素走一遍上面的链 具体数据流以元素 5 为例 source中的5 ↓ map stage (5 → 25) ↓ filter stage (2510? ✓) ↓ action (打印 25)四、为什么能实现惰性求值问题答案❓ 为什么filter/map不执行它们只是组装了一个函数PipelineStage没有调用❓ 什么时候执行forEach触发时才把函数应用到每个元素上❓ 为什么没有中间集合元素是流式处理5进去→过滤→映射→打印一气呵成五、跑起来看看javapublic class Demo { public static void main(String[] args) { ListInteger numbers Arrays.asList(1, 2, 3, 4, 5); MyStreamImpl.stream(numbers) .filter(n - n % 2 0) // 偶数 .map(n - n * n) // 平方 .forEach(n - System.out.print(n )); } } // 输出4 16执行过程逐行拆解步骤操作状态1stream(numbers)创建MyStreamImplsource[1,2,3,4,5]2filter(偶数)记录检查偶数的Stage3map(平方)记录平方的Stage合并成先过滤再平方4forEach(打印)遍历source每个元素走链条六、设计思想对照表原始问题MiniStream如何解决代码冗长只需声明filter/map/forEach不写for/if意图不明确.filter().map().forEach()自解释中间集合流式处理一个元素走完全程才处理下一个难以并行替换forEach里的for为多线程分片使用者无感知七、完整代码复制可运行javaimport java.util.*; // 函数式接口 interface MyPredicateT { boolean test(T t); } interface MyFunctionT, R { R apply(T t); } interface MyConsumerT { void accept(T t); } // 流水线阶段 interface MyPipelineStageT, R { void process(T input, MyConsumerR sink); } // Stream接口 interface MyStreamT { MyStreamT filter(MyPredicateT predicate); R MyStreamR map(MyFunctionT, R mapper); void forEach(MyConsumerT action); } // Stream实现 class MyStreamImplT implements MyStreamT { private final ListT source; private final MyPipelineStageT, T stage; public static T MyStreamT stream(ListT list) { return new MyStreamImpl(list, null); } private MyStreamImpl(ListT source, MyPipelineStageT, T stage) { this.source source; this.stage stage; } Override public MyStreamT filter(MyPredicateT predicate) { MyPipelineStageT, T newStage (value, sink) - { if (predicate.test(value)) sink.accept(value); }; return new MyStreamImpl(source, combine(newStage)); } Override public R MyStreamR map(MyFunctionT, R mapper) { MyPipelineStageR, T newStage (value, sink) - { R result mapper.apply(value); sink.accept(result); }; return new MyStreamImpl(source, combine(newStage)); } Override public void forEach(MyConsumerT action) { MyConsumerT finalAction action; if (stage ! null) { finalAction value - stage.process(value, action); } for (T item : source) { finalAction.accept(item); } } SuppressWarnings(unchecked) private R MyPipelineStage combine(MyPipelineStage newStage) { if (stage null) return newStage; return (value, sink) - stage.process((T) value, v - newStage.process(v, sink)); } } // 测试 public class StreamApiCoreDemo { public static void main(String[] args) { ListInteger nums Arrays.asList(1, 2, 3, 4, 5, 6); System.out.println( 平方后10的数); MyStreamImpl.stream(nums) .map(n - n * n) .filter(n - n 10) .forEach(n - System.out.print(n )); // 输出16 25 36 System.out.println(\n\n 偶数且大于2); MyStreamImpl.stream(nums) .filter(n - n % 2 0) .filter(n - n 2) .forEach(n - System.out.print(n )); // 输出4 6 } } 最后Stream API 没有魔法。它的本质是你把filter、map这些操作组装成一条流水线但机器没开。直到你按下forEach或collect这个“启动按钮”数据才开始从流水线上一个个流过每个零件处理完立刻传给下一个。这就是惰性求值 流水线。善于用最简单的示例复现最核心的思想从整体上把握它“在干什么”才能真正理解它。今天你亲手实现了一个最简版——以后再看Stream源码心里应该有底了。点赞 让更多人看到收藏 ⭐方便后续研究评论 分享你的想法或尝试经验作者CodeStats-CSDN博客擅长用最简单的示例复现最核心的架构设计思想如果这篇文章对你有帮助欢迎点赞、收藏、关注。下篇预告Collector的设计与groupingBy实现。
理解Java Stream API核心设计思想,看这一篇就懂90%了
发布时间:2026/6/4 13:24:25
作者codestats| 架构设计思想分享者擅长用最简单的示例复现最核心的架构设计思想 写在前面Stream API 很多人每天都在用但能说清楚它“为什么这样设计”的人不多。今天我们从零手写一个极简版Stream不依赖任何黑魔法。当你亲手实现过一遍再看源码会发现——原来就这么回事。一、先问一个问题Stream到底解决了什么痛点先看一段“朴素”代码java// 需求找出年龄18的男性用户取前10个名字 ListUser users getUsers(); ListString result new ArrayList(); int count 0; for (User user : users) { if (user.getAge() 18 男.equals(user.getGender())) { result.add(user.getName()); count; if (count 10) break; } }这段代码有什么问题问题表现 代码冗长循环、判断、计数、边界混在一起 意图不清需要3秒才能看出“过滤映射截断”的逻辑 难以扩展想加排序再加循环想并行改到崩溃 中间集合每步都产生新List内存浪费再看Stream版本javaListString result users.stream() .filter(u - u.getAge() 18 男.equals(u.getGender())) .map(User::getName) .limit(10) .collect(Collectors.toList());高下立判。但问题来了——它是怎么做到的二、核心设计思想一句话总结把“做什么”和“怎么做”分离。你只管声明操作Stream内部管理迭代、短路、求值时机。四大关键词关键词含义 函数式编程把行为Predicate、Function当参数传递 流水线模式操作不立即执行而是串成一条链 惰性求值只有终端操作forEach/collect才真正执行 内部迭代开发者不用写for循环Stream替你迭代三、最精简复现从零实现MiniStream3.1 定义函数式接口简化JDKjava// 断言过滤条件 interface MyPredicateT { boolean test(T t); } // 映射类型转换 interface MyFunctionT, R { R apply(T t); } // 消费最终操作 interface MyConsumerT { void accept(T t); }3.2 核心Stream接口javainterface MyStreamT { // 中间操作返回Stream支持链式 MyStreamT filter(MyPredicateT predicate); R MyStreamR map(MyFunctionT, R mapper); // 终端操作触发真正计算 void forEach(MyConsumerT action); }关键点filter和map返回的还是MyStream所以能链式调用。但它们没有真正执行只是在“记账”。3.3 最核心的实现类javaclass MyStreamImplT implements MyStreamT { private final ListT source; // 数据源 private final MyPipelineStageT, T stage; // 操作链 public static T MyStreamT stream(ListT list) { return new MyStreamImpl(list, null); } private MyStreamImpl(ListT source, MyPipelineStageT, T stage) { this.source source; this.stage stage; } Override public MyStreamT filter(MyPredicateT predicate) { // 记录过滤操作不执行 MyPipelineStageT, T newStage (value, sink) - { if (predicate.test(value)) sink.accept(value); }; return new MyStreamImpl(source, combine(newStage)); } Override public R MyStreamR map(MyFunctionT, R mapper) { // 记录映射操作不执行 MyPipelineStageR, T newStage (value, sink) - { R result mapper.apply(value); sink.accept(result); }; return new MyStreamImpl(source, combine(newStage)); } Override public void forEach(MyConsumerT action) { // 终端方法终于执行了 MyConsumerT finalSink action; if (stage ! null) { finalSink value - stage.process(value, action); } for (T item : source) { finalSink.accept(item); } } // 合并两个阶段构建责任链 private MyPipelineStage combine(MyPipelineStage newStage) { if (stage null) return newStage; return (value, sink) - stage.process(value, v - newStage.process(v, sink)); } } // 流水线阶段接口核心中的核心 interface MyPipelineStageT, R { void process(T input, MyConsumerR sink); }3.4 执行流程图解text┌─────────────────────────────────────────────────────────────┐ │ 【构建阶段 - 啥都没执行】 │ └─────────────────────────────────────────────────────────────┘ users.stream() ──→ new MyStreamImpl(source) │ ├─→ filter(...) ──→ 记录 PipelineStage{ if(条件) 传给下游 } │ (返回新Stream包装这个Stage) │ └─→ map(...) ──→ 记录 PipelineStage{ 转换类型传给下游 } (再次包装) 此时数据还在source里一动不动 ┌─────────────────────────────────────────────────────────────┐ │ 【触发阶段 - forEach才动真格】 │ └─────────────────────────────────────────────────────────────┘ forEach(action) 被调用 │ ├─→ 把最终的action当作sink │ ├─→ 从最外层的stage开始反向构建调用链 │ map的stage 调用 → filter的stage调用 → action │ └─→ for循环遍历source每个元素走一遍上面的链 具体数据流以元素 5 为例 source中的5 ↓ map stage (5 → 25) ↓ filter stage (2510? ✓) ↓ action (打印 25)四、为什么能实现惰性求值问题答案❓ 为什么filter/map不执行它们只是组装了一个函数PipelineStage没有调用❓ 什么时候执行forEach触发时才把函数应用到每个元素上❓ 为什么没有中间集合元素是流式处理5进去→过滤→映射→打印一气呵成五、跑起来看看javapublic class Demo { public static void main(String[] args) { ListInteger numbers Arrays.asList(1, 2, 3, 4, 5); MyStreamImpl.stream(numbers) .filter(n - n % 2 0) // 偶数 .map(n - n * n) // 平方 .forEach(n - System.out.print(n )); } } // 输出4 16执行过程逐行拆解步骤操作状态1stream(numbers)创建MyStreamImplsource[1,2,3,4,5]2filter(偶数)记录检查偶数的Stage3map(平方)记录平方的Stage合并成先过滤再平方4forEach(打印)遍历source每个元素走链条六、设计思想对照表原始问题MiniStream如何解决代码冗长只需声明filter/map/forEach不写for/if意图不明确.filter().map().forEach()自解释中间集合流式处理一个元素走完全程才处理下一个难以并行替换forEach里的for为多线程分片使用者无感知七、完整代码复制可运行javaimport java.util.*; // 函数式接口 interface MyPredicateT { boolean test(T t); } interface MyFunctionT, R { R apply(T t); } interface MyConsumerT { void accept(T t); } // 流水线阶段 interface MyPipelineStageT, R { void process(T input, MyConsumerR sink); } // Stream接口 interface MyStreamT { MyStreamT filter(MyPredicateT predicate); R MyStreamR map(MyFunctionT, R mapper); void forEach(MyConsumerT action); } // Stream实现 class MyStreamImplT implements MyStreamT { private final ListT source; private final MyPipelineStageT, T stage; public static T MyStreamT stream(ListT list) { return new MyStreamImpl(list, null); } private MyStreamImpl(ListT source, MyPipelineStageT, T stage) { this.source source; this.stage stage; } Override public MyStreamT filter(MyPredicateT predicate) { MyPipelineStageT, T newStage (value, sink) - { if (predicate.test(value)) sink.accept(value); }; return new MyStreamImpl(source, combine(newStage)); } Override public R MyStreamR map(MyFunctionT, R mapper) { MyPipelineStageR, T newStage (value, sink) - { R result mapper.apply(value); sink.accept(result); }; return new MyStreamImpl(source, combine(newStage)); } Override public void forEach(MyConsumerT action) { MyConsumerT finalAction action; if (stage ! null) { finalAction value - stage.process(value, action); } for (T item : source) { finalAction.accept(item); } } SuppressWarnings(unchecked) private R MyPipelineStage combine(MyPipelineStage newStage) { if (stage null) return newStage; return (value, sink) - stage.process((T) value, v - newStage.process(v, sink)); } } // 测试 public class StreamApiCoreDemo { public static void main(String[] args) { ListInteger nums Arrays.asList(1, 2, 3, 4, 5, 6); System.out.println( 平方后10的数); MyStreamImpl.stream(nums) .map(n - n * n) .filter(n - n 10) .forEach(n - System.out.print(n )); // 输出16 25 36 System.out.println(\n\n 偶数且大于2); MyStreamImpl.stream(nums) .filter(n - n % 2 0) .filter(n - n 2) .forEach(n - System.out.print(n )); // 输出4 6 } } 最后Stream API 没有魔法。它的本质是你把filter、map这些操作组装成一条流水线但机器没开。直到你按下forEach或collect这个“启动按钮”数据才开始从流水线上一个个流过每个零件处理完立刻传给下一个。这就是惰性求值 流水线。善于用最简单的示例复现最核心的思想从整体上把握它“在干什么”才能真正理解它。今天你亲手实现了一个最简版——以后再看Stream源码心里应该有底了。点赞 让更多人看到收藏 ⭐方便后续研究评论 分享你的想法或尝试经验作者CodeStats-CSDN博客擅长用最简单的示例复现最核心的架构设计思想如果这篇文章对你有帮助欢迎点赞、收藏、关注。下篇预告Collector的设计与groupingBy实现。