Websocket和Server-Sent Events 对比推送数据给前端及各自的实现

二者对比WebSocket:Server-Sent Events (SSE):选择 WebSocket 还是 SSE:

Websocket 实现使用原生 WebSocket API:使用 Netty 创建 WebSocket:总结和选择:Netty 实现 Websocket

Server-Sent Events (SSE)实现创建DataManager接口实现实现说明前端实现弊端以及解决方案

在现代 Web 应用程序中,实时数据推送给前端变得越来越重要。无论是实时聊天、实时通知还是仪表板上的实时更新,都需要一种有效的方式来将数据推送给前端。本文将介绍两种常用的实现方法:WebSocket 和 Server-Sent Events(SSE),并提供详细的实现步骤。

二者对比

WebSocket 和 Server-Sent Events (SSE) 都是用于实现实时数据推送的技术,但它们在设计、用途和实现上有一些重要的区别。让我们详细比较这两种技术。

WebSocket:

双向通信:

WebSocket 允许双向通信,客户端和服务器都可以在任何时候向对方发送数据。这使得 WebSocket 非常适用于需要双向交互的应用,如在线聊天、多人协作工具等。 持久连接:

WebSocket 建立持久连接,客户端和服务器之间的连接保持打开状态。这减少了与建立和关闭连接相关的开销,适用于频繁的数据交换。 低延迟:

由于持久连接,WebSocket 可以实现低延迟的实时数据传输,适用于需要快速响应的应用。 复杂性:

实现 WebSocket 可能相对复杂,需要更多的服务器资源和额外的协议处理。 跨域通信:

WebSocket 通常需要配置服务器以允许跨域通信,因为它们使用自定义协议。 浏览器支持:

WebSocket 在现代浏览器中得到广泛支持。

Server-Sent Events (SSE):

单向通信:

SSE 是一种单向通信,只允许服务器向客户端发送数据。客户端无法向服务器发送数据。 HTTP 协议:

SSE 建立在 HTTP 协议之上,使用标准 HTTP 请求和响应。这使得 SSE 更容易部署,因为它与现有的 HTTP 基础设施兼容。 简单性:

SSE 的实现相对简单,服务器和客户端都不需要太多复杂的逻辑。 无需专用库:

SSE 不需要额外的库或协议处理,客户端可以使用浏览器的原生 EventSource API 来接收数据。 跨域通信:

SSE 支持跨域通信,可以通过 CORS(跨域资源共享)机制进行配置。 浏览器支持:

SSE 在现代浏览器中也得到广泛支持,但与 WebSocket 相比,它的历史要长一些。

选择 WebSocket 还是 SSE:

WebSocket 适用于需要双向通信和低延迟的场景,例如在线游戏、实时聊天应用等。 SSE 适用于单向服务器到客户端的实时数据推送,例如新闻更新、实时股票报价、天气预报等,特别是当你希望使用现有的 HTTP 基础设施时。 在某些情况下,你甚至可以同时使用 WebSocket 和 SSE,根据不同的需求选择合适的技术。

无论选择哪种技术,都需要考虑你的应用程序的具体需求和复杂性。WebSocket 提供了更多的灵活性和功能,而 SSE 更加简单和易于部署。最终的选择取决于你的项目目标和资源。

Websocket 实现

使用原生 WebSocket API:

简单性:

Spring Boot 提供了对原生 WebSocket API 的支持,使得创建 WebSocket 应用相对简单。开发人员可以直接使用 Java 标准库中的 WebSocket 相关类来处理 WebSocket 通信。 依赖:

原生 WebSocket 不需要额外的依赖,因为 WebSocket API 已经包含在 Java 标准库中。 性能:

原生 WebSocket API 在性能方面表现良好,适用于大多数中小型应用。 生态系统:

使用原生 WebSocket 可以更容易地集成到现有的 Spring Boot 生态系统中,例如 Spring Security 等。 简单应用:

当你需要创建相对简单的 WebSocket 应用时,原生 WebSocket 是一个不错的选择。

使用 Netty 创建 WebSocket:

灵活性:

Netty 是一个高度可定制的异步事件驱动框架,它可以用于创建各种网络应用,包括 WebSocket。Netty 提供了更多的灵活性和自定义选项,适用于复杂的 WebSocket 应用。 性能:

Netty 以其高性能和低延迟而闻名,适用于需要处理大量并发连接的应用。 协议支持:

Netty 支持多种协议,不仅限于 WebSocket。这意味着你可以在同一个应用程序中处理多种网络通信需求。 集成:

尽管 Netty 可以集成到 Spring Boot 中,但其集成可能需要更多的配置和代码。 复杂应用:

当你需要处理复杂的 WebSocket 场景,如高并发、自定义协议、复杂的消息处理等时,使用 Netty 是更好的选择。

总结和选择:

选择原生 WebSocket 还是使用 Netty 创建 WebSocket 应取决于你的项目需求和复杂性:

如果你的应用相对简单,对性能要求不是很高,可以考虑使用原生 WebSocket API,它更容易上手并且不需要额外的依赖。 如果你的应用需要处理高并发、复杂的协议、自定义消息处理或需要最大程度的性能和灵活性,那么使用 Netty 创建 WebSocket 可能更合适。Netty 为你提供了更多的控制权和自定义选项。

无论你选择哪种方法,Spring Boot 都提供了良好的支持,使得在应用中集成 WebSocket 变得相对容易。因此,你可以根据具体的项目需求来选择适合你的方法。

Netty 实现 Websocket

添加 maven 坐标

io.netty

netty-common

4.1.79.Final

创建 NettyWebsocketServer package com.todoitbo.baseSpringbootDasmart.netty.server;

import com.todoitbo.baseSpringbootDasmart.netty.handler.HeartbeatHandler;

import com.todoitbo.baseSpringbootDasmart.netty.handler.WebSocketHandler;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.http.HttpObjectAggregator;

import io.netty.handler.codec.http.HttpServerCodec;

import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

import io.netty.handler.stream.ChunkedWriteHandler;

import io.netty.handler.timeout.IdleStateHandler;

import io.netty.handler.traffic.ChannelTrafficShapingHandler;

/**

* @author xiaobo

* @date 2023/9/5

*/

public class NettyWebsocketServer {

private final int port;

public NettyWebsocketServer(int port) {

this.port = port;

}

public void run() throws Exception {

EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 创建用于接受客户端连接的 boss 线程池

EventLoopGroup workerGroup = new NioEventLoopGroup(); // 创建用于处理客户端请求的 worker 线程池

try {

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.childHandler(new ChannelInitializer() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ChannelTrafficShapingHandler trafficShapingHandler = new ChannelTrafficShapingHandler(

1, // 读取速率限制(字节/秒)

1, // 写入速率限制(字节/秒)

1, // 流量检查时间间隔(毫秒)

1 // 最大允许的时间窗口(毫秒)

);

ChannelPipeline pipeline = ch.pipeline();

// 添加心跳检测处理器,3秒内没有读写事件将触发 IdleStateEvent,下面的顺序错了也会出现问题的

pipeline.addLast(new IdleStateHandler(30, 0, 0));

pipeline.addLast(new HeartbeatHandler());

pipeline.addLast(new HttpServerCodec()); // 处理 HTTP 请求

pipeline.addLast(new ChunkedWriteHandler()); // 写大数据流的处理器

pipeline.addLast(new HttpObjectAggregator(8192)); // 将 HTTP 消息聚合为 FullHttpRequest 或 FullHttpResponse

// pipeline.addLast(new WebSocketFrameAggregator(8192)); // 将 HTTP 消息聚合为 FullHttpRequest 或 FullHttpResponse

// pipeline.addLast(new WebSocketServerCompressionHandler()); // 消息压缩

pipeline.addLast(new WebSocketHandler()); // 自定义 WebSocket 处理器

pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10)); // 处理 WebSocket 升级握手和数据帧处理

}

})

.option(ChannelOption.SO_BACKLOG, 128) // 设置服务器接受队列大小

.childOption(ChannelOption.SO_KEEPALIVE, true); // 开启 TCP 连接的 Keep-Alive 功能

// Bind and start to accept incoming connections.

System.out.println("TCP server started successfully");

ChannelFuture f = b.bind(port).sync(); // 绑定端口并等待绑定完成

// Wait until the server socket is closed.

// In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); // 阻塞直到服务器关闭

} finally {

// 优雅地关闭线程池

workerGroup.shutdownGracefully();

bossGroup.shutdownGracefully();

}

}

}

这里需要注意一下,pipeline.addLast的顺序不一致可能会导致程序报错,运行时

创建心跳 handle package com.todoitbo.baseSpringbootDasmart.netty.handler;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.ChannelInboundHandlerAdapter;

import io.netty.handler.timeout.IdleState;

import io.netty.handler.timeout.IdleStateEvent;

public class HeartbeatHandler extends ChannelInboundHandlerAdapter {

int readTimeOut = 0;

@Override

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

IdleStateEvent event = (IdleStateEvent) evt;

if(event.state() == IdleState.READER_IDLE){

readTimeOut++;

}

if(readTimeOut >= 3){

System.out.println("超时超过3次,断开连接");

ctx.close();

}

}

}

创建WebSocketHandler package com.todoitbo.baseSpringbootDasmart.netty.handler;

import cn.hutool.core.collection.CollectionUtil;

import com.todoitbo.baseSpringbootDasmart.netty.NamedChannelGroup;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.*;

import io.netty.handler.codec.http.*;

import io.netty.handler.codec.http.websocketx.*;

import io.netty.util.AttributeKey;

import io.netty.util.CharsetUtil;

import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

/**

* @author xiaobo

*/

@Slf4j

public class WebSocketHandler extends SimpleChannelInboundHandler {

private WebSocketServerHandshaker handshaker;

public static final AttributeKey USER_ID_KEY = AttributeKey.valueOf("userId");

public static final AttributeKey GROUP_ID_KEY = AttributeKey.valueOf("groupId");

private static final Map WORK_CHANNEL_MAP = new HashMap();

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

log.info("与客户端建立连接,通道开启!");

// 添加到channelGroup通道组(广播)

// 之后可以根据ip来进行分组

NamedChannelGroup.getChannelGroup("default").add(ctx.channel());

}

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

log.info("与客户端断开连接,通道关闭!");

// 从channelGroup通道组(广播)中删除

// 之后可以根据ip来进行分组

Channel channel = ctx.channel();

NamedChannelGroup.getChannelGroup("default").remove(channel);

WORK_CHANNEL_MAP.remove(channel);

}

public boolean userAuthentication(ChannelHandlerContext ctx,FullHttpRequest req) {

// 提取URI参数

QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());

Map> parameters = queryStringDecoder.parameters();

// 根据参数进行处理

List userId = parameters.get("userId");

List groupId = parameters.get("groupId");

if (CollectionUtil.isNotEmpty(userId) && CollectionUtil.isNotEmpty(groupId)) {

ctx.channel().attr(USER_ID_KEY).set(userId.get(0));

ctx.channel().attr(GROUP_ID_KEY).set(groupId.get(0));

return true;

}else {

return false;

}

}

private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {

// 检查是否升级到WebSocket

if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {

// 如果不是WebSocket协议的握手请求,返回400 Bad Request响应

sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));

return;

}

// 构建握手响应

WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(

getWebSocketLocation(req), null, true);

handshaker = wsFactory.newHandshaker(req);

if (handshaker == null) {

// 如果不支持WebSocket版本,返回HTTP 426 Upgrade Required响应

WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());

} else {

handshaker.handshake(ctx.channel(), req);

// 进行WebSocket握手

// 在认证成功后,设置用户ID到Channel属性中

boolean authentication = userAuthentication(ctx,req);// 这里需要实现用户认证逻辑

if (!authentication) {

// 用户认证失败,可能需要关闭连接或发送认证失败消息

// 1. 关闭连接:

ctx.close();

// 2. 发送认证失败消息给客户端:

String failureMessage = "认证失败,请提供有效的身份验证信息。";

ctx.writeAndFlush(failureMessage);

return;

}

// 其他逻辑...

WORK_CHANNEL_MAP.put(ctx.channel(), ctx.channel().attr(GROUP_ID_KEY).get());

}

}

private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {

// 发送HTTP响应

if (res.status().code() != 200) {

ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);

res.content().writeBytes(buf);

buf.release();

HttpUtil.setContentLength(res, res.content().readableBytes());

}

ChannelFuture future = ctx.channel().writeAndFlush(res);

if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {

future.addListener(ChannelFutureListener.CLOSE);

}

}

private String getWebSocketLocation(FullHttpRequest req) {

return "ws://" + req.headers().get(HttpHeaderNames.HOST) + req.uri();

}

private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {

// 处理WebSocket消息,可以根据实际需求进行处理

if (frame instanceof TextWebSocketFrame) {

// 处理文本消息

String text = ((TextWebSocketFrame) frame).text();

System.out.println("Received message: " + text);

// 可以在这里处理WebSocket消息并发送响应

// ...

} else if (frame instanceof BinaryWebSocketFrame) {

// 处理二进制WebSocket消息

// ...

System.out.println("123");

} else if (frame instanceof CloseWebSocketFrame) {

// 处理WebSocket关闭请求

handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());

} else if (frame instanceof PingWebSocketFrame) {

// 处理WebSocket Ping消息

System.out.println("cs");

ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));

}

}

@Override

protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

if (msg instanceof FullHttpRequest) {

// 处理HTTP握手请求

handleHttpRequest(ctx, (FullHttpRequest) msg);

} else if (msg instanceof WebSocketFrame) {

// 处理WebSocket消息

handleWebSocketFrame(ctx, (WebSocketFrame) msg);

}

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

// 发生异常时的处理

log.error(cause.getMessage());

ctx.close();

}

}

创建NamedChannelGroup package com.todoitbo.baseSpringbootDasmart.netty;

import io.netty.channel.group.ChannelGroup;

import io.netty.channel.group.DefaultChannelGroup;

import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

public class NamedChannelGroup{

private String groupName;

public static Map channelGroupMap = new ConcurrentHashMap<>();

static {

channelGroupMap.put("default", new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));

}

public static void setGroupName(String groupName){

channelGroupMap.put(groupName, new DefaultChannelGroup(GlobalEventExecutor.INSTANCE));

}

public static ChannelGroup getChannelGroup(String groupName){

return channelGroupMap.get(groupName);

}

}

Server-Sent Events (SSE)实现

创建DataManager

package com.todoitbo.baseSpringbootDasmart.sse;

import org.springframework.http.MediaType;

import org.springframework.stereotype.Component;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

/**

* 数据管理器用于管理Server-Sent Events (SSE) 的订阅和数据推送。

* @author xiaobo

*/

@Component

public class DataManager {

private final Map> dataEmitters = new HashMap<>();

/**

* 订阅特定数据类型的SSE连接。

*

* @param dataType 要订阅的数据类型

* @param emitter SSE连接

*/

public void subscribe(String dataType, SseEmitter emitter) {

dataEmitters.computeIfAbsent(dataType, k -> new ArrayList<>()).add(emitter);

emitter.onCompletion(() -> removeEmitter(dataType, emitter));

emitter.onTimeout(() -> removeEmitter(dataType, emitter));

}

/**

* 推送特定数据类型的数据给所有已订阅的连接。

*

* @param dataType 要推送的数据类型

* @param data 要推送的数据

*/

public void pushData(String dataType, String data) {

List emitters = dataEmitters.getOrDefault(dataType, new ArrayList<>());

emitters.forEach(emitter -> {

try {

emitter.send(SseEmitter.event().data(data, MediaType.TEXT_PLAIN));

} catch (IOException e) {

removeEmitter(dataType, emitter);

}

});

}

private void removeEmitter(String dataType, SseEmitter emitter) {

List emitters = dataEmitters.get(dataType);

if (emitters != null) {

emitters.remove(emitter);

}

}

}

接口实现

package com.todoitbo.baseSpringbootDasmart.controller;

import com.todoitbo.baseSpringbootDasmart.sse.DataManager;

import org.springframework.http.MediaType;

import org.springframework.http.ResponseEntity;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.PathVariable;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.annotation.Resource;

/**

* @author xiaobo

*/

@RestController

@RequestMapping("/environment")

public class EnvironmentController {

@Resource private DataManager dataManager;

@GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)

public SseEmitter subscribe() {

SseEmitter emitter = new SseEmitter();

dataManager.subscribe("environment", emitter);

return emitter;

}

// 示例:推送环境监测数据给前端

@GetMapping("/push/{testText}")

public ResponseEntity pushEnvironmentData(@PathVariable String testText) {

dataManager.pushData("environment", testText);

return ResponseEntity.ok("Data pushed successfully.");

}

}

实现说明

每个不同类型的数据推送都需要一个对应的SSE订阅端点(subscribe)。每个数据类型都有一个对应的订阅端点,用于前端建立SSE连接,并在后端接收和处理特定类型的数据推送。

在你的后端应用中,对于每种数据类型,你需要创建一个对应的Controller或处理器来处理该数据类型的SSE订阅。每个Controller会有自己的SSE订阅端点,前端可以连接到不同的端点以接收相应类型的数据。

这种方式允许你将不同类型的数据推送逻辑分离,使代码更具可维护性和可扩展性。当有新的数据可用时,只需调用相应类型的数据推送方法,而不必修改通用的SSE管理逻辑。

前端实现

SSE Data Receiver

Real-time Data Display

弊端以及解决方案

如果你没什么处理的话,在它首次调用subscribe时候可能会出现连接超时的问题,因为这个是一个长连接,出现这种问题是因为,此时并没有数据产生,至此,除非你刷新页面,否则即使有数据产生前端也不会受到了

你希望前端在第一次订阅SSE连接后,即使后端没有数据产生,之后也能接收到数据。这可以通过以下方式来实现:

保持持久连接: 确保前端建立的SSE连接是持久性连接,不会在第一次连接成功后关闭。这可以让连接一直保持打开状态,即使后端没有即时数据产生。你可以在前端代码中使用以下方式来确保连接持久: const eventSource = new EventSource('/environment/subscribe');

默认情况下,EventSource对象会自动重连,以保持连接的持久性。 定期发送心跳数据: 在后端定期发送一些心跳数据,以确保连接保持活跃。这可以防止连接超时关闭。你可以在后端定期发送一个包含无用信息的SSE事件,例如: @Scheduled(fixedRate = 30000) // 每30秒发送一次心跳数据

public void sendHeartbeat() {

dataManager.pushData("heartbeat", "Heartbeat data");

}

前端可以忽略这些心跳事件,但它们会保持连接处于活跃状态。 前端自动重连: 在前端代码中添加自动重连逻辑,以处理连接断开的情况。这样,如果连接由于某种原因断开,前端会自动尝试重新建立连接。示例: const eventSource = new EventSource('/environment/subscribe');

eventSource.onerror = (error) => {

// 处理连接错误

console.error('Error occurred:', error);

// 重新建立连接

eventSource.close();

setTimeout(() => {

// 重新建立连接

eventSource = new EventSource('/environment/subscribe');

}, 1000); // 1秒后重试

};

通过结合上述方法,你可以确保前端能够建立并保持持久SSE连接,即使后端没有即时数据产生。这样,一旦后端有数据产生,前端也可以接收到数据而无需重新订阅。

推荐链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: 

发表评论

返回顶部暗黑模式