Go 网络轮询器 epoll 实现 · Accept 篇
Apr 27, 2021 23:30 · 2854 words · 6 minute read
我们自行编写 TCP server(https://github.com/smallnest/1m-go-tcp-server/blob/master/1_simple_tcp_server/server.go)时,总是会在一个死循环中 Accept
来建立连接。本文将着眼于 Accept
方法的实现来继续挖掘 Go 运行时中的 I/O 多路复用的网络轮询器(netpoll)。
Golang 中 TCP 的 Accept
方法在 src/net/tcpsock.go 文件中:
// Accept implements the Accept method in the Listener interface.
// It blocks until the next incoming connection is available and hands it
// back as a generic Conn. A listener that fails its ok check yields
// syscall.EINVAL; any failure of the underlying accept is reported as an
// *OpError whose Op is "accept".
func (l *TCPListener) Accept() (Conn, error) {
	if !l.ok() {
		return nil, syscall.EINVAL
	}
	conn, err := l.accept()
	if err == nil {
		return conn, nil
	}
	// Decorate the raw error with the listener's network and local address.
	return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
// accept pulls the next connection off the listening netFD and wraps it in
// a TCPConn. Keep-alive is switched on unless the listener's configured
// KeepAlive is negative; a zero value selects defaultTCPKeepAlive as the
// period.
func (ln *TCPListener) accept() (*TCPConn, error) {
	connFD, err := ln.fd.accept()
	if err != nil {
		return nil, err
	}
	conn := newTCPConn(connFD)

	period := ln.lc.KeepAlive
	if period < 0 {
		// Keep-alive explicitly disabled by configuration.
		return conn, nil
	}
	setKeepAlive(connFD, true)
	if period == 0 {
		period = defaultTCPKeepAlive
	}
	setKeepAlivePeriod(connFD, period)
	return conn, nil
}
// accept takes the next connection from the listening descriptor and wraps
// it in a freshly initialised netFD that carries the listener's family,
// socket type and network name. When the poll layer reports which call
// failed, the error is wrapped with that call name.
func (fd *netFD) accept() (netfd *netFD, err error) {
	sysfd, peer, call, err := fd.pfd.Accept()
	if err != nil {
		if call == "" {
			return nil, err
		}
		return nil, wrapSyscallError(call, err)
	}

	netfd, err = newFD(sysfd, fd.family, fd.sotype, fd.net)
	if err != nil {
		// No netFD owns the raw descriptor yet, so close it directly.
		poll.CloseFunc(sysfd)
		return nil, err
	}
	err = netfd.init()
	if err != nil {
		netfd.Close()
		return nil, err
	}

	// The error from Getsockname is ignored here.
	local, _ := syscall.Getsockname(netfd.pfd.Sysfd)
	netfd.setAddr(netfd.addrFunc()(local), netfd.addrFunc()(peer))
	return netfd, nil
}
层层嵌套 accept
。我们再来回顾一下,netFD 是 Go 自己封装的一个“网络文件描述符”结构:
// netFD is the net package's wrapper around a network file descriptor: it
// holds the poller-aware poll.FD together with the socket's metadata.
type netFD struct {
// pfd is the internal/poll descriptor; accepting on a listener goes through pfd.Accept.
pfd poll.FD
// immutable until Close
family int // socket family; handed to newFD for accepted connections
sotype int // socket type; handed to newFD for accepted connections
isConnected bool // handshake completed or use of association with peer
net string // network name; reported as OpError.Net when Accept fails
laddr Addr // local address (NOTE(review): presumably filled in by setAddr — confirm)
raddr Addr // remote address (NOTE(review): presumably filled in by setAddr — confirm)
}
我们找到 netFD 的 poll.FD.Accept
方法定义:
// Accept wraps the accept network call.
//
// It returns the new descriptor, the peer's socket address, the name of the
// call that failed (empty when none is reported), and an error. On failure
// the descriptor is -1 and the address is nil.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
// Hold the read lock for the whole call; the deferred unlock releases it.
if err := fd.readLock(); err != nil {
return -1, nil, "", err
}
defer fd.readUnlock()
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return -1, nil, "", err
}
// Retry loop: keep calling accept until it succeeds or fails with an
// error that none of the cases below chooses to retry.
for {
s, rsa, errcall, err := accept(fd.Sysfd)
if err == nil {
return s, rsa, "", err
}
switch err {
case syscall.EINTR:
// Interrupted before completing; simply try again.
continue
case syscall.EAGAIN:
// Nothing to accept right now. If the descriptor is pollable, wait
// until it becomes readable and retry. If waiting fails, err now
// holds the wait error and is returned after the switch.
if fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
// This means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
continue
}
return -1, nil, errcall, err
}
}
for
循环中嵌套的 accept
以真正的文件描述符为入参,应该很接近底层了:
// Accept4Func is used to hook the accept4 call.
// It defaults to syscall.Accept4 and is invoked with the listening
// descriptor and the socket flags to apply to the accepted descriptor.
var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4
// Wrapper around the accept system call that marks the returned file
// descriptor as nonblocking and close-on-exec.
//
// It returns the new descriptor, the peer address, the name of the call that
// failed ("accept4", "accept" or "setnonblock"; empty on success) and the
// error. On failure the descriptor is -1.
func accept(s int) (int, syscall.Sockaddr, string, error) {
// Fast path: accept4 sets both flags atomically with the accept itself.
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
// On Linux the accept4 system call was introduced in 2.6.28
// kernel and on FreeBSD it was introduced in 10 kernel. If we
// get an ENOSYS error on both Linux and FreeBSD, or EINVAL
// error on Linux, fall back to using accept.
switch err {
case nil:
return ns, sa, "", nil
default: // errors other than the ones listed
return -1, sa, "accept4", err
// The empty cases below just leave the switch, so execution continues
// with the plain-accept fallback that follows it.
case syscall.ENOSYS: // syscall missing
case syscall.EINVAL: // some Linux use this instead of ENOSYS
case syscall.EACCES: // some Linux use this instead of ENOSYS
case syscall.EFAULT: // some Linux use this instead of ENOSYS
}
// See ../syscall/exec_unix.go for description of ForkLock.
// It is probably okay to hold the lock across syscall.Accept
// because we have put fd.sysfd into non-blocking mode.
// However, a call to the File method will put it back into
// blocking mode. We can't take that risk, so no use of ForkLock here.
ns, sa, err = AcceptFunc(s)
// Fallback path: apply close-on-exec by hand once accept has succeeded.
if err == nil {
syscall.CloseOnExec(ns)
}
if err != nil {
return -1, nil, "accept", err
}
// Likewise switch the new descriptor to non-blocking mode by hand; if that
// fails the descriptor is closed so it does not leak.
if err = syscall.SetNonblock(ns, true); err != nil {
CloseFunc(ns)
return -1, nil, "setnonblock", err
}
return ns, sa, "", nil
}
Accept4Func
就是真正的 accept4 系统调用了。在 Go 中,将 socket 设置成了 SOCK_NONBLOCK
也就是非阻塞 I/O。
所以上一层的 Accept
函数中,s, rsa, errcall, err := accept(fd.Sysfd)
函数调用会直接返回,因为是非阻塞的。但是,如果 socket 被标记成非阻塞的并且没有任何连接进来,accept 就会返回 EAGAIN(或 EWOULDBLOCK)错误。accept(2) 手册对此的说明是:
EAGAIN or EWOULDBLOCK: The socket is marked nonblocking and no connections are present to be accepted. POSIX.1-2001 and POSIX.1-2008 allow either error to be returned for this case, and do not require these constants to have the same value, so a portable application should check for both possibilities.
这个错误将命中 switch
的第二个 case
:
if fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
接着我们来看 pollDesc
结构的 waitRead
方法:
// waitRead waits for the descriptor in read mode; it is pd.wait with the
// mode fixed to 'r'.
func (pd *pollDesc) waitRead(isFile bool) error {
	const readMode = 'r'
	return pd.wait(readMode, isFile)
}
// wait hands the descriptor's runtime context to runtime_pollWait for the
// given mode and converts the resulting code into an error with convertErr.
// A descriptor with no runtime context (runtimeCtx == 0) cannot be waited on
// and produces an error immediately.
func (pd *pollDesc) wait(mode int, isFile bool) error {
	ctx := pd.runtimeCtx
	if ctx == 0 {
		return errors.New("waiting for unsupported file type")
	}
	return convertErr(runtime_pollWait(ctx, mode), isFile)
}
根据 go:linkname
注释,runtime_pollWait
实际上就是 src/runtime/netpoll.go 中的 poll_runtime_pollWait
函数:
// poll_runtime_pollWait, which is internal/poll.runtime_pollWait,
// waits for a descriptor to be ready for reading or writing,
// according to mode, which is 'r' or 'w'.
// This returns an error code; the codes are defined above.
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
// Bail out right away if the descriptor already reports an error for this mode.
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
// netpollblock returns false when the wait ended without IO readiness;
// in that case re-check the error state and either report it or wait again.
for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return pollNoError
}
// netpollcheckerr reports the error state of pd for the given mode
// ('r' or 'w') as one of the poll error codes. The checks are ordered:
// closing first, then an expired deadline for the mode, then a recorded
// event-scanning error.
func netpollcheckerr(pd *pollDesc, mode int32) int {
	switch {
	case pd.closing:
		return pollErrClosing
	case mode == 'r' && pd.rd < 0, mode == 'w' && pd.wd < 0:
		// A negative deadline for the requested direction means it timed out.
		return pollErrTimeout
	case mode == 'r' && pd.everr:
		// Report an event scanning error only on a read event.
		// An error on a write event will be captured in a subsequent
		// write call that is able to report a more specific error.
		return pollErrNotPollable
	}
	return pollNoError
}
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
//
// mode selects which semaphore of pd is used: 'w' picks wg, anything else rg.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to pdWait
for {
old := *gpp
if old == pdReady {
// A readiness notification is already pending: consume it and
// return without parking.
*gpp = 0
return true
}
if old != 0 {
// Any other non-zero state means something is already waiting here.
throw("runtime: double wait")
}
// Move 0 -> pdWait; if the CAS loses a race, re-read the state and retry.
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
// need to recheck error states after setting gpp to pdWait
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, membarrier, load of rg/wg
if waitio || netpollcheckerr(pd, mode) == 0 {
// Park the current goroutine; netpollblockcommit is passed as the
// callback together with the semaphore address.
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent pdReady notification
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
// Anything above pdWait is not a state expected at this point.
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
poll_runtime_pollWait
方法会检查当前 pollDesc 对象的 netFD 对应的文件描述符是否有 I/O 事件发生。netpollblock
函数检查 pollDesc 对象的 rg/wg 信号量是否处于 pdReady
状态;否则就挂起(gopark
)当前 goroutine 并持续等待 I/O 事件发生。
// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
// pdReady - io readiness notification is pending;
// a goroutine consumes the notification by changing the state to nil.
// pdWait - a goroutine prepares to park on the semaphore, but not yet parked;
// the goroutine commits to park by changing the state to G pointer,
// or, alternatively, concurrent io notification changes the state to pdReady,
// or, alternatively, concurrent timeout/close changes the state to nil.
// G pointer - the goroutine is blocked on the semaphore;
// io notification or timeout/close changes the state to pdReady or nil respectively
// and unparks the goroutine.
// nil - none of the above.
//
// The two named states are small sentinel values; nil is the zero value and
// every other value is treated as a G pointer.
const (
pdReady uintptr = 1 // io readiness notification is pending
pdWait uintptr = 2 // a goroutine is preparing to park but has not parked yet
)
netpollblockcommit
函数会在 goroutine 被挂起时首先被回调:
// netpollblockcommit is the callback handed to gopark by netpollblock. It
// tries to move the semaphore at gpp from pdWait to the address of gp and
// reports whether that swap succeeded.
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
	if !atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp))) {
		// The semaphore is no longer in pdWait, so nothing was committed.
		return false
	}
	// Bump the count of goroutines waiting for the poller.
	// The scheduler uses this to decide whether to block
	// waiting for the poller if there is nothing else to do.
	atomic.Xadd(&netpollWaiters, 1)
	return true
}
把当前 goroutine 的抽象数据结构 g
存入 gpp
指针中,因为后面要用到。
这样 netpoll 通过 gopark
在 goroutine 中模拟出了阻塞 I/O 的效果,goroutine 挂起后,会被放置在等待队列中等待唤醒。
而 Listen 篇中剩下的 epollwait
函数会唤醒那些等待中的 goroutine:
// int32 runtime·epollwait(int32 epfd, EpollEvent *ev, int32 nev, int32 timeout);
// Thin stub: loads the four Go arguments into the syscall argument
// registers, issues SYS_epoll_pwait, and returns the raw result in ret.
TEXT runtime·epollwait(SB),NOSPLIT,$0
// This uses pwait instead of wait, because Android O blocks wait.
MOVL epfd+0(FP), DI // arg 1: epfd
MOVQ ev+8(FP), SI // arg 2: ev
MOVL nev+16(FP), DX // arg 3: nev
MOVL timeout+20(FP), R10 // arg 4: timeout
MOVQ $0, R8 // arg 5: zero (NOTE(review): presumably the NULL sigmask of epoll_pwait — confirm)
MOVL $SYS_epoll_pwait, AX // syscall number
SYSCALL
MOVL AX, ret+24(FP) // hand the syscall result back to the Go caller unchanged
RET
netpoll
函数会调用 epollwait
来获取就绪的文件描述符列表:
// netpoll calls epollwait on epfd and returns the list of goroutines that
// netpollready collected for the ready descriptors.
//
// delay is turned into the millisecond timeout passed to epollwait:
// a negative delay becomes -1, zero stays 0, anything below 1e6 is rounded
// up to 1, larger values are divided by 1e6, and values of 1e15 or more are
// capped at 1e9 ms.
func netpoll(delay int64) gList {
// No epoll instance: nothing can be ready.
if epfd == -1 {
return gList{}
}
var waitms int32
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
waitms = 1e9
}
// At most 128 events are fetched per call.
var events [128]epollevent
retry:
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
// A negative result is a negated errno; only EINTR is tolerated.
if n != -_EINTR {
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
return gList{}
}
goto retry
}
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
// An event whose data points at netpollBreakRd belongs to the break
// descriptor, not to a pollDesc, and is handled separately.
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
if ev.events != _EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
atomic.Store(&netpollWakeSig, 0)
}
continue
}
// Translate the epoll event bits into 'r', 'w' or 'r'+'w'.
// Hang-up and error bits count as both readable and writable.
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// For every other event the data field holds the *pollDesc.
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
// everr is set only when the event is exactly _EPOLLERR.
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
netpollready(&toRun, pd, mode)
}
}
return toRun
}
// netpollready unblocks the goroutines parked on pd for the directions named
// by mode ('r', 'w' or 'r'+'w') and pushes each unblocked goroutine onto
// toRun, reader first and writer second. Any other mode value unblocks
// nothing.
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
	var reader, writer *g
	switch mode {
	case 'r':
		reader = netpollunblock(pd, 'r', true)
	case 'w':
		writer = netpollunblock(pd, 'w', true)
	case 'r' + 'w':
		reader = netpollunblock(pd, 'r', true)
		writer = netpollunblock(pd, 'w', true)
	}
	if reader != nil {
		toRun.push(reader)
	}
	if writer != nil {
		toRun.push(writer)
	}
}
netpoll
拿到就绪的文件描述符列表后,就要将每个文件描述符对应的 goroutine 添加到 gList
中。
我们全局搜索一下 netpoll(
,发现该函数只在 src/runtime/proc.go 文件中用到,这是 Go 运行时调度器部分的相关代码:
1154 mp := acquirem() // disable preemption because it can be holding p in a local var
1155 if netpollinited() {
1156: list := netpoll(0) // non-blocking
1157 injectglist(&list)
1158 }
....
2603 // anyway.
2604 if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
2605: if list := netpoll(0); !list.empty() { // non-blocking
2606 gp := list.pop()
2607 injectglist(&list)
....
2877 delta = 0
2878 }
2879: list := netpoll(delta) // block until new work is available
2880 atomic.Store64(&sched.pollUntil, 0)
2881 atomic.Store64(&sched.lastpoll, uint64(nanotime()))
....
2931 }
2932 if netpollinited() && atomic.Load(&netpollWaiters) > 0 && sched.lastpoll != 0 {
2933: if list := netpoll(0); !list.empty() {
2934 injectglist(&list)
2935 return true
....
5187 if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
5188 atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
5189: list := netpoll(0) // non-blocking - returns list of goroutines
5190 if !list.empty() {
5191 // Need to decrement number of idle locked M's
Go 在多种场景下都可能会调用 netpoll 检查文件描述符状态。寻找到 I/O 就绪的 socket 文件描述符,并找到这些 socket 文件描述符对应的轮询器中附带的信息,根据这些信息将之前等待这些 socket 文件描述符就绪的 goroutine 状态修改为
_Grunnable
。执行完 netpoll 之后,会返回一个就绪文件描述符列表对应的 goroutine 列表,接下来将就绪的 goroutine 加入到调度队列中,等待调度运行。
如此,就将 I/O 事件与 goroutine 的调度深度结合在了一起。
而最重要的是,netpoll 在进行 Linux 系统调用时使用了非阻塞 I/O,避免了整个 Go 应用程序进程陷入阻塞;此时应用程序的其他代码仍在执行,因此可以借助强大的 Go 运行时调度器来调度并发任务。