自己在公司做的一次分享,内容来自文末的各位大佬,稍微做了整合与修改,感谢。如果有问题,还请指出,感激不尽。
计算机领域中的协程
马尔文·康威于1958年发明了术语“coroutine”并用于构建汇编程序[1] ,关于协程最初的出版解说在1963年发表[2]
- 协程是一种在程序中处理并发任务的方案,也是这种方案的一个组件,与线程属于同一个层级的概念。协程中不存在线程,也不存在并行。
- 线程是操作系统能够进行运算调度的最小单位,需要来自操作系统的支持,线程的调度不能精确控制;线程的切换会产生性能消耗,于是才有了协程。
- 协程属于语言级别的调度算法实现
- 协程与线程的关系参看:WIKI百科
Kotlin中的协程
- Kotlin 协程就是一个基于Java Thread API封装的工具包,主要帮助我们轻松的写出复杂的并发代码。
- kotlin 协程相较于线程池,并没有什么性能上的优势。
- 非阻塞式挂起没什么特别的,Java 子线程也同样是非阻塞式的(非阻塞式实际上就是指切线程,线程切换了自然不会阻塞)
- 主要解决的问题是:回调地狱,在 Android 中带来的便利是:整合了生命周期的管理
Kotlin协程中有什么?
关键概念:协程上下文、协程启动模式、协程调度器、协程作用域。
先看最常用的的一个协程启动方法:
/** * Launches a new coroutine without blocking the current thread and returns a reference to the coroutine as a [Job]. * The coroutine is cancelled when the resulting job is [cancelled][Job.cancel]. * * The coroutine context is inherited from a [CoroutineScope]. Additional context elements can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden * with a corresponding [coroutineContext] element. * * By default, the coroutine is immediately scheduled for execution. * Other start options can be specified via `start` parameter. See [CoroutineStart] for details. * An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case, * the coroutine [Job] is created in _new_ state. It can be explicitly started with [start][Job.start] function * and will be started implicitly on the first invocation of [join][Job.join]. * * Uncaught exceptions in this coroutine cancel the parent job in the context by default * (unless [CoroutineExceptionHandler] is explicitly specified), which means that when `launch` is used with * the context of another coroutine, then any uncaught exception leads to the cancellation of the parent coroutine. * * See [newCoroutineContext] for a description of debugging facilities that are available for a newly created coroutine. * * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine. * @param start coroutine start option. The default value is [CoroutineStart.DEFAULT]. * @param block the coroutine code which will be invoked in the context of the provided scope. **/public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }
其中传入的三个参数分别是:协程上下文、协程启动模式、协程体。
协程上下文 CoroutineContext
一般情况下,默认为 EmptyCoroutineContext
。定义了一个可操作的集合。
/** * An empty coroutine context. */@SinceKotlin("1.3") public object EmptyCoroutineContext : CoroutineContext, Serializable { private const val serialVersionUID: Long = 0 private fun readResolve(): Any = EmptyCoroutineContext public override fun <E : Element> get(key: Key<E>): E? = null public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial public override fun plus(context: CoroutineContext): CoroutineContext = context public override fun minusKey(key: Key<*>): CoroutineContext = this public override fun hashCode(): Int = 0 public override fun toString(): String = "EmptyCoroutineContext" }
跟源码到最后,比如这条线路:
Dispatchers.Main -> MainDispatcherLoader.dispatcher -> loadMainDispatcher() -> abstract class MainCoroutineDispatcher : CoroutineDispatcher() -> abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor -> : CoroutineContext.Element
所有的上下文皆定义为 Element。通过查看继承可知,上下文用于指定相关功能模块。
协程启动模式 CoroutineStart
/** * Defines start options for coroutines builders. * It is used in `start` parameter of [launch][CoroutineScope.launch], [async][CoroutineScope.async], and other coroutine builder functions. * * The summary of coroutine start options is: * * [DEFAULT] -- immediately schedules coroutine for execution according to its context; * * [LAZY] -- starts coroutine lazily, only when it is needed; * * [ATOMIC] -- atomically (in a non-cancellable way) schedules coroutine for execution according to its context; * * [UNDISPATCHED] -- immediately executes coroutine until its first suspension point _in the current thread_. */public enum class CoroutineStart
模式 | 功能 |
---|---|
DEFAULT | 立即执行协程体 |
ATOMIC | 立即执行协程体,但在开始运行之前无法取消,也就是说最起码会执行协程体中的第一行代码。 |
UNDISPATCHED | 立即在当前线程执行协程体,直到第一个 suspend 调用 挂起之后,执行线程便取决于上下文当中的调度器。 |
LAZY | 只有在需要的情况下运行(懒加载) |
- DEFAULT 立即执行协程体
runBlocking { val job = GlobalScope.launch(start = CoroutineStart.DEFAULT) { println("1: " + Thread.currentThread().name) } // 不需要调用join方法 // job.join() }
打印结果
1: DefaultDispatcher-worker-1
CoroutineStart.DEFAULT
启动模式不需要手动调用join
或start
等方法,而是在调用launch
方法的时候就会自动执行协程体的代码。
- ATOMIC 立即执行协程体,但在开始运行之前无法取消, 即开启协程会无视
cancelling
状态
runBlocking { val job = GlobalScope.launch(start = CoroutineStart.ATOMIC) { println("1: " + Thread.currentThread().name) delay(1000) println("2: " + Thread.currentThread().name) } job.cancel() delay(2000) }
打印结果
1: DefaultDispatcher-worker-1
CoroutineStart. ATOMIC
启动模式的协程体 即使调了cancel
方法 也一定会执行,因为开启协程会无视cancelling
状态;
上面的example只打印了一句话,是因为执行delay(1000)
的时候 发现协程处于关闭状态, 所以出现了JobCancellationException
异常,导致下面的代码没有执行,如果 delay(1000)
这句代码用 try catch
捕获一下异常,就会继续执行下面的代码。(仅限于当前协程上下文)
- UNDISPATCHED 立即在当前线程执行协程体,直到第一个
suspend
调用 挂起之后,执行线程便取决于上下文当中的调度器。
runBlocking { println("0: " + Thread.currentThread().name) // 注意这里没有用GlobalScope.launch // 因为GlobalScope.launch启动的是一个顶层协程, 无法关联当前协程的上下文(coroutineContext), 导致结果有偏差 launch(context = Dispatchers.Default, start = CoroutineStart.UNDISPATCHED) { println("1: " + Thread.currentThread().name) delay(1000) println("2: " + Thread.currentThread().name) } delay(2000) }
打印结果
0: main 1: main 2: DefaultDispatcher-worker-1
可见 0 和 1 的执行线程是一样的,当执行完delay(1000)
, 后面的代码执行线程取决于Dispatchers.Default
调度器指定的线程,所以 2 在另一个线程中执行
- LAZY 只有在需要的情况下才执行协程体。
runBlocking { val job = GlobalScope.launch(start = CoroutineStart.LAZY) { println("1: " + Thread.currentThread().name) } // 一定调用join方法 job.join() }
打印结果
1: DefaultDispatcher-worker-1
CoroutineStart.LAZY
启动模式一定要手动调用join
或start
等方法,否者协程体不会执行。这里的join
,start
与线程中的意思相近。如果在协程体中使用join
会让出当前资源去执行协程。start 则是直接启动一个新的协程。– TODO
PS: 该方式可以用来替代AsyncTask
,提前准备好Job
,需要时再运行。
协程调度器 Dispatchers
其实就是线程调度器,在上下文中提过,实际上也是上下文的一种,继承自CoroutineContext.Element
。
用来指定协程代码块在哪个线程中执行,kotlin
提供了几个默认的协程调度器,分别是Default
、Main
、Unconfined
, 并针对jvm
, kotlin
提供了一个特有的IO
调度器。
- Dispatchers.Default 指定代码块在线程池中执行,该线程池由协程框架进行维护。
GlobalScope.launch(Dispatchers.Default) { println("1: " + Thread.currentThread().name) launch (Dispatchers.Default) { delay(1000) // 延迟1秒后,再继续执行下面的代码 println("2: " + Thread.currentThread().name) } println("3: " + Thread.currentThread().name) }
打印结果如下
1: DefaultDispatcher-worker-1 3: DefaultDispatcher-worker-1 2: DefaultDispatcher-worker-1
- Dispatchers.Main 指定代码块在
main
线程中执行(针对Android就是ui
线程)
GlobalScope.launch(Dispatchers.Default) { println("1: " + Thread.currentThread().name) launch (Dispatchers.Main) { delay(1000) // 延迟1秒后,再继续执行下面的代码 println("2: " + Thread.currentThread().name) } println("3: " + Thread.currentThread().name) }
打印结果如下:
1: DefaultDispatcher-worker-1 3: DefaultDispatcher-worker-1 2: main
可见Dispatchers.Main
就是指定协程代码块在main
线程中执行
- Dispatchers.Unconfined 没有指定协程代码快在哪个特定线程中执行,即当前在哪个线程,代码块中接下来的代码就在哪个线程中执行(即一段协程代码块 由于启动了子协程 导致切换了线程, 那么接下来的代码块也是在这个线程中执行)
GlobalScope.launch(Dispatchers.Default) { println("1: " + Thread.currentThread().name) launch (Dispatchers.Unconfined) { println("2: " + Thread.currentThread().name) requestApi() // delay(1000) 本来想用delay,但是使用requestApi 可能更加清晰 println("3: " + Thread.currentThread().name) } println("4: " + Thread.currentThread().name) } // 定义一个挂起函数,在一个新的子线程中执行 private suspend fun requestApi() = suspendCancellableCoroutine<String> { Thread { println("5: requestApi: " + Thread.currentThread().name) it.resume("success") }.start() }
打印结果如下:
1: DefaultDispatcher-worker-1 2: DefaultDispatcher-worker-1 5: requestApi: Thread-3 4: DefaultDispatcher-worker-1 3: Thread-3
可见2 和 3的代码 执行线程明显不一样;当执行到requestApi
这句代码的时候 会切换到子线程(即Thread-3
)中执行代码,然后接下来的协程代码块就会在Thread-3
中执行。
协程作用域 GlobalScope、coroutineScope、supervisorScope
- GlobeScope
GlobeScope
启动的协程会单独启动一个作用域,无法继承外面协程的作用域,其内部的子协程遵从默认的作用域规则。仅在Android中可用,绑定整个应用生命周期。
GlobalScope.launch { //launch top-level coroutines }
- coroutineScope
coroutineScope
启动的协程会继承父协程的作用域,其内部的取消操作是双向传播的,子协程未捕获的异常也会向上传递给父协程。
val handler = CoroutineExceptionHandler { _, exception -> Log.d(TAG, "Caught: $exception") } runBlocking { Log.d(TAG, "CoroutineScope: 1") try { coroutineScope { launch(/*handler 上抛时此处异常捕获无效 需父级协程处理 */) { Log.d(TAG, "CoroutineScope: 2") throw NullPointerException("Exception") } launch { Log.d(TAG, "CoroutineScope: 3") } Log.d(TAG, "CoroutineScope: 4") } } catch (e: Exception) { Log.d(TAG, "CoroutineScope: $e") } }
打印结果如下:
CoroutineScope: 1 CoroutineScope: 4 CoroutineScope: 2 CoroutineScope: java.lang.NullPointerException: Exception
- supervisorScope
supervisorScope
启动的协程会继承父协程的作用域,他跟coroutineScope
不一样的点是 它是单向传递的,即内部的取消操作和异常传递 只能由父协程向子协程传播,不能从子协程传向父协程
val handler = CoroutineExceptionHandler { _, exception -> Log.d(TAG, "Caught: $exception") } runBlocking { Log.d(TAG, "CoroutineScope: 1") supervisorScope { Log.d(TAG, "CoroutineScope: 2") launch(handler/*下抛时 此处会捕获*/) { Log.d(TAG, "CoroutineScope: 3") throw NullPointerException("Exception") } launch { Log.d(TAG, "CoroutineScope: 4") } Log.d(TAG, "CoroutineScope: 5") } }
打印结果如下:
CoroutineScope: 1 CoroutineScope: 2 CoroutineScope: 5 CoroutineScope: 3 Caught: java.lang.NullPointerException: Exception CoroutineScope: 4
这里我们再玩复杂一点:
val handler = CoroutineExceptionHandler { _, exception -> Log.d(TAG, "Caught: $exception") } GlobalScope.launch { Log.d(TAG, "CoroutineScope: 1") supervisorScope { Log.d(TAG, "CoroutineScope: 2") launch(handler) { Log.d(TAG, "CoroutineScope: 3") launch { Log.d(TAG, "CoroutineScope: 4") } throw NullPointerException("Exception") } launch { Log.d(TAG, "CoroutineScope: 5") } Log.d(TAG, "CoroutineScope: 6") } }
首先,觉得异常会被捕获吗?在子协程去捕获(4所处协程加上异常捕获,其余地方的捕获去除)行吗?
第一个答案是会被捕获,第二个答案是:不行。
这里的打印结果是:
CoroutineScope: 1 CoroutineScope: 2 CoroutineScope: 3 CoroutineScope: 6 CoroutineScope: 4 Caught: java.lang.NullPointerException: Exception CoroutineScope: 5
相信很容易看出差别,就是5不会被影响中断。
这里有个常用的协程,Android特有,MainScope,简单看下源码:
/** * Creates the main [CoroutineScope] for UI components. * * Example of use: * ``` * class MyAndroidActivity { * private val scope = MainScope() * * override fun onDestroy() { * super.onDestroy() * scope.cancel() * } * } * * ``` * * The resulting scope has [SupervisorJob] and [Dispatchers.Main] context elements. * If you want to append additional elements to the main scope, use [CoroutineScope.plus] operator: * `val scope = MainScope() + CoroutineName("MyActivity")`. */ @Suppress("FunctionName") public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
MainScope
就是使用的supervisorScope
作用域,所以子协程 出错 或 cancel
并不会影响父协程,从而也不会影响兄弟协程。
Kotlin协程怎么用
协程用起来很简单,确定上下文(要不要返回值,要不要延迟执行),需要做什么,在什么线程做即可。
协程的启动
协程的启动有三种,被成为coroutine builders
:
runBlocking
launch
async
runBlocking
启动的内部协程,会阻塞当前线程,直到block内的协程执行结束。常用的协程函数调用方式,一般用于写例子,直接在java环境运行。
在Android环境的主线程使用则会阻塞主线程。
runBlocking { //直接启动 }
launch
最常用的启动协程的方式;不会阻塞当前调用线程;
创建协程只能运行一个任务, 返回Job
对象,可操作Job
, 但不是业务相关的返回值.
GlobalScope.launch { //启动一个与app同生命周期的协程 }
也可以像使用一个线程池一样去创建:
val mainScope = MainScope() mainScope.launch { //job1 } mainScope.launch { //job2 } //destory时取消所有任务 mainScope.cancel()
//3.1 自定义 val scope = CoroutineScope(Job() + Dispatchers.IO) //看源码会知道,可以省略Job() 这样也是可以的 val scope = CoroutineScope(Dispatchers.IO) scope.launch { //job1 } scope.launch { //job2 } scope.cancel()
在生命周期对象中,扩展函数库支持:
lifecycleScope.launch{ //启动一个与当前组件生命周期相同的协程 如果是activity则跟随activity fragment则跟随fragment }
在viewmodel中:
viewModelScope.launch { //跟随viewModel生命周期,而viewModel跟随创建viewModel的生命周期对象。 }
async
类似于launch,启动时跟随当前环境的线程,如果在主线程,则async
也会在主线程,但不会阻塞;可用于多并发;await合并结果。
async
协程构建器, 它返回一个Deferred
—— 一个轻量级的非阻塞future
, 这代表将会在稍后提供一个带结果的 promise
。可以使用 .await()
在一个延迟的位置得到它的最终结果, 因为 Deferred
继承自 Job
,所以如果需要的话,可以取消它。
val runBlocking = runBlocking { async { "res" } } //runBlocking 内容将会是 "res"
怎么切线程
withContext 直接用:
runBlocking { withContext(Dispatchers.IO){ //do on IO } withContext(Dispatchers.Main){ //do on Main } }
或者写两个挂起函数:
suspend fun <T> withUI(block: suspend CoroutineScope.() -> T): T { return withContext(Dispatchers.Main, block) } suspend fun <T> withIO(block: suspend CoroutineScope.() -> T): T { return withContext(Dispatchers.IO, block) }
用的时候:
runBlocking { withIO { //do on IO } withUI { //do on Main } }
挂起函数 suspend fun
关键字 suspend,用于将代码分离,降低耦合,抽象处理,在suspend 修饰的方法中,可以调用协程上下文的方法。
线程的代码在到达 suspend
函数的时候被掐断,接下来协程会从这个 suspend
函数开始继续往下执行,执行完了再回来执行之前协程的方法。如果不使用挂起函数,则会将当前协程跑完再去执行其他协程。简单来说:挂起,就是一个稍后会被自动切回来的线程调度操作。
实现原理:状态转移 参考:协程状态机动画演示 状态机WIKI
常见挂起函数:
withContext
delay
withTimeout
写法:
private suspend fun suspendJob() { withContext(Dispatchers.IO) { suspendCoroutine<Boolean> { //do sth } } }
并发async
上面有提到async
,这里给出一个case,并发并等待结果。
runBlocking { val one = async { /*network api1*/ Log.d(TAG, "async: 1 start") delay(1000) Log.d(TAG, "async: 1 end") return@async "code1" } val two = async { /*network api2*/ Log.d(TAG, "async: 2 start") delay(3000) Log.d(TAG, "async: 2 end") return@async "code2" } val same = one.await() == two.await() Log.d(TAG, "res is the same ? $same") }
打印结果:
10:36:44.177 : async: 1 start 10:36:44.178 : async: 2 start 10:36:45.179 : async: 1 end 10:36:47.179 : async: 2 end 10:36:47.180 : res is the same ? false
异常处理
可在异常抛出的位置直接try catch,也可使用 CoroutineExceptionHandler。注意不要包裹在launch{ }
代码块外,将无法捕捉异常。
runBlocking { try { throw Exception("Exception") } catch (e: Exception) { Log.e(TAG, "Exception ", e) } }
使用 CoroutineExceptionHandler:
fun CoroutineScope.launchWithExceptionCatch(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> Unit) { launch(context + CoroutineExceptionHandler { coroutineContext, throwable -> if (throwable is CancellationException) return@CoroutineExceptionHandler Log.e(TAG, "block context:${coroutineContext} exception:$throwable") }) { block() } }
async则有点特殊,async
是一个期望由用户来处理异常的方法, 另外它的异常是当客户端调用await()
时才会抛出,
val deferred = GlobalScope.async { println("Throwing exception from async") throw ArithmeticException() // 没有打印任何东西,依赖用户去调用 await 时处理 } try { //捕获点在这里 deferred.await() println("Unreached") } catch (e: ArithmeticException) { println("Caught ArithmeticException") }
特殊挂起函数 suspendCoroutine
从执行机制上来讲,跟回调没有什么本质的区别。本质上是为了解决 Java 回调接口与协程的衔接问题。
suspend fun suspendScope(): Boolean { return suspendCoroutine<Boolean> { suspend -> someSdkCallBack { suspend.resume(it) //suspend.resumeWithException() } } }
这样用起来,就可以将一个异步回调接口变成一个同步调用的挂起函数。
这里提一个坑,需要注意的是:withTimeout 与 suspendCoroutine 共同使用时的阻塞问题。
withTimeout(3000){ //如果 someSdkCallBack 没有切换线程 则 withTimeout 将无效 //原因是 someSdkCallBack 会阻塞当前线程 withTimeout无法执行 suspendCoroutine<Boolean> { suspend -> someSdkCallBack { suspend.resume(it) } } }
正确用法应该是:
withTimeout(2000L) { GlobalScope.launch { suspendCoroutine<String> { it.resume(time().get()) } }.join() }
val d = async { suspendCoroutine<String> { it.resume(time().get()) } } withTimeout(2000) { d.await() }
lifecycleScope原理
如果自己实现,原始的代码:
lifecycle.addObserver(object : LifecycleEventObserver { override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) { //接收生命周期变化 when (event) { Lifecycle.Event.ON_CREATE -> { } Lifecycle.Event.ON_START -> { } Lifecycle.Event.ON_RESUME -> { } Lifecycle.Event.ON_PAUSE -> { } Lifecycle.Event.ON_STOP -> { } Lifecycle.Event.ON_DESTROY -> { } else -> { } } } })
lifecycleScope
源码:
/** * [CoroutineScope] tied to this [Lifecycle]. * * This scope will be cancelled when the [Lifecycle] is destroyed. * * This scope is bound to * [Dispatchers.Main.immediate][kotlinx.coroutines.MainCoroutineDispatcher.immediate] */val Lifecycle.coroutineScope: LifecycleCoroutineScope get() { while (true) { val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl? if (existing != null) { return existing } val newScope = LifecycleCoroutineScopeImpl( this, SupervisorJob() + Dispatchers.Main.immediate ) if (mInternalScopeRef.compareAndSet(null, newScope)) { newScope.register() return newScope } } }
看下 LifecycleCoroutineScopeImpl
internal class LifecycleCoroutineScopeImpl( override val lifecycle: Lifecycle, override val coroutineContext: CoroutineContext ) : LifecycleCoroutineScope(), LifecycleEventObserver { init { // in case we are initialized on a non-main thread, make a best effort check before // we return the scope. This is not sync but if developer is launching on a non-main // dispatcher, they cannot be 100% sure anyways. if (lifecycle.currentState == Lifecycle.State.DESTROYED) { coroutineContext.cancel() } } fun register() { launch(Dispatchers.Main.immediate) { if (lifecycle.currentState >= Lifecycle.State.INITIALIZED) { lifecycle.addObserver(this@LifecycleCoroutineScopeImpl) } else { coroutineContext.cancel() } } } override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) { if (lifecycle.currentState <= Lifecycle.State.DESTROYED) { lifecycle.removeObserver(this) coroutineContext.cancel() } } }
清晰明了。
协程高级用法
Channel 队列
它是并发安全的,用来连接协程,实现不同协程的通信,参考:
Flow 流
与 RxJava 相似,Flow
就是 Kotlin 协程与响应式编程模型结合的产物。参考:
两者均包含实验性API,在协程1.4可能会发生变化。
参考
其他关于 AAC 架构的 KTX 扩展库:
https://developer.android.com/kotlin/ktx
参考以及推荐文章:
破解 Kotlin 协程 之:
本站由以下主机服务商提供服务支持:
世上最可爱的美少女
牛批
Mosaic-C
谢谢世上最可爱的美少女的认可呀~