Go 博客

Go 并发模式:Context

Sameer Ajmani
2014 年 7 月 29 日

引言

在 Go 服务器中,每个传入请求都在其自己的 goroutine 中处理。请求处理程序通常会启动额外的 goroutine 来访问后端服务,例如数据库和 RPC 服务。处理请求的一组 goroutine 通常需要访问请求特定的值,例如终端用户的身份、授权令牌和请求的截止时间。当请求被取消或超时时,处理该请求的所有 goroutine 都应该快速退出,以便系统可以回收它们正在使用的所有资源。

在 Google,我们开发了一个 context 包,它使得跨 API 边界向处理请求所涉及的所有 goroutine 传递请求范围的值、取消信号和截止时间变得容易。该包以 context 的形式公开可用。本文介绍了如何使用该包并提供了一个完整的运行示例。

Context

context 包的核心是 Context 类型

// A Context carries a deadline, cancellation signal, and request-scoped values
// across API boundaries. Its methods are safe for simultaneous use by multiple
// goroutines.
type Context interface {
    // Done returns a channel that is closed when this Context is canceled
    // or times out.
    Done() <-chan struct{}

    // Err indicates why this context was canceled, after the Done channel
    // is closed.
    Err() error

    // Deadline returns the time when this Context will be canceled, if any.
    Deadline() (deadline time.Time, ok bool)

    // Value returns the value associated with key or nil if none.
    Value(key interface{}) interface{}
}

(此描述是精简的;godoc 才是权威文档。)

Done 方法返回一个 channel,它作为运行在代表 Context 的函数上的取消信号:当 channel 关闭时,函数应放弃其工作并返回。Err 方法返回一个错误,指示 Context 被取消的原因。Pipelines and Cancellation(流水线和取消) 一文更详细地讨论了 Done channel 的用法习惯。

Context 没有 Cancel 方法,原因与 Done channel 是只接收的原因相同:接收取消信号的函数通常不是发送信号的函数。特别是,当父操作为子操作启动 goroutine 时,这些子操作不应能够取消父操作。相反,WithCancel 函数(下文描述)提供了一种取消新的 Context 值的方法。

Context 可以安全地被多个 goroutine 同时使用。代码可以将单个 Context 传递给任意数量的 goroutine,并通过取消该 Context 来向它们全部发送信号。

Deadline 方法允许函数确定它们是否应该开始工作;如果剩余时间太少,可能就不值得了。代码还可以使用截止时间来设置 I/O 操作的超时时间。

Value 允许 Context 携带请求范围的数据。这些数据必须能够安全地被多个 goroutine 同时使用。

派生上下文

context 包提供了从现有 Context派生新值的功能。这些值形成一个树形结构:当一个 Context 被取消时,所有从它派生的 Contexts 也会被取消。

Background 是任何 Context 树的根;它永远不会被取消

// Background returns an empty Context. It is never canceled, has no deadline,
// and has no values. Background is typically used in main, init, and tests,
// and as the top-level Context for incoming requests.
func Background() Context

WithCancelWithTimeout 返回派生的 Context 值,它们可以比父 Context 更早被取消。与传入请求关联的 Context 通常在请求处理程序返回时被取消。WithCancel 在使用多个副本时对于取消冗余请求也很有用。WithTimeout 对于设置对后端服务器请求的截止时间很有用

// WithCancel returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed or cancel is called.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

// A CancelFunc cancels a Context.
type CancelFunc func()

// WithTimeout returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed, cancel is called, or timeout elapses. The new
// Context's Deadline is the sooner of now+timeout and the parent's deadline, if
// any. If the timer is still running, the cancel function releases its
// resources.
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

WithValue 提供了一种将请求范围的值与 Context 关联起来的方法

// WithValue returns a copy of parent whose Value method returns val for key.
func WithValue(parent Context, key interface{}, val interface{}) Context

理解如何使用 context 包的最佳方式是通过一个实际示例。

我们的示例是一个 HTTP 服务器,它通过将查询“golang”转发到 Google 网页搜索 API 并渲染结果来处理诸如 /search?q=golang&timeout=1s 的 URL。timeout 参数告诉服务器在该时长过去后取消请求。

代码分为三个包

  • server 包提供了 main 函数和 /search 的处理程序。
  • userip 包提供了从请求中提取用户 IP 地址并将其与 Context 关联的功能。
  • google 包提供了用于向 Google 发送查询的 Search 函数。

服务器程序

server 程序通过返回 golang 的前几个 Google 搜索结果来处理诸如 /search?q=golang 的请求。它注册 handleSearch 来处理 /search 端点。处理程序创建一个名为 ctx 的初始 Context,并安排在处理程序返回时取消它。如果请求包含 timeout URL 参数,则在超时过后 Context 会自动取消

func handleSearch(w http.ResponseWriter, req *http.Request) {
    // ctx is the Context for this handler. Calling cancel closes the
    // ctx.Done channel, which is the cancellation signal for requests
    // started by this handler.
    var (
        ctx    context.Context
        cancel context.CancelFunc
    )
    timeout, err := time.ParseDuration(req.FormValue("timeout"))
    if err == nil {
        // The request has a timeout, so create a context that is
        // canceled automatically when the timeout expires.
        ctx, cancel = context.WithTimeout(context.Background(), timeout)
    } else {
        ctx, cancel = context.WithCancel(context.Background())
    }
    defer cancel() // Cancel ctx as soon as handleSearch returns.

处理程序从请求中提取查询,并通过调用 userip 包提取客户端的 IP 地址。客户端的 IP 地址是后端请求所必需的,因此 handleSearch 将其附加到 ctx

    // Check the search query.
    query := req.FormValue("q")
    if query == "" {
        http.Error(w, "no query", http.StatusBadRequest)
        return
    }

    // Store the user IP in ctx for use by code in other packages.
    userIP, err := userip.FromRequest(req)
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    ctx = userip.NewContext(ctx, userIP)

处理程序使用 ctxquery 调用 google.Search

    // Run the Google search and print the results.
    start := time.Now()
    results, err := google.Search(ctx, query)
    elapsed := time.Since(start)

如果搜索成功,处理程序会渲染结果

    if err := resultsTemplate.Execute(w, struct {
        Results          google.Results
        Timeout, Elapsed time.Duration
    }{
        Results: results,
        Timeout: timeout,
        Elapsed: elapsed,
    }); err != nil {
        log.Print(err)
        return
    }

userip 包

userip 包提供了从请求中提取用户 IP 地址并将其与 Context 关联的功能。Context 提供一个键值映射,其中键和值都属于 interface{} 类型。键类型必须支持相等性比较,并且值必须能够安全地被多个 goroutine 同时使用。像 userip 这样的包隐藏了这种映射的细节,并提供对特定 Context 值的强类型访问。

为了避免键冲突,userip 定义了一个未导出的类型 key,并使用该类型的一个值作为上下文键

// The key type is unexported to prevent collisions with context keys defined in
// other packages.
type key int

// userIPkey is the context key for the user IP address.  Its value of zero is
// arbitrary.  If this package defined other context keys, they would have
// different integer values.
const userIPKey key = 0

FromRequesthttp.Request 中提取 userIP

func FromRequest(req *http.Request) (net.IP, error) {
    ip, _, err := net.SplitHostPort(req.RemoteAddr)
    if err != nil {
        return nil, fmt.Errorf("userip: %q is not IP:port", req.RemoteAddr)
    }

NewContext 返回一个新的 Context,其中携带有提供的 userIP

func NewContext(ctx context.Context, userIP net.IP) context.Context {
    return context.WithValue(ctx, userIPKey, userIP)
}

FromContextContext 中提取 userIP

func FromContext(ctx context.Context) (net.IP, bool) {
    // ctx.Value returns nil if ctx has no value for the key;
    // the net.IP type assertion returns ok=false for nil.
    userIP, ok := ctx.Value(userIPKey).(net.IP)
    return userIP, ok
}

google 包

google.Search 函数向 Google 网页搜索 API 发出 HTTP 请求并解析 JSON 编码的结果。它接受一个 Context 参数 ctx,如果在请求进行中 ctx.Done 关闭,则立即返回。

Google 网页搜索 API 请求将搜索查询和用户 IP 作为查询参数包含在内

func Search(ctx context.Context, query string) (Results, error) {
    // Prepare the Google Search API request.
    req, err := http.NewRequest("GET", "https://ajax.googleapis.ac.cn/ajax/services/search/web?v=1.0", nil)
    if err != nil {
        return nil, err
    }
    q := req.URL.Query()
    q.Set("q", query)

    // If ctx is carrying the user IP address, forward it to the server.
    // Google APIs use the user IP to distinguish server-initiated requests
    // from end-user requests.
    if userIP, ok := userip.FromContext(ctx); ok {
        q.Set("userip", userIP.String())
    }
    req.URL.RawQuery = q.Encode()

Search 使用一个辅助函数 httpDo 来发出 HTTP 请求,并在请求或响应处理过程中 ctx.Done 关闭时取消它。Search 将一个闭包传递给 httpDo 来处理 HTTP 响应

    var results Results
    err = httpDo(ctx, req, func(resp *http.Response, err error) error {
        if err != nil {
            return err
        }
        defer resp.Body.Close()

        // Parse the JSON search result.
        // https://developers.google.com/web-search/docs/#fonje
        var data struct {
            ResponseData struct {
                Results []struct {
                    TitleNoFormatting string
                    URL               string
                }
            }
        }
        if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
            return err
        }
        for _, res := range data.ResponseData.Results {
            results = append(results, Result{Title: res.TitleNoFormatting, URL: res.URL})
        }
        return nil
    })
    // httpDo waits for the closure we provided to return, so it's safe to
    // read results here.
    return results, err

httpDo 函数在一个新的 goroutine 中运行 HTTP 请求并处理其响应。如果在 goroutine 退出之前 ctx.Done 关闭,它会取消请求。

func httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error {
    // Run the HTTP request in a goroutine and pass the response to f.
    c := make(chan error, 1)
    req = req.WithContext(ctx)
    go func() { c <- f(http.DefaultClient.Do(req)) }()
    select {
    case <-ctx.Done():
        <-c // Wait for f to return.
        return ctx.Err()
    case err := <-c:
        return err
    }
}

调整代码以适应 Context

许多服务器框架提供了用于携带请求范围值的包和类型。我们可以定义 Context 接口的新实现,以便在使用现有框架的代码和期望 Context 参数的代码之间建立桥梁。

例如,Gorilla 的 github.com/gorilla/context 包允许处理程序通过提供 HTTP 请求到键值对的映射来将数据与传入请求相关联。在 gorilla.go 中,我们提供了一个 Context 实现,其 Value 方法返回在 Gorilla 包中与特定 HTTP 请求关联的值。

其他一些包也提供了类似于 Context 的取消支持。例如,Tomb 提供了一个 Kill 方法,通过关闭 Dying channel 来发出取消信号。Tomb 还提供了等待这些 goroutine 退出的方法,类似于 sync.WaitGroup。在 tomb.go 中,我们提供了一个 Context 实现,当其父 Context 被取消或提供的 Tomb 被杀死时,该 Context 也会被取消。

结论

在 Google,我们要求 Go 程序员将 Context 参数作为传入请求和传出请求之间的调用路径上每个函数的第一个参数传递。这使得由许多不同团队开发的 Go 代码能够很好地互操作。它提供了对超时和取消的简单控制,并确保像安全凭证这样的关键值在 Go 程序中正确传递。

希望基于 Context 构建的服务器框架应该提供 Context 的实现,以便在其包和期望 Context 参数的包之间建立桥梁。它们的客户端库随后会从调用代码接受一个 Context。通过为请求范围的数据和取消建立一个公共接口,Context 使包开发者更容易共享用于创建可伸缩服务的代码。

延伸阅读

下一篇文章:Go 在 OSCON
上一篇文章:Go 将参加 OSCON 2014
博客索引