暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

「Flink」工具人之智能盯盘乞丐版

将咖啡转化为程序的工具人 2021-09-09
729

        一般篮球打得越菜,就越喜欢买篮球鞋,当然工具人也不例外,看到心仪的球鞋就毫不手软,犹如像饿虎扑食,凶残无比。可是,在股市中,工具人却怂得一笔,往往在犹豫中,错过了一次又一次一夜暴富的机会。

        相信像工具人这样的韭菜有很多,所以工具人决定做一个乞丐版的“智能盯盘”来赚点小钱,来补贴球鞋的巨额支出。

        假设我们看中一只芯片股,但由于大盘行情下调,股价也一跌再跌,我们想在股价企稳后抄底。工具人判断,如果该股票在跌到12元后,最多继续下探3%,然后会迅速反弹(如果超过了3%后,还在下跌,则工具人判断失误,放弃这次机会),如果在5分钟内反弹6%,那么该股票的走势在短期内已经逆转,可以做一波短线操作了,如果时间超过5分钟,或者涨幅不到6%,说明市场的逆转意愿不强烈,后续走势不明朗,工具人也将继续装怂。

        有了策略思路后,工具人就可以开始动手了。我们就以反弹买入为例,使用Flink来实现一个乞丐版的智能盯盘策略。

        考虑到在实际使用中,用户会不断地添加、修改、删除盯盘规则,所以需要把用户策略也作为一个输入流,实时变更Flink中的用户策略数据。大致流程如下图所示:

        

首先,我们定义用户策略模型:


    public class TrackRule implements Serializable {


    String ruleID;
    String securityCode;
    String exchange;


    public TrackRule(final String ruleID, final String securityCode, final String exchange) {
    this.ruleID = ruleID;
    this.securityCode = securityCode;
    this.exchange = exchange;
    }


        .....
    }


    public class TrackReboundRule extends TrackRule {


    private List<TrackRebound> trackReboundList = Lists.newArrayList();


    public TrackReboundRule(final String ruleID, final String securityCode, final String exchange) {
    super(ruleID, securityCode, exchange);
    }




        ......


    public static class TrackRebound implements Serializable {


            BigDecimal basePrice;        //基础价格
            BigDecimal fallChangeRate;   //下跌区间
    BigDecimal tenMinRiseRate; 反弹涨幅


    public TrackRebound(final BigDecimal basePrice, final BigDecimal tenMinRiseRate, final BigDecimal fallChangeRate) {
    this.tenMinRiseRate = tenMinRiseRate;
    this.fallChangeRate = fallChangeRate;
    }




    ......
    }
    }



           在这里需要缓存最新收到的用户策略数据在Flink的State中,并将实时行情流与用户策略流合并后,把股票行情与该股票对应的策略分发到同一个分区中。


      //kafka source
      FlinkKafkaConsumer010<QuoteStream> quoteConsumer = new FlinkKafkaConsumer010<QuoteStream>(rtQuoteTopic, new QuoteStreamL2Deserializer(), kafkaProperties);
      DataStream<QuoteStream> quoteStream = env.addSource(quoteC);
      FlinkKafkaConsumer010<TrackReboundRule> ruleConsumer = new FlinkKafkaConsumer010<QuoteStream>(userRuleTopic, new TrackReboundRuleDeserializer(), kafkaProperties);
      DataStream<TrackReboundRule> ruleStream = env.addSource(ruleConsumer);


      DataStream<QuoteTracker> quoteTrackerStream = quoteStream
      .filter(new FilterFunction<QuoteStream>() {
      @Override
      public boolean filter(QuoteStream quoteStream) {
      //过滤非股票数据
      return "E".equals(quoteStream.getSecurityType());
      }
      })
      .connect(ruleStream)
      .keyBy(new QuoteStreamKeySelector(), new RuleSelector())
      .flatMap(new QuoteTrackFunction())
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<QuoteTracker>(Time.milliseconds(500L)) {
      @Override
      public long extractTimestamp(QuoteTracker quoteTracker) {
      return TimestampUtils.timestamp2Long(quoteTracker.getQuoteStream().getTimestamp());
      })
      .keyBy(new KeySelector<QuoteTracker, String>() {
      @Override
      public String getKey(final QuoteTracker quoteTracker) throws Exception {
      return StockKeyUtils.genStockKey(quoteTracker.getQuoteStream().getCode(), quoteTracker.getQuoteStream().getExchange());
      }
      });

              策略识别器,我们依然使用Flink-CEP来实现,首先第一步我们需要识别到下跌后的底部位置,然后根据底部位置计算反弹涨幅,当反弹涨幅达到预期阈值后,输出买入点位。


                Pattern<QuoteTracker, QuoteTracker> pattern = Pattern.<QuoteTracker>
                        begin("MarkPoint")   //下跌后的底部位置
                        .where(new SimpleCondition<QuoteTracker>() {
        @Override
                            public boolean filter(QuoteTracker quoteTracker) throws Exception {
        QuoteStream quoteStream = quoteTracker.getQuoteStream();
        if (Strings.isNullOrEmpty(quoteStream.getTradePrice()) || Strings.isNullOrEmpty(quoteStream.getOpenPrice())) {
        return false;
        }
        BigDecimal tradePrice = new BigDecimal(quoteStream.getTradePrice());
        try {
        if (quoteTracker.getRule() == null) {
        return false;
        }
        for (TrackReboundRule.TrackRebound rebound : quoteTracker.getRule().getTrackReboundList()) {
        BigDecimal changeRate = tradePrice.divide(rebound.getBasePrice(), 4, RoundingMode.HALF_UP).subtract(BigDecimal.valueOf(1)).multiply(BigDecimal.valueOf(100));
        if (changeRate.compareTo(rebound.getTodayChangeRate().subtract(BigDecimal.valueOf(0.1))) > 0
        && changeRate.compareTo(rebound.getTodayChangeRate().add(BigDecimal.valueOf(0.1))) < 0) {
        return true;
        }
        }
        } catch (Exception e) {
        logger.error("error:", e);
        }
        return false;
        }
        })
                        .followedByAny("ReboundPoint") //反弹的买点
        .where(new IterativeCondition<QuoteTracker>() {


        @Override
        public boolean filter(QuoteTracker value, Context<QuoteTracker> ctx) throws Exception {
        Iterable<QuoteTracker> markPointIterator = ctx.getEventsForPattern("MarkPoint");
        Iterator<QuoteTracker> it = markPointIterator.iterator();
        while (it.hasNext()) {
        QuoteTracker tracker = it.next();
        QuoteStream markPointQuoteStream = tracker.getQuoteStream();
        BigDecimal markPointPrice = new BigDecimal(markPointQuoteStream.getTradePrice());
        BigDecimal tradePrice = new BigDecimal(value.getQuoteStream().getTradePrice());
        BigDecimal changeRate = tradePrice.divide(markPointPrice, 4, RoundingMode.HALF_UP).subtract(BigDecimal.valueOf(1)).multiply(BigDecimal.valueOf(100));
        for (TrackReboundRule.TrackRebound rebound : tracker.getRule().getTrackReboundList()) {
        if (changeRate.compareTo(rebound.getTenMinRiseRate()) > 0) {
        return true;
        }
        }
        }
        return false;
        }
        })
        .within(Time.minutes(10));


        PatternStream<QuoteTracker> patternStream = CEP.pattern(quoteTrackerStream, pattern);
        SingleOutputStreamOperator<AllInChance> result = patternStream.select(orderTimeoutOutput, new PatternTimeoutFunction<QuoteTracker, QuoteTracker>() {


        @Override
        public QuoteTracker timeout(Map<String, List<QuoteTracker>> map, long l) throws Exception {
        return map.get("MarkPoint").get(0);
        }
        }, new PatternSelectFunction<QuoteTracker, AllInChance>() {


        @Override
        public AllInChance select(Map<String, List<QuoteTracker>> map) throws Exception {
        AllInChance allInChance = new AllInChance();
        allInChance.setBottom(map.get("MarkPoint").get(0).getQuoteStream());
        allInChance.setRebound(map.get("ReboundPoint").get(0).getQuoteStream());
        return allInChance;
        }
        });

               反弹行情出现后,股票会持续上涨,此时识别器会不断地提醒用户,所以我们需要控制提醒次数,减少对用户不必要的干扰。


          SingleOutputStreamOperator<AllInChance> output = result.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<AllInChance>(Time.milliseconds(500L)) {
          @Override
          public long extractTimestamp(final AllInChance allInChance) {
                  return TimestampUtils.timestamp2Long(allInChance.getRebound().getTimestamp());
          }
          }).keyBy(new KeySelector<AllInChance, String>() {
          @Override
          public String getKey(final AllInChance value) throws Exception {
          return StockKeyUtils.genStockKey(value.getRebound().getCode(), value.getRebound().getExchange());
          }
          }).timeWindow(Time.of(10, TimeUnit.MINUTES))
          .reduce(new ReduceFunction<AllInChance>() {
          @Override
          public AllInChance reduce(final AllInChance value1, final AllInChance value2) throws Exception {
          return value1;
              }
          });
          output.print();

                  智能盯盘乞丐版大致完成了,是国王还是乞丐,拉出来溜溜!

                  工具人订阅了一只股票,股票名称:顺丰控股,股票代码:002352

          结果如下

            【002352】底部价格:[64.43],时间:[20210413101215000],买入价格:[65.06],时间:[20210413102112000]


            【002352】底部价格:[64.14],时间:[20210413132727000],买入价格:[64.76],时间:[20210413132909000]



            对应当日分时走势:


                    事实证明,智能盯盘确实能帮助我们散户在多变的市场中,寻找到不错的机会。大鱼围猎吃小鱼,小鱼埋伏吃虾米,如今虾米有了智能盯盘,也能喝点鲜鱼汤了。


            工资低,房租高,咖啡喝不起!

            炒股票,用盯盘首付能凑齐!

            文章转载自将咖啡转化为程序的工具人,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

            评论