在之前有浅浅的分享了一下RxSwift简单使用, 但是同样的也有一些困惑伴随着我,比如它是如何实现,为什么所有的对象类都可以使用rx方法呢,再比如Timer实现方式为什么跟原生的又差别如此之大呢,带着这些个疑问,就想着看一下这强大的库是如何实现的,下面大概分享一下个人的拙见;

RxSwift本质上就是信号的产生、订阅、发送跟销毁,核心逻辑就是产生、订阅、发送三步曲:1、创建信号 2、订阅信号 3、发送信号,下面就以一个最简单信号创建订阅流程来分析一下,它内部是怎么实现的;

先创建Observable可观察者对象,然后使用subscribe订阅,最后第三步发送信号就是隐藏步骤,实际开发中,我们不需要去直接调用onNext、onError操作;

class ViewController: UIViewController {

let disposeBag = DisposeBag()

override func viewDidLoad() {

super.viewDidLoad()

// 1、创建信号

let ob = Observable.create { observer in

// 3、发送信号

observer.onNext("下一步")

// observer.onError(NSError.init(domain: "Chris's error", code: 10086, userInfo: nil))

observer.onCompleted()

return Disposables.create()

}

// 2、订阅信号

let _ = ob.subscribe { text in

print("订阅到了:\(text)")

} onError: { error in

print("error:\(error)")

} onCompleted: {

print("完成")

} onDisposed: {

print("销毁")

}

.disposed(by: disposeBag)

}

}

一、创建信号

1、Observable.create

通过Observable.create创建一个可观察对象,传入一个尾随闭包作为参数,点进去create看一下其内部实现,发现create是ObservableType的一个扩展方法;

而ObservableType其实是一个协议,继承自ObservableConvertibleType,Observable遵循了ObservableType协议,即Observable调用ObservableType协议里的create方法;

2、AnonymousObservable.subscribeHandler

create方法内部就一句代码 AnonymousObservable(subscribe),AnonymousObservable这个字面意思,是个匿名的可观察者,把subscribe传给AnonymousObservable,这个subscribe是我们创建的尾随闭包,那么AnonymousObservable里面又做什么处理呢?只能继续往下翻了;

public static func create(_ subscribe: @escaping (AnyObserver) -> Disposable) -> Observable {

AnonymousObservable(subscribe)

}

点进去AnonymousObservable看,发现AnonymousObservable给回调闭包取了个别名,又定义了一个全局变量subscribeHandler,保存我们外面传进来的闭包; 这里我们看到AnonymousObservable是继承的Producer,而Producer点击发现,它其实是继承自Observable;

到这一步,其实我们创建订阅信号的步骤就已经完成了;可能有点绕,总结一下,本质的思想就是通过父类Observable创建的订阅信号闭包,交给子类AnonymousObservable去保存实现;

二、订阅信号

1、创建一个AnonymousObserver订阅者

上面我们已经说了,我们创建的ob对象,其实是AnonymousObserver对象,所以此处subscribe就是创建一个AnonymousObserver订阅者;

2、AnonymousObserver初始化的时候保存eventHandler

前面的disposable这些销毁对象的创建先不看,重点看return返回值,self.asObservable().subscribe(observer) 这个做为参数传给可销毁对象Disposables; observer这个其实就是订阅者,它是AnonymousObserver对象,继承自ObserverBase,后面传入的参数是个事件回调闭包,AnonymousObserver就是将eventHandler事件保存下来;注意到里面还有个onCore方法,里面是调用eventHandler执行操作; AnonymousObserver的父类ObserverBase,它里面有个on方法,里面就一个switch方法,Event是个枚举类,往下看就能看到我们熟悉next\error\completed,再往后就是调用onCore方法,这个调用的时机后面具体分析;

3、self.asObservable().subscribe(observer) 方法调用

接下来看,self上面也说了,是AnonymousObservable对象,asObservable也不用过多关心,其实就是类似OC里面的多态,强制性返回Observable Class,重点看subscribe方法调用,传入的observer参数,上面步骤2已经说过了;

4、producer.subscribe

subscribe方法点击jump发现好多地方都有该方法,如上文所说,ob本质是AnonymousObservable对象,他们的继承链关系AnonymousObservable->Producer->Observable;Observable又遵循ObservableType协议,上诉几个类都有实现subscribe方法,具体也不知道要执行哪一个方法,所以这个时候找起来就比较麻烦了,我这个比较懒,不想挨个去找了,此时最简单的方法其实就是查看调用堆栈了; 这里可以发现asObservable().subscribe是执行的父类Producer里面的subscribe方法,其实仔细查看源码也会发现,AnonymousObservable没有实现subscribe方法,而Observable里面只是调用rxAbstractMethod()构造方法而已,做一些错误处理,也没有具体实现,所以关键代码还是在Producer里; 分析Producer里面的方法,主要就是通过不同条件,执行不同代码,其实本质执行的都是执行下面三行代码;

let disposer = SinkDisposer()

let sinkAndSubscription = self.run(observer, cancel: disposer)

disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

这个schedule就先不深究了,调用之后还是会执行action方法,就是刚才subscribe里面的闭包函数;

5、AnonymousObservable.run

重点还是看一下上面run方法;调用的AnonymousObservable对象本身的run方法,run里面又调用AnonymousObservableSink的run方法,这个AnonymousObservableSink又是啥子东西???只能接着往下看了

6、AnonymousObservableSink.run

AnonymousObservableSink是继承自Sink,Sink是什么先不深究,里面定义了一些方法实现,里面保存了observe、cancel对象;

7、parent.subscribeHandler(AnyObserver(self))

看当前的这个run方法,里面就一句代码,parent就是步骤5传入的self,即AnonymousObservable对象,即表示AnonymousObservable调用subscribeHandler方法; 这个时候就串起来了,前文创建信号的时候讲过创建Observable的订阅信号的时候,交给子类AnonymousObservable去保存,这个时候就是调用之前保存的闭包回调了,就会来到下面这一步;

三、发送信号

1、 AnonymousObserver.onNext执行

根据上面的一系列操作,我们已经可以执行create里面的回调了,这一步我们开头的时候也说了,实际开发中不需要我们手动去调用onNext、onError等方法,但是既然我们是探究他的原理,那就继续往下看;

这个observer是个什么东西?为什么能调用onNext等方法呢?

上面步骤7有提到,subscribeHandler.(AnyObserver(self)),我们可以得出observer == AnyObserver(self),那我们点开AnyObserver,发现它其实就是个结构体,遵循了ObserverType协议,用observer保存了AnonymousObservableSink的on方法,AnonymousObservableSink我们在订阅信号的步骤6有提到,截图中有被收纳起来的on方法;

所以create中的observer =AnyObserver(self),里面AnyObserver所持有的对象self.observer = AnonymousObservableSink.on;

2、ObserverType.onNext

到了这一步,我们还是不知道onNext怎么来的,既然它自身没有实现,那么只能去看他的协议方法了,果不其然,在协议扩展方法里面实现了onNext,往下看,其实他是调用当前的on方法,传入.next枚举外带value值;

3、AnonymousObservableSink.on

继续走会发现,调用了AnyObserver本身的on方法,实现就一行代码;

self.observer(event)

上面已经分析出了observer == AnonymousObservableSink.on,其实这里就是调用AnonymousObservableSink的on方法,继续往下看看sink.on里面做了什么操作;

下面是AnonymousObservableSink的on方法,这个方法很眼熟,跟前文中提到的ObserverBase的on方法很类似,只是ObserverBase的on最后是调用的onCore方法,这边调用的是forwardOn;

4、Sink.forwardOn

forwardOn当前类没有实现,只能去它的父类找,继续网上找,发现它是调用的self.observer.on方法,这个self.observer之前订阅信号的步骤6有提到过,sink保存observer跟cancel对象用来后续处理,唉,这边就用到了;

5、ObserverBase.on

这个self.observer打印发现它其实是AnonymousObserver类,继承至ObserverBase,所以到这一步,还是调用我们前文提到的ObserverBase的on方法,在往下执行onCore;

6、AnonymousObserver.onCore->self.eventHandler()

上一步骤的self其实是AnonymousObserver,那么onCore往下执行就到了我们订阅信号的步骤2提到的,执行AnonymousObserver保存下来eventHandler事件; 这个eventHandler事件,在订阅信号创建的时候,被我隐藏了,没有展开讲,这边展开来看一下,里面到底有什么秘密;

7、onNext、onError、onCompleted、dispose执行

点进去发现这个闭包就是响应event事件,对不同事件执行不同方法,onNext就是我们subscribe传入的闭包回调,value就是我们onNext传入的值;

public func onNext(_ element: Element) {

self.on(.next(element))

}

到这里我们整个创建、订阅、发送信号整个过程就已经分析完了,前前后后执行了二十多个方法;里面的涉及了很多的继承、扩展、协议等等,可能有点绕;

四、小结:

总的流程有点绕,下面简单梳理一下: 首先是带able结尾的信号生产者继承链关系:

首先是带observer结尾的信号订阅者继承链关系: 整体核心流程如下:

从图中也可以看出,Observable、Observer分工明确,sink是起到承上启下的作用,同时保存了observer、cancel,订阅信号的run方法,发送信号的on方法,都是从这边调用的,这个类就相当于一个业务中间层,所有业务逻辑都在这里处理,通过这个中间件可以串联Observable信号生产者跟Observer信号订阅者;

我最近做的音视频开发的模块,视频的渲染跟解码就是通过这种方式使之分离,谁要渲染直接订阅当前暴露的接口就可以,调度者就根据订阅的信号去返回AVframe就可以,也无需关心是谁订阅的,同样的,音频帧处理也是这样。其实我们平时开发中也可以借鉴这种设计模式,分析框架最主要的就是学习其优秀的设计模式,为我所用;

以上,就是个人理解的RxSwift大体实现流程,如有不对,欢迎指正!!!

精彩链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: