Kotlin 协程入门(一) - suspend
1. 挂起是什么?
首先我们定义三个耗时任务, work1 需要 work0 的返回值作为参数, work2 需要 work1 的返回值作为参数:
1
2
3
  | 
suspend fun work0(): Int { largeWork(); return 0 }
suspend fun work1(input: Int): Int { largeWork(); return input + 1 }
suspend fun work2(input: Int): Int { largeWork(); return input + 2 }
  | 
 
在 kotlin 中, 我们会这么写:
1
2
3
4
5
6
7
8
  | 
suspend fun coroutineTest() {
    val work0 = work0()
    println(work0)
    val work1 = work1(work0)
    println(work1)
    val work2 = work2(work1)
    println(work2)
}
  | 
 
一切是那么的顺畅! 而在 Java 中, 面对这种耗时任务, 我们往往会这么定义:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
  | 
void work0(Consumer<Integer> consumer) {
    largeWork(); // 耗时任务
    consumer.accept(0);
}
void work1(int input, Consumer<Integer> consumer) {
    largeWork(); // 耗时任务
    consumer.accept(input + 1);
}
void work2(int input, Consumer<Integer> consumer) {
    largeWork(); // 耗时任务
    consumer.accept(input + 2);
}
  | 
 
然后这么调用, 这里使用了 Java8 的 Lambda 语法
1
2
3
4
5
6
7
  | 
void javaCallbackHell() {
    work0(
      workRes0 -> work1(workRes0,
        workRes1 -> work2(workRes1,
          workRes2 -> {
          })));
}
  | 
 
于是, 出现了回调地狱, 已经开始有点害怕了, 而且这仅仅是包含请求成功的情况, 如果涉及到请求失败, 线程调度, 线程同步, 那事情就变得更加恐怖了
而 Kotlin 的挂起, 就是为了解决这个问题而做的一个语法糖, 让我们写的更爽.
Kotlin 挂起的本质, 就是类似于上面的回调, 而被挂起, 本质上就是将后续全部的函数加入回调中.
这被称为 CPS 转换(Continuation-Passing-Style Transformation), 不信? 我们来分析一下 suspend 这个关键字干了什么:
2. suspend 干了什么
suspend 做了什么? 写一个空的 suspend 我们就知道了:
1
  | 
suspend fun coroutineTestFun(): Int = 1
  | 
 
反编译之后得到:
1
2
3
  | 
public final Object coroutineTestFun(@NotNull Continuation $completion) {
   return Boxing.boxInt(1);
}
  | 
 
事实上只是多了一个 Continuation 对象, 返回值变成 Object 类型, 即将 suspend () -> Int 类型转换为了(Continuation) -> Any?
为什么是 Object 而不是 Integer 呢? 我们不妨看一下调用这个函数的地方做了怎样的判断:
在调用它的地方会变成类似下面这样, coroutineTestFun 如果返回 COROUTINE_SUSPENDED 表示它被挂起,不会立即返回结果
1
2
3
4
5
6
7
  | 
val var10000: Any = coroutineTestFun($completion)
if (var10000 === IntrinsicsKt.COROUTINE_SUSPENDED)
	return var10000
else {
    // 挂起函数执行结束, 继续后续的函数
    processNext(var10000)
}
  | 
 
而当 coroutineTestFun 说它被挂起的时候, 包裹其的外层 suspend 函数也被挂起,  不会立即返回结果, 因此: return var10000
这里我们就可以知道为什么要返回一个 Object 类型了吧, 因为 suspend 函数的返回值有三种:
COROUTINE_SUSPENDED: 代表当前函数被挂起 
- 包裹着执行结果的对象(此对象事实上通过 
Continuation 传递, 后面会说到) 
- 挂起函数内部并没有执行挂起, 直接返回实际结果
 
当然, 如果 suspend 内部没有实际挂起(没有直接调用其他 suspend 函数), Continuation 对象也没有任何作用.
这时, 我们想看看 suspend 具体是怎么实现这些的, 但是 suspend 函数的字节码较为复杂, 我们不妨先了解一下 Continuation
3. Continuation
很简单的一个接口, 只有一个参数, 代表协程执行结束后会在哪个 CoroutineContext 继续执行, 只有一个方法 resumeWith, 代表得到执行结果之后要干什么
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
  | 
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext
    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}
  | 
 
Continuation 的具体实现类是 BaseContinuationImpl:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
  | 
internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) {
            with(current) {
                // ...
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted()
                if (completion is BaseContinuationImpl) {
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?
    protected open fun releaseIntercepted() {}
    //...
}
  | 
 
这里的 invokeSuspend 事实上就是我们说的挂起的本质: 回调
而当传入的 completion 不再是 BaseContinuationImpl 的时候, 这时候就是到了顶层的 Continuation了, 然后会调用其 resumeWith(result) 方法.
那么, 如果不是 BaseContinuationImpl, 那么顶层的 Continuation 是什么呢? 这就要说到协程是如何启动的了: Kotlin Coroutine 入门 - launch/async
4. suspend 的实现
现在, 以最简单的思路去展示在本文开头的例子中 suspend 干了什么, 可以看下面这个 gif:

具体实现在字节码中, 但是字节码阅读略微复杂, 这里给出与反编译的字节码大致等同的 Kotlin 代码:
伪代码
首先, 每个 suspend 函数都会创建一个内部的 Continuation 对象(内部实际上并不是 MyContinuation 这个名字, 这里只是个示例), 内部的 result 属性可能是实际的函数返回值, 也可能是 Result 对象.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
  | 
class MyContinuation(continuation: Continuation<*>) : ContinuationImpl(continuation) {
    lateinit var result: Any
    var label = 0
    override fun invokeSuspend(result: Result<*>): Any {
        this.result = result
        label = label or Int.MIN_VALUE // 这里用 |=, 后续流程会减掉
        return coroutineTestFun(this)
    }
}
  | 
 
ContinuationImpl 是 BaseContinuationImpl 的子类, 在分析 BaseContinuationImpl 的时候, 我们发现了其实 invokeSuspend 会被多次循环调用直到到达顶层 Continuation 对象.
suspend 内部会对 continuation 进行判断, 如果 continuation 是 MyContinuation 对象的话, 则会使用当前的, 否则就新创建一个:
1
2
3
  | 
val myContinuation = if (continuation !is MyContinuation) {
    MyContinuation(continuation)
} else continuation
  | 
 
接下来就进入了 suspend 函数中的状态机, 首先我们要看的是状态0和状态1
事实上在字节码中我们能看到其实是通过 label 来控制的, 但是由于在 kotlin 中使用这种方式相当复杂, 有兴趣的可以去自行查看真实的字节码, 这里只是个思路.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
  | 
when (myContinuation.label) {
    0 -> myContinuation.apply {
        result.throwOnFailure()
        label = 1
        work = work0(this)
        if (work == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
    }
    1 -> myContinuation.apply {
        result.throwOnFailure()
        work = result
        val work0 = work as Int
        println(work0)
        label = 2
        work = work1(work0, this)
        if (work == COROUTINE_SUSPENDED) {
            return COROUTINE_SUSPENDED
        }
    }
    2 -> // ...
    3 -> // ...
    else -> throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
}
  | 
 
如果 work 返回的不是 COROUTINE_SUSPENDED, 那么代表函数已经得到了确切的返回结果, 比如我们的例子里面是返回 Int, 那么代表着返回了真实的 Int 类型, 直接向下处理即可.
如果返回了 COROUTINE_SUSPENDED, 则代表 work 被挂起了, 直接标记当前函数也是挂起状态即可, 等待下一次 invokeSuspend 方法被调用进入到下一个标签(label 1).
同理, 状态2/3也和状态0/1是同样的原理, 最后放上完整的伪代码(
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
  | 
class MyContinuation(continuation: Continuation<*>) : ContinuationImpl(continuation) {
    lateinit var result: Any
    var label = 0
    override fun invokeSuspend(result: Result<*>): Any {
        this.result = result
        label = label or Int.MIN_VALUE // 这里用 |=, 后续流程会减掉
        return coroutineTestFun(this)
    }
}
fun coroutineTestFun(continuation: Continuation<*>): Any {
    val myContinuation = if (continuation !is MyContinuation) {
        MyContinuation(continuation)
    } else continuation
    var work: Any? = null
    // 说实话, 字节码中这个先 |=, 然后 -= 的操作有什么用呢?
    myContinuation.label -= Int.MIN_VALUE
    when (myContinuation.label) {
        0 -> myContinuation.apply {
            result.throwOnFailure()
            label = 1
            work = work0(this)
            if (work == COROUTINE_SUSPENDED) {
                return COROUTINE_SUSPENDED
            }
        }
        1 -> myContinuation.apply {
            result.throwOnFailure()
            work = result
            val work0 = work as Int
            println(work0)
            label = 2
            work = work1(work0, this)
            if (work == COROUTINE_SUSPENDED) {
                return COROUTINE_SUSPENDED
            }
        }
        2 -> myContinuation.apply {
            result.throwOnFailure()
            work = result
            val work1 = work as Int
            println(work1)
            label = 3
            work = work2(work1, this)
            if (work == COROUTINE_SUSPENDED) {
                return COROUTINE_SUSPENDED
            }
        }
        3 -> myContinuation.apply {
            result.throwOnFailure()
            work = result
            val work2 = work as Int
            println(work2)
            return Unit
        }
        else -> throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
    }
    return Unit
}
  | 
 
附: 真实的字节码
事实上的话, 字节码是有点复杂的, 但是看了上面的解释, 想必已经直到了大概的思路了.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
  | 
@Nullable
public static final Object coroutineTestFun(@NotNull Continuation var0) {
   Object $continuation;
   label37: {
      if (var0 instanceof <undefinedtype>) {
         $continuation = (<undefinedtype>)var0;
         if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
            ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
            break label37;
         }
      }
      $continuation = new ContinuationImpl(var0) {
         // $FF: synthetic field
         Object result;
         int label;
         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            this.result = $result;
            this.label |= Integer.MIN_VALUE;
            return CoroutineTestKt.coroutineTestFun(this);
         }
      };
   }
   Object var10000;
   label31: {
      Object var7;
      label30: {
         Object $result = ((<undefinedtype>)$continuation).result;
         var7 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
         switch(((<undefinedtype>)$continuation).label) {
         case 0:
            ResultKt.throwOnFailure($result);
            ((<undefinedtype>)$continuation).label = 1;
            var10000 = work0((Continuation)$continuation);
            if (var10000 == var7) {
               return var7;
            }
            break;
         case 1:
            ResultKt.throwOnFailure($result);
            var10000 = $result;
            break;
         case 2:
            ResultKt.throwOnFailure($result);
            var10000 = $result;
            break label30;
         case 3:
            ResultKt.throwOnFailure($result);
            var10000 = $result;
            break label31;
         default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
         }
         int work0 = ((Number)var10000).intValue();
         boolean var2 = false;
         System.out.println(work0);
         ((<undefinedtype>)$continuation).label = 2;
         var10000 = work1(work0, (Continuation)$continuation);
         if (var10000 == var7) {
            return var7;
         }
      }
      int work1 = ((Number)var10000).intValue();
      boolean var3 = false;
      System.out.println(work1);
      ((<undefinedtype>)$continuation).label = 3;
      var10000 = work2(work1, (Continuation)$continuation);
      if (var10000 == var7) {
         return var7;
      }
   }
   int work2 = ((Number)var10000).intValue();
   boolean var4 = false;
   System.out.println(work2);
   return Unit.INSTANCE;
}
  | 
 
最后
事实上本文没有讨论切换线程/协程时的状况, 只是分析了一下 suspend 关键字, 理解挂起的实现原理.