在使用分布式并发原语时,除了需要考虑可用性和数据一致性,还需要考虑分布式设计带来的性能损耗问题。所以,在使用之前,你一定要做好性能的评估。

分布式互斥锁

基于redis的分布式互斥锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package tool

import (
    "context"
    "errors"
    "github.com/go-redis/redis/v8"
    "time"
)

// 分布式互斥锁

// 默认重试次数
var retryTimes = 20

// 默认重试频率
var retryInterval = time.Millisecond * 50

var ctx = context.Background()

type DcsLock struct {
    rdb           *redis.Client
    RetryTimes    int
    RetryInterval time.Duration
}

func (m *DcsLock) SetRdb(rdb *redis.Client) *DcsLock {
    m.rdb = rdb
    m.RetryTimes = retryTimes
    m.RetryInterval = retryInterval
    return m
}

func (m *DcsLock) SetRetryTimes(i int) *DcsLock {
    m.RetryTimes = i
    return m
}

func (m *DcsLock) SetRetryInterval(t time.Duration) *DcsLock {
    m.RetryInterval = t
    return m
}

func (m *DcsLock) Lock(cacheKey string, ttl time.Duration) error {
    resp := false
    for i := 0; i < m.RetryTimes; i++ {
        if m.SetNX(cacheKey, ttl) {
            resp = true
            break
        }
        time.Sleep(m.RetryInterval)
    }
    if !resp {
        return errors.New("抢锁失败,请稍后重试")
    }
    return nil
}

func (m *DcsLock) Unlock(cacheKey string) {
    m.DelSetNX(cacheKey)
}

func (m *DcsLock) SetNX(cacheKey string, ttl time.Duration) bool {
    if m.rdb.SetNX(ctx, cacheKey, 1, ttl).Val() {
        return true
    }
    return false
}

func (m *DcsLock) DelSetNX(cacheKey string) {
    m.rdb.Del(ctx, cacheKey)
}

基于etcd的分布式互斥锁

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package main

import (
    "context"
    "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
    "log"
    "math/rand"
    "strings"
    "time"
)

// etcd实现分布式互斥锁
var (
    addr     = "http://127.0.0.1:23791,http://127.0.0.1:23792,http://127.0.0.1:23793"
    lockName = "my-test-lock"
)

func main() {
    rand.Seed(time.Now().UnixNano())

    // etcd地址
    endpoints := strings.Split(addr, ",")

    // 生成一个etcd client
    cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    useLock(cli) // 测试锁
}

func useLock(cli *clientv3.Client) {
    // 为锁生成session「节点宕机对应 session 销毁,持有的锁会被释放」
    s1, err := concurrency.NewSession(cli)
    if err != nil {
        log.Fatal(err)
    }
    defer s1.Close()

    // 得到一个分布式锁
    locker := concurrency.NewMutex(s1, lockName)

    // 请求锁
    log.Println("acquiring lock")
    if err := locker.Lock(context.TODO()); err != nil {
        log.Fatal(err)
    }
    log.Println("acquired lock")

    // 等待一段时间
    time.Sleep(time.Duration(rand.Intn(30)+3) * time.Second)

    // 释放锁
    if err := locker.Unlock(context.TODO()); err != nil {
        log.Fatal(err)
    }

    log.Println("released lock")
}

分布式读写锁

基于etcd的分布式读写锁

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
package main

import (
    "bufio"
    "flag"
    "fmt"
    "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
    "go.etcd.io/etcd/client/v3/experimental/recipes"
    "log"
    "math/rand"
    "os"
    "strings"
    "time"
)

// etcd实现分布式读写锁
var (
    rwAddr     = "http://127.0.0.1:23791,http://127.0.0.1:23792,http://127.0.0.1:23793"
    rwLockName = "my-test-lock"
)

func main() {
    rand.Seed(time.Now().UnixNano())

    // 解析etcd地址
    endpoints := strings.Split(rwAddr, ",")

    // 创建etcd的client
    cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    if err != nil {
        log.Fatal(err)
    }

    defer cli.Close()

    // 创建session
    s1, err := concurrency.NewSession(cli) // 节点宕机对应 session 销毁,持有的锁会被释放
    if err != nil {
        log.Fatal(err)
    }

    defer s1.Close()

    m1 := recipe.NewRWMutex(s1, rwLockName)

    // 从命令行读取命令
    consolescanner := bufio.NewScanner(os.Stdin)
    log.Println("请输入指令w/r:")
    for consolescanner.Scan() {
        action := consolescanner.Text()
        switch action {
        case "w": // 请求写锁
            testWriteLocker(m1)
        case "r": // 请求读锁
            testReadLocker(m1)
        default:
            fmt.Println("unknown action")
        }
        log.Println("请输入指令w/r:")
    }
}

func testWriteLocker(m1 *recipe.RWMutex) {
    // 请求写锁
    log.Println("acquiring write lock")
    if err := m1.Lock(); err != nil {
        log.Fatal(err)
    }

    log.Println("acquired write lock")

    // 等待一段时间
    time.Sleep(time.Duration(rand.Intn(10)+3) * time.Second)

    // 释放写锁
    if err := m1.Unlock(); err != nil {
        log.Fatal(err)
    }

    log.Println("released write lock")
}

func testReadLocker(m1 *recipe.RWMutex) {
    // 请求读锁
    log.Println("acquiring read lock")
    if err := m1.RLock(); err != nil {
        log.Fatal(err)
    }

    log.Println("acquired read lock")

    // 等待一段时间
    time.Sleep(time.Duration(rand.Intn(10)+3) * time.Second)

    // 释放写锁
    if err := m1.RUnlock(); err != nil {
        log.Fatal(err)
    }

    log.Println("released read lock")
}

分布式队列

基于etcd的分布式队列

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

package main

import (
    "bufio"
    "fmt"
    "go.etcd.io/etcd/client/v3"
    recipe "go.etcd.io/etcd/client/v3/experimental/recipes"
    "log"
    "os"
    "strings"
)

// etcd实现分布式队列
var (
    etcdAddrQ     = "http://127.0.0.1:23791,http://127.0.0.1:23792,http://127.0.0.1:23793"
    queueName = "my-test-queue"
)

func main() {
    // 解析etcd地址
    endpoints := strings.Split(etcdAddrQ, ",")

    // 创建etcd的client
    cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    // 创建/获取队列
    q := recipe.NewQueue(cli, queueName)

    // 从命令行读取命令
    fmt.Println("请输入指令:push/pop,多参数用空格间隔")
    consolescanner := bufio.NewScanner(os.Stdin)
    for consolescanner.Scan() {
        action := consolescanner.Text()
        items := strings.Split(action, " ")
        switch items[0] {
        case "push": // 加入队列
            if len(items) != 2 {
                fmt.Println("must set value to push")
                continue
            }
            q.Enqueue(items[1]) // 入队
        case "pop": // 从队列弹出
            v, err := q.Dequeue() // 出队
            if err != nil {
                log.Fatal(err)
            }
            fmt.Println(v) // 输出出队的元素
        case "quit", "exit": //退出
            return
        default:
            fmt.Println("unknown action")
        }

        fmt.Println("请输入指令:push/pop,多参数用空格间隔")
    }
}

分布式栅栏

基于etcd的分布式栅栏

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package main

import (
    "bufio"
    "fmt"
    "go.etcd.io/etcd/client/v3"
    recipe "go.etcd.io/etcd/client/v3/experimental/recipes"
    "log"
    "os"
    "strings"
)

// 分布式栅栏
var (
    etcdAddrB   = "http://127.0.0.1:23791,http://127.0.0.1:23792,http://127.0.0.1:23793"
    barrierName = "my-test-queue"
)

func main() {

    // 解析etcd地址
    endpoints := strings.Split(etcdAddrB, ",")

    // 创建etcd的client
    cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    // 创建/获取栅栏
    b := recipe.NewBarrier(cli, barrierName)

    // 从命令行读取命令
    fmt.Println("请输入指令hold/release/wait:")
    consolescanner := bufio.NewScanner(os.Stdin)
    for consolescanner.Scan() {
        action := consolescanner.Text()
        items := strings.Split(action, " ")
        switch items[0] {
        case "hold": // 持有这个barrier
            if err := b.Hold(); err != nil {
                fmt.Printf("hold fail: %v\n", err)
            } else {
                fmt.Println("hold success")
            }
        case "release": // 释放这个barrier
            if err := b.Release(); err != nil {
                fmt.Printf("released fail: %v\n", err)
            } else {
                fmt.Println("released success")
            }
        case "wait": // 等待barrier被释放
            if err := b.Wait(); err != nil {
                fmt.Printf("wait fail: %v\n", err)
            } else {
                fmt.Println("after wait")
            }
        case "quit", "exit": // 退出
            return
        default:
            fmt.Println("unknown action")
        }
        fmt.Println("请输入指令hold/release/wait:")
    }
}

分布式事务

todo