Kotlin on JVM 协程的可重入实现

1. 协程?

协程的优势就是上下文切换的优势,不用打扰到操作系统。用户自己实现协程的切换。协程可以主动让出线程的能力,让Runtime进行调度。而面对线程而言,通常不是主动让出的,而是被操作系统强制调度。操作系统是感知不到Runtime层面的协程的,也不关心是不是正在进行协程调度。线程的现场信息由操作系统维护,协程的现场信息由Runtime来维护。

  • Kotlin on JVM 的“协程”,不是真正意义上的协程 ?
  • 效果上,比在Java线程池的的Runable基础上面,Runable 不具备暂停调度可恢复的特性。Runable从开始到结束一直占用当前线程,不管是runing还是wait,而Kotlin在协程”wait“的时候,会释放当前线程。

2. 协程的创建和执行过程

写个协程来测试,Example1

  • 添加 jvm参数 -Dkotlinx.coroutines.debug 打印线程信息的时候可以看到协程和线程的关系
1
2
3
4
5
6
7
8
9
suspend fun test(){
    val job = GlobalScope.launch {
        delay(2000)
        printThreadInfo()
        delay(2000)
        printThreadInfo()
    }
    Thread.sleep(10000)
}
  • 输出
1
2
DefaultDispatcher-worker-1 @coroutine#1
DefaultDispatcher-worker-1 @coroutine#1
  • 看这个线程名称,有没有联想到了线程池?

  • 协程的分发

  • 可以通过跟踪调用链发现

  • Schduler 的结构

  • 两个都是全局队列,区别只是一个负责CPU密集Task,一个负责IO密集Task
  • 全局队列之外,每个线程自己还有一个私有队列
  • vmMain/scheduling/CoroutineScheduler.kt:383
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
    trackTask() // this is needed for virtual time support
    //  包装成Task,Task实际上继承Runable
    val task = createTask(block, taskContext)
    // try to submit the task to the local queue and act depending on the result
    
    val currentWorker = currentWorker()
    // 优先提交到当前Worker,当前Worker 可能是null,提交里面直接return
    val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
    if (notAdded != null) {
        //然后尝试把Task提交GlobalQueue
        if (!addToGlobalQueue(notAdded)) {
            // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
            throw RejectedExecutionException("$schedulerName was terminated")
        }
    }
    val skipUnpark = tailDispatch && currentWorker != null
    // Checking 'task' instead of 'notAdded' is completely okay
    if (task.mode == TASK_NON_BLOCKING) {
        if (skipUnpark) return
        // 里面会先尝试去parkedWorkers拿线程,拿不到就创建
        signalCpuWork()
    } else {
        // Increment blocking tasks anyway
        signalBlockingWork(skipUnpark = skipUnpark)
    }
}
  • 多个线程提交Task到Queue的并发锁问题,由Queue保证
  • 创建woker线程之后,启动woker线程
  • 这里启动woker实际上是一个while,while里面不停的去获取Task执行Task,没有Task就park,和Hanlder里面loop很像。
  • jvmMain/scheduling/CoroutineScheduler.kt:670
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
       private fun runWorker() {
            var rescanned = false
            while (!isTerminated && state != WorkerState.TERMINATED) {
                // 这里去拿Task
                val task = findTask(mayHaveLocalTasks)
                // 拿到了就去执行 ,然后重复循环
                if (task != null) {
                    rescanned = false
                    minDelayUntilStealableTaskNs = 0L
                    executeTask(task)
                    continue
                } else {
                    mayHaveLocalTasks = false
                }
                


                // 这里先尝试了park  minDelayUntilStealableTaskNs ,然后把                          // minDelayUntilStealableTaskNs = 0
                  // 主要是 存在正在窃取的任务,还没窃取完成
                 if (minDelayUntilStealableTaskNs != 0L) {
                    if (!rescanned) {
                        rescanned = true
                    } else {
                        rescanned = false
                        tryReleaseCpu(WorkerState.PARKING)
                        interrupted()
                        LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                        minDelayUntilStealableTaskNs = 0L
                    }
                    continue
                }
                //开始park,然后等待唤醒
                tryPark()
            }
            tryReleaseCpu(WorkerState.TERMINATED)
        }
  • 再看一怎么findTask的?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
fun findTask(scanLocalQueue: Boolean): Task? {
    if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
    // If we can't acquire a CPU permit -- attempt to find blocking task
    val task = if (scanLocalQueue) {
        // 
        localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
    } else {
        globalBlockingQueue.removeFirstOrNull()
    }
    return task ?: trySteal(blockingOnly = true)
}


private fun findAnyTask(scanLocalQueue: Boolean): Task? {
    /*
     * Anti-starvation mechanism: probabilistically poll either local
     * or global queue to ensure progress for both external and internal tasks.
     */
    if (scanLocalQueue) {
        val globalFirst = nextInt(2 * corePoolSize) == 0
        if (globalFirst) pollGlobalQueues()?.let { return it }
        localQueue.poll()?.let { return it }
        if (!globalFirst) pollGlobalQueues()?.let { return it }
    } else {
        pollGlobalQueues()?.let { return it }
    }
    return trySteal(blockingOnly = false)
}
  • tryAcquireCpuPermit()尝试当前是不是占有CPU控制权,有的话就看全局Task队列和私有Task队列哪个优先级更高,哪个高就从哪个取任务

  • 如果没有CPU使用权,那么优先看本地队列是不是有任务,没有的话再去取全局的IO密集型任务。

  • 如果都没拿到任务的话,最后会通过 trySteal() 尝试去偷别的woker线程的任务

  • 这里可以清楚了协程的分发流程,实际上是线程池

  • 总结:

    • 新启动一个协程,通过scheduler 的 dispatch 分发,包装成一个Task,如果是当前woker线程,则直接加入到当前Woker线程的Task队列里面去,如果当前Woker线程是null,加入到global队列中,并判断线程池参数来是否启动一个新的woker,是不是可以从pakedWoker线程队列里面唤醒一个woker。
    • Woker 线程一旦启动之后就是 在一个死循环 runwoker中,不断的去findTask来执行,findTask优先从自己线程的Task队列拿,然后全局的Task队列,最后还可以去“偷”其他woker的Task。没有新的Task就会park住当前线程,放入pakerWoker线程队列里面,等待下次有新的唤醒。

3.Kotlin on JVM 协程的可重入性

  • 协程重入?Example2
1
2
3
4
5
6
7
8
9
  GlobalScope.launch {
        printThreadInfo()
        delay(5000)
        printThreadInfo()
    }
    sleep(1000)
    GlobalScope.launch {
        printThreadInfo()
    }
  • 输出信息
1
2
3
DefaultDispatcher-worker-1 @coroutine#1
DefaultDispatcher-worker-1 @coroutine#2
DefaultDispatcher-worker-1 @coroutine#1
  • 可以看到创建的2个协程都在线程worker-1,协程1在线程1上执行了一下,然后让出了线程,接着协程2获得线程执行,最后协程1重新获得线程执行。
  • 嗯???,这种“调度”机制是怎么实现的?一个协程的暂停和恢复是怎么实现的???
  • 协程的生命周期
  • 实际上Kotlin JVM Coroutine 的运行依赖于各种 Callback 机制
  • CPS(Continuation-Passing-Style, 续体传递风格)
  • 函数通过回调传递结果
  • 一般的写法
1
2
3
4
5
6
7
8
class Test {
    public static long plus(int i1, int i2) {
        return i1 + i2;
    }
    public static void main(String[] args) {
        System.out.println(plus(1, 2));
    }
}
  • CPS 写法
1
2
3
4
5
6
7
8
9
10
11
class Test {
    interface Continuation {
        void next(int result);
    }
    public static void plus(int i1, int i2, Continuation continuation) {
        continuation.next(i1 + i2);
    }
    public static void main(String[] args) {
        plus(1, 2, result -> System.out.println(result));
    }
}
  • Kotlin Continuation 接口
1
2
3
4
5
6
7
8
9
10
11
public interface Continuation; {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext
    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Resultlt)
}
  • 每次切出去到 suspend 状态,再进入 running 状态都是通过 resumeWith 接口
  • Coroutine 实现了 Job,Continuation,CoroutineScope 接口
1
2
3
4
5
6
7
8
9
public abstract class AbstractCoroutine;(
    /**
     * The context of the parent coroutine.
     */
    @JvmField
    protected val parentContext: CoroutineContext,
    active: Boolean = true
) : JobSupport(active), Job, Continuation<, CoroutineScope {
    // 省略
  • Kotlin Coroutine —> Java 反编译

  • 对Example1做反编译,看下字节码的层面

1
2
3
4
5
6
7
8
9
10
 public static final Object test(Continuation<? super Unit> $completion) {
        Job launch$default = BuildersKt__Builders_commonKt.launch$default(GlobalScope.INSTANCE, (CoroutineContext) null, (CoroutineStart) null, new KotlinCodeKt$test$job$1((Continuation) null), 3, (Object) null);
        Thread.sleep(10000);
        return Unit.INSTANCE;
    }
    public static final void printThreadInfo() {
        Thread currentThread = Thread.currentThread();
        Intrinsics.checkExpressionValueIsNotNull(currentThread, "Thread.currentThread()");
        System.out.println(currentThread.getName());
    }
  • Kotlin 函数 生成了一个 匿名函数 KotlinCodeKt$test$job$1 继承了 SuspendLambda
  • 而 SuspendLambda <—- ContinuationImpl <—- BaseContinuationImpl <—-Continuation
  • 也就是说我们写的代码,被编译器包装成了一个 Continuation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
final class KotlinCodeKt$test$job$1 extends SuspendLambda implements Function2&lt;CoroutineScope, Continuation&lt;? super Unit&gt;, Object&gt; {
    Object L$0;
    int label;
    private CoroutineScope p$;
    KotlinCodeKt$test$job$1(Continuation continuation) {
        super(2, continuation);
    }
    public final Continuation&lt;Unit&gt; create(Object obj, Continuation&lt;?&gt; continuation) {
        Intrinsics.checkParameterIsNotNull(continuation, &quot;completion&quot;);
        KotlinCodeKt$test$job$1 kotlinCodeKt$test$job$1 = new KotlinCodeKt$test$job$1(continuation);
        CoroutineScope coroutineScope = (CoroutineScope) obj;
        kotlinCodeKt$test$job$1.p$ = (CoroutineScope) obj;
        return kotlinCodeKt$test$job$1;
    }
    public final Object invoke(Object obj, Object obj2) {
        return ((KotlinCodeKt$test$job$1) create(obj, (Continuation) obj2)).invokeSuspend(Unit.INSTANCE);
    }
   // 调用 resumeWith() 恢复协程调用的方法,它里面的逻辑,是调用invokeSuspend() 这个方法内部就是我们自己写的协程任务的代码
   // return coroutine_suspended 表示协程挂起了
    public final Object invokeSuspend(Object $result) {
        CoroutineScope $this$launch;
        Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        int i = this.label;
        if (i == 0) {
            ResultKt.throwOnFailure($result);
            $this$launch = this.p$;
            this.L$0 = $this$launch;
            this.label = 1;
            //调用我们写的代码片段
            if (DelayKt.delay(2000, this) == coroutine_suspended) {
                return coroutine_suspended;
            }
        } else if (i == 1) {
            $this$launch = (CoroutineScope) this.L$0;
            ResultKt.throwOnFailure($result);
        } else if (i == 2) {
            CoroutineScope $this$launch2 = (CoroutineScope) this.L$0;
            ResultKt.throwOnFailure($result);
            KotlinCodeKt.printThreadInfo();
            return Unit.INSTANCE;
        } else {
            throw new IllegalStateException(&quot;call to &#39;resume&#39; before &#39;invoke&#39; with coroutine&quot;);
        }
        //调用我们写的代码片段
        KotlinCodeKt.printThreadInfo();
        this.L$0 = $this$launch;
        this.label = 2;
         //调用我们写的代码片段
         // 这里实际上就 达到dealy 的效果,postDelayed(Time=2000,Runnable = this)
         // 让this Runable 获得再次调度的机会
         // 这里是非阻塞的
        if (DelayKt.delay(2000, this) == coroutine_suspended) {
            return coroutine_suspended;
        }
        CoroutineScope coroutineScope = $this$launch;
         //调用我们写的代码片段
        KotlinCodeKt.printThreadInfo();
        return Unit.INSTANCE;
    }
}




public final override fun resumeWith(result: Result&lt;Any?&gt;) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke &quot;resume&quot; debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result&lt;Any?&gt; =
                    try {
                        val outcome = invokeSuspend(param)
                        // 如果是SUSPENDED则直接return这次resume
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
  • 通过关键词结合编译期我们自己写的协程代码进行包装成 Continuation 对象,实际就是对代码进行了分片,在执行和唤醒的粒度就是代码片
  • Kotlin on JVM 协程本质是通过编译分片和状态机来实现,没有协程的现场恢复和保留的过程。
  • dealy暂停,实际上是return了,dealy的时间实际上是通过 postDelayed(Time,_Runnable)来实现,但是_postDelay并不是post到scheduler里面的Task队列里面的。
  • Dealy 2s 又是怎么实现的???
1
DelayKt.delay(2000, this)
  • 联系到handler ,涉及到延时的,肯定是有序的,怎么实现时间有序?
  • 实际是 包装成了一个 DelayedResumeTask ,等时间到了,最后调用task.run 还是 走到了上面的scheduler的dispatch流程。
1
2
3
4
5
6
7
private inner class DelayedResumeTask(
    nanoTime: Long,
    private val cont: CancellableContinuation<Unit>
) : DelayedTask(nanoTime) {
    override fun run() { with(cont) { resumeUndispatched(Unit) } }
    override fun toString(): String = super.toString() + cont.toString()
}
  • 延迟2秒是怎么实现的???
  • 我们写的dalay首先直接会调到
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine [email protected] { cont: CancellableContinuation<Unit> ->
        // 这里获取了context.delay
         cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}


// 实际上获取了 DefaultDelay 
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay


// DefaultDelay 就是 DefaultExecutor
internal actual val DefaultDelay: Delay = DefaultExecutor
  • 也就是说dealy的时候和 DefaultExecutor挂钩了
  • 然后 dealy方法 会先调到 EventLoop.common.kt:370
1
2
3
4
5
6
7
8
9
private fun scheduleImpl(now: Long, delayedTask: DelayedTask): Int {
    if (isCompleted) return SCHEDULE_COMPLETED
    val delayedQueue = _delayed.value ?: run {
        _delayed.compareAndSet(null, DelayedTaskQueue(now))
        _delayed.value!!
    }
    //这里可以看 delayedQueue = _delayed实际上是DelayedTaskQueue
    return delayedTask.scheduleTask(now, delayedQueue, this)
}
  • DelayedTaskQueue 继承了ThreadSafeHeap
  • 继续跟踪 scheduleTask,实际上走到commonMain/internal/ThreadSafeHeap.kt:64,添加到最后
1
2
3
4
5
6
7
8
public inline fun addLastIf(node: T, cond: (T?) -> Boolean): Boolean = synchronized(this) {
    if (cond(firstImpl())) {
        addImpl(node)
        true
    } else {
        false
    }
}
  • addImpl里面实际上就是添加到数组的最后面
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
internal fun addImpl(node: T) {
    assert { node.heap == null }
    node.heap = this
    val a = realloc()
    val i = size++
    a[i] = node
    node.index = i
    // 调整
    siftUpFrom(i)
}


// 这里在比较调整,看下 实现的 Comparable<T>
private tailrec fun siftUpFrom(i: Int) {
    if (i <= 0) return
    val a = a!!
    val j = (i - 1) / 2
    if (a[j]!! <= a[i]!!) return
    swap(i, j)
    siftUpFrom(j)
}


// 是在比较 dealy time
override fun compareTo(other: DelayedTask): Int {
    val dTime = nanoTime - other.nanoTime
    return when {
        dTime > 0 -> 1
        dTime < 0 -> -1
        else -> 0
    }
}
  • 具体的延时是通过 DelayedTaskQueue来定时调度任务的,实际上是通过最小堆来排序的,添加的node 通过 比较 dealy 时间。
  • sheduleImpl完成之后,也就加入到DefaultExecutor的 DelayedTaskQueue之后,调用了unpark
1
2
3
4
5
6
7
8
public fun schedule(now: Long, delayedTask: DelayedTask) {
    when (scheduleImpl(now, delayedTask)) {
        SCHEDULE_OK -> if (shouldUnpark(delayedTask)) unpark()
        SCHEDULE_COMPLETED -> reschedule(now, delayedTask)
        SCHEDULE_DISPOSED -> {} // do nothing -- task was already disposed
        else -> error("unexpected result")
    }
}
  • Unpark 里面 发现线程为null,则会创建线程,start线程,直接就是DefaultExecutor的run方法了,run里面调用processNextEvent。这里就是取出来用了,调用 processNextEvent() 来处理事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 留意 processNextEvent() 是有返回值的,实际上返回的线程的park时间
override fun processNextEvent(): Long {
    // 省略部分代码
    //这里出现了 _delayed ,也就是上面提到了DelayedTaskQueue
    val delayed = _delayed.value
    if (delayed != null && !delayed.isEmpty) {
        val now = nanoTime()
            // while ,是可能存在多个task到期了
            while (true) {
            // 这里从 DelayedTaskQueue 取出来放到了 队列 _queue 中去了
            // it.timeToExecute实际上就是比较 now - nanoTime >= 0           
            delayed.removeFirstIf {
                if (it.timeToExecute(now)) {
                    enqueueImpl(it)
                } else
                    false
            } ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
        }
    }
    // 从 _queue 里面取出一个来DelayedResumeTask任务执行
    // 虽然本次可能加入多个Task,但是本次只执行一个,剩下的下次执行,
    val task = dequeue()
    if (task != null) {
        task.run()
        return 0
    }


    // 需要park
    return nextTime
}


// 根据queue和nextDelayedTask判断,本次需要park的时间
protected override val nextTime: Long
    get() {
        if (super.nextTime == 0L) return 0L
        val queue = _queue.value
        when {
            queue === null -> {} // empty queue -- proceed
            queue is Queue<*> -> if (!queue.isEmpty) return 0 // non-empty queue
            queue === CLOSED_EMPTY -> return Long.MAX_VALUE // no more events -- closed
            else -> return 0 // non-empty queue
        }
        val nextDelayedTask = _delayed.value?.peek() ?: return Long.MAX_VALUE
        return (nextDelayedTask.nanoTime - nanoTime()).coerceAtLeast(0)
    }
  • 为什么要取出来放到_queue,直接取出来执行不行?
  • 取出来一个,执行一个,需要时间,而且在多次修改ThreadSafeHeap排序堆,同时其他线程也可能存在竞争情况,是带有锁的,一次性取来,取出来的task放到_queue是无序的。
  • Dealy 会导致协程切换线程吗?
  • 可以,协程可能切换线程。如果当前协程获得恢复执行,而原先的线程又在执行其它线程,则协程会在其它线程上执行。也就是说,协程在生命周期中可以更换线程。

为什么Kotlin JVM 要用线程池加CPS的方式来实现 ?直接用线程池不行?

  • 主要是JVM原生没有协程的概念,感知不到协程,只有线程,JVM要真正的像GO那样元素支持协程,是需要修改JVM的,或者抛弃JVM,但这样通用性和可扩展性就会很受影响
  • Kotlin 的目标是想要全平台的,Kotlin on JS ,Kotlin on Native。你JVM没有的能力,不代表其他没有。可以提供统一的编程接口,不用关心平台的实现。
  • 提供友好的异步编程方式,简洁 更高抽象隐藏异步实现细节,实现写异步如同同步的顺序式写法,开发维护成本大大减低、但内存上有部分的妥协,协程因为创建了中间的一系列封装对象,比传统的多线程编程增加了额外的内存消耗

为什么用 dealy 代替 sleep ?

  • 了解了Kotlin的协程实现原理,在sleep的这段时间,还是占用当前线程,其他协程得不到来这个线程执行的机会。本质上sleep是sleep了当前线程,而dealy是dealy了当前协程。

附录 参考资料