暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

【经验与坑】Java实现股市摆盘实时推送

所使用技术

  • Spring Boot

  • Redis

  • WebSocket


架构图

详细说明:交易服务也就是实时行情数据源产生的摆盘数据,存放在redis的队列内,然后行情服务会监听这个队列,获取这个队列里面的行情节点,经过数据处理,生成K线数据,然后存放redis内。同时,将行情数据发布给网关,网关订阅到行情数据后,使用WebSocket实时推送给前端用户。


涉及的问题

1.数据从哪里来

这里的行情数据不是本系统产生的,而是通过对接第三方券商(比如Futu证券等)拿到行情,然后我们这边再对这些数据进行处理然后推送展示。

产生的行情数据结构中,最为重要的包含:成交价格,成交量,成交时间这三个字段,其他的包括股票编号、时间戳等等也是要放进队列中的,方便后续拓展。


2.什么是分时线

分时线就是每分钟的最后一笔成交价的连线。分时线即大盘、个股分时走势图中的白色曲线,反映的是大盘、个股的实时走势。分时线是实时变动的,这里就要使用 WebSocket 技术推送行情了。那么也就引出了另外的问题,分时线数据如何存储?仔细推测,要存储分时图的数据,我们就要将每一分钟的最后一口价进行存储,并且存储时需要注明这个价格是哪一分钟的价格,这些细节问题后续在功能实现部分会道出解决办法。


3.什么是K线图

股市中的K线图的画法包含四个数据,即开盘价、最高价、最低价、收盘价、成交量、成交额,所有的K线都是围绕这四个数据展开的,反映大势的状况和价格信息。如果把每日的K线图放在一张纸上,就能得到日K线图,同样道理就能得到周、月图。


举个例子:比如今天是2022-03-01,那么今天就会产生一个日K数据,数据包含:日期(精确到日,即2022-03-01),开盘价(今天的最低价格)、收盘价(今天的最后一口价),成交量(今天的总成交量),成交额(今天的总成交额)。同理,周线、月线都是一样的规则。


4.为什么要把数据保存到redis队列中,然后行情服务再去主动获取队列里的行情?而不直接使用Redis的发布订阅机制将行情发布给行情服务;或者使用rocketMQ,将行情异步发送给行情服务?

我一开始也是这样想的,为什么不直接使用Redis发布订阅来将行情发布给行情服务呢,这样就可以不用把行情数据存放到Redis的一个队列里这个步骤了。理论上这样是能够实现功能的,但是存在无法弥补的Bug,主要就是因为Redis的发布订阅的消息是不可靠的,也就是说消息很可能丢失,加入行情服务挂掉了,但是交易服务还是还是在正常的发布消息,那么此时发布的消息就全部丢失了。所以为了系统的稳定性,才需要把行情数据先保存到一个队列内,然后等待行情服务主动去获取,这样的好处就是,即使行情服务挂掉了,行情数据也不会丢失。等到行情服务恢复后,行情服务会主动去获取队列里面的数据,一个一个处理。

5.关于WebSocket

WebSocket具体的特点如下:(1)建立在TCP协议之上。(2)与HTTP协议有着良好的兼容性。默认端口号为 80/443,并且握手阶段采用HTTP协议,做到了对传统HTTP协议的兼容。(3)数据格式较为轻量,性能开销小,服务器压力小。(4)可完全取代Ajax (5)协议标识符为 ws(如果加密就是 wss 对比 https记忆)网址就是URL

及萨博涵五六,公众号:码农智涵的程序人生【经验与坑】Spring Boot集成WebSocket


代码实现


根据上面的架构图就是实现的一个大致思路,我们需要将行情基础数据存到Redis的一个队列里,然后行情服务去主动获取队列内的数据,行情服务获取到数据后,就将数据发布到网关,然后由网关使用WebSocket将数据推送到前端。同时,还需要将生成K线图的K线数据存储到Redis里面。数据是否会存入MySQL中进行持久化操作还有待考虑,因为我们目前只想存近三天的数据。以及后续加入Redis集群模式的优化。


大致的实现思路如下:

  • 从数据源获取行情数据

  • 将数据封装成对象发布到Redis频道

  • 行情服务订阅对应频道

  • 使用WebSocket将行情推送至前端


一.从数据源获取行情数据

券商数据源选择由腾讯的富途证券。

前言

及萨博涵五六,公众号:码农智涵的程序人生【经验与坑】如何使用富途API

但是关于摆盘数据,我们还需要设置一个VO层,对应具体的数据进行再一步的封装,以达到符合我们项目需求的数据格式。

最终的效果图如下图所示:



二. Spring Boot集成Redisson实现消息发布/订阅


Redisson是基于NIO的Netty框架上,充分利用了Redis KV 数据库提供的优势,基于Java为使用者提供了一系列具有分布式特性的常用工具类。为以后的分布式化留足了拓展空间。


其次,我们项目中主要使用到这个Redis发布订阅的机制,但是Jedis对这方面的支持非常差,只能向Redis中发布字符串类型的数据,而Redisson却能很好的支持实体类对象的传输。


以上这就是我们最终选择使用Redisson的原因。



2.1 导入依赖

            <!--redis依赖-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!--使用Redisson-->
    <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.16.6</version>
    </dependency>
    <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    </dependency>

    配置文件配置Redis

      spring:
      redis:
          host: localhost
          port: ******
      password: *********
      database: 0
      lettuce:
      pool:
      max-idle: 8
      min-idle: 0
      max-active: 8
      max-wait: -1ms
      timeout: 10000ms


      2.2 配置器

        @Configuration
        public class RedissonConfig {


        @Value("${spring.redis.host}")
        private String host;


        @Value("${spring.redis.port}")
        private String port;


        @Value("${spring.redis.password}")
        private String password;


        @Bean
        public RedissonClient redissonClient(){
        Config config = new Config();
        //单节点
        config.useSingleServer().setAddress("redis://" + host + ":" + port);
        if(StringUtils.isEmpty(password)){
        config.useSingleServer().setPassword(null);
        }else{
        config.useSingleServer().setPassword(password);
        }
        //添加主从配置
        // config.useMasterSlaveServers().setMasterAddress("").setPassword("").addSlaveAddress(new String[]{"",""});


        // 集群模式配置 setScanInterval()扫描间隔时间,单位是毫秒, 可以用"rediss://"来启用SSL连接
        // config.useClusterServers().setScanInterval(2000).addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001").addNodeAddress("redis://127.0.0.1:7002");


        return Redisson.create(config);
        }


        }


        2.3 Redisson消息的发布订阅

        简单实例:分别设置两个类。一个发布,一个订阅

          package com.han56.service.subscribe;
          import com.han56.entity.orderBookBean.OrderBookS2C;
          import org.redisson.api.RTopic;
          import org.redisson.api.RedissonClient;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.stereotype.Component;


          /**
          * @author han56
          * @description 功能描述
          * @create 2022/3/4 下午5:20
          */
          @Component
          public class Publish {


          @Autowired
              private RedissonClient redissonClient;
          //发布OrderBookS2C对象
          public long publish(OrderBookS2C orderBookS2C){
          //主题即频道:市场代码+股票代码
          RTopic rTopic = redissonClient.getTopic(orderBookS2C.getSecurity().getMarketCode()+":"
          +orderBookS2C.getSecurity().getStockCode());
          return rTopic.publish(orderBookS2C);
          }


          }
            package com.han56.service.subscribe;


            import com.han56.entity.orderBookBean.OrderBookS2C;
            import lombok.extern.slf4j.Slf4j;
            import org.redisson.api.RTopic;
            import org.redisson.api.RedissonClient;
            import org.redisson.api.listener.MessageListener;
            import org.springframework.beans.factory.annotation.Autowired;
            import org.springframework.stereotype.Component;


            import javax.annotation.PostConstruct;


            /**
            * @author han56
            * @description 功能描述
            * @create 2022/3/4 下午5:27
            */
            @Slf4j
            @Component
            public class SubScribeTest {


            @Autowired
                private RedissonClient redissonClient;
            @PostConstruct
            public void subscribe(){
            RTopic rTopic = redissonClient.getTopic("22:300164");
            rTopic.addListener(OrderBookS2C.class, new MessageListener<OrderBookS2C>() {
            // 接受订阅的消息
            @Override
            public void onMessage(CharSequence charSequence, OrderBookS2C orderBookS2C) {
            log.info("接受到消息主题={},内容={}",charSequence,orderBookS2C);
            System.out.println("传输的数据为="+orderBookS2C);
            }
            });
            }


            }

            大致就是这样的步骤。当然还有一些细节问题需要适配本身的项目。




            (3)WebSocket实时推送数据至前端

            这一部分比较套路化,毕竟WebSocket已经不是什么新鲜的技术了,网上有非常多的博客写过。我们这里的需求主要就是监听交易服务发布的频道信息,然后拿到最新的OrderBookS2C对象,解析后转为JSON格式,将JSON数据通过WebSocket传到前端即可。


            配置类:

              @Component
              public class WebSocketConfig {
              /**
              * ServerEndpointExporter 作用
              *
              * 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
              *
              * @return
              */
              @Bean
              public ServerEndpointExporter serverEndpointExporter() {
              return new ServerEndpointExporter();
                  }
              }


              WebSocket控制层(主要代码):

                @ServerEndpoint("/websocket/{marketCode}/{stockCode}")
                //建立WebSocket连接
                @OnOpen
                public void onOpen(@PathParam(value = "marketCode") String marketCode,
                @PathParam(value = "stockCode") String stockCode){
                //开始向Redis频道写入数据
                QotDemo qotDemo = new QotDemo();
                qotDemo.getQotData(Integer.parseInt(marketCode),stockCode,redissonClient,publish);
                }

                我们在与客户端建立连接的同时,就开始向Redis对应的频道中写数据。具体的参数通过WebSocket从客户端传过来。


                这里遇到的大坑,主要是Spring Boot单例模式下的大坑。Spring Spring Boot的WebSocket操作类里面使用@AutoWired注入Service或Bean时,会爆空指针异常的问题。就非常纳闷,郁闷了一下午也没整明白咋回事。

                  这部分改bug部分参考了:http://www.ghostlib.com/article/202008141623
                  Author:Ghost

                  用http交互的时候完全没有问题。

                  解决办法:将要注入的service/bean改成static,就不会报空指针问题了。



                  最终效果


                  这部分的摆线数据是直接通过发布/订阅消息队列机制发送出去的,是不考虑消息不稳定的状态的,后续的K线开发会加入redis list消息队列机制来处理。


                  文章转载自码农智涵的程序人生,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                  评论