go源码解读-sync.Mutex

Mutex

  • sync包提供了互斥量,Once, WaitGroup来保证多线程并发安全
  • 比较简单的同步通过sync包,比较复杂的同步建议通过channel来实现

Mutex结构体

  • Mutex实际上是互斥锁
  • Mutex的初始化状态就是一个没加锁的互斥锁
  • Mutex在被加锁之后不允许被复制,也就是带有状态的Mutex不能被复制,否则会给原来的临界数据复制出一把锁
  • Mutex包含两个字段
    • state
    • sema
  • Mutex现在有两种状态,normal和starvation
    • normal状态下,goroutine是在一个FIFO的队列中排队等待锁的,也就是按照锁的请求时间依次获取锁。但是被唤醒的goroutine并不会立刻拥有mutex,而是需要与新到达的goroutine进行竞争,由于新到达的goroutine已经加载在内存中,所以被唤醒的goroutine大概率会竞争失败,竞争失败之后被唤醒的goroutine会被防止到FIFO的队首。如果一个被唤醒的goroutine在1ms内获取mutex失败,则mutex状态设置为starvation
    • starvation状态下,mutex在被当前持有的线程解锁之后,FIFO队列首的goroutine被唤醒并直接拥有改mutex,新到达的goroutine不再尝试获取mutex,也不再自选等待获取锁,而是直接排到队列的队尾
    • 如果一个goroutine获取到mutex,如果这个goroutine是队列中的最后一个goroutine或者goroutine等待时间小于1ms,则mutex状态变为noraml状态
    • normal状态下性能更好,starvation状态下更加公平,不存在插队和多次获取的情况
1
2
3
4
5
6
7
8
9
10
11
12
type Mutex struct {
state int32
sema uint32
}
const (
mutexLocked = 1 << iota // 锁定状态 001
mutexWoken // 唤醒状态 010
mutexStarving // 饥饿状态 100
mutexWaiterShift = iota // 向右偏移3位之后的值记录等待该mutex的goroutine数量
starvationThresholdNs = 1e6 // normal到starvation状态的阈值 1ms
)

Mutex实现了Locker接口

1
2
3
4
type Locker interface {
Lock()
Unlock()
}
  • lock方法对mutex加锁
  • 如果mutex已经被加锁,尝试加锁的goroutine将会被阻塞直到获取到该锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
func (m *Mutex) Lock() {
// 如果mutex没有被加过锁,则用原子方法CAS修改mutex的状态为mutexLocked,直接返回结果即可
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// 如果当前mutex的状态为starvation,则当前线程不再自旋等待mutex
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
// awork为false,mutex为非awoke状态,有等待状态的goroutine且CAS更改awoke位成功,则将awoke设置为true
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 实际的自旋等待操作,更新iter和old的值
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// 如果mutex为starvation状态的话,当前的goroutine需要去队列中排队
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 当前goroutine为starvation状态并且为非锁定状态,设置mutex状态为starvation
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// awoke为true,在等待队列中唤醒一个goroutine
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
// 将新的mutex状态值通过CAS复制给state
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果old的状态为000,则直接退出,已经完成了上锁操作
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 阻塞获取mutex,这个地方会一直阻塞,直到获取到mutex锁
runtime_SemacquireMutex(&m.sema, queueLifo)
// 获取到锁之后,根据获取锁的等待时间,设置mutex的运行状态是否为starvation
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 设置mutex的starvation状态
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
// runtime包中的,返回当前时刻自旋等待是否有意义
func runtime_canSpin(i int) bool
  • Unlock完成解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (m *Mutex) Unlock() {
// 快速解锁
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 快速重置mutexLocked位置为0,完成之后很可能被其他goroutine抢走锁
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 如果mutex为normal状态,走stravtion分支即可
if new&mutexStarving == 0 {
old := new
for {
// 如果没有等待的goroutine,直接返回即可
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 有等待的goroutine,则会减少等待数量并且唤醒一个goroutine进行竞争mutex
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
}
} else {
// starvation分支直接将mutex交给FIFO队列首部的goroutine
// 通过runtime_Semrelease唤醒等待的goroutine
runtime_Semrelease(&m.sema, true)
}
}
Donate comment here