RxJava目前有两种发布和订阅模式

第一种 cold模式,这种模式观察者订阅被观察模式时,被观察者的动作会重放,举例说明:

@NonNull

Flowable<@NonNull Object> observeOn =Flowable.create(e -> {

e.onNext(1); Thread.sleep(1000);

e.onNext(2); Thread.sleep(1000);

e.onNext(3); Thread.sleep(1000);

e.onNext(4); Thread.sleep(1000);

e.onNext(5); Thread.sleep(1000);

e.onNext(6); Thread.sleep(1000);

}, BackpressureStrategy.BUFFER).observeOn(Schedulers.computation());

Thread.sleep(1000 * 1);

observeOn.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));

Thread.sleep(1000 * 1);

observeOn.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));

这种会大音两次1,2,3,4,5,6,即使时不同的线程,第二次会等第一次完成过后开始(因为未设置subscribeOn,所以是单线程的,第二次会在第一次完成后再开始)。而且是当观察者订阅被观察者的时候触发被观察者的动作。

RxComputationThreadPool-1 + next + 1

RxComputationThreadPool-1 + next + 2

RxComputationThreadPool-1 + next + 3

RxComputationThreadPool-1 + next + 4

RxComputationThreadPool-1 + next + 5

RxComputationThreadPool-1 + next + 6

RxComputationThreadPool-2 + next + 1

RxComputationThreadPool-2 + next + 2

RxComputationThreadPool-2 + next + 3

RxComputationThreadPool-2 + next + 4

RxComputationThreadPool-2 + next + 5

RxComputationThreadPool-2 + next + 6

我们create中lambda的方法会再次运行

相似的ReplaySubject

还有一种当订阅后会重新处理已发送的数据ReplaySubject

ReplaySubject replaySubject = ReplaySubject.create(1);

replaySubject.onNext(1);

replaySubject.onNext(2);

replaySubject.onNext(3);

System.out.println("subscribe 1");

replaySubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));

replaySubject.onNext(4);

replaySubject.onNext(5);

System.out.println("subscribe 2");

replaySubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));

replaySubject.onNext(6);

控制台输出为

subscribe 1

subscribe 2

RxComputationThreadPool-1 + next + 1

RxComputationThreadPool-1 + next + 2

RxComputationThreadPool-1 + next + 3

RxComputationThreadPool-1 + next + 4

RxComputationThreadPool-1 + next + 5

RxComputationThreadPool-1 + next + 6

RxComputationThreadPool-2 + next + 1

RxComputationThreadPool-2 + next + 2

RxComputationThreadPool-2 + next + 3

RxComputationThreadPool-2 + next + 4

RxComputationThreadPool-2 + next + 5

RxComputationThreadPool-2 + next + 6

但是Flowable.create和ReplaySubject的模式不太一样。 对于Flowable来说,新的subscribe来临时,会重新执行create方法里面的FlowableOnSubscribe的apply方法。 对于ReplaySubject是把前面onNext()的数据保存到list中,然后新的subscribe来临时重新遍历list消费。这里需要注意ReplaySubject有内存泄漏的风险。见io.reactivex.rxjava3.subjects.ReplaySubject.buffer。

第二种是HOT模式。

1、使用cold + publish()方法修改cold为hot。

@NonNull

ConnectableFlowable<@NonNull Object> publish = Flowable.create(e -> {

e.onNext(1); Thread.sleep(1000);System.out.println(1+ " " + Thread.currentThread().getName());

e.onNext(2); Thread.sleep(1000);System.out.println(2+ " " + Thread.currentThread().getName());

e.onNext(3); Thread.sleep(1000);System.out.println(3+ " " + Thread.currentThread().getName());

e.onNext(4); Thread.sleep(1000);System.out.println(4+ " " + Thread.currentThread().getName());

e.onNext(5); Thread.sleep(1000);System.out.println(5+ " " + Thread.currentThread().getName());

e.onNext(6); Thread.sleep(1000);System.out.println(6+ " " + Thread.currentThread().getName());

}, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.computation()).publish();

System.out.println("connect");

publish.connect();

System.out.println("subscribe 1");

publish.subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));

Thread.sleep(3000 * 1);

System.out.println("subscribe 2");

publish.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));

下面是输出,可以看到没有出现重放。

connect

subscribe 1

main + next + 1

1 RxComputationThreadPool-1

RxComputationThreadPool-1 + next + 2

2 RxComputationThreadPool-1

RxComputationThreadPool-1 + next + 3

3 RxComputationThreadPool-1

RxComputationThreadPool-1 + next + 4

subscribe 2

4 RxComputationThreadPool-1

RxComputationThreadPool-1 + next + 5

RxComputationThreadPool-2 + next + 5

5 RxComputationThreadPool-1

RxComputationThreadPool-1 + next + 6

RxComputationThreadPool-2 + next + 6

6 RxComputationThreadPool-1

2、或者使用Subject对象 这里使用了PublishSubject,他的观察者只处理订阅过后的数据。subject包含了其他类型的对象,可以参考RxJava 的 Subject

PublishSubject publishSubject = PublishSubject.create();

publishSubject.onNext(1);

publishSubject.onNext(2);

publishSubject.onNext(3);

System.out.println("subscribe 1");

publishSubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));

publishSubject.onNext(4);

publishSubject.onNext(5);

System.out.println("subscribe 2");

publishSubject.observeOn(Schedulers.computation()).subscribe(e -> System.out.println(Thread.currentThread().getName() + " + next + " + e));

publishSubject.onNext(6);

执行过后的打印

subscribe 1

subscribe 2

RxComputationThreadPool-1 + next + 4

RxComputationThreadPool-1 + next + 5

RxComputationThreadPool-1 + next + 6

RxComputationThreadPool-2 + next + 6

推荐链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: