
-
从空闲process列表中获取一个process
-
从空闲列表获取一个m,或者新建一个m
-
使用newosproc()方法创建一个内核线程,并把内核线程和m以及mstart方法关联,该线程执行时会调用mstart方法
-
由该m的g0开始执行schedule方法
-
g0的作用?
-
为runtime 下调度G的工作提供栈空间,每个m都有一个g0
-
-
获取G,有三种方式
-
每处理n个任务后就去全局g队列中获取G,同时会将全局队列中一定数量的G搬运到自己的本地队列
-
从本地队列中获取。为了增加公平性,获取都从队头获取,新加入都加到队尾
-
从netpoll中获取ready的G
-
尝试4次从其他P中窃取G,同时也会将其他P的一半G窃取到自身队列中,充分利用了CPU
-
-
调用execute执行该G,最终通过汇编方法gogo传入执行栈信息(栈指针、程序计数器等)执行G
- 执行完毕后让出,执行完毕后会调用在newproc时设置的goexit方法,goexit会切换到g0释放该G
- 主动让出,time.Sleep、channel阻塞、io阻塞等场景下会主动调用gopark方法,该方法会切换到g0,将当前G的状态从 running 切换为 waiting,然后开始下一轮循环
- 抢占让出,在进程启动时会启动一个监控任务,叫sysmon,该任务会每隔一段时间去查看每个P是否有执行时间过长的G(10ms),如果有,则会标记抢占,在栈扩张时(newstack)会检查是否有抢占标记,如果有,则将该G从 running 更改为 runnable(goschedImpl),放到全局队列中等待下一次被调度到,最后再次进入schedule()
- 系统调用让出 ,该内核线程无法调度运行其他goroutine,在执行SysCall时在汇编中加入了entersyscall和exitsyscall两个方法,在进入系统调用前,会保存好执行现场,同时更改状态为 syscall ,然后标记为可抢占(因为系统调用会阻塞底层内核线程);系统调用结束后,会切换到G0,先检查是否有空闲的P,如果没有的话就放到全局可执行队列中等待被执行并进入schedule(),如果有的话,将空闲的P跟当前M绑定,并立刻执行
-
在调用方的堆栈上执行newproc1
-
优先从当前P内free列表中复用,如果没有就新分配一个
-
将入参用memmove拷贝到新G的栈指针
-
将goexit方法设置到新G的pc寄存器,用于当执行G结束后找到退出方法,从而再次进入调度循环(goexit会切换为g0重新开始schedule())
-
-
将新创建的G放到
从g切换到g0,在g0的堆栈上执行 fn(g),比如
func goexit1() {
...
mcall(goexit0)//当前执行的是g,这里的调用会切换到g0,并执行 goexit0(g)
}
goexit0分析
主要是将g各个字段重置,然后放回gfree列表便于复用,最后调用schedule()继续进入调度循环
// gp是需要被退出清理的g
// 整个goexit0是由g0执行的(在g0的堆栈上执行的)
func goexit0(gp *g) {
_g_ := getg()//这里获取到的 _g_ 是g0
casgstatus(gp, _Grunning, _Gdead)//将gp设置为 dead
if isSystemGoroutine(gp, false) {
atomic.Xadd(&sched.ngsys, -1)
}
gp.m = nil //跟m解绑
locked := gp.lockedm != 0
gp.lockedm = 0
_g_.m.lockedg = 0
gp.preemptStop = false
gp.paniconfault = false
gp._defer = nil // should be true already but just in case.
gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
gp.writebuf = nil
gp.waitreason = 0
gp.param = nil
gp.labels = nil
gp.timer = nil
...
dropg()//将当前执行的g0从m的current的位置拿走
...
gfput(_g_.m.p.ptr(), gp)// 将gp放回 p 的 gfree 队列进行复用
...
schedule()
}
reentersyscall分析
func reentersyscall(pc, sp uintptr) {
_g_ := getg()
// Disable preemption because during this function g is in Gsyscall status,
// but can have inconsistent g->sched, do not let GC observe it.
_g_.m.locks++
// Entersyscall must not call any function that might split/grow the stack.
// (See details in comment above.)
// Catch calls that might, by replacing the stack guard with something that
// will trip any stack check and leaving a flag to tell newstack to die.
_g_.stackguard0 = stackPreempt
_g_.throwsplit = true
// Leave SP around for GC and traceback.
save(pc, sp)
_g_.syscallsp = sp
_g_.syscallpc = pc
casgstatus(_g_, _Grunning, _Gsyscall)
if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp {
systemstack(func() {
print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]n")
throw("entersyscall")
})
}
if trace.enabled {
systemstack(traceGoSysCall)
// systemstack itself clobbers g.sched.{pc,sp} and we might
// need them later when the G is genuinely blocked in a
// syscall
save(pc, sp)
}
if atomic.Load(&sched.sysmonwait) != 0 {
systemstack(entersyscall_sysmon)
save(pc, sp)
}
if _g_.m.p.ptr().runSafePointFn != 0 {
// runSafePointFn may stack split if run on this stack
systemstack(runSafePointFn)
save(pc, sp)
}
_g_.m.syscalltick = _g_.m.p.ptr().syscalltick
_g_.sysblocktraced = true
pp := _g_.m.p.ptr()
pp.m = 0 //将 p 和 m 分离
_g_.m.oldp.set(pp)
_g_.m.p = 0 //将 m 和 p 分离
atomic.Store(&pp.status, _Psyscall) //将 p 状态标记为 syscall
if sched.gcwaiting != 0 {
systemstack(entersyscall_gcwait)
save(pc, sp)
}
_g_.m.locks--
}
func entersyscall_sysmon() {
lock(&sched.lock)
if atomic.Load(&sched.sysmonwait) != 0 {
//将全局的sched.sysmonwait置零,同时通知CPU的等待者
atomic.Store(&sched.sysmonwait, 0)
notewakeup(&sched.sysmonnote)
}
unlock(&sched.lock)
}
retake函数
func retake(now int64) uint32 {
n := 0
// Prevent allp slice changes. This lock will be completely
// uncontended unless we're already stopping the world.
// 全局p锁
lock(&allpLock)
// We can't use a range loop over allp because we may
// temporarily drop the allpLock. Hence, we need to re-fetch
// allp each time around the loop.
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
// 1. p == nil 说明通过runtime.GOMAXPROCS 动态对P进行了调大,但当前还未初始化使用 直接跳过
if _p_ == nil {
// This can happen if procresize has grown
// allp but not yet created new Ps.
continue
}
pd := &_p_.sysmontick
s := _p_.status
sysretake := false
// 2. p 为 running 或 系统调用
if s == _Prunning || s == _Psyscall {
// Preempt G if it's running for too long.
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now {
// G运行时间过长,立刻进行标记抢占
// 注意:因为 syscall 状态下 p 已经和 m 互相分离了,所以 preemptone 不会进行抢占,同时返回 false
preemptone(_p_)
// In case of syscall, preemptone() doesn't
// work, because there is no M wired to P.
sysretake = true
}
}
// 3. 系统调用
if s == _Psyscall {
// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
t := int64(_p_.syscalltick)
if !sysretake && int64(pd.syscalltick) != t {
pd.syscalltick = uint32(t)
pd.syscallwhen = now
continue
}
// On the one hand we don't want to retake Ps if there is no other work to do,
// but on the other hand we want to retake them eventually
// because they can prevent the sysmon thread from deep sleep.
if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
continue
}
// Drop allpLock so we can take sched.lock.
unlock(&allpLock)
// Need to decrement number of idle locked M's
// (pretending that one more is running) before the CAS.
// Otherwise the M from which we retake can exit the syscall,
// increment nmidle and report deadlock.
incidlelocked(-1)
if atomic.Cas(&_p_.status, s, _Pidle) {
if trace.enabled {
traceGoSysBlock(_p_)
traceProcStop(_p_)
}
n++
_p_.syscalltick++
//给p重新分配一个m
handoffp(_p_)
}
incidlelocked(1)
lock(&allpLock)
}
}
unlock(&allpLock)
return uint32(n)
}
func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
if mp == nil || mp == getg().m {
// syscall 状态下, p 和 m 已经分离会在这里返回
return false
}
gp := mp.curg
if gp == nil || gp == mp.g0 {
return false
}
gp.preempt = true
// Every call in a go routine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the normal stack overflow check.
gp.stackguard0 = stackPreempt
// Request an async preemption of this P.
if preemptMSupported && debug.asyncpreemptoff == 0 {
_p_.preempt = true
preemptM(mp)
}
return true
}
handoffp
func handoffp(_p_ *p) {
// handoffp must start an M in any situation where
// findrunnable would return a G to run on _p_.
// 如果还有未处理完的任务,则新开一个m处理
if !runqempty(_p_) || sched.runqsize != 0 {
startm(_p_, false)
return
}
// if it has GC work, start it straight away
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
startm(_p_, false)
return
}
// no local work, check that there are no spinning/idle M's,
// otherwise our help is not required
if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
startm(_p_, true)
return
}
lock(&sched.lock)
if sched.gcwaiting != 0 {
_p_.status = _Pgcstop
sched.stopwait--
if sched.stopwait == 0 {
notewakeup(&sched.stopnote)
}
unlock(&sched.lock)
return
}
if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
sched.safePointFn(_p_)
sched.safePointWait--
if sched.safePointWait == 0 {
notewakeup(&sched.safePointNote)
}
}
if sched.runqsize != 0 {
unlock(&sched.lock)
startm(_p_, false)
return
}
// If this is the last running P and nobody is polling network,
// need to wakeup another M to poll network.
if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
unlock(&sched.lock)
startm(_p_, false)
return
}
if when := nobarrierWakeTime(_p_); when != 0 {
wakeNetPoller(when)
}
pidleput(_p_)
unlock(&sched.lock)
}
notesleep和notewakeup
如果是要进行一次性的通知,可以使用 note。note 提供了 notesleep 和 notewakeup。不像传统的 UNIX 的 sleep/wakeup,note 是无竞争的(race-free),所以如果 notewakeup 已经发生了,那么 notesleep 将会立即返回。note 可以在使用后通过 noteclear 来重置,但是要注意 noteclear 和 notesleep、notewakeup 不能发生竞争。类似 mutex,阻塞在 note 上会阻塞整个 M。然而,note 提供了不同的方式来调用 sleep:notesleep 会阻止相关联的 G 和 P 被重新调度;notetsleepg 的表现却像一个阻塞的系统调用一样,允许 P 被重用去运行另一个 G。尽管如此,这仍然比直接阻塞一个 G 要低效,因为这需要消耗一个 M。
栈扩张go的协程设计是stackful coroutine,每一个goroutine都需要有自己的栈空间,
栈空间的内容再goroutine休眠时候需要保留的,等到重新调度时候恢复(这个时候整个调用树是完整的)。
这样就会引出一个问题,如果系统存在大量的goroutine,给每一个goroutine都预先分配一个足够的栈空间那么go就会使用过多的内存。
为了避免内存使用过多问题,go在一开始时候,会默认只为goroutine分配一个很小的栈空间,它的大小在1.92版本中是2k。
当函数发现栈空间不足时,会申请一块新的栈空间并把原来的栈复制过去。
g实例里面的g.stack、g.stackguard0两个变量来描述goroutine实例的栈。
写屏障go支持并行GC的,GC的扫描阶段和go代码可以同时运行。这样带来的问题是,GC扫描的过程中go代码的执行可能改变了对象依赖树。
比如:开始扫描时候发现根对象A和B,B拥有C的指针,GC先扫描A,然后B把C的指针交给A,GC再扫描B,这时C就不会被扫描到。
为了避免这个问题,go在GC扫描标记阶段会启用写屏障(Write Barrier)
启用了Write barrier之后,当B把C指针交给A时,GC会认为在这一轮扫描中C的指针是存活的,即使A 可能在稍后丢掉C,那么C在下一轮GC中再回收。
Write barrier只针对指针启用,而且只在GC的标记阶段启用,平时会直接把值写入到目标地址。
Q: 系统调用的过程?-
reentersyscall
-
标记抢占 并唤醒 sysmon监控任务
-
将p和m解绑
-
将p设置为 syscall
-
-
进行系统调用
-
sysmon被唤醒后执行retake
-
retake检测到 syscall 状态的 p调用handoffp给p分配一个新的m
- 如果本地还有未执行的任务,则分配一个m来执行这些任务
-
-
系统调用完成后执行 exitsyscall()
-
切换到g0将 系统调用完成的g 状态标记为可运行
-
如果没有空闲的p,将 系统调用完成的g 放到全局队列,然后通过 notesleep 停掉当前 m , 最后执行schedule,当前m等待被唤醒(比如 startm )
-
如果有空闲的p,则立刻将p跟当前的m绑定,并执行 系统调用完成的g
-
- Go 语言设计与实现-6.5 调度器
- 详尽干货!从源码角度看 Golang 的调度(上)
- 详尽干货!从源码角度看 Golang 的调度(下)
- 5.2 goroutine的生老病死
- 图解Go运行时调度器 | Tony Bai
- golang 在 runtime 中的一些骚东西
- Golang-Scheduler原理解析_惜暮-CSDN博客_go scheduler
- Goroutine调度实例简要分析 - Go语言中文网 - Golang中文社区
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)