什么是分布式WebSocket?

是指在分布式系统架构中实现WebSocket的通信机制,它允许在不同的服务器节点之间共享和同步WebSocket会话状态,从而实现跨多个服务器的实时消息传递。

在分布式环境中实现WebSocket的挑战主要包括以下几点:

会话共享:在分布式系统中,用户的WebSocket连接可能与不同的服务器建立,这就要求系统能够在不同服务器间共享WebSocket会话信息,以便消息能够被正确地传递到所有相关的客户端。负载均衡:使用负载均衡可以提高系统的可用性和伸缩性。但是,当WebSocket请求在服务器之间负载均衡时,需要确保客户端可以与正确的服务器建立连接,并且能够接收到所有的消息。故障转移:在出现服务器故障时,系统需要能够将WebSocket会话无缝迁移到其他健康的服务器上,以保证服务的连续性。一致性:确保所有用户在任何时候看到的都是一致的消息状态,这对于实时通信非常重要。

为了解决这些挑战,可以采取以下几种策略:

使用消息代理:通过引入一个中心化的消息代理(如RabbitMQ、Redis Pub/Sub等),可以让所有的服务器都连接到这个消息代理。当一个服务器需要发送消息时,它将消息发送到消息代理,然后由消息代理负责将消息分发到所有连接的客户端。这样可以确保消息的一致性和可靠性。共享会话存储:使用一个共享的会话存储(如数据库或内存数据网格)来保存WebSocket会话的状态。这样,即使客户端最初连接到的服务器发生故障,其他服务器也可以接管会话并继续处理消息。基于路由的负载均衡:使用智能负载均衡器(如Nginx、HAProxy等),它们可以根据特定的路由规则(如会话ID或用户ID)将WebSocket连接定向到特定的服务器。服务发现:在微服务架构中,可以使用服务发现机制来动态地找到负责特定会话的服务器,并将消息路由到那里。WebSocket代理:使用专门的WebSocket代理服务器,它可以在多个后端服务器之间代理WebSocket连接,并确保消息的传递和会话的同步。应用层协议:设计应用层协议来处理分布式WebSocket的复杂性,例如通过引入心跳机制来检测连接的健康状况,并通过预定的协议来同步会话状态。

总的来说,在实践中,可能需要结合多种策略来构建一个健壮的分布式WebSocket解决方案,以满足不同场景下的需求。此外,还需要考虑安全性、性能和可扩展性等因素,以确保系统的稳定性和可靠性。

温故而知新:单点WebSocket实现

SpringBoot2.0集成WebSocket,实现后台向前端推送信息_springboot集成websocket-CSDN博客https://zhengkai.blog.csdn.net/article/details/80275084

简单版本:在Java中使用Redis实现WebSocket

要在Java中使用Redis实现WebSocket,你需要使用一个支持WebSocket的Java Web框架,如Spring Boot,以及一个支持Redis的Java库,如Jedis。以下是一个简单的示例:

添加依赖项到你的pom.xml文件

org.springframework.boot

spring-boot-starter-websocket

org.springframework.boot

spring-boot-starter-data-redis

创建一个WebSocket配置类

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.data.redis.listener.ChannelTopic;

import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;

import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

//by zhengkai.blog.csdn.net

@Configuration

@EnableWebSocketMessageBroker

public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Override

public void registerStompEndpoints(StompEndpointRegistry registry) {

registry.addEndpoint("/websocket").withSockJS();

}

@Override

public void configureMessageBroker(org.springframework.messaging.simp.config.MessageBrokerRegistry registry) {

registry.enableSimpleBroker("/topic");

registry.setApplicationDestinationPrefixes("/app");

}

@Bean

public JedisConnectionFactory jedisConnectionFactory() {

return new JedisConnectionFactory();

}

@Bean

public RedisTemplate redisTemplate() {

RedisTemplate template = new RedisTemplate<>();

template.setConnectionFactory(jedisConnectionFactory());

return template;

}

@Bean

public MessageListenerAdapter messageListenerAdapter() {

return new MessageListenerAdapter(new RedisMessageListener());

}

@Bean

public RedisMessageListenerContainer redisMessageListenerContainer() {

RedisMessageListenerContainer container = new RedisMessageListenerContainer();

container.setConnectionFactory(jedisConnectionFactory());

container.addMessageListener(messageListenerAdapter(), topic());

return container;

}

@Bean

public ChannelTopic topic() {

return new ChannelTopic("websocket-topic");

}

}

创建一个WebSocket消息监听器

import org.springframework.data.redis.connection.Message;

import org.springframework.data.redis.connection.MessageListener;

import org.springframework.stereotype.Component;

@Component

public class RedisMessageListener implements MessageListener {

@Override

public void onMessage(Message message, byte[] pattern) {

System.out.println("Received message: " + message);

}

}

发送消息到WebSocket客户端

在你的控制器中,你可以使用SimpMessagingTemplate来发送消息到WebSocket客户端:

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.messaging.simp.SimpMessagingTemplate;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RestController;

@RestController

public class WebSocketController {

@Autowired

private SimpMessagingTemplate messagingTemplate;

@GetMapping("/send")

public String sendMessage() {

messagingTemplate.convertAndSend("/topic/websocket-topic", "Hello from Redis!");

return "Message sent!";

}

}

正式版本:用SpringBoot+Redis实现分布式WebSocket

 

将消息(<用户id,消息内容>)统一推送到一个消息队列(Redis、Kafka等)的的topic,然后每个应用节点都订阅这个topic,在接收到WebSocket消息后取出这个消息的“消息接收者的用户ID/用户名”,然后再比对自身是否存在相应用户的连接,如果存在则推送消息,否则丢弃接收到的这个消息(这个消息接收者所在的应用节点会处理)在用户建立WebSocket连接后,使用Redis缓存记录用户的WebSocket建立在哪个应用节点上,然后同样使用消息队列将消息推送到接收者所在的应用节点上面(实现上比方案一要复杂,但是网络流量会更低)

 

 1. 定义一个WebSocket Channel枚举类

public enum WebSocketChannelEnum {

//测试使用的简易点对点聊天

CHAT("CHAT", "测试使用的简易点对点聊天", "/topic/reply");

WebSocketChannelEnum(String code, String description, String subscribeUrl) {

this.code = code;

this.description = description;

this.subscribeUrl = subscribeUrl;

}

/**

* 唯一CODE

*/

private String code;

/**

* 描述

*/

private String description;

/**

* WebSocket客户端订阅的URL

*/

private String subscribeUrl;

public String getCode() {

return code;

}

public String getDescription() {

return description;

}

public String getSubscribeUrl() {

return subscribeUrl;

}

/**

* 通过CODE查找枚举类

*/

public static WebSocketChannelEnum fromCode(String code){

if(StringUtils.isNoneBlank(code)){

for(WebSocketChannelEnum channelEnum : values()){

if(channelEnum.code.equals(code)){

return channelEnum;

}

}

}

return null;

}

}

2. 配置基于Redis的消息队列

需要注意的是,在大中型正式项目中并不推荐使用Redis实现的消息队列,因为经过测试它并不是特别可靠,所以应该考虑使用Kafka、rabbitMQ等专业的消息队列中间件

@Configuration

@ConditionalOnClass({JedisCluster.class})

public class RedisConfig {

@Value("${spring.redis.timeout}")

private String timeOut;

@Value("${spring.redis.cluster.nodes}")

private String nodes;

@Value("${spring.redis.cluster.max-redirects}")

private int maxRedirects;

@Value("${spring.redis.jedis.pool.max-active}")

private int maxActive;

@Value("${spring.redis.jedis.pool.max-wait}")

private int maxWait;

@Value("${spring.redis.jedis.pool.max-idle}")

private int maxIdle;

@Value("${spring.redis.jedis.pool.min-idle}")

private int minIdle;

@Value("${spring.redis.message.topic-name}")

private String topicName;

@Bean

public JedisPoolConfig jedisPoolConfig(){

JedisPoolConfig config = new JedisPoolConfig();

config.setMaxTotal(maxActive);

config.setMaxIdle(maxIdle);

config.setMinIdle(minIdle);

config.setMaxWaitMillis(maxWait);

return config;

}

@Bean

public RedisClusterConfiguration redisClusterConfiguration(){

RedisClusterConfiguration configuration = new RedisClusterConfiguration(Arrays.asList(nodes));

configuration.setMaxRedirects(maxRedirects);

return configuration;

}

/**

* JedisConnectionFactory

*/

@Bean

public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration configuration,JedisPoolConfig jedisPoolConfig){

return new JedisConnectionFactory(configuration,jedisPoolConfig);

}

/**

* 使用Jackson序列化对象

*/

@Bean

public Jackson2JsonRedisSerializer jackson2JsonRedisSerializer(){

Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class);

ObjectMapper objectMapper = new ObjectMapper();

objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);

objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

serializer.setObjectMapper(objectMapper);

return serializer;

}

/**

* RedisTemplate

*/

@Bean

public RedisTemplate redisTemplate(JedisConnectionFactory factory, Jackson2JsonRedisSerializer jackson2JsonRedisSerializer){

RedisTemplate redisTemplate = new RedisTemplate<>();

redisTemplate.setConnectionFactory(factory);

//字符串方式序列化KEY

StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

redisTemplate.setKeySerializer(stringRedisSerializer);

redisTemplate.setHashKeySerializer(stringRedisSerializer);

//JSON方式序列化VALUE

redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);

redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

redisTemplate.afterPropertiesSet();

return redisTemplate;

}

/**

* 消息监听器

*/

@Bean

MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver, Jackson2JsonRedisSerializer jackson2JsonRedisSerializer){

//消息接收者以及对应的默认处理方法

MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageReceiver, "receiveMessage");

//消息的反序列化方式

messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer);

return messageListenerAdapter;

}

/**

* message listener container

*/

@Bean

RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory

, MessageListenerAdapter messageListenerAdapter){

RedisMessageListenerContainer container = new RedisMessageListenerContainer();

container.setConnectionFactory(connectionFactory);

//添加消息监听器

container.addMessageListener(messageListenerAdapter, new PatternTopic(topicName));

return container;

}

}

这里使用的配置:

spring:

...

#redis

redis:

cluster:

nodes: namenode22:6379,datanode23:6379,datanode24:6379

max-redirects: 6

timeout: 300000

jedis:

pool:

max-active: 8

max-wait: 100000

max-idle: 8

min-idle: 0

#自定义的监听的TOPIC路径

message:

topic-name: topic-test

3. 定义一个Redis消息的处理者

@Component

public class MessageReceiver {

private final Logger logger = LoggerFactory.getLogger(getClass());

@Autowired

private SimpMessagingTemplate messagingTemplate;

@Autowired

private SimpUserRegistry userRegistry;

/**

* 处理WebSocket消息

*/

public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) {

logger.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg));

//1. 取出用户名并判断是否连接到当前应用节点的WebSocket

SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver());

if(simpUser != null && StringUtils.isNoneBlank(simpUser.getName())){

//2. 获取WebSocket客户端的订阅地址

WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode());

if(channelEnum != null){

//3. 给WebSocket客户端发送消息

messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent());

}

}

}

}

4. 在Controller中发送WebSocket消息

@Controller

@RequestMapping(("/wsTemplate"))

public class RedisMessageController {

private final Logger logger = LoggerFactory.getLogger(getClass());

@Value("${spring.redis.message.topic-name}")

private String topicName;

@Autowired

private SimpMessagingTemplate messagingTemplate;

@Autowired

private SimpUserRegistry userRegistry;

@Resource(name = "redisServiceImpl")

private RedisService redisService;

/**

* 给指定用户发送WebSocket消息

*/

@PostMapping("/sendToUser")

@ResponseBody

public String chat(HttpServletRequest request) {

//消息接收者

String receiver = request.getParameter("receiver");

//消息内容

String msg = request.getParameter("msg");

HttpSession session = SpringContextUtils.getSession();

User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

HelloMessage resultData = new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg));

this.sendToUser(loginUser.getUsername(), receiver, WebSocketChannelEnum.CHAT.getSubscribeUrl(), JsonUtils.toJson(resultData));

return "ok";

}

/**

* 给指定用户发送消息,并处理接收者不在线的情况

* @param sender 消息发送者

* @param receiver 消息接收者

* @param destination 目的地

* @param payload 消息正文

*/

private void sendToUser(String sender, String receiver, String destination, String payload){

SimpUser simpUser = userRegistry.getUser(receiver);

//如果接收者存在,则发送消息

if(simpUser != null && StringUtils.isNoneBlank(simpUser.getName())){

messagingTemplate.convertAndSendToUser(receiver, destination, payload);

}

//如果接收者在线,则说明接收者连接了集群的其他节点,需要通知接收者连接的那个节点发送消息

else if(redisService.isSetMember(Constants.REDIS_WEBSOCKET_USER_SET, receiver)){

RedisWebsocketMsg redisWebsocketMsg = new RedisWebsocketMsg<>(receiver, WebSocketChannelEnum.CHAT.getCode(), payload);

redisService.convertAndSend(topicName, redisWebsocketMsg);

}

//否则将消息存储到redis,等用户上线后主动拉取未读消息

else{

//存储消息的Redis列表名

String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;

logger.info(MessageFormat.format("消息接收者{0}还未建立WebSocket连接,{1}发送的消息【{2}】将被存储到Redis的【{3}】列表中", receiver, sender, payload, listKey));

//存储消息到Redis中

redisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload);

}

}

/**

* 拉取指定监听路径的未读的WebSocket消息

* @param destination 指定监听路径

* @return java.util.Map

*/

@PostMapping("/pullUnreadMessage")

@ResponseBody

public Map pullUnreadMessage(String destination){

Map result = new HashMap<>();

try {

HttpSession session = SpringContextUtils.getSession();

//当前登录用户

User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

//存储消息的Redis列表名

String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;

//从Redis中拉取所有未读消息

List messageList = redisService.rangeList(listKey, 0, -1);

result.put("code", "200");

if(messageList !=null && messageList.size() > 0){

//删除Redis中的这个未读消息列表

redisService.delete(listKey);

//将数据添加到返回集,供前台页面展示

result.put("result", messageList);

}

}catch (Exception e){

result.put("code", "500");

result.put("msg", e.getMessage());

}

return result;

}

}

5. WebSocket相关配置

@Configuration

@EnableWebSocketMessageBroker

public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{

@Autowired

private AuthHandshakeInterceptor authHandshakeInterceptor;

@Autowired

private MyHandshakeHandler myHandshakeHandler;

@Autowired

private MyChannelInterceptor myChannelInterceptor;

@Override

public void registerStompEndpoints(StompEndpointRegistry registry) {

registry.addEndpoint("/chat-websocket")

.addInterceptors(authHandshakeInterceptor)

.setHandshakeHandler(myHandshakeHandler)

.withSockJS();

}

@Override

public void configureMessageBroker(MessageBrokerRegistry registry) {

//客户端需要把消息发送到/message/xxx地址

registry.setApplicationDestinationPrefixes("/message");

//服务端广播消息的路径前缀,客户端需要相应订阅/topic/yyy这个地址的消息

registry.enableSimpleBroker("/topic");

//给指定用户发送消息的路径前缀,默认值是/user/

registry.setUserDestinationPrefix("/user/");

}

@Override

public void configureClientInboundChannel(ChannelRegistration registration) {

registration.interceptors(myChannelInterceptor);

}

}

6. 示例页面

Chat With STOMP Message

Seems your browser doesn't support Javascript! Websockets rely on Javascript being

enabled. Please enable

Javascript and reload this page!

Chat With STOMP Message

好文阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: 

发表评论

返回顶部暗黑模式