在使用分布式并发原语时,除了需要考虑可用性和数据一致性,还需要考虑分布式设计带来的性能损耗问题。所以,在使用之前,你一定要做好性能的评估。
分布式互斥锁
基于redis的分布式互斥锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
package tool
import (
"context"
"errors"
"github.com/go-redis/redis/v8"
"time"
)
// 分布式互斥锁
// 默认重试次数
var retryTimes = 20
// 默认重试频率
var retryInterval = time.Millisecond * 50
var ctx = context.Background()
type DcsLock struct {
rdb *redis.Client
RetryTimes int
RetryInterval time.Duration
}
func (m *DcsLock) SetRdb(rdb *redis.Client) *DcsLock {
m.rdb = rdb
m.RetryTimes = retryTimes
m.RetryInterval = retryInterval
return m
}
func (m *DcsLock) SetRetryTimes(i int) *DcsLock {
m.RetryTimes = i
return m
}
func (m *DcsLock) SetRetryInterval(t time.Duration) *DcsLock {
m.RetryInterval = t
return m
}
func (m *DcsLock) Lock(cacheKey string, ttl time.Duration) error {
resp := false
for i := 0; i < m.RetryTimes; i++ {
if m.SetNX(cacheKey, ttl) {
resp = true
break
}
time.Sleep(m.RetryInterval)
}
if !resp {
return errors.New("抢锁失败,请稍后重试")
}
return nil
}
func (m *DcsLock) Unlock(cacheKey string) {
m.DelSetNX(cacheKey)
}
func (m *DcsLock) SetNX(cacheKey string, ttl time.Duration) bool {
if m.rdb.SetNX(ctx, cacheKey, 1, ttl).Val() {
return true
}
return false
}
func (m *DcsLock) DelSetNX(cacheKey string) {
m.rdb.Del(ctx, cacheKey)
}
|
基于etcd的分布式互斥锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
package main
import (
"context"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"log"
"math/rand"
"strings"
"time"
)
// etcd实现分布式互斥锁
var (
addr = "http://127.0.0.1:23791,http://127.0.0.1:23792,http://127.0.0.1:23793"
lockName = "my-test-lock"
)
func main() {
rand.Seed(time.Now().UnixNano())
// etcd地址
endpoints := strings.Split(addr, ",")
// 生成一个etcd client
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
useLock(cli) // 测试锁
}
func useLock(cli *clientv3.Client) {
// 为锁生成session「节点宕机对应 session 销毁,持有的锁会被释放」
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
// 得到一个分布式锁
locker := concurrency.NewMutex(s1, lockName)
// 请求锁
log.Println("acquiring lock")
if err := locker.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
log.Println("acquired lock")
// 等待一段时间
time.Sleep(time.Duration(rand.Intn(30)+3) * time.Second)
// 释放锁
if err := locker.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
log.Println("released lock")
}
|
分布式读写锁
基于etcd的分布式读写锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
package main
import (
"bufio"
"flag"
"fmt"
"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/client/v3/experimental/recipes"
"log"
"math/rand"
"os"
"strings"
"time"
)
// etcd实现分布式读写锁
var (
rwAddr = "http://127.0.0.1:23791,http://127.0.0.1:23792,http://127.0.0.1:23793"
rwLockName = "my-test-lock"
)
func main() {
rand.Seed(time.Now().UnixNano())
// 解析etcd地址
endpoints := strings.Split(rwAddr, ",")
// 创建etcd的client
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 创建session
s1, err := concurrency.NewSession(cli) // 节点宕机对应 session 销毁,持有的锁会被释放
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := recipe.NewRWMutex(s1, rwLockName)
// 从命令行读取命令
consolescanner := bufio.NewScanner(os.Stdin)
log.Println("请输入指令w/r:")
for consolescanner.Scan() {
action := consolescanner.Text()
switch action {
case "w": // 请求写锁
testWriteLocker(m1)
case "r": // 请求读锁
testReadLocker(m1)
default:
fmt.Println("unknown action")
}
log.Println("请输入指令w/r:")
}
}
func testWriteLocker(m1 *recipe.RWMutex) {
// 请求写锁
log.Println("acquiring write lock")
if err := m1.Lock(); err != nil {
log.Fatal(err)
}
log.Println("acquired write lock")
// 等待一段时间
time.Sleep(time.Duration(rand.Intn(10)+3) * time.Second)
// 释放写锁
if err := m1.Unlock(); err != nil {
log.Fatal(err)
}
log.Println("released write lock")
}
func testReadLocker(m1 *recipe.RWMutex) {
// 请求读锁
log.Println("acquiring read lock")
if err := m1.RLock(); err != nil {
log.Fatal(err)
}
log.Println("acquired read lock")
// 等待一段时间
time.Sleep(time.Duration(rand.Intn(10)+3) * time.Second)
// 释放写锁
if err := m1.RUnlock(); err != nil {
log.Fatal(err)
}
log.Println("released read lock")
}
|
分布式队列
基于etcd的分布式队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
package main
import (
"bufio"
"fmt"
"go.etcd.io/etcd/client/v3"
recipe "go.etcd.io/etcd/client/v3/experimental/recipes"
"log"
"os"
"strings"
)
// etcd实现分布式队列
var (
etcdAddrQ = "http://127.0.0.1:23791,http://127.0.0.1:23792,http://127.0.0.1:23793"
queueName = "my-test-queue"
)
func main() {
// 解析etcd地址
endpoints := strings.Split(etcdAddrQ, ",")
// 创建etcd的client
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 创建/获取队列
q := recipe.NewQueue(cli, queueName)
// 从命令行读取命令
fmt.Println("请输入指令:push/pop,多参数用空格间隔")
consolescanner := bufio.NewScanner(os.Stdin)
for consolescanner.Scan() {
action := consolescanner.Text()
items := strings.Split(action, " ")
switch items[0] {
case "push": // 加入队列
if len(items) != 2 {
fmt.Println("must set value to push")
continue
}
q.Enqueue(items[1]) // 入队
case "pop": // 从队列弹出
v, err := q.Dequeue() // 出队
if err != nil {
log.Fatal(err)
}
fmt.Println(v) // 输出出队的元素
case "quit", "exit": //退出
return
default:
fmt.Println("unknown action")
}
fmt.Println("请输入指令:push/pop,多参数用空格间隔")
}
}
|
分布式栅栏
基于etcd的分布式栅栏
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
package main
import (
"bufio"
"fmt"
"go.etcd.io/etcd/client/v3"
recipe "go.etcd.io/etcd/client/v3/experimental/recipes"
"log"
"os"
"strings"
)
// 分布式栅栏
var (
etcdAddrB = "http://127.0.0.1:23791,http://127.0.0.1:23792,http://127.0.0.1:23793"
barrierName = "my-test-queue"
)
func main() {
// 解析etcd地址
endpoints := strings.Split(etcdAddrB, ",")
// 创建etcd的client
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 创建/获取栅栏
b := recipe.NewBarrier(cli, barrierName)
// 从命令行读取命令
fmt.Println("请输入指令hold/release/wait:")
consolescanner := bufio.NewScanner(os.Stdin)
for consolescanner.Scan() {
action := consolescanner.Text()
items := strings.Split(action, " ")
switch items[0] {
case "hold": // 持有这个barrier
if err := b.Hold(); err != nil {
fmt.Printf("hold fail: %v\n", err)
} else {
fmt.Println("hold success")
}
case "release": // 释放这个barrier
if err := b.Release(); err != nil {
fmt.Printf("released fail: %v\n", err)
} else {
fmt.Println("released success")
}
case "wait": // 等待barrier被释放
if err := b.Wait(); err != nil {
fmt.Printf("wait fail: %v\n", err)
} else {
fmt.Println("after wait")
}
case "quit", "exit": // 退出
return
default:
fmt.Println("unknown action")
}
fmt.Println("请输入指令hold/release/wait:")
}
}
|
分布式事务
todo