并发实践
Go 的开发者极力推荐使用 Channel,不过,这两年,大家意识到,Channel 并不是处理并发问题的“银弹”,有时候使用并发原语更简单,而且不容易出错。
所以,我给你提供一套选择的方法:
- 共享资源的并发访问使用传统并发原语;
- 复杂的任务编排和消息传递使用 Channel;
- 需要和 Select 语句结合,使用 Channel;
- 需要和超时配合时,使用 Channel 和 Context;
- 简单等待所有任务的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;
- 消息通知机制使用 Channel,除非只想 signal 一个 goroutine,才使用 Cond;
分段锁ConcurrentMap,map+读写锁,sync.map的效率测试
原文:go 分段锁ConcurrentMap,map+读写锁,sync.map的效率测试
效率测试结论:
- go 自带的 map 不是多协程安全的
- 分段锁 ConcurrentMap 是多协程安全的,且效率最高;sync.map 效率次之,传统的 map + 读写锁效率最低
- ConcurrentMap 中锁的个数越多,效率越高,因为争夺同一把锁的概率降低了
go sync.map 源码分析
原文:go sync.map 源码分析
go 并发安全map 分段锁实现
原文:go 并发安全map 分段锁实现
并行计算并指定key存储结果案例
Golang 在运行时会在 map 的增改删查过程中检测是否有并发读写的情况,当发现并发读写时直接异常退出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
package main
import (
"fmt"
"sync"
"time"
)
//题目
//如果我们列出10以下所有能够被3或者5整除的自然数,那么我们得到的是3,5,6和9。这四个数的和是23。
//那么请计算1000以下(不包括1000)的所有能够被3或者5整除的自然数的和。
//
//这个题目的一个思路就是:
//
//(1) 先计算1000以下所有能够被3整除的整数的和A,
//(2) 然后计算1000以下所有能够被5整除的整数和B,
//(3) 然后再计算1000以下所有能够被3和5整除的整数和C,
//(4) 使用A+B-C就得到了最后的结果。
func main() {
var sum sync.Map
wg := sync.WaitGroup{}
limit := 1000 // 最大数
startTime := time.Now()
divider3 := 3
divider5 := 5
divider15 := 15
wg.Add(3)
go get_sum_of_divisible(limit, divider3, &sum, &wg)
go get_sum_of_divisible(limit, divider5, &sum, &wg)
go get_sum_of_divisible(limit, divider15, &sum, &wg)
wg.Wait() // 此处阻塞,等所有协程跑完才会执行下面的代码
sum3, ok := sum.Load(divider3)
if !ok {
fmt.Println("数据异常,没获取到key:", divider3)
return
}
sum5, ok := sum.Load(divider5)
if !ok {
fmt.Println("数据异常,没获取到key:", divider5)
return
}
sum15, ok := sum.Load(divider15)
if !ok {
fmt.Println("数据异常,没获取到key:", divider15)
return
}
total := sum3.(int) + sum5.(int) - sum15.(int)
endTime := time.Now()
fmt.Println("耗时:", endTime.Sub(startTime))
fmt.Println("计算结果:", total)
}
func get_sum_of_divisible(num int, divider int, sum *sync.Map, wg *sync.WaitGroup) {
defer wg.Done()
total := 0
for value := 0; value < num; value++ {
if value%divider == 0 {
total += value
}
}
sum.Store(divider, total)
}
|
并发限流案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
var (
nextId uint64
limit int = 1000
endAt = time.Now().Format("2006-01-02")
)
ch := make(chan struct{}, 10) // 限制最高并发10
wg := sync.WaitGroup{}
for {
data := GetList(nextId, endAt, limit)
if len(data) <= 0 {
break
}
for _, v := range data {
ch <- struct{}{}
wg.Add(1)
if nextId < v.Id {
nextId = v.Id
}
go func(v model_interact.Rabbit) {
defer recover()
defer wg.Done()
defer func() {
<-ch
}()
UpdateByUserId(v.UserId, map[string]interface{}{
"challenge_chance": gorm.Expr("challenge_chance+?", model_interact.RabbitMaxChallengeTimes),
})
}(v)
}
if len(data) < limit {
break
}
}
wg.Wait()
|
互斥锁的执行顺序
如果 Mutex 已经被一个 goroutine 获取了锁,其它等待中的 goroutine 们只能一直等待。那么,等这个锁释放后,等待中的 goroutine 中哪一个会优先获取 Mutex 呢?
等待的goroutine们是以FIFO排队的
-
当Mutex处于正常模式时,若此时没有新goroutine与队头goroutine竞争,则队头goroutine获得,若有新goroutine竞争则大概率新goroutine获得。
-
当队头goroutine竞争锁失败1ms后,它会将Mutex调整为饥饿模式
。进入饥饿模式后,锁的所有权会直接从解锁goroutine移交给队头goroutine,此时新来的goroutine直接放入队尾。
-
当一个goroutine获取锁后,如果发现自己满足下列条件中的任何一个,则将锁切换回正常模式
golang.org/x/sync
Go 官方扩展库
SingleFlight
SingleFlight 是 Go 开发组提供的一个扩展并发原语。它的作用是,在处理多个 goroutine 同时调用同一个函数的时候,只让一个 goroutine 去调用这个函数,等到这个 goroutine 返回结果的时候,再把结果返回给这几个同时调用的 goroutine,这样可以减少并发调用的数量。
使用场景: 如防缓存击穿、减少并发读库以提高性能、减少并发写相同数据以提高性能。
标准库中的 sync.Once 也可以保证并发的 goroutine 只会执行一次函数 f,那么,SingleFlight 和 sync.Once 有什么区别呢?
其实,sync.Once 不是只在并发的时候保证只有一个 goroutine 执行函数 f,而是会保证永远只执行一次,而 SingleFlight 是每次调用都重新执行,并且在多个请求同时调用的时候只有一个执行。它们两个面对的场景是不同的,sync.Once 主要是用在单次初始化场景中,而 SingleFlight 主要用在合并并发请求的场景中,尤其是缓存场景。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
package main
import (
"fmt"
"golang.org/x/sync/singleflight"
"sync"
"sync/atomic"
)
// 防缓存击穿代码示例
var count = int64(0)
func a() (interface{}, error) {
//time.Sleep(time.Millisecond * 500) // 去掉注释可查看高并发取值情况
return atomic.AddInt64(&count, 1), nil
}
func main() {
g := singleflight.Group{}
wg := sync.WaitGroup{}
for i := 0; i < 100; i++ {
wg.Add(1)
go func(j int) {
defer wg.Done()
// 直接返回结果集
val, err, shared := g.Do("cache_key_name", a) // Do方法执行一个函数,并返回函数执行的结果。你需要提供一个 key,对于同一个 key,在同一时间只有一个在执行,同一个 key 并发的请求会等待。第一个执行的请求返回的结果,就是它的返回结果。函数 fn 是一个无参的函数,返回一个结果或者 error,而 Do 方法会返回函数执行的结果或者是 error,shared 会指示运行结果是否返回给多个请求。
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("index: %d, val: %d, shared: %v\n", j, val, shared)
// 返回值为 channel 方式
//result := g.DoChan("cache_key_name2", a)
//ret, _ := <-result
//fmt.Printf("channel->index: %d, val: %d, shared: %v\n", j, ret.Val, ret.Shared)
}(i)
}
wg.Wait()
}
|
semaphore
信号量(Semaphore)是用来控制多个 goroutine 同时访问多个资源的并发原语。(实际上,我们还有很多方法实现信号量,比较典型的就是使用 Channel 来实现。)
信号量是按照先入先出的方式唤醒调用者。当释放 100 个资源的时候,如果第一个等待者需要 101 个资源,那么,队列中的所有等待者都会继续等待,即使有的等待者只需要 1 个资源。这样做的目的是避免饥饿,否则的话,资源可能总是被那些请求资源数小的调用者获取,这样一来,请求资源数巨大的调用者,就没有机会获得资源了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
// 使用信号量限制 goroutine 并发数
func main() {
var (
semaWeight int64 = 3 // 最多同时运行 n 个 goroutine
total = 20 // 总共需要执行的协程数量
sema = semaphore.NewWeighted(semaWeight)
ctx = context.Background()
)
for i := 0; i < total; i++ {
if err := sema.Acquire(ctx, 1); err != nil {
log.Printf("sema acquire err: %v\n", err)
continue
}
go func(i int) {
log.Printf("go func: %d, time: %d\n", i, time.Now().Unix())
time.Sleep(time.Second)
sema.Release(1)
}(i)
}
if err := sema.Acquire(ctx, semaWeight); err != nil{
log.Printf("sema acquire err: %v\n", err)
}
}
|
errgroup
处理任务编排:分组执行一批相同的或类似的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
func main() {
g := errgroup.Group{}
result := make([]error, 3) // 保存成功或者失败的结果
g.Go(func() error {
time.Sleep(1*time.Second)
fmt.Println("exec #1")
result[0] = nil
return nil
})
g.Go(func() error {
time.Sleep(3*time.Second)
fmt.Println("exec #2")
result[1] = errors.New("failed to exec #2")
return result[1]
})
g.Go(func() error {
time.Sleep(5 * time.Second)
fmt.Println("exec #3")
result[2] = errors.New("failed to exec #3")
return result[2]
})
if err := g.Wait(); err != nil {
fmt.Printf("failed: %v\n", result)
} else {
fmt.Println("successfully exec all")
}
}
|
循环栅栏 CyclicBarrier
github.com/marusama/cyclicbarrier
CyclicBarrier允许一组 goroutine 彼此等待,到达一个共同的执行点。同时,因为它可以被重复使用,所以叫循环栅栏。具体的机制是,大家都在栅栏前等待,等全部都到齐了,就抬起栅栏放行。
CyclicBarrier 和 WaitGroup 的功能有点类似,但使用场景不同。CyclicBarrier 和 WaitGroup 各自的使用场景:
- 等待同一组 goroutine 同时执行:CyclicBarrier 更适合用在“固定数量的 goroutine 等待同一个执行点”的场景中,而且在放行 goroutine 之后,CyclicBarrier 可以重复利用。
- 等待同一组 goroutine 都完成:WaitGroup 更适合用在“一个 goroutine 等待一组 goroutine 到达同一个执行点”的场景中,或者是不需要重用的场景中。
并发趣题:一氧化二氢制造工厂
有一个名叫大自然的搬运工的工厂,生产一种叫做一氧化二氢的神秘液体。这种液体的分子是由一个氧原子和两个氢原子组成的,也就是水。
这个工厂有多条生产线,每条生产线负责生产氧原子或者是氢原子,每条生产线由一个 goroutine 负责。
这些生产线会通过一个栅栏,只有一个氧原子生产线和两个氢原子生产线都准备好,才能生成出一个水分子,否则所有的生产线都会处于等待状态。也就是说,一个水分子必须由三个不同的生产线提供原子,而且水分子是一个一个按照顺序产生的,每生产一个水分子,就会打印出 HHO、HOH、OHH 三种形式的其中一种。HHH、OOH、OHO、HOO、OOO 都是不允许的。
生产线中氢原子的生产线为 2N 条,氧原子的生产线为 N 条。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
package water
import (
"context"
"github.com/marusama/cyclicbarrier"
"golang.org/x/sync/semaphore"
)
// 定义水分子合成的辅助数据结构
type H2O struct {
semaH *semaphore.Weighted // 氢原子的信号量
semaO *semaphore.Weighted // 氧原子的信号量
b cyclicbarrier.CyclicBarrier // 循环栅栏,用来控制合成
}
func New() *H2O {
return &H2O{
semaH: semaphore.NewWeighted(2), //氢原子需要两个
semaO: semaphore.NewWeighted(1), // 氧原子需要一个
b: cyclicbarrier.New(3), // 需要三个原子才能合成
}
}
func (h2o *H2O) hydrogen(releaseHydrogen func()) {
h2o.semaH.Acquire(context.Background(), 1)
releaseHydrogen() // 输出H
h2o.b.Await(context.Background()) //等待栅栏放行
h2o.semaH.Release(1) // 释放氢原子空槽
}
func (h2o *H2O) oxygen(releaseOxygen func()) {
h2o.semaO.Acquire(context.Background(), 1)
releaseOxygen() // 输出O
h2o.b.Await(context.Background()) //等待栅栏放行
h2o.semaO.Release(1) // 释放氢原子空槽
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
package water
import (
"math/rand"
"sort"
"sync"
"testing"
"time"
)
func TestWaterFactory(t *testing.T) {
//用来存放水分子结果的channel
var ch chan string
releaseHydrogen := func() {
ch <- "H"
}
releaseOxygen := func() {
ch <- "O"
}
// 300个原子,300个goroutine,每个goroutine并发的产生一个原子
var N = 100
ch = make(chan string, N*3)
h2o := New()
// 用来等待所有的goroutine完成
var wg sync.WaitGroup
wg.Add(N * 3)
// 200个氢原子goroutine
for i := 0; i < 2*N; i++ {
go func() {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
h2o.hydrogen(releaseHydrogen)
wg.Done()
}()
}
// 100个氧原子goroutine
for i := 0; i < N; i++ {
go func() {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
h2o.oxygen(releaseOxygen)
wg.Done()
}()
}
//等待所有的goroutine执行完
wg.Wait()
// 结果中肯定是300个原子
if len(ch) != N*3 {
t.Fatalf("expect %d atom but got %d", N*3, len(ch))
}
// 每三个原子一组,分别进行检查。要求这一组原子中必须包含两个氢原子和一个氧原子,这样才能正确组成一个水分子。
var s = make([]string, 3)
for i := 0; i < N; i++ {
s[0] = <-ch
s[1] = <-ch
s[2] = <-ch
sort.Strings(s)
water := s[0] + s[1] + s[2]
if water != "HHO" {
t.Fatalf("expect a water molecule but got %s", water)
}
}
}
|