原文鏈接:https://gopl-zh.github.io/ch8/ch8-04.html
如果說goroutine是Go語言程序的并發(fā)體的話,那么channels則是它們之間的通信機(jī)制。一個channel是一個通信機(jī)制,它可以讓一個goroutine通過它給另一個goroutine發(fā)送值信息。每個channel都有一個特殊的類型,也就是channels可發(fā)送數(shù)據(jù)的類型。一個可以發(fā)送int類型數(shù)據(jù)的channel一般寫為chan int。
使用內(nèi)置的make函數(shù),我們可以創(chuàng)建一個channel:
ch := make(chan int) // ch has type 'chan int'
和map類似,channel也對應(yīng)一個make創(chuàng)建的底層數(shù)據(jù)結(jié)構(gòu)的引用。當(dāng)我們復(fù)制一個channel或用于函數(shù)參數(shù)傳遞時,我們只是拷貝了一個channel引用,因此調(diào)用者和被調(diào)用者將引用同一個channel對象。和其它的引用類型一樣,channel的零值也是nil。
兩個相同類型的channel可以使用==運(yùn)算符比較。如果兩個channel引用的是相同的對象,那么比較的結(jié)果為真。一個channel也可以和nil進(jìn)行比較。
一個channel有發(fā)送和接受兩個主要操作,都是通信行為。一個發(fā)送語句將一個值從一個goroutine通過channel發(fā)送到另一個執(zhí)行接收操作的goroutine。發(fā)送和接收兩個操作都使用<-
運(yùn)算符。在發(fā)送語句中,<-
運(yùn)算符分割channel和要發(fā)送的值。在接收語句中,<-
運(yùn)算符寫在channel對象之前。一個不使用接收結(jié)果的接收操作也是合法的。
ch <- x // a send statement
x = <-ch // a receive expression in an assignment statement
<-ch // a receive statement; result is discarded
Channel還支持close操作,用于關(guān)閉channel,隨后對基于該channel的任何發(fā)送操作都將導(dǎo)致panic異常。對一個已經(jīng)被close過的channel進(jìn)行接收操作依然可以接受到之前已經(jīng)成功發(fā)送的數(shù)據(jù);如果channel中已經(jīng)沒有數(shù)據(jù)的話將產(chǎn)生一個零值的數(shù)據(jù)。
使用內(nèi)置的close函數(shù)就可以關(guān)閉一個channel:
close(ch)
以最簡單方式調(diào)用make函數(shù)創(chuàng)建的是一個無緩存的channel,但是我們也可以指定第二個整型參數(shù),對應(yīng)channel的容量。如果channel的容量大于零,那么該channel就是帶緩存的channel。
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3
我們將先討論無緩存的channel,然后在8.4.4節(jié)討論帶緩存的channel。
一個基于無緩存Channels的發(fā)送操作將導(dǎo)致發(fā)送者goroutine阻塞,直到另一個goroutine在相同的Channels上執(zhí)行接收操作,當(dāng)發(fā)送的值通過Channels成功傳輸之后,兩個goroutine可以繼續(xù)執(zhí)行后面的語句。反之,如果接收操作先發(fā)生,那么接收者goroutine也將阻塞,直到有另一個goroutine在相同的Channels上執(zhí)行發(fā)送操作。
基于無緩存Channels的發(fā)送和接收操作將導(dǎo)致兩個goroutine做一次同步操作。因?yàn)檫@個原因,無緩存Channels有時候也被稱為同步Channels。當(dāng)通過一個無緩存Channels發(fā)送數(shù)據(jù)時,接收者收到數(shù)據(jù)發(fā)生在再次喚醒發(fā)送者goroutine之前(譯注:happens before,這是Go語言并發(fā)內(nèi)存模型的一個關(guān)鍵術(shù)語!)。
在討論并發(fā)編程時,當(dāng)我們說x事件在y事件之前發(fā)生(happens before),我們并不是說x事件在時間上比y時間更早;我們要表達(dá)的意思是要保證在此之前的事件都已經(jīng)完成了,例如在此之前的更新某些變量的操作已經(jīng)完成,你可以放心依賴這些已完成的事件了。
當(dāng)我們說x事件既不是在y事件之前發(fā)生也不是在y事件之后發(fā)生,我們就說x事件和y事件是并發(fā)的。這并不是意味著x事件和y事件就一定是同時發(fā)生的,我們只是不能確定這兩個事件發(fā)生的先后順序。在下一章中我們將看到,當(dāng)兩個goroutine并發(fā)訪問了相同的變量時,我們有必要保證某些事件的執(zhí)行順序,以避免出現(xiàn)某些并發(fā)問題。
在8.3節(jié)的客戶端程序,它在主goroutine中(譯注:就是執(zhí)行main函數(shù)的goroutine)將標(biāo)準(zhǔn)輸入復(fù)制到server,因此當(dāng)客戶端程序關(guān)閉標(biāo)準(zhǔn)輸入時,后臺goroutine可能依然在工作。我們需要讓主goroutine等待后臺goroutine完成工作后再退出,我們使用了一個channel來同步兩個goroutine:
gopl.io/ch8/netcat3
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
done := make(chan struct{})
go func() {
io.Copy(os.Stdout, conn) // NOTE: ignoring errors
log.Println("done")
done <- struct{}{} // signal the main goroutine
}()
mustCopy(conn, os.Stdin)
conn.Close()
<-done // wait for background goroutine to finish
}
當(dāng)用戶關(guān)閉了標(biāo)準(zhǔn)輸入,主goroutine中的mustCopy函數(shù)調(diào)用將返回,然后調(diào)用conn.Close()關(guān)閉讀和寫方向的網(wǎng)絡(luò)連接。關(guān)閉網(wǎng)絡(luò)連接中的寫方向的連接將導(dǎo)致server程序收到一個文件(end-of-file)結(jié)束的信號。關(guān)閉網(wǎng)絡(luò)連接中讀方向的連接將導(dǎo)致后臺goroutine的io.Copy函數(shù)調(diào)用返回一個“read from closed connection”(“從關(guān)閉的連接讀”)類似的錯誤,因此我們臨時移除了錯誤日志語句;在練習(xí)8.3將會提供一個更好的解決方案。(需要注意的是go語句調(diào)用了一個函數(shù)字面量,這是Go語言中啟動goroutine常用的形式。)
在后臺goroutine返回之前,它先打印一個日志信息,然后向done對應(yīng)的channel發(fā)送一個值。主goroutine在退出前先等待從done對應(yīng)的channel接收一個值。因此,總是可以在程序退出前正確輸出“done”消息。
基于channels發(fā)送消息有兩個重要方面。首先每個消息都有一個值,但是有時候通訊的事實(shí)和發(fā)生的時刻也同樣重要。當(dāng)我們更希望強(qiáng)調(diào)通訊發(fā)生的時刻時,我們將它稱為消息事件。有些消息事件并不攜帶額外的信息,它僅僅是用作兩個goroutine之間的同步,這時候我們可以用struct{}
空結(jié)構(gòu)體作為channels元素的類型,雖然也可以使用bool或int類型實(shí)現(xiàn)同樣的功能,done <- 1
語句也比done <- struct{}{}
更短。
練習(xí) 8.3: 在netcat3例子中,conn雖然是一個interface類型的值,但是其底層真實(shí)類型是*net.TCPConn
,代表一個TCP連接。一個TCP連接有讀和寫兩個部分,可以使用CloseRead和CloseWrite方法分別關(guān)閉它們。修改netcat3的主goroutine代碼,只關(guān)閉網(wǎng)絡(luò)連接中寫的部分,這樣的話后臺goroutine可以在標(biāo)準(zhǔn)輸入被關(guān)閉后繼續(xù)打印從reverb1服務(wù)器傳回的數(shù)據(jù)。(要在reverb2服務(wù)器也完成同樣的功能是比較困難的;參考練習(xí) 8.4。)
Channels也可以用于將多個goroutine連接在一起,一個Channel的輸出作為下一個Channel的輸入。這種串聯(lián)的Channels就是所謂的管道(pipeline)。下面的程序用兩個channels將三個goroutine串聯(lián)起來,如圖8.1所示。
第一個goroutine是一個計數(shù)器,用于生成0、1、2、……形式的整數(shù)序列,然后通過channel將該整數(shù)序列發(fā)送給第二個goroutine;第二個goroutine是一個求平方的程序,對收到的每個整數(shù)求平方,然后將平方后的結(jié)果通過第二個channel發(fā)送給第三個goroutine;第三個goroutine是一個打印程序,打印收到的每個整數(shù)。為了保持例子清晰,我們有意選擇了非常簡單的函數(shù),當(dāng)然三個goroutine的計算很簡單,在現(xiàn)實(shí)中確實(shí)沒有必要為如此簡單的運(yùn)算構(gòu)建三個goroutine。
gopl.io/ch8/pipeline1
func main() {
naturals := make(chan int)
squares := make(chan int)
// Counter
go func() {
for x := 0; ; x++ {
naturals <- x
}
}()
// Squarer
go func() {
for {
x := <-naturals
squares <- x * x
}
}()
// Printer (in main goroutine)
for {
fmt.Println(<-squares)
}
}
如您所料,上面的程序?qū)⑸?、1、4、9、……形式的無窮數(shù)列。像這樣的串聯(lián)Channels的管道(Pipelines)可以用在需要長時間運(yùn)行的服務(wù)中,每個長時間運(yùn)行的goroutine可能會包含一個死循環(huán),在不同goroutine的死循環(huán)內(nèi)部使用串聯(lián)的Channels來通信。但是,如果我們希望通過Channels只發(fā)送有限的數(shù)列該如何處理呢?
如果發(fā)送者知道,沒有更多的值需要發(fā)送到channel的話,那么讓接收者也能及時知道沒有多余的值可接收將是有用的,因?yàn)榻邮照呖梢酝V共槐匾慕邮盏却?。這可以通過內(nèi)置的close函數(shù)來關(guān)閉channel實(shí)現(xiàn):
close(naturals)
當(dāng)一個channel被關(guān)閉后,再向該channel發(fā)送數(shù)據(jù)將導(dǎo)致panic異常。當(dāng)一個被關(guān)閉的channel中已經(jīng)發(fā)送的數(shù)據(jù)都被成功接收后,后續(xù)的接收操作將不再阻塞,它們會立即返回一個零值。關(guān)閉上面例子中的naturals變量對應(yīng)的channel并不能終止循環(huán),它依然會收到一個永無休止的零值序列,然后將它們發(fā)送給打印者goroutine。
沒有辦法直接測試一個channel是否被關(guān)閉,但是接收操作有一個變體形式:它多接收一個結(jié)果,多接收的第二個結(jié)果是一個布爾值ok,ture表示成功從channels接收到值,false表示channels已經(jīng)被關(guān)閉并且里面沒有值可接收。使用這個特性,我們可以修改squarer函數(shù)中的循環(huán)代碼,當(dāng)naturals對應(yīng)的channel被關(guān)閉并沒有值可接收時跳出循環(huán),并且也關(guān)閉squares對應(yīng)的channel.
// Squarer
go func() {
for {
x, ok := <-naturals
if !ok {
break // channel was closed and drained
}
squares <- x * x
}
close(squares)
}()
因?yàn)樯厦娴恼Z法是笨拙的,而且這種處理模式很常見,因此Go語言的range循環(huán)可直接在channels上面迭代。使用range循環(huán)是上面處理模式的簡潔語法,它依次從channel接收數(shù)據(jù),當(dāng)channel被關(guān)閉并且沒有值可接收時跳出循環(huán)。
在下面的改進(jìn)中,我們的計數(shù)器goroutine只生成100個含數(shù)字的序列,然后關(guān)閉naturals對應(yīng)的channel,這將導(dǎo)致計算平方數(shù)的squarer對應(yīng)的goroutine可以正常終止循環(huán)并關(guān)閉squares對應(yīng)的channel。(在一個更復(fù)雜的程序中,可以通過defer語句關(guān)閉對應(yīng)的channel。)最后,主goroutine也可以正常終止循環(huán)并退出程序。
gopl.io/ch8/pipeline2
func main() {
naturals := make(chan int)
squares := make(chan int)
// Counter
go func() {
for x := 0; x < 100; x++ {
naturals <- x
}
close(naturals)
}()
// Squarer
go func() {
for x := range naturals {
squares <- x * x
}
close(squares)
}()
// Printer (in main goroutine)
for x := range squares {
fmt.Println(x)
}
}
其實(shí)你并不需要關(guān)閉每一個channel。只有當(dāng)需要告訴接收者goroutine,所有的數(shù)據(jù)已經(jīng)全部發(fā)送時才需要關(guān)閉channel。不管一個channel是否被關(guān)閉,當(dāng)它沒有被引用時將會被Go語言的垃圾自動回收器回收。(不要將關(guān)閉一個打開文件的操作和關(guān)閉一個channel操作混淆。對于每個打開的文件,都需要在不使用的時候調(diào)用對應(yīng)的Close方法來關(guān)閉文件。)
試圖重復(fù)關(guān)閉一個channel將導(dǎo)致panic異常,試圖關(guān)閉一個nil值的channel也將導(dǎo)致panic異常。關(guān)閉一個channels還會觸發(fā)一個廣播機(jī)制,我們將在8.9節(jié)討論。
隨著程序的增長,人們習(xí)慣于將大的函數(shù)拆分為小的函數(shù)。我們前面的例子中使用了三個goroutine,然后用兩個channels來連接它們,它們都是main函數(shù)的局部變量。將三個goroutine拆分為以下三個函數(shù)是自然的想法:
func counter(out chan int)
func squarer(out, in chan int)
func printer(in chan int)
其中計算平方的squarer函數(shù)在兩個串聯(lián)Channels的中間,因此擁有兩個channel類型的參數(shù),一個用于輸入一個用于輸出。兩個channel都擁有相同的類型,但是它們的使用方式相反:一個只用于接收,另一個只用于發(fā)送。參數(shù)的名字in和out已經(jīng)明確表示了這個意圖,但是并無法保證squarer函數(shù)向一個in參數(shù)對應(yīng)的channel發(fā)送數(shù)據(jù)或者從一個out參數(shù)對應(yīng)的channel接收數(shù)據(jù)。
這種場景是典型的。當(dāng)一個channel作為一個函數(shù)參數(shù)時,它一般總是被專門用于只發(fā)送或者只接收。
為了表明這種意圖并防止被濫用,Go語言的類型系統(tǒng)提供了單方向的channel類型,分別用于只發(fā)送或只接收的channel。類型chan<- int
表示一個只發(fā)送int的channel,只能發(fā)送不能接收。相反,類型<-chan int
表示一個只接收int的channel,只能接收不能發(fā)送。(箭頭<-
和關(guān)鍵字chan的相對位置表明了channel的方向。)這種限制將在編譯期檢測。
因?yàn)殛P(guān)閉操作只用于斷言不再向channel發(fā)送新的數(shù)據(jù),所以只有在發(fā)送者所在的goroutine才會調(diào)用close函數(shù),因此對一個只接收的channel調(diào)用close將是一個編譯錯誤。
這是改進(jìn)的版本,這一次參數(shù)使用了單方向channel類型:
gopl.io/ch8/pipeline3
func counter(out chan<- int) {
for x := 0; x < 100; x++ {
out <- x
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v * v
}
close(out)
}
func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
func main() {
naturals := make(chan int)
squares := make(chan int)
go counter(naturals)
go squarer(squares, naturals)
printer(squares)
}
調(diào)用counter(naturals)時,naturals的類型將隱式地從chan int轉(zhuǎn)換成chan<- int。調(diào)用printer(squares)也會導(dǎo)致相似的隱式轉(zhuǎn)換,這一次是轉(zhuǎn)換為<-chan int
類型只接收型的channel。任何雙向channel向單向channel變量的賦值操作都將導(dǎo)致該隱式轉(zhuǎn)換。這里并沒有反向轉(zhuǎn)換的語法:也就是不能將一個類似chan<- int
類型的單向型的channel轉(zhuǎn)換為chan int
類型的雙向型的channel。
帶緩存的Channel內(nèi)部持有一個元素隊(duì)列。隊(duì)列的最大容量是在調(diào)用make函數(shù)創(chuàng)建channel時通過第二個參數(shù)指定的。下面的語句創(chuàng)建了一個可以持有三個字符串元素的帶緩存Channel。圖8.2是ch變量對應(yīng)的channel的圖形表示形式。
ch = make(chan string, 3)
向緩存Channel的發(fā)送操作就是向內(nèi)部緩存隊(duì)列的尾部插入元素,接收操作則是從隊(duì)列的頭部刪除元素。如果內(nèi)部緩存隊(duì)列是滿的,那么發(fā)送操作將阻塞直到因另一個goroutine執(zhí)行接收操作而釋放了新的隊(duì)列空間。相反,如果channel是空的,接收操作將阻塞直到有另一個goroutine執(zhí)行發(fā)送操作而向隊(duì)列插入元素。
我們可以在無阻塞的情況下連續(xù)向新創(chuàng)建的channel發(fā)送三個值:
ch <- "A"
ch <- "B"
ch <- "C"
此刻,channel的內(nèi)部緩存隊(duì)列將是滿的(圖8.3),如果有第四個發(fā)送操作將發(fā)生阻塞。
如果我們接收一個值,
fmt.Println(<-ch) // "A"
那么channel的緩存隊(duì)列將不是滿的也不是空的(圖8.4),因此對該channel執(zhí)行的發(fā)送或接收操作都不會發(fā)生阻塞。通過這種方式,channel的緩存隊(duì)列解耦了接收和發(fā)送的goroutine。
在某些特殊情況下,程序可能需要知道channel內(nèi)部緩存的容量,可以用內(nèi)置的cap函數(shù)獲?。?
fmt.Println(cap(ch)) // "3"
同樣,對于內(nèi)置的len函數(shù),如果傳入的是channel,那么將返回channel內(nèi)部緩存隊(duì)列中有效元素的個數(shù)。因?yàn)樵诓l(fā)程序中該信息會隨著接收操作而失效,但是它對某些故障診斷和性能優(yōu)化會有幫助。
fmt.Println(len(ch)) // "2"
在繼續(xù)執(zhí)行兩次接收操作后channel內(nèi)部的緩存隊(duì)列將又成為空的,如果有第四個接收操作將發(fā)生阻塞:
fmt.Println(<-ch) // "B"
fmt.Println(<-ch) // "C"
在這個例子中,發(fā)送和接收操作都發(fā)生在同一個goroutine中,但是在真實(shí)的程序中它們一般由不同的goroutine執(zhí)行。Go語言新手有時候會將一個帶緩存的channel當(dāng)作同一個goroutine中的隊(duì)列使用,雖然語法看似簡單,但實(shí)際上這是一個錯誤。Channel和goroutine的調(diào)度器機(jī)制是緊密相連的,如果沒有其他goroutine從channel接收,發(fā)送者——或許是整個程序——將會面臨永遠(yuǎn)阻塞的風(fēng)險。如果你只是需要一個簡單的隊(duì)列,使用slice就可以了。
下面的例子展示了一個使用了帶緩存channel的應(yīng)用。它并發(fā)地向三個鏡像站點(diǎn)發(fā)出請求,三個鏡像站點(diǎn)分散在不同的地理位置。它們分別將收到的響應(yīng)發(fā)送到帶緩存channel,最后接收者只接收第一個收到的響應(yīng),也就是最快的那個響應(yīng)。因此mirroredQuery函數(shù)可能在另外兩個響應(yīng)慢的鏡像站點(diǎn)響應(yīng)之前就返回了結(jié)果。(順便說一下,多個goroutines并發(fā)地向同一個channel發(fā)送數(shù)據(jù),或從同一個channel接收數(shù)據(jù)都是常見的用法。)
func mirroredQuery() string {
responses := make(chan string, 3)
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response
}
func request(hostname string) (response string) { /* ... */ }
如果我們使用了無緩存的channel,那么兩個慢的goroutines將會因?yàn)闆]有人接收而被永遠(yuǎn)卡住。這種情況,稱為goroutines泄漏,這將是一個BUG。和垃圾變量不同,泄漏的goroutines并不會被自動回收,因此確保每個不再需要的goroutine能正常退出是重要的。
關(guān)于無緩存或帶緩存channels之間的選擇,或者是帶緩存channels的容量大小的選擇,都可能影響程序的正確性。無緩存channel更強(qiáng)地保證了每個發(fā)送操作與相應(yīng)的同步接收操作;但是對于帶緩存channel,這些操作是解耦的。同樣,即使我們知道將要發(fā)送到一個channel的信息的數(shù)量上限,創(chuàng)建一個對應(yīng)容量大小的帶緩存channel也是不現(xiàn)實(shí)的,因?yàn)檫@要求在執(zhí)行任何接收操作之前緩存所有已經(jīng)發(fā)送的值。如果未能分配足夠的緩存將導(dǎo)致程序死鎖。
Channel的緩存也可能影響程序的性能。想象一家蛋糕店有三個廚師,一個烘焙,一個上糖衣,還有一個將每個蛋糕傳遞到它下一個廚師的生產(chǎn)線。在狹小的廚房空間環(huán)境,每個廚師在完成蛋糕后必須等待下一個廚師已經(jīng)準(zhǔn)備好接受它;這類似于在一個無緩存的channel上進(jìn)行溝通。
如果在每個廚師之間有一個放置一個蛋糕的額外空間,那么每個廚師就可以將一個完成的蛋糕臨時放在那里而馬上進(jìn)入下一個蛋糕的制作中;這類似于將channel的緩存隊(duì)列的容量設(shè)置為1。只要每個廚師的平均工作效率相近,那么其中大部分的傳輸工作將是迅速的,個體之間細(xì)小的效率差異將在交接過程中彌補(bǔ)。如果廚師之間有更大的額外空間——也是就更大容量的緩存隊(duì)列——將可以在不停止生產(chǎn)線的前提下消除更大的效率波動,例如一個廚師可以短暫地休息,然后再加快趕上進(jìn)度而不影響其他人。
另一方面,如果生產(chǎn)線的前期階段一直快于后續(xù)階段,那么它們之間的緩存在大部分時間都將是滿的。相反,如果后續(xù)階段比前期階段更快,那么它們之間的緩存在大部分時間都將是空的。對于這類場景,額外的緩存并沒有帶來任何好處。
生產(chǎn)線的隱喻對于理解channels和goroutines的工作機(jī)制是很有幫助的。例如,如果第二階段是需要精心制作的復(fù)雜操作,一個廚師可能無法跟上第一個廚師的進(jìn)度,或者是無法滿足第三階段廚師的需求。要解決這個問題,我們可以再雇傭另一個廚師來幫助完成第二階段的工作,他執(zhí)行相同的任務(wù)但是獨(dú)立工作。這類似于基于相同的channels創(chuàng)建另一個獨(dú)立的goroutine。
我們沒有太多的空間展示全部細(xì)節(jié),但是gopl.io/ch8/cake包模擬了這個蛋糕店,可以通過不同的參數(shù)調(diào)整。它還對上面提到的幾種場景提供對應(yīng)的基準(zhǔn)測試(§11.4) 。
![]() | ![]() |
更多建議: