go源码解读-sync.WaitGroup

Pool

  • 一个Pool是一群对象的集合,集合可能是单独存储
  • Pool中存储的对象可以自动在池中移除而不用通知,当只有Pool持有对象的引用时,对象有可能会释放内存
  • Pool是线程安全的
  • Pool的创建用意是缓存已经分配内存且已经用完的对象,方便后续复用的时候不必申请内存,同时减轻gc的压力
  • Pool适用于管理一组对象,可以被多个独立的客户端复用,从而避免每个客户端都要申请内存。
  • 如果对象存在的声明周期很短,则不适合用Pool来管理

Pool结构体

1
2
3
4
5
6
7
8
type Pool struct {
noCopy noCopy // 确保pool在使用之后不会被复制
local unsafe.Pointer // 指向[]poolLocal的指针
localSize uintptr // poolLocal的长度
victim unsafe.Pointer // victim一次GC之后的幸存者,这个地方也是指向[]poolLocal的指针,local经过一次GC就变为了victim
victimSize uintptr // victim的长度
New func() interface{} // 当无法从Pool中获取到新的对象的时候,会调用New函数来创建个对象来返回
}
  • poolLocal是每个调度器P存储对象的结构体
  • pad防止伪共享
  • private为每个调度器的私有空间
  • shared空间当前调度器可以pushHead和popTail,所有调度器都可以popTail
1
2
3
4
5
6
7
8
9
10
11
12
13
type poolLocal struct {
poolLocalInternal
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
// Local per-P Pool appendix.
type poolLocalInternal struct {
private interface{} // Can be used only by the respective P.
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
  • poolChain为动态版的poolDequeue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type poolChain struct {
// push节点,由当前P来push,不需要加锁
head *poolChainElt
// get节点,所有调度器都可以操作,需要用原子方法写入和读取
tail *poolChainElt
}
// poolChain的一个节点
type poolChainElt struct {
poolDequeue // 为一个无锁,固定大小的单生产者多消费穿者的唤醒队列
next, prev *poolChainElt
}
type poolDequeue struct {
// headTail表示下标,高32位表示头下标,低32位表示尾下标,poolDequeue定义了,head tail的pack和unpack函数方便转化,实际用的时候都会mod ( len(vals) - 1 ) 来防止溢出
headTail uint64
// vals的大小必须是2的幂,因为go的内存管理策略是将内存分为2的幂大小的链表,申请2的幂大小的内存可以有效减小分配内存的开销
vals []eface
}
type eface struct {
typ, val unsafe.Pointer
}

Pool结构体的方法

  • Put方法将对象添加到Pool中
    • pin函数将当前goroutine绑定到指定的调度器P上,同时禁止抢占,返回poolLocal对象和绑定的Pid
    • 从Pool的local中获取调度器P的poolLocal,没有需要新建
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      func (p *Pool) Put(x interface{}) {
      if x == nil { // 放入的对象为空,直接返回
      return
      }
      if race.Enabled { // 关闭竞争检测
      if fastrand()%4 == 0 {
      // Randomly drop x on floor.
      return
      }
      race.ReleaseMerge(poolRaceAddr(x))
      race.Disable() // 关闭操作
      }
      l, _ := p.pin()
      // 私有空间为空,则将x放置到私有空间
      if l.private == nil {
      l.private = x
      x = nil
      }
      // 私有空间已满,则将x放到公有空间中
      if x != nil {
      l.shared.pushHead(x)
      }
      runtime_procUnpin()
      if race.Enabled {
      race.Enable()
      }
      }
      func (p *Pool) pin() (*poolLocal, int) {
      pid := runtime_procPin() // 关闭抢占,这个goroutine工作完才释放时间片
      s := atomic.LoadUintptr(&p.localSize) // load-acquire
      l := p.local // load-consume
      if uintptr(pid) < s { // 如果p.local的长度大于pid,则直接取数据即可
      return indexLocal(l, pid), pid
      }
      // pinSlow函数来完成p.local的新增
      return p.pinSlow()
      }
      func indexLocal(l unsafe.Pointer, i int) *poolLocal {
      lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
      return (*poolLocal)(lp)
      }
      func (p *Pool) pinSlow() (*poolLocal, int) {
      // 尝试重新将goroutine绑定到其他的goroutine,查看是否有poolLocal使用
      runtime_procUnpin()
      allPoolsMu.Lock()
      defer allPoolsMu.Unlock()
      pid := runtime_procPin()
      // poolCleanup won't be called while we are pinned.
      s := p.localSize
      l := p.local
      // 新绑定的P有poolLocal空间,则直接获取返回
      if uintptr(pid) < s {
      return indexLocal(l, pid), pid
      }
      // Pool为空,则将pool添加到allPools中
      if p.local == nil {
      allPools = append(allPools, p)
      }
      // 创建一个cpu数量大小的[]poolLocal并返回
      size := runtime.GOMAXPROCS(0)
      local := make([]poolLocal, size)
      atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
      atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
      return &local[pid], pid
      }

Get方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
func (p *Pool) Get() interface{} {
// 关闭竞争检测
if race.Enabled {
race.Disable()
}
// 将goroutine固定到P上,并获取其localPool和pid
l, pid := p.pin()
x := l.private
l.private = nil
// localPool的私有空间为空,则从共享头空间中pop数据
if x == nil {
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
x, _ = l.shared.popHead()
if x == nil {
// pop数据为空,则调用getSlow方法来获取
x = p.getSlow(pid)
}
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
// 如果没有获取到,则调用pool的New函数来完成创建
if x == nil && p.New != nil {
x = p.New()
}
return x
}
// 懒获取
func (p *Pool) getSlow(pid int) interface{} {
// 获取Pool的localSize和local
size := atomic.LoadUintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 遍历其他调度器的polLocal,看起poptail中是否可以取出对象
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 到pool的victim中查询
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
// 私有空间有直接返回
if x := l.private; x != nil {
l.private = nil
return x
}
// 遍历victim中其他调度的poolLocal,看是否可以通过popTail获取到对象
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 没有查到,编辑victim为空,下次就不查找victim
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
Donate comment here