To use RxJava, first add two dependencies. RxAndroid is the extension of RxJava for Android.

implementation 'io.reactivex:rxandroid:1.2.1'

implementation 'io.reactivex:rxjava:1.2.0'

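Subscriber is a noun: the party that subscribes. It must be distinguished from subscribe. The two words differ only in the final letter, but subscribe is the verb, and in RxJava it is a method (function) of Observable:

observable.subscribe(subscriber);

Here observable is the observed party, that is, the source of events.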

The Subscriber source is short, so here it is in full:

/**

* Provides a mechanism for receiving push-based notifications from Observables, and permits manual

* unsubscribing from these Observables.

*

* After a Subscriber calls an {@link Observable}'s {@link Observable#subscribe subscribe} method, the

* {@link Observable} calls the Subscriber's {@link #onNext} method to emit items. A well-behaved

* {@link Observable} will call a Subscriber's {@link #onCompleted} method exactly once or the Subscriber's

* {@link #onError} method exactly once.

*

* @see ReactiveX documentation: Observable

* @param <T>

* the type of items the Subscriber expects to observe

*/

public abstract class Subscriber<T> implements Observer<T>, Subscription {

// represents requested not set yet

private static final long NOT_SET = Long.MIN_VALUE;

private final SubscriptionList subscriptions;

private final Subscriber<?> subscriber;

/* protected by `this` */

private Producer producer;

/* protected by `this` */

private long requested = NOT_SET; // default to not set

protected Subscriber() {

this(null, false);

}

/**

* Construct a Subscriber by using another Subscriber for backpressure and

* for holding the subscription list (when this.add(sub) is

* called this will in fact call subscriber.add(sub)).

*

* @param subscriber

* the other Subscriber

*/

protected Subscriber(Subscriber<?> subscriber) {

this(subscriber, true);

}

/**

* Construct a Subscriber by using another Subscriber for backpressure and

* optionally for holding the subscription list (if

* shareSubscriptions is true then when

* this.add(sub) is called this will in fact call

* subscriber.add(sub)).

*

* To retain the chaining of subscribers when setting

* shareSubscriptions to false, add the created

* instance to {@code subscriber} via {@link #add}.

*

* @param subscriber

* the other Subscriber

* @param shareSubscriptions

* {@code true} to share the subscription list in {@code subscriber} with

* this instance

* @since 1.0.6

*/

protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {

this.subscriber = subscriber;

this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();

}

/**

* Adds a {@link Subscription} to this Subscriber's list of subscriptions if this list is not marked as

* unsubscribed. If the list is marked as unsubscribed, {@code add} will indicate this by

* explicitly unsubscribing the new {@code Subscription} as well.

*

* @param s

* the {@code Subscription} to add

*/

public final void add(Subscription s) {

subscriptions.add(s);

}

@Override

public final void unsubscribe() {

subscriptions.unsubscribe();

}

/**

* Indicates whether this Subscriber has unsubscribed from its list of subscriptions.

*

* @return {@code true} if this Subscriber has unsubscribed from its subscriptions, {@code false} otherwise

*/

@Override

public final boolean isUnsubscribed() {

return subscriptions.isUnsubscribed();

}

/**

* This method is invoked when the Subscriber and Observable have been connected but the Observable has

* not yet begun to emit items or send notifications to the Subscriber. Override this method to add any

* useful initialization to your subscription, for instance to initiate backpressure.

*/

public void onStart() {

// do nothing by default

}

/**

* Request a certain maximum number of emitted items from the Observable this Subscriber is subscribed to.

* This is a way of requesting backpressure. To disable backpressure, pass {@code Long.MAX_VALUE} to this

* method.

*

* Requests are additive but if a sequence of requests totals more than {@code Long.MAX_VALUE} then

* {@code Long.MAX_VALUE} requests will be actioned and the extras may be ignored. Arriving at

* {@code Long.MAX_VALUE} by addition of requests cannot be assumed to disable backpressure. For example,

* the code below may result in {@code Long.MAX_VALUE} requests being actioned only.

*

*

* request(100);

* request(Long.MAX_VALUE-1);

*

*

* @param n the maximum number of items you want the Observable to emit to the Subscriber at this time, or

* {@code Long.MAX_VALUE} if you want the Observable to emit items at its own pace

* @throws IllegalArgumentException

* if {@code n} is negative

*/

protected final void request(long n) {

if (n < 0) {

throw new IllegalArgumentException("number requested cannot be negative: " + n);

}

// if producer is set then we will request from it

// otherwise we increase the requested count by n

Producer producerToRequestFrom;

synchronized (this) {

if (producer != null) {

producerToRequestFrom = producer;

} else {

addToRequested(n);

return;

}

}

// after releasing lock (we should not make requests holding a lock)

producerToRequestFrom.request(n);

}

private void addToRequested(long n) {

if (requested == NOT_SET) {

requested = n;

} else {

final long total = requested + n;

// check if overflow occurred

if (total < 0) {

requested = Long.MAX_VALUE;

} else {

requested = total;

}

}

}

/**

* If other subscriber is set (by calling constructor

* {@link #Subscriber(Subscriber)} or

* {@link #Subscriber(Subscriber, boolean)}) then this method calls

* setProducer on the other subscriber. If the other subscriber

* is not set and no requests have been made to this subscriber then

* p.request(Long.MAX_VALUE) is called. If the other subscriber

* is not set and some requests have been made to this subscriber then

* p.request(n) is called where n is the accumulated requests

* to this subscriber.

*

* @param p

* producer to be used by this subscriber or the other subscriber

* (or recursively its other subscriber) to make requests from

*/

public void setProducer(Producer p) {

long toRequest;

boolean passToSubscriber = false;

synchronized (this) {

toRequest = requested;

producer = p;

if (subscriber != null) {

// middle operator ... we pass through unless a request has been made

if (toRequest == NOT_SET) {

// we pass through to the next producer as nothing has been requested

passToSubscriber = true;

}

}

}

// do after releasing lock

if (passToSubscriber) {

subscriber.setProducer(producer);

} else {

// we execute the request with whatever has been requested (or Long.MAX_VALUE)

if (toRequest == NOT_SET) {

producer.request(Long.MAX_VALUE);

} else {

producer.request(toRequest);

}

}

}

}

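Reading the source shows that Subscriber implements the Observer interface, so a Subscriber is unquestionably an observer. The Observer source follows. It is simple enough that I won't elaborate here; if anything is unclear, see my other article, "Android中的RxJava" (RxJava in Android).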

public interface Observer<T> {

/**

* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.

*

* The {@link Observable} will not call this method if it calls {@link #onError}.

*/

void onCompleted();

/**

* Notifies the Observer that the {@link Observable} has experienced an error condition.

*

* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or

* {@link #onCompleted}.

*

* @param e

* the exception encountered by the Observable

*/

void onError(Throwable e);

/**

* Provides the Observer with a new item to observe.

*

* The {@link Observable} may call this method 0 or more times.

*

* The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or

* {@link #onError}.

*

* @param t

* the item emitted by the Observable

*/

void onNext(T t);

}
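The contract described in the Javadoc above (onNext zero or more times, then exactly one of onCompleted or onError) can be sketched without RxJava itself. MiniObserver, RecordingObserver, and the emit helper below are hypothetical stand-ins written for this illustration, not library types, and a real Observable does considerably more.

```java
// Simplified stand-ins for illustration only; these are not the RxJava types.
final class ObserverContractDemo {

    /** Emits every item via onNext, then exactly one terminal event. */
    static void emit(String[] items, MiniObserver<String> observer) {
        try {
            for (String item : items) {
                if (item == null) {
                    throw new IllegalStateException("null item");
                }
                observer.onNext(item);
            }
        } catch (RuntimeException e) {
            observer.onError(e);    // terminal event: nothing may follow it
            return;
        }
        observer.onCompleted();     // reached only when no error occurred
    }

    public static void main(String[] args) {
        RecordingObserver ok = new RecordingObserver();
        emit(new String[] {"a", "b"}, ok);
        System.out.println(ok.log);      // next:a;next:b;completed;

        RecordingObserver failed = new RecordingObserver();
        emit(new String[] {"a", null, "c"}, failed);
        System.out.println(failed.log);  // next:a;error:null item;
    }
}

/** Hypothetical mirror of the three Observer callbacks. */
interface MiniObserver<T> {
    void onNext(T item);
    void onError(Throwable e);
    void onCompleted();
}

/** Records every callback it receives so the order can be inspected. */
final class RecordingObserver implements MiniObserver<String> {
    final StringBuilder log = new StringBuilder();

    @Override public void onNext(String item) { log.append("next:").append(item).append(';'); }
    @Override public void onError(Throwable e) { log.append("error:").append(e.getMessage()).append(';'); }
    @Override public void onCompleted() { log.append("completed;"); }
}
```

In the failing run the item "c" is never delivered, because onError is terminal.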

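First, Subscriber is an abstract class, so it cannot be instantiated directly. It also implements the Subscription interface. What is Subscription for? Its source is short too, so here it is: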

/**

* Subscription returns from {@link Observable#subscribe(Subscriber)} to allow unsubscribing.

*

* See the utilities in {@link Subscriptions} and the implementations in the {@code rx.subscriptions} package.

*

* This interface is the RxJava equivalent of {@code IDisposable} in Microsoft's Rx implementation.

*/

public interface Subscription {

/**

* Stops the receipt of notifications on the {@link Subscriber} that was registered when this Subscription

* was received.

*

* This allows deregistering an {@link Subscriber} before it has finished receiving all events (i.e. before

* onCompleted is called).

*/

void unsubscribe();

/**

* Indicates whether this {@code Subscription} is currently unsubscribed.

*

* @return {@code true} if this {@code Subscription} is currently unsubscribed, {@code false} otherwise

*/

boolean isUnsubscribed();

}
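As a rough illustration of the two methods in the interface above, the sketch below uses a hypothetical MiniSubscription backed by a single flag. The emitting loop checks isUnsubscribed() before each item, which is the usual way a source honours cancellation. All names here are assumptions made for the example; none of them are RxJava classes.

```java
// Simplified stand-ins for illustration only; these are not the RxJava types.
final class SubscriptionDemo {

    /** Hypothetical mirror of the two Subscription methods. */
    interface MiniSubscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    /** A subscription backed by one volatile flag. */
    static final class FlagSubscription implements MiniSubscription {
        private volatile boolean unsubscribed;

        @Override public void unsubscribe() { unsubscribed = true; }
        @Override public boolean isUnsubscribed() { return unsubscribed; }
    }

    /**
     * Tries to deliver `limit` items, but the consumer cancels after it has
     * received `stopAfter` of them. Returns how many items were delivered.
     */
    static int deliver(int limit, int stopAfter) {
        FlagSubscription subscription = new FlagSubscription();
        int delivered = 0;
        for (int i = 1; i <= limit; i++) {
            if (subscription.isUnsubscribed()) {
                break;                      // the source stops emitting once cancelled
            }
            delivered++;
            if (delivered == stopAfter) {
                subscription.unsubscribe(); // the consumer has seen enough
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(deliver(10, 3));  // 3
        System.out.println(deliver(5, 100)); // 5
    }
}
```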

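Subscription is the noun: a subscription. The object returned by Observable.subscribe(Subscriber) is a Subscription. The interface has just two methods: unsubscribe() cancels a subscription after it has been made, and isUnsubscribed() reports whether it has already been cancelled. With Subscription covered, we return to the Subscriber source. Subscriber has three constructors: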

protected Subscriber() {

this(null, false);

}

/**

* Construct a Subscriber by using another Subscriber for backpressure and

* for holding the subscription list (when this.add(sub) is

* called this will in fact call subscriber.add(sub)).

*

* @param subscriber

* the other Subscriber

*/

protected Subscriber(Subscriber<?> subscriber) {

this(subscriber, true);

}

/**

* Construct a Subscriber by using another Subscriber for backpressure and

* optionally for holding the subscription list (if

* shareSubscriptions is true then when

* this.add(sub) is called this will in fact call

* subscriber.add(sub)).

*

* To retain the chaining of subscribers when setting

* shareSubscriptions to false, add the created

* instance to {@code subscriber} via {@link #add}.

*

* @param subscriber

* the other Subscriber

* @param shareSubscriptions

* {@code true} to share the subscription list in {@code subscriber} with

* this instance

* @since 1.0.6

*/

protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {

this.subscriber = subscriber;

this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();

}

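The first two constructors both end up calling the third, so look at that one directly. Subscriber(Subscriber subscriber, boolean shareSubscriptions) takes two parameters. subscriber is clearly another Subscriber (an observer), and it is assigned to this.subscriber. So what is shareSubscriptions?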

this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();

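From this logic, this.subscriptions reuses the passed-in subscriber.subscriptions only when shareSubscriptions is true and subscriber is not null; otherwise a new SubscriptionList is created. And what is SubscriptionList? It is itself a Subscription, and it holds an internal list of Subscriptions so that they can all be unsubscribed together. The rest of the class is add, remove, and clear operations on that internal list: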

/**

* Subscription that represents a group of Subscriptions that are unsubscribed together.

*

* @see Rx.Net equivalent CompositeDisposable

*/

public final class SubscriptionList implements Subscription {

private List<Subscription> subscriptions;

private volatile boolean unsubscribed;

/**

* Constructs an empty SubscriptionList.

*/

public SubscriptionList() {

// nothing to do

}

/**

* Constructs a SubscriptionList with the given initial child subscriptions.

* @param subscriptions the array of subscriptions to start with

*/

public SubscriptionList(final Subscription... subscriptions) {

this.subscriptions = new LinkedList<Subscription>(Arrays.asList(subscriptions));

}

/**

* Constructs a SubscriptionList with the given initial child subscription.

* @param s the initial subscription instance

*/

public SubscriptionList(Subscription s) {

this.subscriptions = new LinkedList<Subscription>();

this.subscriptions.add(s);

}

@Override

public boolean isUnsubscribed() {

return unsubscribed;

}

/**

* Adds a new {@link Subscription} to this {@code SubscriptionList} if the {@code SubscriptionList} is

* not yet unsubscribed. If the {@code SubscriptionList} is unsubscribed, {@code add} will

* indicate this by explicitly unsubscribing the new {@code Subscription} as well.

*

* @param s

* the {@link Subscription} to add

*/

public void add(final Subscription s) {

if (s.isUnsubscribed()) {

return;

}

if (!unsubscribed) {

synchronized (this) {

if (!unsubscribed) {

List<Subscription> subs = subscriptions;

if (subs == null) {

subs = new LinkedList<Subscription>();

subscriptions = subs;

}

subs.add(s);

return;

}

}

}

// call after leaving the synchronized block so we're not holding a lock while executing this

s.unsubscribe();

}

public void remove(final Subscription s) {

if (!unsubscribed) {

boolean unsubscribe;

synchronized (this) {

List<Subscription> subs = subscriptions;

if (unsubscribed || subs == null) {

return;

}

unsubscribe = subs.remove(s);

}

if (unsubscribe) {

// if we removed successfully we then need to call unsubscribe on it (outside of the lock)

s.unsubscribe();

}

}

}

/**

* Unsubscribe from all of the subscriptions in the list, which stops the receipt of notifications on

* the associated {@code Subscriber}.

*/

@Override

public void unsubscribe() {

if (!unsubscribed) {

List<Subscription> list;

synchronized (this) {

if (unsubscribed) {

return;

}

unsubscribed = true;

list = subscriptions;

subscriptions = null;

}

// we will only get here once

unsubscribeFromAll(list);

}

}

private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {

if (subscriptions == null) {

return;

}

List<Throwable> es = null;

for (Subscription s : subscriptions) {

try {

s.unsubscribe();

} catch (Throwable e) {

if (es == null) {

es = new ArrayList<Throwable>();

}

es.add(e);

}

}

Exceptions.throwIfAny(es);

}

/* perf support */

public void clear() {

if (!unsubscribed) {

List<Subscription> list;

synchronized (this) {

list = subscriptions;

subscriptions = null;

}

unsubscribeFromAll(list);

}

}

/**

* Returns true if this composite is not unsubscribed and contains subscriptions.

* @return {@code true} if this composite is not unsubscribed and contains subscriptions.

*/

public boolean hasSubscriptions() {

if (!unsubscribed) {

synchronized (this) {

return !unsubscribed && subscriptions != null && !subscriptions.isEmpty();

}

}

return false;

}

}
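Two behaviours of the class above are worth seeing in isolation: everything in the list is unsubscribed together, and a Subscription added after the list has been unsubscribed is unsubscribed immediately. The sketch below is a deliberately simplified, single-threaded stand-in (no locking, no exception aggregation); MiniSubscriptionList, Resource, and chooseList are hypothetical names for this example. chooseList mirrors the ternary from the Subscriber constructor, taking the parent's list directly rather than a parent Subscriber.

```java
// Simplified, single-threaded stand-ins; these are not the RxJava types.
final class CompositeDemo {

    interface MiniSubscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    /** A leaf subscription, e.g. a timer or a network call to cancel. */
    static final class Resource implements MiniSubscription {
        private boolean unsubscribed;

        @Override public void unsubscribe() { unsubscribed = true; }
        @Override public boolean isUnsubscribed() { return unsubscribed; }
    }

    /** Groups subscriptions so they are unsubscribed together. */
    static final class MiniSubscriptionList implements MiniSubscription {
        private java.util.List<MiniSubscription> items = new java.util.ArrayList<>();
        private boolean unsubscribed;

        void add(MiniSubscription s) {
            if (s.isUnsubscribed()) {
                return;               // nothing to track
            }
            if (unsubscribed) {
                s.unsubscribe();      // list already cancelled: cancel the newcomer too
                return;
            }
            items.add(s);
        }

        @Override public void unsubscribe() {
            if (unsubscribed) {
                return;               // idempotent
            }
            unsubscribed = true;
            java.util.List<MiniSubscription> toCancel = items;
            items = null;
            for (MiniSubscription s : toCancel) {
                s.unsubscribe();
            }
        }

        @Override public boolean isUnsubscribed() { return unsubscribed; }
    }

    /** Mirrors the constructor's choice between sharing and creating a list. */
    static MiniSubscriptionList chooseList(MiniSubscriptionList parentList, boolean share) {
        return share && parentList != null ? parentList : new MiniSubscriptionList();
    }

    public static void main(String[] args) {
        MiniSubscriptionList list = new MiniSubscriptionList();
        Resource first = new Resource();
        list.add(first);
        list.unsubscribe();
        Resource late = new Resource();
        list.add(late);
        System.out.println(first.isUnsubscribed() + " " + late.isUnsubscribed()); // true true
    }
}
```

With sharing enabled, a resource added through the child ends up in the parent's list, so unsubscribing the parent releases it as well.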

Back to Subscriber. Its add, unsubscribe, and isUnsubscribed methods simply delegate to the same-named methods of its internal SubscriptionList field, subscriptions. Subscriber also holds a producer field, and the three remaining methods that we have not yet analyzed all involve it.

/**

* Interface that establishes a request-channel between an Observable and a Subscriber and allows

* the Subscriber to request a certain amount of items from the Observable (otherwise known as

* backpressure).

*

* The request amount only affects calls to {@link Subscriber#onNext(Object)}; onError and onCompleted may appear without

* requests.

*

* However, backpressure is somewhat optional in RxJava 1.x and Subscribers may not

* receive a Producer via their {@link Subscriber#setProducer(Producer)} method and will run

* in unbounded mode. Depending on the chain of operators, this can lead to {@link rx.exceptions.MissingBackpressureException}.

*/

public interface Producer {

/**

* Request a certain maximum number of items from this Producer. This is a way of requesting backpressure.

* To disable backpressure, pass {@code Long.MAX_VALUE} to this method.

*

* Requests are additive but if a sequence of requests totals more than {@code Long.MAX_VALUE} then

* {@code Long.MAX_VALUE} requests will be actioned and the extras may be ignored. Arriving at

* {@code Long.MAX_VALUE} by addition of requests cannot be assumed to disable backpressure. For example,

* the code below may result in {@code Long.MAX_VALUE} requests being actioned only.

*

*

* request(100);

* request(Long.MAX_VALUE-1);

*

*

* @param n the maximum number of items you want this Producer to produce, or {@code Long.MAX_VALUE} if you

* want the Producer to produce items at its own pace

* @throws IllegalArgumentException if the request amount is negative

*/

void request(long n);

}

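Producer is an interface with a single method, void request(long n). This is where the concept of backpressure first appears. A Producer establishes a request channel between an Observable and a Subscriber and lets the Subscriber request a certain number of items from the Observable (this is what backpressure means). The requested amount only affects calls to Subscriber.onNext(Object); onError and onCompleted may arrive without any request. Passing Long.MAX_VALUE to request effectively disables backpressure, while request(100) enables it: the observer asks the event source for at most 100 items. Which classes implement Producer can wait for now; let's read on.

Back in Subscriber, look at the request(n) method: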
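To make the request channel concrete, here is a minimal sketch of a producer that hands out the integers of a small range, but only as many as have been requested. MiniProducer and RangeProducer are hypothetical names for this illustration; they are not RxJava classes, and the sketch is single-threaded and writes to a StringBuilder instead of calling onNext.

```java
// Simplified, single-threaded stand-ins; these are not the RxJava types.
final class BackpressureDemo {

    /** Hypothetical mirror of the single Producer method. */
    interface MiniProducer {
        void request(long n);
    }

    /** Emits start..end (inclusive), never more than has been requested. */
    static final class RangeProducer implements MiniProducer {
        private final StringBuilder sink;
        private final int end;
        private int next;

        RangeProducer(int start, int end, StringBuilder sink) {
            this.next = start;
            this.end = end;
            this.sink = sink;
        }

        @Override public void request(long n) {
            if (n < 0) {
                throw new IllegalArgumentException("number requested cannot be negative: " + n);
            }
            long remaining = n;
            // Long.MAX_VALUE behaves as "unbounded": the loop ends when the range does.
            while (remaining > 0 && next <= end) {
                sink.append(next++).append(' ');
                remaining--;
            }
        }
    }

    public static void main(String[] args) {
        StringBuilder out = new StringBuilder();
        RangeProducer producer = new RangeProducer(1, 5, out);
        producer.request(2);
        System.out.println(out);            // 1 2
        producer.request(Long.MAX_VALUE);
        System.out.println(out);            // 1 2 3 4 5
    }
}
```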

protected final void request(long n) {

if (n < 0) {

throw new IllegalArgumentException("number requested cannot be negative: " + n);

}

// if producer is set then we will request from it

// otherwise we increase the requested count by n

Producer producerToRequestFrom;

synchronized (this) {

if (producer != null) {

producerToRequestFrom = producer;

} else {

addToRequested(n);

return;

}

}

// after releasing lock (we should not make requests holding a lock)

producerToRequestFrom.request(n);

}

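The argument n is the number of items to request from the source; a negative n throws IllegalArgumentException. The running total is capped at Long.MAX_VALUE, and requests beyond that cap may be ignored. If a producer has already been set, its request method is called directly (after the lock is released). Otherwise addToRequested() records n in the requested field: if requested has never been set it is assigned n; if it has, n is added to it, and a sum that overflows (becomes negative) is clamped to Long.MAX_VALUE.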

private void addToRequested(long n) {

if (requested == NOT_SET) {

requested = n;

} else {

final long total = requested + n;

// check if overflow occurred

if (total < 0) {

requested = Long.MAX_VALUE;

} else {

requested = total;

}

}

}
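The overflow check is easy to verify in isolation. The sketch below restates the same arithmetic as a pure function so the clamping can be observed directly. The class name and the two-argument signature are assumptions made for this example (the real method mutates a field under a lock), and it presumes both values are non-negative, as request() guarantees.

```java
// A pure-function restatement of the accumulation logic, for illustration only.
final class RequestAccumulatorDemo {

    static final long NOT_SET = Long.MIN_VALUE;

    /** Returns the new accumulated request count, saturating at Long.MAX_VALUE. */
    static long addToRequested(long requested, long n) {
        if (requested == NOT_SET) {
            return n;                 // first request: just remember it
        }
        long total = requested + n;
        // Two non-negative longs can only produce a negative sum by overflowing.
        return total < 0 ? Long.MAX_VALUE : total;
    }

    public static void main(String[] args) {
        long r = addToRequested(NOT_SET, 100);
        System.out.println(r);                          // 100
        r = addToRequested(r, Long.MAX_VALUE - 1);
        System.out.println(r == Long.MAX_VALUE);        // true
    }
}
```

This is the request(100); request(Long.MAX_VALUE-1); case from the Javadoc: the sum wraps around to a negative number and is clamped.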

That leaves one last method, setProducer:

public void setProducer(Producer p) {

long toRequest;

boolean passToSubscriber = false;

synchronized (this) {

toRequest = requested;

producer = p;

if (subscriber != null) {

// middle operator ... we pass through unless a request has been made

if (toRequest == NOT_SET) {

// we pass through to the next producer as nothing has been requested

passToSubscriber = true;

}

}

}

// do after releasing lock

if (passToSubscriber) {

subscriber.setProducer(producer);

} else {

// we execute the request with whatever has been requested (or Long.MAX_VALUE)

if (toRequest == NOT_SET) {

producer.request(Long.MAX_VALUE);

} else {

producer.request(toRequest);

}

}

}

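The producer field is always assigned, whether or not a wrapped subscriber was supplied through the constructor. If a subscriber was supplied and request has never been called, the producer is handed on via subscriber.setProducer(producer), and this Subscriber issues no request of its own. Otherwise, if request has never been called (which now implies there is no wrapped subscriber), producer.request(Long.MAX_VALUE) is executed; if request has been called, producer.request(toRequest) is executed with the accumulated amount, regardless of whether a subscriber was supplied. Being able to read the Subscriber source in isolation is of limited use, though; it has to be understood together with RxJava's overall call flow.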
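The three outcomes of setProducer can be exercised with a small model. MiniSubscriber and RecordingProducer below are hypothetical, single-threaded stand-ins (no synchronization, no overflow clamp) written for this illustration. They copy only the branching of the method above and are not the RxJava classes.

```java
// Simplified, single-threaded stand-ins; these are not the RxJava types.
final class SetProducerDemo {

    interface MiniProducer {
        void request(long n);
    }

    /** Records each request so the chosen branch can be inspected. */
    static final class RecordingProducer implements MiniProducer {
        final StringBuilder calls = new StringBuilder();

        @Override public void request(long n) {
            calls.append(n == Long.MAX_VALUE ? "MAX" : String.valueOf(n)).append(' ');
        }
    }

    static final class MiniSubscriber {
        private static final long NOT_SET = Long.MIN_VALUE;

        private final MiniSubscriber downstream; // the "other" subscriber, may be null
        private MiniProducer producer;
        private long requested = NOT_SET;

        MiniSubscriber(MiniSubscriber downstream) {
            this.downstream = downstream;
        }

        void request(long n) {
            if (producer != null) {
                producer.request(n);  // producer known: forward at once
                return;
            }
            // producer not set yet: accumulate (overflow clamp omitted here)
            requested = (requested == NOT_SET) ? n : requested + n;
        }

        void setProducer(MiniProducer p) {
            producer = p;
            if (downstream != null && requested == NOT_SET) {
                downstream.setProducer(p);      // pass through, request nothing ourselves
            } else if (requested == NOT_SET) {
                p.request(Long.MAX_VALUE);      // no downstream, nothing requested: unbounded
            } else {
                p.request(requested);           // replay the accumulated requests
            }
        }
    }

    public static void main(String[] args) {
        RecordingProducer p = new RecordingProducer();
        MiniSubscriber s = new MiniSubscriber(null);
        s.request(10);
        s.request(5);
        s.setProducer(p);
        System.out.println(p.calls); // 15
    }
}
```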
