Kotlin中协程的Flow异步流(二)

flow的背压flow的操作符过渡流操作符末端流操作符组合多个流展平流流的异常处理流的完成

flow的背压

buffer(),并发运行流中发射元素的代码。conflate(), 合并发射项,不对每个值进行处理。collectLatest(),取消并重新发射最后一个值。当必须更改CoroutineDispatcher时,flowOn操作符使用了相同的缓冲机制,但是buffer函数显式地请求缓冲而不改变执行上下文。

fun simpleFlow8() = flow {

for (i in 1..3) {

delay(100)

emit(i)

println("Emitting $i ${Thread.currentThread().name}")

}

}

@Test

fun `test flow back pressure`() = runBlocking {

val time = measureTimeMillis {

simpleFlow8()

// .flowOn(Dispatchers.Default) //切换线程

// .buffer(50) //增加缓存

// .conflate()

.collectLatest { value ->

// .collect { value ->

delay(300) //处理这个元素消耗300ms

println("Collected $value ${Thread.currentThread().name}")

}

}

println("Collected in $time ms")

}

flow的操作符

过渡流操作符

可以使用操作符转换流,就像使用集合与序列一样。过渡操作符应用于上游流,并返回下游流。这些操作符也是冷操作符,就像流一样。这类操作符本身不是挂起函数。它运行的速度很快,返回新的转换流的定义。

suspend fun performRequest(request: Int): String {

delay(1000)

return "respone $request"

}

@Test

fun `test transform flow operator`() = runBlocking {

(1..3).asFlow()

.map { request -> performRequest(request) }

.collect { value -> println(value) }

(1..3).asFlow().transform { request ->

emit("Making request $request")

emit(performRequest(request))

}

.collect { value -> println(value) }

}

fun numbers() = flow {

try {

emit(1)

emit(2)

println("This line will not execute")

emit(3)

} finally {

println("Finally in numbers")

}

}

//限长操作符

@Test

fun `test limit length operator`() = runBlocking {

numbers().take(2) //只收集两个元素

.collect { value -> println(value) }

}

末端流操作符

末端操作符是在流上用于启动流收集的挂起函数。collect是最基础的末端操作符,但是还有另外一些更方便使用的末端操作符: 1.转化为各种集合,例如toList与toSet. 2.获取第一个(first)值与确保流发射单个(single)值的操作符。 3.使用reduce与fold将流规约到单个值。

@Test

fun `test terminal operator`() = runBlocking {

val sum = (1..5).asFlow()

.map { it * it }

.reduce { a, b -> a + b }

println(sum)

}

组合多个流

就像Kotlin标准库中的Sequence.zip扩展函数一样,流拥有一个zip操作符用于组合两个流中的相关值。

@Test

fun `test zip`() = runBlocking {

val numbers = (1..3).asFlow().onEach { delay(300) }

val strs = flowOf("One", "Two", "Three").onEach { delay(400) }

val startTime = System.currentTimeMillis()

numbers.zip(strs) { a, b -> "$a ->$b" }.collect {

println("$it at ${System.currentTimeMillis() - startTime} ms from start")

}

}

展平流

流表示异步接收的值序列,所以很容易遇到这样的情况:每个值都会触发对另一个值序列的请求,然而,由于流具有异步的性质,因此需要不用的展平模式,因此,存在一系列的流展平操作符: 1.flatMapConcat连接模式, 2.flatMapMerge合并模式, 3.flatMapLatest最新展平模式。

fun requestFlow(i: Int) = flow {

emit("$i:First")

delay(500)

emit("$i:Second")

}

@Test

fun `test flatMapConcat`() = runBlocking {

val startTime = System.currentTimeMillis()

(1..3).asFlow().onEach { delay(100) }

// .map { requestFlow(it) }

.flatMapConcat { requestFlow(it) }

.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }

}

@Test

fun `test flatMapMerge`() = runBlocking {

val startTime = System.currentTimeMillis()

(1..3).asFlow().onEach { delay(100) }

// .map { requestFlow(it) }

.flatMapMerge { requestFlow(it) }

.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }

}

@Test

fun `test flatMapLatest`() = runBlocking {

val startTime = System.currentTimeMillis()

(1..3).asFlow().onEach { delay(100) }

// .map { requestFlow(it) }

.flatMapLatest { requestFlow(it) }

.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }

}

流的异常处理

当运算符中的发射器或代码抛出异常时,有几种处理异常的方法:

try/catch块catch函数

fun simpleFlow10() = flow {

for (i in 1..3) {

println("Emitting $i")

emit(i)

}

}

@Test

fun `test flow exception`() = runBlocking {

try {

simpleFlow10().collect { value ->

println(value)

check(value <= 1) { "Collected $value" }

}

} catch (e: Throwable) {

println("Caught $e")

}

}

@Test

fun `test flow exception2`() = runBlocking {

flow {

throw ArithmeticException("Div 0")

emit(1)

}.catch { e: Throwable ->

println("Caught $e")

emit(10)

}

.flowOn(Dispatchers.IO)

.collect { println(it) }

}

流的完成

当流收集完成时(普通情况或异常情况),它可能需要执行一个动作。

命令式finally块onCompletion声明式处理

fun simpleFlow11() = (1..3).asFlow()

@Test

fun `test flow complete in finally`() = runBlocking {

try {

simpleFlow11().collect { println(it) }

} finally {

println("Done")

}

}

fun simpleFlow12() = flow {

emit(1)

throw RuntimeException()

}

@Test

fun `test flow complete in onCompletion`() = runBlocking {

/*simpleFlow11()

.onCompletion { println("Done") }

.collect { println(it) }*/

simpleFlow12()

.onCompletion { exception ->

if (exception != null) println("Flow completed exceptionall")

}

.catch { exception -> println("Caught $exception") }//捕获上游异常

.collect { println(it) }

//捕获下游收集时的异常

simpleFlow12().onCompletion { exception ->

if (exception != null) println("Flow completed exceptionall")

} .catch { exception -> println("Caught $exception") }//捕获上游异常

.collect { value ->

println(value)

check(value <= 1) { "Collected $value" }

}

}

精彩内容

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: