作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主系列专栏:Java设计模式、Spring源码系列、Netty源码系列、Kafka源码系列、JUC源码系列、duubo源码系列如果感觉博主的文章还不错的话,请三连支持一下博主哦博主正在努力完成2023计划中:以梦为马,扬帆起航,2023追梦人联系方式:hls1793929520,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

文章目录

一、引言二、服务端调用流程1、启动封装1.1 过滤器的封装1.2 Hander的封装

2、服务调用2.1 Handler调用2.1 MultiMessageHandler2.2 HeartbeatHandler2.3 AllChannelHandler2.4 DecodeHandler2.5 HeaderExchangeHandler

2.2 过滤器调用2.3 方法调用

三、流程图

四、总结

一、引言

对于 Java 开发者而言,关于 dubbo ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。

但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。

本期 dubbo 源码解析系列文章,将带你领略 dubbo 源码的奥秘

本期源码文章吸收了之前 Spring、Kakfa、JUC源码文章的教训,将不再一行一行的带大家分析源码,我们将一些不重要的部分当做黑盒处理,以便我们更快、更有效的阅读源码。

虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马!

废话不多说,发车!

二、服务端调用流程

对于整个服务端调用来说,主要分为两部分:

服务端启动时:封装的 Handler 和 Filter服务端调用时:经过 Handler,然后经过 Filter,最后调用目标方法

1、启动封装

我们启动的时候,会经过 Exporter exporter = protocolSPI.export(invoker) ,走 Dubbo SPI 的扩展机制

1.1 过滤器的封装

首先是我们的过滤器的封装:ProtocolFilterWrapper

public Exporter export(Invoker invoker) throws RpcException {

FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());

// buildInvokerChain:

return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));

}

public Invoker buildInvokerChain(final Invoker originalInvoker, String key, String group) {

// 获取所有的过滤器

filters = ScopeModelUtil.getExtensionLoader(Filter.class, moduleModels.get(0)).getActivateExtension(url, key, group);

// 如果当前的过滤器不为空

if (!CollectionUtils.isEmpty(filters)) {

// 将所有的过滤器封装成链表(last)的形式

for (int i = filters.size() - 1; i >= 0; i--) {

final Filter filter = filters.get(i);

final Invoker next = last;

last = new CopyOfFilterChainNode<>(originalInvoker, next, filter);

}

// 将过滤器链表封装成CallbackRegistrationInvoker类型

return new CallbackRegistrationInvoker<>(last, filters);

}

}

到这里,我们将过滤器封装成一个链表并且将其封装成 CallbackRegistrationInvoker 形式

我们直接跳到 DubboProtocol.export 中:

public Exporter export(Invoker invoker){

// 得到当前注册Zookeeper的URL

URL url = invoker.getUrl();

// 根据URL得到唯一的key:cn/com.common.service.IUserService:1.0.0.test:20883

// 分组(group) + 接口(intfenerce) + 版本(1.0.0.test) + 端口号(20883)

String key = serviceKey(url);

//

DubboExporter exporter = new DubboExporter(invoker, key, exporterMap);

}

public DubboExporter(Invoker invoker, String key, Map> exporterMap) {

super(invoker);

this.key = key;

this.exporterMap = exporterMap;

// 将当前的过滤器放至exporterMap中,方便我们后面的获取

exporterMap.put(key, this);

// 打开服务

openServer(url);

optimizeSerialization(url);

return exporter;

}

到这里,我们的过滤器就封装到了 exporterMap 里面,后面会用到,我们后面再聊

1.2 Hander的封装

在我们上面封装完过滤器之后,我们会进行 openServer 打开服务这个操作,该操作会进行 Handler 的封装并启动我们的 Netty 服务

这里的Handler 一共封装成下面的流程:

NettyServerhandler -> NettyServer -> MultiMessageHandler--->HeartbeatHandler---->AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler -> ExchangeHandlerAdapter

最终会走到 NettyServer 的 doOpen 方法:这里对 Netty 不太清楚的,可以看博主的 Netty 源码文章:【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码

// 典型的Netty启动的流程

protected void doOpen() throws Throwable {

bootstrap = new ServerBootstrap();

bossGroup = createBossGroup();

workerGroup = createWorkerGroup();

final NettyServerHandler nettyServerHandler = createNettyServerHandler();

channels = nettyServerHandler.getChannels();

// 初始化我们的服务端启动器

initServerBootstrap(nettyServerHandler);

ChannelFuture channelFuture = bootstrap.bind(getBindAddress());

channelFuture.syncUninterruptibly();

channel = channelFuture.channel();

}

protected void initServerBootstrap(NettyServerHandler nettyServerHandler) {

boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);

bootstrap.group(bossGroup, workerGroup)

.channel(NettyEventLoopFactory.serverSocketChannelClass())

.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)

.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)

.childOption(ChannelOption.SO_KEEPALIVE, keepalive)

.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)

.childHandler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

int idleTimeout = UrlUtils.getIdleTimeout(getUrl());

NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);

if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {

ch.pipeline().addLast("negotiation", new SslServerTlsHandler(getUrl()));

}

// 这里添加Netty的Handler责任链

ch.pipeline()

// 解码器

.addLast("decoder", adapter.getDecoder())

// 编码器

.addLast("encoder", adapter.getEncoder())

.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))

// 把上面封装的Handler放到Netty中,便于我们的调用执行

.addLast("handler", nettyServerHandler);

}

});

}

这里如果对 Netty 责任链不熟悉的可参考:【Netty 从成神到升仙系列 五】Netty 的责任链真有这么神奇吗?

2、服务调用

我们上篇文章剖析了 消费端 是如何进行的服务调用:从源码全面解析 dubbo 消费端服务调用的来龙去脉

这篇我们来看下服务端是如何进行服务调用的

2.1 Handler调用

我们直接跳到 NettyServerHandler 的 channelRead 的方法

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

// 主要看这个Handler做的事情

handler.received(channel, msg);

ctx.fireChannelRead(msg);

}

我们从上图看到,重点是这五个 Handler :

MultiMessageHandler:处理多个数据包发送的消息,也称为“多包消息”HeartbeatHandler:定期发送“心跳”消息,以确保连接仍然存在并响应正常AllChannelHandler:管理所有通道的打开和关闭DecodeHandler:将二进制数据解码为消息对象。HeaderExchangeHandler:负责协议头部的交换和处理

我们挨个去看看他们实际的作用

2.1 MultiMessageHandler

如果当前的请求是多个的话,需要进行切割成单个请求往下传递如果是单个请求的话,直接向下传递即可

public void received(Channel channel, Object message) throws RemotingException {

if (message instanceof MultiMessage) {

MultiMessage list = (MultiMessage) message;

for (Object obj : list) {

try {

handler.received(channel, obj);

}

}

} else {

handler.received(channel, message);

}

}

2.2 HeartbeatHandler

服务端:判断当前的请求是不是心跳请求,如果是心跳请求的话,发送心跳请求消费端:判断当前的请求是不是心跳请求,处理心跳请求

public void received(Channel channel, Object message) throws RemotingException {

// 记录最近读取的时间

setReadTimestamp(channel);

// 判断当前的请求是不是心跳请求

if (isHeartbeatRequest(message)) {

Request req = (Request) message;

if (req.isTwoWay()) {

// 如果当前是同一个心跳检测则返回同一个响应

Response res = new Response(req.getId(), req.getVersion());

res.setEvent(HEARTBEAT_EVENT);

channel.send(res);

if (logger.isDebugEnabled()) {

int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);

}

}

return;

}

// 消费端使用:处理心跳响应

if (isHeartbeatResponse(message)) {

return;

}

handler.received(channel, message);

}

2.3 AllChannelHandler

为每一个服务端的请求从线程池中分配一个线程执行

public void received(Channel channel, Object message) throws RemotingException {

// 获取线程

ExecutorService executor = getPreferredExecutorService(message);

try {

// 将当前的Handler丢进线程池里面执行

executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));

}

}

2.4 DecodeHandler

根据当前的响应请求进行解码操作

public void received(Channel channel, Object message) throws RemotingException {

if (message instanceof Decodeable) {

decode(message);

}

if (message instanceof Request) {

decode(((Request) message).getData());

}

if (message instanceof Response) {

decode(((Response) message).getResult());

}

handler.received(channel, message);

}

2.5 HeaderExchangeHandler

调用我们的过滤器并等待数据的返回将数据通过 Channel 返回至客户端

public void received(Channel channel, Object message) throws RemotingException {

final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);

// 服务端:当前的消息是请求

if (message instanceof Request) {

Request request = (Request) message;

if (request.isEvent()) {

handlerEvent(channel, request);

} else {

// 进行请求的解析

if (request.isTwoWay()) {

handleRequest(exchangeChannel, request);

} else {

handler.received(exchangeChannel, request.getData());

}

}

} else if (message instanceof Response) {

handleResponse(channel, (Response) message);

} else {

handler.received(exchangeChannel, message);

}

}

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {

Response res = new Response(req.getId(), req.getVersion());

// Request [id=17, version=2.0.2, twoWay=true, event=false, broken=false, data=RpcInvocation [methodName=getUserById, parameterTypes=[class java.lang.Long]]]

Object msg = req.getData();

// 执行过滤器的操作

CompletionStage future = handler.reply(channel, msg);

// 等待数据的返回

future.whenComplete((appResult, t) -> {

try {

if (t == null) {

res.setStatus(Response.OK);

res.setResult(appResult);

} else {

res.setStatus(Response.SERVICE_ERROR);

res.setErrorMessage(StringUtils.toString(t));

}

// 没有问题的话,将我们的数据返回

channel.send(res);

}

});

}

服务端解码之后的消息:

2.2 过滤器调用

我们直接跳到 DubboProtocol 的 reply 方法

public CompletableFuture reply(ExchangeChannel channel, Object message){

Invocation inv = (Invocation) message;

// 得到过滤器的invoker

Invoker invoker = getInvoker(channel, inv);

// 执行过滤器

Result result = invoker.invoke(inv);

// 返回结果

return result.thenApply(Function.identity());

}

Invoker getInvoker(Channel channel, Invocation inv){

// 根据组+版本号+接口确定唯一的key

String serviceKey = serviceKey(port,path,(String) inv.getObjectAttachmentWithoutConvert(VERSION_KEY),(String) inv.getObjectAttachmentWithoutConvert(GROUP_KEY));

// 得到过滤器(我们在上面进行过对应的添加)

DubboExporter exporter = (DubboExporter) exporterMap.get(serviceKey);

// 返回过滤器

return exporter.getInvoker();

}

这里的过滤器总共十个,这里不带大家细看每一个的源码了,了解意思即可

ContextFilter:负责将请求上下文传递给请求处理流程中的其他组件ProfilerServerFilter:负责性能分析和监视EchoFilter:将请求消息作为响应消息返回ClassLoaderFilter:负责加载和管理类加载器GenericFilter:提供一种通用的请求处理机制ExceptionFilter:负责处理异常并生成错误响应消息MonitorFilter:负责监视请求处理过程中的状态和信息TimeoutFilter:负责处理请求处理超时TraceFilter:跟踪请求处理流程中的各个阶段和信息ClassLoaderCallbackFilter:提供了回调函数的机制

2.3 方法调用

最终我们会到 AbstractProxyInvoker 的 invoke 方法

public Result invoke(Invocation invocation){

// 执行我们动态代理的方法

Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());

// 等待返回的结果

CompletableFuture future = wrapWithFuture(value, invocation);

// 封装请求

CompletableFuture appResponseFuture = future.handle((obj, t) -> {

AppResponse result = new AppResponse(invocation);

if (t != null) {

if (t instanceof CompletionException) {

result.setException(t.getCause());

} else {

result.setException(t);

}

} else {

result.setValue(obj);

}

// 返回数据

// AppResponse [value=User(id=2, name=天涯, age=12), exception=null]

return result;

});

return new AsyncRpcResult(appResponseFuture, invocation);

}

这里可以参考这篇文章:从源码全面解析 dubbo 服务暴露的来龙去脉

public Invoker getInvoker(T proxy, Class type, URL url) {

try {

final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);

return new AbstractProxyInvoker(proxy, type, url) {

@Override

// proxy:实现类

// methodName:getUserById

// parameterTypes:class java.lang.Long

// arguments:2

// 执行相对应的方法即可

protected Object doInvoke(T proxy, String methodName,

Class[] parameterTypes,

Object[] arguments) throws Throwable {

return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);

}

};

}

}

三、流程图

高清图片可私信博主

四、总结

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。

如果你也对 后端架构和中间件源码 有兴趣,欢迎添加博主微信:hls1793929520,一起学习,一起成长

我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,喜欢后端架构和中间件源码。

我们下期再见。

我从清晨走过,也拥抱夜晚的星辰,人生没有捷径,你我皆平凡,你好,陌生人,一起共勉。

往期文章推荐:

美团二面:聊聊ConcurrentHashMap的存储流程从源码全面解析Java 线程池的来龙去脉从源码全面解析LinkedBlockingQueue的来龙去脉从源码全面解析 ArrayBlockingQueue 的来龙去脉从源码全面解析ReentrantLock的来龙去脉阅读完synchronized和ReentrantLock的源码后,我竟发现其完全相似从源码全面解析 ThreadLocal 关键字的来龙去脉从源码全面解析 synchronized 关键字的来龙去脉阿里面试官让我讲讲volatile,我直接从HotSpot开始讲起,一套组合拳拿下面试

推荐文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: 

发表评论

返回顶部暗黑模式