七叶笔记 » golang编程 » Go语言进阶之路(五):通道和goroutine、GPM

Go语言进阶之路(五):通道和goroutine、GPM

通道

Go语言可以有效地利用多核CPU,并发性能好,这正是由于goroutine和通道还有GPM模型的原因。

我们知道, Python 语言由于全局锁GIL,单个Python应用的多线程代码没办法有效利用多核CPU,只能写多进程来利用多核CPU。如果用标准库的multiprocessing会对监控和管理造成不少的挑战。部署Python应用的时候通常是每个CPU核部署一个应用,这样会浪费不少资源。Go语言的goroutine和GPM模型性能远远超越Python的并发模型。先来看看通道和goroutine。

Go语言中通道类型是chan关键字,通过make来创建。可以创建带缓冲的通道和无缓冲的通道。

创建通道

 unbuffered := make(chan int)  // 无缓冲的整型通道
buffered := make(chan string, 10)  // 有缓冲的字符型通道  

缓冲通道和无缓冲通道有什么区别?

无缓冲通道没有容量,我们想要往通道中发送数据,那么一定要有另一个操作在往这个通道取数据,否则,发送数据的那段代码也会被阻塞。

缓冲通道只是有一段容量可以缓冲一下,当缓冲区未满时,往通道中发送数据不会阻塞当前操作;当缓冲区满了时往通道中发送数据,会想无缓冲通道那样阻塞。

发送数据

 buffered <- "Gopher"  // 通过通道发送一个字符串  

接收数据

 received :=  <- buffered        // 通过通道接收一个字符串
<- unbuffered                   // 接收并丢弃字符串,如果unbuffered通道里没数据,则会一直阻塞直到有数据放入
receiver, ok := <- buffered
if !ok {
    xxx                         // 通道被关闭等问题
}  

关闭通道

通常来说,我们往通道里发送完数据后,我们应该关闭通道(一般来说由生产者来关闭通道),这就相当于告诉消费者,我的数据发送完了,你取完了数据之后就离开吧。

我们应该在所有数据都放入通道后再关闭通道,否则,如果先关闭通道,再往里放数据的话会产生panic。看下面的例子:

  close (buffered)


go func() {
  for {
    j, more := <-jobs        // 如果通道已被关闭,并且数据已全被取走,则more为false
    if more {
      fmt.Println("received job", j)
    } else {
      fmt.Println("received all jobs")
      done <- true
      return
    }
  }
}()


for post := 1; post <= taskLoad; post++ {
  tasks <- fmt.Sprintf("Task : %d", post)
}
// 当所有工作都处理完时关闭通道,以便所有goroutine退出
close(tasks)
通道可以用来实现同步功能:
func worker(done chan bool) {
  fmt.Print("working...")
  time.Sleep(time.Second)
  fmt.Println("done")
  done <- true
  close(done)
}
func main() {
  done := make(chan bool)
  go worker(done)
  <-done                        // 阻塞,直到通道中有数据
}  

单向通道

默认地,我们使用make(ch chan int)创建的通道都是双向通道,我们可以往里面发送数据也可以从里面读取数据。我们也可以在定义通道的地方指明通道的方向,看下面的例子:

 func ping(pings chan<- string,  msg  string) {
  // pings单向通道,函数中只能往pings发送数据
  pings <- msg
}

func pong(pings <-chan string, pongs  ch an<- string) {
  // pings单向通道,函数中只能从pings接收数据
  // pongs单向通道,函数中只能往pongs发送数据
  msg := <-pings
  pongs <- msg
}  

声明单向通道最大的好处就是可以限制程序的行为。比如我们在定义接口给其他人调用时,我们就可以指明通道为单向通道,接口使用者只能往通道发送数据或者只能从通道读取数据。

生产者消费者问题

生产者只往通道里面发送数据,发送完后关闭通道。消费者只从通道里面读取数据。这可比 Java 中使用wait/notify来实现生产者消费者方便多了。

 func producer(ch chan<- int) {
  for i := 1; i <= 10; i++ {
    ch <- i
  }
  close(ch)
}

func consumer(ch <-chan int) {
  for i := range ch {
    fmt.Println("consumed something: ", i)
  }
}

func main() {
  ch := make(chan int)
  go producer(ch)
  go consumer(ch)
  time.Sleep(time.Second)
}  

goroutine

Go语言中的goroutine,一般也称作协程,不由OS调度,而是用户层自行释放CPU,从而在执行体之间切换调度运行。Go语言在底层进行协助实现。涉及系统调用的地方由Go标准库协助释放CPU。总之,协程运行不通过OS进行切换,由Go语言自行切换,系统运行开支大大降低。一个进程内部可以运行多个 线程 ,而每个线程又可以运行很多协程。线程要负责对协程进行调度。当一个协程睡眠时,它将线程的运行权让给其它的协程来运行。同一个线程内部最多只会有一个协程正在运行。

创建Goroutine

创建goroutine很方便,创建个匿名函数并调用,然后在前面加上go关键字就可以了。

 go func(msg string) {
    fmt.Println(msg)
}("going")  

还可以把函数赋给一个变量,然后创建goroutine运行:

 var myFunc = func(msg string) {
    fmt.Println(msg)
}
go myFunc("going")  

主协程和子协程

Go语言所有代码都运行在协程中,由Go运行时进行调度,其中main函数所在的协程叫做主协程,其余的协程叫做子协程。

 func main() {
  for i := 0; i < 5; i++ {
    go func(i int) {
      fmt.Print(i, " ")
    }(i)
  }
  time.Sleep(time.Second)
}
// 输出:0 4 3 1 2  

当主协程执行完而子协程未执行完时,所有子协程会随着主协程的结束而退出运行。上面的代码输出“0 4 3 1 2 ”,这是因为创建完协程后放入运行时队列到真正运行有个过程,哪个协程先被放入运行时队列,哪个就能先执行。

线程的调度是由操作系统负责的,调度算法运行在内核态,而协程的调用是由 Go 语言的运行时负责的,调度算法运行在用户态。这就使得Go语言的协程比Java的线程更加轻量级。单一的Java应用创建几十上百个线程并运行就已经非常消耗内存和CPU资源了,而单一的Go应用可以创建上千万的goroutine还正常地运行。

GPM

先回顾一下Java中线程和操作系统内核线程的对应关系。Java中线程模型如下:

Java线程的实现方式是通过操作系统提供的高级编程接口——LWP(轻量级进程)来对应到内核线程上的,一个Java线程对应到一个LWP,对应到一个KLT(内核线程)。当Java线程阻塞,需要调度时,由内核调度器来进行切换,把当前KLT-LWP对应的Java线程切换到另一个Java线程,这种切换操作在内核态中进行,这种切换是很耗时的(相对CPU来说)。Java的一个线程对应到一个内核线程,一般创建Java线程占用的内存都大于1M。

Go语言使用了用户级线程的实现方式,Go语言中的goroutine可以理解为用户态的线程,调度切换goroutine直接在用户态进行,不用切换到内核态。

Go 调度器模型我们通常叫做GPM 模型,包括 4 个重要结构:

  1. G:Goroutine,每个 Goroutine 对应一个 G 结构体,我们使用go关键字创建goroutine,并非就一定创建了G结构体的实例,只有当没有可用的G时,才会创建G来装载我们创建的goroutine,否则,会复用现有可用的G来装载goroutine。G 存储 Goroutine 的运行堆栈、状态以及任务函数,可重用。G 并非执行体,每个 G 需要绑定到 P 才能被调度执行。
  2. P: Processor,表示逻辑处理器,对 G 来说,P 相当于 CPU 核心,G 只有绑定到 P 才能被调度。对 M 来说,P 提供了相关的执行环境(Context),如内存分配状态(mcache),任务队列(G)等。P 的数量决定了系统内最大可并行的 G 的数量(前提:物理 CPU 核数 >= P 的数量)。P 的数量由用户设置的 GOMAXPROCS 决定,但是不论 GOMAXPROCS 设置为多大,P 的数量最大为 256。
  3. M: Machine,OS 内核线程抽象,代表着真正执行计算的资源,在绑定有效的 P 后,进入 schedule 循环;而 schedule 循环的机制大致是从 Global 队列、P 的 Local 队列以及 wait 队列中获取。M 的数量是不定的,由 Go Runtime 调整,为了防止创建过多 OS 线程导致系统调度不过来,目前默认最大限制为 10000 个。M 并不保留 G 状态,这是 G 可以跨 M 调度的基础。
  4. Sched:Go 调度器,它维护有存储 M 和 G 的队列以及调度器的一些状态信息等。

GPM和Sched的关系如下:

​Go语言调度器Sched拥有全局运行队列(GPQ)和P的本地运行队列(LRQ)。GRQ中维护着可以运行但是没有绑定到P的所有goroutine,LRQ中维护着绑定到P的可运行的所有goroutine。可以看到,上图中有一个绑定到P的goroutine正在M上运行,M对应到操作系统内核线程。

当正在M上运行的G在某些情况阻塞时,Sched可以直接把P上其他可运行的G调度进来,这种调度操作在用户态进行,十分轻量级。而且,Go语言中创建的goroutine只需要大约占用2KB的内存,比Java中线程占用空间小多了。因此,Go语言程序可以轻松hold住百万goroutine的调度运行。

GPM源码和Sched源码如下,从源码中,我们更能清楚GPM之间的关系:

G

G中会保存goroutine运行函数,goroutine的id,还有被绑定运行的M的指针:

 struct G {
    uintptr    stackguard;    // 分段栈的可用空间下界
    uintptr    stackbase;    // 分段栈的栈基址
    Gobuf    sched;        //进程切换时,利用sched域来保存上下文
    uintptr    stack0;
    FuncVal*    fnstart;        // goroutine运行的函数
     void *    param;        // 用于传递参数,睡眠时其它goroutine设置param,唤醒时此goroutine可以获取
    int16    status;        // 状态Gidle,Grunnable,Grunning,Gsyscall,Gwaiting,Gdead
    int64    goid;        // goroutine的id号
    G*    schedlink;
    M*    m;        // for debuggers, but offset not hard-coded
    M*    lockedm;    // G被锁定只能在这个m上运行
    uintptr    gopc;    // 创建这个goroutine的go表达式的pc
    ...
};  

P

P中含有本地运行队列LRQ,P当前的状态:

 struct P {
    Lock;
    uint32    status;  // Pidle或Prunning等
    P*    link;
    uint32    schedtick;   // 每次调度时将它加一
    M*    m;    // 链接到它关联的M (nil if idle)
    MCache*    mcache;

    G*    runq[256];
    int32    runqhead;
    int32    runqtail;

    // Available G's (status == Gdead)
    G*    gfree;
    int32    gfreecnt;
    byte    pad[64];
};  

M

M中包含了绑定到M的P的指针,正在M上运行的G的指针,M的状态等等:

 struct M {
    G*    g0;        // 带有调度栈的goroutine
    G*    gsignal;    // signal-handling G 处理信号的goroutine
    void    (*mstartfn)(void);
    G*    curg;        // M中当前运行的goroutine
    P*    p;        // 关联P以执行Go代码 (如果没有执行Go代码则P为nil)
    P*    nextp;
    int32    id;
    int32    mallocing; //状态
    int32    throwing;
    int32    gcing;
    int32    locks;
    int32    helpgc;        //不为0表示此m在做帮忙gc。helpgc等于n只是一个编号
    bool    blockingsyscall;
    bool    spinning;
    Note    park;
    M*    alllink;    // 这个域用于链接allm
    M*    schedlink;
    MCache    *mcache;
    G*    lockedg;
    M*    nextwaitm;    // next M waiting for lock
    GCStats    gcstats;
    ...
};  

Sched

Sched包含当前空闲的Md指针,当前空闲的P的指针,全局运行队列GRQ等等:

 struct Sched {
    Lock;

    uint64    goidgen;

    M*    midle;     // idle m's waiting for work
    int32    nmidle;     // number of idle m's waiting for work
    int32    nmidlelocked; // number of locked m's waiting for work
    int3    mcount;     // number of m's that have been created
    int32    maxmcount;    // maximum number of m's allowed (or die)

    P*    pidle;  // idle P's
    uint32    npidle;  //idle P的数量
    uint32    nmspinning;

    // Global runnable queue.
    G*    runqhead;
    G*    runqtail;
    int32    runqsize;

    // Global cache of dead G's.
    Lock    gflock;
    G*    gfree;

    int32    stopwait;
    Note    stopnote;
    uint32    sysmonwait;
    Note    sysmonnote;
    uint64    lastpoll;

    int32    profilehz;    // cpu profiling rate
}  

相关文章