在多连接服的情况下,如何做到某个业务逻辑服能准确定位到目标消息推送的用户所在的单台连接服?

前提:

长连接服务器组单个服务启动的时候,需要向nacos里面注册源数据ws.sid 网关,连接服,业务逻辑服使用dubbo作为RPC框架

# 网关中长连接服务的路由配置

- id: websocket-server-group

uri: lb:ws://websocket-server

predicates:

- Path=/websocket/**

filters:

- StripPrefix=1

- Customer

// 在网关的启动类加上注解

@SpringBootApplication(scanBasePackages = {"org.xgxy.common", "org.xgxy.gateway"})

// nacos

@EnableDiscoveryClient

// 自定义负载均衡处理类,只针对转发地址为websocket-server的请求生效

@LoadBalancerClient(value = "websocket-server", configuration = {NacosWebsocketClusterChooseRule.class})

public class GatewayApplication {

public static void main(String[] args) {

SpringApplication.run(GatewayApplication.class, args);

}

}

原理:

请求打过来的时候会进入choose方法,函数参数为Request,可以获取到请求参数中的userid 网关用此userid作为dubbo调用的第一个参数作为负载均衡算法的参数,利用dubbo的机制调用到对应服务器的服务类获取到注册到nacos中的ws.sid 从自定义类中提供的服务器组,遍历每一台的源数据,比对ws.sid,找到对应的长连接服务器建立长连接 因为这里借用的是dubbo的机制,那么只要业务逻辑服调用dubbo时第一个参数也为userid,那么同样利用dubbo的机制一定会转发到同一台长连接服,实现定向调用

package org.xgxy.gateway.lb;

import java.net.URI;

import java.util.*;

import com.alibaba.cloud.nacos.NacosDiscoveryProperties;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.ObjectProvider;

import org.springframework.cloud.client.ServiceInstance;

import org.springframework.cloud.client.loadbalancer.*;

import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;

import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;

import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;

import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;

import org.xgxy.common.enums.CommonEnums;

import org.xgxy.gateway.util.ApplicationContextUtil;

import org.xgxy.gateway.util.GatewayUtil;

import reactor.core.publisher.Mono;

import javax.annotation.Resource;

@Slf4j

// 自定义负载均衡实现需要实现 ReactorServiceInstanceLoadBalancer 接口 以及重写choose方法

public class NacosWebsocketClusterChooseRule implements ReactorServiceInstanceLoadBalancer {

// 注入当前服务的nacos的配置信息

@Resource

private NacosDiscoveryProperties nacosDiscoveryProperties;

// loadbalancer 提供的访问的服务列表

ObjectProvider serviceInstanceListSupplierProvider;

public NacosWebsocketClusterChooseRule(ObjectProvider serviceInstanceListSupplierProvider) {

this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;

}

/**

* 服务器调用负载均衡时调的放啊

* 此处代码内容与 RandomLoadBalancer 一致

*/

public Mono> choose(Request request) {

RequestDataContext context = (RequestDataContext) request.getContext();

URI uri = context.getClientRequest().getUrl();

log.info("request getRawQuery:{}", uri.getRawQuery());

String rawQuery = uri.getRawQuery();

String[] queryList = rawQuery.split("&");

String uid = null;

for (String query : queryList) {

String[] q = query.split("=");

if ("uid".equals(q[0])) {

uid = q[1];

}

}

if (null == uid) {

log.error("建立连接失败,请求参数错误(缺少字段uid), url:{}", uri.toString());

return null;

}

GatewayUtil gatewayUtil = (GatewayUtil)ApplicationContextUtil.getBean(GatewayUtil.class);

final int sid = gatewayUtil.getUserSid(Integer.parseInt(uid));

log.info("获取到用户需要分配的服务器的sid:{}", sid);

ServiceInstanceListSupplier supplier = this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);

return supplier.get(request).next().map(

(serviceInstances) -> this.processInstanceResponse(supplier, serviceInstances, sid)

);

}

/**

* 对负载均衡的服务进行筛选的方法

* 此处代码内容与 RandomLoadBalancer 一致

*/

private Response processInstanceResponse(ServiceInstanceListSupplier supplier, List serviceInstances, int sid) {

Response serviceInstanceResponse = this.getInstanceResponse(serviceInstances, sid);

if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {

((SelectedInstanceCallback)supplier).selectedServiceInstance((ServiceInstance)serviceInstanceResponse.getServer());

}

return serviceInstanceResponse;

}

/**

* 对负载均衡的服务进行筛选的方法

* 自定义

* 此处的 instances 实例列表 只会提供健康的实例 所以不需要担心如果实例无法访问的情况

*/

private Response getInstanceResponse(List instances, int sid) {

int serverCount = instances.size();

log.info("获取到连接服务器组配置,总数:{}", serverCount);

if (instances.isEmpty()) {

return new EmptyResponse();

}

ServiceInstance finalService = null;

for (ServiceInstance instance : instances) {

// 获取服务在nacos中注册的元数据

Map map = instance.getMetadata();

// 根据uid做一致性hash和服务启动时向nacos中注册的metadata中的ws.sid比较

int metaDataSid = Integer.parseInt(map.get(CommonEnums.WS_SID.getData()));

if (map.containsKey(CommonEnums.WS_SID.getData()) && metaDataSid == sid) {

log.info("找到对应服务器, sid={}", metaDataSid);

finalService = instance;

break;

}

}

if (finalService == null)

throw new RuntimeException(String.format("分配服务器失败,serviceInstances count:%d, sid:%s", sid));

return new DefaultResponse(finalService);

}

}

长连接服中dubbo接口需要指定负载均衡为:用第一个参数做一致性hash

@DubboService(loadbalance = "consistenthash", parameters = {"hash.arguments", "0"})

@Component

@Slf4j

public class SocketServiceImpl implements ISocketService {

@Autowired

private Configs configs;

@Override

public int getServerSid(int userid) {

return configs.getSid();

}

@Override

public void sendMsg(int userid, String msg) {

ServerHelper.sendToClient(userid, msg);

}

}

Nacos写源数据(meta-data)的代码

package org.xgxy.gbr.websocket.init;

import com.alibaba.cloud.nacos.registry.NacosRegistration;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component

@Slf4j

public class NacosMetadataRegister {

// 配置写在配置文件或者Naocs都行

@Value("${ws.sid}")

private String sid;

@Autowired

private NacosRegistration registration;

@PostConstruct

public void putCustomerMetadata() {

registration.getMetadata().put("ws.sid", sid);

}

}

文章链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: