前言

在上篇文章对RxJava 的工作流程进行的简单的分析,今天来分享一下线程切换的流程。如果觉得源码枯燥可以直接移至文末看图理解。

实例代码

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(@NonNull ObservableEmitter emitter) {

emitter.onNext("123");

}

})

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Observer() {

.........

}

我们都知道subscribeOn 是切换上游线程,observeOn是切换下游环境,接下来我们就看下它是怎么切换的。

我们先看下 Schedulers.io() 是什么

@NonNull

public static Scheduler io() {

return RxJavaPlugins.onIoScheduler(IO);

}

->

Scheduler IO = RxJavaPlugins.initIoScheduler(new IOTask());

->

static final class IOTask implements Supplier {

@Override

public Scheduler get() {

return IoHolder.DEFAULT;

}

}

->

static final class IoHolder {

static final Scheduler DEFAULT = new IoScheduler();

}

我们通过层层分解,层层递进,了解到 Schedulers.io() 最终返回的是一个 IoScheduler()

可以暂时将它理解为一个任务调度器,用来执行我们的任务。

接下来我们再看下 AndroidSchedulers.mainThread() 。

public static Scheduler mainThread() {

return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);

}

->

private static final Scheduler MAIN_THREAD =

RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);

->

Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), true);

可以看到这里返回的是一个 HandlerScheduler ,这里是对Handler进行了一个封装,所以归根结底,向主线程切换任务还是通过handler 来完成的,接下来我们就看看其中的细枝末节。

两个参数分析完了,然后来看下两个操作符,看看它俩做了些什么事情。

首先我们来看下subscribeOn

public final Observable subscribeOn(@NonNull Scheduler scheduler) {

Objects.requireNonNull(scheduler, "scheduler is null");

return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));

}

这里是对 IoScheduler() 和上游封装的包裹进行的二次封装

上游的包裹:这里指的通过create创建的 ObservableCreate

observeO操作符:

public final Observable observeOn(@NonNull Scheduler scheduler) {

return observeOn(scheduler, false, bufferSize());

}

->

public final Observable observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {

Objects.requireNonNull(scheduler, "scheduler is null");

ObjectHelper.verifyPositive(bufferSize, "bufferSize");

return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));

}

在这里是对 HandlerScheduler 和 上游封装的包裹进行的二次封装

上游的包裹:这里指的是通过subscribeOn 操作符 创建的 ObservableSubscribeOn

然后我们开始看订阅这里了,我们的流程从 subscribe 这里才刚刚开始。

经过上一篇文章的分析,我们可以知道调用的 subscribeActual 方法都是在上游操作符创建的封装对象里,所以我们直接看 ObservableObserveOn 的 subscribeActual 方法。

如果感觉这段讲解有些跳跃,可以先看一下上篇文章《浅析RxJava》

@Override

protected void subscribeActual(Observer observer) {

.........

Scheduler.Worker w = scheduler.createWorker();

source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));

}

在这里是将我们自定义的 Observer 封装成 ObserveOnObserver ,这里的source 是我们上游的封装的包裹,这里指的就是通过subscribeOn 操作符创建的 ObservableSubscribeOn。最终会调用到 ObservableSubscribeOn 的 subscribeActual

方法。

@Override

public void subscribeActual(final Observer observer) {

final SubscribeOnObserver parent = new SubscribeOnObserver<>(observer);

observer.onSubscribe(parent);

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

}

这里是将下游传上来的 ObserveOnObserver 再次进行封装,封装成 SubscribeOnObserver ,然后再将 SubscribeOnObserver 封装成 SubscribeTask,其实就是一个Runnable。

流程我们稍后再进行分析,我们先来看看异步任务是怎么切换的。

通过上文分析得知,此处的 scheduler 为 subscribeOn 操作符传入的参数,也就是 IoScheduler() 。

接下来我们再看 scheduler的 scheduleDirect 方法。

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {

final Worker w = createWorker();

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

DisposeTask task = new DisposeTask(decoratedRun, w);

w.schedule(task, delay, unit);

return task;

}

在这里是通过 createWorker() 创建了一个 Worker ,由这个 Worker 去执行 具体的任务。

createWork()是抽象方法,我们需要看IoScheduler 的具体实现。

public Worker createWorker() {

return new EventLoopWorker(pool.get());

}

->

public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {

....

return threadWorker.scheduleActual(action, delayTime, unit, tasks);

}

->

NewThreadWorker类

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {

Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

.........

if (delayTime <= 0) {

f = executor.submit((Callable)sr);

} else {

f = executor.schedule((Callable)sr, delayTime, unit);

}

sr.setFuture(f);

return sr;

}

经过层层挖掘,我们看到任务最后是通过 executor 来执行的,executor 就是内部维护的线程池

private final ScheduledExecutorService executor;

至此,整个工作流 就切换成子线程来工作了。

接下来我们继续分析封装的SubscribeTask

final class SubscribeTask implements Runnable {

private final SubscribeOnObserver parent;

SubscribeTask(SubscribeOnObserver parent) {

this.parent = parent;

}

@Override

public void run() {

source.subscribe(parent);

}

}

在任务执行的时候,会通过source 继续将 SubscribeOnObserver向上游传送。这里的source 指的是create 创建的ObservableCreate ,source.subscribe 就会直接调用到 ObservableCreate 的 subscribeActual

@Override

protected void subscribeActual(Observer observer) {

CreateEmitter parent = new CreateEmitter<>(observer);

observer.onSubscribe(parent);

try {

source.subscribe(parent);

} catch (Throwable ex) {

Exceptions.throwIfFatal(ex);

parent.onError(ex);

}

}

一直到这里,跟我们在上篇文章分析的流程就一样了。

将下游传过来的SubscribeOnObserver 再次封装成 CreateEmitter 发射器,然后通过source 继续向上传递,这里的souce 就是指的是我们在create 中传递进去的ObservableOnSubscribe。

然后在ObservableOnSubscribe 的 subscribe 中,通过 emitter.onNext 将我们的数据开始进行下发。

ObservableEmitter

@Override

public void onNext(T t) {

........

if (!isDisposed()) {

observer.onNext(t);

}

}

这里的observer 是SubscribeOnObserver

SubscribeOnObserver

@Override

public void onNext(T t) {

downstream.onNext(t);

}

这里的downstream 是指 ObservableObserveOn

ObserveOnObserver

@Override

public void onNext(T t) {

......

if (sourceMode != QueueDisposable.ASYNC) {

queue.offer(t);

}

schedule();

}

这里的 sourceMode 未被赋值,会调用 queue.offer(t) ,将数据放入到队列中。

接下来再看 schedule() 做了些什么 ?

void schedule() {

if (getAndIncrement() == 0) {

worker.schedule(this);

}

}

通过上文的分析,我们可以知道 observeOn 操作符创建的 Scheduler 为 HandlerScheduler ,所以这里的 worker.schedule(this) 方法调用的是 HandlerScheduler 的内部静态子类 HandlerWorker 的 schedule 方法。

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {

...........

if (disposed) {

return Disposable.disposed();

}

run = RxJavaPlugins.onSchedule(run);

ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

Message message = Message.obtain(handler, scheduled);

message.obj = this;

if (async) {

message.setAsynchronous(true);

}

handler.sendMessageDelayed(message, unit.toMillis(delay));

if (disposed) {

handler.removeCallbacks(scheduled);

return Disposable.disposed();

}

return scheduled;

}

最终是在这里通过Handler将任务切换到了主线程执行。

ObserveOnObserver 类实现了Runnable 接口, worker.schedule(this) 是将自身交给Handler 去执行。所以最终的结果还会由 ObserveOnObserver 的run方法来执行。

public void run() {

if (outputFused) {

drainFused();

} else {

drainNormal();

}

}

这里我们是典型的使用方式,我们直接来看下 drainNormal();

void drainNormal() {

final SimpleQueue q = queue;

final Observer a = downstream;

for (;;) {

............

for (;;) {

T v = q.poll();

a.onNext(v);

}

..............

}

}

在这里 将数据从队列中取出,然后调用下游的 onNext ,这里的 downstream 也就是我们最后自定义的观察者 Observer 了。

整个过程也好比是一个封包裹和拆包裹的过程。用洋葱模型表示一下会更加的形象。

最后上图!

可能文字的叙述还是太抽象, 用这样一张图来表示整个流程可能相对好理解一些。

写在最后

纸上得来终觉浅,绝知此事要躬行。如果有时间还是建议自己跟一遍源码流程,这样才能真正理解。

今天就水到这里,希望对大家有所帮助。

精彩文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: 

发表评论

返回顶部暗黑模式