websocket集群在websocket基础上修改,添加redis消息队列
原理:由于集群有多个websocket应用,两个用户有可能不在同一个应用中,发送消息时用户会接收不到消息。所以将用户发送的消息先发布到redis消息队列中,所有应用都连接到同一个redis并订阅消息主题,所有应用都会收到这条消息,最后所有应用都发送这条消息,无论用户连接到哪个websocket应用,都会收到这条消息。
依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
</dependencies>
配置文件
spring:
redis:
host: 192.168.211.100
port: 6379
database: 1
password:
timeout: 5000ms
连接校验
/**
* 连接时校验用户信息,并返回重写的Principal
*/
@Component
public class MyHandshakeHandler extends DefaultHandshakeHandler {
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
if (!(request instanceof ServletServerHttpRequest)) {
return null;
}
ServletServerHttpRequest req = (ServletServerHttpRequest) request;
//获取请求参数中携带的uid
String uid = req.getServletRequest().getParameter("uid");
if(uid == null){
throw new RuntimeException("未登录");
}
return new MyPrincipal(uid);
}
}
记录用户登陆退出,将用户信息存储到redis中
/**
* 用户登录退出操作
*/
@Component
public class MyWebSocketHandler implements WebSocketHandlerDecoratorFactory {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Override
public WebSocketHandler decorate(WebSocketHandler handler) {
return new WebSocketHandlerDecorator(handler) {
//用户登录
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String uid = session.getPrincipal().getName();
System.out.println(uid + "登录");
//将用户存入到redis在线用户中
stringRedisTemplate.opsForSet().add("online", uid);
super.afterConnectionEstablished(session);
}
//用户退出
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
String uid = session.getPrincipal().getName();
System.out.println(uid + "退出");
//将用户从redis在线用户中删除
stringRedisTemplate.opsForSet().remove("online", uid);
super.afterConnectionClosed(session, closeStatus);
}
};
}
}
websocket配置
@Configuration
//开启消息代理,默认使用内置消息代理,也可以选择配置RabbitMQ等
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Autowired
private MyWebSocketHandler myWebSocketHandler;
@Autowired
private MyHandshakeHandler myHandshakeHandler;
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//启用/user /topic两个消息前缀,消息发送的前缀,也是前端订阅的前缀
registry.enableSimpleBroker("/user", "/topic");
//当使用convertAndSendToUser发送消息时,前端订阅用/user开头。即一对一发送消息,使用/user为前缀订阅
registry.setUserDestinationPrefix("/user");
//前端向服务端发送消息的前缀
registry.setApplicationDestinationPrefixes("/im/");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
//客户端和服务端进行连接的endpoint
//如果使用移动端开发app,需要/im/conn/websocket连接
stompEndpointRegistry.addEndpoint("/im/conn")
.setHandshakeHandler(myHandshakeHandler)//设置连接校验
.setAllowedOrigins("*")//跨域
.withSockJS();//开启sockjs
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
//注册登陆退出
registry.addDecoratorFactory(myWebSocketHandler);
}
}
创建redis消息处理类
/**
* 处理订阅redis的消息
*/
@Component
public class RedisReceiver {
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
/**
* 处理一对一消息
* @param message 消息队列中的消息
*/
public void sendMsg(String message) {
SendMsg msg = JSONObject.parseObject(message, SendMsg.class);
simpMessagingTemplate.convertAndSendToUser(msg.getToUid(), "msg", msg);
}
/**
* 处理广播消息
* @param message
*/
public void sendAllMsg(String message){
simpMessagingTemplate.convertAndSend("/topic/sys", message);
}
}
redis消息监听配置
@Configuration
public class RedisMessageListenerConfig {
@Autowired
private RedisReceiver redisReceiver;
/**
* 监听redis中的订阅信息
* @param redisConnectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
//添加redis消息队列监听,监听im-topic消息主题的消息,使用messageListenerAdapter()中设置的类和方法处理消息。
redisMessageListenerContainer.addMessageListener(messageListenerAdapter(), new PatternTopic("im-topic"));
//同上一样
redisMessageListenerContainer.addMessageListener(messageAllListenerAdapter(), new PatternTopic("sys-topic"));
return redisMessageListenerContainer;
}
/**
* 添加订阅消息处理类,通过反射获取处理类中的处理方法
* 即使用RedisReceiver类中的sendMsg方法处理消息
* @return
*/
@Bean
public MessageListenerAdapter messageListenerAdapter() {
return new MessageListenerAdapter(redisReceiver, "sendMsg");
}
@Bean
public MessageListenerAdapter messageAllListenerAdapter(){
return new MessageListenerAdapter(redisReceiver, "sendAllMsg");
}
}
创建controller
@RestController
public class ImController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* 发送消息,一对一
* Principal为连接websocket校验时返回的,可以直接在参数中使用
*
* @param msg
* @param principal
* @return
*/
@MessageMapping("/send2user")
public String send2user(@Validated SendMsg msg, Principal principal) {
String uid = principal.getName();
//当前发送信息的uid
msg.setUid(uid);
System.out.println(uid + ":" + msg);
//获取在线的用户列表
Set<String> onlineUsers = stringRedisTemplate.opsForSet().members("online");
//判断发送的用户是否在线
if (onlineUsers.contains(msg.getToUid())) {
//如果用户在线,则将消息发送到redis消息队列im-topic主题中,所有连接同一个redis的应用并订阅im-topic主题都会收到这条消息。
//然后都使用SimpMessagingTemplate发送消息到指定的订阅中
//接收消息发送消息的类 RedisReceiver
stringRedisTemplate.convertAndSend("im-topic", JSONObject.toJSONString(msg));
} else {
//用户不在线,保存消息记录,用户上线后拉取,这里不做实现
}
return "success";
}
/**
* 发送消息,发送给所有订阅/topic/sys的用户
* 广播消息只发送给在线的用户
* @param msg
* @return
*/
@GetMapping("/sendAll")
public String sendAll(String msg){
System.out.println("广播消息:" + msg);
//将消息发布到redis sys-topic主题中
stringRedisTemplate.convertAndSend("sys-topic", msg);
return "success";
}
}
通过nginx配置集群
upstream websocket {
server 192.168.211.100:8080;
server 192.168.211.100:8081;
}
server {
listen 80;
server_name 192.168.211.100;
location / {
proxy_pass http://websocket;
//websocket集群需要配置以下两个参数,否在会连接失败
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
}
}
修改html
//修改连接为nginx配置的路径
let socket = new SockJS('http://192.168.211.100/im/conn?uid=' + uid);
测试 开启两个不同端口的websocket应用,与nginx配置的集群相同。打开两个页面,连接到不同端口的websocket应用,通过控制台查看连接。两个用户发送消息,在不同应用中的用户依然能收到消息
阅读原文有项目链接
文章转载自dean技术分享,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




