go源码解读-sync.Cond

Cond

  • Cond结构体如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type Cond struct {
noCopy noCopy // noCopy保证Cond第一次被使用之后不能被copy复制
L Locker // L为获取Cond的值或者改变Cond的值的时候需要获取的锁
notify notifyList // notifyList记录Cond将要通知的信号量列表
checker copyChecker // 用于检查copy复制的状态
}
// 初始化方法
func NewCond(l Locker) *Cond {
return &Cond{L: l}
}
type noCopy struct{}
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
type copyChecker uintptr
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}

Cond实现的方法

  • Wait方法会对c.L进行解锁,并挂起当前执行的线程,之后当前线程的执行
  • Wait操作实际上是会释放当前线程的锁,进入等待队列,等待被唤醒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (c *Cond) Wait() {
// 检查是否被复制
c.checker.check()
// 将当前线程加入到等待队列中
t := runtime_notifyListAdd(&c.notify)
// 解锁
c.L.Unlock()
// 等待锁
runtime_notifyListWait(&c.notify, t)
// 加锁继续执行
c.L.Lock()
}
// 将调用线程加入到等待队列中
func notifyListAdd(l *notifyList) uint32 {
// This may be called concurrently, for example, when called from
// sync.Cond.Wait while holding a RWMutex in read mode.
return atomic.Xadd(&l.wait, 1) - 1
}
  • Signal将会唤醒一个等待c的线程
1
2
3
4
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
  • Broadcast将会唤醒所有等待c的线程
1
2
3
4
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func TestCondSignal(t *testing.T) {
var m sync.Mutex
c := sync.NewCond(&m)
n := 10
running := make(chan bool, n)
awake := make(chan bool, n)
for i := 0; i < n; i++ {
go func(i int) {
m.Lock()
running <- true
t.Logf("goroutine %d is waiting \n", i+1)
c.Wait()
awake <- true
t.Logf("goroutine %d is awake\n", i+1)
m.Unlock()
}(i)
}
for i := 0; i < n; i++ {
<-running // Wait for everyone to run.
}
for n > 5 {
select {
case <-awake:
t.Fatal("goroutine not asleep")
default:
}
m.Lock()
c.Signal()
m.Unlock()
<-awake // Will deadlock if no goroutine wakes up
select {
case <-awake:
t.Fatal("too many goroutines awake")
default:
}
n--
}
c.Signal()
}
/*
=== RUN TestCondSignal
--- PASS: TestCondSignal (0.00s)
cond_test.go:18: goroutine 10 is waiting
cond_test.go:18: goroutine 3 is waiting
cond_test.go:18: goroutine 1 is waiting
cond_test.go:18: goroutine 8 is waiting
cond_test.go:18: goroutine 2 is waiting
cond_test.go:18: goroutine 7 is waiting
cond_test.go:18: goroutine 9 is waiting
cond_test.go:18: goroutine 4 is waiting
cond_test.go:18: goroutine 5 is waiting
cond_test.go:18: goroutine 6 is waiting
cond_test.go:21: goroutine 10 is awake
cond_test.go:21: goroutine 3 is awake
cond_test.go:21: goroutine 1 is awake
cond_test.go:21: goroutine 8 is awake
cond_test.go:21: goroutine 2 is awake
cond_test.go:21: goroutine 7 is awake
PASS
*/
Donate comment here