建议横屏观看!!!
代码可直接复制运行!!!
你的每一次实践都会取得进步!!!
总结:
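CEP(Complex Event Processing,复杂事件处理):按照条件从事件流中筛选事件。每个条件对应一个 where(...) 过滤条件,有几个条件就加几个;多个条件再通过 next / followedBy 等方法拼接成一个模式(Pattern),流中满足整个模式的一组事件就叫做复杂事件。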
1. 连续三次登录失败(next 方法表示严格连续的事件,两个匹配事件之间不能插入其他事件;times(3).consecutive() 的效果与之相同)
import com.alibaba.fastjson.JSONObject;import org.apache.flink.cep.CEP;import org.apache.flink.cep.PatternSelectFunction;import org.apache.flink.cep.PatternStream;import org.apache.flink.cep.pattern.Pattern;import org.apache.flink.cep.pattern.conditions.SimpleCondition;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;import org.apache.flink.streaming.api.windowing.time.Time;import java.util.List;import java.util.Map;public class LoginFailWithCep {private static final JSONObject jsonLoginEvent = new JSONObject();public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 1. 
从文件中读取数据/*** 5402,83.149.11.115,success,1558430815* 23064,66.249.3.15,fail,1558430826* 5692,80.149.25.29,fail,1558430833* 7233,86.226.15.75,success,1558430832*/DataStream<JSONObject> loginEventStream = env.readTextFile("/opt/study/Data/LoginLog2.csv").map(line -> {String[] fields = line.split(",");jsonLoginEvent.put("userId", fields[0]);jsonLoginEvent.put("ip", fields[1]);jsonLoginEvent.put("loginState", fields[2]);jsonLoginEvent.put("timestamp", fields[3]);return jsonLoginEvent;}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(3)) {@Overridepublic long extractTimestamp(JSONObject element) {return new Long(element.getString("timestamp")) * 1000;}});//1、定义一个匹配模式Pattern.<JSONObject>begin("firstFail").where(new SimpleCondition<JSONObject>() {@Overridepublic boolean filter(JSONObject jsonObject) throws Exception {return "fail".equals(jsonObject.getString("loginState"));}}).next("secondFail").where(new SimpleCondition<JSONObject>() {@Overridepublic boolean filter(JSONObject jsonObject) throws Exception {return "fail".equals(jsonObject.getString("loginState"));}}).next("thirdFail").where(new SimpleCondition<JSONObject>() {@Overridepublic boolean filter(JSONObject jsonObject) throws Exception {return "fail".equals(jsonObject.getString("loginState"));}}).within(Time.seconds(3));Pattern<JSONObject, JSONObject> loginFailPattern = Pattern.<JSONObject>begin("failEvents").where(new SimpleCondition<JSONObject>() {@Overridepublic boolean filter(JSONObject jsonObject) throws Exception {return "fail".equals(jsonObject.getString("loginState"));}}).times(3).consecutive().within(Time.seconds(5));//2、将匹配模式应用到数据流上,得到一个pattern streamPatternStream<JSONObject> patternStream = CEP.pattern(loginEventStream.keyBy(json -> json.getString("userId")), loginFailPattern);//3、检出符合匹配条件的复杂事件,进行转换处理,得到报警信息SingleOutputStreamOperator<JSONObject> warningStream = patternStream.select(new 
LoginFailMatchDetectWaring());warningStream.print();env.execute("login fail detect with cep job");}//实现自定义的LoginFailMatchDetectWaring//public interface PatternSelectFunction<IN, OUT> extends Function, Serializable {public static class LoginFailMatchDetectWaring implements PatternSelectFunction<JSONObject, JSONObject>{@Overridepublic JSONObject select(Map<String, List<JSONObject>> pattern) throws Exception {JSONObject firstFailEvent = pattern.get("failEvents").get(0);JSONObject lastFailEvent = pattern.get("failEvents").get(pattern.get("failEvents").size() - 1);jsonLoginEvent.put("userId2", firstFailEvent.getString("userId"));jsonLoginEvent.put("firstFailEvent", firstFailEvent.getString("timestamp"));jsonLoginEvent.put("lastFailEvent", lastFailEvent.getString("timestamp"));jsonLoginEvent.put("info2", "login fail " + pattern.get("failEvents").size() + " times");return jsonLoginEvent;}}}
2. 订单创建后 15 分钟内未支付即判定为超时(followedBy 方法表示宽松连续的事件,两个匹配事件之间允许出现其他事件)
import com.alibaba.fastjson.JSONObject;import org.apache.flink.cep.CEP;import org.apache.flink.cep.PatternSelectFunction;import org.apache.flink.cep.PatternStream;import org.apache.flink.cep.PatternTimeoutFunction;import org.apache.flink.cep.pattern.Pattern;import org.apache.flink.cep.pattern.conditions.SimpleCondition;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.OutputTag;import java.util.List;import java.util.Map;public class OrderPayTimeout {private static final JSONObject jsonOrderEvent = new JSONObject();public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setParallelism(1);// 读取数据并转换成POJO类型/*** 34763,create,,1558430936* 34764,create,,1558430937* 34763,pay,aaaaaa,1558431136* 34764,pay,,1558432936*/DataStream<JSONObject> orderEventStream = env.readTextFile("/opt/study/Data/OrderLog2.csv").map( line -> {String[] fields = line.split(",");jsonOrderEvent.put("userId", fields[0]);jsonOrderEvent.put("orderState", fields[1]);jsonOrderEvent.put("orderId", fields[2]);jsonOrderEvent.put("timestamp", fields[3]);return jsonOrderEvent;} ).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<JSONObject>() {@Overridepublic long extractAscendingTimestamp(JSONObject element) {return new Long(element.getString("timestamp")) * 1000;}});//1、定义一个带时间限制的模式Pattern<JSONObject, JSONObject> orderPayPattern = Pattern.<JSONObject>begin("create").where(new SimpleCondition<JSONObject>() {@Overridepublic boolean filter(JSONObject 
jsonObject) throws Exception {return "create".equals(jsonObject.getString("orderState"));}}).followedBy("pay").where(new SimpleCondition<JSONObject>() {@Overridepublic boolean filter(JSONObject jsonObject) throws Exception {return "pay".equals(jsonObject.getString("orderState"));}}).within(Time.minutes(15));//2.定义侧输出流标签,用来表示超时事件OutputTag<JSONObject> orderTimeoutTag = new OutputTag<JSONObject>("order-timeout") {};//3.将pattern应用到输入数据流上,得到pattern streamPatternStream<JSONObject> patternStream = CEP.pattern(orderEventStream.keyBy(json -> json.getString("orderId")), orderPayPattern);//4.调用select方法,实现对匹配复杂事件和超时复杂事件的提取和处理SingleOutputStreamOperator<JSONObject> resultStream = patternStream.select(orderTimeoutTag, new OrderTimeoutSelect(), new OrderPaySelect());resultStream.print("payed normally");resultStream.getSideOutput(orderTimeoutTag).print("timeout");env.execute("order timeout detect job");}//实现自定义的超时事件处理函数public static class OrderTimeoutSelect implements PatternTimeoutFunction<JSONObject, JSONObject>{@Overridepublic JSONObject timeout(Map<String, List<JSONObject>> pattern, long timeoutTimestamp) throws Exception {String timeoutOrderId = pattern.get("create").iterator().next().getString("orderId");jsonOrderEvent.put("createtimeoutOrderId", timeoutOrderId);jsonOrderEvent.put("createtime", "timeout " + timeoutOrderId);return jsonOrderEvent;}}//实现自定义的正常匹配事件处理函数public static class OrderPaySelect implements PatternSelectFunction<JSONObject, JSONObject>{@Overridepublic JSONObject select(Map<String, List<JSONObject>> pattern) throws Exception {String payedOrderId = pattern.get("pay").iterator().next().getString("orderId");jsonOrderEvent.put("paytimeoutOrderId", payedOrderId);jsonOrderEvent.put("paytime", "payed");return jsonOrderEvent;}}}
文章转载自大数据知会录,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




