Go 源码学习之 Context

引言

在一次排查某 HTTP 接口请求频繁因 context canceled 错误导致请求处理失败的问题期间,深入了解了下 Go 语言的 Context 实现。本文将首先介绍我们是如何排查诡异的 context canceled 产生原因(也就是在哪儿因为什么而导致取消的);接下来将深入介绍 Context 诞生的目的、源码解析及应用场景等,便于更进一步加深对它的理解;最后我们也会谈及使用 Context 的一些痛点。

排查 context canceled 的艰辛历程

背景

我们部署在生产环境的 HTTP 服务中提供了一个用于记录用户课程学习进度的接口,在 Sentry 中发现,有大量 context canceled 报错出现,导致在执行数据库查询时失败,从而导致完整的请求处理流程没有走完(用户的学习进度计算、业务方消息通知等没有执行)。但早期由于 Sentry 接入存在问题,导致错误记录没有上报;直到问题修复后,才在 Sentry 上观察到大量报错提示。由此,开启了定位 context canceled 问题之旅~

报告详细错误日志

我们首先对发生错误的位置,添加了更详细的错误日志,如请求上下文以及错误发生时的调用栈。待上线后,在 Sentry 中观察到了出错时详细的调用栈如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
File "git.xixi.com/group/project-foo/pkg/models/prog/learn_progress.go", line 109, in Create
d.Detect()
File "git.xixi.com/group/project-foo/pkg/controller/learn_progress.go", line 46, in Update
success := learnProgress.Create(ctx, memberID, unitID, bizType, progress, clientUpdatedAt) != nil
File "git.xixi.com/group/project-foo/pkg/web/handlers/learn_progress.go", line 77, in Post
success := h.ctrl.Update(h.R.Context(), memberID, unitID, bizType, progress, item.ClientUpdateAt)
File "/go/pkg/mod/git.xixi.com/bit/zerzura@v4.1.1+incompatible/rest/handler.go", line 53, in ServeHTTP
render(handler.Post())
File "/go/pkg/mod/github.com/go-chi/chi@v3.3.2+incompatible/mux.go", line 291, in func1
handler.ServeHTTP(w, r)
File "net/http/server.go", line 1995, in ServeHTTP
f(w, r)
File "/go/pkg/mod/github.com/go-chi/chi@v3.3.2+incompatible/mux.go", line 424, in routeHTTP
h.ServeHTTP(w, r)
File "net/http/server.go", line 1995, in ServeHTTP
f(w, r)
File "/go/pkg/mod/git.xixi.com/bit/zerzura@v4.1.1+incompatible/rest/middleware/sentry_meta.go", line 19, in func1
next.ServeHTTP(w, r.WithContext(ctx))
File "net/http/server.go", line 1995, in ServeHTTP
f(w, r)
File "/go/pkg/mod/git.xixi.com/go/box@v0.0.0-20190710074902-1cbc4c2abdad/zapi/middleware/auth/nginx.go", line 200, in func1
next.ServeHTTP(w, r1)
File "net/http/server.go", line 1995, in ServeHTTP
f(w, r)
File "/go/pkg/mod/git.xixi.com/go/box@v0.0.0-20190710074902-1cbc4c2abdad/zapi/context.go", line 67, in func1
next.ServeHTTP(w, r1)
File "net/http/server.go", line 1995, in ServeHTTP
f(w, r)
File "/go/pkg/mod/github.com/go-chi/cors@v1.0.0/cors.go", line 199, in func1
next.ServeHTTP(w, r)
File "net/http/server.go", line 1995, in ServeHTTP
f(w, r)
File "/go/pkg/mod/git.xixi.com/go/box@v0.0.0-20190710074902-1cbc4c2abdad/zapi/middleware/cors.go", line 51, in 1
defaultCORS.Handler(next).ServeHTTP(w, r)
File "net/http/server.go", line 1995, in ServeHTTP
f(w, r)
File "/go/pkg/mod/github.com/go-chi/chi@v3.3.2+incompatible/middleware/heartbeat.go", line 21, in 1
h.ServeHTTP(w, r)
File "net/http/server.go", line 1995, in ServeHTTP
f(w, r)
File "/go/pkg/mod/github.com/go-chi/chi@v3.3.2+incompatible/middleware/recoverer.go", line 35, in func1
next.ServeHTTP(w, r)
File "net/http/server.go", line 1995, in ServeHTTP
f(w, r)
File "/go/pkg/mod/github.com/go-chi/chi@v3.3.2+incompatible/middleware/logger.go", line 46, in 1
next.ServeHTTP(ww, WithLogEntry(r, entry))
File "net/http/server.go", line 1995, in ServeHTTP
f(w, r)
File "/go/pkg/mod/git.xixi.com/go/box@v0.0.0-20190710074902-1cbc4c2abdad/zapi/middleware/realip.go", line 18, in func1
h.ServeHTTP(w, r)
File "net/http/server.go", line 1995, in ServeHTTP
f(w, r)
File "/go/pkg/mod/git.xixi.com/go/box@v0.0.0-20190710074902-1cbc4c2abdad/zapi/middleware/sentry.go", line 83, in func1
next.ServeHTTP(w, r)
File "net/http/server.go", line 1995, in ServeHTTP
f(w, r)
File "/go/pkg/mod/git.xixi.com/bit/zerzura@v4.1.1+incompatible/rest/middleware/stats.go", line 66, in 1
next.ServeHTTP(lw, r1)
File "net/http/server.go", line 1995, in ServeHTTP
f(w, r)
File "/go/pkg/mod/github.com/go-chi/chi@v3.3.2+incompatible/mux.go", line 81, in ServeHTTP
mx.handler.ServeHTTP(w, r)
File "net/http/server.go", line 2774, in ServeHTTP
handler.ServeHTTP(rw, req)
File "net/http/server.go", line 1878, in serve
serverHandler{c.server}.ServeHTTP(w, w.req)

由于我们是将请求的 Context 一直传递到最下层的,而在父 Context 收到取消信号后也会通知到子 Context,所以我们有理由相信这个取消的触发是在某个父 Context 节点。但具体是在哪儿,什么原因导致的并不清楚。

修复问题 & 添加检测

不过为了避免因为 sql/driver 层收到 Context Cancel 信号而导致查询失败,进而导致后续的处理流程未能执行,我们决定先修复问题,再排查原因。那怎么修复呢?其实非常简单,我们提供了一个不带 Cancel 的 Context 继续往 sql/driver 层传递,但该 Context 同样继承了父 Context 的 Value,这样一些元信息也可以被继续传递下去。同时为了在检查到原有子 Context 收到 Cancel 信号时,报告详细的错误和 Context String,我们也实现了一个简单的检测器。相关源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
type noCancelCtx struct {
ctx context.Context
}
func (c *noCancelCtx) Deadline() (time.Time, bool) {
return time.Time{}, false
}
func (c *noCancelCtx) Done() <-chan struct{} {
return nil
}
func (c *noCancelCtx) Err() error {
return nil
}
func (c *noCancelCtx) Value(key interface{}) interface{} {
return c.ctx.Value(key)
}
func WithoutCancel(ctx context.Context) context.Context {
return &noCancelCtx{ctx: ctx}
}
type Detector struct {
where string
isDone bool
ctx context.Context
}
func NewContextDoneDetector(ctx context.Context) *Detector {
return &Detector{ctx: ctx}
}
func (d *Detector) Detect() {
if d.isDone {
return
}
select {
case <-d.ctx.Done():
d.isDone = true
var where string
_, file, line, ok := runtime.Caller(1)
if ok {
where = fmt.Sprintf("%s:%d", file, line)
} else {
where = "unknown"
}
log.WithContext(d.ctx).Errorf("detect context done signal in \"%s\". context is %s", where, d.ctx)
default:
}
}

然后对原有的业务代码进行一些改造如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Create 创建用户学习进度
func (learnProgressDB *LearnProgressDAO) Create(ctx context.Context, memberID int64, unitID int64, bizType int16, progress int32, clientUpdatedAt int64) *LearnProgress {
d := utils.NewContextDoneDetector(ctx)
d.Detect()
// ...此处省略 N 行
d.Detect()
// 替换成不带 Cancel 的 Context
ctxWithoutCancel := utils.WithoutCancel(ctx)
result, err := db.Exec(ctxWithoutCancel, i, args...)
// ...继续省略
d.Detect()
object := db.QueryRow(ctxWithoutCancel, "SELECT id, member_id, unit_id, biz_type, progress, "+
"client_updated_at FROM learn_progress WHERE id = ?", id)
learnProgress := &LearnProgress{}
d.Detect()
err = object.Scan(&learnProgress.ID, &learnProgress.MemberID, &learnProgress.UnitID, &learnProgress.BizType,
&learnProgress.Progress, &learnProgress.ClientUpdatedAt)
if err != nil {
log.WithContext(ctx).WithError(err).Errorf("create and get progress %d error", id)
return nil
}
d.Detect()
return learnProgress
}

在上线后,因为 context canceled 而导致请求处理流程不能走完的问题解决了。同时也报告出很多检查到上层 Context 取消的信号,详细的日志如下:
context log

分析 HTTP Server 源码

显然,从日志中可以看到有两处 Cancel Context 有极大的嫌疑,那么剩下的问题就是要确定这两个 Cancel Context 是怎么来的?这样,接下来我们再去确认是哪个 Cancel Context 在哪优先被 cancel 从而导致子节点收到了 ctx.Done() 信号,不就可以解答疑惑了吗?

分析这棵 Context 树可以发现,我们优先去看 HTTP Server 处理请求部分的代码,就最容易找顺着请求处理的各个流程来定位到 Cancel Context 是在何处生成的,以及在何处会被调用的。

一般我们启动 Server 是调用了 ListenAndServe 接口,顺着该接口往下分析即可找到线索,详细的分析如下(和我们确定问题无关紧要的代码先忽略了):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// ListenAndServe 用于启动服务并监听指定的端口
func (srv *Server) ListenAndServe() error {
addr := srv.Addr
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
}

// Serve 用于接收请求连接,并为每个新的连接服务创建一个 service goroutine。
func (srv *Server) Serve(l net.Listener) error {
// ... 此处省略不少
var tempDelay time.Duration // how long to sleep on accept failure
// 这里创建了一个根 context
baseCtx := context.Background() // base is always background, per Issue 16220
// 第一个 WithValue 正是 `http.&contextKey{"http-server"}`
ctx := context.WithValue(baseCtx, ServerContextKey, srv)
for {
rw, e := l.Accept()
// ...此处省略很多
c := srv.newConn(rw)
c.setState(c.rwc, StateNew) // before Serve can return
// 启动新的 service goroutine 处理连接服务,上面的 ctx 被传递进去了!!
go c.serve(ctx)
}
}

//
// serve 读取请求,并调用 `srv.Handler` 来处理请求,进而执行到业务逻辑,处理完请求后
// 给客户端返回响应
func (c *conn) serve(ctx context.Context) {
// 注意,这里添加了 `http.&contextKey{"local-addr"}` 子 Context
ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())
// ...此处省略很多
// HTTP/1.x from here on.
// Bingo,这里看到了第一个 WithCancel 创建的子 Context 了
ctx, cancelCtx := context.WithCancel(ctx)
c.cancelCtx = cancelCtx
defer cancelCtx()
c.r = &connReader{conn: c}
c.bufr = newBufioReader(c.r)
// 注意这里的 checkConnErrorWriter,后面分析会涉及
c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)
for {
// readRequest 会返回一个 response,其中包括添加子 Cancel Context
w, err := c.readRequest(ctx)
// ...此处省略很多
// 注意这里的 startBackgroundRead,下面分析会看到
if requestBodyRemains(req.Body) {
registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
} else {
w.conn.r.startBackgroundRead()
}

// 调用 ServeHTTP,进而将请求传递到业务代码中,等待处理完毕
// 在上述日志中,我们可以看到,在这个调用没有结束的时候,context
// 已经 cancel 了。而这个调用中 我们确认没有异步 cancel context 的代码
// 并且,`*http.response` 即 `w` 这个结构体中 `cancelCtx` 是个私有字段,不会
// 被外部访问到,所以不可能在 ServeHTTP 期间调用了 `cancelCtx` 函数
serverHandler{c.server}.ServeHTTP(w, w.req)
// 通过对代码的分析,只有此处调用了一次 `*http.response` 里面的 cancelCtx
// 所以我们确认导致下层收到 context cancel 信号的触发点不在此处!
w.cancelCtx()
w.finishRequest()
if !w.shouldReuseConnection() {
if w.requestBodyLimitHit || w.closedRequestBodyEarly() {
c.closeWriteAndWait()
}
return
}
// ... 此处省略
}
}

// readRequest 从连接中读取下一个请求
func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
// ...此处省略很多
// 可以看到第二个 cancel context 节点诞生了
ctx, cancelCtx := context.WithCancel(ctx)
req.ctx = ctx
// ...此处省略很多
w = &response{
conn: c,
cancelCtx: cancelCtx,
req: req,
reqBody: req.Body,
handlerHeader: make(Header),
contentLength: -1,
closeNotifyCh: make(chan bool, 1),
// We populate these ahead of time so we're not
// reading from req.Header after their Handler starts
// and maybe mutates it (Issue 14940)
wants10KeepAlive: req.wantsHttp10KeepAlive(),
wantsClose: req.wantsClose(),
}
if isH2Upgrade {
w.closeAfterReply = true
}
w.cw.res = w
w.w = newBufioWriterSize(&w.cw, bufferBeforeChunkingSize)
return w, nil
}

根据详细打印的 Context 日志,并结合 HTTP Server 处理部分的代码分析,可以简单绘制出这棵 Context 树大体如下:
context tree

在进行 Server 处理连接请求的源码中,可以发现不太可能是第二个 Cancel Context 发送的取消信号。那么,问题只能出现在一个 Cancel Context 上面了。
接下来,就看看 Connection 关联的 cancelCtx() 究竟会在哪几处调用?利用搜索可以找到如下两个嫌疑很大的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// handleReadError 在从客户端读取失败时会被调用。这里的错误之所以
// 被省略,是因为错误通常就是 io.EOF 或者 "use of closed network connection"
// 标准库认为我们对具体报错不感兴趣,所以连 error 是什么在业务代码中是无法获取
// 到的。
// 总之,执行到此处,就意味着连接已经挂了,所以一定要通知取消 context
func (cr *connReader) handleReadError(_ error) {
cr.conn.cancelCtx()
cr.closeNotify()
}

func (w checkConnErrorWriter) Write(p []byte) (n int, err error) {
n, err = w.c.rwc.Write(p)
if err != nil && w.c.werr == nil {
w.c.werr = err
w.c.cancelCtx()
}
return
}

通过进一步分析,checkConnErrorWriter 只有在请求处理完毕,w.finishRequest() 时才可能会在某个时刻被调用。所以不可能是这里的 cancelCtx() 调用导致的,因为在报错时,显然还没有完成 ServeHTTP() 的流程。一通排查下来,只可能是在 handleReadError() 时报错了。结合网上的搜索信息,我们判断极有可能是客户端连接断开导致的大量报错,也就是说 handleReadError() 被调用才导致的。

添加 HTTP Middleware 监测连接断开

那么我们如何验证的确是 connection closed 导致的呢?通过上面的代码可以看到,在处理错误时调用了 closeNotify() 方法,该方法会将 *http.responsecloseNotifyCh 发送一个 true 值。进一步发现,*http.response 实现了接口 CloseNotifier,所以我们可以在代码中监听这个信号来进一步验证连接是不是真的断开了。

为此,我们实现了一个简单的中间件,启动一个 goroutine 去监听关闭信号,并在收到信号时向 Sentry 中打印相关报错:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func MonitorCloseNotifier(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" && strings.EqualFold(r.URL.Path, "/check_health") {
next.ServeHTTP(w, r)
} else {
go func() {
defer func() {
if err := recover(); err != nil {
log.Warn(err)
}
}()
cc := w.(http.CloseNotifier).CloseNotify()
value := <-cc
ctx := context.WithValue(
r.Context(),
log.SentrySpecificMetaCtxKey,
collectSentryMeta(r, "login_id"),
)
log.WithContext(ctx).Errorf(
"connection read error, maybe 'use of closed network connection' or 'io.EOF'. return value: %v", value)
}()
next.ServeHTTP(w, r)
}
})
}

在完成代码变更,并进行金丝雀小流量验证时,在 Sentry 上看到了相关的报错。由此确认是在何处因为什么导致了 Context Cancel。
connection error

小结

历经多次代码变更和日志分析才最终确定在何处因为什么导致了 Context Cancel,整个过程非常坎坷。那为什么没有直接进行调试呢?那样定位问题不是更快速些吗?原因是这样,最开始我们并不知道什么原因导致的;这样也就没法在本地复现问题,由于这些问题是在线上产生的,也不大可能直接在线上拦截用户请求并调试,那样可能更加繁琐、耗时,且实施成本更大(因为我们也不知道哪个用户使用什么设备在什么时候会发生问题)。

所以采取分析错误日志加验证的方式来确定问题所在。当然,之所以这么麻烦也是因为 Context Cancel 时提供的 Error 太单一了。如果最初 API 设计时就能提供自定义的错误,那么我们可以根据具体错误来定位到可能产生报错的位置,这样会更加快捷!

Context

context 包最初是由 Google 官方开发,并在 Go 1.7 版本正式引入到标准库中的。引入该包的目的是为了提供统一的姿势处理超时、取消信号传递和在 API 之间传递请求上下文数据。它提供了几个重要的接口用于创建 Context 树🌲:

  • WithCancel
  • WithDeadline
  • WithTimeout
  • WithValue

这些接口会接收一个 parent context,并返回一个 derived context。在我们的代码中,应该层层传递该 context,一般约定函数的第一个参数就是 context,签名类似:func foo(ctx context.Context)

对于 WithCancel, WithDeadline, WithTimeout 而言,它们都会返回 cancelFunc 供使用者调用。当 cancelFunc 被调用时,除了自身会被取消外,其子 context 都会被取消。示例图如下:
context cancel

源码分析

Context Interface

Context 本身是一个接口,其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type Context interface {
// Deadline 会返回什么时间 context 会被取消。如果没有设置
// 过期时间,则 ok 返回 false。
Deadline() (deadline time.Time, ok bool)

// Done 在 context 被取消时对应的 channel 会被关闭,从而达到
// 通知正在监听的 goroutine 终止手头工作的目的。对于不可取消的
// context,则返回 nil。
// 对于 WithCancel, WithDeadline, WithTimeout 而言,最终都会
// 关闭 done channel。
Done() <-chan struct{}

// Err 会在 Done 被关闭时,返回错误(这里的错误在 context 包内仅限 Canceled 和超时 DeadlineExceeded)
Err() error

// Value 用于返回存储在 Context 中指定 key 对应的上下文数据。如果找不到就返回空。
// 这里如果在当前 context 找不到,就会一直往上找 parent 直到根节点。
// 仅限于使用 Context 存储一些请求相关(request-scoped)数据,并在应用中传递,
// 对于一些额外的参数,传递绝对不推荐使用它!
Value(key interface{}) interface{}
}

TODO & Background

context 包中定义了两种 emptyCtx,分别是 todobackground。相关实现非常简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// emptyCtx 是不会被取消,无任何值和 deadline 的。之所以没有使用空结构体(struct{})
// 是因为要保证该类型的每个值都要有不同的地址
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) { return }
func (*emptyCtx) Done() <-chan struct{} { return nil }
func (*emptyCtx) Err() error { return nil }
func (*emptyCtx) Value(key interface{}) interface{} { return nil }

var (
background = new(emptyCtx)
todo = new(emptyCtx)
)

// Background 通常在 main 函数、测试中初始化,或者请求对应的顶层 Context
func Background() Context { return background }

// TODO 通常在不知道该用什么 Context 的时候,可以使用它
func TODO() Context { return todo }

WithCancel

我们通常使用 WithCancel() 接口来创建一个可被取消的 Context,该接口会返回一个新的子 Context 节点和用于取消时调用的函数 cancelFunc。相关源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// CancelFunc 通知取消任务,不会等待任务执行完毕;只能被有效调用一次
type CancelFunc func()

// WithCancel 会返回一个 parent 的拷贝,同时带有 Done channel
// 取消该 context 时会释放关联的资源,所以当该 Context 完成时需要尽快调用 cancel
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}

// newCancelCtx 会返回一个初始化好的 cancelCtx 实例
// 关于什么是 cancelCtx 会在下面分析它的源码
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}

// propagateCancel 本质上是为了将子 canceler 挂载到父 canceler 节点上
// 这样在父节点收到取消通知时,才能一一通知到子节点
func propagateCancel(parent Context, child canceler) {
if parent.Done() == nil {
// parent 不支持取消的情况
return // parent is never canceled
}
// 这里只是判断是不是有 context 包中定义的
// cancelCtx 结构而已
// parentCancelCtx(parent) 确认 parent 是否为
// cancelCtx 或者 timerCtx 类型
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
// parent 如果被取消,自然不需要 removeChild,因为
// parent 对应的子树会被 detatch 掉,确保释放资源
child.cancel(false, p.err)
} else {
// 延迟初始化了 children
if p.children == nil {
// 为什么是放在字典,而非列表呢?
// 原因很简单,是为了方便删除 child
// delete(p.children, child)
p.children = make(map[canceler]struct{})
}
// 把子节点挂载到 cancel context 的父节点上
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
go func() {
select {
case <-parent.Done():
// 对于父节点返回 Done channel 的进行监听
child.cancel(false, parent.Err())
case <-child.Done():
// 等待 cancel ctx 被结束
// 这里只可能是 timerCtx, cancelCtx
// goroutine 退出
}
}()
}
}

对于可被取消的 Context 都实现了下面的接口:

1
2
3
4
5
6
// canceler 就是实现了 cancel 的 context 类型。在标准库中
// 仅有 `cancelCtx` 和 `timerCtx` 实现了该接口
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}

在分析 WithCancel 的源码时,可以发现,我们创建了一个 cancelCtx 实例,并将原有的 Context 作为父节点记录了下来。那么 cancelCtx 是怎么实现的呢?又是如何处理取消逻辑的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
// Context 会指向 parent
Context
// mu 用来保证 goroutine 安全
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
// 之所以使用字典来存储,是因为不关心子节点顺序,同时为了方便删除
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
// Done 会返回一个延迟初始化的 chan 供下游监听完成信号
func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
// 延迟初始化
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}

func (c *cancelCtx) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}

// cancel 负责关闭 c.done chan,同时取消每个子节点
// 并根据需要将对应的节点从它的父节点的 children 中移除
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
if c.done == nil {
// 确保下次调用时已经关闭了
c.done = closedchan
} else {
close(c.done)
}
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
// 依次 cancel 所有的子节点
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}

WithDeadline & WithTimeout

如果我们需要对 goroutine 设置超时或者到达指定时间后退出的话,就可以使用 WithDeadline()WithTimeout() 来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// WithDeadline 会创建一个带有到期时间控制的 context,在时间到达后会自动
// 关闭 done channel,同时下游的节点也会收到取消通知。需要注意的是,对于
// 这种类型的 cancel,对应的 Error 是 DeadlineExceeded
// 另外,返回的 cancelFunc 一定要被调用一次,确保资源最终能被释放
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
// 如果父节点提前结束,则不用再为子节点添加额外的计时资源了
// 因为当父节点结束时,子节点也会被通知到
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// 这里,如果说已经到期了,自然没必要引入一个计时器资源
// 所以直接返回一个 Cancel Context 了
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c)
dur := time.Until(d)
if dur <= 0 {
// 如果指定的时间点已经过了的话,则直接取消
c.cancel(true, DeadlineExceeded)
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
// 当时间到了后,会执行 cancel 方法
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}

// WithTimeout 指定超时时间的 Context,其实就是复用了 WithDeadline 方法
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}

timerCtx 是实现带定时功能的 Context 类型,它的实现比较简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type timerCtx struct {
cancelCtx
timer *time.Timer // timer 资源的访问是由 cancelCtx 中的 Lock 来保障的
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) { return c.deadline, true }

// cancel 主要在 timerCtx 取消时,释放掉 timer 资源
func (c *timerCtx) cancel(removeFromParent bool, err error) {
// 将当前节点的 done channel close 掉,同时取消相关子节点
c.cancelCtx.cancel(false, err)
if removeFromParent {
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
// 释放 timer 资源
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}

WithValue

通常使用 WithValue() 方法给 Context 附加一些请求相关的上下文数据,它的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
// 保证 Key 本身是可比较的即可
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}

type valueCtx struct {
Context
key, val interface{}
}

func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
// 往 parent 去找,一直找到为止
return c.Context.Value(key)
}

应用场景

传递请求相关的数据

使用 WithValue 将需要的请求数据存储在 Context 中,方便向下传递。我们一般要尽量避免在业务代码中直接解析 Context 中的某些 Key。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type contextKey struct{ name string }

var (
// 不要使用 string key,避免冲突
traceIDCtxKey = &contextKey{name: "request-trace-id"}
)

// 建议使用函数封装下,对外提供统一的接口方便使用
func WithTraceID(ctx context.Context, id string) context.Context {
return context.WithValue(ctx, traceIDCtxKey, id)
}

func TraceIDFrom(ctx context.Context) string {
if traceID, ok := ctx.Value(traceIDCtxKey).(string); ok {
return traceID
} else {
return ""
}
}

func exampleWithValue(ctx context.Context) {
doStuff := func(ctx context.Context) {
traceID := TraceIDFrom(ctx)
fmt.Printf("got request trace id: %s\n", traceID)
}

// 模拟传递请求 trace id,这些可以被日志框架等提取并存储到日志中
id, _ := uuid.GenerateUUID()
ctx = WithTraceID(ctx, id)
doStuff(ctx)
}

超时控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// exampleWithTimeout 模拟一组请求,并且假设每个请求处理时间为 0~3 秒的随机时间
// 同时我们将超时时间设置为 1 秒,这样在请求期间就会有些因为超时而主动结束后续执行流程
// 的 goroutine 收到取消信号
func exampleWithTimeout(ctx context.Context) {
doRequest := func(ctx context.Context, id int) {
// 模拟请求
time.Sleep(time.Duration(rand.Int63n(3)) * time.Second)
select {
case <-ctx.Done():
fmt.Printf("[%d]request canceled\n", id)
default:
// 处理其它业务逻辑
}
}
ctx, cancel := context.WithTimeout(ctx, time.Second*1)
defer cancel()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
doRequest(ctx, id+1)
}(i)
}
wg.Wait()
}

总结

  1. 不建议将 Context 存放在结构体中,而应该作为参数显式传递;一般接收 Context 作为参数的函数,应该将该参数放在第一个参数位置(这个是惯例了);
  2. 在给函数传递 Context 时,如果不知道用什么 Context,就可以使用 Context.TODO(),避免传递 nil
  3. Context Value 需要有约束,必须应该符合请求相关的上下文数据,并且一般是在框架或者 HTTP 中间件中设置 Value,业务代码中尽可能避免直接操作 Context Value。在我们的业务代码中应当显式传参,这样可以利用静态语言的特性,使得一些问题可以在编译阶段就能发现;不要为了图方便,用 Context 带一些可选参数。
  4. 在使用 WithValue(key, value) 时,Key 应该避免使用字符串,防止名字冲突和污染;
  5. 在使用 WithCancel, WithTimeout, WithDeadline 时,一定要保证返回的 cancel 方法至少被调用一次,避免 goroutine 泄露或者其它资源泄露;
  6. 由于目前在标准库以及一些第三方库中都在使用 Context,一路传递下来,整体链路很长。而当发生 Context Cancel 时,排查起来就非常麻烦。所以需要对所使用各类框架或者库在整个执行过程的生命周期需要有一定的了解,再配合日志等手段进行排查起来会更加有效。

参考

  1. Go语言实战笔记(二十)| Go Context
  2. Go Concurrency Patterns: Context
  3. How to correctly use context.Context in Go 1.7
  4. Go 1.7 httptrace and context debug patterns
  5. Go: Context and Cancellation by Propagation
  6. How to correctly use context.Context in Go 1.7
  7. 深入理解 Golang HTTP Timeout
  8. Pitfalls of context values and how to avoid or mitigate them in Go
0%