记录一次 Kotlin 协程执行先后的面试问题
作者:访客发布时间:2023-12-26分类:程序开发学习浏览:321
记录一次 Kotlin 协程执行先后的面试问题
前几天有同事问了我一个这样的问题,问下面的程序打印的顺序是什么:
object KotlinMain {
@JvmStatic
fun main(args: Array<String>) {
runBlocking {
launch {
println("1")
}
launch(Dispatchers.IO) {
println("2")
}
launch(Dispatchers.Default) {
println("3")
}
launch(Dispatchers.Unconfined) {
println("4")
}
}
}
}
这里直接给出答案,绝大多数的情况下输出是 2 3 4 1,如果移除掉打印 2 3 的代码,那么最终的打印输出一定是 4 1。这个问题考察了被面试者对于 Kotlin 协程的理解,如果对于协程的基本工作原理还不理解的同学可以参考一下我之前写过的一篇文章: Kotlin 协程源码阅读笔记 —— 协程工作原理。
协程任务的调度都是通过 ContinuationInterceptor 来控制的,像我们平时常用的 Dispatchers.Main,Dispatchers.IO 和 Dispatcher.Default 他们都是属于 ContinuationInterceptor。像遇到这种调用先后的问题首先要重点查看对应的 ContinuationInterceptor 的工作方式。
首先看看 runBlocking() 方法的源码:
@Throws(InterruptedException::class)
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
val currentThread = Thread.currentThread()
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
if (contextInterceptor == null) {
// create or use private event loop if no dispatcher is specified
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
// ...
}
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}
注意看上面的代码,在没有 ContinuationInterceptor 的时候会添加一个 ThreadLocalEventLoop.eventLoop 的 CoroutineContext,它也是一个 ContinuationInterceptor。 我们去看看 eventLoop:
private val ref = CommonThreadLocal<EventLoop?>()
internal val eventLoop: EventLoop
get() = ref.get() ?: createEventLoop().also { ref.set(it) }
internal actual fun createEventLoop(): EventLoop = BlockingEventLoop(Thread.currentThread())
上面的代码也很简单如果 EventLoop 的缓存为空,就创建一个新的 EventLoop,同时以当前线程作为构造函数的参数。
我在Kotlin 协程源码阅读笔记 —— 协程工作原理文章中有分析过 coroutine.start() 的源码,它会为 runBlocking() 方法传入的 Lambda 对象构造成一个 Continuation 对象,这个 Continuation 对象又会被 DispatchedContinuation 对象代理,然后执行 DispatchedContinuation#resumeWith() 方法来触发协程执行,通过 DispatchedContinuation 的处理,它的代码最终会在 EventLoop 的线程中执行。这里要记住 EventLoop 中只有一个线程。
而 BlockingCoroutine#joinBlocking() 方法就是在等待协程执行完成,在执行完成之前如果当前线程需要等待是通过 LockSupport.parkNanos() 方法来暂停线程的,这个和 Java 中的 AQS 队列等待时一样。当协程执行完毕后就去拿到协程的 result 然后返回。具体的 joinBlocking() 的代码我就不分析了。
首先执行协程时启动了一个子协程子协程中打印了 1,通过 launch() 方法启动子协程时如果没有指定新的 ContinuationInterceptor 那它就会复用父协程的,也就是说会使用 BlockingEventLoop 作为它的 ContinuationInterceptor,所以它执行时会向 BlockingEventLoop 添加一个任务,而 BlockingEventLoop 中只有一个线程,而目前正在执行 runBlocking() 中的 Lambda 中的代码,所以这时它是忙碌的,所以子协程中的任务会被添加到等待队列中。
我们先忽略打印 2 和 3 的子协程,直接看看打印 4 的子协程,它指定的 ContinuationInterceptor 是 Dispatchers.Unconfined, Dispatchers.Unconfined 非常特殊,他表示不需要在 ContinuationInterceptor 中调度,就直接在当前的线程执行。
internal object Unconfined : CoroutineDispatcher() {
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
override fun dispatch(context: CoroutineContext, block: Runnable) {
// ...
}
override fun toString(): String = "Dispatchers.Unconfined"
}
Unconfined 就是通过 isDispatchNeeded() 方法返回 false 来实现它的这种不在 ContinuationInterceptor 中调度的特性的。
我们再简单看看 DispatchedContinuation 中的 resumeWith() 代码:
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
// 通过 dispatcher 调度
_state = state
resumeMode = MODE_ATOMIC
dispatcher.dispatch(context, this)
} else {
// 直接在当前线程执行
executeUnconfined(state, MODE_ATOMIC) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
}
}
}
通过 Dispatchers.Unconfined 处理的协程的代码就相当于如下的代码(忽略协程的创建):
runBlocking {
launch {
println("1")
}
println("4")
}
由于默认 runBlocking() 是一个单线程的 ContinuationInterceptor,所以 println("1") 的代码需要等待 runBlocking() 中的 Lambda 方法执行完毕后才能执行,由于 println("4") 直接就会在 Lambda 中执行,所以打印的顺序 4 一定在 1 前面。
那么为什么 2 和 3 在 4 的前面呢?很简单因为他们在执行时指定了使用 Dispatchers.IO 和 Dispatchers.Default,他们不会在默认的 BlockingEventLoop 执行,Dispatchers.IO 与 Dispatchers.Default 也是闲置的,不需要将任务加入到等待队列中,所以他们能够直接执行任务。所以输出是 2 3 4 1。
这时候你可能就开窍了,println("1") 被延迟执行的原因是由于 runBlocking() 默认的 ContinuationInterceptor 中只有一个线程来处理任务,所以导致 println("1") 的任务等待,那么我用别的只有一个线程的 ContinuationInterceptor 是不是有一样的效果?是的。以下的代码也会得到一样的输出结果:
runBlocking(Executors.newFixedThreadPool(1).asCoroutineDispatcher()) {
launch {
println("1")
}
launch(Dispatchers.IO) {
println("2")
}
launch(Dispatchers.Default) {
println("3")
}
launch(Dispatchers.Unconfined) {
println("4")
}
}
而下面的代码的结果就是 1 2 3 4 :
runBlocking(Executors.newFixedThreadPool(2).asCoroutineDispatcher()) {
launch {
println("1")
}
launch(Dispatchers.IO) {
println("2")
}
launch(Dispatchers.Default) {
println("3")
}
launch(Dispatchers.Unconfined) {
println("4")
}
}
标签:Kotlin
- 程序开发学习排行
- 最近发表


