七叶笔记 » golang编程 » 面试必备主流的线程模型以及golang goroutine调度GPM

面试必备主流的线程模型以及golang goroutine调度GPM

现在主流的线程模型分三种:

用户级线程模型、内核级线程模型和两级线程模型(也称混合型线程模型).

用户级线程:

UTL用户线程和内核KSE(KernelSchedulingEntity)是多对一的映射关系,创建、销毁和调度全部都由自己程序(线程库)决定,相比内核级线程较轻量级,但是由于线程表在用户空间,所以操作系统内核对用户级线程无感知的,操作系统只知道进程的存在(进程表),如果全部是用户级线程的话,可以认为操作系统内核调度的基本单位是进程,内核一次只为一个进程分配一个内核处理器,因此一个线程的阻塞将会导致整个进程的阻塞;由于进程内的用户级多线程相对于内核线程KES来说都是一个个执行的,所以不能做到真正意义的并发.

内核级线程:

KLT用户线程和内核线程KSE是一对一的关系,每个创建的线程都会和一个内核线程KSE进行静态绑定,内核线程的线程表在内核空间内,所以系统内核可以感知线程的存在,有些地方把这些内核级线程也被称为伪进程(轻进程),线程的创建、调度等完全交予操作系统内核来完成,可以做到真正意义的并发,但是由于多个线程之间的上下文切换需要进行用户态/内核态的转换,所以较耗费资源和性能.

两级线程模型:

用户级线程和内核级线程的混合模式,两级线程模型中的一个进程可以与多个内核线程KSE关联,于是进程内的多个线程可以绑定不同的KSE,其实现原理就是将用户级线程和内核线程KSE进行动态绑定,内核线程KSE数量<=用户级线程的数量,其实也可以理解为一种动态的映射,当某个KSE因为其绑定的线程的阻塞操作被内核调度出CPU时,其关联的进程中其余用户线程可以重新与其他KSE绑定运行.

Go语言的goroutine用的是两级线程模型,相比较传统的OS线程独占固定的栈空间,一般2M,多线程上下文切换消耗较大;goroutine 用户态线程,可以创建很多,每个goroutine栈初始默认占用2kb内存空间,可以动态扩展;runtime.GOMAXPROCS()函数设置当前程序并发时启用的P的数量和OS线程的数量,同时也决定了关联内核线程KSE的数量,默认使用CPU逻辑核心数.

goroutine的调度——GPM模型

G :即协程goroutine,用户创建的需要被执行的任务.

P :processor管理器,P里面会存储当前goroutine运行的上下文环境,负责关联G和M,P的数量决定了系统内最大并行的G的数量,拥有一个单独的goroutine队列LRQ(LocalRunQueue),当自己队列goroutine任务执行完毕后,自动往全局goroutine队列GRQ(GlobalRunQueue)中取任务,如果全局的队列中也没有任务了,它将会去抢占其他processor调度管理器队列中的任务,一边索取LPQ的1/2任务.

M :对内核级线程的封装,M>=P,真正执行任务的线程,M并不保存G的状态,与G本身并没有关系,所以G可以在不同的M执行,系统中的每个M都会拥有一个特殊的goroutine,是系统在初始化M时候创建,他包含了各种调度、垃圾回收和栈管理等程序,其他的G被称为用户G,这个特殊的可以称为系统G.

关系 :P的队列持有G,G只有绑定了P才能被执行,而P调度执行G,最终通过M真正执行G的任务;可以认为是P包含G绑定M.

系统调用阻塞:

当G被阻塞在某个系统调用上时,此时G会阻塞在_Gsyscall状态,M也处于block on syscall状态,此时的M可被抢占调度,执行该G的M会与P解绑,而P则尝试与其它idle的M绑定,继续执行其它G,如果没有其它idle的M,但P的Local队列中仍然有G需要执行,则创建一个新的M;当系统调用完成后,G会重新尝试获取一个idle的P(因为G必须被P持有才会被M执行,)进入它的Local队列恢复执行,如果没有idle的P,G会被标记为runnable加入到Global队列.

用户态阻塞:

当goroutine因为channel操作或者network I/O而阻塞时,netpoller实现了goroutine网络I/O阻塞不会导致M被阻塞而是阻塞阻塞G,对应的G会被放置到某个wait队列(如channel的waitq),该G的状态由_Gruning变为_Gwaitting,而M会跳过该G尝试获取并执行下一个G,如果此时没有runnable的G供M运行,那么M将解绑P,并进入sleep状态;当阻塞的G被另一端的G2唤醒时(比如channel的可读/写通知),G被标记为runnable,尝试加入G2所在P的runnext,然后再是P的Local队列和Global队列.

Seched

代表着一个调度器,它维护有存储空闲的M队列和空闲的P队列,可运行的G队列,自由的G队列以及调度器的一些状态信息等;调度器Seched生成一个M,然后M需要持有绑定一个P,接着M会启动一个内核级线程,循环执行P队列的G任务.

DEMO代码

 func hi(s string)  {  
  fmt.Println("hi",s)
}  
  • 1 启动一个goroutine
 fmt.Println("main start")
go hi("蕾蕾")//go关键字启动一个协程去执行hi函数
fmt.Println("main end")
time.Sleep(time.Second)
//sleep等待goroutine执行完毕,协程有点儿类似于异步的守护进程,
//当主线程结束的时候,其守护进程也将同时结束,  

上面的demo打印如下:

 main start
main end
hi 蕾蕾  

如果去掉关键字go的话,那么程序将会变成同步的,从上往下执行。

  • 2 启动多个goroutine
 fmt.Println("main start")
go hi("协程1")//go启动协程1
go hi("协程2")//go启动协程2
go hi("协程3")//go启动协程3
fmt.Println("main end")
time.Sleep(time.Second)  

输出如下:

 main start
main end
hi 协程1
hi 协程2
hi 协程3  
  • 3 channel 多个goroutine间的通信,当通道为空/满的时候,会阻塞接收数据/发送数据

格式:var chanName chan chanElementType

比如:var cha chan int

chan必须在初始化分配内存以后才可以使用

格式:make(chan chanElementType,[缓冲区大小])

比如:make(chan int,10) 如果未指定10的话,那么cap就是0

注意:chan一般是用在goroutine之间,如果用在主线程上,很容易引发panic如下:

fatal error: all goroutines are asleep – deadlock!

  • 3.1 有缓冲通道
 ch1 := make(chan int,10)//初始化chan
fmt.Println(cap(ch1))
ch1 <- 10  //往通道中放入元素10
ch1 <- 20  //往通道中放入元素20
//<-ch1       //取出通道中的一个元素丢弃
//x := <- ch1 //取出通道中的一个元素并赋值给变量x
x,ok := <- ch1  //在通道open的时候,ok永远返回true,如果取不到值就会阻塞.
fmt.Println(x,ok)
close(ch1)  //关闭通道
x,ok = <- ch1  //在通道close的时候,ok才有可能返回false,             
//通道中没有数据的话不会阻塞,得到的是对应的零值;            
  //如果返回true表明取到了chan里面的值,            
//如果返回false,表明chan已关闭,得到的是x对应的零值.
fmt.Println(x,ok)  
  • 3.2 无缓冲通道,又称为同步通道也就是说需要有两个goroutine参与,如果缺少一方,将导致阻塞,如果是主线程被阻塞而又没有其他的活跃的goroutine将引发上面提到的panic(有缓冲通道容量满的场景也适用).
 ch2 := make(chan int)
go func() {  
  ch2 <- 1  
  fmt.Println("1...发送")  
  time.Sleep(time.Millisecond*100)  
  ch2 <- 2  
  fmt.Println("2...发送")
}()
fmt.Println("接收",<-ch2)
fmt.Println("接收",<-ch2)
time.Sleep(time.Second)  

打印结果如下:

 1...发送
接收 1
2...发送
接收 2  
  • 3.3 从通道循环取值for range
 ch3 := make(chan int)
go func() {  
  for i:=0;i<10;i++  {    
    ch3 <- i  
  }  
  close(ch3)
}()
//方式一
go func() {  
  //如果不close,接收方将一直接收数据,如果没数据就阻塞  
  //如果通道被close,在通道没有数据的情况下,接收方就会自动退出for range
  for v := range ch3{    
    fmt.Println(v)  
  }  
  fmt.Println("finished...")
}()
//方式二
go func() {  
  for{    
    v,ok := <- ch3    
    if !ok {      
      break    
    }else {      
      fmt.Println(v)    
    }  
  }  
  fmt.Println("finished...")
}()
time.Sleep(time.Second)  
  • 3.4 单向chan
 ch5 := make(chan int,10)
go func() {  
  //输入型通道,不允许取值  
  func(cha chan <- int){    
    cha <- 1  
  }(ch5)  
  //输出型通道,不允许写入数据  
  func(cha <-chan int){    
    fmt.Println(<-cha)  
  }(ch5)
}()
time.Sleep(time.Second)  
  • 3.5 多个goroutine同时接收通道数据
 ch6 := make(chan int,10)
//1.准备多个goroutine一直消费通道的数据,不会重复消费
for i := 0; i < 10; i++ {  
  go func(goindex int) {    
    for v := range ch6 {      
      fmt.Println(goindex,"开始执行任务",v)      
      time.Sleep(time.Second)    
    }  
  }(i)
}
//2.往通道发送数据
go func() {  
  for i := 0; i < 20; i++ {    
    ch6 <- i  
  }
}()
time.Sleep(time.Second*11)  
  • 4 select语句,多路复用

在多个通道case中自动选择可以操作的case,如果都没有符合条件的会执行default逻辑,如果没有default逻辑则被阻塞,如果多个case同时符合条件,则随机选择一个执行.

 ch7 := make(chan int)
ch8 := make(chan int,1)
go func() {  
  for i := 0; i < 10; i++ {    
    select {    
      case ch7 <- 1:      
      fmt.Println("ch7发送成功了")    
      case ch8 <- 1:      
      fmt.Println("ch8发送成功了")    
      case v := <-ch8 :      
      fmt.Println("ch8接收到了",v)    
      default:      
      fmt.Println("default")    
    }    
    fmt.Println("select over")  
  }
}()
time.Sleep(time.Second*5)  
  • 5 sync包 (包中的大部分工具类都是值传递而非引用传递)
  • 5.1 WaitGroup类似于java中的CountDownLacth,多阻塞一,多用于判断多个goroutine的任务是否都已完成
 var wg sync.WaitGroup
wg.Add(10)  //表明goroutine任务数
for i := 0; i < 10; i++ {  
  go func(goindex int) {    
    time.Sleep(time.Second*time.Duration(rand.Intn(10)))    
    fmt.Println(goindex,"over")    
    wg.Done() //当前goroutine结束  
  }(i)
}
fmt.Println("wait start")
wg.Wait() //等待其他的goroutine全部完成
fmt.Println("all over")  
  • 5.2 Mutex互斥锁

在这个示例中如果不用互斥锁的话,最后得到的j很可能不是1000.

 var mutex sync.Mutex
var wg sync.WaitGroup
j := 0
wg.Add(1000)
for i:=0;i<1000 ;i++  {  
  go func() {    
    mutex.Lock()    
    j++    
    mutex.Unlock()    
    wg.Done()  
  }()
}
wg.Wait()
fmt.Println("over",j)  
  • 5.3 RWMutex读写互斥锁.

总结:读锁先,读锁是写锁否;写锁先,读写都否。

 var rwMutex sync.RWMutex
var wg sync.WaitGroup
wg.Add(2)
go func() {  
  rwMutex.RLock()  
  fmt.Println("获取读锁")  
  time.Sleep(time.Second*2)  
  fmt.Println("释放读锁")  
  rwMutex.RUnlock()  
  wg.Done()
}()
go func() {  
  //time.Sleep(time.Millisecond*5)  
  rwMutex.Lock()  
  fmt.Println("获得写锁")  
  time.Sleep(time.Second*2)  
  fmt.Println("释放写锁")  
  rwMutex.Unlock()  
  wg.Done()
}()
wg.Wait()  
  • 5.4 Once保证指定函数只被执行一次

这个函数是无参类型,所以想要给函数传递参数的话,需要用到闭包的写,常用场景比如加载配置文件或者单例模式。

 var func_t = func() {  
  fmt.Println("I am be called")
}
var once sync.Once
for i := 0; i < 10; i++ {  
  once.Do(func_t)
}  
  • 5.5 线程安全的map
 syncMap := sync.Map{}
syncMap.Store("key1","value1")//增
v, ok :=syncMap.Load("key1")  //查
fmt.Println(v,ok)  //如果查询到了返回true并且返回对应的值
v, ok = syncMap.LoadOrStore("key1","value2")
fmt.Println(v,ok) //value1 true
//如果没有查询到返回false并且存储和返回给定的值
v, ok = syncMap.LoadOrStore("key2","value2")
fmt.Println(v,ok)    //value2 false
// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
syncMap.Range(func(key, value interface{}) bool {  
  fmt.Println("range",key,value)  
  return true
})
syncMap.Delete("key1")   //删
v, _ =syncMap.Load("key1")
fmt.Println(v)  
  • 6 原子操作包atomic
 n1 := int64(100)atomic.AddInt64(&n1,200)fmt.Println(n1)  

这个包里面很多原子操作相关工具比如add、cas、swap等,有兴趣可以研究下。

让我们一起进步学习,共同成长;欢迎大家关注收藏点赞,关注微信公众号:芝麻技术栈

相关文章