Reflecting on Worker Pools in Go
Jul 15, 2019 23:55 · 2925 words · 6 minute read
众多 Gopher 或许自行实现过协程池,利用 Go 令人惊叹的并发特性来并行作业。我在第 N 次盘它后扪心自问,有更好的方法吗?我已经用其他语言实现了类似的模式,并通过泛型抽象地构建这些线程管道。但是 Go 不支持泛型,至少现在还没有。。。那我怎么来曲线救国呢?反射。
什么是协程池?
下面的代码实现了一个三阶协程池:
package main
import (
"fmt"
"strconv"
"sync"
)
func main() {
// Section 1
parallelism := 5
inCh1 := make(chan int, parallelism)
inCh2 := make(chan int, parallelism)
inCh3 := make(chan int, parallelism)
outCh := make(chan string, parallelism)
var wg1 sync.WaitGroup
var wg2 sync.WaitGroup
var wg3 sync.WaitGroup
// Section 2
for i := 0; i < parallelism; i++ {
wg1.Add(1)
wg2.Add(1)
wg3.Add(1)
go func() {
for v := range inCh1 {
inCh2 <- add1(v)
}
wg1.Done()
}()
go func() {
for v := range inCh2 {
inCh3 <- add2(v)
}
wg2.Done()
}()
go func() {
for v := range inCh3 {
outCh <- stringify(v)
}
wg3.Done()
}()
}
// Section 3
for i := 0; i < 10; i++ {
inCh1 <- i
output := <-outCh
fmt.Println(output)
}
// Section 4
close(inCh1)
wg1.Wait()
close(inCh2)
wg2.Wait()
close(inCh3)
wg3.Wait()
close(outCh)
}
func add1(i int) int {
return i + 1
}
func add2(i int) int {
return i + 2
}
func stringify(i int) string {
return strconv.Itoa(i)
}
-
第一部分
所有的预备工作。我们选择5作为并行因数,为协程池创建一些 channel 来进行通讯,接着我们又创建 WaitGroups 来追踪所有衍生的协程。
-
第二部分
启动协程池,读取上一个已启动的协程池中的值。
-
第三部分
输入一些值并等待打印结果。也可以通过缓冲容量为5的 channel 来实现,现在我们利用主协程来简化这项工作。
-
第四部分
收尾。先关闭第一个 channel,消耗完缓存后自然地退出循环,WaitGroup 阻塞,重复以上直到优雅地关闭所有东西。
这一切都奏效,而且代码清晰无可挑剔。即便这样,我还是很好奇,有更好的办法吗?
介绍下 flo
我决定打造 flo 来抽象化这个模式。利用 flo 实现与上面代码同样的效果:
package main
import (
"context"
"fmt"
"log"
"strconv"
"sync"
"github.com/codyoss/flo"
)
func main() {
// Section 1
parallelism := 5
inCh := make(chan int, parallelism)
outCh := make(chan string, parallelism)
var wg sync.WaitGroup
// Section 2
go func() {
wg.Add(1)
for i := 0; i < 10; i++ {
inCh <- i
output := <-outCh
fmt.Println(output)
}
close(inCh)
wg.Done()
}()
// Section 3
err := flo.NewBuilder(flo.WithParallelism(parallelism), flo.WithInput(inCh), flo.WithOutput(outCh)).
Add(add1).
Add(add2).
Add(stringify).
BuildAndExecute(context.Background())
if err != nil {
log.Fatal(err)
}
wg.Wait()
close(outCh)
}
// Section 4
func add1(ctx context.Context, i int) (int, error) {
return i + 1, nil
}
func add2(ctx context.Context, i int) (int, error) {
return i + 2, nil
}
func stringify(ctx context.Context, i int) (string, error) {
return strconv.Itoa(i), nil
}
-
第一节
为了和第一个例子保持形似,我保留了输入输出 channel,前者作为 flo 的数据源,而后者从 flo 收获结果。
-
第二节
和第一个例子的第三节差不多。之所以这个顺序是因为下一节会封杀现在的协程。我还弄了个 WaitGroup 来追踪这段代码。
-
第三节
这是主要的不同之处:我们使用
NewBuilder
方法来构造 flo 对象。这有几个可选参数;在这个例子中我注册了输入和输出 channel(前者作为 flo 的数据源,而后者从 flo 收获结果)。并行因数也是可选的,用来配置每个池中启动的协程数量以及它们之间通讯的缓冲 channel 大小。 -
第四节 你可能留意到了函数的签名有点不一样:
func add1(i int) int
变成了:
func add1(ctx context.Context, i int) (int, error)
context 作为第一个参数,当
BuildAndExecute
方法被调用时 context 将启动 flo。
flo 如何使用反射?
看一下上面例子中调用的一些方法的签名:
func WithInput(ch interface{}) Option { ... }
func WithOutput(ch interface{}) Option { ... }
type Step interface{}
func (b *Builder) Add(s Step, options ...StepOption) *Builder { .. }
我觉得你大概心里有数。据我所知 Go 中唯一支持泛 api 的只有 interface{}
。但要让用户编写的代码能够使用强类型而不是空接口,flo 搞了一堆反射来正确断言所有类型。如果感兴趣可以查看godoc 中更详细的说明。在更抽象的层面,在 flo 处理任何数据之前都成竹在胸。
需要怎么反射来实现这个功能?
- 获取
reflect.Value
和reflect.Type of
- 调函数
- 转换空接口
- 检查函数的输入和输出类型
- 对比
reflect.Kind
- channel IO
- 检查数据流经 channel 的方向
The worker pool pattern in Go is a pattern many Gophers have implemented at least once in their career. It is a great way to play with some of Go’s awesome concurrency primitives and to parallelize work. As I typed out this pattern for the Nth time I wonder to myself , is there a better way? I have implement similar patterns in other languages, and have been able to abstractly build these worker pipelines via generics. But Go does not have generics, at least not yet… So what does Go have that I could use in its place? Reflection.
Is there a better way?
Let’s not put the cart before the horse though, taking a step back…
What is the Worker Pool Pattern?
The snippet below shows an example of a three step worker pool. Lets break down the sections of it.
package main
import (
"fmt"
"strconv"
"sync"
)
func main() {
// Section 1
parallelism := 5
inCh1 := make(chan int, parallelism)
inCh2 := make(chan int, parallelism)
inCh3 := make(chan int, parallelism)
outCh := make(chan string, parallelism)
var wg1 sync.WaitGroup
var wg2 sync.WaitGroup
var wg3 sync.WaitGroup
// Section 2
for i := 0; i < parallelism; i++ {
wg1.Add(1)
wg2.Add(1)
wg3.Add(1)
go func() {
for v := range inCh1 {
v := add1(v)
inCh2 <- v
}
wg1.Done()
}()
go func() {
for v := range inCh2 {
v := add2(v)
inCh3 <- v
}
wg2.Done()
}()
go func() {
for v := range inCh3 {
v := stringify(v)
outCh <- v
}
wg3.Done()
}()
}
// Section 3
for i := 0; i < 10; i++ {
inCh1 <- i
output := <-outCh
fmt.Println(output)
}
// Section 4
close(inCh1)
wg1.Wait()
close(inCh2)
wg2.Wait()
close(inCh3)
wg3.Wait()
close(outCh)
}
func add1(i int) int {
return i + 1
}
func add2(i int) int {
return i + 2
}
func stringify(i int) string {
return strconv.Itoa(i)
}
- This is all the setup. We pick a parallelism factor (in this case 5) we want to work with, we create some channels for our worker pools to communicate over, and we create WaitGroups to keep track of all of the goroutines we are about to spawn.
- We launch our worker pools and have the proceeding worker pool read from the output channel of the previous pool launched.
- We insert some input and wait for the the result to be printed. This could also be done as its own pool as the channel also has a buffer of 5, but to keep things simple for now this work is being kept in the main goroutine.
- This is the cleanup portion. Close the first channel, let its input drain and naturally break out of the range loop, wait on the WaitGroup, and repeat until everything has been gracefully shutdown.
This all works, and to be very clear there is absolutely nothing wrong with this code (unless you tell me otherwise in the comments). Again though, I was curious, is there a better way?
Enter flo
So I decided to build an abstraction over this pattern. I call the project flo (short for workflow). Let me first show you what the equivalent code looks like in flo.
package main
import (
"context"
"fmt"
"log"
"strconv"
"sync"
"github.com/codyoss/flo"
)
func main() {
// Section 1
parallelism := 5
inCh := make(chan int, parallelism)
outCh := make(chan string, parallelism)
var wg sync.WaitGroup
// Section 2
go func() {
wg.Add(1)
for i := 0; i < 10; i++ {
inCh <- i
output := <-outCh
fmt.Println(output)
}
close(inCh)
wg.Done()
}()
// Section 3
err := flo.NewBuilder(flo.WithParallelism(parallelism), flo.WithInput(inCh), flo.WithOutput(outCh)).
Add(add1).
Add(add2).
Add(stringify).
BuildAndExecute(context.Background())
if err != nil {
log.Fatal(err)
}
wg.Wait()
close(outCh)
}
// Section 4
func add1(ctx context.Context, i int) (int, error) {
return i + 1, nil
}
func add2(ctx context.Context, i int) (int, error) {
return i + 2, nil
}
func stringify(ctx context.Context, i int) (string, error) {
return strconv.Itoa(i), nil
}
- To keep the example similar to the one above I decided to still have an input channel used to feed the flo and an output channel used to pull the final results from the flo.
- This is the equivalent to section three of the first example. I needed to launch this before the next section of code because that section will block the current goroutine. Also, because I am launching this in a goroutine I added a WaitGroup to keep track of this code.
- This is the major difference from the previous example. We start building a flo with the NewBuilder function. This takes some optional arguments; in this case the registration of an input and an output channel to feed and receive data from the flo. Also, a parallelism factor is chosen. This will configure how many workers to launch for each pool and the size of the buffered channel they communicate over.
- Here you will notice that the signatures to the function changed a little bit from our first example:
func add1(i int) int
changed to:
func add1(ctx context.Context, i int) (int, error)
A context is added as the first parameter so that the context that is used to start the flo when BuildAndExecute is called can be properly propagated to all of the worker goroutines. This allows flo to support context cancellation from top-to-bottom. An error is also added to the output parameters. Although it is not being used in this example, it allows to stop processing a stream of data at any step in the pipeline. If add2 returned an error, stringify would never be called for that stream of data. This does not stop the execution of any worker goroutines. Optionally, flo allows you to register error handlers should you want to do anything when an error does occur at any step.
How does flo use reflect?
Well, lets take a look at the signatures of some of those methods that were being called in the example above.
func WithInput(ch interface{}) Option { ... }
func WithOutput(ch interface{}) Option { ... }
type Step interface{}
func (b *Builder) Add(s Step, options ...StepOption) *Builder { .. }
I think you get the picture. The only way to support a generic api in Go, that I know of, is interface{}. But to make the the bits of code that the user of the library writes be able to use strong types instead of the empty interface, flo does a bunch of reflection to assert that all the types line up properly. This is much better explained in the godoc if you are curious. At a high level though, everything is validated before the flo processes any data.
What kind of reflection is required to make this work?
- Get the reflect.Value and reflect.Type of many things
- invoking functions
- converting things to empty interfaces
- looking at the types of inputs and outputs of functions
- Comparing against reflect.Kind
- Sending and receiving data from channels
- Looking at the direction data can flow through a channel
- Checking if types implement interfaces
I know what you are thinking…
That sounds like a lot of reflection, doesn’t that slow down your code?
Yup. Although a lot of the reflection done in the library is executed before any data processing occurs, a fair bit is still needed at runtime.
Benchmarking flo vs Native Code
In the flo Github repo I wrote a benchmark to compare the performance hit for all of this reflection. The code used to test this is more-or-less the code from the examples above. If you want to checkout the benchmark for yourself look in flo_test.go. The results looked like this when running the benchmark from my laptop:
$ go test -bench=. -benchmem
goos: linux
goarch: amd64
pkg: github.com/codyoss/flo
BenchmarkFlo-8 200000 6577 ns/op 520 B/op 12 allocs/op
BenchmarkNonFlo-8 1000000 1057 ns/op 0 B/op 0 allocs/op
PASS
ok github.com/codyoss/flo 2.462s
I may be wrong, as I have not looked too deeply into this yet, but I believe most if not all of the slowdown and the allocations come from the reflect package. Some of these allocations seem legit but it also looks like there are a couple places the reflect package could be cleaned up for a bit of a performance boost. The main thing I saw was this todo:
func valueInterface(v Value, safe bool) interface{} {
...
// TODO: pass safe to packEface so we don't need to copy if
// safe==true?
return packEface(v)
}
But at the end of the day, I knew this code was not going to be blazingly fast. I think it is a basic understanding that when you play with reflection, you pay the price.
The real reason I created this project was to explore the reflect package more and that much I did accomplish. I will tell you that it is not the most straight-forward package to use. A lot of the methods will panic if they are used on a value of the wrong underlying type. The api expects you to have clearly read the docs. This is not such a bad thing, but I think it does discourage many from exploring it. I encourage you to checkout flo and see how I used reflection to accomplish abstracting away the worker pattern.
Takeaways
- Was there a better way? Meh… I wrote a library that traded, what I find to be, a fun syntax that reduces lines of code for performance. I have no regrets!
- Reflection is powerful, but comes with a performance cost.
- I need to look into the todo listed above and see if fixing it would improve the performance of this package. If it does maybe I will open an issue/PR.
- Learning more about the packages in the standard library can give you good perspective into new patterns and ways of doing things. Don’t be afraid!
- I need to use the reflect package a little more still before I feel comfortable.
- I am curious to see if the performance of this code would improve should it be re-implemented with generics/contracts once that feature comes out. I will follow-up when this happens.
- Checkout the repo: https://github.com/codyoss/flo