搭建etcd集群
机制
- 静态
- etcd动态发现
- DNS发现
常见参数
–listen-peer-urls
环境变量: ETCD_LISTEN_PEER_URLS
–listen-client-urls
用于监听客户端通讯的URL列表。这个标记告诉 etcd 在特定的 scheme://IP:port 组合上从客户端接收进来的请求。scheme 可是 http 或者 https。
环境变量: ETCD_LISTEN_CLIENT_URLS
–initial-advertise-peer-urls
列出这个成员的伙伴 URL 以便通告给集群的其他成员。这些地方用于在集群中通讯 etcd 数据。至少有一个必须对所有集群成员可以路由的。这些 URL 可以包含域名。
环境变量: ETCD_INITIAL_ADVERTISE_PEER_URLS
–initial-cluster
为启动初始化集群配置。
环境变量: ETCD_INITIAL_CLUSTER
–initial- cluster -state
初始化集群状态(“new” or “existing”)。在初始化静态(initial static )或者 DNS 启动 (DNS bootstrapping) 期间为所有成员设置为 new 。如果这个选项被设置为 existing , etcd 将试图加入已有的集群。如果设置为错误的值,etcd 将尝试启动但安全失败。
环境变量: ETCD_INITIAL_CLUSTER_STATE
–advertise-client-urls
列出这个成员的客户端URL,通告给集群中的其他成员。这些 URL 可以包含域名。
环境变量: ETCD_ADVERTISE_ client _URLS
图形化配置
在 kubernetes 里部署
1.code
headless svc, 像DNS RR ClusterIP:None
kubectl -n stg1 get endpoints
apiVersion: v1
kind: Service
metadata:
name: etcd
labels:
app: etcd
spec:
ports:
- port: 2380
name: etcd-server
- port: 2379
name: etcd-client
clusterIP: None
selector:
app: etcd
publishNotReadyAddresses: true
client 怎么访问:
kubectl port-forward svc/etcd-cluster-client 2379:2379
apiVersion: v1
kind: Service
metadata:
labels:
app: etcd
name: etcd-cluster-client
spec:
ports:
- name: etcd-cluster-2379
port: 2379
protocol : TCP
targetPort: 2379
selector:
app: etcd
sessionAffinity: None
type: ClusterIP
2.配置文件
- PDB
- service
- sts
3.apply
官方的code有两个问题
- etcd版本老
- 配置不对: 2022-01-29 14:47:51.344590 E | etcdmain: error verifying flags, expected IP in URL for binding ( See ‘etcd –help’ ,暂时不要用3.4,3.5的 image
本地访问
kubectl port-forward svc/etcd-cluster-client 2379:2379
扩容
~/etcd ❯❯❯ kubectl scale --replicas=5 sts etcd
statefulset.apps/etcd scaled
~/etcd ❯❯❯ kubectl get pod
NAME READY STATUS RESTARTS AGE
etcd-0 1/1 Running 0 12m
etcd-1 1/1 Running 0 12m
etcd-2 1/1 Running 0 12m
etcd-3 0/1 Pending 0 2s
利用反亲和性 分布etcd pod到不同节点
常用命令
- 集群状态
/ # export ETCDCTL_API=3
/ # etcdctl --endpoints etcd-0.etcd:2379,etcd-1.etcd:2379,etcd-2.etcd:2379 endpoint status --write-out=table
+------------------+------------------+---------+---------+-----------+-----------+------------+
| ENDPOINT | ID | VERSION | DB SIZE | IS LEADER | raft TERM | RAFT INDEX |
+------------------+------------------+---------+---------+-----------+-----------+------------+
| etcd-0.etcd:2379 | c799a6ef06bc8c14 | 3.3.8 | 20 kB | false | 6 | 9 |
| etcd-1.etcd:2379 | 9869f0647883a00d | 3.3.8 | 20 kB | true | 6 | 9 |
| etcd-2.etcd:2379 | 42c8b94265b9b79a | 3.3.8 | 20 kB | false | 6 | 9 |
+------------------+------------------+---------+---------+-----------+-----------+------------+
- 查看member
bash-3.2$ etcdctl member list
42c8b94265b9b79a, started, etcd-2, false
58996ec4dced38b4, started, etcd-4, false
9869f0647883a00d, started, etcd-1, false
b4476a6cf0b4d72c, started, etcd-3, false
c799a6ef06bc8c14, started, etcd-0, false
- 健康检查
bash-3.2$ etcdctl endpoint health
127.0.0.1 :2379 is healthy: successfully committed proposal: took = 1.051829439s
- 增删改查
bash-3.2$ etcdctl put /test/key "fmeng"
OK
bash-3.2$ etcdctl get /test/key
/test/key
fmeng
bash-3.2$ etcdctl put /test/key "fmeng is dashuaib"
OK
bash-3.2$ etcdctl get /test/key
/test/key
fmeng is dashuaib
~ ❯❯❯ etcdctl get / –prefix
- watch
bash-3.2$ etcdctl watch /test/key --rev=1
PUT
/test/key
fmeng
PUT
/test/key
fmeng is dashuaib
备份与还原
# 备份命令
$ export ETCDCTL_API=3
$ etcdctl --endpoints=${ENDPOINTS} Snapshot save /data/etcd_backup_dir/etcd-snapshot.db
$ etcdctl --endpoints=${ENDPOINTS} snapshot restore snapshot.db
原理
架构
从 etcd 的架构图中我们可以看到,etcd 主要分为四个部分。
- HTTP Server:用于处理用户发送的 API 请求以及其它 etcd 节点的同步与心跳信息请求。
- Store:用于处理 etcd 支持的各类功能的事务,包括数据索引、节点状态变更、监控与反馈、事件处理与执行等等,是 etcd 对用户提供的大多数 API 功能的具体实现。
- Raft:Raft 强一致性算法的具体实现,是 etcd 的核心。
- WAL:Write Ahead Log(预写式日志),是 etcd 的数据存储方式。除了在内存中存有所有数据的状态以及节点的索引以外,etcd 就通过 WAL 进行持久化存储。 WAL 中,所有的数据提交前都会事先记录日志。Snapshot 是为了防止数据过多而进行的状态快照;Entry 表示存储的具体日志内容。
- etcd 面向 client 和 peer 节点开放 http 服务以及 grpc 服务,对于像 watch 机制就是基于 grpc 的 stream 通信模式实现的;
- EtcdServer 是 etcd 上层结构体,其负责对外提供服务,且负责应用层的实现,比如操作应用层存储器,管理 leassor 、 watch ;
- raftNode 负责上层与 raft 层的衔接。其负责将应用的需求传递到 raft 中进行处理(通过 Step 函数)、在消息发送到其他节点前将消息保存到 WAL 中、调用传输器发送消息;
- raft 是 raft 协议的承载者;
- raftLog 用于存储状态机信息: memoryStorge 保存稳定的记录, unstable 保存不稳定的记录。
v2/v3
etcd 目前支持 V2 和 V3 两个大版本,这两个版本在实现上有比较大的不同,一方面是对外提供接口的方式,另一方面就是底层的存储引擎,V2 版本的实例是一个纯内存的实现,所有的数据都没有存储在磁盘上,而 V3 版本的实例就支持了数据的持久化。
v3默认boltdb
consortium etcd2+mysql
存储
数据默认会存放在 /var/lib/etcd/default/ 目录。我们会发现数据所在的目录,会被分为两个文件夹中,分别是 snap 和 wal目录。
- snap
- 存放快照数据,存储etcd的数据状态
- etcd防止WAL文件过多而设置的快照
- wal
- 存放预写式日志
- 最大的作用是记录了整个数据变化的全部历程
- 在etcd中,所有数据的修改在提交前都要先写入到WAL中
Raft算法
解决三个问题:节点选举、日志复制以及安全性 。
每一个 Raft 集群中都包含多个服务器,在任意时刻,每一台服务器只可能处于 Leader 、 Follower 以及 Candidate 三种状态;在处于正常的状态时,集群中只会存在一个 Leader 状态,其余的服务器都是 Follower 状态。
所有的 Follower 节点都是被动的,它们不会主动发出任何的请求 ,只会响应 Leader 和 Candidate 发出的请求。对于每一个用户的可变操作,都会被路由给 Leader 节点进行处理,除了 Leader 和 Follower 节点之外,Candidate 节点其实只是集群运行过程中的一个临时状态。
每一个服务器都会存储当前集群的最新任期,它就像是一个单调递增的逻辑时钟,能够同步各个节点之间的状态,当前节点持有的任期会随着每一个请求被传递到其他的节点上。Raft 协议在每一个任期的开始时都会从一个集群中选出一个节点作为集群的 Leader 节点,这个节点会负责集群中的日志的复制以及管理工作。
Watch
客户端通过 监听指定的key可以迅速感知key的变化并作出相应处理 ,watch机制的实现依赖于 资源版本号revision的设计 ,每一次key的更新都会使得revision原子递增,因此根据不同的版本号revision的对比就可以感知新事件的发生。etcd watch机制有着广泛的应用,比如利用etcd实现分布式锁; k8s中监听各种资源的变化 ,从而实现各种 controller 逻辑等。
watch机制的实现主要可分为三个部分
- 客户端gRPC调用
- server端gRPC处理
- 从底层存储获取更新事件
client使用 watchClient 的watch接口发起watch请求,与server端建立一个 gRPCStream 连接。
server端会为每个client生成唯一一个watch id,并记录每个client也就是watcher监听的key或者key range,通过recvLoop接收client请求,通过sendLoop发送请求,server端只负责收发请求和响应。
主要的实现都放在了watchalbStore层,watchalbStore会监听key的变化,然后通过syncWatchersLoop和syncVictimsLoop两个处理流程将key的更新变化包装成event,通过channel发送给gRPC server。
MVCC
MVCC(Multiversion Concurrency Control)多版本并发控制机制
- 多版本
bash-3.2$ etcdctl put /奥特曼 "他很帅"
OK
bash-3.2$
bash-3.2$ etcdctl put /奥特曼 "他真的很帅吗?"
OK
bash-3.2$ etcdctl del /奥特曼
1
bash-3.2$ etcdctl watch /奥特曼 --rev=1
PUT
/奥特曼
他很帅
PUT
/奥特曼
他真的很帅吗?
DELETE
/奥特曼
多版本的本质就是把一个key的历史变化都存下来
多版本是watch机制的基石
- 并发控制
场景1:
程序A:我要修改 fmeng 这个key的value
数据库:好,你改吧,我加锁。
程序B:我也要修改 fmeng
数据库:你等着,等A改好,你再来。具体看我的锁什么时候释放。
这就是悲观锁
悲观锁:悲观得认为并发事务会冲突,所以要先拿锁,拿到锁的作修改操作
- 读写锁(可多读,写时互斥)、互斥锁(线程互斥)等
- 控制粒度太大,高并发下大量事务会被阻塞。
场景2
程序A:我要修改 fmeng 这个key的value
数据库:好的,你修改value时候请将版本号设置为12
程序A:改好了,帮我写回磁盘
程序B:我修改 fmeng 的value
数据库:好的,你修改value时候请将版本号设置为13
程序B:改好了,帮我写回磁盘
同时
程序C:我修改 fmeng 的value
数据库:好的,你修改value时候请将版本号设置为13
程序C:改好了,帮我写回磁盘
数据库:写回磁盘,A写好了。哎,B和C都是version 13,我咋写?算了,报错吧。。
就是 乐观锁 ,默认不加锁,你尽管写,冲突我认怂!乐观锁其实不是锁,只是相对悲观锁来定义,适合读多写少。
乐观锁:乐观得认为数据不会冲突,但发生冲突时要能检测到。
场景3
程序A:我要更新 fmeng
数据库:你干吧,你的版本是20
程序B:我要更新 fmeng
数据库:你干吧,你的版本是22
程序C:我要更新 fmeng
数据库:你干吧,你的版本是23
这就是MVCC,在 MVCC 数据库中,你更新一个 key-value 数据的时候,它并不会直接覆盖原数据,而是 新增一个版本来存储新的数据,每个数据都有一个版本号 ,版本号是一个逻辑时钟,不会因为服务器时间的差异而受影响。
MVCC不等于乐观锁!
- etcd中的MVCC
type revision struct {
main int64 // 一个全局递增的主版本号,随put/txn/delete事务递增,一个事务内的key main版本号是一致的
sub int64 // 一个事务内的子版本号,从0开始随事务内put/delete操作递增
}
–rev 查的是main
go etcdctl watch /奥特曼 --rev=1
在底层boltdb里,实际分布是这样的:
底层的key是revision,/奥特曼是用户key,“他很帅”就是用户value
删除
之前有delete动作,但是依然有版本记录。为什么?
etcdctl del /奥特曼
删除这个动作,其实etcd是在blotdb里写了一条,“删除用户/奥特曼”
此时有个问题:用户说我的确删除了啊,真的不要了!请把空间还给我啊!
回收 compact(压缩)
为了防止空间不够用,必须定期释放一些用户已经声明删除的数据,这个动作就叫做 compact。
etcdctl compact {version}
compact 需要一个版本号。这个版本号就是写事务递增的那个版本号,compact 12345,就是说把版本12345以前的 标记删除了的数据 释放掉,用户没删除的数据肯定不能回收。
如何压缩:
- etcd 可以使用带有小时时间单位的 –auto-compaction 选项来设置为自动压缩键空间:
- etcdctl 如下发起压缩工作: etcdctl compact 123456
开发篇
CRUD
注意修改go.mod
replace github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.6
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
// init client
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
fmt.Printf("connect to etcd failed, err:%v\n", err )
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
// put
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "奥特曼", "大帅哥")
cancel()
if err != nil {
fmt.Printf("put to etcd failed, err:%v\n", err)
return
}
// get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, "奥特曼")
cancel()
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
return
}
for _, ev := range resp.Kvs {
fmt.Printf("%s:%s\n", ev.Key, ev.Value)
}
}
Watch
package main
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
// watch demo
func main() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
fmt.Println("connect to etcd success")
defer cli.Close()
rch := cli.Watch(context.Background(), "/test/key") // <-chan WatchResponse
for wresp := range rch {
for _, ev := range wresp.Events {
fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
}
}
}
服务发现
服务发现要解决的也是分布式系统中最常见的问题之一,即在同一个分布式集群中的进程或服务,要如何才能找到对方并建立连接。本质上来说,服务发现就是想要了解集群中是否有进程在监听 udp 或 tcp 端口,并且通过名字就可以查找和连接。
需要实现的功能;
- 服务发现:通过服务节点能查询到服务提供外部访问的 IP 和端口号。比如网关代理服务时能够及时的发现服务中新增节点、丢弃不可用的服务节点。
- 服务注册:同一service的所有节点注册到相同目录下,节点启动后将自己的信息注册到所属服务的目录中。
- 健康检查:服务节点定时进行健康检查。注册到服务目录中的信息设置一个较短的TTL,运行正常的服务节点每隔一段时间会去更新信息的TTL ,从而达到健康检查效果。
discover.go
package main
import (
"context"
"log"
"sync"
"time"
"github.com/coreos/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/clientv3"
)
//ServiceDiscovery 服务发现
type ServiceDiscovery struct {
cli *clientv3.Client //etcd client
serverList map[string]string //服务列表
lock sync.Mutex
}
//NewServiceDiscovery 新建发现服务
func NewServiceDiscovery(endpoints []string) *ServiceDiscovery {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
return &ServiceDiscovery{
cli: cli,
serverList: make(map[string]string),
}
}
//WatchService 初始化服务列表和监视
func (s *ServiceDiscovery) WatchService(prefix string) error {
//根据前缀获取现有的key
resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
if err != nil {
return err
}
for _, ev := range resp.Kvs {
s.SetServiceList(string(ev.Key), string(ev.Value))
}
//监视前缀,修改变更的server
go s.watcher(prefix)
return nil
}
//watcher 监听前缀
func (s *ServiceDiscovery) watcher(prefix string) {
rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
log.Printf("watching prefix:%s now...", prefix)
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case mvccpb.PUT: //修改或者新增
s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
case mvccpb.DELETE: //删除
s.DelServiceList(string(ev.Kv.Key))
}
}
}
}
//SetServiceList 新增服务地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
s.lock.Lock()
defer s.lock.Unlock()
s.serverList[key] = string(val)
log.Println("put key :", key, "val:", val)
}
//DelServiceList 删除服务地址
func (s *ServiceDiscovery) DelServiceList(key string) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.serverList, key)
log.Println("del key:", key)
}
//GetServices 获取服务地址
func (s *ServiceDiscovery) GetServices() []string {
s.lock.Lock()
defer s.lock.Unlock()
addrs := make([]string, 0)
for _, v := range s.serverList {
addrs = append(addrs, v)
}
return addrs
}
//Close 关闭服务
func (s *ServiceDiscovery) Close() error {
return s.cli.Close()
}
func main() {
var endpoints = []string{"localhost:2379"}
ser := NewServiceDiscovery(endpoints)
defer ser.Close()
ser.WatchService("/web/")
//ser.WatchService("/gRPC/")
for {
select {
case <-time.Tick(10 * time.Second):
log.Println(ser.GetServices())
}
}
}
//resgister.go
package main
import (
"context"
"log"
"time"
"go.etcd.io/etcd/clientv3"
)
//ServiceRegister 创建租约注册服务
type ServiceRegister struct {
cli *clientv3.Client //etcd client
leaseID clientv3.LeaseID //租约ID
//租约keepalieve相应chan
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
key string //key
val string //value
}
//NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, key, val string, lease int64) (*ServiceRegister, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
ser := &ServiceRegister{
cli: cli,
key: key,
val: val,
}
//申请租约设置时间keepalive
if err := ser.putKeyWithLease(lease); err != nil {
return nil, err
}
return ser, nil
}
//设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
//设置租约时间
resp, err := s.cli.Grant(context.Background(), lease)
if err != nil {
return err
}
//注册服务并绑定租约
_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
if err != nil {
return err
}
//设置续租 定期发送需求请求
leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
if err != nil {
return err
}
s.leaseID = resp.ID
log.Println(s.leaseID)
s.keepAliveChan = leaseRespChan
log.Printf("Put key:%s val:%s success!", s.key, s.val)
return nil
}
//ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
for leaseKeepResp := range s.keepAliveChan {
log.Println("续约成功", leaseKeepResp)
}
log.Println("关闭续租")
}
// Close 注销服务
func (s *ServiceRegister) Close() error {
//撤销租约
if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
return err
}
log.Println("撤销租约")
return s.cli.Close()
}
func main() {
var endpoints = []string{"localhost:2379"}
ser, err := NewServiceRegister(endpoints, "/web/node1", "localhost:8000", 5)
if err != nil {
log.Fatalln(err)
}
//监听续租相应chan
go ser.ListenLeaseRespChan()
select {
case <-time.After(20 * time.Second):
ser.Close()
}
}
面试题
- etcd的raft和zookeeper的paxos的区别
- etcd和redis区别
- etcd性能优化(存储,网络,数据分离)
- etcd是如何实现一致性的?
- etcd的存储是如何实现的?
- etcd的watch机制是如何实现的?
- etcd的key过期机制是如何实现的,lease
- 乐观锁与悲观锁,MVCC
案例和扩展
eBay payment
ebay kubernetes 控制面架构
问题
- 怎么看etcd最多支持多少个watch的链接啊
- CAP etcd放弃了哪个?