sync包提供了基本的同步基元,如锁,WaitGroup、Once 和 Cond等同步原语。除了Once和WaitGroup类型,大部分都是适用于普通程序线程,大型并发同步使用channel通信(csp)更好一些。

sync

sync同步功能主要提供了once,mutex,cond,并发安全map,安全并发pool,waitgroup。

sync.Once

sync.Once是一个简单而强大的原语,可确保一个函数仅执行一次。

定义

type Once struct {
        // 非暴露字段
}

func (o *Once) Do(f func())

使用前先定义 Once 类型变量:

var once Once

使用的时候向 Once 类型变量传入函数:

once.Do(func() { init() })

多次调用 once.Do(f) 只会触发一次 f 的执行,即第一次 f 的执行。

用法实例

某些操作只需要执行一次(比如一些初始化动作),这时就可使用 Once,如:

func main() {
    var once sync.Once
    onceBody := func() {
        fmt.Println("Only once")
    }
    done := make(chan bool)

    // 创建 10 个 goroutine,但是 onceBody 只会执行 1 次
    for i := 0; i < 10; i++ {
        go func() {
            once.Do(onceBody)
            done <- true
        }()
    }

    // 等待 10 个 goroutine 结束
    for i := 0; i < 10; i++ {
        <-done
    }
}

其实 Once 的实现非常简单,就是互斥锁+原子变量

sync.WaitGroup

sync.WaitGroup拥有一个内部计数器。当计数器等于0时,则Wait()方法会立即返回。否则它将阻塞执行Wait()方法的goroutine直到计数器等于0时为止。

WaitGroup 的使用

type WaitGroup struct {
    // 包含隐藏或非导出字段
}

WaitGroup用于等待一组线程的结束。父线程调用Add方法来设定应等待的线程的数量。每个被等待的线程在结束时应调用Done方法。同时,主线程里可以调用Wait方法阻塞至所有线程结束。

func (*WaitGroup) Add
func (wg *WaitGroup) Add(delta int)

Add方法向内部计数加上delta,delta可以是负数;如果内部计数器变为0,Wait方法阻塞等待的所有线程都会释放,如果计数器小于0,方法panic。注意Add加上正数的调用应在Wait之前,否则Wait可能只会等待很少的线程。一般来说本方法应在创建新的线程或者其他应等待的事件之前调用。

func (*WaitGroup) Done
func (wg *WaitGroup) Done()

Done方法减少WaitGroup计数器的值,应在线程的最后执行。

func (*WaitGroup) Wait
func (wg *WaitGroup) Wait()

Wait方法阻塞直到WaitGroup计数器减为0。

sync.Pool

sync.Pool是一个并发池,负责安全地保存一组对象。

定义

type Pool struct {
        // 当 Get() 找不到一个对象时,会使用 New() 生成一个对象
        New func() interface{}

        // 剩下的是非暴露字段
}

// 任意从 pool 中挑选一个对象返回给客户端,如果找不到就使用 p.New 生成
func (p *pool) Get() interface{}

// 将对象 x 放回到 pool 中
func (p *pool) Put(x interface{})

使用 Pool 可以用以管理一些临时对象供多个 package 的客户端使用,客户端对 Pool 的逻辑是无感知的:需要的时候 Get,不需要的时候 Put,而且 Pool 可根据当前负载自动调整对象池的大小。

一个典型的应用是日志,如:

var bufPool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer)
    },
}

func Log(w io.Writer, key, val string) {
    // 从对象池中获取 buffer
    b := bufPool.Get().(*bytes.Buffer)
    b.Reset()
    b.WriteString(time.Now().Format(time.RFC3339))
    b.WriteByte(' ')
    b.WriteString(key)
    b.WriteByte('=')
    b.WriteString(val)
    w.Write(b.Bytes())
    // 使用完毕,归还 buffer
    bufPool.Put(b)
}

func main() {
    Log(os.Stdout, "path", "/search?q=flowers")
}

那么什么时候使用sync.Pool?

  • 第一个是当我们必须重用共享的和长期存在的对象(例如,数据库连接)时。
  • 第二个是用于优化内存分配。

sync.Mutex

sync.Mutex可能是sync包中使用最广泛的原语。它允许在共享资源上互斥访问(不能同时访问),在并发安全中有着很重要的作用。

mutex := &sync.Mutex{}

mutex.Lock()
// Update共享变量 (比如切片,结构体指针等)
mutex.Unlock()

sync.RWMutex是一个读写互斥锁,它提供了我们上面的刚刚看到的sync.Mutex的Lock和UnLock方法(因为这两个结构都实现了sync.Locker接口)。但是,它还允许使用RLock和RUnlock方法进行并发读取:

mutex := &sync.RWMutex{}

mutex.Lock()
// Update 共享变量
mutex.Unlock()

mutex.RLock()
// Read 共享变量
mutex.RUnlock()

只有在频繁读取和不频繁写入的场景里,才应该使用sync.RWMutex。

sync.Cond

sync.Cond可能是sync包提供的同步原语中最不常用的一个,它用于发出信号(一对一)或广播信号(一对多)到goroutine。

条件变量做的事情很简单:让多个 goroutine 等待在某个条件上,如果条件不满足,进入等待状态;如果条件满足,继续运行。

Cond 内部维护着一个 notifyList,当条件不满足的时候,则将对应的 goroutine 添加到列表上然后进入等待状态。当条件满足时,一般会有其他执行者显式使用 Signal() 或者 Broadcast() 去唤醒 notifyList 上 goroutine。

当进行条件的判断时,必须使用互斥锁来保证条件的安全,即在判断的时候条件没有被其他人修改。所以 Cond 一般会与一个符合 Lock 接口的 Mutex 一起使用。

定义

type Cond struct {
    // 读写条件状态需要加锁
    L Locker
    // 剩下的是非暴露字段
}

func NewCond(l Locker) *Cond

// 广播所以等待的 goroutine,条件已经满足
func (c *Cond) Broadcast()

// 单播其中一个等待的 goroutine,条件已经满足
func (c *Cond) Signal()

// 如果条件不满足,调用 Wait() 进入等待状态
func (c *Cond) Wait()
此处要特别小心 Wait() 的使用。正如前文所说,条件的判断需要使用互斥锁来确保条件读取前后是一致的,即:

    c.L.Lock() // 进行条件判断,加锁
    if !condition() { // 如果不满足条件,进入 if 中
        c.Wait() // Wait() 内部会自动解锁
    }

    ... 这里可能会对 condition 作出改变 ...
    c.L.Unlock()

上述代码其实还有一个很严重的问题,为了说明这个问题,让我们来看看 Wait() 的实现:

func (c *Cond) Wait() {
    c.checker.check()
    t := runtime_notifyListAdd(&c.notify) // 加入 notifyList
    c.L.Unlock() // 解锁
    runtime_notifyListWait(&c.notify, t) // 进入等待模式
    c.L.Lock() // 运行到此处说明条件已经满足,开始获取互斥锁,如果锁已经被别人用了,开始等待
}

从上面的例子可以看出,当 Wait() 返回时(即已经获取到了互斥锁),有可能条件已经被其他先获取互斥锁的 goroutine 改变了,所以此时必须再次判断一下条件,即:

c.L.Lock() // 进行条件判断,加锁
if !condition() { // 如果不满足条件,进入 if 中
    c.Wait() // Wait() 内部会自动解锁
}

if !condition() { // 如果不满足条件,进入 if 中
    c.Wait() // Wait() 内部会自动解锁
}
... 这里可能会对 condition 作出改变 ...
c.L.Unlock()

如果代码这么写,就太费劲了,上面代码可以简化为:

c.L.Lock() // 进行条件判断,加锁
for !condition() { // 如果不满足条件,进入 if 中
    c.Wait() // Wait() 内部会自动解锁
}

... 这里可能会对 condition 作出改变 ...
c.L.Unlock()

即将 if 替换为 for,从而当从 Wait() 返回时,再次判断条件是否满足。

用法实例

用一个简单的例子来介绍一下 Cond 如何使用,即:

var (
    wakeup    = false
    workerNum = 3
)

func worker(workerID int, c *sync.Cond) {
    fmt.Printf("Worker [%d] is RUNNING\n", workerID)
    c.L.Lock()
    for !wakeup {
        fmt.Printf("Worker [%d] check conditon\n", workerID)
        c.Wait()
    }
    fmt.Printf("Worker [%d] wakeup, DO something\n", workerID)
    // 将唤醒标志改为 false
    // 此时其他已经醒来并抢夺互斥锁的 goroutine 重新判断条件后
    // 将再次进入 wait 状态
    wakeup = false 
    c.L.Unlock()
}

func main() {
    cond := sync.NewCond(&sync.Mutex{})
    for i := 0; i < workerNum; i++ {
        go worker(i, cond)
    }

    time.Sleep(2 * time.Second)
    wakeup = true
    cond.Broadcast() // 向所有 goroutine 进行广播,条件已经满足,即 wakeup = true

    time.Sleep(2 * time.Second)
}

执行后的输出为:

Worker [0] is RUNNING
Worker [1] is RUNNING
Worker [0] check conditon
Worker [1] check conditon
Worker [2] is RUNNING
Worker [2] check conditon
Worker [0] wakeup, DO something
Worker [1] check conditon
Worker [2] check conditon

当 worker0 醒来后,又重新把条件变量进行了修改,从而导致 worker1 和 worker2 获取到互斥锁后重新检查到条件不满足,再次进入 wait 状态。

map

这是一个重点的数据结构,实现十分经典,具体看map

sync/atomic

atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。支持类型共有六种:int32, int64, uint32, uint64, uintptr, unsafe.Pinter,实现的操作共五种:增减,比较并交换,载入,存储,交换。

增或减

顾名思义,原子增或减即可实现对被操作值的增大或减少。因此该操作只能操作数值类型。    被用于进行增或减的原子操作都是以“Add”为前缀,并后面跟针对具体类型的名称。

//方法源码
func AddUint32(addr *uint32, delta uint32) (new uint32)

func AddInt64(addr *int64, delta int64) (new int64)
AddInt64原子性的将val的值添加到*addr并返回新值。

栗子:(在原来的基础上加n)
atomic.AddUint32(&addr,n)

栗子:(在原来的基础上加n(n为负数))
atomic.AddUint32(*addr,uint32(int32(n)))
//或
atomic.AddUint32(&addr,^uint32(-n-1))

比较并交换

比较并交换—-Compare And Swap 简称CAS

他是假设被操作的值未曾被改变(即与旧值相等),并一旦确定这个假设的真实性就立即进行值替换,这是典型的乐观实现思想。

如果想安全的并发一些类型的值,我们总是应该优先使用CAS

//方法源码
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

栗子:(如果addr和old相同,就用new代替addr)

ok:=atomic.CompareAndSwapInt32(&addr,old,new)

载入

如果一个写操作未完成,有一个读操作就已经发生了,这样读操作使很糟糕的。

为了原子的读取某个值sync/atomic代码包同样为我们提供了一系列的函数。这些函数都以”Load”为前缀,意为载入。

//方法源码
func LoadInt32(addr *int32) (val int32)

栗子

fun addValue(delta int32){
    for{
        v:=atomic.LoadInt32(&addr)
        if atomic.CompareAndSwapInt32(&v,addr,(delta+v)){
            break;
        }
    }
}

存储

   与读操作对应的是写入操作,sync/atomic也提供了与原子的值载入函数相对应的原子的值存储函数。这些函数的名称均以“Store”为前缀   

在原子的存储某个值的过程中,任何cpu都不会进行针对进行同一个值的读或写操作。如果我们把所有针对此值的写操作都改为原子操作,那么就不会出现针对此值的读操作读操作因被并发的进行而读到修改了一半的情况。    原子操作总会成功,因为他不必关心被操作值的旧值是什么。

//方法源码
func StoreInt32(addr *int32, val int32)

栗子

atomic.StoreInt32(被操作值的指针,新值)
atomic.StoreInt32(&value,newaddr)

交换

   原子交换操作,这类函数的名称都以“Swap”为前缀,与CAS不同,交换操作直接赋予新值,不管旧值。    会返回旧值

//方法源码
func SwapInt32(addr *int32, new int32) (old int32)

栗子

atomic.SwapInt32(被操作值的指针,新值)(返回旧值)
oldval:=atomic.StoreInt32(&value,newaddr)