source code
Go并发编程(十) 深入理解 Channel
Golang源码探索(二) 协程的实现原理
Goroutine Leaks - The Forgotten Sender
The Behavior Of Channels Author image
what:
管道;goroutine 之间 传输数据 ;
g <—> channel < —> g
how:
channel.send
1
2
3
4
5
6
7
|
if channel.receiveQueue.length == 0 && buffer.full:
    block:
    sleep(current); channel.sendQueue.push(g)
if channel.receiveQueue.length > 0:
    g = receiveQueue.first;
    wakeup g;
|
how
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
channel struct{
    receiveQueue
    sendQueue
    buffer
}
sendtoChannel(data):
    // block only when nobody is waiting to receive AND the buffer has no room
    // (an unbuffered channel always counts as "full")
    if buffer.isFull && channel.receiveQ isEmpty:
        g.status = waiting
        channel.pushtoSendqueue(currentG);
        reSchedule()
    else:
        if channel.receiveQ notEmpty:
            g = receiveQ.pop()
            g.status = runnable;
            g.receive(data)
            put in processor local queue
        else:
            buffer.add(data)
|
structure

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
// hchan is the channel structure these notes quote from the Go runtime:
// a circular buffer (buf, dataqsiz, qcount, sendx, recvx) plus two lists of
// waiting goroutines (recvq, sendq).
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
// NOTE(review): presumably the size of one element in bytes — confirm against runtime/chan.go.
elemsize uint16
// Closed flag; the notes further down describe close as "channel.closed = true".
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// NOTE(review): presumably guards the fields above — confirm against runtime/chan.go.
lock mutex
}
// sudog is a doubly linked list node (next/prev) that carries a goroutine (g)
// together with a pointer to the data element involved in the operation.
// NOTE(review): presumably this is the node type stored in hchan's
// recvq/sendq wait queues — waitq itself is not shown in these notes.
// The "..." below marks fields omitted from this excerpt.
type sudog struct{
g *g
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer //data element
...
}
|
how
-
overview
- block: gopark(g);
- unblock: goready(waitq.g)
-
send:
- block: no receiver g is waiting and the buffer is full;
- gopark g, sendq.add g: 当前g进入等待状态, execute next g
- unblock: a receiver g is waiting, or the buffer is not full;
- copy data to the waiting g or to the buffer;
- if receiveq.length != 0, goready(receiveq.g): 等待的g被重新唤醒,即将被调度;
-
receive:
- block: no sender g is waiting and the buffer is empty;
- gopark g, receiveq.add g
- unblock: a sender g is waiting, or the buffer is not empty
- copy data from the waiting g or from the buffer;
- if sendq.length != 0, goready(sendq.g)
goReady
-
pseudo code
- g.status = runnable;
- runqput: put g on the run queue of the current P;
-
code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// goready marks gp as ready to run by invoking ready(gp, traceskip, true)
// inside a systemstack call. In these notes it is the "unblock" half of a
// channel operation: the waiting goroutine is woken up and will be scheduled.
func goready(gp *g, traceskip int) {
systemstack(func() {
// next=true is passed straight through to runqput by ready.
ready(gp, traceskip, true)
})
}
// Mark gp ready to run.
//
// ready switches gp from _Gwaiting to _Grunnable, puts it on the run queue of
// the P attached to the calling goroutine's M (runqput), and then calls wakep.
// It throws "bad g->status in ready" when gp is not in _Gwaiting; the _Gscan
// bit is masked out before that comparison.
//
// traceskip is only used for traceGoUnpark when tracing is enabled.
// next is forwarded unchanged to runqput.
func ready(gp *g, traceskip int, next bool) {
if trace.enabled {
traceGoUnpark(gp, traceskip)
}
status := readgstatus(gp)
// Mark runnable.
_g_ := getg()
mp := acquirem() // disable preemption because it can be holding p in a local var
if status&^_Gscan != _Gwaiting {
// Dump the offending goroutine's state before aborting.
dumpgstatus(gp)
throw("bad g->status in ready")
}
// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(_g_.m.p.ptr(), gp, next)
wakep()
// Paired with the acquirem above.
releasem(mp)
}
|
gopark
-
pseudo code
1
2
|
g.status = waiting;
schedule();
|
-
code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
// gopark parks the current goroutine (mp.curg). It records the unlock
// callback, the lock, the wait reason and the trace parameters on the M and on
// gp, then hands control to park_m through mcall.
//
// unlockf and lock are stored in mp.waitunlockf and mp.waitlock; park_m calls
// unlockf(gp, lock) after gp has been moved to _Gwaiting.
// reason is saved in gp.waitreason; checkTimeouts is skipped when it is
// waitReasonSleep.
// traceEv and traceskip are saved for the traceGoPark call made in park_m.
//
// It throws "gopark: bad g status" unless gp is _Grunning or _Gscanrunning.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
// Store the func value as a raw pointer; park_m applies the inverse cast to call it.
mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf))
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
mcall(park_m)
}
// park_m is the function gopark passes to mcall; gp is the goroutine being
// parked. It moves gp from _Grunning to _Gwaiting, detaches it via dropg,
// runs the unlock callback saved by gopark (if any), and then calls schedule.
//
// If the unlock callback returns false, the park is abandoned: gp is set back
// to _Grunnable and resumed immediately through execute.
func park_m(gp *g) {
// Per the original note, getg() here returns g0, not gp.
_g_ := getg()
if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}
// Update gp's status with a compare-and-swap, setting it to _Gwaiting.
casgstatus(gp, _Grunning, _Gwaiting)
// Remove the binding between gp and the m.
dropg()
if _g_.m.waitunlockf != nil {
// Inverse of the cast in gopark: recover the callback from the raw pointer.
fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))
ok := fn(gp, _g_.m.waitlock)
// Clear the saved callback and lock once the callback has run.
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
// Run the scheduler again.
schedule()
}
|
channel state
state:
- nil
- read block;
- write block
- close: panic
- open:
- close
- read: zero value of the element type (default value), ok == false
- write: panic
- close: panic
why:
-
write panic:
- 造成数据损坏: receiver 预期不会再从 closed channel 收到新数据, sender 继续发送会导致数据丢失
-
read not panic:
- 不会造成数据损坏
- 方便开发者
- 判断channel 是否关闭
- for loop 结束循环
-
close panic: 多次关闭会导致数据竞争, 通过明确panic 提醒开发者小心管理好channel的共享
why sending to or closing a closed channel panics:
系统故意设计, 防止引发严重后果: 数据丢失;数据竞争
why receiving from a closed channel won't panic:
- 不会导致什么后果
- 方便消费者使用
how close channel work:
- channel.closed = true;
- wakeup all g in waiting queue;
check channel is closed:
1
2
3
4
5
6
7
8
9
|
data, open := <-c
if !open { print("closed") }
for data := range channel {
...
}
fmt.Println("now is closed")
|
concurrency program
- parallel workers(share data): do task at one thread

1
2
|
g1 handle data; Locker update data UnLocker
g2 ...
|
assembly line(communicate): do task in two parts

producer: handle data, work g
consumer: update data, calculate g
1
2
|
g1 handle data -> g3: update data
g2 handle data -> g3: update data
|
assembly line
pros:
- no need locker
- 更有条理 (clearer structure)
models
- 1 G -> 1 G
1
2
|
go: channel<- data
go: <-channel
|
- N G->1 G: fan-in, most common
1
2
3
|
for data range datas: go run data
for data range channel: updateby data
|
- 1 G -> N G -> 1: pool
example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
producer:
go: queryMysql(data1) ;
go: queryMysql(data2)
consumer and new producer:
for data range c{
go: queryES(data)
}
// or
for data range c{
list.append(data)
}
go ES(data[0]+ data[1])
go ES(data[1]+ data[2])
consumer:
for range channel
|
code
1
2
3
4
5
6
7
8
9
10
11
|
func search(c chan, ctx context):
    var innerChannel = make(chan int, 1) // buffer 1: the worker g can still send and exit after search has returned
    go:
        data = doSearch(innerChannel);
        innerChannel <- data;
    select:
    case <-ctx.Done(): // cancelled or timed out
        return;
    case data := <-innerChannel:
        c <- data
|
g leak
leak: 资源长时间占用不退出
what: g长时间占用不退出
when:
- 接收和发送不平衡, 导致 g 永久阻塞
- 请求时间过长
1
2
3
|
go func(){
    c <- data // 没有接收者, 发送方 g 永久阻塞
}()
|
solve:
- channel 使用完需要关闭;
- 对于耗时任务需要设置超时时间;
principle, who produce , who close
- producer wait and close
1
2
3
4
5
|
go func1: defer wg.Done
go func2: defer wg.Done;
go func: wg.Wait; close(channel)
|
- buffer channel : consumer exit first
1
2
3
4
5
6
|
go func: buffer<-data
go func:
    select
        <-buffer
        <-ctx.Done()
|
buffer channel
buffer: send/receive are non-blocking unless full or empty
un-buffer: send/receive are blocking
un-buffer: sync, 一直等待对方 接收到或者发送 才进行下一步
use case:
- 保持两个goroutine相同处理速度
- 传递一系列重要信息,确保每个信息对方都收到
buffer: async, 不管对方是否已经接收; 大部分情况下都可以使用buffer;
- 生产者不阻塞
- 解放生产者,让生产者继续做其他事情
- 避免潜在的消息丢失, 一些生产者为了减少阻塞,从而更快响应更重要的事件,会放弃消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
// sendLog makes a single attempt to push the line "hello" onto logC.
// If the send has not gone through within 100ms, the line is dropped so the
// caller is never blocked for longer than that. w.Done is called on return
// in both cases.
func sendLog(logC chan string, w *sync.WaitGroup) {
	defer w.Done()

	// How long the producer is willing to wait for room in logC.
	const giveUpAfter = 100 * time.Millisecond
	expired := time.After(giveUpAfter)

	select {
	case <-expired:
		fmt.Println("abort log data")
	case logC <- "hello":
		fmt.Println("log file is empty")
	}
}
// mockWriteToDisk drains logC until it is closed, pretending that each entry
// takes one second to persist before echoing it to stdout.
func mockWriteToDisk(logC <-chan string) {
	for {
		entry, open := <-logC
		if !open {
			return
		}
		// mock write to disk operation
		time.Sleep(time.Second)
		fmt.Println("write to disk: ", entry)
	}
}
|
- 避免可能内存泄露, unbuffer,生产者数据未被接受,则会一直被阻塞
buffer use case:
- 限流: 控制最大并发数
- 解耦: producer and consumer, 让生产者不会被消费者所阻塞
1
2
3
4
5
6
7
|
control = make(chan struct{}, 3)
for _, value := range []int{1,2,3}:
    control <- struct{}{}
    go func:
        xxx
        <-control
|
unbuffer use case:
-
synchronize, ensure that data is processed immediately
-
not block and not sync
-
block only if
- producer: buffer is full
- consumer: buffer is empty
vs unbuffer channel:
- not block, block only when the channel is full (blocks producer) or empty (blocks consumer)
- producer and consumer not sync
use case:
*use feature: block only full or empty *
- avoid leak: producer, block only channel full
1
2
|
go sender:
buffer<- data
|
-
control max: producer, block only channel full;
-
ordercontrol:
- producer blocks only if channel is full: g1
- consumer blocks only if channel is empty: g2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
c1 := make(chan int, 1)
c2 := make(chan int, 1)
g1: // blocks when c1 is full
//
c1<-1
fmt.Println(1)
c2<-1 // unblock g2
c1<-1 // block me
fmt.Println(3)
c2<-1 // unblock g2
c1<-1 // block me
...
g2: // blocks when c2 is empty
<-c2 // block
fmt.Println(2)
<-c1 // unblock g1
<-c2 // block
fmt.Println(4)
<-c1 // unblock g1
<-c2 // block me
|
wait-closed and cancel
wait-closed
core:
- send notification after g ends, by work g
- listen notification at one g, by count g
channel:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
go func1():
    defer: signalChan <- struct{}{}
    do sth...
go func2():
    ....
for {
    <-signalChan
    finish++;
    if finish == num:
        close(c)
        break // all workers reported; stop waiting, otherwise this loop blocks forever
}
|
wait.group :
1
2
3
4
5
6
7
|
w.Add(N);
go fun1:
defer w.Done
....
go: w.wait()
|
cancel
producer send cancel signal, consumer listen signal
1
2
3
4
5
6
7
8
9
10
11
12
|
func longJob: ctx,cha // producer
    select:
    case <-ctx.Done():
        return
    case data := <-innerChan:
        cha<-data
    go:
        data = query()
        innerChan<-data
|
pool
simple: does not reuse the g
1
2
3
4
5
|
controlChannel = make(chan struct{},poolSize);
controlChannel <- struct{}{};
go func(){
}()
|
complex:
1 generator –(fan-out)–>N * go consumer—-(fan-in)–>1;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
// generator
for data := range datas:
    dataC<-data
close(dataC) // who produces, closes: lets consumers see !ok and exit
// consumer1
go for:
    defer wg.Done;
    v,ok := <-dataC
    if !ok return ;
    execute task
    resultC <- result
// consumer2
go for:
    ....
// --- -----
go countG:
    wg.Wait
    close(resultC)
for result range resultC:
|