Go语言修炼(十三):Go语言是如何抽象Epoll的?
前言
Epoll,作为Linux下性能极佳的I/O事件通知机制,它突破了传统select/poll机制的种种限制,以更加高效、灵活的方式处理大量的并发连接。
然而,直接使用Epoll进行网络编程往往需要深入理解底层的系统调用与复杂的状态管理,这对于大多数开发者来说是一个不小的挑战。
幸运的是,Go语言通过其内置的net
包以及netpoll
机制,为我们提供了一套简洁而强大的网络编程接口,让开发者能够在不直接接触Epoll复杂性的情况下,享受其带来的高性能。
今天我们就来揭开Go语言是如何巧妙地抽象Epoll,将其转化为开发者手中得心应手的工具的神秘面纱。让我们一同探索,Go语言背后的这一设计哲学与实现细节。
PS:文中包含大量源码分析,但是只要理解了实现逻辑,源码其实不难理解。
Linux网络编程与IO模型
在讨论Epoll技术之前,我们需要先回顾一下Linux的网络编程与IO模型。
在Linux中,如果我们要操作Socket
进行网络编程,一般需要下面的步骤:
- 客户端:socket->connect->write/read
- 服务端:socket->bind->listen->accept->write/read
在此基础上,如果服务端要同时操作多个socket,就需要讨论IO模型了。所谓的IO模型,指的是服务端同时操作多个Socket的方案。
这里服务端的socket有两种,一种是socket(listen),是在监听时使用的套接字;另外一种是socket(established),是在accept后产生的套接字。
阻塞IO(BIO)
假设服务端要和三个客户端进行通信。阻塞IO(BIO)的方案如下:
- 主线程监听socket(listen),负责与客户端建立新连接。
- 建立新连接后,也就是执行accept方法后,每个socket(established)由一个线程承载,负责与客户端之间进行读写交互。
- 每个线程内部调用
read
系统调用时,如果客户端数据没有发送到,就会阻塞,陷入内核态,直到客户端有数据过来。
BIO的缺点很明显:
- 如果来了很多新连接,就会开辟很多线程,线程资源占用大。
- 读写时,内核态和用户态切换开销大。
非阻塞IO(NIO)
假设客户端要和三个客户端进行通信,NIO的方案是:
- 只会建立一个线程,一个线程同时管理多个客户端的连接。
- 这个线程需要轮询每个连接,查看是否有数据发送过来。这么做不会阻塞。
NIO的缺点是:
- 需要自旋轮询,计算开销大。
- 业务逻辑的编写比较复杂。
多路复用(Epoll)
Epoll是高并发的大杀器,Linux Epoll的设计思路如下:
- 维护一个event poll事件池
- 每次创建新连接,就将socket注册到事件池中。
- 通过非阻塞调用Epoll Api,Linux会返回发生的事件列表。
- 线程只需要对这些socket进行IO操作即可。
Epoll的优点:将监听多个socket的任务从业务方下放到操作系统。
缺点也很明显:Linux提供的Api很难使用,开发难度大,逻辑复杂。
关于Epoll的底层实现,这里引用小林Coding
网站上的两段话,感兴趣的朋友可以前去查看原理。
Epoll 在内核里使用红黑树来跟踪进程所有待检测的文件描述字,红黑树是个高效的数据结构,增删改一般时间复杂度是 O(logn)。epoll 因为在内核维护了红黑树,可以保存所有待检测的 socket ,所以只需要传入一个待检测的 socket,减少了内核和用户空间大量的数据拷贝和内存分配。
epoll 使用事件驱动的机制,内核里维护了一个链表来记录就绪事件,当某个 socket 有事件发生时,通过回调函数内核会将其加入到这个就绪事件列表中,当用户调用 epoll_wait() 函数时,只会返回有事件发生的文件描述符的个数,不需要像 select/poll 那样轮询扫描整个 socket 集合,大大提高了检测的效率。
中庸之道:如何结合BIO和epoll的优点呢?
阻塞IO实现简单,编码更容易,然而效率比较低。多路复用效率很高,然后编码难度也高。如何才能结合阻塞模型和多路复用的优点呢?
实际上,Go语言的网络层帮我们做到了这一点。Golang网络层可以实现每个协程管理一个Socket,同时隐藏了OS的epoll的细节。即底层使用OS的多路复用IO,在协程层次使用BIO,将阻塞线程变为休眠协程。
其示意图如下:
Go是如何抽象Epoll的?
Epoll抽象层
每个操作系统都有类似于Linux的Epoll这样的设计,Go语言是跨平台的,因此首先需要一层抽象层来屏蔽底层的实现差异。各个操作系统的多路复用都具有下面这些功能:
- 新建多路复用器:
epoll_create()
- 向多路复用器插入需要监听的事件:
epoll_ctl()
- 查询发生了什么事件:
epoll_wait()
其中,Windows使用IOCP,Mac使用Kqueue,Linux使用Epoll。我们查看一下Go语言的源码,可以找到:runtime
下面有netpoll.go
,此外还有各种具体平台的实现:netpoll_epoll.go
是Linux,还有netpoll_kqueue.go
、netpoll_windows.go
等。
这个netpoll.go
就是Go语言用来屏蔽底层多路复用的抽象和适配层,也叫做network poller
,它具有下面的Api:
- netpollinit:对
epoll_create
的抽象,用于创建监听器。 - netpollopen;对
epoll_ctl
的抽象,用于插入监听事件。 - netpoll:对
epoll_wait
的抽象,用于查询事件。
netpollinit() 新建多路复用器
功能如下:
- 新建Epoll
- 新建一个pipe管道用于中断Epoll
- 将“管道有数据到达”事件注册到Epoll。
源码分析:
func netpollinit() {
var errno uintptr
// 使用系统调用新建epoll
epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
if errno != 0 {
println("runtime: epollcreate failed with", errno)
throw("runtime: netpollinit failed")
}
// 新建了一个NIO管道,用于关闭epoll
r, w, errpipe := nonblockingPipe()
if errpipe != 0 {
println("runtime: pipe failed with", -errpipe)
throw("runtime: pipe failed")
}
// 插入监听管道事件:用于关闭管道
ev := syscall.EpollEvent{
Events: syscall.EPOLLIN,
}
// ...
}
netpollopen() 插入事件
- 传入一个socket的文件描述符和pollDesc指针(这个指针是socket的详细信息)
- pollDesc中记录了哪些协程休眠在等待这个Socket
- 将Socket的可读、可写、断开事件注册到Epoll中。
源码分析:
// 插入事件,参数为:socket、socket描述
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
var ev syscall.EpollEvent
// 插入四个事件
ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.Data)) = pd
// 使用系统调用,底层用调用epoll_ctl
return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}
netpoll() 查询发生了什么事件
- 调用
epoll_wait
查询哪些事件发生 - 根据pollDesc内的信息,返回哪些协程可以唤醒。注意是返回协程列表,而不是事件。
源码分析:
// 查询发生事件,参数:等待时间
func netpoll(delay int64) gList {
// ...
var events [128]syscall.EpollEvent
retry:
// 传入epoll、长度为128的事件数组(epoll会填写)
n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
// ...
// 新建协程列表,遍历事件数组
var toRun gList
for i := int32(0); i < n; i++ {
ev := events[i]
if ev.Events == 0 {
continue
}
// 如果是断开事件
if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd {
if ev.Events != syscall.EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.Events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
netpollWakeSig.Store(0)
}
continue
}
// 判断是否为读、写
var mode int32
if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
// 可读
mode += 'r'
}
if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
// 可写
mode += 'w'
}
if mode != 0 {
// 找到socket,传入ready方法
// 方法内部:判断socket可读还是可写,放到torun列表
pd := *(**pollDesc)(unsafe.Pointer(&ev.Data))
pd.setEventErr(ev.Events == syscall.EPOLLERR)
netpollready(&toRun, pd, mode)
// 返回协程的torun协程链表,代表哪些socket可以用的协程
}
}
// 返回协程的torun协程链表
return toRun
}
network poller如何工作
数据结构支持
pollCache:带锁的pollDesc的链表缓存
type pollCache struct {
lock mutex
first *pollDesc
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
}
- first:pollDesc的链表头地址
- lock:保证同个时间内只有1个协程能操作链表。
pollDesc:runtime包对socket的详细描述,链表节点
type pollDesc struct {
_ sys.NotInHeap
link *pollDesc // in pollcache, protected by pollcache.lock
fd uintptr // constant for pollDesc usage lifetime
atomicInfo atomic.Uint32 // atomic pollInfo
// rg, wg are accessed atomically and hold g pointers.
// (Using atomic.Uintptr here is similar to using guintptr elsewhere.)
rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil
lock mutex // protects the following fields
closing bool
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline (a nanotime in the future, -1 when expired)
wseq uintptr // protects from stale write timers
wt timer // write deadline timer
wd int64 // write deadline (a nanotime in the future, -1 when expired)
self *pollDesc // storage for indirect interface. See (*pollDesc).makeArg.
}
重要字段如下:
- link:指向下一个数据。
- fd:socket的文件描述符
- rg:可能是pdReady(1), pdWait(2),等待读取的协程G的地址 or pdNil
- wg:可能是pdReady(1), pdWait(2), 等待写的协程G的地址 or pdNil
初始化
初始化的步骤如下:
- 调用
poll_runtime_pollServerInit()
- 使用原子操作保证只初始化一次
- 调用
netpollinit()
源码分析:
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
// 使用原子操作保证netpoll只会被初始化一次
if netpollInited.Load() == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
lock(&netpollInitLock)
if netpollInited.Load() == 0 {
// 使用多路复用抽象层的Api初始化,开辟一个Epoll
netpollinit()
netpollInited.Store(1)
}
unlock(&netpollInitLock)
}
}
func netpollinited() bool {
return netpollInited.Load() != 0
}
// Load accesses and returns the value atomically.
//go:nosplit
func (u *Uint32) Load() uint32 {
return Load(&u.value)
}
新增监听
监听的步骤如下:
- 调用
poll_runtime_pollOpen()
- 在
pollcache
链表中分配一个pollDesc
- 初始化
pollDesc
(wg、rg为0) - 调用
netpollopen()
源码分析:
// go编译的提示,不同包之间小写的方法可以互相调用。
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
// 新增监听,传入文件描述符
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
// 在链表分配一个新的socket描述结构体
pd := pollcache.alloc()
lock(&pd.lock)
wg := pd.wg.Load()
if wg != pdNil && wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
rg := pd.rg.Load()
if rg != pdNil && rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
// 给pollDesc赋值
pd.fd = fd
pd.closing = false
pd.setEventErr(false)
pd.rseq++
pd.rg.Store(pdNil)
pd.rd = 0
pd.wseq++
pd.wg.Store(pdNil)
pd.wd = 0
pd.self = pd
pd.publishInfo()
unlock(&pd.lock)
// 调用OS的api,传入文件描述符和socket描述
errno := netpollopen(fd, pd)
if errno != 0 {
pollcache.free(pd)
return nil, int(errno)
}
return pd, 0
}
收发数据:gc来循环调用netpoll
收发数据根据socket可否读写可以划分为两个场景:
- 协程需要收发数据的时候,socket已经可以读写
- 协程需要收发数据的时候,socket无法读写
场景1:Socket可以读写
socket的状态改变:
- runtime循环调用
netpoll
方法(g0协程),比如系统在做GC的时候在gcstart()
会顺便调用netpoll
,因为垃圾回收是一个周期性行为,go顺便给它下了个hook。 - 发现socket可以读写,给对应的rg或者wg置为pdReady(1)
协程方何时读写socket(业务方法):
- 协程调用
poll_runtime_pollWait()
- 判断rg或者wg已经置为pdReady(1),返回0,表示socket可以操作。
场景2:Socket无法读写
socket状态的改变:
- runtime循环调用
netpoll
方法(g0协程) - pollDesc的rg和wg不会被改变,还是0
协程方:
- 协程调用
poll_runtime_polWait()
- 发现对应的rg或者wg为0
- 给对应的rg或者wg设置为协程地址
- 休眠等待
此时runtime依旧会循环调用netpoll
方法:
- 如果发现socket可以读写时,查看对应的rg或者wg
- 如果为协程地址就返回协程地址(push到toRun列表里面)
- 调度器开始调度对应的协程
源码分析:
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// As for now only Solaris, illumos, and AIX use level-triggered IO.
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
netpollarm(pd, mode)
}
// 主要逻辑
for !netpollblock(pd, int32(mode), false) {
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return pollNoError
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
// socket是pdReady,表示可读写
if gpp.CompareAndSwap(pdReady, pdNil) {
return true
}
// wait=2,中间状态,表示有人在操作socket
if gpp.CompareAndSwap(pdNil, pdWait) {
break
}
//....
}
// 如果socket不可读也不可写,休眠协程。
// gopark的第一个成员是netpollblockcommit,会被调用,将pdWait改成协程地址。
if waitio || netpollcheckerr(pd, mode) == pollNoError {
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
old := gpp.Swap(pdNil)
return old == pdReady
}
小结
Network Poller是Runtime强大的工具,它抽象了操作系统底层的多路复用器操作。在Go底层,runtime不断调用netpoll方法自动监控多个Socket的状态,netpoll调用底层的操作系统Epoll完成监控。
在协程的角度,当socket状态可用,netpoll直接快速返回。当socket不可用的时候,协程会进入休眠状态,如果socket可以读写的话调度器会唤醒对应的协程。
Go语言对Epoll的抽象是其设计哲学与实用主义精神的完美体现。它让我们看到了,在追求高效与简洁的道路上,一个优秀的编程语言是如何通过智能的抽象与封装,为开发者铺设出一条通往成功的康庄大道。
目前我们已经知道了Go语言是如何检测socket状态的。然而,我们还有两个问题没有解决,第一个是这个socket从哪里来?第二个是socket可操作后,我们怎么操作?我们会在下一篇文章中分析net包,找到这两个问题的答案。感谢阅读。
- 感谢你赐予我前进的力量