Go 并发
Oct 20, 2021 23:30 · 3307 words · 7 minute read
用通信来共享
并发编程是一个很大的话题,我们只说 Go 特有的闪光点。
由于需要实现对共享变量的正确访问是很微妙的,并发编程在诸多环境中都不简单。Go 鼓励的是一种截然不同的方式,共享的值在通道中传递,实际上,永不被执行的线程主动共享。在某个特定的时间只有一条 goroutine 能够访问那个值。设计上数据竞争就不会发生。为了鼓励这种思维方式,我们将其简化为:
Do not communicate by sharing memory; instead, share memory by communicating.(不要通过共享内存来通信,而应该通过通信来共享内存。)
这种方式可能走的太远了。例如,引用计数最好在一个变量附近搞一个 mutex(互斥锁)来完成。但是作为高级别的方式,使用通道来控制访问更容易写出清晰、正确的代码。
在一个 CPU 上跑一个典型的单线程程序,完全不需要同步原语。再运行一个这样的实例;也不需要同步原语。现在让这两个程序通信;如果通信本身是同步器,那就无需其他同步方式了。举个栗子。Unix 管道完美适合这个模型。尽管 Go 的并发方式起源于霍尔的 Communicating Sequential Processes(CSP),它也可以被视为 Unix 管道一种类型安全的泛化。
Goroutine
之所以被叫做 goroutine 是因为现有的术语——线程、协程、进程之类无法准确地传达内涵。Goroutine 有简洁的模型:在相同的地址空间和其他 goroutine 并发地执行函数。它非常轻量,成本只比分配堆栈空间多一点点。堆栈开始时很小,因此它们很“便宜”,并且随着堆存储的分配增长。
Goroutine 在多条 OS 线程上多路复用,所以如果某个阻塞了,比如等待 I/O,其他的会继续运行。它们的设计掩盖了线程创建和管理的复杂性。
使用 go
关键字作为前缀在一个新的 goroutine 中调用函数。当调用完成,goroutine 静默退出。(效果有点像 Unix shell 用于在后台执行命令的 & 符号。)
go list.Sort() // run list.Sort concurrently; don't wait for it.
函数字面量可以很方便的在 goroutine 中调用。
func Announce(message string, delay time.Duration) {
go func() {
time.Sleep(delay)
fmt.Println(message)
}() // Note the parentheses - must call the function.
}
在 Go 中,函数字面量都是闭包:该实现确保函数引用的变量持续存活。
这些实例不怎么实用因为函数没有发出完成信号。为此我们需要通道。
通道(Channel)
和 map 一样,通道由 make
分配,返回值作为对底层数据结构的引用。通过提供可选参数设置通道的缓冲容量。默认为 0,即通道是无缓冲或同步的。
ci := make(chan int) // unbuffered channel of integers
cj := make(chan int, 0) // unbuffered channel of integers
cs := make(chan *os.File, 100) // buffered channel of pointers to Files
无缓冲通道将通信(值的传递)与同步相结合——保证两边计算(goroutine)都处于已知的状态。
有不少使用通道的场景,我们从第一个开始。上一节我们在后台启动了一个排序,通道能够允许启动的 goroutine 等待排序完成。
c := make(chan int) // Allocate a channel.
// Start the sort in a goroutine; when it completes, signal on the channel.
go func() {
list.Sort()
c <- 1 // Send a signal; value does not matter.
}()
doSomethingForAWhile()
<-c // Wait for sort to finish; discard sent value.
接收者会阻塞直到有值过来。如果通道没有缓冲,发送者会阻塞直到值被接收;如果通道有缓冲,发送者只会在值被复制到缓冲的期间阻塞;如果缓冲区满了,发送者也要等待直到有接收者收下一个值。
带缓冲的通道可以像信号量(semaphore)一样使用,比如限制吞吐。在这个例子中,到来的请求被传递给 handle
函数,它会向通道发送一个值,然后处理请求,再从通道接收一个值,为下一个消费者将信号量标记为 ready 状态。通道缓冲的容量限制了 process
同时调用的数量。
var sem = make(chan int, MaxOutstanding)
func handle(r *Request) {
sem <- 1 // Wait for active queue to drain.
process(r) // May take a long time.
<-sem // Done; enable next request to run.
}
func Serve(queue chan *Request) {
for {
req := <-queue
go handle(req) // Don't wait for handle to finish.
}
}
当 MaxOutstanding
个 handler 正在执行 process
,任何更多的 handler 尝试向缓冲区爆满的通道发送都将被阻塞,直到其中某个 handler 处理完了并从通道接收了一个值。
这种设计有问题,尽管 Serve
为每个到来的请求都创建一个新的 goroutine,甚至其中只有 MaxOutstanding
个能够运行。但 goroutine 是结结实实被创建出来了,因此如果请求来得太快,程序会无限地消耗资源。我们可以通过修改 Serve
控制 goroutine 的创建来解决这个缺陷。下面是一种解决方案,但要注意有问题,我们随后会修复:
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func() {
process(req) // Buggy; see explanation below.
<-sem
}()
}
}
这个 bug 是在 Go 的 for
循环中,循环变量在每次迭代被重用,所以 req
变量被所有 goroutine 共享,那不是我们想要的。我们要确保每个 goroutine 的 req 都是唯一的。有一种办法是将 req
的值作为参数传递给 goroutine 中的闭包:
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func(req *Request) {
process(req)
<-sem
}(req)
}
}
对比这个版本和前一个版本来感受闭包声明和运行的不同之处。另一个解决方案是创建一个同名的新变量:
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func(req *Request) {
process(req)
<-sem
}(req)
}
}
这么写看着可能有点奇怪
req := req
但在 Go 中这么写是合法的,你得到一个新版本的同名变量,故意在本地隐藏循环变量但对每个 goroutine 来说是唯一的。
回到写服务器的问题上来,管理好资源的另一个办法是启动固定数量的 handle
goroutine,都从请求通道读取。goroutine 的数量限制了同时调用 process
的数量。以下 Serve
函数还接收一个通道用于被通知以退出;在所有 goroutine 启动后就阻塞。
func handle(queue chan *Request) {
for r := range queue {
process(r)
}
}
func Serve(clientRequests chan *Request, quit chan bool) {
// Start handlers
for i := 0; i < MaxOutstanding; i++ {
go handle(clientRequests)
}
<-quit // Wait to be told to exit.
}
通道的通道
Go 最重要的特性之一是通道是“一等公民”,可以像任何其他值一样分配和传递。常被用于实现安全、并行的解复用。
前一节的例子中,handle
是理想化的请求处理 handler,但我们并没有定义它所处理的类型。要是这个类型包括一个回复通道,每个客户端都可以有自己的应答路径。下面是 Request
类型的定义:
type Request struct {
args []int
f func([]int) int
resultChan chan int
}
客户端提供函数和它自己的参数,request 对象中还有一个通道用于接收应答。
func sum(a []int) (s int) {
for _, v := range a {
s += v
}
return
}
request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// Send request
clientRequests <- request
// Wait for response.
fmt.Printf("answer: %d\n", <-request.resultChan)
在服务端,只要修改 handler 函数。
func handle(queue chan *Request) {
for req := range queue {
req.resultChan <- req.f(req.args)
}
}
显然离现实还很遥远,但这段代码就是一个限速的、并行的、非阻塞的 RPC 系统的框架,并且没有看到任何互斥锁(mutex)。
并行
这些想法另一个应用场景是在多个 CPU 核心上并行化计算。如果计算能够被分解成可以独立执行的片段,那么它就可以被并行化,每部分完成时都有一个通道来发送信号。
假设我们要对一列数组上执行很费的操作,而且每个元素上操作的值都是独立的,这是个理想化的例子。
type Vector []float64
// Apply the operation to v[i], v[i+1] ... up to v[n-1].
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
for ; i < n; i++ {
v[i] += u.Op(v[i])
}
c <- 1 // signal that this piece is done
}
我们在一个循环中独立启动这些部分,每个 CPU 一个。它们可以按任何顺序完成,但这不重要;我们只要在启动所有 goroutine 后排空通道来数完成信号。
const numCPU = 4 // number of CPU cores
func (v Vector) DoAll(u Vector) {
c := make(chan int, numCPU) // Buffering optional but sensible.
for i := 0; i < numCPU; i++ {
go v.DoSome(i*len(v)/numCPU, (i+1)*len(v)/numCPU, u, c)
}
// Drain the channel.
for i := 0; i < numCPU; i++ {
<-c // wait for one task to complete
}
// All done.
}
我们还可以向 runtime 查询什么值合适,而不是定死一个常量。runtime.NumCPU
函数返回机器的 CPU 核心数量,所以我们可以这样写
var numCPU = runtime.NumCPU()
还有一个函数 runtime.GOMAXPROCS
,返回(或设置)用户指定 Go 程序可以同时运行的 CPU 核心数。默认是 runtime.NumCPU
的值但是会被 shell 环境变量或调用该函数覆盖。调用它时传 0 仅仅查询该值。因此如果想要尊重用户的资源请求,我们应该写
var numCPU = runtime.GOMAXPROCS(0)
注意不要混淆并发/将程序构建为独立执行的组件/并行/在多个 CPU 上并行执行计算来提升效率的概念。虽然 Go 的并发功能使得一些问题可以轻松地被转换为并行计算,但 Go 是一种并发语言,而不是并行,不是所有的并行化问题都适合 Go 的模型。关于这个问题的讨论,请查看这篇博客 https://blog.golang.org/2013/01/concurrency-is-not-parallelism.html 中的讲座。
泄漏的缓冲区
并发编程的工具甚至可以让非并发的想法更容易表达。这是一个从 RPC 包中抽象处理的实例。客户端 goroutine 循环从某个源(可能是网络)接收数据。为了避免分配和释放 buffer,它维护了一个列表,并使用一个缓冲通道来表示。如果通道是空的,则会分配一个新的 buffer。一旦消息 buffer 准备好了,就会被发送到 serverChan
。
var freeList = make(chan *Buffer, 100)
var serverChan = make(chan *Buffer)
func client() {
for {
var b *Buffer
// Grab a buffer if available; allocate if not.
select {
case b = <-freeList:
// Got one; nothing more to do.
default:
// None free, so allocate a new one.
b = new(Buffer)
}
load(b) // Read next message from the net.
serverChan <- b // Send to server.
}
}
server
循环从客户端接收每条消息,处理,并将 buffer 返回给空闲列表。
func server() {
for {
b := <-serverChan // Wait for work.
process(b)
// Reuse buffer if there's room.
select {
case freeList <- b:
// Buffer on free list; nothing more to do.
default:
// Free list full, just carry on.
}
}
}
客户端尝试从 freeList
接收一个 buffer;如果没有可用的就分配一个新的。服务端会将 b
发回 freeList
除非列表满了,在这种情况下 buffer 被丢弃由垃圾收集器回收。select
表达式中的 default
语句意味着这个 selects
永不阻塞。此实现仅用几行代码就构建了一个漏桶空闲列表,依靠缓冲通道和垃圾收集器。