RxJava分为创建操作符、变换操作符、过滤操作符、组合操作符、错误操作符、辅助操作符、条件和布尔操作符、算术和聚合操作符、连接操作符等。这里只会介绍部分比较常用的操作符

8.3.1 创建操作符

下文介绍了create、just、from、interval、range和repeat

1. create操作符

创建被观察者

emitter.onNext(...);

emitter.onNext(...);

emitter.onNext(...);

emitter.onError(new Throwable());

emitter.onComplete();

详情见上篇

2. just操作符

更加快捷的创建方式

//定义变量-观察者

Observer observer = new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("onSubscribe...");

}

@Override

public void onNext(Object o) {

System.out.println("onNext..." + o );

}

@Override

public void onError(Throwable e) {

System.out.println("onError..." + e.getMessage());

}

@Override

public void onComplete() {

System.out.println("onComplete");

}

};

private void test2(){

//介绍just操作符

//它有10个重载方法,允许我们最多传入10个事件

Observable.just("1","AAAA","2")

.subscribe(observer);

}

3. from操作符

fromInterable可以创建操作事件集合

private void test3(){

//fromArray可以传入无限多个参数

//很多操作符只是更方便的写法,都是重复的

Observable.fromArray("1","AAAA","2","1","AAAA","2","1","AAAA","2","1","AAAA","2","1","AAAA","2").subscribe(observer);

}

private void test4(){

//fromIterable可以操作事件集合

ArrayList list = new ArrayList<>();

list.add("111");

list.add("222");

Observable.fromIterable(list).subscribe(observer);

//Future可以查看事件在运行过程当中的各个状态

Observable.fromFuture(new Future() {

@Override

public boolean cancel(boolean mayInterruptIfRunning) {

return false;

}

@Override

public boolean isCancelled() {

return false;

}

@Override

public boolean isDone() {

return false;

}

@Override

public Object get() throws ExecutionException, InterruptedException {

return "aaa";

}

@Override

public Object get(long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {

return null;

}

}).subscribe(observer);

}

4. Interval操作符

interval 操作符可用于创建一个在每个指定时间周期后发出一个递增的长整型数字的 Observable

//创建一个每秒发射一次长整型数字的Observable

Observable observable = Observable.interval(1 , TimeUnit.SECONDS);

observable.subscribe(number -> Log.d(TAG, "interval: " + number));

interval创建了一个可观察序列,根据周期不断地发出事件。通过Observable类的subscribe方法来订阅这个可观察序列,该方法接受一个实现了Observer(观察者)接口的对象作为参数,这里使用了Lambda表达式。因此,当可观察序列发出一个元素时,该元素将被打印到控制台上。这个过程将一直持续,直到可观察序列完成或出错。

5. range操作符

创建发射指定范围的整数序列的Observable,可以拿来代替for循环。第一个参数是起始值,并且不小于0,第二个参数作为终值。并且左闭右开。

//创建一个发射指定范围的事件序列,处理每一个事件

Observable observable1 = Observable.range(0 , 5);

observable1.subscribe(number -> Log.d(TAG, "range: " + number));

6. repeat操作符

创建一个N次重复发射特定数据的Observable

//创建一个发射指定范围的事件序列,处理每一个事件

Observable observable1 = Observable.range(0 , 5).repeat(2);

observable1.subscribe(number -> Log.d(TAG, "range: " + number));

7. 补充

可以使用 Range 操作符和 create 操作符来实现相同的功能,即创建一个发出一系列整数事件的 Observable 对象。下面是两种实现方式的代码示例。使用range

Observable rangeObservable = Observable.range(1, 5);

rangeObservable.subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

// do nothing

}

@Override

public void onNext(Integer integer) {

System.out.println(integer);

}

@Override

public void onError(Throwable e) {

// do nothing

}

@Override

public void onComplete() {

// do nothing

}

});

使用create

Observable createObservable = Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

for (int i = 1; i <= 5; i++) {

emitter.onNext(i);

}

emitter.onComplete();

}

});

createObservable.subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

// do nothing

}

@Override

public void onNext(Integer integer) {

System.out.println(integer);

}

@Override

public void onError(Throwable e) {

// do nothing

}

@Override

public void onComplete() {

// do nothing

}

});

8.3.2 变换操作符

变化操作符主要是对Observable发射的数据按照一定规则做一些变化操作,然后将变化后的数据发射出去。这里主要讲解map、flatMap、cast、concatMap、flatMapIterable、buffer和groupBy。

1. map操作符

map操作符通过指定一个Function对象,将Observable转化为一个新的Observable对象并发射。观察者收到的是新的Observable并进行处理这里的案例假设我们访问网络,Host地址会变化,但是具体的URL地址不变,因此我们可以通过map操作度来进行转化字符操作。补充:Function类的泛型T和R分别表示输入和输出的类型。

final String Host = "http://blog.csdn/net/";

Observable.just("ning111").map(new Function() {

@Override

public String apply(String s) throws Throwable {

return Host + s;

}

}).subscribe(new Consumer() {

@Override

public void accept(String s) throws Throwable {

Log.d(TAG, "accept: " + s);

}

});

2. flatMap操作符、cast操作符

flatMap操作符将Observable发射的数据集合变换为Observable集合,然后将这些Observable发射的数据平坦化放入一个单独的Observable。简单来说将一个Observable发射的每个数据项转换成另一个Observable,然后将这些Observable发射的数据合并到一个单独的Observable中。cast操作符就是强制将Observable发射的所有数据类型转换为指定类型。

final String Host = "http://blog.csdn/net/";

List mList = new ArrayList<>();

mList.add("Ning1");

mList.add("Ning2");

mList.add("Ning3");

mList.add("Ning4");

Observable.fromIterable(mList).flatMap(new Function>() {

//将集合中的每个数据项,转化成另一个数据项

//变为新的Observable

@Override

public ObservableSource apply(String s) throws Throwable {

return Observable.just(Host + s);

}

}).cast(String.class).subscribe(new Consumer() {

//这里不用cast转化的话(因为返回的是ObserverSource),会报错

@Override

public void accept(String s) throws Throwable {

Log.d(TAG, "flatMap: " + s);

}

});

此外flatMap的合并运行交叉,也就是说可能会交错地发送事件。最终结果的顺序可能并不是原始Observable发送事件的顺序。

3. concatMap操作符

concatMap操作符和flatMap操作符一致。不过它解决了flatMap交叉问题,提供了一种能把发射的值连续在一起函数。这里的代码都是一样的,就不再写了

4. flatMapIterable操作符

在处理时会将我们的数据包装成Iterable,在Iterable中我们就可以对数据进行处理。

Observable.just(1,2,3).flatMapIterable(new Function>() {

@Override

public Iterable apply(Integer integer) throws Throwable {

List mList = new ArrayList<>();

mList.add(integer + 1);

return mList;

}

}).cast(Integer.class).subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Throwable {

Log.d(TAG, "flatMapIterable" + integer);

}

});

不使用cast的情况,这里一定要把Interable里面的问号改成具体的数据类型,不然创建消费者的时候会出错。

Observable.just(1,2,3).flatMapIterable(new Function>() {

@Override

public Iterable apply(Integer integer) throws Throwable {

List mList = new ArrayList<>();

mList.add(integer + 1);

return mList;

}

}).subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Throwable {

System.out.println(integer);

}

});

5. buffer操作符

buffer操作符将Observable变换为一个新的Observable,这个新的Observable每次发射一组列表值,而不是一个一个发射。

Observable.just(1,2,3,4,5,6,7,8,9,10)

.buffer(3)

.subscribe(new Consumer>() {

@Override

public void accept(List integers) throws Throwable {

for(Integer i : integers){

Log.d(TAG, "buffer: " + i);

}

Log.d(TAG, "buffer: -----------------------" );

}

});

buffer(3)表示缓存容量为3

6. groupBy操作符

用于分组元素,将源Observable变化成一个发射Observables的新Observable。根据groupBy内的要求,这里将Observables分为偶数Observable和奇数Observable,然后放入到一个Observable中。所以有两次订阅。

Observable.just(1,2,3,4,5,6)

.groupBy(integer -> integer % 2 == 0 ? "偶数" : "奇数")

.subscribe(groupObservable -> {

groupObservable.subscribe(integer -> Log.d(TAG, "key: " + groupObservable.getKey() + ", value: " + integer));

});

正常写法

Observable.just(1,2,3,4,5,6)

.groupBy(new Function() {

@Override

public String apply(Integer integer) throws Throwable {

if(integer % 2 == 0){

return "偶数";

}else{

return "奇数";

}

}

}).subscribe(new Consumer>() {

@Override

public void accept(GroupedObservable stringIntegerGroupedObservable) throws Throwable {

Log.d(TAG, "accept: key = " + stringIntegerGroupedObservable.getKey());

stringIntegerGroupedObservable.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Throwable {

Log.d(TAG, "accept: value = " + integer);

}

});

}

});

8.3.3 过滤操作符

过滤操作符用于过滤和选择Observable发射的数据序列,让Observable只返回满足我们条件的数据。这里介绍filter、elementAt、distinct、skip、take、ignoreElements、throttleFirst和throttleWithTimeOut。

1. filter操作符

filter操作符对源Observable产生的结果自定义规则进行过滤,只有满足条件的结果才会交给订阅者。filter操作符推荐传入的函数接口是Predicate

Observable.just(1,2,3,4).filter(new Predicate() {

@Override

public boolean test(Integer integer) throws Throwable {

return integer % 2 == 0;

}

}).subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Throwable {

Log.d(TAG, "filter: " + integer);

}

});

2. elementAt操作符

elementAt操作符用来返回指定位置的数据。和它类似的有elementAtOrDefault(int , T),其可以允许默认值。

Observable.just(1,2,3,4)

.elementAt(2)

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Throwable {

Log.d(TAG, "elementAt: " + integer);

}

});

3. distinct操作符

distinct操作符用来去重。只允许还没有发射过的数据项通过。和它类似的还有distinctUntilChanged,它用来去掉连续重复的数据。

Observable.just(1,2,2,2,3,4,5,5,6)

.distinct()

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Throwable {

Log.d(TAG, "distinct: " + integer);

}

});

4. skip操作符、take操作符

skip操作符将源Observable发射的数据过滤前n项

Observable.just(1,2,2,2,3,4,5,5,6)

.skip(3)

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Throwable {

Log.d(TAG, "skip: " + integer);

}

});

take操作符则只取前n项

Observable.just(1,2,2,2,3,4,5,5,6)

.take(3)

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Throwable {

Log.d(TAG, "skip: " + integer);

}

});

此外还有skipLast和takeLast操作符,它们是从后面开始的过滤操作。

5. ignoreElements操作符

ignoreElements操作符忽略所有源Observable产生的结果,只把Observable的onCompleted和onError事件通知给订阅者。这里虽然Observable发射了很多元素,但是我们没有对他们进行任何操作,我们使用了ignoreElements操作符,只关心Observable是否完成。

Observable.just(1,2,2,2,3,4,5,5,6)

.ignoreElements()

.doOnComplete(new Action() {

@Override

public void run() throws Throwable {

Log.d(TAG, "Observable Completed");

}

})

.subscribe();

6. throttleFirst操作符

throttleFirst操作符会定期发射这个时间段里源Observable发射的第一个数据throttleFirst操作符默认在computation调度器上执行。在后文会讲到类似的还有sample操作符,sample操作符会定时地发射源Observable最近发射的数据,其他都会被过滤掉。

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {

for(int i = 0 ; i < 10 ; i++){

emitter.onNext(i);

try{

Thread.sleep(100);

}catch (InterruptedException e){

e.printStackTrace();

}

}

emitter.onComplete();

}

}).throttleFirst(200 , TimeUnit.MILLISECONDS).subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Throwable {

Log.d(TAG, "throttleFirst: " + integer);

}

});

每隔100ms发射一个数据throttleFirst操作符设置的时间是200ms,因此它会发射200ms内的第一个数据。输出结果如下:

7. throttleWithTimeOut操作符

通过时间来限流,源Observable每次发射出来一个数据后就会进行计时。如果在设定好的时间结束前Observable有新的数据发射出来,这个数据就会被丢弃。同时throttleWithTimeOut开始重新计时,如果每次都是在计时结束前发射数据,那么这个限流就会走向极端,只会发射最后一个数据。

他也默认在computation调度器中执行类似的还有deounce操作符,它不仅可以使用时间来过滤,还可以根据一个函数来限流。

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {

for(int i = 0 ; i < 10 ; i++){

emitter.onNext(i);

int sleep = 100;

if(i % 3 == 0){

sleep = 300;

}

try{

Thread.sleep(sleep);

}catch (InterruptedException e){

e.printStackTrace();

}

}

emitter.onComplete();

}

}).throttleWithTimeout(200 , TimeUnit.MILLISECONDS)

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Throwable {

Log.d(TAG, "throttleWithTimeout" + integer);

}

});

8.3.4 组合操作符

组合操作符可以同时处理多个Observable来创建我们所需要的Observable。组合操作符有merge、concat、zip、combineLastest、join和switch等。下面介绍merge、concat、zip和combineLastest

1. merge操作符

merge操作符将多个Observable合并到一个Observable中进行发射。但是merge可能会让合并的Observable发射的数据交错。

//subscribeOn(Schedulers.io) 可以将Observable的订阅过程切换到I/O线程池中执行

//避免在主线程或者其他UI线程中执行长时间的I/O操作。

Observable obs1 = Observable.just(1,2,3).subscribeOn(Schedulers.io());

Observable obs2 = Observable.just(4,5,6);

Observable.merge(obs1 , obs2)

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Throwable {

Log.d(TAG, "merge" + integer);

}

});

2. concat操作符

将多个Observable发射的数据进行合并发射,concat严格执行顺序发射数据。前一个没发射完,是不会发射后面一个的。下面的代码和上面一致:不过输出是123456这个顺序。

3. zip操作符

zip操作符合并两个或者多个Observable发射出的数据项。并根据指定的函数对Observable进行变换操作,输出一个新值。

Observable obs1 = Observable.just(1,2,3);

Observable obs2 = Observable.just("a" , "b" , "c");

Observable.zip(obs1, obs2, new BiFunction() {

@Override

public String apply(Integer integer, String s) throws Throwable {

return integer + s;

}

}).subscribe(new Consumer() {

@Override

public void accept(String s) throws Throwable {

Log.d(TAG, "zip: " + s);

}

});

4. combineLastest操作符

当两个Observable中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数结果发射数据。

Observable obs1 = Observable.just(1,2,3);

Observable obs2 = Observable.just("a" , "b" , "c");

Observable.combineLatest(obs1, obs2, new BiFunction() {

@Override

public String apply(Integer integer, String s) throws Throwable {

return integer + s;

}

}).subscribe(new Consumer() {

@Override

public void accept(String s) throws Throwable {

Log.d(TAG, "combineLatest: " + s);

}

});

如果其中一个Observable还有数据没有发射,那么该操作符会将两个Observable最新发射的数据组合在一起

8.3.5 辅助操作符

辅助操作符可以帮助我们方便地处理Observable。辅助操作符包括delay、do、subscribeOn、observeOn、timeOut、meterialize、dematerialize、timeInterval、timestamp和to等。这里主要介绍delay、do、subscribeOn、observeOn和timeout

1. delay操作符

delay操作符让原始Observable在发射每项数据之前都暂停一段指定时间。

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {

Long currentTime = System.currentTimeMillis()/1000;

emitter.onNext(currentTime);

}

}).delay(2 , TimeUnit.SECONDS)

.subscribe(new Consumer() {

@Override

public void accept(Long aLong) throws Throwable {

Log.d(TAG, "delay: " + (System.currentTimeMillis()/1000 - aLong));

}

});

这里通过System.currentTimeMillis是获取系统当前的时间。两者一减就是delay暂停的时间。

2. do操作符

do系列操作符就是为原始Observable的生命周期事件注册一个回调,当Observable的某个事件发生时就会调用这回调。和subscribe操作符一样都是某种程度上的订阅

Observable.just(1,2)

.doOnNext(new Consumer() {

@Override

public void accept(Integer integer) throws Throwable {

Log.d(TAG, "call: " + integer);

}

}).subscribe(new Observer() {

@Override

public void onSubscribe(@NonNull Disposable d) {

}

@Override

public void onNext(@NonNull Integer integer) {

Log.d(TAG, "onNext: " + integer);

}

@Override

public void onError(@NonNull Throwable e) {

Log.d(TAG, "onError: ");

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete: ");

}

});

3. subscribeOn操作符、observeOn操作符

subscribeOn操作符用于指定Observable在自身的哪个线程上执行,如果Observable需要执行耗时操作,一般会让其开新开一个子线程。observeOn一般指定Observer运行的线程,也就是发射出的数据可以在哪个线程中使用。 一般在主线程,可以更新UI

Observable obs = Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {

Log.d(TAG, "subscribe: " + Thread.currentThread().getName());

emitter.onNext(1);

emitter.onComplete();

}

});

//表示Observable运行在新开的线程

obs.subscribeOn(Schedulers.newThread())

//表示Observer运行在主线程

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Throwable {

Log.d(TAG, "accept: " + Thread.currentThread().getName());

}

});

4. timeout操作符

如果Observable过了指定的一段时长还没有发射任何数据,则timeout操作符会以一个onError通知来终止这个Observable,或者继续执行一个备用的Observabletimeout有很多的变体这里介绍timeout(long , TimeUnit , Observable) ,它在超时时会切换到使用一个你指定的备用的Observable,而不是发送错误信息。 默认在computation调度器上执行。

Observable obs = Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {

for(int i = 0 ; i < 4 ; i++){

try{

Thread.sleep(i * 100);

}catch (InterruptedException e){

e.printStackTrace();

}

emitter.onNext(i);

}

emitter.onComplete();

}

}).timeout(200 , TimeUnit.MILLISECONDS , Observable.just(10 ,11));

obs.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Throwable {

Log.d(TAG, "timeOut: " + integer);

}

});

如果在200ms内没有发射数据,就会切换到Observable.just(10,11)。

8.3.6 错误处理操作符

RxJava在错误出现的时候会调用观察者的onError方法将错误发送出去,由观察者自己来处理错误。但是让每个观察者都处理一遍错误的话,工作量就会很大。通过使用错误处理操作符,错误处理操作符有catch和retry。

1. catch操作符

catch操作符可以用来拦截原始的Observable的onError通知,将它替换为其他数据项或数据序列,让产生的Observable可以正常终止或者根本不终止。

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {

for(int i = 0 ; i < 5 ; i++){

if(i > 2){

emitter.onError(new Throwable("Throwable"));

}

emitter.onNext(i);

}

emitter.onComplete();

}

}).onErrorReturn(new Function() {

@Override

public Integer apply(Throwable throwable) throws Throwable {

return 6;

}

}).subscribe(new Observer() {

@Override

public void onSubscribe(@NonNull Disposable d) {

Log.d(TAG, "onSubscribe: ");

}

@Override

public void onNext(@NonNull Integer integer) {

Log.d(TAG, "onNext: " + integer);

}

@Override

public void onError(@NonNull Throwable e) {

Log.d(TAG, "onError: " + e.getMessage());

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete");

}

});

当i>2后,出现错误,这个时候会发射一个特殊的项,并且调用观察者的onComplete方法结束。

2. retry操作符

retry操作符不会将原始的Observable的onError通知传递给观察者,观察者会订阅这个Observable,再给这个Observable一次机会来无错误的完成它的数据序列。retry总是传递onNext通知给观察者,由于重新订阅,因此也可能造成数据项重复。这里介绍retry(long),retry(long)指定了最多重新订阅的次数。如果次数超过了,那就会不会尝试再次订阅。而是把最新的一个onError通知传递给自己的观察者。

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {

for(int i = 0 ; i < 5 ; i++){

if(i == 1){

emitter.onError(new Throwable("Throwable"));

}else{

emitter.onNext(i);

}

}

emitter.onComplete();

}

}).retry(2).subscribe(new Observer() {

@Override

public void onSubscribe(@NonNull Disposable d) {

Log.d(TAG, "onSubscribe");

}

@Override

public void onNext(@NonNull Integer integer) {

Log.d(TAG, "onNext: " + integer);

}

@Override

public void onError(@NonNull Throwable e) {

Log.d(TAG, "onError: " + e.getMessage());

}

@Override

public void onComplete() {

Log.d(TAG, "onComplete: ");

}

});

上面的重新订阅次数为2,在i=0的时候会调用onNext方法,调用3次onNext方法会,最后才会调用onError方法。

8.3.7 条件操作符和布尔操作符

条件操作符和布尔操作符可以用于根据条件发射或变换Observable,或者对他们做布尔运算。

布尔操作符

布尔操作符有all、contains、isEmpty、exists和sequenceEqual。下面介绍三个操作符

1. all操作符

all操作符根据一个函数对源Observable发射的所有数据进行判断,最终返回的结果就是这个判断结果。这个函数使用发射的数据作为参数,内部判断所有的数据是否满足我们定义好的判断条件,满足返回true不满足返回false

Observable.just(1,2,3).all(new Predicate() {

@Override

public boolean test(Integer integer) throws Throwable {

return integer < 2;

}

}).subscribe(new Consumer() {

@Override

public void accept(Boolean aBoolean) throws Throwable {

Log.d(TAG, "accept: " + aBoolean);

}

});

这里判断整个Observable中的所有数据是否满足小于2的条件

2. contains操作符、isEmpty操作符

contains操作符用来判断源Observable所发射的数据是否包含某一个数据。如果包含返回true。如果Observable已经结束了却还没发射这个数据,返回false。isEmpty操作符用来判断源Observable是否发射过数据。如果发射过数据,返回false。如果源Observable结束了还没有发射这个数据,则返回true。

Observable.just(1,2,3)

.contains(1)

.subscribe(new Consumer() {

@Override

public void accept(Boolean aBoolean) throws Throwable {

Log.d(TAG, "contains: " + aBoolean);

}

});

Observable.just(1,2,3)

.isEmpty()

.subscribe(new Consumer() {

@Override

public void accept(Boolean aBoolean) throws Throwable {

Log.d(TAG, "isEmpty: " + aBoolean);

}

});

条件操作符

条件操作符有amb、defaultIfEmpty、skipUntil、skipWhile、takeUntil和takeWhile等。

1. amb操作符

amb操作符对于给定的两个或多个Observable,它只发射首先发射数据或通知那个Observable的所有数据。ambArray返回的也是一个Observable

Observable.ambArray(Observable.just(1,2,3).delay(2 , TimeUnit.SECONDS) , Observable.just(4,5,6))

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Throwable {

Log.d(TAG, "amb: " + integer

);

}

});

第一个Observable延时2s发射,所以,很显然最终只会发射第二个Observable。

2. defaultIfEmpty操作符

发射来自原始Observable的数据。如果原始Observable没有发射数据,则发射一个默认的数据。

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(@NonNull ObservableEmitter emitter) throws Throwable {

emitter.onComplete();

}

}).defaultIfEmpty(3)

.subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Throwable {

Log.d(TAG, "defaultIfEmpty: " + integer);

}

});

8.3.8 转换操作符

转换操作符用来将Observable转换成另一个对象或数据结构转换操作符有toList , toSortedList , toMap , toMultiMap , getIterator 和 nest等。

1. toList操作符

将发射多项数据的Observable会为每一项数据调用onNext方法,toList操作符可将Observable发射的多项数据组合成一个List。

Observable.just(1,2,3).toList()

.subscribe(new Consumer>() {

@Override

public void accept(List integers) throws Throwable {

for(int num : integers){

Log.d(TAG, "accept: " + num);

}

}

});

2. toSortedList操作符

类似于toList操作符,不同的是toSortedList操作符将对产生的列表排序,默认是自然升序。如果发射的数据没有实现Comparable接口,则会抛出一个异常。如果为实现。可以使用toSortedList(Func2)变体,其传递的函数参数可用于比较两个数据项

Observable.just(3,1,2).toSortedList()

.subscribe(new Consumer>() {

@Override

public void accept(List integers) throws Throwable {

for(int num : integers){

Log.d(TAG, "accept: " + num);

}

}

});

3. toMap操作符

将原始Observable发射的所有数据收集到一个Map中,然后发射这个Map。你可以提供一个用于生成Map的key函数,可以将一个函数转换后的数据项作为Map存储值。

Observable source = Observable.just("Alpha", "Beta", "Gamma");

Single> mapSingle = source.toMap(new Function() {

@Override

public Integer apply(String s) throws Throwable {

return s.length();

}

}, new Function() {

@Override

public String apply(String s) throws Throwable {

return s.toUpperCase();

}

});

mapSingle.subscribe(map -> {

System.out.println("Received Map: " + map);

}, throwable -> {

System.err.println("Error Occurred: " + throwable.getMessage());

});

推荐文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: 

发表评论

返回顶部暗黑模式