Actor模型是一种常见的并发模型,与最常见的并发模型——共享内存(同步锁)不同,它将程序分为许多独立的计算单元——Actor,每个Actor独立管理自己的资源,不同Actor之间通过消息传递来交互。它的好处是全异步执行,不会造成线程阻塞,从而提升CPU使用率,另外由于线程之间是异步交互,所以也不用考虑加锁和线程同步的问题。

Actor模型在业界有许多应用,例如游戏服务器框架Skynet、编程语言Erlang。

因为历史原因,Java下的Actor模型应用较少,知名的只有基于Scala的Akka。而且Actor模型也不是万能的,异步编程会需要编写更多的回调代码,原本的一步需要拆分成若干步来处理,无疑增加了代码编写复杂度(callback hell)。

本文以学习和研究为目的,使用Java实现一个简单Actor模型,功能上模仿Skynet,支持的功能包括:

Actor基础功能:消息发送接收、异步处理等。集群功能:支持多节点之间通信。非阻塞的sleep和网络通信。

完整的源代码在可以在Github获取。以下是部分关键代码以及设计思路讲解。

Actor

Actor是Actor模型中的核心概念,每个Actor独立管理自己的资源,与其他Actor之间通信通过Message。

这里的每个Actor由单线程驱动,相当于Skynet中的服务。Actor不断从mailbox中获取尚未处理的Message,mailbox使用的结构是无界阻塞的LinkedBlockingQueue。

Actor类是抽象类,其中处理消息的handleMessage方法为抽象方法,需要每个具体类来重载实现。

public abstract class Actor {

private Node node;

private String name;

private final BlockingQueue mailbox = new LinkedBlockingQueue<>();

private Thread actorThread;

public Node getNode() {

return node;

}

public void setNode(Node node) {

this.node = node;

}

public void setName(String name) {

this.name = name;

}

public String getName() {

return name;

}

public void start() {

actorThread = new Thread(() -> {

ActorSystem.setThreadLocalActor(this);

for(;;) {

try {

Message message = mailbox.take();

try {

handleMessage(message);

} catch (Exception e) {

e.printStackTrace();

}

} catch (InterruptedException ignore) {

// ignore

}

}

});

actorThread.start();

}

public void act(Message msg) {

mailbox.offer(msg);

}

protected abstract void handleMessage(Message message);

}

Node

Node代表节点,与Skynet中节点意义相同。它是一个独立的Java进程,有自己的IP和端口,Node之间通过异步的网络通信发送和接收消息。一个Node中可以运行多个Actor,一个Actor仅可与一个Node绑定。

Node的唯一标识也是它的name,与Actor的name稍有不同,Node的name是全局唯一,而Actor的name是Node内唯一。

public abstract class Node {

/**

* 名字

* 需要是唯一的,按名字查找

*/

private String name;

private InetSocketAddress address;

public String getName() {

return name;

}

public void setName(String nodeName) {

name = nodeName;

}

public void setAddress(InetSocketAddress address) {

this.address = address;

}

}

ActorSystem

ActorSystem是Actor的管理系统,也是外部调用API的主要入口,提供本框架中的主要功能:创建Actor、发送消息、休眠Actor、网络通信等。下面分别详细说明。

ActorSystem初始化

分为以下三步:

首先是调用conf方法读取集群配置,包括每个Node的name和address。

其次是调用bindNode方法绑定当前Node。

最后是调用start方法初始化自身,包括对定时器的初始化和Netty服务端的初始化。之所以引入定时器,是因为无阻塞sleep需要用到,这个具体后面再说,另外也可以用于扩展实现通用的定时任务功能。Node之间发送消息都是异步的,客户端和服务端都使用了Netty做异步网络通信。

public class ActorSystem {

private static Map clusterConfig;

/**

* 当前绑定到的节点

*/

private static Node currNode;

private final static Map actors = new HashMap<>();

/**

* 维护线程与Actor的对应关系

*/

private final static ThreadLocal currThreadActor = new ThreadLocal<>();

/**

* 客户端Netty bootstrap

*/

private static Bootstrap clientBootstrap;

/**

* 维护节点与通道的对应关系

*/

private final static Map channels = new ConcurrentHashMap<>();

private static void startNettyBootstrap() {

try {

// 先启动服务端bootstrap

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

EventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG, 100)

.handler(new LoggingHandler(LogLevel.INFO))

.childHandler(new ChannelInitializer() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline p = ch.pipeline();

p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)))

.addLast(new ObjectEncoder())

.addLast(new ServerHandler());

}

});

InetSocketAddress address = clusterConfig.get(currNode.getName());

b.bind(address).sync();

// 再启动客户端bootstrap

EventLoopGroup group = new NioEventLoopGroup();

clientBootstrap = new Bootstrap();

clientBootstrap.group(group)

.channel(NioSocketChannel.class)

.handler(new LoggingHandler(LogLevel.INFO))

.option(ChannelOption.TCP_NODELAY, true)

.handler(new ChannelInitializer() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline p = ch.pipeline();

p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)))

.addLast(new ObjectEncoder())

.addLast(new ClientHandler());

}

});

} catch (Exception e) {

throw new RuntimeException("actor system start fail", e);

}

}

public static void start() {

// 启动定时器

Timer.start();

// 启动Netty bootstrap

startNettyBootstrap();

}

public static void conf(Map config) {

clusterConfig = config;

}

/**

* 将当前系统绑定到某个节点

*/

public static void bindNode(Class nodeClass, String nodeName) {

InetSocketAddress address = clusterConfig.get(nodeName);

try {

Constructor constructor = nodeClass.getDeclaredConstructor();

Node node = constructor.newInstance();

node.setName(nodeName);

currNode = node;

} catch (Exception e) {

throw new RuntimeException("create node fail", e);

}

}

创建Actor

创建Actor调用newActor方法,指定要创建的Actor具体类和Actor name,Actor name需要Node内部唯一。

创建Actor时,先绑定当前Node,再调用Actor的start方法初始化,然后将name与Actor的映射关系加入到actors中。

/**

* 启动新的Actor

*/

public static void newActor(Class actorClass, String name) {

try {

Constructor constructor = actorClass.getDeclaredConstructor();

Actor actor = constructor.newInstance();

actor.setName(name);

actor.setNode(currNode);

actor.start();

actors.put(name, actor);

} catch (Exception e) {

throw new RuntimeException("create actor fail", e);

}

}

}

发送消息

核心是send方法,指定目标Node name、目标Actor name、命令名和参数后发送消息,也可以把这些信息包装在Message中发出。

消息的来源Node和来源Actor保存在一个ThreadLocal变量currThreadActor中。它的作用是在Actor创建时,将Actor线程与Actor绑定在一起,这样当调用send方法发送消息时,无需再显式指定来源Node和来源Actor,因为如果是Actor线程本身调用的send方法,那么直接从currThreadActor中取值即可;否则取不到值,那么来源Node和来源Actor都是null。

如果消息的目标Node与来源Node相同,那么直接找到对应的Actor添加消息即可;否则,需要走网络通信。这里的网络通信实际上就是一个简单的RPC通信,此处使用了Netty的ObjectEncoder和ObjectDecoder做消息的序列化和反序列化(注意:ObjectEncoder和ObjectDecoder在Netty的最新版本中已被废弃,因为Java序列化具有很大的安全隐患,这里仍然使用它们仅是为了演示方便)。

当走网络通信发送消息时,先判断到目标Node的Channel是否有效,若是,则直接发送消息;否则,先重新创建好Channel,再异步发送。这里实际上会有一个多线程同步的问题,就是多个线程同时尝试创建Channel,那么后面创建的Channel会把前面的覆盖掉,最后只会保留最后创建的一个。优化方法有两种:一是允许多个线程同时尝试创建Channel,但是当创建Channel成功时,如果发现已经有创建好的Channel引用了(来自别的线程创建),那么不保留这次创建的Channel,发送也通过已有的Channel引用;二是每次尝试创建Channel时都禁止别的线程做同样的操作。两种优化方法各有优劣,限于时间,这里没有用优化方法做具体实现。

public static void send(Message msg) {

String destNodeName = msg.getDestNode();

String destActorName = msg.getDestActor();

if (destNodeName.equals(currNode.getName())) {

Actor destActor = actors.get(destActorName);

destActor.act(msg);

} else {

sendToAnotherNode(msg);

}

}

private static void sendToAnotherNode(Message msg) {

try {

String destNodeName = msg.getDestNode();

// 如果没有连接,那么先建立连接

Channel channel = getChannel(destNodeName);

if (!isChannelValid(channel)) {

InetSocketAddress address = clusterConfig.get(destNodeName);

// TODO 有可能出现多线程同时尝试建立连接的情况,这里会保留最后一个

// 优化方法有两种:

// 1. 允许多次尝试,当建立连接成功后,如果已有成功连接的引用,那么不保留这次创建的连接

// 2. 尝试时阻塞其他尝试

clientBootstrap.connect(address).addListener(new ChannelFutureListener() {

@Override

public void operationComplete(ChannelFuture future) throws Exception {

setChannel(destNodeName, future.channel());

future.channel().writeAndFlush(msg);

}

});

} else {

// 否则直接发送消息

channel.writeAndFlush(msg);

}

} catch (Exception e) {

throw new RuntimeException("send to another node fail");

}

}

public static void send(String destNodeName, String destActorName, String command, Object... params) {

Actor srcActor = currThreadActor.get();

String srcActorName = srcActor == null ? null : srcActor.getName();

String srcNodeName = srcActor == null ? null : srcActor.getNode().getName();

Message msg = new Message(command, srcNodeName, srcActorName, destNodeName, destActorName, params);

send(msg);

}

public static boolean isChannelValid(Channel channel) {

return channel != null && channel.isActive() && channel.isWritable();

}

public static Channel getChannel(String destNodeName) {

return channels.get(destNodeName);

}

public static void setChannel(String destNodeName, Channel channel) {

channels.put(destNodeName, channel);

}

/**

* Actor发送给自己

*/

public static void sendSelf(String command, Object... params) {

Actor selfActor = currThreadActor.get();

if (selfActor == null) {

throw new RuntimeException("not in an actor, send fail");

}

send(selfActor.getNode().getName(), selfActor.getName(), command, params);

}

public static void setThreadLocalActor(Actor actor) {

currThreadActor.set(actor);

}

休眠Actor

休眠Actor调用sleep方法实现,它制定了需要休眠的毫秒数,休眠完后回调的命令及参数。

sleep方法对应于Skynet中的skynet.sleep,它们都是阻塞任务但是不阻塞线程。不同的是,skynet.sleep使用了Lua的协程yield/resume,在实现上更加优雅,对用户是透明的,用户无需指定回调函数,就能在sleep到期时自动切换回当前任务继续执行。而Java没有这种特性,所以此处乞丐版的实现需要指定回调方法。

这里的sleep方法和skynet.sleep一样,底层都是通过定时任务来实现。具体来说,sleep调用后会添加一个TimerTask,封装了过期时间和回调命令及参数,待任务到期后将命令封装成Message发送给当前Actor自身。

public static void sleep(long millis, String command, Object... params) {

String destActorName = currThreadActor.get().getName();

Timer.addTimeTask(new TimerTask(System.currentTimeMillis() + millis, () -> {

ActorSystem.send(currNode.getName(), destActorName, command, params);

}));

}

定时器

上面说到sleep方法依赖定时器的实现。定时器在Timer类中实现,它在start方法中启动一个线程不断轮询处理定时任务,并提供了addTimeTask方法添加新的定时任务。

Timer使用优先级队列作为存储定时任务的数据结构,这样在插入任务时可以达到O(logN)的时间复杂度。

为性能考虑,Timer主线程非采用每隔一小段时间不断轮询的方式,而是在当前没有任务需要执行时保持阻塞。为此需要考虑两个唤醒阻塞条件,一是任务队列由空到非空时唤醒,二是当下个定时任务还没到期而阻塞时,插入一个到期时间更早的定时任务,需要重新设定阻塞时间,因此先唤醒主线程。

public class Timer {

/**

* 基于优先级队列实现的定时任务队列

*/

private static final PriorityQueue timerTasks = new PriorityQueue<>();

private static final ReentrantLock lock = new ReentrantLock();

/**

* 唤醒阻塞条件一:队列非空

*/

private static final Condition notEmpty = lock.newCondition();

/**

* 唤醒阻塞条件二:当前时刻有任务需要执行

*/

private static final Condition hasCurrTask = lock.newCondition();

/**

* 添加新的定时任务

*/

public static void addTimeTask(TimerTask task) {

lock.lock();

if (timerTasks.isEmpty()) {

notEmpty.signal();

}

TimerTask firstTask = timerTasks.peek();

timerTasks.offer(task);

if (firstTask != null && task.getExecTime() < firstTask.getExecTime()) {

hasCurrTask.signal();

}

lock.unlock();

}

/**

* 启动定时器

*/

public static void start() {

Executor executor = Executors.newSingleThreadExecutor();

executor.execute(() -> {

while (true) {

TimerTask firstTask;

lock.lock();

if (timerTasks.isEmpty()) {

try {

notEmpty.await();

} catch (InterruptedException ignore) {

// ignore

}

}

firstTask = timerTasks.peek();

long currDeadlineMillis = firstTask.getExecTime();

long currTime = System.currentTimeMillis();

long delay = currDeadlineMillis - currTime;

if (delay > 0) {

try {

hasCurrTask.await(delay, TimeUnit.MILLISECONDS);

} catch (InterruptedException ignore) {

// ignore

}

} else {

firstTask = timerTasks.poll();

}

lock.unlock();

if (firstTask != null) {

firstTask.run();

}

}

});

}

}

程序运行

示例程序放在test包下面,涉及到的类说明:

ActorPing:每隔固定间隔向ActorPong发送消息,并接收回包。ActorPong:接收ActorPing发送的消息并原样返回。Cluster:包含NodeA和NodeB两个节点的配置。NodeA:启动时创建两个ActorPing,分别命名为ping1和ping2,分别以1s和5s的间隔向NodeB上的pong发送消息。NodeB:启动时创建一个ActorPong,命名为pong。

运行时,先启动NodeB,再启动NodeA,NodeA下面会打印带时间戳的如下信息:

[time:8, srcActor:null, destActor:ping1]command:start,params:[1000]

[time:8, srcActor:null, destActor:ping2]command:start,params:[5000]

[time:9, srcActor:ping1, destActor:ping1]command:ping,params:[1000]

[time:9, srcActor:ping2, destActor:ping2]command:ping,params:[5000]

[time:22, taskId:2]addTask

[time:22, taskId:1]addTask

[time:143, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]

[time:143, srcActor:pong, destActor:ping2]command:receivePong,params:[msg]

[time:1026, taskId:2]execTask

[time:1026, srcActor:null, destActor:ping1]command:ping,params:[1000]

[time:1029, taskId:3]addTask

[time:1035, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]

[time:2033, taskId:3]execTask

[time:2034, srcActor:null, destActor:ping1]command:ping,params:[1000]

[time:2034, taskId:4]addTask

[time:2037, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]

[time:3036, taskId:4]execTask

[time:3036, srcActor:null, destActor:ping1]command:ping,params:[1000]

[time:3036, taskId:5]addTask

[time:3039, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]

[time:4041, taskId:5]execTask

[time:4042, srcActor:null, destActor:ping1]command:ping,params:[1000]

[time:4042, taskId:6]addTask

[time:4044, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]

[time:5022, taskId:1]execTask

[time:5022, srcActor:null, destActor:ping2]command:ping,params:[5000]

[time:5022, taskId:7]addTask

NodeB下面会打印如下信息:

[time:1938, srcActor:ping2, destActor:pong]command:pong,params:[msg]

[time:1940, srcActor:ping1, destActor:pong]command:pong,params:[msg]

[time:2855, srcActor:ping1, destActor:pong]command:pong,params:[msg]

[time:3856, srcActor:ping1, destActor:pong]command:pong,params:[msg]

[time:4856, srcActor:ping1, destActor:pong]command:pong,params:[msg]

[time:5860, srcActor:ping1, destActor:pong]command:pong,params:[msg]

[time:6850, srcActor:ping2, destActor:pong]command:pong,params:[msg]

小结

本文总结了使用Java实现一个简单Actor模型的完整流程。由于时间所限,本文只实现了Actor模型的基础功能。不过造轮子的目的主要是为了深入掌握Actor模型的核心概念,作为演示和研究的用途。对于并发模型来说,不管用哪种语言来实现,原理才是主要的、相通的,语言只不过是实现的工具。相信笔者的这篇文章也会帮助读者对Actor模型有更为深入的了解。

文章链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: