Contents

context && select

Go Concurrency Patterns: Context

深度解密Go语言之context

context

what: 可以理解为一个特殊的局部变量;

  1. 存取变量是 线程安全;
  2. 可以传递阻塞/取消信号;

使用场景: 需要在多个协程里传递变量,信号;

举例说明:

  1. api handle:

context: cancel a batch of g; pass value to specify-scoped of g; channel: share data between g select: receive/send data from/to mutiple channel;

cancel or pass data;

what: go 提供的线程安全的共享信号和变量

相比全局变量:

  1. block a group of goroutine
  2. thread safe
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
type Context interface {
    Done() <-chan struct{} // return a will be be closed channel;
    Err() error;
    Deadline() (deadline time.Time, ok bool)
    Value(key interface{}) interface{}
}

type canceler interface {
	cancel(removeFromParent bool, err, cause error)
	Done() <-chan struct{}
}

https://cdn.jsdelivr.net/gh/atony2099/imgs@master/20211113/Gg2khI.jpg

pass value

1
2
3
ctx  := context.WithValue("123","123")

value := ctx.Value("123")

pass signal

1
2
ctx, cancel := context.WithCanel(ctx)
ctx, cacel : =context.WithTimeout(ctx)
pass value, cancel signal between  G;

context: func cancelChann), func Value()
  1. types?

    1. value: valueCtx
    2. signal: cancelCtx->timerCtx;
  2. features?

    1. thread safe;
    2. cancel mutiple g at one time;

use case

  1. web server, pass request info: userid
1
2
3
4
5
context.withValue("userID", 123 )

func haneleRequest(ctx){

}
  1. cancel a set of goroutine, a long request g usually have a context to exit in early
1
2
3
4
5
6
7
8
9
// request 
func httpRequest(ctx,url):
	go dolong(chan,url)
	select{
		resp := <-chan:
				return resp	
		<-ctx.done
			return err
	}

how it works

contxt value: 使用不可变值确保线程安全

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
type struct{
	parent Contxt
	key, value any
} 
WithValue(ctx, key,value) Context:
	newCtx = struct{parent:ctx, key:key,value:value }
	return newCtx 
	 

func (c *valueCtx) Value(key interface{}) interface{} {

	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}

cancel ctx: by close empty channel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
ctx:
	cancelChannel
	[]childCtx

Done:
	return ctx.cancelChanel

cancel:
	close(ctx.channel)
	for child range  childCtx:
		child.cancel

withCancel:
	return func(){
		cancel()
	}
	
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
type cancelCtx struct {
	Context
	mu       sync.Mutex            // protects following fields
	done     atomic.Value          // of chan struct{}, created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
	cause    error                 // set to non-nil by the first cancel call
}

func (c *cancelCtx) cancel(removeFromParent bool, err, cause error){
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	if cause == nil {
		cause = err
	}
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return // already canceled
	}
	c.err = err
	c.cause = cause
	d, _ := c.done.Load().(chan struct{})
	if d == nil {
		c.done.Store(closedchan)
	} else {
		close(d)
	}
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err, cause)
	}
	c.children = nil
	c.mu.Unlock()

	if removeFromParent {
		removeChild(c.Context, c)
	}
}

exit loop;

  1. in repeated job

    for { select { case <- stopSignal // ctx.Done(): default: // do other thing

     }
    

    }

  2. in long job; isCancle(){ select{ case ctx.Donec/<-chan: return true case default:return false } }

    step1: do sth check isCancle(); step2: do sth

select

use case:

  1. cancel long job
1
2
3
4
select{
	<-ctx.Done()
	<- longJob
}
  1. 并发执行不同类型的任务
1
2
3
4
5
for i =0; i <= 2; i++:
	select{
		 <- querySql
		 <- requestApi
	}
  1. 同一个类型任务 错误、正确
1
2
3
4
5
for i =0; i <= 2; i++:
	select{
		 <- querySqlChan
		 <- querySqlErrorChan
	}

how

1
2
3
4
5
6

select:
    if any channle read/write able == true 
	  random select a  channle to read/write
    add current g to all channle's read/write queue, gopark until be ready:
	    select