Kotlin中协程的Flow异步流(二)
flow的背压flow的操作符过渡流操作符末端流操作符组合多个流展平流流的异常处理流的完成
flow的背压
buffer(),并发运行流中发射元素的代码。conflate(), 合并发射项,不对每个值进行处理。collectLatest(),取消并重新发射最后一个值。当必须更改CoroutineDispatcher时,flowOn操作符使用了相同的缓冲机制,但是buffer函数显式地请求缓冲而不改变执行上下文。
fun simpleFlow8() = flow
for (i in 1..3) {
delay(100)
emit(i)
println("Emitting $i ${Thread.currentThread().name}")
}
}
@Test
fun `test flow back pressure`() = runBlocking
val time = measureTimeMillis {
simpleFlow8()
// .flowOn(Dispatchers.Default) //切换线程
// .buffer(50) //增加缓存
// .conflate()
.collectLatest { value ->
// .collect { value ->
delay(300) //处理这个元素消耗300ms
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")
}
flow的操作符
过渡流操作符
可以使用操作符转换流,就像使用集合与序列一样。过渡操作符应用于上游流,并返回下游流。这些操作符也是冷操作符,就像流一样。这类操作符本身不是挂起函数。它运行的速度很快,返回新的转换流的定义。
suspend fun performRequest(request: Int): String {
delay(1000)
return "respone $request"
}
@Test
fun `test transform flow operator`() = runBlocking
(1..3).asFlow()
.map { request -> performRequest(request) }
.collect { value -> println(value) }
(1..3).asFlow().transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { value -> println(value) }
}
fun numbers() = flow
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
//限长操作符
@Test
fun `test limit length operator`() = runBlocking
numbers().take(2) //只收集两个元素
.collect { value -> println(value) }
}
末端流操作符
末端操作符是在流上用于启动流收集的挂起函数。collect是最基础的末端操作符,但是还有另外一些更方便使用的末端操作符: 1.转化为各种集合,例如toList与toSet. 2.获取第一个(first)值与确保流发射单个(single)值的操作符。 3.使用reduce与fold将流规约到单个值。
@Test
fun `test terminal operator`() = runBlocking
val sum = (1..5).asFlow()
.map { it * it }
.reduce { a, b -> a + b }
println(sum)
}
组合多个流
就像Kotlin标准库中的Sequence.zip扩展函数一样,流拥有一个zip操作符用于组合两个流中的相关值。
@Test
fun `test zip`() = runBlocking
val numbers = (1..3).asFlow().onEach { delay(300) }
val strs = flowOf("One", "Two", "Three").onEach { delay(400) }
val startTime = System.currentTimeMillis()
numbers.zip(strs) { a, b -> "$a ->$b" }.collect {
println("$it at ${System.currentTimeMillis() - startTime} ms from start")
}
}
展平流
流表示异步接收的值序列,所以很容易遇到这样的情况:每个值都会触发对另一个值序列的请求,然而,由于流具有异步的性质,因此需要不用的展平模式,因此,存在一系列的流展平操作符: 1.flatMapConcat连接模式, 2.flatMapMerge合并模式, 3.flatMapLatest最新展平模式。
fun requestFlow(i: Int) = flow
emit("$i:First")
delay(500)
emit("$i:Second")
}
@Test
fun `test flatMapConcat`() = runBlocking
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }
// .map { requestFlow(it) }
.flatMapConcat { requestFlow(it) }
.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
}
@Test
fun `test flatMapMerge`() = runBlocking
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }
// .map { requestFlow(it) }
.flatMapMerge { requestFlow(it) }
.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
}
@Test
fun `test flatMapLatest`() = runBlocking
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) }
// .map { requestFlow(it) }
.flatMapLatest { requestFlow(it) }
.collect { println("$it at ${System.currentTimeMillis() - startTime} ms from start") }
}
流的异常处理
当运算符中的发射器或代码抛出异常时,有几种处理异常的方法:
try/catch块catch函数
fun simpleFlow10() = flow
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
@Test
fun `test flow exception`() = runBlocking
try {
simpleFlow10().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
@Test
fun `test flow exception2`() = runBlocking
flow {
throw ArithmeticException("Div 0")
emit(1)
}.catch { e: Throwable ->
println("Caught $e")
emit(10)
}
.flowOn(Dispatchers.IO)
.collect { println(it) }
}
流的完成
当流收集完成时(普通情况或异常情况),它可能需要执行一个动作。
命令式finally块onCompletion声明式处理
fun simpleFlow11() = (1..3).asFlow()
@Test
fun `test flow complete in finally`() = runBlocking
try {
simpleFlow11().collect { println(it) }
} finally {
println("Done")
}
}
fun simpleFlow12() = flow
emit(1)
throw RuntimeException()
}
@Test
fun `test flow complete in onCompletion`() = runBlocking
/*simpleFlow11()
.onCompletion { println("Done") }
.collect { println(it) }*/
simpleFlow12()
.onCompletion { exception ->
if (exception != null) println("Flow completed exceptionall")
}
.catch { exception -> println("Caught $exception") }//捕获上游异常
.collect { println(it) }
//捕获下游收集时的异常
simpleFlow12().onCompletion { exception ->
if (exception != null) println("Flow completed exceptionall")
} .catch { exception -> println("Caught $exception") }//捕获上游异常
.collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
}
精彩内容
发表评论