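1. Rx thinking (reactive programming: an unbroken chain from start to end)

Start (emit an event (Path): "I'm hungry") --> go downstairs --> walk to the restaurant --> order --> End (eat: the event is consumed)

An example from a program:

Start (emit an event: tap "Log in") --> login API --> request the server --> read the response code --> End (update the UI with "login succeeded": the event is consumed)

2. Core idea

There is one start point and one end point. Events flow from the start to the end, and interceptors can be added along the way.

An interceptor may transform the event as it passes through, and the end point only cares about the interceptor immediately before it.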
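The start / interceptor / end idea can be sketched in plain Java without any library. EventChain and its methods are hypothetical names invented for this illustration; this is not how RxJava is implemented, only the shape of the chain.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration class; it is not part of RxJava.
final class EventChain<T> {
    private final Supplier<T> upstream;

    private EventChain(Supplier<T> upstream) {
        this.upstream = upstream;
    }

    // Start point: the event that will flow down the chain.
    static <T> EventChain<T> startWith(T event) {
        return new EventChain<>(() -> event);
    }

    // Interceptor: transforms the event and hands the result to the next link.
    <R> EventChain<R> intercept(Function<? super T, ? extends R> step) {
        return new EventChain<>(() -> step.apply(upstream.get()));
    }

    // End point: consumes whatever the last interceptor produced.
    void end(Consumer<? super T> consumer) {
        consumer.accept(upstream.get());
    }

    public static void main(String[] args) {
        EventChain.startWith("tap Log in")
                .intercept(e -> e + " -> login API")
                .intercept(e -> e + " -> response code 200")
                .end(System.out::println);
    }
}
```

Nothing runs until end() is called, and the end point only sees the output of the last interceptor, which is the property the text describes.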

---------------------------------------------------------------

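Observable         the downstream reacts to (and can filter) upstream changes         Observer

(the observed)     subscribe: links the upstream region to the downstream region      (the observer)

start                                                                                  end

---------------------------------------------------------------

The standard observer pattern and RxJava's observer pattern differ; the diagrams later in this post compare the two.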

3. RxJava with Retrofit

1> Data module

import java.util.List;

// Top-level bean: the response of the project category tree (ProjectBean.java)
public class ProjectBean {
    private int errorCode;
    private String errorMsg;
    private List<DataBean> data;

    // Getters are omitted in this listing; only the ones used later in this post are shown.
    public List<DataBean> getData() {
        return data;
    }

    public static class DataBean {
        private int courseId;
        private int id;
        private String name;
        private int order;
        private int parentChapterId;
        private boolean userControlSetTop;
        private int visible;
        private List<?> children; // the element type is not given in the original listing

        public int getId() {
            return id;
        }
    }
}

// Item bean: one page of projects in a category (ProjectItem.java)
public class ProjectItem {
    private DataBean data;
    private int errorCode;
    private String errorMsg;

    public static class DataBean {
        private int curPage;
        private int offset;
        private boolean over;
        private int pageCount;
        private int size;
        private int total;
        private List<DatasBean> datas;

        public static class DatasBean {
            private String apkLink;
            private String desc;
            private String envelopePic;
            private boolean fresh;
            private int id;
            private String link;
            private String niceDate;
            private String origin;
            private String prefix;
            private String projectLink;
            private List<TagsBean> tags;

            public static class TagsBean {
                private String name; // e.g. "项目" (project)
                private String url;  // e.g. /project/list/1?cid=294
            }
        }
    }
}

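2> API module

import io.reactivex.Observable;
import retrofit2.http.GET;
import retrofit2.http.Path;
import retrofit2.http.Query;

public interface WanAndroidApi {

    // Top-level data: the project category tree
    @GET("project/tree/json")
    Observable<ProjectBean> getProject(); // slow network call: run it off the main thread

    // Item data: the projects of one category, e.g. project/list/1/json?cid=294
    @GET("project/list/{pageIndex}/json") // slow network call: run it off the main thread
    Observable<ProjectItem> getProjectItem(@Path("pageIndex") int pageIndex, @Query("cid") int cid);
}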

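3> Business logic

// RxJava 2 / RxAndroid imports; the Android imports (Bundle, Log, View, AppCompatActivity) are omitted.
import io.reactivex.ObservableTransformer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.schedulers.Schedulers;

public class UseActivity extends AppCompatActivity {

    public static final String TAG = "rxJava";

    private WanAndroidApi api;

    // Holds every subscription so that all of them can be disposed in onDestroy().
    // (A single Disposable field would be overwritten by each new subscription.)
    private final CompositeDisposable disposables = new CompositeDisposable();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_use);
        // HttpUtil is a project helper (not shown in this post) that builds the Retrofit instance.
        api = HttpUtil.getOnlineCookieRetrofit().create(WanAndroidApi.class);
        antiShakeAction(); // defined in section 4
    }

    // Retrofit + RxJava: query the project categories (top-level data)
    public void getProjectAction(View view) {
        disposables.add(api.getProject()
                .subscribeOn(Schedulers.io())              // run the upstream (the request) on an IO thread
                .observeOn(AndroidSchedulers.mainThread()) // deliver to the downstream on the main thread
                .subscribe(                                // Consumer: the short form of a full Observer
                        projectBean -> Log.d(TAG, "accept: " + projectBean), // UI updates are safe here
                        throwable -> Log.e(TAG, "getProject failed", throwable)));
    }

    // Retrofit + RxJava: query the project list of one category
    public void getProjectListAction(View view) {
        disposables.add(api.getProjectItem(1, 294)
                .compose(UseActivity.<ProjectItem>rxUD())
                .subscribe(
                        projectItem -> Log.d(TAG, "accept: " + projectItem), // UI updates are safe here
                        throwable -> Log.e(TAG, "getProjectItem failed", throwable)));
    }

    /**
     * Wraps the thread scheduling so that it can be reused through compose().
     * UD: upstream / downstream.
     */
    public static <UD> ObservableTransformer<UD, UD> rxUD() {
        return upstream -> upstream
                .subscribeOn(Schedulers.io())               // the code above subscribeOn runs on an IO thread
                .observeOn(AndroidSchedulers.mainThread()); // the code below observeOn runs on the main thread
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposables.dispose(); // cancels every subscription that is still active
    }
}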

4. Debouncing with RxBinding, and nested network requests

public class UseActivity extends AppCompatActivity {

    public static final String TAG = "rxJava";

    private WanAndroidApi api;

    // Holds every subscription so that all of them can be disposed in onDestroy().
    private final CompositeDisposable disposables = new CompositeDisposable();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_use);
        api = HttpUtil.getOnlineCookieRetrofit().create(WanAndroidApi.class);
        antiShakeAction();
    }

    /**
     * Debounces a button with RxBinding.
     */
    private void antiShakeAction() {
        Button btn = findViewById(R.id.bt_anti_shake);
        disposables.add(RxView.clicks(btn)
                .throttleFirst(2000, TimeUnit.MILLISECONDS) // respond to at most one click every two seconds
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        Log.d(TAG, "accept: responded once");
                    }
                }));
    }

    // Debouncing + nested network requests: flatMap flattens the nesting
    private void antiShakeActionUpdate() {
        Button btn = findViewById(R.id.bt_anti_shake);
        disposables.add(RxView.clicks(btn) // RxView turns the clicks into an Observable
                .throttleFirst(2000, TimeUnit.MILLISECONDS) // respond to at most one click every two seconds
                // Switch only the downstream to an IO thread
                .observeOn(Schedulers.io())
                // flatMap maps one item to an ObservableSource that may itself emit many items
                // (one onNext in, for example ten items out).
                // Object -> ObservableSource<ProjectBean>
                .flatMap(new Function<Object, ObservableSource<ProjectBean>>() {
                    @Override
                    public ObservableSource<ProjectBean> apply(Object o) throws Exception {
                        return api.getProject();
                    }
                })
                // ProjectBean -> ObservableSource<ProjectBean.DataBean>
                .flatMap(new Function<ProjectBean, ObservableSource<ProjectBean.DataBean>>() {
                    @Override
                    public ObservableSource<ProjectBean.DataBean> apply(@NonNull ProjectBean projectBean) {
                        // Emit every category as an item of its own
                        return Observable.fromIterable(projectBean.getData());
                    }
                })
                // ProjectBean.DataBean -> ObservableSource<ProjectItem>
                .flatMap(new Function<ProjectBean.DataBean, ObservableSource<ProjectItem>>() {
                    @Override
                    public ObservableSource<ProjectItem> apply(@NonNull ProjectBean.DataBean dataBean) {
                        return api.getProjectItem(1, dataBean.getId()); // fetch the items of this category
                    }
                })
                .observeOn(AndroidSchedulers.mainThread()) // back to the main thread for the downstream
                .subscribe(new Consumer<ProjectItem>() {
                    @Override
                    public void accept(ProjectItem projectItem) {
                        // UI updates are safe here because of the observeOn above
                        Log.d(TAG, "antiShakeAction: " + projectItem);
                    }
                }, throwable -> Log.e(TAG, "nested request failed", throwable)));
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposables.dispose(); // cancels every subscription that is still active
    }
}
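To make the debouncing rule concrete, here is a plain-Java sketch of a throttle-first gate: the first event opens a time window, and later events inside that window are dropped. ThrottleFirst, tryAccept and filter are hypothetical names, and the exact boundary behaviour of RxJava's own throttleFirst may differ from this simplification.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; it is not RxJava's throttleFirst implementation.
final class ThrottleFirst {
    private final long windowMillis;
    private long lastAcceptedAt;
    private boolean acceptedAny;

    ThrottleFirst(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Returns true if an event arriving at timeMillis should be let through.
    boolean tryAccept(long timeMillis) {
        if (acceptedAny && timeMillis - lastAcceptedAt < windowMillis) {
            return false; // still inside the window opened by the last accepted event
        }
        acceptedAny = true;
        lastAcceptedAt = timeMillis;
        return true;
    }

    // Filters a series of click timestamps (milliseconds, ascending).
    static List<Long> filter(long windowMillis, long... clickTimes) {
        ThrottleFirst gate = new ThrottleFirst(windowMillis);
        List<Long> accepted = new ArrayList<>();
        for (long t : clickTimes) {
            if (gate.tryAccept(t)) {
                accepted.add(t);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Six clicks, two-second window: prints [0, 2000, 4500]
        System.out.println(filter(2000, 0, 500, 1900, 2000, 2100, 4500));
    }
}
```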

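5. doOnNext (frequent thread switching)

Request the server to register (slow operation, background thread)

Update the registration UI (main thread)

Request the server to log in (slow operation, background thread)

Update the login UI (main thread)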
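The four steps above alternate between a background thread and the UI thread. The sketch below models that alternation with CompletableFuture and two single-thread executors rather than with RxJava; the class name, the thread names "io" and "ui", and the log strings are all assumptions made for the illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class; two executors stand in for the IO and main threads.
final class RegisterThenLogin {

    static List<String> run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                    .supplyAsync(() -> {           // 1. register request (slow)
                        log.add("register request on " + Thread.currentThread().getName());
                        return "registered";
                    }, io)
                    .thenApplyAsync(result -> {    // 2. update the registration UI
                        log.add("registration UI on " + Thread.currentThread().getName());
                        return result;
                    }, ui)
                    .thenApplyAsync(result -> {    // 3. login request (slow)
                        log.add("login request on " + Thread.currentThread().getName());
                        return "logged in";
                    }, io)
                    .thenAcceptAsync(result ->     // 4. update the login UI
                            log.add("login UI on " + Thread.currentThread().getName()), ui)
                    .join();
        } finally {
            io.shutdown();
            ui.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In RxJava the same shape is usually written as one chain in which a doOnNext placed after an observeOn performs the intermediate UI update, so the chain does not have to be split into two subscriptions.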

GitHub - ReactiveX/RxJava: RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

ReactiveX

玩Android (WanAndroid) open API - wanandroid.com

https://www.bejson.com/json2javapojo/new/%27


1> RxJava Hook

RxJava 1.x (as given in the original notes):

@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    // Placeholder: returns the source unchanged; the hook logic arrives in 2.x
    return source;
}

RxJava 2.x, in RxJavaPlugins:

@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    // f is null by default
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
    if (lockdown) {
        throw new IllegalStateException("Plugins can't be changed anymore");
    }
    RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}

Installing a hook:

public static void main(String[] args) {
    // Install the hook before any Observable is created. The hook is static, hence global:
    // many operators pass their result through RxJavaPlugins.onAssembly.
    RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
        @Override
        public Observable apply(Observable observable) throws Exception {
            System.out.println("apply: global hook saw an RxJava assembly: " + observable);
            return observable; // return it unchanged so that behaviour is not affected
        }
    });
    testJust();
}

public static void testJust() {
    Observable.just(1, 2, 4).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("integer==>" + integer);
        }
    });
}

// Output

apply: global hook saw an RxJava assembly: io.reactivex.internal.operators.observable.ObservableFromArray@64a294a6

integer==>1

integer==>2

integer==>4
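The hook mechanism itself is small: a static, nullable function that every factory and operator passes its result through. The following plain-Java sketch imitates that idea; AssemblyHookDemo, Source, just and map are hypothetical stand-ins and not RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical illustration class that imitates the idea behind RxJavaPlugins.onAssembly.
final class AssemblyHookDemo {

    // Stand-in for an Observable; only its name matters here.
    static final class Source {
        final String name;

        Source(String name) {
            this.name = name;
        }
    }

    // Global hook, null by default.
    private static volatile UnaryOperator<Source> onAssembly;

    static void setOnAssembly(UnaryOperator<Source> hook) {
        onAssembly = hook;
    }

    // Every factory and operator routes its result through this method.
    static Source assemble(Source source) {
        UnaryOperator<Source> f = onAssembly;
        return f == null ? source : f.apply(source);
    }

    static Source just(String value) {
        return assemble(new Source("just(" + value + ")"));
    }

    static Source map(Source upstream) {
        return assemble(new Source("map <- " + upstream.name));
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        setOnAssembly(s -> {
            seen.add(s.name); // observe every assembly
            return s;         // return it unchanged
        });
        map(just("A"));
        System.out.println(seen); // prints [just(A), map <- just(A)]
    }
}
```

Because the hook receives and returns the assembled object, it could also wrap or replace it, which is why the original returns the observable unchanged to leave behaviour intact.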

2> RxJava's observer pattern

// getNormalTime() and drawTextToBitmap() are helper methods of the activity that are not shown in this post.

// Start point: the observed
Observable.just(PATH) // Step 2: just() emits PATH into the chain below
        // Step 3: an interceptor that converts the String into a Bitmap
        .map(new Function<String, Bitmap>() {
            @Override
            public Bitmap apply(String path) throws Exception {
                URL url = new URL(path);
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                conn.setConnectTimeout(5000);
                int responseCode = conn.getResponseCode();
                if (responseCode != HttpURLConnection.HTTP_OK) {
                    // RxJava 2 does not allow a mapper to return null, so fail explicitly;
                    // the exception is delivered to onError.
                    throw new IOException("Unexpected response code: " + responseCode);
                }
                InputStream inputStream = conn.getInputStream();
                return BitmapFactory.decodeStream(inputStream);
            }
        })
        // Logging interceptor
        .map(new Function<Bitmap, Bitmap>() {
            @Override
            public Bitmap apply(Bitmap bitmap) throws Exception {
                Log.d(TAG, "apply: " + getNormalTime(System.currentTimeMillis()) + " image downloaded");
                return bitmap; // pass the previous result on to the downstream
            }
        })
        // Watermark interceptor
        .map(new Function<Bitmap, Bitmap>() {
            @Override
            public Bitmap apply(Bitmap bitmap) throws Exception {
                Paint paint = new Paint();
                paint.setTextSize(88);
                paint.setColor(Color.RED);
                return drawTextToBitmap(bitmap, "NorthStar", paint, 88, 88);
            }
        })
        // The upstream/downstream scheduling is wrapped in compose():
        // .subscribeOn(Schedulers.io())              // the code above runs on an IO thread
        // .observeOn(AndroidSchedulers.mainThread()) // the code below runs on the main thread
        .compose(rxUD())
        // Subscribe: links the start point to the end point, so the downstream reacts to upstream changes
        .subscribe(
                // End point: the observer
                new Observer<Bitmap>() {
                    @Override // Step 1: subscription starts, emission is about to begin
                    public void onSubscribe(Disposable d) {
                        progressDialog = new ProgressDialog(DownloadActivity.this);
                        progressDialog.setTitle("Downloading image...");
                        progressDialog.show();
                    }

                    @Override // Step 4: receive the event
                    public void onNext(Bitmap bitmap) {
                        iv.setImageBitmap(bitmap);
                    }

                    @Override // Error event
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.getMessage());
                        if (progressDialog != null) {
                            progressDialog.dismiss(); // do not leave the dialog open after a failure
                        }
                    }

                    @Override // Step 5: completion event
                    public void onComplete() {
                        if (progressDialog != null) {
                            progressDialog.dismiss();
                        }
                    }
                });

/**
 * Wraps the thread scheduling so that it can be reused through compose().
 * UD: upstream / downstream.
 */
public static <UD> ObservableTransformer<UD, UD> rxUD() {
    return new ObservableTransformer<UD, UD>() {
        @Override
        public ObservableSource<UD> apply(Observable<UD> upstream) {
            return upstream.subscribeOn(Schedulers.io())       // the code above runs on an IO thread
                    .observeOn(AndroidSchedulers.mainThread()) // the code below runs on the main thread
                    .map(new Function<UD, UD>() {
                        @Override
                        public UD apply(UD ud) throws Exception {
                            Log.d(TAG, "apply: the transformer saw an item pass through");
                            return ud;
                        }
                    });
        }
    };
}

// The chain below is only a skeleton for tracing the RxJava source: the bodies are left empty on purpose.

// 2. Observable (the observed / publisher, the start point). Creation: new ObservableCreate() { source = custom source }
// The custom source is passed in through create(ObservableOnSubscribe source)
// ObservableCreate extends Observable
Observable.create(
        // The custom source, passed to create(ObservableOnSubscribe source)
        new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("A");
            }
        })
        // ObservableCreate.map
        // ObservableMap extends AbstractObservableWithUpstream
        // AbstractObservableWithUpstream extends Observable implements HasUpstreamObservableSource
        // ObservableSource source();
        .map(new Function<String, Bitmap>() { // emits once per input item
            @Override
            public Bitmap apply(String s) throws Exception {
                return null; // placeholder only: a real mapper must not return null in RxJava 2
            }
        })
        .flatMap(new Function<Bitmap, ObservableSource<Bitmap>>() { // may emit many times per input item
            @Override
            public ObservableSource<Bitmap> apply(Bitmap bitmap) throws Exception {
                ObservableSource<Bitmap> bmp = new Observable<Bitmap>() {
                    @Override
                    protected void subscribeActual(Observer<? super Bitmap> observer) {
                    }
                };
                return bmp;
            }
        })
        .doOnNext(new Consumer<Bitmap>() {
            @Override
            public void accept(Bitmap o) throws Exception {
            }
        })
        .subscribeOn(Schedulers.io())              // schedules the code above onto an IO thread
        .observeOn(AndroidSchedulers.mainThread()) // schedules the code below onto the main thread
        // 3. The subscription: ObservableMap.subscribe
        .subscribe(
                // 1. The custom Observer, the end point
                new Observer<Bitmap>() { // Observer is an interface
                    @Override // runs on whichever thread calls subscribe(); the schedulers do not affect it
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Bitmap bitmap) {
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                    }
                });

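Subscription flow of Observable.create(source).subscribe(observer):

custom Observer (new Observer())
    --> Observable.subscribe(observer)
    --> ObservableCreate.subscribeActual(observer)
            creates the emitter: new CreateEmitter(observer)
            calls observer.onSubscribe(emitter)
            calls source.subscribe(emitter)
    --> callback into the custom source (new ObservableOnSubscribe{})

Emission flow:

custom source --> CreateEmitter.onNext() --> custom Observer.onNext()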

3> Sequence diagram of the Observable creation process

Observable (observed)      ObservableOnSubscribe   RxJavaPlugins              ObservableCreate
|                                   |                   |                             |
| 1. new ObservableOnSubscribe()    |                   |                             |
|---------------------------------->|                   |                             |
|--+ 2. create()                    |                   |                             |
|<-+                                |                   |                             |
| 3. onAssembly(new ObservableCreate())                 |                             |
|------------------------------------------------------>| 4. new ObservableCreate()   |
|                                   |                   |---------------------------->|
| 6. returns the ObservableCreate   |                   | 5. ObservableCreate         |
|<------------------------------------------------------|<----------------------------|
|                                   |                   |                             |

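Standard observer pattern compared with RxJava (RxJava has lower coupling)

Standard observer pattern: the subject keeps a list of observers and notifies them directly.

    Observable                        <--- Observer1
      List<Observer> observers        <--- Observer2
      add observer                    <--- Observer3
      remove observer                      ...
      notify observer

RxJava: the source and the observer never reference each other; an emitter and the operators sit in between.

    observed (publisher)            abstraction layer (emitter)      operators       custom observer (subscriber)
    ObservableOnSubscribe           CreateEmitter                    map, map        Observer
    source calls onNext() --------> .onNext() ---------------------> ... ----------> .onNext()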
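For contrast with RxJava's chain, the standard observer pattern on the left-hand side can be sketched in a few lines of plain Java. ClassicSubject is a hypothetical name, and observers are modelled as java.util.function.Consumer to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of the classic pattern: the subject owns the list of observers.
final class ClassicSubject<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();

    void addObserver(Consumer<T> observer) {
        observers.add(observer);
    }

    void removeObserver(Consumer<T> observer) {
        observers.remove(observer);
    }

    // Notifies every registered observer directly; iterates over a copy so that
    // observers may unregister themselves while being notified.
    void notifyObservers(T event) {
        for (Consumer<T> observer : new ArrayList<>(observers)) {
            observer.accept(event);
        }
    }

    public static void main(String[] args) {
        ClassicSubject<String> subject = new ClassicSubject<>();
        Consumer<String> first = e -> System.out.println("observer1 got " + e);
        Consumer<String> second = e -> System.out.println("observer2 got " + e);
        subject.addObserver(first);
        subject.addObserver(second);
        subject.notifyObservers("x"); // both observers are called
        subject.removeObserver(second);
        subject.notifyObservers("y"); // only observer1 is called
    }
}
```

Here the subject knows every observer, which is the coupling that RxJava avoids by routing events through an emitter and the operator chain.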

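4> ObservableMap source analysis

Tracing the source for a single map:

Code region (assembly, top to bottom)      Object created
Observable.create(source)                  ObservableCreate (holds the custom source, an ObservableOnSubscribe)
.map(function)                             ObservableMap (holds the ObservableCreate as its upstream)
.subscribe(observer)                       starts the flow below

Flow region, subscription (travels upstream):

ObservableCreate.subscribeActual(MapObserver) <-- ObservableMap.subscribeActual(observer) <-- Observable.subscribe(observer)

Flow region, events (travel downstream):

custom source --> CreateEmitter --> MapObserver --> custom Observer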
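The two directions in the flow (subscription travelling upstream, events travelling downstream) can be reproduced with a deliberately tiny model. MiniObservable, MiniObserver and OnSubscribe are hypothetical names; the sketch has no error handling, completion or disposal and only mirrors the roles of ObservableCreate, ObservableMap, CreateEmitter and MapObserver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical miniature of the create/map chain; not RxJava code.
abstract class MiniObservable<T> {

    interface MiniObserver<V> {
        void onNext(V value);
    }

    interface OnSubscribe<V> {
        void subscribe(MiniObserver<V> emitter);
    }

    // Each node of the chain implements this; subscribe() only delegates to it.
    abstract void subscribeActual(MiniObserver<T> observer);

    final void subscribe(MiniObserver<T> observer) {
        subscribeActual(observer);
    }

    // Role of ObservableCreate: holds the custom source.
    static <T> MiniObservable<T> create(OnSubscribe<T> source) {
        return new MiniObservable<T>() {
            @Override
            void subscribeActual(MiniObserver<T> observer) {
                // Role of CreateEmitter: forwards onNext to the downstream observer.
                MiniObserver<T> emitter = value -> observer.onNext(value);
                source.subscribe(emitter);
            }
        };
    }

    // Role of ObservableMap: holds the upstream and the mapper.
    final <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
        MiniObservable<T> upstream = this;
        return new MiniObservable<R>() {
            @Override
            void subscribeActual(MiniObserver<R> observer) {
                // Role of MapObserver: wraps the downstream observer, then subscribes upstream.
                upstream.subscribe(value -> observer.onNext(mapper.apply(value)));
            }
        };
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        MiniObservable.<String>create(emitter -> {
            emitter.onNext("A");
            emitter.onNext("B");
        }).map(s -> s + "!").subscribe(value -> out.add(value));
        System.out.println(out); // prints [A!, B!]
    }
}
```

Calling subscribe() on the last node walks back to the source through subscribeActual, wrapping the observer at each step; the emitted values then pass through those wrappers in the opposite order.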

5> RxJava thread switching

The IO scheduler also passes through a hook in RxJavaPlugins:

/**
 * Calls the associated hook function.
 * @param defaultScheduler the hook's input value
 * @return the value returned by the hook
 */
@NonNull
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
    Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
    // null by default
    if (f == null) {
        return defaultScheduler;
    }
    return apply(f, defaultScheduler);
}

Unless a handler has been installed, Schedulers.io() therefore ends up returning the default Scheduler IO.

/**
 * Sets the specific hook function.
 * @param handler the hook function to set, null allowed
 */
public static void setIoSchedulerHandler(@Nullable Function<? super Scheduler, ? extends Scheduler> handler) {
    if (lockdown) {
        throw new IllegalStateException("Plugins can't be changed anymore");
    }
    onIoHandler = handler;
}

Schedulers contains many details; the key steps are traced below.

// 1. Schedulers.io(): the thread strategy
// Schedulers --> Scheduler IO --> new IOTask() --> DEFAULT = new IoScheduler() --> thread pool (ExecutorService)

.subscribeOn(
        // Internally RxJavaPlugins.onIoScheduler(IO), a hook: "Calls the associated hook function."
        Schedulers.io() // selects the IO strategy
)

@NonNull
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

// In the static initializer of Schedulers:
IO = RxJavaPlugins.initIoScheduler(new IOTask());

static final class IOTask implements Callable<Scheduler> { // a task with a return value: it returns the Scheduler
    @Override
    public Scheduler call() throws Exception {
        return IoHolder.DEFAULT;
    }
}

// In IoHolder:
static final Scheduler DEFAULT = new IoScheduler();

// In IoScheduler:
public IoScheduler() {
    this(WORKER_THREAD_FACTORY);
}

public IoScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    this.pool = new AtomicReference<CachedWorkerPool>(NONE); // reference to the cached worker pool
    start();
}

// 2. subscribeOn(): the IoScheduler (backed by a thread pool) is passed in
// The excerpts below are abridged from the RxJava 2 source.

// Observable.subscribeOn
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

// ObservableSubscribeOn constructor
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
    super(source);
    this.scheduler = scheduler;
}

// Observable.subscribe (simplified): subscribing at the end point calls subscribeActual()
public final void subscribe(Observer<? super T> observer) {
    subscribeActual(observer);
}

// ObservableSubscribeOn.subscribeActual
@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent); // the upstream is subscribed on the scheduler's thread
    }
}

// Scheduler.scheduleDirect, reached from ioScheduler.scheduleDirect(new SubscribeTask(parent))
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // For IoScheduler, createWorker() returns new EventLoopWorker(pool.get())
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); // hook: may wrap the Runnable
    DisposeTask task = new DisposeTask(decoratedRun, w);          // wraps the Runnable once more
    w.schedule(task, delay, unit);                                // continues in EventLoopWorker.schedule
    return task;
}

// IoScheduler.EventLoopWorker.schedule
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    return threadWorker.scheduleActual(action, delayTime, unit, tasks); // continues in scheduleActual
}

// NewThreadWorker.scheduleActual
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime,
        @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>) sr); // the task is finally handed to the thread pool
        } else {
            f = executor.schedule((Callable<Object>) sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        RxJavaPlugins.onError(ex);
    }
    return sr;
}
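Stripped of hooks, workers and disposal, the essence of subscribeOn is that the call to upstream.subscribe(...) is itself submitted to the scheduler's thread pool. The plain-Java sketch below shows only that step; SubscribeOnSketch, Source and the thread name "io-worker" are hypothetical, and a Consumer stands in for the Observer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical illustration of the subscribeOn idea; not RxJava code.
final class SubscribeOnSketch {

    // A source pushes values into the observer once it is subscribed.
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    // Wraps upstream so that upstream.subscribe(...) runs on the executor
    // (the lambda passed to execute plays the role of SubscribeTask).
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    static List<String> run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        // The source records the thread it emits on.
        Source<String> source = observer ->
                observer.accept("emitted on " + Thread.currentThread().getName());
        try {
            subscribeOn(source, io).subscribe(value -> {
                log.add(value);
                done.countDown();
            });
            done.await(); // wait for the asynchronous emission
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [emitted on io-worker]
    }
}
```

Because the subscription itself is moved, everything upstream of the operator runs on the pool thread, and so does the downstream until some later operator switches threads again.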

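6> Sequence of subscribeOn(Schedulers.io())

Subscription travels upstream, from the end point towards the source:

end-point Observer
    --> Observable.subscribe(observer)
    --> ObservableSubscribeOn.subscribeActual(observer)
            wrapper 1 = new SubscribeOnObserver(observer)     (stores the end point)
            scheduler.scheduleDirect(new SubscribeTask(wrapper 1))
    --> thread pool: executor.submit((Callable) sr)           (from here on the work runs on the async thread)
    --> SubscribeTask.run() { source.subscribe(wrapper 1); }
    --> ObservableCreate.subscribeActual(wrapper 1)
            wrapper 2 = new CreateEmitter(wrapper 1)          (stores wrapper 1)
            source.subscribe(wrapper 2)
    --> custom source

Events travel downstream, still on the async thread:

custom source --> wrapper 2.onNext() { wrapper 1.onNext() } --> end-point Observer.onNext()

observeOn(AndroidSchedulers.mainThread()) moves the downstream to the main thread through a Handler:

observeOn(AndroidSchedulers.mainThread()) --> Scheduler MAIN_THREAD
    DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()))
    --> HandlerScheduler.HandlerWorker.schedule(run) { handler.sendMessage(...) }   (the Runnable is sent to the main-thread Handler)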
