spark 组件间通信原本使用的是akka。后来改成了用netty实现了一个类似akka的框架。 主要类在 spark-core的rpc包下面。

RpcEnv:接口,rpc运行的环境RpcEndpoint:RPC端点是对Spark的RPC通信实体的统一抽象,所有运行于RPC框架之上的实体都应该继承RpcEndpoint。RpcEndpointRef:RPC端点的引用,抽象类RpcEndpoint-Ref定义了所有RpcEndpoint引用的属性与接口NettyRpcEnv:RpcEnv的唯一实现Dispatcher:Dispatcher负责将RPC消息路由到要该对此消息处理的RpcEndpoint(RPC端点)MessageLoop:MessageLoop的主要职责是处理和调度从网络接收到的消息。Inbox:是Spark中的一个消息收件箱,它也位于每个本地节点上。Outbox:是Spark中的一个消息发件箱,它也位于每个本地节点上。NettyRpcCallContext:回调方法,在消息被处理后,回调处理成功或者失败的情况

rpc启动

以spark中master启动举例 程序入口是Master类的main方法。在main中调用startRpcEnvAndEndpoint方法。 startRpcEnvAndEndpoint中 RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr) 是开始创建一个RpcEnv 首先创建RpcEnvConfig。参数含义在下图。其中clientMode没有传入,默认是false。代表不是客户端。 再通过NettyRpcEnvFactory创建NettyRpcEnv 创建一个NettyRpcEnv 启动NettyRpcEnv

NettyRpcEnv创建

NettyRpcEnv创建的时候,同时初始化了一些成员变量。 transportContext、clientFactory、transportConf 在上一篇网络通信是说过了。 下面主要讲讲 dispatcher

dispatcher创建

dispatcher创建的时候,为了能路由消息,有两个缓存endpoints、endpointRefs。 同时创建了一个 sharedLoop,在同一个线程池中并行处理多个rpc实例的请求。

ShareMessageLoop创建

创建线程池,提交循环任务到线程池中执行。 循环是不断获取inbox消息进行处理。

NettyRpcEnv启动

回到NettyRpcEnvFactory的create方法中。 Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1是启动的入口。 其中 startNettyRpcEnv 是一个函数。 在 Utils.startServiceOnPort 中 会调用这个函数来启动NettyRpcEnv。 选择合适的端口,尝试多次调用startService启动。 因为spark要启动的service很多,抽象成了Utils里面的一个公共方法,包含了生成端口和重试的通用逻辑。每个service启动的具体方式都是在传入startService函数中实现 startNettyRpcEnv实现就是nettyEnv.startServer。创建了server并在dispatcher中注册。 创建server逻辑在network-common包中,上一篇已经讲过了。主要是使用netty创建了一个server,同时绑定了rpcHandler的处理逻辑,并启动。

val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>

nettyEnv.startServer(config.bindAddress, actualPort)

(nettyEnv, nettyEnv.address.port)

}

注册rpc实例

这里的rpc实例是RpcEndpointVerifier(主要用于验证和监控其他RPC端点的健康状态和连接性),并不是master。 1.创建endpointRef,添加到endpointRefs缓存中(这里在endpoint还没有注册完成就提起添加引用,是因为后面endpoint注册的时候会产生一个onStart的消息,需要endpointRef来处理onStart消息,此时endpointRef就会处理onStart消息) 2.在shareLoop中注册endpoint 3.endpoint注册完成,添加endpoints缓存中 创建inbox消息,同时将onStart消息放到message队列中。 messageLoop注册 实例名和inbox到缓存中。 执行setActive,将inbox放入处理队列进行处理。

endPoint启动

inbox中调用process处理onStart消息。 可以看到首先是判断message类型。这里传入的是onStart消息,所以走OnStart部分。 endPoint.onStart() 这个endPoint是什么?它是在dispatcher.registerRpcEndpoint的时候传入的是,是RpcEndpointVerifier(主要用于验证和监控其他RPC端点的健康状态和连接性)。RpcEndpointVerifier的onStart方法默认什么都没有做。 到此NettyRpc创建完成并且启动。

Master注册到RpcEnv

new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)是master的创建,里面内容太多了。放在master创建里面单独开一篇。现在可以把它当成一个endPoint对待就行。 setupEndpoint就是在dispatcher中注册endPoint。跟上面NettyRpcEnv启动的时候注册RpcEndpointVerifier一样。最后会调用endPoint的onStart方法。 调用master中onStart方法。在上面new Master()是master的创建,onStart是master的启动。这部分也会放到master部分再讲。 至此 master中关于RpcEnv启动完成。

发送rpc消息

NettyRpcEnv主要是以下两个发送消息的方法: send:发送单向异步的消息。所谓“单向”就是发送完后就会忘记此次发送,不会有任何状态要记录,也不会期望得到服务端的回复。send采用了at-most-once的投递规则。RpcEndpointRef的send方法非常类似于Akka中Actor的tell方法。 ask:发送同步的请求,此类请求将会被RpcEndpoint接收,并在指定的超时时间内等待返回类型为T的处理结果。

send方法

NettyRpcEndpointRef发送massage 生成requestMassage,三个参数(发送方的地址、接收方的地址、内容) 调用nettyRpcEnv的send方法 判断接收方地址是不是本地地址。是的话就走local,不是的话走remote。 local比较简单,我们从remote分析 可以看到remote最后调用的是postToOutbox方法,顾名思义就是把消息放到发件箱。

ask方法

我们从send方法停一下,看一下ask方法。 ask和send方法类似,也是先生成requestMassage再调用nettyRpcEnv的同名的askAbortable方法。但是ask参数多了一个超时时间,同时返回一个futrue对象,表明ask是需要回复的。 可以看到最后调用的也是postToOutbox方法。表明send和ask最后都是将消息放到outbox发件箱中来处理的。

postToOutbox放到发件箱

将消息加到massages队列中,调用drainOutbox方法处理队列的消息。 drainOutbox中为了避免线程竞争,采用了分段加锁和判断状态提前返回的方式。 假设是首次调用这个方法,我们看看它会怎么做。 stopped是false,通过 connectFuture是null,通过 client是null,进入launchConnectTask方法,launchConnectTask完成后会return退出方法,所以launchConnectTask中肯定还有drainOutbox调用

launchConnectTask创建远程连接

创建client并将它赋值到outbox的成员变量上,方便后面使用。 val _client = nettyEnv.createClient(address) address是remote的地址。 具体创建client方法在上一篇已经写了,这里就不再写了。 最后再次调用drainOutbox,跟上面设想的一样。

循环处理outbox的消息

在循环里面也采用分段加锁的方式。 发送消息是message.sendWith(_client) 最后messages队列中没有消息了,就将draining状态置为false,退出循环

client发送消息

可以看到是client发送消息。这部分也在上一篇写过了,这里不再赘述了。 到此,rpc消息发送完成。发送流程可以用下图表示:

序号①:表示通过调用NettyRpcEndpointRef的send和ask方法向本地节点的Rpc-Endpoint发送消息。由于是在同一节点,所以直接调用Dispatcher的postLocalMessage或postOneWayMessage方法将消息放入EndpointData内部Inbox的messages列表中。Message-Loop线程最后处理消息,并将消息发给对应的RpcEndpoint处理。序号②:表示通过调用NettyRpcEndpointRef的send和ask方法向远端节点的Rpc-Endpoint发送消息。这种情况下,消息将首先被封装为OutboxMessage,然后放入到远端RpcEndpoint的地址所对应的Outbox的messages列表中。序号③:表示每个Outbox的drainOutbox方法通过循环,不断从messages列表中取得OutboxMessage。序号④:表示每个Outbox的drainOutbox方法使用Outbox内部的TransportClient向远端的NettyRpcEnv发送序号③中取得的OutboxMessage。序号⑤:表示序号④发出的请求在与远端NettyRpcEnv的TransportServer建立了连接后,请求消息首先经过Netty管道的处理,然后经由NettyRpcHandler处理,最后NettyRpcHandler的receive方法会调用Dispatcher的postRemoteMessage或postOneWay-Message方法,将消息放入EndpointData内部Inbox的messages列表中。MessageLoop线程最后处理消息,并将消息发给对应的RpcEndpoint处理。

接收rpc消息

在server端接收消息并处理,主要是通过rpcHandler。 rpcHandler是一个接口,这里的实现类是NettyRpcHandler,对应方法是receive receive有两个,一个是有callback,对请求有回复。一个没有callback,不用回复。 都会调用internalReceive方法。 收到远程消息的时候,需要调用postToAll通知dispatcher中已经注册的rpcPoint实例。postToAll中是每一个已经注册的endpoint调用postMessage发送RemoteProcessConnected消息。 获取到messageLoop,使用messageLoop发送消息。 从loop中获取对应的inbox收件箱,将message加入inbox中(就是加入inbox的messages缓存中)。再将inbox放入待处理队列中,等待处理。 后面处理可以参考上面的onStart消息的处理,流程是一样的。

文章来源

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: