

FLink CEP简介
模式(Pattern):这是定义复杂事件的规则,它由一系列简单的事件组成,这些事件之间通过一定的逻辑关系相连。 事件流(Event Stream):这是实时流入系统的数据流,CEP 会在这些流上寻找匹配的模式。 匹配(Match):当事件流中的事件序列符合定义的模式时,就会产生一个匹配。
定义模式:使用 Flink CEP 的 API 定义感兴趣的事件模式。 应用模式:将定义的模式应用到事件流上,Flink CEP 会在流上寻找匹配该模式的事件序列。 处理匹配事件:对找到的匹配事件进行处理,执行相应的业务逻辑。
3、动态FLink CEP 背景介绍
在风险控制或者模式匹配等场景下,用户经常会想要在模式仍然能够提供服务的情况下,改变事件需要匹配的模式。在目前的 Flink CEP 中,一个 CEP 算子有一个固定的 CEP 模式,不支持改变。因此,为了达到上述目的,用户必须重启 Flink 作业,并等待相对较长的更新时间。
另一种常见情况是,一个事件流需要与多个模式匹配。虽然当前的 Flink CEP 不支持在一个 CEP 运算符中匹配多个模式,但用户必须为每个模式设置一个 Flink 作业或一个运算符。这可能会浪费内存和计算资源。为此我们支持了动态加载规则,支持任务不重启Flink 作用动态的匹配规则。
4、FLink CEP 的应用场景
风险控制:例如,检测用户行为模式,如果发现异常行为(如短时间内多次登录失败),则可以触发相应的风险控制措施。
用户画像:通过分析用户的行为事件流,可以构建用户画像,进而实现精准营销。
运维监控:在企业服务的运维管理中,CEP 可以用来配置复杂的监控规则,以实现对服务状态的实时监控。

ELink 动态CEP方案


FLink 动态CEP DEMO演示
1、规则说明
{"afterMatchStrategy": {"type": "NO_SKIP"},"edges": [{"source": "middle","target": "end","type": "STRICT"},{"source": "start","target": "middle","type": "SKIP_TILL_NEXT"}],"name": "end","nodes": [{"condition": {"expression": "action == 2","type": "AVIATOR"},"name": "end","quantifier": {"consumingStrategy": "SKIP_TILL_NEXT","properties": ["SINGLE"]},"type": "ATOMIC"},{"condition": {"expression": "action == 1","type": "AVIATOR"},"name": "middle","quantifier": {"consumingStrategy": "SKIP_TILL_NEXT","properties": ["SINGLE"]},"type": "ATOMIC"},{"condition": {"expression": "action == 0","type": "AVIATOR"},"name": "start","quantifier": {"consumingStrategy": "SKIP_TILL_NEXT","properties": ["SINGLE"]},"type": "ATOMIC"}],"quantifier": {"consumingStrategy": "SKIP_TILL_NEXT","properties": ["SINGLE"]},"type": "COMPOSITE","version": 1,"window": null}
ADD JAR WITH data/sftp/11_dynamic-cep-jar-1_dynamic-cep-jar-1.0-SNAPSHOT.jar AS functions.jar;CREATE TABLE source (id INT,name VARCHAR,productionId INT,action INT,eventTime BIGINT,procTime AS PROCTIME()) WITH ('connector' = 'kafka-x','topic' = 'stream','properties.group.id' = 'stream','scan.startup.mode' = 'latest-offset','properties.bootstrap.servers' = 'localhost:9092','format' = 'json');CREATE TABLE sink(id int) WITH ('connector' = 'mysql-x','url' = 'jdbc:mysql://localhost:3306/stream','schema-name' = 'stream','table-name' = 'stream_cep_001','username' = 'drpeco','password' = '******','sink.buffer-flush.max-rows' = '1024','sink.buffer-flush.interval' = '10000','sink.all-replace' = 'true','sink.parallelism' = '1');INSERT INTO sinkSELECT id_total as idFROM sourceDYNAMIC MATCH_RECOGNIZE (PARTITION BY productionIdORDER BY procTimeOUTPUT (id_total int)WITH_PATTERN ('tableName' = 'dynamic_cep','user' = 'drpeco','password' = '******','driver' = 'com.mysql.cj.jdbc.Driver','jdbcUrl' = 'jdbc:mysql://localhost:3306/cep','jdbcIntervalMillis' = '1000'))AS T;

{"id": 1,"name" : "middle","productionId" : 11,"action" : 0,"eventTime" : 1};{"id": 2,"name" : "middle","productionId" : 11,"action" : 1,"eventTime" : 1};{"id": 3,"name" : "middle","productionId" : 11,"action" : 2,"eventTime" : 1}



总结


文章转载自数栈研习社,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




