Go 网络轮询器 epoll 实现 · Accept 篇

Apr 27, 2021 23:30 · 2854 words · 6 minute read Golang Linux OS

我们自行编写 TCP server(https://github.com/smallnest/1m-go-tcp-server/blob/master/1_simple_tcp_server/server.go)时,总是会在一个死循环中调用 Accept 来建立连接。本文将着眼于 Accept 方法的实现,继续挖掘 Go 运行时中基于 I/O 多路复用的网络轮询器(netpoll)。

Golang 中 TCP 的 Accept 方法在 src/net/tcpsock.go 文件中:

// Accept satisfies the Accept method of the Listener interface: it waits
// for the next incoming call and hands it back as a generic Conn.
//
// A listener that fails its ok() check yields syscall.EINVAL. Any error from
// the underlying accept is wrapped in an *OpError carrying the listener's
// network name and local address.
func (l *TCPListener) Accept() (Conn, error) {
    if !l.ok() {
        return nil, syscall.EINVAL
    }
    conn, err := l.accept()
    if err == nil {
        return conn, nil
    }
    return nil, &OpError{
        Op:     "accept",
        Net:    l.fd.net,
        Source: nil,
        Addr:   l.fd.laddr,
        Err:    err,
    }
}

// accept takes the next connection off the listener's descriptor and wraps
// it in a *TCPConn. Unless the listener's KeepAlive setting is negative,
// keep-alive is switched on for the new connection; a setting of zero
// selects defaultTCPKeepAlive as the period.
func (ln *TCPListener) accept() (*TCPConn, error) {
    nfd, err := ln.fd.accept()
    if err != nil {
        return nil, err
    }
    conn := newTCPConn(nfd)
    if period := ln.lc.KeepAlive; period >= 0 {
        setKeepAlive(nfd, true)
        if period == 0 {
            period = defaultTCPKeepAlive
        }
        setKeepAlivePeriod(nfd, period)
    }
    return conn, nil
}

// accept pulls one connection off the listening descriptor and builds a
// fully initialised netFD around it.
//
// A failure reported by the poller is wrapped with the name of the failing
// call when one is supplied. If constructing or initialising the new netFD
// fails, the freshly accepted descriptor is closed before returning.
func (fd *netFD) accept() (netfd *netFD, err error) {
    sysfd, peer, callName, err := fd.pfd.Accept()
    if err != nil {
        if callName != "" {
            err = wrapSyscallError(callName, err)
        }
        return nil, err
    }

    netfd, err = newFD(sysfd, fd.family, fd.sotype, fd.net)
    if err != nil {
        // No netFD owns the raw descriptor yet, so close it directly.
        poll.CloseFunc(sysfd)
        return nil, err
    }
    err = netfd.init()
    if err != nil {
        netfd.Close()
        return nil, err
    }
    // A Getsockname error is deliberately ignored; the local address is
    // then derived from whatever value was returned.
    local, _ := syscall.Getsockname(netfd.pfd.Sysfd)
    netfd.setAddr(netfd.addrFunc()(local), netfd.addrFunc()(peer))
    return netfd, nil
}

层层嵌套 accept。我们再来回顾一下,netFD 是 Go 自己封装的一个“网络文件描述符”结构:

// netFD is the net package's "network file descriptor": it pairs the
// poller-level descriptor with the socket metadata the package needs.
type netFD struct {
    // pfd is the internal/poll descriptor; its Accept method performs the
    // actual accept call on pfd.Sysfd.
    pfd poll.FD

    // immutable until Close
    // family, sotype and net are forwarded unchanged to newFD when a
    // connection is accepted from this descriptor.
    // NOTE(review): family/sotype presumably hold syscall AF_*/SOCK_* values — confirm.
    family      int
    sotype      int
    isConnected bool // handshake completed or use of association with peer
    net         string
    // laddr and raddr are presumably the local and remote addresses stored
    // by setAddr (its body is not visible here) — confirm.
    laddr       Addr
    raddr       Addr
}

我们找到 netFD 的 poll.FD.Accept 方法定义:

// Accept wraps the accept network call.
//
// On success it returns the new descriptor, the socket address produced by
// accept, an empty call name and a nil error. On failure the descriptor is
// -1 and the address is nil; the string names the call that failed when the
// error came from the accept helper and is empty when locking or
// prepareRead failed.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    // Hold the descriptor's read lock for the whole call.
    if err := fd.readLock(); err != nil {
        return -1, nil, "", err
    }
    defer fd.readUnlock()

    // NOTE(review): prepareRead's body is not shown here; it presumably
    // validates the poller's read state before waiting — confirm.
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return -1, nil, "", err
    }
    // Retry until a connection is accepted or a non-retryable error occurs.
    for {
        s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        case syscall.EINTR:
            // Interrupted call: simply try again.
            continue
        case syscall.EAGAIN:
            // Nothing to accept right now. If the descriptor is pollable,
            // wait until it is readable and retry. If the wait fails, err
            // now holds the wait's error; Go switches do not fall through,
            // so control leaves the switch and reaches the return below.
            if fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        case syscall.ECONNABORTED:
            // This means that a socket on the listen
            // queue was closed before we Accept()ed it;
            // it's a silly error, so try again.
            continue
        }
        // Any other error (or a failed wait) is reported to the caller.
        return -1, nil, errcall, err
    }
}

for 循环中嵌套的 accept 以真正的文件描述符为入参,应该很接近底层了:

// Accept4Func is used to hook the accept4 call.
// It defaults to syscall.Accept4; the accept wrapper calls through this
// variable rather than calling syscall.Accept4 directly.
var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4

// accept wraps the accept system call and makes sure the descriptor it
// returns is both nonblocking and close-on-exec.
//
// It returns the new descriptor, the socket address, the name of the call
// that failed ("" on success) and the error.
func accept(s int) (int, syscall.Sockaddr, string, error) {
    ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
    if err == nil {
        return ns, sa, "", nil
    }

    // On Linux the accept4 system call was introduced in the 2.6.28 kernel
    // and on FreeBSD in the 10 kernel. ENOSYS means the syscall is missing;
    // some Linux systems report EINVAL, EACCES or EFAULT instead of ENOSYS.
    // Those four fall back to plain accept; every other error is final.
    switch err {
    case syscall.ENOSYS, syscall.EINVAL, syscall.EACCES, syscall.EFAULT:
        // fall back to accept below
    default:
        return -1, sa, "accept4", err
    }

    // See ../syscall/exec_unix.go for description of ForkLock.
    // Holding the lock across syscall.Accept would probably be fine since
    // fd.sysfd is in non-blocking mode, but a call to the File method puts
    // it back into blocking mode. That risk is not acceptable, so ForkLock
    // is not used here.
    ns, sa, err = AcceptFunc(s)
    if err != nil {
        return -1, nil, "accept", err
    }
    syscall.CloseOnExec(ns)
    if err = syscall.SetNonblock(ns, true); err != nil {
        CloseFunc(ns)
        return -1, nil, "setnonblock", err
    }
    return ns, sa, "", nil
}

Accept4Func 就是真正的 accept4 系统调用了。调用时传入的 SOCK_NONBLOCK 标志,会把新接受的连接 socket 直接设置成非阻塞 I/O。

而监听 socket 本身在 Listen 阶段创建时也已经被设置为非阻塞,所以上一层的 Accept 函数中,s, rsa, errcall, err := accept(fd.Sysfd) 函数调用会直接返回,不会阻塞。但是,如果 socket 被标记成非阻塞的并且没有任何连接进来,accept 就会返回 EAGAIN 错误。accept(2) 的手册中是这样描述的:

EAGAIN or EWOULDBLOCK: The socket is marked nonblocking and no connections are present to be accepted. POSIX.1-2001 and POSIX.1-2008 allow either error to be returned for this case, and do not require these constants to have the same value, so a portable application should check for both possibilities.

这个错误将命中 switch 的第二个 case(syscall.EAGAIN):

if fd.pd.pollable() {
    if err = fd.pd.waitRead(fd.isFile); err == nil {
        continue
    }
}

接着我们来看 pollDesc 结构的 waitRead 方法:

// waitRead waits on pd in read mode; it is shorthand for wait with mode 'r'.
func (pd *pollDesc) waitRead(isFile bool) error {
    const readMode = 'r'
    return pd.wait(readMode, isFile)
}

// wait asks the runtime poller to wait on this descriptor for the given
// mode and converts the resulting code into an error via convertErr.
// A descriptor with no runtime context (runtimeCtx == 0) cannot be waited
// on and yields an error immediately.
func (pd *pollDesc) wait(mode int, isFile bool) error {
    if pd.runtimeCtx != 0 {
        code := runtime_pollWait(pd.runtimeCtx, mode)
        return convertErr(code, isFile)
    }
    return errors.New("waiting for unsupported file type")
}

根据 go:linkname 注释,runtime_pollWait 实际上就是 src/runtime/netpoll.go 中的 poll_runtime_pollWait 函数:

// poll_runtime_pollWait, which is internal/poll.runtime_pollWait,
// waits for a descriptor to be ready for reading or writing,
// according to mode, which is 'r' or 'w'.
// This returns an error code; the codes are defined above.
// pollNoError is returned once netpollblock reports that IO is ready;
// any other code comes from netpollcheckerr.
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    // Fail fast if the descriptor is already in an error state.
    errcode := netpollcheckerr(pd, int32(mode))
    if errcode != pollNoError {
        return errcode
    }
    // As for now only Solaris, illumos, and AIX use level-triggered IO.
    if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
        netpollarm(pd, mode)
    }
    // netpollblock returns false when it came back without IO being ready;
    // in that case re-check the error state before blocking again.
    for !netpollblock(pd, int32(mode), false) {
        errcode = netpollcheckerr(pd, int32(mode))
        if errcode != pollNoError {
            return errcode
        }
        // Can happen if timeout has fired and unblocked us,
        // but before we had a chance to run, timeout has been reset.
        // Pretend it has not happened and retry.
    }
    return pollNoError
}

// netpollcheckerr reports the error state of pd for the given mode
// ('r' or 'w') as one of the poll error codes, checked in this order:
// closing, expired deadline for that mode, then a recorded event error.
// It returns pollNoError when none of them applies.
func netpollcheckerr(pd *pollDesc, mode int32) int {
    switch {
    case pd.closing:
        return pollErrClosing
    case mode == 'r' && pd.rd < 0, mode == 'w' && pd.wd < 0:
        // A negative rd/wd marks the deadline for that direction as passed.
        return pollErrTimeout
    case mode == 'r' && pd.everr:
        // An event scanning error is reported for reads only. For writes,
        // the subsequent write call will surface a more specific error.
        return pollErrNotPollable
    }
    return pollNoError
}

// netpollblock waits on pd's read or write semaphore (chosen by mode) and
// consumes a readiness notification if one is, or becomes, available.
//
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    // Pick the semaphore: wg for mode 'w', rg for everything else.
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    // set the gpp semaphore to pdWait
    for {
        old := *gpp
        if old == pdReady {
            // A notification is already pending: consume it and return
            // without parking.
            *gpp = 0
            return true
        }
        if old != 0 {
            // Any other non-zero state means the semaphore is already in
            // use by a waiter.
            throw("runtime: double wait")
        }
        // Move 0 -> pdWait; if the CAS loses a race, re-read and retry.
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }

    // need to recheck error states after setting gpp to pdWait
    // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
    // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
    // NOTE(review): the literal 0 compared here is presumably pollNoError — confirm.
    if waitio || netpollcheckerr(pd, mode) == 0 {
        // Park the goroutine; netpollblockcommit is handed gpp so it can
        // replace pdWait with the goroutine's pointer.
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // be careful to not lose concurrent pdReady notification
    // The exchange resets the semaphore to 0 and returns what was there.
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        // Only 0, pdReady or pdWait are expected at this point.
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

poll_runtime_pollWait 方法会检查当前 pollDesc 对象的 netFD 对应的文件描述符是否有 I/O 事件发生。netpollblock 函数检查 pollDesc 对象的 rg/wg 信号量是否处于 pdReady 状态:如果是,就消费掉这次通知并直接返回 true;否则就挂起(gopark)当前 goroutine,直到 I/O 事件发生、超时或者文件描述符被关闭。

// A pollDesc holds two binary semaphores, rg and wg, on which reader and
// writer goroutines park respectively. Each semaphore is in one of these
// states:
//
//   - pdReady: an io readiness notification is pending; a goroutine
//     consumes it by resetting the state to nil.
//   - pdWait: a goroutine is preparing to park but has not parked yet. It
//     commits by storing its G pointer; alternatively a concurrent io
//     notification moves the state to pdReady, or a concurrent
//     timeout/close moves it to nil.
//   - G pointer: the goroutine is blocked on the semaphore; an io
//     notification sets the state to pdReady, a timeout/close sets it to
//     nil, and in both cases the goroutine is unparked.
//   - nil: none of the above.
const (
    pdReady uintptr = iota + 1
    pdWait
)

netpollblockcommit 函数会在 goroutine 被挂起时首先被回调:

// netpollblockcommit tries to swap the semaphore at gpp from pdWait to the
// pointer of goroutine gp, and reports whether the swap succeeded.
// It is the callback netpollblock passes to gopark.
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
    sema := (*uintptr)(gpp)
    if !atomic.Casuintptr(sema, pdWait, uintptr(unsafe.Pointer(gp))) {
        return false
    }
    // Bump the count of goroutines waiting for the poller.
    // The scheduler consults this count when deciding whether to block
    // waiting for the poller if there is nothing else to do.
    atomic.Xadd(&netpollWaiters, 1)
    return true
}

把当前 goroutine 的抽象数据结构 g 存入 gpp 指针中,因为后面唤醒时要用到。

这样 netpoll 通过 gopark 在 goroutine 中模拟出了阻塞 I/O 的效果,goroutine 挂起后,会被放置在等待队列中等待唤醒。

Listen 篇中还没有讲到的 epollwait 函数,则负责获取就绪的 I/O 事件,进而唤醒那些等待中的 goroutine:

// int32 runtime·epollwait(int32 epfd, EpollEvent *ev, int32 nev, int32 timeout);
// Issues the epoll_pwait system call with the four Go arguments plus a zero
// fifth argument, and stores the raw syscall result (AX) as the return value.
TEXT runtime·epollwait(SB),NOSPLIT,$0
    // This uses pwait instead of wait, because Android O blocks wait.
    // Load the four Go arguments into the syscall argument registers.
    MOVL	epfd+0(FP), DI
    MOVQ	ev+8(FP), SI
    MOVL	nev+16(FP), DX
    MOVL	timeout+20(FP), R10
    // Fifth argument is zero; per epoll_pwait(2) this is the signal mask
    // pointer, so no signal mask is supplied.
    MOVQ	$0, R8
    MOVL	$SYS_epoll_pwait, AX
    SYSCALL
    // Hand the kernel's result back to the Go caller unchanged.
    MOVL	AX, ret+24(FP)
    RET

netpoll 函数会调用 epollwait 来获取就绪的文件描述符列表:

// netpoll calls epollwait once (retrying only on interruption) and returns
// the list of goroutines unblocked by the ready events.
//
// delay selects the epoll timeout:
//   - delay < 0: timeout of -1
//   - delay == 0: timeout of 0
//   - delay > 0: delay converted from nanoseconds to milliseconds, with a
//     minimum of 1 ms and a cap of 1e9 ms
func netpoll(delay int64) gList {
    // With no epoll descriptor there is nothing to poll.
    if epfd == -1 {
        return gList{}
    }
    var waitms int32
    if delay < 0 {
        waitms = -1
    } else if delay == 0 {
        waitms = 0
    } else if delay < 1e6 {
        // Less than one millisecond: round up to 1 ms instead of down to 0.
        waitms = 1
    } else if delay < 1e15 {
        waitms = int32(delay / 1e6)
    } else {
        // An arbitrary cap on how long to wait for a timer.
        // 1e9 ms == ~11.5 days.
        waitms = 1e9
    }
    // At most 128 events are collected per call.
    var events [128]epollevent
retry:
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        // A negative result is a negated errno; only EINTR is tolerated.
        if n != -_EINTR {
            println("runtime: epollwait on fd", epfd, "failed with", -n)
            throw("runtime: netpoll failed")
        }
        // If a timed sleep was interrupted, just return to
        // recalculate how long we should sleep now.
        if waitms > 0 {
            return gList{}
        }
        goto retry
    }
    var toRun gList
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }

        // An event whose data points at netpollBreakRd is a wake-up
        // request rather than IO on a polled descriptor.
        // NOTE(review): netpollBreakRd is presumably the read end of the
        // pipe written by netpollBreak — confirm.
        if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
            if ev.events != _EPOLLIN {
                println("runtime: netpoll: break fd ready for", ev.events)
                throw("runtime: netpoll: break fd ready for something unexpected")
            }
            if delay != 0 {
                // netpollBreak could be picked up by a
                // nonblocking poll. Only read the byte
                // if blocking.
                var tmp [16]byte
                read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
                atomic.Store(&netpollWakeSig, 0)
            }
            continue
        }

        // Translate the epoll event bits into 'r', 'w' or 'r'+'w'.
        // Hang-up and error bits count for both directions.
        var mode int32
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            // The event data carries the *pollDesc for the descriptor.
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            // everr is set only when _EPOLLERR is the sole bit reported.
            pd.everr = false
            if ev.events == _EPOLLERR {
                pd.everr = true
            }
            netpollready(&toRun, pd, mode)
        }
    }
    return toRun
}

// netpollready unblocks the goroutines waiting on pd for the directions
// named by mode ('r', 'w' or 'r'+'w') and pushes each goroutine returned by
// netpollunblock onto toRun, reader first.
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    const readWrite = 'r' + 'w'
    wantRead := mode == 'r' || mode == readWrite
    wantWrite := mode == 'w' || mode == readWrite

    var reader, writer *g
    if wantRead {
        reader = netpollunblock(pd, 'r', true)
    }
    if wantWrite {
        writer = netpollunblock(pd, 'w', true)
    }
    if reader != nil {
        toRun.push(reader)
    }
    if writer != nil {
        toRun.push(writer)
    }
}

netpoll 拿到就绪的文件描述符列表后,就要将每个文件描述符对应的 goroutine 添加到 gList 中。

我们全局搜索一下 netpoll(,发现该函数只在 src/runtime/proc.go 文件中用到,这是 Go 运行时调度器部分的相关代码:

 1154  	mp := acquirem() // disable preemption because it can be holding p in a local var
 1155  	if netpollinited() {
 1156: 		list := netpoll(0) // non-blocking
 1157  		injectglist(&list)
 1158  	}
 ....
 2603  	// anyway.
 2604  	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
 2605: 		if list := netpoll(0); !list.empty() { // non-blocking
 2606  			gp := list.pop()
 2607  			injectglist(&list)
 ....
 2877  			delta = 0
 2878  		}
 2879: 		list := netpoll(delta) // block until new work is available
 2880  		atomic.Store64(&sched.pollUntil, 0)
 2881  		atomic.Store64(&sched.lastpoll, uint64(nanotime()))
 ....
 2931  	}
 2932  	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && sched.lastpoll != 0 {
 2933: 		if list := netpoll(0); !list.empty() {
 2934  			injectglist(&list)
 2935  			return true
 ....
 5187  		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
 5188  			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
 5189: 			list := netpoll(0) // non-blocking - returns list of goroutines
 5190  			if !list.empty() {
 5191  				// Need to decrement number of idle locked M's

Go 在多种场景下都可能会调用 netpoll 检查文件描述符状态。寻找到 I/O 就绪的 socket 文件描述符,并找到这些 socket 文件描述符对应的轮询器中附带的信息,根据这些信息将之前等待这些 socket 文件描述符就绪的 goroutine 状态修改为 _Grunnable。执行完 netpoll 之后,会返回一个就绪文件描述符列表对应的 goroutine 列表,接下来将就绪的 goroutine 加入到调度队列中,等待调度运行。

如此,就将 I/O 事件与 goroutine 的调度深度结合在了一起。

而最重要的是,netpoll 在 Linux 系统调用时使用了非阻塞 I/O 避免了 Go 应用程序进程进入阻塞状态,此时应用程序代码还是在执行的,这样可以借助强大的 Go 运行时调度器来调度并发任务了。