go源码解读-sync.RWMutex

RWMutex

  • RWMutex是一个互斥读写锁
  • 可以被多个读线程持有或者被一个写线程持有
  • 零值为未上锁的mutex
  • 第一次使用之后不可复制
  • 多个线程的读取操作不会被阻塞,读写,写读,读读会被阻塞
1
2
3
4
5
6
7
type RWMutex struct {
w Mutex // 互斥量
writerSem uint32 // 写阻塞等待的信号量,最后一个读线程释放锁时会释放信号量
readerSem uint32 // 读阻塞等待信号量,持有写锁的线程完成释放锁后释放的信号量
readerCount int32 // 记录读者的数量
readerWait int32 // 记录写阻塞时读者的个数
}

RWMutex的方法

  • RLock方法完成读锁的加锁,如果是个写线程请求锁将会被阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// 完成当前持有该锁的读者的个数的+1,如果没有持有该锁的读线程,则会唤醒等待中的写信号量
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
runtime_SemacquireMutex(&rw.readerSem, false)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
  • RUnlock方法完成读锁的解锁操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false)
}
}
if race.Enabled {
race.Enable()
}
}
  • Lock完成写锁的加锁
  • 写锁实际上是加到w中的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
  • Unlock方法完成写锁的解锁
  • 逻辑与Mutex的解锁是一致的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
// Allow other writers to proceed.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
  • 封装了专门的读锁
1
2
3
4
5
6
7
8
9
10
// RLocker returns a Locker interface that implements
// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
func (rw *RWMutex) RLocker() Locker {
return (*rlocker)(rw)
}
type rlocker RWMutex
func (r *rlocker) Lock() { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }
Donate comment here