应用场景

etcd 是一个高可用强一致性的键值仓库,在很多分布式系统架构中得到了广泛的应用,其最经典的使用场景就是服务发现。

etcd 的场景默认处理的数据都是系统中的控制数据。所以etcd在系统中的角色不是其他NoSQL产品的替代品,更不能作为应用的主要数据存储。etcd中应该尽量只存储系统中服务的配置信息,对于应用数据只推荐把数据量很小,但是更新和访问频次都很高的数据存储在etcd中。

主要作用

etcd 主要提供可靠的配置共享、服务发现和服务注册。

可用于实现微服务网关,相当于一个反向代理服务器。

etcd 提供的分布式并发原语:互斥锁、读写锁、Leader 选举、队列、栅栏、事务。

特点

  • 简单:易于部署,易使用。基于 HTTP+JSON 的 API 让你用 curl 就可以轻松使用。
  • 安全:可选 SSL 客户认证机制。
  • 快速:每个实例每秒支持一千次写操作。
  • 可信:使用一致性 Raft 算法充分实现了分布式。(Etcd 是基于 Raft 协议作为分布式一致性算法来解决领导者选举和日志复制问题)

系统中实现服务注册与发现所需的基本功能

  • 服务注册:同一 service 的所有节点注册到相同目录下,节点启动后将自己的信息注册到所属服务的目录中。
  • 健康检查:服务节点定时发送心跳,注册到服务目录中的信息设置一个较短的 TTL,运行正常的服务节点每隔一段时间会去更新信息的 TTL。
  • 服务发现:通过名称能查询到服务提供外部访问的 IP 和端口号。比如网关代理服务时能够及时的发现服务中新增节点、丢弃不可用的服务节点,同时各个服务间也能感知对方的存在。

etcd 流程图

核心组件

核心组件

从 etcd 的架构图中我们可以看到,etcd 主要分为四个部分。

  • HTTP Server:用于处理用户发送的 API 请求以及其它 etcd 节点的同步与心跳信息请求。
  • Store:用于处理 etcd 支持的各类功能的事务,包括数据索引、节点状态变更、监控与反馈、事件处理与执行等等,是 etcd 对用户提供的大多数 API 功能的具体实现。
  • Raft:Raft 强一致性算法的具体实现,是 etcd 的核心。
  • WAL:Write Ahead Log(预写式日志),是 etcd 的数据存储方式。除了在内存中存有所有数据的状态以及节点的索引以外,etcd 就通过 WAL 进行持久化存储。WAL 中,所有的数据提交前都会事先记录日志。Snapshot 是为了防止数据过多而进行的状态快照;Entry 表示存储的具体日志内容。

通常,一个用户的请求发送过来,会经由 HTTP Server 转发给 Store 进行具体的事务处理,如果涉及到节点的修改,则交给 Raft 模块进行状态的变更、日志的记录,然后再同步给别的 etcd 节点以确认数据提交,最后进行数据的提交,再次同步。

服务发现

服务发现要解决的是分布式系统中最常见的问题之一,即在同一个分布式集群中的进程或服务,要如何才能找到对方并建立连接。本质上来说,服务发现就是想要了解集群中是否有进程在监听 udp 或 tcp 端口,并且通过名字就可以查找和连接。要解决服务发现的问题,需要有下面三大支柱,缺一不可。

  • 一个强一致性、高可用的服务存储目录。基于 Raft 算法的 etcd 就是一个强一致性高可用的服务存储目录。
  • 一种注册服务和监控服务健康状态的机制。用户可以在 etcd 中注册服务,并且对注册的服务设置 key TTL,定时保持服务的心跳以达到监控健康状态的效果。
  • 一种查找和连接服务的机制。通过在 etcd 指定的主题(由服务名称构成的服务目录)下注册的服务也能在对应的主题下查找到。

服务注册

etcd 可以提前知道服务器是否有单点故障或网络问题,然后不再转发流量到有问题的服务器

租约模式

etcd 是一个 分布式键值存储系统,在一个集群中,如果一个节点配置了某些属性,集群中的每个节点都可以使用完整的存档,我们每次在网关节点后的计算节点集群加一个服务,只需要向etcd 注册 该服务(其实就是 存一个值)然后向etcd 发送心跳,当etcd 没有检测到心跳就会 把这个键值对 删了(这整个动作是etcd里的租约模式),网关那边 就只需要 watch 这个 key ,就能够知道 所有服务的所有动态了。

启动服务端

./etcd1 –listen-client-urls ‘http://0.0.0.0:2379’ –advertise-client-urls ‘http://0.0.0.0:2379’

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// docker-compose.yml
etcd1: # 服务发现 etcd
    image: "quay.io/coreos/etcd:v3.4.7"
    entrypoint: /usr/local/bin/etcd
    container_name: etcd1
    command:
      - '--name=etcd1'
      - '--data-dir=/etcd_data'
      - '--initial-advertise-peer-urls=http://etcd1:2380'
      - '--listen-peer-urls=http://0.0.0.0:2380'
      - '--listen-client-urls=http://0.0.0.0:2379'
      - '--advertise-client-urls=http://0.0.0.0:23791' # 解决 rpc error: code = DeadlineExceeded desc = latest balancer error: all SubConns are in TransientFailure, latest connection error: connection error: desc = \"transport: Error while dialing dial tcp 127.0.0.1:2379: connect: connection refused
      - '--initial-cluster-token=mys1cr2tt1k7n'
      - '--heartbeat-interval=250'
      - '--election-timeout=1250'
      - '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
      - '--initial-cluster-state=new'
    ports:
      - 23791:2379
      - 23801:2380

--name      节点名称
--data-dir      指定节点的数据存储目录
--listen-peer-urls      监听URL,用于与其他节点通讯
--listen-client-urls    对外提供服务的地址:比如 http://ip:2379,http://127.0.0.1:2379 ,客户端会连接到这里和 etcd 交互
--initial-advertise-peer-urls   该节点同伴监听地址,这个值会告诉集群中其他节点
--initial-cluster   集群中所有节点的信息,格式为 node1=http://ip1:2380,node2=http://ip2:2380,… 。注意:这里的 node1 是节点的 --name 指定的名字;后面的 ip1:2380 是 --initial-advertise-peer-urls 指定的值
--initial-cluster-state     新建集群的时候,这个值为 new ;假如已经存在的集群,这个值为 existing
--initial-cluster-token     创建集群的 token,这个值每个集群保持唯一。这样的话,如果你要重新创建集群,即使配置和之前一样,也会再次生成新的集群和节点 uuid;否则会导致多个集群之间的冲突,造成未知的错误
--advertise-client-urls     对外公告的该节点客户端监听地址,这个值会告诉集群中其他节点

交互命令

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
> etcdctl version
etcdctl version: 3.4.7
API version: 3.4

> etcdctl put hello v1
OK

> etcdctl get hello
hello
v1

> etcdctl get /test/ok --prefix
/test/ok
/test/ok/id

> etcdctl get hello --print-value-only
v1

> etcdctl del hello
1

# 开始监听某个 key
> etcdctl watch hello
# 。。。什么都不打印,在等待结果

# 当另一个窗口执行了 etcdctl put hello 2 之后
> etcdctl watch hello
PUT
hello
2

续租(租约模式)

租约被删除,归属的 kv 也同时会被删除。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 查找已创建的 lease
> etcdctl lease list

# 创建一个20s的租约
> etcdctl lease grant 20
lease 694d673115905e37 granted with TTL(20s)

# 使用租约的 id 进行 put 操作
> etcdctl put --lease=694d673115905e37 "name" "amber"

# 20s后get发现 key被删除了
> etcdctl get "name"
# 空应答

# 删除租约
> etcdctl lease revoke 694d673115905e49
lease 694d673115905e49 revoked

# 自动续租
> etcdctl lease keep-alive 694d673115905e4f
lease 694d673115905e4f keepalived with TTL(20)
lease 694d673115905e4f keepalived with TTL(20)

用 etcd 实现 API 网关的部分源代码

注意事项

  1. API 网关实现了服务和路由的注册与发现
  2. API 网关为什么要初始化路由,因为可以实现路由权限校验、利用中间件判断路由归属哪个服务并执行转发等
  3. 匹配服务对应路由规则:需要考虑固定路由、动态路由(如/test/:id/tes2/:id)、通配符(如/test//id/
  4. 代码生成的 lease id 是 10 进制,终端命令生成的 lease id 是 16 进制

注册服务和路由,需考虑部署多服务器或 k8s 多节点的情况

  • 注册 service: 使用 sync.Map 存储 2 对 kv,第一个 key 为 微服务名称,第二个 key 为 lease id,value 均为微服务对应的 host
  • 注册 route: 使用 sync.Map 存储 2 对 kv,第一个 key 为 method+path,第二个 key 为 lease id, value 均为微服务名称
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
const (
    Separator  = "@"
    ServiceKey = "service"
    RouteKey   = "route"
)

// service
type ServiceKV struct {
    prefix string
    sync.Map
    lease sync.Map // 存放节点
}

// route
type RouteKV struct {
    prefix string
    sync.Map
    lease sync.Map // 存放节点
}

func NewRoute(prefix string) *RouteKV {
    return &RouteKV{prefix: prefix}
}

func (r *RouteKV) GetPrefix() string {
    return r.prefix
}

// 监听 kv 变化、包括更新、删除
func (r *RouteKV) Watch(watchChan clientv3.WatchChan) {
    for wresp := range watchChan {
        for _, ev := range wresp.Events {
            key := string(ev.Kv.Key)
            switch ev.Type {
            case mvccpb.PUT:
                val := string(ev.Kv.Value)
                r.Set(key, val)
            case mvccpb.DELETE:
                r.Del(key)
            }
        }
    }
}

func (r *RouteKV) Extra(resp *clientv3.GetResponse) []string {
    ret := make([]string, 0)
    if resp == nil || resp.Kvs == nil {
        return ret
    }
    for i := range resp.Kvs {
        if v := resp.Kvs[i].Value; v != nil {
            r.Set(string(resp.Kvs[i].Key), string(resp.Kvs[i].Value))
            ret = append(ret, string(v))
        }
    }
    return ret
}

func (r *RouteKV) List() []string {
    var ret []string
    r.Range(func(key, value interface{}) bool {
        k, _ := key.(string)
        if v, ok := value.(string); ok {
            ret = append(ret, strings.Join([]string{k, v}, "->"))
        }
        return true
    })
    return ret
}

func (r *RouteKV) Set(key, val string) {
    keys := strings.Split(key, Separator)
    if len(keys) < 3 {
        return
    }
    r.Store(strings.Join(keys[2:], Separator), val)
    r.lease.Store(keys[1], val) // store lease->service
    logger.Sugar.Debugf("add:%s->%s", key, val)
}

func (r *RouteKV) Get(key string) string {
    v, ok := r.Load(key)
    if !ok {
        return ""
    }
    val, _ := v.(string)
    return val
}

// 匹配
func (r *RouteKV) Match(key string) string {
    if ret := r.Get(key); ret != "" {
        return ret
    }

    keys := strings.Split(key, Separator)
    if len(keys) < 2 {
        return ""
    }
    method := keys[0]
    path := strings.Join(keys[1:], Separator)

    l := len(strings.Split(path, "/"))

    var ret string
    r.Range(func(k, v interface{}) bool {
        k2, ok := k.(string)
        if !ok {
            return true
        }
        keys2 := strings.Split(k2, Separator)
        if len(keys2) < 2 {
            return true
        }
        method2 := keys2[0]
        if method != method2 {
            return true
        }
        path2 := strings.Join(keys2[1:], Separator)

        // 动态参数路由
        wildcard := ":"
        isWild := true
        if strings.Contains(path2, wildcard) && l == len(strings.Split(path2, "/")) { // 动态参数路由必须同级
            temp := path
            wilds := strings.Split(path2, wildcard)
            for _, wild := range wilds {
                if !strings.HasPrefix(wild, "/") { // 去除通配符级,跳下一级
                    ws := strings.Split(wild, "/")
                    wild = strings.Join(ws[1:], "/")
                }
                if !strings.HasPrefix(temp, wild) {
                    isWild = false
                    break
                }
                temp = temp[len(wild):]
                temps := strings.Split(temp, "/")
                temp = strings.Join(temps[1:], "/")
            }
            if isWild {
                ret, _ = v.(string)
                return false
            }
        }

        // 通配符
        wildcard = "*"
        isWild = true
        if strings.Contains(path2, wildcard) {
            wilds := strings.Split(path2, wildcard)
            temp := path
            for i, wild := range wilds {
                if !strings.HasPrefix(wild, "/") { // 去除通配符级,跳下一级
                    ws := strings.Split(wild, "/")
                    wild = strings.Join(ws[1:], "/")
                }
                if !strings.HasPrefix(temp, wild) {
                    isWild = false
                    break
                }
                temp = temp[len(wild):]
                temps := strings.Split(temp, "/")
                temp = strings.Join(temps[1:], "/")
                if temp == "" && i != len(wilds)-1 {
                    isWild = false
                    break
                }
            }
            if isWild && ret == "" {
                ret, _ = v.(string)
            }
        }
        return true
    })
    return ret
}

func (r *RouteKV) Del(key string) {
    keys := strings.Split(key, Separator)
    if len(keys) < 3 {
        return
    }
    r.lease.Delete(keys[1])
    logger.Sugar.Debug("del lease:", keys[1])
    // 判断是否存在节点
    service := r.Get(strings.Join(keys[2:], Separator))
    var podExists bool
    r.lease.Range(func(key, value interface{}) bool {
        v, ok := value.(string)
        if !ok {
            return true
        }
        if v == service {
            podExists = true
            return false
        }
        return true
    })
    if !podExists {
        r.Delete(strings.Join(keys[2:], Separator))
        logger.Sugar.Debug("del key:", strings.Join(keys[2:], Separator))
    }
}

其他微服务网关方案

Consul

微服务 API 网关 Kong

相关文章

etcd实现服务发现和注册

golang etcd服务注册与发现