Kotlin 协程源码阅读笔记 —— Flow
作者:访客发布时间:2023-12-31分类:程序开发学习浏览:335
Kotlin 协程源码阅读笔记 —— Flow
Flow 就是 Kotlin 协程中的流,我们可以通过它在 Kotlin 中进行写出流式代码,大名鼎鼎的 RxJava 就是流式编程的库(不过我遇到很多的人他们只会用 RxJava 切线程😂),在 Kotlin 的早期 Flow 的各种操作符相对于 RxJava 比较少,不过现在 Flow 的操作符也是非常的丰富,RxJava 中常用的操作符在 Flow 中几乎都能够找到功能类似的操作符。所以如果项目中的 RxJava 现在也可以无缝迁移到 Flow。Flow 属于冷流(如果不知道冷流,热流的同学建议去网上找找相关的概念),对应到 RxJava 中的就是 Observable,Flowable,Single,Maybe 和 Completable 等等。Kotlin 协程中的热流实现是 Channel,MutableSharedFlow 和 MutableStateFlow 等等,对应到 RxJava 中的热流是 PublisherSubject 和 BehaviorSubject。
本篇文章主要是介绍 Kotlin 协程中的冷流 Flow,其他的热流相关的类考虑后面的文章再介绍。
源码阅读基于 Kotlin 协程 1.8.0-RC2
Flow 的简单使用
我在以下的代码就构建了一个简单的 Flow:
val myFlow = flow {
repeat(10) {
emit(it)
}
}
我的上面我创建的 Flow 会发送 10 个元素,依次从 0 到 9。
runBlocking(Dispatchers.Default) {
val myFlow = flow {
repeat(10) {
emit(it)
}
}
launch {
myFlow.collect {
println("Coroutine1: $it")
}
}
launch {
myFlow.collect {
println("Coroutine2: $it")
}
}
}
协程1和协程2都通过 Flow#collect() 来订阅 Flow,协程1和协程2之间的订阅他们之间是相互不影响的,他们的订阅都会触发一次 flow() 方法中的 Lambda 调用,而对应的 flow() 中的 Lambda 中每调用一次 emit() 方法都会触发一次 collect() 方法订阅时传递的 Lambda。
Flow 也有一些限制,flow() 的 Lambda 中调用 emit() 方法时不能够修改原来的 CoroutineContext,比如切换 ContinuationContext,假如我把上面的代码改成下面这样就会抛出异常(如果想要绕过这个限制可以使用 ChannelFlow):
runBlocking(Dispatchers.Default) {
val myFlow = flow {
repeat(10) {
withContext(Dispatchers.IO) {
emit(it)
}
}
}
launch {
myFlow.collect {
println("Coroutine1: $it")
}
}
}
我将上面的代码修改成使用 ChannelFlow 就可以正常工作了:
runBlocking(Dispatchers.Default) {
val myFlow = channelFlow {
repeat(10) {
withContext(Dispatchers.IO) {
channel.send(it)
}
}
}
launch {
myFlow.collect {
println("Coroutine1: $it")
}
}
}
Flow 还有一个限制就是如果 collect() 的 Lambda 方法中出现异常后,就算在 emit() 方法调用时将它 catch 掉,后续的使用还是会继续报错,简单来说就是 collect() 的 Lambda 方法中报错了后那么这次 collect() 订阅就结束了,不能够再使用,我继续把上面的代码改成下面这样,也是不能够使用的:
runBlocking(Dispatchers.Default) {
val myFlow = flow {
repeat(10) {
try {
emit(it)
} catch (e: Throwable) {
emit(233)
}
}
}
launch {
myFlow.collect {
if (it == 2) {
error("Test Error")
}
println("Coroutine1: $it")
}
}
}
Flow 工作原理
我们先看看 flow() 方法的源码:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
直接创建一个 Flow 的实现类 SafeFlow,它继承于 AbstractFlow,订阅时调用的是 collect() 方法,我们看看 AbstractFlow#collect() 方法的实现:
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
collect() 方法中将外部传入的 FlowCollector 使用 SafeCollector 代理了,然后传递给 SafeFlow#collectSafely() 方法,我们看看它的实现:
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
上面的代码也是朴实无华,直接通过 SafeCollector 调用 flow() 方法中传入的方法,我们想要发送元素时也是通过 emit() 方法来完成,我们看看 SafeCollector#emit() 方法的实现(如果不被 SafeCollector 对象代理,这个时候我们直接调用 collect() 方法中传入的 FlowCollector 对象的 emit() 方法就能够完成 Flow 向订阅者的通信了,有一个 unsafeFlow 就是这么实现的):
actual override suspend fun emit(value: T) {
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
try {
emit(uCont, value)
} catch (e: Throwable) {
lastEmissionContext = DownstreamExceptionContext(e, uCont.context)
throw e
}
}
}
首先拿到对应的协程的 Continuation 对象,然后再调用 emit() 方法,注意这里如果调用 emit() 方法报错时会将当前的 CoroutineContext 和错误保存在 DownstreamExceptionContext 对象中,然后赋值给 lastEmissionContext,lastEmissionContext 是用来保存上次 emit() 的 CoroutineContext 的,如果上次的 lastEmissionContext 是 DownstreamExceptionContext 对象就表示上次的 emit() 调用出了异常,后面我们会看到处理 DownstreamExceptionContext 的代码,我们再来看看这个非 suspend 的 emit() 方法:
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
val currentContext = uCont.context
currentContext.ensureActive()
// This check is triggered once per flow on a happy path.
val previousContext = lastEmissionContext
// 当前的 Context 和 上次的 Context 不一样,需要检查 Context
if (previousContext !== currentContext) {
checkContext(currentContext, previousContext, value)
lastEmissionContext = currentContext
}
completion_ = uCont
// 这里就是直接调用被代理的 FlowCollector 的 emit() 方法。
val result = emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
/*
* If the callee hasn't suspended, that means that it won't (it's forbidden) call 'resumeWith` (-> `invokeSuspend`)
* and we don't have to retain a strong reference to it to avoid memory leaks.
*/
if (result != COROUTINE_SUSPENDED) {
completion_ = null
}
return result
}
如果前一次调用 emit() 的 CoroutineContext 和当前的 CoroutineContext 不是同一个实例,就需通过 checkContext() 方法检查 CoroutineContext 是否有错(也就是我们上面测试代码中的两个限制), 然后就直接调用被代理的 FlowCollector 的 emit() 方法,也就是我们订阅时传入的 Lambda 方法。
我们来看看 checkContext() 方法是如何检查 CoroutineContext 的:
private fun checkContext(
currentContext: CoroutineContext,
previousContext: CoroutineContext?,
value: T
) {
// 如果上次 emit 由错误,这次的 emit 也是直接抛出异常
if (previousContext is DownstreamExceptionContext) {
exceptionTransparencyViolated(previousContext, value)
}
// 检查当前的 CoroutineContext 和 collect() 方法中的是否一致
checkContext(currentContext)
}
private fun exceptionTransparencyViolated(exception: DownstreamExceptionContext, value: Any?) {
error("""
Flow exception transparency is violated:
Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
For a more detailed explanation, please refer to Flow documentation.
""".trimIndent())
}
internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) {
val result = currentContext.fold(0) fold@{ count, element ->
val key = element.key
val collectElement = collectContext[key]
if (key !== Job) {
return@fold if (element !== collectElement) Int.MIN_VALUE
else count + 1
}
val collectJob = collectElement as Job?
val emissionParentJob = (element as Job).transitiveCoroutineParent(collectJob)
if (emissionParentJob !== collectJob) {
error(
"Flow invariant is violated:\n" +
"\t\tEmission from another coroutine is detected.\n" +
"\t\tChild of $emissionParentJob, expected child of $collectJob.\n" +
"\t\tFlowCollector is not thread-safe and concurrent emissions are prohibited.\n" +
"\t\tTo mitigate this restriction please use 'channelFlow' builder instead of 'flow'"
)
}
if (collectJob == null) count else count + 1
}
if (result != collectContextSize) {
error(
"Flow invariant is violated:\n" +
"\t\tFlow was collected in $collectContext,\n" +
"\t\tbut emission happened in $currentContext.\n" +
"\t\tPlease refer to 'flow' documentation or use 'flowOn' instead"
)
}
}
上面的代码比较简单,如果上次的 emit() 有异常,直接抛出异常;然后检查当前的 CoroutineContext 和调用 collect() 方法传入的是否一致,如果不一致就抛出异常,具体如何判断 CoroutineContext 是否一致,大家自己看看上面的代码,我就不解释了。
总的来说 Flow 本身的实现是非常简单的,如果不是被 SafeCollector 代理去检查异常和 CoroutineContext 它的代码会更加简单。在调用 collect() 方法订阅时,会创建一个 FlowCollector 对象(Lambda),collect() 方法同时会触发构建 Flow 时 flow() 方法中的 Lambda 的调用,在 flow() 方法中的 Lambda 就通过 collect() 时传递的 FlowCollector 的 emit() 方法来向其发送消息。
Flow 中还有非常丰富的操作符方法,我们继续分析一些有代表性的操作符。
map() 操作符
map 操作符可以说是非常的简单了,我就不介绍它的具体使用了,我们直接看对应的方法:
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
return@transform emit(transform(value))
}
我们看到这里调用了 transform() 方法,然后在 transform() 方法中传入了一个 Lambda,在这个方法中又会调用 map() 方法传入的参数来对原来的元素进行转换,最后将转换后的元素通过 emit() 方法再发送给下游。这里要再说明一下,很多的操作符都是依赖于这个 transform() 方法,比如 filter()、filterNot()、filterIsInstance()、filterNotNull()、mapNotNull()、withIndex() 和 onEatch 等等。所以我们看懂了 transform() 方法后上面的基于它实现的操作符也都很容易理解了。
public inline fun <T, R> Flow<T>.transform(
@BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = flow { // Note: safe flow is used here, because collector is exposed to transform on each operation
collect { value ->
// kludge, without it Unit will be returned and TCE won't kick in, KT-28938
return@collect transform(value)
}
}
在看 Flow 流的操作符时,我们先定一个概念:转换前的流通常称为上游(Upstream),通常操作符处理后需要构建一个新的流,而新的流在对原来的流的元素处理后再发送给下游(Downstream)。
我们继续看 transform() 方法,它首先通过 flow() 方法构建一个新的流,构建新的流的 Lambda 中会订阅上游的流,收到上游的数据后通过 transform 参数再处理,在 map 操作符中就是对原来的数据做简单的转换,然后将转换后的数据通过新的 Flow 中的 FlowCollector#emit() 方法再发送出去。
flatMap() 类操作符
在 Flow 中原来的 flatMap() 操作符已经被弃用了,原来的 flatMap() 操作符用 flatMapConcat() 代替,根据不同的功能还衍生出了 flatMapMerge() 操作符和 flatMapLatest() 操作符,我相信很多人对这三个 flatMap 都搞得不是很清楚,所以在分析源码前要先介绍一下他们的区别。
flatMapConcat()、flatMapMerge() 、flatMapLatest() 之间的区别
flatMapConcat()
它相当于 RxJava 中的 concatMap() 操作符,flatMapConcat() 会将原来流的元素构建成一个新的流,而新的流中的元素数据一定会按照原来的流的元素的数据,可能说得有点抽象,我这里举一个例子。
fun main(args: Array<String>) {
runBlocking(Dispatchers.Default) {
val myFlow = flow {
repeat(3) {
emit(it)
}
}
launch {
myFlow
.flatMapConcat { upstreamValue ->
flow {
delay(1000L - upstreamValue * 100)
repeat(2) {
emit(upstreamValue * 10 + it)
}
}
}
.collect {
println(it)
}
}
}
}
最后得输出结果是:
0
1
10
11
20
21
我的测试代码中原来得 Flow 会发送 0 1 2 三个元素,然后通过 flatMapConcat() 将原来的一个元素转换成一个包含两个元素的 Flow。越是先发送的元素延迟的时间越长,虽然是这样但是还是按照原来的 Flow 的元素 0 1 2 构成的顺序处理转换后的 Flow。
flatMapMerge()
它相当于 RxJava 中的 flatMap() 操作符,和 flatMapConcat() 相比,它就不会保证原来的顺序,而是哪个流先处理完成就先发送数据。还是上面的例子,修改成 flatMapMerge() 操作符。
fun main(args: Array<String>) {
runBlocking(Dispatchers.Default) {
val myFlow = flow {
repeat(3) {
emit(it)
}
}
launch {
myFlow
.flatMapMerge { upstreamValue ->
flow {
delay(1000L - upstreamValue * 100)
repeat(2) {
emit(upstreamValue * 10 + it)
}
}
}
.collect {
println(it)
}
}
}
}
最后的输出结果就是:
20
21
10
11
0
1
flatMapMerge() 中还有一个非常重要的参数 concurrency,它的默认值是 16,他表示可以并行执行的 Flow 的数量,如果达到最大的并行值,后续的 Flow 就需要等待前面的并行的 Flow 执行完毕后才能执行。当 concurrency 为 1 时,处理的逻辑就是和 flatMapConcat() 一样,也就是每个 Flow 必须排队执行。我们将上面的代码的 concurrency 修改成 2 来试试:
fun main(args: Array<String>) {
runBlocking(Dispatchers.Default) {
val myFlow = flow {
repeat(3) {
emit(it)
}
}
launch {
myFlow
.flatMapMerge(2) { upstreamValue ->
flow {
delay(1000L - upstreamValue * 100)
repeat(2) {
emit(upstreamValue * 10 + it)
}
}
}
.collect {
println(it)
}
}
}
}
然后输出的结果就是:
10
11
0
1
20
21
flatMapLatest()
它相当于 RxJava 中的 swtichMap() 操作符,前面没有执行完成的 Flow 会被取消,然后被后续的 Flow 替换,还是上面的例子修改成 flatMapLatest() 操作符:
fun main(args: Array<String>) {
runBlocking(Dispatchers.Default) {
val myFlow = flow {
repeat(3) {
emit(it)
}
}
launch {
myFlow
.flatMapLatest { upstreamValue ->
flow {
delay(1000L - upstreamValue * 100)
repeat(2) {
emit(upstreamValue * 10 + it)
}
}
}
.collect {
println(it)
}
}
}
}
最后的输出结果是:
20
21
flatMapConcat() 操作符
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
map(transform).flattenConcat()
通过 map() 操作符将原来的流的元素转换成 Flow,然后通过 flattenConcat() 处理 Flow<Flow<R>>,最后返回的是 Flow<R>,我们再看看 flattenConcat() 是如何处理的:
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
collect { value -> emitAll(value) }
}
处理非常简单,构建一个新的 Flow,这个 Flow 在被订阅的时候它会再去订阅他的上游,上游的类型是 Flow<Flow<R>>,所以上游发送来的数据类型是 Flow<R>,新的 Flow 将上游发送来的数据再通过他的 emitAll() 方法直接将这个 Flow<R> 中的数据发送给下游,我们再看看 FlowCollector#emitAll() 方法的实现:
public suspend fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) {
ensureActive()
flow.collect(this)
}
处理也是非常的简单,直接使用新构建的 Flow 中的 FlowCollector 来收集参数中的 flow。通过上面的方法来收集每个上游中的 Flow<R> 元素时,需要每个 Flow<R> 处理完成后才会去处理下一个,等待中的 Flow<R> 元素是挂起状态。虽然上面的代码很少,但是逻辑是有点绕的,需要你去多多理解一下。
flatMapMerge() 操作符
flatMapMerge() 操作符的实现可要比 flatMapConcat() 操作符要复杂多了,整理好心情我们继续。
public fun <T, R> Flow<T>.flatMapMerge(
concurrency: Int = DEFAULT_CONCURRENCY,
transform: suspend (value: T) -> Flow<R>
): Flow<R> =
map(transform).flattenMerge(concurrency)
这个 concurrency 参数表示可以并行执行的 Flow 的数量,默认是 16,前面我讲过,这个值如果是 1,就和 flatMapConcat() 的处理方式一样。我们看看 flattenMerge() 方法的实现:
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}
concurrency 必须大于 0;如果 concurrency 为 1,那么用 flattenConcat() 处理(上面已经分析过这个方法了);其他情况使用 ChannelFlowMerge 来处理。ChannelFlowMerge 它是一个 ChannelFlow,上面我们讲 Flow 使用的时候有讲到 ChannelFlow,在很多的地方都有使用到 ChannelFlow,我们也借助分析 ChannelFlowMerge 时来理解 ChannelFlow 的工作方式。
首先 ChannelFlow 也是一个 Flow,我们从上面也知道,触发订阅 Flow,是通过它的 collect() 方法,那么我们直接看看 ChannelFlow#collect() 方法的实现:
override suspend fun collect(collector: FlowCollector<T>): Unit =
coroutineScope {
collector.emitAll(produceImpl(this))
}
首先通过 coroutineScope() 方法构建一个新的 CoroutineScope(通过 CoroutineScope 也就可以启动新的协程),然后通过 FlowCollector#emitAll() 方法将 produceImpl() 返回的结果全部发送过去,produceImpl() 的返回结果是一个 Channel(),我们看看 FlowCollector#emitAll() 方法是如何处理 Channel 的。
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Unit =
emitAllImpl(channel, consume = true)
private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
ensureActive()
var cause: Throwable? = null
try {
for (element in channel) {
emit(element)
}
} catch (e: Throwable) {
cause = e
throw e
} finally {
if (consume) channel.cancelConsumed(cause)
}
}
上面代码非常简单,遍历 channel 中的元素,然后通过 emit() 方法发送给下游,最后将 channel 关闭。
OK,我们继续看看 ChannelFlow#produceImpl() 方法是怎么生成 Channel 的:
public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)
继续调用 CoroutineScope#produce() 方法,这里要注意一下,上面的 collectToFun 是 collectTo() 函数,它是一个抽象函数,是由 ChannelFlowMerge#collectTo() 实现。
继续看看 produce() 方法:
internal fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
val channel = Channel<E>(capacity, onBufferOverflow)
val newContext = newCoroutineContext(context)
val coroutine = ProducerCoroutine(newContext, channel)
if (onCompletion != null) coroutine.invokeOnCompletion(handler = onCompletion)
coroutine.start(start, coroutine, block)
return coroutine
}
首先构建一个 Channel 实例,然后构建一个 ProducerCoroutine 协程,然后启动一个协程,我在前面的文章中有介绍过协程启动的基本流程,不熟悉这个流程的同学可以看看我的这篇文章:Kotlin 协程源码阅读笔记 —— 协程工作原理,协程开始执行时的入口方法就是那个 block,也就是我们上面说到的 ChannelFlowMerge#collectTo(),注意我们的协程 ProducerCoroutine,他也是一个 Channel,而真正的实现是方法开始时构建的 Channel。我们继续看看 ChannelFlowMerge#collectTo() 方法的实现:
override suspend fun collectTo(scope: ProducerScope<T>) {
val semaphore = Semaphore(concurrency)
val collector = SendingCollector(scope)
val job: Job? = coroutineContext[Job]
flow.collect { inner ->
/*
* We launch a coroutine on each emitted element and the only potential
* suspension point in this collector is `semaphore.acquire` that rarely suspends,
* so we manually check for cancellation to propagate it to the upstream in time.
*/
job?.ensureActive()
semaphore.acquire()
scope.launch {
try {
inner.collect(collector)
} finally {
semaphore.release() // Release concurrency permit
}
}
}
}
上面的参数 ProduceScope 其实就是上面的 ProducerCoroutine,他是一个 Channel;然后会构建一个 Semaphore 用来控制并行的 Channel 的数量,这个不是 Java 中的 Semaphore,我在讲 Mutex 时有讲这个对象,感兴趣的同学可以参考我前面的文章: Kotlin 协程源码阅读笔记 —— Mutex;然后订阅上游的流,类型是 Flow<Flow<R>>,收到的元素类型是 Flow<R>,收到元素后通过 Semaphore 来控制并行数量,如果可以执行,通过 launch() 方法来创建一个新的协程来订阅这个 Flow<R> 中的数据,对应的 FlowCollector 实现是 SendingCollector,我们来看看它的实现:
public class SendingCollector<T>(
private val channel: SendChannel<T>
) : FlowCollector<T> {
override suspend fun emit(value: T): Unit = channel.send(value)
}
简单高效,直接把上游发送来的数据,通过 Channel 发送出去,根据上面的代码我们知道,这个 Channel 中的数据最终会被发送给下游。
到这里 flatMapMerge() 操作符就分析完毕了,你看懂了吗?没有看懂可以再看一遍。
flatMapLatest() 操作符
public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
transformLatest { emitAll(transform(it)) }
flatMapLatest() 借助了 transformLatest() 方法,在他的 Lambda 中直接将返回的 Flow<R> 中的元素发送给下游。
public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> =
ChannelFlowTransformLatest(transform, this)
这里实现是通过 ChannelFlowTransformLatest 对象,它也是一个 ChannelFlow,它是继承于 ChannelFlowOperator,ChannelFlowOperator 继承于 ChannelFlow。由上面分析 ChannleFlow 我们直到最终的订阅时会构建一个协程,然后协程执行开始时的函数时 collectTo() 方法,所以我们也从 ChannelFlowOperator#collectTo() 方法开始分析:
protected override suspend fun collectTo(scope: ProducerScope<T>) =
flowCollect(SendingCollector(scope))
先构建了一个 SendingCollector 对象(前面有分析了),然后调用 flowCollect() 方法,它是一个抽象方法,我们看看 ChannelFlowTransformLatest#flowCollect() 的实现:
override suspend fun flowCollect(collector: FlowCollector<R>) {
assert { collector is SendingCollector } // So cancellation behaviour is not leaking into the downstream
coroutineScope {
var previousFlow: Job? = null
flow.collect { value ->
previousFlow?.apply {
cancel(ChildCancelledException())
join()
}
// Do not pay for dispatch here, it's never necessary
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
collector.transform(value)
}
}
}
}
它处理每一个 Flow<R> 也是创建一个新的协程,当有新的 Flow<R> 元素来的时候,就会把上次的处理 Flow<R> 的任务取消。
flowOn() 操作符
我们通常使用 flowOn() 操作符来切换 ContinuationInterceptor,通过这样来控制下游执行的线程,准确地说 flowOn() 是用来控制下游 CoroutineContext 的。 我们看看它的实现:
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
checkFlowContext(context)
return when {
context == EmptyCoroutineContext -> this
this is FusibleFlow -> fuse(context = context)
else -> ChannelFlowOperatorImpl(this, context = context)
}
}
以上代码分为 3 种情况:
CoroutineContext为空,直接不处理。- 如果是
FusibleFlow就使用它的fuse()方法来返回一个新的Flow,巧了不是ChannelFlow就是继承于FusibleFlow。 - 其他情况通过
ChannelFlowOperatorImpl来处理,他也是一个ChannelFlow,和flatMapLatest()一样它的实现也是通过继承ChannelFlowOperator。
我们以 ChannelFlowMerge#fuse() 的实现来看看 fuse() 方法:
public override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): Flow<T> {
assert { capacity != Channel.CONFLATED } // CONFLATED must be desugared to (0, DROP_OLDEST) by callers
// note: previous upstream context (specified before) takes precedence
val newContext = context + this.context
val newCapacity: Int
val newOverflow: BufferOverflow
if (onBufferOverflow != BufferOverflow.SUSPEND) {
// this additional buffer never suspends => overwrite preceding buffering configuration
newCapacity = capacity
newOverflow = onBufferOverflow
} else {
// combine capacities, keep previous overflow strategy
newCapacity = when {
this.capacity == Channel.OPTIONAL_CHANNEL -> capacity
capacity == Channel.OPTIONAL_CHANNEL -> this.capacity
this.capacity == Channel.BUFFERED -> capacity
capacity == Channel.BUFFERED -> this.capacity
else -> {
// sanity checks
assert { this.capacity >= 0 }
assert { capacity >= 0 }
// combine capacities clamping to UNLIMITED on overflow
val sum = this.capacity + capacity
if (sum >= 0) sum else Channel.UNLIMITED // unlimited on int overflow
}
}
newOverflow = this.onBufferOverflow
}
if (newContext == this.context && newCapacity == this.capacity && newOverflow == this.onBufferOverflow)
return this
return create(newContext, newCapacity, newOverflow)
}
根据传入的参数构建一个新的 CoroutineContext,剩下的一大堆代码其实就是为了计算出一个新的 capacity,具体的计算逻辑大家自己看看,其实就是 Channel 中的容量,然后调用 create() 方法。我们看看 ChannelFlowMerge#create() 方法的实现:
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
ChannelFlowMerge(flow, concurrency, context, capacity, onBufferOverflow)
然后我们再看看 ChannelFlowOperatorImpl#flowCollect() 方法的实现:
override suspend fun flowCollect(collector: FlowCollector<T>) =
flow.collect(collector)
因为 flowCollect() 会工作在新的 CoroutineContext 的协程中,所以就不用特殊处理了,直接将上游的数据发送给下游就好了,下游就会工作在新的 CoroutineContext 中了。
最后
本篇文章介绍了 Flow 的基本工作原理和一些常用的操作符,如果你想知道某个操作符具体是用来干什么的,看源码比看注释可能更容易理解,操作符的源码本来也就不难,和我分析的操作符的方式也都是大同小异,本篇文章也是 2023 年的最后一篇文章,由它来作为 2023 年的句号,希望 2024 年能够更加好,祝大家新年快乐!!!!
- 程序开发学习排行
- 最近发表


