暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

springboot websocket集群

dean技术分享 2020-05-14
1182

websocket集群在websocket基础上修改,添加redis消息队列

原理:由于集群有多个websocket应用,两个用户有可能不在同一个应用中,发送消息时用户会接收不到消息。所以将用户发送的消息先发布到redis消息队列中,所有应用都连接到同一个redis并订阅消息主题,所有应用都会收到这条消息,最后所有应用都发送这条消息,无论用户连接到哪个websocket应用,都会收到这条消息。

  1. 依赖
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
    </dependencies>

  1. 配置文件
spring:
  redis:
    host: 192.168.211.100
    port: 6379
    database: 1
    password: 
    timeout: 5000ms

  1. 连接校验
/**
 * 连接时校验用户信息,并返回重写的Principal
 */

@Component
public class MyHandshakeHandler extends DefaultHandshakeHandler {

    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        if (!(request instanceof ServletServerHttpRequest)) {
            return null;
        }
        ServletServerHttpRequest req = (ServletServerHttpRequest) request;
        //获取请求参数中携带的uid
        String uid = req.getServletRequest().getParameter("uid");
        if(uid == null){
            throw new RuntimeException("未登录");
        }
        return new MyPrincipal(uid);
    }
}

  1. 记录用户登陆退出,将用户信息存储到redis中
/**
 * 用户登录退出操作
 */

@Component
public class MyWebSocketHandler implements WebSocketHandlerDecoratorFactory {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public WebSocketHandler decorate(WebSocketHandler handler) {
        return new WebSocketHandlerDecorator(handler) {
            //用户登录
            @Override
            public void afterConnectionEstablished(WebSocketSession session) throws Exception {
                String uid = session.getPrincipal().getName();
                System.out.println(uid + "登录");
                //将用户存入到redis在线用户中
                stringRedisTemplate.opsForSet().add("online", uid);
                super.afterConnectionEstablished(session);
            }

            //用户退出
            @Override
            public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
                String uid = session.getPrincipal().getName();
                System.out.println(uid + "退出");
                //将用户从redis在线用户中删除
                stringRedisTemplate.opsForSet().remove("online", uid);
                super.afterConnectionClosed(session, closeStatus);
            }
        };
    }
}

  1. websocket配置
@Configuration
//开启消息代理,默认使用内置消息代理,也可以选择配置RabbitMQ等
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Autowired
    private MyWebSocketHandler myWebSocketHandler;
    @Autowired
    private MyHandshakeHandler myHandshakeHandler;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //启用/user /topic两个消息前缀,消息发送的前缀,也是前端订阅的前缀
        registry.enableSimpleBroker("/user""/topic");
        //当使用convertAndSendToUser发送消息时,前端订阅用/user开头。即一对一发送消息,使用/user为前缀订阅
        registry.setUserDestinationPrefix("/user");
        //前端向服务端发送消息的前缀
        registry.setApplicationDestinationPrefixes("/im/");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
        //客户端和服务端进行连接的endpoint
        //如果使用移动端开发app,需要/im/conn/websocket连接
        stompEndpointRegistry.addEndpoint("/im/conn")
                .setHandshakeHandler(myHandshakeHandler)//设置连接校验
                .setAllowedOrigins("*")//跨域
                .withSockJS();//开启sockjs
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
        //注册登陆退出
        registry.addDecoratorFactory(myWebSocketHandler);
    }
}

  1. 创建redis消息处理类
/**
 * 处理订阅redis的消息
 */

@Component
public class RedisReceiver {
    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    /**
     * 处理一对一消息
     * @param message 消息队列中的消息
     */

    public void sendMsg(String message) {
        SendMsg msg = JSONObject.parseObject(message, SendMsg.class);
        simpMessagingTemplate.convertAndSendToUser(msg.getToUid(), "msg", msg);
    }

    /**
     * 处理广播消息
     * @param message
     */

    public void sendAllMsg(String message){
        simpMessagingTemplate.convertAndSend("/topic/sys", message);
    }
}

  1. redis消息监听配置
@Configuration
public class RedisMessageListenerConfig {
    @Autowired
    private RedisReceiver redisReceiver;

    /**
     * 监听redis中的订阅信息
     * @param redisConnectionFactory
     * @return
     */

    @Bean
    public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        //添加redis消息队列监听,监听im-topic消息主题的消息,使用messageListenerAdapter()中设置的类和方法处理消息。
        redisMessageListenerContainer.addMessageListener(messageListenerAdapter(), new PatternTopic("im-topic"));
        //同上一样
        redisMessageListenerContainer.addMessageListener(messageAllListenerAdapter(), new PatternTopic("sys-topic"));
        return redisMessageListenerContainer;
    }

    /**
     * 添加订阅消息处理类,通过反射获取处理类中的处理方法
     * 即使用RedisReceiver类中的sendMsg方法处理消息
     * @return
     */

    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        return new MessageListenerAdapter(redisReceiver, "sendMsg");
    }

    @Bean
    public MessageListenerAdapter messageAllListenerAdapter(){
        return new MessageListenerAdapter(redisReceiver, "sendAllMsg");
    }
}

  1. 创建controller
@RestController
public class ImController {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 发送消息,一对一
     * Principal为连接websocket校验时返回的,可以直接在参数中使用
     *
     * @param msg
     * @param principal
     * @return
     */

    @MessageMapping("/send2user")
    public String send2user(@Validated SendMsg msg, Principal principal) {
        String uid = principal.getName();
        //当前发送信息的uid
        msg.setUid(uid);
        System.out.println(uid + ":" + msg);
        //获取在线的用户列表
        Set<String> onlineUsers = stringRedisTemplate.opsForSet().members("online");
        //判断发送的用户是否在线
        if (onlineUsers.contains(msg.getToUid())) {
            //如果用户在线,则将消息发送到redis消息队列im-topic主题中,所有连接同一个redis的应用并订阅im-topic主题都会收到这条消息。
            //然后都使用SimpMessagingTemplate发送消息到指定的订阅中
            //接收消息发送消息的类 RedisReceiver
            stringRedisTemplate.convertAndSend("im-topic", JSONObject.toJSONString(msg));
        } else {
           //用户不在线,保存消息记录,用户上线后拉取,这里不做实现
        }
        return "success";
    }

    /**
     * 发送消息,发送给所有订阅/topic/sys的用户
     * 广播消息只发送给在线的用户
     * @param msg
     * @return
     */

    @GetMapping("/sendAll")
    public String sendAll(String msg){
        System.out.println("广播消息:" + msg);
        //将消息发布到redis sys-topic主题中
        stringRedisTemplate.convertAndSend("sys-topic", msg);
        return "success";
    }

}

  1. 通过nginx配置集群
   upstream websocket {
server 192.168.211.100:8080;
server 192.168.211.100:8081;
}
server {
listen 80;
server_name 192.168.211.100;

location / {
proxy_pass http://websocket;
//websocket集群需要配置以下两个参数,否在会连接失败
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
}
}

  1. 修改html
//修改连接为nginx配置的路径
let socket = new SockJS('http://192.168.211.100/im/conn?uid=' + uid);

  1. 测试
    开启两个不同端口的websocket应用,与nginx配置的集群相同。打开两个页面,连接到不同端口的websocket应用,通过控制台查看连接。两个用户发送消息,在不同应用中的用户依然能收到消息


阅读原文有项目链接

文章转载自dean技术分享,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论