Kotlin 协程入门(一) - suspend
1. 挂起是什么?
首先我们定义三个耗时任务, work1 需要 work0 的返回值作为参数, work2 需要 work1 的返回值作为参数:
1
2
3
|
suspend fun work0(): Int { largeWork(); return 0 }
suspend fun work1(input: Int): Int { largeWork(); return input + 1 }
suspend fun work2(input: Int): Int { largeWork(); return input + 2 }
|
在 kotlin 中, 我们会这么写:
1
2
3
4
5
6
7
8
|
suspend fun coroutineTest() {
val work0 = work0()
println(work0)
val work1 = work1(work0)
println(work1)
val work2 = work2(work1)
println(work2)
}
|
一切是那么的顺畅! 而在 Java 中, 面对这种耗时任务, 我们往往会这么定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
void work0(Consumer<Integer> consumer) {
largeWork(); // 耗时任务
consumer.accept(0);
}
void work1(int input, Consumer<Integer> consumer) {
largeWork(); // 耗时任务
consumer.accept(input + 1);
}
void work2(int input, Consumer<Integer> consumer) {
largeWork(); // 耗时任务
consumer.accept(input + 2);
}
|
然后这么调用, 这里使用了 Java8 的 Lambda 语法
1
2
3
4
5
6
7
|
void javaCallbackHell() {
work0(
workRes0 -> work1(workRes0,
workRes1 -> work2(workRes1,
workRes2 -> {
})));
}
|
于是, 出现了回调地狱, 已经开始有点害怕了, 而且这仅仅是包含请求成功的情况, 如果涉及到请求失败, 线程调度, 线程同步
, 那事情就变得更加恐怖了
而 Kotlin 的挂起, 就是为了解决这个问题而做的一个语法糖, 让我们写的更爽.
Kotlin 挂起的本质, 就是类似于上面的回调, 而被挂起, 本质上就是将后续全部的函数加入回调中.
这被称为 CPS 转换(Continuation-Passing-Style Transformation)
, 不信? 我们来分析一下 suspend
这个关键字干了什么:
2. suspend 干了什么
suspend 做了什么? 写一个空的 suspend 我们就知道了:
1
|
suspend fun coroutineTestFun(): Int = 1
|
反编译之后得到:
1
2
3
|
public final Object coroutineTestFun(@NotNull Continuation $completion) {
return Boxing.boxInt(1);
}
|
事实上只是多了一个 Continuation
对象, 返回值变成 Object
类型, 即将 suspend () -> Int
类型转换为了(Continuation) -> Any?
为什么是 Object
而不是 Integer
呢? 我们不妨看一下调用这个函数的地方做了怎样的判断:
在调用它的地方会变成类似下面这样, coroutineTestFun
如果返回 COROUTINE_SUSPENDED
表示它被挂起,不会立即返回结果
1
2
3
4
5
6
7
|
val var10000: Any = coroutineTestFun($completion)
if (var10000 === IntrinsicsKt.COROUTINE_SUSPENDED)
return var10000
else {
// 挂起函数执行结束, 继续后续的函数
processNext(var10000)
}
|
而当 coroutineTestFun
说它被挂起的时候, 包裹其的外层 suspend
函数也被挂起, 不会立即返回结果, 因此: return var10000
这里我们就可以知道为什么要返回一个 Object
类型了吧, 因为 suspend 函数的返回值有三种:
COROUTINE_SUSPENDED
: 代表当前函数被挂起
- 包裹着执行结果的对象(此对象事实上通过
Continuation
传递, 后面会说到)
- 挂起函数内部并没有执行挂起, 直接返回实际结果
当然, 如果 suspend 内部没有实际挂起(没有直接调用其他 suspend 函数), Continuation
对象也没有任何作用.
这时, 我们想看看 suspend 具体是怎么实现这些的, 但是 suspend 函数的字节码较为复杂, 我们不妨先了解一下 Continuation
3. Continuation
很简单的一个接口, 只有一个参数, 代表协程执行结束后会在哪个 CoroutineContext
继续执行, 只有一个方法 resumeWith
, 代表得到执行结果之后要干什么
1
2
3
4
5
6
7
8
9
10
11
12
|
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
|
Continuation 的具体实现类是 BaseContinuationImpl
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
with(current) {
// ...
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted()
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
protected open fun releaseIntercepted() {}
//...
}
|
这里的 invokeSuspend
事实上就是我们说的挂起的本质: 回调
而当传入的 completion 不再是 BaseContinuationImpl
的时候, 这时候就是到了顶层的 Continuation
了, 然后会调用其 resumeWith(result)
方法.
那么, 如果不是 BaseContinuationImpl
, 那么顶层的 Continuation
是什么呢? 这就要说到协程是如何启动的了: Kotlin Coroutine 入门 - launch/async
4. suspend 的实现
现在, 以最简单的思路去展示在本文开头的例子中 suspend 干了什么, 可以看下面这个 gif:
具体实现在字节码中, 但是字节码阅读略微复杂, 这里给出与反编译的字节码大致等同的 Kotlin 代码:
伪代码
首先, 每个 suspend 函数都会创建一个内部的 Continuation
对象(内部实际上并不是 MyContinuation 这个名字, 这里只是个示例), 内部的 result 属性可能是实际的函数返回值, 也可能是 Result 对象.
1
2
3
4
5
6
7
8
9
10
|
class MyContinuation(continuation: Continuation<*>) : ContinuationImpl(continuation) {
lateinit var result: Any
var label = 0
override fun invokeSuspend(result: Result<*>): Any {
this.result = result
label = label or Int.MIN_VALUE // 这里用 |=, 后续流程会减掉
return coroutineTestFun(this)
}
}
|
ContinuationImpl
是 BaseContinuationImpl
的子类, 在分析 BaseContinuationImpl
的时候, 我们发现了其实 invokeSuspend
会被多次循环调用直到到达顶层 Continuation
对象.
suspend 内部会对 continuation 进行判断, 如果 continuation 是 MyContinuation
对象的话, 则会使用当前的, 否则就新创建一个:
1
2
3
|
val myContinuation = if (continuation !is MyContinuation) {
MyContinuation(continuation)
} else continuation
|
接下来就进入了 suspend 函数中的状态机, 首先我们要看的是状态0
和状态1
事实上在字节码中我们能看到其实是通过 label 来控制的, 但是由于在 kotlin 中使用这种方式相当复杂, 有兴趣的可以去自行查看真实的字节码, 这里只是个思路.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
when (myContinuation.label) {
0 -> myContinuation.apply {
result.throwOnFailure()
label = 1
work = work0(this)
if (work == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
}
1 -> myContinuation.apply {
result.throwOnFailure()
work = result
val work0 = work as Int
println(work0)
label = 2
work = work1(work0, this)
if (work == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
}
2 -> // ...
3 -> // ...
else -> throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
}
|
如果 work 返回的不是 COROUTINE_SUSPENDED
, 那么代表函数已经得到了确切的返回结果, 比如我们的例子里面是返回 Int, 那么代表着返回了真实的 Int 类型, 直接向下处理即可.
如果返回了 COROUTINE_SUSPENDED
, 则代表 work 被挂起了, 直接标记当前函数也是挂起状态即可, 等待下一次 invokeSuspend 方法被调用进入到下一个标签(label 1
).
同理, 状态2/3也和状态0/1是同样的原理, 最后放上完整的伪代码(
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
class MyContinuation(continuation: Continuation<*>) : ContinuationImpl(continuation) {
lateinit var result: Any
var label = 0
override fun invokeSuspend(result: Result<*>): Any {
this.result = result
label = label or Int.MIN_VALUE // 这里用 |=, 后续流程会减掉
return coroutineTestFun(this)
}
}
fun coroutineTestFun(continuation: Continuation<*>): Any {
val myContinuation = if (continuation !is MyContinuation) {
MyContinuation(continuation)
} else continuation
var work: Any? = null
// 说实话, 字节码中这个先 |=, 然后 -= 的操作有什么用呢?
myContinuation.label -= Int.MIN_VALUE
when (myContinuation.label) {
0 -> myContinuation.apply {
result.throwOnFailure()
label = 1
work = work0(this)
if (work == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
}
1 -> myContinuation.apply {
result.throwOnFailure()
work = result
val work0 = work as Int
println(work0)
label = 2
work = work1(work0, this)
if (work == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
}
2 -> myContinuation.apply {
result.throwOnFailure()
work = result
val work1 = work as Int
println(work1)
label = 3
work = work2(work1, this)
if (work == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED
}
}
3 -> myContinuation.apply {
result.throwOnFailure()
work = result
val work2 = work as Int
println(work2)
return Unit
}
else -> throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
}
return Unit
}
|
附: 真实的字节码
事实上的话, 字节码是有点复杂的, 但是看了上面的解释, 想必已经直到了大概的思路了.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
@Nullable
public static final Object coroutineTestFun(@NotNull Continuation var0) {
Object $continuation;
label37: {
if (var0 instanceof <undefinedtype>) {
$continuation = (<undefinedtype>)var0;
if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
break label37;
}
}
$continuation = new ContinuationImpl(var0) {
// $FF: synthetic field
Object result;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return CoroutineTestKt.coroutineTestFun(this);
}
};
}
Object var10000;
label31: {
Object var7;
label30: {
Object $result = ((<undefinedtype>)$continuation).result;
var7 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(((<undefinedtype>)$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
((<undefinedtype>)$continuation).label = 1;
var10000 = work0((Continuation)$continuation);
if (var10000 == var7) {
return var7;
}
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
case 2:
ResultKt.throwOnFailure($result);
var10000 = $result;
break label30;
case 3:
ResultKt.throwOnFailure($result);
var10000 = $result;
break label31;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
int work0 = ((Number)var10000).intValue();
boolean var2 = false;
System.out.println(work0);
((<undefinedtype>)$continuation).label = 2;
var10000 = work1(work0, (Continuation)$continuation);
if (var10000 == var7) {
return var7;
}
}
int work1 = ((Number)var10000).intValue();
boolean var3 = false;
System.out.println(work1);
((<undefinedtype>)$continuation).label = 3;
var10000 = work2(work1, (Continuation)$continuation);
if (var10000 == var7) {
return var7;
}
}
int work2 = ((Number)var10000).intValue();
boolean var4 = false;
System.out.println(work2);
return Unit.INSTANCE;
}
|
最后
事实上本文没有讨论切换线程/协程时的状况, 只是分析了一下 suspend 关键字, 理解挂起的实现原理.