Esper——快速开始 Esper快速开始1、简介2、EPL2、快速开始2.1、引入依赖2.2、提供输入事件信息2.3、编译EPL2.4、运行时2.5、发送事件2.6、示例3、温控示例1、简介Esper 编译器和运行时环境的开发旨在满足分析和响应事件的应用程序的需求。一些典型的应用程序示例包括业务流程管理和自动化流程监控、业务自动化管理 (BAM)、异常报告金融算法交易、欺诈检测、风险管理网络和应用程序监控入侵检测、服务级别协议 (SLA) 监控传感器网络应用程序RFID 读取、生产线调度和控制、空中交通这些应用程序的共同之处在于需要实时或近实时地处理事件或消息。这有时被称为复杂事件处理 (CEP) 和事件序列分析。此类应用程序的关键考虑因素是吞吐量、延迟以及所需逻辑的复杂性。高吞吐量 - 处理大量消息的应用程序每秒 1,000 到 100,000 条消息低延迟 - 对发生的状况做出实时响应的应用程序几毫秒到几秒复杂计算 - 检测事件模式事件关联、过滤事件、聚合事件的时间或长度窗口、连接事件序列、基于事件缺失触发等的应用程序2、EPLEsper 是一种语言、一个语言编译器和一个运行时环境。Esper 语言是事件处理语言 (EPL)。它是一种声明式、面向数据的语言用于处理高频的基于时间的事件数据。EPL 符合 SQL-92标准并进行了扩展可以分析一系列事件及其时间关系。Esper 编译器将 EPL 源代码编译成 Java 虚拟机 (JVM) 字节码以便生成的执行代码可以在 Esper 运行时环境中的 JVM 上运行。Esper 运行时运行在 JVM 之上。您可以使用 Esper 运行时运行 Esper 编译器生成的字节码。Esper 的架构与其他编译成 JVM 字节码的编程语言类似例如 Scala、Clojure 和 Kotlin。但 Esper EPL 并非命令式过程式编程语言。Esper 语言是一种事件处理语言 (EPL)专为复杂事件处理和流式分析而设计。EPL 由模块组成。编译器会将模块编译成字节码。我们使用术语“模块”来指代 EPL 源代码单元。一个模块由语句组成。语句是用于执行事件和时间分析的声明性代码。大多数语句采用“select … from …”的形式。我们使用术语“语句”来指代构成模块的每个声明性代码单元。您的应用程序通过回调或迭代语句的当前结果来接收语句的输出。一条语句可以声明如下所示的 EPL 对象事件类型定义流类型信息可通过创建模式或配置添加。变量是自由格式的值容器可通过创建变量或配置添加。命名窗口是可共享的命名数据窗口可通过创建窗口添加。表是可共享的组织行包含简单列、聚合列和复杂列可通过创建表添加。上下文定义分析生命周期可通过创建上下文添加。表达式和脚本是可重用的表达式可通过创建表达式添加。索引组织数据。使用诸如 private、protected 和 public 之类的访问修饰符来控制对 EPL 对象的访问。模块可以选择性地拥有模块名称。模块名称的作用类似于编程语言中的包名或命名空间名称。模块名称用于组织 EPL 对象并避免名称冲突。部署已编译的模块时运行时会为该部署分配一个部署 ID。部署 ID 唯一标识已编译模块的特定部署。已编译的模块可以参数化并多次部署。语句始终具有语句名称。语句名称标识已部署模块中的语句并且在同一部署中是唯一的。部署 ID 和语句名称的组合唯一标识运行时中的语句。EPL 是类型安全的因为 EPL 不允许对对象执行无效的操作。2、快速开始2.1、引入依赖dependenciesdependencygroupIdcom.espertech/groupIdartifactIdesper-common/artifactIdversion8.8.0/version/dependencydependencygroupIdcom.espertech/groupIdartifactIdesper-runtime/artifactIdversion8.8.0/version/dependencydependencygroupIdcom.espertech/groupIdartifactIdesper-compiler/artifactIdversion8.8.0/version/dependency!-- 日志 --dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-simple/artifactIdversion1.7.36/version/dependency/dependencies2.2、提供输入事件信息应用程序可以注册事件类型以指示编译器输入事件的格式。编译模块时编译器会检查可用的事件类型信息以确定该模块是否有效。此示例假设存在一个名为 PersonEvent 的 Java 类并且 PersonEvent 类的每个实例都是一个事件。publicclassPersonEvent{privateStringname;privateintage;publicPersonEvent(Stringname,intage){this.namename;this.ageage;}publicStringgetName(){returnname;}publicintgetAge(){returnage;}}2.3、编译EPL可以通过调用 EPCompilerProvider 类的静态方法 getCompiler 来获取编译器EPCompilercompilerEPCompilerProvider.getCompiler();编译器需要一个配置对象ConfigurationconfigurationnewConfiguration();configuration.getCommon().addEventType(PersonEvent.class);本入门部分的示例模块仅包含一条语句用于选择每个到达人员事件的姓名和年龄。它使用name注解指定语句名称并将语句命名为my-statement。name(my-statement)select name,age fromPersonEvent使用 compile 方法编译模块并将配置作为编译器的一部分传递。CompilerArgumentsargsnewCompilerArguments(configuration);EPCompiledepCompiled;try{epCompiledcompiler.compile(name(my-statement)select name,age fromPersonEvent,args);}catch(EPCompileExceptionex){// handle exception herethrownewRuntimeException(ex);}编译此模块时编译器会验证 PersonEvent 是否存在因为它列在from 子句中。编译器还会验证 PersonEvent 的 name 和 age 属性是否可用因为它们列在select 子句中。编译器生成字节码用于提取属性值并生成输出事件。编译器构建内部数据结构供后续的筛选索引使用以确保当收到 PersonEvent 时能够快速处理。2.4、运行时获取运行时需要提供一个配置对象该对象添加了预定义的人员事件ConfigurationconfigurationnewConfiguration();configuration.getCommon().addEventType(PersonEvent.class);可以通过调用 EPRuntimeProvider 类的静态方法 getDefaultRuntime 并传递配置信息来获取运行时环境EPRuntimeruntimeEPRuntimeProvider.getDefaultRuntime(configuration);部署 EPL 编译模块并附加回调使用管理界面的 deploy 方法部署已编译的模块。EPDeploymentdeployment;try{deploymentruntime.getDeploymentService().deploy(epCompiled);}catch(EPDeployExceptionex){// handle exception herethrownewRuntimeException(ex);}作为部署的一部分运行时会验证所有模块依赖项例如事件类型是否确实存在。在部署期间运行时会向筛选索引添加条目以确保当收到 PersonEvent 时能够快速处理。您的应用程序可以将回调函数附加到 EPStatement 以接收语句结果。EPStatementstatementruntime.getDeploymentService().getStatement(deployment.getDeploymentId(),mystatement);statement.addListener((newData,oldData,statement,runtime)-{Stringname(String)newData[0].get(name);intage(int)newData[0].get(age);System.out.println(String.format(Name: %s, Age: %d,name,age));});2.5、发送事件可以使用运行时接口中的 sendEventBean 方法或其他与您选择的事件相匹配的 sendEvent 方法将事件发送到运行时ntime.getEventService().sendEventBean(newPersonEvent(Peter,10),PersonEvent);输出Name:Peter,Age:102.6、示例publicclassQuickStart{publicstaticvoidmain(String[]args){EPCompilercompilerEPCompilerProvider.getCompiler();ConfigurationconfigurationnewConfiguration();configuration.getCommon().addEventType(PersonEvent.class);CompilerArgumentscompilerArgumentsnewCompilerArguments(configuration);Stringeplname(my-statement) select name, age from PersonEvent;EPCompiledepCompiled;try{epCompiledcompiler.compile(epl,compilerArguments);}catch(EPCompileExceptionex){thrownewRuntimeException(ex);}EPRuntimeruntimeEPRuntimeProvider.getDefaultRuntime(configuration);EPDeploymentdeployment;try{deploymentruntime.getDeploymentService().deploy(epCompiled);}catch(EPDeployExceptionex){thrownewRuntimeException(ex);}EPStatementstatementruntime.getDeploymentService().getStatement(deployment.getDeploymentId(),my-statement);statement.addListener((newData,oldData,stat,rt)-{Stringname(String)newData[0].get(name);intage(int)newData[0].get(age);System.out.println(String.format(Name: %s, Age: %d,name,age));});runtime.getEventService().sendEventBean(newPersonEvent(Peter,10),PersonEvent);try{Thread.sleep(5000L);}catch(InterruptedExceptione){e.printStackTrace();}}}3、温控示例publicclassTemperatureEvent{privateStringdeviceId;privatedoubletemperature;publicTemperatureEvent(StringdeviceId,doubletemperature){this.deviceIddeviceId;this.temperaturetemperature;}// Esper 强依赖 Getter 方法来提取属性publicStringgetDeviceId(){returndeviceId;}publicdoublegetTemperature(){returntemperature;}}publicclassEsperQuickStart{publicstaticvoidmain(String[]args)throwsException{// 1. 基础配置注册我们的事件类型ConfigurationconfignewConfiguration();config.getCommon().addEventType(TempEvent,TemperatureEvent.class);// 2. 编写 EPL 规则统计最近 3 笔温控事件的平均温度StringeplName(test)select deviceId, avg(temperature) as avgTemp from TempEvent.win:length(3) group by deviceId;// 3. 编译 EPL 语句将 EPL 文本编译为 JVM 字节码CompilerArgumentscompilerArgsnewCompilerArguments(config);EPCompiledcompiledEPCompilerProvider.getCompiler().compile(epl,compilerArgs);// 4. 获取运行时引擎EPRuntimeruntimeEPRuntimeProvider.getRuntime(MyRuntime,config);// 5. 通过部署服务进行部署EPDeploymentdeploymentruntime.getDeploymentService().deploy(compiled);// 6. 从部署好的 deployment 实例中拿到生成的语句// 默认情况下如果你的 EPL 里没有显式用 Name(xxx) 命名// 编译出的第一个语句可以通过 getStatements()[0] 直接获取EPStatementstatementruntime.getDeploymentService().getStatement(deployment.getDeploymentId(),test);// 7. 注册回调监听器statement.addListener((newData,oldData,stmt,rt)-{if(newData!null){StringdeviceId(String)newData[0].get(deviceId);DoubleavgTemp(Double)newData[0].get(avgTemp);System.out.printf( 【告警通知】设备: %s | 最近3次平均温度: %.2f\n,deviceId,avgTemp);}});// 8. 模拟发送实时流数据System.out.println( 启动数据流发送...);runtime.getEventService().sendEventBean(newTemperatureEvent(Sensor_01,20.0),TempEvent);runtime.getEventService().sendEventBean(newTemperatureEvent(Sensor_01,24.0),TempEvent);// 发送第三条时滑动窗口攒满 3 条avg 达到 25 度触发上面监听器的打印runtime.getEventService().sendEventBean(newTemperatureEvent(Sensor_01,31.0),TempEvent);// 发送第四条时第一条 (20.0) 会被挤出窗口窗口内变为 [24, 31, 41]runtime.getEventService().sendEventBean(newTemperatureEvent(Sensor_01,41.0),TempEvent);}} 启动数据流发送... 【告警通知】设备:Sensor_01|最近3次平均温度:20.00 【告警通知】设备:Sensor_01|最近3次平均温度:22.00 【告警通知】设备:Sensor_01|最近3次平均温度:25.00 【告警通知】设备:Sensor_01|最近3次平均温度:32.00