基于 Go 1.4,相關(guān)文件位于 src/runtime 目錄。文章忽略了 32bit 代碼,有興趣的可自行查看源碼文件。為便于閱讀,示例代碼做過裁剪。
Go 內(nèi)存分配器基于 tcmalloc 模型,這在 malloc.h 頭部注釋中有明確說明。
Memory allocator, based on tcmalloc.
http://goog-perftools.sourceforge.net/doc/tcmalloc.html
核心目標(biāo)很簡單:
分配器以頁為單位向操作系統(tǒng)申請大塊內(nèi)存。這些大塊內(nèi)存由 n 個地址連續(xù)的頁組成,并用名為 span 的對象進(jìn)行管理。
malloc.h
PageShift" = 13,
PageSize" = 1<<PageShift, // 8192 bytes
當(dāng)需要時,span 所管理內(nèi)存被切分成多個大小相等的小塊,每個小塊可存儲一個對象,故稱作 object。
分配器以 32KB 為界,將對象分為大小兩種。
malloc.h
MaxSmallSize = 32<<10,
大對象直接找一個大小合適的 span,這個無需多言。小對象則以 8 的倍數(shù)分為不同大小等級 (size class)。比如 class1 為 8 字節(jié),可存儲 1 ~ 8 字節(jié)大小的對象。
NumSizeClasses = 67,
當(dāng)然,實際的對應(yīng)規(guī)則并不是連續(xù)和固定的,會根據(jù)一些經(jīng)驗和測試結(jié)果進(jìn)行調(diào)整,以獲得最佳的性能和內(nèi)存利用率。
malloc.h
// Size classes. Computed and initialized by InitSizes.
//
// SizeToClass(0 <= n <= MaxSmallSize) returns the size class,
//" 1 <= sizeclass < NumSizeClasses, for n.
//" Size class 0 is reserved to mean "not small".
//
// class_to_size[i] = largest size in class i
// class_to_allocnpages[i] = number of pages to allocate when
//" making new objects in class i
int32" runtime·SizeToClass(int32);
extern"int32" runtime·class_to_size[NumSizeClasses];
extern"int32" runtime·class_to_allocnpages[NumSizeClasses];
extern"int8" runtime·size_to_class8[1024/8 + 1];
extern"int8" runtime·size_to_class128[(MaxSmallSize-1024)/128 + 1];
為了管理好內(nèi)存,分配器使用三級組件來完成不同操作。
簡單描述一下內(nèi)存分配和回收流程。
分配流程:
回收流程:
從 heap 申請和回收 span 的過程中,分配器會嘗試合并地址相鄰的 span 塊,以形成更大內(nèi)存塊,減少碎片。
分配器管理算法依賴連續(xù)內(nèi)存地址。因此,在初始化時,分配器會預(yù)留一塊巨大的虛擬地址空間。該空間被成三個部分:
在 64 位系統(tǒng)下,arena 最大容量是 128GB,bitmap 8GB,spans 128MB。這些內(nèi)存并非一次性分配,而是隨著 arena 線性增加,每個區(qū)域都有指針標(biāo)記當(dāng)前分配位置。
malloc.h
struct MHeap
{
// span lookup
MSpan** spans;
uintptr spans_mapped;
// range of addresses we might see in the heap
byte *bitmap;
uintptr bitmap_mapped;
byte *arena_start;
byte *arena_used;
byte *arena_end;
bool arena_reserved;
};
虛擬地址預(yù)留操作并非物理內(nèi)存分配,因此看到 “Hello, World” 消耗上百 GB “內(nèi)存”,無需大驚小怪。
在運行時初始化時,會調(diào)用內(nèi)存分配器初始化函數(shù)。
proc.c
void runtime·schedinit(void)
{
runtime·mallocinit();
}
malloc.c
void runtime·mallocinit(void)
{
// 初始化 size class 反查表。
runtime·InitSizes();
// 64-bit
if(sizeof(void*) == 8 && (limit == 0 || limit > (1<<30))) {
arena_size = MaxMem; // 128GB
bitmap_size = arena_size / (sizeof(void*)*8/4); // 8GB
spans_size = arena_size / PageSize * sizeof(runtime·mheap.spans[0]);
spans_size = ROUND(spans_size, PageSize); // 128MB
// 嘗試從 0xc000000000 開始設(shè)置保留地址。
// 如果失敗,則嘗試 0x1c000000000 ~ 0x7fc000000000。
for(i = 0; i <= 0x7f; i++) {
p = (void*)(i<<40 | 0x00c0ULL<<32);
p_size = bitmap_size + spans_size + arena_size + PageSize;
p = runtime·SysReserve(p, p_size, &reserved);
if(p != nil)
break;
}
}
// 32-bit
if (p == nil) {
// 忽略
}
// 按 PageSize 對齊地址。
// 分配器使用 Address<<PageShift 作為 PageID。
p1 = (byte*)ROUND((uintptr)p, PageSize);
// 設(shè)定不同區(qū)域的起始地址。
runtime·mheap.spans = (MSpan**)p1;
runtime·mheap.bitmap = p1 + spans_size;
runtime·mheap.arena_start = p1 + spans_size + bitmap_size;
runtime·mheap.arena_used = runtime·mheap.arena_start;
runtime·mheap.arena_end = p + p_size;
runtime·mheap.arena_reserved = reserved;
// 初始化 heap 和當(dāng)前 cache。
runtime·MHeap_Init(&runtime·mheap);
g->m->mcache = runtime·allocmcache();
}
內(nèi)存地址預(yù)留操作通過 mmap PORT_NONE 實現(xiàn)。不過,在 darwin/OSX 中,并未使用 MAP_FIXED 參數(shù),因此未必從 0xc000000000 開始。
mem_darwin.c
void* runtime·SysReserve(void *v, uintptr n, bool *reserved)
{
void *p;
*reserved = true;
p = runtime·mmap(v, n, PROT_NONE, MAP_ANON|MAP_PRIVATE, -1, 0);
if(p < (void*)4096)
return nil;
return p;
}
分配器根對象 heap 的初始化工作,主要是幾個 span 管理鏈表和 central 數(shù)組的創(chuàng)建。
malloc.h
MaxMHeapList = 1<<(20 - PageShift), // Maximum page length for fixed-size list in MHeap.
struct MHeap
{
MSpan free[MaxMHeapList]; // free lists of given length
MSpan busy[MaxMHeapList]; // busy lists of large objects of given length
MSpan freelarge; // free lists length >= MaxMHeapList
MSpan busylarge; // busy lists of large objects length >= MaxMHeapList
struct MHeapCentral {
MCentral mcentral;
byte pad[CacheLineSize];
} central[NumSizeClasses];
};
其中,free 和 busy 數(shù)組以 span 頁數(shù)為序號管理多個鏈表。當(dāng) central 有需要時,只需從 free 找到頁數(shù)合適的鏈表,從中提取可用 span 即可。busy 記錄的自然是已經(jīng)被使用的 span。
至于 large 鏈表,用于保存所有超出 free/busy 頁數(shù)限制的 span。
mheap.c
void runtime·MHeap_Init(MHeap *h)
{
uint32 i;
// 初始化一些管理類型的固定分配器。
runtime·FixAlloc_Init(&h->spanalloc, sizeof(MSpan), RecordSpan, ...);
runtime·FixAlloc_Init(&h->cachealloc, sizeof(MCache), ...);
runtime·FixAlloc_Init(&h->specialfinalizeralloc, sizeof(SpecialFinalizer), ...);
runtime·FixAlloc_Init(&h->specialprofilealloc, sizeof(SpecialProfile), ...);
// 初始化 free/busy 數(shù)組。
for(i=0; i<nelem(h->free); i++) {
runtime·MSpanList_Init(&h->free[i]);
runtime·MSpanList_Init(&h->busy[i]);
}
// 初始化 large 鏈表。
runtime·MSpanList_Init(&h->freelarge);
runtime·MSpanList_Init(&h->busylarge);
// 創(chuàng)建所有等級的 central 對象。
for(i=0; i<nelem(h->central); i++)
runtime·MCentral_Init(&h->central[i].mcentral, i);
}
像 span、cache 這類管理對象,并不從 arena 區(qū)域分配,而是使用專門的 FixAlloc 分配器單獨管理。其具體實現(xiàn)細(xì)節(jié)可參考后續(xù)章節(jié)。
在 span 內(nèi)部有兩個指針,用于將多個對象串成雙向鏈表。
malloc.h
struct MSpan
{
MSpan *next; // in a span linked list
MSpan *prev; // in a span linked list
pageID start; // starting page number
uintptr npages; // number of pages in span
MLink *freelist; // list of free objects
uint8 sizeclass; // size class
uint8 state; // MSpanInUse etc
uintptr elemsize; // computed from sizeclass or from npages
};
mheap.c
void runtime·MSpanList_Init(MSpan *list)
{
list->state = MSpanListHead;
list->next = list;
list->prev = list;
}
至于 central,同樣是完成兩個 span 管理鏈表的初始化操作。其中 nonempty 鏈表保存有剩余 object 空間,等待被 cache 獲取的 span。而 empty 則保存沒有剩余空間或已被 cache 獲取的 span。
malloc.h
struct MCentral
{
int32 sizeclass;
MSpan nonempty; // list of spans with a free object
MSpan empty; // list of spans with no free objects (or cached in an MCache)
};
mcentral.c
void runtime·MCentral_Init(MCentral *c, int32 sizeclass)
{
c->sizeclass = sizeclass;
runtime·MSpanList_Init(&c->nonempty);
runtime·MSpanList_Init(&c->empty);
}
最后,用固定分配器創(chuàng)建 cache 對象,并初始化其 alloc 數(shù)組。
malloc.h
struct MCache
{
MSpan* alloc[NumSizeClasses]; // spans to allocate from
};
mcache.c
// dummy MSpan that contains no free objects.
MSpan runtime·emptymspan;
MCache* runtime·allocmcache(void)
{
// 使用固定分配器創(chuàng)建 cache 對象。
c = runtime·FixAlloc_Alloc(&runtime·mheap.cachealloc);
// 初始化內(nèi)存。
runtime·memclr((byte*)c, sizeof(*c));
// 初始化 alloc 數(shù)組,用來保存從 central 獲取的不同等級 span 對象。
for(i = 0; i < NumSizeClasses; i++)
c->alloc[i] = &runtime·emptymspan;
return c;
}
相關(guān)包裝函數(shù),最終通過 mallocgc 函數(shù)完成內(nèi)存分配操作。
malloc.go
func newobject(typ *_type) unsafe.Pointer {
return mallocgc(uintptr(typ.size), typ, flags)
}
func newarray(typ *_type, n uintptr) unsafe.Pointer {
return mallocgc(uintptr(typ.size)*n, typ, flags)
}
在分配過程中,需要判斷大小對象,還有對小于 16 字節(jié)的微小對象做額外處理。
malloc.h
MaxSmallSize = 32<<10,
TinySize = 16,
TinySizeClass = 2,
malloc.go
func mallocgc(size uintptr, typ *_type, flags uint32) unsafe.Pointer {
// 當(dāng)前 cache 對象。
c := gomcache()
var s *mspan
var x unsafe.Pointer
// 判斷是否小對象。
if size <= maxSmallSize {
// 對于小于 16 字節(jié)的微小對象,做額外處理。
if flags&flagNoScan != 0 && size < maxTinySize {
// 獲取當(dāng)前 cache tiny 塊剩余大小。
tinysize := uintptr(c.tinysize)
// 如果 tiny 塊空間足夠...
if size <= tinysize {
tiny := unsafe.Pointer(c.tiny)
// 地址對齊。
if size&7 == 0 {
tiny = roundup(tiny, 8)
} else if size&3 == 0 {
tiny = roundup(tiny, 4)
} else if size&1 == 0 {
tiny = roundup(tiny, 2)
}
// 實際大小 = 對象大小 + 對齊所需大小(對齊后地址 - 原地址)。
size1 := size + (uintptr(tiny) - uintptr(unsafe.Pointer(c.tiny)))
// 再次判斷空間是否足夠...
if size1 <= tinysize {
// x = 對齊后地址
x = tiny
// 調(diào)整剩余空間記錄。
c.tiny = (*byte)(add(x, size))
c.tinysize -= uintptr(size1)
c.local_tinyallocs++
return x
}
}
// 如果 tiny 塊空間不足,則從 alloc[2] 獲取新的 tiny/object 塊。
s = c.alloc[tinySizeClass]
v := s.freelist
// 如果該 span 沒有可用 object ...
if v == nil {
// 從 central 獲取新的 span。
mp := acquirem()
mp.scalararg[0] = tinySizeClass
onM(mcacheRefill_m)
releasem(mp)
// 獲取 tiny/object 塊。
s = c.alloc[tinySizeClass]
v = s.freelist
}
// 提取 tiny 塊后,調(diào)整 span.freelist 鏈表。
s.freelist = v.next
s.ref++
// 初始化 tiny 塊內(nèi)存。
x = unsafe.Pointer(v)
(*[2]uint64)(x)[0] = 0
(*[2]uint64)(x)[1] = 0
// 如果新 tiny 塊剩余空間大于原 tiny 塊,那么就換一下。
if maxTinySize-size > tinysize {
// 調(diào)整剩余位置指針和大小。
c.tiny = (*byte)(add(x, size))
c.tinysize = uintptr(maxTinySize - size)
}
size = maxTinySize
} else { // 普通小對象
var sizeclass int8
// 計算對應(yīng)的等級。
if size <= 1024-8 {
sizeclass = size_to_class8[(size+7)>>3]
} else {
sizeclass = size_to_class128[(size-1024+127)>>7]
}
size = uintptr(class_to_size[sizeclass])
// 從 alloc 數(shù)組獲取對應(yīng)的 span。
s = c.alloc[sizeclass]
// 從 span 鏈表提取 object。
v := s.freelist
// 如果 span 沒有剩余 object,則從 central 獲取新的 span。
if v == nil {
mp := acquirem()
mp.scalararg[0] = uintptr(sizeclass)
onM(mcacheRefill_m)
releasem(mp)
s = c.alloc[sizeclass]
v = s.freelist
}
// 調(diào)整 span 鏈表。
s.freelist = v.next
s.ref++
// 初始化內(nèi)存。
x = unsafe.Pointer(v)
if flags&flagNoZero == 0 {
v.next = nil
if size > 2*ptrSize && ((*[2]uintptr)(x))[1] != 0 {
memclr(unsafe.Pointer(v), size)
}
}
}
c.local_cachealloc += intptr(size)
} else { // 大對象
mp := acquirem()
mp.scalararg[0] = uintptr(size)
mp.scalararg[1] = uintptr(flags)
// 直接從 heap 分配一個適用的 span。
// onM 是切換到 M.g0 棧執(zhí)行函數(shù),相關(guān)細(xì)節(jié)參考后續(xù)章節(jié)。
onM(largeAlloc_m)
s = (*mspan)(mp.ptrarg[0])
mp.ptrarg[0] = nil
releasem(mp)
x = unsafe.Pointer(uintptr(s.start << pageShift))
size = uintptr(s.elemsize)
}
// 在 bitmap 做標(biāo)記。
{
arena_start := uintptr(unsafe.Pointer(mheap_.arena_start))
off := (uintptr(x) - arena_start) / ptrSize
xbits := (*uint8)(unsafe.Pointer(arena_start - off/wordsPerBitmapByte - 1))
shift := (off % wordsPerBitmapByte) * gcBits
// ...
}
marked:
// 檢查分配計數(shù)器,以決定是否觸發(fā)垃圾回收操作。
if memstats.heap_alloc >= memstats.next_gc {
gogc(0)
}
return x
}
函數(shù)雖然有點長,但不算太復(fù)雜。
malloc.h
struct MCache
{
// Allocator cache for tiny objects w/o pointers.
byte* tiny;
uintptr tinysize;
MSpan* alloc[NumSizeClasses]; // spans to allocate from
};
除基本的分配操作外,還需要關(guān)注內(nèi)存不足時的 “擴張” 過程。這需要一點耐心和細(xì)心。
首先,當(dāng) cache.alloc[] 中對應(yīng)的 span 沒有剩余 object 時,會觸發(fā)從 central 獲取新span 操作。
malloc.c
void runtime·mcacheRefill_m(void)
{
runtime·MCache_Refill(g->m->mcache, (int32)g->m->scalararg[0]);
}
mcache.c
MSpan* runtime·MCache_Refill(MCache *c, int32 sizeclass)
{
MSpan *s;
// 當(dāng)前沒有剩余空間的 span。
s = c->alloc[sizeclass];
if(s->freelist != nil)
runtime·throw("refill on a nonempty span");
// 取消 incache 標(biāo)記。
if(s != &runtime·emptymspan)
s->incache = false;
// 從 heap.central[] 數(shù)組找到對應(yīng)的 central,并獲取新的 span。
s = runtime·MCentral_CacheSpan(&runtime·mheap.central[sizeclass].mcentral);
// 保存到 cache.alloc 數(shù)組。
c->alloc[sizeclass] = s;
return s;
}
從 central 新獲取的 span 會替代原有對象,被保存到 alloc 數(shù)組中。
需要提前說明一點背景知識:從 Go 1.3 開始,垃圾回收算法就有很大變動。其中標(biāo)記階段需要執(zhí)行 StopTheWorld,然后用多線程并發(fā)執(zhí)行標(biāo)記操作。待標(biāo)記結(jié)束后,立即恢復(fù)StartTheWorld,用單獨的 goroutine 執(zhí)行清理操作。
因此在執(zhí)行 CacheSpan 時,某些 span 可能還未完成清理。此時主動觸發(fā)回收操作,有助于提高內(nèi)存復(fù)用率,避免向操作系統(tǒng)過度申請內(nèi)存。
malloc.h
sweep generation:
if sweepgen == h->sweepgen - 2, the span needs sweeping
if sweepgen == h->sweepgen - 1, the span is currently being swept
if sweepgen == h->sweepgen, the span is swept and ready to use
h->sweepgen is incremented by 2 after every GC
mcentral.c
MSpan* runtime·MCentral_CacheSpan(MCentral *c)
{
// 當(dāng)前垃圾回收代齡 (隨每次回收操作遞增)。
sg = runtime·mheap.sweepgen;
retry:
// 嘗試從 nonempty 鏈表中獲取可用 span。
for(s = c->nonempty.next; s != &c->nonempty; s = s->next) {
// 如果 span 標(biāo)記為等待回收,那么主動執(zhí)行清理操作。
if(s->sweepgen == sg-2 && runtime·cas(&s->sweepgen, sg-2, sg-1)) {
// 將 span 移動到鏈表尾部。
runtime·MSpanList_Remove(s);
runtime·MSpanList_InsertBack(&c->empty, s);
// 執(zhí)行垃圾清理。
runtime·MSpan_Sweep(s, true);
goto havespan;
}
// 如果正在后臺回收,則跳過。
if(s->sweepgen == sg-1) {
// the span is being swept by background sweeper, skip
continue;
}
// 可用 span,將其轉(zhuǎn)移到 empty 鏈表。
runtime·MSpanList_Remove(s);
runtime·MSpanList_InsertBack(&c->empty, s);
goto havespan;
}
// 嘗試從 emtpy 鏈表獲取 span,目標(biāo)是那些等待清理的 span。
for(s = c->empty.next; s != &c->empty; s = s->next) {
// 如果是等待回收的 span,主動執(zhí)行回收操作。
if(s->sweepgen == sg-2 && runtime·cas(&s->sweepgen, sg-2, sg-1)) {
// 將該 span 移到 empty 鏈表尾部。
runtime·MSpanList_Remove(s);
runtime·MSpanList_InsertBack(&c->empty, s);
// 執(zhí)行垃圾清理操作。
runtime·MSpan_Sweep(s, true);
// 如果回收后 freelist 鏈表不為空,表示有可用空間。
if(s->freelist != nil)
goto havespan;
goto retry;
}
// 如果正在后臺回收,跳過。
if(s->sweepgen == sg-1) {
continue;
}
// 處理過的 span,其代齡都已經(jīng)標(biāo)記為 sg,終止嘗試。
break;
}
// 如果 central 中沒有找到可用 span,則向 heap 獲取新的 span。
s = MCentral_Grow(c);
if(s == nil)
return nil;
// 將 span 插入到 empty 鏈表。
runtime·MSpanList_InsertBack(&c->empty, s);
havespan:
// 設(shè)置待返回 span 的相關(guān)屬性。
cap = (s->npages << PageShift) / s->elemsize;
n = cap - s->ref;
// 標(biāo)記被 cache 使用。
s->incache = true;
return s;
}
相比 Go 1.3,cache 部分又做了很大的改進(jìn)。代碼更加簡潔,流程也更加清晰。
而當(dāng) central 空間不足時,就需要從 heap 獲取新 span 來完成擴張操作。這其中就包括對 span 所管理內(nèi)存進(jìn)行切分,形成 object freelist 鏈表。
mcentral.c
static MSpan* MCentral_Grow(MCentral *c)
{
MLink **tailp, *v;
byte *p;
MSpan *s;
// 計算所需 span 的大小信息。
npages = runtime·class_to_allocnpages[c->sizeclass];
size = runtime·class_to_size[c->sizeclass];
n = (npages << PageShift) / size;
// 從 heap 獲取 span。
s = runtime·MHeap_Alloc(&runtime·mheap, npages, c->sizeclass, 0, 1);
if(s == nil)
return nil;
// 將 span 所管理的內(nèi)存切分成 freelist/object 鏈表。
tailp = &s->freelist;
p = (byte*)(s->start << PageShift); // 起始地址。PageID(start) = p >> PageShift
s->limit = p + size*n;
for(i=0; i<n; i++) {
v = (MLink*)p;
*tailp = v;
tailp = &v->next;
p += size;
}
*tailp = nil;
// 標(biāo)記。
runtime·markspan((byte*)(s->start<<PageShift), size, n, ...));
return s;
}
前面在 mallocgc 中提及的大對象分配,也是用的 MHeap_Alloc 函數(shù)。
malloc.c
void runtime·largeAlloc_m(void)
{
size = g->m->scalararg[0];
npages = size >> PageShift;
s = runtime·MHeap_Alloc(&runtime·mheap, npages, 0, 1, !(flag & FlagNoZero));
g->m->ptrarg[0] = s;
}
mheap.c
MSpan* runtime·MHeap_Alloc(MHeap *h, uintptr npage, int32 sizeclass, bool large, ...)
{
// 判斷是否在 g0 棧執(zhí)行。
if(g == g->m->g0) {
s = mheap_alloc(h, npage, sizeclass, large);
} else {
...
}
return s;
}
static MSpan* mheap_alloc(MHeap *h, uintptr npage, int32 sizeclass, bool large)
{
MSpan *s;
// 如果垃圾回收操作未結(jié)束,那么嘗試主動收回一些空間,以避免內(nèi)存過度增長。
// we need to sweep and reclaim at least n pages.
if(!h->sweepdone)
MHeap_Reclaim(h, npage);
// 返回可用 span。
s = MHeap_AllocSpanLocked(h, npage);
if(s != nil) {
// 標(biāo)記代齡等狀態(tài)。
runtime·atomicstore(&s->sweepgen, h->sweepgen);
s->state = MSpanInUse;
s->freelist = nil;
s->ref = 0;
s->sizeclass = sizeclass;
s->elemsize = (sizeclass==0
s->npages<<PageShift : runtime·class_to_size[sizeclass]);
// 如果是大對象...
if(large) {
mstats.heap_objects++;
mstats.heap_alloc += npage<<PageShift;
// 根據(jù)頁數(shù),插入到合適的 busy 鏈表。
if(s->npages < nelem(h->free))
runtime·MSpanList_InsertBack(&h->busy[s->npages], s);
else
runtime·MSpanList_InsertBack(&h->busylarge, s);
}
}
return s;
}
從 heap 獲取 span 算法:
mheap.c
static MSpan* MHeap_AllocSpanLocked(MHeap *h, uintptr npage)
{
uintptr n;
MSpan *s, *t;
pageID p;
// 以頁數(shù)為序號,從 heap.free[] 中查找鏈表。
// 如果當(dāng)前鏈表沒有可用 span,則從頁數(shù)更大的鏈表中提取。
for(n=npage; n < nelem(h->free); n++) {
if(!runtime·MSpanList_IsEmpty(&h->free[n])) {
s = h->free[n].next;
goto HaveSpan;
}
}
// 如果 free 所有鏈表都沒找到合適的 span,則嘗試更大的 large 鏈表。
if((s = MHeap_AllocLarge(h, npage)) == nil) {
// 還沒找到,就只能新申請內(nèi)存了。
if(!MHeap_Grow(h, npage))
return nil;
// 重新查找合適的 span。
// 每次向操作系統(tǒng)申請內(nèi)存最少 1MB/128Pages,而 heap.free 最大下標(biāo) 127,
// 因此 FreeSpanLocked 函數(shù)會將其放到 freelarge 鏈表中。
if((s = MHeap_AllocLarge(h, npage)) == nil)
return nil;
}
HaveSpan:
// 將找到的 span 從 free 鏈表中移除。
runtime·MSpanList_Remove(s);
// 如果該 span 曾釋放過物理內(nèi)存,那么重新映射。
if(s->npreleased > 0) {
runtime·SysUsed((void*)(s->start<<PageShift), s->npages<<PageShift);
mstats.heap_released -= s->npreleased<<PageShift;
s->npreleased = 0;
}
// 如果返回的 span 頁數(shù)多于需要 ...
if(s->npages > npage) {
// 新建一個 span 對象 t,用來管理尾部多余內(nèi)存空間。
t = runtime·FixAlloc_Alloc(&h->spanalloc);
runtime·MSpan_Init(t, s->start + npage, s->npages - npage);
// 調(diào)整實際所需的內(nèi)存大小。
s->npages = npage;
p = t->start;
p -= ((uintptr)h->arena_start>>PageShift);
// 在 spans 區(qū)域標(biāo)記 span 指針。
if(p > 0)
h->spans[p-1] = s;
h->spans[p] = t;
h->spans[p+t->npages-1] = t;
// 將切出來的多余 span,重新放回 heap 管理鏈表中。
MHeap_FreeSpanLocked(h, t, false, false);
s->state = MSpanFree;
}
// 在 spans 中標(biāo)記待所有頁對應(yīng)指針。
p = s->start;
p -= ((uintptr)h->arena_start>>PageShift);
for(n=0; n<npage; n++)
h->spans[p+n] = s;
return s;
}
當(dāng)找到的 span 大小超出預(yù)期時,分配器會執(zhí)行切割操作,將多余的內(nèi)存做成新 span 放回 heap 管理鏈表中。
從 large 里查找 span 的算法被稱作 BestFit。很簡單,通過循環(huán)遍歷,找到大小最合適的目標(biāo)。
mheap.c
MHeap_AllocLarge(MHeap *h, uintptr npage)
{
return BestFit(&h->freelarge, npage, nil);
}
static MSpan* BestFit(MSpan *list, uintptr npage, MSpan *best)
{
MSpan *s;
for(s=list->next; s != list; s=s->next) {
if(s->npages < npage)
continue;
if(best == nil
|| s->npages < best->npages
|| (s->npages == best->npages && s->start < best->start))
best = s;
}
return best;
}
接著看看將 span 放回 heap 管理鏈表的 FreeSpanLocked 操作。
mheap.c
static void MHeap_FreeSpanLocked(MHeap *h, MSpan *s, bool acctinuse, bool acctidle)
{
MSpan *t;
pageID p;
// 修正狀態(tài)標(biāo)記。
s->state = MSpanFree;
// 從當(dāng)前鏈表中移除。
runtime·MSpanList_Remove(s);
// 這兩個參數(shù)會影響垃圾回收的物理內(nèi)存釋放操作。
s->unusedsince = runtime·nanotime();
s->npreleased = 0;
// 實際地址。
p = s->start;
p -= (uintptr)h->arena_start >> PageShift;
// 通過 heap.spans 檢查左側(cè)相鄰 span。
// 如果左側(cè)相鄰 span 也是空閑狀態(tài),則合并。
if(p > 0 && (t = h->spans[p-1]) != nil && t->state != MSpanInUse &&
t->state != MSpanStack) {
// 修正屬性。
s->start = t->start;
s->npages += t->npages;
s->npreleased = t->npreleased; // absorb released pages
s->needzero |= t->needzero;
// 新起始地址。
p -= t->npages;
// 重新標(biāo)記 spans。
h->spans[p] = s;
// 釋放左側(cè) span 原對象。
runtime·MSpanList_Remove(t);
t->state = MSpanDead;
runtime·FixAlloc_Free(&h->spanalloc, t);
}
// 嘗試合并右側(cè) span。
if((p+s->npages)*sizeof(h->spans[0]) < h->spans_mapped &&
(t = h->spans[p+s->npages]) != nil &&
t->state != MSpanInUse && t->state != MSpanStack) {
s->npages += t->npages;
s->npreleased += t->npreleased;
s->needzero |= t->needzero;
h->spans[p + s->npages - 1] = s;
runtime·MSpanList_Remove(t);
t->state = MSpanDead;
runtime·FixAlloc_Free(&h->spanalloc, t);
}
// 根據(jù) span 頁數(shù),插入到合適的鏈表中。
if(s->npages < nelem(h->free))
runtime·MSpanList_Insert(&h->free[s->npages], s);
else
runtime·MSpanList_Insert(&h->freelarge, s);
}
在此,我們看到了 heap.spans 的作用。合并零散內(nèi)存塊,以提供更大復(fù)用空間,這有助于減少內(nèi)存碎片,是內(nèi)存管理算法的一個重要設(shè)計目標(biāo)。
最后,就是剩下如何向操作系統(tǒng)申請新的內(nèi)存了。
malloc.h
HeapAllocChunk = 1<<20," " // Chunk size for heap growth
mheap.c
static bool MHeap_Grow(MHeap *h, uintptr npage)
{
// 每次申請的內(nèi)存總是 64KB 的倍數(shù),最小 1MB。
npage = ROUND(npage, (64<<10)/PageSize);
ask = npage<<PageShift;
if(ask < HeapAllocChunk)
ask = HeapAllocChunk;
// 申請內(nèi)存。
v = runtime·MHeap_SysAlloc(h, ask);
// 創(chuàng)建新的 span 對象進(jìn)行管理。
s = runtime·FixAlloc_Alloc(&h->spanalloc);
runtime·MSpan_Init(s, (uintptr)v>>PageShift, ask>>PageShift);
p = s->start;
p -= ((uintptr)h->arena_start>>PageShift);
// 在 heap.spans 中標(biāo)記地址。
h->spans[p] = s;
h->spans[p + s->npages - 1] = s;
// 設(shè)置狀態(tài)。
runtime·atomicstore(&s->sweepgen, h->sweepgen);
s->state = MSpanInUse;
// 放回 heap 的管理鏈表,嘗試執(zhí)行合并操作。
MHeap_FreeSpanLocked(h, s, false, true);
return true;
}
申請時,需判斷目標(biāo)地址是否在 arena 范圍內(nèi),且必須從 arena_used 開始。
malloc.c
void* runtime·MHeap_SysAlloc(MHeap *h, uintptr n)
{
// 在 arena 范圍內(nèi)。
if(n <= h->arena_end - h->arena_used) {
// 使用 arena_used 地址。
p = h->arena_used;
runtime·SysMap(p, n, h->arena_reserved, &mstats.heap_sys);
// 調(diào)整下一次分配位置。
h->arena_used += n;
// 同步增加 spans、bitmap 管理內(nèi)存。
runtime·MHeap_MapBits(h);
runtime·MHeap_MapSpans(h);
return p;
}
...
}
mem_linux.c
void runtime·SysMap(void *v, uintptr n, bool reserved, uint64 *stat)
{
p = runtime·mmap(v, n, PROT_READ|PROT_WRITE, MAP_ANON|MAP_FIXED|MAP_PRIVATE, -1, 0);
}
mem_darwin.c
void runtime·SysMap(void *v, uintptr n, bool reserved, uint64 *stat)
{
p = runtime·mmap(v, n, PROT_READ|PROT_WRITE, MAP_ANON|MAP_FIXED|MAP_PRIVATE, -1, 0);
}
至此,對象內(nèi)存分配和內(nèi)存擴展的步驟結(jié)束。
垃圾回收器通過調(diào)用 MSpan_Sweep 函數(shù)完成內(nèi)存回收操作。
mgc0.c
bool runtime·MSpan_Sweep(MSpan *s, bool preserve)
{
// 當(dāng)前垃圾回收代齡。
sweepgen = runtime·mheap.sweepgen;
arena_start = runtime·mheap.arena_start;
// 獲取 span 相關(guān)信息。
cl = s->sizeclass;
size = s->elemsize;
if(cl == 0) {
// 大對象。
n = 1;
} else {
// 小對象。
npages = runtime·class_to_allocnpages[cl];
n = (npages << PageShift) / size;
}
res = false;
nfree = 0;
end = &head;
c = g->m->mcache;
sweepgenset = false;
// 標(biāo)記 freelist 里的 object,這些對象未被使用,無需再次檢查。
for(link = s->freelist; link != nil; link = link->next) {
off = (uintptr*)link - (uintptr*)arena_start;
bitp = arena_start - off/wordsPerBitmapByte - 1;
shift = (off % wordsPerBitmapByte) * gcBits;
*bitp |= bitMarked<<shift;
}
// 釋放 finalizer、profiler 關(guān)聯(lián)對象。
specialp = &s->specials;
special = *specialp;
while(special != nil) {
// ...
}
// 計算標(biāo)記位開始位置。
p = (byte*)(s->start << PageShift);
off = (uintptr*)p - (uintptr*)arena_start;
bitp = arena_start - off/wordsPerBitmapByte - 1;
shift = 0;
step = size/(PtrSize*wordsPerBitmapByte);
bitp += step;
if(step == 0) {
// 8-byte objects.
bitp++;
shift = gcBits;
}
// 遍歷該 span 所有 object。
for(; n > 0; n--, p += size) {
// 獲取標(biāo)記位。
bitp -= step;
if(step == 0) {
if(shift != 0)
bitp--;
shift = gcBits - shift;
}
xbits = *bitp;
bits = (xbits>>shift) & bitMask;
// 如果 object 對象標(biāo)記為可達(dá) (Marked),則跳過。
// 包括 freelist 里的未使用對象。
if((bits&bitMarked) != 0) {
*bitp &= ~(bitMarked<<shift);
continue;
}
// 重置標(biāo)記位。
*bitp = (xbits & ~((bitMarked|(BitsMask<<2))<<shift)) |
((uintptr)BitsDead<<(shift+2));
if(cl == 0) { // 大對象。
// 清除全部標(biāo)記位。
runtime·unmarkspan(p, s->npages<<PageShift);
// 重置代齡。
runtime·atomicstore(&s->sweepgen, sweepgen);
sweepgenset = true;
if(runtime·debug.efence) {
// ...
} else
// 將大對象所使用的 span 歸還給 heap。
runtime·MHeap_Free(&runtime·mheap, s, 1);
// 調(diào)整 next_gc 閾值。
runtime·xadd64(&mstats.next_gc,
-(uint64)(size * (runtime·gcpercent + 100)/100));
res = true;
} else { // 小對象。
// 將可回收對象添加到一個鏈表中。
end->next = (MLink*)p;
end = (MLink*)p;
nfree++;
}
}
// 如可回收小對象數(shù)量大于0。
if(nfree > 0) {
// 調(diào)整 next_gc 閾值。
runtime·xadd64(&mstats.next_gc,
-(uint64)(nfree * size * (runtime·gcpercent + 100)/100));
// 釋放收集的 object 鏈表。
res = runtime·MCentral_FreeSpan(&runtime·mheap.central[cl].mcentral, s, nfree,head.next, end, preserve);
}
return res;
}
該回收函數(shù)在分配流程 CacheSpan 中也曾提及過。
大對象釋放很簡單,調(diào)用 FreeSpanLocked 將 span 重新放回 heap 管理鏈表即可。
mheap.c
void runtime·MHeap_Free(MHeap *h, MSpan *s, int32 acct)
{
mheap_free(h, s, acct);
}
static void mheap_free(MHeap *h, MSpan *s, int32 acct)
{
MHeap_FreeSpanLocked(h, s, true, true);
}
至于收集的所有小對象,會被追加到 span.freelist 鏈表。如該 span 收回全部 object,則也將其歸還給 heap。
mcentral.c
bool runtime·MCentral_FreeSpan(MCentral *c, MSpan *s, int32 n, MLink *start, ...)
{
// span 不能是 cache 正在使用的對象。
if(s->incache)
runtime·throw("freespan into cached span");
// 將收集的 object 鏈表追加到 span.freelist。
wasempty = s->freelist == nil;
end->next = s->freelist;
s->freelist = start;
s->ref -= n;
// 將 span 轉(zhuǎn)移到 central.nonempty 鏈表。
if(wasempty) {
runtime·MSpanList_Remove(s);
runtime·MSpanList_Insert(&c->nonempty, s);
}
// 重置回收代齡。
runtime·atomicstore(&s->sweepgen, runtime·mheap.sweepgen);
if(s->ref != 0) {
return false;
}
// 如果 span 收回全部 object (span.ref == 0),從 central 管理鏈表移除。
runtime·MSpanList_Remove(s);
s->needzero = 1;
s->freelist = nil;
// 清除標(biāo)記位。
runtime·unmarkspan((byte*)(s->start<<PageShift), s->npages<<PageShift);
// 將 span 交還給 heap。
runtime·MHeap_Free(&runtime·mheap, s, 0);
return true;
}
釋放操作最終結(jié)果,僅僅是將可回收對象歸還給 span.freelist 或 heap.free 鏈表,以便后續(xù)分配操作復(fù)用。至于物理內(nèi)存釋放,則由垃圾回收器的特殊定時操作完成。
除了用戶內(nèi)存,分配器還需額外的 span、cache 等對象來維持系統(tǒng)運轉(zhuǎn)。這些管理對象所需內(nèi)存不從 arena 區(qū)域分配,不占用與 GC Heap 分配算法有關(guān)的內(nèi)存地址。
系統(tǒng)為每種管理對象初始化一個固定分配器 FixAlloc。
malloc.h
struct FixAlloc
{
uintptr size; // 固定分配長度。
void (*first)(void *arg, byte *p); // 關(guān)聯(lián)函數(shù)。
void* arg; // first 函數(shù)調(diào)用參數(shù)。
MLink* list; // 可復(fù)用空間鏈表。
byte* chunk; // 后備內(nèi)存塊當(dāng)前分配指針。
uint32 nchunk; // 后備內(nèi)存塊可用長度。
uintptr inuse; // 后備內(nèi)存塊已使用長度。
};
mheap.c
void runtime·MHeap_Init(MHeap *h)
{
runtime·FixAlloc_Init(&h->spanalloc, sizeof(MSpan), RecordSpan, ...);
runtime·FixAlloc_Init(&h->cachealloc, sizeof(MCache), nil, ...);
runtime·FixAlloc_Init(&h->specialfinalizeralloc, sizeof(SpecialFinalizer), ...);
runtime·FixAlloc_Init(&h->specialprofilealloc, sizeof(SpecialProfile), ...);
}
FixAlloc 初始化過程很簡單。
mfixalloc.c
void runtime·FixAlloc_Init(FixAlloc *f, uintptr size,
void (*first)(void*, byte*), void *arg, uint64 *stat)
{
f->size = size;
f->first = first;
f->arg = arg;
f->list = nil;
f->chunk = nil;
f->nchunk = 0;
f->inuse = 0;
f->stat = stat;
}
分配算法和 cache 類似。首先從復(fù)用鏈表提取,如果沒找到,就從后備內(nèi)存塊截取。
malloc.h
FixAllocChunk = 16<<10," " // Chunk size for FixAlloc
mfixalloc.c
void* runtime·FixAlloc_Alloc(FixAlloc *f)
{
void *v;
// 如果空閑鏈表不為空,直接從鏈表提取。
if(f->list) {
v = f->list;
f->list = *(void**)f->list;
f->inuse += f->size;
return v;
}
// 如果后備內(nèi)存塊空間不足...
if(f->nchunk < f->size) {
// 重新申請 16KB 后備內(nèi)存。
f->chunk = runtime·persistentalloc(FixAllocChunk, 0, f->stat);
f->nchunk = FixAllocChunk;
}
// 從后備內(nèi)存塊截取。
v = f->chunk;
// 執(zhí)行 first 函數(shù)。
if(f->first)
f->first(f->arg, v);
// 調(diào)整剩余后備塊參數(shù)。
f->chunk += f->size;
f->nchunk -= f->size;
f->inuse += f->size;
return v;
}
后備內(nèi)存塊策略有點類似 heap span,申請大塊內(nèi)存以減少系統(tǒng)調(diào)用開銷。實際上,不同類別的 FixAlloc 會共享一個超大塊內(nèi)存,稱之為 persistent。
malloc.go
var persistent struct { // 全局變量,為全部 FixAlloc 提供后備內(nèi)存塊。
lock mutex
pos unsafe.Pointer
end unsafe.Pointer
}
func persistentalloc(size, align uintptr, stat *uint64) unsafe.Pointer {
const (
chunk = 256 << 10
maxBlock = 64 << 10 // VM reservation granularity is 64K on windows
)
// 如果需要 64KB 以上,直接從 mmap 返回。
if size >= maxBlock {
return sysAlloc(size, stat)
}
// 對齊分配地址。
persistent.pos = roundup(persistent.pos, align)
// 如果剩余空間不足 ...
if uintptr(persistent.pos)+size > uintptr(persistent.end) {
// 重新從 mmap 申請 256KB 內(nèi)存,保存到 persistent。
persistent.pos = sysAlloc(chunk, &memstats.other_sys)
persistent.end = add(persistent.pos, chunk)
}
// 截取內(nèi)存,調(diào)整下次分配地址。
p := persistent.pos
persistent.pos = add(persistent.pos, size)
return p
}
mem_linux.c
void* runtime·sysAlloc(uintptr n, uint64 *stat)
{
p = runtime·mmap(nil, n, PROT_READ|PROT_WRITE, MAP_ANON|MAP_PRIVATE, -1, 0);
return p;
}
釋放操作僅僅是將對象收回到復(fù)用鏈表。
mfixalloc.c
void runtime·FixAlloc_Free(FixAlloc *f, void *p)
{
f->inuse -= f->size;
*(void**)p = f->list;
f->list = p;
}
另外,在 FixAlloc 初始化時,還可額外提供一個 first 函數(shù)作為參數(shù),比如 spanalloc 中的 RecordSpan。
該函數(shù)為 heap.allspans 分配內(nèi)存,其內(nèi)存儲了所有 span 指針,GC Sweep 和 Heap Dump 操作都會用到這些信息。
mheap.c
static void RecordSpan(void *vh, byte *p)
{
MHeap *h;
MSpan *s;
MSpan **all;
uint32 cap;
h = vh;
s = (MSpan*)p;
// 如果空間不足 ...
if(h->nspan >= h->nspancap) {
// 計算新容量。
cap = 64*1024/sizeof(all[0]);
if(cap < h->nspancap*3/2)
cap = h->nspancap*3/2;
// 分配新空間。
all = (MSpan**)runtime·sysAlloc(cap*sizeof(all[0]), &mstats.other_sys);
if(h->allspans) {
// 將數(shù)據(jù)拷貝到新分配空間。
runtime·memmove(all, h->allspans, h->nspancap*sizeof(all[0]));
// 釋放原內(nèi)存。
if(h->allspans != runtime·mheap.gcspans)
runtime·SysFree(h->allspans, h->nspancap*sizeof(all[0]),
&mstats.other_sys);
}
// 指向新內(nèi)存空間。
h->allspans = all;
h->nspancap = cap;
}
// 存儲 span 指針。
h->allspans[h->nspan++] = s;
}
精確垃圾回收,很經(jīng)典的 Mark-and-Sweep 算法。
當(dāng)分配 (malloc) 總量超出預(yù)設(shè)閾值,就會引發(fā)垃圾回收。操作前,須暫停用戶邏輯執(zhí)行(StopTheWorld),然后啟用多個線程執(zhí)行并行掃描工作,直到標(biāo)記出所有可回收對象。
從 Go 1.3 開始,默認(rèn)采用并發(fā)內(nèi)存清理模式。也就是說,標(biāo)記結(jié)束后,立即恢復(fù)邏輯執(zhí)行 (StartTheWorld)。用一個專門的 goroutine 在后臺清理內(nèi)存。這縮短了暫停時間,在一定程度上改善了垃圾回收所引發(fā)的問題。
完成清理后,新閾值通常是存活對象所用內(nèi)存的 2 倍。需要注意的是,清理操作只是調(diào)用內(nèi)存分配器的相關(guān)方法,收回不可達(dá)對象內(nèi)存進(jìn)行復(fù)用,并未釋放物理內(nèi)存。
物理內(nèi)存釋放由專門線程定期執(zhí)行。它檢查最后一次垃圾回收時間,如超過 2 分鐘,則執(zhí)行強制回收。還會讓操作系統(tǒng)收回閑置超過 5 分鐘的 span 物理內(nèi)存。
初始化函數(shù)創(chuàng)建并行標(biāo)記狀態(tài)對象 markfor,讀取 GOGC 環(huán)境變量值。
proc.c
void runtime·schedinit(void)
{
runtime·gcinit();
}
mgc0.c
void runtime·gcinit(void)
{
runtime·work.markfor = runtime·parforalloc(MaxGcproc);
runtime·gcpercent = runtime·readgogc();
}
int32 runtime·readgogc(void)
{
byte *p;
p = runtime·getenv("GOGC");
// 默認(rèn)值 100。
if(p == nil || p[0] == '\0')
return 100;
// 關(guān)閉垃圾回收。
if(runtime·strcmp(p, (byte*)"off") == 0)
return -1;
return runtime·atoi(p);
}
在內(nèi)存分配器中提到過,函數(shù) mallocgc 會檢查已分配內(nèi)存是否超過閾值,并以此觸發(fā)垃圾回收操作。
malloc.go
func mallocgc(size uintptr, typ *_type, flags uint32) unsafe.Pointer {
if memstats.heap_alloc >= memstats.next_gc {
gogc(0)
}
}
啟動垃圾回收有三種不同方式。
malloc.go
func gogc(force int32) {
// 如果 GOGC < 0,禁用垃圾回收,直接返回。
if gp := getg(); gp == mp.g0 || mp.locks > 1 || !memstats.enablegc ||
panicking != 0 || gcpercent < 0 {
return
}
semacquire(&worldsema, false)
// 普通回收,會再次檢查是否達(dá)到回收閾值。
if force == 0 && memstats.heap_alloc < memstats.next_gc {
semrelease(&worldsema)
return
}
// 準(zhǔn)備回收 ...
startTime := nanotime()
mp = acquirem()
mp.gcing = 1
// 停止用戶邏輯執(zhí)行。
onM(stoptheworld)
// 清理 sync.Pool 的相關(guān)緩存對象,這個后面有專門的剖析章節(jié)。
clearpools()
// 如果設(shè)置環(huán)境變量 GODEBUG=gctrace=2,那么會引發(fā)兩次回收操作。
n := 1
if debug.gctrace > 1 {
n = 2
}
for i := 0; i < n; i++ {
if i > 0 {
startTime = nanotime()
}
// 將 64-bit 開始時間保存到 scalararg 。
mp.scalararg[0] = uintptr(uint32(startTime)) // low 32 bits
mp.scalararg[1] = uintptr(startTime >> 32) // high 32 bits
// 清理行為標(biāo)記。
if force >= 2 {
mp.scalararg[2] = 1 // eagersweep
} else {
mp.scalararg[2] = 0
}
// 在 g0 棧執(zhí)行垃圾回收操作。
onM(gc_m)
}
// 回收結(jié)束。
mp.gcing = 0
semrelease(&worldsema)
// 恢復(fù)用戶邏輯執(zhí)行。
onM(starttheworld)
}
總體邏輯倒不復(fù)雜,StopTheWorld -> GC -> StartTheWorld。暫時拋開周邊細(xì)節(jié),看看垃圾回收流程。
mgc0.c
void runtime·gc_m(void)
{
a.start_time = (uint64)(g->m->scalararg[0]) | ((uint64)(g->m->scalararg[1]) << 32);
a.eagersweep = g->m->scalararg[2];
gc(&a);
}
static void gc(struct gc_args *args)
{
// 如果前次回收的清理操作未完成,那么先把這事結(jié)束了。
while(runtime·sweepone() != -1)
runtime·sweep.npausesweep++;
// 為回收操作準(zhǔn)備相關(guān)環(huán)境狀態(tài)。
runtime·mheap.gcspans = runtime·mheap.allspans;
runtime·work.spans = runtime·mheap.allspans;
runtime·work.nspan = runtime·mheap.nspan;
runtime·work.nwait = 0;
runtime·work.ndone = 0;
runtime·work.nproc = runtime·gcprocs();
// 初始化并行標(biāo)記狀態(tài)對象 markfor。
// 使用 nproc 個線程執(zhí)行并行標(biāo)記任務(wù)。
// 任務(wù)總數(shù) = 固定內(nèi)存段(RootCount) + 當(dāng)前 goroutine G 的數(shù)量。
// 標(biāo)記函數(shù) markroot。
runtime·parforsetup(runtime·work.markfor, runtime·work.nproc,
RootCount + runtime·allglen, nil, false, markroot);
if(runtime·work.nproc > 1) {
// 重置結(jié)束標(biāo)記。
runtime·noteclear(&runtime·work.alldone);
// 喚醒 nproc - 1 個線程準(zhǔn)備執(zhí)行 markroot 函數(shù),因為當(dāng)前線程也會參與標(biāo)記工作。
runtime·helpgc(runtime·work.nproc);
}
// 讓當(dāng)前線程也開始執(zhí)行標(biāo)記任務(wù)。
gchelperstart();
runtime·parfordo(runtime·work.markfor);
scanblock(nil, 0, nil);
if(runtime·work.nproc > 1)
// 休眠,等待標(biāo)記全部結(jié)束。
runtime·notesleep(&runtime·work.alldone);
// 收縮 stack 內(nèi)存。
runtime·shrinkfinish();
// 更新所有 cache 統(tǒng)計參數(shù)。
cachestats();
// 計算上一次回收后 heap_alloc 大小。
// 當(dāng)前 next_gc = heap0 + heap0 * (gcpercent/100)
// 那么 heap0 = next_gc / (1 + gcpercent/100)
heap0 = mstats.next_gc*100/(runtime·gcpercent+100);
// 計算下一次 next_gc 閾值。
// 這個值只是預(yù)估,會隨著清理操作而改變。
mstats.next_gc = mstats.heap_alloc+mstats.heap_alloc*runtime·gcpercent/100;
runtime·atomicstore64(&mstats.last_gc, runtime·unixnanotime());
// 目標(biāo)是 heap.allspans 里的所有 span 對象。
runtime·mheap.gcspans = runtime·mheap.allspans;
// GC 使用遞增的代齡來表示 span 當(dāng)前回收狀態(tài)。
runtime·mheap.sweepgen += 2;
runtime·mheap.sweepdone = false;
runtime·work.spans = runtime·mheap.allspans;
runtime·work.nspan = runtime·mheap.nspan;
runtime·sweep.spanidx = 0;
if(ConcurrentSweep && !args->eagersweep) { // 并發(fā)清理
// 新建或喚醒用于清理操作的 goroutine。
if(runtime·sweep.g == nil)
runtime·sweep.g = runtime·newproc1(&bgsweepv, nil, 0, 0, gc);
else if(runtime·sweep.parked) {
runtime·sweep.parked = false;
runtime·ready(runtime·sweep.g); // 喚醒
}
} else { // 串行回收
// 立即執(zhí)行清理操作。
while(runtime·sweepone() != -1)
runtime·sweep.npausesweep++;
}
}
算法的核心是并行回收和是否啟用一個 goroutine 來執(zhí)行清理操作。這個 goroutine 在清理操作結(jié)束后被凍結(jié),再次使用前必須喚醒。
如果用專門的 goroutine 執(zhí)行清理操作,那么 gc 函數(shù)不等清理操作結(jié)束就立即返回,上級的 gogc 會立即調(diào)用 StartTheWorld 恢復(fù)用戶邏輯執(zhí)行,這就是并發(fā)回收的關(guān)鍵。
我們回過頭,看看一些中間環(huán)節(jié)的實現(xiàn)細(xì)節(jié)。
在設(shè)置并行回收狀態(tài)對象 markfor 里提到過兩個參數(shù):任務(wù)總數(shù)和標(biāo)記函數(shù)。
mgc0.c
enum {
RootCount = 5
}
任務(wù)總數(shù)其實是 5 個根內(nèi)存段 RootData、RootBBS、RootFinalizers、RootSpans、RootFlushCaches,外加所有 goroutine stack 的總和。
mgc0.c
static void markroot(ParFor *desc, uint32 i)
{
switch(i) {
case RootData:
...
break;
case RootBss:
...
break;
case RootFinalizers:
...
break;
case RootSpans:
...
break;
case RootFlushCaches:
flushallmcaches(); // 清理 cache、stack。
break;
default:
gp = runtime·allg[i - RootCount];
runtime·shrinkstack(gp); // 收縮 stack。
scanstack(gp);
...
break;
}
}
核心算法 scanblock 函數(shù)通過掃描內(nèi)存塊,找出存活對象和可回收對象,并在 bitmap 區(qū)域進(jìn)行標(biāo)記。具體實現(xiàn)細(xì)節(jié),本文不做詳述,有興趣可自行閱讀源碼或相關(guān)論文。
那么 parfor 是如何實現(xiàn)并行回收的呢?
這里面有個很大誤導(dǎo)。其實 parfor 實現(xiàn)非常簡單,僅是一個狀態(tài)對象,核心是將要執(zhí)行的多個任務(wù)序號平均分配個多個線程。
parfor.c
struct ParForThread
{
// the thread's iteration space [32lsb, 32msb)
uint64 pos;
};
void runtime·parforsetup(ParFor *desc, uint32 nthr, uint32 n, void *ctx, bool wait,
void (*body)(ParFor*, uint32))
{
uint32 i, begin, end;
uint64 *pos;
desc->body = body; // 任務(wù)函數(shù)
desc->nthr = nthr; // 并發(fā)線程數(shù)量
desc->thrseq = 0;
desc->cnt = n; // 任務(wù)總數(shù)
// 為線程平均分配任務(wù)編號。
// 比如 10 個任務(wù)分配給 5 個線程,那么 thr[0] 就是 [0,2),也就是 0 和 1 這兩個任務(wù)。
// 起始和結(jié)束編號分別保存在 ParForThread.pos 字段的高低位。
for(i=0; i<nthr; i++) {
begin = (uint64)n*i / nthr;
end = (uint64)n*(i+1) / nthr;
pos = &desc->thr[i].pos;
*pos = (uint64)begin | (((uint64)end)<<32);
}
}
現(xiàn)在任務(wù)被平均分配,并保存到全局變量 markfor 里。接下來的操作,其實是由被喚醒的線程主動完成,如同當(dāng)前 GC 主線程主動調(diào)用 parfordo 一樣。
執(zhí)行標(biāo)記任務(wù)的多個線程由 helpgc 函數(shù)喚醒,其中的關(guān)鍵就是設(shè)置 M.helpgc 標(biāo)記。
proc.c
void runtime·helpgc(int32 nproc)
{
pos = 0;
// 從 1 開始,因為當(dāng)前線程也會參與標(biāo)記任務(wù)。
for(n = 1; n < nproc; n++) {
// 檢查 P 是否被當(dāng)前線程使用,如果是就跳過。
if(runtime·allp[pos]->mcache == g->m->mcache)
pos++;
// 獲取空閑線程。
mp = mget();
// 這是關(guān)鍵,線程喚醒后會檢查該標(biāo)記。
mp->helpgc = n;
// 為線程分配用戶執(zhí)行的 P.cache。
mp->mcache = runtime·allp[pos]->mcache;
pos++;
// 喚醒線程。
runtime·notewakeup(&mp->park);
}
}
如果你熟悉線程 M 的工作方式,那么就會知道它通過 stopm 完成休眠操作。
proc.c
static void stopm(void)
{
// 放回空閑隊列。
mput(g->m);
// 休眠,直到被喚醒。
runtime·notesleep(&g->m->park);
// 被喚醒后,清除休眠標(biāo)記。
runtime·noteclear(&g->m->park);
// 檢查 helpgc 標(biāo)記,執(zhí)行 gchelper 函數(shù)。
if(g->m->helpgc) {
runtime·gchelper();
g->m->helpgc = 0;
g->m->mcache = nil;
goto retry;
}
}
mgc0.c
void runtime·gchelper(void)
{
gchelperstart();
runtime·parfordo(runtime·work.markfor);
scanblock(nil, 0, nil);
// 檢查標(biāo)記是否全部結(jié)束。
nproc = runtime·work.nproc;
if(runtime·xadd(&runtime·work.ndone, +1) == nproc-1)
// 喚醒 GC 主線程。
runtime·notewakeup(&runtime·work.alldone);
g->m->traceback = 0;
}
最終和 GC 主線程調(diào)用過程一致。當(dāng) alldone 被喚醒后,GC 主線程恢復(fù)后續(xù)步驟執(zhí)行。
至于被線程調(diào)用的 parfordo,其實也很簡單。
parfor.c
void runtime·parfordo(ParFor *desc)
{
// 每次調(diào)用,都會遞增 thrseq 值。
tid = runtime·xadd(&desc->thrseq, 1) - 1;
// 如果任務(wù)線程數(shù)量為 1,那么沒什么好說的,直接循環(huán)執(zhí)行 body,也就是 markroot。
if(desc->nthr==1) {
for(i=0; i<desc->cnt; i++)
desc->body(desc, i);
return;
}
body = desc->body;
// 用 tid 作為當(dāng)前線程的編號,以此提取任務(wù)范圍值。
me = &desc->thr[tid];
mypos = &me->pos;
for(;;) {
// 先完成自己的任務(wù)。
for(;;) {
// 遞增當(dāng)前任務(wù)范圍的開始編號。
pos = runtime·xadd64(mypos, 1);
// 注意:只有低32位被修改,高32位結(jié)束編號不變。
begin = (uint32)pos-1;
end = (uint32)(pos>>32);
// 如果小于結(jié)束編號,循環(huán)。
if(begin < end) {
// 執(zhí)行 markroot 標(biāo)記函數(shù)。
body(desc, begin);
continue;
}
break;
}
// 嘗試從其他線程偷點任務(wù)過來,以便盡快完成所有標(biāo)記操作。
idle = false;
for(try=0;; try++) {
// 如果長時間沒有偷到任務(wù),設(shè)置結(jié)束標(biāo)記。
// increment the done counter...
if(try > desc->nthr*4 && !idle) {
idle = true;
runtime·xadd(&desc->done, 1);
}
// 如果所有線程都結(jié)束,那么退出。
if(desc->done + !idle == desc->nthr) {
if(!idle)
runtime·xadd(&desc->done, 1);
goto exit;
}
// 隨機選擇一個線程任務(wù)。
victim = runtime·fastrand1() % (desc->nthr-1);
if(victim >= tid)
victim++;
victimpos = &desc->thr[victim].pos;
for(;;) {
// 偷取任務(wù)。
pos = runtime·atomicload64(victimpos);
begin = (uint32)pos;
end = (uint32)(pos>>32);
if(begin+1 >= end) {
begin = end = 0;
break;
}
if(idle) {
runtime·xadd(&desc->done, -1);
idle = false;
}
begin2 = begin + (end-begin)/2;
newpos = (uint64)begin | (uint64)begin2<<32;
if(runtime·cas64(victimpos, pos, newpos)) {
begin = begin2;
break;
}
}
// 成功偷到任務(wù)...
if(begin < end) {
// 添加到自己的任務(wù)列表中。
runtime·atomicstore64(mypos, (uint64)begin | (uint64)end<<32);
// 返回外層循環(huán),上面的任務(wù)處理代碼再次被激活。
break;
}
// ...
}
}
exit:
// ...
}
每個線程調(diào)用 parfordo 的時候,都拿到一個遞增的唯一 thrseq 編號,并以此獲得事先由 parforsetup 分配好的任務(wù)段。接下來,自然是該線程循環(huán)執(zhí)行分配給自己的所有任務(wù),任務(wù)編號被傳遞給 markroot 作為選擇目標(biāo)的判斷條件。
在完成自己的任務(wù)后,嘗試分擔(dān)其他線程任務(wù),以盡快完成全部任務(wù)。這種 steal 算法,在運行時的很多地方都有體現(xiàn),算是并行開發(fā)的一個 “標(biāo)準(zhǔn)” 做法了。
至此,并行標(biāo)記的所有秘密被揭開,我們繼續(xù)探究清理操作過程。
不管是并發(fā)還是串行清理,最終都是調(diào)用 sweepone 函數(shù)。
mgc0.c
static FuncVal bgsweepv = {runtime·bgsweep};
mgc0.go
func bgsweep() {
for {
for gosweepone() != ^uintptr(0) {
sweep.nbgsweep++
Gosched()
}
if !gosweepdone() {
continue
}
// 設(shè)置休眠標(biāo)志。
sweep.parked = true
// 休眠當(dāng)前清理 goroutine。
goparkunlock(&gclock, "GC sweep wait")
}
}
mgc0.c
uintptr runtime·gosweepone(void)
{
void (*fn)(void);
fn = sweepone_m;
runtime·onM(&fn);
return g->m->scalararg[0];
}
static void sweepone_m(void)
{
g->m->scalararg[0] = runtime·sweepone();
}
清理函數(shù)實現(xiàn)很簡潔,每次找到一個待清理的 span,然后調(diào)用 span_sweep 收回對應(yīng)的內(nèi)存,這在內(nèi)存分配器的釋放過程中已經(jīng)說得很清楚了。
mgc0.c
uintptr runtime·sweepone(void)
{
// 當(dāng)前代齡,清理前 += 2。
sg = runtime·mheap.sweepgen;
// 循環(huán)掃描所有 spans。
for(;;) {
idx = runtime·xadd(&runtime·sweep.spanidx, 1) - 1;
// 結(jié)束判斷。
if(idx >= runtime·work.nspan) {
runtime·mheap.sweepdone = true;
return -1;
}
// 獲取 span。
s = runtime·work.spans[idx];
// 如果不是正在使用的 span,無需清理。
if(s->state != MSpanInUse) {
s->sweepgen = sg;
continue;
}
// 如果不是待清理 span,跳過。
if(s->sweepgen != sg-2 || !runtime·cas(&s->sweepgen, sg-2, sg-1))
continue;
npages = s->npages;
// 清理。
if(!runtime·MSpan_Sweep(s, false))
npages = 0;
return npages;
}
}
最后剩下的,就是 StopTheWorld 和 StartTheWorld 如何停止和恢復(fù)用戶邏輯執(zhí)行。因這會涉及一些 Goroutine Scheduler 知識,您可以暫時跳過,等看完后面的相關(guān)章節(jié)再回頭研究。
proc.c
void runtime·stoptheworld(void)
{
runtime·lock(&runtime·sched.lock);
// 計數(shù)器。
runtime·sched.stopwait = runtime·gomaxprocs;
// 設(shè)置關(guān)鍵停止標(biāo)記。
runtime·atomicstore((uint32*)&runtime·sched.gcwaiting, 1);
// 在所有運行的 goroutine 上設(shè)置搶占標(biāo)志。
preemptall();
// 設(shè)置當(dāng)前 P 的狀態(tài)。
g->m->p->status = Pgcstop; // Pgcstop is only diagnostic.
runtime·sched.stopwait--;
// 設(shè)置所有處理系統(tǒng)調(diào)用 P 的狀態(tài)。
for(i = 0; i < runtime·gomaxprocs; i++) {
p = runtime·allp[i];
s = p->status;
if(s == Psyscall && runtime·cas(&p->status, s, Pgcstop))
runtime·sched.stopwait--;
}
// 設(shè)置所有空閑 P 狀態(tài)。
while(p = pidleget()) {
p->status = Pgcstop;
runtime·sched.stopwait--;
}
wait = runtime·sched.stopwait > 0;
runtime·unlock(&runtime·sched.lock);
// 等待所有 P 停止。
if(wait) {
for(;;) {
// 等待 100us,直到休眠標(biāo)記被喚醒。
if(runtime·notetsleep(&runtime·sched.stopnote, 100*1000)) {
// 清除休眠標(biāo)記。
runtime·noteclear(&runtime·sched.stopnote);
break;
}
// 再次發(fā)出搶占標(biāo)記。
preemptall();
}
}
}
從代碼上來看,StopTheWorld 只是設(shè)置了一些標(biāo)記,包括搶占行為也不過是在在運行的 goroutine 上設(shè)置搶占標(biāo)記。具體這些標(biāo)記是如何讓正在運行的 goroutine 暫停的呢?
如果了解 goroutine 運行機制,必然知道它總是循環(huán)執(zhí)行 schedule 函數(shù),在這個函數(shù)頭部會檢查 gcwaiting 標(biāo)記,并以此停止當(dāng)前任務(wù)執(zhí)行。
proc.c
static void schedule(void)
{
// 檢查 gcwaiting 標(biāo)記,停止當(dāng)前任務(wù)執(zhí)行。
if(runtime·sched.gcwaiting) {
gcstopm();
}
...
}
static void gcstopm(void)
{
// 釋放關(guān)聯(lián)的 P。
p = releasep();
runtime·lock(&runtime·sched.lock);
p->status = Pgcstop;
// 遞減計數(shù)器,直到喚醒.
if(--runtime·sched.stopwait == 0)
runtime·notewakeup(&runtime·sched.stopnote);
runtime·unlock(&runtime·sched.lock);
stopm();
}
這樣一來,所有正在執(zhí)行的 goroutine 會被放回隊列,相關(guān)任務(wù)線程也被休眠。至于發(fā)出搶占標(biāo)記,是為了讓一直處于忙碌狀態(tài)的 goroutine 有機會檢查停止標(biāo)記。
反過來,StartTheWorld 就是恢復(fù)這些被停止的任務(wù),并喚醒線程繼續(xù)執(zhí)行。
proc.c
void runtime·starttheworld(void)
{
...
// 重置標(biāo)記。
runtime·sched.gcwaiting = 0;
p1 = nil;
// 循環(huán)所有 P。
while(p = pidleget()) {
// 如果該 P 沒有任務(wù),那么放回空閑隊列。
// 因為沒有任務(wù)的 P 被放在列表尾部,故無需繼續(xù)遍歷。
if(p->runqhead == p->runqtail) {
pidleput(p);
break;
}
// 關(guān)聯(lián)一個空閑 M 線程。
p->m = mget();
// 將準(zhǔn)備工作的 P 串成鏈表。
p->link = p1;
p1 = p;
}
// 喚醒 sysmon。
if(runtime·sched.sysmonwait) {
runtime·sched.sysmonwait = false;
runtime·notewakeup(&runtime·sched.sysmonnote);
}
runtime·unlock(&runtime·sched.lock);
// 遍歷準(zhǔn)備工作的 P。
while(p1) {
p = p1;
p1 = p1->link;
// 檢查并喚醒關(guān)聯(lián)線程 M。
if(p->m) {
mp = p->m;
runtime·notewakeup(&mp->park);
} else {
// 如果沒有關(guān)聯(lián)線程,新建。
newm(nil, p);
add = false;
}
}
// ...
}
垃圾回收操作雖然關(guān)聯(lián)很多東西,但我們基本理清了它的運作流程。如同在分配器一章中所說,垃圾回收只是將回收內(nèi)存,并沒有釋放空閑的物理內(nèi)存。
在 main goroutine 入口,運行時使用一個專用線程運行 sysmon 操作。
proc.go
// The main goroutine.
func main() {
...
onM(newsysmon)
...
main_init()
main_main()
}
proc.c
void runtime·newsysmon(void)
{
newm(sysmon, nil);
}
在 sysmon 里面會定時啟動強制垃圾回收和物理內(nèi)存釋放操作。
proc.c
static void sysmon(void)
{
// 如果超過 2 分鐘沒有運行 gc,則強制回收。
forcegcperiod = 2*60*1e9;
// 如果空閑 span 超過 5 分鐘未被使用,則釋放其關(guān)聯(lián)物理內(nèi)存。
scavengelimit = 5*60*1e9;
for(;;) {
runtime·usleep(delay);
// 啟動強制垃圾回收。
lastgc = runtime·atomicload64(&mstats.last_gc);
if(lastgc != 0 && unixnow - lastgc > forcegcperiod && ...) {
runtime·forcegc.idle = 0;
runtime·forcegc.g->schedlink = nil;
// 將強制垃圾回收 goroutine 放回任務(wù)隊列。
injectglist(runtime·forcegc.g);
}
// 啟動物理內(nèi)存釋放操作。
if(lastscavenge + scavengelimit/2 < now) {
runtime·MHeap_Scavenge(nscavenge, now, scavengelimit);
lastscavenge = now;
nscavenge++; // 計數(shù)器。
}
}
}
先說強制垃圾回收操作,這個神秘的 forcegc.g 從何而來?
proc.go
// start forcegc helper goroutine
func init() {
go forcegchelper()
}
依照 Go 語言規(guī)則,這個 init 初始化函數(shù)會被 main goroutine 執(zhí)行,它創(chuàng)建了一個用來執(zhí)行強制回收操作的 goroutine。
proc.go
func forcegchelper() {
forcegc.g = getg()
forcegc.g.issystem = true
for {
// 休眠該 goroutine。
// park 會暫停 goroutine,但不會放回待運行隊列。
goparkunlock(&forcegc.lock, "force gc (idle)")
// 喚醒后,執(zhí)行強制垃圾回收。
gogc(1)
}
}
這個 forcegc.g 會循環(huán)執(zhí)行,每次完成后休眠,直到被 sysmon 重新返回任務(wù)隊列。
為什么要定期運行強制回收?試想一下,假設(shè)回收后已分配內(nèi)存是 1GB,那么下次回收閾值就是 2GB,這可能導(dǎo)致很長時間無法觸發(fā)回收操作。這就存在很大的內(nèi)存浪費,所以強制回收是非常必要的。
接下來看看如何釋放物理內(nèi)存,這是另外一個關(guān)注焦點。
heap.c
void runtime·MHeap_Scavenge(int32 k, uint64 now, uint64 limit)
{
h = &runtime·mheap;
// 保存本次釋放的物理內(nèi)存數(shù)量。
sumreleased = 0;
// 循環(huán)處理 heap.free 里的空閑 span。
for(i=0; i < nelem(h->free); i++)
sumreleased += scavengelist(&h->free[i], now, limit);
// 處理 heap.freelarge 里的空閑 span。
sumreleased += scavengelist(&h->freelarge, now, limit);
}
釋放操作的目標(biāo)自然是 heap 里的那些空閑 span 內(nèi)存塊。
mheap.c
static uintptr scavengelist(MSpan *list, uint64 now, uint64 limit)
{
sumreleased = 0;
// 遍歷 span 鏈表。
for(s=list->next; s != list; s=s->next) {
// 條件:
// 未使用時間超過 5 分鐘;
// 已釋放物理內(nèi)存頁數(shù)不等于 span 總頁數(shù) (未釋放或部分釋放);
if((now - s->unusedsince) > limit && s->npreleased != s->npages) {
// 待釋放頁數(shù)。為什么不是全部?
released = (s->npages - s->npreleased) << PageShift;
mstats.heap_released += released;
sumreleased += released;
// 現(xiàn)在整個 span.npages 都會被釋放。
s->npreleased = s->npages;
runtime·SysUnused((void*)(s->start << PageShift), s->npages << PageShift);
}
}
return sumreleased;
}
至于 npreleased != npages 的問題,先得看看 SysUnused 做了什么。
mem_linux.c
void runtime·SysUnused(void *v, uintptr n)
{
runtime·madvise(v, n, MADV_DONTNEED);
}
mem_darwin.c
void runtime·SysUnused(void *v, uintptr n)
{
// Linux's MADV_DONTNEED is like BSD's MADV_FREE.
runtime·madvise(v, n, MADV_FREE);
}
對 Linux、darwin 等系統(tǒng)而言,MADV_DONTNEED、MADV_FREE 告訴操作系統(tǒng),這段物理內(nèi)存暫時不用,可解除 MMU 映射。再次使用時,由操作系統(tǒng)重新建立映射。
注意,盡管物理內(nèi)存被釋放了,但這個 span 管理對象依舊存活,它所占用的虛擬內(nèi)存并未釋放,依然會和左右相鄰進(jìn)行合并。這就是 npreleased 可能不等于 npages 的關(guān)鍵。
另外,在 Windows 系統(tǒng)下,事情有點特殊,它不支持類似 MADV_DONTNEED 行為。
mem_windows.c
void runtime·SysUnused(void *v, uintptr n)
{
r = runtime·stdcall3(runtime·VirtualFree, (uintptr)v, n, MEM_DECOMMIT);
}
顯然,VirtualFree 會釋放掉 span 管理的虛擬內(nèi)存。因此,從 heap 獲取 span 時需要重新分配內(nèi)存。
mheap.c
static MSpan* MHeap_AllocSpanLocked(MHeap *h, uintptr npage)
{
if(s->npreleased > 0) {
runtime·SysUsed((void*)(s->start<<PageShift), s->npages<<PageShift);
mstats.heap_released -= s->npreleased<<PageShift;
s->npreleased = 0;
}
}
mem_windows.c
void runtime·SysUsed(void *v, uintptr n)
{
r = runtime·stdcall4(runtime·VirtualAlloc, (uintptr)v, n, MEM_COMMIT,
PAGE_READWRITE);
}
除了 Windows 系統(tǒng),其他 Unix-Like 系統(tǒng)的 SysUsed 什么都不做。
mem_linux.c
void runtime·SysUsed(void *v, uintptr n)
{
USED(v);
USED(n);
}
mem_darwin.c
void runtime·SysUsed(void *v, uintptr n)
{
USED(v);
USED(n);
}
除自動回收外,還可手工調(diào)用 debug/FreeOSMemory 釋放物理內(nèi)存。
mgc0.go
func freeOSMemory() {
gogc(2) // force GC and do eager sweep
onM(scavenge_m)
}
mheap.c
void runtime·scavenge_m(void)
{
runtime·MHeap_Scavenge(-1, ~(uintptr)0, 0); // ~(uintptr)0 = 18446744073709551615
}
這個調(diào)用的參數(shù),now 是比當(dāng)前實際時間大得多的整數(shù),而 limit 是 0。這意味這所有的空閑 span 都過期,都會被釋放物理內(nèi)存。
與內(nèi)存和垃圾回收相關(guān)的狀態(tài)對象。
malloc.h
struct MStats
{
// General statistics.
uint64 alloc; // 正在使用的 object 容量 (malloc)。
uint64 total_alloc; // 歷史分配總量,含已釋放內(nèi)存。
uint64 sys; // 當(dāng)前消耗的內(nèi)存總量,包括 heap、fixalloc 等。
uint64 nmalloc; // 分配操作次數(shù)。
uint64 nfree; // 釋放操作次數(shù)。
// Statistics about malloc heap.
uint64 heap_alloc; // 同 alloc,在使用的 object 容量。
uint64 heap_sys; // 當(dāng)前消耗的 heap 內(nèi)存總量 (mmap-munmap, inuse+idle)。
uint64 heap_idle; // 空閑 span 容量。
uint64 heap_inuse; // 正在使用 span 容量。
uint64 heap_released; // 交還給操作系統(tǒng)的物理內(nèi)存容量。
uint64 heap_objects; // 正在使用的 object 數(shù)量。
// Statistics about garbage collector.
uint64 next_gc; // 下次垃圾回收閾值。
uint64 last_gc; // 上次垃圾回收結(jié)束時間。
uint32 numgc; // 垃圾回收次數(shù)。
};
統(tǒng)計狀態(tài)更新函數(shù)。
mgc0.c
void runtime·updatememstats(GCStats *stats)
{
// 重置狀態(tài)對象。
if(stats)
runtime·memclr((byte*)stats, sizeof(*stats));
for(mp=runtime·allm; mp; mp=mp->alllink) {
if(stats) {
src = (uint64*)&mp->gcstats;
dst = (uint64*)stats;
for(i=0; i<sizeof(*stats)/sizeof(uint64); i++)
dst[i] += src[i];
runtime·memclr((byte*)&mp->gcstats, sizeof(mp->gcstats));
}
}
// FixAlloc 正在使用內(nèi)存統(tǒng)計。
mstats.mcache_inuse = runtime·mheap.cachealloc.inuse;
mstats.mspan_inuse = runtime·mheap.spanalloc.inuse;
// 從系統(tǒng)獲取的內(nèi)存總量 (mmap-munmap)。
mstats.sys = mstats.heap_sys + mstats.stacks_sys + mstats.mspan_sys +
mstats.mcache_sys + mstats.buckhash_sys + mstats.gc_sys + mstats.other_sys;
mstats.alloc = 0;
mstats.total_alloc = 0;
mstats.nmalloc = 0;
mstats.nfree = 0;
for(i = 0; i < nelem(mstats.by_size); i++) {
mstats.by_size[i].nmalloc = 0;
mstats.by_size[i].nfree = 0;
}
// 將所有 P.cache.alloc 所持有的 spans 歸還給 central。
if(g == g->m->g0)
flushallmcaches();
else {
fn = flushallmcaches_m;
runtime·mcall(&fn);
}
// 更新 cache 統(tǒng)計。
cachestats();
// 統(tǒng)計所有 spans 里正在使用的 object。
for(i = 0; i < runtime·mheap.nspan; i++) {
s = runtime·mheap.allspans[i];
if(s->state != MSpanInUse)
continue;
// 統(tǒng)計活躍的 object。
if(s->sizeclass == 0) {
mstats.nmalloc++;
mstats.alloc += s->elemsize;
} else {
mstats.nmalloc += s->ref;
mstats.by_size[s->sizeclass].nmalloc += s->ref;
mstats.alloc += s->ref*s->elemsize;
}
}
// 按 size class 統(tǒng)計累計分配和釋放次數(shù)。
smallfree = 0;
mstats.nfree = runtime·mheap.nlargefree;
for(i = 0; i < nelem(mstats.by_size); i++) {
mstats.nfree += runtime·mheap.nsmallfree[i];
mstats.by_size[i].nfree = runtime·mheap.nsmallfree[i];
mstats.by_size[i].nmalloc += runtime·mheap.nsmallfree[i];
smallfree += runtime·mheap.nsmallfree[i] * runtime·class_to_size[i];
}
mstats.nfree += mstats.tinyallocs;
mstats.nmalloc += mstats.nfree;
// 總分配容量 = 正在使用 object + 已釋放容量。
mstats.total_alloc = mstats.alloc + runtime·mheap.largefree + smallfree;
mstats.heap_alloc = mstats.alloc;
mstats.heap_objects = mstats.nmalloc - mstats.nfree;
}
標(biāo)準(zhǔn)庫 runtime.ReadMemStats 函數(shù)可刷新并讀取該狀態(tài)數(shù)據(jù)。
啟用環(huán)境變量 GODEBUG="gotrace=1" 可輸出垃圾回收相關(guān)狀態(tài)信息,這有助于對程序運行狀態(tài)進(jìn)行監(jiān)控,是常見的一種測試手段。
第一類輸出信息來自垃圾回收函數(shù)。
mgc0.c
static void gc(struct gc_args *args)
{
t0 = args->start_time;
// 第 1 階段: 包括 stoptheworld、clearpools 在內(nèi)的初始化時間。
if(runtime·debug.gctrace)
t1 = runtime·nanotime();
// 第 2 階段: 標(biāo)記前的準(zhǔn)備時間。包括完成上次未結(jié)束的清理操作,準(zhǔn)備并行標(biāo)記環(huán)境等。
if(runtime·debug.gctrace)
t2 = runtime·nanotime();
// 第 3 階段: 并行標(biāo)記。
if(runtime·debug.gctrace)
t3 = runtime·nanotime();
// 第 4 階段: 收縮棧內(nèi)存,更新統(tǒng)計信息。
t4 = runtime·nanotime();
if(runtime·debug.gctrace) {
heap1 = mstats.heap_alloc;
runtime·updatememstats(&stats);
obj = mstats.nmalloc - mstats.nfree;
runtime·printf(
"gc%d(%d):" // 0, 1
" %D+%D+%D+%D us," // 2, 3, 4, 5
" %D -> %D MB," // 6, 7
" %D (%D-%D) objects," // 8, 9, 10
" %d goroutines," // 11
" %d/%d/%d sweeps," // 12, 13, 14
...,
mstats.numgc, // 0: GC 執(zhí)行次數(shù)。
runtime·work.nproc, // 1: 并行標(biāo)記線程數(shù)量。
(t1-t0)/1000, // 2: 含 StopTheWorld 在內(nèi)的初始化時間。
(t2-t1)/1000, // 3: 并行標(biāo)記準(zhǔn)備時間,包括上次未完成清理任務(wù)。
(t3-t2)/1000, // 4: 并行標(biāo)記時間。
(t4-t3)/1000, // 5: 收縮棧內(nèi)存,更新狀態(tài)等時間。
heap0>>20, // 6: 上次回收后 alloc 容量。
heap1>>20, // 7: 本次回收后 alloc 容量。
obj, // 8: 本次回收后正在使用的 object 數(shù)量。
mstats.nmalloc, // 9: 總分配次數(shù)。
mstats.nfree, // 10: 總釋放次數(shù)。
runtime·gcount(), // 11: 待運行 Goroutine 任務(wù)數(shù)量。
runtime·work.nspan, // 12: heap.spans 數(shù)量。
runtime·sweep.nbgsweep, // 13: 本次并發(fā)清理 span 次數(shù)。
runtime·sweep.npausesweep, // 14: 本次串行清理 span 次數(shù)。
...
);
}
}
在并發(fā)清理模式下,信息輸出時,清理工作尚未完成,因此標(biāo)出的容量信息并不準(zhǔn)確,只能通過多次輸出結(jié)果進(jìn)行大概評估。
第二類信息來自物理內(nèi)存釋放函數(shù)。
mheap.c
void runtime·MHeap_Scavenge(int32 k, uint64 now, uint64 limit)
{
if(runtime·debug.gctrace > 0) {
// 本次釋放的物理內(nèi)存容量。
if(sumreleased > 0)
runtime·printf("scvg%d: %D MB released\n", k, (uint64)sumreleased>>20);
runtime·printf(
"scvg%d: " // 0
"inuse: %D, " // 1
"idle: %D, " // 2
"sys: %D, " // 3
"released: %D, " // 4
"consumed: %D (MB)\n", // 5
k, // 0: 釋放次數(shù)。
mstats.heap_inuse>>20, // 1: 正在使用的 spans 容量。
mstats.heap_idle>>20, // 2: 空閑 spans 容量。
mstats.heap_sys>>20, // 3: 當(dāng)前 heap 虛擬內(nèi)存總?cè)萘俊? mstats.heap_released>>20, // 4: 已釋放物理內(nèi)存總?cè)萘俊? (mstats.heap_sys - mstats.heap_released)>>20 // 5: 實際消耗內(nèi)存容量。
);
}
}
現(xiàn)代操作系統(tǒng)通常會采用機會主義分配策略。內(nèi)核雖然承諾分配內(nèi)存,但實際并不會立即分配物理內(nèi)存。只有在發(fā)生讀寫操作時,內(nèi)核才會把之前承諾的內(nèi)存轉(zhuǎn)換為物理內(nèi)存。而且也不是一次性完成,而是以頁的方式逐步分配,按需執(zhí)行頁面請求調(diào)度和寫入時復(fù)制。
所以,相關(guān)輸出結(jié)果更多表示虛擬內(nèi)存分配值,且和具體操作系統(tǒng)也有很大關(guān)系。
調(diào)度器是運行時最核心的內(nèi)容,其基本理論建立在三種基本對象上。
首先,每次 go 關(guān)鍵詞調(diào)用都會創(chuàng)建一個 goroutine 對象,代表 G 并發(fā)任務(wù)。其次,所有 G 任務(wù)都由系統(tǒng)線程執(zhí)行,這些線程被稱作 M。
每個 G 對象都有自己的獨立棧內(nèi)存。當(dāng) M 執(zhí)行任務(wù)時,從 G 用來保存執(zhí)行現(xiàn)場的字段中恢復(fù)相關(guān)寄存器值即可。當(dāng) M 需要切換任務(wù)時,將寄存器值保存回當(dāng)前 G 對象,然后從另一 G 對象中恢復(fù),如此實現(xiàn)線程多路復(fù)用。
G 初始化棧內(nèi)存只有幾 KB 大小,按需擴張、收縮。這種輕量級設(shè)計開銷極小,可輕松創(chuàng)建成千上萬的并發(fā)任務(wù)。
除此之外,還有抽象處理器 P,其數(shù)量決定了 G 并發(fā)任務(wù)數(shù)量。每個運行 M 都必須獲取并綁定一個 P 對象,如同線程必須被調(diào)度到某個 CPU Core 才能執(zhí)行。P 還為 M 提供內(nèi)存分配器緩存和 G 任務(wù)隊列等執(zhí)行資源。
通常情況下,P 數(shù)量在初始化時確定,運行時基本固定,但 M 的數(shù)量未必和 P 對應(yīng)。例如,某 M 因系統(tǒng)調(diào)用長時間阻塞,其關(guān)聯(lián) P 就會被運行時收回。然后,調(diào)度器會喚醒或新建 M 去執(zhí)行其他排隊任務(wù)。失去 P 的 M 被休眠,直到被重新喚醒。
由匯編代碼實現(xiàn)的 bootstrap 過程。
rt0_linux_amd64.s
TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8
LEAQ 8(SP), SI // argv
MOVQ 0(SP), DI // argc
MOVQ $main(SB), AX
JMP AX
TEXT main(SB),NOSPLIT,$-8
MOVQ $runtime·rt0_go(SB), AX
JMP AX
要確定這個很簡單,隨便找個可執(zhí)行文件,然后反匯編 entry point 即可。
(gdb) info files
Local exec file:
Entry point: 0x437940
(gdb) disass 0x437940
Dump of assembler code for function _rt0_amd64_linux:
現(xiàn)在可以確定初始化調(diào)用由 rt0_go 匯編完成。
amd_asm64.s
TEXT runtime·rt0_go(SB),NOSPLIT,$0
LEAQ runtime·g0(SB), CX
LEAQ runtime·m0(SB), AX
MOVQ CX, m_g0(AX) // save m->g0 = g0
MOVQ AX, g_m(CX) // save m0 to g0->m
CALL runtime·args(SB)
CALL runtime·osinit(SB)
CALL runtime·schedinit(SB)
// create a new goroutine to start program
MOVQ $runtime·main·f(SB), BP // entry
PUSHQ BP
PUSHQ $0 // arg size
CALL runtime·newproc(SB)
POPQ AX
POPQ AX
// start this M
CALL runtime·mstart(SB)
MOVL $0xf1, 0xf1 // crash
RET
按圖索驥,可以看到初始化過程相關(guān)的幾個函數(shù)都做了什么。
runtime.h
MaxGomaxprocs = 1<<8, // The max value of GOMAXPROCS.
proc.c
void runtime·schedinit(void)
{
// 設(shè)置最大 M 線程數(shù),超出會導(dǎo)致進(jìn)程崩潰。
runtime·sched.maxmcount = 10000;
// 初始化內(nèi)存分配器。
runtime·mallocinit();
// 獲取命令行參數(shù)、環(huán)境變量。
runtime·goargs();
runtime·goenvs();
// 垃圾回收器初始化。
runtime·gcinit();
// 初始化 P。
procs = 1;
p = runtime·getenv("GOMAXPROCS");
if(p != nil && (n = runtime·atoi(p)) > 0) {
if(n > MaxGomaxprocs)
n = MaxGomaxprocs;
procs = n;
}
procresize(procs);
}
其中內(nèi)存分配器、垃圾回收器前面都已研究過,此處不多費唇舌?,F(xiàn)在需要關(guān)心是 procs 這個最關(guān)鍵的 goroutine 并發(fā)控制參數(shù)。
proc.c
SchedT runtime·sched; // 調(diào)度器實例。
int32 runtime·gomaxprocs; // 當(dāng)前 GOMAXPROCS 值。
P* runtime·allp[MaxGomaxprocs+1]; // 存儲所有的 P 對象,最多 256 個實例。
proc.c
static void procresize(int32 new)
{
old = runtime·gomaxprocs;
// 初始化新 P 對象。
for(i = 0; i < new; i++) {
p = runtime·allp[i];
// 新建 P。
if(p == nil) {
p = runtime·newP();
p->id = i;
p->status = Pgcstop;
runtime·atomicstorep(&runtime·allp[i], p);
}
// 創(chuàng)建 P.cache。
if(p->mcache == nil) {
if(old==0 && i==0)
p->mcache = g->m->mcache; // bootstrap
else
p->mcache = runtime·allocmcache();
}
}
// 將 old P 里面的任務(wù)重新分布。
empty = false;
while(!empty) {
empty = true;
// 內(nèi)層 for 循環(huán)遍歷所有 old P,每次從中取一個 G 任務(wù)。
// 外層 while 循環(huán)重復(fù)該過程,如此所有先生成的 G 會保存到全局隊列的前面,F(xiàn)IFO。
for(i = 0; i < old; i++) {
p = runtime·allp[i];
// 檢查 P 的 G 任務(wù)隊列。
if(p->runqhead == p->runqtail)
continue;
empty = false;
// 獲取尾部最后一個 G。
p->runqtail--;
gp = p->runq[p->runqtail%nelem(p->runq)];
// 將 G 添加到全局任務(wù)鏈表。
gp->schedlink = runtime·sched.runqhead;
runtime·sched.runqhead = gp;
if(runtime·sched.runqtail == nil)
runtime·sched.runqtail = gp;
runtime·sched.runqsize++;
}
}
// 將最多 new * (256/2) 個任務(wù)轉(zhuǎn)移到 P 本地隊列。
for(i = 1; i < new * nelem(p->runq)/2 && runtime·sched.runqsize > 0; i++) {
gp = runtime·sched.runqhead;
runtime·sched.runqhead = gp->schedlink;
if(runtime·sched.runqhead == nil)
runtime·sched.runqtail = nil;
runtime·sched.runqsize--;
runqput(runtime·allp[i%new], gp);
}
// 如果 new < old,"釋放" 掉多余的 P 對象。
for(i = new; i < old; i++) {
p = runtime·allp[i];
runtime·freemcache(p->mcache);
p->mcache = nil;
gfpurge(p);
p->status = Pdead;
// can't free P itself because it can be referenced by an M in syscall
}
// 關(guān)聯(lián) P 到當(dāng)前 M。
p = runtime·allp[0];
acquirep(p);
// 將其他 P 放到空閑隊列。
for(i = new-1; i > 0; i--) {
p = runtime·allp[i];
p->status = Pidle;
pidleput(p);
}
runtime·atomicstore((uint32*)&runtime·gomaxprocs, new);
}
待運行的 G 任務(wù)保存在 P 本地隊列和全局隊列中,因此增加或減少 P 數(shù)量都需要重新分布這些任務(wù)。還須確保先生成的 G 任務(wù)優(yōu)先放到隊列頭部,以優(yōu)先執(zhí)行。
在完成調(diào)度器初始化后,創(chuàng)建新 goroutine 運行 main 函數(shù)。
proc.go
// The main goroutine.
func main() {
// 當(dāng)前 G。
g := getg()
// 確定最大棧內(nèi)存大小。
if ptrSize == 8 {
maxstacksize = 1000000000 // 1 GB
} else {
maxstacksize = 250000000 // 250 MB
}
// 使用單獨線程運行 sysmon。
onM(newsysmon)
runtime_init()
main_init()
main_main()
// 終止進(jìn)程。
exit(0)
}
編譯器會將每條 go func 語句編譯成 newproc 函數(shù)調(diào)用,創(chuàng)建 G 對象。
反編譯一個簡單的示例。
test.go
package main
import ()
func main() {
go println("Hello, World!")
}
(gdb) disass main.main
Dump of assembler code for function main.main:
0x000000000000202f <+47>:" lea rcx,[rip+0xff582] # 0x1015b8 <main.print.1.f>
0x0000000000002036 <+54>:" push rcx
0x0000000000002037 <+55>:" push 0x10
0x0000000000002039 <+57>:" call 0x2e880 <runtime.newproc>
先熟悉 G 里面幾個常見的字段成員。
runtime.h
struct Stack
{
uintptr lo; // 棧內(nèi)存開始地址。
uintptr hi; // 結(jié)束地址。
};
struct Gobuf
{
uintptr sp; // 對應(yīng) SP 寄存器。
uintptr pc; // IP/PC 寄存器。
void* ctxt;
uintreg ret;
uintptr lr; // ARM LR 寄存器。
};
struct G
{
Stack stack; // 自定義棧。
uintptr stackguard0; // 棧溢出檢查邊界。
Gobuf sched; // 執(zhí)行現(xiàn)場。
G* schedlink; // 鏈表。
};
跟蹤 newproc 的調(diào)用過程,最終目標(biāo)是 newproc1。
proc.c
G* runtime·newproc1(FuncVal *fn, byte *argp, int32 narg, int32 nret, void *callerpc)
{
siz = narg + nret;
siz = (siz+7) & ~7; // 8 字節(jié)對齊
// 當(dāng)前 P。
p = g->m->p;
// 獲取可復(fù)用的空閑 G 對象,或新建。
if((newg = gfget(p)) == nil) {
newg = runtime·malg(StackMin);
runtime·casgstatus(newg, Gidle, Gdead);
// 添加到 allg 全局變量。
runtime·allgadd(newg);
}
// 將參數(shù)和返回值入棧。
sp = (byte*)newg->stack.hi;
sp -= 4*sizeof(uintreg);
sp -= siz;
runtime·memmove(sp, argp, narg);
// thechar 5 代表 ARM,在 arch_xxx.h 中定義。
// 因為 ARM 需要額外保存 Caller's LR 寄存器值。
if(thechar == '5') {
// caller's LR
sp -= sizeof(void*);
*(void**)sp = nil;
}
// 在 sched 里保存執(zhí)行現(xiàn)場參數(shù)。
runtime·memclr((byte*)&newg->sched, sizeof newg->sched);
newg->sched.sp = (uintptr)sp;
newg->sched.pc = (uintptr)runtime·goexit + PCQuantum;
newg->sched.g = newg;
// 這個調(diào)用很關(guān)鍵,不過我們在后面詳說。
runtime·gostartcallfn(&newg->sched, fn);
newg->gopc = (uintptr)callerpc;
runtime·casgstatus(newg, Gdead, Grunnable);
// 將生成的 G 對象放到 P 本地隊列或全局隊列。
runqput(p, newg);
// 如果有空閑 P,且沒有處于自旋狀態(tài)的 M ...
if(runtime·atomicload(&runtime·sched.npidle) != 0 &&
runtime·atomicload(&runtime·sched.nmspinning) == 0 &&
fn->fn != runtime·main)
// 喚醒一個休眠的 M,或新建。
wakep();
return newg;
}
提取可復(fù)用 G 對象,將參數(shù)、返回值入棧,設(shè)置執(zhí)行現(xiàn)場的寄存器值。最后,放到待運行隊列等待被 M 執(zhí)行。
P 使用 gfree 鏈表存儲可復(fù)用 G 對象,這很好理解。除本地復(fù)用鏈表外,還有一個全局復(fù)用鏈表。當(dāng)某 P 本地鏈表過長時,就轉(zhuǎn)移一部分到全局鏈表,以供其他 P 使用。
runtime.h
struct SchedT
{
// Global cache of dead G's. (任務(wù)結(jié)束,復(fù)用對象)
G* gfree;
int32 ngfree;
};
struct P
{
// Available G's (status == Gdead)
G* gfree;
int32 gfreecnt;
};
proc.c
static G* gfget(P *p)
{
G *gp;
void (*fn)(G*);
retry:
// 從 P 本地鏈表獲取一個可復(fù)用 G 對象。
gp = p->gfree;
// 如果為空,轉(zhuǎn)向全局鏈表。
if(gp == nil && runtime·sched.gfree) {
// 從全局鏈表提取一些復(fù)用對象到本地,直到填滿 32 個。
while(p->gfreecnt < 32 && runtime·sched.gfree != nil) {
p->gfreecnt++;
gp = runtime·sched.gfree;
runtime·sched.gfree = gp->schedlink;
runtime·sched.ngfree--;
gp->schedlink = p->gfree;
p->gfree = gp;
}
// 填充后再從本地鏈表獲取。
goto retry;
}
// 如果找到可復(fù)用 G 對象。
if(gp) {
// 調(diào)整本地鏈表。
p->gfree = gp->schedlink;
p->gfreecnt--;
// 檢查自定義棧。
if(gp->stack.lo == 0) {
// 重新分配棧內(nèi)存。
if(g == g->m->g0) {
gp->stack = runtime·stackalloc(FixedStack);
} else {
g->m->scalararg[0] = FixedStack;
g->m->ptrarg[0] = gp;
fn = mstackalloc;
runtime·mcall(&fn);
g->m->ptrarg[0] = nil;
}
// 設(shè)置棧頂。
gp->stackguard0 = gp->stack.lo + StackGuard;
}
}
return gp;
}
暫時不去理會自定義棧,后面有專門的章節(jié)說明這個問題。
沒有可復(fù)用對象時,新建。
proc.c
G* runtime·malg(int32 stacksize)
{
G *newg;
void (*fn)(G*);
// 新建 G 對象。
newg = allocg();
// 分配自定義棧內(nèi)存。
if(stacksize >= 0) {
stacksize = runtime·round2(StackSystem + stacksize);
if(g == g->m->g0) {
newg->stack = runtime·stackalloc(stacksize);
} else {
g->m->scalararg[0] = stacksize;
g->m->ptrarg[0] = newg;
fn = mstackalloc;
runtime·mcall(&fn);
g->m->ptrarg[0] = nil;
}
newg->stackguard0 = newg->stack.lo + StackGuard;
newg->stackguard1 = ~(uintptr)0;
}
return newg;
}
static G* allocg(void)
{
return runtime·newG();
}
proc.go
func newG() *g {
return new(g)
}
新建 G 對象被添加到全局變量 allg。
proc.c
Slice runtime·allgs; // Go Slice。
G** runtime·allg; // 當(dāng)前所有 G 對象,包括完成任務(wù),等待復(fù)用的。
uintptr runtime·allglen; // 數(shù)量。
proc.go
func allgadd(gp *g) {
allgs = append(allgs, gp)
allg = &allgs[0]
allglen = uintptr(len(allgs))
}
所有參數(shù)設(shè)置好后,G 對象所代表的并發(fā)任務(wù)被放入待運行隊列。
runtime.h
struct SchedT
{
// Global runnable queue. (待運行任務(wù))
G* runqhead;
G* runqtail;
int32 runqsize;
};
struct P
{
// Queue of runnable goroutines. (用數(shù)組實現(xiàn)的環(huán)狀隊列)
uint32 runqhead;
uint32 runqtail;
G* runq[256];
};
proc.c
static void runqput(P *p, G *gp)
{
uint32 h, t;
retry:
// 很典型的數(shù)組環(huán)狀隊列實現(xiàn)。
// 累加 head、tail 位置計數(shù)器,然后取模獲取實際存儲索引。
h = runtime·atomicload(&p->runqhead);
t = p->runqtail;
if(t - h < nelem(p->runq)) {
p->runq[t%nelem(p->runq)] = gp;
runtime·atomicstore(&p->runqtail, t+1);
return;
}
// 如果本地隊列已滿,則放入全局待運行隊列。
if(runqputslow(p, gp, h, t))
return;
goto retry;
}
static bool runqputslow(P *p, G *gp, uint32 h, uint32 t)
{
// 從本地隊列提取一半待運行 G 任務(wù)。
n = t-h;
n = n/2;
for(i=0; i<n; i++)
batch[i] = p->runq[(h+i)%nelem(p->runq)];
// 調(diào)整本地隊列位置。
if(!runtime·cas(&p->runqhead, h, h+n))
return false;
// 添加當(dāng)前 G。
batch[n] = gp;
// 鏈表結(jié)構(gòu)。
for(i=0; i<n; i++)
batch[i]->schedlink = batch[i+1];
// 將這一批 G 放到全局隊列。
globrunqputbatch(batch[0], batch[n], n+1);
return true;
}
static void globrunqputbatch(G *ghead, G *gtail, int32 n)
{
// 直接將鏈表附加到全局鏈表尾部。
gtail->schedlink = nil;
if(runtime·sched.runqtail)
runtime·sched.runqtail->schedlink = ghead;
else
runtime·sched.runqhead = ghead;
runtime·sched.runqtail = gtail;
runtime·sched.runqsize += n;
}
兩個隊列采用了不同的設(shè)計。本地隊列長度固定,用數(shù)組自然是效率最高。而全局隊列長度未知,只能用鏈表實現(xiàn)。
調(diào)度器在很多地方都采用兩級隊列設(shè)計,本地隊列是為了當(dāng)前線程無鎖獲取資源,而全局隊列則是為了在多個 P/M 間進(jìn)行平衡。當(dāng) P 管理的對象數(shù)量過多時就會上交一部分到全局,反過來,就從全局提取一批到本地??傊?,最終目的是為了更好地復(fù)用內(nèi)存,更快地完成任務(wù)執(zhí)行。
不管語言層面如何抽象,所有 G 任務(wù)總歸要由線程執(zhí)行,每個系統(tǒng)線程對應(yīng)一個 M。
runtime.h
struct M
{
G* g0; // 運行時管理棧。
void (*mstartfn)(void); // 啟動函數(shù),比如執(zhí)行 sysmon。
G* curg; // 當(dāng)前運行的 G。
P* p; // 當(dāng)前關(guān)聯(lián)的 P。
P* nextp; // 臨時存放獲取的 P,用于后續(xù)任務(wù)。
Note park; // 休眠標(biāo)記。
M* alllink; // 全局 allm 鏈表。
};
先了解 M 的創(chuàng)建過程。
proc.c
static void newm(void(*fn)(void), P *p)
{
// 創(chuàng)建 M 對象。
mp = runtime·allocm(p);
// 設(shè)置待綁定 P 和啟動函數(shù)。
mp->nextp = p;
mp->mstartfn = fn;
// 創(chuàng)建系統(tǒng)線程。
runtime·newosproc(mp, (byte*)mp->g0->stack.hi);
}
M* runtime·allocm(P *p)
{
mp = runtime·newM();
// 初始化。
mcommoninit(mp);
// 創(chuàng)建一個 G,用于初始化 g0 棧。
if(runtime·iscgo || Solaris || Windows || Plan9)
mp->g0 = runtime·malg(-1);
else
mp->g0 = runtime·malg(8192);
mp->g0->m = mp;
return mp;
}
調(diào)度器會檢查 M 總數(shù),如超出限制會導(dǎo)致進(jìn)程崩潰。默認(rèn) 10000,多數(shù)時候無需關(guān)心,也可調(diào)用 debug/SetMaxThreads 修改。
proc.c
static void mcommoninit(M *mp)
{
// 增加計數(shù)器,設(shè)置 ID。
mp->id = runtime·sched.mcount++;
// 檢查系統(tǒng)當(dāng)前 M 總數(shù),如果超出限制,引發(fā)進(jìn)程崩潰。
checkmcount();
// 添加到全局鏈表。
mp->alllink = runtime·allm;
runtime·atomicstorep(&runtime·allm, mp);
}
static void checkmcount(void)
{
if(runtime·sched.mcount > runtime·sched.maxmcount){
runtime·printf("runtime: program exceeds %d-thread limit\n",
runtime·sched.maxmcount);
runtime·throw("thread exhaustion");
}
}
最關(guān)鍵的是 newosproc 創(chuàng)建系統(tǒng)線程。
os_linux.c
void runtime·newosproc(M *mp, void *stk)
{
flags = CLONE_VM /* share memory */
| CLONE_FS /* share cwd, etc */
| CLONE_FILES /* share fd table */
| CLONE_SIGHAND /* share sig handler table */
| CLONE_THREAD; /* revisit - okay for now */
ret = runtime·clone(flags, stk, mp, mp->g0, runtime·mstart);
}
os_darwin.c
void runtime·newosproc(M *mp, void *stk)
{
errno = runtime·bsdthread_create(stk, mp, mp->g0, runtime·mstart);
}
我們看到了線程函數(shù) mstart,這是后面要跟蹤的目標(biāo)。
M 有個很神秘的 g0 成員,它被傳遞給 newosproc 作為線程棧內(nèi)存,用來執(zhí)行運行時管理指令,以避免在 G 用戶棧上切換上下文。
假如 M 線程直接使用 G 棧,那么就不能在執(zhí)行管理操作時將它放回隊列,也不能轉(zhuǎn)交給其他 M 執(zhí)行,那會導(dǎo)致多個線程共用棧內(nèi)存。同樣不能執(zhí)行用戶棧的擴張或收縮操作。因此,在執(zhí)行管理指令前,必須將線程棧切換到 g0。
在前面章節(jié)中時常出現(xiàn)的 onM、mcall 就是用 g0 來執(zhí)行管理命令。
runtime.h
struct M
{
uintptr scalararg[4]; // scalar argument/return for mcall
void* ptrarg[4]; // pointer argument/return for mcall
};
asm_amd64.s
TEXT runtime·mcall(SB), NOSPLIT, $0-8
MOVQ fn+0(FP), DI // DI 保存要運行的管理函數(shù)指針。
// 保存當(dāng)前 G 執(zhí)行現(xiàn)場。
get_tls(CX)
MOVQ g(CX), AX // save state in g->sched
MOVQ 0(SP), BX // caller's PC
MOVQ BX, (g_sched+gobuf_pc)(AX)
LEAQ fn+0(FP), BX // caller's SP
MOVQ BX, (g_sched+gobuf_sp)(AX)
MOVQ AX, (g_sched+gobuf_g)(AX)
// 切換到 g0 棧,執(zhí)行管理函數(shù)。
MOVQ g(CX), BX // g
MOVQ g_m(BX), BX // g.m
MOVQ m_g0(BX), SI // m.g0
MOVQ SI, g(CX) // g = m->g0
MOVQ (g_sched+gobuf_sp)(SI), SP // sp = m->g0->sched.sp
PUSHQ AX
MOVQ DI, DX
MOVQ 0(DI), DI
CALL DI // fn arg
RET
在創(chuàng)建 G 時,調(diào)度器會調(diào)用 wakep 喚醒 M 執(zhí)行任務(wù)。
proc.c
static void wakep(void)
{
startm(nil, true);
}
static void startm(P *p, bool spinning)
{
M *mp;
void (*fn)(void);
// 獲取空閑 P。如果沒有,直接返回。
if(p == nil) {
p = pidleget();
if(p == nil) {
return;
}
}
// 獲取空閑 M, 或新建。
mp = mget();
if(mp == nil) {
fn = nil;
newm(fn, p);
return;
}
// 臨時保存待用 P。
mp->nextp = p;
// 喚醒。
runtime·notewakeup(&mp->park);
}
static M* mget(void)
{
// 從空閑列表獲取 M。
if((mp = runtime·sched.midle) != nil){
runtime·sched.midle = mp->schedlink;
runtime·sched.nmidle--;
}
return mp;
}
當(dāng) M 線程找不到后續(xù)待運行 G 任務(wù),或因某種原因被剝奪關(guān)聯(lián) P 時,會休眠線程,并被保存到 sched.midle 空閑鏈表中,直到被重新獲取、喚醒。
proc.c
static void stopm(void)
{
...
retry:
// 添加到空閑鏈表。
mput(g->m);
// 休眠線程,直到被喚醒后繼續(xù)執(zhí)行。
runtime·notesleep(&g->m->park);
// 被喚醒后,清除休眠標(biāo)志。
runtime·noteclear(&g->m->park);
// 處理 GC 任務(wù) (這個因為 StopTheWorld,并不需要 P)。
if(g->m->helpgc) {
runtime·gchelper();
g->m->helpgc = 0;
g->m->mcache = nil;
goto retry;
}
// 既然被喚醒,必然獲取了可用 P,關(guān)聯(lián)。
acquirep(g->m->nextp);
g->m->nextp = nil;
}
static void mput(M *mp)
{
// 添加到空閑鏈表。
mp->schedlink = runtime·sched.midle;
runtime·sched.midle = mp;
runtime·sched.nmidle++;
}
static void acquirep(P *p)
{
g->m->mcache = p->mcache;
g->m->p = p;
p->m = g->m;
p->status = Prunning;
}
休眠操作通過 futex 實現(xiàn),這是一種快速用戶區(qū)互斥實現(xiàn)。該鎖定在用戶空間用原子指令完成,只在結(jié)果不一致時才進(jìn)入系統(tǒng)內(nèi)核,有非常高的執(zhí)行效率。
lock_futex.go
func notesleep(n *note) {
for atomicload(key32(&n.key)) == 0 {
futexsleep(key32(&n.key), 0, -1) // 休眠直到被喚醒 (timeout = -1)。
} // 喚醒后,n.key = 1,終止循環(huán)。
}
os_linux.c
void runtime·futexsleep(uint32 *addr, uint32 val, int64 ns)
{
Timespec ts;
// 不超時。
if(ns < 0) {
runtime·futex(addr, FUTEX_WAIT, val, nil, nil, 0);
return;
}
ts.tv_nsec = 0;
ts.tv_sec = runtime·timediv(ns, 1000000000LL, (int32*)&ts.tv_nsec);
runtime·futex(addr, FUTEX_WAIT, val, &ts, nil, 0);
}
喚醒操作會修改標(biāo)記值,成功后調(diào)用 noteclear 重置狀態(tài)。
lock_futex.go
func notewakeup(n *note) {
old := xchg(key32(&n.key), 1)
futexwakeup(key32(&n.key), 1)
}
func noteclear(n *note) {
n.key = 0
}
os_linux.c
void runtime·futexwakeup(uint32 *addr, uint32 cnt)
{
ret = runtime·futex(addr, FUTEX_WAKE, cnt, nil, nil, 0);
}
線程函數(shù) mstart 讓 M 進(jìn)入調(diào)度器核心循環(huán),它不停從 P 本地隊列、全局隊列查找并執(zhí)行待運行 G 任務(wù)。期間,會處理一下垃圾回收等額外操作,完成后繼續(xù)回來執(zhí)行任務(wù)。
proc.c
static void mstart(void)
{
// 執(zhí)行啟動函數(shù)。
if(g->m->mstartfn)
g->m->mstartfn();
if(g->m->helpgc) {
// 如果正在垃圾回收,休眠線程。
g->m->helpgc = 0;
stopm();
} else if(g->m != &runtime·m0) {
// 關(guān)聯(lián) P。
acquirep(g->m->nextp);
g->m->nextp = nil;
}
// 執(zhí)行調(diào)度函數(shù)。
schedule();
}
核心循環(huán)過程: schedule -> execute -> G.func -> goexit 。
proc.c
static void schedule(void)
{
gp = nil;
// 當(dāng)前 P 任務(wù)執(zhí)行次數(shù)計數(shù)器。
tick = g->m->p->schedtick;
// 每隔 61 次,就從全局隊列提取一個任務(wù),以確保公平。
// This is a fancy way to say tick%61==0,
if(tick - (((uint64)tick*0x4325c53fu)>>36)*61 == 0 && runtime·sched.runqsize > 0) {
gp = globrunqget(g->m->p, 1); // 僅返回一個 G,不轉(zhuǎn)移。
}
// 從本地隊列提取任務(wù)。
if(gp == nil) {
gp = runqget(g->m->p);
}
// 從其他地方查找任務(wù)。
if(gp == nil) {
gp = findrunnable(); // blocks until work is available
}
// 執(zhí)行任務(wù)。
execute(gp);
}
全局隊列存儲了超出 P 本地數(shù)量限制的待運行任務(wù),是所有 P/M 的后備資源。
proc.c
static G* globrunqget(P *p, int32 max)
{
G *gp, *gp1;
int32 n;
if(runtime·sched.runqsize == 0)
return nil;
// 確定要轉(zhuǎn)移的任務(wù)數(shù)。
n = runtime·sched.runqsize/runtime·gomaxprocs+1;
if(n > runtime·sched.runqsize)
n = runtime·sched.runqsize;
if(max > 0 && n > max)
n = max;
if(n > nelem(p->runq)/2)
n = nelem(p->runq)/2;
runtime·sched.runqsize -= n;
if(runtime·sched.runqsize == 0)
runtime·sched.runqtail = nil;
// 將第一個任務(wù)返回。
gp = runtime·sched.runqhead;
runtime·sched.runqhead = gp->schedlink;
n--;
// 轉(zhuǎn)移一批任務(wù)到本地隊列。
while(n--) {
gp1 = runtime·sched.runqhead;
runtime·sched.runqhead = gp1->schedlink;
runqput(p, gp1);
}
return gp;
}
本地隊列優(yōu)先為線程提供無鎖任務(wù)獲取。
proc.c
static G* runqget(P *p)
{
G *gp;
uint32 t, h;
// 從數(shù)組循環(huán)隊列返回任務(wù)。
for(;;) {
h = runtime·atomicload(&p->runqhead);
t = p->runqtail;
if(t == h)
return nil;
gp = p->runq[h%nelem(p->runq)];
if(runtime·cas(&p->runqhead, h, h+1))
return gp;
}
}
如果本地和全局隊列中都沒找到可用任務(wù),調(diào)度器就會費盡心思檢查各個角落。包括網(wǎng)絡(luò)任務(wù),甚至是從其他 P 隊列中偷一些過來。
proc.c
static G* findrunnable(void)
{
top:
// 本地隊列。
gp = runqget(g->m->p);
if(gp)
return gp;
// 全局隊列。
if(runtime·sched.runqsize) {
gp = globrunqget(g->m->p, 0); // 轉(zhuǎn)移一批到本地。
if(gp)
return gp;
}
// 網(wǎng)絡(luò)任務(wù)。
gp = runtime·netpoll(false); // non-blocking
if(gp) {
injectglist(gp->schedlink); // 插入全局隊列。
runtime·casgstatus(gp, Gwaiting, Grunnable);
return gp;
}
// 從其他 P 偷一些任務(wù)。
for(i = 0; i < 2*runtime·gomaxprocs; i++) {
p = runtime·allp[runtime·fastrand1()%runtime·gomaxprocs]; // 隨機選擇。
if(p == g->m->p) // 當(dāng)前 P。
gp = runqget(p);
else // 其他 P。
gp = runqsteal(g->m->p, p);
if(gp)
return gp;
}
stop:
// 再次檢查全局隊列。
if(runtime·sched.runqsize) {
gp = globrunqget(g->m->p, 0);
return gp;
}
// 解除 P 綁定,返回空閑列表。
p = releasep();
pidleput(p);
// 循環(huán)檢查其他 P 任務(wù)列表,如果有未完成任務(wù),那么跳轉(zhuǎn)到 top 重試,以便偷一些過來。
for(i = 0; i < runtime·gomaxprocs; i++) {
p = runtime·allp[i];
if(p && p->runqhead != p->runqtail) {
// 重新關(guān)聯(lián) P。
p = pidleget();
if(p) {
acquirep(p);
goto top;
}
break;
}
}
// 再次檢查網(wǎng)絡(luò)任務(wù)。
if(runtime·xchg64(&runtime·sched.lastpoll, 0) != 0) {
gp = runtime·netpoll(true); // block until new work is available
runtime·atomicstore64(&runtime·sched.lastpoll, runtime·nanotime());
if(gp) {
p = pidleget();
if(p) {
// 重新關(guān)聯(lián) P。
acquirep(p);
// 將其他任務(wù)添加到全局隊列。
injectglist(gp->schedlink);
runtime·casgstatus(gp, Gwaiting, Grunnable);
return gp;
}
// 如果沒有可用 P,添加到全局隊列。
injectglist(gp);
}
}
// 如果什么任務(wù)都沒拿到,休眠當(dāng)前線程。
stopm();
goto top;
}
這種偷竊行為就是官方文檔所提及的 workstealing算法。
proc.c
static G* runqsteal(P *p, P *p2)
{
G *gp;
G *batch[nelem(p->runq)/2];
// 將 P2 一半任務(wù)轉(zhuǎn)移到 batch。
n = runqgrab(p2, batch);
if(n == 0)
return nil;
// 返回一個任務(wù)。
n--;
gp = batch[n];
if(n == 0)
return gp;
// 將剩余任務(wù)添加到 P 隊列。
h = runtime·atomicload(&p->runqhead);
t = p->runqtail;
for(i=0; i<n; i++, t++)
p->runq[t%nelem(p->runq)] = batch[i];
runtime·atomicstore(&p->runqtail, t);
return gp;
}
等拿到 G 任務(wù),接下來就交由 execute 負(fù)責(zé)執(zhí)行。
proc.c
static void execute(G *gp)
{
// 修改狀態(tài)。
runtime·casgstatus(gp, Grunnable, Grunning);
gp->waitsince = 0;
gp->preempt = false;
gp->stackguard0 = gp->stack.lo + StackGuard;
g->m->p->schedtick++; // 執(zhí)行計數(shù)器。
g->m->curg = gp;
gp->m = g->m;
runtime·gogo(&gp->sched);
}
asm_amd64.s
TEXT runtime·gogo(SB), NOSPLIT, $0-8
MOVQ buf+0(FP), BX // gobuf
MOVQ gobuf_g(BX), DX
MOVQ 0(DX), CX // make sure g != nil
get_tls(CX)
MOVQ DX, g(CX)
MOVQ gobuf_sp(BX), SP // 從 G.sched 恢復(fù)執(zhí)行現(xiàn)場。
MOVQ gobuf_ret(BX), AX
MOVQ gobuf_ctxt(BX), DX
MOVQ gobuf_pc(BX), BX // G.sched.pc 指向 goroutine 任務(wù)函數(shù)。
JMP BX // 執(zhí)行該函數(shù)。
匯編函數(shù)從 sched 恢復(fù)現(xiàn)場,將寄存器指向 G 用戶棧,然后跳轉(zhuǎn)到任務(wù)函數(shù)開始執(zhí)行。
這里有個問題,匯編函數(shù)并沒有保存 execute 返回現(xiàn)場,也就是說等任務(wù)函數(shù)結(jié)束后,執(zhí)行緒不會回到 execute。那 goexit 如何執(zhí)行?如何繼續(xù)循環(huán)調(diào)用?
要解釋清這個問題,需要埋一個在創(chuàng)建任務(wù)時留下的坑:gostartcallfn。
proc.c
G* runtime·newproc1(FuncVal *fn, byte *argp, int32 narg, int32 nret, void *callerpc)
{
...
newg->sched.sp = (uintptr)sp;
newg->sched.pc = (uintptr)runtime·goexit + PCQuantum;
newg->sched.g = newg;
runtime·gostartcallfn(&newg->sched, fn);
...
return newg;
}
stack.c
void runtime·gostartcallfn(Gobuf *gobuf, FuncVal *fv)
{
runtime·gostartcall(gobuf, fn, fv);
}
sys_x86.c
void runtime·gostartcall(Gobuf *gobuf, void (*fn)(void), void *ctxt)
{
sp = (uintptr*)gobuf->sp;
// 將 pc,也就是 goexit 地址入棧。
*--sp = (uintptr)gobuf->pc;
gobuf->sp = (uintptr)sp;
// 將 pc 指向 fn。
gobuf->pc = (uintptr)fn;
gobuf->ctxt = ctxt;
}
很有意思,在 gostartcall 中,提前將 goexit 地址壓入 G 棧。
匯編函數(shù) gogo 是 long jmp,也就是說當(dāng)任務(wù)函數(shù)執(zhí)行結(jié)束時,其尾部 RET 指令從棧上彈給 PC 寄存器的是 goexit 地址,這就是秘密所在。
asm_amd64.s
TEXT runtime·goexit(SB),NOSPLIT,$0-0
BYTE $0x90 // NOP
CALL runtime·goexit1(SB) // does not return
proc.c
void runtime·goexit1(void)
{
fn = goexit0;
runtime·mcall(&fn);
}
static void goexit0(G *gp)
{
runtime·casgstatus(gp, Grunning, Gdead);
// 將 G 放回 P.gfree 復(fù)用鏈表。
gfput(g->m->p, gp);
schedule();
}
任務(wù)結(jié)束,當(dāng)前 G 對象被放回復(fù)用鏈表,而線程則繼續(xù) schedule 循環(huán)往復(fù)。
proc.c
static void gfput(P *p, G *gp)
{
stksize = gp->stack.hi - gp->stack.lo;
// 如果不是默認(rèn)棧,釋放。
if(stksize != FixedStack) {
runtime·stackfree(gp->stack);
gp->stack.lo = 0;
gp->stack.hi = 0;
gp->stackguard0 = 0;
}
// 添加到復(fù)用鏈表。
gp->schedlink = p->gfree;
p->gfree = gp;
p->gfreecnt++;
// 如果 P 復(fù)用鏈表過長 ...
if(p->gfreecnt >= 64) {
// 將超出的復(fù)用對象轉(zhuǎn)移到全局鏈表。
while(p->gfreecnt >= 32) {
p->gfreecnt--;
gp = p->gfree;
p->gfree = gp->schedlink;
gp->schedlink = runtime·sched.gfree;
runtime·sched.gfree = gp;
runtime·sched.ngfree++;
}
}
}
另外,在 schedule 里還提到 lockedg,這表示該 G 任務(wù)只能交由指定 M 執(zhí)行,且該 M在綁定解除前會被休眠,不再執(zhí)行其他任務(wù)。當(dāng) M 遇到 lockedg,需要將 P 和 G 都交給綁定 M 去執(zhí)行。
proc.c
static void schedule(void)
{
// 如果當(dāng)前 M 被綁定給某個 G,那么交出 P,休眠。
// 直到被某個拿到 locked G 的 M 喚醒。
if(g->m->lockedg) {
stoplockedm();
execute(g->m->lockedg); // Never returns.
}
...
// 如果當(dāng)前 G 被綁定到某個 M,那么將 P 和 G 都交給對方。
// 喚醒綁定 M,自己回空閑隊列。
if(gp->lockedm) {
startlockedm(gp);
goto top; // 喚醒后回到頭部重新獲取任務(wù)。
}
}
static void startlockedm(G *gp)
{
// 獲取 G 綁定的 M。
mp = gp->lockedm;
// 將當(dāng)前 P 交給對方,并喚醒。
p = releasep();
mp->nextp = p;
runtime·notewakeup(&mp->park);
// 休眠當(dāng)前 M。
stopm();
}
static void stoplockedm(void)
{
// 上交 P,喚醒其他 M 執(zhí)行任務(wù)。
if(g->m->p) {
p = releasep();
handoffp(p);
}
// 休眠,直到被拿到 locked G 的 M 喚醒。
runtime·notesleep(&g->m->park);
runtime·noteclear(&g->m->park);
// 綁定對方交過來的 P。
acquirep(g->m->nextp);
g->m->nextp = nil;
}
static void goexit0(G *gp)
{
// 解除綁定。
gp->lockedm = nil;
g->m->lockedg = nil;
schedule();
}
在 cgo 里就用 lockOSThread 鎖定線程。
proc.c
static void lockOSThread(void)
{
g->m->lockedg = g;
g->lockedm = g->m;
}
static void unlockOSThread(void)
{
g->m->lockedg = nil;
g->lockedm = nil;
}
cgocall.go
func cgocall(fn, arg unsafe.Pointer) {
cgocall_errno(fn, arg)
}
func cgocall_errno(fn, arg unsafe.Pointer) int32 {
/*
* Lock g to m to ensure we stay on the same stack if we do a
* cgo callback. Add entry to defer stack in case of panic.
*/
lockOSThread()
mp := getg().m
mp.ncgocall++
mp.ncgo++
defer endcgo(mp)
/*
* Announce we are entering a system call
* so that the scheduler knows to create another
* M to run goroutines while we are in the
* foreign code.
*/
entersyscall()
errno := asmcgocall_errno(fn, arg) // 切換到 g0 stack 執(zhí)行。
exitsyscall()
return errno
}
func endcgo(mp *m) {
unlockOSThread()
}
調(diào)度器提供了兩種方式暫時中斷 G 任務(wù)執(zhí)行。
proc.go
func Gosched() {
mcall(gosched_m) // mcall 保存執(zhí)行現(xiàn)場到 G.sched,然后切換到 g0 棧。
}
proc.c
void runtime·gosched_m(G *gp)
{
// 將狀態(tài)從正在運行調(diào)整為可運行。
runtime·casgstatus(gp, Grunning, Grunnable);
// 將 G 重新放回全局隊列。
globrunqput(gp);
// 當(dāng)前 M 繼續(xù)查找并執(zhí)行其他任務(wù)。
schedule();
}
與 gosched 不同,gopark 并不會將 G 放回待運行隊列。
proc.go
func gopark(unlockf unsafe.Pointer, lock unsafe.Pointer, reason string) {
mcall(park_m)
}
proc.c
void runtime·park_m(G *gp)
{
// 修改狀態(tài)為等待。
runtime·casgstatus(gp, Grunning, Gwaiting);
// 當(dāng)前 M 繼續(xù)獲取并執(zhí)行其他任務(wù)。
schedule();
}
直到顯式調(diào)用 goready 將該 G 重新放回隊列。
proc.go
func goready(gp *g) {
onM(ready_m)
}
proc.c
void runtime·ready_m(void)
{
runtime·ready(gp);
}
void runtime·ready(G *gp)
{
// 修改狀態(tài)為可運行。
runtime·casgstatus(gp, Gwaiting, Grunnable);
// 將 G 重新放回本地待運行隊列。
runqput(g->m->p, gp);
// 喚醒某個 M 執(zhí)行任務(wù)。
if(runtime·atomicload(&runtime·sched.npidle) != 0 &&
runtime·atomicload(&runtime·sched.nmspinning) == 0)
wakep();
}
連續(xù)棧的地位被正式確定下來,Go 1.4 已經(jīng)移除了分段棧代碼。
相比較分段棧,連續(xù)棧結(jié)構(gòu)更簡單,實現(xiàn)算法也更加簡潔。除棧開始、結(jié)束地址外,只需維護(hù)一個用于溢出檢查的指針即可。
runtime.h
struct Stack
{
uintptr lo;
uintptr hi;
};
struct G
{
// stack describes the actual stack memory: [stack.lo, stack.hi).
// stackguard0 is the stack pointer compared in the Go stack growth prologue.
// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a
// preemption.
Stack stack;
uintptr stackguard0;
}
結(jié)構(gòu)示意圖:
在 stack.h 頭部棧結(jié)構(gòu)布局說明中有對 StackGuard 的詳細(xì)解釋。
stack.h
/*
The per-goroutine g->stackguard is set to point StackGuard bytes
above the bottom of the stack. Each function compares its stack
pointer against g->stackguard to check for overflow. To cut one
instruction from the check sequence for functions with tiny frames,
the stack is allowed to protrude StackSmall bytes below the stack
guard. Functions with large frames don't bother with the check and
always call morestack.
*/
StackGuard = 512 + StackSystem
在經(jīng)過幾個版本的反復(fù)調(diào)整后,棧默認(rèn)大小又回到 2048 字節(jié),這是個利好消息。
stack.h
StackMin = 2048,
proc.c
G* runtime·newproc1(FuncVal *fn, byte *argp, int32 narg, int32 nret, void *callerpc)
{
if((newg = gfget(p)) == nil) {
newg = runtime·malg(StackMin);
}
}
和內(nèi)存分配器的的做法類似,調(diào)度器會在 cache 上緩存 stack 對象。
malloc.h
// Number of orders that get caching. Order 0 is FixedStack
// and each successive order is twice as large.
NumStackOrders = 3,
runtime.h
struct StackFreeList
{
MLink *list; // linked list of free stacks
uintptr size; // total size of stacks in list
};
struct MCache
{
StackFreeList stackcache[NumStackOrders];
};
被緩存的 stack 依據(jù)大小分成 3 種 order,這與 FixedStack 值有很大關(guān)系。
stack.h
#ifdef GOOS_windows
StackSystem = 512 * sizeof(uintptr),
#else
#ifdef GOOS_plan9
StackSystem = 512,
#else
StackSystem = 0,
#endif // Plan 9
#endif // Windows
// The minimum stack size to allocate.
// The hackery here rounds FixedStack0 up to a power of 2.
FixedStack0 = StackMin + StackSystem,
FixedStack1 = FixedStack0 - 1,
FixedStack2 = FixedStack1 | (FixedStack1 >> 1),
FixedStack3 = FixedStack2 | (FixedStack2 >> 2),
FixedStack4 = FixedStack3 | (FixedStack3 >> 4),
FixedStack5 = FixedStack4 | (FixedStack4 >> 8),
FixedStack6 = FixedStack5 | (FixedStack5 >> 16),
FixedStack = FixedStack6 + 1,
對 Linux、darwin 等系統(tǒng)而言,F(xiàn)ixedStack 值和 StackMin 相同。 因此被緩存 stack大小分別是:
FiexedStack = 2048
FixedStack << order 0 = 2048
FixedStack << order 1 = 4096
FixedStack << order 2 = 8192
在確定相關(guān)結(jié)構(gòu)和參數(shù)后,看看 stack 具體的分配過程。
malloc.h
StackCacheSize = 32*1024, // Per-P, per order stack segment cache size.
stack.c
Stack runtime·stackalloc(uint32 n)
{
// 從復(fù)用鏈表分配,或直接從 heap.span 分配。
if(StackCache && n < FixedStack << NumStackOrders && n < StackCacheSize) {
// 計算對應(yīng)的 stack order。
order = 0;
n2 = n;
while(n2 > FixedStack) {
order++;
n2 >>= 1;
}
c = g->m->mcache;
if(c == nil || g->m->gcing || g->m->helpgc) {
// 從全局緩存分配。
x = poolalloc(order);
} else {
// 從本地 cache 分配。
x = c->stackcache[order].list;
// 如果本地沒有復(fù)用 stack,則從全局緩存轉(zhuǎn)移一批過來。
if(x == nil) {
stackcacherefill(c, order);
x = c->stackcache[order].list;
}
// 調(diào)整鏈表。
c->stackcache[order].list = x->next;
c->stackcache[order].size -= n;
}
v = (byte*)x;
} else {
// 直接從 heap.spans 分配。
s = runtime·MHeap_AllocStack(&runtime·mheap, ROUND(n, PageSize) >> PageShift);
v = (byte*)(s->start<<PageShift);
}
return (Stack){(uintptr)v, (uintptr)v+n};
}
整個過程和從 cache 分配 object 如出一轍。而在釋放時,調(diào)度器會主動將過多的復(fù)用對象從本地轉(zhuǎn)移到全局緩存。
stack.c
void runtime·stackfree(Stack stk)
{
n = stk.hi - stk.lo;
v = (void*)stk.lo;
if(StackCache && n < FixedStack << NumStackOrders && n < StackCacheSize) {
// 計算 order。
order = 0;
n2 = n;
while(n2 > FixedStack) {
order++;
n2 >>= 1;
}
x = (MLink*)v;
c = g->m->mcache;
if(c == nil || g->m->gcing || g->m->helpgc) {
// 歸還給全局緩存。
poolfree(x, order);
} else {
// 如果本地緩存超出容量限制,則歸還一批給全局緩存。
if(c->stackcache[order].size >= StackCacheSize)
stackcacherelease(c, order);
// 添加到 cache 本地鏈表。
x->next = c->stackcache[order].list;
c->stackcache[order].list = x;
c->stackcache[order].size += n;
}
} else {
// 歸還給 heap。
s = runtime·MHeap_Lookup(&runtime·mheap, v);
runtime·MHeap_FreeStack(&runtime·mheap, s);
}
}
全局緩存池基本上就是對 span 鏈表的操作,類似做法在內(nèi)存分配器章節(jié)早已見過。
stack.c
MSpan runtime·stackpool[NumStackOrders]; // 全局緩存。
void runtime·stackinit(void)
{
for(i = 0; i < NumStackOrders; i++)
runtime·MSpanList_Init(&runtime·stackpool[i]);
}
static MLink* poolalloc(uint8 order)
{
MSpan *list;
MSpan *s;
MLink *x;
list = &runtime·stackpool[order];
s = list->next;
if(s == list) {
// 如果沒有 stack 可用,則從 heap 獲取一個 span。
s = runtime·MHeap_AllocStack(&runtime·mheap, StackCacheSize >> PageShift);
// 切分。
for(i = 0; i < StackCacheSize; i += FixedStack << order) {
x = (MLink*)((s->start << PageShift) + i);
x->next = s->freelist;
s->freelist = x;
}
// 插入鏈表。
runtime·MSpanList_Insert(list, s);
}
x = s->freelist;
s->freelist = x->next;
s->ref++;
if(s->freelist == nil) {
// all stacks in s are allocated.
runtime·MSpanList_Remove(s);
}
return x;
}
static void poolfree(MLink *x, uint8 order)
{
MSpan *s;
s = runtime·MHeap_Lookup(&runtime·mheap, x);
if(s->freelist == nil) {
// 有對象歸還,自然要重新放回復(fù)用鏈表中。
runtime·MSpanList_Insert(&runtime·stackpool[order], s);
}
x->next = s->freelist;
s->freelist = x;
s->ref--;
// 如果該 span 內(nèi)存被全部收回,還給 heap。
if(s->ref == 0) {
runtime·MSpanList_Remove(s);
s->freelist = nil;
runtime·MHeap_FreeStack(&runtime·mheap, s);
}
}
相關(guān)的批量轉(zhuǎn)移和歸還操作也沒什么值得深究的。
stack.c
static void stackcacherefill(MCache *c, uint8 order)
{
MLink *x, *list;
uintptr size;
// Grab some stacks from the global cache.
// Grab half of the allowed capacity (to prevent thrashing).
list = nil;
size = 0;
while(size < StackCacheSize/2) {
x = poolalloc(order);
x->next = list;
list = x;
size += FixedStack << order;
}
c->stackcache[order].list = list;
c->stackcache[order].size = size;
}
static void stackcacherelease(MCache *c, uint8 order)
{
MLink *x, *y;
uintptr size;
x = c->stackcache[order].list;
size = c->stackcache[order].size;
while(size > StackCacheSize/2) {
y = x->next;
poolfree(x, order);
x = y;
size -= FixedStack << order;
}
c->stackcache[order].list = x;
c->stackcache[order].size = size;
}
連續(xù)棧的調(diào)整行為是由編譯器偷偷完成的。反匯編可執(zhí)行文件,你會看到編譯器會在函數(shù)頭部插入 morestack 調(diào)用,這是運行時檢查棧內(nèi)存的關(guān)鍵。
(gdb) disass
Dump of assembler code for function main.main:
0x000000000000207f <+15>:" call 0x2ddf0 <runtime.morestack_noctxt>
asm_amd64.s
TEXT runtime·morestack_noctxt(SB),NOSPLIT,$0
MOVL $0, DX
JMP runtime·morestack(SB)
TEXT runtime·morestack(SB),NOSPLIT,$0-0
MOVQ (g_sched+gobuf_sp)(BP), SP
CALL runtime·newstack(SB)
當(dāng)需要調(diào)整棧大小時,會調(diào)用 newstack 完成連續(xù)棧的重新分配。
stack.c
// Called from runtime·morestack when more stack is needed.
// Allocate larger stack and relocate to new stack.
// Stack growth is multiplicative, for constant amortized cost.
void runtime·newstack(void)
{
gp = g->m->curg;
// 修改狀態(tài)。
runtime·casgstatus(gp, Grunning, Gwaiting);
gp->waitreason = runtime·gostringnocopy((byte*)"stack growth");
// 調(diào)整執(zhí)行現(xiàn)場參數(shù)。
runtime·rewindmorestack(&gp->sched);
// 新棧需要 2 倍空間。
oldsize = gp->stack.hi - gp->stack.lo;
newsize = oldsize * 2;
// 分配新 stack,并將數(shù)據(jù)拷貝到新站。
copystack(gp, newsize);
// 恢復(fù)狀態(tài),繼續(xù)執(zhí)行。
runtime·casgstatus(gp, Gwaiting, Grunning);
runtime·gogo(&gp->sched);
}
與分段棧相比,連續(xù)棧核心算法 copystack 非常簡單易懂。
stack.c
static void copystack(G *gp, uintptr newsize)
{
old = gp->stack;
used = old.hi - gp->sched.sp;
// 創(chuàng)建新棧。
new = runtime·stackalloc(newsize);
// ... 一些棧內(nèi)容調(diào)整操作 ...
// 拷貝棧數(shù)據(jù)。
runtime·memmove((byte*)new.hi - used, (byte*)old.hi - used, used);
// 切換到新棧。
gp->stack = new;
gp->stackguard0 = new.lo + StackGuard;
gp->sched.sp = new.hi - used;
// 釋放舊棧。
if(newsize > old.hi-old.lo) {
// 擴張, 立即釋放。
runtime·stackfree(old);
} else {
// 收縮操作有點復(fù)雜,因為原棧上的某些數(shù)據(jù)可能對垃圾回收器有用。
// 放到一個臨時隊列,等待垃圾回收器處理。
*(Stack*)old.lo = stackfreequeue;
stackfreequeue = old;
}
}
垃圾回收器會清理所有緩存,釋放掉臨時存儲的 stack,并收縮棧內(nèi)存。
mgc0.c
static void gc(struct gc_args *args)
{
runtime·shrinkfinish();
}
static void markroot(ParFor *desc, uint32 i)
{
switch(i) {
case RootData:
...
case RootFlushCaches:
flushallmcaches(); // 清理 cache。
break;
default:
gp = runtime·allg[i - RootCount];
runtime·shrinkstack(gp); // 收縮棧內(nèi)存。
break;
}
}
static void flushallmcaches(void)
{
// Flush MCache's to MCentral.
for(pp=runtime·allp; p=*pp; pp++) {
c = p->mcache;
runtime·MCache_ReleaseAll(c);
runtime·stackcache_clear(c); // 釋放 cache 里緩存的 stack。
}
}
stack.c
static Stack stackfreequeue;
// 清理臨時 stack。
void runtime·shrinkfinish(void)
{
s = stackfreequeue;
stackfreequeue = (Stack){0,0};
while(s.lo != 0) {
t = *(Stack*)s.lo;
runtime·stackfree(s);
s = t;
}
}
// 收縮棧內(nèi)存。
void runtime·shrinkstack(G *gp)
{
oldsize = gp->stack.hi - gp->stack.lo;
newsize = oldsize / 2;
if(newsize < FixedStack)
return; // don't shrink below the minimum-sized stack
used = gp->stack.hi - gp->sched.sp;
if(used >= oldsize / 4)
return; // still using at least 1/4 of the segment.
copystack(gp, newsize);
}
// 釋放所有 cache 持有的 stack 緩存。
void runtime·stackcache_clear(MCache *c)
{
for(order = 0; order < NumStackOrders; order++) {
x = c->stackcache[order].list;
while(x != nil) {
y = x->next;
poolfree(x, order);
x = y;
}
c->stackcache[order].list = nil;
c->stackcache[order].size = 0;
}
}
調(diào)度器完成 G 任務(wù)后,會將其放回復(fù)用列表,并釋放掉額外分配的棧內(nèi)存。
proc.c
static void gfput(P *p, G *gp)
{
stksize = gp->stack.hi - gp->stack.lo;
// 如果不是默認(rèn)棧,釋放。
if(stksize != FixedStack) {
runtime·stackfree(gp->stack);
gp->stack.lo = 0;
gp->stack.hi = 0;
gp->stackguard0 = 0;
}
}
還有,在減少 P 數(shù)量時,會釋放不再使用的關(guān)聯(lián) cache,這也會引發(fā) stack 清理操作。
proc.c
static void procresize(int32 new)
{
// free unused P's
for(i = new; i < old; i++) {
p = runtime·allp[i];
runtime·freemcache(p->mcache);
}
}
mcache.c
static void freemcache(MCache *c)
{
runtime·stackcache_clear(c);
}
官方一直在宣傳連續(xù)棧的好處,但實際性能表現(xiàn)和具體場景有關(guān),并非處處適宜。另外,緩存對象的確可以提升性能,但過多的緩存對象放在復(fù)用鏈表中,卻成為浪費和負(fù)擔(dān)。興許以后的版本會有更好的表現(xiàn)。
為支持并發(fā)調(diào)度,專門對 syscall、cgo 等操作進(jìn)行包裝,以便在長時間阻塞時能切換執(zhí)行其他任務(wù)。
src/syscall/asm_linux_amd64.s
TEXT ·Syscall(SB),NOSPLIT,$0-56
CALL runtime·entersyscall(SB)
MOVQ 16(SP), DI
MOVQ 24(SP), SI
MOVQ 32(SP), DX
MOVQ $0, R10
MOVQ $0, R8
MOVQ $0, R9
MOVQ 8(SP), AX // syscall entry
SYSCALL
CMPQ AX, $0xfffffffffffff001
JLS ok
MOVQ $-1, 40(SP) // r1
MOVQ $0, 48(SP) // r2
NEGQ AX
MOVQ AX, 56(SP) // errno
CALL runtime·exitsyscall(SB)
RET
ok:
MOVQ AX, 40(SP) // r1
MOVQ DX, 48(SP) // r2
MOVQ $0, 56(SP) // errno
CALL runtime·exitsyscall(SB)
RET
cgocall.go
func cgocall(fn, arg unsafe.Pointer) {
cgocall_errno(fn, arg)
}
func cgocall_errno(fn, arg unsafe.Pointer) int32 {
entersyscall()
errno := asmcgocall_errno(fn, arg)
exitsyscall()
return errno
}
進(jìn)入系統(tǒng)調(diào)用前保存執(zhí)行現(xiàn)場,這是任務(wù)切換的關(guān)鍵。
proc.c
void ·entersyscall(int32 dummy)
{
runtime·reentersyscall((uintptr)runtime·getcallerpc(&dummy),
runtime·getcallersp(&dummy));
}
void runtime·reentersyscall(uintptr pc, uintptr sp)
{
// 保存現(xiàn)場。
save(pc, sp);
g->syscallsp = sp;
g->syscallpc = pc;
runtime·casgstatus(g, Grunning, Gsyscall);
// 喚醒 sysmon 線程。
if(runtime·atomicload(&runtime·sched.sysmonwait)) {
fn = entersyscall_sysmon;
runtime·onM(&fn);
save(pc, sp);
}
// 解除任務(wù)關(guān)聯(lián)引用。
g->m->mcache = nil;
g->m->p->m = nil;
runtime·atomicstore(&g->m->p->status, Psyscall);
}
static void save(uintptr pc, uintptr sp)
{
g->sched.pc = pc;
g->sched.sp = sp;
g->sched.lr = 0;
g->sched.ret = 0;
g->sched.ctxt = 0;
g->sched.g = g;
}
必須確保 sysmon 線程運行,如此才能在長時間阻塞時,回收其關(guān)聯(lián) P 執(zhí)行其他任務(wù)。
proc.c
static void entersyscall_sysmon(void)
{
// 喚醒 sysmon M。
if(runtime·atomicload(&runtime·sched.sysmonwait)) {
runtime·atomicstore(&runtime·sched.sysmonwait, 0);
runtime·notewakeup(&runtime·sched.sysmonnote);
}
}
另有 entersyscallblock 會主動釋放 P,用于執(zhí)行可確定的長時間阻塞調(diào)用。
proc.c
void ·entersyscallblock(int32 dummy)
{
save((uintptr)runtime·getcallerpc(&dummy), runtime·getcallersp(&dummy));
g->syscallsp = g->sched.sp;
g->syscallpc = g->sched.pc;
runtime·casgstatus(g, Grunning, Gsyscall);
// 釋放關(guān)聯(lián) P。
fn = entersyscallblock_handoff;
runtime·onM(&fn);
save((uintptr)runtime·getcallerpc(&dummy), runtime·getcallersp(&dummy));
}
static void entersyscallblock_handoff(void)
{
// 釋放 P,讓其執(zhí)行其他任務(wù)。
handoffp(releasep());
}
從系統(tǒng)調(diào)用退出時,優(yōu)先檢查關(guān)聯(lián) P 是否還在。
proc.c
void ·exitsyscall(int32 dummy)
{
// 如果能關(guān)聯(lián) P。
if(exitsyscallfast()) {
runtime·casgstatus(g, Gsyscall, Grunning);
return;
}
fn = exitsyscall0;
runtime·mcall(&fn);
}
static bool exitsyscallfast(void)
{
// 如果關(guān)聯(lián) P 扔在,嘗試重新關(guān)聯(lián)。
if(g->m->p && g->m->p->status == Psyscall &&
runtime·cas(&g->m->p->status, Psyscall, Prunning)) {
g->m->mcache = g->m->p->mcache;
g->m->p->m = g->m;
return true;
}
// 嘗試關(guān)聯(lián)空閑 P。
g->m->p = nil;
if(runtime·sched.pidle) {
fn = exitsyscallfast_pidle;
runtime·onM(&fn);
if(g->m->scalararg[0]) {
g->m->scalararg[0] = 0;
return true;
}
}
return false;
}
static void exitsyscallfast_pidle(void)
{
p = pidleget();
if(p) {
acquirep(p);
}
}
如快速退出失敗,且無法獲取可用 P,那只能將當(dāng)前 G 任務(wù)放回待運行隊列。
proc.c
static void exitsyscall0(G *gp)
{
runtime·casgstatus(gp, Gsyscall, Grunnable);
// 獲取空閑 P。
p = pidleget();
// 如獲取 P 失敗,將當(dāng)前 G 放回全局隊列。
if(p == nil)
globrunqput(gp);
// 關(guān)聯(lián) P,繼續(xù)執(zhí)行。
if(p) {
acquirep(p);
execute(gp); // Never returns.
}
// 關(guān)聯(lián)失敗,休眠當(dāng)前 M。
stopm();
schedule(); // Never returns.
}
注:以 Raw 開頭的函數(shù)不使用包裝模式。
調(diào)度器使用專門線程跑系統(tǒng)監(jiān)控,主動完成那些長時間沒有觸發(fā)的事件。
// The main goroutine.
func main() {
onM(newsysmon)
}
proc.c
void runtime·newsysmon(void)
{
// 啟動獨立線程運行 sysmon。
newm(sysmon, nil);
}
監(jiān)控函數(shù) sysmon 循環(huán)運行所有檢查任務(wù)。
proc.c
static void sysmon(void)
{
// If we go two minutes without a garbage collection, force one to run.
forcegcperiod = 2*60*1e9;
// If a heap span goes unused for 5 minutes after a garbage collection,
// we hand it back to the operating system.
scavengelimit = 5*60*1e9;
// Make wake-up period small enough for the sampling to be correct.
maxsleep = forcegcperiod/2;
if(scavengelimit < forcegcperiod)
maxsleep = scavengelimit/2;
for(;;) {
if(idle == 0) // start with 20us sleep...
delay = 20;
else if(idle > 50) // start doubling the sleep after 1ms...
delay *= 2;
if(delay > 10*1000) // up to 10ms
delay = 10*1000;
// 根據(jù) idle 調(diào)整循環(huán)暫停時間。
runtime·usleep(delay);
// 如垃圾回收啟動,休眠 sysmon 線程。
if(runtime·debug.schedtrace <= 0 && (runtime·sched.gcwaiting ...)) {
if(runtime·atomicload(&runtime·sched.gcwaiting) || ...) {
// 設(shè)置標(biāo)志,休眠一段時間。
runtime·atomicstore(&runtime·sched.sysmonwait, 1);
runtime·notetsleep(&runtime·sched.sysmonnote, maxsleep);
// 喚醒后清除等待標(biāo)志。
runtime·atomicstore(&runtime·sched.sysmonwait, 0);
runtime·noteclear(&runtime·sched.sysmonnote);
idle = 0;
delay = 20;
}
}
// 如超過 10ms 沒處理 netpoll,立即獲取,并添加到任務(wù)隊列。
lastpoll = runtime·atomicload64(&runtime·sched.lastpoll);
if(lastpoll != 0 && lastpoll + 10*1000*1000 < now) {
runtime·cas64(&runtime·sched.lastpoll, lastpoll, now);
gp = runtime·netpoll(false); // non-blocking
if(gp) {
injectglist(gp);
}
}
// 收回因系統(tǒng)調(diào)用長時間阻塞的 P。
// 向長時間運行的 G 任務(wù)發(fā)出搶占調(diào)度通知。
if(retake(now))
idle = 0;
else
idle++;
// 如超過 2 分鐘未做垃圾回收,強制啟動。
lastgc = runtime·atomicload64(&mstats.last_gc);
if(lastgc != 0 && unixnow - lastgc > forcegcperiod && ...) {
// 將 forcegc.G 放回任務(wù)隊列,使其運行。
injectglist(runtime·forcegc.g);
}
// 釋放長時間閑置 span 物理內(nèi)存。
if(lastscavenge + scavengelimit/2 < now) {
runtime·MHeap_Scavenge(nscavenge, now, scavengelimit);
lastscavenge = now;
}
}
}
forcegc 和 scavenge 前面都已說過。retake 使用計數(shù)器判斷 syscall 或 G 任務(wù)的運行時間。
proc.c
struct Pdesc
{
uint32 schedtick; // scheduler execute 執(zhí)行次數(shù)。
int64 schedwhen;
uint32 syscalltick; // syscall 執(zhí)行次數(shù),在 exitsyscall 結(jié)束前遞增。
int64 syscallwhen;
};
static Pdesc pdesc[MaxGomaxprocs];
static uint32 retake(int64 now)
{
n = 0;
// 循環(huán)檢查所有 P。
for(i = 0; i < runtime·gomaxprocs; i++) {
p = runtime·allp[i];
if(p==nil) continue;
pd = &pdesc[i];
s = p->status;
if(s == Psyscall) {
// 如果和 pdesc 中的計數(shù)不等,表示啟動了新 syscall,刷新計數(shù)器。
// 再次 retake 時,如計數(shù)依然相等,表示依然阻塞在上次 syscall 中,
// 時間起碼超過一次 sysmon sleep (最少 20us)。
t = p->syscalltick;
if(pd->syscalltick != t) {
pd->syscalltick = t;
pd->syscallwhen = now;
continue;
}
// 如 P 沒有其他任務(wù),且沒超過 10ms,跳過。
if(p->runqhead == p->runqtail &&
runtime·atomicload(&runtime·sched.nmspinning) +
runtime·atomicload(&runtime·sched.npidle) > 0 &&
pd->syscallwhen + 10*1000*1000 > now)
continue;
// 收回被 syscall 阻塞的 P,用于執(zhí)行其他任務(wù)。
if(runtime·cas(&p->status, s, Pidle)) {
n++;
handoffp(p);
}
} else if(s == Prunning) {
// 計數(shù)不等,表示啟動新 G 任務(wù)執(zhí)行,刷新計數(shù)器。
// 再次 retake 時,如計數(shù)依然相等,表示該任務(wù)執(zhí)行時間超過一次 sysmon sleep 間隔。
t = p->schedtick;
if(pd->schedtick != t) {
pd->schedtick = t;
pd->schedwhen = now;
continue;
}
// 檢查超時 (10ms)。
if(pd->schedwhen + 10*1000*1000 > now)
continue;
// 設(shè)置搶占調(diào)度標(biāo)記。
preemptone(p);
}
}
return n;
}
前面說過 entersyscall 會保存現(xiàn)場,解除引用,因此 sysmon 可以安全拿回 P。調(diào)度器會積極嘗試讓這個 P 跑起來,這是它的責(zé)任。
proc.c
static void handoffp(P *p)
{
// if it has local work, start it straight away
if(p->runqhead != p->runqtail || runtime·sched.runqsize) {
startm(p, false);
return;
}
// no local work, check that there are no spinning/idle M's,
// otherwise our help is not required
if(runtime·atomicload(&runtime·sched.nmspinning) +
runtime·atomicload(&runtime·sched.npidle) == 0 &&
runtime·cas(&runtime·sched.nmspinning, 0, 1)){
startm(p, true);
return;
}
// gc
if(runtime·sched.gcwaiting) {
p->status = Pgcstop;
if(--runtime·sched.stopwait == 0)
runtime·notewakeup(&runtime·sched.stopnote);
return;
}
if(runtime·sched.runqsize) {
startm(p, false);
return;
}
// If this is the last running P and nobody is polling network,
// need to wakeup another M to poll network.
if(runtime·sched.npidle == runtime·gomaxprocs-1 &&
runtime·atomicload64(&runtime·sched.lastpoll) != 0) {
startm(p, false);
return;
}
pidleput(p);
}
至于搶占調(diào)度通知不過是在 G 棧上設(shè)置一個標(biāo)志。類似操作,在很多地方都能看到。
proc.c
// Tell the goroutine running on processor P to stop.
static bool preemptone(P *p)
{
mp = p->m;
if(mp == nil || mp == g->m)
return false;
gp = mp->curg;
if(gp == nil || gp == mp->g0)
return false;
gp->preempt = true;
// Every call in a go routine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the normal stack overflow check.
gp->stackguard0 = StackPreempt;
return true;
}
實際的調(diào)度行為由編譯器插入到函數(shù)頭部的 morestack 引發(fā)。
asm_amd64.s
TEXT runtime·morestack(SB),NOSPLIT,$0-0
MOVQ (g_sched+gobuf_sp)(BP), SP
CALL runtime·newstack(SB)
stack.c
void runtime·newstack(void)
{
if(gp->stackguard0 == (uintptr)StackPreempt) {
// Act like goroutine called runtime.Gosched.
runtime·casgstatus(gp, Gwaiting, Grunning);
runtime·gosched_m(gp); // never return
}
}
可見搶占調(diào)度的前提是執(zhí)行其他非內(nèi)聯(lián)函數(shù)。如果任務(wù)跑沒有函數(shù)調(diào)用的無限循環(huán),那么 M/P 就會被一直霸占,最慘的是 GOMAXPROCS = 1,其他任務(wù)都會餓死。
使用用環(huán)境變量 GODEBUG="schedtrace=xxx" 輸出調(diào)度器跟蹤信息。
更詳細(xì)的信息,需指定 "scheddetail=1"。
$ GOMAXPROCS=2 GODEBUG="schedtrace=1000,scheddetail=1" ./test
SCHED 1002ms: gomaxprocs=2 idleprocs=0 threads=3 idlethreads=0 runqueue=0 ...
P0: status=1 schedtick=4 syscalltick=3 m=0 runqsize=51 gfreecnt=0
P1: status=1 schedtick=5 syscalltick=0 m=2 runqsize=50 gfreecnt=0
M2: p=1 curg=10 mallocing=0 throwing=0 gcing=0 locks=0 dying=0 helpgc=0 ...
M1: p=-1 curg=-1 mallocing=0 throwing=0 gcing=0 locks=1 dying=0 helpgc=0 ...
M0: p=0 curg=9 mallocing=0 throwing=0 gcing=0 locks=0 dying=0 helpgc=0 ...
G1: status=4(sleep) m=-1 lockedm=-1
G2: status=1() m=-1 lockedm=-1
G3: status=1() m=-1 lockedm=-1
相關(guān)代碼請參考 proc.c/runtime·schedtrace 函數(shù)。
Channel 是 Go 實現(xiàn) CSP 模型的關(guān)鍵,鼓勵用通訊來實現(xiàn)共享。
在具體實現(xiàn)上,類似 FIFO 隊列,多個 G 排隊等待收發(fā)操作。同步模式,從排隊鏈表中獲取一個能與之交換數(shù)據(jù)的對象;異步模式,圍繞數(shù)據(jù)緩沖區(qū)空位排隊。
先了解幾個基本的數(shù)據(jù)類型。
SudoG 對 G 進(jìn)行包裝,而 WaitQ 則是 SudoG 排隊鏈表。
runtime.h
struct SudoG
{
G* g;
uint32* selectdone;
SudoG* next;
SudoG* prev;
void* elem; // 發(fā)送或接收的數(shù)據(jù)。
int64 releasetime;
int32 nrelease; // -1 for acquire
SudoG* waitlink; // G.waiting list
};
chan.h
struct WaitQ
{
SudoG* first;
SudoG* last;
};
每個 channel 除發(fā)送和接收排隊鏈表外,還由一個環(huán)狀數(shù)據(jù)緩沖槽隊列。
chan.h
struct Hchan
{
uintgo qcount; // 緩沖數(shù)據(jù)項數(shù)量。
uintgo dataqsiz; // 緩沖槽數(shù)量。
byte* buf; // 緩沖區(qū)指針。
uint16 elemsize; // 數(shù)據(jù)項長度。
uint32 closed; // 關(guān)閉標(biāo)記。
Type* elemtype; // 數(shù)據(jù)項類型。
uintgo sendx; // 發(fā)送索引。
uintgo recvx; // 接收索引。
WaitQ recvq; // 等待接收 G 排隊鏈表。
WaitQ sendq; // 等待發(fā)送 G 排隊鏈表。
Mutex lock;
};
創(chuàng)建 channel 對象時,需要指定緩沖槽數(shù)量,同步模式為 0。
chan.go
const (
hchanSize = unsafe.Sizeof(hchan{}) +
uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
)
func makechan(t *chantype, size int64) *hchan {
elem := t.elem
// 數(shù)據(jù)項長度不能超過 64KB。
if elem.size >= 1<<16 {
gothrow("makechan: invalid channel element type")
}
var c *hchan
if elem.kind&kindNoPointers != 0 || size == 0 {
// 一次性分配 channel 和緩沖區(qū)內(nèi)存。
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*uintptr(elem.size), ...))
// 調(diào)整緩沖區(qū)指針。
if size > 0 && elem.size != 0 {
c.buf = (*uint8)(add(unsafe.Pointer(c), hchanSize))
} else {
c.buf = (*uint8)(unsafe.Pointer(c))
}
} else {
// 如果數(shù)據(jù)項是指針,單獨分配一個指針數(shù)組作為緩沖區(qū)。
c = new(hchan)
c.buf = (*uint8)(newarray(elem, uintptr(size)))
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
同步和異步實現(xiàn)算法有很大差異。但不知什么原因,運行時開發(fā)人員硬將這些塞到同一個函數(shù)里。為閱讀方便,我們將其拆開說明。
同步收發(fā)操作的關(guān)鍵,是從排隊鏈表里找到一個合作者。找到,直接私下交換數(shù)據(jù);找不到,把自己打包成 SudoG,放到排隊鏈表里,然后休眠,直到被另一方喚醒。
參數(shù) ep 表示待發(fā)送或接收數(shù)據(jù)內(nèi)存指針。
chan.go
func chansend(t *chantype, c *hchan, ep unsafe.Pointer, ...) bool {
lock(&c.lock)
// --- 同步模式 ------------------------------------------------------
if c.dataqsiz == 0 {
// 從接收排隊鏈表查找接收者。
sg := c.recvq.dequeue()
if sg != nil {
// 找到合作者以后,就不再需要 channel 參與。
unlock(&c.lock)
recvg := sg.g
// 將數(shù)據(jù)拷貝給接收者。
if sg.elem != nil {
memmove(unsafe.Pointer(sg.elem), ep, uintptr(c.elemsize))
sg.elem = nil
}
// 將準(zhǔn)備喚醒的接收者 SudoG 保存到目標(biāo) G.param。
// 同步方式的參與雙方通過檢查該參數(shù)來確定是被另一方喚醒。
// 另外,channel select 用該參數(shù)獲知可用的 SudoG。
// 異步方式僅關(guān)心緩沖槽,并不需要有另一方配合,因此無需填寫該參數(shù)。
recvg.param = unsafe.Pointer(sg)
// 喚醒接收者。
// 注意,此時接收者已經(jīng)離開排隊鏈表,而且數(shù)據(jù)已經(jīng)完成拷貝。
goready(recvg)
return true
}
// 如果找不到接收者,那么打包成 SudoG。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
gp.param = nil
// 放到發(fā)送者排隊鏈表、阻塞。
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send")
// 被喚醒。檢查 param 參數(shù),確定是被接收者而不是 close 喚醒。
// 被喚醒前,數(shù)據(jù)已經(jīng)被接收者拷貝完成。
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
gothrow("chansend: spurious wakeup")
}
panic("send on closed channel")
}
gp.param = nil
releaseSudog(mysg)
return true
}
return true
}
同步接收和同步發(fā)送流程基本一致。
chan.go
func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, ...) (selected, received bool) {
lock(&c.lock)
// --- 同步模式 ------------------------------------------------------
if c.dataqsiz == 0 {
// 從發(fā)送排隊鏈表找出一個發(fā)送者。
sg := c.sendq.dequeue()
if sg != nil {
// 撇開 channel,私下交易。
unlock(&c.lock)
// 拷貝數(shù)據(jù)。
if ep != nil {
memmove(ep, sg.elem, uintptr(c.elemsize))
}
sg.elem = nil
gp := sg.g
// 設(shè)置喚醒檢查參數(shù),喚醒發(fā)送者。
// 喚醒前,數(shù)據(jù)已經(jīng)完成拷貝。
gp.param = unsafe.Pointer(sg)
goready(gp)
selected = true
received = true
return
}
// 如果沒有發(fā)送者,打包成 SudoG。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
gp.param = nil
// 放到接收排隊鏈表、阻塞。
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive")
// 檢查是否被發(fā)送者喚醒,以確定數(shù)據(jù)可用。close 喚醒肯定不會有數(shù)據(jù)。
// 喚醒前,發(fā)送者已經(jīng)將數(shù)據(jù)拷貝到 ep。
haveData := gp.param != nil
gp.param = nil
releaseSudog(mysg)
if haveData {
selected = true
received = true
return
}
lock(&c.lock)
return recvclosed(c, ep)
}
}
簡單點說,要么主動找到合作方,要么去排隊等著被動喚醒,這是一個雙向過程。
SudoG 會被緩存到 cache,這個沒什么需要特別說明的。
malloc.h
struct MCache
{
SudoG* sudogcache;
};
proc.go
func acquireSudog() *sudog {
c := gomcache()
s := c.sudogcache
if s != nil {
c.sudogcache = s.next
s.next = nil
return s
}
mp := acquirem()
p := new(sudog)
releasem(mp)
return p
}
func releaseSudog(s *sudog) {
gp := getg()
c := gomcache()
s.next = c.sudogcache
c.sudogcache = s
}
同步關(guān)鍵是查找合作方,而異步關(guān)鍵則是緩沖區(qū)空槽。channel 使用 qcount、sendx、recvx 來維護(hù)一個環(huán)狀緩沖隊列。
chan.go
func chansend(t *chantype, c *hchan, ep unsafe.Pointer, ...) bool {
lock(&c.lock)
// --- 異步模式 ------------------------------------------------------
// 如果緩沖區(qū)沒有空槽。
for c.qcount >= c.dataqsiz {
// 打包成 SudoG。
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.g = gp
mysg.elem = nil
mysg.selectdone = nil
// 放入發(fā)送隊列,休眠。
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send")
// 被喚醒,釋放 SudoG。
// 被喚醒前,SudoG 已經(jīng)被某個接收者彈出排隊鏈表。
releaseSudog(mysg)
// ... 循環(huán)重試 ...
}
// 有空槽,直接將數(shù)據(jù)拷貝到緩沖區(qū)。
memmove(chanbuf(c, c.sendx), ep, uintptr(c.elemsize))
// 調(diào)整緩沖區(qū)參數(shù)。
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
// 發(fā)送操作完成。
// 緩沖區(qū)有了可用數(shù)據(jù),嘗試將某個接收者從排隊鏈表彈出,喚醒它處理數(shù)據(jù)。
sg := c.recvq.dequeue()
if sg != nil {
recvg := sg.g
unlock(&c.lock)
goready(recvg)
}
return true
}
將數(shù)據(jù)拷貝到緩沖區(qū),然后嘗試去喚醒某個接收者處理。同樣,接收者如果找不到可用的緩存數(shù)據(jù),會將自己放到排隊鏈表,等待某個發(fā)送者寫入數(shù)據(jù)后喚醒。
chan.go
func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, ...) (selected, received bool) {
lock(&c.lock)
// --- 異步模式 ------------------------------------------------------
// 如果沒有可用緩存數(shù)據(jù)。
for c.qcount <= 0 {
// 打包成 SudoG.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = nil
mysg.g = gp
mysg.selectdone = nil
// 放到接收排隊鏈表、阻塞。
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive")
// 被喚醒。
// 被喚醒前,SudoG 已經(jīng)被彈出排隊鏈表。
releaseSudog(mysg)
lock(&c.lock)
// ... 循環(huán)重試 ...
}
// 如果有可用數(shù)據(jù)項,直接拷貝到 ep。
if ep != nil {
memmove(ep, chanbuf(c, c.recvx), uintptr(c.elemsize))
}
// 清除緩沖槽,調(diào)整緩沖區(qū)參數(shù)。
memclr(chanbuf(c, c.recvx), uintptr(c.elemsize))
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
// 接收完成,表示有空槽,嘗試喚醒某個發(fā)送者。
sg := c.sendq.dequeue()
if sg != nil {
gp := sg.g
unlock(&c.lock)
goready(gp)
}
selected = true
received = true
return
}
對于 nil channel,總是阻塞。而當(dāng)用 close 關(guān)閉 channel 時,會喚醒所有排隊者,讓它們處理完已有的操作,比如已排隊等待發(fā)送的數(shù)據(jù),或已經(jīng)寫入緩沖區(qū)的數(shù)據(jù)。
chan.go
func closechan(c *hchan) {
// 不要 close nil channel。
if c == nil {
panic("close of nil channel")
}
// 不要多次 close channal。
if c.closed != 0 {
panic("close of closed channel")
}
// 關(guān)閉標(biāo)志。
c.closed = 1
// 喚醒所有排隊的接收者。
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
gp := sg.g
sg.elem = nil
gp.param = nil // 如果是同步方式,表明是 closechan 喚醒。
goready(gp)
}
// 喚醒所有排隊的發(fā)送者。
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
gp := sg.g
sg.elem = nil
gp.param = nil
goready(gp)
}
}
總結(jié)規(guī)則如下:
發(fā)送:
編譯器會將所有 case 轉(zhuǎn)換為 Scase 對象,注冊到 select.scase,自然也包括 default。
chan.h
struct Scase
{
void* elem; // data element
Hchan* chan; // chan
uintptr pc; // return pc
uint16 kind;
uint16 so; // vararg of selected bool
bool* receivedp; // pointer to received bool (recv2)
int64 releasetime;
};
struct Select
{
uint16 tcase; // total count of scase[]
uint16 ncase; // currently filled scase[]
uint16* pollorder; // case poll order
Hchan** lockorder; // channel lock order
Scase scase[1]; // one per case (in order of appearance)
};
因為 case 語句數(shù)量是確定的,因此在初始化時,會一次性分配所需的全部內(nèi)存。
select.go
func newselect(sel *_select, selsize int64, size int32) {
// 確認(rèn)內(nèi)存長度。
if selsize != int64(selectsize(uintptr(size))) {
gothrow("bad select size")
}
sel.tcase = uint16(size)
sel.ncase = 0
// 確認(rèn)起始地址。
sel.lockorder = (**hchan)(add(unsafe.Pointer(&sel.scase),
uintptr(size)*unsafe.Sizeof(_select{}.scase[0])))
sel.pollorder = (*uint16)(add(unsafe.Pointer(sel.lockorder),
uintptr(size)*unsafe.Sizeof(*_select{}.lockorder)))
}
// 內(nèi)存組成 select + scase + lockerorder + pollorder。
func selectsize(size uintptr) uintptr {
selsize := unsafe.Sizeof(_select{}) +
(size-1)*unsafe.Sizeof(_select{}.scase[0]) + // Select 已經(jīng)有一個 scase[1]
size*unsafe.Sizeof(*_select{}.lockorder) +
size*unsafe.Sizeof(*_select{}.pollorder)
return round(selsize, _Int64Align)
}
內(nèi)存布局:
+----------+---------------+-------------------+-------------------+
| select | scase array | lockorder array | pollorder array |
+----------+---------------+-------------------+-------------------+
后兩成員是算法需要使用的排序表。pollorder 保存亂序后的 scase 序號,如此遍歷時就形成了隨機選擇。而 lockorder 對 case channel 地址排序,當(dāng)多處使用同一 channel時,可避免重復(fù)加鎖。
注冊函數(shù)并沒多少玄機,無非是通過 ncase 確定注冊位置。依據(jù) case channel 操作方式,分為 send、recv、default 三種類型。
select.go
func selectsend(sel *_select, c *hchan, elem unsafe.Pointer) (selected bool) {
if c != nil {
selectsendImpl(sel, c, getcallerpc(unsafe.Pointer(&sel)), elem,
uintptr(unsafe.Pointer(&selected))-uintptr(unsafe.Pointer(&sel)))
}
return
}
func selectsendImpl(sel *_select, c *hchan, pc uintptr, elem unsafe.Pointer, so uintptr)
{
// 當(dāng)前注冊位置。
i := sel.ncase
// 判斷是否超出限制。
if i >= sel.tcase {
gothrow("selectsend: too many cases")
}
// 下一注冊位置。
sel.ncase = i + 1
// 通過當(dāng)前注冊位置,獲取 scase 指針。
cas := (*scase)(add(unsafe.Pointer(&sel.scase),
uintptr(i)*unsafe.Sizeof(sel.scase[0])))
cas.pc = pc
cas._chan = c
cas.so = uint16(so)
cas.kind = _CaseSend
cas.elem = elem
}
選擇算法的實現(xiàn)有些復(fù)雜,又是一個超長函數(shù),充斥大量的 goto。
select.go
func selectgo(sel *_select) {
pc, offset := selectgoImpl(sel)
}
func selectgoImpl(sel *_select) (uintptr, uint16) {
scaseslice := sliceStruct{unsafe.Pointer(&sel.scase), int(sel.ncase), ...}
scases := *(*[]scase)(unsafe.Pointer(&scaseslice))
// 填充 pollorder,然后洗牌形成亂序。
pollorder := *(*[]uint16)(unsafe.Pointer(&pollslice))
...
// 將 case channel 按地址排序。
lockorder := *(*[]*hchan)(unsafe.Pointer(&lockslice))
...
// 鎖定全部 channel。
sellock(sel)
loop:
// 1: 查找已準(zhǔn)備好的 case。
for i := 0; i < int(sel.ncase); i++ {
// 使用 pollorder 返回 case,這就是 select 隨機選擇的關(guān)鍵。
cas = &scases[pollorder[i]]
c = cas._chan
switch cas.kind {
case _CaseRecv:
if c.dataqsiz > 0 { // 異步
if c.qcount > 0 { // 有緩沖數(shù)據(jù)
goto asyncrecv
}
} else { // 同步
sg = c.sendq.dequeue()
if sg != nil { // 有接收者
goto syncrecv
}
}
if c.closed != 0 { // 關(guān)閉
goto rclose
}
case _CaseSend:
if c.closed != 0 {
goto sclose
}
if c.dataqsiz > 0 {
if c.qcount < c.dataqsiz {
goto asyncsend
}
} else {
sg = c.recvq.dequeue()
if sg != nil {
goto syncsend
}
}
case _CaseDefault:
dfl = cas
}
}
// 如沒有準(zhǔn)備好的 case,嘗試執(zhí)行 default。
if dfl != nil {
selunlock(sel)
cas = dfl
goto retc
}
// 2: 如果沒有任何準(zhǔn)備好的 case ...
// 打包 SudoG,放到所有 channel 排隊鏈表,等待喚醒。
gp = getg()
done = 0
for i := 0; i < int(sel.ncase); i++ {
cas = &scases[pollorder[i]]
c = cas._chan
// 創(chuàng)建 SudoG。
sg := acquireSudog()
sg.g = gp
sg.elem = cas.elem
sg.waitlink = gp.waiting
gp.waiting = sg // 全部 SudoG 鏈表。
// 將 SudoG 放到 channel 排隊鏈表。
switch cas.kind {
case _CaseRecv:
c.recvq.enqueue(sg)
case _CaseSend:
c.sendq.enqueue(sg)
}
}
// 休眠,等待喚醒。
gp.param = nil
gopark(unsafe.Pointer(funcPC(selparkcommit)), unsafe.Pointer(sel), "select")
// 因所有 channel SudoG 都使用當(dāng)前 G,所以可被任何 channel 操作喚醒。
sellock(sel)
sg = (*sudog)(gp.param) // 同步時指向被喚醒的 SudoG,異步為 nil。
// 3: 找出被喚醒的 case channel。
cas = nil
sglist = gp.waiting
// 遍歷檢查被喚醒的 SudoG 是否是第 2 步創(chuàng)建的。
// 注意,sglist 和 pollorder 順序一致。
for i := int(sel.ncase) - 1; i >= 0; i-- {
k = &scases[pollorder[i]]
// 如果屬于 ...
if sg == sglist {
// 同步喚醒。
cas = k
} else {
// 不屬于,取消排隊。
c = k._chan
if k.kind == _CaseSend {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}
// 異步喚醒,回到第 1 步處理。
if cas == nil {
goto loop
}
// 同步方式下,在被喚醒前,數(shù)據(jù)已經(jīng)完成交換,直接結(jié)束即可。
selunlock(sel)
goto retc
// 下面這些代碼在前一節(jié)已經(jīng)說過,此處忽略。
asyncrecv:
asyncsend:
syncrecv:
rclose:
syncsend:
retc:
return cas.pc, cas.so
sclose:
// send on closed channel
selunlock(sel)
panic("send on closed channel")
}
簡化后的流程看上去清爽多了。
每次操作都對所有 channel 加鎖,是個不小的代價。
select.go
func sellock(sel *_select) {
var c *hchan
for _, c0 := range lockorder {
// 如果和前一 channel 地址相同,則無需加鎖。
if c0 != nil && c0 != c {
c = c0
lock(&c.lock)
}
}
}
func selunlock(sel *_select) {
n := int(sel.ncase)
r := 0
// 因為 default case 的 channel 為 nil,排序后總是在 lockorder[0],跳過。
if n > 0 && lockorder[0] == nil {
r = 1
}
for i := n - 1; i >= r; i-- {
c := lockorder[i]
if i > 0 && c == lockorder[i-1] {
continue // will unlock it on the next iteration
}
unlock(&c.lock)
}
}
如果在 select 語句外套上循環(huán),那就意味著每次循環(huán)都要創(chuàng)建對象,完成注冊、洗牌、排序、選擇等一大堆操作。
反編譯簡單示例,看看 defer 的真實面目。
package main
import ()
func main() {
x := 0x100
defer println(x)
}
(gdb) disas main.main
Dump of assembler code for function main.main:
0x0000000000002016 <+22>:" sub rsp,0x8
0x000000000000201a <+26>:" mov rcx,0x100
0x0000000000002021 <+33>:" mov QWORD PTR [rsp],rcx
0x0000000000002025 <+37>:" lea rcx,[rip+0x4a5fc] # 0x4c628 <main.print.1.f>
0x000000000000202c <+44>:" push rcx
0x000000000000202d <+45>:" push 0x8
0x000000000000202f <+47>:" call 0xad80 <runtime.deferproc>
0x0000000000002034 <+52>:" pop rcx
0x0000000000002035 <+53>:" pop rcx
0x0000000000002036 <+54>:" test rax,rax
0x0000000000002039 <+57>:" jne 0x2046 <main.main+70>
0x000000000000203b <+59>:" nop
0x000000000000203c <+60>:" call 0xb490 <runtime.deferreturn>
0x0000000000002041 <+65>:" add rsp,0x8
0x0000000000002045 <+69>:" ret
不算太復(fù)雜,編譯器將其處理成 deferproc 和 deferreturn 兩個函數(shù)。
panic.go
func deferproc(siz int32, fn *funcval) {
// 編譯器依次將 args、fn、siz 入棧。
// 通過 fn 在棧上的地址,確認(rèn) args。
argp := uintptr(unsafe.Pointer(&fn))
argp += unsafe.Sizeof(fn)
mp := acquirem()
mp.scalararg[0] = uintptr(siz)
mp.ptrarg[0] = unsafe.Pointer(fn)
mp.scalararg[1] = argp
mp.scalararg[2] = getcallerpc(unsafe.Pointer(&siz))
onM(deferproc_m)
releasem(mp)
}
panic.c
void runtime·deferproc_m(void)
{
siz = g->m->scalararg[0];
fn = g->m->ptrarg[0];
argp = g->m->scalararg[1];
callerpc = g->m->scalararg[2];
d = runtime·newdefer(siz);
d->fn = fn;
d->pc = callerpc;
d->argp = argp;
// 將參數(shù)拷貝到 defer.argp。
runtime·memmove(d+1, (void*)argp, siz);
}
依照 Defer 結(jié)構(gòu)和內(nèi)存對齊,指針運算 "d+1" 就是 argp,只是這寫法真的好嗎?
runtime.h
struct Defer
{
int32 siz;
bool started;
uintptr argp; // where args were copied from
uintptr pc;
FuncVal* fn;
Panic* panic; // panic that is running defer
Defer* link;
};
和以往一樣,Defer 會被復(fù)用。
runtime.h
struct P
{
Defer* deferpool[5];
};
panic.go
func newdefer(siz int32) *_defer {
var d *_defer
// 按 16 字節(jié)對齊后計算長度索引。
sc := deferclass(uintptr(siz))
mp := acquirem()
// 復(fù)用 P.deferpool[5] 里的 Defer 對象。
if sc < uintptr(len(p{}.deferpool)) {
pp := mp.p
d = pp.deferpool[sc]
if d != nil {
pp.deferpool[sc] = d.link
}
}
// 超出長度限制的,直接分配。
if d == nil {
// Allocate new defer+args.
total := goroundupsize(totaldefersize(uintptr(siz)))
d = (*_defer)(mallocgc(total, deferType, 0))
}
d.siz = siz
gp := mp.curg
// 添加到鏈表。
d.link = gp._defer
gp._defer = d
releasem(mp)
return d
}
所有鏈表被保存到 G.defer 鏈表。
runtime.h
struct G
{
Defer* defer;
};
在函數(shù)退出前,deferreturn 完成 Defer 調(diào)用。
panic.go
func deferreturn(arg0 uintptr) {
gp := getg()
// 從鏈表提取一個 Defer。
d := gp._defer
if d == nil {
return
}
// 調(diào)用 deferproc 后,會 pop 掉 siz、fn,那么 arg0 就是 argp。
// 如果地址不等,顯然就屬于無效調(diào)用。
argp := uintptr(unsafe.Pointer(&arg0))
if d.argp != argp {
return
}
mp := acquirem()
// 復(fù)制參數(shù)。
// 很無語,這又出了一個 deferArgs,和前面的 d+1 一個意思。
// 這真的是同一個人寫的代碼?
memmove(unsafe.Pointer(argp), deferArgs(d), uintptr(d.siz))
fn := d.fn
d.fn = nil
gp._defer = d.link
freedefer(d)
releasem(mp)
// 執(zhí)行 defer 函數(shù)。
jmpdefer(fn, argp)
}
匯編函數(shù) jmpdefer 很有意思。
asm_amd64.s
// void jmpdefer(fn, sp);
// called from deferreturn.
// 1. pop the caller
// 2. sub 5 bytes from the callers return
// 3. jmp to the argument
TEXT runtime·jmpdefer(SB), NOSPLIT, $0-16
MOVQ fv+0(FP), DX // fn
MOVQ argp+8(FP), BX // caller sp
LEAQ -8(BX), SP // caller sp after CALL
SUBQ $5, (SP) // return to CALL again
MOVQ 0(DX), BX
JMP BX // but first run the deferred function
簡單點說就是找出 "call deferreturn" 時入棧的 PC 寄存器地址。
0x000000000000203c <+60>:" call 0xb490 <runtime.deferreturn>
0x0000000000002041 <+65>:" add rsp,0x8
因 PC 寄存器指向下一條指令,那么棧上值應(yīng)該就是 0x2041,減去 call 指令長度 5,結(jié)果就是 0x203c。將此地址入棧,等 jmpdefer 結(jié)束,deferreturn RET 指令所恢復(fù)的PC 寄存器值就又回到了 "call deferreturn"。配合對 argp 地址檢查,就實現(xiàn)了函數(shù)內(nèi)多個 Defer 的調(diào)用。
如果調(diào)用 Goexit 終止 goroutine,那么直接循環(huán)調(diào)用鏈上的所有 Defer 即可。
panic.go
func Goexit() {
// Run all deferred functions for the current goroutine.
gp := getg()
for {
d := gp._defer
if d == nil {
break
}
if d.started {
d.fn = nil
gp._defer = d.link
freedefer(d)
continue
}
d.started = true
reflectcall(unsafe.Pointer(d.fn), deferArgs(d), uint32(d.siz), uint32(d.siz))
d._panic = nil
d.fn = nil
gp._defer = d.link
freedefer(d)
}
goexit()
}
回過頭想想,一個完整 defer 過程要處理緩存對象,參數(shù)拷貝,以及多次函數(shù)調(diào)用,顯然要比直接函數(shù)調(diào)用慢得多。
var lock sync.Mutex
func test() {
lock.Lock()
lock.Unlock()
}
func testdefer() {
lock.Lock()
defer lock.Unlock()
}
func BenchmarkTest(b *testing.B) {
for i := 0; i < b.N; i++ {
test()
}
}
func BenchmarkTestDefer(b *testing.B) {
for i := 0; i < b.N; i++ {
testdefer()
}
}
BenchmarkTest" 30000000 43.5 ns/op
BenchmarkTestDefer 10000000 211 ns/op
這對于 CPU 密集型算法有很大影響,需區(qū)別對待。
Finalizer 用途類似析構(gòu)函數(shù),在關(guān)聯(lián)對象被回收時執(zhí)行。
malloc.go
func SetFinalizer(obj interface{}, finalizer interface{}) {
// object 類型信息。
e := (*eface)(unsafe.Pointer(&obj))
etyp := e._type
ot := (*ptrtype)(unsafe.Pointer(etyp))
// 忽略 nil 對象。
_, base, _ := findObject(e.data)
if base == nil {
if e.data == unsafe.Pointer(&zerobase) {
return
}
}
// finalizer 函數(shù)類型信息。
f := (*eface)(unsafe.Pointer(&finalizer))
ftyp := f._type
// 如果 finalizer 為 nil,清除。
if ftyp == nil {
// switch to M stack and remove finalizer
mp := acquirem()
mp.ptrarg[0] = e.data
onM(removeFinalizer_m)
releasem(mp)
return
}
// 確定 finalizer goroutine 啟動。
// 所有可執(zhí)行的 finalizer 都由該 goroutine 執(zhí)行。
createfing()
// 添加 finalizer 記錄。
mp := acquirem()
mp.ptrarg[0] = f.data
mp.ptrarg[1] = e.data
mp.scalararg[0] = nret
mp.ptrarg[2] = unsafe.Pointer(fint)
mp.ptrarg[3] = unsafe.Pointer(ot)
onM(setFinalizer_m)
releasem(mp)
}
malloc.c
void runtime·setFinalizer_m(void)
{
fn = g->m->ptrarg[0];
arg = g->m->ptrarg[1];
nret = g->m->scalararg[0];
fint = g->m->ptrarg[2];
ot = g->m->ptrarg[3];
g->m->scalararg[0] = runtime·addfinalizer(arg, fn, nret, fint, ot);
}
相關(guān)信息打包成 SpecialFinalizer 對象,添加到關(guān)聯(lián)對象所在 span.specials 鏈表。
malloc.h
struct Special
{
Special* next; // linked list in span
uint16 offset; // span offset of object
byte kind; // kind of Special
};
struct SpecialFinalizer
{
Special special;
FuncVal* fn;
uintptr nret;
Type* fint;
PtrType* ot;
};
struct MSpan
{
Special *specials; // linked list of special records sorted by offset.
};
mheap.c
bool runtime·addfinalizer(void *p, FuncVal *f, uintptr nret, Type *fint, PtrType *ot)
{
SpecialFinalizer *s;
// 創(chuàng)建 finalizer special 對象。
s = runtime·FixAlloc_Alloc(&runtime·mheap.specialfinalizeralloc);
s->special.kind = KindSpecialFinalizer;
s->fn = f;
s->nret = nret;
s->fint = fint;
s->ot = ot;
// 添加到待執(zhí)行隊列。
// 雖然傳遞 s->special,但因地址相同,可轉(zhuǎn)換回 SpecialFinalizer。
if(addspecial(p, &s->special))
return true;
// 添加失敗,表示已存在,放棄。
runtime·FixAlloc_Free(&runtime·mheap.specialfinalizeralloc, s);
return false;
}
static bool addspecial(void *p, Special *s)
{
// 查找 p 所在 span。
span = runtime·MHeap_LookupMaybe(&runtime·mheap, p);
// 確保該 span 已經(jīng)完成垃圾清理。
runtime·MSpan_EnsureSwept(span);
offset = (uintptr)p - (span->start << PageShift);
kind = s->kind;
// 使用 offset、kind 檢查該 special 是否已經(jīng)存在。
t = &span->specials;
while((x = *t) != nil) {
if(offset == x->offset && kind == x->kind) {
return false; // already exists
}
if(offset < x->offset || (offset == x->offset && kind < x->kind))
break;
t = &x->next;
}
// 添加到 span.specials 鏈表。
s->offset = offset;
s->next = x;
*t = s;
return true;
}
當(dāng)執(zhí)行垃圾清理操作時,會檢查 span.specials 鏈表。如果關(guān)聯(lián)對象可以被回收,那么就將 finalizer 放到執(zhí)行隊列。
mgc0.c
bool runtime·MSpan_Sweep(MSpan *s, bool preserve)
{
specialp = &s->specials;
special = *specialp;
while(special != nil) {
// 通過 bitmap 檢查 finalizer 關(guān)聯(lián)對象狀態(tài)。
p = (byte*)(s->start << PageShift) + special->offset/size*size;
off = (uintptr*)p - (uintptr*)arena_start;
bitp = arena_start - off/wordsPerBitmapByte - 1;
shift = (off % wordsPerBitmapByte) * gcBits;
bits = (*bitp>>shift) & bitMask;
// 如果是不可達(dá)對象。
if((bits&bitMarked) == 0) {
// 對象地址。
p = (byte*)(s->start << PageShift) + special->offset;
y = special;
// 從鏈表移除。
special = special->next;
*specialp = special;
// 將 finalizer 添加到執(zhí)行隊列,釋放 special。
if(!runtime·freespecial(y, p, size, false)) {
// 將關(guān)聯(lián)對象標(biāo)記為可達(dá)狀態(tài)。
*bitp |= bitMarked << shift;
}
} else {
// 存活對象,保持 special record。
specialp = &special->next;
special = *specialp;
}
}
}
注意,finalizer 會導(dǎo)致關(guān)聯(lián)對象重新變成可達(dá)狀態(tài),也就是說不會被清理操作回收。這是為了保證在 finalizer 函數(shù)內(nèi)能安全訪問關(guān)聯(lián)對象。待下次回收時,finalizer 已不存在,關(guān)聯(lián)對象就可被正常收回。
mheap.c
bool runtime·freespecial(Special *s, void *p, uintptr size, bool freed)
{
SpecialFinalizer *sf;
switch(s->kind) {
case KindSpecialFinalizer:
// 轉(zhuǎn)換回 SpecialFinalizer,放到執(zhí)行隊列。
sf = (SpecialFinalizer*)s;
runtime·queuefinalizer(p, sf->fn, sf->nret, sf->fint, sf->ot);
// Special 已經(jīng)從 span.specials 移除,回收。
runtime·FixAlloc_Free(&runtime·mheap.specialfinalizeralloc, sf);
return false; // don't free p until finalizer is done
}
}
執(zhí)行隊列 FinBlock 保存多個待執(zhí)行的 FianlizerSpecial,而全局變量 finq 用鏈表管理多個 FinBlock。
FinBlock 和其他類型一樣,被緩存、復(fù)用。
malloc.h
struct FinBlock
{
FinBlock *alllink;
FinBlock *next;
int32 cnt;
int32 cap;
Finalizer fin[1];
};
mgc0.c
FinBlock* runtime·finq; // list of finalizers that are to be executed
FinBlock* runtime·finc; // cache of free blocks
mgc0.c
void runtime·queuefinalizer(byte *p, FuncVal *fn, uintptr nret, Type *fint, PtrType *ot)
{
// 檢查是否需要新建 FinBlock。
if(runtime·finq == nil || runtime·finq->cnt == runtime·finq->cap) {
// 如果復(fù)用鏈表為空,新建。
if(runtime·finc == nil) {
runtime·finc = runtime·persistentalloc(FinBlockSize, 0, &mstats.gc_sys);
runtime·finc->cap = (FinBlockSize - sizeof(FinBlock)) / sizeof(Finalizer)+1;
runtime·finc->alllink = runtime·allfin;
runtime·allfin = runtime·finc;
}
block = runtime·finc;
runtime·finc = block->next;
block->next = runtime·finq;
runtime·finq = block;
}
// 添加到 FinBlock 隊列。
f = &runtime·finq->fin[runtime·finq->cnt];
runtime·finq->cnt++;
f->fn = fn;
f->nret = nret;
f->fint = fint;
f->ot = ot;
f->arg = p;
// 有了新任務(wù),設(shè)置喚醒標(biāo)記。
runtime·fingwake = true;
}
在準(zhǔn)備好執(zhí)行隊列后,由專門的 goroutine fing 完成最終執(zhí)行操作。
mgc0.c
G* runtime·fing; // goroutine that runs finalizers
malloc.go
var fingCreate uint32
func createfing() {
// 僅執(zhí)行一次。
if fingCreate == 0 && cas(&fingCreate, 0, 1) {
go runfinq()
}
}
func runfinq() {
for {
// 置換全局隊列。
fb := finq
finq = nil
// 如果隊列為空,休眠。
if fb == nil {
gp := getg()
fing = gp
fingwait = true // 休眠標(biāo)記。
gp.issystem = true
goparkunlock(&finlock, "finalizer wait")
gp.issystem = false
continue
}
// 循環(huán)處理所有 FinBlock。
for fb != nil {
// 循環(huán)處理 FinBlock 隊列里所有 FinalizerSpecial。
for i := int32(0); i < fb.cnt; i++ {
// 執(zhí)行 finalizer 函數(shù)。
f := (*finalizer)(add(unsafe.Pointer(&fb.fin),
uintptr(i)*unsafe.Sizeof(finalizer{})))
reflectcall(unsafe.Pointer(f.fn), frame, uint32(framesz),
uint32(framesz))
// 解除引用。
f.fn = nil
f.arg = nil
f.ot = nil
}
fb.cnt = 0
next := fb.next
// 將當(dāng)前 FinBlock 放回復(fù)用鏈表。
fb.next = finc
finc = fb
fb = next
}
}
}
如執(zhí)行隊列為空,fing 會被休眠。然后在 M 查找任務(wù)時,嘗試喚醒。
proc.c
// Finds a runnable goroutine to execute.
static G* findrunnable(void)
{
// 如果 fing 被休眠,且喚醒標(biāo)記為真,那么執(zhí)行。
if(runtime·fingwait && runtime·fingwake && (gp = runtime·wakefing()) != nil)
runtime·ready(gp);
}
mgc0.c
G* runtime·wakefing(void)
{
G *res;
// 如正在休眠,且喚醒標(biāo)記為真,返回 fing。
if(runtime·fingwait && runtime·fingwake) {
runtime·fingwait = false; // 取消相關(guān)標(biāo)記。
runtime·fingwake = false;
res = runtime·fing; // 返回 fing。
}
return res;
}
更多建議: