上篇 介绍了通过Redis的分布式锁解决对缓存数据的存取过程中,如果高并发而造成的重复获取数据、甚至造成数据的不一致性的问题。Redis只是一种解决方案,并且适合多节点多副本的场景。
如果单节点或者没有Redis的情况下,我们能否实现呢。答案是当然可以的,只要不断探寻,总能解决的。本篇就介绍通过Go的传统锁,select和channel来实现这样的高并发数据缓存的存取逻辑。
为了便于读者看本篇内容和代码时,更易于理解,本篇还引用上篇的流程草图,但具体流程、场景不再细说,只是本篇不采用redis,而是把数据缓存到内存中。
贴代码前,先看2个知识点,有助于理解本篇主题的代码,Go的读写锁RWMutex和select结合channel的多路复用的用法。
读写锁RWMutex
名词解释
- 读锁,写锁是一个名词
- 读锁定RLock(),写锁定Lock()是一个动作、操作、函数调用
- 读解锁RUnlock(),写解锁Unlock()是一个动作、操作、函数调用
RWMutex是一个专门提供读或写的锁,这个锁可以被很多读锁锁定,或被某个写锁锁定,有如下特性,也就保证了操作数据的原子性。
- 读锁之间不互斥,读锁与写锁,写锁与写锁之间都会互
- 适合多读少写的场景,因为读锁之间不互斥
- 当读锁锁定的时候,且此时有写锁锁定调用,会阻止任何一个协程来获取这个锁,包括读锁
- 不管是读锁还是写锁,如果之前没有锁定,此时调用解锁会Panic runtime error
- 不独享某个线程,可以某个线程锁定,另外某个线程解锁
这些特性我们通过代码只验证这一条“ 当读锁锁定的时候,且此时有写锁锁定调用,会阻止任何一个协程来获取这个锁,包括读锁 ”,其他比较容易理解和自行验证。
var rwmu sync.RWMutex
func RWMuFoo() {
go func() {
time.Sleep(time.Millisecond * 100)
for i := 0; i < 5; i++ {
go func(a int) {
if a/2 == 0 {
rwmu.Lock()
fmt.Println("wlock")
rwmu.Unlock()
} else {
time.Sleep(time.Millisecond * 100)
rwmu.RLock()
fmt.Println("rlock")
rwmu.RUnlock()
}
}(i)
}
}()
rwmu.RLock()
fmt.Println("start to rlock")
time.Sleep(time.Second)
fmt.Println("end to rlock")
rwmu.RUnlock()
}
- 第1行声明了一个读写锁的变量rwmu
- 第22-26行,演示进行读锁定,并且停留一秒钟,才进行读解锁。停留一秒钟是为了让其他协程有机会进行读锁定或写锁定。
- 第4-20行,启动协程来对rwmu继续读锁定和写锁定。由于协程调度的机制并不能保证代码写的那样的顺序执行,所以为了演示该效果,代码中有2处停留
- 第5行停留100毫秒是为了让第22-26行代码先执行。
- 第13行停留100毫秒是为了让第9-11行的写锁先锁定。
所以以上代码可以验证“ 当读锁锁定的时候,且此时有写锁锁定调用,会阻止任何一个协程来获取这个锁,包括读锁 ”。现在运行代码
c := make(chan int)
select {
case c <- 1:
fmt.Println("c<-1")
default:
fmt.Println("default")
}
是符合预期的。
select, channel
c := make(chan int)
select {
case c <- 1:
fmt.Println("c<-1")
default:
fmt.Println("default")
}
select语句可以让一个协程等待多个通信操作。如果没有default语句时,select会一直阻塞,知道某个case可以执行,如果同时有多个case均可以执行,系统会随机选择一个来执行。
channel的概念不多讲,有兴趣的可以看看官方文档。
数据缓存的高并发存取
通过对RWMutex, select, channel的介绍,我们就可以开发类似上篇的逻辑代码了。
代码片段1
// NBLock likes redis's set nx
type NBLock struct {
locked chan bool
}
func (l NBLock) lock() bool {
select {
case l.locked <- true:
fmt.Println("locked")
return true
default:
return false
}
}
func (l NBLock) unlock() {
<-l.locked
}
var nblock = NBLock{locked: make(chan bool, 1)}
var result string
var rwmu sync.RWMutex
func HandleReq() {
// rwmu.RLock()
reslen := len(result)
// rwmu.RUnlock()
if reslen > 0 {
fmt.Println("get global var: ok")
return
}
if nblock.lock() {
// rwmu.Lock()
result = fetch()
fmt.Println("fetch: ok")
// rwmu.Unlock()
nblock.unlock()
} else {
for {
// rwmu.RLock()
reslen1 := len(result)
// rwmu.RUnlock()
if reslen1 > 0 {
fmt.Println("waiting: ok ")
return
}
}
}
}
- 第1-17行,自定义了一个非阻塞的锁,并且只能锁定成功一次,在没有解锁之前,不能再次进行解锁。非常类似Redis的分布式锁。
- 第25-52行的逻辑比较类似上篇的逻辑。即高并发请求进来,会锁定第一次的数据获取,其他请求则等待或从缓存获取。
现在运行并发10次运行代码
for i := 0; i < 10; i++ {
if i > 4 {
time.Sleep(time.Millisecond * 100)
}
go igo.ONReqWithRWMu()
}
运行结果如下
locked
waiting: ok
get global var: ok
waiting: ok
waiting: ok
fetch: ok
waiting: ok
get global var: ok
get global var: ok
get global var: ok
get global var: ok
结果是符合我们预期的,lock了一次,获取数据fetch了一次,剩下的有等待的,也有直接从变量获取的。
咦,有没搞错,怎么没有用到读写锁呢RWMutex(其实先注释掉了)。这段代码里为什么要用读写锁,现在我们添加 -race 竟态检查运行下代码,就知道了。
Write at 0x000001667a90 by goroutine 8:
interview-go/igo.ONReqWithRWMu()
/.../onenode_waitresult.go:36 +0x125
Previous read at 0x000001667a90 by goroutine 10:
interview-go/igo.ONReqWithRWMu()
/.../onenode_waitresult.go:44 +0x87
==================
运行结果告诉我们
- 协程8在代码片段1的36行处result = fetch()对result赋值。
- 协程10在代码片段1的44行处reslen1 := len (result)对result读取长度。
- 这2个协程发生了对变量的竞争,如果不处理,很可能会造成数据的不一致或错乱。
此时读写锁就发挥出作用了,现在将代码片段1中的关于读写锁的注释去掉,即第26、28、35、38、43、45行去掉注释,然后运行代码,并加上-race进行竟态检查
go test ./test/onenode_waitresult_test.go -v -count=1 -race
locked
fetch: ok
get global var: ok
get global var: ok
get global var: ok
get global var: ok
waiting: ok
get global var: ok
get global var: ok
get global var: ok
get global var: ok
运行结果也符合预期,且没有了对result的竞争。
本篇内容基本就到这里。
总结
- 本篇内容和上篇内容还是有区别,并不能很好地通过设置timeout来强制等待的协程返回内容。
- 可以通过在fetch的方法里面进行判断timeout或出错后,一定要写解锁。否则其他协程就会一直处于阻塞状态。
- 本篇代码更多是一种实现的思想,也更好地介绍了读写锁RWMutex和自己通过select, channel来实现一个非阻塞的锁,且没解锁前只能成功锁定一次。
篇篇更精彩,章章有深度
感谢大家转发、关注和一起讨论学习