背景

在go服务器中,对于每个请求的request都是在单独的goroutine中进行的,处理一个request也可能设计多个goroutine之间的交互, 使用context可以使开发者方便的在这些goroutine里传递request相关的数据、取消goroutine的signal或截止日期。

Context是Golang官方定义的一个package,它定义了Context类型,里面包含了Deadline/Done/Err方法以及绑定到Context上的成员变量值Value,具体定义如下:

 

Context结构

// A Context carries a deadline, cancelation signal, and request-scoped values
// across API boundaries. Its methods are safe for simultaneous use by multiple
// goroutines.
type Context interface {
    // Done returns a channel that is closed when this Context is canceled
    // or times out.
    Done() <-chan struct{}

    // Err indicates why this context was canceled, after the Done channel
    // is closed.
    Err() error

    // Deadline returns the time when this Context will be canceled, if any.
    Deadline() (deadline time.Time, ok bool)

    // Value returns the value associated with key or nil if none.
    Value(key interface{}) interface{}
}

 

那么到底什么Context?

可以字面意思可以理解为上下文,比较熟悉的有进程/线程上线文,关于Golang中的上下文,一句话概括就是:goroutine的相关环境快照,其中包含函数调用以及涉及的相关的变量值。
通过Context可以区分不同的goroutine请求,因为在Golang Severs中,每个请求都是在单个goroutine中完成的。

注:关于goroutine的理解可以移步这里

由于在Golang severs中,每个request都是在单个goroutine中完成,并且在单个goroutine(不妨称之为A)中也会有请求其他服务(启动另一个goroutine(称之为B)去完成)的场景,这就会涉及多个Goroutine之间的调用。如果某一时刻请求其他服务被取消或者超时,则作为深陷其中的当前goroutine B需要立即退出,然后系统才可回收B所占用的资源。
即一个request中通常包含多个goroutine,这些goroutine之间通常会有交互。

 

 那么,如何有效管理这些goroutine成为一个问题(主要是退出通知和元数据传递问题),Google的解决方法是Context机制,相互调用的goroutine之间通过传递context变量保持关联,这样在不用暴露各goroutine内部实现细节的前提下,有效地控制各goroutine的运行。

 

 如此一来,通过传递Context就可以追踪goroutine调用树,并在这些调用树之间传递通知和元数据。
虽然goroutine之间是平行的,没有继承关系,但是Context设计成是包含父子关系的,这样可以更好的描述goroutine调用之间的树型关系。

 

使用:

Done 方法在Context被取消或超时时返回一个close的channel,close的channel可以作为广播通知,告诉给context相关的函数要停止当前工作然后返回。

当一个父operation启动一个goroutine用于子operation,这些子operation不能够取消父operation。下面描述的WithCancel函数提供一种方式可以取消新创建的Context.

Context可以安全的被多个goroutine使用。开发者可以把一个Context传递给任意多个goroutine然后cancel这个context的时候就能够通知到所有的goroutine。

Err方法返回context为什么被取消。

Deadline返回context何时会超时。

Value返回context相关的数据。

继承的Context

BackGround(顶层Context:Background)

要创建Context树,首先就是要创建根节点

 

// Background returns an empty Context. It is never canceled, has no deadline,
// and has no values. Background is typically used in main, init, and tests,
// and as the top-level Context for incoming requests.
func Background() Context

BackGound是所有Context的root,不能够被cancel。

该Context通常由接收request的第一个goroutine创建,它不能被取消、没有值、也没有过期时间,常作为处理request的顶层context存在。

 

 

下层Context:WithCancel/WithDeadline/WithTimeout

了根节点之后,接下来就是创建子孙节点。为了可以很好的控制子孙节点,Context包提供的创建方法均是带有第二返回值(CancelFunc类型),它相当于一个Hook,在子goroutine执行过程中,可以通过触发Hook来达到控制子goroutine的目的(通常是取消,即让其停下来)。再配合Context提供的Done方法,子goroutine可以检查自身是否被父级节点Cancel:

select { 
    case <-ctx.Done(): 
        // do some clean… 
}

 

注:父节点Context可以主动通过调用cancel方法取消子节点Context,而子节点Context只能被动等待。同时父节点Context自身一旦被取消(如其上级节点Cancel),其下的所有子节点Context均会自动被取消。

有三种创建方法:

// 带cancel返回值的Context,一旦cancel被调用,即取消该创建的context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) 

// 带有效期cancel返回值的Context,即必须到达指定时间点调用的cancel方法才会被执行
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) 

// 带超时时间cancel返回值的Context,类似Deadline,前者是时间点,后者为时间间隔
// 相当于WithDeadline(parent, time.Now().Add(timeout)).
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

 

 

 

WithCancel

WithCancel返回一个继承的Context,这个Context在父Context的Done被关闭时关闭自己的Done通道,或者在自己被Cancel的时候关闭自己的Done。
WithCancel同时还返回一个取消函数cancel,这个cancel用于取消当前的Context。

package main

import (
    "context"
    "log"
    "os"
    "time"
)

var logg *log.Logger

func someHandler() {
    ctx, cancel := context.WithCancel(context.Background())
    go doStuff(ctx)

//10秒后取消doStuff
    time.Sleep(10 * time.Second)
    cancel()

}

//每1秒work一下,同时会判断ctx是否被取消了,如果是就退出
func doStuff(ctx context.Context) {
    for {
        time.Sleep(1 * time.Second)
        select {
        case <-ctx.Done():
            logg.Printf("done")
            return
        default:
            logg.Printf("work")
        }
    }
}

func main() {
    logg = log.New(os.Stdout, "", log.Ltime)
    someHandler()
    logg.Printf("down")
}

 

返回:

E:\wdy\goproject>go run context_learn.go
15:06:44 work
15:06:45 work
15:06:46 work
15:06:47 work
15:06:48 work
15:06:49 work
15:06:50 work
15:06:51 work
15:06:52 work
15:06:53 down

 

package main

import (
    "context"
    "fmt"
    "time"
)

func someHandler() {
    // 创建继承Background的子节点Context
    ctx, cancel := context.WithCancel(context.Background())
    go doSth(ctx)

    //模拟程序运行 - Sleep 5秒
    time.Sleep(5 * time.Second)
    cancel()
}

//每1秒work一下,同时会判断ctx是否被取消,如果是就退出
func doSth(ctx context.Context) {
    var i = 1
    for {
        time.Sleep(1 * time.Second)
        select {
        case <-ctx.Done():
            fmt.Println("done")
            return
        default:
            fmt.Printf("work %d seconds: \n", i)
        }
        i++
    }
}

func main() {
    fmt.Println("start...")
    someHandler()
    fmt.Println("end.")
}

输出结果:

 

 

 

 

 

withDeadline withTimeout

WithTimeout func(parent Context, timeout time.Duration) (Context, CancelFunc)
WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).

WithTimeout 等价于 WithDeadline(parent, time.Now().Add(timeout)).

对上面的样例代码进行修改

func timeoutHandler() {
    // ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
    // go doTimeOutStuff(ctx)
    go doStuff(ctx)

    time.Sleep(10 * time.Second)

    cancel()

}

func main() {
    logg = log.New(os.Stdout, "", log.Ltime)
    timeoutHandler()
    logg.Printf("end")
}

返回:

15:59:22 work
15:59:24 work
15:59:25 work
15:59:26 work
15:59:27 done
15:59:31 end

可以看到doStuff在context超时的时候被取消了,ctx.Done()被关闭。
将context.WithDeadline替换为context.WithTimeout

func timeoutHandler() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    // ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
    // go doTimeOutStuff(ctx)
    go doStuff(ctx)

    time.Sleep(10 * time.Second)

    cancel()

}
16:02:47 work
16:02:49 work
16:02:50 work
16:02:51 work
16:02:52 done
16:02:56 end

doTimeOutStuff替换doStuff

func doTimeOutStuff(ctx context.Context) {
    for {
        time.Sleep(1 * time.Second)

        if deadline, ok := ctx.Deadline(); ok { //设置了deadl
            logg.Printf("deadline set")
            if time.Now().After(deadline) {
                logg.Printf(ctx.Err().Error())
                return
            }

        }

        select {
        case <-ctx.Done():
            logg.Printf("done")
            return
        default:
            logg.Printf("work")
        }
    }
}

func timeoutHandler() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    // ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
    go doTimeOutStuff(ctx)
    // go doStuff(ctx)

    time.Sleep(10 * time.Second)

    cancel()

}
16:03:55 deadline set
16:03:55 work
16:03:56 deadline set
16:03:56 work
16:03:57 deadline set
16:03:57 work
16:03:58 deadline set
16:03:58 work
16:03:59 deadline set
16:03:59 context deadline exceeded
16:04:04 end
WithTimeout
package main

import (
    "math/rand"
    "time"
    "sync"
    "fmt"
    "context"
)

func main()  {
    rand.Seed(time.Now().Unix())

    ctx,_:=context.WithTimeout(context.Background(),time.Second*3)

    var wg sync.WaitGroup
    wg.Add(1)
    go GenUsers(ctx,&wg)
    wg.Wait()

    fmt.Println("生成幸运用户成功")
}
func GenUsers(ctx context.Context,wg *sync.WaitGroup)  { //生成用户ID
fmt.Println("开始生成幸运用户")
   users:=make([]int,0)
   guser:for{
        select{
           case <- ctx.Done(): //代表父context发起 取消操作

             fmt.Println(users)
               wg.Done()
             break guser
             return
        default:
            users=append(users,getUserID(1000,100000))
        }
   }

}
func getUserID(min int ,max int) int  {
    return rand.Intn(max-min)+min
}

 

 

 

 

context deadline exceeded就是ctx超时的时候ctx.Err的错误消息。

搜索测试程序

完整代码参见官方文档Go Concurrency Patterns: Context,其中关键的地方在于函数httpDo

func httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error {
    // Run the HTTP request in a goroutine and pass the response to f.
    tr := &http.Transport{}
    client := &http.Client{Transport: tr}
    c := make(chan error, 1)
    go func() { c <- f(client.Do(req)) }()
    select {
    case <-ctx.Done():
        tr.CancelRequest(req)
        <-c // Wait for f to return.
        return ctx.Err()
    case err := <-c:
        return err
    }
}

httpDo关键的地方在于

 select {
    case <-ctx.Done():
        tr.CancelRequest(req)
        <-c // Wait for f to return.
        return ctx.Err()
    case err := <-c:
        return err
    }

要么ctx被取消,要么request请求出错。

 

httpserver中实现超时

package main

import (
    "net/http"
    "context"
    "time"
)

func CountData(c chan string) chan string {
    time.Sleep(time.Second*5)
    c<- "统计结果"
    return c
}

type IndexHandler struct {}
func(this *IndexHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)  {
    if r.URL.Query().Get("count")==""{
        w.Write([]byte("这是首页"))
    }else {
        ctx,cancel:=context.WithTimeout(r.Context(),time.Second*3)
        defer cancel()
        c:=make(chan string)
        go CountData(c)
        select {
           case <-ctx.Done():
               w.Write([]byte("超时"))
            case ret:=<-c:
                w.Write([]byte(ret))
        }


    }


}

func main()  {
     mux:=http.NewServeMux()
     mux.Handle("/",new(IndexHandler))

     http.ListenAndServe(":8082",mux)
}

 

 

超时场景:

package main

import (
    "context"
    "fmt"
    "time"
)

func timeoutHandler() {
    // 创建继承Background的子节点Context
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    go doSth(ctx)

    //模拟程序运行 - Sleep 10秒
    time.Sleep(10 * time.Second)
    cancel() // 3秒后将提前取消 doSth goroutine
}

//每1秒work一下,同时会判断ctx是否被取消,如果是就退出
func doSth(ctx context.Context) {
    var i = 1
    for {
        time.Sleep(1 * time.Second)
        select {
        case <-ctx.Done():
            fmt.Println("done")
            return
        default:
            fmt.Printf("work %d seconds: \n", i)
        }
        i++
    }
}

func main() {
    fmt.Println("start...")
    timeoutHandler()
    fmt.Println("end.")
}

 

 

 

WithValue

func WithValue(parent Context, key interface{}, val interface{}) Context
// NewContext returns a new Context carrying userIP.
func NewContext(ctx context.Context, userIP net.IP) context.Context {
    return context.WithValue(ctx, userIPKey, userIP)
}

// FromContext extracts the user IP address from ctx, if present.
func FromContext(ctx context.Context) (net.IP, bool) {
    // ctx.Value returns nil if ctx has no value for the key;
    // the net.IP type assertion returns ok=false for nil.
    userIP, ok := ctx.Value(userIPKey).(net.IP)
    return userIP, ok
}

 

前面铺地了这么多。

确实,通过引入Context包,一个request范围内所有goroutine运行时的取消可以得到有效的控制。但是这种解决方式却不够优雅。

一旦代码中某处用到了Context,传递Context变量(通常作为函数的第一个参数)会像病毒一样蔓延在各处调用它的地方。比如在一个request中实现数据库事务或者分布式日志记录,创建的context,会作为参数传递到任何有数据库操作或日志记录需求的函数代码处。即每一个相关函数都必须增加一个context.Context类型的参数,且作为第一个参数,这对无关代码完全是侵入式的。

更多详细内容可参见:Michal Strba 的context-should-go-away-go2文章

Google Group上的讨论可移步这里

 

Context机制最核心的功能是在goroutine之间传递cancel信号,但是它的实现是不完全的。

Cancel可以细分为主动与被动两种,通过传递context参数,让调用goroutine可以主动cancel被调用goroutine。但是如何得知被调用goroutine什么时候执行完毕,这部分Context机制是没有实现的。而现实中的确又有一些这样的场景,比如一个组装数据的goroutine必须等待其他goroutine完成才可开始执行,这是context明显不够用了,必须借助sync.WaitGroup。

func serve(l net.Listener) error {
        var wg sync.WaitGroup
        var conn net.Conn
        var err error
        for {
                conn, err = l.Accept()
                if err != nil {
                        break
                }
                wg.Add(1)
                go func(c net.Conn) {
                        defer wg.Done()
                        handle(c)
                }(conn)
        }
        wg.Wait()
        return err
}

 

版权声明:本文为sunlong88原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/sunlong88/p/11272559.html