Flink编程模型与API(一) 针对Flink的编程模型与API进行讲解主要基于DataStream API 进行编程学习Flink编程方式处理数据流程以及转换处理本章节中涉及到的代码实现使用Java和Scala两种语言来实现。Flink APIStateful Stream Processing底层的状态流处理API的抽象程度最低而且只能用于流处理提供了非常灵活的接口可以用于自定义底层与状态、时间相关的操作。DataSteam/DataSet API这一层级的API是Flink中的核心API这一层级中要处理的数据会被抽象成数据流DataStream或数据集DataSet,然后在其上通过定义转换操作实现业务逻辑例如map/flatMap/window/keyby/sum/join等这一层级API的使用风格与Java 8中的Stream使用风格十分类似。Table API在DataStream/DataSet API 之上是Table API Table API和DataStream/DataSet API不同不是用复杂的函数定义业务流程的而是用陈述性的语言加以描述这样就大大降低编程难度增强描述性。这种语言来着SQL语法只不过以API的形式呈现出来既然有了Table API ,那么自然可以直接使用SQL来进行描述这就是最上层的SQL。SQLFlink提供的最高层级的抽象是SQL这一层抽象在语法与表达能力上与Table API 类似SQL抽象与Table API交互密切同时SQL查询可以直接在Table API定义的表上执行。总而言之越上层的API其描述性和可阅读性越强越下层API其灵活度高、表达力越强多数时候上层API能做到的事情下层API也能做到反过来未必不过这些API的底层模型是一致的可以混合使用。Flink架构可以处理批和流Flink 批处理数据需要使用到Flink中的DataSet API此API主要是支持Flink针对批数据进行操作本质上Flink处理批数据也是看成一种特殊的流处理有界流所以没有必要分成批和流两套API从Flink1.12版本往后Dataset API 已经标记为Legacy(已过时)已被官方软弃用官方建议使用Table API 或者SQL 来处理批数据我们也可以使用带有Batch执行模式的DataStream API来处理批数据DataSet和DataStream API做到了合并在未来Flink版本中DataSet API 将会被删除。DataStream API的学习对于理解Flink数据处理流程非常方便上手相对来说比较容易下面我们先从核心API层开始学习对于底层API、Table API、SQL部分在后续章节在做介绍。Flink编程模型代码编写流程我们知道DataStream的编程模型包括以下几个部分Environment、DataSource、Transformation、DataSink、触发执行。nvironment是编写Flink程序的基础不同层级API编程中创建的Environment环境不同如Dataset 编程中需要创建ExecutionEnvironmentDataStream编程中需要创建StreamExecutionEnvironment在Table和SQL API中需要创建TableExecutionEnvironment使用不同语言编程导入的包也不同在获取到对应的Environment后我们还可以进行外参数的配置例如并行度、容错机制设置等。DataSource部分主要定义了数据接入功能主要是将外部数据接入到Flink系统中并转换成DataStream对象供后续的转换使用。Transformation部分有各种各样的算子操作可以对DataStream流进行转换操作最终将转换结果数据通过DataSink写出到外部存储介质中例如文件、数据库、Kafka消息系统等。在DataStream编程中编写完成DataSink代码后并不意味着程序结束由于Flink是基于事件驱动处理的有一条数据时就会进行处理所以最后一定要使用Environment.execute()来触发程序执行。Flink数据类型在Flink内部处理数据时涉及到数据的网络传输、数据的序列化及反序列化Flink需要知道操作的数据类型为了能够在分布式计算过程中对数据的类型进行管理和判断Flink中定义了TypeInformation来对数据类型进行描述通过TypeInfomation能够在数据处理之前将数据类型推断出来而不是真正在触发计算后才识别出这样可以有效避免用户在编写Flink应用的过程出现数据类型问题。常用的TypeInformation有BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo类等针对这些常用TypeInfomation介绍如下Flink通过实现BasicTypeInfo数据类型能够支持任意Java原生基本或装箱类型和String类型例如Integer,String,Double等除了BasicTypeInfo外类似的还有BasicArrayTypeInfo支持Java中数组和集合类型通过定义TupleTypeInfo来支持Tuple类型的数据通过CaseClassTypeInfo支持Scala Case Class PojoTypeInfo可以识别任意的POJOs类,包括Java和Scala类POJOs可以完成复杂数据架构的定义但是在Flink中使用POJOs数据类型需要满足以下要求:POJOs类必须是Public修饰且独立定义不能是内部类POJOs 类中必须含有默认空构造器POJOs类中所有的Fields必须是Public或者具有Public修饰的getter和Setter方法在使用Java API开发Flink应用时通常情况下Flink都能正常进行数据类型推断进而选择合适的serializers以及comparators但是在定义函数时如果使用到了泛型JVM就会出现类型擦除的问题Flink就获取不到对应的类型信息这就需要借助类型提示Type Hints来告诉系统函数中传入的参数类型信息和输出类型进而对数据类型进行推断处理。如Flink序列化机制在两个进程进行远程通信时它们需要将各种类型的数据以二进制序列的形式在网络上传输数据发送方需要将对象转换为字节序列进行序列化而接收方则将字节序列恢复为各种对象进行反序列化。对象的序列化有两个主要用途一是将对象的字节序列永久保存到硬盘上通常存放在文件中二是在网络上传输对象的字节序列。序列化的好处包括减少数据在内存和硬盘中的占用空间减少网络传输开销精确推算内存使用情况降低垃圾回收的频率。Flink序列化机制负责在节点之间传输数据时对数据对象进行序列化和反序列化确保数据的正确性和一致性。Flink提供了多种序列化器包括Kryo、Avro和Java序列化器等大多数情况下用户不用担心flink的序列化框架Flink会通过TypeInfomation在数据处理之前推断数据类型进而使用对应的序列化器例如针对标准类型int,double,long,string直接由Flink自带的序列化器处理其他类型默认会交给Kryo处理。但是对于Kryo仍然无法处理的类型可以采取以下两种解决方案public class Student { public Integer id; public String name; public Integer age; public Student() { } public Student(Integer id, String name, Integer age) { this.id id; this.name name; this.age age; } Override public String toString() { return Student{ id id , name name \ , age age }; } } public class StudentSerializer extends Serializer { Override public void write(Kryo kryo, Output output, Object o) { Student student (Student) o; output.writeInt(student.id); output.writeString(student.name); output.writeInt(student.age); } Override public Object read(Kryo kryo, Input input, Class aClass) { Student student new Student(); student.id input.readInt(); student.name input.readString(); student.age input.readInt(); return student; } }