工具人身边有位来自凡尔赛的朋友,每当工具人掏出东方赢家APP,血淋林地割肉时,凡尔赛人都会低调秀出他1000元入手的茅台,显摆他教科书般的择时能力。

以上截图来自东方赢家APP
于是工具人卧薪尝胆,学习使用Flink-CEP来捕捉震荡市场的个股异动。
首先,我们构造一个股票事件对象,并假设一种异动模式V形反转:股价先下跌10%后上涨20%。
public class Launch {public static class StockEvent {String stockCode;long timestamp;Double price;public StockEvent(String stockCode, long timestamp, Double price) {this.stockCode = stockCode;this.timestamp = timestamp;this.price = price;}....}
接下里我们模拟行情事件:
public class Launch {....public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<StockEvent> source = env.fromElements(new StockEvent("600519.SH", 1616761570000L, 100.00),new StockEvent("600519.SH", 1616761590000L, 95.00),new StockEvent("600519.SH", 1616761630000L, 89.00),new StockEvent("600519.SH", 1616761690000L, 122.00),new StockEvent("600519.SH", 1616761770000L, 99.00),new StockEvent("600519.SH", 1616761830000L, 121.00)).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<StockEvent>(Time.milliseconds(500L)) {@Overridepublic long extractTimestamp(StockEvent stockEvent) {return stockEvent.getTimestamp();}}).keyBy(new KeySelector<StockEvent, Object>() {@Overridepublic Object getKey(StockEvent value) throws Exception {return value.getStockCode();}});// 逻辑处理代码result.print();env.execute("execute cep");}}
然后我们编写异动模型处理逻辑:
OutputTag<StockEvent> orderTimeoutOutput = new OutputTag<StockEvent>("orderTimeout") {};Pattern<StockEvent, StockEvent> pattern = Pattern.<StockEvent>begin("base").followedByAny("bottom").where(new IterativeCondition<StockEvent>() {@Overridepublic boolean filter(StockEvent value, Context<StockEvent> ctx) throws Exception {Iterable<StockEvent> baseEvents = ctx.getEventsForPattern("base");Iterator<StockEvent> it = baseEvents.iterator();while (it.hasNext()) {StockEvent e = it.next();if ((e.getPrice() - value.getPrice()) e.getPrice() > 0.1) {return true;}}return false;}}).followedByAny("movement").where(new IterativeCondition<StockEvent>() {@Overridepublic boolean filter(StockEvent value, Context<StockEvent> ctx) throws Exception {Iterable<StockEvent> lastEvents = ctx.getEventsForPattern("bottom");Iterator<StockEvent> it = lastEvents.iterator();while (it.hasNext()) {StockEvent e = it.next();if ((value.getPrice() - e.getPrice()) e.getPrice() > 0.2) {return true;}}return false;}}).within(Time.minutes(6));PatternStream<StockEvent> patternStream = CEP.pattern(source, pattern);SingleOutputStreamOperator<MovementResult> result = patternStream.select(orderTimeoutOutput, new PatternTimeoutFunction<StockEvent, StockEvent>() {@Overridepublic StockEvent timeout(Map<String, List<StockEvent>> map, long l) throws Exception {return map.get("base").get(0);}}, new PatternSelectFunction<StockEvent, MovementResult>() {@Overridepublic MovementResult select(Map<String, List<StockEvent>> map) throws Exception {MovementResult res = new MovementResult(map.get("base").get(0).getPrice(), map.get("bottom").get(0).getPrice(), map.get("movement").get(0).getPrice());return res;}});});
以上代码中,我们构建了一个非确定有限自动机,它的状态变迁过程大致如下:

Flink CEP中具有如下几种状态转移元语:
take:消费输入事件,存入缓存,并保持当前状态;
ignore:忽略输入事件,不存入缓存,并保持当前状态;
proceed:感知输入事件,转移到下一个状态,同时保留该事件给下一个状态处理。
对于不断流入的事件,Flink会根据它是否符合条件,来决定它是否被记录进缓存。大致有如下几种选择策略:
strict(严格连续):严格按顺序选择所有符合条件的事件,途中不能出现不符合条件的事件。对应Flink-CEP API中的Pattern.next()/notNext();
skip till next match(宽松连续):按顺序选择所有符合条件的事件,而途中不符合条件的事件被忽略,对应Flink-CEP API中的Pattern.followedBy()/notFollowedBy()。
skip till any match(可变宽松连续):在skip till next match的基础上,还允许忽略一些符合条件的事件,以尽量延长匹配序列的长度,对应Flink-CEP API中的Pattern.followedByAny()。
最终我们可以获得异动结果:
MovementResult{basePrice=100.0, bottomPrice=89.0, movementPrice=122.0}MovementResult{basePrice=100.0, bottomPrice=89.0, movementPrice=121.0}MovementResult{basePrice=122.0, bottomPrice=99.0, movementPrice=121.0}
但问题随之而来,每个输入的行情事件,它可能是一次异动过程的开头,同样也可能是一次异动中间的某个状态,更可能只是某个宽松连续事件中的一个。这些情况都需要被记录在Flink的缓存中,以便后续观察。在这种无界流的情况下,如果对每个匹配序列都是独立存储,那内存的开销会几何式的上升,那Flink是如何应对这个问题的呢?
欲知后事如何?请听下回分解!




