Go-深入理解WaitGroup

深入理解WaitGroup

waitgroup主要是用来等待一些goroutine完成

数据结构

type WaitGroup struct {
	noCopy noCopy   // 不允许拷贝,用go vet检测

	// 高32位存储counter数量,低32位存储waiter数量,最后4个字节存储sema
	state1 [3]uint32
}

主要方法

Add

func (wg *WaitGroup) Add(delta int) {
    // 获取statep和sema
	statep, semap := wg.state()
	if race.Enabled {
		_ = *statep // trigger nil deref early
		if delta < 0 {
			// Synchronize decrements with Wait.
			race.ReleaseMerge(unsafe.Pointer(wg))
		}
		race.Disable()
		defer race.Enable()
	}
	// goroutine+delta,delta可能是负数
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	v := int32(state >> 32)
	// 获取waiters
	w := uint32(state)
	if race.Enabled && delta > 0 && v == int32(delta) {
		// The first increment must be synchronized with Wait.
		// Need to model this as a read, because there can be
		// several concurrent wg.counter transitions from 0.
		race.Read(unsafe.Pointer(semap))
	}
	// add后的counter不能为负数
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	// 此时wait已经执行(waiter>0),不能再执行Add
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// 没有waiter 直接return
	if v > 0 || w == 0 {
		return
	}

	// 到这里v=0, 并w>0,这里状态又不一致,说明又有别的add改变了状态
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// 重置waiters为0
	*statep = 0
	// 开始批量释放信号量
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false)
	}
}

Done

Done就很简单,相当于Add(-1)

func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

Wait

	statep, semap := wg.state()
	if race.Enabled {
		_ = *statep // trigger nil deref early
		race.Disable()
	}
	for {
		state := atomic.LoadUint64(statep)
		// 获取counter和waiter的值
		v := int32(state >> 32)
		w := uint32(state)
		if v == 0 {
			// counter为0,不需要等待
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		// 增加waiter的count
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			if race.Enabled && w == 0 {
				race.Write(unsafe.Pointer(semap))
			}
			// 阻塞的获取sema
			runtime_Semacquire(semap)
			// Add都结束了,但是又有Add执行了
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
	}
}

2021-06-27 09:30 +0800