Go Channel

Jul 16, 2022 21:30 · Golang

As the Go proverb (from Rob Pike) puts it: "Don't communicate by sharing memory; share memory by communicating." The channel is Go's mechanism for sharing memory by communicating.

The underlying data structure of a channel is hchan:

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}
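  • qcount: number of elements currently in the circular queue
  • dataqsiz: capacity of the circular queue
  • buf: pointer to the backing array of the circular queue
  • elemsize: size of a single element that can be sent and received
  • closed: nonzero once the channel has been closed
  • elemtype: type of the channel's elements
  • sendx: index in the backing array where the next sent element is stored
  • recvx: index in the backing array from which the next element is received
  • recvq: queue of goroutines waiting to receive, a doubly linked list
  • sendq: queue of goroutines waiting to send, a doubly linked list
  • lock: mutex that protects all fields of hchan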

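A channel is itself guarded by a mutex internally, so sharing memory by communicating is not necessarily more efficient than sharing memory directly.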
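To make the roles of qcount, dataqsiz, sendx and recvx concrete, here is a minimal sketch of a circular queue that uses the same index arithmetic. The ring type and its methods are hypothetical and only model the buffer: there is no locking, no wait queue, and the elements are plain ints.

```go
package main

import "fmt"

// ring is a hypothetical, simplified model of the circular queue in hchan.
// It is not the runtime's code.
type ring struct {
	qcount   uint  // number of elements currently stored
	dataqsiz uint  // capacity of the queue
	buf      []int // backing array with dataqsiz slots
	sendx    uint  // slot the next enqueued element goes into
	recvx    uint  // slot the next dequeued element comes from
}

func newRing(size uint) *ring {
	return &ring{dataqsiz: size, buf: make([]int, size)}
}

// enqueue stores v at sendx and reports false when the queue is full.
func (r *ring) enqueue(v int) bool {
	if r.qcount == r.dataqsiz {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz {
		r.sendx = 0 // wrap around to the first slot
	}
	r.qcount++
	return true
}

// dequeue removes the element at recvx and reports false when the queue is empty.
func (r *ring) dequeue() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.buf[r.recvx] = 0 // clear the slot
	r.recvx++
	if r.recvx == r.dataqsiz {
		r.recvx = 0 // wrap around to the first slot
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	r.enqueue(1)
	r.enqueue(2)
	fmt.Println(r.enqueue(3)) // false: the queue is full
	v, _ := r.dequeue()
	fmt.Println(v, r.recvx) // 1 1
	r.enqueue(3)
	fmt.Println(r.sendx, r.qcount) // 1 2
}
```

The real hchan adds the lock and the two wait queues on top of this bookkeeping.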

Creating a channel

Use make to initialize a channel:

  • Unbuffered channel: var ch = make(chan int)
  • Buffered channel: var ch = make(chan int, 10)
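As a small illustration of the difference between the two kinds, the sketch below sends on both and prints len and cap, which report the number of buffered elements and the buffer capacity (the qcount and dataqsiz fields of hchan). The describe helper is hypothetical.

```go
package main

import "fmt"

// describe is a hypothetical helper that returns len and cap of a channel.
func describe(ch chan int) (int, int) {
	return len(ch), cap(ch)
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 10)

	// Buffered sends complete immediately while there is free space.
	buffered <- 1
	buffered <- 2

	// An unbuffered send needs a receiver, so it runs in another goroutine.
	go func() { unbuffered <- 42 }()
	v := <-unbuffered

	fmt.Println(describe(unbuffered)) // 0 0
	fmt.Println(describe(buffered))   // 2 10
	fmt.Println(v)                    // 42
}
```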

This corresponds to runtime.makechan64 and runtime.makechan:

func makechan64(t *chantype, size int64) *hchan {
    if int64(int(size)) != size {
        panic(plainError("makechan: size out of range"))
    }

    return makechan(t, int(size))
}

makechan64 ultimately calls makechan:

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}
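  • mem == 0 (an unbuffered channel, or elements of size zero): mallocgc allocates memory for the hchan only
  • elem.ptrdata == 0 (a buffered channel whose elements contain no pointers): a single mallocgc call allocates the hchan together with the backing array buf as one contiguous block
  • Otherwise (the elements contain pointers): the hchan and buf are allocated separately

In every case the memory is allocated on the heap, so a channel always lives on the heap and is subject to garbage collection.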
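The size check in makechan is visible from user code when the size is only known at run time. The sketch below is an illustration under that assumption; tryMake is a hypothetical helper that turns the runtime panic into an error.

```go
package main

import "fmt"

// tryMake is a hypothetical helper: it creates a channel whose size is only
// known at run time and converts a runtime panic into an error.
func tryMake(size int) (ch chan int, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("make failed: %v", r)
		}
	}()
	return make(chan int, size), nil
}

func main() {
	// A negative size fails the range check in makechan.
	if _, err := tryMake(-1); err != nil {
		fmt.Println(err)
	}

	ch, err := tryMake(4)
	fmt.Println(cap(ch), err) // 4 <nil>
}
```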

Sending data

ch <- 1

This corresponds to runtime.chansend1:

func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

chansend1 calls chansend, which works as follows:

  1. Pre-checks

    1. Check whether the channel is nil

      if c == nil {
          if !block {
              return false
          }
          gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
          throw("unreachable")
      }
      
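      • For a non-blocking send, return false immediately
      • Otherwise call gopark to park the current goroutine. Nothing ever wakes it up, so a blocking send on a nil channel blocks forever
    2. Fast path: for a non-blocking send, call full to check, without taking the lock, whether the send would have to block: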

      func full(c *hchan) bool {
          // c.dataqsiz is immutable (never written after the channel is created)
          // so it is safe to read at any time during channel operation.
          if c.dataqsiz == 0 {
              // Assumes that a pointer read is relaxed-atomic.
              return c.recvq.first == nil
          }
          // Assumes that a uint read is relaxed-atomic.
          return c.qcount == c.dataqsiz
      }
      
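      • For an unbuffered channel (c.dataqsiz == 0), full returns true when no receiver goroutine is waiting
      • For a buffered channel, c.qcount == c.dataqsiz means the buffer is full
  2. Acquire the lock: lock(&c.lock)

  3. Check whether the channel is closed. Sending on a closed channel panics: panic(plainError("send on closed channel"))

  4. Try to dequeue a receiver from the queue of goroutines waiting to receive: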

    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    
    • If a goroutine is waiting to receive, call send to hand the value to it directly:

      func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
          if raceenabled {
              if c.dataqsiz == 0 {
                  racesync(c, sg)
              } else {
                  // Pretend we go through the buffer, even though
                  // we copy directly. Note that we need to increment
                  // the head/tail locations only when raceenabled.
                  racenotify(c, c.recvx, nil)
                  racenotify(c, c.recvx, sg)
                  c.recvx++
                  if c.recvx == c.dataqsiz {
                      c.recvx = 0
                  }
                  c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
              }
          }
          if sg.elem != nil {
              sendDirect(c.elemtype, sg, ep)
              sg.elem = nil
          }
          gp := sg.g
          unlockf()
          gp.param = unsafe.Pointer(sg)
          sg.success = true
          if sg.releasetime != 0 {
              sg.releasetime = cputicks()
          }
          goready(gp, skip+1)
      }
      

      Now look at sendDirect:

      func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
          // src is on our stack, dst is a slot on another stack.
      
          // Once we read sg.elem out of sg, it will no longer
          // be updated if the destination's stack gets copied (shrunk).
          // So make sure that no preemption points can happen between read & use.
          dst := sg.elem
          typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
          // No need for cgo write barrier checks because dst is always
          // Go memory.
          memmove(dst, src, t.size)
      }
      

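      memmove copies the data from the sender's ep directly into the receiving goroutine's stack.

  5. If the buffer still has room (c.qcount < c.dataqsiz), store the element in the buffer and update the channel's bookkeeping:

    • c.qcount is the number of elements in the circular queue
    • c.dataqsiz is the capacity of the circular queue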
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            racenotify(c, c.sendx, nil)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }
    
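    1. chanbuf returns a pointer to the slot at index sendx in the backing array
    2. typedmemmove copies the value being sent into that slot
    3. sendx is incremented so that it points at the slot for the next send; when it reaches the capacity of the circular queue (c.sendx == c.dataqsiz) it wraps around to zero
    4. qcount is incremented, the lock is released, and the function returns
  6. If the buffer is full, either return immediately or block and wait

    • For a non-blocking send, release the lock and return: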

      if !block {
          unlock(&c.lock)
          return false
      }
      
    • Otherwise (a blocking send):

      gp := getg()
      mysg := acquireSudog()
      mysg.releasetime = 0
      if t0 != 0 {
          mysg.releasetime = -1
      }
      // No stack splits between assigning elem and enqueuing mysg
      // on gp.waiting where copystack can find it.
      mysg.elem = ep
      mysg.waitlink = nil
      mysg.g = gp
      mysg.isSelect = false
      mysg.c = c
      gp.waiting = mysg
      gp.param = nil
      c.sendq.enqueue(mysg)
      // Signal to anyone trying to shrink our stack that we're about
      // to park on a channel. The window between when this G's status
      // changes and when we set gp.activeStackChans is not safe for
      // stack shrinking.
      atomic.Store8(&gp.parkingOnChan, 1)
      gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
      
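      1. getg returns the currently running goroutine
      2. acquireSudog obtains a sudog, which is then filled in to represent this goroutine waiting on the channel
      3. c.sendq.enqueue(mysg) appends it to the queue of goroutines waiting to send, c.sendq
      4. gopark parks the current goroutine
  7. Call KeepAlive(ep) so that the value being sent is not garbage collected before the receiver has copied it

  8. Once the goroutine has been woken up, clean up: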

    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    
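    1. Sanity-check the goroutine's waiting list (mysg != gp.waiting means it has been corrupted)
    2. closed := !mysg.success records whether the wakeup came from the channel being closed instead of from a receiver
    3. mysg.c = nil unbinds the channel
    4. releaseSudog releases the sudog (not the goroutine)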
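The send steps above can be observed from user code. The following is a sketch, not runtime code: trySend and sendPanics are hypothetical helpers. A select with a single send case and a default case performs a non-blocking send (the block == false path), and a send on a closed channel panics.

```go
package main

import "fmt"

// trySend is a hypothetical helper that attempts a non-blocking send.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// sendPanics is a hypothetical helper that reports whether sending v on ch
// panics. Only call it on a channel that has room or is closed; on an open,
// full channel the send would block.
func sendPanics(ch chan int, v int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch <- v
	return false
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: the buffer has a free slot
	fmt.Println(trySend(ch, 2)) // false: the buffer is full and nobody is receiving

	close(ch)
	fmt.Println(sendPanics(ch, 3)) // true: send on closed channel
}
```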

Receiving data

  1. val := <-ch (blocking receive)

    This corresponds to runtime.chanrecv1:

    func chanrecv1(c *hchan, elem unsafe.Pointer) {
        chanrecv(c, elem, true)
    }
    
  2. val, ok := <-ch (also a blocking receive; ok reports whether a value was actually received)

    This corresponds to runtime.chanrecv2:

    func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
        _, received = chanrecv(c, elem, true)
        return
    }
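A short sketch of the two receive forms, under the assumption of a buffered channel of ints; recvOK is a hypothetical helper that wraps the two-value form.

```go
package main

import "fmt"

// recvOK is a hypothetical helper wrapping the two-value receive form.
func recvOK(ch chan int) (int, bool) {
	v, ok := <-ch
	return v, ok
}

func main() {
	ch := make(chan int, 1)
	ch <- 7

	val := <-ch // single-value form
	fmt.Println(val) // 7

	ch <- 8
	close(ch)

	// A value that is still buffered is delivered even after close.
	fmt.Println(recvOK(ch)) // 8 true

	// Closed and drained: the zero value is returned and ok is false.
	fmt.Println(recvOK(ch)) // 0 false
}
```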
    

Both forms call chanrecv:

  1. Pre-checks

    1. Check whether the channel is nil, that is, whether this is a receive from a nil channel

      if c == nil {
          if !block {
              return
          }
          gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
          throw("unreachable")
      }
      
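      • For a non-blocking receive, return immediately
      • Otherwise (a blocking receive), call gopark to park the current goroutine
    2. Fast path: for a non-blocking receive, call empty to check, without taking the lock, whether the channel has nothing to receive: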

      func empty(c *hchan) bool {
          // c.dataqsiz is immutable.
          if c.dataqsiz == 0 {
              return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
          }
          return atomic.Loaduint(&c.qcount) == 0
      }
      
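      • For an unbuffered channel (c.dataqsiz == 0), check whether any goroutine is waiting in the send queue

      • For a buffered channel, check whether the circular queue holds any data

      • If the channel has not been closed, return immediately: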

        if atomic.Load(&c.closed) == 0 {
            return
        }
        
      • If the channel is closed and holds no data, call typedmemclr to zero the receiver's ep:

        if empty(c) {
            // The channel is irreversibly closed and empty.
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
        
  2. Acquire the lock: lock(&c.lock)

  3. Check whether the channel is closed. If it is closed (c.closed != 0) and the circular queue is empty (c.qcount == 0), release the lock, call typedmemclr to zero ep, and return:

    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }
    
  4. Try to dequeue a goroutine from the queue of waiting senders and receive directly from it:

    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }
    

    The main work is done by recv:

    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        if c.dataqsiz == 0 {
            if raceenabled {
                racesync(c, sg)
            }
            if ep != nil {
                // copy data from sender
                recvDirect(c.elemtype, sg, ep)
            }
        } else {
            // Queue is full. Take the item at the
            // head of the queue. Make the sender enqueue
            // its item at the tail of the queue. Since the
            // queue is full, those are both the same slot.
            qp := chanbuf(c, c.recvx)
            if raceenabled {
                racenotify(c, c.recvx, nil)
                racenotify(c, c.recvx, sg)
            }
            // copy data from queue to receiver
            if ep != nil {
                typedmemmove(c.elemtype, ep, qp)
            }
            // copy data from sender to queue
            typedmemmove(c.elemtype, qp, sg.elem)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
        sg.elem = nil
        gp := sg.g
        unlockf()
        gp.param = unsafe.Pointer(sg)
        sg.success = true
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        goready(gp, skip+1)
    }
    
    • For an unbuffered channel (c.dataqsiz == 0), recv calls recvDirect:

      func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
          // dst is on our stack or the heap, src is on another stack.
          // The channel is locked, so src will not move during this
          // operation.
          src := sg.elem
          typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
          memmove(dst, src, t.size)
      }
      

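      memmove copies the data from the sending goroutine's stack directly into the receiver's ep.

    • If the channel is buffered, a waiting sender implies that the buffer is full:

      1. chanbuf returns a pointer to the slot at recvx, the head of the circular queue

      2. typedmemmove copies that element to the receiver's ep, and a second typedmemmove copies the sender's value (sg.elem) into the slot that was just freed

      3. Update the channel's bookkeeping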

        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        

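        When recvx reaches the capacity of the circular queue (c.recvx == c.dataqsiz) it wraps around to zero. Because the queue is still full, sendx is set to the same index.

    Finally, goready wakes up the dequeued sender goroutine.

  5. If the circular queue (the buffer) still holds data (c.qcount > 0):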

    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
    
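    1. chanbuf returns a pointer to the element at index recvx in the backing array
    2. typedmemmove copies that element to the receiver's ep
    3. typedmemclr clears the slot in the circular queue
    4. Update the channel's bookkeeping: recvx is incremented and wraps around to zero when it reaches the capacity (c.recvx == c.dataqsiz), and qcount is decremented
  6. For a non-blocking receive, release the lock and return: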

    if !block {
        unlock(&c.lock)
        return false, false
    }
    
  7. The rest mirrors the send path. For a blocking receive:

    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    
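    1. getg returns the currently running goroutine
    2. acquireSudog obtains a sudog, which is then filled in to represent this goroutine waiting on the channel
    3. c.recvq.enqueue(mysg) appends it to the queue of goroutines waiting to receive
    4. gopark parks the goroutine
  8. Once the goroutine has been woken up, clean up: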

    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success
    
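    1. Sanity-check the goroutine's waiting list (mysg != gp.waiting means it has been corrupted)
    2. success := mysg.success records whether a value was actually received; it is false when the wakeup came from the channel being closed
    3. mysg.c = nil unbinds the channel
    4. releaseSudog releases the sudog (not the goroutine)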
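The receive steps above can be sketched from user code as follows. tryRecv and drainInOrder are hypothetical helpers: the first performs a non-blocking receive through select with a default case (the block == false path), the second pushes values through a one-slot buffer so that the sender regularly finds the buffer full, and checks that values still arrive in the order they were sent.

```go
package main

import "fmt"

// tryRecv is a hypothetical helper that attempts a non-blocking receive.
func tryRecv(ch chan int) (int, bool) {
	select {
	case v, ok := <-ch:
		return v, ok
	default:
		return 0, false
	}
}

// drainInOrder sends 1..n through a channel with a one-slot buffer from a
// separate goroutine and returns the values in the order they were received.
func drainInOrder(n int) []int {
	ch := make(chan int, 1)
	go func() {
		for i := 1; i <= n; i++ {
			ch <- i // blocks whenever the single slot is occupied
		}
		close(ch)
	}()
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(tryRecv(ch)) // 0 false: nothing buffered, no sender waiting

	ch <- 5
	fmt.Println(tryRecv(ch)) // 5 true

	fmt.Println(drainInOrder(4)) // [1 2 3 4]
}
```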

Closing a channel

close(ch) corresponds to runtime.closechan:

  1. First check whether the channel is nil (c == nil). Closing a nil channel panics:

    if c == nil {
        panic(plainError("close of nil channel"))
    }
    
  2. Acquire the lock: lock(&c.lock)

  3. If the channel is already closed (c.closed != 0), closing it again also panics:

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }
    
  4. Dequeue every goroutine waiting to receive and push it onto a gList:

    var glist gList
    
    // release all readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    
  5. Dequeue every goroutine waiting to send and push it onto the gList:

    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    
  6. Release the lock: unlock(&c.lock)

  7. Pop every goroutine off the gList and wake it up:

    // Ready all Gs now that we've dropped the channel lock.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
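The effects of closechan can be checked from user code. In this sketch the helper names closePanics and wakeAll are hypothetical: closing a channel releases every receiver with ok == false, and closing a closed or nil channel panics.

```go
package main

import (
	"fmt"
	"sync"
)

// closePanics is a hypothetical helper that reports whether close(ch) panics.
func closePanics(ch chan int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	close(ch)
	return false
}

// wakeAll starts n receivers on an unbuffered channel, closes the channel,
// and returns how many receivers observed ok == false. Receivers that are
// already parked are woken by close; receivers that arrive later see the
// closed, empty channel. Either way each one returns with ok == false.
func wakeAll(n int) int {
	ch := make(chan int)
	var wg sync.WaitGroup
	var mu sync.Mutex
	woken := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, ok := <-ch; !ok {
				mu.Lock()
				woken++
				mu.Unlock()
			}
		}()
	}
	close(ch)
	wg.Wait()
	return woken
}

func main() {
	fmt.Println(wakeAll(3)) // 3

	ch := make(chan int)
	fmt.Println(closePanics(ch))  // false: the first close succeeds
	fmt.Println(closePanics(ch))  // true: close of closed channel
	fmt.Println(closePanics(nil)) // true: close of nil channel
}
```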