控制并发有两种经典的方式,一种是WaitGroup,另外一种就是Context,当然还可以简单的直接用channel通知。
waitgroup
WaitGroup以前我们在并发的时候介绍过,它是一种控制并发的方式,它的这种方式是控制多个goroutine同时完成。
func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
time.Sleep(2*time.Second)
fmt.Println("1号完成")
wg.Done()
}()
go func() {
time.Sleep(2*time.Second)
fmt.Println("2号完成")
wg.Done()
}()
wg.Wait()
fmt.Println("好了,大家都干完了,放工")
}
一个很简单的例子,一定要例子中的2个goroutine同时做完,才算是完成,先做好的就要等着其他未完成的,所有的goroutine要都全部完成才可以。
这是一种控制并发的方式,这种尤其适用于好多个goroutine协同做一件事情的时候,因为每个goroutine做的都是这件事情的一部分,只有全部的goroutine都完成,这件事情才算是完成,这是等待的方式。
这边说一下wg的传输,像上面这种事全局的,直接使用就好,但是很多时候wg的局部变量需要传输使用,这个时候需要传输地址
package main
import (
"fmt"
"sync"
"time"
)
func dosomething(millisecs time.Duration, wg *sync.WaitGroup) {
duration := millisecs * time.Millisecond
time.Sleep(duration)
fmt.Println("Function in background, duration:", duration)
wg.Done()
}
func main() {
var wg sync.WaitGroup
wg.Add(4)
go dosomething(200, &wg)
go dosomething(400, &wg)
go dosomething(150, &wg)
go dosomething(600, &wg)
wg.Wait()
fmt.Println("Done")
}
在实际的业务种,我们可能会有这么一种场景:需要我们主动的通知某一个goroutine结束。比如我们开启一个后台goroutine一直做事情,比如监控,现在不需要了,就需要通知这个监控goroutine结束,不然它会一直跑,就泄漏了,这个时候我们就需要使用chan了。
chan通知
我们都知道一个goroutine启动后,我们是无法控制他的,大部分情况是等待它自己结束,那么如果这个goroutine是一个不会自己结束的后台goroutine呢?比如监控等,会一直运行的。
这种情况,一种傻瓜式的办法是全局变量,其他地方通过修改这个变量完成结束通知,然后后台goroutine不停的检查这个变量,如果发现被通知关闭了,就自我结束。
这种方式也可以,但是首先我们要保证这个变量在多线程下的安全,基于此,有一种更好的方式:chan + select 。
func main() {
stop := make(chan bool)
go func() {
for {
select {
case <-stop:
fmt.Println("监控退出,停止了...")
return
default:
fmt.Println("goroutine监控中...")
time.Sleep(2 * time.Second)
}
}
}()
time.Sleep(10 * time.Second)
fmt.Println("可以了,通知监控停止")
stop<- true
//为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}
例子中我们定义一个stop的chan,通知他结束后台goroutine。实现也非常简单,在后台goroutine中,使用select判断stop是否可以接收到值,如果可以接收到,就表示可以退出停止了;如果没有接收到,就会执行default里的监控逻辑,继续监控,只到收到stop的通知。
有了以上的逻辑,我们就可以在其他goroutine种,给stop chan发送值了,例子中是在main goroutine中发送的,控制让这个监控的goroutine结束。
发送了stop<- true结束的指令后,我这里使用time.Sleep(5 * time.Second)故意停顿5秒来检测我们结束监控goroutine是否成功。如果成功的话,不会再有goroutine监控中…的输出了;如果没有成功,监控goroutine就会继续打印goroutine监控中…输出。这其实是一种异步的思想,可以好好琢磨对比一下。
这种chan+select的方式,是比较优雅的结束一个goroutine的方式,不过这种方式也有局限性,如果有很多goroutine都需要控制结束怎么办呢?如果这些goroutine又衍生了其他更多的goroutine怎么办呢?如果一层层的无穷尽的goroutine呢?这就非常复杂了,即使我们定义很多chan也很难解决这个问题,因为goroutine的关系链就导致了这种场景非常复杂。
初识Context
上面说的这种场景是存在的,比如一个网络请求Request,每个Request都需要开启一个goroutine做一些事情,这些goroutine又可能会开启其他的goroutine。所以我们需要一种可以跟踪goroutine的方案,才可以达到控制他们的目的,这就是Go语言为我们提供的Context,称之为上下文非常贴切,它就是goroutine的上下文。
下面我们就使用Go Context重写上面的示例。
func main() {
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("监控退出,停止了...")
return
default:
fmt.Println("goroutine监控中...")
time.Sleep(2 * time.Second)
}
}
}(ctx)
time.Sleep(10 * time.Second)
fmt.Println("可以了,通知监控停止")
cancel()
//为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}
重写比较简单,就是把原来的chan stop 换成Context,使用Context跟踪goroutine,以便进行控制,比如结束等。
context.Background() 返回一个空的Context,这个空的Context一般用于整个Context树的根节点。然后我们使用context.WithCancel(parent)函数,创建一个可取消的子Context,然后当作参数传给goroutine使用,这样就可以使用这个子Context跟踪这个goroutine。
在goroutine中,使用select调用<-ctx.Done()判断是否要结束,如果接受到值的话,就可以返回结束goroutine了;如果接收不到,就会继续进行监控。
那么是如何发送结束指令的呢?这就是示例中的cancel函数啦,它是我们调用context.WithCancel(parent)函数生成子Context的时候返回的,第二个返回值就是这个取消函数,它是CancelFunc类型的。我们调用它就可以发出取消指令,然后我们的监控goroutine就会收到信号,就会返回结束。
Context控制多个goroutine
使用Context控制一个goroutine的例子如上,非常简单,下面我们看看控制多个goroutine的例子,其实也比较简单。
func main() {
ctx, cancel := context.WithCancel(context.Background())
go watch(ctx,"【监控1】")
go watch(ctx,"【监控2】")
go watch(ctx,"【监控3】")
time.Sleep(10 * time.Second)
fmt.Println("可以了,通知监控停止")
cancel()
//为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}
func watch(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Println(name,"监控退出,停止了...")
return
default:
fmt.Println(name,"goroutine监控中...")
time.Sleep(2 * time.Second)
}
}
}
示例中启动了3个监控goroutine进行不断的监控,每一个都使用了Context进行跟踪,当我们使用cancel函数通知取消时,这3个goroutine都会被结束。这就是Context的控制能力,它就像一个控制器一样,按下开关后,所有基于这个Context或者衍生的子Context都会收到通知,这时就可以进行清理操作了,最终释放goroutine,这就优雅的解决了goroutine启动后不可控的问题。
Context接口
Context的接口定义的比较简洁,我们看下这个接口的方法。
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
这个接口共有4个方法,了解这些方法的意思非常重要,这样我们才可以更好的使用他们。
Deadline方法是获取设置的截止时间的意思,第一个返回式是截止时间,到了这个时间点,Context会自动发起取消请求;第二个返回值ok==false时表示没有设置截止时间,如果需要取消的话,需要调用取消函数进行取消。
Done方法返回一个只读的chan,类型为struct{},我们在goroutine中,如果该方法返回的chan可以读取,则意味着parent context已经发起了取消请求,我们通过Done方法收到这个信号后,就应该做清理操作,然后退出goroutine,释放资源。
Err方法返回取消的错误原因,因为什么Context被取消。
Value方法获取该Context上绑定的值,是一个键值对,所以要通过一个Key才可以获取对应的值,这个值一般是线程安全的。
以上四个方法中常用的就是Done了,如果Context取消的时候,我们就可以得到一个关闭的chan,关闭的chan是可以读取的,所以只要可以读取的时候,就意味着收到Context取消的信号了,以下是这个方法的经典用法。
func Stream(ctx context.Context, out chan<- Value) error {
for {
v, err := DoSomething(ctx)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case out <- v:
}
}
}
Context接口并不需要我们实现,Go内置已经帮我们实现了2个,我们代码中最开始都是以这两个内置的作为最顶层的partent context,衍生出更多的子Context。
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}
func TODO() Context {
return todo
}
一个是Background,主要用于main函数、初始化以及测试代码中,作为Context这个树结构的最顶层的Context,也就是根Context。
一个是TODO,它目前还不知道具体的使用场景,如果我们不知道该使用什么Context的时候,可以使用这个。
他们两个本质上都是emptyCtx结构体类型,是一个不可取消,没有设置截止时间,没有携带任何值的Context。
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
这就是emptyCtx实现Context接口的方法,可以看到,这些方法什么都没做,返回的都是nil或者零值。
Context的继承衍生
有了如上的根Context,那么是如何衍生更多的子Context的呢?这就要靠context包为我们提供的With系列的函数了。
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
这四个With函数,接收的都有一个partent参数,就是父Context,我们要基于这个父Context创建出子Context的意思,这种方式可以理解为子Context对父Context的继承,也可以理解为基于父Context的衍生。
通过这些函数,就创建了一颗Context树,树的每个节点都可以有任意多个子节点,节点层级可以有任意多个。
WithCancel函数,传递一个父Context作为参数,返回子Context,以及一个取消函数用来取消Context。
WithDeadline函数,和WithCancel差不多,它会多传递一个截止时间参数,意味着到了这个时间点,会自动取消Context,当然我们也可以不等到这个时候,可以提前通过取消函数进行取消。
WithTimeout和WithDeadline基本上一样,这个表示是超时自动取消,是多少时间后自动取消Context的意思。
WithValue函数和取消Context无关,它是为了生成一个绑定了一个键值对数据的Context,这个绑定的数据可以通过Context.Value方法访问到,后面我们会专门讲。
大家可能留意到,前三个函数都返回一个取消函数CancelFunc,这是一个函数类型,它的定义非常简单。
type CancelFunc func()
这就是取消函数的类型,该函数可以取消一个Context,以及这个节点Context下所有的所有的Context,不管有多少层级。
WithValue传递元数据
通过Context我们也可以传递一些必须的元数据,这些数据会附加在Context上以供使用。
var key string="name"
func main() {
ctx, cancel := context.WithCancel(context.Background())
//附加值
valueCtx:=context.WithValue(ctx,key,"【监控1】")
go watch(valueCtx)
time.Sleep(10 * time.Second)
fmt.Println("可以了,通知监控停止")
cancel()
//为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(5 * time.Second)
}
func watch(ctx context.Context) {
for {
select {
case <-ctx.Done():
//取出值
fmt.Println(ctx.Value(key),"监控退出,停止了...")
return
default:
//取出值
fmt.Println(ctx.Value(key),"goroutine监控中...")
time.Sleep(2 * time.Second)
}
}
}
在前面的例子,我们通过传递参数的方式,把name的值传递给监控函数。在这个例子里,我们实现一样的效果,但是通过的是Context的Value的方式。
我们可以使用context.WithValue方法附加一对K-V的键值对,这里Key必须是等价性的,也就是具有可比性;Value值要是线程安全的。
这样我们就生成了一个新的Context,这个新的Context带有这个键值对,在使用的时候,可以通过Value方法读取ctx.Value(key)。
记住,使用WithValue传值,一般是必须的值,不要什么值都传递。
Context 使用原则
- 不要把Context放在结构体中,要以参数的方式传递
- 以Context作为参数的函数方法,应该把Context作为第一个参数,放在第一位。命名为ctx。 func DoSomething(ctx context.Context,arg Arg)error { // … use ctx … }
- 给一个函数方法传递Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO
- Context的Value相关方法应该传递必须的数据,不要什么数据都使用这个传递,不要用它来传递一些可选的参数
- Context是线程安全的,可以放心的在多个goroutine中传递,相同的 Context 可以传递给在不同的goroutine;
应用场景
在 Go http 包的 Server 中,每一个请求在都有一个对应的goroutine去处理。请求处理函数通常会启动额外的goroutine用来访问后端服务,比如数据库和 RPC 服务。用来处理一个请求的goroutine通常需要访问一些与请求特定的数据,比如终端用户的身份认证信息、验证相关的 token、请求的截止时间。当一个请求被取消或超时时,所有用来处理该请求的goroutine都应该迅速退出,然后系统才能释放这些goroutine占用的资源。
注意:go1.6及之前版本请使用golang.org/x/net/context。go1.7及之后已移到标准库context。
实战
WithCancel 例子
WithCancel 以一个新的 Done channel 返回一个父 Context 的拷贝。
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{
Context: parent,
done: make(chan struct{}),
}
}
此示例演示使用一个可取消的上下文,以防止 goroutine 泄漏。示例函数结束时,defer 调用 cancel 方法,gen goroutine 将返回而不泄漏。
package main
import (
"context"
"fmt"
)
func main() {
// gen generates integers in a separate goroutine and
// sends them to the returned channel.
// The callers of gen need to cancel the context once
// they are done consuming generated integers not to leak
// the internal goroutine started by gen.
gen := func(ctx context.Context) <-chan int {
dst := make(chan int)
n := 1
go func() {
for {
select {
case <-ctx.Done():
return // returning not to leak the goroutine
case dst <- n:
n++
}
}
}()
return dst
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel when we are finished consuming integers
for n := range gen(ctx) {
fmt.Println(n)
if n == 5 {
break
}
}
}
WithDeadline 例子
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(deadline) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: deadline,
}
可以清晰的看到,当派生出的子 Context 的deadline在父Context之后,直接返回了一个父Context的拷贝。故语义上等效为父。
WithDeadline 的最后期限调整为不晚于 d 返回父上下文的副本。如果父母的截止日期已经早于 d,WithDeadline (父,d) 是在语义上等效为父。返回的上下文完成的通道关闭的最后期限期满后,返回的取消函数调用时,或当父上下文完成的通道关闭,以先发生者为准。
看看官方例子:
package main
import (
"context"
"fmt"
"time"
)
func main() {
d := time.Now().Add(50 * time.Millisecond)
ctx, cancel := context.WithDeadline(context.Background(), d)
// Even though ctx will be expired, it is good practice to call its
// cancelation function in any case. Failure to do so may keep the
// context and its parent alive longer than necessary.
defer cancel()
select {
case <-time.After(1 * time.Second):
fmt.Println("overslept")
case <-ctx.Done():
fmt.Println(ctx.Err())
}
}
WithTimeout 例子
WithTimeout 返回 WithDeadline(parent, time.Now().Add(timeout))。
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
看看官方例子:
package main
import (
"context"
"fmt"
"time"
)
func main() {
// Pass a context with a timeout to tell a blocking function that it
// should abandon its work after the timeout elapses.
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
select {
case <-time.After(1 * time.Second):
fmt.Println("overslept")
case <-ctx.Done():
fmt.Println(ctx.Err()) // prints "context deadline exceeded"
}
}
自己写的例子
func Requset(ctx context.Context,ip string,w http.ResponseWriter){
c := make(chan string, 1024)
go func(ip string){
cfg := g.GlbServerCfg.Conf
url := "http://" + ip + ":" + strconv.Itoa(cfg.PromesPort) + "/-/reload"
log.Debug("Reload %s", ip)
client := &http.Client{
Timeout: time.Duration(cfg.TimeOut)*time.Second,
}
request, err := http.NewRequest("POST",url,nil)
if err != nil {
log.Error("New Request Error ",err)
}
resp, err := client.Do(request)
defer resp.Body.Close()
//resp, err := http.Post(url, "", nil)
if err != nil || resp.StatusCode != 200 {
log.Error("Http Post Faild : %s, Status Code: %d", err, resp.StatusCode)
c <- string(ip + " Reload Faild\n")
}
log.Debug("Http Post Successd , Status Code: : %d", resp.StatusCode)
c <- string(ip + " Reload Success\n")
}(ip)
for {
select {
case <-ctx.Done():
w.Write([]byte(ip + " Reload TimeOut\n"))
return
case response := <-c:
w.Write([]byte(response))
return
}
}
}
WithValue 例子
func WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
if !reflect.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
WithValue 返回的父与键关联的值在 val 的副本。
使用上下文值仅为过渡进程和 Api 的请求范围的数据,而不是将可选参数传递给函数。
提供的键必须是可比性和应该不是字符串类型或任何其他内置的类型以避免包使用的上下文之间的碰撞。WithValue 用户应该定义自己的键的类型。为了避免分配分配给接口 {} 时,上下文键经常有具体类型结构 {}。另外,导出的上下文关键变量静态类型应该是一个指针或接口。
看看官方例子:
package main
import (
"context"
"fmt"
)
func main() {
type favContextKey string
f := func(ctx context.Context, k favContextKey) {
if v := ctx.Value(k); v != nil {
fmt.Println("found value:", v)
return
}
fmt.Println("key not found:", k)
}
k := favContextKey("language")
ctx := context.WithValue(context.Background(), k, "Go")
f(ctx, k)
f(ctx, favContextKey("color"))
}