併發

有人把 Go 比作 21 世紀的 C 語言,第一是因為 Go 語言設計簡單,第二,21 世紀最重要的就是並行程式設計,而 Go 從語言層面就支援了並行。

goroutine

goroutine 是 Go 並行設計的核心。goroutine 說到底其實就是協程 (Coroutine),但是它比執行緒更小,十幾個 goroutine 可能體現在底層就是五六個執行緒,Go 語言內部幫你實現了這些 goroutine 之間的記憶體共享。執行 goroutine 只需極少的棧記憶體(大概是 4~5KB),當然會根據相應的資料伸縮。也正因為如此,可同時執行成千上萬個併發任務。goroutine 比 thread 更易用、更高效、更輕便。

goroutine 是透過 Go 的 runtime 管理的一個執行緒管理器。goroutine 透過 go 關鍵字實現了,其實就是一個普通的函式。

go hello(a, b, c)

透過關鍵字 go 就啟動了一個 goroutine。我們來看一個例子

package main

import (
    "fmt"
    "runtime"
)

func say(s string) {
    for i := 0; i < 5; i++ {
        runtime.Gosched()
        fmt.Println(s)
    }
}

func main() {
    go say("world") //開一個新的 Goroutines 執行
    say("hello") //當前 Goroutines 執行
}

// 以上程式執行後將輸出:
// hello
// world
// hello
// world
// hello
// world
// hello
// world
// hello

我們可以看到 go 關鍵字很方便的就實現了併發程式設計。 上面的多個 goroutine 執行在同一個程序裡面,共享記憶體資料,不過設計上我們要遵循:不要透過共享來通訊,而要透過通訊來共享。

runtime.Gosched()表示讓 CPU 把時間片讓給別人,下次某個時候繼續恢復執行該 goroutine。

預設情況下,在 Go 1.5 將標識併發系統執行緒個數的 runtime.GOMAXPROCS 的初始值由 1 改為了執行環境的 CPU 核數。

但在 Go 1.5 以前排程器僅使用單執行緒,也就是說只實現了併發。想要發揮多核處理器的並行,需要在我們的程式中明確的呼叫 runtime.GOMAXPROCS(n) 告訴排程器同時使用多個執行緒。GOMAXPROCS 設定了同時執行邏輯程式碼的系統執行緒的最大數量,並回傳之前的設定。如果 n < 1,不會改變當前設定。

channels

goroutine 執行在相同的地址空間,因此存取共享記憶體必須做好同步。那麼 goroutine 之間如何進行資料的通訊呢,Go 提供了一個很好的通訊機制 channel。channel 可以與 Unix shell 中的雙向管道做類別比:可以透過它傳送或者接收值。這些值只能是特定的型別:channel 型別。定義一個 channel 時,也需要定義傳送到 channel 的值的型別。注意,必須使用 make 建立 channel:

ci := make(chan int)
cs := make(chan string)
cf := make(chan interface{})

channel 透過運算子<-來接收和傳送資料

ch <- v    // 傳送 v 到 channel ch.
v := <-ch  // 從 ch 中接收資料,並賦值給 v

我們把這些應用到我們的例子中來:

package main

import "fmt"

func sum(a []int, c chan int) {
    total := 0
    for _, v := range a {
        total += v
    }
    c <- total  // send total to c
}

func main() {
    a := []int{7, 2, 8, -9, 4, 0}

    c := make(chan int)
    go sum(a[:len(a)/2], c)
    go sum(a[len(a)/2:], c)
    x, y := <-c, <-c  // receive from c

    fmt.Println(x, y, x + y)
}

預設情況下,channel 接收和傳送資料都是阻塞的,除非另一端已經準備好,這樣就使得 Goroutines 同步變的更加的簡單,而不需要明確的 lock。所謂阻塞,也就是如果讀取(value := <-ch)它將會被阻塞,直到有資料接收。其次,任何傳送(ch<-5)將會被阻塞,直到資料被讀出。無緩衝 channel 是在多個 goroutine 之間同步很棒的工具。

Buffered Channels

上面我們介紹了預設的非快取型別的 channel,不過 Go 也允許指定 channel 的緩衝大小,很簡單,就是 channel 可以儲存多少元素。ch:= make(chan bool, 4),建立了可以儲存 4 個元素的 bool 型 channel。在這個 channel 中,前 4 個元素可以無阻塞的寫入。當寫入第 5 個元素時,程式碼將會阻塞,直到其他 goroutine 從 channel 中讀取一些元素,騰出空間。

ch := make(chan type, value)

當 value = 0 時,channel 是無緩衝阻塞讀寫的,當 value > 0 時,channel 有緩衝、是非阻塞的,直到寫滿 value 個元素才阻塞寫入。

我們看一下下面這個例子,你可以在自己本機測試一下,修改相應的 value 值

package main

import "fmt"

func main() {
    c := make(chan int, 2)//修改 2 為 1 就報錯,修改 2 為 3 可以正常執行
    c <- 1
    c <- 2
    fmt.Println(<-c)
    fmt.Println(<-c)
}
        //修改為 1 報如下的錯誤:
        //fatal error: all goroutines are asleep - deadlock!

Range 和 Close

上面這個例子中,我們需要讀取兩次 c,這樣不是很方便,Go 考慮到了這一點,所以也可以透過 range,像操作 slice 或者 map 一樣操作快取型別的 channel,請看下面的例子

package main

import (
    "fmt"
)

func fibonacci(n int, c chan int) {
    x, y := 1, 1
    for i := 0; i < n; i++ {
        c <- x
        x, y = y, x + y
    }
    close(c)
}

func main() {
    c := make(chan int, 10)
    go fibonacci(cap(c), c)
    for i := range c {
        fmt.Println(i)
    }
}

for i := range c能夠不斷的讀取 channel 裡面的資料,直到該 channel 被明確的關閉。上面程式碼我們看到可以明確的關閉 channel,生產者透過內建函式 close 關閉 channel。關閉 channel 之後就無法再發送任何資料了,在消費方可以透過語法v, ok := <-ch測試 channel 是否被關閉。如果 ok 回傳 false,那麼說明 channel 已經沒有任何資料並且已經被關閉。

記住應該在生產者的地方關閉 channel,而不是消費的地方去關閉它,這樣容易引起 panic

另外記住一點的就是 channel 不像檔案之類別的,不需要經常去關閉,只有當你確實沒有任何傳送資料了,或者你想明確的結束 range 迴圈之類別的

Select

我們上面介紹的都是隻有一個 channel 的情況,那麼如果存在多個 channel 的時候,我們該如何操作呢,Go 裡面提供了一個關鍵字select,透過 select 可以監聽 channel 上的資料流動。

select 預設是阻塞的,只有當監聽的 channel 中有傳送或接收可以進行時才會執行,當多個 channel 都準備好的時候,select 會隨機選擇其中一個執行。

package main

import "fmt"

func fibonacci(c, quit chan int) {
    x, y := 1, 1
    for {
        select {
        case c <- x:
            x, y = y, x + y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}

func main() {
    c := make(chan int)
    quit := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()
    fibonacci(c, quit)
}

select 裡面還有 default 語法,select其實就是類似 switch 的功能,default 就是當監聽的 channel 都沒有準備好的時候,預設執行的(select 不再阻塞等待 channel)。

select {
case i := <-c:
    // use i
default:
    // 當 c 阻塞的時候執行這裡
}

超時

有時候會出現 goroutine 阻塞的情況,那麼我們如何避免整個程式進入阻塞的情況呢?我們可以利用 select 來設定超時,透過如下的方式實現:

func main() {
    c := make(chan int)
    o := make(chan bool)
    go func() {
        for {
            select {
                case v := <- c:
                    println(v)
                case <- time.After(5 * time.Second):
                    println("timeout")
                    o <- true
                    break
            }
        }
    }()
    <- o
}

runtime goroutine

runtime 套件中有幾個處理 goroutine 的函式:

  • Goexit

    退出當前執行的 goroutine,但是 defer 函式還會繼續呼叫

  • Gosched

    讓出當前 goroutine 的執行許可權,排程器安排其他等待的任務執行,並在下次某個時候從該位置恢復執行。

  • NumCPU

    回傳 CPU 核數量

  • NumGoroutine

    回傳正在執行和排隊的任務總數

  • GOMAXPROCS

    用來設定可以平行計算的 CPU 核數的最大值,並回傳之前的值。

Last updated