七叶笔记 » golang编程 » 协程与Channels (CSP: Kotlin, Golang)

协程与Channels (CSP: Kotlin, Golang)

概述

众所周知,编写具有共享可变状态的代码非常困难且容易出错。通过通信共享信息而不是使用通用可变状态共享信息试图简化这一点。协程可以通过通道相互通信。

CSP 通信顺序进程

CSP, communicating sequential processes.

CSP模型是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。

CSP讲究的是“以通信的方式来共享内存”.

普通的线程并发模型,就是像Java、C++、或者Python,他们线程间通信都是通过共享内存的方式来进行的。非常典型的方式就是,在访问共享数据(例如数组、Map、或者某个结构体或对象)的时候,通过锁来访问,因此,在很多时候,衍生出一种方便操作的数据结构,叫做“线程安全的数据结构”。例如Java提供的包”java.util.concurrent”中的数据结构。Go中也实现了传统的线程并发模型。

管道(*通道) Channel

管道与共享内存之间有很大的区别,内存共享是通过内存来共享内存,而管道是通过通信来共享内存。所以管道通信比内存共享效率要高很多。

协程 Coroutine

协程通过管道能够实现百万级的并发。如果说线程是抢占式的,那么协程是协作式的。在协程里面,也是通过管道来调度的。

线程是先占用CPU和内存后才调度,而协程是通过通信发送信号来调度,协程全是通过管道,由于协程的消耗比线程小很多,所以能够实现百万并发。

在协程中,IO操作时绝大部分时间与CPU无关,这是管道带来的优势,不需要长时间锁住内存,也不需要CPU来做调度。协程具有以下特点:

  1. 用户空间 避免了内核态和用户态的切换导致的成本
  2. 可以由语言和框架层进行调度
  3. 更小的栈空间允许创建大量的实例

Go协程调度器

Golang uses the Pull migration based load balancing scheduling in its runtime to optimize resource utilization.

In the diagram, G0, G1…G9 are goroutines running on a 2 core machine with 2 processors P0 and P1 . The system in context has a global queue with 4 goroutines queued in it.

Golang 通过为goroutine提供语言层面的调度器,来实现了高效率的M:N线程对应关系,如下图

说明:

  • M:是内核线程
  • P : 是调度协调,用于协调M和G的执行,内核线程只有拿到了 P才能对goroutine继续调度执行,一般都是通过限定P的个数来控制golang的并发度
  • G : 是待执行的goroutine,包含这个goroutine的栈空间
  • Gn : 灰色背景的Gn 是已经挂起的goroutine,它们被添加到了执行队列中,然后需要等待网络IO的goroutine,当P通过 epoll查询到特定的fd的时候,会重新调度起对应的,正在挂起的goroutine。

Golang为了调度的公平性,在调度器加入了steal working 算法 ,在一个P自己的执行队列,处理完之后,它会先到全局的执行队列中偷G进行处理,如果没有的话,再会到其他P的执行队列中抢G来进行处理。

Go的CSP并发模型,是通过 goroutine channel 来实现的。

  • goroutine 是Go语言中并发的执行单位。有点抽象,其实就是和传统概念上的”线程“类似,可以理解为”线程“。
  • channel 是Go语言中各个并发结构体( goroutine )之前的通信机制。 通俗的讲,就是各个 goroutine 之间通信的”管道“,有点类似于Linux中的管道。

生成一个 goroutine 的方式非常的简单:Go一下,就生成了。

 go f();  

通信机制 channel 也很方便,传数据用 channel <- data ,取数据用 <-channel

在通信过程中,传数据 channel <- data 和取数据 <-channel 必然会成对出现,因为这边传,那边取,两个 goroutine 之间才会实现通信。

而且不管传还是取,必阻塞,直到另外的 goroutine 传或者取为止。

Kotlin 协程


通道是允许我们在不同协程之间传递数据的通信原语。

一个协程可以向通道发送一些信息,而另一个可以从通道接收这些信息:

发送(生产)信息的协程通常称为生产者,接收(消费)信息的协程称为消费者。当需要的时候,很多协程可以向同一个通道发送信息,很多协程可以从它那里接收信息:

请注意,当许多协程从同一通道接收信息时,每个元素仅由其中一个消费者处理一次;自动处理它意味着从通道中删除这个元素。

我们可以将通道视为类似于元素的集合(直接模拟将是一个队列:元素被添加到一端并从另一端接收)。但是,有一个重要的区别:与集合不同,即使在它们的同步版本中,通道也可以暂停 send和receive操作。当通道为空或已满时会发生这种情况(通道的大小可能会受到限制,然后它可能已满)。

Channel通过三个不同的接口表示:SendChannel、ReceiveChannel和Channel扩展了前两个接口。您通常创建一个通道并将其作为SendChannel实例提供给生产者,以便只有他们可以发送给它,并作为ReceiveChannel实例提供给消费者,以便只有他们可以从中接收。请注意,send和receive方法都声明为suspend:

 interface SendChannel<in E> {
    suspend fun send(element: E)
    fun close(): Boolean
}




interface ReceiveChannel<out E> {
    suspend fun receive(): E
}    




interface Channel<E> : SendChannel<E>, ReceiveChannel<E>  

管道使用示例

 import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*




fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        channel.send("A1")
        channel.send("A2")
        log("A done")
    }
    launch {
        channel.send("B1")
        log("B done")
    }
    launch {
        repeat(3) {
            val x = channel.receive()
            log(x)
        }
    }
}




fun log(message: Any?) {
    println("[${Thread.currentThread().name}] $message")
}  

管道的类型

Several types of channels are defined in the library. They differ in how many elements they can internally store, and whether the send call can suspend or not. For all channel types, the receive call behaves in the same manner: it receives an element if the channel is not empty, and otherwise suspends.

  • Unlimited channel

An unlimited channel is the closest analog to queue: producers can send elements to this channel, and it will grow infinitely. The send call will never be suspended. If there’s no more memory, you’ll get an OutOfMemoryException. The difference with a queue appears when a consumer tries to receive from an empty channel and gets suspended until some new elements are sent to this channel.

  • Buffered channel

A buffered channel’s size is constrained by the specified number. Producers can send elements to this channel until the size limit is reached. All the elements are internally stored. When the channel is full, the next send call on it suspends until more free space appears.

  • “Rendezvous”(会合;在约定场所会面) channel

The “Rendezvous” channel is a channel without a buffer; it’s the same as creating a buffered channel with zero size. One of the functions (send or receive) always gets suspended until the other is called. If the send function is called and there’s no suspended receive call ready to process the element, then send suspends. Similarly, if the receive function is called and the channel is empty – or in other words, there’s no suspended send call ready to send the element – the receive call suspends. The “rendezvous” name (“a meeting at an agreed time and place”) refers to the fact that send and receive should “meet on time”.

  • Conflated(混为一谈) channel

A new element sent to the conflated channel will overwrite the previously sent element, so the receiver will always get only the latest element. The send call will never suspend.

参考资料

相关文章