在如今硬件非常发达的时代,并发编程(concurrent programming)变得非常重要,Go在并发编程比大多数语言更方便。
1. Goroutines
Go中没有多进程多线程的概念,而是使用goroutine的概念。为了方便理解,你暂时可以把goroutine理解为其他语言中的线程,后面我们会说二者之间有什么区别。使用 go
关键字就可以启动一个goroutine。看下面例子:
Go
package main import ( "fmt" "time")func sub_goroutine() { fmt.Println("I'm goroutine 1")}func main() { fmt.Println("In Main Goroutine") go sub_goroutine() go func() { fmt.Println("I'm goroutine 2") }() time.Sleep(time.Second * 1)}
上面的程序中我们在main(main其实也是goroutine,类似于主进程的概念)中创建了两个goroutine,和线程一样,goroutine一旦被创建就会立刻去执行。如果在goroutine返回前,main就结束的话,那这些子goroutine也会被强制退出。上面程序中,我们为了保证main退出前两个goroutine执行完毕,我们在main中调用了sleep,当然这样并不能绝对保证goroutine之间的先后顺序,下面我们介绍一种可靠的机制来实现多个goroutine之间的先后顺序。
2. channel s
我们知道进程间通信和线程间通信有很多种方法,那goroutine间如何通信呢?没错,就是用 ch annel(信道)。我们先简单介绍一下Channel,再看如何利用Channel在不同goroutine之间通信。关于Channel有如下要点:
声明一个channel使用关键字
chan
,后面跟这个channel里面所传递的对象类型,创建channel使用make
。Go
ch := make(chan int) // ch has type 'chan int'
上面创建了一个(只能)传递int类型元素的channel。
channel主要有三个操作:
send。比如
ch <- x
表示将x发送到ch。receive。比如
x = <-ch
表示从ch中接收值并赋给变量x。close 。我们可以使用
close
函数关闭一个channel:close(ch)
。当一个channel被关闭的时候,表示不会再有数据发送到这个channel上面。如果向一个已经关闭的channel发送数据,到导致panic。如果从一个已经关闭的channel上面接收数据,会先将channel上面残留的数据全部接收,后面再接收时会收到该channel类型的零值。
channel是引用类型的。相同类型的channel可以使用
==
进行比较,如果他们引用了相同的数据结构,则结果为真,否则为假。channel的零值是nil,在一个为nil的channel上面执行send或者receive将永远阻塞。
make创建channel时,可以接收第二个参数,表示创建一个容量为指定值得channel:
Go
ch = make(chan int)ch = make(chan int, 0)ch = make(chan int, 3)
上面的例子中,前两行创建的channel称为Unbuffered Channel,最后一句创建的channel称为Buffered Channel,下面我们分别介绍这两种不同类型的Channel。
2.1 Unbuffered Channel
假设ch是一个Unbuffered Channel,那么我们在goroutine A中向ch中发送数据时,A会一直被阻塞,直到有另外一个goroutine从ch中读取数据,A才会继续往下执行。同理,如果A从ch中读取数据,那么A会一直被阻塞,直到有另外一个goroutine向ch中发送数据。所以可以看到,通过Unbuffered Channel,我们可以实现两个进程的同步(synchronize),所以Unbuffered Channel有时也被称为同步信道(synchronous channels)。
看一个例子:
Go
package mainimport ( "fmt" "time")func main() { ch := make(chan int) go func() { for i := 0; i < 10; i++ { time.Sleep(time.Millisecond) fmt.Println("Sub goroutine send: ", i) ch <- i } close(ch) }() for i := 0; i < 10; i++ { time.Sleep(time.Millisecond) fmt.Println("Main goroutine receive: ", <-ch) }}
程序执行结果:
Bash
Sub goroutine send: 0 Main goroutine receive: 0 Sub goroutine send: 1 Main goroutine receive: 1 Sub goroutine send: 2 Main goroutine receive: 2 Sub goroutine send: 3 Main goroutine receive: 3 Sub goroutine send: 4 Main goroutine receive: 4 Sub goroutine send: 5 Main goroutine receive: 5 Sub goroutine send: 6 Main goroutine receive: 6 Sub goroutine send: 7 Main goroutine receive: 7 Sub goroutine send: 8 Main goroutine receive: 8 Sub goroutine send: 9 Main goroutine receive: 9[Finished in 0.2s]
上面的例子中我们在Main goroutine中从ch中接收数据,在Sub goroutine中发送数据,可以看到收发是同步的。(PS:这里在收发中个sleep 1毫秒是因为CPU的执行速度远远大于IO速度,当我们从ch接收数据后,发送端就不再阻塞,就可以马上再发送,并且打印日志,但此时接收端可能还在打印输出,所以可能最终的打印就会有些错位,没有那么直观。所以我们增加一个sleep来消除这个问题)
当然,从这个例子中我们预先知道发送了10次,所以我们也就接收10次,但如果我们不知道发送了多少次,那接收端如何知道发送端是否已经发送结束了呢?有同学可能会说当我们接收到零值的时候。显然这个是行不通的,比如上面的例子中,ch中元素类型的零值就是整数0,那如果发送端发送的数据也是0的话,接收端是无法判断出发送端发送的数据是0,还是因为发送端已经关闭了channel而导致接收到0.当然,Go设计者早就考虑到了这个问题,所以提供了’comma, ok’机制来解决这个问题。我们从channel接收数据时,可以获得一个bool值,如果channel已经关闭并且已经没有数据可以再接收,那这个bool值就为false,否则为true,这样我们通过这个bool值就可以判断上面的情景了。这里我们更改一下上面的例子:
Go
package main import ( "fmt" "time")func main() { ch := make(chan int) go func() { for i := 0; i < 10; i++ { time.Sleep(time.Millisecond) fmt.Println("Sub goroutine send: ", i) ch <- i } close(ch) }() var ( temp int ok bool ) for { if temp, ok = <-ch; !ok { fmt.Println("Channel has been closed and drained.") break } fmt.Println("Main goroutine receive: ", temp) time.Sleep(time.Millisecond) } return}
除此以外,还可以使用更加方便的 range loop
来遍历channel,当channel里面的所有元素被读取完后,循环会自动退出。看下面的例子:
Go
package main import ( "fmt" "time")func main() { ch := make(chan int) go func() { for i := 0; i < 10; i++ { time.Sleep(time.Millisecond) fmt.Println("Sub goroutine send: ", i) ch <- i } close(ch) }() for temp := range ch { fmt.Println("Main goroutine receive: ", temp) time.Sleep(time.Millisecond) } return}
最后关于Unbuffered Channel还有以下注意点:
channel使用完以后可以不用显式的关闭,程序结束时会自动关掉。但是如果我们想告诉接收端所有数据已经发送完毕的话,那我们就需要在所有数据发送后显式的关闭channel。但是关闭已经关闭的channel会导致panic。
我们可以声明只用于发送(
chan <- type
,send-only channel)或者只用于接收(<- chan type
,receive-only channel)的channel,这在函数传参是往往非常有用。这种单向的channel我们称之为Unidirectional Channel。
2.2 Buffered Channel
Buffered Channel有点像消息队列,其大小在使用make创建的时候由第二个参数指定。它和Unbuffered Channel的区别在于它没有被填满之前是非阻塞的,比如一个容量为100的Buffered Channel,我们可以一直往里面发送数据,在channel达到100个元素之前,发送是不会被阻塞的。当满100后,发送就会被阻塞,此时,接收端接收一个,就可以再发送一个。接收端也是相同的道理,只有当里面没有元素时才会阻塞。
我们可以使用 len
函数获取当前channel中元素的个数,可以使用 cap
函数获取channel的容量。
看个例子:
Go
package main import ( "fmt")func main() { ch := make(chan int, 4) fmt.Println("chan cap:", cap(ch)) fmt.Println("chan len:", len(ch)) go func() { for i := 0; i < 10; i++ { fmt.Println("Sub goroutine send: ", i) ch <- i } close(ch) }() for temp := range ch { fmt.Println("Main goroutine receive: ", temp) } return}
程序执行结果:
Bash
chan cap: 4 chan len: 0 Sub goroutine send: 0 Sub goroutine send: 1 Sub goroutine send: 2 Sub goroutine send: 3 Sub goroutine send: 4 Sub goroutine send: 5 Main goroutine receive: 0 Main goroutine receive: 1 Main goroutine receive: 2 Main goroutine receive: 3 Main goroutine receive: 4 Main goroutine receive: 5 Sub goroutine send: 6 Sub goroutine send: 7 Sub goroutine send: 8 Sub goroutine send: 9 Main goroutine receive: 6 Main goroutine receive: 7 Main goroutine receive: 8 Main goroutine receive: 9[Finished in 0.2s]
3. Select
Go也提供了多路复用机制——select,语法如下:
Go
select {case 场景1: // ...case 场景2: //...case 场景n: //...default: // 可选的 //...}
上面的每种场景都必须指定从某个channel读取数据或者向某个channel发送数据(也就是说select是配合channel用的)。
没有default语句的情况下,select会一直阻塞到某个case可以处理,即成功发送或成功接收到数据。如果有多个case同时ready,就会随机从中选一个进行处理。当然,select也可以不包含任何case,此时,select将永远阻塞。
如果有default语句的话,select将是非阻塞的:如果所有的case都没有ready,将直接执行default语句。
看两个例子:
阻塞型:
Go
package main import "time" import "fmt" func main() { c1 := make(chan string) c2 := make(chan string) go func() { time.Sleep(time.Second * 1) c1 <- "one" }() go func() { time.Sleep(time.Second * 2) c2 <- "two" }() for i := 0; i < 2; i++ { select { case msg1 := <-c1: fmt.Println("received", msg1) case msg2 := <-c2: fmt.Println("received", msg2) } }}
程序执行结果:
Bash
received one received two[Finished in 2.4s]
非阻塞型:
Go
package main
import "time"
import "fmt"
func main() {
c1 := make(chan string)
c2 := make(chan string)
go func() {
time.Sleep(time.Second * 1)
c1 <- "one"
}()
go func() {
time.Sleep(time.Second * 2)
c2 <- "two"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-c1:
fmt.Println("received", msg1)
case msg2 := <-c2:
fmt.Println("received", msg2)
default:
fmt.Println("Non-b Lock ing")
}
}}
程序运行结果:
Bash
Non-blocking Non-blocking[Finished in 0.3s]
4. 锁
锁是并发程序中必不可少的一部分,Go的sync包提供了排它锁 sync.Mutex
和 读写锁 sync.RWMutex
.除此以外,还提供了一个保证函数只执行一次的函数 sync.Once
.下面我们分别介绍。
4.1 sync.Mutex
排它锁的使用很简单,利用sync.Mutex,我们可以保证同一时刻只有一个goroutine可以访问临界区。看个例子:
不使用锁:
Go
package main import ( "fmt" // "sync")func main() { done := make(chan bool, 100) var a int for i := 0; i < 100; i++ { go func() { for j := 0; j < 10000; j++ { a++ } done <- true }() } for i := 0; i < 100; i++ { <-done } fmt.Println("a:", a)}
这里我们创建100个goroutine,每个goroutine里面循环10000,每次都对a加1。抛开并发的概念的话,最终a的值应该是100*10000=1000000.但实际我们运行程序却会发现每次运行a的值都不一样,且都比1000000小。这个就是因为100个goroutine并发执行的结果。下面我们对操作a的地方都加上锁:
Go
package main
import (
"fmt"
"sync")func main() {
done := make(chan bool, 100)
var mu sync.Mutex var a int
for i := 0; i < 100; i++ {
go func() {
for j := 0; j < 10000; j++ {
mu.Lock()
a++
mu. Unlock ()
}
done <- true
}()
}
for i := 0; i < 100; i++ {
<-done }
fmt.Println("a:", a)}
这样同一时刻只有一个goroutine可以操作a,便解决了并发的问题,多次运行程序a的值都是1000000。当然我们发现程序执行的时间也变长了。
sync.Mutex类型只有Lock和Unlock两个函数,且默认值(零值)为unlock状态。Lock和Unlock必须成对。
4.2 sync.RWMutex
sync.RWMutex为读写锁,所谓读写锁就是对于临界区资源,可以有多个人同时去读,但同一时刻只能有一个人写。即只存在两种状态:①一个人或多个人读临界区资源,没有人写临界区资源②只有1个人写临界区资源且无人读临界区资源。简单说就是读和读不互斥,但读和写互斥。
sync.RWMutex类型有四个主要函数(假设rw为sync.RWMutex类型的变量):
rw.Lock():锁住rw用于写,如果已经有人锁住rw,不论是读或者写,则操作被阻塞。
rw.Unlock():解锁rw,只能与rw.Lock配合使用。
rw.RLock():锁住rw用于读。
rw.RUlock():解锁rw,只能与rw.RLock配合使用。
关于读写锁,需要注意必须配合使用,即Lock和Unlock成对使用(读),RLock和RUlock成对使用(写)。
这里我们举一个简单的例子:
Go
package main import ( "fmt" "sync" "time")func main() { done := make(chan bool, 6) var rw sync.RWMutex go func() { rw.RLock() fmt.Println("Enter goroutine 1") time.Sleep(time.Second * 5) fmt.Println("Exit goroutine 1") rw.RUnlock() done <- true }() go func() { rw.RLock() fmt.Println("Enter goroutine 2") time.Sleep(time.Second * 5) fmt.Println("Exit goroutine 2") rw.RUnlock() done <- true }() go func() { rw.RLock() fmt.Println("Enter goroutine 3") time.Sleep(time.Second * 5) fmt.Println("Exit goroutine 3") rw.RUnlock() done <- true }() go func() { rw.Lock() fmt.Println("Enter goroutine 4") time.Sleep(time.Second * 5) fmt.Println("Exit goroutine 4") rw.Unlock() done <- true }() go func() { rw.Lock() fmt.Println("Enter goroutine 5") time.Sleep(time.Second * 5) fmt.Println("Exit goroutine 5") rw.Unlock() done <- true }() go func() { rw.Lock() fmt.Println("Enter goroutine 6") time.Sleep(time.Second * 5) fmt.Println("Exit goroutine 6") rw.Unlock() done <- true }() for i := 0; i < 6; i++ { <-done }}
上面的例子中,前3个goroutine都申请的是读锁,所以他们几个是可以同时进入临界区的,而后三个申请的都是写锁,它们要进入临界区的条件是没有任何人在临界区中,且它进入临界区后,其他所有人都不能再进入临界区。
关于Go的两种锁(排他锁和读写锁)我们需要注意以下两点:
只有加锁后才可以解锁,否则将导致运行时错误(runtime error)。比如,如果一个锁mu(排他锁)或者rw(读写锁)并没有调用过Lock/RLock,我们就在上面调用Unlock/RUlock的话,将导致运行时错误。所以,锁必须成对使用。
锁虽然必须成对使用,但可以在这个goroutine中加锁后,在另外一个goroutine里面解锁。即锁状态不是和goroutine绑定的。
4.3 sync.Once
这个锁的典型使用场景为:我们有一个程序有许多goroutine,但是他们都共享了一个全局资源,这个资源可以由任意一个且只能有一个goroutine初始化,而且这个初始化一般还是比较耗时的。这种情况下,我们就可以将初始化的动作写成一个函数,然后使用这个锁去调用这个函数,那么这个函数将只被执行一次。
这里我们用一个例子进行说明,例子中,有一个全局的map,里面记录了图标和图标所对应的图片,程序(该程序包含多个goroutine)启动时,任意一个goroutine需要检查这个map是否已经被初始化,若干没有,就初始化这个map,但是因为图片很多,初始化动作比较慢。很显然,因为map是个全局资源,这个初始化动作只能由一个goroutine去完成。下面看我们是怎么去实现的:
Go
package mainimport (
"fmt"
"image"
"sync"
"time")var icons map[string]image.Imagevar loadIconsOnce sync.Oncefunc loadIcon(name string) {
// do something
return}func loadIcons() {
icons = map[string]image.Image{
"a. png ": loadIcon("a.png"),
"b.png": loadIcon("b.png"),
"c.png": loadIcon("c.png"),
...
}}func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]}
我们声明了一个sync.Once变量,该类型有一个函数Do,其参数为一个不带任何参数,也没有返回值的函数。它的特性是:如果有多个人调用xxx.Do(f),那么只有第一个调用的人会执行函数f。对于这种场景,我们在其他语言中的,我们一般是使用一个排它锁去实现,没错,Go中的sync.Once底层实现也是借助排它锁和一个无符号整数变量实现的。Go的源代码如下:
Go
// Do calls the function f if and only if Do is being called for the// first time for this instance of Once. In other words, given// var once Once// if once.Do(f) is called multiple times, only the first call will invoke f,// even if f has a different value in each invocation. A new instance of// Once is required for each function to execute.//// Do is intended for initialization that must be run exactly once. Since f// is niladic, it may be necessary to use a function literal to capture the// arguments to a function to be invoked by Do:// config.once.Do(func() { config.init(filename) })//// Because no call to Do returns until the one call to f returns, if f causes// Do to be called, it will deadlock.//// If f panics, Do considers it to have returned; future calls of Do return// without calling f.//func (o *Once) Do(f func()) { if atomic.LoadUint32(&o.done) == 1 { return } // Slow-path. o.m.Lock() defer o.m.Unlock() if o.done == 0 { defer atomic.StoreUint32(&o.done, 1) f() }}
5. Goroutine和 thread
从前面的介绍中,我们可以看到宏观上Goroutine和Thread几乎没有区别。现在我们从“微观”看看二者的区别。
5.1 栈(stack)的区别
我们知道OS的thread都有一个固定大小的栈,一般为2MB。而goroutine的栈是可变大小的,一般初始值2KB,随着使用可以动态扩展,最大可至1GB。这样有什么好处呢?
一般的线程可能用不到2MB大小的栈,这对于资源是一个浪费,限制了系统thread的个数。而goroutine栈初始值为2KB,很节省资源。
对于一些深递归函数,2MB的栈空间很可能不够用,栈溢出导致段错误。而Goroutine的栈大小是动态调整的。所以总体而言,goroutine栈大小的机制对于资源利用的更加充分。
5.2 调度的区别
thread是系统级别的,是由内核负责调度的。线程切换是需要进行上下文切换(context switch),而且需要更新调度器的数据结构。这个切换过程是比较缓慢的(当然线程级的切换比进程切换还是快不少的),其中可能还涉及CPU与内存的结构(可了解CPU与内存的亲和性)。
而goroutine是应用级的,是有goroutine自己的调度器调度的(使用的是“m:n scheduling”算法)。首先它是应用级的,所以多个goroutine之间的切换比thread之间的切换更加轻量级,因为不涉及内核级别的上下文切换。而且它的调度算法也尽量避免了goroutine在多个CPU核之间的切换,有点类似于进程/线程绑核后的效果。
一般如果一个机器有N个CPU,那goroutine的调度器会一次将go程序运行在N个thread上面。当然我们也可以使用GOMAXPROCS来指定我们所要使用的CPU核数(不能超出机器实际的核数)。
简单理解就是,系统底层依旧是thread,但goroutine有自己的调度器,这个调度器会起n(n为GOMAXPROCS的值或者CPU的核数)个thread,然后它会将go程序分布在这n个thread上去执行,且保证同一个goroutine以及相关的代码一直只在一个thread上面运行,这样就可以尽量避免内核级别的线程切换,从而提高了效率。