Understanding Goroutines in Depth
CSP
CSP stands for Communicating Sequential Processes, a powerful concurrency model proposed in the 1970s. It describes independent concurrent entities that communicate through shared channels. Compared with the Actor model, in CSP the channel is the first-class object: the model does not care about the entity sending a message, only about the channel the message is sent on.
Concurrency is not parallelism
Take queueing as an example: two queues served by a single window is concurrency; add a second window and you get parallelism. What Go's goroutines give you is concurrency.
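A minimal sketch of the difference (runtime.GOMAXPROCS and runtime.Gosched are real APIs; the program itself is illustrative): with one P the two "queues" interleave on a single thread (concurrency); raising GOMAXPROCS would let them run simultaneously (parallelism).
package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	runtime.GOMAXPROCS(1) // one "window": concurrent, not parallel
	var wg sync.WaitGroup
	for _, name := range []string{"queue-A", "queue-B"} {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			for i := 0; i < 3; i++ {
				fmt.Println(name, i) // output interleaves between the two queues
				runtime.Gosched()    // yield the window to the other queue
			}
		}(name)
	}
	wg.Wait() // with GOMAXPROCS >= 2 the two loops could truly run in parallel
}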
GMP
M
M stands for Machine; each M is bound to one kernel thread
P
P stands for Processor; it represents the context an M needs to execute Go code
G
G stands for Goroutine
The relationship among the three: an M must be bound to a P before it can run Gs, and each P maintains a local queue of runnable Gs.
The number of Ps can be set with runtime.GOMAXPROCS(); it determines the real parallelism.
Data structures
Note: parts of the code below are omitted.
G(goroutine)
type g struct {
// Stack parameters.
// stack describes the actual stack memory: [stack.lo, stack.hi).
// stackguard0 is the stack pointer compared in the Go stack growth prologue.
// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
// stackguard1 is the stack pointer compared in the C stack growth prologue.
// It is stack.lo+StackGuard on g0 and gsignal stacks.
// It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
// stack memory currently used by this g; the stack struct has two fields, lo and hi
stack stack // offset known to runtime/cgo
// stack-growth check for the Go prologue
stackguard0 uintptr // offset known to liblink
// stack-growth check for the C prologue
stackguard1 uintptr // offset known to liblink
// innermost panic
_panic *_panic // innermost panic - offset known to liblink
// innermost defer
_defer *_defer // innermost defer
// the M this g is currently bound to
m *m // current m; offset known to arm liblink
// scheduling data for this g; saved on a goroutine switch so the context can be restored later
sched gobuf
syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc
syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc
stktopsp uintptr // expected sp at top of stack, to check in traceback
param unsafe.Pointer // wakeup parameter passed on wakeup
// current status of this g
atomicstatus uint32
stackLock uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
// id of this g
goid int64
// pointer to the next g, stored as a guintptr
schedlink guintptr
waitsince int64 // approximate time when the g became blocked
waitreason waitReason // why the g is blocked; valid while in _Gwaiting
// preemption flag
preempt bool // preemption signal, duplicates stackguard0 = stackpreempt
paniconfault bool // panic (instead of crash) on unexpected fault address
preemptscan bool // whether to scan for GC when preempted
}
G states
_Gidle: the goroutine was just allocated and has not yet been initialized
_Grunnable: the goroutine is on a run queue; it is not executing user code and does not own a stack
_Grunning: the goroutine may be executing user code; it owns its stack and is assigned an M and a P
_Gsyscall: the goroutine is executing a system call and has an M
_Gwaiting: the goroutine is blocked in the runtime
_Gmoribund_unused: unused, but hardcoded in gdb scripts
_Gdead: the goroutine is currently unused; it may have just exited or just been initialized, and does not own a stack
_Genqueue_unused: unused
_Gscan: the GC is scanning the goroutine's stack
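These states are visible in practice: the bracketed word in each goroutine header of a runtime.Stack dump is derived from atomicstatus and waitreason. A small sketch (runtime.Stack is a real API; the exact dump text varies by Go version):
package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	ch := make(chan int)
	go func() { <-ch }() // this g parks in _Gwaiting, shown as "chan receive"
	time.Sleep(10 * time.Millisecond)
	buf := make([]byte, 1<<16)
	n := runtime.Stack(buf, true) // true: include all goroutines
	fmt.Printf("%s", buf[:n])     // headers look like: goroutine 18 [chan receive]:
}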
P (processor)
type p struct {
lock mutex
id int32
status uint32 // status of the P
link puintptr
schedtick uint32 // incremented on every scheduler call
syscalltick uint32 // incremented on every system call
sysmontick sysmontick // last tick observed by sysmon
m muintptr // back-link to the associated m (nil if idle)
mcache *mcache
racectx uintptr
deferpool [5][]*_defer // pool of available defer structs of different sizes (see panic.go)
deferpoolbuf [5][32]*_defer
// cache of goroutine ids
goidcache uint64
goidcacheend uint64
// Queue of runnable goroutines. Accessed without lock.
runqhead uint32
runqtail uint32
// locally runnable gs, implemented as an array (ring buffer)
runq [256]guintptr
// runnext, if non-nil, is a runnable G that was ready'd by
// the current G and should be run next instead of what's in
// runq if there's time remaining in the running G's time
// slice. It will inherit the time left in the current time
// slice. If a set of goroutines is locked in a
// communicate-and-wait pattern, this schedules that set as a
// unit and eliminates the (potentially large) scheduling
// latency that otherwise arises from adding the ready'd
// goroutines to the end of the run queue.
// if non-nil, a runnable G to run next
runnext guintptr
// Available G's (status == Gdead)
gFree struct {
gList
n int32
}
}
P states
_Pidle: idle; not bound to an M
_Prunning: bound to an M and executing Gs
_Psyscall: the G running on this P is in a system call
_Pgcstop: the runtime is stopped for GC
_Pdead: the P is no longer in use (when runtime.GOMAXPROCS shrinks the number of Ps, the surplus Ps enter this state)
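A quick sketch of that last point: shrinking GOMAXPROCS at run time triggers procresize, and the surplus Ps are exactly the ones that end up in _Pdead (GOMAXPROCS(0) only queries the current value):
package main

import (
	"fmt"
	"runtime"
)

func main() {
	old := runtime.GOMAXPROCS(4) // returns the previous setting
	fmt.Println("had", old, "Ps, now 4")
	runtime.GOMAXPROCS(1)                          // procresize retires three Ps -> _Pdead
	fmt.Println("now", runtime.GOMAXPROCS(0), "P") // prints: now 1 P
}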
M(machine)
type m struct {
g0 *g // the g used for scheduling (runs on the system stack)
gsignal *g // the g that handles signals
tls [6]uintptr // thread-local storage
mstartfn func()
curg *g // the goroutine currently running on this m
caughtsig guintptr // goroutine running during fatal signal
p puintptr // the attached p for executing Go code (nil if not executing)
nextp puintptr // when the m is woken, it binds to this p
oldp puintptr // the p that was attached before executing a syscall
id int64
mallocing int32
throwing int32
preemptoff string // if != "", keep curg running on this m
locks int32
dying int32
profilehz int32
spinning bool // whether the m is spinning, i.e. out of work and looking for a g
blocked bool // whether the m is blocked
inwb bool // whether the m is executing a write barrier
newSigstack bool // minit on C thread called sigaltstack
printlock int8
incgo bool // whether the m is executing a cgo call
ncgocall uint64 // total number of cgo calls made
ncgo int32 // number of cgo calls currently in progress
alllink *m // on allm
schedlink muintptr
mcache *mcache
lockedg guintptr // the g locked to this m
waittraceev byte
waittraceskip int
startingtrace bool
syscalltick uint32
thread uintptr // thread handle
freelink *m // on sched.freem
}
M states
spinning: the M is looking for a P (and work to run)
executing Go code: the M is running a g's code and holds a P
executing native code: the M is running native code or a blocking syscall and does not hold a P
sleeping: the M found no runnable g and is parked on the idle list, holding no P
schedt (global scheduler)
The data structure used by the global scheduler; it holds the idle M list and the idle P list, among other things.
type schedt struct {
// accessed atomically
goidgen uint64
lastpoll uint64
lock mutex
// When increasing nmidle, nmidlelocked, nmsys, or nmfreed, be
// sure to call checkdead().
midle muintptr // idle Ms, stored as a linked list
nmidle int32 // number of idle Ms
nmidlelocked int32 // number of locked Ms that are waiting
mnext int64 // id of the next M; also counts how many Ms have been created
maxmcount int32 // maximum number of Ms allowed
nmsys int32 // number of system m's not counted for deadlock
nmfreed int64 // number of freed Ms
ngsys uint32 // number of system goroutines
pidle puintptr // idle Ps, stored as a linked list
npidle uint32
nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.
// global run queue; local queues rebalance against it when they hold too many or too few gs
runq gQueue
runqsize int32
// freed gs, kept as linked lists
gFree struct {
lock mutex
stack gList // Gs with stacks
noStack gList // Gs without stacks
n int32
}
// freem is the list of m's waiting to be freed when their
// m.exited is set. Linked through m.freelink.
// ms waiting to be freed
freem *m
}
Creating a goroutine
newproc
func newproc(siz int32, fn *funcval) {
// take the address of the arguments
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
// get the currently running g
gp := getg()
// get the caller's pc
pc := getcallerpc()
systemstack(func() {
// switch to g0 to run newproc1
newproc1(fn, (*uint8)(argp), siz, gp, pc)
})
}
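This entry point is reachable from ordinary code: the compiler lowers every go statement into a call to runtime.newproc. A sketch you can verify yourself (the exact assembly output varies by Go version):
// Build with: go tool compile -S main.go
// The go statement below appears in the output as CALL runtime.newproc(SB).
package main

func worker(ch chan<- int) { ch <- 1 }

func main() {
	ch := make(chan int)
	go worker(ch) // lowered to runtime.newproc(siz, fn)
	<-ch
}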
newproc1
newproc1 creates a g (or reuses an idle one), initializes it, and tries to find an M and a P to run it.
func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
_g_ := getg()
if fn == nil {
_g_.m.throwing = -1 // do not dump full stacks
throw("go of nil func value")
}
_g_.m.locks++ // disable preemption
siz := narg
siz = (siz + 7) &^ 7
// We could allocate a larger initial stack if necessary.
// Not worth it: this is almost always an error.
// 4*sizeof(uintreg): extra space added below
// sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
// if there are too many arguments, throw; a new goroutine's stack is only 2 KB
if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
throw("newproc: function arguments too large for new goroutine")
}
_p_ := _g_.m.p.ptr()
// try to get an idle g; if there is none, allocate a new one and add it to allgs
// gfget first tries the p's local free list; failing that, it moves a batch from the global free list to the local p
newg := gfget(_p_)
if newg == nil {
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
// append to allgs
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}
if newg.stack.hi == 0 {
throw("newproc1: newg missing stack")
}
// check the g's status
if readgstatus(newg) != _Gdead {
throw("newproc1: new g is not Gdead")
}
totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
// align the frame size
totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
sp := newg.stack.hi - totalSize
spArg := sp
if usesLR {
// caller's LR
*(*uintptr)(unsafe.Pointer(sp)) = 0
prepGoExitFrame(sp)
spArg += sys.MinFrameSize
}
if narg > 0 {
// copy the arguments onto the new g's stack
memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
// This is a stack-to-stack copy. If write barriers
// are enabled and the source stack is grey (the
// destination is always black), then perform a
// barrier copy. We do this *after* the memmove
// because the destination stack may have garbage on
// it.
if writeBarrier.needed && !_g_.m.curg.gcscandone {
f := findfunc(fn.fn)
stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
if stkmap.nbit > 0 {
// We're in the prologue, so it's always stack map index 0.
bv := stackmapdata(stkmap, 0)
bulkBarrierBitmap(spArg, spArg, uintptr(bv.n)*sys.PtrSize, 0, bv.bytedata)
}
}
}
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
// store the address of goexit here; when fn returns, execution continues at goexit based on this pc
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
if _g_.m.curg != nil {
newg.labels = _g_.m.curg.labels
}
if isSystemGoroutine(newg, false) {
atomic.Xadd(&sched.ngsys, +1)
}
newg.gcscanvalid = false
// Gdead => Grunnable
casgstatus(newg, _Gdead, _Grunnable)
// if the goid cache is exhausted, fetch another batch from sched
if _p_.goidcache == _p_.goidcacheend {
// Sched.goidgen is the last allocated id,
// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
// At startup sched.goidgen=0, so main goroutine receives goid=1.
_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
_p_.goidcache -= _GoidCacheBatch - 1
_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
}
// assign the goid
newg.goid = int64(_p_.goidcache)
_p_.goidcache++
if raceenabled {
newg.racectx = racegostart(callerpc)
}
if trace.enabled {
traceGoCreate(newg, newg.startpc)
}
// put the g on the p's runnable queue
runqput(_p_, newg, true)
// if there is an idle p and no spinning m, wake an m to run the g
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
wakep()
}
_g_.m.locks--
if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
_g_.stackguard0 = stackPreempt
}
}
gfget
Check whether the p has a free g; if not, pull a batch from sched.gFree.
func gfget(_p_ *p) *g {
retry:
if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
lock(&sched.gFree.lock)
// Move a batch of free Gs to the P.
for _p_.gFree.n < 32 {
// Prefer Gs with stacks.
gp := sched.gFree.stack.pop()
if gp == nil {
gp = sched.gFree.noStack.pop()
if gp == nil {
break
}
}
sched.gFree.n--
_p_.gFree.push(gp)
_p_.gFree.n++
}
unlock(&sched.gFree.lock)
goto retry
}
gp := _p_.gFree.pop()
if gp == nil {
return nil
}
_p_.gFree.n--
if gp.stack.lo == 0 {
// Stack was deallocated in gfput. Allocate a new one.
systemstack(func() {
gp.stack = stackalloc(_FixedStack)
})
gp.stackguard0 = gp.stack.lo + _StackGuard
} else {
if raceenabled {
racemalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
}
if msanenabled {
msanmalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)
}
}
return gp
}
runqput
runqput tries to put g on the local run queue. If next is false, it appends g to the tail of the runnable queue; if next is true, it puts g into runnext. If the local queue is full, the g goes to the global queue.
func runqput(_p_ *p, gp *g, next bool) {
if randomizeScheduler && next && fastrand()%2 == 0 {
next = false
}
// put the g into runnext
if next {
retryNext:
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
// Kick the old runnext out to the regular run queue.
gp = oldnext.ptr()
}
retry:
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
// check whether the local queue (length 256) is full; if it is, fall through to the global queue
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
return
}
// move the g (together with half the local queue) to the global queue
if runqputslow(_p_, gp, h, t) {
return
}
// the queue is not full, now the put above must succeed
goto retry
}
At this point the new goroutine has been created; all that remains is for it to be scheduled. As shown above, the queue priority is p.runnext > p.runq > sched.runq; a user-space sketch of this ordering follows.
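To make that ordering concrete, here is an illustrative sketch of the same idea — not the runtime's code. It assumes a single producer per queue, just as each P is owned by one M at a time, and uses Go 1.19+'s atomic.Pointer; localQueue, G and put are hypothetical names:
package main

import "sync/atomic"

type G struct{ id int64 }

// localQueue mimics p.runq plus p.runnext: a fixed ring owned by one producer,
// with head read concurrently by stealers (hence the atomics).
type localQueue struct {
	head, tail uint32
	runnext    atomic.Pointer[G]
	runq       [256]*G
}

// put mirrors runqput: prefer runnext; the displaced g goes into the ring;
// when the ring is full, the caller must push the returned g to a global queue.
func (q *localQueue) put(g *G, next bool) (overflow *G) {
	if next {
		old := q.runnext.Swap(g)
		if old == nil {
			return nil
		}
		g = old // kick the previous runnext into the ring
	}
	h := atomic.LoadUint32(&q.head) // synchronize with stealers
	t := q.tail
	if t-h < uint32(len(q.runq)) {
		q.runq[t%uint32(len(q.runq))] = g
		atomic.StoreUint32(&q.tail, t+1)
		return nil
	}
	return g // ring full: this g would go to sched.runq
}

func main() {
	var q localQueue
	q.put(&G{id: 1}, false)
	q.put(&G{id: 2}, true) // id 2 sits in runnext and runs before id 1
}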
wakeup
After newproc1 has created the task, it calls wakep to wake an M.
func wakep() {
// be conservative about spinning threads
// only one M may be woken into spinning at a time; otherwise return
if !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
// wake up an M
startm(nil, true)
}
Creating an M
startm
Schedule or create an M to run a P; if _p_ == nil, it tries to grab an idle P first.
func startm(_p_ *p, spinning bool) {
lock(&sched.lock)
if _p_ == nil {
_p_ = pidleget() // if p is nil, try to get an idle p
if _p_ == nil {
unlock(&sched.lock)
if spinning {
// The caller incremented nmspinning, but there are no idle Ps,
// so it's okay to just undo the increment and give up.
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("startm: negative nmspinning")
}
}
return
}
}
mp := mget()
unlock(&sched.lock)
if mp == nil {
// if no M can be obtained, create one with spinning set, bind the p to it, and return
var fn func()
if spinning {
// The caller incremented nmspinning, so set m.spinning in the new M.
fn = mspinning
}
newm(fn, _p_)
return
}
// the M must not already be spinning
if mp.spinning {
throw("startm: m is spinning")
}
// the M must not already have a P
if mp.nextp != 0 {
throw("startm: m has p")
}
if spinning && !runqempty(_p_) {
throw("startm: p has runnable gs")
}
// the caller already incremented nmspinning, so only set mp.spinning here
mp.spinning = spinning
mp.nextp.set(_p_)
// wake the m
notewakeup(&mp.park)
}
newm
Create a new M.
func newm(fn func(), _p_ *p) {
mp := allocm(_p_, fn)
mp.nextp.set(_p_) // bind the new M to the given P
// omitted...
newm1(mp)
}
func newm1(mp *m) {
// ...omitted...
newosproc(mp)
// ...omitted...
}
newosproc
newosproc creates a new OS thread that starts in mstart_stub, which then calls mstart to enter the scheduler.
func newosproc(mp *m) {
// omitted...
var oset sigset
sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
// create the thread with entry function mstart_stub, which then calls mstart
err = pthread_create(&attr, funcPC(mstart_stub), unsafe.Pointer(mp))
sigprocmask(_SIG_SETMASK, &oset, nil)
if err != 0 {
write(2, unsafe.Pointer(&failthreadcreate[0]), int32(len(failthreadcreate)))
exit(1)
}
}
allocm
allocm allocates an M: it first releases the sched.freem list, then creates a new m and initializes it.
func allocm(_p_ *p, fn func()) *m {
_g_ := getg()
_g_.m.locks++ // disable GC because it can be called from sysmon
if _g_.m.p == 0 {
acquirep(_p_) // temporarily borrow p for mallocs in this function
}
// Release the free M list. We need to do this somewhere and
// this may free up a stack we can use.
// release the freem list
if sched.freem != nil {
lock(&sched.lock)
var newList *m
for freem := sched.freem; freem != nil; {
if freem.freeWait != 0 {
next := freem.freelink
freem.freelink = newList
newList = freem
freem = next
continue
}
stackfree(freem.g0.stack)
freem = freem.freelink
}
sched.freem = newList
unlock(&sched.lock)
}
mp := new(m)
// start function; this fn is the one set up in startm:
// var fn func()
// if spinning {
// fn = mspinning
// }
mp.mstartfn = fn
// initialize the m
mcommoninit(mp)
// In case of cgo or Solaris or Darwin, pthread_create will make us a stack.
// Windows and Plan 9 will layout sched stack on OS stack.
if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" {
mp.g0 = malg(-1)
} else {
mp.g0 = malg(8192 * sys.StackGuardMultiplier)
}
mp.g0.m = mp
if _p_ == _g_.m.p.ptr() {
releasep()
}
_g_.m.locks--
if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
_g_.stackguard0 = stackPreempt
}
return mp
}
Execution
mstart
func mstart() {
// ...omitted...
// delegate to mstart1
mstart1()
// ...omitted...
}
func mstart1() {
_g_ := getg()
if _g_ != _g_.m.g0 {
throw("bad runtime·mstart")
}
// Record the caller for use as the top of stack in mcall and
// for terminating the thread.
// We're never coming back to mstart1 after we call schedule,
// so other calls can reuse the current frame.
// save the caller's PC and SP
save(getcallerpc(), getcallersp())
asminit()
// initialize the m (signal stacks etc.)
minit()
// handle signals
if _g_.m == &m0 {
mstartm0()
}
// if mstartfn is set, run it
if fn := _g_.m.mstartfn; fn != nil {
fn()
}
if _g_.m != &m0 {
// acquire nextp
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
schedule()
}
acquirep
acquirep associates the current M with the P.
func acquirep(_p_ *p) {
// Do the part that isn't allowed to have write barriers.
wirep(_p_)
// Have p; write barriers now allowed.
// Perform deferred mcache flush before this P can allocate
// from a potentially stale mcache.
_p_.mcache.prepareForSweep()
if trace.enabled {
traceProcStart()
}
}
wirep
wirep binds the m and the p to each other.
func wirep(_p_ *p) {
_g_ := getg()
if _g_.m.p != 0 || _g_.m.mcache != nil {
throw("wirep: already in go")
}
if _p_.m != 0 || _p_.status != _Pidle {
id := int64(0)
if _p_.m != 0 {
id = _p_.m.ptr().id
}
print("wirep: p->m=", _p_.m, "(", id, ") p->status=", _p_.status, "\n")
throw("wirep: invalid p state")
}
// the m shares the p's mcache
_g_.m.mcache = _p_.mcache
// bind the m and the p to each other
_g_.m.p.set(_p_)
_p_.m.set(_g_.m)
// the p's status becomes _Prunning
_p_.status = _Prunning
}
schedule (the scheduling loop)
func schedule() {
_g_ := getg()
if _g_.m.locks != 0 {
throw("schedule: holding locks")
}
// if there is a locked g, stop the current M
if _g_.m.lockedg != 0 {
// park until the locked g becomes runnable, then execute it
stoplockedm()
execute(_g_.m.lockedg.ptr(), false) // Never returns.
}
// We should not schedule away from a g that is executing a cgo call,
// since the cgo call is using the m's g0 stack.
if _g_.m.incgo {
throw("schedule: in cgo")
}
top:
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
if _g_.m.p.ptr().runSafePointFn != 0 {
runSafePointFn()
}
var gp *g
var inheritTime bool
if trace.enabled || trace.shutdown {
gp = traceReader()
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
traceGoUnpark(gp, 0)
}
}
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
}
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
// every 61 scheduling ticks, take a g from the global queue (for fairness)
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
if gp == nil {
// nothing from the global-queue check; get a runnable g from the local queue
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
if gp == nil {
// still no g: findrunnable searches everywhere (local queue, global queue, netpoll, stealing from other Ps) and blocks until work appears
gp, inheritTime = findrunnable() // blocks until work is available
}
// This thread is going to run a goroutine and is not spinning anymore,
// so if it was marked as spinning we need to reset it now and potentially
// start a new spinning M.
if _g_.m.spinning {
resetspinning()
}
if sched.disable.user && !schedEnabled(gp) {
// Scheduling of this goroutine is disabled. Put it on
// the list of pending runnable goroutines for when we
// re-enable user scheduling and look again.
lock(&sched.lock)
if schedEnabled(gp) {
// Something re-enabled scheduling while we
// were acquiring the lock.
unlock(&sched.lock)
} else {
sched.disable.runnable.pushBack(gp)
sched.disable.n++
unlock(&sched.lock)
goto top
}
}
if gp.lockedm != 0 {
// Hands off own p to the locked m,
// then blocks waiting for a new p.
// the g has a lockedm: hand our p to that m, park this m until it gets a new p, then jump back to top
startlockedm(gp)
goto top
}
// run this g
execute(gp, inheritTime)
}
execute
Prepare to run the chosen g.
func execute(gp *g, inheritTime bool) {
_g_ := getg()
// update the g's status and mark it non-preemptible
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
_g_.m.p.ptr().schedtick++
}
_g_.m.curg = gp
gp.m = _g_.m
// Check whether the profiler needs to be turned on or off.
hz := sched.profilehz
if _g_.m.profilehz != hz {
setThreadCPUProfiler(hz)
}
if trace.enabled {
// GoSysExit has to happen when we have a P, but before GoStart.
// So we emit it here.
if gp.syscallsp != 0 && gp.sysblocktraced {
traceGoSysExit(gp.sysexitticks)
}
traceGoStart()
}
// jump into the g's code
gogo(&gp.sched)
}
goexit
A goroutine exit invokes goexit1 (explicitly via runtime.Goexit, or via the saved goexit pc on a normal return), and goexit1 calls goexit0.
// goexit continuation on g0.
func goexit0(gp *g) {
_g_ := getg()
// change the g's status to _Gdead
casgstatus(gp, _Grunning, _Gdead)
if isSystemGoroutine(gp, false) {
atomic.Xadd(&sched.ngsys, -1)
}
// clear the g's fields
gp.m = nil
locked := gp.lockedm != 0
gp.lockedm = 0
_g_.m.lockedg = 0
gp.paniconfault = false
gp._defer = nil // should be true already but just in case.
gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
gp.writebuf = nil
gp.waitreason = 0
gp.param = nil
gp.labels = nil
gp.timer = nil
if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
// Flush assist credit to the global pool. This gives
// better information to pacing if the application is
// rapidly creating an exiting goroutines.
scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes))
atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)
gp.gcAssistBytes = 0
}
// Note that gp's stack scan is now "valid" because it has no
// stack.
gp.gcscanvalid = true
dropg()
if GOARCH == "wasm" { // no threads yet on wasm
gfput(_g_.m.p.ptr(), gp)
schedule() // never returns
}
if _g_.m.lockedInt != 0 {
print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
throw("internal lockOSThread error")
}
// put the g on the free list for reuse
gfput(_g_.m.p.ptr(), gp)
if locked {
// The goroutine may have locked this thread because
// it put it in an unusual kernel state. Kill it
// rather than returning it to the thread pool.
// Return to mstart, which will release the P and exit
// the thread.
if GOOS != "plan9" { // See golang.org/issue/22227.
gogo(&_g_.m.g0.sched)
} else {
// Clear lockedExt on plan9 since we may end up re-using
// this thread.
_g_.m.lockedExt = 0
}
}
// enter the scheduler again
schedule()
}
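The same exit path can be entered explicitly: runtime.Goexit (a real API) runs the goroutine's deferred calls and then terminates it, ending up in goexit0 just like a normal return:
package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	go func() {
		defer fmt.Println("deferred calls still run")
		runtime.Goexit() // terminates this goroutine; eventually reaches goexit0
		fmt.Println("never reached")
	}()
	time.Sleep(10 * time.Millisecond)
}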
findrunnable
Its job is to find a runnable g. The main flow:
- look for a runnable g in the p's local queue
- look for a runnable g in the global queue
- check netpoll for a ready g (non-blocking)
- steal runnable gs from other Ps' local queues, occasionally also stealing a victim's runnext
- try netpoll once more, this time blocking
- if there is still nothing, stopm
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
_p_ := _g_.m.p.ptr()
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
if _p_.runSafePointFn != 0 {
runSafePointFn()
}
if fingwait && fingwake {
if gp := wakefing(); gp != nil {
ready(gp, 0, true)
}
}
if *cgo_yield != nil {
asmcgocall(*cgo_yield, nil)
}
// look for a runnable g in the local queue
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// look for a runnable g in the global queue
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// check whether netpoll has a ready g
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(false); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
}
// Steal work from other P's.
// steal gs from other Ps
procs := uint32(gomaxprocs)
if atomic.Load(&sched.npidle) == procs-1 {
// if sched.npidle == procs-1, every p except ours is idle, so there is nothing to steal
// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
// New work can appear from returning syscall/cgocall, network or timers.
// Neither of that submits to local run queues, so no point in stealing.
goto stop
}
// If number of spinning M's >= number of busy P's, block.
// This is necessary to prevent excessive CPU consumption
// when GOMAXPROCS>>1 but the program parallelism is low.
// if the number of spinning Ms is already at least half the number of busy Ps, stop searching to avoid excessive CPU consumption
if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
goto stop
}
// mark the M as spinning
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
// start stealing runnable gs from other Ps
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
// prefer Ps with more than one runnable g
stealRunNextG := i > 2 // first look for ready queues with more than 1 g
// steal half of the victim's queue; stealRunNextG decides whether its runnext may also be taken
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}
stop:
// We have nothing to do. If we're in the GC mark phase, can
// safely scan and blacken objects, and have work to do, run
// idle-time marking rather than give up the P.
if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
_p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
gp := _p_.gcBgMarkWorker.ptr()
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
// wasm only:
// If a callback returned and no other goroutine is awake,
// then pause execution until a callback was triggered.
if beforeIdle() {
// At least one goroutine got woken.
goto top
}
// Before we drop our P, make a snapshot of the allp slice,
// which can change underfoot once we no longer block
// safe-points. We don't need to snapshot the contents because
// everything up to cap(allp) is immutable.
allpSnapshot := allp
// return P and block
lock(&sched.lock)
if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
unlock(&sched.lock)
goto top
}
if sched.runqsize != 0 {
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
return gp, false
}
if releasep() != _p_ {
throw("findrunnable: wrong p")
}
pidleput(_p_)
unlock(&sched.lock)
// Delicate dance: thread transitions from spinning to non-spinning state,
// potentially concurrently with submission of new goroutines. We must
// drop nmspinning first and then check all per-P queues again (with
// #StoreLoad memory barrier in between). If we do it the other way around,
// another thread can submit a goroutine after we've checked all run queues
// but before we drop nmspinning; as the result nobody will unpark a thread
// to run the goroutine.
// If we discover new work below, we need to restore m.spinning as a signal
// for resetspinning to unpark a new worker thread (because there can be more
// than one starving goroutine). However, if after discovering new work
// we also observe no idle Ps, it is OK to just park the current thread:
// the system is fully loaded so no spinning threads are required.
// Also see "Worker thread parking/unparking" comment at the top of the file.
wasSpinning := _g_.m.spinning
if _g_.m.spinning {
_g_.m.spinning = false
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
}
// check all runqueues once again
for _, _p_ := range allpSnapshot {
if !runqempty(_p_) {
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
break
}
}
// omitted
// get a runnable g from netpoll
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
if _g_.m.p != 0 {
throw("findrunnable: netpoll with p")
}
if _g_.m.spinning {
throw("findrunnable: netpoll with spinning")
}
list := netpoll(true) // block until new work is available
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
if !list.empty() {
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
injectglist(&list)
}
}
stopm()
goto top
}
System monitor: sysmon
sysmon is Go's background monitor. It mainly does the following:
- releases the physical memory of spans that have been idle for more than 5 minutes
- forces a garbage collection if none has run for more than 2 minutes
- injects gs whose netpoll events have gone unhandled for a long time into the global queue
- preempts gs that have been running for too long
- retakes Ps that have been blocked in syscalls for too long
sysmon does not run constantly; it has its own wake-up cycle. The first sleep is 20µs, and the sleep doubles after each round, up to a maximum of 10ms.
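Both sysmon's effects and the scheduler state it inspects can be watched with the GODEBUG scheduler trace (schedtrace and scheddetail are real flags; the sample output line below is indicative):
// Run with: GODEBUG=schedtrace=1000 go run main.go
// Every 1000ms the runtime prints a summary such as:
//   SCHED 2013ms: gomaxprocs=8 idleprocs=7 threads=5 spinningthreads=0 idlethreads=2 runqueue=0 [0 0 ...]
// Adding scheddetail=1 also dumps every G, M and P with its current status.
package main

import "time"

func main() {
	for i := 0; i < 4; i++ {
		go func() {
			for { // busy loops keep some Ps running so the trace has content
			}
		}()
	}
	time.Sleep(3 * time.Second)
}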
retake (reclaiming Ps)
// forcePreemptNS is the time slice given to a G before it is
// preempted.
const forcePreemptNS = 10 * 1000 * 1000 // 10ms
func retake(now int64) uint32 {
n := 0
// Prevent allp slice changes. This lock will be completely
// uncontended unless we're already stopping the world.
lock(&allpLock)
// We can't use a range loop over allp because we may
// temporarily drop the allpLock. Hence, we need to re-fetch
// allp each time around the loop.
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
if _p_ == nil {
// This can happen if procresize has grown
// allp but not yet created new Ps.
continue
}
pd := &_p_.sysmontick
s := _p_.status
if s == _Psyscall {
// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
// only sysmon updates sysmontick, so a P is never retaken on the first observation; retaking starts from the second tick, i.e. after at least 20µs and at most 10ms
t := int64(_p_.syscalltick)
if int64(pd.syscalltick) != t {
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}
// On the one hand we don't want to retake Ps if there is no other work to do,
// but on the other hand we want to retake them eventually
// because they can prevent the sysmon thread from deep sleep.
// skip if the local queue is empty, there is at least one spinning M or idle P, and the syscall has lasted less than 10ms
if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
// Drop allpLock so we can take sched.lock.
unlock(&allpLock)
// Need to decrement number of idle locked M's
// (pretending that one more is running) before the CAS.
// Otherwise the M from which we retake can exit the syscall,
// increment nmidle and report deadlock.
incidlelocked(-1)
// retake the p: CAS its status to _Pidle
if atomic.Cas(&_p_.status, s, _Pidle) {
if trace.enabled {
traceGoSysBlock(_p_)
traceProcStop(_p_)
}
n++
_p_.syscalltick++
handoffp(_p_)
}
incidlelocked(1)
lock(&allpLock)
} else if s == _Prunning {
// Preempt G if it's running for too long.
// if the p is running and the g has been running too long, preempt it
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t { // a schedule already happened in this period, i.e. the g running on this p has changed
pd.schedtick = uint32(t)
pd.schedwhen = now // record the time of this preemption check
continue
}
// only preempt if the g has been running for more than 10ms
if pd.schedwhen+forcePreemptNS > now {
continue
}
// preempt
preemptone(_p_)
}
}
unlock(&allpLock)
return uint32(n)
}
handoffp
handoffp hands a P off from an M that is about to block (e.g. in a syscall): if the P still has work, it finds or creates an M to run it; otherwise the P is put on the idle list.
func handoffp(_p_ *p) {
// handoffp must start an M in any situation where
// findrunnable would return a G to run on _p_.
// if there is local (or global) work to run, start an M right away
if !runqempty(_p_) || sched.runqsize != 0 {
startm(_p_, false)
return
}
// if it has GC work, start it straight away
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
startm(_p_, false)
return
}
// no local work, check that there are no spinning/idle M's,
// otherwise our help is not required
// no local work: if there are no spinning Ms and no idle Ps, start a spinning M ourselves
if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
startm(_p_, true)
return
}
lock(&sched.lock)
if sched.gcwaiting != 0 {
_p_.status = _Pgcstop
sched.stopwait--
if sched.stopwait == 0 {
notewakeup(&sched.stopnote)
}
unlock(&sched.lock)
return
}
if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
sched.safePointFn(_p_)
sched.safePointWait--
if sched.safePointWait == 0 {
notewakeup(&sched.safePointNote)
}
}
// if the global run queue is non-empty, start an M to run it
if sched.runqsize != 0 {
unlock(&sched.lock)
startm(_p_, false)
return
}
// If this is the last running P and nobody is polling network,
// need to wakeup another M to poll network.
// if this is the last running P and nobody is polling the network, wake an M to poll it
if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
unlock(&sched.lock)
startm(_p_, false)
return
}
// nothing to do: put the p on the idle list
pidleput(_p_)
unlock(&sched.lock)
}
preemptone (preemption)
Preemption piggybacks on the stack check: when a g finds its stack too small, it grows the stack (newstack allocates a new one), and newstack is also where a pending preemption request is detected. preemptone requests this by poisoning the g's stack guard.
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
if mp == nil || mp == getg().m {
return false
}
gp := mp.curg
if gp == nil || gp == mp.g0 {
return false
}
// set the preemption flag
gp.preempt = true
// stackPreempt is a special sentinel value, larger than any real stack address
gp.stackguard0 = stackPreempt
return true
}
The preemption check in newstack:
// if this is a preemption request rather than real stack growth, reschedule
if preempt {
if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
// Let the goroutine keep running for now.
// gp->preempt is set, so it will be preempted next time.
gp.stackguard0 = gp.stack.lo + _StackGuard
gogo(&gp.sched) // never return
}
}
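A consequence of this stackguard0-based scheme (in the cooperative runtimes described here, before Go 1.14 added signal-based preemption): a loop that never calls a function never executes the stack-growth prologue, so it never observes stackPreempt. A sketch:
package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1)
	go func() {
		for {
			// no function call here => the stack check prologue never runs,
			// so the stackPreempt poison set by preemptone is never seen
		}
	}()
	time.Sleep(100 * time.Millisecond)
	fmt.Println("on pre-1.14 runtimes this line may never print")
}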
_Gdead --newproc--> _Grunnable --execute--> _Grunning --goexit0--> _Gdead
Summary
When go func is called, the rough flow is as follows (a code version appears after the diagram):
graph LR
initial G (Gidle) --newproc--> new G (Gdead)
new G (Gdead) --idle P exists && no spinning M--> try to wake an M
try to wake an M --create one if none available--> M acquired
M acquired --startm--> M started
M started --waits for scheduling--> schedule
schedule --fetch a runnable g--> g acquired
g acquired --run g--> G becomes Grunning
G becomes Grunning --goexit--> G finishes and returns to Gdead
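The same diagram in a few lines of code, with the state transitions as comments:
package main

import "sync"

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // newproc: a g is reused or allocated, _Gdead -> _Grunnable, runqput
		defer wg.Done() // runs just before goexit
		// execute/gogo: _Grunnable -> _Grunning; the body runs here
	}() // return -> goexit -> goexit0: _Grunning -> _Gdead, g recycled via gfput
	wg.Wait()
}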