Flink编程模型与API(四) Transformation 类算子是 Apache Flink 中用于定义数据流处理的基本构建块。它们允许对DataStream数据流进行转换和操作包括数据转换、数据操作和数据重组,通过Transformation类算子可以对输入数据流进行映射、过滤、聚合等操作生成新的DataStream数据流作为输出以满足特定的处理需求。下面分别介绍Flink中常见的Transformation类算子。mapmap用于对输入的DataStream数据流中的每个元素进行映射操作,它接受一个函数作为参数该函数将每个输入元素转换为一个新的元素并生成一个新的数据流作为输出。DataStream类型数据通过map函数进行数据转换后还会得到DataStream类型其中数据格式可能会发生变化。 下图演示将输入数据集中的每个数值全部加1处理经过map算子转换后输出到下游数据集。flatMapflatMap算子用于对输入的DataStream中的每个元素进行扁平化映射操作的算子它接受一个函数作为参数该函数将每个输入元素转换为零个或多个新的元素并生成一个新的DataStream数据流作为输出。DataStream类型数据通过map函数进行数据转换后还会得到DataStream类型其中数据格式可能会发生变化。与map算子不同flatMap算子可以生成比输入更多的元素因此可以用于扁平化操作。下图表示通过flatMap算子对输入数据集中每行数据按照逗号分割得到新的数据流输出到下游。FilterkeyByKeyBy算子用于将输入的DataStream按照指定的键或键选择器函数进行分组操作它接受一个键选择器函数作为参数该函数根据输入元素返回一个键用于将数据流中的元素分组到不同的分区中相同键的元素分配到同一个分区中以便后续的操作可以基于键对数据进行聚合、合并或其他操作。KeyBy算子使用时可以通过KeySelector函数来指定key键DataStream通过KeyBy算子处理后得到的是KeyedStream对象该对象也是DataStream。默认KeyBy算子会对数据流中指定的key键的hash值与Flink分区数并行度进行取模运算从而决定该条数据后续被哪个并行度处理如果Flink DataStream类型是POJOs类型需要在该类型中重写hashCode方法否则后续不能正确的将相同数据进行分组处理。下图表示通过KeyBy算子将DataStream中的数据按照指定的key进行分组统计value总和。AggregationsAggregations聚合函数是Flink中用于对输入数据进行聚合操作的函数集合它们可以应用于KeyedStream上将一组输入元素聚合为一个输出元素。Flink提供了多种聚合函数包括sum、min、minBy、max、maxBy,这些函数都是常见的聚合操作作用如下sum针对输入keyedStream对指定列进行sum求和操作。min针对输入keyedStream对指定列进行min最小值操作结果流中其他列保持最开始第一条数据的值。minBy同min类似对指定的字段进行min最小值操作minBy返回的是最小值对应的整个对象。max针对输入keyedStream对指定列进行max最大值操作结果流中其他列保持最开始第一条数据的值。maxBy:同max类似对指定的字段进行max最大值操作maxBy返回的是最大值对应的整个对象。Java代码实现Java代码和Scala代码执行后结果如下:# sum执行结果 StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343077146, duration120} StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343077146, duration150} StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343077146, duration200} StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343077146, duration290} StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343077146, duration590} # min 执行结果 StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343412282, duration120} StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343412282, duration30} StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343412282, duration30} StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343412282, duration30} StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343412282, duration30} # minBy 执行结果 StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343474909, duration120} StationLog{sidsid1, callOut18600000001, callIn18600000002, callTypefail, callTime1685343474909, duration30} StationLog{sidsid1, callOut18600000001, callIn18600000002, callTypefail, callTime1685343474909, duration30} StationLog{sidsid1, callOut18600000001, callIn18600000002, callTypefail, callTime1685343474909, duration30} StationLog{sidsid1, callOut18600000001, callIn18600000002, callTypefail, callTime1685343474909, duration30} # max 执行结果 StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343523009, duration120} StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343523009, duration120} StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343523009, duration120} StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343523009, duration120} StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343523009, duration300} # maxBy 执行结果 StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343559342, duration120} StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343559342, duration120} StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343559342, duration120} StationLog{sidsid1, callOut18600000000, callIn18600000001, callTypesuccess, callTime1685343559342, duration120} StationLog{sidsid1, callOut18600000004, callIn18600000005, callTypesuccess, callTime1685343559342, duration300}reduceunionunion算子是Flink流处理框架中数据流合并算子可以将多个输入的DataStream多个数据流进行合并并输出一个新的DataStream数据流作为结果适用于需要将多个数据流合并为一个流的场景。需要注意的是union合并的数据流类型必须相同合并之后的数据流包含两个或多个流中所有元素并且数据类型不变。下图表示将两个流进行合并得到合并后的结果流并将结果输出到下游。connectconnect算子将两个输入的DataStream数据流作为参数将两个不同数据类型的DataStream数据流连接在一起生成一个ConnectedStreams对象作为结果与union算子不同union只是简单的将两个类型一样的流合并在一起而connect算子可以将不同类型的DataStream连接在一起并且connect只能连接两个流。connect生成的结果保留了两个输入流的类型信息例如dataStream1数据集为(String, Int)元祖类型dataStream2数据集为Int类型通过connect连接算子将两个不同数据类型的流结合在一起其内部数据为[(String, Int), Int]的混合数据类型保留了两个原始数据集的数据类型。对于连接后的数据流可以使用map、flatMap、process等算子进行操作但内部方法使用的是CoMapFunction、CoFlatMapFunction、CoProcessFunction等函数来进行处理这些函数称作“协处理函数”分别接收两个输入流中的元素并生成一个新的数据流作为输出输出结果DataStream类型保持一致。Java代码实现iterateiterate算子用于实现迭代计算的算子它允许对输入的DataStream进行多次迭代操作直到迭代条件不满足时迭代停止该算子适合迭代计算场景例如机器学习中往往会对损失函数进行判断是否到达某个精度来判断训练是否需要结束就可以使用该算子来完成。