Go 并发

Oct 20, 2021 23:30 · 3307 words · 7 minute read Golang

用通信来共享

并发编程是一个很大的话题,我们只说 Go 特有的闪光点。

由于需要实现对共享变量的正确访问是很微妙的,并发编程在诸多环境中都不简单。Go 鼓励的是一种截然不同的方式,共享的值在通道中传递,实际上,永不被执行的线程主动共享。在某个特定的时间只有一条 goroutine 能够访问那个值。设计上数据竞争就不会发生。为了鼓励这种思维方式,我们将其简化为:

Do not communicate by sharing memory; instead, share memory by communicating.(不要通过共享内存来通信,而应该通过通信来共享内存。)

这种方式可能走的太远了。例如,引用计数最好在一个变量附近搞一个 mutex(互斥锁)来完成。但是作为高级别的方式,使用通道来控制访问更容易写出清晰、正确的代码。

在一个 CPU 上跑一个典型的单线程程序,完全不需要同步原语。再运行一个这样的实例;也不需要同步原语。现在让这两个程序通信;如果通信本身是同步器,那就无需其他同步方式了。举个栗子。Unix 管道完美适合这个模型。尽管 Go 的并发方式起源于霍尔的 Communicating Sequential Processes(CSP),它也可以被视为 Unix 管道一种类型安全的泛化。

Goroutine

之所以被叫做 goroutine 是因为现有的术语——线程、协程、进程之类无法准确地传达内涵。Goroutine 有简洁的模型:在相同的地址空间和其他 goroutine 并发地执行函数。它非常轻量,成本只比分配堆栈空间多一点点。堆栈开始时很小,因此它们很“便宜”,并且随着堆存储的分配增长。

Goroutine 在多条 OS 线程上多路复用,所以如果某个阻塞了,比如等待 I/O,其他的会继续运行。它们的设计掩盖了线程创建和管理的复杂性。

使用 go 关键字作为前缀在一个新的 goroutine 中调用函数。当调用完成,goroutine 静默退出。(效果有点像 Unix shell 用于在后台执行命令的 & 符号。)

go list.Sort()  // run list.Sort concurrently; don't wait for it.

函数字面量可以很方便的在 goroutine 中调用。

func Announce(message string, delay time.Duration) {
    go func() {
        time.Sleep(delay)
        fmt.Println(message)
    }()  // Note the parentheses - must call the function.
}

在 Go 中,函数字面量都是闭包:该实现确保函数引用的变量持续存活。

这些实例不怎么实用因为函数没有发出完成信号。为此我们需要通道。

通道(Channel)

和 map 一样,通道由 make 分配,返回值作为对底层数据结构的引用。通过提供可选参数设置通道的缓冲容量。默认为 0,即通道是无缓冲或同步的。

ci := make(chan int)            // unbuffered channel of integers
cj := make(chan int, 0)         // unbuffered channel of integers
cs := make(chan *os.File, 100)  // buffered channel of pointers to Files

无缓冲通道将通信(值的传递)与同步相结合——保证两边计算(goroutine)都处于已知的状态。

有不少使用通道的场景,我们从第一个开始。上一节我们在后台启动了一个排序,通道能够允许启动的 goroutine 等待排序完成。

c := make(chan int)  // Allocate a channel.
// Start the sort in a goroutine; when it completes, signal on the channel.
go func() {
    list.Sort()
    c <- 1  // Send a signal; value does not matter.
}()
doSomethingForAWhile()
<-c   // Wait for sort to finish; discard sent value.

接收者会阻塞直到有值过来。如果通道没有缓冲,发送者会阻塞直到值被接收;如果通道有缓冲,发送者只会在值被复制到缓冲的期间阻塞;如果缓冲区满了,发送者也要等待直到有接收者收下一个值。

带缓冲的通道可以像信号量(semaphore)一样使用,比如限制吞吐。在这个例子中,到来的请求被传递给 handle 函数,它会向通道发送一个值,然后处理请求,再从通道接收一个值,为下一个消费者将信号量标记为 ready 状态。通道缓冲的容量限制了 process 同时调用的数量。

var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
    sem <- 1    // Wait for active queue to drain.
    process(r)  // May take a long time.
    <-sem       // Done; enable next request to run.
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req)  // Don't wait for handle to finish.
    }
}

MaxOutstanding 个 handler 正在执行 process,任何更多的 handler 尝试向缓冲区爆满的通道发送都将被阻塞,直到其中某个 handler 处理完了并从通道接收了一个值。

这种设计有问题,尽管 Serve 为每个到来的请求都创建一个新的 goroutine,甚至其中只有 MaxOutstanding 个能够运行。但 goroutine 是结结实实被创建出来了,因此如果请求来得太快,程序会无限地消耗资源。我们可以通过修改 Serve 控制 goroutine 的创建来解决这个缺陷。下面是一种解决方案,但要注意有问题,我们随后会修复:

func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func() {
            process(req) // Buggy; see explanation below.
            <-sem
        }()
    }
}

这个 bug 是在 Go 的 for 循环中,循环变量在每次迭代被重用,所以 req 变量被所有 goroutine 共享,那不是我们想要的。我们要确保每个 goroutine 的 req 都是唯一的。有一种办法是将 req 的值作为参数传递给 goroutine 中的闭包:

func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func(req *Request) {
            process(req)
            <-sem
        }(req)
    }
}

对比这个版本和前一个版本来感受闭包声明和运行的不同之处。另一个解决方案是创建一个同名的新变量:

func Serve(queue chan *Request) {
    for req := range queue {
        sem <- 1
        go func(req *Request) {
            process(req)
            <-sem
        }(req)
    }
}

这么写看着可能有点奇怪

req := req

但在 Go 中这么写是合法的,你得到一个新版本的同名变量,故意在本地隐藏循环变量但对每个 goroutine 来说是唯一的。

回到写服务器的问题上来,管理好资源的另一个办法是启动固定数量的 handle goroutine,都从请求通道读取。goroutine 的数量限制了同时调用 process 的数量。以下 Serve 函数还接收一个通道用于被通知以退出;在所有 goroutine 启动后就阻塞。

func handle(queue chan *Request) {
    for r := range queue {
        process(r)
    }
}

func Serve(clientRequests chan *Request, quit chan bool) {
    // Start handlers
    for i := 0; i < MaxOutstanding; i++ {
        go handle(clientRequests)
    }
    <-quit  // Wait to be told to exit.
}

通道的通道

Go 最重要的特性之一是通道是“一等公民”,可以像任何其他值一样分配和传递。常被用于实现安全、并行的解复用。

前一节的例子中,handle 是理想化的请求处理 handler,但我们并没有定义它所处理的类型。要是这个类型包括一个回复通道,每个客户端都可以有自己的应答路径。下面是 Request 类型的定义:

type Request struct {
    args        []int
    f           func([]int) int
    resultChan  chan int
}

客户端提供函数和它自己的参数,request 对象中还有一个通道用于接收应答。

func sum(a []int) (s int) {
    for _, v := range a {
        s += v
    }
    return
}

request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// Send request
clientRequests <- request
// Wait for response.
fmt.Printf("answer: %d\n", <-request.resultChan)

在服务端,只要修改 handler 函数。

func handle(queue chan *Request) {
    for req := range queue {
        req.resultChan <- req.f(req.args)
    }
}

显然离现实还很遥远,但这段代码就是一个限速的、并行的、非阻塞的 RPC 系统的框架,并且没有看到任何互斥锁(mutex)。

并行

这些想法另一个应用场景是在多个 CPU 核心上并行化计算。如果计算能够被分解成可以独立执行的片段,那么它就可以被并行化,每部分完成时都有一个通道来发送信号。

假设我们要对一列数组上执行很费的操作,而且每个元素上操作的值都是独立的,这是个理想化的例子。

type Vector []float64

// Apply the operation to v[i], v[i+1] ... up to v[n-1].
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
    for ; i < n; i++ {
        v[i] += u.Op(v[i])
    }
    c <- 1    // signal that this piece is done
}

我们在一个循环中独立启动这些部分,每个 CPU 一个。它们可以按任何顺序完成,但这不重要;我们只要在启动所有 goroutine 后排空通道来数完成信号。

const numCPU = 4 // number of CPU cores

func (v Vector) DoAll(u Vector) {
    c := make(chan int, numCPU)  // Buffering optional but sensible.
    for i := 0; i < numCPU; i++ {
        go v.DoSome(i*len(v)/numCPU, (i+1)*len(v)/numCPU, u, c)
    }
    // Drain the channel.
    for i := 0; i < numCPU; i++ {
        <-c    // wait for one task to complete
    }
    // All done.
}

我们还可以向 runtime 查询什么值合适,而不是定死一个常量。runtime.NumCPU 函数返回机器的 CPU 核心数量,所以我们可以这样写

var numCPU = runtime.NumCPU()

还有一个函数 runtime.GOMAXPROCS,返回(或设置)用户指定 Go 程序可以同时运行的 CPU 核心数。默认是 runtime.NumCPU 的值但是会被 shell 环境变量或调用该函数覆盖。调用它时传 0 仅仅查询该值。因此如果想要尊重用户的资源请求,我们应该写

var numCPU = runtime.GOMAXPROCS(0)

注意不要混淆并发/将程序构建为独立执行的组件/并行/在多个 CPU 上并行执行计算来提升效率的概念。虽然 Go 的并发功能使得一些问题可以轻松地被转换为并行计算,但 Go 是一种并发语言,而不是并行,不是所有的并行化问题都适合 Go 的模型。关于这个问题的讨论,请查看这篇博客 https://blog.golang.org/2013/01/concurrency-is-not-parallelism.html 中的讲座。

泄漏的缓冲区

并发编程的工具甚至可以让非并发的想法更容易表达。这是一个从 RPC 包中抽象处理的实例。客户端 goroutine 循环从某个源(可能是网络)接收数据。为了避免分配和释放 buffer,它维护了一个列表,并使用一个缓冲通道来表示。如果通道是空的,则会分配一个新的 buffer。一旦消息 buffer 准备好了,就会被发送到 serverChan

var freeList = make(chan *Buffer, 100)
var serverChan = make(chan *Buffer)

func client() {
    for {
        var b *Buffer
        // Grab a buffer if available; allocate if not.
        select {
        case b = <-freeList:
            // Got one; nothing more to do.
        default:
            // None free, so allocate a new one.
            b = new(Buffer)
        }
        load(b)              // Read next message from the net.
        serverChan <- b      // Send to server.
    }
}

server 循环从客户端接收每条消息,处理,并将 buffer 返回给空闲列表。

func server() {
    for {
        b := <-serverChan    // Wait for work.
        process(b)
        // Reuse buffer if there's room.
        select {
        case freeList <- b:
            // Buffer on free list; nothing more to do.
        default:
            // Free list full, just carry on.
        }
    }
}

客户端尝试从 freeList 接收一个 buffer;如果没有可用的就分配一个新的。服务端会将 b 发回 freeList 除非列表满了,在这种情况下 buffer 被丢弃由垃圾收集器回收。select 表达式中的 default 语句意味着这个 selects 永不阻塞。此实现仅用几行代码就构建了一个漏桶空闲列表,依靠缓冲通道和垃圾收集器。