暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Kotlin协程基本原理

周末随心分享 2021-09-12
615
背景
协程对于大家来说肯定不陌生了,“一个进程有多个线程,一个线程有多个协程”,协程作为轻量级“线程”,最大特点是可以把异步逻辑以同步方式写出来,让代码更加清晰。同时协程可以和生命周期绑定,比如viewModelScope、lifecycleScope等等,极大的方便了开发者。那协程的不阻塞线程到底是什么原理呢?它和线程是什么关系呢?它真的比线程开销小吗?这里帮大家初探一下!

 

协程使用

1、以viewModelScope为例,我们先来看一下简单的使用样例,如下:

    viewModelScope.launch(Dispatchers.Main) {
    val result = withContext(Dispatchers.IO) {
    searchRepository.getWebSites()
    }
        if (result is Result.Success) {
           emitArticleUiState(showHot = true, showWebSites = result.data)    
        }
    }
    viewModelScope本身就是androidX提供的ViewModel的一个扩展,它会在viewModel销毁的同时终止所有操作,是一个挺不错的扩展。

    上面代码可以看到通过viewModelScope开启一个基于UI线程的协程,然后协程里面又可以通过withContext切换线程。等等!协程?线程?,协程和线程还有关系,不是说线程可以创建多个协程吗?答案我们一步步揭晓

     

    2、协程里的线程

    在上面样例中我们是不是注意到了一个特别的对象:Dispatchers.IO,切换环境都是通过这个指定的,那我们就从这个入手吧,先来看一下它的本质:

      @JvmStatic
      public val IO: CoroutineDispatcher = DefaultScheduler.IO
      val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))


      public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
          require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
          return LimitingDispatcher(this, parallelism, TaskMode.PROBABLY_BLOCKING)
      }


      private class LimitingDispatcher(
          val dispatcher: ExperimentalCoroutineDispatcher,
          val parallelism: Int,
          override val taskMode: TaskMode
      ) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
          private val queue = ConcurrentLinkedQueue<Runnable>()
          private val inFlightTasks = atomic(0)
          override val executor: Executor
              get() = this
          override fun execute(command: Runnable) = dispatch(command, false)
          override fun close()Unit = error("Close cannot be invoked on LimitingBlockingDispatcher")
          override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
          private fun dispatch(block: Runnable, fair: Boolean) {
              var taskToSchedule = block
              while (true) {
                  // Commit in-flight tasks slot
                  val inFlight = inFlightTasks.incrementAndGet()
                  // Fast path, if parallelism limit is not reached, dispatch task and return
      if (inFlight <= parallelism) {
                      dispatcher.dispatchWithContext(taskToSchedule, this, fair)
                      return
                  }
      // Parallelism limit is reached, add task to the queue
                  queue.add(taskToSchedule)
                  if (inFlightTasks.decrementAndGet() >= parallelism) {
                      return
                  }
                  taskToSchedule = queue.poll() ?: return
              }
          }

      把主要代码都拿出来了,我们顺着上面顺序可以发现最后走到了queue.add(taskToSchedule),是不是有点线程池的既视感了!我们接着往下分析,上面只是介绍了添加任务的过程,即把任务包装成一个Runnable,然后把它丢到一个队列中去,这个设计和线程池是异曲同工的


      那这个队列里的任务是怎样执行的呢,可以看到主要通过dispatcher.dispatchWithContext(taskToSchedule, this, fair)这个函数,那dispatcher又是什么东东呢,我们来看一下:
        open class ExperimentalCoroutineDispatcher(
        private val corePoolSize: Int,
        private val maxPoolSize: Int,
        private val idleWorkerKeepAliveNs: Long,
        private val schedulerName: String = "CoroutineScheduler"
        ) : ExecutorCoroutineDispatcher() {
        constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
        ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

        @Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
        constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE
        ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)

        override val executor: Executor
        get() = coroutineScheduler

        // This is variable for test purposes, so that we can reinitialize from clean state
        private var coroutineScheduler = createScheduler()


        override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
        coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
        DefaultExecutor.dispatch(context, block)
        }
        ...
        }

        上面是它的类定义,有核心池大小、最大池大小,是不是看的越来越像线程池了,继续看coroutineScheduler,因为最终是通过它来分发的:

          private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)internal class CoroutineScheduler(    private val corePoolSize: Int,    private val maxPoolSize: Int,    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,    private val schedulerName: String = DEFAULT_SCHEDULER_NAME) : Executor, Closeable {...}
          可以把感觉去掉了,它就是线程池!!

           

          我们继续往下看这个线程池的工作原理:
            internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean) {
            try {
            coroutineScheduler.dispatch(block, context, fair)
            } catch (e: RejectedExecutionException) {
            // Context shouldn't be lost here to properly invoke before/after task
            DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
            }
            }


            fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
            trackTask() // this is needed for virtual time support
            val task = createTask(block, taskContext)
            // try to submit the task to the local queue and act depending on the result
            when (submitToLocalQueue(task, fair)) {
            ADDED -> return
            NOT_ADDED -> {
            // try to offload task to global queue
            if (!globalQueue.addLast(task)) {
            // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
            throw RejectedExecutionException("$schedulerName was terminated")
            }
            requestCpuWorker()
            }
            else -> requestCpuWorker() // ask for help
            }
            }

            submitToLocalQueue就是任务的分发了,我们接着看它的分发过程:

              private fun submitToLocalQueue(task: Task, fair: Boolean): Int {
              val worker = currentWorker() ?: return NOT_ADDED
              /*
              * This worker could have been already terminated from this thread by close/shutdown and it should not
              * accept any more tasks into its local queue.
              */
              if (worker.state === WorkerState.TERMINATED) return NOT_ADDED
              var result = ADDED
              if (task.mode == TaskMode.NON_BLOCKING) {
              /*
              * If the worker is currently executing blocking task and tries to dispatch non-blocking task, it's one the following reasons:
              * 1) Blocking worker is finishing its block and resumes non-blocking continuation
              * 2) Blocking worker starts to create non-blocking jobs
              * First use-case is expected (as recommended way of using blocking contexts),
              * so we add non-blocking task to local queue, but also request CPU worker to mitigate second case
              */
              if (worker.isBlocking) {
              result = ADDED_REQUIRES_HELP
              } else {
              /*
              * If thread is not blocking, then it's just tries to finish its
              * local work in order to park (or grab another blocking task), do not add non-blocking tasks
              * to its local queue if it can't acquire CPU
              */
              val hasPermit = worker.tryAcquireCpuPermit()
              if (!hasPermit) {
              return NOT_ADDED
              }
              }
              }


              val noOffloadingHappened = if (fair) {
              worker.localQueue.addLast(task, globalQueue)
              } else {
              worker.localQueue.add(task, globalQueue)
              }
              if (noOffloadingHappened) {
              // When we're close to queue capacity, wake up anyone to steal work
              // Note: non-atomic bufferSize here is Ok (it is just a performance optimization)
              if (worker.localQueue.bufferSize > QUEUE_SIZE_OFFLOAD_THRESHOLD) {
              return ADDED_REQUIRES_HELP
              }
              return result
              }
              return ADDED_REQUIRES_HELP
              }

              可以看到最终走到了worker.localQueue.add(task, globalQueue),那这里的worker又是什么呢?

                private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }

                很清楚了,分发流程就是:
                • 优先考虑当前线程,即添加到当前线程的队列里面(如果当前线程与目标线程不一样,比如UI线程和异步线程,那会直接跳过当前线程),不过有几个条件限制,即当前线程非阻塞状态才行

                • 如果当前线程阻塞,那么添加到globalQueue里面,同时调用requestCpuWorker,这个函数就是线程池的基本使用了,corePoolSize内的线程始终保活,再多的话就进入等待队列,即globalQueue

                 

                3、总结

                经过上面简单分析,我们基本了解了协程里面的线程到底是怎么回事了,它其实就是把当前任务分发到线程池里面执行(UI线程只有一个),withContext和async都是这种原理,只不过withContext = async.await,即如果要并发就用async,要顺序执行就用withContext

                补充:

                • 协程和线程使用场景?

                  从源码来看,协程其实是帮我们封装了一个线程池并且提供了方便的线程切换API,所以使用协程和线程本质上是一样的,但是协程多了一套挂起机制,这个是线程所没有的特性

                • 个人使用来看,协程和kotlin配合是非常默契的,特别使用到Jetpack那套框架,基本不会去手动创建线程池和线程,一路都是协程来切换异步线程和UI线程

                • 非常适合协程的场景?

                  既然适合协程,肯定是用到了它的特性,首先是挂起机制,通常我们写带回调的函数需要处理同步问题,一般是使用锁等工具,但是使用协程挂起机制就很方便了,直接等到回调返回再继续往下执行,代码逻辑很清晰也不用担心同步问题和阻塞UI线程问题

                 

                  

                协程不阻塞线程原理

                前面介绍了协程具体执行任务的过程,那我们平时听到的不阻塞线程又是什么原因呢

                1、我们先来看一个挂起样例:
                  fun main(args: Array<String>) = runBlocking<Unit> { 
                  launch(Dispatchers.Unconfined) {
                         println("${Thread.currentThread().name} : launch start"
                         async(Dispatchers.Default) {
                             println("${Thread.currentThread().name} : async start"
                             delay(100
                             println("${Thread.currentThread().name} : async end")        
                         }.await()
                         println("${Thread.currentThread().name} : launch end")   
                      }
                  }
                  async在delay函数中被挂起,我们来看awit函数:
                    internal suspend fun awaitInternal(): Any? {
                    // fast-path -- check state (avoid extra object creation)
                    while (true) { // lock-free loop on state
                    val state = this.state
                    if (state !is Incomplete) {
                    // already complete -- just return result
                    if (state is CompletedExceptionally) { // Slow path to recover stacktrace
                    recoverAndThrow(state.cause)
                    }
                    return state.unboxState()
                    }
                    if (startInternal(state) >= 0) break // break unless needs to retry
                    }
                    return awaitSuspend() // slow-path
                    }


                    private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
                    /*
                    * Custom code here, so that parent coroutine that is using await
                    * on its child deferred (async) coroutine would throw the exception that this child had
                    * thrown and not a JobCancellationException.
                    */
                    val cont = AwaitContinuation(uCont.intercepted(), this)
                    cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler))
                    cont.getResult()
                    }

                    上面代码最关键的地方在于AwaitContinuation,看一下它的定义:

                      private class AwaitContinuation<T>(
                      delegate: Continuation<T>,
                      private val job: JobSupport
                      ) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) {
                      override fun getContinuationCancellationCause(parent: Job): Throwable {
                      val state = job.state
                      /*
                      * When the job we are waiting for had already completely completed exceptionally or
                      * is failing, we shall use its root/completion cause for await's result.
                      */
                      if (state is Finishing) state.rootCause?.let { return it }
                      if (state is CompletedExceptionally) return state.cause
                      return parent.getCancellationException()
                      }

                      protected override fun nameString(): String =
                      "AwaitContinuation"
                      }

                      焦点转移到CancellableContinuationImpl,这个类就是协程挂起的核心了,我们重点来看一下


                      2、CancellableContinuationImpl的工作原理:
                      如上所示,这个类其实是一个状态机,它有三种状态,任务第一次进来是UNDECIDED状态,这时候协程会调用trySuspend来进入下一个状态,即SUSPEND,等任务结束再调用tryResume来进入最后一个状态,即RESUME,当然如果任务过于简单,一进来就完成的话就直接进入RESUME状态

                       

                      3、trySuspend函数和tryResume函数:
                        private fun trySuspend(): Boolean {
                        _decision.loop { decision ->
                        when (decision) {
                        UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
                        RESUMED -> return false
                        else -> error("Already suspended")
                        }
                        }
                        }


                        private fun tryResume(): Boolean {
                        _decision.loop { decision ->
                        when (decision) {
                        UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
                        SUSPENDED -> return false
                        else -> error("Already resumed")
                        }
                        }
                        }
                        如果上面状态返回是SUSPEND,那么就会调用Continuation来进行挂起,这个又是什么东东呢,有点不好解释,我们先来看一下知乎上面的回答:

                        按照上面说法,简单来说就是遇到挂起状态时会将指令寄存器里的数据保存起来放到一个回调函数里面,这样当前线程就能继续执行接下来任务,即所谓不阻塞线程原理,当我们调用tryResume时候,协程会invoke之前保存的回调函数,让指令寄存器切换到之前的状态来继续执行剩下的任务,有点像so注入原理(^_^)

                         

                         

                        总结

                        Android使用Kotlin作为官方开发语言后,协程的使用得到了很大的推广,因为协程结合kotlin实在太方便了,代码看起来也更加清晰,因此对它的了解也变的重要了,当然不了解的情况下也可以使用,但是一旦遇到性能等其他比较疑惑的问题时就会变得迷茫,所以这篇文章主要为大家了解协程基本原理提供一个渠道


                        不过注意的是这边介绍的Kotlin协程并不是真正意义上的协程,只是因为Java底层不支持协程,所以通过封装线程池来创建一个伪协程供业务使用,不过这种伪协程反而结合了线程池优势,使Kotlin取代了Java线程池的使用,使用起来还是很方便的

                         

                        好了,今天的分享就到这了,下周继续~

                        文章转载自周末随心分享,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                        评论