WebSocket 实现长连接及通过WebSocket获取客户端IP

WebSocket 是一种支持双向通讯的网络通信协议。

实现过程:

1 添加ServerEndpointExporter配置bean

@Configuration

public class WebSocketConfig {

// 自动注册使用了@ServerEndpoint**注解声明的Websocket endpoint

@Bean

public ServerEndpointExporter serverEndpointExporter() {

return new ServerEndpointExporter();

}

}

2 实现过程

需求是通过WebSocket,建立长连接,并获取当前在线的人数。通过Websocket 不断发送消息,建立长连接,给Session续命。我是通过MAC地址,区分不同的设备,因为我的需求中需要一个账号能够登录多台机器。所以我通过MAC地址用于标识不同的设备信息。(若是一个账号只能登陆一次,采用用户ID)

1 . 添加配置

@ServerEndpoint(value = "/websocket/onlineAme/{Mac}")

2. 主要方法

@OnOpen

首次建立连接时,运行该注解下的方法。在此方法中可以获取websocket的session并将该用户的Mac 以及 Session存放到 private static ConcurrentHashMap sessionMap = new ConcurrentHashMap<>();

@OnOpen

public void onOpen(Session session, @PathParam(value = "Mac") String mac) throws IOException {

log.info("【Ame websocket 链接成功】,Ame mac:"+ mac);

session.setMaxIdleTimeout(sessionTimeout);

// 获取客户端的Ip

if(StringUtils.isBlank(mac)||ObjectUtil.isNull(mac)){

log.error("并未上传设备信息");

}

setMap(session,mac);

}

private void setMap(Session session,String mac){

sessionMap.put(mac,session);

log.warn("Ame MAC address:{},当前在线人数为:{}",mac,sessionMap.size());

}

@OnMessage

该方法是客户端与服务端进行通讯。 每次客户端与服务端建立通讯时,会给Session续命,延长Session的时常 @OnMessage

public void onMessage(Session session, String msg) {

session.setMaxIdleTimeout(sessionTimeout);

if(StringUtils.isBlank(msg)){

return;

}

// 判断 MAC 地址 是否 是正在上线

String mac = getMACBySession(session);

if(StringUtils.isBlank(mac)){

return;

}

// 将上传的msg转化为 AmeServicePack

handleAmeMsg(mac,ame);

}

private String getMACBySession(Session session){

String mac = getUserIdBySession(session);

if(ObjectUtil.isNull(mac)){

return null;

}

return mac;

}

private String getUserIdBySession(Session session){

for (String mac : sessionMap.keySet()) {

/*session 本身是有一个id的,通过userid 找到Session 然后再通过 其对应 id,与 传入的Session 中的 session对比 */

if(sessionMap.get(mac).getId().equals(session.getId())){

return mac;

}

}

return null;

}

@OnClose @OnClose

public void onClose(Session session,@PathParam(value = "Mac") String mac) {

removeMap(session);

log.info("【websocket退出成功】该设备退出:"+mac);

}

private void removeMap(Session session){

String mac = getUserIdBySession(session);

if(ObjectUtil.isNull(mac)){

return;

}

sessionMap.remove(mac);

//userMap.remove(userId);

removeAme(mac);

}

private void removeAme(String mac){

ameHashMap.remove(mac);

sendInfo("Ame:"+ameInfo.getAmeip()+"下线成功",mac);

}

@OnError @OnError

public void onError(Session session,Throwable throwable) {

log.error("websocket: 发生了错误");

removeMap(session);

throwable.printStackTrace();

}

向某一个用户发送消息 /*

发送自定义消息

向某一个用户发送,消息*/

public static void sendInfo(String message,String toMac){

log.info("发送消息:{},内容是:{}",message,toMac);

if(ObjectUtil.isNull(toMac) || StringUtils.isBlank(message)){

log.error("消息不完整");

return;

}

// 包含就发送

//System.out.println(sessionMap.containsKey(toUserId));

if(sessionMap.containsKey(toMac)){

try {

sendMessage(sessionMap.get(toMac),message);

}catch (Exception e){

log.error("发送给用户{}的消息出错",toMac);

}

}

// 用户不在线

else {

log.error("设备{}不在线",toMac);

}

}

public static void sendMessage(Session session,String message) throws IOException {

session.getBasicRemote().sendText(message);

}

3 通过WebSocket获取 请求Ip地址

Websocket 中的request中并没有header 中并没有客户端的Ip地址,但是在SpringCloud中,是通过网关,路由转发。在网关中的请求的request中存在Ip地址,可以通过拦截器,获取网关的ip然后将request放到websocket的request中。

3.1 拦截器

package com.mam.gateway.filter;

import jdk.nashorn.internal.runtime.regexp.joni.Config;

import org.springframework.cloud.gateway.filter.GatewayFilter;

import org.springframework.cloud.gateway.filter.GatewayFilterChain;

import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;

import org.springframework.http.server.reactive.ServerHttpRequest;

import org.springframework.stereotype.Component;

import org.springframework.web.server.ServerWebExchange;

import reactor.core.publisher.Mono;

import javax.servlet.http.HttpServletRequest;

import javax.servlet.http.HttpSession;

import java.net.InetSocketAddress;

import java.util.Objects;

/**

* 获取 WebSocket 上传Session的 Ip信息

*/

@Component

public class SessionFilter extends AbstractGatewayFilterFactory {

public SessionFilter()

{

super(SessionFilter.Config.class);

}

@Override

public String name()

{

return "SessionFilter";

}

@Override

public GatewayFilter apply(SessionFilter.Config config) {

return new GatewayFilter() {

@Override

public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {

ServerWebExchange mutatedServerWebExchange = exchange.mutate().request(exchange.getRequest()).build();

return chain.filter(mutatedServerWebExchange);

}

};

}

static class Config

{

private Integer order;

public Integer getOrder()

{

return order;

}

public void setOrder(Integer order)

{

this.order = order;

}

}

}

3.2 ServerEndpointConfig 的配置

public class WebSocketConfigurator extends ServerEndpointConfig.Configurator{

private static final Logger log = LoggerFactory.getLogger(WebSocketConfigurator.class);

@Override

public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {

Map attributes = sec.getUserProperties();

try{

String clientIp = IpUtils.getIpAddrByHandshakeRequest(request.getHeaders());

attributes.put("clientIp",clientIp);

log.info("websocker拦截器X-Real_IP{}header{}",request.getHeaders().get("X-Real_IP"),request.getHeaders().toString());

}catch (Exception e){

e.printStackTrace();

}

super.modifyHandshake(sec,request,response);

}

}

public static String getIpAddrByHandshakeRequest(Map> map)

{

if (map == null)

{

return null;

}

String ip = null;

// X-Forwarded-For:Squid 服务代理

String ipAddresses = Convert.toStr(map.get("X-Forwarded-For"));

if (ipAddresses == null || ipAddresses.length() == 0 || "unknown".equalsIgnoreCase(ipAddresses))

{

// Proxy-Client-IP:apache 服务代理

ipAddresses = Convert.toStr(map.get("Proxy-Client-IP"));

}else {

ipAddresses = ipAddresses.substring(1,ipAddresses.length()-1);

}

if (ipAddresses == null || ipAddresses.length() == 0 || "unknown".equalsIgnoreCase(ipAddresses))

{

// WL-Proxy-Client-IP:weblogic 服务代理

ipAddresses = Convert.toStr(map.get("WL-Proxy-Client-IP"));

}

if (ipAddresses == null || ipAddresses.length() == 0 || "unknown".equalsIgnoreCase(ipAddresses))

{

// HTTP_CLIENT_IP:有些代理服务器

ipAddresses = Convert.toStr(map.get("HTTP_CLIENT_IP"));

}

if (ipAddresses == null || ipAddresses.length() == 0 || "unknown".equalsIgnoreCase(ipAddresses))

{

// X-Real-IP:nginx服务代理

ipAddresses = Convert.toStr(map.get("X-Real-IP"));

}

// 有些网络通过多层代理,那么获取到的ip就会有多个,一般都是通过逗号(,)分割开来,并且第一个ip为客户端的真实IP

if (ipAddresses != null && ipAddresses.length() != 0)

{

ip = ipAddresses.split(",")[0];

}

return ip.equals("0:0:0:0:0:0:0:1") ? "127.0.0.1" : ip;

}

3.3 WebSocket 获取Ip

AmeServicePack ame = JSONObject.toJavaObject(JSONObject.parseObject(msg), AmeServicePack.class);

Map userProperties = session.getUserProperties();

String clientip = (String) userProperties.get("clientIp");

4 完整代码

@Component

@ServerEndpoint(value = "/websocket/onlineAme/{Mac}",configurator = WebSocketConfigurator.class)

public class AmeLoginWebSocket {

static Logger log = LoggerFactory.getLogger(AmeLoginWebSocket.class);

/* 存储session */

private static ConcurrentHashMap sessionMap = new ConcurrentHashMap<>();

/* 存储 在线 ame服务 信息*/

private ConcurrentHashMap ameHashMap = new ConcurrentHashMap<>();

/* 存储 Ip */

private static final long sessionTimeout = 600000;

/** 链接成功后发送消息**/

@OnMessage

public void onMessage(Session session, String msg) {

session.setMaxIdleTimeout(sessionTimeout);

log.info("【websocket 接收成功】内容为"+msg);

if(StringUtils.isBlank(msg)){

return;

}

// 判断 MAC 地址 是否 是正在上线

String mac = getMACBySession(session);

if(StringUtils.isBlank(mac)){

return;

}

AmeServicePack ame = JSONObject.toJavaObject(JSONObject.parseObject(msg), AmeServicePack.class);

Map userProperties = session.getUserProperties();

String clientip = (String) userProperties.get("clientIp");

// 将上传的msg转化为 AmeServicePack

handleAmeMsg(mac,ame);

}

private void handleAmeMsg(String mac, AmeServicePack ameInfo) {

log.info("Ame:MAC{}Ip{}:",mac,ameInfo.getAmeip());

if(ameHashMap.containsKey(mac)){

log.info("该设备{}Ip{}已上线",mac,ameInfo.getAmeip());

sendInfo("该用户Ip"+ameInfo.getAmeip()+"已存在",mac);

}else {

ameHashMap.put(mac,ameInfo);

}

}

private boolean updateOnline(AmeServicePack ameInfo){

AjaxResult isonline = SpringUtils.getBean(IAmePackService.class).update(ameInfo,SecurityConstants.INNER);

if((Integer)isonline.get("code")== 200){

return true;

}else {

return false;

}

}

/**

* ameIp 为上线 但 任务表中仍有 正在运行的任务,并将其修改为 -2

* @param ameIp

*/

private void updateErrorTaskStatus(String ameIp){

SpringUtils.getBean(IAmePackService.class).updateErrorAMEStatus(ameIp,SecurityConstants.INNER);

}

private void handlePCMsg(LoginUser loginUser, String msg) {

log.info("系统用户:{},消息{}:",loginUser.getUsername(),msg);

}

private String getMACBySession(Session session){

String mac = getUserIdBySession(session);

if(ObjectUtil.isNull(mac)){

return null;

}

return mac;

}

/*

*成功建立连接后调用

* @param [session, username]

* @return void

*/

@OnOpen

public void onOpen(Session session, @PathParam(value = "Mac") String mac) throws IOException {

log.info("【Ame websocket 链接成功】,Ame mac:"+ mac);

session.setMaxIdleTimeout(sessionTimeout);

// 获取客户端的Ip

if(StringUtils.isBlank(mac)||ObjectUtil.isNull(mac)){

log.error("并未上传设备信息");

}

setMap(session,mac);

}

private void setMap(Session session,String mac){

sessionMap.put(mac,session);

log.warn("Ame MAC address:{},当前在线人数为:{}",mac,sessionMap.size());

}

/*

*关闭连接时调用

* @param [userId]

* @return void

*/

@OnClose

public void onClose(Session session,@PathParam(value = "Mac") String mac) {

removeMap(session);

log.info("【websocket退出成功】该设备退出:"+mac);

}

private void removeMap(Session session){

String mac = getUserIdBySession(session);

if(ObjectUtil.isNull(mac)){

return;

}

sessionMap.remove(mac);

//userMap.remove(userId);

removeAme(mac);

}

private String getUserIdBySession(Session session){

for (String mac : sessionMap.keySet()) {

/*session 本身是有一个id的,通过userid 找到Session 然后再通过 其对应 id,与 传入的Session 中的 session对比 */

if(sessionMap.get(mac).getId().equals(session.getId())){

return mac;

}

}

return null;

}

private void removeAme(String mac){

ameHashMap.remove(mac);

log.info("{}:下线成功",ameInfo.getAmeip());

sendInfo("Ame:"+ameInfo.getAmeip()+"下线成功",mac);

}

/*

*发生错误时调用

* @param [session, throwable]

* @return void

*/

@OnError

public void onError(Session session,Throwable throwable) {

log.error("websocket: 发生了错误");

removeMap(session);

throwable.printStackTrace();

}

/*

发送自定义消息

向某一个用户发送,消息

*/

public static void sendInfo(String message,String toMac){

log.info("发送消息:{},内容是:{}",message,toMac);

if(ObjectUtil.isNull(toMac) || StringUtils.isBlank(message)){

log.error("消息不完整");

return;

}

// 包含就发送

//System.out.println(sessionMap.containsKey(toUserId));

if(sessionMap.containsKey(toMac)){

try {

sendMessage(sessionMap.get(toMac),message);

}catch (Exception e){

log.error("发送给用户{}的消息出错",toMac);

}

}

// 用户不在线

else {

log.error("设备{}不在线",toMac);

}

}

public static void sendMessage(Session session,String message) throws IOException {

session.getBasicRemote().sendText(message);

}

}

推荐阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: