我们使用 go 关键字可以轻松创建一个 goroutine,并在其中执行我们的用户代码,快速地实现并发编程。这种在用户层面十分简洁的语法,背后是运行时调度系统所做的一系列复杂工作,这里简单地描述一下相关的过程。
Go 语言中的调度流程与 GMP 模型的设计息息相关,但本文并不会去介绍 GMP 模型的相关内容,我们更希望将关注的焦点放在运行时的调度行为上。用户使用 go 关键字创建一个 goroutine 后,运行时会对其进行一系列的初始化操作,并将其加入到相关 P 的本地可运行队列中,等待在下一次调度循环中被选中执行。下面我们重点来看一看 runtime.schedule 函数具体都做了哪些工作。
runtime.schedule 将主要的寻找可运行 g 的逻辑写到了 runtime.findRunnable 函数中,其会依次从下面几处查找待执行的 goroutine。首先,为了保证公平性,每当 P 的 schedtick 为 61 的倍数且全局运行队列(GRQ)非空时,会优先从 GRQ 中获取一个 goroutine:
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if_p_.schedtick%61==0&&sched.runqsize>0{lock(&sched.lock)gp=globrunqget(_p_,1)unlock(&sched.lock)ifgp!=nil{returngp,false,false}}
从处理器(P)的 LRQ 中查找待执行的 Goroutine;
1
2
3
4
// local runq
ifgp,inheritTime:=runqget(_p_);gp!=nil{returngp,inheritTime,false}
从 GRQ 中查找待执行的 Goroutine;
1
2
3
4
5
6
7
8
9
// global runq
ifsched.runqsize!=0{lock(&sched.lock)gp:=globrunqget(_p_,0)unlock(&sched.lock)ifgp!=nil{returngp,false,false}}
从网络轮询器中查找是否有 Goroutine 等待运行;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Poll network.
// This netpoll is only an optimization before we resort to stealing.
// We can safely skip it if there are no waiters or a thread is blocked
// in netpoll already. If there is any kind of logical race with that
// blocked thread (e.g. it has already returned from netpoll, but does
// not set lastpoll yet), this thread will do blocking netpoll below
// anyway.
ifnetpollinited()&&atomic.Load(&netpollWaiters)>0&&atomic.Load64(&sched.lastpoll)!=0{iflist:=netpoll(0);!list.empty(){// non-blocking
gp:=list.pop()injectglist(&list)casgstatus(gp,_Gwaiting,_Grunnable)iftrace.enabled{traceGoUnpark(gp,0)}returngp,false,false}}
// Spinning Ms: steal work from other Ps.
//
// Limit the number of spinning Ms to half the number of busy Ps.
// This is necessary to prevent excessive CPU consumption when
// GOMAXPROCS>>1 but the program parallelism is low.
procs := uint32(gomaxprocs)
if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
	if !_g_.m.spinning {
		_g_.m.spinning = true
		atomic.Xadd(&sched.nmspinning, 1)
	}

	gp, inheritTime, tnow, w, newWork := stealWork(now)
	now = tnow
	if gp != nil {
		// Successfully stole.
		return gp, inheritTime, false
	}
	if newWork {
		// There may be new timer or GC work; restart to
		// discover.
		goto top
	}
	if w != 0 && (pollUntil == 0 || w < pollUntil) {
		// Earlier timer to wait for.
		pollUntil = w
	}
}
如果上述过程都没有找到一个能够执行的 goroutine 的话,那么就意味着目前的处理器(P)十分有空并且当前整个程序运行的工作负载压力也不大,这时 runtime.schedule 就会去尝试干其它工作。如果我们这时正处于 GC mark 阶段的话,处理器就可以运行一个任务帮助进行对象的扫描与标记。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// If we're in the GC mark phase, can safely scan and blacken objects,
// and have work to do, run idle-time marking rather than give up the P.
ifgcBlackenEnabled!=0&&gcMarkWorkAvailable(_p_)&&gcController.addIdleMarkWorker(){node:=(*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())ifnode!=nil{_p_.gcMarkWorkerMode=gcMarkWorkerIdleModegp:=node.gp.ptr()casgstatus(gp,_Gwaiting,_Grunnable)iftrace.enabled{traceGoUnpark(gp,0)}returngp,false,false}gcController.removeIdleMarkWorker()}
// return P and block
lock(&sched.lock)
if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
	unlock(&sched.lock)
	goto top
}
if sched.runqsize != 0 {
	// Work appeared on the global queue in the meantime; take it.
	gp := globrunqget(_p_, 0)
	unlock(&sched.lock)
	return gp, false, false
}
if releasep() != _p_ {
	throw("findrunnable: wrong p")
}
pidleput(_p_)
unlock(&sched.lock)
// Check all runqueues once again.
_p_ = checkRunqsNoP(allpSnapshot, idlepMaskSnapshot)
if _p_ != nil {
	acquirep(_p_)
	_g_.m.spinning = true
	atomic.Xadd(&sched.nmspinning, 1)
	goto top
}

// Check for idle-priority GC work again.
_p_, gp = checkIdleGCNoP()
if _p_ != nil {
	acquirep(_p_)
	_g_.m.spinning = true
	atomic.Xadd(&sched.nmspinning, 1)

	// Run the idle worker.
	_p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
	casgstatus(gp, _Gwaiting, _Grunnable)
	if trace.enabled {
		traceGoUnpark(gp, 0)
	}
	return gp, false, false
}
最后的最后,调度器还会尝试从网络轮询器中查找是否已经有准备好的 goroutine 等待执行,如果有的话就从空闲 P 队列中取出一个处理器,并返回队列中一个待执行的 goroutine。如果确实找不到任何工作,就执行 stopm 将当前线程挂起,进入睡眠。
list := netpoll(delay) // block until new work is available
atomic.Store64(&sched.pollUntil, 0)
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
if faketime != 0 && list.empty() {
	// Using fake time and nothing is ready; stop M.
	// When all M's stop, checkdead will call timejump.
	stopm()
	goto top
}
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ == nil {
	// No idle P to run on: hand the ready list to injectglist.
	injectglist(&list)
} else {
	acquirep(_p_)
	if !list.empty() {
		gp := list.pop()
		injectglist(&list)
		casgstatus(gp, _Gwaiting, _Grunnable)
		if trace.enabled {
			traceGoUnpark(gp, 0)
		}
		return gp, false, false
	}
	// ... (the remainder of this branch is omitted in this excerpt)
}
// Schedules gp to run on the current M.
// If inheritTime is true, gp inherits the remaining time in the
// current time slice. Otherwise, it starts a new time slice.
// Never returns.
//
// Write barriers are allowed because this is called immediately after
// acquiring a P in several places.
//
//go:yeswritebarrierrec
funcexecute(gp*g,inheritTimebool){_g_:=getg()// Assign gp.m before entering _Grunning so running Gs have an
// M.
_g_.m.curg=gpgp.m=_g_.mcasgstatus(gp,_Grunnable,_Grunning)gp.waitsince=0gp.preempt=falsegp.stackguard0=gp.stack.lo+_StackGuardif!inheritTime{_g_.m.p.ptr().schedtick++}// Check whether the profiler needs to be turned on or off.
hz:=sched.profilehzif_g_.m.profilehz!=hz{setThreadCPUProfiler(hz)}iftrace.enabled{// GoSysExit has to happen when we have a P, but before GoStart.
// So we emit it here.
ifgp.syscallsp!=0&&gp.sysblocktraced{traceGoSysExit(gp.sysexitticks)}traceGoStart()}gogo(&gp.sched)}
runtime.gogo 是一个用 Go 汇编写成的函数,在不同的处理器架构上有不同的实现,不过总体的实现大同小异,下面是该函数在 amd64 架构上的实现:
// func gogo(buf *gobuf)
// restore state from Gobuf; longjmp
TEXT runtime·gogo(SB), NOSPLIT, $0-8
	MOVQ	buf+0(FP), BX		// gobuf
	MOVQ	gobuf_g(BX), DX
	MOVQ	0(DX), CX		// make sure g != nil
	JMP	gogo<>(SB)

TEXT gogo<>(SB), NOSPLIT, $0
	get_tls(CX)
	MOVQ	DX, g(CX)
	MOVQ	DX, R14		// set the g register
	MOVQ	gobuf_sp(BX), SP	// restore SP
	MOVQ	gobuf_ret(BX), AX
	MOVQ	gobuf_ctxt(BX), DX
	MOVQ	gobuf_bp(BX), BP
	MOVQ	$0, gobuf_sp(BX)	// clear to help garbage collector
	MOVQ	$0, gobuf_ret(BX)
	MOVQ	$0, gobuf_ctxt(BX)
	MOVQ	$0, gobuf_bp(BX)
	MOVQ	gobuf_pc(BX), BX	// load the saved program counter of the goroutine to run
	JMP	BX			// jump to it, i.e. start running the goroutine's code
// The top-most function running on a goroutine
// returns to goexit+PCQuantum.
TEXT runtime·goexit(SB), NOSPLIT|TOPFRAME, $0-0
	BYTE	$0x90	// NOP
	CALL	runtime·goexit1(SB)	// does not return
	// traceback from goexit1 must hit code range of goexit
	BYTE	$0x90	// NOP
其中 runtime.goexit1 的实现如下:
1
2
3
4
5
6
7
8
9
10
// Finishes execution of the current goroutine.
funcgoexit1(){ifraceenabled{racegoend()}iftrace.enabled{traceGoEnd()}mcall(goexit0)}
funcgoexit0(gp*g){_g_:=getg()_p_:=_g_.m.p.ptr()casgstatus(gp,_Grunning,_Gdead)// Goroutine 状态转移
gcController.addScannableStack(_p_,-int64(gp.stack.hi-gp.stack.lo))ifisSystemGoroutine(gp,false){atomic.Xadd(&sched.ngsys,-1)}gp.m=nillocked:=gp.lockedm!=0gp.lockedm=0_g_.m.lockedg=0gp.preemptStop=falsegp.paniconfault=falsegp._defer=nil// should be true already but just in case.
gp._panic=nil// non-nil for Goexit during panic. points at stack-allocated data.
gp.writebuf=nilgp.waitreason=0gp.param=nilgp.labels=nilgp.timer=nilifgcBlackenEnabled!=0&&gp.gcAssistBytes>0{// Flush assist credit to the global pool. This gives
// better information to pacing if the application is
// rapidly creating an exiting goroutines.
assistWorkPerByte:=gcController.assistWorkPerByte.Load()scanCredit:=int64(assistWorkPerByte*float64(gp.gcAssistBytes))atomic.Xaddint64(&gcController.bgScanCredit,scanCredit)gp.gcAssistBytes=0}dropg()// 移除当前 Goroutine 与线程的关联
ifGOARCH=="wasm"{// no threads yet on wasm
gfput(_p_,gp)schedule()// never returns
}if_g_.m.lockedInt!=0{print("invalid m->lockedInt = ",_g_.m.lockedInt,"\n")throw("internal lockOSThread error")}gfput(_p_,gp)// 将 Goroutine 键入处理器的 Goroutine 空闲列表 gFree
iflocked{// The goroutine may have locked this thread because
// it put it in an unusual kernel state. Kill it
// rather than returning it to the thread pool.
// Return to mstart, which will release the P and exit
// the thread.
ifGOOS!="plan9"{// See golang.org/issue/22227.
gogo(&_g_.m.g0.sched)}else{// Clear lockedExt on plan9 since we may end up re-using
// this thread.
_g_.m.lockedExt=0}}schedule()// 重新触发新一轮的调度循环
}