kotlin协程系列之基础设施

前一篇文章介绍了kotlin协程的历史及现状,接下来就介绍一下kotlin协程吧。

kotlin协程分为标准库提供的基础设施以及官方协程库两部分,标准库的基础设施依照协程的思想提供基本的挂起、恢复接口,官方协程库基于这些接口提供丰富实用的功能。

今天先介绍一下kotlin协程的基础设施。为了让大家对kotlin协程有更为直观的认识,先展示一个demo:

// 代码清单2-1
val continuation = suspend {
 println("simpleTest: In Coroutine")
 5
}.createCoroutine(object : Continuation<Int> {
 override fun resumeWith(result: Result<Int>) {
 println("resumeWith: Continuation End: $result")
 }
 override val context: CoroutineContext
 get() = EmptyCoroutineContext
})
continuation.resume(Unit)

demo由三个部分组成:

  • 由suspend声明的挂起函数
  • 调用createCoroutine方法创建Continuation实例
  • 通过continuation.resume启动协程

suspend函数作为协程体,输出一条日志,并且返回5作为执行结果。协程体执行完成后,回调Continuation的resumeWith方法,其内输出协程的执行结果。

suspend函数

suspend函数能在不阻塞线程执行的情况下将协程挂起,适合用于将异步代码转为同步形式。在suspend函数中可以调用普通函数和suspend函数,然而在普通函数中却不能调用suspend函数(由于编译时存在CPS转换,后续讲协程实现时详细说明)。

当suspend函数被真正挂起的时候,对应的调用处被乘坐挂起点

suspend函数内部通过调用suspendCoroutine方法实现挂起操作的,如果没有调用suspendCoroutine,那么suspend函数并不会真正的挂起,前面的demo在实际运行时就不会挂起。

suspendCoroutine的函数签名如下:

// 代码清单2-2
suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T

它是个内联函数,其中block是参数为Continuation的回调函数,在异步任务完成时,早block内部调用continuation.resumeWith()或者调用continuation.resume()continuation.resumeWithException()恢复被挂起的协程。

下面看一个suspend函数的例子:

// 代码清单2-3
suspend fun suspendFun1() {
 println("suspendFun1")
}
suspend fun suspendFun2() = suspendCoroutine<Int> {
 thread {
 sleep(50)
 it.resume(65)
 }
}
fun sleep(time:Long) {
 try {
 Thread.sleep(time)
 } catch (e: InterruptedException) {
 e.printStackTrace()
 }
}
suspend {
 suspendFun1()
 val res2 = suspendFun2()
 println("suspendFun2 result $res2")
 res2 + 3
}.startCoroutine(object : Continuation<Int> {
 override val context: CoroutineContext
 get() = EmptyCoroutineContext
 override fun resumeWith(result: Result<Int>) {
 println("continuationInterceptorTest resume $result")
 }
})

suspendFun1不会真正执行挂起逻辑,而suspendFun2通过suspendCoroutine创建一个真正的挂起函数,并在内部开启子线程模拟耗时任务执行。所以在执行到suspendFun2时,当前协程先被挂起,等待模拟耗时操作完成后,通过continuation的resume方法恢复被挂起的协程。

构建协程

在代码清单2-1中,我们使用createCoroutinecontinuation.resume创建并执行协程,当然我们还可以使用startCoroutine直接创建并启动协程:

// 代码清单2-4
suspend {
 println("simpleTest: In Coroutine with start")
 6
}.startCoroutine(object : Continuation<Int> {
 override val context: CoroutineContext
 get() = EmptyCoroutineContext
 override fun resumeWith(result: Result<Int>) {
 println("resumeWith: Continuation with start End: $result")
 println()
 }
})

可以看出startCoroutine用法和createCoroutine类似,下面就看看它们的函数签名吧:

// 代码清单2-5
fun <T> (suspend () -> T).createCoroutine(
 completion: Continuation<T>
): Continuation<Unit>
fun <T> (suspend () -> T).startCoroutine(
 completion: Continuation<T>
)

startCoroutinecreateCoroutine都是作为(suspend () -> T)函数类型的扩展函数定义的,因此在代码清单2-1和2-4中可以直接在suspend函数后直接调用,并且它们都接收一个Continuation类型实例作为回调,当协程执行完成后,会调用completion的resumeWith函数,并将协程执行结果通过result参数传递。

标准库还定义了带Receiver的startCoroutinecreateCoroutine,用于将协程体的作用域设置成Receiver:

// 代码清单2-6
fun <R, T> (suspend R.() -> T).createCoroutine(
 receiver: R,
 completion: Continuation<T>
)
fun <R, T> (suspend R.() -> T).startCoroutine(
 receiver: R,
 completion: Continuation<T>
)

Continuation

startCoroutinecreateCoroutine创建协程时都用到Continuation对象,它用来记录被挂起的协程在挂起点的状态,其内保存着挂起点之后要执行的代码。考虑如下序列生成器:

// 代码清单2-7
sequence {
 for (i in 1..10) yield(i * i)
 println("over")
} 

该序列生成器在for循环内调用yield生成新的序列,并将当前协程挂起,所以在yield内部一定会有Continuation记录剩余要执行的代码,并且有十个Continuation对象:第一次执行i=2和循环并挂起,第二次执行i=3并挂起,以此逻辑依次执行。当协程被创建,但还没开始运行时,即调用了createCoroutine后未调用resume,此时存在一个初始的Continuation<Unit>表示所有的协程代码。

下面我们看看Continuation的定义:

// 代码清单2-8
interface Continuation<in T> {
 val context: CoroutineContext
 fun resumeWith(result: Result<T>)
}

context表示当前协程的上下文,用户可以根据需求自定义上下文,稍后会详细介绍。在前面的demo中,我们都是直接使用EmptyCoroutineContext,这是一个自带的上下文,无特殊需求时可以用它。

resumeWith函数是协程的完成回调,不论协程执行成功或失败,都通过此函数通知用户。为了方便使用,kotlin提供了两个扩展函数:

// 代码清单2-9
fun <T> Continuation<T>.resume(value: T)
fun <T> Continuation<T>.resumeWithException(exception: Throwable)

resume作为成功通知,resumeWithException作为失败通知。

上下文

协程上下文类似Set集合,用于保存于协程关联的自定义数据:可以包含协程的线程策略、日志信息、协程安全和事务相关信息、协程id及名字等等。可以将协程当做轻量级线程,那么协程上下文就类似线程的ThreadLocal变量,不过ThreadLocal是可变的,而协程上下文是不可变的。

协程上下文在kotlin中用CoroutineContext表示,这是一个集合,下面是其在标准库中的定义:

// 代码清单2-10
interface CoroutineContext {
 operator fun <E : Element> get(key: Key<E>): E?
 fun <R> fold(initial: R, operation: (R, Element) -> R): R
 operator fun plus(context: CoroutineContext): CoroutineContext
 fun minusKey(key: Key<*>): CoroutineContext
 interface Element : CoroutineContext {
 val key: Key<*>
 }
 interface Key<E : Element>
}

从定义中可以看出,CoroutineContext由Element组成,Element仅包含字段key,明显key是作为Element的索引。

Key接口通过泛型将Key与Element关联起来,可以看做Key即为Element本身,这与Set类似。然而CoroutineContext重载了操作符[],即代码清单2-10中的get函数,它接收Key类型索引,而返回Element类型的元素,从这方面看,CoroutineContext又喝Map相似。

foldplusminusKey属于集合操作,这里就不详细介绍了,请自行查阅文档。

总之,CoroutineContext是一种混合了Set和Map结构的新型集合。

EmptyCoroutineContext

EmptyCoroutineContext 是标准库提供的一个不包含任何数据的CoroutineContext实例,在没有特殊需求时可以使用此实例,其定义如下:

// 代码清单2-11
public object EmptyCoroutineContext : CoroutineContext, Serializable {
 private const val serialVersionUID: Long = 0
 private fun readResolve(): Any = EmptyCoroutineContext
 public override fun <E : Element> get(key: Key<E>): E? = null
 public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
 public override fun plus(context: CoroutineContext): CoroutineContext = context
 public override fun minusKey(key: Key<*>): CoroutineContext = this
 public override fun hashCode(): Int = 0
 public override fun toString(): String = "EmptyCoroutineContext"
}

实现

在实现自定义上下文时,不能直接实现CoroutineContext接口,标准库提供了AbstractCoroutineContextElement,应该实现此类,其定义如下:

// 代码清单2-12
public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element

AbstractCoroutineContextElement内部通过重写Element接口的key字段来指定具体的Element类型。

下面看一个自定义上下文的demo:

// 代码清单2-13
class CoroutineName(val name: String) : AbstractCoroutineContextElement(Key) {
 companion object Key : CoroutineContext.Key<CoroutineName>
}
var coroutineContext: CoroutineContext = EmptyCoroutineContext
coroutineContext += CoroutineName("c_test")
suspend {
 println("coroutineContextTest: In Coroutine ${coroutineContext[CoroutineName]?.name} with start")
}.startCoroutine(object : Continuation<Int> {
 override val context: CoroutineContext
 get() = coroutineContext
 override fun resumeWith(result: Result<Int>) {
 println("resumeWith: Continuation with start End: $result")
 }
})

CoroutineName中通过伴生对象志明Key类型,并且通过父类构造函数将其传递上去,以指定CoroutineName所对应的Element类型。

接下去创建CoroutineName实例,并加到EmptyCoroutineContext中,最后再协程体内部通过coroutineContext以及对应的Key(即伴生对象CoroutineName),即可获的CoroutineName的实例。

拦截器

在android代码中,更新UI都要在主线程中执行,当发起网络请求或其他耗时操作都会切到子线程中执行,而suspend函数恢复执行依赖调用continuation.resumeWIth()所在的线程,这样就要手动切换线程,容易引发bug。

ContinuationInterceptor提供了拦截并重新包装Continuation实例的能力,通过重新包装Continuation实例,就可以实现自定义的需求,比如每次都自动切换回主线程执行。

ContinuationInterceptor接口定义如下:

// 代码清单2-14
interface ContinuationInterceptor : CoroutineContext.Element {
 companion object Key : CoroutineContext.Key<ContinuationInterceptor>
 fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
 fun releaseInterceptedContinuation(continuation: Continuation<*>)
}

通过实现interceptContinuation方法重新包装原始的continuation,以实现自定义的需求。下面我们看一个在协程恢复时添加日志的拦截器示例:

// 代码清单2-15
class LogInterceptor() : ContinuationInterceptor {
 override val key: CoroutineContext.Key<*> = ContinuationInterceptor
 override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
 LogContinuation(continuation)
}
class LogContinuation<T>(private val continuation: Continuation<T>) : Continuation<T> by continuation {
 override fun resumeWith(result: Result<T>) {
 println("before resumeWith: $result")
 continuation.resumeWith(result)
 println("after resumeWith")
 }
}
fun continuationInterceptorTest() {
 suspend {
 suspendFun1()
 val res2 = suspendFun2()
 println("suspendFun2 result $res2")
 res2 + 3
 }.startCoroutine(object : Continuation<Int> {
 override val context: CoroutineContext
 get() = LogInterceptor()
 override fun resumeWith(result: Result<Int>) {
 println("continuationInterceptorTest resume $result")
 }
 })
}
suspend fun suspendFun1() {
 println("suspendFun1")
}
suspend fun suspendFun2() = suspendCoroutine<Int> {
 thread {
 sleep(50)
 it.resume(65)
 }
}

demo中首先定义了LogInterceptorLogContinuation两个类。LogContinuation接受Continuation作为构造函数参数,通过委托方式实现Continuation接口,在resumeWith中添加日志以记录协程的执行记录。

continuationInterceptorTest中有suspendFun1suspendFun2两个挂起函数,由于suspendFun1并没有真正挂起,所以在执行suspendFun1时没有LogContinuation的日志,suspendFun2则是在调用it.resume(65)时会打印相关的日志。

参考链接

作者:Longlongago

%s 个评论

要回复文章请先登录注册