/**
 * Wraps [block] into a task and hands it to the scheduler.
 *
 * The task is first offered to the local queue of the current worker (which may be `null`);
 * whatever that call hands back is pushed to the global queue instead. Afterwards the
 * scheduler is signalled about the new CPU or blocking work.
 *
 * @param block the work to run.
 * @param taskContext context used when creating the task; defaults to [NonBlockingContext].
 * @param tailDispatch forwarded to the local-queue submission; when `true` and there is a
 *   current worker, signalling CPU work is skipped.
 * @throws RejectedExecutionException if the global queue refuses the task.
 */
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
    trackTask() // this is needed for virtual time support
    // Wrap the runnable into a Task (per the original note, Task itself extends Runnable).
    val task = createTask(block, taskContext)
    // Prefer the current worker's local queue. The worker may be null, in which case
    // (per the original note) the submission returns straight away.
    val worker = currentWorker()
    worker.submitToLocalQueue(task, tailDispatch)?.let { leftover ->
        // The local queue did not take it -- fall back to the global queue.
        if (!addToGlobalQueue(leftover)) {
            // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
            throw RejectedExecutionException("$schedulerName was terminated")
        }
    }
    val skipUnpark = tailDispatch && worker != null
    // Checking 'task' instead of the leftover task is completely okay
    when {
        // Increment blocking tasks anyway
        task.mode != TASK_NON_BLOCKING -> signalBlockingWork(skipUnpark = skipUnpark)
        // Per the original note: this first tries to take a thread from parkedWorkers
        // and creates a new one if none is available.
        !skipUnpark -> signalCpuWork()
    }
}
/**
 * Looks for the next task to run.
 *
 * When a CPU permit can be acquired the search is delegated to [findAnyTask]. Otherwise the
 * lookup goes through the local queue (only if [scanLocalQueue] is set), then the global
 * blocking queue, and finally tries stealing with `blockingOnly = true`.
 *
 * @param scanLocalQueue whether the local queue should be polled.
 * @return the task found, or `null` if every source came up empty.
 */
fun findTask(scanLocalQueue: Boolean): Task? {
    if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
    // If we can't acquire a CPU permit -- attempt to find blocking task
    val fromLocal = if (scanLocalQueue) localQueue.poll() else null
    return fromLocal
        ?: globalBlockingQueue.removeFirstOrNull()
        ?: trySteal(blockingOnly = true)
}
/**
 * Polls the local and global queues for a task and falls back to stealing
 * (`blockingOnly = false`) when both are empty.
 *
 * @param scanLocalQueue when `false`, only the global queues are polled before stealing.
 * @return the task found, or `null` if nothing was available.
 */
private fun findAnyTask(scanLocalQueue: Boolean): Task? {
    if (!scanLocalQueue) {
        return pollGlobalQueues() ?: trySteal(blockingOnly = false)
    }
    /*
     * Anti-starvation mechanism: probabilistically poll either local
     * or global queue to ensure progress for both external and internal tasks.
     */
    val globalFirst = nextInt(2 * corePoolSize) == 0
    val polled = if (globalFirst) {
        pollGlobalQueues() ?: localQueue.poll()
    } else {
        localQueue.poll() ?: pollGlobalQueues()
    }
    return polled ?: trySteal(blockingOnly = false)
}
/**
 * A continuation that can be resumed with a result of type [T].
 *
 * [T] is declared `in` (contravariant): it only appears as the type of the value that
 * is passed into [resumeWith].
 */
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}
/**
 * Resumes this continuation with [result] and keeps resuming the chain of completions
 * in a loop instead of recursively.
 *
 * Each step calls `invokeSuspend`; if that returns [COROUTINE_SUSPENDED] the method returns
 * immediately. Otherwise the step's value or thrown exception is wrapped into a [Result] and
 * passed on to `completion`: looped over when the completion is itself a
 * `BaseContinuationImpl`, or delivered through its `resumeWith` when it is not.
 *
 * @param result the value or failure to pass to the first `invokeSuspend` call.
 */
public final override fun resumeWith(result: Result<Any?>) {
    // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
    var current = this
    var param = result
    while (true) {
        // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
        // can precisely track what part of suspended callstack was already resumed
        probeCoroutineResumed(current)
        with(current) {
            val completion = completion!! // fail fast when trying to resume continuation without completion
            val outcome: Result<Any?> =
                try {
                    val outcome = invokeSuspend(param)
                    // If the step reports SUSPENDED, this resume call is done: return right away.
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
            releaseIntercepted() // this state machine instance is terminating
            if (completion is BaseContinuationImpl) {
                // unrolling recursion via loop
                current = completion
                param = outcome
            } else {
                // top-level completion reached -- invoke and return
                completion.resumeWith(outcome)
                return
            }
        }
    }
}
/**
 * Appends [node] to the heap and restores heap order.
 *
 * The node must not already belong to a heap (asserted). It is stored at the current end of
 * the array returned by `realloc()`, its `index` is recorded, and it is then sifted up.
 *
 * @param node the element to insert.
 */
internal fun addImpl(node: T) {
    assert { node.heap == null }
    node.heap = this
    val slots = realloc() // NOTE(review): presumably grows the backing array when it is full -- confirm
    // Take the next free slot and bump the size.
    val slot = size
    size = slot + 1
    slots[slot] = node
    node.index = slot
    // Re-establish ordering starting from the freshly inserted element.
    siftUpFrom(slot)
}
/**
 * Moves the element at index [i] towards the root until its parent is less than or equal to it.
 *
 * The ordering check uses the elements' `Comparable<T>` implementation (via `<=`).
 *
 * @param i index of the element to sift up; nothing happens for `i <= 0`.
 */
private fun siftUpFrom(i: Int) {
    var child = i
    while (child > 0) {
        val nodes = a!!
        val parent = (child - 1) / 2
        // Parent already ordered before the child -- heap property holds, stop here.
        if (nodes[parent]!! <= nodes[child]!!) return
        swap(child, parent)
        child = parent
    }
}
/**
 * Orders delayed tasks by their `nanoTime`, i.e. by delay time.
 *
 * The result is the sign of `nanoTime - other.nanoTime` rather than a direct comparison
 * of the two values.
 * NOTE(review): presumably this keeps the ordering sensible if nanoTime values wrap around -- confirm.
 *
 * @return `1`, `-1` or `0` when the difference is positive, negative or zero.
 */
override fun compareTo(other: DelayedTask): Int {
    val delta = nanoTime - other.nanoTime
    if (delta > 0) return 1
    return if (delta < 0) -1 else 0
}