<?php

/**
 * PHP data pipeline and streaming computation framework.
 *
 * The data pipeline is a core pattern in data processing. PHP can implement a
 * simple streaming computation framework that processes a data stream in real
 * time. This file walks through the design of such a pipeline.
 *
 * The heart of a data pipeline is the processing node of each stage: data
 * flows in through an input node, passes through several processing nodes and
 * finally flows out through an output node.
 */

/**
 * A single stage of the pipeline.
 *
 * Contract used throughout this file: returning null from process() means
 * "nothing to pass on" (the item was dropped or is being buffered), and the
 * pipeline stops handling that item.
 */
interface PipelineNode
{
    /**
     * Handle one item and return the value for the next stage, or null to
     * stop processing this item.
     */
    public function process(mixed $data): mixed;

    public function setName(string $name): void;

    public function getName(): string;
}

/**
 * Shared name handling for the concrete nodes, so each node only has to
 * declare its default name and its process() logic.
 */
abstract class AbstractPipelineNode implements PipelineNode
{
    protected string $name = 'node';

    public function setName(string $name): void
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }
}

/**
 * Entry stage: logs the incoming item and passes it through unchanged.
 */
class InputNode extends AbstractPipelineNode
{
    protected string $name = 'input';

    public function process(mixed $data): mixed
    {
        echo "输入: " . json_encode($data) . "\n";

        return $data;
    }
}

/**
 * Keeps an item only when the predicate returns a truthy value.
 */
class FilterNode extends AbstractPipelineNode
{
    protected string $name = 'filter';

    // Stored as a Closure because PHP does not allow `callable` as a
    // property type.
    private \Closure $predicate;

    /**
     * @param callable $predicate Receives the item; a truthy result keeps it.
     */
    public function __construct(callable $predicate)
    {
        $this->predicate = \Closure::fromCallable($predicate);
    }

    /**
     * @return mixed The unchanged item when it passes, null when it is dropped.
     */
    public function process(mixed $data): mixed
    {
        $result = ($this->predicate)($data);
        echo "过滤: " . ($result ? "通过" : "丢弃") . "\n";

        return $result ? $data : null;
    }
}

/**
 * Maps each item to a new value using the supplied transformer.
 */
class TransformNode extends AbstractPipelineNode
{
    protected string $name = 'transform';

    // Stored as a Closure because PHP does not allow `callable` as a
    // property type.
    private \Closure $transformer;

    /**
     * @param callable $transformer Receives the item and returns its replacement.
     */
    public function __construct(callable $transformer)
    {
        $this->transformer = \Closure::fromCallable($transformer);
    }

    /**
     * @return mixed The transformer's result; a null result ends processing
     *               of this item in the pipeline.
     */
    public function process(mixed $data): mixed
    {
        $result = ($this->transformer)($data);
        echo "转换: " . json_encode($data) . " -> " . json_encode($result) . "\n";

        return $result;
    }
}

/**
 * Buffers items and emits one aggregated value per full batch.
 */
class AggregateNode extends AbstractPipelineNode
{
    protected string $name = 'aggregate';

    private array $buffer = [];

    private int $batchSize;

    // Stored as a Closure because PHP does not allow `callable` as a
    // property type.
    private \Closure $aggregator;

    /**
     * @param int      $batchSize  Number of buffered items that triggers an emit.
     * @param callable $aggregator Receives the buffered items as an array and
     *                             returns the aggregated value.
     */
    public function __construct(int $batchSize, callable $aggregator)
    {
        $this->batchSize = $batchSize;
        $this->aggregator = \Closure::fromCallable($aggregator);
    }

    /**
     * Buffer the item; once the buffer holds at least $batchSize items,
     * aggregate it, clear it and return the aggregate.
     *
     * @return mixed The aggregate for a completed batch, otherwise null.
     */
    public function process(mixed $data): mixed
    {
        $this->buffer[] = $data;

        if (count($this->buffer) < $this->batchSize) {
            return null;
        }

        $result = $this->drain();
        echo "聚合: " . json_encode($result) . "\n";

        return $result;
    }

    /**
     * Aggregate whatever is still buffered (a trailing partial batch).
     *
     * @return mixed The aggregate, or null when the buffer is empty.
     */
    public function flush(): mixed
    {
        if ($this->buffer === []) {
            return null;
        }

        return $this->drain();
    }

    /**
     * Run the aggregator over the buffer and reset the buffer.
     */
    private function drain(): mixed
    {
        $result = ($this->aggregator)($this->buffer);
        $this->buffer = [];

        return $result;
    }
}

/**
 * Exit stage: logs the outgoing item and passes it through unchanged.
 */
class OutputNode extends AbstractPipelineNode
{
    protected string $name = 'output';

    public function process(mixed $data): mixed
    {
        echo "输出: " . json_encode($data) . "\n";

        return $data;
    }
}

/**
 * Runs every item of an input collection through an ordered list of nodes.
 */
class DataPipeline
{
    /** @var list<PipelineNode> */
    private array $nodes = [];

    public function addNode(PipelineNode $node): void
    {
        $this->nodes[] = $node;
    }

    /**
     * Process every item of $data and collect the values that reach the end
     * of the pipeline.
     *
     * After the input is exhausted, each AggregateNode is flushed in pipeline
     * order and its leftover aggregate is sent through the nodes that follow
     * it, so a trailing partial batch is handled exactly like a full batch
     * (previously it bypassed the downstream nodes).
     *
     * @param mixed $data Collection to iterate with foreach (typically an array).
     *
     * @return array Values that made it through all nodes, in emit order.
     */
    public function process(mixed $data): array
    {
        $results = [];

        foreach ($data as $item) {
            $current = $this->runFrom(0, $item);
            if ($current !== null) {
                $results[] = $current;
            }
        }

        // Handle the data still buffered inside aggregate nodes. Walking the
        // nodes in order means an upstream flush can feed a downstream
        // aggregate before that one is flushed in turn.
        foreach ($this->nodes as $index => $node) {
            if (!$node instanceof AggregateNode) {
                continue;
            }

            $remaining = $node->flush();
            if ($remaining === null) {
                continue;
            }

            $remaining = $this->runFrom($index + 1, $remaining);
            if ($remaining !== null) {
                $results[] = $remaining;
            }
        }

        return $results;
    }

    /**
     * Push one item through the nodes starting at position $start.
     *
     * @return mixed The value after the last node, or null as soon as any
     *               node returns null.
     */
    private function runFrom(int $start, mixed $item): mixed
    {
        $nodeCount = count($this->nodes);

        for ($i = $start; $i < $nodeCount; $i++) {
            $item = $this->nodes[$i]->process($item);
            if ($item === null) {
                return null;
            }
        }

        return $item;
    }
}

// Build the pipeline.
$pipeline = new DataPipeline();
$pipeline->addNode(new InputNode());
$pipeline->addNode(new FilterNode(fn ($item) => $item['status'] === 'paid'));
$pipeline->addNode(new TransformNode(fn ($item) => [
    'order_id' => $item['order_id'],
    'amount' => $item['amount'],
    'user' => $item['user'],
    'final_amount' => $item['amount'] * 0.95,
]));
$pipeline->addNode(new OutputNode());

$orders = [
    ['order_id' => 1, 'amount' => 100, 'user' => '张三', 'status' => 'paid'],
    ['order_id' => 2, 'amount' => 200, 'user' => '李四', 'status' => 'unpaid'],
    ['order_id' => 3, 'amount' => 300, 'user' => '王五', 'status' => 'paid'],
];

echo "开始处理订单数据...\n";
$results = $pipeline->process($orders);
echo "\n处理完成共 " . count($results) . " 条结果\n";

// Window calculation is a common pattern in stream processing.

/**
 * Fixed-size sliding window over the most recently added values.
 */
class WindowCalculator
{
    private array $window = [];

    private int $windowSize;

    /**
     * @param int $windowSize Maximum number of values kept in the window.
     */
    public function __construct(int $windowSize = 5)
    {
        $this->windowSize = $windowSize;
    }

    /**
     * Append a value and evict the oldest one once the window is over capacity.
     */
    public function add(mixed $value): void
    {
        $this->window[] = $value;

        if (count($this->window) > $this->windowSize) {
            array_shift($this->window);
        }
    }

    public function sum(): float
    {
        return array_sum($this->window);
    }

    /**
     * @return float Mean of the window, or 0 when the window is empty.
     */
    public function average(): float
    {
        if ($this->window === []) {
            return 0;
        }

        return array_sum($this->window) / count($this->window);
    }

    /**
     * @throws \ValueError When the window is empty (raised by max() on PHP 8).
     */
    public function max(): float
    {
        return max($this->window);
    }

    /**
     * @throws \ValueError When the window is empty (raised by min() on PHP 8).
     */
    public function min(): float
    {
        return min($this->window);
    }

    public function count(): int
    {
        return count($this->window);
    }

    public function reset(): void
    {
        $this->window = [];
    }
}

$calculator = new WindowCalculator(3);

// Simulate streaming data.
$data = [10, 20, 30, 40, 50, 60, 70, 80];

echo "滑动窗口计算:\n";
foreach ($data as $value) {
    $calculator->add($value);
    echo "添加 {$value}: 平均={$calculator->average()}, 和={$calculator->sum()}\n";
}

// A PHP data pipeline is not as specialised as Kafka Streams or Flink, but it
// is entirely adequate when the data volume is modest. The pipeline pattern
// keeps the processing logic modular: each node has a single responsibility,
// which makes it easy to test and maintain. It suits log processing, data
// cleansing, real-time computation and similar scenarios.