本文讨论几种不同方式合并RxJava的Observable数据流。

Observable介绍

Observable 序列,或简单称为Observable,表示异步数据流。这些概念遵循基于观察者模式,在该模式中,一个叫做观察者的对象订阅了Observable发出的数据。要使用RxJava需要增加依赖:

io.reactivex

rxjava

1.2.5

订阅是无阻塞的,因为观察者会对Observable未来发出的任何消息做出响应,这也又促进了并发性。下面是简单RxJava示例:

Observable

.from(new String[] { "John", "Doe" })

.subscribe(name -> System.out.println("Hello " + name))

合并Observable数据

使用响应式框架编程,常见场景是合并不同的Observable数据。举例,web应用中可能需要获得两组相互独立的异步数据流。为了避免等待前面数据流完成才请求下一个数据流,我们可以同时调用,然后订阅合并两个数据流。本节讨论几种不同方式合并多个Observable数据,并区别它们之间的差异。

Merge

使用Merge操作可以合并多个Observable数据为一个输出结果,示例代码如下:

@Test

public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() {

TestSubscriber testSubscriber = new TestSubscriber<>();

Observable.merge(

Observable.from(new String[] {"Hello", "World"}),

Observable.from(new String[] {"I love", "RxJava"})

).subscribe(testSubscriber);

testSubscriber.assertValues("Hello", "World", "I love", "RxJava");

}

MergeDelayError

mergeDelayError 方法与merge功能一致,但如果在合并过程中有错误发生,可以忽略错误继续合并,最后传播错误异常:

@Test

public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() {

TestSubscriber testSubscriber = new TestSubscriber<>();

Observable.mergeDelayError(

Observable.from(new String[] { "hello", "world" }),

Observable.error(new RuntimeException("Some exception")),

Observable.from(new String[] { "rxjava" })

).subscribe(testSubscriber);

testSubscriber.assertValues("hello", "world", "rxjava");

testSubscriber.assertError(RuntimeException.class);

}

上面示例输出结果,与没有错发发生结果一致:

hello

world

rxjava

注意,如果使用 merge 代替 mergeDelayError, 则字符串rxjava不会发出,因为merge遇到错误会立刻停止Observable数据流。

zip

zip 方法组合两个序列为成对(pair)数据序列:

@Test

public void givenTwoObservables_whenZipped_thenReturnCombinedResults() {

List zippedStrings = new ArrayList<>();

Observable.zip(

Observable.from(new String[] { "Simple", "Moderate", "Complex" }),

Observable.from(new String[] { "Solutions", "Success", "Hierarchy"}),

(str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add);

assertThat(zippedStrings).isNotEmpty();

assertThat(zippedStrings.size()).isEqualTo(3);

assertThat(zippedStrings).contains("Simple Solutions", "Moderate Success", "Complex Hierarchy");

}

Zip With Interval

下面示例给zip方法增加interrval参数,可以有效延迟第一个数据流推送数据元素:

@Test

public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() {

TestSubscriber testSubscriber = new TestSubscriber<>();

Observable data = Observable.just("one", "two", "three", "four", "five");

Observable interval = Observable.interval(1L, TimeUnit.SECONDS);

Observable

.zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData))

.toBlocking().subscribe(testSubscriber);

testSubscriber.assertCompleted();

testSubscriber.assertValueCount(5);

testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five");

}

总结

本文介绍几个合并RxJava的Observable数据流的方法。你还可以学习通过官方文档学习更多的方法:combineLatest, join, groupJoin, switchOnNext等。

好文阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: