协程与Channels (CSP: Kotlin, Golang)



CSP 通信顺序进程

CSP, communicating sequential processes.

CSP模型是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。



管道(*通道) Channel


协程 Coroutine




  1. 用户空间 避免了内核态和用户态的切换导致的成本
  2. 可以由语言和框架层进行调度
  3. 更小的栈空间允许创建大量的实例


Golang uses the Pull migration based load balancing scheduling in its runtime to optimize resource utilization.

In the diagram, G0, G1…G9 are goroutines running on a 2 core machine with 2 processors P0 and P1 . The system in context has a global queue with 4 goroutines queued in it.

Golang 通过为goroutine提供语言层面的调度器,来实现了高效率的M:N线程对应关系,如下图


  • M:是内核线程
  • P : 是调度协调,用于协调M和G的执行,内核线程只有拿到了 P才能对goroutine继续调度执行,一般都是通过限定P的个数来控制golang的并发度
  • G : 是待执行的goroutine,包含这个goroutine的栈空间
  • Gn : 灰色背景的Gn 是已经挂起的goroutine,它们被添加到了执行队列中,然后需要等待网络IO的goroutine,当P通过 epoll查询到特定的fd的时候,会重新调度起对应的,正在挂起的goroutine。

Golang为了调度的公平性,在调度器加入了steal working 算法 ,在一个P自己的执行队列,处理完之后,它会先到全局的执行队列中偷G进行处理,如果没有的话,再会到其他P的执行队列中抢G来进行处理。

Go的CSP并发模型,是通过 goroutine channel 来实现的。

  • goroutine 是Go语言中并发的执行单位。有点抽象,其实就是和传统概念上的”线程“类似,可以理解为”线程“。
  • channel 是Go语言中各个并发结构体( goroutine )之前的通信机制。 通俗的讲,就是各个 goroutine 之间通信的”管道“,有点类似于Linux中的管道。

生成一个 goroutine 的方式非常的简单:Go一下,就生成了。

 go f();  

通信机制 channel 也很方便,传数据用 channel <- data ,取数据用 <-channel

在通信过程中,传数据 channel <- data 和取数据 <-channel 必然会成对出现,因为这边传,那边取,两个 goroutine 之间才会实现通信。

而且不管传还是取,必阻塞,直到另外的 goroutine 传或者取为止。

Kotlin 协程





我们可以将通道视为类似于元素的集合(直接模拟将是一个队列:元素被添加到一端并从另一端接收)。但是,有一个重要的区别:与集合不同,即使在它们的同步版本中,通道也可以暂停 send和receive操作。当通道为空或已满时会发生这种情况(通道的大小可能会受到限制,然后它可能已满)。


 interface SendChannel<in E> {
    suspend fun send(element: E)
    fun close(): Boolean

interface ReceiveChannel<out E> {
    suspend fun receive(): E

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>  


 import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        log("A done")
    launch {
        log("B done")
    launch {
        repeat(3) {
            val x = channel.receive()

fun log(message: Any?) {
    println("[${Thread.currentThread().name}] $message")


Several types of channels are defined in the library. They differ in how many elements they can internally store, and whether the send call can suspend or not. For all channel types, the receive call behaves in the same manner: it receives an element if the channel is not empty, and otherwise suspends.

  • Unlimited channel

An unlimited channel is the closest analog to queue: producers can send elements to this channel, and it will grow infinitely. The send call will never be suspended. If there’s no more memory, you’ll get an OutOfMemoryException. The difference with a queue appears when a consumer tries to receive from an empty channel and gets suspended until some new elements are sent to this channel.

  • Buffered channel

A buffered channel’s size is constrained by the specified number. Producers can send elements to this channel until the size limit is reached. All the elements are internally stored. When the channel is full, the next send call on it suspends until more free space appears.

  • “Rendezvous”(会合;在约定场所会面) channel

The “Rendezvous” channel is a channel without a buffer; it’s the same as creating a buffered channel with zero size. One of the functions (send or receive) always gets suspended until the other is called. If the send function is called and there’s no suspended receive call ready to process the element, then send suspends. Similarly, if the receive function is called and the channel is empty – or in other words, there’s no suspended send call ready to send the element – the receive call suspends. The “rendezvous” name (“a meeting at an agreed time and place”) refers to the fact that send and receive should “meet on time”.

  • Conflated(混为一谈) channel

A new element sent to the conflated channel will overwrite the previously sent element, so the receiver will always get only the latest element. The send call will never suspend.

