
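The HTML5 specification added many new features on top of traditional web interaction. As web technology became widely used for building web apps, these features saw broad adoption, and WebSocket stands out among them as a new web communication technology. WebSocket is a protocol introduced alongside HTML5 (standardized as RFC 6455). Its goal is to establish an unrestricted, bidirectional communication channel between the browser and the server, so that, for example, the server can send a message to the browser at any moment.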




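1. Bidirectional communication with better real-time behavior.
2. Better support for binary data.
3. Low control overhead. Once the connection is established, the protocol header on each frame exchanged between the ws client and server is small. Without extensions, the header on server-to-client frames is only 2 to 10 bytes (depending on the payload length); client-to-server frames carry an additional 4-byte masking key. HTTP, by contrast, sends a full set of headers with every request.
4. Extensible. The ws protocol defines extensions, so users can extend the protocol or implement custom subprotocols (for example, a custom compression algorithm).
5. Built on top of TCP, so the server side is relatively easy to implement.
6. The data format is lightweight, with low performance overhead and efficient communication.
7. Good compatibility with HTTP. The default ports are also 80 and 443, and the handshake uses HTTP, so the handshake is hard to block and can pass through all kinds of HTTP proxies.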
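To make the "2 to 10 bytes" figure in point 3 concrete, here is a minimal sketch that computes the frame header size from the RFC 6455 framing layout. The class and method names (`FrameHeaderSize`, `headerBytes`) are made up for illustration.

```java
public class FrameHeaderSize {

    /**
     * Size in bytes of a WebSocket frame header without extensions.
     *
     * @param payloadLength payload length in bytes
     * @param masked        true for client-to-server frames, which carry a 4-byte masking key
     */
    public static int headerBytes(long payloadLength, boolean masked) {
        int size = 2; // FIN/opcode byte + mask bit and 7-bit length byte
        if (payloadLength > 65535) {
            size += 8; // 64-bit extended payload length
        } else if (payloadLength > 125) {
            size += 2; // 16-bit extended payload length
        }
        if (masked) {
            size += 4; // masking key
        }
        return size;
    }

    public static void main(String[] args) {
        System.out.println(headerBytes(100, false));     // prints 2
        System.out.println(headerBytes(1000, false));    // prints 4
        System.out.println(headerBytes(100_000, false)); // prints 10
        System.out.println(headerBytes(100_000, true));  // prints 14
    }
}
```

The last call shows why client-to-server frames are slightly larger: the masking key adds 4 bytes on top of the server-side maximum of 10.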


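While a WebSocket is in use, the network may drop, for example because of a weak signal or a temporary outage. The connection is then already broken, but the browser does not invoke the WebSocket's onclose handler, so we cannot tell that the connection is gone and cannot reconnect. If data is sent to the backend over the WebSocket in this state, onclose fires once the request times out, and the reconnection logic bound to it can then run. This is why a heartbeat helps: sending a small message at regular intervals exposes a dead connection quickly.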
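The timeout side of a heartbeat can be sketched as below. This is an illustration only, written in Java with explicit timestamps so it runs deterministically; the names `HeartbeatMonitor`, `onPong`, and `isDead` are hypothetical and do not come from the original code.

```java
public class HeartbeatMonitor {

    private final long timeoutMillis;
    private long lastPongAt;

    public HeartbeatMonitor(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastPongAt = nowMillis;
    }

    /** Call whenever a reply (for example a "pong") arrives from the peer. */
    public synchronized void onPong(long nowMillis) {
        lastPongAt = nowMillis;
    }

    /** True when no reply arrived within the timeout, so the caller should reconnect. */
    public synchronized boolean isDead(long nowMillis) {
        return nowMillis - lastPongAt > timeoutMillis;
    }

    public static void main(String[] args) {
        HeartbeatMonitor monitor = new HeartbeatMonitor(10_000, 0);
        monitor.onPong(5_000);
        System.out.println(monitor.isDead(12_000)); // prints false (7 s since last reply)
        System.out.println(monitor.isDead(20_000)); // prints true (15 s since last reply)
    }
}
```

In a real client the caller would send a ping on a timer, feed every reply into `onPong`, and close and reopen the socket when `isDead` returns true.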


V. Establishing communication between a Spring Boot backend and a Vue frontend

1. Add the WebSocket dependency to pom.xml in the Spring Boot project
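For a standard Spring Boot project this is normally the WebSocket starter. The fragment below is shown as an assumption; the version is inherited from the Spring Boot parent POM, so none is specified.

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
```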



2. Create WebSocketConfig.java to enable WebSocket support

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Enables WebSocket support
 */
@Configuration
public class WebSocketConfig {

    // Registers the classes annotated with @ServerEndpoint
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

3. Create WebSocketServer.java to handle connections

package com.mes.dispatch.socket;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Author: best_liu
 * @Description: WebSocket service
 * @Date: 13:05 2023/8/31
 */
@Slf4j
// Assumed path: only the {userId} template variable is implied by @PathParam below
@ServerEndpoint("/websocket/{userId}")
@Component
public class WebSocketServer {

    /**
     * Static counter of the current number of online connections; access must be thread-safe.
     */
    private static int onlineCount = 0;

    /**
     * Thread-safe map from the concurrent package holding the WebSocketServer instance of each client.
     */
    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();

    /**
     * Session of the connection to a client; data is sent to the client through it.
     */
    private Session session;

    /**
     * The userId received on connect
     */
    private String userId = "";


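    /**
     * Called when a connection is established
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        this.userId = userId;
        if (webSocketMap.containsKey(userId)) {
            // Same user reconnecting: replace the old entry, the count is unchanged
            webSocketMap.remove(userId);
            webSocketMap.put(userId, this);
        } else {
            webSocketMap.put(userId, this);
            // One more user online
            addOnlineCount();
        }
        log.info("User connected: {}, users online: {}", userId, getOnlineCount());
        try {
            HashMap<String, Object> map = new HashMap<>();
            map.put("key", "connected");
            sendMessage(JSON.toJSONString(map));
        } catch (IOException e) {
            log.error("User {}: network error", userId, e);
        }
    }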

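    /**
     * Called when a connection is closed
     */
    @OnClose
    public void onClose() {
        if (webSocketMap.containsKey(userId)) {
            webSocketMap.remove(userId);
            // One fewer user online
            subOnlineCount();
        }
        log.info("User disconnected: {}, users online: {}", userId, getOnlineCount());
    }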

    /**
     * Called when a message arrives from the client
     *
     * @param message the message sent by the client
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("Message from user {}: {}", userId, message);
        if (StringUtils.isNotBlank(message)) {
            try {
                // Parse the incoming payload
                JSONObject jsonObject = JSONObject.parseObject(message);
                // Stamp the sender on the server side so the client cannot spoof it
                jsonObject.put("fromUserId", this.userId);
                String fromUserId = jsonObject.getString("fromUserId");
                if (StringUtils.isNotBlank(fromUserId) && webSocketMap.containsKey(fromUserId)) {
                    // DeviceLocalThread.paramData.put(jsonObject.getString("group"),jsonObject.toJSONString());
                } else {
                    log.error("Requested userId {} is not connected to this server", fromUserId);
                }
            } catch (Exception e) {
                log.error("Failed to handle message from user {}", userId, e);
            }
        }
    }


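    /**
     * Called when an error occurs
     *
     * @param session the session on which the error occurred
     * @param error   the error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("Error for user {}: {}", this.userId, error.getMessage(), error);
    }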


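    /**
     * Pushes a message from the server to this client
     */
    public void sendMessage(String message) throws IOException {
        // Lock on the session: concurrent sends on one session throw an exception
        synchronized (session) {
            try {
                this.session.getBasicRemote().sendText(message);
            } catch (IOException e) {
                log.error("Server push failed: {}", e.getMessage());
            }
        }
    }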

    /**
     * @Author: best_liu
     * @Description: Sends a custom message
     * @Date: 13:01 2023/8/31
     * @Param [message, toUserId]
     * @return void
     */
    public static void sendInfo(String message, String toUserId) throws IOException {
        if (StringUtils.isEmpty(toUserId)) {
            // No target user: broadcast to every connected user
            Iterator<String> itera = webSocketMap.keySet().iterator();
            while (itera.hasNext()) {
                String keys = itera.next();
                WebSocketServer item = webSocketMap.get(keys);
                item.sendMessage(message);
            }
        }
        // Otherwise send to the specified user only
        else if (webSocketMap.containsKey(toUserId)) {
            WebSocketServer item = webSocketMap.get(toUserId);
            item.sendMessage(message);
        } else {
            log.error("Requested userId {} is not connected to this server", toUserId);
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }

    public static synchronized ConcurrentHashMap<String, WebSocketServer> getWebSocketMap() {
        return WebSocketServer.webSocketMap;
    }
}
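The online counter above relies on synchronized static methods for thread safety. As an alternative sketch (not what the listing uses), the same counter can be kept in an `AtomicInteger`; the class name `OnlineCounter` is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class OnlineCounter {

    private static final AtomicInteger ONLINE = new AtomicInteger();

    /** Adds one connection and returns the new count. */
    public static int increment() {
        return ONLINE.incrementAndGet();
    }

    /** Removes one connection, never dropping below zero, and returns the new count. */
    public static int decrement() {
        return ONLINE.updateAndGet(n -> n > 0 ? n - 1 : 0);
    }

    public static int get() {
        return ONLINE.get();
    }

    public static void main(String[] args) {
        increment();
        increment();
        decrement();
        System.out.println(get()); // prints 1
    }
}
```

Clamping at zero guards against a close event that arrives for a connection that was never counted.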



4. Create TimerSocketMessage.java to test sending messages over the WebSocket (a scheduled task pushes the messages)

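import com.alibaba.fastjson.JSON;
import com.mes.dispatch.socket.WebSocketServer;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Component
@EnableScheduling
public class TimerSocketMessage {

    /**
     * Pushes a message to the frontend every five seconds
     */
    @Scheduled(cron = "*/5 * * * * *")
    public void SocketMessage() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Map<String, Object> maps = new HashMap<>();
        maps.put("type", "sendMessage");
        maps.put("data", sdf.format(new Date()));
        try {
            // A null target user broadcasts to every connected client
            WebSocketServer.sendInfo(JSON.toJSONString(maps), null);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}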

5. In Vue, open the connection to the backend WebSocket service and set up a heartbeat mechanism.


7. The WebSocket URL used in the Vue file must include the context-path: /demo
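How the context-path fits into the URL can be sketched as follows. Only `/demo` comes from the text; the host, port, and endpoint path in the example are assumed placeholders, and `WsUrlBuilder` is a hypothetical helper.

```java
public class WsUrlBuilder {

    /** Joins host, context-path, and endpoint path into a ws:// URL without doubled slashes. */
    public static String build(String hostAndPort, String contextPath, String endpointPath) {
        return "ws://" + hostAndPort + normalize(contextPath) + normalize(endpointPath);
    }

    /** Ensures exactly one leading slash and no trailing slash; empty input stays empty. */
    private static String normalize(String path) {
        String trimmed = path.replaceAll("^/+|/+$", "");
        return trimmed.isEmpty() ? "" : "/" + trimmed;
    }

    public static void main(String[] args) {
        // Host, port, and endpoint path are placeholders for illustration
        System.out.println(build("localhost:8080", "/demo", "/websocket/1001"));
        // prints ws://localhost:8080/demo/websocket/1001
    }
}
```

Leaving the context-path out produces a URL the server does not map to the endpoint, so the handshake fails before any WebSocket code runs.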



Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: WebSocket close status code does NOT comply with RFC-6455: 1005

Caused by: java.lang.IllegalArgumentException: WebSocket close status code does NOT comply with RFC-6455: 1005


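Cause: when Spring Cloud Gateway forwards a WebSocket connection and the client closes it without a status code, the gateway cannot relay the close event. It sees code 1005 (an expected status code was not received), which RFC 6455 reserves and does not allow in a close frame, so propagating it to the other side fails with the exception above.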
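The rule behind this error can be shown in isolation. The sketch below classifies close codes by whether RFC 6455 allows them in a close frame and maps the forbidden ones to 1002 (protocol error); `CloseCodes`, `sendable`, and `adapt` are illustrative names, not part of Spring.

```java
public class CloseCodes {

    /** True if the code may appear in a close frame on the wire under RFC 6455. */
    public static boolean sendable(int code) {
        if (code >= 3000 && code <= 4999) {
            return true; // registered (3000-3999) and private-use (4000-4999) ranges
        }
        switch (code) {
            case 1000: case 1001: case 1002: case 1003:
            case 1007: case 1008: case 1009: case 1010: case 1011:
                return true;
            default:
                return false; // includes the reserved codes 1004, 1005, 1006 and 1015
        }
    }

    /** Replaces a code that must not be sent with 1002 (protocol error). */
    public static int adapt(int code) {
        return sendable(code) ? code : 1002;
    }

    public static void main(String[] args) {
        System.out.println(adapt(1000)); // prints 1000
        System.out.println(adapt(1005)); // prints 1002
        System.out.println(adapt(4000)); // prints 4000
    }
}
```

A proxy that applies such a mapping before closing the other side never tries to send a reserved code.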




package com.mes.gateway.filter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.headers.HttpHeadersFilter;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.util.*;

/**
 * @Author: best_liu
 * @Description: Fixes the exception raised when a WebSocket is closed
 * @Desc When the WebSocket client actively disconnects, the gateway service reports error 1005
 * @Date Create in 11:15 2023/10/25
 * @Modified By:
 */
@Component
public class CustomWebsocketRoutingFilter implements GlobalFilter, Ordered {

    private static final Logger log = LoggerFactory.getLogger(CustomWebsocketRoutingFilter.class);

    // Sec-WebSocket-Protocol header name
    public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";

    // Prefix shared by all Sec-WebSocket-* headers
    public static final String SEC_WEBSOCKET_HEADER = "sec-websocket";

    // Upgrade header value and HTTP schemes
    public static final String HEADER_UPGRADE_WebSocket = "websocket";
    public static final String HEADER_UPGRADE_HTTP = "http";
    public static final String HEADER_UPGRADE_HTTPS = "https";

    private final WebSocketClient webSocketClient;
    private final WebSocketService webSocketService;
    private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;

    // Lazily initialized from headersFiltersProvider; read through getHeadersFilters()
    private volatile List<HttpHeadersFilter> headersFilters;

    public CustomWebsocketRoutingFilter(WebSocketClient webSocketClient, WebSocketService webSocketService,
            ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider) {
        this.webSocketClient = webSocketClient;
        this.webSocketService = webSocketService;
        this.headersFiltersProvider = headersFiltersProvider;
    }
    /* for testing */
    static String convertHttpToWs(String scheme) {
        scheme = scheme.toLowerCase();
        return "http".equals(scheme) ? "ws" : "https".equals(scheme) ? "wss" : scheme;
    }

    @Override
    public int getOrder() {
        // Before NettyRoutingFilter since this routes certain http requests.
        // Changed from -1 to -2: a smaller order value runs earlier, so this filter
        // handles the request before the built-in WebsocketRoutingFilter does.
        return Ordered.LOWEST_PRECEDENCE - 2;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        changeSchemeIfIsWebSocketUpgrade(exchange);

        URI requestUrl = exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String scheme = requestUrl.getScheme();
        if (ServerWebExchangeUtils.isAlreadyRouted(exchange) || (!"ws".equals(scheme) && !"wss".equals(scheme))) {
            return chain.filter(exchange);
        }
        // Mark the exchange as routed so later routing filters skip it
        ServerWebExchangeUtils.setAlreadyRouted(exchange);

        HttpHeaders headers = exchange.getRequest().getHeaders();
        HttpHeaders filtered = HttpHeadersFilter.filterRequest(getHeadersFilters(), exchange);
        List<String> protocols = getProtocols(headers);
        return this.webSocketService.handleRequest(exchange,
                new ProxyWebSocketHandler(requestUrl, this.webSocketClient, filtered, protocols));
    }

    /* for testing */
    List<String> getProtocols(HttpHeaders headers) {
        List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);
        if (protocols != null) {
            // Split comma-separated header values into individual subprotocols
            ArrayList<String> updatedProtocols = new ArrayList<>();
            for (int i = 0; i < protocols.size(); i++) {
                String protocol = protocols.get(i);
                updatedProtocols.addAll(Arrays.asList(StringUtils.tokenizeToStringArray(protocol, ",")));
            }
            protocols = updatedProtocols;
        }
        return protocols;
    }

    /* for testing */
    List<HttpHeadersFilter> getHeadersFilters() {
        if (this.headersFilters == null) {
            this.headersFilters = this.headersFiltersProvider.getIfAvailable(ArrayList::new);

            // remove host header unless specifically asked not to
            this.headersFilters.add((headers, exchange) -> {
                HttpHeaders filtered = new HttpHeaders();
                filtered.addAll(headers);
                filtered.remove(HttpHeaders.HOST);
                boolean preserveHost = exchange.getAttributeOrDefault(ServerWebExchangeUtils.PRESERVE_HOST_HEADER_ATTRIBUTE, false);
                if (preserveHost) {
                    String host = exchange.getRequest().getHeaders().getFirst(HttpHeaders.HOST);
                    filtered.add(HttpHeaders.HOST, host);
                }
                return filtered;
            });

            // drop the Sec-WebSocket-* headers of the incoming request
            this.headersFilters.add((headers, exchange) -> {
                HttpHeaders filtered = new HttpHeaders();
                for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
                    if (!entry.getKey().toLowerCase().startsWith(SEC_WEBSOCKET_HEADER)) {
                        filtered.addAll(entry.getKey(), entry.getValue());
                    }
                }
                return filtered;
            });
        }
        return this.headersFilters;
    }

    static void changeSchemeIfIsWebSocketUpgrade(ServerWebExchange exchange) {
        // Check the Upgrade header
        URI requestUrl = exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String scheme = requestUrl.getScheme().toLowerCase();
        String upgrade = exchange.getRequest().getHeaders().getUpgrade();
        // change the scheme if the socket client send a "http" or "https"
        if (HEADER_UPGRADE_WebSocket.equalsIgnoreCase(upgrade)
                && (HEADER_UPGRADE_HTTP.equals(scheme) || HEADER_UPGRADE_HTTPS.equals(scheme))) {
            String wsScheme = convertHttpToWs(scheme);
            boolean encoded = ServerWebExchangeUtils.containsEncodedParts(requestUrl);
            URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme).build(encoded).toUri();
            exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, wsRequestUrl);
            if (log.isTraceEnabled()) {
                log.trace("changeSchemeTo:[" + wsRequestUrl + "]");
            }
        }
    }

    private static class ProxyWebSocketHandler implements WebSocketHandler {

        private final WebSocketClient client;
        private final URI url;
        private final HttpHeaders headers;
        private final List<String> subProtocols;

        ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers, List<String> protocols) {
            this.client = client;
            this.url = url;
            this.headers = headers;
            if (protocols != null) {
                this.subProtocols = protocols;
            } else {
                this.subProtocols = Collections.emptyList();
            }
        }

        @Override
        public List<String> getSubProtocols() {
            return this.subProtocols;
        }

        @Override
        public Mono<Void> handle(WebSocketSession session) {
            return this.client.execute(this.url, this.headers, new WebSocketHandler() {

                // Maps a close status to one that is allowed in a close frame
                private CloseStatus adaptCloseStatus(CloseStatus closeStatus) {
                    int code = closeStatus.getCode();
                    if (code > 2999 && code < 5000) {
                        return closeStatus;
                    }
                    switch (code) {
                        case 1000: // normal closure
                        case 1001: // going away
                        case 1002: // protocol error
                        case 1003: // unsupported data
                        case 1007: // invalid payload data
                        case 1008: // policy violation
                        case 1009: // message too big
                        case 1010: // mandatory extension missing
                        case 1011: // internal server error
                            return closeStatus;
                        case 1004:
                            // Reserved close code
                            return CloseStatus.PROTOCOL_ERROR;
                        case 1005:
                            // Reserved close code: a status code was expected but none was received
                            return CloseStatus.PROTOCOL_ERROR;
                        case 1006:
                            // Reserved close code: the connection was closed abnormally
                            return CloseStatus.PROTOCOL_ERROR;
                        case 1012:
                            // Not in RFC6455
                            // return CloseStatus.SERVICE_RESTARTED;
                            return CloseStatus.PROTOCOL_ERROR;
                        case 1013:
                            // Not in RFC6455
                            // return CloseStatus.SERVICE_OVERLOAD;
                            return CloseStatus.PROTOCOL_ERROR;
                        case 1015:
                            // TLS handshake failed, e.g. the server certificate could not be verified
                            return CloseStatus.PROTOCOL_ERROR;
                        default:
                            return CloseStatus.PROTOCOL_ERROR;
                    }
                }


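                /**
                 * send      sends the outgoing messages
                 * receive   handles the inbound message stream
                 * doOnNext  what to do with each message
                 * zip       combines the streams
                 * then      returns a Mono that completes when receiving completes
                 */
                @Override
                public Mono<Void> handle(WebSocketSession proxySession) {
                    Mono<Void> serverClose = proxySession.closeStatus().filter(__ -> session.isOpen())
                            .map(this::adaptCloseStatus)
                            .flatMap(session::close);
                    Mono<Void> proxyClose = session.closeStatus().filter(__ -> proxySession.isOpen())
                            .map(this::adaptCloseStatus)
                            .flatMap(proxySession::close);
                    // Use retain() for Reactor Netty
                    Mono<Void> proxySessionSend = proxySession
                            .send(session.receive().doOnNext(WebSocketMessage::retain));
                    Mono<Void> serverSessionSend = session
                            .send(proxySession.receive().doOnNext(WebSocketMessage::retain));
                    // Ensure closeStatus from one propagates to the other
                    Mono.when(serverClose, proxyClose).subscribe();
                    // Complete when both sessions are done
                    return Mono.zip(proxySessionSend, serverSessionSend).then();
                }

                @Override
                public List<String> getSubProtocols() {
                    return CustomWebsocketRoutingFilter.ProxyWebSocketHandler.this.subProtocols;
                }
            });
        }
    }
}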



import com.mes.process.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @Author: best_liu
 * @Description: Redis helper for caching WebSocket push messages
 * @Date: 13:50 2023/11/28
 */
@Slf4j
@Component
public class WebSocketRedisUtil {

    /**
     * Caches a message in Redis.
     *
     * @param key     key of the message list
     * @param message the message to cache
     * @return whether the message was saved
     */
    public static boolean saveCacheChatMessage(String key, String message) {
        // Check whether the key already exists
        if (RedisUtil.hasKey(key)) {
            // Append the message to the existing list
            long redisSize = RedisUtil.lGetListSize(key);
            log.info("Current number of entries in redis: {}", redisSize);
            Long index = RedisUtil.rightPushValue(key, message);
            log.info("Return value of the redis right push: {}", index);
            // The push succeeded if the list grew
            return redisSize < index;
        } else {
            // The key does not exist yet: create the list with this message
            // JSONArray jsonArray=new JSONArray();
            // jsonArray.add(message);
            boolean isCache = RedisUtil.lSet(key, message);
            // On success an expiry could be set here; no expiry is set for now
            if (isCache) {
                // RedisUtil.expire(key, 3L, TimeUnit.DAYS);
            }
            return isCache;
        }
    }


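    /**
     * Reads the cached messages.
     *
     * @param key key of the cached messages
     * @return the cached message list, or null if the key does not exist
     */
    public static List<Object> getCacheChatMessage(String key) {
        List<Object> chatList = null;
        // Check whether the key exists
        if (RedisUtil.hasKey(key)) {
            chatList = RedisUtil.getOpsForList(key);
        } else {
            log.info("No such key in the redis cache: {}", key);
        }
        return chatList;
    }

    /**
     * Deletes the cached messages.
     *
     * @param key key of the cached messages
     */
    public static void deleteCacheChatMessage(String key) {
        // Check whether the key exists
        if (RedisUtil.hasKey(key)) {
            RedisUtil.del(key);
        }
    }
}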



import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @Author: best_liu
 * @Description: Static helper methods around RedisTemplate
 * @Date: 13:50 2023/11/28
 */
@Slf4j
@Component
public class RedisUtil {

    private static RedisTemplate<String, Object> redisTemplate;

    // Setter injection lets Spring populate the static field
    @Autowired
    public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        RedisUtil.redisTemplate = redisTemplate;
    }

    /**
     * Sets the expiry of a key.
     *
     * @param key  the key
     * @param time time to live in seconds
     * @return true on success, false on failure
     */
    public static boolean expire(String key, long time) {
        try {
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
            }
            return true;
        } catch (Exception e) {
            log.error("Failed to set the expiry of the redis key:", e);
            return false;
        }
    }

    /**
     * Gets the remaining time to live of a key.
     *
     * @param key the key, must not be null
     * @return time in seconds; -1 means the key has no expiry, -2 means the key does not exist
     */
    public static Long getExpire(String key) {
        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
    }

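    /**
     * Checks whether a key exists.
     *
     * @param key the key
     * @return true if it exists, false otherwise
     */
    public static Boolean hasKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Exception e) {
            log.error("Failed to check whether the redis key exists:", e);
            return false;
        }
    }

    /**
     * Deletes cached entries.
     *
     * @param key one or more keys
     */
    public static void del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete(Arrays.asList(key));
            }
        }
    }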


    /**
     * Gets a plain cached value.
     *
     * @param key the key
     * @return the value
     */
    @SuppressWarnings("unchecked")
    public static <T> T get(String key) {
        return key == null ? null : (T) redisTemplate.opsForValue().get(key);
    }

    /**
     * Stores a plain value.
     *
     * @param key   the key
     * @param value the value
     * @return true on success, false on failure
     */
    public static boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            log.error("Failed to set the redis cache entry:", e);
            return false;
        }
    }

    /**
     * Stores a plain value with an expiry.
     *
     * @param key   the key
     * @param value the value
     * @param time  time to live in seconds; if it is less than or equal to 0 the entry never expires
     * @return true on success, false on failure
     */
    public static boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
            } else {
                set(key, value);
            }
            return true;
        } catch (Exception e) {
            log.error("Failed to set the redis cache entry with expiry:", e);
            return false;
        }
    }

    // =================================== Custom utility extensions ===========================================

    /**
     * HashGet
     *
     * @param key  the key, must not be null
     * @param item the hash field, must not be null
     * @return the value
     */
    public Object hget(String key, String item) {
        return redisTemplate.opsForHash().get(key, item);
    }

    // /**
    //  * Gets all entries of the hash stored at key
    //  *
    //  * @param key the key
    //  * @return the entries of the hash
    //  */
    // public static Map hmget (String key ) {
    //     return redisTemplate.opsForHash().entries(key);
    // }


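    /**
     * Gets the length of a cached list.
     *
     * @param key the key
     * @return the list length, or 0 on error
     */
    public static long lGetListSize(String key) {
        try {
            return redisTemplate.opsForList().size(key);
        } catch (Exception e) {
            log.error("Failed to get the length of the redis list:", e);
            return 0;
        }
    }

    /**
     * Appends an element to the right end of a list.
     * If the key does not exist, it is created as an empty list before the push.
     *
     * @param key   the key
     * @param value the value
     * @return the length of the list after the push
     * @author RenShiWei
     * Date: 2020/2/6 23:22
     */
    public static Long rightPushValue(String key, Object value) {
        return redisTemplate.opsForList().rightPush(key, value);
    }

    /**
     * Gets every element of the list stored at key.
     *
     * @param key the key
     */
    public static List<Object> getOpsForList(String key) {
        // An end index of -1 means "through the last element"
        return redisTemplate.opsForList().range(key, 0, -1);
    }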

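    /**
     * Appends a single value to a cached list.
     *
     * @param key   the key
     * @param value the value
     * @return true on success, false on failure
     */
    public static boolean lSet(String key, Object value) {
        try {
            redisTemplate.opsForList().rightPush(key, value);
            return true;
        } catch (Exception e) {
            log.error("Failed to push the value onto the redis list:", e);
            return false;
        }
    }

    /**
     * Appends all values of a list to a cached list.
     *
     * @param key   the key
     * @param value the values
     * @return true on success, false on failure
     */
    public boolean lSet(String key, List<Object> value) {
        try {
            redisTemplate.opsForList().rightPushAll(key, value);
            return true;
        } catch (Exception e) {
            log.error("Failed to push the values onto the redis list:", e);
            return false;
        }
    }
}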



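    /**
     * Checks for offline messages and pushes them
     */
    public void cacheMessageContains(String userId) {
        // Offline messages are stored under "socket-" plus the part of userId before the first "-"
        String user = "socket-" + userId.split("-")[0];
        List<Object> messages = WebSocketRedisUtil.getCacheChatMessage(user);
        if (messages != null) {
            WebSocketServer item = webSocketMap.get(userId);
            if (item == null) {
                return;
            }
            for (Object message : messages) {
                try {
                    item.sendMessage(String.valueOf(message));
                } catch (IOException e) {
                    log.error("Failed to push an offline message to user {}", userId, e);
                }
            }
            // Remove the messages once they have been pushed
            WebSocketRedisUtil.deleteCacheChatMessage(user);
        }
    }

    /**
     * Temporarily stores an offline message
     */
    public static void cacheMessagePut(String userId, String message) {
        // Append the new message to the message list
        if (!StringUtils.isEmpty(message)) {
            boolean isCache = WebSocketRedisUtil.saveCacheChatMessage("socket-" + userId, message);
            if (isCache) {
                log.info("Message cached: {}", message);
            } else {
                log.error("Failed to cache message: {}", message);
            }
        }
    }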
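The store-then-flush pattern used for offline messages can be tried without Redis. The sketch below keeps pending messages in memory under the same "socket-" key prefix; `OfflineMessageStore` and its methods are hypothetical stand-ins for the Redis-backed helpers, not part of the original code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class OfflineMessageStore {

    private final Map<String, List<String>> pending = new HashMap<>();

    /** Stores a message for a user who is currently offline. */
    public synchronized void put(String userId, String message) {
        pending.computeIfAbsent("socket-" + userId, k -> new ArrayList<>()).add(message);
    }

    /** Delivers and removes all pending messages of the user; returns how many were delivered. */
    public synchronized int flush(String userId, Consumer<String> sender) {
        List<String> messages = pending.remove("socket-" + userId);
        if (messages == null) {
            return 0;
        }
        messages.forEach(sender);
        return messages.size();
    }

    public static void main(String[] args) {
        OfflineMessageStore store = new OfflineMessageStore();
        store.put("1001", "first");
        store.put("1001", "second");
        // Messages are delivered in the order they were stored
        int delivered = store.flush("1001", System.out::println);
        System.out.println(delivered); // prints 2
    }
}
```

Removing the list before delivery means a second flush for the same user delivers nothing, which mirrors deleting the Redis key after the push.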







