服务器发送事件(Server-Sent Events),简称 SSE。这是一种服务器端到客户端的单向消息推送。SSE 基于 HTTP 协议的,SSE 在服务器和客户端之间打开一个单向通道,服务端响应的不再是一次性的数据包而是text/event-stream类型的数据流信息

后端代码:

import cn.hutool.core.util.IdUtil;

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import java.util.function.Consumer;

@Slf4j

@Component

public class SseUtil {

private static final Map sseEmitterMap = new ConcurrentHashMap<>();

/**

* 创建连接

*/

public SseEmitter connect(Long userId, Consumer errorCallback,Runnable timeOutCallback) {

if (sseEmitterMap.containsKey(userId)) {

SseEmitter sseEmitter =sseEmitterMap.get(userId);

sseEmitterMap.remove(userId);

sseEmitter.complete();

}

try {

// 设置超时时间,0表示不过期。默认30秒

SseEmitter sseEmitter = new SseEmitter(5*60*1000L);

sseEmitter.send(SseEmitter.event().id(IdUtil.simpleUUID()).reconnectTime(1*60*1000L).data(""));

// 注册回调

sseEmitter.onCompletion(() -> {

});

sseEmitter.onError(errorCallback);

sseEmitter.onTimeout(timeOutCallback);

sseEmitterMap.put(userId, sseEmitter);

log.info("创建sse连接完成,当前用户:{}", userId);

return sseEmitter;

} catch (Exception e) {

log.info("创建sse连接异常,当前用户:{}", userId);

}

return null;

}

/**

* 给指定用户发送消息

*

*/

public boolean sendMessage(Long userId,String messageId, String message) {

if (sseEmitterMap.containsKey(userId)) {

SseEmitter sseEmitter = sseEmitterMap.get(userId);

try {

sseEmitter.send(SseEmitter.event().id(messageId).reconnectTime(1*60*1000L).data(message));

log.info("用户{},消息id:{},推送成功:{}", userId,messageId, message);

return true;

}catch (Exception e) {

sseEmitterMap.remove(userId);

log.info("用户{},消息id:{},推送异常:{}", userId,messageId, e.getMessage());

sseEmitter.complete();

return false;

}

}else {

log.info("用户{}未上线", userId);

}

return false;

}

/**

* 断开

* @param userId

*/

public void removeUser(Long userId){

if (sseEmitterMap.containsKey(userId)) {

SseEmitter sseEmitter = sseEmitterMap.get(userId);

sseEmitterMap.remove(userId);

sseEmitter.complete();

}else {

log.info("用户{} 连接已关闭",userId);

}

}

}

细节:

创建SseEmitter 对象时需要返回给客户端,且不能二次包装建立连接后,浏览器会处于加载中的状态,直到SseEmitter发送消息,或者连接超时和关闭,所以连接后就像进行了一次空消息的发送,避免浏览器一直处于加载中在 SseEmitter 中, timeout 属性表示 SseEmitter 在发送 SSE 事件到客户端时的超时时间。也就是说,当您使用 SseEmitter 的 send() 方法来发送 SSE 事件时,如果超过了 timeout 属性指定的时间,则将抛出 AsyncRequestTimeoutException 异常。客户端会进行自动重连,这个异常最好直接交给spring处理,因为这个请求是text/event-stream,全局异常处理可能会报错在 SseEventBuilder 中, timeout 属性表示当前正在构建的 SSE 事件的超时时间。也就是说,当您调用 SseEventBuilder 的 build() 方法来构建 SSE 事件时,如果超过了 timeout 属性指定的时间,则将抛出 SseEventTimeoutException 异常

前端代码:

 

相关文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: