1:pom导入依赖

org.springframework.boot

spring-boot-starter-websocket

2.2.7.RELEASE

2:myWebSocketClient自定义webSocket客户端

package com.example.poi.utils;

import org.springframework.stereotype.Component;

import javax.websocket.*;

import java.io.IOException;

/**

* @Author xu

* @create 2023/9/11 18

*/

@ClientEndpoint

@Component

public class MyWebSocketClient {

public Session session;

@OnOpen

public void onOpen(Session session) {

this.session = session;

System.out.println("WebSocket2连接已打开");

}

@OnMessage

public void onMessage(String message) {

System.out.println("收到消息2:" + message);

}

@OnClose

public void onClose() {

System.out.println("客户端关闭2");

}

@OnError

public void onError(Throwable throwable) {

System.err.println("发生错误2:" + throwable.getMessage());

}

public void sendMessage(String message) throws IOException {

session.getBasicRemote().sendText(message);

}

}

3:WebSocketServer自定义webSocket服务端

package com.example.poi.utils;

import cn.hutool.json.JSONObject;

import cn.hutool.log.Log;

import cn.hutool.log.LogFactory;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang.ObjectUtils;

import org.springframework.context.annotation.Bean;

import org.springframework.stereotype.Component;

import org.springframework.web.socket.server.standard.ServerEndpointExporter;

import javax.websocket.*;

import javax.websocket.server.PathParam;

import javax.websocket.server.ServerEndpoint;

import java.io.IOException;

import java.util.concurrent.CopyOnWriteArraySet;

/**

* @Author xu

* @create 2023/7/21 19

*/

@ServerEndpoint("/websocket/{sid}")

@Component

@Slf4j

public class WebSocketServer {

static Log log = LogFactory.get(WebSocketServer.class);

//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。

private static int onlineCount = 0;

//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。

private static CopyOnWriteArraySet webSocketSet = new CopyOnWriteArraySet();

//与某个客户端的连接会话,需要通过它来给客户端发送数据

public Session session;

//接收sid

private String sid = "";

/**

* 连接建立成功调用的方法

*

* @param session

* @param sid

*/

@OnOpen

public void onOpen(Session session, @PathParam("sid") String sid) {

this.session = session;

webSocketSet.add(this); //加入set中

addOnlineCount(); //在线数加1

log.info("有新窗口开始监听:" + sid + ",当前在线人数为" + getOnlineCount());

this.sid = sid;

/*try {

sendMessage("连接成功");

} catch (IOException e) {

log.error("websocket IO异常");

}*/

}

/**

* 连接关闭调用的方法

*/

@OnClose

public void onClose() {

webSocketSet.remove(this); //从set中删除

subOnlineCount(); //在线数减1

log.info("有一连接关闭!当前在线人数为" + getOnlineCount());

}

/**

* 收到客户端消息后调用的方法

* 客户端发送过来的消息

*

* @param message

* @param session

*/

@OnMessage

public void onMessage(String message, Session session) {

log.info("收到来自窗口" + sid + "的信息:" + message);

//群发消息

for (WebSocketServer item : webSocketSet) {

if (ObjectUtils.equals(item.sid, sid)) {

try {

JSONObject jsonObject = new JSONObject();

jsonObject.put("name", sid);

item.sendMessage(jsonObject.toString());

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

/**

* @param session

* @param error

*/

@OnError

public void onError(Session session, Throwable error) {

log.error("发生错误");

error.printStackTrace();

}

/**

* 实现服务器主动推送

*

* @param message

* @throws IOException

*/

public void sendMessage(String message) throws IOException {

this.session.getBasicRemote().sendText(message);

}

/**

* 获取存在的webSocket

*/

public CopyOnWriteArraySet getWebSocketServer() {

return webSocketSet;

}

public String getSid(){

return sid;

}

public void close2(String ss){

for (WebSocketServer webSocketServer : webSocketSet) {

if (webSocketServer.sid.equals(ss)) {

webSocketSet.remove(webSocketServer);

log.info("删除了:"+ss);

}

}

subOnlineCount(); //在线数减1

log.info("有一连接关闭!当前在线人数为" + getOnlineCount());

}

/**

* 发送消息

*

* @param message

* @param sid

* @throws IOException

*/

public static void sendInfo(String message, @PathParam("sid") String sid) throws IOException {

log.info("推送消息到窗口" + sid + ",推送内容:" + message);

for (WebSocketServer item : webSocketSet) {

try {

//这里可以设定只推送给这个sid的,为null则全部推送

if (sid == null) {

item.sendMessage(message);

} else if (item.sid.equals(sid)) {

item.sendMessage(message);

}

} catch (IOException e) {

continue;

}

}

}

public static synchronized int getOnlineCount() {

return onlineCount;

}

public static synchronized void addOnlineCount() {

WebSocketServer.onlineCount++;

}

public static synchronized void subOnlineCount() {

WebSocketServer.onlineCount--;

}

/**

* 必须要有这个bean才能生效使用webSocketServer

*/

@Bean

public ServerEndpointExporter serverEndpointExporter() {

return new ServerEndpointExporter();

}

}

注意:如果在WebSocketServer 注入spring管理的容器问题,在WebSocketServer 类,自动注入Service层报错空指针异常,spring默认管理的都是单例(singleton),和 websocket (多对象)相冲突,项目启动时初始化,会初始化 websocket (非用户连接的),spring 同时会为其注入 service,该对象的 service 不是 null,被成功注入。但是,由于 spring 默认管理的是单例,所以只会注入一次 service。当新用户进入聊天时,系统又会创建一个新的 websocket 对象,这时矛盾出现了:spring 管理的都是单例,不会给第二个 websocket 对象注入 service,只要是用户连接创建的 websocket 对象,都不能再注入了。 controller,service ,dao 都是单例,所以注入时不会报 null。但是 websocket 不是单例,所以使用spring注入一次后,后面的对象就不会再注入了,会报null,在WebSocketServer 层注入如下成功:

/**static修饰,保证每一个WebSocketServer只有一个UserMapper实例*/

private static UserMapper userMapper;

@Autowired

public void setUserMapper(UserMapper userMapper) {

this.userMapper=userMapper;

}

4:controller控制层

@SneakyThrows

@GetMapping("/testSql")

public List testSql(String id) {

/** WebSocket服务器的地址*/

try {

Random random = new Random();

Integer i = random.nextInt(5) + 1;

CopyOnWriteArraySet webSocketServerSet = webSocketServer.getWebSocketServer();

for (WebSocketServer socketServer : webSocketServerSet) {

if (socketServer.getSid().equals(i.toString())) {

webSocketServer.close2(i.toString());

return null;

}

}

URI uri = new URI("ws://127.0.0.1:9088/test/websocket/" + i);

WebSocketContainer container = ContainerProvider.getWebSocketContainer();

container.connectToServer(myWebSocketClient, uri);

myWebSocketClient.sendMessage("你好" + i);

} catch (Exception e) {

throw new RuntimeException(e);

}

log.info("++++++");

return null;

}

5:注意事项:连接自动断开

webSocket连接之后,发现一个问题:就是每隔一段时间如果不传送数据的话,与前端的连接就会自动断开。采用心跳消息的方式,就可以解决这个问题。比如客服端每隔30秒自动发送ping消息给服务端,服务端返回pong

5-1:注意事项:对象无法自动注入

使用了@ServerEndpoint注解的类中使用@Resource或@Autowired注入对象都会失败,并且报空指针异常,解决方法:如下@ServerEndpoint注解的类,使用静态对象,并且对外暴露set方法,这样在对象初始化的时候,将其注入到WebSocketServer

@ServerEndpoint("/websocket/{sid}")

@Component

@Slf4j

public class WebSocketServer {

private static EntityDemoServiceImpl entityDemoService;

public static void setEntityDemoService(EntityDemoServiceImpl entityDemoService) {

WebSocketServer.entityDemoService = entityDemoService;

}

在要注入到@ServerEndpoint注解的类中,使用@PostConstruct后置注入

@Service

public class EntityDemoServiceImpl extends ServiceImpl implements IEntityDemoService {

@PostConstruct

public void init() {

WebSocketServer.setEntityDemoService(this);

}

}

5-2:注意事项:Session无法被序列化

分布式场景会存在这样一个问题,当一次请求负载到第一台服务器时,session在第一台服务器线程上,第二次请求,负载到第二台服务器上,此时通过userId查找当前用户的session时,是查找不到的。 本来想着把session存入到redis中,就可以从redis获取用户的session,希望用这种方式来解决分布式场景下消息发送的问题。这种场景可以通过发送消息中间件的方式解决。具体这样解决:每次连接时,都将userId和对应的session存入到本机,发送消息时,直接发送给MQ-Broker,然后每台应用负载都去消费这个消息,拿到消息之后,判断在本机能根据userId是否能找到session,找到则推送到客户端

推荐阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: