作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,阿里云专家博主系列专栏:Java设计模式、Spring源码系列、Netty源码系列、Kafka源码系列、JUC源码系列、duubo源码系列如果感觉博主的文章还不错的话,请三连支持一下博主哦博主正在努力完成2023计划中:以梦为马,扬帆起航,2023追梦人联系方式:hls1793929520,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬
文章目录
一、引言二、服务端调用流程1、启动封装1.1 过滤器的封装1.2 Hander的封装
2、服务调用2.1 Handler调用2.1 MultiMessageHandler2.2 HeartbeatHandler2.3 AllChannelHandler2.4 DecodeHandler2.5 HeaderExchangeHandler
2.2 过滤器调用2.3 方法调用
三、流程图
四、总结
一、引言
对于 Java 开发者而言,关于 dubbo ,我们一般当做黑盒来进行使用,不需要去打开这个黑盒。
但随着目前程序员行业的发展,我们有必要打开这个黑盒,去探索其中的奥妙。
本期 dubbo 源码解析系列文章,将带你领略 dubbo 源码的奥秘
本期源码文章吸收了之前 Spring、Kakfa、JUC源码文章的教训,将不再一行一行的带大家分析源码,我们将一些不重要的部分当做黑盒处理,以便我们更快、更有效的阅读源码。
虽然现在是互联网寒冬,但乾坤未定,你我皆是黑马!
废话不多说,发车!
二、服务端调用流程
对于整个服务端调用来说,主要分为两部分:
服务端启动时:封装的 Handler 和 Filter服务端调用时:经过 Handler,然后经过 Filter,最后调用目标方法
1、启动封装
我们启动的时候,会经过 Exporter> exporter = protocolSPI.export(invoker) ,走 Dubbo SPI 的扩展机制
1.1 过滤器的封装
首先是我们的过滤器的封装:ProtocolFilterWrapper
public
FilterChainBuilder builder = getFilterChainBuilder(invoker.getUrl());
// buildInvokerChain:
return protocol.export(builder.buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
public
// 获取所有的过滤器
filters = ScopeModelUtil.getExtensionLoader(Filter.class, moduleModels.get(0)).getActivateExtension(url, key, group);
// 如果当前的过滤器不为空
if (!CollectionUtils.isEmpty(filters)) {
// 将所有的过滤器封装成链表(last)的形式
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker
last = new CopyOfFilterChainNode<>(originalInvoker, next, filter);
}
// 将过滤器链表封装成CallbackRegistrationInvoker类型
return new CallbackRegistrationInvoker<>(last, filters);
}
}
到这里,我们将过滤器封装成一个链表并且将其封装成 CallbackRegistrationInvoker 形式
我们直接跳到 DubboProtocol.export 中:
public
// 得到当前注册Zookeeper的URL
URL url = invoker.getUrl();
// 根据URL得到唯一的key:cn/com.common.service.IUserService:1.0.0.test:20883
// 分组(group) + 接口(intfenerce) + 版本(1.0.0.test) + 端口号(20883)
String key = serviceKey(url);
//
DubboExporter
}
public DubboExporter(Invoker
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
// 将当前的过滤器放至exporterMap中,方便我们后面的获取
exporterMap.put(key, this);
// 打开服务
openServer(url);
optimizeSerialization(url);
return exporter;
}
到这里,我们的过滤器就封装到了 exporterMap 里面,后面会用到,我们后面再聊
1.2 Hander的封装
在我们上面封装完过滤器之后,我们会进行 openServer 打开服务这个操作,该操作会进行 Handler 的封装并启动我们的 Netty 服务
这里的Handler 一共封装成下面的流程:
NettyServerhandler -> NettyServer -> MultiMessageHandler--->HeartbeatHandler---->AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler -> ExchangeHandlerAdapter
最终会走到 NettyServer 的 doOpen 方法:这里对 Netty 不太清楚的,可以看博主的 Netty 源码文章:【Netty 从成神到升仙系列 大结局】全网一图流死磕解析 Netty 源码
// 典型的Netty启动的流程
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = createBossGroup();
workerGroup = createWorkerGroup();
final NettyServerHandler nettyServerHandler = createNettyServerHandler();
channels = nettyServerHandler.getChannels();
// 初始化我们的服务端启动器
initServerBootstrap(nettyServerHandler);
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
protected void initServerBootstrap(NettyServerHandler nettyServerHandler) {
boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_KEEPALIVE, keepalive)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer
@Override
protected void initChannel(SocketChannel ch) throws Exception {
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation", new SslServerTlsHandler(getUrl()));
}
// 这里添加Netty的Handler责任链
ch.pipeline()
// 解码器
.addLast("decoder", adapter.getDecoder())
// 编码器
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
// 把上面封装的Handler放到Netty中,便于我们的调用执行
.addLast("handler", nettyServerHandler);
}
});
}
这里如果对 Netty 责任链不熟悉的可参考:【Netty 从成神到升仙系列 五】Netty 的责任链真有这么神奇吗?
2、服务调用
我们上篇文章剖析了 消费端 是如何进行的服务调用:从源码全面解析 dubbo 消费端服务调用的来龙去脉
这篇我们来看下服务端是如何进行服务调用的
2.1 Handler调用
我们直接跳到 NettyServerHandler 的 channelRead 的方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
// 主要看这个Handler做的事情
handler.received(channel, msg);
ctx.fireChannelRead(msg);
}
我们从上图看到,重点是这五个 Handler :
MultiMessageHandler:处理多个数据包发送的消息,也称为“多包消息”HeartbeatHandler:定期发送“心跳”消息,以确保连接仍然存在并响应正常AllChannelHandler:管理所有通道的打开和关闭DecodeHandler:将二进制数据解码为消息对象。HeaderExchangeHandler:负责协议头部的交换和处理
我们挨个去看看他们实际的作用
2.1 MultiMessageHandler
如果当前的请求是多个的话,需要进行切割成单个请求往下传递如果是单个请求的话,直接向下传递即可
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
for (Object obj : list) {
try {
handler.received(channel, obj);
}
}
} else {
handler.received(channel, message);
}
}
2.2 HeartbeatHandler
服务端:判断当前的请求是不是心跳请求,如果是心跳请求的话,发送心跳请求消费端:判断当前的请求是不是心跳请求,处理心跳请求
public void received(Channel channel, Object message) throws RemotingException {
// 记录最近读取的时间
setReadTimestamp(channel);
// 判断当前的请求是不是心跳请求
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
// 如果当前是同一个心跳检测则返回同一个响应
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(HEARTBEAT_EVENT);
channel.send(res);
if (logger.isDebugEnabled()) {
int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
}
}
return;
}
// 消费端使用:处理心跳响应
if (isHeartbeatResponse(message)) {
return;
}
handler.received(channel, message);
}
2.3 AllChannelHandler
为每一个服务端的请求从线程池中分配一个线程执行
public void received(Channel channel, Object message) throws RemotingException {
// 获取线程
ExecutorService executor = getPreferredExecutorService(message);
try {
// 将当前的Handler丢进线程池里面执行
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
}
}
2.4 DecodeHandler
根据当前的响应请求进行解码操作
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData());
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
handler.received(channel, message);
}
2.5 HeaderExchangeHandler
调用我们的过滤器并等待数据的返回将数据通过 Channel 返回至客户端
public void received(Channel channel, Object message) throws RemotingException {
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
// 服务端:当前的消息是请求
if (message instanceof Request) {
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
// 进行请求的解析
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else {
handler.received(exchangeChannel, message);
}
}
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
// Request [id=17, version=2.0.2, twoWay=true, event=false, broken=false, data=RpcInvocation [methodName=getUserById, parameterTypes=[class java.lang.Long]]]
Object msg = req.getData();
// 执行过滤器的操作
CompletionStage
// 等待数据的返回
future.whenComplete((appResult, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(appResult);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
// 没有问题的话,将我们的数据返回
channel.send(res);
}
});
}
服务端解码之后的消息:
2.2 过滤器调用
我们直接跳到 DubboProtocol 的 reply 方法
public CompletableFuture
Invocation inv = (Invocation) message;
// 得到过滤器的invoker
Invoker> invoker = getInvoker(channel, inv);
// 执行过滤器
Result result = invoker.invoke(inv);
// 返回结果
return result.thenApply(Function.identity());
}
Invoker> getInvoker(Channel channel, Invocation inv){
// 根据组+版本号+接口确定唯一的key
String serviceKey = serviceKey(port,path,(String) inv.getObjectAttachmentWithoutConvert(VERSION_KEY),(String) inv.getObjectAttachmentWithoutConvert(GROUP_KEY));
// 得到过滤器(我们在上面进行过对应的添加)
DubboExporter> exporter = (DubboExporter>) exporterMap.get(serviceKey);
// 返回过滤器
return exporter.getInvoker();
}
这里的过滤器总共十个,这里不带大家细看每一个的源码了,了解意思即可
ContextFilter:负责将请求上下文传递给请求处理流程中的其他组件ProfilerServerFilter:负责性能分析和监视EchoFilter:将请求消息作为响应消息返回ClassLoaderFilter:负责加载和管理类加载器GenericFilter:提供一种通用的请求处理机制ExceptionFilter:负责处理异常并生成错误响应消息MonitorFilter:负责监视请求处理过程中的状态和信息TimeoutFilter:负责处理请求处理超时TraceFilter:跟踪请求处理流程中的各个阶段和信息ClassLoaderCallbackFilter:提供了回调函数的机制
2.3 方法调用
最终我们会到 AbstractProxyInvoker 的 invoke 方法
public Result invoke(Invocation invocation){
// 执行我们动态代理的方法
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
// 等待返回的结果
CompletableFuture
// 封装请求
CompletableFuture
AppResponse result = new AppResponse(invocation);
if (t != null) {
if (t instanceof CompletionException) {
result.setException(t.getCause());
} else {
result.setException(t);
}
} else {
result.setValue(obj);
}
// 返回数据
// AppResponse [value=User(id=2, name=天涯, age=12), exception=null]
return result;
});
return new AsyncRpcResult(appResponseFuture, invocation);
}
这里可以参考这篇文章:从源码全面解析 dubbo 服务暴露的来龙去脉
public
try {
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker
@Override
// proxy:实现类
// methodName:getUserById
// parameterTypes:class java.lang.Long
// arguments:2
// 执行相对应的方法即可
protected Object doInvoke(T proxy, String methodName,
Class>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
三、流程图
高清图片可私信博主
四、总结
鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。
其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。
如果你也对 后端架构和中间件源码 有兴趣,欢迎添加博主微信:hls1793929520,一起学习,一起成长
我是爱敲代码的小黄,独角兽企业的Java开发工程师,CSDN博客专家,喜欢后端架构和中间件源码。
我们下期再见。
我从清晨走过,也拥抱夜晚的星辰,人生没有捷径,你我皆平凡,你好,陌生人,一起共勉。
往期文章推荐:
美团二面:聊聊ConcurrentHashMap的存储流程从源码全面解析Java 线程池的来龙去脉从源码全面解析LinkedBlockingQueue的来龙去脉从源码全面解析 ArrayBlockingQueue 的来龙去脉从源码全面解析ReentrantLock的来龙去脉阅读完synchronized和ReentrantLock的源码后,我竟发现其完全相似从源码全面解析 ThreadLocal 关键字的来龙去脉从源码全面解析 synchronized 关键字的来龙去脉阿里面试官让我讲讲volatile,我直接从HotSpot开始讲起,一套组合拳拿下面试
推荐文章
发表评论