并发实践

Go 的开发者极力推荐使用 Channel,不过,这两年,大家意识到,Channel 并不是处理并发问题的“银弹”,有时候使用并发原语更简单,而且不容易出错。

所以,我给你提供一套选择的方法:

  • 共享资源的并发访问使用传统并发原语;
  • 复杂的任务编排和消息传递使用 Channel;
  • 需要和 Select 语句结合,使用 Channel;
  • 需要和超时配合时,使用 Channel 和 Context;
  • 简单等待所有任务的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;
  • 消息通知机制使用 Channel,除非只想 signal 一个 goroutine,才使用 Cond;

分段锁ConcurrentMap,map+读写锁,sync.map的效率测试

原文:go 分段锁ConcurrentMap,map+读写锁,sync.map的效率测试

效率测试结论:

  1. go 自带的 map 不是多协程安全的
  2. 分段锁 ConcurrentMap 是多协程安全的,且效率最高;sync.map 效率次之,传统的 map + 读写锁效率最低
  3. ConcurrentMap 中锁的个数越多,效率越高,因为争夺同一把锁的概率降低了

go sync.map 源码分析

原文:go sync.map 源码分析

go 并发安全map 分段锁实现

原文:go 并发安全map 分段锁实现

并行计算并指定key存储结果案例

Golang 在运行时会在 map 的增改删查过程中检测是否有并发读写的情况,当发现并发读写时直接异常退出。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package main

import (
    "fmt"
    "sync"
    "time"
)

//题目
//如果我们列出10以下所有能够被3或者5整除的自然数,那么我们得到的是3,5,6和9。这四个数的和是23。
//那么请计算1000以下(不包括1000)的所有能够被3或者5整除的自然数的和。
//
//这个题目的一个思路就是:
//
//(1) 先计算1000以下所有能够被3整除的整数的和A,
//(2) 然后计算1000以下所有能够被5整除的整数和B,
//(3) 然后再计算1000以下所有能够被3和5整除的整数和C,
//(4) 使用A+B-C就得到了最后的结果。

func main() {
    var sum sync.Map
    wg := sync.WaitGroup{}

    limit := 1000 // 最大数
    startTime := time.Now()

    divider3 := 3
    divider5 := 5
    divider15 := 15

    wg.Add(3)
    go get_sum_of_divisible(limit, divider3, &sum, &wg)
    go get_sum_of_divisible(limit, divider5, &sum, &wg)
    go get_sum_of_divisible(limit, divider15, &sum, &wg)
    wg.Wait() // 此处阻塞,等所有协程跑完才会执行下面的代码

    sum3, ok := sum.Load(divider3)
    if !ok {
        fmt.Println("数据异常,没获取到key:", divider3)
        return
    }

    sum5, ok := sum.Load(divider5)
    if !ok {
        fmt.Println("数据异常,没获取到key:", divider5)
        return
    }

    sum15, ok := sum.Load(divider15)
    if !ok {
        fmt.Println("数据异常,没获取到key:", divider15)
        return
    }

    total := sum3.(int) + sum5.(int) - sum15.(int)

    endTime := time.Now()
    fmt.Println("耗时:", endTime.Sub(startTime))
    fmt.Println("计算结果:", total)
}

func get_sum_of_divisible(num int, divider int, sum *sync.Map, wg *sync.WaitGroup) {
    defer wg.Done()
    total := 0
    for value := 0; value < num; value++ {
        if value%divider == 0 {
            total += value
        }
    }
    sum.Store(divider, total)
}

互斥锁的执行顺序

如果 Mutex 已经被一个 goroutine 获取了锁,其它等待中的 goroutine 们只能一直等待。那么,等这个锁释放后,等待中的 goroutine 中哪一个会优先获取 Mutex 呢?

等待的goroutine们是以FIFO排队的

  1. 当Mutex处于正常模式时,若此时没有新goroutine与队头goroutine竞争,则队头goroutine获得,若有新goroutine竞争则大概率新goroutine获得。

  2. 当队头goroutine竞争锁失败1ms后,它会将Mutex调整为饥饿模式。进入饥饿模式后,锁的所有权会直接从解锁goroutine移交给队头goroutine,此时新来的goroutine直接放入队尾。

  3. 当一个goroutine获取锁后,如果发现自己满足下列条件中的任何一个,则将锁切换回正常模式

    • 它是队列中最后一个
    • 它等待锁的时间少于1ms

golang.org/x/sync

Go 官方扩展库

SingleFlight

SingleFlight 是 Go 开发组提供的一个扩展并发原语。它的作用是,在处理多个 goroutine 同时调用同一个函数的时候,只让一个 goroutine 去调用这个函数,等到这个 goroutine 返回结果的时候,再把结果返回给这几个同时调用的 goroutine,这样可以减少并发调用的数量。

使用场景: 如防缓存击穿、减少并发读库以提高性能、减少并发写相同数据以提高性能。

标准库中的 sync.Once 也可以保证并发的 goroutine 只会执行一次函数 f,那么,SingleFlight 和 sync.Once 有什么区别呢?

其实,sync.Once 不是只在并发的时候保证只有一个 goroutine 执行函数 f,而是会保证永远只执行一次,而 SingleFlight 是每次调用都重新执行,并且在多个请求同时调用的时候只有一个执行。它们两个面对的场景是不同的,sync.Once 主要是用在单次初始化场景中,而 SingleFlight 主要用在合并并发请求的场景中,尤其是缓存场景。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main

import (
    "fmt"
    "golang.org/x/sync/singleflight"
    "sync"
    "sync/atomic"
)

// 防缓存击穿代码示例

var count = int64(0)

func a() (interface{}, error) {
    //time.Sleep(time.Millisecond * 500) // 去掉注释可查看高并发取值情况
    return atomic.AddInt64(&count, 1), nil
}

func main() {
    g := singleflight.Group{}

    wg := sync.WaitGroup{}

    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(j int) {
            defer wg.Done()

            // 直接返回结果集
            val, err, shared := g.Do("cache_key_name", a) // Do方法执行一个函数,并返回函数执行的结果。你需要提供一个 key,对于同一个 key,在同一时间只有一个在执行,同一个 key 并发的请求会等待。第一个执行的请求返回的结果,就是它的返回结果。函数 fn 是一个无参的函数,返回一个结果或者 error,而 Do 方法会返回函数执行的结果或者是 error,shared 会指示运行结果是否返回给多个请求。
            if err != nil {
                fmt.Println(err)
                return
            }
            fmt.Printf("index: %d, val: %d, shared: %v\n", j, val, shared)

            // 返回值为 channel 方式
            //result := g.DoChan("cache_key_name2", a)
            //ret, _ := <-result
            //fmt.Printf("channel->index: %d, val: %d, shared: %v\n", j, ret.Val, ret.Shared)
        }(i)
    }

    wg.Wait()
}

semaphore

信号量(Semaphore)是用来控制多个 goroutine 同时访问多个资源的并发原语。(实际上,我们还有很多方法实现信号量,比较典型的就是使用 Channel 来实现。)

信号量是按照先入先出的方式唤醒调用者。当释放 100 个资源的时候,如果第一个等待者需要 101 个资源,那么,队列中的所有等待者都会继续等待,即使有的等待者只需要 1 个资源。这样做的目的是避免饥饿,否则的话,资源可能总是被那些请求资源数小的调用者获取,这样一来,请求资源数巨大的调用者,就没有机会获得资源了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 使用信号量限制 goroutine 并发数
func main() {
    var (
        semaWeight int64 = 3  // 最多同时运行 n 个 goroutine
        total            = 20 // 总共需要执行的协程数量
        sema             = semaphore.NewWeighted(semaWeight)
        ctx              = context.Background()
    )

    for i := 0; i < total; i++ {
        if err := sema.Acquire(ctx, 1); err != nil {
            log.Printf("sema acquire err: %v\n", err)
            continue
        }

        go func(i int) {
            log.Printf("go func: %d, time: %d\n", i, time.Now().Unix())
            time.Sleep(time.Second)
            sema.Release(1)
        }(i)
    }

    if err := sema.Acquire(ctx, semaWeight); err != nil{
        log.Printf("sema acquire err: %v\n", err)
    }
}

errgroup

处理任务编排:分组执行一批相同的或类似的任务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func main() {
    g := errgroup.Group{}
    result := make([]error, 3) // 保存成功或者失败的结果

    g.Go(func() error {
        time.Sleep(1*time.Second)
        fmt.Println("exec #1")
        result[0] = nil
        return nil
    })

    g.Go(func() error {
        time.Sleep(3*time.Second)
        fmt.Println("exec #2")
        result[1] = errors.New("failed to exec #2")
        return result[1]
    })

    g.Go(func() error {
        time.Sleep(5 * time.Second)
        fmt.Println("exec #3")
        result[2] = errors.New("failed to exec #3")
        return result[2]
    })

    if err := g.Wait(); err != nil {
        fmt.Printf("failed: %v\n", result)
    } else {
        fmt.Println("successfully exec all")
    }
}

循环栅栏 CyclicBarrier

github.com/marusama/cyclicbarrier

CyclicBarrier允许一组 goroutine 彼此等待,到达一个共同的执行点。同时,因为它可以被重复使用,所以叫循环栅栏。具体的机制是,大家都在栅栏前等待,等全部都到齐了,就抬起栅栏放行。

CyclicBarrier 和 WaitGroup 的功能有点类似,但使用场景不同。CyclicBarrier 和 WaitGroup 各自的使用场景:

  • 等待同一组 goroutine 同时执行:CyclicBarrier 更适合用在“固定数量的 goroutine 等待同一个执行点”的场景中,而且在放行 goroutine 之后,CyclicBarrier 可以重复利用。
  • 等待同一组 goroutine 都完成:WaitGroup 更适合用在“一个 goroutine 等待一组 goroutine 到达同一个执行点”的场景中,或者是不需要重用的场景中。

并发趣题:一氧化二氢制造工厂

有一个名叫大自然的搬运工的工厂,生产一种叫做一氧化二氢的神秘液体。这种液体的分子是由一个氧原子和两个氢原子组成的,也就是水。

这个工厂有多条生产线,每条生产线负责生产氧原子或者是氢原子,每条生产线由一个 goroutine 负责。

这些生产线会通过一个栅栏,只有一个氧原子生产线和两个氢原子生产线都准备好,才能生成出一个水分子,否则所有的生产线都会处于等待状态。也就是说,一个水分子必须由三个不同的生产线提供原子,而且水分子是一个一个按照顺序产生的,每生产一个水分子,就会打印出 HHO、HOH、OHH 三种形式的其中一种。HHH、OOH、OHO、HOO、OOO 都是不允许的。

生产线中氢原子的生产线为 2N 条,氧原子的生产线为 N 条。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package water
import (
  "context"
  "github.com/marusama/cyclicbarrier"
  "golang.org/x/sync/semaphore"
)
// 定义水分子合成的辅助数据结构
type H2O struct {
  semaH *semaphore.Weighted // 氢原子的信号量
  semaO *semaphore.Weighted // 氧原子的信号量
  b     cyclicbarrier.CyclicBarrier // 循环栅栏,用来控制合成
}
func New() *H2O {
  return &H2O{
    semaH: semaphore.NewWeighted(2), //氢原子需要两个
    semaO: semaphore.NewWeighted(1), // 氧原子需要一个
    b:     cyclicbarrier.New(3),  // 需要三个原子才能合成
  }
}
func (h2o *H2O) hydrogen(releaseHydrogen func()) {
  h2o.semaH.Acquire(context.Background(), 1)

  releaseHydrogen() // 输出H
  h2o.b.Await(context.Background()) //等待栅栏放行
  h2o.semaH.Release(1) // 释放氢原子空槽
}

func (h2o *H2O) oxygen(releaseOxygen func()) {
  h2o.semaO.Acquire(context.Background(), 1)

  releaseOxygen() // 输出O
  h2o.b.Await(context.Background()) //等待栅栏放行
  h2o.semaO.Release(1) // 释放氢原子空槽
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package water
import (
    "math/rand"
    "sort"
    "sync"
    "testing"
    "time"
)
func TestWaterFactory(t *testing.T) {
    //用来存放水分子结果的channel
    var ch chan string
    releaseHydrogen := func() {
        ch <- "H"
    }
    releaseOxygen := func() {
        ch <- "O"
    }

    // 300个原子,300个goroutine,每个goroutine并发的产生一个原子
    var N = 100
    ch = make(chan string, N*3)


    h2o := New()

    // 用来等待所有的goroutine完成
    var wg sync.WaitGroup
    wg.Add(N * 3)

    // 200个氢原子goroutine
    for i := 0; i < 2*N; i++ {
        go func() {
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            h2o.hydrogen(releaseHydrogen)
            wg.Done()
        }()
    }
    // 100个氧原子goroutine
    for i := 0; i < N; i++ {
        go func() {
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            h2o.oxygen(releaseOxygen)
            wg.Done()
        }()
    }

    //等待所有的goroutine执行完
    wg.Wait()

    // 结果中肯定是300个原子
    if len(ch) != N*3 {
        t.Fatalf("expect %d atom but got %d", N*3, len(ch))
    }

    // 每三个原子一组,分别进行检查。要求这一组原子中必须包含两个氢原子和一个氧原子,这样才能正确组成一个水分子。
    var s = make([]string, 3)
    for i := 0; i < N; i++ {
        s[0] = <-ch
        s[1] = <-ch
        s[2] = <-ch
        sort.Strings(s)


        water := s[0] + s[1] + s[2]
        if water != "HHO" {
            t.Fatalf("expect a water molecule but got %s", water)
        }
    }
}