Rxjava的优势

提高工作效率能优雅的解决复杂的业务场景

1.Rxjava原理

RxJava 的原理就是创建一个 Observable 对象来干活,然后使用各种操作符建立起来的链式操作,就如同流水线一样,把你想要处理的数据一步一步地加工成你想要的成品,然后发射给 Subscriber 处理。

2.使用Rxjava需要添加的依赖

implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

implementation 'io.reactivex.rxjava2:rxjava:2.2.8'

3.Rxjava跟Rxandroid的关系

其中 RxAndroid 是 RxJava 在 Android 平台的扩展。它包含了一些能够简化 Android 开发的 工具,比如特殊的调度器:AndroidSchedulers.mainThread() (线程切换用的是这个)

4.基本的用法

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

emitter.onNext("hi 观察者");

}

}).subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("onSubscribe");

}

@Override

public void onNext(String s) {

System.out.println("观察者收到来自被观察者的消息---"+s);

}

@Override

public void onError(Throwable e) {

}

@Override

public void onComplete() {

}

});

通过create操作符创建被观察者,通过subscribe这个方法进行订阅,事件处理完成通知观察者。

onCompleted():当不会再有新的 onNext 发出时,触发 onCompleted()方法作为完成标志

onError():事件处理过程中出现异常时,onError()会被触发

5.通过Consumer 实现简略回调

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

emitter.onNext("hi 观察者");

}

}).subscribe(new Consumer() {

@Override

public void accept(String s) throws Exception {

System.out.println("收到来自被观察者的消息---"+s);

}

});

这个跟上面得到的结果是一样的

6.RxJava创建型操作符,常用的create、 just 、fromArray、interval、range 和 repeat

create操作符已经用到就是创建被观察者

just是不断地将事件添加到任务队列中

Observable.just("唐三","唐昊","唐晨").subscribe(new Consumer() {

@Override

public void accept(String s) throws Exception {

System.out.println("收到来自被观察者的消息---"+s);

}

});

fromArray:跟just基本一样就是可以直接接收数组

String[] array={"唐三","唐昊","唐晨"};

Observable.fromArray(array).subscribe(new Consumer() {

@Override

public void accept(String s) throws Exception {

System.out.println("收到来自被观察者的消息---"+s);

}

});

interval:创建一个按固定时间间隔发射整数序列的 Observable,相当于定时器

Observable.interval(50, 200, TimeUnit.MILLISECONDS).subscribe(new Consumer() {

@SuppressLint("CheckResult")

@Override

public void accept(Long aLong) throws Exception {

System.out.println("收到来自被观察者的消息---"+aLong);

}

});

注意定时器这样的例子需要在Android环境进行测试,否则没有效果

range:创建发射指定范围的整数序列的 Observable,可以拿来替代 for 循环,发射一个范围内的有 序整数序列

Observable.range(0,5).subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

System.out.println("range---"+integer);

}

});

输出

range—0 range—1 range—2 range—3 range—4

repeat:创建一个 N 次重复发射特定数据的 Observable

Observable.range(0,2).repeat(2).subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

System.out.println("range---"+integer);

}

});

输出:

range—0 range—1 range—0 range—1

7.RxJava变换操作符,常用的map、flatMap、cast、concatMap、flatMapIterable、 buffer 和 groupBy

map:将 Observable 转换为一个新的 Observable 对象并发射,观察者将收到新的 Observable 处理

String Host="http://blog.csdn.net/";

Observable.just("lpf85").map(s -> Host+s).subscribe(new Consumer() {

@Override

public void accept(String integer) throws Exception {

System.out.println("accept---"+integer);

}

});

输出:

accept—http://blog.csdn.net/lpf85

flatMap:flatMap 操作符将 Observable 发射的数据集合变换为 Observable 集合,然后将这些 Observable发射的数据平坦化地放进一个单独的 Observable

cast:强制将数据转换成另外一个类型

String Host="http://blog.csdn.net/";

Observable.just("lpf85","lpf86","lpf87").flatMap(new Function>() {

@Override

public ObservableSource apply(String s) throws Exception {

return createResponse(Host,s);

}

}).cast(String.class).subscribe(new Consumer() {

@Override

public void accept(String o) throws Exception {

System.out.println("accept---"+o);

}

});

private ObservableSource createResponse(String host,String s) {

return Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

emitter.onNext(host+s);

}

});

}

输出:

accept—http://blog.csdn.net/lpf85 accept—http://blog.csdn.net/lpf86 accept—http://blog.csdn.net/lpf87

concatMap:concatMap 操作符功能与 flatMap 操作符一致;不过,它解决了 flatMap 交叉问题,提供了一种能够把发射的值连续在一起的函数,而不是合并它们

buffer:buffer 操作符将源 Observable 变换为一个新的 Observable,这个新的 Observable 每次发射一组列表值而不是一个一个发射,和 buffer 操作符类似的还有 window 操作符,只不过 window 操作符发射的是 Observable 而不是数据列表。

Observable.just("唐三","唐昊","唐晨","小舞","马红俊","宁容蓉").buffer(3).subscribe(new Consumer>() {

@Override

public void accept(List strings) throws Exception {

for (int i = 0; i < strings.size(); i++) {

System.out.println("accept---"+strings.get(i)+i);

}

System.out.println("accept-------------------");

}

});

buffer(3),表示缓存容量是3

输出:

accept—唐三0 accept—唐昊1 accept—唐晨2 accept------------------- accept—小舞0 accept—马红俊1 accept—宁容蓉2 accept-------------------

groupBy:goupBy 操作符用于分组元素,将源 Observable 变换成一个发射 Observables 的新 Observable,它们中的每一个新 Observable 都发射一组指定的数据。

Observable.just(1,2,3,4,5,6).groupBy(new Function() {

@Override

public String apply(Integer integer) throws Exception {

return integer>3?"1组":"2组";

}

}).subscribe(new Consumer>() {

@Override

public void accept(GroupedObservable objectIntegerGroupedObservable) throws Exception {

System.out.println("accept---"+objectIntegerGroupedObservable.getKey()+" value=");

}

});

输出:

accept—2组 accept—1组

8.Rxjava 过滤操作符,常用的filter、 elementAt、 distinct、skip、take、ignoreElements

filter:filter 操作符是对源 Observable 产生的结果自定义规则进行过滤,只有满足条件的结果才会 提交给订阅者

Observable.just("唐三","唐昊","唐晨","小舞","马红俊","宁容蓉").filter(new Predicate() {

@Override

public boolean test(String s) throws Exception {

return s.startsWith("唐");

}

}).subscribe(new Consumer() {

@Override

public void accept(String s) throws Exception {

System.out.println("accept---"+s);

}

});

输出:

accept—唐三 accept—唐昊 accept—唐晨

elementAt:elementAt 操作符用来返回指定位置的数据

Observable.just("唐三","唐昊","唐晨","小舞","马红俊","宁容蓉").elementAt(3).subscribe(new Consumer() {

@Override

public void accept(String s) throws Exception {

System.out.println("accept---"+s);

}

});

位置是从0开始的,跟数组是一样的

输出:

accept—小舞

distinct:distinct 操 作 符 用 来 去 重 , 其 只 允 许 还 没 有 发 射 过 的 数 据 项 通 过

Observable.just(1,2,2,3,4,4,5,6).distinct().subscribe(new Consumer() {

@Override

public void accept(Integer integer) throws Exception {

System.out.println("accept---"+integer);

}

});

输出:

accept—1 accept—2 accept—3 accept—4 accept—5 accept—6

skip、take:skip 操作符将源 Observable 发射的数据过滤掉前 n 项;而 take 操作符则只取前 n 项;

Observable.just("唐三","唐昊","唐晨","小舞","马红俊","宁容蓉").skip(3).subscribe(new Consumer() {

@Override

public void accept(String str) throws Exception {

System.out.println("accept---"+str);

}

});

输出:

accept—小舞 accept—马红俊 accept—宁容蓉

Observable.just("唐三","唐昊","唐晨","小舞","马红俊","宁容蓉").take(3).subscribe(new Consumer() {

@Override

public void accept(String str) throws Exception {

System.out.println("accept---"+str);

}

});

输出:

accept—唐三 accept—唐昊 accept—唐晨

ignoreElements:ignoreElements 操作符忽略所有源 Observable 产生的结果,只把 Observable 的 onSubscribe和 onComplete 事件通知给订阅者

Observable.just("唐三","唐昊","唐晨","小舞","马红俊","宁容蓉").ignoreElements().subscribe(new CompletableObserver() {

@Override

public void onSubscribe(Disposable d) {

System.out.println("onSubscribe---");

}

@Override

public void onComplete() {

System.out.println("onComplete---");

}

@Override

public void onError(Throwable e) {

System.out.println("onError---");

}

});

输出:

onSubscribe— onComplete—

8.Rxjava 组合操作符,常用的有startWith、merge、concat、zip 和 combineLastest

startWith:startWith 操作符会在源 Observable 发射的数据前面插上一些数据

Observable.just("云韵","古薰儿","美杜莎").startWith("萧炎").subscribe(new Consumer() {

@Override

public void accept(String s) throws Exception {

System.out.println("accept---"+s);

}

});

输出:

accept—萧炎 accept—云韵 accept—古薰儿 accept—美杜莎

merge:merge 操作符将多个 Observable 合并到一个 Observable 中进行发射,merge 可能会让合并的Observable 发射的数据交错。

Observable just1 = Observable.just("云韵", "古薰儿", "美杜莎");

Observable just2 = Observable.just("小舞", "宁容容", "白沉香");

Observable.merge(just1,just2).subscribe(new Consumer() {

@Override

public void accept(String s) throws Exception {

System.out.println("accept---"+s);

}

});

输出:

accept—云韵 accept—古薰儿 accept—美杜莎 accept—小舞 accept—宁容容 accept—白沉香

concat:将多个 Obserbavle 发射的数据进行合并发射。concat 严格按照顺序发射数据,前一个 Observable 没发射完成是不会发射后一个 Observable 的数据的

Observable just1 = Observable.just("云韵", "古薰儿", "美杜莎");

Observable just2 = Observable.just("小舞", "宁容容", "白沉香");

Observable.concat(just1,just2).subscribeOn(Schedulers.io()).subscribe(new Consumer() {

@Override

public void accept(String s) throws Exception {

System.out.println("accept---"+s);

}

});

输出:

accept—云韵 accept—古薰儿 accept—美杜莎 accept—小舞 accept—宁容容 accept—白沉香

zip:zip 操作符合并两个或者多个 Observable 发射出的数据项,根据指定的函数变换它们,并发 射一个新值。

Observable just1 = Observable.just("云韵", "古薰儿", "美杜莎");

Observable just2 = Observable.just("一", "二", "三");

Observable.zip(just1, just2, new BiFunction() {

@Override

public String apply(String s, String s2) throws Exception {

return s+"颜值第"+s2;

}

}).subscribe(new Consumer() {

@Override

public void accept(String s) throws Exception {

System.out.println("accept---"+s);

}

});

输出:

accept—云韵颜值第一 accept—古薰儿颜值第二 accept—美杜莎颜值第三

combineLastest:当两个 Observable 中的任何一个发射了数据时,使用一个函数结合每个 Observable 发射的最近数据项,并且基于这个函数的结果发射数据。

Observable just1 = Observable.just("云韵", "古薰儿", "美杜莎");

Observable just2 = Observable.just("一", "二", "三");

Observable.combineLatest(just1, just2, new BiFunction() {

@Override

public String apply(String s, String s2) throws Exception {

return s+"第"+s2;

}

}).subscribe(new Consumer() {

@Override

public void accept(String s) throws Exception {

System.out.println("accept---"+s);

}

});

输出:

accept—美杜莎第一 accept—美杜莎第二 accept—美杜莎第三

8.Rxjava 辅助操作符,常用的有delay、Do、subscribeOn、observeOn 和 timeout

delay:delay 操作符让原始 Observable 在发射每项数据之前都暂停一段指定的时间段。

这个测试需要在Android环境下,不能在java环境下测试,否则没有效果。

Do: Do 系列操作符就是为原始 Observable 的生命周期事件注册一个回调,当 Observable 的某个 事件发生时就会调用这些回调

doOnEach:为 Observable 注册这样一个回调,当 Observable 每发射一项数据时就会调 用它一次,包括 onNext、onError 和 onCompleted。doOnNext:只有执行 onNext 的时候会被调用。doOnSubscribe:当观察者订阅 Observable 时就会被调用。doOnUnsubscribe:当观察者取消订阅 Observable 时就会被调用; Observable 通过 onError 或者 onCompleted 结束时,会取消订阅所有的 Subscriber。doOnCompleted:当 Observable 正常终止调用 onCompleted 时会被调用。doOnError:当 Observable 异常终止调用 onError 时会被调用。doOnTerminate:当 Observable 终止(无论是正常终止还是异常终止)之前会被调用。finallyDo:当 Observable 终止(无论是正常终止还是异常终止)之后会被调用。

subscribeOn、observeOn:

subscribeOn 操作符用于指定 Observable 自身在哪个线程上运行,如果 Observable 需要执行耗时操作,一般可以让其在新开的一个子线程上运行。

observerOn 用来指定 Observer 所运行的线程,也就是发射出的数据在哪个线程上使用。一般情况下会指定在主线程中运行,这样就可以修改 UI。

timeout:如果原始 Observable 过了指定的一段时长没有发射任何数据,timeout 操作符会以一个 onError 通知终止这个 Observable,或者继续执行一个备用的 Observable。

9.Rxjava 错误处理操作符

​ RxJava 在错误出现的时候就会调用 Subscriber 的 onError 方法将错误分发出去,由 Subscriber 自己来处理错误。但是如果每个 Subscriber 都处理一遍的话,工作量就有点大了,这时候可以 使用错误处理操作符。错误处理操作符有 catch 和 retry。

catch:catch 操作符拦截原始 Observable 的 onError 通知,将它替换为其他数据项或数据序列,让 产生的 Observable 能够正常终止或者根本不终止。RxJava 将 catch 实现为以下 3 个不同的操 作符:

onErrorReturn:Observable 遇到错误时返回原有 Observable 行为的备用 Observable,备 用 Observable 会忽略原有 Observable 的 onError 调用,不会将错误传递给观察者。作为 替代,它会发射一个特殊的项并调用观察者的 onCompleted 方法。onErrorResumeNext: Observable 遇到错误时返回原有 Observable 行为的备用 Observable, 备用 Observable 会忽略原有 Observable 的 onError 调用,不会将错误传递给观察者。作 为替代,它会发射备用 Observable 的数据。onExceptionResumeNext:它和 onErrorResumeNext 类似。不同的是,如果 onError 收到 的 Throwable 不是一个 Exception,它会将错误传递给观察者的 onError 方法,不会使用 备用的 Observable。

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

for (int i = 0; i < 5; i++) {

if (i>2){

emitter.onError(new Throwable("throwable"));

}

emitter.onNext(i);

}

emitter.onComplete();

}

}).onErrorReturn(new Function() {

@Override

public Integer apply(Throwable throwable) throws Exception {

return 6;

}

}).subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

}

@Override

public void onNext(Integer integer) {

System.out.println("accept---"+integer);

}

@Override

public void onError(Throwable e) {

System.out.println("onError---"+e.toString());

}

@Override

public void onComplete() {

System.out.println("onComplete---");

}

});

输出:

accept—0 accept—1 accept—2 accept—6 onComplete—

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

for (int i = 0; i < 5; i++) {

if (i>2){

emitter.onError(new Throwable("throwable"));

}

emitter.onNext(i);

}

}

}).onExceptionResumeNext(new Observable() {

@Override

protected void subscribeActual(Observer observer) {

observer.onNext(6);

}

}).subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

}

@Override

public void onNext(Integer integer) {

System.out.println("accept---"+integer);

}

@Override

public void onError(Throwable e) {

System.out.println("onError---"+e.toString());

}

@Override

public void onComplete() {

System.out.println("onComplete---");

}

});

输出:

accept—0 accept—1 accept—2 onError—java.lang.Throwable: throwable

retry:retry 操作符不会将原始 Observable 的 onError 通知传递给观察者,它会订阅这个 Observable,再给它一次机会无错误地完成其数据序列。 retry 总是传递 onNext 通知给观察者,由于重新订阅,这可能会造成数据项重复。RxJava 中的实现为 retry 和 retryWhen。

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

for (int i = 0; i < 5; i++) {

if (i==1){

emitter.onError(new Throwable("throwable"));

}

emitter.onNext(i);

}

}

}).retry(2).subscribe(new Observer() {

@Override

public void onSubscribe(Disposable d) {

}

@Override

public void onNext(Integer integer) {

System.out.println("accept---"+integer);

}

@Override

public void onError(Throwable e) {

System.out.println("onError---"+e.toString());

}

@Override

public void onComplete() {

System.out.println("onComplete---");

}

});

accept—0 accept—0 accept—0 onError—java.lang.Throwable: throwable

10.布尔操作符,常用的all、contains、isEmpty

all:操作符根据一个函数对源 Observable 发射的所有数据进行判断,最终返回的结果就是这 个判断结果。这个函数使用发射的数据作为参数,内部判断所有的数据是否满足我们定义好的 判断条件。如果全部都满足则返回 true,否则就返回 false。

Observable.just("唐三","唐昊","唐晨","小舞","马红俊","宁容蓉").all(new Predicate() {

@Override

public boolean test(String s) throws Exception {

return s.startsWith("唐");

}

}).subscribe(new Consumer() {

@Override

public void accept(Boolean aBoolean) throws Exception {

System.out.println("accept---"+aBoolean);

}

});

}

输出:

accept—false

contains、isEmpty:contains 操作符用来判断源 Observable 所发射的数据是否包含某一个数据。如果包含该数据,会返回 true;如果源 Observable 已经结束了却还没有发射这个数据,则返回 false。isEmpty操作符用来判断源 Observable 是否发射过数据。

Observable.just("唐三","唐昊","唐晨","小舞","马红俊","宁容蓉").contains("唐三").subscribe(new Consumer() {

@Override

public void accept(Boolean aBoolean) throws Exception {

System.out.println("accept---"+aBoolean);

}

});

输出:

accept—true

11.条件操作符,常用的amb、defaultIfEmpty

amb:amb 操作符对于给 定两个 或多个 Observable,它只 发射首先发射数 据或通知 的那个 Observable 的所有数据。

defaultIfEmpty:发射来自原始 Observable 的数据。如果原始 Observable 没有发射数据,就发射一个默认数据

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

emitter.onComplete();

}

}).defaultIfEmpty("萧炎").subscribe(new Consumer() {

@Override

public void accept(String s) throws Exception {

System.out.println("accept---"+s);

}

});

输出:

accept—萧炎

12.转换操作符:常用的oList、toSortedList、toMap

toList:toList 操作符将发射多项数据且为每一项数据调用 onNext 方法的 Observable 发射的多项数 据组合成一个 List,然后调用一次 onNext 方法传递整个列表。

Observable.just("唐三","唐昊","唐晨").toList().subscribe(new Consumer>() {

@Override

public void accept(List strings) throws Exception {

for (int i = 0; i < strings.size(); i++) {

String s = strings.get(i);

System.out.println("accept---"+s);

}

}

});

输出:

accept—唐三 accept—唐昊 accept—唐晨

toSortedList:toSortedList 操作符类似于 toList 操作符;不同的是,它会对产生的列表排序,默认是自然升序。如果发射的数据项没有实现 Comparable 接口,会抛出一个异常。当然,若发射的数据项 没有实现 Comparable 接口,可以使用 toSortedList(Func2)变体,其传递的函数参数 Func2 会作 用于比较两个数据项。

Observable.just("唐三","小舞","唐昊","唐晨").toSortedList().subscribe(new Consumer>() {

@Override

public void accept(List strings) throws Exception {

for (int i = 0; i < strings.size(); i++) {

String s = strings.get(i);

System.out.println("accept---"+s);

}

}

});

输出:

accept—唐三 accept—唐昊 accept—唐晨 accept—小舞

toMap:toMap 操作符收集原始 Observable 发射的所有数据项到一个 Map(默认是 HashMap),然 后发射这个 Map。

Douluo tangsan=new Douluo("唐三","100级");

Douluo douluo=new Douluo("菊斗罗","95级");

Douluo tangchen=new Douluo("唐晨","99级");

Observable.just(tangchen,douluo,tangchen).toMap(new Function() {

@Override

public String apply(Douluo douluo) throws Exception {

return douluo.getLevel();

}

}).subscribe(new Consumer>() {

@Override

public void accept(Map stringDouluoMap) throws Exception {

String name = stringDouluoMap.get("99级").getName();

System.out.println("accept---"+name);

}

});

输出:

accept—唐晨

13.rxjava 线程控制

如果不指定线程,默认是在调用 subscribe 方法的线程上进行回调的。如果我们想切换 线程,就需要使用 Scheduler。RxJava 已经内置了常用的如下 4个 Scheduler:

Schedulers.newThread():总是启用新线程,并在新线程执行操作。Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。 行为模式和 newThread()差不多,区别在于 io() 的内部实现是用一个无数量上限的线 程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。Schedulers.computation():计算所使用的 Scheduler,例如图形的计算。这个 Scheduler 使用固定线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。它是 buffer、debounce、delay、interval、sample 和 skip 操作符的默认调度器。Schedulers.trampoline():当我们想在当前线程执行一个任务时,并不是立即时,可以 用.trampoline()将它入队。这个调度器将会处理它的队列并且按序运行队列中的每一个 任务。它是 repeat 和 retry 操作符默认的调度器。

另外,RxAndroid 也提供了一个常用的 Scheduler:

AndroidSchedulers.mainThread()—RxAndroid 库中提供的 Scheduler,它指定的操作在 主线程中运行。

在 RxJava 中用 subscribeOn 和 observeOn 操作符来控制线程。

以上就是Rxjava全部的内容了,总结一下方便查阅使用。

Android-Rxjava在项目中的应用

好文链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: