现在主流的线程模型分三种:
用户级线程模型、内核级线程模型和两级线程模型(也称混合型线程模型).
用户级线程:
UTL用户线程和内核KSE(KernelSchedulingEntity)是多对一的映射关系,创建、销毁和调度全部都由自己程序(线程库)决定,相比内核级线程较轻量级,但是由于线程表在用户空间,所以操作系统内核对用户级线程无感知的,操作系统只知道进程的存在(进程表),如果全部是用户级线程的话,可以认为操作系统内核调度的基本单位是进程,内核一次只为一个进程分配一个内核处理器,因此一个线程的阻塞将会导致整个进程的阻塞;由于进程内的用户级多线程相对于内核线程KES来说都是一个个执行的,所以不能做到真正意义的并发.
内核级线程:
KLT用户线程和内核线程KSE是一对一的关系,每个创建的线程都会和一个内核线程KSE进行静态绑定,内核线程的线程表在内核空间内,所以系统内核可以感知线程的存在,有些地方把这些内核级线程也被称为伪进程(轻进程),线程的创建、调度等完全交予操作系统内核来完成,可以做到真正意义的并发,但是由于多个线程之间的上下文切换需要进行用户态/内核态的转换,所以较耗费资源和性能.
两级线程模型:
用户级线程和内核级线程的混合模式,两级线程模型中的一个进程可以与多个内核线程KSE关联,于是进程内的多个线程可以绑定不同的KSE,其实现原理就是将用户级线程和内核线程KSE进行动态绑定,内核线程KSE数量<=用户级线程的数量,其实也可以理解为一种动态的映射,当某个KSE因为其绑定的线程的阻塞操作被内核调度出CPU时,其关联的进程中其余用户线程可以重新与其他KSE绑定运行.
Go语言的goroutine用的是两级线程模型,相比较传统的OS线程独占固定的栈空间,一般2M,多线程上下文切换消耗较大;goroutine 用户态线程,可以创建很多,每个goroutine栈初始默认占用2kb内存空间,可以动态扩展;runtime.GOMAXPROCS()函数设置当前程序并发时启用的P的数量和OS线程的数量,同时也决定了关联内核线程KSE的数量,默认使用CPU逻辑核心数.
goroutine的调度——GPM模型
G :即协程goroutine,用户创建的需要被执行的任务.
P :processor管理器,P里面会存储当前goroutine运行的上下文环境,负责关联G和M,P的数量决定了系统内最大并行的G的数量,拥有一个单独的goroutine队列LRQ(LocalRunQueue),当自己队列goroutine任务执行完毕后,自动往全局goroutine队列GRQ(GlobalRunQueue)中取任务,如果全局的队列中也没有任务了,它将会去抢占其他processor调度管理器队列中的任务,一边索取LPQ的1/2任务.
M :对内核级线程的封装,M>=P,真正执行任务的线程,M并不保存G的状态,与G本身并没有关系,所以G可以在不同的M执行,系统中的每个M都会拥有一个特殊的goroutine,是系统在初始化M时候创建,他包含了各种调度、垃圾回收和栈管理等程序,其他的G被称为用户G,这个特殊的可以称为系统G.
关系 :P的队列持有G,G只有绑定了P才能被执行,而P调度执行G,最终通过M真正执行G的任务;可以认为是P包含G绑定M.
系统调用阻塞:
当G被阻塞在某个系统调用上时,此时G会阻塞在_Gsyscall状态,M也处于block on syscall状态,此时的M可被抢占调度,执行该G的M会与P解绑,而P则尝试与其它idle的M绑定,继续执行其它G,如果没有其它idle的M,但P的Local队列中仍然有G需要执行,则创建一个新的M;当系统调用完成后,G会重新尝试获取一个idle的P(因为G必须被P持有才会被M执行,)进入它的Local队列恢复执行,如果没有idle的P,G会被标记为runnable加入到Global队列.
用户态阻塞:
当goroutine因为channel操作或者network I/O而阻塞时,netpoller实现了goroutine网络I/O阻塞不会导致M被阻塞而是阻塞阻塞G,对应的G会被放置到某个wait队列(如channel的waitq),该G的状态由_Gruning变为_Gwaitting,而M会跳过该G尝试获取并执行下一个G,如果此时没有runnable的G供M运行,那么M将解绑P,并进入sleep状态;当阻塞的G被另一端的G2唤醒时(比如channel的可读/写通知),G被标记为runnable,尝试加入G2所在P的runnext,然后再是P的Local队列和Global队列.
Seched
代表着一个调度器,它维护有存储空闲的M队列和空闲的P队列,可运行的G队列,自由的G队列以及调度器的一些状态信息等;调度器Seched生成一个M,然后M需要持有绑定一个P,接着M会启动一个内核级线程,循环执行P队列的G任务.
DEMO代码
func hi(s string) {
fmt.Println("hi",s)
}
- 1 启动一个goroutine
fmt.Println("main start")
go hi("蕾蕾")//go关键字启动一个协程去执行hi函数
fmt.Println("main end")
time.Sleep(time.Second)
//sleep等待goroutine执行完毕,协程有点儿类似于异步的守护进程,
//当主线程结束的时候,其守护进程也将同时结束,
上面的demo打印如下:
main start
main end
hi 蕾蕾
如果去掉关键字go的话,那么程序将会变成同步的,从上往下执行。
- 2 启动多个goroutine
fmt.Println("main start")
go hi("协程1")//go启动协程1
go hi("协程2")//go启动协程2
go hi("协程3")//go启动协程3
fmt.Println("main end")
time.Sleep(time.Second)
输出如下:
main start
main end
hi 协程1
hi 协程2
hi 协程3
- 3 channel 多个goroutine间的通信,当通道为空/满的时候,会阻塞接收数据/发送数据
格式:var chanName chan chanElementType
比如:var cha chan int
chan必须在初始化分配内存以后才可以使用
格式:make(chan chanElementType,[缓冲区大小])
比如:make(chan int,10) 如果未指定10的话,那么cap就是0
注意:chan一般是用在goroutine之间,如果用在主线程上,很容易引发panic如下:
fatal error: all goroutines are asleep – deadlock!
- 3.1 有缓冲通道
ch1 := make(chan int,10)//初始化chan
fmt.Println(cap(ch1))
ch1 <- 10 //往通道中放入元素10
ch1 <- 20 //往通道中放入元素20
//<-ch1 //取出通道中的一个元素丢弃
//x := <- ch1 //取出通道中的一个元素并赋值给变量x
x,ok := <- ch1 //在通道open的时候,ok永远返回true,如果取不到值就会阻塞.
fmt.Println(x,ok)
close(ch1) //关闭通道
x,ok = <- ch1 //在通道close的时候,ok才有可能返回false,
//通道中没有数据的话不会阻塞,得到的是对应的零值;
//如果返回true表明取到了chan里面的值,
//如果返回false,表明chan已关闭,得到的是x对应的零值.
fmt.Println(x,ok)
- 3.2 无缓冲通道,又称为同步通道也就是说需要有两个goroutine参与,如果缺少一方,将导致阻塞,如果是主线程被阻塞而又没有其他的活跃的goroutine将引发上面提到的panic(有缓冲通道容量满的场景也适用).
ch2 := make(chan int)
go func() {
ch2 <- 1
fmt.Println("1...发送")
time.Sleep(time.Millisecond*100)
ch2 <- 2
fmt.Println("2...发送")
}()
fmt.Println("接收",<-ch2)
fmt.Println("接收",<-ch2)
time.Sleep(time.Second)
打印结果如下:
1...发送
接收 1
2...发送
接收 2
- 3.3 从通道循环取值for range
ch3 := make(chan int)
go func() {
for i:=0;i<10;i++ {
ch3 <- i
}
close(ch3)
}()
//方式一
go func() {
//如果不close,接收方将一直接收数据,如果没数据就阻塞
//如果通道被close,在通道没有数据的情况下,接收方就会自动退出for range
for v := range ch3{
fmt.Println(v)
}
fmt.Println("finished...")
}()
//方式二
go func() {
for{
v,ok := <- ch3
if !ok {
break
}else {
fmt.Println(v)
}
}
fmt.Println("finished...")
}()
time.Sleep(time.Second)
- 3.4 单向chan
ch5 := make(chan int,10)
go func() {
//输入型通道,不允许取值
func(cha chan <- int){
cha <- 1
}(ch5)
//输出型通道,不允许写入数据
func(cha <-chan int){
fmt.Println(<-cha)
}(ch5)
}()
time.Sleep(time.Second)
- 3.5 多个goroutine同时接收通道数据
ch6 := make(chan int,10)
//1.准备多个goroutine一直消费通道的数据,不会重复消费
for i := 0; i < 10; i++ {
go func(goindex int) {
for v := range ch6 {
fmt.Println(goindex,"开始执行任务",v)
time.Sleep(time.Second)
}
}(i)
}
//2.往通道发送数据
go func() {
for i := 0; i < 20; i++ {
ch6 <- i
}
}()
time.Sleep(time.Second*11)
- 4 select语句,多路复用
在多个通道case中自动选择可以操作的case,如果都没有符合条件的会执行default逻辑,如果没有default逻辑则被阻塞,如果多个case同时符合条件,则随机选择一个执行.
ch7 := make(chan int)
ch8 := make(chan int,1)
go func() {
for i := 0; i < 10; i++ {
select {
case ch7 <- 1:
fmt.Println("ch7发送成功了")
case ch8 <- 1:
fmt.Println("ch8发送成功了")
case v := <-ch8 :
fmt.Println("ch8接收到了",v)
default:
fmt.Println("default")
}
fmt.Println("select over")
}
}()
time.Sleep(time.Second*5)
- 5 sync包 (包中的大部分工具类都是值传递而非引用传递)
- 5.1 WaitGroup类似于java中的CountDownLacth,多阻塞一,多用于判断多个goroutine的任务是否都已完成
var wg sync.WaitGroup
wg.Add(10) //表明goroutine任务数
for i := 0; i < 10; i++ {
go func(goindex int) {
time.Sleep(time.Second*time.Duration(rand.Intn(10)))
fmt.Println(goindex,"over")
wg.Done() //当前goroutine结束
}(i)
}
fmt.Println("wait start")
wg.Wait() //等待其他的goroutine全部完成
fmt.Println("all over")
- 5.2 Mutex互斥锁
在这个示例中如果不用互斥锁的话,最后得到的j很可能不是1000.
var mutex sync.Mutex
var wg sync.WaitGroup
j := 0
wg.Add(1000)
for i:=0;i<1000 ;i++ {
go func() {
mutex.Lock()
j++
mutex.Unlock()
wg.Done()
}()
}
wg.Wait()
fmt.Println("over",j)
- 5.3 RWMutex读写互斥锁.
总结:读锁先,读锁是写锁否;写锁先,读写都否。
var rwMutex sync.RWMutex
var wg sync.WaitGroup
wg.Add(2)
go func() {
rwMutex.RLock()
fmt.Println("获取读锁")
time.Sleep(time.Second*2)
fmt.Println("释放读锁")
rwMutex.RUnlock()
wg.Done()
}()
go func() {
//time.Sleep(time.Millisecond*5)
rwMutex.Lock()
fmt.Println("获得写锁")
time.Sleep(time.Second*2)
fmt.Println("释放写锁")
rwMutex.Unlock()
wg.Done()
}()
wg.Wait()
- 5.4 Once保证指定函数只被执行一次
这个函数是无参类型,所以想要给函数传递参数的话,需要用到闭包的写,常用场景比如加载配置文件或者单例模式。
var func_t = func() {
fmt.Println("I am be called")
}
var once sync.Once
for i := 0; i < 10; i++ {
once.Do(func_t)
}
- 5.5 线程安全的map
syncMap := sync.Map{}
syncMap.Store("key1","value1")//增
v, ok :=syncMap.Load("key1") //查
fmt.Println(v,ok) //如果查询到了返回true并且返回对应的值
v, ok = syncMap.LoadOrStore("key1","value2")
fmt.Println(v,ok) //value1 true
//如果没有查询到返回false并且存储和返回给定的值
v, ok = syncMap.LoadOrStore("key2","value2")
fmt.Println(v,ok) //value2 false
// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
syncMap.Range(func(key, value interface{}) bool {
fmt.Println("range",key,value)
return true
})
syncMap.Delete("key1") //删
v, _ =syncMap.Load("key1")
fmt.Println(v)
- 6 原子操作包atomic
n1 := int64(100)atomic.AddInt64(&n1,200)fmt.Println(n1)
这个包里面很多原子操作相关工具比如add、cas、swap等,有兴趣可以研究下。
让我们一起进步学习,共同成长;欢迎大家关注收藏点赞,关注微信公众号:芝麻技术栈