go语言channel 原理,go编程说说channel哪些事

channel是什么

channel中文翻译为通道,它是Go语言内置的数据类型,使用channel不需要导入任何包,像int/float一样直接使用。它主要用于goroutine之间的消息传递和事件通知。 在Go语言中流传着一句话,就是说不要通过共享内存来通信,而是应该通过通信来共享内存。

Don't communicate by sharing memory, share memory by communicating.

上面这句话也包含了通信的两种方式:1是通过共享内存 2是通过通信。channel来源灵感要追溯到CSP(communicating sequential process)模型,CSP是用来描述并发系统中进行交互的一种模式。CSP概念最早由Tony Hoare提出,对这个人我们可能很陌生。但说到快速排序算法(Quicksort),你一定不陌生,快速排序太经典了,对Tony Hoare就是快速排序算法的作者。CSP允许使用进程组件来描述系统,各个组件独立运行,它们之间通过消息传递的方式进行通信。 Go语言中使用channel实现CSP思想,整个channel实现只有短短的700行代码,非常精炼,非常值得一读。channel和goroutine的结合,为并发编程提供了优雅的、便利的、 与传统并发控制不同的方案,并产生了在Go中特有的并发模式。

为什么需要channel

channel不是必需的,不用channel也可以完成goroutine之间的消息传递和事件通知,比如通过共享变量的方式。但使用了channel,会大大提升开发的效率,因为channel是并发安全的,channel的设计与goroutine之间完美配合,降低了并发编程的难度,减少了data race产生,大幅提升了生产力,嗯,程序员的福音。

channel基本用法channel创建

channel有3种类型,分别为只能接收、只能发送、既能接收又能发送。定义方法如下:

funcmain(){ //readChan只能接收 varreadChan<-chanint //writeChan只能发送 varwriteChanchan<-int //rwChan既能接收又能发送 varrwChanchanint fmt.Println(readChan==nil)//true fmt.Println(writeChan==nil)//true fmt.Println(rwChan==nil)//true }

channel中的元素对类型没有限制,任意类型都可以,所以元素的类型也可以是chan类型。那怎么判断<-属于哪个chan呢? <-的匹配规则是总是尽量与它左边的chan结合(the <-operator associates with the leftmost chan possible)。所以writeIntChan是一个只能发送的chan,发送的元素为chan int(双向的int型chan), readIntChan是一个只能接收的chan,发送的元素为chan int, rwIntChan是一个双向的chan, 发送的元素为chan int.

funcelemIsChan(){ //chan元素类型也可以是channel varwriteIntCahnchan<-chanint varreadIntChan<-chanchanint varrwIntChanchanchanint }

channel定义完成之后,并不能直接使用,需要初始化。上面readChan/writeChan/rwChan都是未被初始化的,它们的值都是nil. channel用make初始化,不能用new方法。make创建的时候可以传一个数字,表示创建有缓冲区的chan, 如果没有设置,它是无缓冲区的。

funcmakeChan(){ //无缓冲区的chan unbufferedCh:=make(chanint) //有缓冲区的chan,可以缓存10个int数据 bufferedCh:=make(chanint,10) }

向channel中发送数据

往chan中发送一个数据使用“ch<-”,下面的操作往ch发送一个int数据200. ch是一个双向的chan,可以往里面发送数据。 ch2是一个只能发送的ch,也可以往里面发送数据。

funcsendDataToChan(){ ch:=make(chanint,1) ch<-200 ch2:=make(chan<-int,1) ch2<-100 }

从channel中取数据

使用<-ch从chan中接收一条数据,ch是一个双向chan或者只读chan.下面的操作从ch读取数据,ch2是一个只读chan,也可以进行读取数据的操作。 chan接收操作,可以返回一个值可以可以返回两个值,第一个值是返回chan中的元素,第二个值是个bool类型,表示是否成功地从chan中读取到了一个值。如果chan已经被关闭而且所有的数据都已经读完,第一个值将是零值。需要注意的是,如果从chan读取到一个零值,可能是sender真实的在发送零值,也有可能是chan被关闭且没有缓存的元素了产生的零值。

funcrecvDataFromChan(){ //双向chan ch:=make(chanint,1) <-ch //只读chan ch2:=make(<-chanint,1) <-ch2 //只读取数据 _=<-ch2 //读取数据并想知道ch2是否已关闭 _,_=<-ch2 }

关闭channel

close(ch)直接将一个chan关闭,需要注意的是,如果一个chan未被初始化,也就是没有执行make操作,是不能close的,否则引发panic.还有就是不能重复关闭一个chan,重复关闭一个chan也会产生panic.还有就是不能往一个关闭的chan中发送数据,也会产生panic. 最后一个需要注意的是不能close一个只读的chan,直接编译不会通过。

funccloseNilChan(){ varchchanint close(ch) //panic:closeofnilchannel } funccloseOnlyReadChan(){ varch<-chanint ch=make(<-chanint) close(ch) //invalidoperation:close(ch)(cannotclosereceive-onlychannel) }

其他操作

Go内置的cap、len都可以操作chan,cap返回chan的容量,len返回chan中缓存的还未被取走的元素数量。还可以在select case中向chan发送数据或从chan中接收数据。 for-range操作chan,从chan读取数据。当ch被close之后,for-rang循环都会结束。

funcforRangeChan(){ ch:=make(chanint,1) wg:=sync.WaitGroup{} wg.Add(1) gofunc(){ deferwg.Done() //ch被close后,for-range会结束循环 forv:=rangech{ fmt.Println(v) } fmt.Println("for-rangeend") }() ch<-1 close(ch) wg.Wait() } funcforRangeChan2(){ ch:=make(chanint,1) wg:=sync.WaitGroup{} wg.Add(1) gofunc(){ deferwg.Done() //ch被close后,for-range结束循环 forrangech{ } fmt.Println("for-rangeend") }() close(ch) wg.Wait() }

channel实现原理

下面介绍channel底层是怎么实现的,源码在runtime/chan.go文件中。chan分配的是一个hchan的数据结构,定义如下:

const( maxAlign=8 //hchanSize为8的倍数,如果不是调整到下一个最小的8的倍数,假如hchan大小9,hchanSize为16 hchanSize=unsafe.Sizeof(hchan{}) uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) debugChan=false ) typehchanstruct{ //队列中元素的个数 qcountuint //环形队列的大小 dataqsizuint //指向含有dataqsiz个元素的数组的地址 bufunsafe.Pointer //元素的大小 elemsizeuint16 //通道是否关闭标识 closeduint32 //元素的类型 elemtype*_type //发送下标,即发送元素在环形队列的位置 sendxuint //接收下标,即接收元素在环形队列的位置 recvxuint //等待接收元素的goroutine队列 recvqwaitq //等待发送元素的goroutine队列 sendqwaitq //互斥锁,保护所有字段,包括sudogs中的字段,当持有锁的时候,不要修改其他G的状态 //因为这可能导致堆栈在收缩时产生死锁。 lockmutex }

qcount:代表chan中已经接收但还没被取走的元素的个数,使用len函数可以返回qcount的值

dataqsiz:队列的大小,chan中用一个循环队列来存放数据。

buf:存放元素的循环队列的buffer地址。

elemtype和elemsize:表示chan中元素的类型和size。chan一旦申明,它的元素类型是固定的,即普通类型或指针类型,所以元素大小也是固定的。

sendx:处理发送数据的指针在buf中的位置,一旦接收了新数据,指针就会加上elemsize,移动到下一个位置。buf的总大小是elemsize的整数倍,而且buf是一个循环列表。

recvx:处理接收请求时的指针在buf中的位置,一旦取出数据,该指针会移动到下一个位置。

recvq:chan是多生产者多消费者的模式,如果消费者因为没有数据可读而被阻塞了,就会被加入到recq队列中。

sendq: 如果生产者因为buf满了而阻塞,会被加入到sendq队列中。

上述的字段概括起来分为3部分,第一个部分是存储元素相关的,元素的类型,元素的大小,第二部分是buf相关的,存储数据buf的地址,buf的大小,buf中数据元素的个数,收发下标在buf中的索引位置,第三部分是调用相关的,recvq和sendq等待被调度的G队列。

channel创建实现

在编译的时候,编译器会根据容量的大小选择调用makechan64还是makechan。 makechan64只是做了size检查,后面调用的还是makechan函数,makechan函数的目的就是产生hchan结构体对象。下面对源码做了比较详细的解析。在分配buf的时候,根据存放元素是否含有指针做了不同的分配策略,如果通道中存放的元素中不含有指针类型,一次性分配hchanSize mem大小的空间,如果含有指针,需要进行两次分配,第一次分配通道结构头hchan,第二次为通道中元素分配存储空间buf。需要注意的时,产生的hchan对象肯定是8字节的倍数,不够8字节时候,分配时候会调整。

funcmakechan64(t*chantype,sizeint64)*hchan{ ifint64(int(size))!=size{ panic(plainError("makechan:sizeoutofrange")) } returnmakechan(t,int(size)) } //创建一个chan类型的结构体指针*hchan,传入参数有2个:t表示 //通道中放的元素类型,size表示通道的大小 funcmakechan(t*chantype,sizeint)*hchan{ elem:=t.elem //检查通道中元素大小是否小于2^16(65536),如果大于这个将抛异常 ifelem.size>=1<<16{ throw("makechan:invalidchannelelementtype") } //检查hchanSize是否为8的倍数,如果不是将抛出异常 ifhchanSize%maxAlign!=0||elem.align>maxAlign{ throw("makechan:badalignment") } mem,overflow:=math.MulUintptr(elem.size,uintptr(size)) //如果要申请的空间大小溢出或者超过最大分配值(maxAlloc)-hchanSize的值,或者申请通道 //的大小为负数,将引发panic ifoverflow||mem>maxAlloc-hchanSize||size<0{ panic(plainError("makechan:sizeoutofrange")) } varc*hchan switch{ casemem==0: //申请的是无缓冲区的channel //直接申请hchanSize字节大小的空间,hchanSize大小就是hchan结构体的大小 //如果不是8的倍数,调整到8的倍数 c=(*hchan)(mallocgc(hchanSize,nil,true)) //c.buf指向自己(即buf的位置) c.buf=c.raceaddr() caseelem.ptrdata==0: //通道中存放的元素中不含有指针类型,一次性分配hchanSize mem大小的空间 //存放元素占用的空间是一个连续的数组,跟hchan分配在一起 c=(*hchan)(mallocgc(hchanSize mem,nil,true)) //c.buf指向连续数组的起始位置 c.buf=add(unsafe.Pointer(c),hchanSize) default: //通道中存放的元素中含有指针,需要进行两次分配,第一次分配通道结构头hchan //第二次为通道中元素分配存储空间,调用mallocgc分配mem个字节空间,并将分配 //的地址位置赋值给c.buf c=new(hchan) c.buf=mallocgc(mem,elem,true) } //填充hchan元素大小、类型和环形队列大小字段 c.elemsize=uint16(elem.size) c.elemtype=elem c.dataqsiz=uint(size) //调试打印 ifdebugChan{ print("makechan:chan=",c,";elemsize=",elem.size,";dataqsiz=",size,"\n") } returnc } constMaxUintptr=^uintptr(0) //MulUintPtr返回a*b的值和一个bool类型,bool类型表示a*b是否溢出 funcMulUintptr(a,buintptr)(uintptr,bool){ ifa|b<1<<(4*sys.PtrSize)||a==0{ returna*b,false } overflow:=b>MaxUintptr/a returna*b,overflow }

发送操作实现原理

在编译发送数据给chan的时候,会把ch<-语句转换成chansend1函数,chansend1函数会调用chansend。chansend源码解析如下,整个chansend函数处理逻辑比较复杂,可以分为6部分处理逻辑。第一部分是对chan进行判空操作,如果chan是nil,调用者就永远被阻塞住了。第二部分是向一个缓存队列满的chan对象发送数据的时候,并且想不阻塞当前调用,这里的处理方式是直接返回。这里多说一点,什么时候不想阻塞呢?看chansend1调用chansend的时候block传递的是true。什么时候传递false呢?在处理select case default时候,如果chan被阻塞,需要执行default逻辑,像这种情况下传递的就是false.第三部分是如果chan已经被关闭,在向里面发送数据的话会panic.第四部分,如果等待队列中有等待的receiver,直接将数据拷贝的目标位置,不需要经过buf进行中转,只有1次拷贝。第五部分是说当前没有receiver,需要将数据放到buf,然后成功返回。第六部分是处理buf满的情况,如果buf满了,发送者的goroutine会被加入到发送者的等待队列中,直到被唤醒。

//发送操作c<-x调用入口 funcchansend1(c*hchan,elemunsafe.Pointer){ chansend(c,elem,true,getcallerpc()) } //往通道发送元素处理逻辑, funcchansend(c*hchan,epunsafe.Pointer,blockbool,callerpcuintptr)bool{ //c为nil表示通道还未初始化,如果不阻塞即block传false,会直接返回false //chansend1中调用block参数传的是true,chansend1是发送操作的入口, //所以这里可知,往未初始化的通道发送数据,会走到下面的gopark逻辑里。 ifc==nil{ if!block{ returnfalse } //挂起当前G gopark(nil,nil,waitReasonChanSendNilChan,traceEvGoStop,2) throw("unreachable") } ifdebugChan{ print("chansend:chan=",c,"\n") } ifraceenabled{ racereadpc(c.raceaddr(),callerpc,funcPC(chansend)) } //特殊情况的处理,非阻塞(block为false),且通道未被关闭,非缓冲通道且没有接收者直接返回, //对于缓冲性通道,但通道满了,也直接返回 if!block&&c.closed==0&&((c.dataqsiz==0&&c.recvq.first==nil)|| (c.dataqsiz>0&&c.qcount==c.dataqsiz)){ returnfalse } vart0int64 ifblockprofilerate>0{ t0=cputicks() } lock(&c.lock) //通过已经关闭,不能再发送元素,直接报panic ifc.closed!=0{ unlock(&c.lock) panic(plainError("sendonclosedchannel")) } //找到第一个等待的G,直接将数据ep拷贝给它,存在等待的G有两种情况 //情况1:通道非缓冲,receiver先被运行,被gopark了 //情况2:缓冲通道,通道中ringbuffer是空的,receiver先被运行,被gopark了 ifsg:=c.recvq.dequeue();sg!=nil{ //Foundawaitingreceiver.Wepassthevaluewewanttosend //directlytothereceiver,bypassingthechannelbuffer(ifany). send(c,sg,ep,func(){unlock(&c.lock)},3) returntrue } //有缓冲的通道,通道还没满,将元素加入到缓冲区 ifc.qcount<c.dataqsiz{ //qp是当前元素应该放入在环形队列的位置 qp:=chanbuf(c,c.sendx) ifraceenabled{ raceacquire(qp) racerelease(qp) } //将ep的内容拷贝到qp的位置,即将发送元素拷贝到环形队列中 typedmemmove(c.elemtype,qp,ep) //[c.recex,c.sendx]位置已填充带接收得元素,发送缓存区的位置c.sendx加1 //如果c.sendx加1操作之后已到达数组的尾部,即数组满了,回到起始位置0 c.sendx ifc.sendx==c.dataqsiz{ c.sendx=0 } c.qcount unlock(&c.lock) returntrue } if!block{ unlock(&c.lock) returnfalse } //走到这里表示,缓冲性通道已经满了,要挂起当前的G,构造一个sudog,加入到sendq队列 gp:=getg() mysg:=acquireSudog() mysg.releasetime=0 ift0!=0{ mysg.releasetime=-1 } //在elem被赋值到mysg加入sendq队列这段期间,栈是不会被分裂的 mysg.elem=ep mysg.waitlink=nil mysg.g=gp mysg.isSelect=false mysg.c=c gp.waiting=mysg gp.param=nil //将mysg加入到sendq c.sendq.enqueue(mysg) //gopark将当前的g挂起,切换到g0执行调度逻辑,当前的g状态会从running->waiting gopark(chanparkcommit,unsafe.Pointer(&c.lock),waitReasonChanSend,traceEvGoBlockSend,2) //确保要发送的数据已经被接收者copy获取到,因为ep是一个栈上的对象,mysg.elem指向的 //是一个栈上的对象 KeepAlive(ep) //被唤醒后要执行的逻辑 ifmysg!=gp.waiting{ throw("Gwaitinglistiscorrupted") } gp.waiting=nil gp.activeStackChans=false ifgp.param==nil{ ifc.closed==0{ throw("chansend:spuriouswakeup") } panic(plainError("sendonclosedchannel")) } gp.param=nil ifmysg.releasetime>0{ blockevent(mysg.releasetime-t0,2) } mysg.c=nil releaseSudog(mysg) returntrue } funcsend(c*hchan,sg*sudog,epunsafe.Pointer,unlockffunc(),skipint){ //数据竞争检查,执行gorun-race会进入 ifraceenabled{ ifc.dataqsiz==0{ racesync(c,sg) }else{ //修改缓冲区中发送者和接收者的下标位置 qp:=chanbuf(c,c.recvx) raceacquire(qp) racerelease(qp) raceacquireg(sg.g,qp) racereleaseg(sg.g,qp) c.recvx ifc.recvx==c.dataqsiz{ c.recvx=0 } c.sendx=c.recvx//c.sendx=(c.sendx 1)%c.dataqsiz } } //发送的数据不为空,直接调用sendDirect将数据ep拷贝到sg的elem中 ifsg.elem!=nil{ sendDirect(c.elemtype,sg,ep) sg.elem=nil } gp:=sg.g unlockf() gp.param=unsafe.Pointer(sg) ifsg.releasetime!=0{ sg.releasetime=cputicks() } //切换gp的状态为runnable goready(gp,skip 1) } //直接在两个栈上进行数据拷贝,src是发送者栈G上待发送的数据,sg是接收G funcsendDirect(t*_type,sg*sudog,srcunsafe.Pointer){ //一旦对sg.elem进行了读操作,如果发送栈扩容,它将永不会被更新 dst:=sg.elem typeBitsBulkBarrier(t,uintptr(dst),uintptr(src),t.size) //从src拷贝t.size字节到dst memmove(dst,src,t.size) }

接收操作实现原理

在编译器处理<-ch操作时,会将其转成chanrecv1函数,如果要返回两个返回值,会转成chanrecv2, chanrecv1和chanrecv2都会调用chanrecv。chanrecv1和chanrecv2传入的block参数都是true。分析的时候先不考虑block为false的情况。接收操作源码解析如下

//接收操作入口<-c,有2个入参,c表示通道结构体指针 //elem是接收通道元素的变量地址,即<-c左边的接收者 funcchanrecv1(c*hchan,elemunsafe.Pointer){ chanrecv(c,elem,true) } //_,_:=<-cchanrecv2比chanrecv1会返回一个bool类型 //该bool表示是否成功地从chan中读取到了一个值,chanrecv1和chanrecv2内部调用 //都是同一个函数 funcchanrecv2(c*hchan,elemunsafe.Pointer)(receivedbool){ _,received=chanrecv(c,elem,true) return } //chanrecv将通道c中接收到的数据写入到ep执行的地址内存中,ep指向的位置可能是一个堆空间也可能在栈空间上 //如果忽略接收通道值,ep将是nil值,如果传入的block为false并且通道中没有元素,将返回false,false //如果通道c被关闭,ep将会填充类型的零值,返回true和false,其他情况ep会填充通道中接收到的值,返回 //值为true,true funcchanrecv(c*hchan,epunsafe.Pointer,blockbool)(selected,receivedbool){ //调试输出 ifdebugChan{ print("chanrecv:chan=",c,"\n") } //c为nil,即未初始化,调用gopark挂起当前的 ifc==nil{ if!block{ return } gopark(nil,nil,waitReasonChanReceiveNilChan,traceEvGoStop,2) throw("unreachable") } //非阻塞接收且通道未被关闭,非缓冲通道且没有发送中G直接返回false,false //有缓冲通道且通道中没有元素,也直接返回false,false if!block&&(c.dataqsiz==0&&c.sendq.first==nil|| c.dataqsiz>0&&atomic.Loaduint(&c.qcount)==0)&& atomic.Load(&c.closed)==0{ return } vart0int64 ifblockprofilerate>0{ t0=cputicks() } lock(&c.lock) //通过已关闭且通道中没有元素了 ifc.closed!=0&&c.qcount==0{ ifraceenabled{ raceacquire(c.raceaddr()) } unlock(&c.lock) ifep!=nil{ typedmemclr(c.elemtype,ep) } returntrue,false } //等待发送的G队列非空,即有G在等待发送元素,如果是非缓冲队列,直接将数据从 //发送者的sg中的elem拷贝到ep中;如果是缓冲队列,将队列头的元素添加到ep //中,并将发送者sg中的元素拷贝到队列的尾部。 ifsg:=c.sendq.dequeue();sg!=nil{ recv(c,sg,ep,func(){unlock(&c.lock)},3) returntrue,true } //队列中有元素,将待拷贝的元素拷贝到ep中 ifc.qcount>0{ //Receivedirectlyfromqueue //qp为环形队列中待拷贝的元素的位置 qp:=chanbuf(c,c.recvx) ifraceenabled{ raceacquire(qp) racerelease(qp) } ifep!=nil{ //将qp位置的元素拷贝到ep中 typedmemmove(c.elemtype,ep,qp) } typedmemclr(c.elemtype,qp) //接收位置的下标 1,如果接收位置到达队尾,重置为0 c.recvx ifc.recvx==c.dataqsiz{ c.recvx=0 } c.qcount-- unlock(&c.lock) returntrue,true } if!block{ unlock(&c.lock) returnfalse,false } //走到这里有2种情况,1是非缓冲通道发送者没有准备好,2是缓冲型通道但没有元素, //构造一个sudog加入到通道的等待接收的G队列,调用gopark挂起当前的G,即将状态 //标为waiting,进入G0执行调度逻辑。 gp:=getg() mysg:=acquireSudog() mysg.releasetime=0 ift0!=0{ mysg.releasetime=-1 } //在elem被赋值到mysg加入sendq队列这段期间,栈是不会被分裂的 mysg.elem=ep mysg.waitlink=nil gp.waiting=mysg mysg.g=gp mysg.isSelect=false mysg.c=c gp.param=nil //将mysg加入到等待接收的G队列 c.recvq.enqueue(mysg) //gopark将当前的g挂起,切换到g0执行调度逻辑,当前的g状态会从running->waiting gopark(chanparkcommit,unsafe.Pointer(&c.lock),waitReasonChanReceive,traceEvGoBlockRecv,2) //执行唤醒后的逻辑 ifmysg!=gp.waiting{ throw("Gwaitinglistiscorrupted") } gp.waiting=nil gp.activeStackChans=false ifmysg.releasetime>0{ blockevent(mysg.releasetime-t0,2) } closed:=gp.param==nil gp.param=nil mysg.c=nil releaseSudog(mysg) returntrue,!closed } //通道接收操作,包含2部分处理逻辑,1是将sg中发送者的元素拷贝到通道中,并唤醒发送者;2是将 //通道中待接收处理的元素拷贝到ep中。注意,走到recv只有2中情况,队列为空或满了,队列没 //满不会走到这里。 funcrecv(c*hchan,sg*sudog,epunsafe.Pointer,unlockffunc(),skipint){ //非缓冲型通道,直接将元素从sg拷贝到ep中,不经过环形队列作为中转 ifc.dataqsiz==0{ ifraceenabled{ racesync(c,sg) } ifep!=nil{ //直接将数据从sg拷贝到ep recvDirect(c.elemtype,sg,ep) } }else{ //队列满的情况,走到这里只可能是队列满的情况,对于队列不满的情况,不会走 //到recv函数里c.recvx和c.sendx是在同一个位置,先将环形队列 //中待接收处理位置qp地方的元素拷贝到ep中,然后将发送者sg.elem //中的元素拷贝到队列的尾部,对于队列满的情况 qp:=chanbuf(c,c.recvx) ifraceenabled{ raceacquire(qp) racerelease(qp) raceacquireg(sg.g,qp) racereleaseg(sg.g,qp) } //将qp位置的数据拷贝到ep中 ifep!=nil{ typedmemmove(c.elemtype,ep,qp) } //将sg.elem中的元素拷贝到qp位置 typedmemmove(c.elemtype,qp,sg.elem) c.recvx ifc.recvx==c.dataqsiz{ c.recvx=0 } c.sendx=c.recvx//c.sendx=(c.sendx 1)%c.dataqsiz } sg.elem=nil gp:=sg.g unlockf() gp.param=unsafe.Pointer(sg) ifsg.releasetime!=0{ sg.releasetime=cputicks() } //对gp执行goread,将其状态从waiting修改为runnable状态 goready(gp,skip 1) } //直接在两个栈上进行数据拷贝,dst是接收者栈G上待接收数据的地址,sg是发送G funcrecvDirect(t*_type,sg*sudog,dstunsafe.Pointer){ src:=sg.elem typeBitsBulkBarrier(t,uintptr(dst),uintptr(src),t.size) memmove(dst,src,t.size) }

第一部分处理的是chan为nil的情况,从nil chan中接收数据,调用者会被永远阻塞。第二部分是对特殊情况的处理,非阻塞(block为false),且通道未被关闭,非缓冲通道且没有接收者直接返回,对于缓冲性通道,但通道满了,也直接返回。第三部分是chan已经被close的情况,如果chan已经被close了,并且队列中没有缓存的元素,直接返回true,false.第四部分是处理等待发送队列中有等待G的情况,这时,如果buf中有数据,先从buf中读取数据,否则直接从等待队列中获取一个发送者,把它的数据复制给这个receiver.第五部分是处理没有等待发送者G的情况,如果buf有元素,就取出一个元素给接收者。第六部分是处理buf中没有元素的情况,如果没有元素,阻塞当前的G,直到它从发送者中获取到了数据,或是chan被关闭了,才返回。

关闭操作实现原理

执行close(ch)关闭chan ch,最终会调用的是closechan方法。下面是closechan的源码解析。关闭nil chan会产生panic. 如果chan已经关闭再次关闭也会产生panic.否则将ch等待队列中的全部接收者和发送者G从队列中全部移除加入了glist.然后将glist中的所有G执行goready唤醒。

//关闭通道 funcclosechan(c*hchan){ //通道为空进行关闭引发panic ifc==nil{ panic(plainError("closeofnilchannel")) } //加锁,会对c进行修改 lock(&c.lock) ifc.closed!=0{ unlock(&c.lock) panic(plainError("closeofclosedchannel")) } ifraceenabled{ callerpc:=getcallerpc() racewritepc(c.raceaddr(),callerpc,funcPC(closechan)) racerelease(c.raceaddr()) } //closed修改为1表示,通道已关闭 c.closed=1 varglistgList //将所有的等待接收队列中的G出队,加入到glist中 for{ sg:=c.recvq.dequeue() ifsg==nil{ break } ifsg.elem!=nil{ typedmemclr(c.elemtype,sg.elem) sg.elem=nil } ifsg.releasetime!=0{ sg.releasetime=cputicks() } gp:=sg.g gp.param=nil ifraceenabled{ raceacquireg(gp,c.raceaddr()) } glist.push(gp) } //将所有等待发送队列中的G加入到glist中 for{ sg:=c.sendq.dequeue() ifsg==nil{ break } sg.elem=nil ifsg.releasetime!=0{ sg.releasetime=cputicks() } gp:=sg.g gp.param=nil ifraceenabled{ raceacquireg(gp,c.raceaddr()) } glist.push(gp) } unlock(&c.lock) //ReadyallGsnowthatwe'vedroppedthechannellock. //将glist中所有的G唤醒,即他们的状态都变为可运行态runnable for!glist.empty(){ gp:=glist.pop() gp.schedlink=0 goready(gp,3) } }

其他操作实现原理

下面讲解chan接收和发送与select结合是如何实现的。先看一个发送的情景。select chan default的语句会被编译器转换成if selectnbased(c,v){}else do{...}语句。selectnbased内部调用的也是chansend方法,只是在block参数传递上与前面的发送操作不同,这里传递的是false,就是不要阻塞在chansend, 不能发送的时候,要返回回来走default逻辑。同理,接收操作方法select中同时又有default的时候,会翻译成selectnbrecv或selectnbrecv2,两者的不同是接收操作有2种类型,一种是带bool的表示是否成功地从chan中读取到了一个值,另一种是不关心该参数。

//compilerimplements // //select{ //casec<-v: //...foo //default: //...bar //} // //as // //ifselectnbsend(c,v){ //...foo //}else{ //...bar //} // funcselectnbsend(c*hchan,elemunsafe.Pointer)(selectedbool){ returnchansend(c,elem,false,getcallerpc()) }

funcselectnbrecv(elemunsafe.Pointer,c*hchan)(selectedbool){ selected,_=chanrecv(c,elem,false) return } funcselectnbrecv2(elemunsafe.Pointer,received*bool,c*hchan)(selectedbool){ //TODO(khr):justreturn2valuesfromthisfunction,nowthatitisinGo. selected,*received=chanrecv(c,elem,false) return }

channel使用容易犯的错误
  • close一个nil型chan
  • 往已关闭的chan中发送数据
  • close已经关闭的chan
  • goroutine泄露

close一个nil值的chan会产生panic.我们在关闭chan的时候要清楚chan是否是nil值,如果不确定可以判断一下。第2点和第3点涉及到chan的关闭,不当的操作会导致panic.如何优雅的关闭channel, 可以读一读小冰的这篇文章优雅关闭channel,里面有讲关闭思路。

下面将一个groutine泄露的例子,分析产生的原因。这个例子来自于go-zero作者,原文见这里一文搞懂如何实现 Go 超时控制。可以从这个例子中吸取经验,避免写出类似的bug.

funcmain(){ consttotal=1000 varwgsync.WaitGroup wg.Add(total) now:=time.Now() fori:=0;i<total;i { gofunc(){ deferwg.Done() requestWork(context.Background(),"any") }() } wg.Wait() fmt.Println("elapsed:",time.Since(now)) time.Sleep(time.Minute*2) fmt.Println("numberofgoroutines:",runtime.NumGoroutine()) } funchardWork(jobinterface{})error{ time.Sleep(time.Minute) returnnil } funcrequestWork(ctxcontext.Context,jobinterface{})error{ ctx,cancel:=context.WithTimeout(ctx,time.Second*2) defercancel() done:=make(chanerror) gofunc(){ done<-hardWork(job) }() select{ caseerr:=<-done: returnerr case<-ctx.Done(): returnctx.Err() } }

上面的代码输出结果如下:

go语言channel 原理,go编程说说channel哪些事(1)

啥,打印1001个goroutine,goroutine泄露了。问题在于requestWork函数中,会执行到select <-ctx.Done()这句,因为ctx是一个超时取消context,这里超时时间设置的是2秒钟,hardWork是一个比较耗时的任务,这里模拟sleep 1分钟。所以2秒后,requestWork退出了, 当1分钟后执行 done<-hardWork(job)的时候会被卡住,因为没有接收者了,当前的G会执行gopark挂起了,并且永远得不到执行。所以main函数中的打印会输出1001(有1个是main groutine)。 那怎么修改呢?把done:=make(chan error,1)改成带有1个缓冲区的chan就可以了,因为有1个容量的缓冲区,所以当hardWork(job)执行完后,会将结果放在缓冲区中,然后协程就退出了,不会卡住。也许有读者会问题,这里协程退出之后,done怎么办,会不会存在泄漏。不会出现泄漏,当与chan绑定的的G都不存在的时候,chan会被gc回收掉。

总结

chan值和状态存在很多种情况,不同状态下执行发送、接收、关闭操作会产生不同的情况,下面将各种情况汇总,看完下表可以梳理清楚chan的各个知识点。

go语言channel 原理,go编程说说channel哪些事(2)

欢迎关注公众号-数据小冰,更多精彩内容和你一起分享

,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页