Kotlin 笔记之 Flow
前言
Flow
是 Kotlin 协程库中的库,用于异步返回多个值,官方介绍是参考RxJava
等响应式流实现的,但是“拥有尽可能简单的设计,对 Kotlin 以及挂起友好且遵从结构化并发”。本文主要参考Flow 中文文档,梳理了学习过程中的要点和理解,以便日后查验。
正文
对于异步返回多个值的需求,集合(如List
等)只能一次性返回多个值,而序列( Sequence
)只支持阻塞代码,Flow
则支持挂起函数异步返回多个值。
创建 Flow
flow{...}
fun simple(): Flow<Int> = flow<Int> { for (i in 1..3) { delay(100) // 假装我们异步等待了 100 毫秒,也可以用 Thread.sleep() 但是会阻塞当前线程 emit(i) // 发射下一个值 } }
.asFlow()
fun simple(): Flow<Int> = (1..10).asFlow()
flowOf{}
fun simple(): Flow<Int> = flowOf(1, 2, 3, 4, 5)
因为流只会在被收集的时候才会被启动(指执行类似flow{...}
中的内容),所以上述simple()
在被调用时会尽快返回且不等待,所以无需suspend
修饰。
流的收集/末端流操作符
collect{...}
收集emit
发送的值配合
onEach{}
可以将collect
中执行的代码放到onEach
中。collectLatest{...}
收集emit
发送的值,但每次新的emit
到来时,取消之前的收集器,创建新的收集器(用新的值执行{...}
中的代码)launchIn
指定在单独的协程中启动流的收集,这样就可以立即继续进一步执行代码,不会挂起后面的协程代码。single()
只接受 flow 发送的一个值,0 个或多个都会报错first{...}
查找符合条件的第一个值reduce()
求和fold(initial,{...})
在初始值initial
的基础上求和toList
、toSet
过渡流操作符
过渡操作符应用于上游流,并返回下游流。就像流一样。这类操作符本身不是挂起函数。它运行的速度很快,返回新的转换流的定义。
map{}
filter{}
take(n)
限长操作符,只取前 n 个发射的值
流上下文
流默认运行在收集器提供的上下文中,但是可以通过flowOn
更改:
fun simple(): Flow<Int> = flow {
...
}.flowOn(Dispatchers.Default) // 在流构建器中改变消耗 CPU 代码上下文的正确方式
展平流
将嵌套有Flow
的Flow
(如Flow<Flow<String>>
)展平为单个流(如Flow<String>
)。
flatMapConcat
将收集到的流交给{...}
处理后,等待内部流处理完毕后,再去请求下一个流flatMapMerge
先顺序收集所有流,再同时收集结果流flatMapLatest{...}
类似于collectLatest{...}
,在新流发出的时候,立即取消{...}
中所有的代码flattenConcat
依次展平流flattenMerge{...}
并发拼接,先执行{...}
中的方法,再执行collect
等方法,顺序会乱。
异常处理
try/catch
fun simple(): Flow<Int> = flow { for (i in 1..3) { println("Emitting $i") emit(i) // 发射下一个值 } } fun main() = runBlocking<Unit> { try { simple().collect { value -> println(value) check(value <= 1) { "Collected $value" } } } catch (e: Throwable) { println("Caught $e") } }
catch()
透明捕获:只捕获上游异常,其之后的异常不会被处理。
simple() .catch { e -> emit("Caught $e") } // 发射一个异常 .collect { value -> println(value) //此处如有异常,不会被 catch 捕获 }
声明式捕获:将
collect
的代码移动到onEach
中,将其放到catch
之前,从而使其被catch
捕获。simple() .onEach { value -> check(value <= 1) { "Collected $value" } //此处异常会被 catch 捕获 println(value) } .catch { e -> println("Caught $e") } .collect()
流取消
flow { ... }
创建的流的繁忙循环默认可以取消- 其他流如果需要取消,可以添加
.onEach { currentCoroutineContext().ensureActive() }
或者.cancellable()
流完成
命令式
try { simple().collect { value -> println(value) } } finally { println("Done") //监听流完成 }
声明式
simple() .onCompletion { println("Done") } //监听流完成,在 collect 执行结束后才执行 .collect { value -> println(value) }
onCompletion
的可空参数Throwable
可以用于确定流收集是正常完成(为null
)还是有异常发生。他不会处理异常。
其余操作
buffer()
缓冲发射项,收集完成后再传给下一步conflate()
合并发射项,会丢弃来不及处理的中间值,只获取并处理最新的值zip()
合并两个流的值,两个流中的值一一对应例如
(1,2,3) 3s发射一次,(a,b,c) 4s发射一次
直接拼接,合并之后为(1a,2b,3c)
combine()
结合两个流的值,任意一个流中的值发生变化都会触发执行计算例如
(1,2,3) 3s发射一次,(a,b,c) 4s发射一次
直接拼接,合并之后为(1a,2a,2b,3b,3c)