The epoll Implementation of Go's Network Poller · Listen

Apr 8, 2021 23:15 · Golang Linux OS

The Go runtime's network poller (netpoll), which provides I/O multiplexing, has a dedicated implementation for each supported operating system.

This article looks at Linux and focuses on how the I/O multiplexing underneath the Go network poller is implemented on top of epoll.

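All network operations are built around the network descriptor netFD. Each netFD is bound to an underlying pollDesc structure. When a read or write on a netFD fails with EAGAIN, the current goroutine is recorded in the pollDesc that belongs to that netFD and gopark is called to park it; only when a read or write event occurs on that netFD again is the goroutine made ready and scheduled to run. The mechanism that tells the runtime such an event has occurred is the platform's event notification facility: epoll, kqueue, IOCP and so on.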
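The park-and-ready cycle described above can be modelled in ordinary Go. The sketch below is only a toy: toyPollDesc, waitRead and notifyReady are hypothetical names, a channel stands in for gopark/goready, and nothing here is the runtime's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// toyPollDesc is a hypothetical stand-in for the runtime's pollDesc.
type toyPollDesc struct {
	mu     sync.Mutex
	ready  bool          // an event arrived while nobody was waiting
	waiter chan struct{} // non-nil while a goroutine is parked
}

// waitRead blocks the caller until notifyReady reports a read event.
// Receiving from the channel plays the role of gopark.
func (pd *toyPollDesc) waitRead() {
	pd.mu.Lock()
	if pd.ready {
		pd.ready = false
		pd.mu.Unlock()
		return
	}
	ch := make(chan struct{})
	pd.waiter = ch
	pd.mu.Unlock()
	<-ch
}

// notifyReady is what the poller would call when the descriptor becomes
// readable. Closing the channel plays the role of goready.
func (pd *toyPollDesc) notifyReady() {
	pd.mu.Lock()
	defer pd.mu.Unlock()
	if pd.waiter != nil {
		close(pd.waiter)
		pd.waiter = nil
		return
	}
	pd.ready = true
}

func main() {
	pd := &toyPollDesc{}
	done := make(chan struct{})
	go func() {
		pd.waitRead()
		fmt.Println("reader resumed")
		close(done)
	}()
	pd.notifyReady()
	<-done
}
```

The toy works whichever side runs first: if the event arrives before the reader waits, the ready flag lets the reader return immediately, much like pdReady in the real structure shown later.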

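Start with a glance at src/runtime/netpoll_epoll.go, which defines several functions prefixed with netpoll, among them netpollinit and netpollopen, both of which appear later in this article.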

net.Listen

When we write our own TCP server (https://github.com/smallnest/1m-go-tcp-server/blob/master/1_simple_tcp_server/server.go), we always call net.Listen with a network and a listen address (including the port), so that function is where we start:

func Listen(network, address string) (Listener, error) {
    var lc ListenConfig
    return lc.Listen(context.Background(), network, address)
}

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
    addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
    if err != nil {
        return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
    }
    sl := &sysListener{
        ListenConfig: *lc,
        network:      network,
        address:      address,
    }
    var l Listener
    la := addrs.first(isIPv4)
    switch la := la.(type) {
    case *TCPAddr:
        l, err = sl.listenTCP(ctx, la)
    case *UnixAddr:
        l, err = sl.listenUnix(ctx, la)
    default:
        return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
    }
    if err != nil {
        return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
    }
    return l, nil
}
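For context, here is a minimal sketch of the kind of program that reaches this code path. It is not the linked server; startEchoServer and roundTrip are hypothetical helpers, and the listener binds to an ephemeral loopback port so the example does not depend on a fixed port being free.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
)

// startEchoServer listens on addr and echoes one line per connection.
// The caller closes the returned listener to stop the accept loop.
func startEchoServer(addr string) (net.Listener, error) {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, err
	}
	go func() {
		for {
			conn, err := ln.Accept()
			if err != nil {
				return // listener closed
			}
			go func(c net.Conn) {
				defer c.Close()
				line, err := bufio.NewReader(c).ReadString('\n')
				if err != nil {
					return
				}
				fmt.Fprint(c, line)
			}(conn)
		}
	}()
	return ln, nil
}

// roundTrip sends msg (which must end in '\n') and returns the echoed line.
func roundTrip(addr, msg string) (string, error) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return "", err
	}
	defer conn.Close()
	if _, err := fmt.Fprint(conn, msg); err != nil {
		return "", err
	}
	return bufio.NewReader(conn).ReadString('\n')
}

func main() {
	ln, err := startEchoServer("127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()
	reply, err := roundTrip(ln.Addr().String(), "hello\n")
	if err != nil {
		panic(err)
	}
	fmt.Printf("listening on %s, echoed %q\n", ln.Addr(), reply)
}
```

Every Accept and Read in this program looks blocking to the goroutine that calls it; the rest of the article follows what net.Listen sets up underneath to make that possible.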

Following the call to sl.listenTCP, we find its definition:

func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
    fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
    if err != nil {
        return nil, err
    }
    return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

The name of internetSocket's return value, fd, hints at a file descriptor. The function is defined in src/net/ipsock_posix.go:

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() {
        raddr = raddr.toLocal(net)
    }
    family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
    return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
}

Its return value is a pointer to a netFD structure.

netFD

netFD is the "network file descriptor" structure that Go wraps around a socket:

type netFD struct {
    pfd poll.FD

    // immutable until Close
    family      int
    sotype      int
    isConnected bool // handshake completed or use of association with peer
    net         string
    laddr       Addr
    raddr       Addr
}

The most important field is pfd, a poll.FD structure; as the package name suggests, it is the link to the poller and therefore, on Linux, to epoll:

type FD struct {
    // Lock sysfd and serialize access to Read and Write methods.
    fdmu fdMutex

    // System file descriptor. Immutable until Close.
    Sysfd int

    // I/O poller.
    pd pollDesc

    // Writev cache.
    iovecs *[]syscall.Iovec

    // Semaphore signaled when file is closed.
    csema uint32

    // Non-zero if this file has been set to blocking mode.
    isBlocking uint32

    // Whether this is a streaming descriptor, as opposed to a
    // packet-based descriptor like a UDP socket. Immutable.
    IsStream bool

    // Whether a zero byte read indicates EOF. This is false for a
    // message based socket connection.
    ZeroReadIsEOF bool

    // Whether this is a file rather than a network socket.
    isFile bool
}

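poll.FD in turn contains two important fields: Sysfd is the actual file descriptor allocated by the Linux kernel, and pd, of type pollDesc, wraps the underlying event notification mechanism: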

type pollDesc struct {
    runtimeCtx uintptr
}

func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    if errno != 0 {
        if ctx != 0 {
            runtime_pollUnblock(ctx)
            runtime_pollClose(ctx)
        }
        return errnoErr(syscall.Errno(errno))
    }
    pd.runtimeCtx = ctx
    return nil
}

That pollDesc, in package internal/poll, is only a thin handle. The real pollDesc structure lives in src/runtime/netpoll.go:

type pollDesc struct {
    link *pollDesc // in pollcache, protected by pollcache.lock

    // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.
    // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.
    // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)
    // proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated
    // in a lock-free way by all operations.
    // NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),
    // that will blow up when GC starts moving objects.
    lock    mutex // protects the following fields
    fd      uintptr
    closing bool
    everr   bool      // marks event scanning error happened
    user    uint32    // user settable cookie
    rseq    uintptr   // protects from stale read timers
    rg      uintptr   // pdReady, pdWait, G waiting for read or nil
    rt      timer     // read deadline timer (set if rt.f != nil)
    rd      int64     // read deadline
    wseq    uintptr   // protects from stale write timers
    wg      uintptr   // pdReady, pdWait, G waiting for write or nil
    wt      timer     // write deadline timer
    wd      int64     // write deadline
    self    *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}

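It contains a pointer to its own type, link, which means pollDesc values are chained into a linked list. The head of that list is held by a structure called pollCache: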

type pollCache struct {
    lock  mutex
    first *pollDesc
    // PollDesc objects must be type-stable,
    // because we can get ready notification from epoll/kqueue
    // after the descriptor is closed/reused.
    // Stale notifications are detected using seq variable,
    // seq is incremented when deadlines are changed or descriptor is reused.
}

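pollCache is a free list of unused pollDesc nodes, so its method names can look reversed if you read it as an ordinary list: alloc removes the pollDesc that first points to and hands it to the caller, while free pushes a pollDesc that is no longer in use back onto the head.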
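The alloc/free behaviour is easier to see in a stripped-down model. The following is a sketch under simplifying assumptions, not the runtime's code: node and freeList are hypothetical names, a sync.Mutex replaces the runtime mutex, and an empty list allocates a single node with the ordinary allocator, whereas the runtime refills its cache in batches from memory the garbage collector does not manage.

```go
package main

import (
	"fmt"
	"sync"
)

// node is a hypothetical stand-in for pollDesc; link chains free nodes.
type node struct {
	link *node
	id   int
}

// freeList is a hypothetical stand-in for pollCache.
type freeList struct {
	mu    sync.Mutex
	first *node
	made  int // how many nodes have been created so far
}

// alloc pops the head of the free list, creating a node if it is empty.
func (c *freeList) alloc() *node {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.first == nil {
		c.made++
		return &node{id: c.made}
	}
	n := c.first
	c.first = n.link
	n.link = nil
	return n
}

// free pushes a node that is no longer in use back onto the head.
func (c *freeList) free(n *node) {
	c.mu.Lock()
	defer c.mu.Unlock()
	n.link = c.first
	c.first = n
}

func main() {
	var cache freeList
	a := cache.alloc()
	b := cache.alloc()
	cache.free(a)
	c := cache.alloc() // takes the node that was just freed
	fmt.Println("reused:", c == a, "ids:", a.id, b.id, c.id)
	// prints "reused: true ids: 1 2 1"
}
```

Because a freed node is handed out again rather than released, a stale notification can still reach it, which is why the real structure carries the rseq and wseq counters mentioned in the comment above.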

socket

Back to the internetSocket function above: on return it calls the socket function:

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    s, err := sysSocket(family, sotype, proto)
    if err != nil {
        return nil, err
    }
    if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }
    if fd, err = newFD(s, family, sotype, net); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }

    // This function makes a network file descriptor for the
    // following applications:
    //
    // - An endpoint holder that opens a passive stream
    //   connection, known as a stream listener
    //
    // - An endpoint holder that opens a destination-unspecific
    //   datagram connection, known as a datagram listener
    //
    // - An endpoint holder that opens an active stream or a
    //   destination-specific datagram connection, known as a
    //   dialer
    //
    // - An endpoint holder that opens the other connection, such
    //   as talking to the protocol stack inside the kernel
    //
    // For stream and datagram listeners, they will only require
    // named sockets, so we can assume that it's just a request
    // from stream or datagram listeners when laddr is not nil but
    // raddr is nil. Otherwise we assume it's just for dialers or
    // the other connection holders.

    if laddr != nil && raddr == nil {
        switch sotype {
        case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
            if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
        case syscall.SOCK_DGRAM:
            if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
        }
    }
    if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil {
        fd.Close()
        return nil, err
    }
    return fd, nil
}

sysSocket is obviously worth following; the actual system call cannot be far away:

func sysSocket(family, sotype, proto int) (int, error) {
    s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
    // On Linux the SOCK_NONBLOCK and SOCK_CLOEXEC flags were
    // introduced in 2.6.27 kernel and on FreeBSD both flags were
    // introduced in 10 kernel. If we get an EINVAL error on Linux
    // or EPROTONOSUPPORT error on FreeBSD, fall back to using
    // socket without them.
    switch err {
    case nil:
        return s, nil
    default:
        return -1, os.NewSyscallError("socket", err)
    case syscall.EPROTONOSUPPORT, syscall.EINVAL:
    }

    // See ../syscall/exec_unix.go for description of ForkLock.
    syscall.ForkLock.RLock()
    s, err = socketFunc(family, sotype, proto)
    if err == nil {
        syscall.CloseOnExec(s)
    }
    syscall.ForkLock.RUnlock()
    if err != nil {
        return -1, os.NewSyscallError("socket", err)
    }
    if err = syscall.SetNonblock(s, true); err != nil {
        poll.CloseFunc(s)
        return -1, os.NewSyscallError("setnonblock", err)
    }
    return s, nil
}

socketFunc        func(int, int, int) (int, error)  = syscall.Socket

Sure enough, this is where the Linux socket system call creates a non-blocking socket and returns its file descriptor, which is then used to instantiate the netFD.

Next, look at the fd.listenStream method:

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    var err error
    if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
        return err
    }
    var lsa syscall.Sockaddr
    if lsa, err = laddr.sockaddr(fd.family); err != nil {
        return err
    }
    if ctrlFn != nil {
        c, err := newRawConn(fd)
        if err != nil {
            return err
        }
        if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
            return err
        }
    }
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }
    if err = fd.init(); err != nil {
        return err
    }
    lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}

listenFunc        func(int, int) error              = syscall.Listen

This performs bind and listen on the socket. At this point the socket has gone from creation to listening, and only accept remains.

Next comes the netFD initialization method, init:

func (fd *netFD) init() error {
    return fd.pfd.Init(fd.net, true)
}

That leads to the Init method of poll.FD:

func (fd *FD) Init(net string, pollable bool) error {
    // We don't actually care about the various network types.
    if net == "file" {
        fd.isFile = true
    }
    if !pollable {
        fd.isBlocking = 1
        return nil
    }
    err := fd.pd.init(fd)
    if err != nil {
        // If we could not initialize the runtime poller,
        // assume we are using blocking mode.
        fd.isBlocking = 1
    }
    return err
}

We have already seen the init method of pollDesc:

func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    if errno != 0 {
        if ctx != 0 {
            runtime_pollUnblock(ctx)
            runtime_pollClose(ctx)
        }
        return errnoErr(syscall.Errno(errno))
    }
    pd.runtimeCtx = ctx
    return nil
}

At last we reach the functions that lead to epoll, runtime_pollServerInit and runtime_pollOpen:

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
    netpollGenericInit()
}

func netpollGenericInit() {
    if atomic.Load(&netpollInited) == 0 {
        lockInit(&netpollInitLock, lockRankNetpollInit)
        lock(&netpollInitLock)
        if netpollInited == 0 {
            netpollinit()
            atomic.Store(&netpollInited, 1)
        }
        unlock(&netpollInitLock)
    }
}

The go:linkname directive tells the compiler to link runtime_pollServerInit in internal/poll to the poll_runtime_pollServerInit function in src/runtime/netpoll.go. And here is netpollinit, mentioned at the very beginning:

func netpollinit() {
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    if epfd < 0 {
        epfd = epollcreate(1024)
        if epfd < 0 {
            println("runtime: epollcreate failed with", -epfd)
            throw("runtime: netpollinit failed")
        }
        closeonexec(epfd)
    }
    r, w, errno := nonblockingPipe()
    if errno != 0 {
        println("runtime: pipe failed with", -errno)
        throw("runtime: pipe failed")
    }
    ev := epollevent{
        events: _EPOLLIN,
    }
    *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
    errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
    if errno != 0 {
        println("runtime: epollctl failed with", -errno)
        throw("runtime: epollctl failed")
    }
    netpollBreakRd = uintptr(r)
    netpollBreakWr = uintptr(w)
}

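netpollinit creates an epoll instance and assigns the returned epoll file descriptor to the global variable epfd. It also creates a non-blocking pipe and registers the read end with that instance, keeping both ends in netpollBreakRd and netpollBreakWr so that a blocked poll can be interrupted by writing to the pipe.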

In the Go source, epollcreate1 is a short piece of assembly: https://github.com/golang/go/blob/go1.16.3/src/runtime/sys_linux_amd64.s#L696-L702

#define SYS_epoll_create1	291

// int32 runtime·epollcreate1(int32 flags);
TEXT runtime·epollcreate1(SB),NOSPLIT,$0
    MOVL	flags+0(FP), DI
    MOVL	$SYS_epoll_create1, AX
    SYSCALL
    MOVL	AX, ret+8(FP)
    RET

The assembly may be hard to read, but SYS_epoll_create1 matches the number of the epoll_create1 system call in the Linux x86-64 system call table (291).

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    pd := pollcache.alloc()
    lock(&pd.lock)
    if pd.wg != 0 && pd.wg != pdReady {
        throw("runtime: blocked write on free polldesc")
    }
    if pd.rg != 0 && pd.rg != pdReady {
        throw("runtime: blocked read on free polldesc")
    }
    pd.fd = fd
    pd.closing = false
    pd.everr = false
    pd.rseq++
    pd.rg = 0
    pd.rd = 0
    pd.wseq++
    pd.wg = 0
    pd.wd = 0
    pd.self = pd
    unlock(&pd.lock)

    var errno int32
    errno = netpollopen(fd, pd)
    return pd, int(errno)
}

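Likewise, the go:linkname directive tells the compiler to link runtime_pollOpen to the poll_runtime_pollOpen function in src/runtime/netpoll.go. After taking a pollDesc from pollcache and resetting it, poll_runtime_pollOpen calls netpollopen: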

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

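netpollopen registers the socket file descriptor with the epoll instance represented by epfd. It asks for read, write and peer-hangup events in edge-triggered mode (_EPOLLET) and stores the pollDesc pointer in the event data, so a later notification can be mapped back to the waiting goroutine.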

epollctl is also written in assembly: https://github.com/golang/go/blob/go1.16.3/src/runtime/sys_linux_amd64.s#L704-#L713

#define SYS_epoll_ctl		233

// func epollctl(epfd, op, fd int32, ev *epollEvent) int
TEXT runtime·epollctl(SB),NOSPLIT,$0
    MOVL	epfd+0(FP), DI
    MOVL	op+4(FP), SI
    MOVL	fd+8(FP), DX
    MOVQ	ev+16(FP), R10
    MOVL	$SYS_epoll_ctl, AX
    SYSCALL
    MOVL	AX, ret+24(FP)
    RET

Finally, let's look at the documentation for the epoll_create1 and epoll_ctl system calls:

  • epoll_create1

    If flags is 0, then, other than the fact that the obsolete size argument is dropped, epoll_create1() is the same as epoll_create().

    EPOLL_CLOEXEC

    Set the close-on-exec (FD_CLOEXEC) flag on the new file descriptor. See the description of the O_CLOEXEC flag in open(2) for reasons why this may be useful.

  • epoll_ctl

    This system call is used to add, modify, or remove entries in the interest list of the epoll(7) instance referred to by the file descriptor epfd. It requests that the operation op be performed for the target file descriptor, fd.

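epoll_create1 (with epoll_create as the fallback) creates an epoll instance and returns its file descriptor; epoll_ctl registers the file descriptors to monitor, together with the I/O events of interest (_EPOLLIN, _EPOLLOUT, _EPOLLRDHUP, _EPOLLET), on that epoll instance. epoll_wait will be covered in the Accept article.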