Treasure / sync.pool

Created Fri, 15 Apr 2022 00:00:00 +0000
771 Words

结构分析

type Pool struct {
	noCopy noCopy

	local     unsafe.Pointer // P 本地池,固定尺寸,实际结构 [P]poolLocal,类似 void* 并附加长度构成了一个数组
	localSize uintptr        // size of the local array

	victim     unsafe.Pointer // local from previous cycle
	victimSize uintptr        // size of victims array

	New func() any
}

type poolChain struct{
    head *poolChainElt

    tail *poolChainElt
}

type poolChainElt struct{ // 一个双向链表
    poolDequeue

    next,prev *poolChainElt
}

type poolDequeue struct{
    headtail uint64

    vals []eface
}

type eface struct{  // 数据的真实内存分配,包括一个类型描述和实际数据
    typ,val unsafe.Pointer
}

type poolLocalInternal struct{
    private any // p的私有缓冲区

    shared poolChain // 公共缓冲区
}

type poolLocal struct{
    poolLocalInternal

    pad [128-unsafe.Sizeof(poolLocalInternal{})%128]byte  // 应该是补位,可以确保刚好占满一个 cache line 加速访问
}

申请释放


func (p *Pool) pin() (*poolLocal, int) {
	pid := runtime_procPin() // 将当前G和P绑定,并返回P的id
	s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	l := p.local                              // load-consume
	if uintptr(pid) < s {  // 主要是P的数量可能会变动 重新找一个
		return indexLocal(l, pid), pid
	}
	return p.pinSlow()
}

func (p *Pool) pinSlow() (*poolLocal, int) {
	runtime_procUnpin()  // 禁止 P 抢占,否则当前G会被放回本地或者全局队列,当时之后G不一定在现在这个P上
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	s := p.localSize
	l := p.local
	if uintptr(pid) < s { // double check
		return indexLocal(l, pid), pid
	}
	if p.local == nil {  // 如果本地队列为空,那么此时Pool没被初始化,加入全局池引用
		allPools = append(allPools, p)
	}

	size := runtime.GOMAXPROCS(0) // 查看现在P的个数
	local := make([]poolLocal, size) // 为这个Pool分配跟P数量相同的本地池
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
	return &local[pid], pid // 返回当前和P绑定的本地池
}

func (p *Pool) Get() any {
	if race.Enabled {
		race.Disable()
	}
	l, pid := p.pin() // 先找本地池
	x := l.private
	l.private = nil
	if x == nil { // 如果没有,那么从全局池找
		// Try to pop the head of the local shard. We prefer
		// the head over the tail for temporal locality of
		// reuse.
		x, _ = l.shared.popHead()
		if x == nil {
			x = p.getSlow(pid)
		}
	}
	runtime_procUnpin() //释放P,让其可以被抢占
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}


// Put adds x to the pool.
func (p *Pool) Put(x any) {
	if x == nil {
		return
	}
	if race.Enabled {
		if fastrandn(4) == 0 {
			// Randomly drop x on floor.
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}
	l, _ := p.pin() // 老规矩,先禁止P被抢占
	if l.private == nil { // 本地没有 则先放入本地
		l.private = x
	} else {
		l.shared.pushHead(x) // 否则放入全局
	}
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
	}
}

GC

其实很好理解,正好是一次二级缓冲模型,第一次gc会将local放入 victim,第二gc victim不为空才会真正清理,local不会参与gc

func poolCleanup()
	for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}


	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}
	oldPools, allPools = allPools, nil