前言

Websocket是HTML5新增的一种全双工通信协议,客户端和服务端基于TCP握手连接成功后,两者之间就可以建立持久性的连接,实现双向数据传输。

Socket.io不是Websocket,它只是将Websocket和轮询 (Polling)机制以及其它的实时通信方式封装成了通用的接口,并且在服务端实现了这些实时机制的相应代码。也就是说,Websocket仅仅是 Socket.io实现实时通信的一个子集。因此Websocket客户端连接不上Socket.io服务端,当然Socket.io客户端也连接不上Websocket服务端。

前置思路

思路:

配置好SocketConfig,创建SocketIOServer,绑定监听IP和Port,规定好消息超时时间等抽象好getUserCode方法,如果确定每个客户端连接的SocketIOClient,我们通常使用用户code 进行区分Handler容器内编写好连接方法@OnConnect、断开方法@OnDisconnect、接收事件方法@OnEvent(“event”)等最后在Spring项目初始化后,利用CommandLineRunner,将SocketIOServer启动

前置技术支持

技术支持:CommandLineRunner

假如我们想要在Spring项目启动完成后执行一些方法或者脚本,可以使用一下方式,但明显过于粗糙,因此可以实现CommandLineRunner中的run方法。

@SpringBootApplication

public class ImApplication {

public static void main(String[] args) {

SpringApplication.run(ImApplication.class, args);

System.out.println("运行方法1");

System.out.println("运行方法2");

System.out.println("运行方法3");

System.out.println("运行方法4");

}

}

我们还可以自定义方法的执行顺序

package com.im;

import org.springframework.boot.CommandLineRunner;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.core.annotation.Order;

import org.springframework.stereotype.Component;

@SpringBootApplication

public class ImApplication {

public static void main(String[] args) {

SpringApplication.run(ImApplication.class, args);

}

}

@Component

@Order(1)

class Function1 implements CommandLineRunner{

@Override

public void run(String... args) throws Exception {

System.out.println("运行方法1");

}

}

@Component

@Order(2)

class Function2 implements CommandLineRunner{

@Override

public void run(String... args) throws Exception {

System.out.println("运行方法2");

}

}

@Component

@Order(3)

class Function3 implements CommandLineRunner{

@Override

public void run(String... args) throws Exception {

System.out.println("运行方法3");

}

}

搭建一个SocketIO 服务

配置 SocketIOServer

import com.corundumstudio.socketio.SocketIOServer;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

@Configuration

public class SocketConfig {

@Value("${websocket.app.appHost}")

private String appHost;

@Value("${websocket.app.appPort}")

private int appPort;

@Resource

private AppClientHandler appClientHandler;

@Bean(name = "appServer")

public SocketIOServer appIOServer() {

//创建Socket,并设置监听端口

com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();

// 设置主机名,默认是0.0.0.0

config.setHostname(appHost);

// 设置监听端口

config.setPort(appPort);

// 协议升级超时时间(毫秒),默认10000。HTTP握手升级为ws协议超时时间

config.setUpgradeTimeout(10000);

// Ping消息间隔(毫秒),默认25000。客户端向服务器发送一条心跳消息间隔

config.setPingInterval(25000);

// Ping消息超时时间(毫秒),默认60000,这个时间间隔内没有接收到心跳消息就会发送超时事件

config.setPingTimeout(60000);

SocketIOServer server = new SocketIOServer(config);

server.addListeners(appClientHandler, AppClientHandler.class);

return server;

}

}

抽象用户code获取方法

每个系统确认建立连接时的用户信息都有所不同,例子中通过解析token来获取当前用户code,又因为使用的Spring Security进行认证,所有看着代码较为复杂,实际生产根据各自系统获取用户唯一标识即可。

import com.corundumstudio.socketio.HandshakeData;

import com.corundumstudio.socketio.SocketIOClient;

import org.apache.commons.lang3.StringUtils;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.security.oauth2.common.OAuth2AccessToken;

import org.springframework.security.oauth2.provider.OAuth2Authentication;

import org.springframework.security.oauth2.provider.token.TokenStore;

import org.springframework.stereotype.Component;

import javax.annotation.Resource;

import java.util.Objects;

@Component

public class AbstractHandler {

private static final Logger logger = LoggerFactory.getLogger(AbstractHandler.class);

@Resource

private TokenStore tokenStore;

private static final String TOKEN_PREFIX = "Bearer";

/**

* 获取用户编号

*

* @param client 请求的客户端信息

* @return 用户账号

*/

public String getUserCode(SocketIOClient client) {

HandshakeData handshakeData = client.getHandshakeData();

//原始token,前端传过来

String token = handshakeData.getSingleUrlParam(CommonConstants.TOKEN);

if (StringUtils.isBlank(token) || (token.length() - CommonConstants.ONE) < (TOKEN_PREFIX.length() + CommonConstants.ONE)) {

logger.warn("socket请求token异常,token非法【{}】,sessionId->{}", token, client.getSessionId());

client.sendEvent("fail", 403);

return null;

}

//把token前缀去掉支掉

token = token.substring(TOKEN_PREFIX.length() + CommonConstants.ONE);

//根据token获取accessToken

OAuth2AccessToken accessToken = tokenStore.readAccessToken(token);

if (Objects.isNull(accessToken) || StringUtils.isBlank(accessToken.getValue())) {

logger.warn("socket请求token异常,token非法【{}】,根据toke找不到accessToken,sessionId->{}", token, client.getSessionId());

client.sendEvent("fail", 403);

return null;

}

//根据accessToken,获取用户登录信息

OAuth2Authentication auth2Authentication = tokenStore.readAuthentication(accessToken);

if (Objects.isNull(auth2Authentication) || StringUtils.isBlank(auth2Authentication.getName())) {

logger.warn("socket请求token异常,token非法【{}】,根据toke找不到用户账号,sessionId->{}", token, client.getSessionId());

client.sendEvent("fail", 403);

return null;

}

PigUser pigUser = (PigUser)auth2Authentication.getPrincipal();

return pigUser.getUserCode();

}

}

编写WebClientHandler容器

这个容器其实就是干四件件事。

@OnConnect 把每次用户客户端的连接SocketIOClient用一个ConcurrentHashMap存起来@OnDisconnect 断开连接时,再将SocketIOClient从map中删掉@OnEvent(“event”) 写一个监督前端客户端事件的方法。用不到或不需要监听客户端甚至可以不用写编写一个通过用户code 去map容器中找到SocketIOClient,并发送sendEvent的推送消息方法。

import com.alibaba.fastjson.JSONObject;

import com.chinaentropy.systembase.websocket.AbstractHandler;

import com.corundumstudio.socketio.AckRequest;

import com.corundumstudio.socketio.SocketIOClient;

import com.corundumstudio.socketio.annotation.OnConnect;

import com.corundumstudio.socketio.annotation.OnDisconnect;

import com.corundumstudio.socketio.annotation.OnEvent;

import org.apache.commons.lang3.StringUtils;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.stereotype.Component;

import java.util.List;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

/**

* SocketIOClient容器

*/

@Component

public class WebClientHandler extends AbstractHandler {

private static final Logger logger = LoggerFactory.getLogger(WebClientHandler.class);

private static ConcurrentHashMap clientMap = new ConcurrentHashMap<>();

public ConcurrentHashMap getClientMap() {

return clientMap;

}

/**

* 添加connect事件,当客户端发起连接时调用

*

* @param client 连接的客户端

*/

@OnConnect

public void onConnect(SocketIOClient client) {

logger.info("客户端发起连接, sessionId: {}", client.getSessionId());

String userCode = getUserCode(client);

if (StringUtils.isBlank(userCode)) {

logger.warn("websocket请求token异常. sessionId->{}", client.getSessionId());

client.sendEvent("fail", 403);

return;

}

clientMap.put(client, userCode);

}

/**

* 接收(监听)来着 web端浏览器发送的事件、事件触发为 webevent

*

* @return void

* @Param [client, request, data]

*/

@OnEvent("event")

public void chatEvent(SocketIOClient client, AckRequest ackRequest, String message) {

logger.info("服务端接收数据, message: {}", message);

}

/**

* 添加@OnDisconnect事件,客户端断开连接时调用,刷新客户端信息

*

* @param client 注销的客户端

*/

@OnDisconnect

public void onDisconnect(SocketIOClient client) {

logger.info("客户端断开连接, sessionId: {}", client.getSessionId().toString());

clientMap.remove(client);

client.disconnect();

}

/**

* 推送Obj给所有用户

*

* @param eventName 事件名

* @param jsonStr 参数

*/

public void pushMessage(String eventName, String jsonStr) {

clientMap.forEach((key, value) -> {

logger.info("[DispatchAppClientHandler][pushMessage]: eventName->{}, data->{}", eventName, jsonStr);

key.sendEvent(eventName, jsonStr);

});

}

/**

* 按照用户编号列表进行String消息推送

*

* @param eventName 事件名

* @param object 参数

* @param userCodeList 用户编号列表

*/

public void pushMessageByUsers(String eventName, Object object, List userCodeList) {

clientMap.forEach((key, value) -> {

if (userCodeList.contains(value)) {

String data = JSONObject.toJSONString(object);

logger.info("[DispatchAppClientHandler][pushMessageByUser]: eventName->{}, data->{}, user->{}", eventName, data, value);

key.sendEvent(eventName, data);

}

});

}

Spring项目启动后,启动SocketIOServer

import com.corundumstudio.socketio.SocketIOServer;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.boot.CommandLineRunner;

import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component

public class WebSocketServerRunner implements CommandLineRunner {

private static final Logger logger = LoggerFactory.getLogger(WebSocketServerRunner.class);

@Resource

@Qualifier("appServer")

private SocketIOServer appServer;

@Override

public void run(String... args) {

logger.info("SocketIO 启动...");

appServer.start();

}

}

好文推荐

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: