Go源码阅读——runtime.chan
1. chan结构体定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| type hchan struct { qcount uint dataqsiz uint buf unsafe.Pointer elemsize uint16 closed uint32 elemtype *_type sendx uint recvx uint recvq waitq sendq waitq
lock mutex }
type waitq struct { first *sudog last *sudog }
|
可以看到,chan
底层就是一个循环队列。
2. chan的创建
在 Go 中我们使用 make(chan T, cap)
来创建 channel,make 语法会在编译时,转换为 makechan64
和 makechan
:
1 2 3 4 5 6 7
| func makechan64(t *chantype, size int64) *hchan { if int64(int(size)) != size { panic(plainError("makechan: size out of range")) }
return makechan(t, int(size)) }
|
makechan64
主要是做了一下检查,最终还是会调用 makechan
,在看 makechan
源码之前,我们先来看两个全局常量,接下来会用到:
1 2 3 4
| const ( maxAlign = 8 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) )
|
maxAlign
是内存对齐的最大值,这个等于 64 位 CPU 下的 cacheline 的大小
hchanSize
计算 unsafe.Sizeof(hchan{})
最近的 8 的倍数
2.1. #真正创建chan——#makechan
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| func makechan(t *chantype, size int) *hchan { elem := t.elem
if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") }
mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) }
var c *hchan switch { case mem == 0: c = (*hchan)(mallocgc(hchanSize, nil, true)) c.buf = c.raceaddr() case elem.ptrdata == 0: c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: c = new(hchan) c.buf = mallocgc(mem, elem, true) }
c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan)
return c }
|
因此,创建的过程可以总结为如下:
- 创建时会做一些检查
- 元素大小不能超过 64K
- 元素的对齐大小不能超过 maxAlign 也就是 8 字节
- 计算出来的内存是否超过限制
- 创建时的策略
- 如果是无缓冲的 channel,会直接给 hchan 分配内存
- 如果是有缓冲的 channel,并且元素不包含指针,那么会为 hchan 和底层数组分配一段连续的地址
- 如果是有缓冲的 channel,并且元素包含指针,那么会为 hchan 和底层数组分别分配地址
3. 发送数据
3.1. 发送数据前检查——#chansend
我们在 x <- chan T
进行发送数据的时候最终会被编译成 chansend1
:
1 2 3
| func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc()) }
|
而 chansend1
最终还是调用了 chansend
主要的逻辑都在 chansend
上面,注意看下方源码和注释:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
| func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") }
if !block && c.closed == 0 && full(c) { return false }
lock(&c.lock)
if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) }
if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }
if c.qcount < c.dataqsiz { qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz { c.sendx = 0 }
c.qcount++ unlock(&c.lock) return true }
if !block { unlock(&c.lock) return false }
gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if gp.param == nil { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) }
mysg.c = nil releaseSudog(mysg) return true }
|
3.2. 如果有goroutine急着接收数据——#send
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() }
goready(gp, skip+1) }
|
3.3. 发送数据总结
向 channel 中发送数据时大概分为两大块,检查和数据发送,而数据发送又分为三种情况:
- 如果 channel 的
recvq
存在阻塞等待的接收数据的 goroutine 那么将会直接将数据发送给第一个等待的 goroutine
- 这里会直接将数据拷贝到
x <-ch
接收者的变量 x
上
- 然后将接收者的 Goroutine 修改为可运行状态,并把它放到发送方所在处理器的 runnext 上等待下一次调度时执行。
- 如果 channel 是有缓冲的,并且缓冲区没有满,这个时候就会把数据放到缓冲区中
- 如果 channel 的缓冲区满了,这个时候就会走阻塞发送的流程,获取到 sudog 之后将当前 Goroutine 挂起等待唤醒,唤醒后将相关的数据解绑,回收掉 sudog
4. 接收数据
在 Go 中接收 channel 数据有两种方式
x <- ch
编译时会被转换为 chanrecv1
x, ok <- ch
编译时会被转换为 chanrecv2
chanrecv1
和 chanrecv2
没有多大区别,只是 chanrecv2
比 chanrecv1
多了一个返回值,最终都是调用的 chanrecv
来实现的接收数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
|
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") }
if !block && empty(c) { if atomic.Load(&c.closed) == 0 { return } if empty(c) { if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } }
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 { unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false }
if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true }
if c.qcount > 0 { qp := chanbuf(c, c.recvx)
if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true }
if !block { unlock(&c.lock) return false, false }
gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil
c.recvq.enqueue(mysg) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } closed := gp.param == nil gp.param = nil mysg.c = nil releaseSudog(mysg) return true, !closed }
|
4.1. 如果有goroutine急着发送数据——#recv
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { if ep != nil { recvDirect(c.elemtype, sg, ep) } } else {
qp := chanbuf(c, c.recvx)
if ep != nil { typedmemmove(c.elemtype, ep, qp) }
typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
|
4.2. 接收数据总结
数据接收和发送其实大同小异,也是分为检查和数据接收,数据接收又分三种情况
- 直接获取数据,如果当前有阻塞的发送者 Goroutine 走这条路
- 如果是无缓冲 channel,直接从发送者那里把数据拷贝给接收变量
- 如果是有缓冲 channel,并且 channel 已经满了,就先从 channel 的底层数组拷贝数据,再把阻塞的发送者 Goroutine 的数据拷贝到 channel 的循环队列中
- 从 channel 的缓冲中获取数据,有缓冲 channel 并且缓存队列有数据时走这条路
- 阻塞接收,剩余情况走这里
- 和发送类似,先获取当前 Goroutine 信息,构造 sudog 加入到 channel 的 recvq 上
- 然后休眠当前 Goroutine 等待唤醒
- 唤醒后做一些清理工作,释放 sudog 返回
5. 关闭chan
我们使用 close(ch)
来关闭 channel 最后会调用 runtime 中的 closechan
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| func closechan(c *hchan) { if c == nil { panic(plainError("close of nil channel")) }
lock(&c.lock)
if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) }
c.closed = 1
var glist gList
for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil
glist.push(gp) }
for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock)
for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } }
|
5.1. 关闭chan总结
- 关闭一个 nil 的 channel 和已关闭了的 channel 都会导致 panic
- 关闭 channel 后会释放所有因为 channel 而阻塞的 Goroutine
6. 案例说明(好案例)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| func gA(a <-chan int) { val := <-a fmt.Println("g1 received data:", val) return }
func gB(b <-chan int) { val := <-b fmt.Println("g2 received data:", val) return }
func main() { ch := make(chan int) go gA(ch) go gB(ch) ch <- 3 time.Sleep(time.Second) }
|
首先创建了一个无缓冲的 channel,接着启动两个 goroutine,并将前面创建的 channel 传递进 去。然后,向这个 channel 中发送数据 3,最后 sleep 1 秒后程序退出。
7. 问题
7.1. 如果channel为nil,从这个channel中接收数据会怎么样?
- 在非阻塞模式下,会直接返回
- 在阻塞模式下,会调用
gopark
挂起goroutine,并且会一直阻塞下去
8. 关闭一个为nil的channel会怎么样?
关闭一个nil的channel或者一个已经关闭的channel都会导致panic
9. channel什么情况下会引起资源的泄露?
channel资源泄露的原因是goroutine操作channel之后,处于发送或者接收阻塞状态,而channel处于满或者空的状态,一直得不到改变。他同时,垃圾回收器也不会回收这部分的资源,就会导致goroutine一直处于等待的状态。
9.1. channel何时被GC收集?
如果一个channel没有任何goroutine引用,GC就会对其进行回收操作。