Go 并发模式:流水线
May 30, 2021 17:00 · 3339 words · 7 minute read
译文
介绍
Go 对并发的原生支持使得构建流式数据流水线,高效利用 I/O 和多核 CPU。本文介绍了这种流水线的例子,强调了操作失败时会出现的状况,并介绍了干净利落地处理失败的相关技术。
何为流水线?
在 Go 中并没有对流水线的明确定义;这只是并发程序中的一种。白话来说,流水线是一系列由通道连接的阶段,其中每个阶段又是一组运行相同函数的 goroutine。goroutine 在每个阶段:
- 通过入向通道来接收来自上游的值
- 对那个值进行处理,通常产生新的值
- 通过出向通道向下游发送值
每个阶段都有任意数量的入向和出向通道,但第一个和最后一个阶段除外,它们各自只有出向或入向通道。第一阶有时被称为源或生产者;末阶呢就是消费者了。
我们用一个简单的示例流水线来解释这个想法和技术。后面将介绍一个更贴近现实的例子。
平方运算
想象一条有三个阶段的流水线。
第一阶,gen
是一个将整数列表中的元素一个一个发往通道的函数。gen
函数将启动一个 goroutine 来向通道发送整数,并会在所有数据发完后关掉通道:
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
第二阶,sq
从通道接收数据,并返回一个通道,用于发送每个接收到的整数的平方。在入向通道关掉后,这个阶段已经将所有的值发往下游,然后关掉出向通道:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
main
函数建立一条流水线并运行最后一个阶段:从第二阶接收数值并一个一个打印出来,直到通道被关闭:
func main() {
// Set up the pipeline.
c := gen(2, 3)
out := sq(c)
// Consume the output.
fmt.Println(<-out) // 4
fmt.Println(<-out) // 9
}
既然 sq
入向和出向的通道类型相同,我们可以对其任意编排。我们还可以将 main
重写成一个 range
循环,就行其他阶段那样:
func main() {
// Set up the pipeline and consume the output.
for n := range sq(sq(gen(2, 3))) {
fmt.Println(n) // 16 then 81
}
}
扇出 & 扇入
多个函数可以从同一个通道读取数据,直到这个通道被关闭,这被称为扇出。这提供了一种在一组 worker 之间分配任务的方法,来并行使用 CPU 和 I/O。
一个函数可以从多个输入读取和处理数据,直到所有输入都被关闭为止,通过将输入通道复用到一个单独的通道上。这就是所谓的扇入。
我们修改流水线以运行两个 sq
实例,都从相同的输入通道读取。引入一个新函数 merge
,来扇入结果:
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the merged output from c1 and c2.
for n := range merge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}
merge
函数为每个入向通道启动一个 goroutine,将数值复制到唯一的出向通道,从而将通道列表转换为一个单独的通道。一旦所有 output
goroutine 启动,merge
会再开一个 goroutine,在通道的所有发送完成后关闭通道。
向已关闭的通道发数据会 panic,所以要确保在调用关闭函数前发完所有东西。sync.WaitGroup 类型提供了一种简单的方法来实现这种同步:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
消除短板
我们的管道函数有个模式:
- 当所有发送操作完成后关闭其出向通道。
- 持续从入向通道接收数值直到通道关闭为止。
这种模式允许将每个接收阶段写成一个 range
循环,并确保所有 goroutine 在所有数值被成功发送到下游后退出。
但是在真实的流水线中,各阶段并不总是收到所有入向的值。有时候就是这么设计的:接受者可能只需要一个子集的值来取得进展。更常见的情况是,一个阶段因为一个代表错误的值提前退出了。这两种情况,接受者都不应该等待剩下的值到来,我们希望早期阶段停止生产后期阶段不需要的值。
在我们的示例流水线中,如果一个阶段无法消费所有的入值,发送这些值的 goroutine 将无限期地阻塞。
// Consume the first value from the output.
out := merge(c1, c2)
fmt.Println(<-out) // 4 or 9
return
// Since we didn't receive the second value from out,
// one of the output goroutines is hung attempting to send it.
}
这是一种资源泄漏:goroutine 会占用内存和运行时的资源,goroutine 堆栈中的堆引用会阻止数据被垃圾回收。goroutine 不被垃圾回收;必须自己退出。
我们要安排流水线的上游阶段退出,即使下游阶段未能收到所有的入值。一种方法是为出向通道赋予缓冲区。缓冲区可以容纳固定数量的值;只要缓冲区内还有空间,发送操作就会立即完成。
c := make(chan int, 2) // buffer size 2
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1
当要发送多少数值在通道创建时就已知,缓冲区可以简化代码。例如,我们可以改写 gen
,将整数列表复制到一个缓冲通道中,避免创建一个新的 goroutine:
func gen(nums ...int) <-chan int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
回到流水线中被阻塞的 goroutine,我们可以考虑给 merge
返回的出向通道添加缓冲区:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 1) // enough space for the unread inputs
// ... the rest is unchanged ...
虽然这就修复了程序中 goroutine 被阻塞,但还是烂代码。缓冲区大小取决于已经知道 merge
将接收多少个数值和下游阶段将消费多少个数值。这就很脆弱:如果我们传递一个额外的值给 gen
,或者下游阶段读取的值不够多,goroutine 阻塞将再次出现。
所以我们要提供一种办法,让下游阶段向发送者表明它将停止接收。
原文
Introduction
Go’s concurrency primitives make it easy to construct streaming data pipelines that make efficient use of I/O and multiple CPUs. This article presents examples of such pipelines, highlights subtleties that arise when operations fail, and introduces techniques for dealing with failures cleanly.
What is a pipeline?
There’s no formal definition of a pipeline in Go; it’s just one of many kinds of concurrent programs. Informally, a pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function. In each stage, the goroutines
- receive values from upstream via inbound channels
- perform some function on that data, usually producing new values
- send values downstream via outbound channels
Each stage has any number of inbound and outbound channels, except the first and last stages, which have only outbound or inbound channels, respectively. The first stage is sometimes called the source or producer; the last stage, the sink or consumer.
We’ll begin with a simple example pipeline to explain the ideas and techniques. Later, we’ll present a more realistic example.
We’ll begin with a simple example pipeline to explain the ideas and techniques. Later, we’ll present a more realistic example.
Squaring numbers
Consider a pipeline with three stages.
The first stage, gen
, is a function that converts a list of integers to a channel that emits the integers in the list. The gen
function starts a goroutine that sends the integers on the channel and closes the channel when all the values have been sent:
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
The second stage, sq
, receives integers from a channel and returns a channel that emits the square of each received integer. After the inbound channel is closed and this stage has sent all the values downstream, it closes the outbound channel:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
The main
function sets up the pipeline and runs the final stage: it receives values from the second stage and prints each one, until the channel is closed:
func main() {
// Set up the pipeline.
c := gen(2, 3)
out := sq(c)
// Consume the output.
fmt.Println(<-out) // 4
fmt.Println(<-out) // 9
}
Since sq
has the same type for its inbound and outbound channels, we can compose it any number of times. We can also rewrite main
as a range loop, like the other stages:
Fan-out, fan-in
Multiple functions can read from the same channel until that channel is closed; this is called fan-out. This provides a way to distribute work amongst a group of workers to parallelize CPU use and I/O.
A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that’s closed when all the inputs are closed. This is called fan-in.
We can change our pipeline to run two instances of sq
, each reading from the same input channel. We introduce a new function, merge
, to fan in the results:
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the merged output from c1 and c2.
for n := range merge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}
The merge
function converts a list of channels to a single channel by starting a goroutine for each inbound channel that copies the values to the sole outbound channel. Once all the output
goroutines have been started, merge
starts one more goroutine to close the outbound channel after all sends on that channel are done.
Sends on a closed channel panic, so it’s important to ensure all sends are done before calling close. The sync.WaitGroup type provides a simple way to arrange this synchronization:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
Stopping short
There is a pattern to our pipeline functions:
- stages close their outbound channels when all the send operations are done.
- stages keep receiving values from inbound channels until those channels are closed.
This pattern allows each receiving stage to be written as a range
loop and ensures that all goroutines exit once all values have been successfully sent downstream.
But in real pipelines, stages don’t always receive all the inbound values. Sometimes this is by design: the receiver may only need a subset of values to make progress. More often, a stage exits early because an inbound value represents an error in an earlier stage. In either case the receiver should not have to wait for the remaining values to arrive, and we want earlier stages to stop producing values that later stages don’t need.
In our example pipeline, if a stage fails to consume all the inbound values, the goroutines attempting to send those values will block indefinitely:
// Consume the first value from the output.
out := merge(c1, c2)
fmt.Println(<-out) // 4 or 9
return
// Since we didn't receive the second value from out,
// one of the output goroutines is hung attempting to send it.
}
This is a resource leak: goroutines consume memory and runtime resources, and heap references in goroutine stacks keep data from being garbage collected. Goroutines are not garbage collected; they must exit on their own.
We need to arrange for the upstream stages of our pipeline to exit even when the downstream stages fail to receive all the inbound values. One way to do this is to change the outbound channels to have a buffer. A buffer can hold a fixed number of values; send operations complete immediately if there’s room in the buffer:
c := make(chan int, 2) // buffer size 2
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1
When the number of values to be sent is known at channel creation time, a buffer can simplify the code. For example, we can rewrite gen
to copy the list of integers into a buffered channel and avoid creating a new goroutine:
func gen(nums ...int) <-chan int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
Returning to the blocked goroutines in our pipeline, we might consider adding a buffer to the outbound channel returned by merge
:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 1) // enough space for the unread inputs
// ... the rest is unchanged ...
While this fixes the blocked goroutine in this program, this is bad code. The choice of buffer size of 1 here depends on knowing the number of values merge
will receive and the number of values downstream stages will consume. This is fragile: if we pass an additional value to gen
, or if the downstream stage reads any fewer values, we will again have blocked goroutines.
Instead, we need to provide a way for downstream stages to indicate to the senders that they will stop accepting input.