联系我们
简单又实用的WordPress网站制作教学
当前位置:网站首页 > 程序开发学习 > 正文

Kotlin 协程源码阅读笔记 —— 协程工作原理

作者:访客发布时间:2023-12-26分类:程序开发学习浏览:343


导读:Kotlin协程源码阅读笔记——协程工作原理Kotlin协程在网上有很多的人把它吹得神乎其神,什么性能多么多么好,效率比线程高多少多少,balabala一堆优点。首先在...

Kotlin 协程源码阅读笔记 —— 协程工作原理

Kotlin 协程在网上有很多的人把它吹得神乎其神,什么性能多么多么好,效率比线程高多少多少,balabala 一堆优点。首先在我看来协程和线程压根儿就没有可比性,就好像说姚明和刘翔谁更厉害一样,线程是操作系统的调度的基本单位,线程也是 CPU 执行的一个基本任务;而协程只是在编程语言上定义的一种优化多线程通信、调度的一种编程方式(至少 Kotlin 中是这样),而操作系统可不认识什么是协程,而协程中的任务最终也是在线程上执行。
Kotlin 协程上来说它的最大的优点只有一个它能够以同步的方式来写异步代码,能够干掉编程中的地狱回调(通过类似于 RxJava 流的编程方式也能够干掉地狱回调,不过不是本篇文章中的讨论内容),而它的其他优点也都是这一个优点的发散,不要小瞧这个优点,如果消除了各种异步 Callback,能够在很大的程度上提高代码的可阅读性,减少 BUG 的产生,也更容易能够定位到 BUG

简单了协程的优点,后续就要看看它是怎么工作的了,我前面还写了一篇介绍 CoroutineContext 的文章,如果不熟悉它的同学可以先看看 Kotlin 协程源码阅读笔记 —— CoroutineContext,那么准备就绪后就开启今天的内容。

用 Callback 写异步任务

假如我有以下的异步任务:

val delayExecutor: ScheduledThreadPoolExecutor by lazy {
    ScheduledThreadPoolExecutor(1)
}

fun delay(time: Long, callback: () -> Unit) {
    delayExecutor.schedule({ callback() }, time.coerceAtLeast(0), TimeUnit.MILLISECONDS)
}

我用 delay() 来实现一个异步耗时任务,实现就是通过 delayExecutor 添加一个定时任务,任务执行时就调用 callback

我有以下代码要调用异步任务:

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        helloWorld {
            println(it)
        }
    }

    fun helloWorld(callback: (s: String) -> Unit) {
        hello { hello ->
            world { world ->
                callback("${hello}${world}")
            }
        }
    }

    fun hello(callback: (s: String) -> Unit) {
        delay(500) {
            callback("Hello, ")
        }
    }

    fun world(callback: (s: String) -> Unit) {
        delay(500) {
            callback("World!!")
        }
    }
}

我在 onCreate() 函数中调用了异步任务 helloWorld(),成功后会在 callback 中打印最后的结果。 helloWorld() 又由 hello()world() 两个任务组成,hello() 任务成功后在调用 world() 任务,最后结合 hello()world() 两个任务的结果的结果回调 helloWorld()callback

用协程写异步任务

同样是上面的异步任务,我用 Kotlin 协程改造一下:

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        val coroutineScope = CoroutineScope(Dispatchers.Default)
        coroutineScope.launch {
            println(helloWorld())
        }
    }

    suspend fun helloWorld(): String {
        val hello = hello()
        val world = world()
        return hello + world
    }

    suspend fun hello(): String {
        return delaSuspend(500, "Hello, ")
    }

    suspend fun world(): String {
        return delaSuspend(500, "World!!")
    }

    suspend fun <T> delaSuspend(time: Long, data: T): T {
        return suspendCancellableCoroutine { cont ->
            delay(time) {
                cont.resumeWith(Result.success(data))
            }
        }
    }
}

首先我把上面的 delay() 耗时任务的回调改造成了协程 suspend 方法,具体的改造参考 delaSuspend() 方法的实现。然后 hello()world() 也都是 suspend() 方法,他们都是通过调用 delaSuspend() 来模拟异步任务,在 helloWrold() 中分别调用 hello()world() 方法,这里都是以同步的方式调用的哦,然后组合他们的结果然后返回。在 onCreate() 新建一个协程,然后也是直接以同步的方式调用 helloWorld()

和改造前的代码有一个非常显著的特点就是消灭的所有的 callback,所有的异步任务都是以同步的方式调用的,你可能也会吐槽也没感觉比之前优化了多少,上面 demo 中的任务比较简单,总共才 3 个 callback,而且也没有处理异常的回调,callback 的层级最大也才 2。越复杂的任务协程的优势就会越大。

Kotlin 协程工作原理

虽然在源码中我们看到的协程代码是同步的,其实虚拟机执行的时候它还是一个不折不扣的 callback,这主要归功于 Kotlin 编译器的处理,我们就以上面的 demo 来一步一步分析它的工作方式。

CoroutineScope

如果要开启一个协程就需要先通过 CoroutineScope() 方法创建一个 CoroutineSope,在这个方法中可以指定我们的默认 CoroutineContext

public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}

CoroutineScope 的接口非常简单,只需要实现一个 CoroutineContext

我们来看看 CoroutineScope() 方法的实现:

@Suppress("FunctionName")
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
    ContextScope(if (context[Job] != null) context else context + Job())

internal class ContextScope(context: CoroutineContext) : CoroutineScope {
    override val coroutineContext: CoroutineContext = context
    // CoroutineScope is used intentionally for user-friendly representation
    override fun toString(): String = "CoroutineScope(coroutineContext=$coroutineContext)"
}

上面的代码非常简单就只是把我们添加的 CoroutineContext 设置到 CoroutineScope,这里注意添加了一个 JobCoroutineContext,每个协程启动时都会创建一个 Job 对象,这些由 CoroutineScope 启动的协程的 Job 都是 CoroutineScope 中的 Job 的子任务,而协程里面还可以再启动子协程,这个子协程的 Job 的父 Job 就是启动他的协程的 Job。所以通过 Job 就构成了一个任务的继承链。当父 Job 取消后他的子 Job 也会被取消。所以如果是 CoroutineScope 中的顶级父 Job 取消了,那么用他启动的所有的协程或者孙协程等等也都会被取消。CoroutineScope#cancel() 的方法实现就是通过调用 CoroutineContext 中的 Jobcancel() 方法实现的:

public fun CoroutineScope.cancel(cause: CancellationException? = null) {
    val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
    job.cancel(cause)
}

通常我们为了防止内存泄漏,在 Activity 或者 Fragment 或者一些什么别的组件退出后都会调用他们所对应的 CoroutineScope#cancel() 方法,避免内存泄漏。

启动一个协程

我们启动协程是通过 CoroutineScepe#launch() 扩展函数来完成的,其中的 Lambda 对象就是协程执行开始的第一个方法,我们来看看它的源码实现:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

这里会对传入的 CoroutineContext 进行修改,处理的方法是 newCoroutineContext();然后根据启动类型创建一个 coroutine 对象,默认的实现是 StandaloneCoroutine 类,然后调用 Coroutine#start() 方法,最后返回 Coroutine。这里要非常注意 StandaloneCoroutine 是一个 JobContinuation (很多人中文翻译它为续体,后续会重点讲),CoroutineScope(也就是他也能够启动协程,也就是当前协程的子协程)。后面我们会再看这些对象所处理的逻辑,现在有点懵也没关系。

我们继续看看 newCoroutineContext() 方法对传入的 CoroutineContext 的处理:

@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = foldCopies(coroutineContext, context, true)
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}

这里会添加一个 CoroutineIdCoroutineContext 用来记录 Coroutine 的名字,其实就是修改协程运行时的线程的名字,添加上协程编号的信息;这里还会判断是否有 CoroutineInterceptorCoroutineContext,如果没有,使用 Dispatcher.Default 作为默认的 CoroutineInterceptor

然后简单看看 StandaloneCoroutine 的实现:

private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

它是继承与 AbstractCoroutine,它重写了 handleJobException() 方法来处理协程的异常,处理调用的方法是 handleCoroutineException(),我们来看看它的实现:

@InternalCoroutinesApi
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
    // Invoke an exception handler from the context if present
    try {
        context[CoroutineExceptionHandler]?.let {
            it.handleException(context, exception)
            return
        }
    } catch (t: Throwable) {
        handleUncaughtCoroutineException(context, handlerException(exception, t))
        return
    }
    // If a handler is not present in the context or an exception was thrown, fallback to the global handler
    handleUncaughtCoroutineException(context, exception)
}

这里会查找 CoroutineContext 中是否有 CoroutineExceptionHandler,如果有异常就交给它来处理,如果没有就由 GlobalHandler 来处理,默认就是崩溃啦。

然后就是调用 Coroutine#start() 方法来启动一个协程了,看看它的代码:

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    start(block, receiver, this)
}

@InternalCoroutinesApi
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
    when (this) {
        DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        ATOMIC -> block.startCoroutine(receiver, completion)
        UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        LAZY -> Unit // will start lazily
    }

上面的 block 就是我们启动协程时传递过来的 Lambda 对象,然后 receivercompletion 都是我们上面创建的 StandaloneCoroutine 对象。继续看看 startCoroutineCancellable() 方法的实现:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

这里首先通过 createCoroutineUnintercepted() 方法将我们的 Lambda 对象构建成一个 Continuation 对象;然后调用 intercepted() 方法对原来的 Continuation 对象进行拦截器的处理;然后调用 resumeCancellableWith() 方法来触发协程的开始。上面的三个方法都非常重要,后面两个方法都会反复地看到他们。

这里在开始之前非常有必要解释一下 Continuation,很多人中文翻译成续体,我就不用中文的名字了,还是继续用英文来表示。前面我们说到协程的本质其实就是一个 Callback,而用来控制 Callback 回调成功/失败就是 Continuation 很重要的一个职责,同时它还记录了当前方法对应的执行的位置(像程序计数器一样),上次执行后的中间结果等等。Continuation 还会涉及到两个非常重要的概念那就是 suspendresume,中文翻译成挂起和恢复,我还是使用英文名词来表示他们。所谓的 suspend 其实就相当于我们调用了一个异步方法后等待 callback 时的协程状态,这时由于 callback 还没有回来当前的线程还可以做其他的任务;而 resume 就是 callback 回调成功需要唤醒原来的协程继续执行。这也就是很多人说得很邪乎的挂起与恢复,说白了就是调用异步任务时就挂起,callback 成功或者失败就是恢复,后面我们还会从源码中看到他具体是怎么挂起和恢复的(其实上面的 resumeCancellableWith() 方法就算是恢复)。

createCoroutineUnintercepted()

这个方法其实就是对原来 launch() 方法传递过来的 Lambda 方法进行改造,我们直接看看,反编译后的原来的 Lambda 对象:

截屏2023-12-25 上午11.38.38.png

其中我们发现它继承于 SuspendLambda 对象,而 SuspendLambda 继承于 ContinuationImpl 对象这个对象非常重要,是 Continuation 的一个实现。而当前的 Continuation 执行时的入口函数就是 invokeSuspend() 方法。后面我们会看到调用这个方法的逻辑。而它的构造函数中还有一个 Continuation 对象,这个其实就是上面的 StandalongCoroutine 对象,这个对象可以理解为父级的 Continuation 对象,在 Kotlin 源码中通常被称为 completion,这个就是表示当前 Continuation 执行完成后需要通知父级的 Continuation 继续执行。

intercepted()

我们继续看看 intercepted() 方法对我们原来的 Continuation 做了什么处理:

@SinceKotlin("1.3")  
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =  
(this as? ContinuationImpl)?.intercepted() ?: this

调用了 ContinuationImplintercepted() 方法,我们继续追踪:

    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

继续调用 ContinuationIntercetor#interceptContinuation() 方法,我们之前设置的 CoroutineInterceptorDispatchers.Default。 我们来看看 CoroutineDispatcher#interceptContinuation() 方法的实现:

public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =  
DispatchedContinuation(this, continuation)

将原来的 DispatherContinuation 作为参数构建了一个新的 DispatchedContinuation 对象。

resumeCancellableWith()

上面我们也讲到它就相当于 resume 协程,他也是我们第一次 resume 协程。我们来看看它的实现:

@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

由于我们在 intercetped() 方法中将原来的 Continuation 对象,转换成了 DispatchedContinuation 对象,所以我们这里调用的是 DispatchedContinuation#resumeCancellableWith() 方法,我们看看它的实现:

    @Suppress("NOTHING_TO_INLINE")
    internal inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    resumeUndispatchedWith(result)
                }
            }
        }
    }
    
    @Suppress("NOTHING_TO_INLINE")
    internal inline fun resumeUndispatchedWith(result: Result<T>) {
        withContinuationContext(continuation, countOrElement) {
            continuation.resumeWith(result)
        }
    }

如果 Dispatcher#isDispatchNeeded() 返回 true 就表示可以使用 Dispatcher 来处理任务,也就是可以做到后续的任务最终在 Dispatcher 中的线程池中执行,通过 Dispatcher#dispatch() 方法下发任务,最后执行时是执行 run() 方法,实现是 DispatchedTask#run() 方法。反之就直接在当前线程调用 Continuation#resultWith() 方法。看看 DispatchedTask#run() 方法的实现:

    final override fun run() {
        assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val delegate = delegate as DispatchedContinuation<T>
            val continuation = delegate.continuation
            withContinuationContext(continuation, delegate.countOrElement) {
                // ...
                if (job != null && !job.isActive) {
                    // ...
                } else {
                    if (exception != null) {
                        continuation.resumeWithException(exception)
                    } else {
                        continuation.resume(getSuccessfulResult(state))
                    }
                }
            }
        } catch (e: Throwable) {
            // ...
        } finally {
            // ...
        }
    }

run() 方法执行过程中会判断协程是否报错了,如果没有报错直接执行 Continuation#result() 方法,如果有错调用 Continuation#resumeWithException() 方法,上面说到这个 Continuation 在这里的实现类是 Lambada 对象继承 SuspendLambda 对象实现的,这个 resume() 方法最终是由 BaseContinuationImpl#resumeWith() 方法实现的,这个方法可以说是 Kotlin 协程的灵魂,我们来看看它的实现,后面的操作中会多次提到它,注意理解:

    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            // 通知 debug 协程已经 resume
            probeCoroutineResumed(current)
            with(current) {
                // 父级的 continuation
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        // 调用 invokeSuspend 入口函数
                        val outcome = invokeSuspend(param)
                        // 如果返回 COROUTINE_SUSPENDED 就表示挂起,同时退出循环
                        if (outcome === COROUTINE_SUSPENDED) return
                        // 如果返回不是 COROUTINE_SUSPENDED 就表示该 Continuation 方法已经执行完成了,需要通知它的父级的 Continuation,然后父级的 Continuation 继续执行。
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    // 恢复父级的 Continuation 
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

Continuation 中每调用一次 Kotlin 中的 suspend 的方法,Continuation 都会调用一次它的 invokeSuspend() 方法,当然每次调用 invokeSuspend() 方法执行的代码都不一样,Continuation 会通过一个 label 来记录 invokedSuspend() 执行的位置,后面我们会看到这部分代码,简单再描述一下上面的代码:

  1. 通过 probeCoroutineResumed() 方法通知协程进入 resume 状态。
  2. 调用 invokeSuspend() 方法进入协程方法的具体执行,然后判断返回值,如果返回值是 COROUTINE_SUSPENDED 就表示当前协程需要 suspend(也就是在执行一个异步任务,等待后续异步任务成功后再调用 resumeWith() 方法 resume。);如果返回值不是 COROUTINE_SUSPENDED 就表示当前的 Continuation 已经执行完成了。
  3. 当前 Continuation 执行完成了后,就需要将父级的 Continuation 修改为 resume 状态,唤醒父级 Continuation 继续执行。

我们站在初次启动协程的逻辑来看看 resumeWith() 这个方法,初次调用 resumeWith() 时,对应的 Continuation 就是 SuspendContinuation,而最终的实现是由我们 launch 时传递进去的 Lambda 对象生成的,而父级的 Continuation 就是 StandaloneCoroutineSuspendContinuation 执行完成后就会调用 StandaloneCoroutineresumeWith() 方法,这也就标志当前的协程已经执行完成。

由于在 Kotlin 协程相关代码编译过程中会生成多个类似于 Lambda 对象那样的匿名对象,不利于我们后续的源码分析,我把 demo 中的编译后的代码反编译后再重新整理命名,方便后续的代码的分析。

  • MainActivity
public final class MainActivity extends AppCompatActivity {
    @Override // androidx.fragment.app.FragmentActivity, androidx.activity.ComponentActivity, androidx.core.app.ComponentActivity, android.app.Activity
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        BuildersKt__Builders_commonKt.launch$default(CoroutineScopeKt.CoroutineScope(Dispatchers.getDefault()), null, null, new LaunchContinuation(this, null), 3, null);
    }

    public final java.lang.Object helloWorld(kotlin.coroutines.Continuation<? super java.lang.String> continuation) {
        HelloWorldContinuation helloWorldContinuation = null;
        if (!(continuation instanceof HelloWorldContinuation)) {
            helloWorldContinuation = new HelloWorldContinuation(this, continuation);
        } else {
            helloWorldContinuation = (HelloWorldContinuation) continuation;
        }
        Object suspend = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (helloWorldContinuation.label) {
            case 0: {
                kotlin.ResultKt.throwOnFailure(helloWorldContinuation.result);
                StringBuilder stringBuilder = new StringBuilder();
                helloWorldContinuation.param1 = this;
                helloWorldContinuation.param2 = stringBuilder;
                helloWorldContinuation.label = 1;
                Object result = hello(helloWorldContinuation);
                if (result == suspend) {
                    return suspend;
                } else {
                    // 我们的代码中不会有这种情况
                }
            }
            case 1: {
                kotlin.ResultKt.throwOnFailure(helloWorldContinuation.result);
                MainActivity mainActivity = (MainActivity) helloWorldContinuation.param1;
                StringBuilder stringBuilder = (StringBuilder) helloWorldContinuation.param2;
                String lastResult = (String) helloWorldContinuation.result;
                stringBuilder.append(lastResult);
                helloWorldContinuation.param1 = stringBuilder;
                helloWorldContinuation.param2 = null;
                helloWorldContinuation.label = 2;
                Object result = world(helloWorldContinuation);
                if (result == suspend) {
                    return suspend;
                } else {
                    // 我们的代码中不会有这种情况
                }
            }
            case 2: {
                kotlin.ResultKt.throwOnFailure(helloWorldContinuation.result);
                StringBuilder stringBuilder = (StringBuilder) helloWorldContinuation.param1;
                String lastResult = (String) helloWorldContinuation.result;
                stringBuilder.append(lastResult);
                return stringBuilder.toString();
            }
            default: {

            }
        }

        throw new UnsupportedOperationException("Method not decompiled: com.tans.coroutine_test.MainActivity.helloWorld(kotlin.coroutines.Continuation):java.lang.Object");
    }

    public final Object hello(Continuation<? super String> continuation) {
        return delaSuspend(500L, "Hello, ", continuation);
    }

    public final Object world(Continuation<? super String> continuation) {
        return delaSuspend(500L, "World!!", continuation);
    }

    public final <T> Object delaSuspend(long time, T t, Continuation<? super T> continuation) {
        CancellableContinuationImpl cancellable$iv = new CancellableContinuationImpl(IntrinsicsKt.intercepted(continuation), 1);
        cancellable$iv.initCancellability();
        DelayKt.delay(time, new MainActivity$delaSuspend$2$1(cancellable$iv, t));
        Object result = cancellable$iv.getResult();
        if (result == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
            DebugProbesKt.probeCoroutineSuspended(continuation);
        }
        return result;
    }
}
  • LaunchContinuation

LaunchContinuation 就是由 launch() 方法的 Lambda 对象生成的 Continuation 对象,它本来是一个无规则的对象名字,为了代码好阅读我把它的名字修改成了 LaunchContinuation

final class LaunchContinuation extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
    int label;
    final MainActivity mainActivity;

    public LaunchContinuation(MainActivity mainActivity, Continuation<? super LaunchContinuation> continuation) {
        super(2, continuation);
        this.mainActivity = mainActivity;
    }

    @Override
    public final Continuation<Unit> create(Object obj, Continuation<?> continuation) {
        return new LaunchContinuation(this.mainActivity, continuation);
    }

    public final Object invoke(CoroutineScope coroutineScope, Continuation<? super Unit> continuation) {
        return ((LaunchContinuation) create(coroutineScope, continuation)).invokeSuspend(Unit.INSTANCE);
    }

    @Override
    public final Object invokeSuspend(Object $result) {
        Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure($result);
                this.label = 1;
                Object helloWorld = this.mainActivity.helloWorld(this);
                if (helloWorld != coroutine_suspended) {
                    $result = helloWorld;
                    break;
                } else {
                    return coroutine_suspended;
                }
            case 1:
                ResultKt.throwOnFailure($result);
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        System.out.println($result);
        return Unit.INSTANCE;
    }
}
  • HelloWorldContinuation

HelloWroldContinuation 是由 helloWorld() 函数生成的一个 Continuation 对象,它本来也是一个匿名对象,为了方便源码阅读,我修改了该类的命名。

public final class HelloWorldContinuation extends ContinuationImpl {
    public Object param1;
    public Object param2;
    public int label;
    public Object result;
    final MainActivity mainActivity;

    public HelloWorldContinuation(MainActivity mainActivity, Continuation<? super HelloWorldContinuation> continuation) {
        super(continuation);
        this.mainActivity = mainActivity;
    }

    @Override
    public final Object invokeSuspend(Object obj) {
        this.result = obj;
        this.label |= Integer.MIN_VALUE;
        return this.mainActivity.helloWorld(this);
    }
}
  • DelayKt
public final class DelayKt {
    private static final Lazy delayExecutor$delegate = LazyKt.lazy(DelayKt$delayExecutor$2.INSTANCE);

    public static final ScheduledThreadPoolExecutor getDelayExecutor() {
        return (ScheduledThreadPoolExecutor) delayExecutor$delegate.getValue();
    }

    public static final void delay(long time, final Function0<Unit> callback) {
        Intrinsics.checkNotNullParameter(callback, "callback");
        getDelayExecutor().schedule(new Runnable() { // from class: com.tans.coroutine_test.DelayKt$$ExternalSyntheticLambda0
            @Override // java.lang.Runnable
            public final void run() {
                DelayKt.delay$lambda$0(Function0.this);
            }
        }, RangesKt.coerceAtLeast(time, 0L), TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final void delay$lambda$0(Function0 callback) {
        Intrinsics.checkNotNullParameter(callback, "$callback");
        callback.invoke();
    }
}
  • DelayKt$delayExecutor$2
    这是 DelayExecutor 的代理对象
final class DelayKt$delayExecutor$2 extends Lambda implements Function0<ScheduledThreadPoolExecutor> {
    public static final DelayKt$delayExecutor$2 INSTANCE = new DelayKt$delayExecutor$2();

    DelayKt$delayExecutor$2() {
        super(0);
    }

    @Override
    public final ScheduledThreadPoolExecutor invoke() {
        return new ScheduledThreadPoolExecutor(1);
    }
}
  • MainActivity$delaSuspend$2$1
    这是 MainActivity 中调用 delay() 方法时的 Lambda 生成的对象:
final class MainActivity$delaSuspend$2$1 extends Lambda implements Function0<Unit> {
    final CancellableContinuation<T> $cont;
    final T $data;
    public MainActivity$delaSuspend$2$1(CancellableContinuation<? super T> cancellableContinuation, T t) {
        super(0);
        this.$cont = cancellableContinuation;
        this.$data = t;
    }

    @Override // kotlin.jvm.functions.Function0
    /* renamed from: invoke  reason: avoid collision after fix types in other method */
    public final void invoke2() {
        Continuation continuation = this.$cont;
        Result.Companion companion = Result.Companion;
        continuation.resumeWith(Result.m122constructorimpl(this.$data));
    }
}

LaunchContinuation 的执行流程

上一小节讲到协程启动执行开始的方法是调用 launch() 方法中 Lambda 生成的 ContinuationresumeWith() 方法,这个 Continuation 我们把它命名成 LaunchContinuation (实际上是一个和 Lambda 对象一样的匿名对象,对象名不易阅读),而 resumeWith() 方法最终会调用 LaunchContinuation#invokeSuspend() 方法(忘记了的同学看看前面分析 BaseContinuationImpl#resumeWith() 代码的部分),我们来看看 invokeSuspend() 方法的实现:

    public final Object invokeSuspend(Object $result) {
        Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                // 检查是否有异常
                ResultKt.throwOnFailure($result);
                // 修改 label 为 1
                this.label = 1;
                // 调用 helloWrold 方法,注意这里把自己当参数传递了过去
                Object helloWorld = this.mainActivity.helloWorld(this);
                if (helloWorld != coroutine_suspended) {
                    // 如果返回值不等于 coroutine_suspended 表示已经得到正确的返回结果,我们的例子不会执行这里
                    $result = helloWorld;
                    break;
                } else {
                    // 表示协程进入 suspend 状态,等待下次调用 resumeWith() 方法继续执行
                    return coroutine_suspended;
                }
            case 1:
                // 第二次执行 resuemWith() 方法,表示已经获取到 helloWorld() 中的返回值,检查返回中是否有异常。
                ResultKt.throwOnFailure($result);
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        // 执行打印最后的结果
        System.out.println($result);
        return Unit.INSTANCE;
    }

我们来分析一下上面的代码:

  1. 记录 Continuation 的执行位置的对象是 label,默认 label 的值为 0。
  2. label 为 0 的逻辑:
  • 判断 result 中是否有异常
  • 修改 label 为 1,表示下次 resumeWith() 方法执行时,就是 case 1 那部分的逻辑。
  • 调用 MainActivity#helloWorld() 方法,注意这里将当前的 Continuation 对象传递给了 helloWorld() 方法。
  • 这里会判断 helloWorld() 方法的返回值,如果返回值不为 COROUTINE_SUSPEND,就表示已经拿到返回值,不需要进入 suspend 状态,然后进入 label 为 1 的逻辑(我们的代码不会返回 COROUTINE_SUSPEND);反之就表示没有获取到返回值,需要挂起,直接返回 COROUTINE_SUSPEND,等待下次执行 resumeWith() 方法恢复,那时也就表示已经获取到 helloWorld() 方法的返回值。
  1. label 为 1 的逻辑:
  • 判断 result 中是否有异常。
  • 执行打印 result 的结果。
  • 这里返回了一个 Unit 而不是 COROUTINE_SUSPEND,就表示当前方法执行完毕了,我们前面提到 LaunchContinuation 的父级 ContinuationStandalongCoroutine,在讲 BaseContinuationImpl#resumeWith() 方法时讲过,当前 Continuation 执行完毕后就会执行它的父级 Continuation,也就是后续会执行 StandalongCoroutine#resumeWith() 方法,也就是通知 StandalongCoroutine,协程已经执行完毕了。

helloWorld() 的执行流程

LaunchContinuation 的执行过程中会调用 MainActivity#helloWorld() 方法,我们再来看看 MainActivity#helloWorld() 方法是如何处理的:

    public final java.lang.Object helloWorld(kotlin.coroutines.Continuation<? super java.lang.String> continuation) {
        HelloWorldContinuation helloWorldContinuation = null;
        if (!(continuation instanceof HelloWorldContinuation)) {
            // 构建一个 HelloWorldContinuation 实例,注意这里将 Launch Continuation 作为 HelloWorldContinuation 的父级 Continuation
            helloWorldContinuation = new HelloWorldContinuation(this, continuation);
        } else {
            helloWorldContinuation = (HelloWorldContinuation) continuation;
        }
        Object suspend = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (helloWorldContinuation.label) {
            case 0: {
                // 检查 result 是否有异常 
                kotlin.ResultKt.throwOnFailure(helloWorldContinuation.result);
                // 构建 StringBuilder 对象
                StringBuilder stringBuilder = new StringBuilder();
                // 将 MainActivity 实例赋值给 HelloWorldContinuation#param1
                helloWorldContinuation.param1 = this;
                // 将 StringBuilder 实例赋值给 HelloWorldContinuation#param2
                helloWorldContinuation.param2 = stringBuilder;
                // 修改 label 为 1
                helloWorldContinuation.label = 1;
                // 执行 hello() 方法
                Object result = hello(helloWorldContinuation);
                // 返回值为 COROUTINE_SUSPEND 表示当前协程变为 suspend 状态
                if (result == suspend) {
                    return suspend;
                } else {
                    // 我们的代码中不会有这种情况
                }
            }
            case 1: {
                // 检查 result 是否有异常 
                kotlin.ResultKt.throwOnFailure(helloWorldContinuation.result);
                MainActivity mainActivity = (MainActivity) helloWorldContinuation.param1;
                // 获取参数 StringBuilder
                StringBuilder stringBuilder = (StringBuilder) helloWorldContinuation.param2;
                // 获取上次的 hello() 方法的返回结果
                String lastResult = (String) helloWorldContinuation.result;
                // 将 hello() 方法的返回结果添加到 StringBuilder 中
                stringBuilder.append(lastResult);
                // 将 StringBuilder 赋值给 HelloWorldContinuation#param1
                helloWorldContinuation.param1 = stringBuilder;
                helloWorldContinuation.param2 = null;
                // 修改 label 为 2
                helloWorldContinuation.label = 2;
                // 执行 world() 方法。
                Object result = world(helloWorldContinuation);
                // 返回值为 COROUTINE_SUSPEND 表示当前协程变为 suspend 状态
                if (result == suspend) {
                    return suspend;
                } else {
                    // 我们的代码中不会有这种情况
                }
            }
            case 2: {
            // // 检查 result 是否有异常 
                kotlin.ResultKt.throwOnFailure(helloWorldContinuation.result);
                // 获取 StringBuilder
                StringBuilder stringBuilder = (StringBuilder) helloWorldContinuation.param1;
                // 获取 world() 方法的返回值
                String lastResult = (String) helloWorldContinuation.result;
                // 将 world() 方法的返回值,添加到 StringBuilder 中
                stringBuilder.append(lastResult);
                // 将最终结果返回,也就表示当前 Continuation 执行完毕
                return stringBuilder.toString();
            }
            default: {

            }
        }

        throw new UnsupportedOperationException("Method not decompiled: com.tans.coroutine_test.MainActivity.helloWorld(kotlin.coroutines.Continuation):java.lang.Object");
    }

我再讲Retrofit 的源码的文章中也讲过 Kotlinsuspend 函数在编译处理后,会添加一个 Continuation 对象参数,然后返回值变成 Object,在 helloWorld() 方法中也得到了印证。
这个逻辑比 LaunchContinuation 中的状态稍微多了一些,然后各种参数也复杂一点,我们来分析一下:

  1. 如果 continuation 参数不是 HelloWorldContinuation (这种情况就是 LaunchContinuation),构建一个 HelloWorldContinuation 实例,它的父 ContinuationLaunchContinuation
  2. label 为 0 的逻辑:
  • 检查 result 中是否有异常。
  • 构建一个新的 StringBuilder 对象赋值给 HelloWorldContinuation#param2
  • 修改 label 为 1。
  • 执行 hello() 方法,如果返回值是 COROUTINE_SUSPEND 表示协程进入 suspend 状态,我们的代码一定会到这段逻辑;反之就直接执行 label 为 1 的逻辑。
  1. label 为 1 的逻辑:
  • 检查 result 中是否有异常。
  • HelloWorldContinaution#param2 中获取 StringBuilder 实例。
  • HelloWorldContinuation#result 中获取 hello() 方法的返回值。
  • hello() 方法的返回结果写入到 StringBuilder 中。
  • 构建一个新的 StringBuilder 对象赋值给 HelloWorldContinuation#param1
  • 修改 label 为 2,执行 world() 方法,和执行 hello() 方法一样,在我们的代码中一定会进入 suspend 状态。
  1. label 为 2 的逻辑:
  • 检查 result 中是否有异常。
  • HelloWorldContinaution#param1 中获取 StringBuilder 实例。
  • HelloWorldContinuation#result 中获取 world() 方法的返回值。
  • world() 方法的返回结果写入到 StringBuilder 中。
  • 将最终结果返回,也就标志 HelloWorldContinuation 执行完成了。

我们再简单看看 HelloWorldContinuation#invokeSuspend() 方法的实现:

    @Override
    public final Object invokeSuspend(Object obj) {
        this.result = obj;
        this.label |= Integer.MIN_VALUE;
        return this.mainActivity.helloWorld(this);
    }

代码非常简单,将上次的执行的结果写入到 result 中,然后调用 helloWorld() 方法。

hello()world() 的执行流程

    public final Object hello(Continuation<? super String> continuation) {
        return delaSuspend(500L, "Hello, ", continuation);
    }

    public final Object world(Continuation<? super String> continuation) {
        return delaSuspend(500L, "World!!", continuation);
    }

    public final <T> Object delaSuspend(long time, T t, Continuation<? super T> continuation) {
        CancellableContinuationImpl cancellable$iv = new CancellableContinuationImpl(IntrinsicsKt.intercepted(continuation), 1);
        cancellable$iv.initCancellability();
        DelayKt.delay(time, new MainActivity$delaSuspend$2$1(cancellable$iv, t));
        Object result = cancellable$iv.getResult();
        if (result == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
            DebugProbesKt.probeCoroutineSuspended(continuation);
        }
        return result;
    }

hello()world() 方法都调用了 delaSuspend() 方法用来讲 delay() 方法的异步 callback 调用转换成一个协程 suspend 方法。源码是这样的:

    suspend fun <T> delaSuspend(time: Long, data: T): T {
        return suspendCancellableCoroutine { cont ->
            delay(time) {
                cont.resumeWith(Result.success(data))
            }
        }
    }

delaSuspend() 的处理中会创建一个 CancellableContinuationImpl 对象,它的父级 ContinuationHelloWorldContinuation,这里注意看调用了 IntrinsicsKt.intercepted() 方法来代理 HelloWorldContinuation,前面我有讲过这个方法,它会让后续的 resumeWith() 方法在 Dispacher 中对应的线程中执行。调用了 DelayKt.delay() 方法然后传入了一个 Lambda 对象,我们看看 Lambda 对象中的方法执行:

    public final void invoke2() {
        Continuation continuation = this.$cont;
        Result.Companion companion = Result.Companion;
        continuation.resumeWith(Result.m122constructorimpl(this.$data));
    }

简单且高效,在回调成功时,直接调用 CancellableContinuationImpl#resumeWith() 方法使协程进入 resume 状态,后续的逻辑的执行的线程由 Dispatcher 决定。

最后

看到这里你可能还是有点懵的状态,这是非常正常的,我再来完整的理一下整个流程:

CoroutineScope#launch() 方法中会创建一个 StandaloneCoroutine 对象,然后通过 launch() 方法传过来的 Lambda 对象构建一个 LaunchContinuation 对象它的父级 ContinuationStandaloneCoroutine,然后调用 LaunchContinuation#resumeWith() 方法标志协程开始。
LaunchContinuation 第一次执行 resumeWith() 时,会调用 helloWorld() 方法,这里会 HelloWorldContinuation 对象,它的父级 ContinuationLaunchContinuation 对象。第一次执行 helloWorld() 方法时会调用 hello() 方法,在 hello() 方法中会构建一个 CancellableContinuationImpl 对象,它的父级 ContinuationHelloWorldContinuation,当 hello() 方法的 callback 异步调用成功后会调用 CancellableContinuationImpl#resumeWith() 方法 resume 协程,最终会调用到 HelloWorldContinuation#resumeWith() 方法中去,这里也会触发 helloWorld() 方法的第二次执行,在第二次执行的过程中会调用 world() 方法,world() 方法的处理逻辑和 hello() 方法一模一样,回调完成后就会触发第三次调用 helloWorld() 方法,第三次调用的时候会组合 hello()world() 两次方法的结果得到最终的结果,然后返回,这时 HelloWorldContinuation 就会调用它的父级的 Continuation 中的 resumeWith() 方法,也就是 LaunchContinuation#resumeWith() 方法,用来通知 LaunchContinuation 表示 helloWorld() 方法已经执行完毕,这个时候是第二次执行 LaunchContinuation#resumeWith(),这时他也不用再进入 suspend 状态,又会继续调用它的父级 ContinuationresumeWith() 方法,也就是 StandaloneConroutine#resumeWith() 的方法,它的这个方法调用后也就标志这个协程执行完毕了。

如果到这里还是没有理解这个过程,推荐你再多看几遍,一定能够看懂的,其实就是通过 Continuation 来处理 callback 的套娃操作,当理解了这个过程后,协程的很多地方的源码你就能够看得懂了。


标签:工作原理笔记Kotlin程源


程序开发学习排行
最近发表
网站分类
标签列表