暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

「Flink」工具人之初学Flink-CEP(上)

将咖啡转化为程序的工具人 2021-09-09
754

工具人身边有位来自凡尔赛的朋友,每当工具人掏出东方赢家APP,血淋林地割肉时,凡尔赛人都会低调秀出他1000元入手的茅台,显摆他教科书般的择时能力。

以上截图来自东方赢家APP

于是工具人卧薪尝胆学习使用Flink-CEP捕捉震荡市场的个股异动。

首先,我们构造一个股票事件对象,并假设一种异动模式V形反转:股价先下跌10%后上涨20%。


    public class Launch {


    public static class StockEvent {
    String stockCode;
    long timestamp;
    Double price;


    public StockEvent(String stockCode, long timestamp, Double price) {
    this.stockCode = stockCode;
    this.timestamp = timestamp;
    this.price = price;
            }
        ....
    }

    接下里我们模拟行情事件:

      public class Launch {
      ....
      public static void main(String[] args) throws Exception {


      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(1);
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


      DataStream<StockEvent> source = env.fromElements(
                      new StockEvent("600519.SH"1616761570000L100.00),
      new StockEvent("600519.SH", 1616761590000L, 95.00),
      new StockEvent("600519.SH", 1616761630000L, 89.00),
      new StockEvent("600519.SH", 1616761690000L, 122.00),
      new StockEvent("600519.SH", 1616761770000L, 99.00),
      new StockEvent("600519.SH", 1616761830000L, 121.00)


      ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<StockEvent>(Time.milliseconds(500L)) {
      @Override
      public long extractTimestamp(StockEvent stockEvent) {
      return stockEvent.getTimestamp();
      }
      }).keyBy(new KeySelector<StockEvent, Object>() {
      @Override
      public Object getKey(StockEvent value) throws Exception {
      return value.getStockCode();
      }
      });


      // 逻辑处理代码
      result.print();
      env.execute("execute cep");
      }
      }

      然后我们编写异动模型处理逻辑:

        OutputTag<StockEvent> orderTimeoutOutput = new OutputTag<StockEvent>("orderTimeout") {};
        Pattern<StockEvent, StockEvent> pattern = Pattern.<StockEvent>
        begin("base")
        .followedByAny("bottom")
            .where(new IterativeCondition<StockEvent>() {
        @Override
        public boolean filter(StockEvent value, Context<StockEvent> ctx) throws Exception {
        Iterable<StockEvent> baseEvents = ctx.getEventsForPattern("base");
        Iterator<StockEvent> it = baseEvents.iterator();
        while (it.hasNext()) {
        StockEvent e = it.next();
        if ((e.getPrice() - value.getPrice()) e.getPrice() > 0.1) {
        return true;
                        }
                    }
        return false;
                 }
        })
        .followedByAny("movement")
             .where(new IterativeCondition<StockEvent>() {
        @Override
        public boolean filter(StockEvent value, Context<StockEvent> ctx) throws Exception {
        Iterable<StockEvent> lastEvents = ctx.getEventsForPattern("bottom");
        Iterator<StockEvent> it = lastEvents.iterator();
        while (it.hasNext()) {
        StockEvent e = it.next();
        if ((value.getPrice() - e.getPrice()) e.getPrice() > 0.2) {
        return true;
                          }
                      }
        return false;
                   }
              })
              .within(Time.minutes(6));
              PatternStream<StockEvent> patternStream = CEP.pattern(source, pattern);
              SingleOutputStreamOperator<MovementResult> result = patternStream.select(orderTimeoutOutput, new PatternTimeoutFunction<StockEvent, StockEvent>() {
        @Override
        public StockEvent timeout(Map<String, List<StockEvent>> map, long l) throws Exception {
        return map.get("base").get(0);
        }
        }, new PatternSelectFunction<StockEvent, MovementResult>() {
        @Override
        public MovementResult select(Map<String, List<StockEvent>> map) throws Exception {
        MovementResult res = new MovementResult(map.get("base").get(0).getPrice(), map.get("bottom").get(0).getPrice(), map.get("movement").get(0).getPrice());
        return res;
        }
        });
        });

                以上代码中,我们构建了一个非确定有限自动机,它的状态变迁过程大致如下:

        Flink CEP中具有如下几种状态转移元语:

        • take:消费输入事件,存入缓存,并保持当前状态;

        • ignore:忽略输入事件,不存入缓存,并保持当前状态;

        • proceed:感知输入事件,转移到下一个状态,同时保留该事件给下一个状态处理。

        对于不断流入的事件,Flink会根据它是否符合条件,来决定它是否被记录进缓存。大致有如下几种选择策略

        • strict(严格连续):严格按顺序选择所有符合条件的事件,途中不能出现不符合条件的事件。对应Flink-CEP API中的Pattern.next()/notNext();

        • skip till next match(宽松连续):按顺序选择所有符合条件的事件,而途中不符合条件的事件被忽略,对应Flink-CEP API中的Pattern.followedBy()/notFollowedBy()。

        • skip till any match(可变宽松连续):在skip till next match的基础上,还允许忽略一些符合条件的事件,以尽量延长匹配序列的长度,对应Flink-CEP API中的Pattern.followedByAny()。


        最终我们可以获得异动结果:

          MovementResult{basePrice=100.0, bottomPrice=89.0, movementPrice=122.0}
          MovementResult{basePrice=100.0, bottomPrice=89.0, movementPrice=121.0}
          MovementResult{basePrice=122.0, bottomPrice=99.0, movementPrice=121.0}

          但问题随之而来,每个输入的行情事件,它可能是一次异动过程的开头,同样也可能是一次异动中间的某个状态,更可能只是某个宽松连续事件中的一个。这些情况都需要被记录在Flink的缓存中,以便后续观察。在这种无界流的情况下,如果对每个匹配序列都是独立存储,那内存的开销会几何式的上升,那Flink是如何应对这个问题的呢?


          欲知后事如何?请听下回分解!




          文章转载自将咖啡转化为程序的工具人,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

          评论