常见姿势
func main() {
var a chan int
fmt.Println(<-a)
}
fatal error: all goroutines are asleep - deadlock!
向一个nil的chan发送数据会deadlock
func main() {
var a chan int
a <- 1
}
fatal error: all goroutines are asleep - deadlock!
从一个已经关闭的chan获取数据,得到对应的零值
func main() {
var a chan int
a = make(chan int, 1)
close(a)
v, ok := <-a
fmt.Println(v, ok) //0,false
}
向一个已经关闭的chan发送数据会panic
func main() {
var a chan int
a = make(chan int, 1)
close(a)
a <- 1
}
panic: send on closed channel
把一个已经关闭的chan再次关闭会panic
func main() {
var a chan int
a = make(chan int, 1)
close(a)
close(a)
}
panic: close of closed channel
没有buffer的chan,要提前做好接收的准备,否则会deadlock
func main() {
var a chan int
a = make(chan int)
a <- 1
}
fatal error: all goroutines are asleep - deadlock!
有buffer的chan,在buffer满了之后,再发送会deadlock
func main() {
var a chan int
a = make(chan int, 1)
a <- 1 //不会报错
a <- 2 //报错
}
fatal error: all goroutines are asleep - deadlock!
nil的chan,在select和default组合下,不会报错
func main() {
var a chan int
select {
case <-a: //nil的chan并不会报错
default: //会走到default
}
}
range一个chan,记得close。否则会deadlock
func main() {
var a chan int
a = make(chan int)
go func() {
a <- 1
close(a) //如果没close,那么就会deadlock
}()
for v := range a {
fmt.Println(v)
}
time.Sleep(time.Second)
}
源码分析
从make chan开始
func makechan(t *chantype, size int) *hchan {
elem := t.elem
省略...
switch {
case mem == 0: //无缓冲的chan
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0: //元素不含指针
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: //默认
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
- 如果new的是无buffer的chan,那么只需要new一个hchan即可
- 如果new的是有buffer的且元素类型不是指针类型,那么hchan和buffer可以一起申请一块连续的内存
- 如果new的是有buffer的且元素是指针类型,那么就不能一起申请,hchan单独申请,buffer单独申请。
hchan的结构
type hchan struct {
qcount uint // 环形队列的长度
dataqsiz uint // 环形队列的长度、缓冲区的大小
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // 每个元素的大小
closed uint32 // 通道是否关闭,1关闭 0打开
elemtype *_type // 元素类型
sendx uint // 已发送元素在循环数组中的索引
recvx uint // 已接收元素在循环数组中的索引
recvq waitq // 等待接收的goroutine队列
sendq waitq // 等待发送的goroutine队列
lock mutex //互斥锁,数据的出入都是要锁保护的
}
- buf本身是个环形队列
- sendx:每次推入数据的时候会加1
- recvx:每次取出数据的时候会加1
- recvq:每次取数据发生阻塞的时候,会把当前goroutine放入此队列,待下次唤醒
- sendq:每次推数据发生阻塞的时候,会把当前goroutine放入此队列,待下次唤醒
- lock:buf是一块公共的内存空间,所以需要一把锁
发一个数据
// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc()) // block=true
}
像c<-x这样的语句会被编译成 chansend1 , chansend1 调用 chansend 。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block { //block=false的情况 一般在select的时候
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) //向nil的chan发送会发生阻塞
throw("unreachable")
}
if !block && c.closed == 0 && full(c) { //select的时候,chan没关闭,且(buffer也满了或接收方未准备好接收)
return false
}
...
lock(&c.lock) //上锁安全保护
if c.closed != 0 { //已经关闭的chan
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil { //正好有等待取的goroutine
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz { //缓冲区没满
qp := chanbuf(c, c.sendx) //获取当前sendx的索引
...
typedmemmove(c.elemtype, qp, ep) //新元素的copy进去
c.sendx++ // 索引+1
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++ //数量加+1
unlock(&c.lock)
return true
}
// 接下来都是缓冲区满的情况
if !block {
unlock(&c.lock)
return false
}
gp := getg()
mysg := acquireSudog() //打包suog
c.sendq.enqueue(mysg) //推入发送队列
...
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) //让发送的goroutine进入睡眠,等待被唤醒
//以下是恢复时干的事
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg) //释放sudog
return true
}
- chansend的block参数为false的情况,一般就是我们用select的时候
- 向一个nil的chan发送数据会阻塞(非select)
- 向一个chan发数据的时候,其实底层也是会上锁的。
- 向一个chan发送的时候,如果正好有一个等待的取的goroutine,那么直接发给它
- 当此时没有正好等待接收者的时候,会尝试放到缓冲区中
- sendx用于记录发送的索引
- 当缓冲区满或者没有缓冲区的时候,将当前goroutine打包成 sudog 结构
- 将sudog推入sendq的队列
- 当前发送者的goroutine会尝试进入休眠,等待下次被唤醒
- 唤醒后,会继续执行数据的发送
接收一个数据
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
像<-c这样的语句会被编译成 chanrecv1 , chanrecv1 调用 chanrecv 。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c == nil {
if !block { //不阻塞的话,直接返回
return
}
//从一个nil的chan接收数据会阻塞
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
lock(&c.lock) //上锁
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
if sg := c.sendq.dequeue(); sg != nil { //正好有发送者
recv(c, sg, ep, func() { unlock(&c.lock) }, 3) //(带缓冲和不带缓冲的接收)
return true, true
}
if c.qcount > 0 { // buffer有数据
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp) //直接取数据 copy方式
}
typedmemclr(c.elemtype, qp) //清理取掉的位置
c.recvx++ //索引加1
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
//以下缓冲区无数据
if !block { //不需要阻塞的话,直接返回
unlock(&c.lock)
return false, false
}
gp := getg()
mysg := acquireSudog() //打包成sudog
...
c.recvq.enqueue(mysg) //推入recvq队列
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) //让出cpu,等待下次调度
//被唤醒后
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg) //解包sudog
return true, !closed
}
- 从一个nil的chan接收数据的时候,会阻塞(非select 模式)
- 从一个chan接收数据的时候,底层也是会上锁的
- 当接收的时候,正好有发送者,尝试直接从发送者那里取数据。(无缓冲的话,直接从发送者的copy到接收者,有缓冲的话,说明此时缓冲肯定满了,那么从缓冲区取走数据后,同时也唤醒下发送方可以继续发送数据了)
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 { //无缓冲区
...
if ep != nil {
// 从发送者那里copy数据
recvDirect(c.elemtype, sg, ep)
}
} else {//缓冲区一定满了
...
// copy data
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
...
goready(gp, skip+1) //唤醒准备发送的goroutine
}
- buffer有数据的时候,尝试直接从buffer copy数据出来
- buffer无数据的时候,如果不阻塞直接返回
- 如果阻塞,那么当前goroutine被打包成sudog
- 然后推入等待接收的队列中
- 让出cpu,等待下次被调度
- 被唤醒后,继续执行获取数据
关闭一个通道
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel")) // close 一个nil的chan panic
}
lock(&c.lock) //上锁
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel")) //已经关闭的chan 再次close会panic
}
...
c.closed = 1 //关闭chan
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
- 关闭一个nil的chan会panic
- 关闭一个已经关闭的chan会panic
- 把chan的close标识1
- 释放所有的recvq list
- 释放所有的sendq list
欢迎大家关注公众号《假装懂编程》,我将持续输出网络、数据库、go、缓存、架构、面试、程序人生相关文章。