Go Channel
Jul 16, 2022 21:30 · 3637 words · 8 minute read
谚语说得好:Don’t communicate by sharing memory, share memory by communicating(出自 Rob Pike),而利用通信共享内存方法正是 Go Channel。
channel 真正的数据结构是 hchan
:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
qcount
循环队列中的元素数量dataqsiz
循环队列的长度buf
指向循环队列的底层数组elemsize
能够接收和发送的元素大小closed
是否已关闭elemtype
channel 中元素类型sendx
已发送的数据在循环队列底层数组中的索引recvx
已接收的数据在循环队列底层数组中的索引recvq
等待接收的 goroutine 队列,双向链表sendq
等待发送的 goroutine 队列,双向链表lock
互斥锁
channel 内部也有互斥锁存在,并非通过通信来共享内存效率会更高。
创建 channel
使用 make 初始化 channel:
- 无缓冲 channel:
var ch = make(chan int)
- 缓冲 channel:
var ch = make(chan int, 10)
对应 runtime.makechan64
和 runtime.makechan
:
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}
最终还是会调用 makechan
函数:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
- 无缓冲 channel(
mem == 0
):调用mallocgc
为hchan
分配一段连续的内存空间 - 有缓冲 channel(
elem.ptrdata == 0
):调用mallocgc
为hchan
连同底层数组buf
一起分配一段连续的内存空间
mallocgc
分配的内存都在堆上,所以 channel 都在堆上,存在垃圾回收。
发送数据
ch <- 1
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
调用的是 chansend
函数:
-
前置检查
-
检查 channel 是否为 nil
if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") }
- 如果是非阻塞发送,直接返回
- 否则调用
gopark
函数将当前执行的 goroutine 暂停
-
快速通道——调用
full
函数判断非阻塞 channel 是否满了:func full(c *hchan) bool { // c.dataqsiz is immutable (never written after the channel is created) // so it is safe to read at any time during channel operation. if c.dataqsiz == 0 { // Assumes that a pointer read is relaxed-atomic. return c.recvq.first == nil } // Assumes that a uint read is relaxed-atomic. return c.qcount == c.dataqsiz }
- 如果当前 channel 无缓冲(
c.dataqsiz == 0
),没有接收者 goroutine 就返回true
- 如果
c.qcount == c.dataqsiz
,表示缓冲区已满
- 如果当前 channel 无缓冲(
-
-
检查 channel 是否已关闭,向关闭的 channel 发送数据会 panic
panic(plainError("send on closed channel"))
-
从尝试接收的 goroutine 队列出队一个接收者 goroutine:
if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }
-
如果有 goroutine 正在等待接收数据,调用
send
函数直接发送:func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if raceenabled { if c.dataqsiz == 0 { racesync(c, sg) } else { // Pretend we go through the buffer, even though // we copy directly. Note that we need to increment // the head/tail locations only when raceenabled. racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } } if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
看一下
sendDirect
函数:func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { // src is on our stack, dst is a slot on another stack. // Once we read sg.elem out of sg, it will no longer // be updated if the destination's stack gets copied (shrunk). // So make sure that no preemption points can happen between read & use. dst := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) // No need for cgo write barrier checks because dst is always // Go memory. memmove(dst, src, t.size) }
调用
memmove
从发送者ep
直接将数据拷贝至接收者 goroutine 内存栈。
-
-
判断缓冲区是否还有空间(c.qcount < c.dataqsiz),维护 channel 自身数据结构:
c.qcount
是循环队列中的元素数量c.dataqsiz
是循环队列的长度(容量)
if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { racenotify(c, c.sendx, nil) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true }
- 调用
chanbuf
函数获取底层数组中sendx
索引的元素指针 - 调用
typedmemmove
将发送的值拷贝到底层数组 sendx
自增,指向下一个为待发送元素准备好的底层数组格子,如果c.sendx == c.dataqsiz
下一个索引等于循环队列长度,归零qcount
自增然后释放锁,返回
-
如果缓冲区已满,要么直接返回要么阻塞等待
-
如果非阻塞发送,释放锁然后返回:
if !block { unlock(&c.lock) return false }
-
否则(阻塞发送):
gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
- 调用
getg
函数获取当前执行的 goroutine - 调用
acquireSudog
函数初始化一个sudog
实例 - 调用
c.sendq.enqueue(mysg)
方法将当前 goroutine 入队至等待发送的 goroutine 队列c.sendq
- 调用
gopark
暂停当前 goroutine
- 调用
-
-
调用
KeepAlive(ep)
函数保活,避免被垃圾回收 -
唤醒等待状态的 goroutine:
if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg)
- 看一下 goroutine 是否已经被其他人唤醒了(
mysg != gp.waiting
) mysg.c = nil
解绑 channel- 调用
releaseSudog
函数释放 goroutine
- 看一下 goroutine 是否已经被其他人唤醒了(
接收数据
-
val := <- ch
(阻塞接收)func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) }
-
val, ok <- ch
(非阻塞接收)func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return }
两者都是调用 chanrecv
函数:
-
前置检查
-
检查 channel 是否为 nil,从 nil 的 channel 接收数据
if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") }
- 如果是非阻塞接收,直接返回
- 否则(阻塞接收),调用
gopark
函数将当前执行的 goroutine 暂停
-
快速通道——调用
empty
函数检查非阻塞接收的 channel 中是否有数据:func empty(c *hchan) bool { // c.dataqsiz is immutable. if c.dataqsiz == 0 { return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil } return atomic.Loaduint(&c.qcount) == 0 }
-
如果当前 channel 无缓冲(
c.dataqsiz == 0
),等待发送队列中是否存在 goroutine -
如果是缓冲 channel 就检查循环队列中是否有数据
-
如果 channel 未关闭,直接返回:
if atomic.Load(&c.closed) == 0 { return }
-
如果 channel 已经关闭且无数据,调用
typedmemclr
函数 清理接收者指针ep
:if empty(c) { // The channel is irreversibly closed and empty. if raceenabled { raceacquire(c.raceaddr()) } if ep != nil { typedmemclr(c.elemtype, ep) } return true, false }
-
-
-
检查 channel 是否关闭,如果 channel 被关闭(
c.closed != 0
)且循环队列中无数据(c.qcount == 0
),释放锁然后调用typedmemclr
函数清理ep
直接返回:if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false }
-
尝试从等待发送的 goroutine 队列出队一个 goroutine,直接从发送者接收:
if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true }
这里主要调用
recv
函数:func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } if ep != nil { // copy data from sender recvDirect(c.elemtype, sg, ep) } } else { // Queue is full. Take the item at the // head of the queue. Make the sender enqueue // its item at the tail of the queue. Since the // queue is full, those are both the same slot. qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) } // copy data from queue to receiver if ep != nil { typedmemmove(c.elemtype, ep, qp) } // copy data from sender to queue typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
-
如果是无缓冲 channel(
c.dataqsiz == 0
),接着调用recvDirect
函数:func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) { // dst is on our stack or the heap, src is on another stack. // The channel is locked, so src will not move during this // operation. src := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) memmove(dst, src, t.size) }
调用
memmove
函数从发送者 goroutine 内存栈直接拷贝至接收者ep
内存。 -
如果 channel 有缓冲,但是缓冲区已满:
-
调用
chanbuf
函数根据recvx
已接收的数据在循环队列底层数组中的索引读取循环队列的底层数组的元素 -
调用
typedmemmove
函数将底层数组的元素拷贝到接收者ep
-
维护 channel 自身数据结构
c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
当已接收的数据在循环队列底层数组中的索引达到循环队列的长度(
c.recvx == c.dataqsiz
),归零
-
最后调用
goready
函数唤醒出队的发送 goroutine。 -
-
如果 channel 循环队列(缓冲区)还有数据(
c.qcount > 0
):if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true }
- 调用
chanbuf
函数根据recvx
索引在循环队列底层数组读取要接收的元素 - 调用
typedmemmove
函数将底层数组的元素拷贝到接收者ep
- 调用
typedmemclr
函数清理循环队列中相应位置的值 - 维护 channel 自身数据结构,当已接收的数据在循环队列底层数组中的索引达到循环队列的长度(
c.recvx == c.dataqsiz
),归零
- 调用
-
如果是非阻塞接收,释放锁然后返回:
if !block { unlock(&c.lock) return false, false }
-
剩下的和发送差不多,阻塞接收:
gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
- 调用
getg
函数获取当前执行的 goroutine - 调用
acquireSudog
函数初始化一个sudog
实例 - 调用
c.recvq.enqueue(mysg)
方法将当前 goroutine 入队等待接收的 goroutine 队列 - 调用
gopark
挂起该 goroutine
- 调用
-
唤醒等待状态的 goroutine:
if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success
- 看一下 goroutine 是否已经被其他人唤醒了(
mysg != gp.waiting
) mysg.c = nil
解绑 channel- 调用
releaseSudog
函数释放 goroutine
- 看一下 goroutine 是否已经被其他人唤醒了(
channel 关闭
close(ch)
对应 runtime.closechan
:
-
首先判断 channel 是否为 nil(
c == nil
),试图关闭一个 nil channel 会 panic:if c == nil { panic(plainError("close of nil channel")) }
-
上锁
lock(&c.lock)
-
如果 channel 已关闭(
c.closed != 0
),也会 panic:if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) }
-
出队所有等待接收的 goroutine 并推入
gList
:var glist gList // release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) }
-
出队所有等待写入的 goroutine 并推入
gList
:// release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = unsafe.Pointer(sg) sg.success = false if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) }
-
unlock(&c.lock)
释放锁 -
弹出
gList
中所有 goroutine 并唤醒:// Ready all Gs now that we've dropped the channel lock. for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) }