七叶笔记 » golang编程 » 浅谈golang的sync包

浅谈golang的sync包

Golang sync包提供了一些基础的异步操作方法,非常值得学习,这里对sync包几个重要的结构体和方法做个介绍。

sync包

sync包是 golang 一个官方的异步库,提供了一些各种基础的异步的实现,如互斥锁等。sync 包主要包括了以下几种类型:

  • sync.Mutex 和 sync.WaitGroup
  • sync.Once
  • sync.Map
  • sync.Pool
  • sync.Cond

sync.Once

sync.Once 是 Golang package 中使方法 只执行一次 的对象实现,作用与 init 函数类似。 sync.Once 比较多用在初始化,注册和对象创建上。 sync.Once 源码:

 type Once struct {
// done 表示被执行标识
// 同时因为 done 是高频放在结构体的第一个位置,
// 可以通过结构体指针直接进行访问,而访问其他的
// 字段需要通过偏移量计算相对就会慢一些
done uint32
m    Mutex
}

// Once结构体Do方法只会被执行一次
// ,执行一次后 done 被设置为 1, 后面不再执行
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
// 加锁执行,这里没办法用
//if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
//f()
//}
// 这种形式,这样做等于在f()执行之前已经赋值,如果这时候有两个
// 同时进行调用,一个会执行f(),另一个则会在f()执行结束之前就返回
// 这样会存在问题,因此doSlow加了锁,同时在f()执行之后,再将done存储为1
o.doSlow(f)
}
}

func (o *Once) doSlow(f func()) {
o.m. Lock ()
defer o.m.Unlock()
// 这里是做了 double check
if o.done == 0 {
// 在func执行之后再将done值改变
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
  

sync.Once用法

sync.Once 其中一个应用就是实现一个单例模式,比如在一个结构体中申明,并在初始化的时候使用:

 var capInstance struct {
once         sync.Once
lock         sync.Mutex
capabilities *Capabilities
}

// Initialize the capability set.  This can only be done once per binary, subsequent calls are ignored.
func Initialize(c Capabilities) {
// Only do this once
capInstance.once.Do(func() {
capInstance.capabilities = &c
})
}
  

sync.Map

sync.Map 是一个线程安全的map结构,一般用于多读少写的并发操作,下图是 sync.Map 的数据结构

图引至码农桃花源公众号

 type Map struct {
mu Mutex
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int
}
  

mu Map 的互斥锁用于对并发操作进行加锁保护, read 是用于存储只读内容的,可以提供高并发的读操作。 dirty 是一个原始的 map 结构体,对 dirty 的操作需要加锁, dirty 包涵了全量的数据,在读数据的时候会先读取 read read 读取不到再读 dirty misses read 读取失败的次数,当多次读取失败后 misses 累计特定值, dirty 就会升级成 read sync.Map 这里采用的策略类似数据库常用的”读写分离”,技术都是相通的O(∩_∩)O

sync.Map用法

 func main() {
var value sync.Map
// 写入
value.Store("your name", "shi")
value.Store("her name", "kanon")
// 读取
name, ok := value.Load("your name")
if !ok {
println("can't find name")
}
fmt.Println(name)
// 遍历
value.Range(func(ki, vi interface{}) bool {
k, v := ki.(string), vi.(string)
fmt.Println(k, v)
return true
})
// 删除
value.Delete("your name")
// 读取,如果不存在则写入
activename, loaded := value.LoadOrStore("his name", "baba")
fmt.Println(activename.(string), loaded)
}
  

sync.Pool

sync.Pool 是一个用来缓存大量重复对象,减少大量对象创建给GC压力,是 sync 异步包中很重要的一种数据结构,看其基本数据结构:

 type Pool struct {
  // noCopy 表示不支持值拷贝,如果出现值拷贝用 go vet 编译检查的时候会报错
noCopy noCopy

  // [P]poolLocal,表示每个local的P池
local     unsafe.Pointer
  // local的长度
localSize uintptr

  // 也是[P]poolLocal,表示上一个生命周期的local
victim     unsafe.Pointer
  // victim的长度
victimSize uintptr

  // 用于创建新对象方法,get获取不到就会调用创建一个新对象,一般由用户传入
New func() interface{}
}
  

图引至码农桃花源公众号

sync.Pool 的用法

sync.Pool的用法很简单,就三个方法:

 //初始化pool对象
var pool sync.Pool

type shikanon struct {
num int
}

// 创建新对象创建方法
func initPool() {
pool = sync.Pool{
New: func() interface{} {
return &shikanon{num: rand.Int()}
},
}
}

func main() {
  initPool()
  // 从pool对象池中取对象
p1 := pool.Get().(*shikanon)
fmt.Println("p1", p1.num)

  // 将对象放入pool对象池
pool.Put(p1)

p2 := pool.Get().(*shikanon)
  fmt.Println("p2", p2.num)
}
  

sync.Cond

sync.Cond 是用于条件变量(condition variable)实现 —— 它可以让 Goroutine 都在满足特定条件时被唤醒,因此通常和锁一起使用,比如 Mutex 或 RWMutex。Cond 就是 condition 的意思。
sync.Cond 的数据结构:

 type Cond struct {
// noCopy 保证结构体不在编译期间被拷贝,如果出现值拷贝用 go vet 编译检查的时候会报错
noCopy noCopy
// 等待条件的锁
L Locker
// 通知列表
notify  notifyList
// 用于禁止运行期间发生的拷贝
checker copyChecker
}
  

从数据结构可以看出,sync.Cond 等于在sync.Mutext的基础上,增加了一个通知列表notify做条件通知。

sync.Cond 主要有三种方法:等待通知(wait),单发通知(signal),广播通知(broadcast)。

 // 生成一个cond,需要传入一个Locker,
// 因为阻塞等待通知的操作以及通知解除阻塞的操作就是基于Locker来实现的。
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}

// 用于等待通知
func (c *Cond) Wait() {
// 检查cond是否被拷贝
c.checker.check()
// 将获得锁的goroutine加入等待队列
t :=  runtime _notifyListAdd(&c.notify)
c.L.Unlock()
// 将当前 Goroutine 追加到notifyList链表的末端,并让其处于休眠状态,这个操作是阻塞的,
// 让当前 goroutine 休眠主要是通过调用 runtime.goparkunlock 实现
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}

// 用于发送单个通知
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}

// 用于广播
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
  

runtime_notifyListNotifyOne runtime_notifyListNotifyAll runtime_notifyListAdd runtime_notifyListWait
这几个函数都是用的 runtime 下的 sema.go 文件link过来的,这里先不深究,看看 notifyList 数据结构:

 type notifyList struct {
// wait is the ticket number of the next waiter. It is atomically
// incremented outside the lock.
wait uint32

// notify is the ticket number of the next waiter to be notified. It can
// be read outside the lock, but is only written to with lock held.
//
// Both wait & notify can wrap around, and such cases will be correctly
// handled as long as their "unwrapped" difference is bounded by 2^31.
// For this not to be the case, we'd need to have 2^31+ goroutines
// blocked on the same condvar, which is currently not possible.
notify uint32

// List of parked waiters.
lock mutex
head *sudog
tail *sudog
}
  

sync.Cond 的用法

sync.Cond 主要用于消息广播中(主要是单通知大家用的更多地是 channel + select )。比较经典的sync.Cond实现有 etcd 的 FIFO Scheduler 的实现:

 func NewFIFOScheduler() Scheduler {
f := &fifo{
resume: make(chan struct{}, 1),
donec:  make(chan struct{}, 1),
}
// 生成一个Cond对象
f.finishCond = sync.NewCond(&f.mu)
f.ctx, f.cancel = context.WithCancel(context.Background())
go f.run()
return f
}

// WaitFinish 用于 等待至少 n 个任务被完成 或所有 pending任务被完成
func (f *fifo) WaitFinish(n int) {
f.finishCond.L.Lock()
for f.finished < n || len(f.pendings) != 0 {
// 等待通知
f.finishCond.Wait()
}
f.finishCond.L.Unlock()
}

func (f *fifo) run() {
...

for {
var todo Job
...
// 完成一个上下文
todo(f.ctx)
// 加锁
f.finishCond.L.Lock()
f.finished++
f.pendings = f.pendings[1:]
// 广播通知唤醒其他所有goroutine
f.finishCond.Broadcast()
f.finishCond.L.Unlock()
}
}
  

参考文献


相关文章