暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

flink案例实践: (五)CEP复杂事件:订单超时15分钟

大数据知会录 2021-07-30
840

建议横屏观看!!!

代码可直接复制运行!!!

你的每一次实践都会取得进步!!!


总结:

CEP(Complex Event Processing,复杂事件处理):按条件从事件流中筛选事件,每个条件对应模式中的一个 where 过滤;多个条件按先后顺序和时间约束组合起来所描述的场景,就叫做复杂事件。


1. 连续三次登录失败(next 方法表示严格连续:两个匹配事件之间不能夹有其他事件)



import com.alibaba.fastjson.JSONObject;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;


import java.util.List;
import java.util.Map;


/**
 * Flink CEP example: raises a warning when a user produces three consecutive
 * failed logins within five seconds (event time).
 *
 * <p>Input is a CSV file with one login event per line:
 * {@code userId,ip,loginState,timestampSeconds}.
 */
public class LoginFailWithCep {

    /**
     * Builds and runs the job: read CSV, assign event-time timestamps, apply the
     * CEP pattern per user, and print the resulting warnings.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 1. Read the login events from a file.
        /**
         * 5402,83.149.11.115,success,1558430815
         * 23064,66.249.3.15,fail,1558430826
         * 5692,80.149.25.29,fail,1558430833
         * 7233,86.226.15.75,success,1558430832
         */
        DataStream<JSONObject> loginEventStream = env.readTextFile("/opt/study/Data/LoginLog2.csv")
                .map(line -> {
                    String[] fields = line.split(",");
                    // A fresh object per record: sharing one mutable instance would make
                    // every emitted event alias (and overwrite) the previous one.
                    JSONObject loginEvent = new JSONObject();
                    loginEvent.put("userId", fields[0]);
                    loginEvent.put("ip", fields[1]);
                    loginEvent.put("loginState", fields[2]);
                    loginEvent.put("timestamp", fields[3]);
                    return loginEvent;
                })
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.seconds(3)) {
                            @Override
                            public long extractTimestamp(JSONObject element) {
                                // The file stores seconds; Flink expects milliseconds.
                                return Long.parseLong(element.getString("timestamp")) * 1000L;
                            }
                        });

        // Alternative formulation with explicit next() steps. It is built here for
        // illustration only: the result is not assigned and is not applied to the stream.
        Pattern.<JSONObject>begin("firstFail").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject jsonObject) throws Exception {
                return "fail".equals(jsonObject.getString("loginState"));
            }
        }).next("secondFail").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject jsonObject) throws Exception {
                return "fail".equals(jsonObject.getString("loginState"));
            }
        }).next("thirdFail").where(new SimpleCondition<JSONObject>() {
            @Override
            public boolean filter(JSONObject jsonObject) throws Exception {
                return "fail".equals(jsonObject.getString("loginState"));
            }
        }).within(Time.seconds(3));

        // 1. Define the pattern actually used: three consecutive "fail" events within 5 seconds.
        Pattern<JSONObject, JSONObject> loginFailPattern = Pattern
                .<JSONObject>begin("failEvents").where(new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonObject) throws Exception {
                        return "fail".equals(jsonObject.getString("loginState"));
                    }
                })
                .times(3).consecutive()
                .within(Time.seconds(5));

        // 2. Apply the pattern to the stream keyed by user, producing a pattern stream.
        PatternStream<JSONObject> patternStream =
                CEP.pattern(loginEventStream.keyBy(json -> json.getString("userId")), loginFailPattern);

        // 3. Convert every match into a warning record.
        SingleOutputStreamOperator<JSONObject> warningStream =
                patternStream.select(new LoginFailMatchDetectWaring());

        warningStream.print();
        env.execute("login fail detect with cep job");
    }

    /**
     * Turns a matched sequence of failed logins into a warning record containing the
     * user id, the timestamps of the first and last failure, and the failure count.
     */
    // public interface PatternSelectFunction<IN, OUT> extends Function, Serializable {
    public static class LoginFailMatchDetectWaring implements PatternSelectFunction<JSONObject, JSONObject> {

        /**
         * @param pattern matched events by pattern name; reads the {@code "failEvents"} entry
         * @return a newly allocated warning record
         */
        @Override
        public JSONObject select(Map<String, List<JSONObject>> pattern) throws Exception {
            List<JSONObject> failEvents = pattern.get("failEvents");
            JSONObject firstFailEvent = failEvents.get(0);
            JSONObject lastFailEvent = failEvents.get(failEvents.size() - 1);

            // New object per match so the warning carries only the fields set below.
            JSONObject warning = new JSONObject();
            warning.put("userId2", firstFailEvent.getString("userId"));
            warning.put("firstFailEvent", firstFailEvent.getString("timestamp"));
            warning.put("lastFailEvent", lastFailEvent.getString("timestamp"));
            warning.put("info2", "login fail " + failEvents.size() + " times");
            return warning;
        }
    }
}


2. 订单超时15分钟(followedBy 方法表示宽松连续:两个匹配事件之间允许出现其他事件)



import com.alibaba.fastjson.JSONObject;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;


import java.util.List;
import java.util.Map;




/**
 * Flink CEP example: detects orders that are created but not paid within 15 minutes
 * (event time). Paid orders go to the main output; timed-out orders go to a side output.
 *
 * <p>Input is a CSV file with one order event per line. The first column identifies the
 * order, the second is the event type ({@code create} / {@code pay}), and the fourth is
 * the timestamp in seconds. The third column is empty on {@code create} rows in the
 * sample data; presumably it is a payment transaction id (TODO confirm against the data
 * source).
 */
public class OrderPayTimeout {

    /**
     * Builds and runs the job: read CSV, assign event-time timestamps, apply the
     * create-followedBy-pay pattern per order, and print matches and timeouts.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Read the data and convert each line into a JSON event.
        /**
         * 34763,create,,1558430936
         * 34764,create,,1558430937
         * 34763,pay,aaaaaa,1558431136
         * 34764,pay,,1558432936
         */
        DataStream<JSONObject> orderEventStream = env.readTextFile("/opt/study/Data/OrderLog2.csv")
                .map(line -> {
                    String[] fields = line.split(",");
                    // A fresh object per record: sharing one mutable instance would make
                    // every emitted event alias (and overwrite) the previous one.
                    JSONObject orderEvent = new JSONObject();
                    // Column 0 is the value shared by the create and pay rows of one order,
                    // so it is the correlation key. Column 2 is empty on create rows and
                    // must not be used for keying.
                    orderEvent.put("orderId", fields[0]);
                    orderEvent.put("orderState", fields[1]);
                    orderEvent.put("txId", fields[2]);
                    orderEvent.put("timestamp", fields[3]);
                    return orderEvent;
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<JSONObject>() {
                    @Override
                    public long extractAscendingTimestamp(JSONObject element) {
                        // The file stores seconds; Flink expects milliseconds.
                        return Long.parseLong(element.getString("timestamp")) * 1000L;
                    }
                });

        // 1. Define a time-bounded pattern: "create" followed by "pay" within 15 minutes.
        Pattern<JSONObject, JSONObject> orderPayPattern = Pattern
                .<JSONObject>begin("create").where(new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonObject) throws Exception {
                        return "create".equals(jsonObject.getString("orderState"));
                    }
                })
                .followedBy("pay").where(new SimpleCondition<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonObject) throws Exception {
                        return "pay".equals(jsonObject.getString("orderState"));
                    }
                })
                .within(Time.minutes(15));

        // 2. Side-output tag that carries the timed-out events.
        OutputTag<JSONObject> orderTimeoutTag = new OutputTag<JSONObject>("order-timeout") {};

        // 3. Apply the pattern to the stream keyed by order, producing a pattern stream.
        PatternStream<JSONObject> patternStream =
                CEP.pattern(orderEventStream.keyBy(json -> json.getString("orderId")), orderPayPattern);

        // 4. Extract both complete matches (main output) and timed-out partial matches (side output).
        SingleOutputStreamOperator<JSONObject> resultStream =
                patternStream.select(orderTimeoutTag, new OrderTimeoutSelect(), new OrderPaySelect());

        resultStream.print("payed normally");
        resultStream.getSideOutput(orderTimeoutTag).print("timeout");
        env.execute("order timeout detect job");
    }

    /** Builds the record emitted for an order whose "create" event timed out without a "pay". */
    public static class OrderTimeoutSelect implements PatternTimeoutFunction<JSONObject, JSONObject> {

        /**
         * @param pattern partial match by pattern name; reads the first {@code "create"} event
         * @param timeoutTimestamp timestamp at which the timeout was detected (unused)
         * @return a newly allocated timeout record
         */
        @Override
        public JSONObject timeout(Map<String, List<JSONObject>> pattern, long timeoutTimestamp) throws Exception {
            String timeoutOrderId = pattern.get("create").iterator().next().getString("orderId");

            // New object per timeout so the record carries only the fields set below.
            JSONObject timeoutRecord = new JSONObject();
            timeoutRecord.put("createtimeoutOrderId", timeoutOrderId);
            timeoutRecord.put("createtime", "timeout " + timeoutOrderId);
            return timeoutRecord;
        }
    }

    /** Builds the record emitted for an order that was paid within the time limit. */
    public static class OrderPaySelect implements PatternSelectFunction<JSONObject, JSONObject> {

        /**
         * @param pattern complete match by pattern name; reads the first {@code "pay"} event
         * @return a newly allocated payment record
         */
        @Override
        public JSONObject select(Map<String, List<JSONObject>> pattern) throws Exception {
            String payedOrderId = pattern.get("pay").iterator().next().getString("orderId");

            // New object per match so the record carries only the fields set below.
            JSONObject payRecord = new JSONObject();
            payRecord.put("paytimeoutOrderId", payedOrderId);
            payRecord.put("paytime", "payed");
            return payRecord;
        }
    }
}


文章转载自大数据知会录,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论