Rxjava的优势
提高工作效率能优雅的解决复杂的业务场景
1.Rxjava原理
RxJava 的原理就是创建一个 Observable 对象来干活,然后使用各种操作符建立起来的链式操作,就如同流水线一样,把你想要处理的数据一步一步地加工成你想要的成品,然后发射给 Subscriber 处理。
2.使用Rxjava需要添加的依赖
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
implementation 'io.reactivex.rxjava2:rxjava:2.2.8'
3.Rxjava跟Rxandroid的关系
其中 RxAndroid 是 RxJava 在 Android 平台的扩展。它包含了一些能够简化 Android 开发的 工具,比如特殊的调度器:AndroidSchedulers.mainThread() (线程切换用的是这个)
4.基本的用法
Observable.create(new ObservableOnSubscribe
@Override
public void subscribe(ObservableEmitter
emitter.onNext("hi 观察者");
}
}).subscribe(new Observer
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(String s) {
System.out.println("观察者收到来自被观察者的消息---"+s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
通过create操作符创建被观察者,通过subscribe这个方法进行订阅,事件处理完成通知观察者。
onCompleted():当不会再有新的 onNext 发出时,触发 onCompleted()方法作为完成标志
onError():事件处理过程中出现异常时,onError()会被触发
5.通过Consumer 实现简略回调
Observable.create(new ObservableOnSubscribe
@Override
public void subscribe(ObservableEmitter
emitter.onNext("hi 观察者");
}
}).subscribe(new Consumer
@Override
public void accept(String s) throws Exception {
System.out.println("收到来自被观察者的消息---"+s);
}
});
这个跟上面得到的结果是一样的
6.RxJava创建型操作符,常用的create、 just 、fromArray、interval、range 和 repeat
create操作符已经用到就是创建被观察者
just是不断地将事件添加到任务队列中
Observable.just("唐三","唐昊","唐晨").subscribe(new Consumer
@Override
public void accept(String s) throws Exception {
System.out.println("收到来自被观察者的消息---"+s);
}
});
fromArray:跟just基本一样就是可以直接接收数组
String[] array={"唐三","唐昊","唐晨"};
Observable.fromArray(array).subscribe(new Consumer
@Override
public void accept(String s) throws Exception {
System.out.println("收到来自被观察者的消息---"+s);
}
});
interval:创建一个按固定时间间隔发射整数序列的 Observable,相当于定时器
Observable.interval(50, 200, TimeUnit.MILLISECONDS).subscribe(new Consumer
@SuppressLint("CheckResult")
@Override
public void accept(Long aLong) throws Exception {
System.out.println("收到来自被观察者的消息---"+aLong);
}
});
注意定时器这样的例子需要在Android环境进行测试,否则没有效果
range:创建发射指定范围的整数序列的 Observable,可以拿来替代 for 循环,发射一个范围内的有 序整数序列
Observable.range(0,5).subscribe(new Consumer
@Override
public void accept(Integer integer) throws Exception {
System.out.println("range---"+integer);
}
});
输出
range—0 range—1 range—2 range—3 range—4
repeat:创建一个 N 次重复发射特定数据的 Observable
Observable.range(0,2).repeat(2).subscribe(new Consumer
@Override
public void accept(Integer integer) throws Exception {
System.out.println("range---"+integer);
}
});
输出:
range—0 range—1 range—0 range—1
7.RxJava变换操作符,常用的map、flatMap、cast、concatMap、flatMapIterable、 buffer 和 groupBy
map:将 Observable 转换为一个新的 Observable 对象并发射,观察者将收到新的 Observable 处理
String Host="http://blog.csdn.net/";
Observable.just("lpf85").map(s -> Host+s).subscribe(new Consumer
@Override
public void accept(String integer) throws Exception {
System.out.println("accept---"+integer);
}
});
输出:
accept—http://blog.csdn.net/lpf85
flatMap:flatMap 操作符将 Observable 发射的数据集合变换为 Observable 集合,然后将这些 Observable发射的数据平坦化地放进一个单独的 Observable
cast:强制将数据转换成另外一个类型
String Host="http://blog.csdn.net/";
Observable.just("lpf85","lpf86","lpf87").flatMap(new Function
@Override
public ObservableSource> apply(String s) throws Exception {
return createResponse(Host,s);
}
}).cast(String.class).subscribe(new Consumer
@Override
public void accept(String o) throws Exception {
System.out.println("accept---"+o);
}
});
private ObservableSource> createResponse(String host,String s) {
return Observable.create(new ObservableOnSubscribe
@Override
public void subscribe(ObservableEmitter
emitter.onNext(host+s);
}
});
}
输出:
accept—http://blog.csdn.net/lpf85 accept—http://blog.csdn.net/lpf86 accept—http://blog.csdn.net/lpf87
concatMap:concatMap 操作符功能与 flatMap 操作符一致;不过,它解决了 flatMap 交叉问题,提供了一种能够把发射的值连续在一起的函数,而不是合并它们
buffer:buffer 操作符将源 Observable 变换为一个新的 Observable,这个新的 Observable 每次发射一组列表值而不是一个一个发射,和 buffer 操作符类似的还有 window 操作符,只不过 window 操作符发射的是 Observable 而不是数据列表。
Observable.just("唐三","唐昊","唐晨","小舞","马红俊","宁容蓉").buffer(3).subscribe(new Consumer>() {
@Override
public void accept(List
for (int i = 0; i < strings.size(); i++) {
System.out.println("accept---"+strings.get(i)+i);
}
System.out.println("accept-------------------");
}
});
buffer(3),表示缓存容量是3
输出:
accept—唐三0 accept—唐昊1 accept—唐晨2 accept------------------- accept—小舞0 accept—马红俊1 accept—宁容蓉2 accept-------------------
groupBy:goupBy 操作符用于分组元素,将源 Observable 变换成一个发射 Observables 的新 Observable,它们中的每一个新 Observable 都发射一组指定的数据。
Observable.just(1,2,3,4,5,6).groupBy(new Function
@Override
public String apply(Integer integer) throws Exception {
return integer>3?"1组":"2组";
}
}).subscribe(new Consumer
@Override
public void accept(GroupedObservable
System.out.println("accept---"+objectIntegerGroupedObservable.getKey()+" value=");
}
});
输出:
accept—2组 accept—1组
8.Rxjava 过滤操作符,常用的filter、 elementAt、 distinct、skip、take、ignoreElements
filter:filter 操作符是对源 Observable 产生的结果自定义规则进行过滤,只有满足条件的结果才会 提交给订阅者
Observable.just("唐三","唐昊","唐晨","小舞","马红俊","宁容蓉").filter(new Predicate
@Override
public boolean test(String s) throws Exception {
return s.startsWith("唐");
}
}).subscribe(new Consumer
@Override
public void accept(String s) throws Exception {
System.out.println("accept---"+s);
}
});
输出:
accept—唐三 accept—唐昊 accept—唐晨
elementAt:elementAt 操作符用来返回指定位置的数据
Observable.just("唐三","唐昊","唐晨","小舞","马红俊","宁容蓉").elementAt(3).subscribe(new Consumer
@Override
public void accept(String s) throws Exception {
System.out.println("accept---"+s);
}
});
位置是从0开始的,跟数组是一样的
输出:
accept—小舞
distinct:distinct 操 作 符 用 来 去 重 , 其 只 允 许 还 没 有 发 射 过 的 数 据 项 通 过
Observable.just(1,2,2,3,4,4,5,6).distinct().subscribe(new Consumer
@Override
public void accept(Integer integer) throws Exception {
System.out.println("accept---"+integer);
}
});
输出:
accept—1 accept—2 accept—3 accept—4 accept—5 accept—6
skip、take:skip 操作符将源 Observable 发射的数据过滤掉前 n 项;而 take 操作符则只取前 n 项;
Observable.just("唐三","唐昊","唐晨","小舞","马红俊","宁容蓉").skip(3).subscribe(new Consumer
@Override
public void accept(String str) throws Exception {
System.out.println("accept---"+str);
}
});
输出:
accept—小舞 accept—马红俊 accept—宁容蓉
Observable.just("唐三","唐昊","唐晨","小舞","马红俊","宁容蓉").take(3).subscribe(new Consumer
@Override
public void accept(String str) throws Exception {
System.out.println("accept---"+str);
}
});
输出:
accept—唐三 accept—唐昊 accept—唐晨
ignoreElements:ignoreElements 操作符忽略所有源 Observable 产生的结果,只把 Observable 的 onSubscribe和 onComplete 事件通知给订阅者
Observable.just("唐三","唐昊","唐晨","小舞","马红俊","宁容蓉").ignoreElements().subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe---");
}
@Override
public void onComplete() {
System.out.println("onComplete---");
}
@Override
public void onError(Throwable e) {
System.out.println("onError---");
}
});
输出:
onSubscribe— onComplete—
8.Rxjava 组合操作符,常用的有startWith、merge、concat、zip 和 combineLastest
startWith:startWith 操作符会在源 Observable 发射的数据前面插上一些数据
Observable.just("云韵","古薰儿","美杜莎").startWith("萧炎").subscribe(new Consumer
@Override
public void accept(String s) throws Exception {
System.out.println("accept---"+s);
}
});
输出:
accept—萧炎 accept—云韵 accept—古薰儿 accept—美杜莎
merge:merge 操作符将多个 Observable 合并到一个 Observable 中进行发射,merge 可能会让合并的Observable 发射的数据交错。
Observable
Observable
Observable.merge(just1,just2).subscribe(new Consumer
@Override
public void accept(String s) throws Exception {
System.out.println("accept---"+s);
}
});
输出:
accept—云韵 accept—古薰儿 accept—美杜莎 accept—小舞 accept—宁容容 accept—白沉香
concat:将多个 Obserbavle 发射的数据进行合并发射。concat 严格按照顺序发射数据,前一个 Observable 没发射完成是不会发射后一个 Observable 的数据的
Observable
Observable
Observable.concat(just1,just2).subscribeOn(Schedulers.io()).subscribe(new Consumer
@Override
public void accept(String s) throws Exception {
System.out.println("accept---"+s);
}
});
输出:
accept—云韵 accept—古薰儿 accept—美杜莎 accept—小舞 accept—宁容容 accept—白沉香
zip:zip 操作符合并两个或者多个 Observable 发射出的数据项,根据指定的函数变换它们,并发 射一个新值。
Observable
Observable
Observable.zip(just1, just2, new BiFunction
@Override
public String apply(String s, String s2) throws Exception {
return s+"颜值第"+s2;
}
}).subscribe(new Consumer
@Override
public void accept(String s) throws Exception {
System.out.println("accept---"+s);
}
});
输出:
accept—云韵颜值第一 accept—古薰儿颜值第二 accept—美杜莎颜值第三
combineLastest:当两个 Observable 中的任何一个发射了数据时,使用一个函数结合每个 Observable 发射的最近数据项,并且基于这个函数的结果发射数据。
Observable
Observable
Observable.combineLatest(just1, just2, new BiFunction
@Override
public String apply(String s, String s2) throws Exception {
return s+"第"+s2;
}
}).subscribe(new Consumer
@Override
public void accept(String s) throws Exception {
System.out.println("accept---"+s);
}
});
输出:
accept—美杜莎第一 accept—美杜莎第二 accept—美杜莎第三
8.Rxjava 辅助操作符,常用的有delay、Do、subscribeOn、observeOn 和 timeout
delay:delay 操作符让原始 Observable 在发射每项数据之前都暂停一段指定的时间段。
这个测试需要在Android环境下,不能在java环境下测试,否则没有效果。
Do: Do 系列操作符就是为原始 Observable 的生命周期事件注册一个回调,当 Observable 的某个 事件发生时就会调用这些回调
doOnEach:为 Observable 注册这样一个回调,当 Observable 每发射一项数据时就会调 用它一次,包括 onNext、onError 和 onCompleted。doOnNext:只有执行 onNext 的时候会被调用。doOnSubscribe:当观察者订阅 Observable 时就会被调用。doOnUnsubscribe:当观察者取消订阅 Observable 时就会被调用; Observable 通过 onError 或者 onCompleted 结束时,会取消订阅所有的 Subscriber。doOnCompleted:当 Observable 正常终止调用 onCompleted 时会被调用。doOnError:当 Observable 异常终止调用 onError 时会被调用。doOnTerminate:当 Observable 终止(无论是正常终止还是异常终止)之前会被调用。finallyDo:当 Observable 终止(无论是正常终止还是异常终止)之后会被调用。
subscribeOn、observeOn:
subscribeOn 操作符用于指定 Observable 自身在哪个线程上运行,如果 Observable 需要执行耗时操作,一般可以让其在新开的一个子线程上运行。
observerOn 用来指定 Observer 所运行的线程,也就是发射出的数据在哪个线程上使用。一般情况下会指定在主线程中运行,这样就可以修改 UI。
timeout:如果原始 Observable 过了指定的一段时长没有发射任何数据,timeout 操作符会以一个 onError 通知终止这个 Observable,或者继续执行一个备用的 Observable。
9.Rxjava 错误处理操作符
RxJava 在错误出现的时候就会调用 Subscriber 的 onError 方法将错误分发出去,由 Subscriber 自己来处理错误。但是如果每个 Subscriber 都处理一遍的话,工作量就有点大了,这时候可以 使用错误处理操作符。错误处理操作符有 catch 和 retry。
catch:catch 操作符拦截原始 Observable 的 onError 通知,将它替换为其他数据项或数据序列,让 产生的 Observable 能够正常终止或者根本不终止。RxJava 将 catch 实现为以下 3 个不同的操 作符:
onErrorReturn:Observable 遇到错误时返回原有 Observable 行为的备用 Observable,备 用 Observable 会忽略原有 Observable 的 onError 调用,不会将错误传递给观察者。作为 替代,它会发射一个特殊的项并调用观察者的 onCompleted 方法。onErrorResumeNext: Observable 遇到错误时返回原有 Observable 行为的备用 Observable, 备用 Observable 会忽略原有 Observable 的 onError 调用,不会将错误传递给观察者。作 为替代,它会发射备用 Observable 的数据。onExceptionResumeNext:它和 onErrorResumeNext 类似。不同的是,如果 onError 收到 的 Throwable 不是一个 Exception,它会将错误传递给观察者的 onError 方法,不会使用 备用的 Observable。
Observable.create(new ObservableOnSubscribe
@Override
public void subscribe(ObservableEmitter
for (int i = 0; i < 5; i++) {
if (i>2){
emitter.onError(new Throwable("throwable"));
}
emitter.onNext(i);
}
emitter.onComplete();
}
}).onErrorReturn(new Function
@Override
public Integer apply(Throwable throwable) throws Exception {
return 6;
}
}).subscribe(new Observer
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println("accept---"+integer);
}
@Override
public void onError(Throwable e) {
System.out.println("onError---"+e.toString());
}
@Override
public void onComplete() {
System.out.println("onComplete---");
}
});
输出:
accept—0 accept—1 accept—2 accept—6 onComplete—
Observable.create(new ObservableOnSubscribe
@Override
public void subscribe(ObservableEmitter
for (int i = 0; i < 5; i++) {
if (i>2){
emitter.onError(new Throwable("throwable"));
}
emitter.onNext(i);
}
}
}).onExceptionResumeNext(new Observable
@Override
protected void subscribeActual(Observer super Integer> observer) {
observer.onNext(6);
}
}).subscribe(new Observer
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println("accept---"+integer);
}
@Override
public void onError(Throwable e) {
System.out.println("onError---"+e.toString());
}
@Override
public void onComplete() {
System.out.println("onComplete---");
}
});
输出:
accept—0 accept—1 accept—2 onError—java.lang.Throwable: throwable
retry:retry 操作符不会将原始 Observable 的 onError 通知传递给观察者,它会订阅这个 Observable,再给它一次机会无错误地完成其数据序列。 retry 总是传递 onNext 通知给观察者,由于重新订阅,这可能会造成数据项重复。RxJava 中的实现为 retry 和 retryWhen。
Observable.create(new ObservableOnSubscribe
@Override
public void subscribe(ObservableEmitter
for (int i = 0; i < 5; i++) {
if (i==1){
emitter.onError(new Throwable("throwable"));
}
emitter.onNext(i);
}
}
}).retry(2).subscribe(new Observer
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println("accept---"+integer);
}
@Override
public void onError(Throwable e) {
System.out.println("onError---"+e.toString());
}
@Override
public void onComplete() {
System.out.println("onComplete---");
}
});
accept—0 accept—0 accept—0 onError—java.lang.Throwable: throwable
10.布尔操作符,常用的all、contains、isEmpty
all:操作符根据一个函数对源 Observable 发射的所有数据进行判断,最终返回的结果就是这 个判断结果。这个函数使用发射的数据作为参数,内部判断所有的数据是否满足我们定义好的 判断条件。如果全部都满足则返回 true,否则就返回 false。
Observable.just("唐三","唐昊","唐晨","小舞","马红俊","宁容蓉").all(new Predicate
@Override
public boolean test(String s) throws Exception {
return s.startsWith("唐");
}
}).subscribe(new Consumer
@Override
public void accept(Boolean aBoolean) throws Exception {
System.out.println("accept---"+aBoolean);
}
});
}
输出:
accept—false
contains、isEmpty:contains 操作符用来判断源 Observable 所发射的数据是否包含某一个数据。如果包含该数据,会返回 true;如果源 Observable 已经结束了却还没有发射这个数据,则返回 false。isEmpty操作符用来判断源 Observable 是否发射过数据。
Observable.just("唐三","唐昊","唐晨","小舞","马红俊","宁容蓉").contains("唐三").subscribe(new Consumer
@Override
public void accept(Boolean aBoolean) throws Exception {
System.out.println("accept---"+aBoolean);
}
});
输出:
accept—true
11.条件操作符,常用的amb、defaultIfEmpty
amb:amb 操作符对于给 定两个 或多个 Observable,它只 发射首先发射数 据或通知 的那个 Observable 的所有数据。
defaultIfEmpty:发射来自原始 Observable 的数据。如果原始 Observable 没有发射数据,就发射一个默认数据
Observable.create(new ObservableOnSubscribe
@Override
public void subscribe(ObservableEmitter
emitter.onComplete();
}
}).defaultIfEmpty("萧炎").subscribe(new Consumer
@Override
public void accept(String s) throws Exception {
System.out.println("accept---"+s);
}
});
输出:
accept—萧炎
12.转换操作符:常用的oList、toSortedList、toMap
toList:toList 操作符将发射多项数据且为每一项数据调用 onNext 方法的 Observable 发射的多项数 据组合成一个 List,然后调用一次 onNext 方法传递整个列表。
Observable.just("唐三","唐昊","唐晨").toList().subscribe(new Consumer>() {
@Override
public void accept(List
for (int i = 0; i < strings.size(); i++) {
String s = strings.get(i);
System.out.println("accept---"+s);
}
}
});
输出:
accept—唐三 accept—唐昊 accept—唐晨
toSortedList:toSortedList 操作符类似于 toList 操作符;不同的是,它会对产生的列表排序,默认是自然升序。如果发射的数据项没有实现 Comparable 接口,会抛出一个异常。当然,若发射的数据项 没有实现 Comparable 接口,可以使用 toSortedList(Func2)变体,其传递的函数参数 Func2 会作 用于比较两个数据项。
Observable.just("唐三","小舞","唐昊","唐晨").toSortedList().subscribe(new Consumer>() {
@Override
public void accept(List
for (int i = 0; i < strings.size(); i++) {
String s = strings.get(i);
System.out.println("accept---"+s);
}
}
});
输出:
accept—唐三 accept—唐昊 accept—唐晨 accept—小舞
toMap:toMap 操作符收集原始 Observable 发射的所有数据项到一个 Map(默认是 HashMap),然 后发射这个 Map。
Douluo tangsan=new Douluo("唐三","100级");
Douluo douluo=new Douluo("菊斗罗","95级");
Douluo tangchen=new Douluo("唐晨","99级");
Observable.just(tangchen,douluo,tangchen).toMap(new Function
@Override
public String apply(Douluo douluo) throws Exception {
return douluo.getLevel();
}
}).subscribe(new Consumer
@Override
public void accept(Map
String name = stringDouluoMap.get("99级").getName();
System.out.println("accept---"+name);
}
});
输出:
accept—唐晨
13.rxjava 线程控制
如果不指定线程,默认是在调用 subscribe 方法的线程上进行回调的。如果我们想切换 线程,就需要使用 Scheduler。RxJava 已经内置了常用的如下 4个 Scheduler:
Schedulers.newThread():总是启用新线程,并在新线程执行操作。Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。 行为模式和 newThread()差不多,区别在于 io() 的内部实现是用一个无数量上限的线 程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。Schedulers.computation():计算所使用的 Scheduler,例如图形的计算。这个 Scheduler 使用固定线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。它是 buffer、debounce、delay、interval、sample 和 skip 操作符的默认调度器。Schedulers.trampoline():当我们想在当前线程执行一个任务时,并不是立即时,可以 用.trampoline()将它入队。这个调度器将会处理它的队列并且按序运行队列中的每一个 任务。它是 repeat 和 retry 操作符默认的调度器。
另外,RxAndroid 也提供了一个常用的 Scheduler:
AndroidSchedulers.mainThread()—RxAndroid 库中提供的 Scheduler,它指定的操作在 主线程中运行。
在 RxJava 中用 subscribeOn 和 observeOn 操作符来控制线程。
以上就是Rxjava全部的内容了,总结一下方便查阅使用。
Android-Rxjava在项目中的应用
好文链接
大家都在找:
Android:android工程师
rxjava:rxjava教程
rxandroid:rxandroid 不推荐
发表评论