背景
最近由于性能问题,后端服务一直在做python到golang的迁移和重构。go语言精简优雅,既有编译型语言的严谨和高性能,又有解释型语言的开发效率,出色的并发性能也是go区别于其他语言的一大特色。go的并发编程代码虽然简单,但重在其并发模型和流程的设计。所以这里总结下golang协程并发常用的流水线模型。
转自:
参考:go语言中文文档:www.topgoer.com
简单的流水线思维
流水线模式并不是什么新奇的概念,但是它能极大地提高生产效率。比如实际生活中的汽车生产流水线,流水线上的每一个流程负责不同的工作,比如第一个流程是拼装车身,第二个流程是安装发动机,第三个流程是装轮胎…,这些步骤我们可以类比成go并发流程中的协程,每一个协程就是一个任务。流水线上面传递的车身、发动机、轮胎,这些我们可以类比成协程间需要传递的数据,而在这些流程(协程)间传递这些配件(数据),自然就要通过传送带(channel)。在流水线上,我们装四个轮胎肯定不是一个一个来装的,肯定是有四个机械臂同时来装。因此装轮胎这个步骤我们有4个协程在并发工作来提高效率。这么一来,流水线模型的基本要素就构成了。
Golang的并发模型灵感其实都来自我们生活,对程序而言,高的生产效率就是高的性能。 在Golang中,流水线由多个流程节点组成,流程之间通过channel连接,每个流程节点可以由多个同时运行的goroutine组成。
如何构造流水线
有了流水线模式的思维,接下来就是如何构造流水线了。简单来说,其实就是通过channel将任务流程连接起来,两个相邻的流程互为生产者和消费者,通过channel进行通信。耗时的流程可以将任务分散到多个协程来执行。
我们先来看一个最简单的流水线,如下图,A是生产者流程,B是它的消费流程,同时又是C的生产者流程。A,B,C三个协程直接,通过读写channel进行通信。
那如果此时B流程可以将a channel中的任务并发执行呢,很简单,我们只需要起多个B协程就可以了。如下图。
总之,我们构造流水线并发的思路是关注数据的流动,数据流动的过程交给channel,channel两端数据处理的每个环节都交给goroutine,这个流程连起来,就构成了流水线模型。
关于channel
为什么我们可以选择channel来进行协程间的通信呢,协程之间又是怎么保持同步顺序呢,当然这都要归功于channel。channel是go提供的进程内协程间的通信方式,它是协程/线程安全的,channe的读写阻塞会导致协程的切换。
channel的操作和状态组合可以有以下几种情况:
**有1个特殊场景**:当`nil`的通道在`select`的某个`case`中时,这个case会阻塞,但不会造成死锁。
channel不仅可以保证协程安全的数据流动,还可以保证协程的同步。当有并发问题时,channel也是我们首先应该想到的数据结构。不过显而易见,当使用有缓冲区的channel时,才能达到协程并发的效果,并且生产者和消费者的协程间是相对同步的。使用无缓冲区的channel时,是没有并发效果的,协程间是绝对同步的,生产者和消费者必须同时写和读协程才能运行。
channel关注的是数据的流动,这种场景下都可以考虑使用channel。比如:消息传递、信号广播、任务分发、结果汇总、同步与异步、并发控制… 更多的不在这里赘述了,总之, Share memory by communicating, don’t communicate by sharing memory.
流水线模型实例
举个简单栗子,计算80000以内的质数并输出。
这个例子如果我们采用非并发的方式,就是for循环80000,挨个判断是不是素数再输出。不过如果我们采用流水线的并发模型会更高效。
从数据流动的角度来分析,需要遍历生成1-80000的数字到一个channel中,数字判断是否为素数,输出结果到一个channel中。因此我们需要两个channel,channel的两端就设计成协程即可。
1、遍历生成原始80000个数据(生产者)
2、计算这80000个数据中的素数(生产者+消费者)
3、取结果输出(消费者)
代码如下:
package gen_channel
import "fmt"
import "time"
func generate_source(data_source_chan chan int) {
for i := 1; i <= 80000; i++ {
data_source_chan <- i
}
fmt.Println("写入协程结束")
close (data_source_chan)
}
func generate_sushu(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool) {
for num:= range data_source_chan {
falg := true
for i := 2; i < num; i++ {
if num%i == 0 {
falg = false
break }
}
if falg == true {
data_result_chan <- num
}
}
fmt.Println("该协程结束")
gen_chan <- true
}
func workpool(data_source_chan chan int, data_result_chan chan int, gen_chan chan bool, gen_num int){
// 开启8个协程
for i := 0; i < gen_num; i++ {
go generate_sushu(data_source_chan, data_result_chan, gen_chan)
}
}
func Channel_main() {
// 任务数据
data_source_chan := make(chan int, 2000)
// 结果数据
data_result_chan := make(chan int, 2000)
// 所有任务协程是否结束
gen_chan := make(chan bool, 8)
time1 := time.Now(). Unix ()
go generate_source(data_source_chan)
// 协程池,任务分发
workpool(data_source_chan, data_result_chan, gen_chan, 8)
// 所有协程结束后关闭结果数据channel
go func() {
for i := 0; i < 8; i++ {
<-gen_chan
}
close(data_result_chan)
fmt.Println("spend timeis ", time.Now().Unix()-time1)
}()
for date_result := range data_result_chan {
fmt.Println(date_result)
}
}
上面这段代码中。 data_source_chan 和 data_result_chan 这两个channel分别用来放原始数据和结果数据,buffer分别为2000。
generate_source协程: 生产数据,它会把数据写入data_source_chan通道,全部写入完成后关闭通道。
generate_sushu协程: 负责计算并判断data_source_chan中的数据是否为质数,是的话就写入data_result_chan通道。
主协程for date_result := range data_result_chan: 最后负责读取data_result_chan中的结果,直到data_result_chan关闭后结束程序。
可以看到我们通过workpool方法起了8个 generate_sushu 协程来并发处理 data_source_chan 的任务。那么就有一个问题,如何知道所有数据都已处理完毕呢,等到生产者 generate_source 协程结束data_source_chan关闭吗? 恐怕不是,因为可能 data_source_chan 关闭后8个任务协程仍然在继续计算。那么只能等8个协程全部处理完毕后,才能说明所有数据已处理完,从而才能关闭 data_result_chan ,然后主协程读取 data_result_chan 结束。
因此我们这里引入了另一个channel: gen_chan ,来记录计算结束的任务。每个 generate_sushu 协程处理完,就写入一个记录到channel中。因此我们有一个匿名协程,当可以从 gen_chan 中取8个结果出来的话,就说明所有协程已计算完成,那么可以关上阻塞程序的最后阀门 data_result_chan 。
当然这种设计方式并不唯一,我们也可以不用统一的 data_result_chan 来接收结果,而是每个协程分配一个channel来存放结果,最后再merge到一起。
可能大家觉得这种方式很复杂,确实比较高效但写起来并不友好,那有没有更友好的方式呢?
sync包
在处理并发任务时我们首先想到的应该是channel,但有时候channel不是万能或者最方便的,所以go也为我们提供了sync包。
sync包提供了各种异步及锁类型及其内置方法。用起来也很方便,比如 Mutex 就是给协程加锁,某个时段内不能有多个协程访问同一段代码。 WaitGroup 就是等待一些工作完成后,再进行下一步工作。 Once 可以用来确保协程中某个函数只执行1次…当我们面对一个并发问题的时候,应该去分析采用哪种协程同步方式, 是channel还是Mutex呢。这需要看我们关注的是数据的流动还是数据的安全性 。篇幅原因这里不再展开讲了。
- Mutex:互斥锁
- RWMutex:读写锁
- WaitGroup:等待组
- Once:单次执行
- Cond:信号量
- Pool:临时对象池
- Map:自带锁的map
我们接着上面质数的问题,使用sync中的WaitGroup,会让我们的代码更加友好,因为我们不需要引入一个channel来记录是否4个车轮都换完了,让WaitGroup来做就好了。
package gen_channel
import (
"fmt"
"time")
import "sync"
func generate_source3(data_source_chan chan int) {
for i := 1; i <= 80000; i++ {
data_source_chan <- i
}
fmt.Println("写入协程结束")
close(data_source_chan)
}
func generate_sushu3(data_source_chan, data_result_chan chan int, wg *sync.WaitGroup) {
defer wg.Done()
for num := range data_source_chan {
falg := true
for i := 2; i < num; i++ {
if num%i == 0 {
falg = false
break }
}
if falg == true {
data_result_chan <- num
}
}
fmt.Println("该协程结束")
}
func workpool3(data_source_chan chan int, data_result_chan chan int, wg *sync.WaitGroup, gen_num int) {
// 开启8个协程
for i := 0; i < gen_num; i++ {
wg.Add(1)
go generate_sushu3(data_source_chan, data_result_chan, wg)
}
}
func Channel_main3() {
data_source_chan := make(chan int, 500)
data_result_chan := make(chan int, 2000)
time1 := time.Now().Unix()
var wg sync.WaitGroup
go generate_source3(data_source_chan)
// 开启8个协程
for i := 0; i < 8; i++ {
wg.Add(1)
go generate_sushu3(data_source_chan, data_result_chan, &wg)
}
wg.Wait()
close(data_result_chan)
fmt.Println("spend timeis ", time.Now().Unix()-time1)
for date_result := range data_result_chan {
fmt.Println(date_result)
}
}
总结
流水线模式的设计要关注数据的流动,然后在数据流动的路径中将数据放到channel中,将channel的两端设计成协程。
并发设计中channel和sync可以从开发效率和性能的角度自由组合,channel不一定是最优解
写入channel的协程来控制该协程的关闭,消费者协程不关闭读协程,防止报错。养成在协程入口限制channel读写类型的习惯。
以上是我们在go并发的流水线模型中的一些总结。可以看出go的协程并发更考验我们的设计能力,因为协程间的同步和数据传递都交给了开发者来设计。同时也留给我们一些引申思考, 协程在IO密集和CPU密集的情况下是否都能大幅提高性能呢?是否和channel的缓冲区或者并发设计有关呢?协程异常该怎么处理呢?go的协程和python的协程又有什么区别呢? …我们后面慢慢探讨~