Go语言修炼(十二):高并发下的通信方式Channel
前言
想象一下,在一个繁忙的交通枢纽(业务系统),车辆(Go协程)川流不息,它们需要高效、有序地交换信息以完成各自的使命。
而Channel
就像是这些车辆之间的专用通道,既保证了信息的准确传递,又避免了交通拥堵和混乱。通过Channel
,Go协程之间可以安全地进行数据交换,无需担心数据竞争和同步问题,让开发者能够专注于业务逻辑的实现,而非繁琐的并发控制。
Channel的使用和设计理念
Channel的声明
- 无缓冲Channel的申请:
make(chan int)
或make(chan bool,0)
- 有缓冲Channel的申请:
make(chan string,2)
Channel的基本使用
- 向Channel发送数据x:
ch <- x
- 从Channel接收数据:
x = <- ch
- 从Channel接收数据并丢弃:
<- ch
Channel常见错误用法:无缓冲Channel阻塞
func main(){
ch := make(chan string)
ch <- "hello" // 阻塞在这里
fmt.Println(<-ch)
}
Channel的设计理念
Channel的理念有一句非常经典的话:“不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。”
所谓共享内存,典型就是传入一个变量的指针并修改之。通信的方式,则是直接从Channel中拿数据。
这么做很多优势:
- 避免协程竞争和数据冲突的问题
- 是一种更高级的抽象,可以降低开发难度,增加程序的可读性
- 这种编程方式解耦了模块,增强了程序的扩展性和可维护性。
Channel的底层数据结构
在查看Channel的底层数据结构之前,我们从Channel的使用过程已经可以大致绘制出其结构的示意图了:
按照之前的使用,Channel应该有一个读和写协程的等待队列,一个缓存区。
我们查看runtime/chan.go/hchan
结构体:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
里面的重要字段如下:
- 环形缓冲区:之所以设计为环形,是因为可以大幅度降低GC的开销。
qcount
:缓冲区数据数量。dataqsiz
:缓冲区的大小。buf
:指针,指向Buf的第一个数据。elemsize
:类型大小elemtype
:数据类型
- 发送队列和接收队列:
sendx
、sendq
、recvq
、recvx
waitq
为链表,里面记录链表头和链表尾
- 互斥锁:
lock
,用于保护hchan
结构体本身。Channel
并不是无锁的。在塞入数据和取出数据的时候需要加锁,开销不大。
- 状态值:
close
,0为开启,1为关闭。
Channel的工作原理与算法
发送数据
c<-是一个Go语言的语法糖,在编译阶段,c<-
会转化为runtime.chansend1()
。
channel的发送数据情景可以分为三种:
- 直接发送
- 放入缓存
- 休眠等待
直接发送
状态:数据发送前,已经有协程G在休眠等待(Receive Queue)。此时缓存必然是空的,不用考虑。
算法:数据直接拷贝给G的接收变量,唤醒G。
算法实现:
- 从队列取出一个等待接收的G
- 将数据直接拷贝到接收变量的G里面
- 唤醒G
相关源码
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
// 从队列取出等待接收的协程
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 将数据拷贝到sudog的elem字段
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
// ...省略一些逻辑
// 唤醒G协程
goready(gp, skip+1)
}
// 将数据拷贝到sg的elem
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
放入缓存
状态:没有G在休眠,并且缓冲区有空间。
算法:直接将数据放到缓冲区中。
算法实现:
- 获取可存入的缓冲的地址。
- 数据拷贝到缓冲区地址
- 维护索引
相关源码:
// chansend方法
if c.qcount < c.dataqsiz {
// 缓冲区还有空间
qp := chanbuf(c, c.sendx)
// 数据移动到缓冲区
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 维护索引
c.qcount++
unlock(&c.lock)
return true
}
休眠等待
状态:没有G在休眠等待,而且没有缓冲区或者缓冲区满了。
算法:协程进入发送队列,休眠等待。
算法实现:
- 协程包装为sudog
- 将sudog放入sendq队列
- 休眠并解锁
- 被唤醒后,数据已经被取走了,维护其它数据
相关源码
// chansend方法
// 拿到sudog结构体
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 放入发送队列
c.sendq.enqueue(mysg)
// 休眠
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
数据接收
接收数据
接收数据的源码位置:
- 编译阶段,
i<-c
会转化为runtime.chanrecv1()
,i,ok<-c
转化为chanrecv2()
- 最后会调用
chanrecv()
Channel接收数据的情形:
- 有等待的协程,从协程接收。
- 有等待的协程,从缓存接收。
- 接收缓存
- 阻塞接收
从等待的协程接收
状态:已经有协程处于发送队列之中;Channel没有缓存。
算法:将数据直接从发送队列的协程中拷贝过来。
算法实现:
- 判断是否有协程在发送队列等待,进入recv方法
- 判断这个Channel是不是无缓存
- 直接从发送队列中的协程中取走数据,顺便唤醒这个协程。
源码分析
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// block被写死,就是true
lock(&c.lock)
// 如果发送队列里面有协程在等待
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
// 缓冲没有数据,直接从等待队列中的协程拷贝
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
}
// ...
// 唤醒等待队列的协程
goready(gp, skip+1)
}
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
有等待的协程,从缓存接收
状态:有协程在发送队列里面,但是Channel的缓冲里面有数据。
算法:从缓冲取走一个数据,将休眠的协程从等待队列放进缓存,唤醒协程。
算法实现:
- 判断如果有协程在发送队列等待,进入recv(和前面一样)
- 判断channel是否有缓存
- 如果有,从缓存取走一个数据
- 将发送队列里面的协程数据放入缓存,唤醒这个协程
源码分析
// 第一步的逻辑是一样的
// 我们直接看recv方法里面的代码
if c.dataqsiz == 0 {
// 缓存为空的逻辑
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
//缓存不为空,从缓存里面拿数据
qp := chanbuf(c, c.recvx)
// 数据从缓存拷贝到接收者
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 将发送者队列中的协程数据拷贝到缓存
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
// ...
// 唤醒协程
goready(gp, skip+1)
接收缓存
状态:没有协程在发送队列里面,但是缓存有数据。
算法:直接从缓存里面取走数据。
算法实现:
- 判断队列没有发送协程
- 从缓存里面拷贝数据到接收者
源码分析
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c.qcount > 0 {
// 直接从缓存里面拿到数据
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
}
阻塞接收
状态:没有协程在发送队列,没有缓冲空闲区。
算法:将协程自身进入接受队列,休眠等待。
算法实现:
- 判断是否发送队列没有协程在等待
- 判断Channel是否无缓冲
- 将接受协程包装为sudog,放入接受等待队列,gopark休眠
- 唤醒时,发送的协程已经把数据拷贝到位。
源码分析
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// ...
c.recvq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// 唤醒后
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
非阻塞channel的用法
select
案例:在下方代码中,我们从两个管道里面读数据和写数据。
在没有select之前,我们的程序会阻塞到这些读写channel的语句上。如果我们用了select,要是无法从case语句中读写channel,就会走default,不会阻塞。
func main() {
c1 := make(chan int, 5)
c2 := make(chan int)
select {
case <-c1: // 如果没有select,这里会阻塞
fmt.Println("从C1接收到数据")
case c2 <- 1:
fmt.Println("将数据发送到C2")
default: // select保证了这两个走不了直接走default
fmt.Println("None")
}
}
select的原理
编译后的代码会判断,是否同时存在接受、发送、默认路径。
- 首先查看是否有可以立即执行的case
- 没有的话,有default就走default
- 没有default的话,会将自己注册到每个case语句的队列里面
timer
timer会在倒计时结束的时候,向t.C
放入数据。
func main() {
t := time.NewTimer(time.Second)
t.C
}
timer适合做定时相关的任务。
- 感谢你赐予我前进的力量