Go-深入理解channel和select
深入理解channel和select
数据结构
hchan channel的结构体
type hchan struct {
qcount uint // 队列中的数据总量
dataqsiz uint // 环形队列大小, 大于0代表有缓存, 等于0代表无缓存
buf unsafe.Pointer // 元素数组的指针
elemsize uint16 // 单个元素的大小
closed uint32 // 是否close
elemtype *_type // 元素类型
sendx uint // 发送数组的索引
recvx uint // 接收数组的索引
recvq waitq // 等待recv的数据链表
sendq waitq // 等待send的数据链表
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
waitq的数据结构
type waitq struct {
first *sudog
last *sudog
}
chan
makechan
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 检查size是否过去打
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 检查内存对齐
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 检查size是否小于0或者内存溢出
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
// size为0,无缓存
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
// 元素不包含内存,分配缓存
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
// 包含指针,不分配缓存
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 元素大小
c.elemsize = uint16(elem.size)
// 元素类型
c.elemtype = elem
// 数据大小
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
chansend
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
// 如果c为nil,则会block,等待其他唤醒
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation.
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
// 如果channel是closed,则会panic send on closed channel
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
// 如果发现有个等待的接收者,则直接发给接收者,跳过buf
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
// 如果当前buf没有满,则加入到队尾
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 直接拷贝
typedmemmove(c.elemtype, qp, ep)
// 更新sendx索引
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
// 如果没有接收者或者buf满了,则需要block
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
// 初始化sudog
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// block 直接到有人唤醒
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
send
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
// 将ep直接发送到对面的sg上
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒
goready(gp, skip+1)
}
chanrecv
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block {
return
}
// 如果是nil,同样也会阻塞
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not ready for receiving, we observe that the
// channel is not closed. Each of these observations is a single word-sized read
// (first c.sendq.first or c.qcount, and second c.closed).
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
//
// The order of operations is important here: reversing the operations can lead to
// incorrect behavior when racing with a close.
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
// 清除内存
typedmemclr(c.elemtype, ep)
}
return true, false
}
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
// 如果有一个发送方,如果buffer为0,接收方会直接从发送方接收数据,发送的数据会直接写到队尾
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
if c.qcount > 0 {
// Receive directly from queue
// 如果buf有数据,则直接从队列中读取
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 读取完成后清除
typedmemclr(c.elemtype, qp)
// 更新sendx索引
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
// 没有sender,block
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// 阻塞等待唤醒
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
recv
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
// 无缓存buf
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
// 如果ep不为空,则直接copy数据给sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// 有缓存buf
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
// 如果队列满了,则从队头去除一个元素
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
// 拷贝数据给接收方
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
// 将当前的数据写入到队列中
typedmemmove(c.elemtype, qp, sg.elem)
// 更新索引
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
closechan
关闭channel
func closechan(c *hchan) {
// channel为空,则报panic
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
// 如果channel已经关闭了
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist gList
// release all readers
// 把所有的接收方全部放入到glist中
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
// 把所有的发送方放入到glist,他们会报panic
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
// 开始唤醒g
goready(gp, 3)
}
}
Select
reflect_rselect
func reflect_rselect(cases []runtimeSelect) (int, bool) {
// 如果没有case,则block
if len(cases) == 0 {
block()
}
sel := make([]scase, len(cases))
order := make([]uint16, 2*len(cases))
for i := range cases {
rc := &cases[i]
switch rc.dir {
case selectDefault:
sel[i] = scase{kind: caseDefault}
case selectSend:
// rc.val 就是send的val
sel[i] = scase{kind: caseSend, c: rc.ch, elem: rc.val}
case selectRecv:
// rc.val 就是recv的val
sel[i] = scase{kind: caseRecv, c: rc.ch, elem: rc.val}
}
if raceenabled || msanenabled {
selectsetpc(&sel[i])
}
}
return selectgo(&sel[0], &order[0], len(cases))
}
selectgo
selectgo代码比较多,有些不重要的地方做了些精简
大致流程如下:
- 循环判断case,如果是阻塞,则跳过,如果有default,则执行default
- 所有case都阻塞并且没有default,则创建一个waiting队列,绑定sudog到对应recvq和sendq上,并且阻塞等待唤醒
- 如果某个sudog对应的case被G唤醒了,则清除waiting队列的数据
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
// if debugSelect {}
// order 是2倍的ncases, cas是1倍的ncases
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
// scases 从0-ncases
scases := cas1[:ncases:ncases]
// pollorder 从0-ncases
pollorder := order1[:ncases:ncases]
// lockorder 从ncases-2*ncases
lockorder := order1[ncases:][:ncases:ncases]
// Replace send/receive cases involving nil channels with
// caseNil so logic below can assume non-nil channel.
for i := range scases {
cas := &scases[i]
if cas.c == nil && cas.kind != caseDefault {
*cas = scase{}
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
for i := 0; i < ncases; i++ {
scases[i].releasetime = -1
}
}
// generate permuted order
// 生成排列的顺序
for i := 1; i < ncases; i++ {
j := fastrandn(uint32(i + 1))
pollorder[i] = pollorder[j]
pollorder[j] = uint16(i)
}
// sort the cases by Hchan address to get the locking order.
// simple heap sort, to guarantee n log n time and constant stack footprint.
// 通过Hchan的地址来确定锁排序,使用堆排序减少时间复杂度,创建一个最大堆
for i := 0; i < ncases; i++ {
j := i
// Start with the pollorder to permute cases on the same channel.
c := scases[pollorder[i]].c
for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
k := (j - 1) / 2
lockorder[j] = lockorder[k]
j = k
}
lockorder[j] = pollorder[i]
}
for i := ncases - 1; i >= 0; i-- {
o := lockorder[i]
c := scases[o].c
lockorder[i] = lockorder[0]
j := 0
for {
k := j*2 + 1
if k >= i {
break
}
if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
k++
}
if c.sortkey() < scases[lockorder[k]].c.sortkey() {
lockorder[j] = lockorder[k]
j = k
continue
}
break
}
lockorder[j] = o
}
// if debugSelect {}
// lock all the channels involved in the select
// 锁定所有的channel按照上面的顺序
sellock(scases, lockorder)
var (
gp *g
sg *sudog
c *hchan
k *scase
sglist *sudog
sgnext *sudog
qp unsafe.Pointer
nextp **sudog
)
loop:
// pass 1 - look for something already waiting
var dfli int
var dfl *scase
var casi int
var cas *scase
var recvOK bool
for i := 0; i < ncases; i++ {
casi = int(pollorder[i])
cas = &scases[casi]
c = cas.c
switch cas.kind {
case caseNil:
// nil的case,忽略
continue
case caseRecv:
// 接收类型的case,判断sendq中有没有待发送的数据,如果有,就recv
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
// 缓存里面有数据,跳转到bufrecv
if c.qcount > 0 {
goto bufrecv
}
// 如果channel关闭了,则跳转到rclose
if c.closed != 0 {
goto rclose
}
case caseSend:
if raceenabled {
racereadpc(c.raceaddr(), cas.pc, chansendpc)
}
// 如果channel关闭了,则跳转到sclose
if c.closed != 0 {
goto sclose
}
// 看下待recv有没有数据,有的话,就跳转到发送
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
// 如果有缓存,切没有满,则跳转到bufsend
if c.qcount < c.dataqsiz {
goto bufsend
}
case caseDefault:
// 有default,更新case的索引和地址
dfli = casi
dfl = cas
}
}
//根据dfl判断是否有default,并且命中了,则直接跳转到default
if dfl != nil {
selunlock(scases, lockorder)
casi = dfli
cas = dfl
goto retc
}
// pass 2 - enqueue on all chans
// 所有的case都要等待,并且没有default执行
gp = getg()
if gp.waiting != nil {
throw("gp.waiting != nil")
}
nextp = &gp.waiting
// 遍历所有的case, 然后将其放到g.waitlink中,
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
if cas.kind == caseNil {
continue
}
c = cas.c
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
// No stack splits between assigning elem and enqueuing
// sg on gp.waiting where copystack can find it.
sg.elem = cas.elem
sg.releasetime = 0
if t0 != 0 {
sg.releasetime = -1
}
sg.c = c
// Construct waiting list in lock order.
*nextp = sg
nextp = &sg.waitlink
// 根据不同的cas类型,在recvq和sendq中插入这个sudog
switch cas.kind {
case caseRecv:
c.recvq.enqueue(sg)
case caseSend:
c.sendq.enqueue(sg)
}
}
// 挂起等待唤醒
gp.param = nil
gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
sellock(scases, lockorder)
gp.selectDone = 0
sg = (*sudog)(gp.param)
gp.param = nil
// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
// record the successful case, if any.
// We singly-linked up the SudoGs in lock order.
casi = -1
cas = nil
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
// 释放waiting队列前,需要清除数据
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nil
for _, casei := range lockorder {
k = &scases[casei]
if k.kind == caseNil {
continue
}
if sglist.releasetime > 0 {
k.releasetime = sglist.releasetime
}
if sg == sglist {
// sg has already been dequeued by the G that woke us up.
// 确定这个sudog被唤醒的G出列
casi = int(casei)
cas = k
} else {
// 把其他还爱等待的sudog从waiting队列溢出
c = k.c
if k.kind == caseSend {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}
if cas == nil {
// We can wake up with gp.param == nil (so cas == nil)
// when a channel involved in the select has been closed.
// It is easiest to loop and re-run the operation;
// we'll see that it's now closed.
// Maybe some day we can signal the close explicitly,
// but we'd have to distinguish close-on-reader from close-on-writer.
// It's easiest not to duplicate the code and just recheck above.
// We know that something closed, and things never un-close,
// so we won't block again.
// 由于可以用gp.param == nil唤醒,所以cas为空,要再次循环判断下
goto loop
}
c = cas.c
if debugSelect {
print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " kind=", cas.kind, "\n")
}
if cas.kind == caseRecv {
recvOK = true
}
if raceenabled {
if cas.kind == caseRecv && cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
} else if cas.kind == caseSend {
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
}
}
if msanenabled {
if cas.kind == caseRecv && cas.elem != nil {
msanwrite(cas.elem, c.elemtype.size)
} else if cas.kind == caseSend {
msanread(cas.elem, c.elemtype.size)
}
}
selunlock(scases, lockorder)
goto retc
bufrecv:
// can receive from buffer
// 从buffer中接收到了数据
if raceenabled {
if cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
}
raceacquire(chanbuf(c, c.recvx))
racerelease(chanbuf(c, c.recvx))
}
if msanenabled && cas.elem != nil {
msanwrite(cas.elem, c.elemtype.size)
}
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
// 开始拷贝
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
// 解锁当前case
selunlock(scases, lockorder)
goto retc
bufsend:
// can send to buffer
// 可以发送给buffer
if raceenabled {
raceacquire(chanbuf(c, c.sendx))
racerelease(chanbuf(c, c.sendx))
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
}
if msanenabled {
msanread(cas.elem, c.elemtype.size)
}
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retc
recv:
// can receive from sleeping sender (sg)
// 可以从其他等待的sender中获取数据
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncrecv: cas0=", cas0, " c=", c, "\n")
}
recvOK = true
goto retc
rclose:
// read at end of closed channel
// 读取到关闭的channel,解锁
selunlock(scases, lockorder)
recvOK = false
if cas.elem != nil {
// 清除数据
typedmemclr(c.elemtype, cas.elem)
}
if raceenabled {
raceacquire(c.raceaddr())
}
goto retc
send:
// can send to a sleeping receiver (sg)
// 将数据发送给一个等待的receiver
if raceenabled {
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
}
if msanenabled {
msanread(cas.elem, c.elemtype.size)
}
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncsend: cas0=", cas0, " c=", c, "\n")
}
goto retc
retc:
if cas.releasetime > 0 {
blockevent(cas.releasetime-t0, 1)
}
return casi, recvOK
sclose:
// send on closed channel
// 发送给一个关闭的channel,发生painc
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))
}