七叶笔记 » golang编程 » Go硬核RWMutex,select,chan实现数据高并发

Go硬核RWMutex,select,chan实现数据高并发

上篇 介绍了通过Redis的分布式锁解决对缓存数据的存取过程中,如果高并发而造成的重复获取数据、甚至造成数据的不一致性的问题。Redis只是一种解决方案,并且适合多节点多副本的场景。

如果单节点或者没有Redis的情况下,我们能否实现呢。答案是当然可以的,只要不断探寻,总能解决的。本篇就介绍通过Go的传统锁,select和channel来实现这样的高并发数据缓存的存取逻辑。

为了便于读者看本篇内容和代码时,更易于理解,本篇还引用上篇的流程草图,但具体流程、场景不再细说,只是本篇不采用redis,而是把数据缓存到内存中。

贴代码前,先看2个知识点,有助于理解本篇主题的代码,Go的读写锁RWMutex和select结合channel的多路复用的用法。

读写锁RWMutex

名词解释

  • 读锁,写锁是一个名词
  • 读锁定RLock(),写锁定Lock()是一个动作、操作、函数调用
  • 读解锁RUnlock(),写解锁Unlock()是一个动作、操作、函数调用

RWMutex是一个专门提供读或写的锁,这个锁可以被很多读锁锁定,或被某个写锁锁定,有如下特性,也就保证了操作数据的原子性。

  • 读锁之间不互斥,读锁与写锁,写锁与写锁之间都会互
  • 适合多读少写的场景,因为读锁之间不互斥
  • 当读锁锁定的时候,且此时有写锁锁定调用,会阻止任何一个协程来获取这个锁,包括读锁
  • 不管是读锁还是写锁,如果之前没有锁定,此时调用解锁会Panic runtime error
  • 不独享某个线程,可以某个线程锁定,另外某个线程解锁

这些特性我们通过代码只验证这一条“ 当读锁锁定的时候,且此时有写锁锁定调用,会阻止任何一个协程来获取这个锁,包括读锁 ”,其他比较容易理解和自行验证。

 var rwmu sync.RWMutex

func RWMuFoo() {
  go func() {
    time.Sleep(time.Millisecond * 100)
    for i := 0; i < 5; i++ {
      go func(a int) {
        if a/2 == 0 {
          rwmu.Lock()
          fmt.Println("wlock")
          rwmu.Unlock()
        } else {
          time.Sleep(time.Millisecond * 100)
          rwmu.RLock()
          fmt.Println("rlock")
          rwmu.RUnlock()
        }
      }(i)
    }
  }()

  rwmu.RLock()
  fmt.Println("start to rlock")
  time.Sleep(time.Second)
  fmt.Println("end to rlock")
  rwmu.RUnlock()
}  
  • 第1行声明了一个读写锁的变量rwmu
  • 第22-26行,演示进行读锁定,并且停留一秒钟,才进行读解锁。停留一秒钟是为了让其他协程有机会进行读锁定或写锁定。
  • 第4-20行,启动协程来对rwmu继续读锁定和写锁定。由于协程调度的机制并不能保证代码写的那样的顺序执行,所以为了演示该效果,代码中有2处停留
    • 第5行停留100毫秒是为了让第22-26行代码先执行。
    • 第13行停留100毫秒是为了让第9-11行的写锁先锁定。

所以以上代码可以验证“ 当读锁锁定的时候,且此时有写锁锁定调用,会阻止任何一个协程来获取这个锁,包括读锁 ”。现在运行代码

   c := make(chan int)
  select {
  case c <- 1:
    fmt.Println("c<-1")
  default:
    fmt.Println("default")
  }  

是符合预期的。

select, channel

   c := make(chan int)
  select {
  case c <- 1:
    fmt.Println("c<-1")
  default:
    fmt.Println("default")
  }  

select语句可以让一个协程等待多个通信操作。如果没有default语句时,select会一直阻塞,知道某个case可以执行,如果同时有多个case均可以执行,系统会随机选择一个来执行。

channel的概念不多讲,有兴趣的可以看看官方文档。

数据缓存的高并发存取

通过对RWMutex, select, channel的介绍,我们就可以开发类似上篇的逻辑代码了。

代码片段1

 // NBLock likes redis's set nx
type NBLock struct {
  locked chan bool
}

func (l NBLock) lock() bool {
  select {
  case l.locked <- true:
    fmt.Println("locked")
    return true
  default:
    return false
  }
}

func (l NBLock) unlock() {
  <-l.locked
}

var nblock = NBLock{locked: make(chan bool, 1)}
var result string

var rwmu sync.RWMutex

func HandleReq() {
  // rwmu.RLock()
  reslen := len(result)
  // rwmu.RUnlock()
  if reslen > 0 {
    fmt.Println("get global var: ok")
    return
  }

  if nblock.lock() {
    // rwmu.Lock()
    result = fetch()
    fmt.Println("fetch: ok")
    // rwmu.Unlock()

    nblock.unlock()
  } else {
    for {
      // rwmu.RLock()
      reslen1 := len(result)
      // rwmu.RUnlock()
      if reslen1 > 0 {
        fmt.Println("waiting: ok ")
        return
      }
    }
  }
}  
  • 第1-17行,自定义了一个非阻塞的锁,并且只能锁定成功一次,在没有解锁之前,不能再次进行解锁。非常类似Redis的分布式锁。
  • 第25-52行的逻辑比较类似上篇的逻辑。即高并发请求进来,会锁定第一次的数据获取,其他请求则等待或从缓存获取。

现在运行并发10次运行代码

   for i := 0; i < 10; i++ {
    if i > 4 {
      time.Sleep(time.Millisecond * 100)
    }
    go igo.ONReqWithRWMu()
  }  

运行结果如下

 locked
waiting: ok 
get global var: ok
waiting: ok 
waiting: ok 
fetch: ok
waiting: ok 
get global var: ok
get global var: ok
get global var: ok
get global var: ok  

结果是符合我们预期的,lock了一次,获取数据fetch了一次,剩下的有等待的,也有直接从变量获取的。

咦,有没搞错,怎么没有用到读写锁呢RWMutex(其实先注释掉了)。这段代码里为什么要用读写锁,现在我们添加 -race 竟态检查运行下代码,就知道了。

 Write at 0x000001667a90 by goroutine 8:
  interview-go/igo.ONReqWithRWMu()
      /.../onenode_waitresult.go:36 +0x125

Previous read at 0x000001667a90 by goroutine 10:
  interview-go/igo.ONReqWithRWMu()
      /.../onenode_waitresult.go:44 +0x87
==================  

运行结果告诉我们

  • 协程8在代码片段1的36行处result = fetch()对result赋值。
  • 协程10在代码片段1的44行处reslen1 := len (result)对result读取长度。
  • 这2个协程发生了对变量的竞争,如果不处理,很可能会造成数据的不一致或错乱。

此时读写锁就发挥出作用了,现在将代码片段1中的关于读写锁的注释去掉,即第26、28、35、38、43、45行去掉注释,然后运行代码,并加上-race进行竟态检查

 go test ./test/onenode_waitresult_test.go -v -count=1 -race  
 locked
fetch: ok
get global var: ok
get global var: ok
get global var: ok
get global var: ok
waiting: ok 
get global var: ok
get global var: ok
get global var: ok
get global var: ok  

运行结果也符合预期,且没有了对result的竞争。

本篇内容基本就到这里。

总结

  • 本篇内容和上篇内容还是有区别,并不能很好地通过设置timeout来强制等待的协程返回内容。
  • 可以通过在fetch的方法里面进行判断timeout或出错后,一定要写解锁。否则其他协程就会一直处于阻塞状态。
  • 本篇代码更多是一种实现的思想,也更好地介绍了读写锁RWMutex和自己通过select, channel来实现一个非阻塞的锁,且没解锁前只能成功锁定一次。

篇篇更精彩,章章有深度

感谢大家转发、关注和一起讨论学习

相关文章