一般篮球打得越菜,就越喜欢买篮球鞋,当然工具人也不例外,看到心仪的球鞋就毫不手软,犹如像饿虎扑食,凶残无比。可是,在股市中,工具人却怂得一笔,往往在犹豫中,错过了一次又一次一夜暴富的机会。
相信像工具人这样的韭菜有很多,所以工具人决定做一个乞丐版的“智能盯盘”来赚点小钱,来补贴球鞋的巨额支出。
假设我们看中一只芯片股,但由于大盘行情下调,股价也一跌再跌,我们想在股价企稳后抄底。工具人判断,如果该股票在跌到12元后,最多继续下探3%,然后会迅速反弹(如果超过了3%后,还在下跌,则工具人判断失误,放弃这次机会),如果在5分钟内反弹6%,那么该股票的走势在短期内已经逆转,可以做一波短线操作了,如果时间超过5分钟,或者涨幅不到6%,说明市场的逆转意愿不强烈,后续走势不明朗,工具人也将继续装怂。

有了策略思路后,工具人就可以开始动手了。我们就以反弹买入为例,使用Flink来实现一个乞丐版的智能盯盘策略。
考虑到在实际使用中,用户会不断地添加、修改、删除盯盘规则,所以需要把用户策略也作为一个输入流,实时变更Flink中的用户策略数据。大致流程如下图所示:

首先,我们定义用户策略模型:
public class TrackRule implements Serializable {String ruleID;String securityCode;String exchange;public TrackRule(final String ruleID, final String securityCode, final String exchange) {this.ruleID = ruleID;this.securityCode = securityCode;this.exchange = exchange;}.....}public class TrackReboundRule extends TrackRule {private List<TrackRebound> trackReboundList = Lists.newArrayList();public TrackReboundRule(final String ruleID, final String securityCode, final String exchange) {super(ruleID, securityCode, exchange);}......public static class TrackRebound implements Serializable {BigDecimal basePrice; //基础价格BigDecimal fallChangeRate; //下跌区间BigDecimal tenMinRiseRate; 反弹涨幅public TrackRebound(final BigDecimal basePrice, final BigDecimal tenMinRiseRate, final BigDecimal fallChangeRate) {this.tenMinRiseRate = tenMinRiseRate;this.fallChangeRate = fallChangeRate;}......}}
在这里需要缓存最新收到的用户策略数据在Flink的State中,并将实时行情流与用户策略流合并后,把股票行情与该股票对应的策略分发到同一个分区中。
//kafka sourceFlinkKafkaConsumer010<QuoteStream> quoteConsumer = new FlinkKafkaConsumer010<QuoteStream>(rtQuoteTopic, new QuoteStreamL2Deserializer(), kafkaProperties);DataStream<QuoteStream> quoteStream = env.addSource(quoteC);FlinkKafkaConsumer010<TrackReboundRule> ruleConsumer = new FlinkKafkaConsumer010<QuoteStream>(userRuleTopic, new TrackReboundRuleDeserializer(), kafkaProperties);DataStream<TrackReboundRule> ruleStream = env.addSource(ruleConsumer);DataStream<QuoteTracker> quoteTrackerStream = quoteStream.filter(new FilterFunction<QuoteStream>() {@Overridepublic boolean filter(QuoteStream quoteStream) {//过滤非股票数据return "E".equals(quoteStream.getSecurityType());}}).connect(ruleStream).keyBy(new QuoteStreamKeySelector(), new RuleSelector()).flatMap(new QuoteTrackFunction()).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<QuoteTracker>(Time.milliseconds(500L)) {@Overridepublic long extractTimestamp(QuoteTracker quoteTracker) {return TimestampUtils.timestamp2Long(quoteTracker.getQuoteStream().getTimestamp());}).keyBy(new KeySelector<QuoteTracker, String>() {@Overridepublic String getKey(final QuoteTracker quoteTracker) throws Exception {return StockKeyUtils.genStockKey(quoteTracker.getQuoteStream().getCode(), quoteTracker.getQuoteStream().getExchange());}});
策略识别器,我们依然使用Flink-CEP来实现,首先第一步我们需要识别到下跌后的底部位置,然后根据底部位置计算反弹涨幅,当反弹涨幅达到预期阈值后,输出买入点位。
Pattern<QuoteTracker, QuoteTracker> pattern = Pattern.<QuoteTracker>begin("MarkPoint") //下跌后的底部位置.where(new SimpleCondition<QuoteTracker>() {@Overridepublic boolean filter(QuoteTracker quoteTracker) throws Exception {QuoteStream quoteStream = quoteTracker.getQuoteStream();if (Strings.isNullOrEmpty(quoteStream.getTradePrice()) || Strings.isNullOrEmpty(quoteStream.getOpenPrice())) {return false;}BigDecimal tradePrice = new BigDecimal(quoteStream.getTradePrice());try {if (quoteTracker.getRule() == null) {return false;}for (TrackReboundRule.TrackRebound rebound : quoteTracker.getRule().getTrackReboundList()) {BigDecimal changeRate = tradePrice.divide(rebound.getBasePrice(), 4, RoundingMode.HALF_UP).subtract(BigDecimal.valueOf(1)).multiply(BigDecimal.valueOf(100));if (changeRate.compareTo(rebound.getTodayChangeRate().subtract(BigDecimal.valueOf(0.1))) > 0&& changeRate.compareTo(rebound.getTodayChangeRate().add(BigDecimal.valueOf(0.1))) < 0) {return true;}}} catch (Exception e) {logger.error("error:", e);}return false;}}).followedByAny("ReboundPoint") //反弹的买点.where(new IterativeCondition<QuoteTracker>() {@Overridepublic boolean filter(QuoteTracker value, Context<QuoteTracker> ctx) throws Exception {Iterable<QuoteTracker> markPointIterator = ctx.getEventsForPattern("MarkPoint");Iterator<QuoteTracker> it = markPointIterator.iterator();while (it.hasNext()) {QuoteTracker tracker = it.next();QuoteStream markPointQuoteStream = tracker.getQuoteStream();BigDecimal markPointPrice = new BigDecimal(markPointQuoteStream.getTradePrice());BigDecimal tradePrice = new BigDecimal(value.getQuoteStream().getTradePrice());BigDecimal changeRate = tradePrice.divide(markPointPrice, 4, RoundingMode.HALF_UP).subtract(BigDecimal.valueOf(1)).multiply(BigDecimal.valueOf(100));for (TrackReboundRule.TrackRebound rebound : tracker.getRule().getTrackReboundList()) {if (changeRate.compareTo(rebound.getTenMinRiseRate()) > 0) {return true;}}}return false;}}).within(Time.minutes(10));PatternStream<QuoteTracker> patternStream = CEP.pattern(quoteTrackerStream, pattern);SingleOutputStreamOperator<AllInChance> result = patternStream.select(orderTimeoutOutput, new PatternTimeoutFunction<QuoteTracker, QuoteTracker>() {@Overridepublic QuoteTracker timeout(Map<String, List<QuoteTracker>> map, long l) throws Exception {return map.get("MarkPoint").get(0);}}, new PatternSelectFunction<QuoteTracker, AllInChance>() {@Overridepublic AllInChance select(Map<String, List<QuoteTracker>> map) throws Exception {AllInChance allInChance = new AllInChance();allInChance.setBottom(map.get("MarkPoint").get(0).getQuoteStream());allInChance.setRebound(map.get("ReboundPoint").get(0).getQuoteStream());return allInChance;}});
反弹行情出现后,股票会持续上涨,此时识别器会不断地提醒用户,所以我们需要控制提醒次数,减少对用户不必要的干扰。
SingleOutputStreamOperator<AllInChance> output = result.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<AllInChance>(Time.milliseconds(500L)) {@Overridepublic long extractTimestamp(final AllInChance allInChance) {return TimestampUtils.timestamp2Long(allInChance.getRebound().getTimestamp());}}).keyBy(new KeySelector<AllInChance, String>() {@Overridepublic String getKey(final AllInChance value) throws Exception {return StockKeyUtils.genStockKey(value.getRebound().getCode(), value.getRebound().getExchange());}}).timeWindow(Time.of(10, TimeUnit.MINUTES)).reduce(new ReduceFunction<AllInChance>() {@Overridepublic AllInChance reduce(final AllInChance value1, final AllInChance value2) throws Exception {return value1;}});output.print();
智能盯盘乞丐版大致完成了,是国王还是乞丐,拉出来溜溜!
工具人订阅了一只股票,股票名称:顺丰控股,股票代码:002352
结果如下
【002352】底部价格:[64.43],时间:[20210413101215000],买入价格:[65.06],时间:[20210413102112000]【002352】底部价格:[64.14],时间:[20210413132727000],买入价格:[64.76],时间:[20210413132909000]
对应当日分时走势:

事实证明,智能盯盘确实能帮助我们散户在多变的市场中,寻找到不错的机会。大鱼围猎吃小鱼,小鱼埋伏吃虾米,如今虾米有了智能盯盘,也能喝点鲜鱼汤了。

工资低,房租高,咖啡喝不起!
炒股票,用盯盘,首付能凑齐!




