前言

Epoll,作为Linux下性能极佳的I/O事件通知机制,它突破了传统select/poll机制的种种限制,以更加高效、灵活的方式处理大量的并发连接。

然而,直接使用Epoll进行网络编程往往需要深入理解底层的系统调用与复杂的状态管理,这对于大多数开发者来说是一个不小的挑战。

幸运的是,Go语言通过其内置的net包以及netpoll机制,为我们提供了一套简洁而强大的网络编程接口,让开发者能够在不直接接触Epoll复杂性的情况下,享受其带来的高性能。

今天我们就来揭开Go语言是如何巧妙地抽象Epoll,将其转化为开发者手中得心应手的工具的神秘面纱。让我们一同探索,Go语言背后的这一设计哲学与实现细节。

PS:文中包含大量源码分析,但是只要理解了实现逻辑,源码其实不难理解。

Linux网络编程与IO模型

在讨论Epoll技术之前,我们需要先回顾一下Linux的网络编程与IO模型。

在Linux中,如果我们要操作Socket进行网络编程,一般需要下面的步骤:

  • 客户端:socket->connect->write/read
  • 服务端:socket->bind->listen->accept->write/read

image.png

在此基础上,如果服务端要同时操作多个socket,就需要讨论IO模型了。所谓的IO模型,指的是服务端同时操作多个Socket的方案

这里服务端的socket有两种,一种是socket(listen),是在监听时使用的套接字;另外一种是socket(established),是在accept后产生的套接字。

阻塞IO(BIO)

假设服务端要和三个客户端进行通信。阻塞IO(BIO)的方案如下:

  • 主线程监听socket(listen),负责与客户端建立新连接。
  • 建立新连接后,也就是执行accept方法后,每个socket(established)由一个线程承载,负责与客户端之间进行读写交互。
  • 每个线程内部调用read系统调用时,如果客户端数据没有发送到,就会阻塞,陷入内核态,直到客户端有数据过来。

BIO的缺点很明显:

  1. 如果来了很多新连接,就会开辟很多线程,线程资源占用大。
  2. 读写时,内核态和用户态切换开销大。

image.png

非阻塞IO(NIO)

假设客户端要和三个客户端进行通信,NIO的方案是:

  • 只会建立一个线程,一个线程同时管理多个客户端的连接。
  • 这个线程需要轮询每个连接,查看是否有数据发送过来。这么做不会阻塞。

NIO的缺点是:

  • 需要自旋轮询,计算开销大。
  • 业务逻辑的编写比较复杂。

image.png

多路复用(Epoll)

Epoll是高并发的大杀器,Linux Epoll的设计思路如下:

  • 维护一个event poll事件池
  • 每次创建新连接,就将socket注册到事件池中。
  • 通过非阻塞调用Epoll Api,Linux会返回发生的事件列表。
  • 线程只需要对这些socket进行IO操作即可。

image.png

Epoll的优点:将监听多个socket的任务从业务方下放到操作系统

缺点也很明显:Linux提供的Api很难使用,开发难度大,逻辑复杂。

关于Epoll的底层实现,这里引用小林Coding网站上的两段话,感兴趣的朋友可以前去查看原理。

Epoll 在内核里使用红黑树来跟踪进程所有待检测的文件描述字,红黑树是个高效的数据结构,增删改一般时间复杂度是 O(logn)。epoll 因为在内核维护了红黑树,可以保存所有待检测的 socket ,所以只需要传入一个待检测的 socket,减少了内核和用户空间大量的数据拷贝和内存分配。

epoll 使用事件驱动的机制,内核里维护了一个链表来记录就绪事件,当某个 socket 有事件发生时,通过回调函数内核会将其加入到这个就绪事件列表中,当用户调用 epoll_wait() 函数时,只会返回有事件发生的文件描述符的个数,不需要像 select/poll 那样轮询扫描整个 socket 集合,大大提高了检测的效率。

中庸之道:如何结合BIO和epoll的优点呢?

阻塞IO实现简单,编码更容易,然而效率比较低。多路复用效率很高,然后编码难度也高。如何才能结合阻塞模型和多路复用的优点呢?

实际上,Go语言的网络层帮我们做到了这一点。Golang网络层可以实现每个协程管理一个Socket,同时隐藏了OS的epoll的细节。即底层使用OS的多路复用IO,在协程层次使用BIO,将阻塞线程变为休眠协程。

其示意图如下:

image.png

Go是如何抽象Epoll的?

Epoll抽象层

每个操作系统都有类似于Linux的Epoll这样的设计,Go语言是跨平台的,因此首先需要一层抽象层来屏蔽底层的实现差异。各个操作系统的多路复用都具有下面这些功能:

  • 新建多路复用器:epoll_create()
  • 向多路复用器插入需要监听的事件:epoll_ctl()
  • 查询发生了什么事件:epoll_wait()

其中,Windows使用IOCP,Mac使用Kqueue,Linux使用Epoll。我们查看一下Go语言的源码,可以找到:runtime下面有netpoll.go,此外还有各种具体平台的实现:netpoll_epoll.go是Linux,还有netpoll_kqueue.gonetpoll_windows.go等。

这个netpoll.go就是Go语言用来屏蔽底层多路复用的抽象和适配层,也叫做network poller,它具有下面的Api:

  • netpollinit:对epoll_create的抽象,用于创建监听器。
  • netpollopen;对epoll_ctl的抽象,用于插入监听事件。
  • netpoll:对epoll_wait的抽象,用于查询事件。

netpollinit() 新建多路复用器

功能如下:

  • 新建Epoll
  • 新建一个pipe管道用于中断Epoll
  • 将“管道有数据到达”事件注册到Epoll。

源码分析:

func netpollinit() {
    var errno uintptr
    // 使用系统调用新建epoll
    epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
    if errno != 0 {
       println("runtime: epollcreate failed with", errno)
       throw("runtime: netpollinit failed")
    }
    // 新建了一个NIO管道,用于关闭epoll
    r, w, errpipe := nonblockingPipe()
    if errpipe != 0 {
       println("runtime: pipe failed with", -errpipe)
       throw("runtime: pipe failed")
    }
    // 插入监听管道事件:用于关闭管道
    ev := syscall.EpollEvent{
       Events: syscall.EPOLLIN,
    }
    // ...
}

netpollopen() 插入事件

  • 传入一个socket的文件描述符和pollDesc指针(这个指针是socket的详细信息)
  • pollDesc中记录了哪些协程休眠在等待这个Socket
  • 将Socket的可读、可写、断开事件注册到Epoll中。

源码分析:

// 插入事件,参数为:socket、socket描述
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
    var ev syscall.EpollEvent
    // 插入四个事件
    ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.Data)) = pd
    // 使用系统调用,底层用调用epoll_ctl
    return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}

netpoll() 查询发生了什么事件

  • 调用epoll_wait查询哪些事件发生
  • 根据pollDesc内的信息,返回哪些协程可以唤醒。注意是返回协程列表,而不是事件。

源码分析:

// 查询发生事件,参数:等待时间
func netpoll(delay int64) gList {
    // ... 
    var events [128]syscall.EpollEvent
retry:
    // 传入epoll、长度为128的事件数组(epoll会填写)
    n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
    // ...
    // 新建协程列表,遍历事件数组
    var toRun gList
    for i := int32(0); i < n; i++ {
       ev := events[i]
       if ev.Events == 0 {
          continue
       }
       // 如果是断开事件
       if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd {
          if ev.Events != syscall.EPOLLIN {
             println("runtime: netpoll: break fd ready for", ev.Events)
             throw("runtime: netpoll: break fd ready for something unexpected")
          }
          if delay != 0 {
             // netpollBreak could be picked up by a
             // nonblocking poll. Only read the byte
             // if blocking.
             var tmp [16]byte
             read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
             netpollWakeSig.Store(0)
          }
          continue
       }
       // 判断是否为读、写
       var mode int32
       if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
          // 可读
          mode += 'r'
       }
       if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
          // 可写
          mode += 'w'
       }
       if mode != 0 {
          // 找到socket,传入ready方法
          // 方法内部:判断socket可读还是可写,放到torun列表
          pd := *(**pollDesc)(unsafe.Pointer(&ev.Data))
          pd.setEventErr(ev.Events == syscall.EPOLLERR)
          netpollready(&toRun, pd, mode)
          // 返回协程的torun协程链表,代表哪些socket可以用的协程
       }
    }
    // 返回协程的torun协程链表
    return toRun
}

network poller如何工作

数据结构支持

pollCache:带锁的pollDesc的链表缓存

type pollCache struct {
    lock  mutex
    first *pollDesc
    // PollDesc objects must be type-stable,
    // because we can get ready notification from epoll/kqueue
    // after the descriptor is closed/reused.
    // Stale notifications are detected using seq variable,
    // seq is incremented when deadlines are changed or descriptor is reused.
}
  • first:pollDesc的链表头地址
  • lock:保证同个时间内只有1个协程能操作链表。

pollDesc:runtime包对socket的详细描述,链表节点

type pollDesc struct {
    _    sys.NotInHeap
    link *pollDesc // in pollcache, protected by pollcache.lock
    fd   uintptr   // constant for pollDesc usage lifetime
    
    atomicInfo atomic.Uint32 // atomic pollInfo

    // rg, wg are accessed atomically and hold g pointers.
    // (Using atomic.Uintptr here is similar to using guintptr elsewhere.)
    rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
    wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil

    lock    mutex // protects the following fields
    closing bool
    user    uint32    // user settable cookie
    rseq    uintptr   // protects from stale read timers
    rt      timer     // read deadline timer (set if rt.f != nil)
    rd      int64     // read deadline (a nanotime in the future, -1 when expired)
    wseq    uintptr   // protects from stale write timers
    wt      timer     // write deadline timer
    wd      int64     // write deadline (a nanotime in the future, -1 when expired)
    self    *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}

重要字段如下:

  • link:指向下一个数据。
  • fd:socket的文件描述符
  • rg:可能是pdReady(1), pdWait(2),等待读取的协程G的地址 or pdNil
  • wg:可能是pdReady(1), pdWait(2), 等待写的协程G的地址 or pdNil

初始化

初始化的步骤如下:

  • 调用poll_runtime_pollServerInit()
  • 使用原子操作保证只初始化一次
  • 调用netpollinit()

源码分析:

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
    netpollGenericInit()
}

func netpollGenericInit() {
    // 使用原子操作保证netpoll只会被初始化一次
    if netpollInited.Load() == 0 {
       lockInit(&netpollInitLock, lockRankNetpollInit)
       lock(&netpollInitLock)
       if netpollInited.Load() == 0 {
          // 使用多路复用抽象层的Api初始化,开辟一个Epoll
          netpollinit()
          netpollInited.Store(1)
       }
       unlock(&netpollInitLock)
    }
}

func netpollinited() bool {
    return netpollInited.Load() != 0
}

// Load accesses and returns the value atomically.
//go:nosplit
func (u *Uint32) Load() uint32 {
    return Load(&u.value)
}

新增监听

监听的步骤如下:

  • 调用poll_runtime_pollOpen()
  • pollcache链表中分配一个pollDesc
  • 初始化pollDesc(wg、rg为0)
  • 调用netpollopen()

源码分析:

// go编译的提示,不同包之间小写的方法可以互相调用。
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
// 新增监听,传入文件描述符
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    // 在链表分配一个新的socket描述结构体
    pd := pollcache.alloc()
    lock(&pd.lock)
    wg := pd.wg.Load()
    if wg != pdNil && wg != pdReady {
       throw("runtime: blocked write on free polldesc")
    }
    rg := pd.rg.Load()
    if rg != pdNil && rg != pdReady {
       throw("runtime: blocked read on free polldesc")
    }
    // 给pollDesc赋值
    pd.fd = fd
    pd.closing = false
    pd.setEventErr(false)
    pd.rseq++
    pd.rg.Store(pdNil)
    pd.rd = 0
    pd.wseq++
    pd.wg.Store(pdNil)
    pd.wd = 0
    pd.self = pd
    pd.publishInfo()
    unlock(&pd.lock)
    // 调用OS的api,传入文件描述符和socket描述
    errno := netpollopen(fd, pd)
    if errno != 0 {
       pollcache.free(pd)
       return nil, int(errno)
    }
    return pd, 0
}

收发数据:gc来循环调用netpoll

收发数据根据socket可否读写可以划分为两个场景:

  • 协程需要收发数据的时候,socket已经可以读写
  • 协程需要收发数据的时候,socket无法读写

场景1:Socket可以读写

socket的状态改变:

  • runtime循环调用netpoll方法(g0协程),比如系统在做GC的时候在gcstart()会顺便调用netpoll,因为垃圾回收是一个周期性行为,go顺便给它下了个hook。
  • 发现socket可以读写,给对应的rg或者wg置为pdReady(1)

协程方何时读写socket(业务方法):

  • 协程调用poll_runtime_pollWait()
  • 判断rg或者wg已经置为pdReady(1),返回0,表示socket可以操作。

场景2:Socket无法读写

socket状态的改变:

  • runtime循环调用netpoll方法(g0协程)
  • pollDesc的rg和wg不会被改变,还是0

协程方:

  • 协程调用poll_runtime_polWait()
  • 发现对应的rg或者wg为0
  • 给对应的rg或者wg设置为协程地址
  • 休眠等待

此时runtime依旧会循环调用netpoll方法:

  • 如果发现socket可以读写时,查看对应的rg或者wg
  • 如果为协程地址就返回协程地址(push到toRun列表里面)
  • 调度器开始调度对应的协程

源码分析:

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    errcode := netpollcheckerr(pd, int32(mode))
    if errcode != pollNoError {
       return errcode
    }
    // As for now only Solaris, illumos, and AIX use level-triggered IO.
    if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
       netpollarm(pd, mode)
    }
    // 主要逻辑
    for !netpollblock(pd, int32(mode), false) {
       errcode = netpollcheckerr(pd, int32(mode))
       if errcode != pollNoError {
          return errcode
       }
       // Can happen if timeout has fired and unblocked us,
       // but before we had a chance to run, timeout has been reset.
       // Pretend it has not happened and retry.
    }
    return pollNoError
}

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}
	for {
		// socket是pdReady,表示可读写
		if gpp.CompareAndSwap(pdReady, pdNil) {
			return true
		}
                // wait=2,中间状态,表示有人在操作socket
		if gpp.CompareAndSwap(pdNil, pdWait) {
			break
		}
                //....
	}
        // 如果socket不可读也不可写,休眠协程。
        // gopark的第一个成员是netpollblockcommit,会被调用,将pdWait改成协程地址。
	if waitio || netpollcheckerr(pd, mode) == pollNoError {
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
	}
	old := gpp.Swap(pdNil)
	return old == pdReady
}

小结

Network Poller是Runtime强大的工具,它抽象了操作系统底层的多路复用器操作。在Go底层,runtime不断调用netpoll方法自动监控多个Socket的状态,netpoll调用底层的操作系统Epoll完成监控。

在协程的角度,当socket状态可用,netpoll直接快速返回。当socket不可用的时候,协程会进入休眠状态,如果socket可以读写的话调度器会唤醒对应的协程。

Go语言对Epoll的抽象是其设计哲学与实用主义精神的完美体现。它让我们看到了,在追求高效与简洁的道路上,一个优秀的编程语言是如何通过智能的抽象与封装,为开发者铺设出一条通往成功的康庄大道。

目前我们已经知道了Go语言是如何检测socket状态的。然而,我们还有两个问题没有解决,第一个是这个socket从哪里来?第二个是socket可操作后,我们怎么操作?我们会在下一篇文章中分析net包,找到这两个问题的答案。感谢阅读。