自己在公司做的一次分享,内容来自文末的各位大佬,稍微做了整合与修改,感谢。如果有问题,还请指出,感激不尽。
计算机领域中的协程
马尔文·康威于1958年发明了术语“coroutine”并用于构建汇编程序[1] ,关于协程最初的出版解说在1963年发表[2]
- 协程是一种在程序中处理并发任务的方案,也是这种方案的一个组件,与线程属于同一个层级的概念。协程中不存在线程,也不存在并行。
- 线程是操作系统能够进行运算调度的最小单位,需要来自操作系统的支持,线程的调度不能精确控制;线程的切换会产生性能消耗,于是才有了协程。
- 协程属于语言级别的调度算法实现
- 协程与线程的关系参看:WIKI百科
Kotlin中的协程
- Kotlin 协程就是一个基于Java Thread API封装的工具包,主要帮助我们轻松的写出复杂的并发代码。
- kotlin 协程相较于线程池,并没有什么性能上的优势。
- 非阻塞式挂起没什么特别的,Java 子线程也同样是非阻塞式的(非阻塞式实际上就是指切线程,线程切换了自然不会阻塞)
- 主要解决的问题是:回调地狱,在 Android 中带来的便利是:整合了生命周期的管理
Kotlin协程中有什么?
关键概念:协程上下文、协程启动模式、协程调度器、协程作用域。
先看最常用的的一个协程启动方法:
/**
* Launches a new coroutine without blocking the current thread and returns a reference to the coroutine as a [Job].
* The coroutine is cancelled when the resulting job is [cancelled][Job.cancel].
*
* The coroutine context is inherited from a [CoroutineScope]. Additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
* with a corresponding [coroutineContext] element.
*
* By default, the coroutine is immediately scheduled for execution.
* Other start options can be specified via `start` parameter. See [CoroutineStart] for details.
* An optional [start] parameter can be set to [CoroutineStart.LAZY] to start coroutine _lazily_. In this case,
* the coroutine [Job] is created in _new_ state. It can be explicitly started with [start][Job.start] function
* and will be started implicitly on the first invocation of [join][Job.join].
*
* Uncaught exceptions in this coroutine cancel the parent job in the context by default
* (unless [CoroutineExceptionHandler] is explicitly specified), which means that when `launch` is used with
* the context of another coroutine, then any uncaught exception leads to the cancellation of the parent coroutine.
*
* See [newCoroutineContext] for a description of debugging facilities that are available for a newly created coroutine.
*
* @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
* @param block the coroutine code which will be invoked in the context of the provided scope.
**/public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}其中传入的三个参数分别是:协程上下文、协程启动模式、协程体。
协程上下文 CoroutineContext
一般情况下,默认为 EmptyCoroutineContext。定义了一个可操作的集合。
/**
* An empty coroutine context.
*/@SinceKotlin("1.3")
public object EmptyCoroutineContext : CoroutineContext, Serializable {
private const val serialVersionUID: Long = 0
private fun readResolve(): Any = EmptyCoroutineContext
public override fun <E : Element> get(key: Key<E>): E? = null
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
public override fun plus(context: CoroutineContext): CoroutineContext = context
public override fun minusKey(key: Key<*>): CoroutineContext = this
public override fun hashCode(): Int = 0
public override fun toString(): String = "EmptyCoroutineContext"
}跟源码到最后,比如这条线路:
Dispatchers.Main -> MainDispatcherLoader.dispatcher -> loadMainDispatcher() -> abstract class MainCoroutineDispatcher : CoroutineDispatcher() -> abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor -> : CoroutineContext.Element 所有的上下文皆定义为 Element。通过查看继承可知,上下文用于指定相关功能模块。
协程启动模式 CoroutineStart
/** * Defines start options for coroutines builders. * It is used in `start` parameter of [launch][CoroutineScope.launch], [async][CoroutineScope.async], and other coroutine builder functions. * * The summary of coroutine start options is: * * [DEFAULT] -- immediately schedules coroutine for execution according to its context; * * [LAZY] -- starts coroutine lazily, only when it is needed; * * [ATOMIC] -- atomically (in a non-cancellable way) schedules coroutine for execution according to its context; * * [UNDISPATCHED] -- immediately executes coroutine until its first suspension point _in the current thread_. */public enum class CoroutineStart
| 模式 | 功能 |
|---|---|
| DEFAULT | 立即执行协程体 |
| ATOMIC | 立即执行协程体,但在开始运行之前无法取消,也就是说最起码会执行协程体中的第一行代码。 |
| UNDISPATCHED | 立即在当前线程执行协程体,直到第一个 suspend 调用 挂起之后,执行线程便取决于上下文当中的调度器。 |
| LAZY | 只有在需要的情况下运行(懒加载) |
- DEFAULT 立即执行协程体
runBlocking {
val job = GlobalScope.launch(start = CoroutineStart.DEFAULT) {
println("1: " + Thread.currentThread().name)
}
// 不需要调用join方法
// job.join()
}打印结果
1: DefaultDispatcher-worker-1
CoroutineStart.DEFAULT启动模式不需要手动调用join或start等方法,而是在调用launch方法的时候就会自动执行协程体的代码。
- ATOMIC 立即执行协程体,但在开始运行之前无法取消, 即开启协程会无视
cancelling状态
runBlocking {
val job = GlobalScope.launch(start = CoroutineStart.ATOMIC) {
println("1: " + Thread.currentThread().name)
delay(1000)
println("2: " + Thread.currentThread().name)
}
job.cancel()
delay(2000)
}打印结果
1: DefaultDispatcher-worker-1
CoroutineStart. ATOMIC启动模式的协程体 即使调了cancel方法 也一定会执行,因为开启协程会无视cancelling状态;
上面的example只打印了一句话,是因为执行delay(1000)的时候 发现协程处于关闭状态, 所以出现了JobCancellationException异常,导致下面的代码没有执行,如果 delay(1000) 这句代码用 try catch 捕获一下异常,就会继续执行下面的代码。(仅限于当前协程上下文)
- UNDISPATCHED 立即在当前线程执行协程体,直到第一个
suspend调用 挂起之后,执行线程便取决于上下文当中的调度器。
runBlocking {
println("0: " + Thread.currentThread().name)
// 注意这里没有用GlobalScope.launch
// 因为GlobalScope.launch启动的是一个顶层协程, 无法关联当前协程的上下文(coroutineContext), 导致结果有偏差
launch(context = Dispatchers.Default, start = CoroutineStart.UNDISPATCHED) {
println("1: " + Thread.currentThread().name)
delay(1000)
println("2: " + Thread.currentThread().name)
}
delay(2000)
}打印结果
0: main 1: main 2: DefaultDispatcher-worker-1
可见 0 和 1 的执行线程是一样的,当执行完delay(1000), 后面的代码执行线程取决于Dispatchers.Default调度器指定的线程,所以 2 在另一个线程中执行
- LAZY 只有在需要的情况下才执行协程体。
runBlocking {
val job = GlobalScope.launch(start = CoroutineStart.LAZY) {
println("1: " + Thread.currentThread().name)
}
// 一定调用join方法
job.join()
}打印结果
1: DefaultDispatcher-worker-1
CoroutineStart.LAZY启动模式一定要手动调用join或start等方法,否者协程体不会执行。这里的join,start与线程中的意思相近。如果在协程体中使用join会让出当前资源去执行协程。start 则是直接启动一个新的协程。– TODO
PS: 该方式可以用来替代AsyncTask,提前准备好Job,需要时再运行。
协程调度器 Dispatchers
其实就是线程调度器,在上下文中提过,实际上也是上下文的一种,继承自CoroutineContext.Element。
用来指定协程代码块在哪个线程中执行,kotlin提供了几个默认的协程调度器,分别是Default、Main、Unconfined, 并针对jvm, kotlin提供了一个特有的IO调度器。
- Dispatchers.Default 指定代码块在线程池中执行,该线程池由协程框架进行维护。
GlobalScope.launch(Dispatchers.Default) {
println("1: " + Thread.currentThread().name)
launch (Dispatchers.Default) {
delay(1000) // 延迟1秒后,再继续执行下面的代码
println("2: " + Thread.currentThread().name)
}
println("3: " + Thread.currentThread().name)
}打印结果如下
1: DefaultDispatcher-worker-1 3: DefaultDispatcher-worker-1 2: DefaultDispatcher-worker-1
- Dispatchers.Main 指定代码块在
main线程中执行(针对Android就是ui线程)
GlobalScope.launch(Dispatchers.Default) {
println("1: " + Thread.currentThread().name)
launch (Dispatchers.Main) {
delay(1000) // 延迟1秒后,再继续执行下面的代码
println("2: " + Thread.currentThread().name)
}
println("3: " + Thread.currentThread().name)
}打印结果如下:
1: DefaultDispatcher-worker-1 3: DefaultDispatcher-worker-1 2: main
可见Dispatchers.Main就是指定协程代码块在main线程中执行
- Dispatchers.Unconfined 没有指定协程代码快在哪个特定线程中执行,即当前在哪个线程,代码块中接下来的代码就在哪个线程中执行(即一段协程代码块 由于启动了子协程 导致切换了线程, 那么接下来的代码块也是在这个线程中执行)
GlobalScope.launch(Dispatchers.Default) {
println("1: " + Thread.currentThread().name)
launch (Dispatchers.Unconfined) {
println("2: " + Thread.currentThread().name)
requestApi() // delay(1000) 本来想用delay,但是使用requestApi 可能更加清晰
println("3: " + Thread.currentThread().name)
}
println("4: " + Thread.currentThread().name)
}
// 定义一个挂起函数,在一个新的子线程中执行
private suspend fun requestApi() = suspendCancellableCoroutine<String> {
Thread {
println("5: requestApi: " + Thread.currentThread().name)
it.resume("success")
}.start()
}打印结果如下:
1: DefaultDispatcher-worker-1 2: DefaultDispatcher-worker-1 5: requestApi: Thread-3 4: DefaultDispatcher-worker-1 3: Thread-3
可见2 和 3的代码 执行线程明显不一样;当执行到requestApi这句代码的时候 会切换到子线程(即Thread-3)中执行代码,然后接下来的协程代码块就会在Thread-3中执行。
协程作用域 GlobalScope、coroutineScope、supervisorScope
- GlobeScope
GlobeScope启动的协程会单独启动一个作用域,无法继承外面协程的作用域,其内部的子协程遵从默认的作用域规则。仅在Android中可用,绑定整个应用生命周期。
GlobalScope.launch {
//launch top-level coroutines
}- coroutineScope
coroutineScope启动的协程会继承父协程的作用域,其内部的取消操作是双向传播的,子协程未捕获的异常也会向上传递给父协程。
val handler = CoroutineExceptionHandler { _, exception ->
Log.d(TAG, "Caught: $exception")
}
runBlocking {
Log.d(TAG, "CoroutineScope: 1")
try {
coroutineScope {
launch(/*handler 上抛时此处异常捕获无效 需父级协程处理 */) {
Log.d(TAG, "CoroutineScope: 2")
throw NullPointerException("Exception")
}
launch {
Log.d(TAG, "CoroutineScope: 3")
}
Log.d(TAG, "CoroutineScope: 4")
}
} catch (e: Exception) {
Log.d(TAG, "CoroutineScope: $e")
}
}打印结果如下:
CoroutineScope: 1 CoroutineScope: 4 CoroutineScope: 2 CoroutineScope: java.lang.NullPointerException: Exception
- supervisorScope
supervisorScope启动的协程会继承父协程的作用域,他跟coroutineScope不一样的点是 它是单向传递的,即内部的取消操作和异常传递 只能由父协程向子协程传播,不能从子协程传向父协程
val handler = CoroutineExceptionHandler { _, exception ->
Log.d(TAG, "Caught: $exception")
}
runBlocking {
Log.d(TAG, "CoroutineScope: 1")
supervisorScope {
Log.d(TAG, "CoroutineScope: 2")
launch(handler/*下抛时 此处会捕获*/) {
Log.d(TAG, "CoroutineScope: 3")
throw NullPointerException("Exception")
}
launch {
Log.d(TAG, "CoroutineScope: 4")
}
Log.d(TAG, "CoroutineScope: 5")
}
}打印结果如下:
CoroutineScope: 1 CoroutineScope: 2 CoroutineScope: 5 CoroutineScope: 3 Caught: java.lang.NullPointerException: Exception CoroutineScope: 4
这里我们再玩复杂一点:
val handler = CoroutineExceptionHandler { _, exception ->
Log.d(TAG, "Caught: $exception")
}
GlobalScope.launch {
Log.d(TAG, "CoroutineScope: 1")
supervisorScope {
Log.d(TAG, "CoroutineScope: 2")
launch(handler) {
Log.d(TAG, "CoroutineScope: 3")
launch {
Log.d(TAG, "CoroutineScope: 4")
}
throw NullPointerException("Exception")
}
launch {
Log.d(TAG, "CoroutineScope: 5")
}
Log.d(TAG, "CoroutineScope: 6")
}
}首先,觉得异常会被捕获吗?在子协程去捕获(4所处协程加上异常捕获,其余地方的捕获去除)行吗?
第一个答案是会被捕获,第二个答案是:不行。
这里的打印结果是:
CoroutineScope: 1 CoroutineScope: 2 CoroutineScope: 3 CoroutineScope: 6 CoroutineScope: 4 Caught: java.lang.NullPointerException: Exception CoroutineScope: 5
相信很容易看出差别,就是5不会被影响中断。
这里有个常用的协程,Android特有,MainScope,简单看下源码:
/**
* Creates the main [CoroutineScope] for UI components.
*
* Example of use:
* ```
* class MyAndroidActivity {
* private val scope = MainScope()
*
* override fun onDestroy() {
* super.onDestroy()
* scope.cancel()
* }
* }
*
* ```
*
* The resulting scope has [SupervisorJob] and [Dispatchers.Main] context elements.
* If you want to append additional elements to the main scope, use [CoroutineScope.plus] operator:
* `val scope = MainScope() + CoroutineName("MyActivity")`.
*/ @Suppress("FunctionName")
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)MainScope 就是使用的supervisorScope作用域,所以子协程 出错 或 cancel 并不会影响父协程,从而也不会影响兄弟协程。
Kotlin协程怎么用
协程用起来很简单,确定上下文(要不要返回值,要不要延迟执行),需要做什么,在什么线程做即可。
协程的启动
协程的启动有三种,被成为coroutine builders:
runBlockinglaunchasync
runBlocking
启动的内部协程,会阻塞当前线程,直到block内的协程执行结束。常用的协程函数调用方式,一般用于写例子,直接在java环境运行。
在Android环境的主线程使用则会阻塞主线程。
runBlocking {
//直接启动
}launch
最常用的启动协程的方式;不会阻塞当前调用线程;
创建协程只能运行一个任务, 返回Job对象,可操作Job, 但不是业务相关的返回值.
GlobalScope.launch {
//启动一个与app同生命周期的协程
}也可以像使用一个线程池一样去创建:
val mainScope = MainScope()
mainScope.launch {
//job1
}
mainScope.launch {
//job2
}
//destory时取消所有任务
mainScope.cancel()
//3.1 自定义
val scope = CoroutineScope(Job() + Dispatchers.IO)
//看源码会知道,可以省略Job() 这样也是可以的 val scope = CoroutineScope(Dispatchers.IO)
scope.launch {
//job1
}
scope.launch {
//job2
}
scope.cancel()在生命周期对象中,扩展函数库支持:
lifecycleScope.launch{
//启动一个与当前组件生命周期相同的协程 如果是activity则跟随activity fragment则跟随fragment
}在viewmodel中:
viewModelScope.launch {
//跟随viewModel生命周期,而viewModel跟随创建viewModel的生命周期对象。
}async
类似于launch,启动时跟随当前环境的线程,如果在主线程,则async也会在主线程,但不会阻塞;可用于多并发;await合并结果。
async协程构建器, 它返回一个Deferred —— 一个轻量级的非阻塞future, 这代表将会在稍后提供一个带结果的 promise。可以使用 .await() 在一个延迟的位置得到它的最终结果, 因为 Deferred 继承自 Job,所以如果需要的话,可以取消它。
val runBlocking = runBlocking {
async {
"res"
}
}
//runBlocking 内容将会是 "res"怎么切线程
withContext 直接用:
runBlocking {
withContext(Dispatchers.IO){
//do on IO
}
withContext(Dispatchers.Main){
//do on Main
}
}或者写两个挂起函数:
suspend fun <T> withUI(block: suspend CoroutineScope.() -> T): T {
return withContext(Dispatchers.Main, block)
}
suspend fun <T> withIO(block: suspend CoroutineScope.() -> T): T {
return withContext(Dispatchers.IO, block)
}用的时候:
runBlocking {
withIO {
//do on IO
}
withUI {
//do on Main
}
}挂起函数 suspend fun
关键字 suspend,用于将代码分离,降低耦合,抽象处理,在suspend 修饰的方法中,可以调用协程上下文的方法。
线程的代码在到达 suspend 函数的时候被掐断,接下来协程会从这个 suspend 函数开始继续往下执行,执行完了再回来执行之前协程的方法。如果不使用挂起函数,则会将当前协程跑完再去执行其他协程。简单来说:挂起,就是一个稍后会被自动切回来的线程调度操作。
实现原理:状态转移 参考:协程状态机动画演示 状态机WIKI
常见挂起函数:
withContextdelaywithTimeout
写法:
private suspend fun suspendJob() {
withContext(Dispatchers.IO) {
suspendCoroutine<Boolean> {
//do sth
}
}
}并发async
上面有提到async,这里给出一个case,并发并等待结果。
runBlocking {
val one = async {
/*network api1*/
Log.d(TAG, "async: 1 start")
delay(1000)
Log.d(TAG, "async: 1 end")
return@async "code1"
}
val two = async {
/*network api2*/
Log.d(TAG, "async: 2 start")
delay(3000)
Log.d(TAG, "async: 2 end")
return@async "code2"
}
val same = one.await() == two.await()
Log.d(TAG, "res is the same ? $same")
}打印结果:
10:36:44.177 : async: 1 start 10:36:44.178 : async: 2 start 10:36:45.179 : async: 1 end 10:36:47.179 : async: 2 end 10:36:47.180 : res is the same ? false
异常处理
可在异常抛出的位置直接try catch,也可使用 CoroutineExceptionHandler。注意不要包裹在launch{ }代码块外,将无法捕捉异常。
runBlocking {
try {
throw Exception("Exception")
} catch (e: Exception) {
Log.e(TAG, "Exception ", e)
}
}使用 CoroutineExceptionHandler:
fun CoroutineScope.launchWithExceptionCatch(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> Unit) {
launch(context + CoroutineExceptionHandler { coroutineContext, throwable ->
if (throwable is CancellationException) return@CoroutineExceptionHandler
Log.e(TAG, "block context:${coroutineContext} exception:$throwable")
}) {
block()
}
}async则有点特殊,async是一个期望由用户来处理异常的方法, 另外它的异常是当客户端调用await()时才会抛出,
val deferred = GlobalScope.async {
println("Throwing exception from async")
throw ArithmeticException() // 没有打印任何东西,依赖用户去调用 await 时处理
}
try {
//捕获点在这里
deferred.await()
println("Unreached")
} catch (e: ArithmeticException) {
println("Caught ArithmeticException")
}特殊挂起函数 suspendCoroutine
从执行机制上来讲,跟回调没有什么本质的区别。本质上是为了解决 Java 回调接口与协程的衔接问题。
suspend fun suspendScope(): Boolean {
return suspendCoroutine<Boolean> { suspend ->
someSdkCallBack {
suspend.resume(it)
//suspend.resumeWithException()
}
}
}这样用起来,就可以将一个异步回调接口变成一个同步调用的挂起函数。
这里提一个坑,需要注意的是:withTimeout 与 suspendCoroutine 共同使用时的阻塞问题。
withTimeout(3000){
//如果 someSdkCallBack 没有切换线程 则 withTimeout 将无效
//原因是 someSdkCallBack 会阻塞当前线程 withTimeout无法执行
suspendCoroutine<Boolean> { suspend ->
someSdkCallBack {
suspend.resume(it)
}
}
}正确用法应该是:
withTimeout(2000L) {
GlobalScope.launch {
suspendCoroutine<String> {
it.resume(time().get())
}
}.join()
}
val d = async {
suspendCoroutine<String> {
it.resume(time().get())
}
}
withTimeout(2000) { d.await() }lifecycleScope原理
如果自己实现,原始的代码:
lifecycle.addObserver(object : LifecycleEventObserver {
override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
//接收生命周期变化
when (event) {
Lifecycle.Event.ON_CREATE -> {
}
Lifecycle.Event.ON_START -> {
}
Lifecycle.Event.ON_RESUME -> {
}
Lifecycle.Event.ON_PAUSE -> {
}
Lifecycle.Event.ON_STOP -> {
}
Lifecycle.Event.ON_DESTROY -> {
}
else -> {
}
}
}
})lifecycleScope 源码:
/**
* [CoroutineScope] tied to this [Lifecycle].
*
* This scope will be cancelled when the [Lifecycle] is destroyed.
*
* This scope is bound to
* [Dispatchers.Main.immediate][kotlinx.coroutines.MainCoroutineDispatcher.immediate]
*/val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
if (existing != null) {
return existing
}
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main.immediate
)
if (mInternalScopeRef.compareAndSet(null, newScope)) {
newScope.register()
return newScope
}
}
}看下 LifecycleCoroutineScopeImpl
internal class LifecycleCoroutineScopeImpl(
override val lifecycle: Lifecycle,
override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {
init {
// in case we are initialized on a non-main thread, make a best effort check before
// we return the scope. This is not sync but if developer is launching on a non-main
// dispatcher, they cannot be 100% sure anyways.
if (lifecycle.currentState == Lifecycle.State.DESTROYED) {
coroutineContext.cancel()
}
}
fun register() {
launch(Dispatchers.Main.immediate) {
if (lifecycle.currentState >= Lifecycle.State.INITIALIZED) {
lifecycle.addObserver(this@LifecycleCoroutineScopeImpl)
} else {
coroutineContext.cancel()
}
}
}
override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
lifecycle.removeObserver(this)
coroutineContext.cancel()
}
}
}清晰明了。
协程高级用法
Channel 队列
它是并发安全的,用来连接协程,实现不同协程的通信,参考:
Flow 流
与 RxJava 相似,Flow 就是 Kotlin 协程与响应式编程模型结合的产物。参考:
两者均包含实验性API,在协程1.4可能会发生变化。
参考
其他关于 AAC 架构的 KTX 扩展库:
https://developer.android.com/kotlin/ktx
参考以及推荐文章:
破解 Kotlin 协程 之:
本站广告由 Google AdSense 提供
世上最可爱的美少女
牛批
Mosaic-C
谢谢世上最可爱的美少女的认可呀~