springboot整合SSE技术开发经验总结及心得

一、开发背景二、快速了解SSE1、概念2、特性

三、开发思路四、代码演示1、引入依赖2、服务端代码3、后端定时任务代码

4、解决乱码的实体类4、前端代码

五、核心代码分析

一、开发背景

公司需要开发一个大屏界面,大屏页面的数据是实时更新的,由后端主动实时推送数据给大屏页面。此时会立刻联想到:websocket 技术。当然使用websocket,确实可以解决这个场景。但是今天本文的主角是 :SSE,他和websocket略有不同,SSE只能由服务端主动发消息,而websocket前后端都可以推送消息。

二、快速了解SSE

1、概念

SSE全称 Server Sent Event,顾名思义,就是服务器发送事件,所以也就注定了他 只能由服务端发送信息。

2、特性

主动从服务端推送消息的技术本质是一个HTTP的长连接发送的是一个stream流,格式为text/event-stream

三、开发思路

要实现后端的实时推送消息,前台实时更新数据,思路如下:

1、前后端需要建立连接2、后端如何做到实时推送信息呢?可以采用定时调度

四、代码演示

1、引入依赖

原则上是不需要引入的,因为springboot底层已经整合了SSE

org.springframework.boot

spring-boot-starter-web

2、服务端代码

controller层

@RestController

@CrossOrigin

@RequestMapping("/sse")

public class SseEmitterController extends BaseController {

@Autowired

private SseEmitterService sseEmitterService;

/**

* 创建SSE连接

*

* @return

*/

@GetMapping("/connect/{type}")

public SseEmitter connect(@PathVariable("type") String type) {

return sseEmitterService.connect(type);

}

}

service层

public interface SseEmitterService {

SseEmitter connect(String type);

void volumeOverview();

void sysOperation();

void monitor();

........

}

service实现层

@Service

public class SseEmitterServiceImpl implements SseEmitterService {

private final Logger logger = LoggerFactory.getLogger(this.getClass());

private static Map sseCache = new ConcurrentHashMap<>();

/**

* 创建连接sse

* @param type

* @return

*/

@Override

public SseEmitter connect(String type) {

final String clientId = UUID.randomUUID().toString().replace("-", "");

SseEmitterUTF8 sseEmitter = new SseEmitterUTF8(0L);

try {

sseEmitter.send(SseEmitter.event().comment("创建连接成功 !!!"));

} catch (IOException e) {

logger.error("创建连接失败 , {} " , e.getMessage());

}

sseEmitter.onCompletion(() -> {

logger.info("connect onCompletion , {} 结束连接 ..." , clientId);

removeClient(clientId);

});

sseEmitter.onTimeout(() -> {

logger.info("connect onTimeout , {} 连接超时 ..." , clientId);

removeClient(clientId);

});

sseEmitter.onError((throwable) -> {

logger.error("connect onError , {} 连接异常 ..." , clientId);

removeClient(clientId);

});

sseCache.put(clientId, sseEmitter);

//立即推送

volumeOverview();

dealResp();

monitor();

if (type.equals(SseEmitterConstant.OVER_VIEW)){

sysOperation();

mileStone();

}

logger.info("当前用户总连接数 : {} " , sseCache.size());

return sseEmitter;

}

/**

* 交易量概览

*/

@Override

public void volumeOverview() {

Map map = new HashMap<>();

map.put("latest_tps",440.3);

map.put("total_cics_trans",341656001);

map.put("total_zjcx_trans",391656001);

map.put("zjcx_tps",23657);

map.put("day10",48388352);

map.put("history",105013985);

SseEmitter.SseEventBuilder data = SseEmitter.event().name(SseEmitterConstant.VOLUME_OVERVIEW).data(map, MediaType.APPLICATION_JSON);

for (Map.Entry entry : sseCache.entrySet()) {

SseEmitterUTF8 sseEmitter = entry.getValue();

if (sseEmitter == null) {

continue;

}

try {

sseEmitter.send(data);

} catch (IOException e) {

String body = "SseEmitterServiceImpl[volumeOverview ]";

logger.error(body + ": 向客户端 {} 推送消息失败 , 尝试进行重推 : {}", entry.getKey() ,e.getMessage());

messageRepush(entry.getKey(),data,body);

}

}

}

private void messageRepush(String type, SseEmitter.SseEventBuilder data,String body){

for (int i = 0; i < 3; i++) {

try {

Thread.sleep(2000);

SseEmitterUTF8 sseEmitter = sseCache.get(type);

if (sseEmitter == null) {

logger.error(body + " :向客户端{} 第{}次消息重推失败,未创建长链接", type, i + 1);

continue;

}

sseEmitter.send(data);

} catch (Exception ex) {

logger.error(body + " :向客户端{} 第{}次消息重推失败", type, i + 1, ex);

continue;

}

logger.info(body + " :向客户端{} 第{}次消息重推成功", type, i + 1);

return;

}

}

常量类

public class SseEmitterConstant {

/**

* 创建连接的客户端类型

*/

public static final String OVER_VIEW = "overview";

/**

* even 数据类型

*/

public static final String VOLUME_OVERVIEW = "vw";

public SseEmitterConstant(){}

}

3、后端定时任务代码

采用注解的方式实现:@Scheduled,使用该注解时,需要增加这个注解@EnableScheduling,相当于来开启定时调度功能,如果不加@EnableScheduling注解,那么定时调度会不生效的。

启动类增加注解@EnableScheduling

package com.hidata;

import org.mybatis.spring.annotation.MapperScan;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

import org.springframework.context.annotation.ComponentScan;

import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })

@EnableScheduling

public class HidataApplication {

public static void main(String[] args)

{

SpringApplication.run(HidataApplication.class, args);

System.out.println("[HiUrlShorter platform startup!]");

}

}

创建 定时任务调度类,在该类上加上@Scheduled注解,

@Configuration

public class SendMessageTask{

private final Logger logger = LoggerFactory.getLogger(this.getClass());

@Autowired

private SseEmitterService sseEmitterService;

@Scheduled(cron = "0/40 * * * * ?}")

public void volumeOverviewTask() {

try {

sseEmitterService.volumeOverview();

} catch (Exception e) {

logger.error("SendMessageTask [volumeOverviewTask]: {} ",e.getMessage());

}

}

.......

}

4、解决乱码的实体类

如果发送中文数据的时候,会出现乱码的现象。此时需要做对应的处理

package com.hidata.devops.lagrescreen.domain;

import org.springframework.http.HttpHeaders;

import org.springframework.http.MediaType;

import org.springframework.http.server.ServerHttpResponse;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.nio.charset.StandardCharsets;

public class SseEmitterUTF8 extends SseEmitter {

public SseEmitterUTF8(Long timeout) {

super(timeout);

}

@Override

protected void extendResponse(ServerHttpResponse outputMessage) {

super.extendResponse(outputMessage);

HttpHeaders headers = outputMessage.getHeaders();

headers.setContentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8));

}

}

4、前端代码

// 连接服务器

var sseSource = new EventSource("http://localhost:8080/sse/connect");

// 连接打开

sseSource.onopen = function () {

console.log("连接打开");

}

// 连接错误

sseSource.onerror = function (err) {

console.log("连接错误:", err);

}

//接收信息

eventSource.addEventListener("vw", function (event) {

console.log(event.data);

.....

});

五、核心代码分析

先看代码片段

SseEmitter.event().name("vw").data(map, MediaType.APPLICATION_JSON);

分析: 后端不会把所有数据一起发送给前端,而是会把页面分成多个模块,然后发给前端,此时前端需要区分哪一块数据对应哪一块页面。所以我们可以给各个模块的数据起个名字。也就是上述的代码

SseEmitter.event().name("vw")

这样,前端就知道怎么渲染页面了,类似于这样 关于even()的属性,可以查看源码,

public interface SseEventBuilder {

SseEmitter.SseEventBuilder id(String var1);

SseEmitter.SseEventBuilder name(String var1);

SseEmitter.SseEventBuilder reconnectTime(long var1);

SseEmitter.SseEventBuilder comment(String var1);

SseEmitter.SseEventBuilder data(Object var1);

SseEmitter.SseEventBuilder data(Object var1, @Nullable MediaType var2);

Set build();

}

好文链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: