七叶笔记 » golang编程 » Etcd服务注册与发现封装实现–golang

Etcd服务注册与发现封装实现–golang

  • 服务注册 register.go
 package register

import (
"fmt"
"time"
etcd3 "github.com/coreos/etcd/clientv3"
"context"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"Test/common/service"
"log"
)

// RegisterClient holds the state of one service registration in etcd.
// Instances are created by Register.
type RegisterClient struct {
// etcdClient is the etcd v3 client that Register creates from EtcdEndpoints.
etcdClient      *etcd3.Client
// EtcdEndpoints is the endpoint list passed to the etcd client config.
EtcdEndpoints   []string
// refreshInterval is the ticker period between two registration refreshes.
refreshInterval time.Duration
// keyTTL is the TTL handed to the lease Grant call; Register compares it
// against refreshInterval expressed in seconds.
keyTTL          int
// serviceName, host and port are the parts getKey uses to build the etcd key;
// host and port also form the stored value.
serviceName     string
host            string
port            int
// stopSignal tells the refresh goroutine to exit; UnRegister sends on it.
stopSignal      chan bool
}

// Register registers the current service instance in etcd and keeps the
// registration refreshed from a background goroutine.
//
// The key is built by getKey ("/{prefix}/{serviceName}/{host}:{port}") and the
// value is "{host}:{port}". Only the calling service registers itself, so the
// registration is started immediately.
// Tip: the value could be turned into JSON if richer metadata is needed.
//
// refreshInterval is the period between refreshes and must be positive.
// keyTTL is the lease TTL in seconds; if it does not exceed refreshInterval it
// is replaced by twice the refresh period (rounded up to whole seconds).
//
// It returns the client used to UnRegister later, or an error if the
// arguments are invalid or the etcd client cannot be created.
func Register(serviceName string, host string, port int, EtcdEndpoints []string, refreshInterval time.Duration, keyTTL int) (*RegisterClient, error) {
	// time.NewTicker panics on a non-positive period, and that panic would
	// happen inside the background goroutine, so reject it up front.
	if refreshInterval <= 0 {
		return nil, fmt.Errorf("Invalid refresh interval: %s", refreshInterval)
	}

	// The lease must outlive the refresh period, otherwise the key expires
	// between two refreshes. Round the period up so sub-second intervals do
	// not yield a zero TTL.
	if time.Duration(keyTTL)*time.Second <= refreshInterval {
		refreshSeconds := int((refreshInterval + time.Second - 1) / time.Second)
		keyTTL = 2 * refreshSeconds
	}

	etcdClient, err := etcd3.New(etcd3.Config{
		Endpoints: EtcdEndpoints,
	})
	if err != nil {
		return nil, fmt.Errorf("Create etcd client failed: %s", err)
	}

	client := &RegisterClient{
		etcdClient:      etcdClient,
		EtcdEndpoints:   EtcdEndpoints,
		refreshInterval: refreshInterval,
		keyTTL:          keyTTL,
		serviceName:     serviceName,
		host:            host,
		port:            port,
		// Buffered so a stop request can be queued while the goroutine is
		// busy talking to etcd.
		stopSignal: make(chan bool, 1),
	}

	if err := client.register(); err != nil {
		etcdClient.Close()
		return nil, err
	}
	return client, nil
}

// register starts the background goroutine that writes the service key to
// etcd immediately and then once per refreshInterval, each time under a fresh
// lease of keyTTL seconds. The goroutine exits when a value arrives on
// stopSignal. Failures are logged and retried on the next tick; the returned
// error is always nil.
func (c *RegisterClient) register() error {
	// refresh performs one registration attempt.
	refresh := func() {
		resp, err := c.etcdClient.Grant(context.Background(), int64(c.keyTTL))
		if err != nil {
			log.Printf("Register service failed. serviceName: %s, err: %s", c.serviceName, err.Error())
			// Without a lease there is no resp.ID to attach; skip this round
			// instead of dereferencing a nil response.
			return
		}

		key := c.getKey()
		fmt.Println(key)
		value := fmt.Sprintf("%s:%d", c.host, c.port)

		// put writes the key bound to the freshly granted lease and reports
		// whether it succeeded.
		put := func() bool {
			if _, err := c.etcdClient.Put(context.Background(), key, value, etcd3.WithLease(resp.ID)); err != nil {
				log.Printf("Register service failed. serviceName: %s, value: %s, err: %s", c.serviceName, value, err.Error())
				return false
			}
			return true
		}

		switch _, err := c.etcdClient.Get(context.Background(), key); {
		case err == nil:
			if put() {
				log.Printf("Register current service: %s, value: %s", c.serviceName, value)
			}
		case err == rpctypes.ErrKeyNotFound:
			put()
		default:
			log.Printf("Get Value from etcd failed: %s", err)
		}
	}

	go func() {
		ticker := time.NewTicker(c.refreshInterval)
		// Release the ticker once the goroutine is told to stop.
		defer ticker.Stop()

		for {
			refresh()

			select {
			case <-c.stopSignal: // stopped manually via UnRegister
				return
			case <-ticker.C: // wait for the next refresh interval
			}
		}
	}()
	return nil
}

// UnRegister stops the background refresh goroutine and deletes the service
// key from etcd. It is only meaningful on a client returned by Register.
//
// It returns the error of the etcd Get or Delete call that failed, or nil
// when the key was deleted.
func (c *RegisterClient) UnRegister() error {
	// Non-blocking send: if a stop request is already queued (or the
	// goroutine is gone and the buffer is full) a repeated call must not
	// hang forever.
	select {
	case c.stopSignal <- true:
	default:
	}

	if _, err := c.etcdClient.Get(context.Background(), c.getKey()); err != nil {
		log.Printf("Get Value from etcd failed: %s", err.Error())
		return err
	}

	// Return the Delete error itself; previously it was shadowed and the
	// caller always saw nil when the delete failed.
	if _, err := c.etcdClient.Delete(context.Background(), c.getKey()); err != nil {
		log.Printf("UnRegister service failed. serviceName: %s, err: %s", c.serviceName, err.Error())
		return err
	}
	return nil
}

// getKey builds the etcd key of this instance from service.Prefix, the
// service name, and the host:port pair.
func (c *RegisterClient) getKey() string {
	const layout = "/%s/%s/%s:%d"
	key := fmt.Sprintf(layout, service.Prefix, c.serviceName, c.host, c.port)
	return key
}
  
  • 服务发现 discovery.go
 /* Copyright 2018 Bruce Liu.  All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package discovery

import (
	"context"
	"errors"
	"fmt"
	"log"
	"math/rand"
	"strings"
	"sync"

	"Test/common/service"

	etcd3 "github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/mvcc/mvccpb"
)

// DiscoveryClient resolves service names to instance addresses stored in etcd.
//
// Lookups are answered from a local cache first. The first lookup of a
// service reads its instances from etcd, caches them, and starts a watcher
// that applies later changes to the cache.
// Tip: a service usually needs to discover several other services, so all
// lookups go through one client.
type DiscoveryClient struct {
	etcdClient    *etcd3.Client
	EtcdEndpoints []string

	// mu guards serviceMap and watcher; both are accessed by GetService
	// callers and by the watch goroutines.
	mu sync.Mutex

	// serviceMap caches the known addresses, keyed by service name.
	// TODO: cached entries need an expiry and a way to report dead
	// instances; this can be added together with service degradation.
	serviceMap map[string][]string

	// watcher records which services currently have a watch goroutine.
	watcher map[string]bool
}

// NewClient creates a DiscoveryClient connected to the given etcd endpoints.
// It returns an error if the etcd client cannot be created.
func NewClient(etcdEndpoints []string) (*DiscoveryClient, error) {
	etcdClient, err := etcd3.New(etcd3.Config{
		Endpoints: etcdEndpoints,
	})
	if err != nil {
		return nil, fmt.Errorf("Create register etcdClient failed: %s", err.Error())
	}

	return &DiscoveryClient{
		etcdClient:    etcdClient,
		EtcdEndpoints: etcdEndpoints,
		serviceMap:    make(map[string][]string),
		watcher:       make(map[string]bool),
	}, nil
}

// GetService returns the address of one randomly chosen instance of
// serviceName. The local cache is consulted first; on a miss the instances
// are read from etcd, cached, and a watcher for the service is started if
// none is running.
//
// It returns an error if serviceName is empty, the etcd read fails, or no
// instance is registered.
func (c *DiscoveryClient) GetService(serviceName string) (string, error) {
	if serviceName == "" {
		return "", errors.New("Service name should not be null")
	}

	// Serve from the local cache when possible.
	if addr, ok := c.cachedAddress(serviceName); ok {
		return addr, nil
	}

	// Fall back to etcd.
	resp, err := c.etcdClient.Get(context.Background(), c.keyPrefix(serviceName), etcd3.WithPrefix())
	if err != nil {
		log.Printf("Get service failed: %s, error: %s", serviceName, err.Error())
		return "", err
	}

	serviceList := service.ExtractAddress(resp)

	// Pick the result before the list is shared with the watcher, which
	// edits the cached slice in place.
	var addr string
	if len(serviceList) > 0 {
		addr = serviceList[rand.Intn(len(serviceList))]
	}

	c.mu.Lock()
	if len(serviceList) > 0 {
		c.serviceMap[serviceName] = serviceList
	}
	// Claim the watcher slot while holding the lock so concurrent lookups
	// start at most one watcher per service.
	startWatch := !c.watcher[serviceName]
	if startWatch {
		c.watcher[serviceName] = true
	}
	c.mu.Unlock()

	if startWatch {
		go c.watch(serviceName)
	}

	if len(serviceList) == 0 {
		return "", errors.New("Service not found.")
	}
	return addr, nil
}

// getKey returns the etcd key under which the instances of serviceName are
// registered (without a trailing slash).
func (c *DiscoveryClient) getKey(serviceName string) string {
	return fmt.Sprintf("/%s/%s", service.Prefix, serviceName)
}

// keyPrefix returns the prefix shared by all instance keys of serviceName.
// The trailing slash keeps a service "foo" from matching keys of "foobar".
func (c *DiscoveryClient) keyPrefix(serviceName string) string {
	return c.getKey(serviceName) + "/"
}

// cachedAddress returns a randomly chosen cached address of serviceName and
// whether the cache held any.
func (c *DiscoveryClient) cachedAddress(serviceName string) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	addrs := c.serviceMap[serviceName]
	if len(addrs) == 0 {
		return "", false
	}
	return addrs[rand.Intn(len(addrs))], true
}

// addAddress appends addr to the cached list of serviceName unless it is
// already present, and reports whether it was added.
func (c *DiscoveryClient) addAddress(serviceName, addr string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	for _, v := range c.serviceMap[serviceName] {
		if v == addr {
			return false
		}
	}
	c.serviceMap[serviceName] = append(c.serviceMap[serviceName], addr)
	return true
}

// removeAddress deletes addr from the cached list of serviceName and reports
// whether it was present.
func (c *DiscoveryClient) removeAddress(serviceName, addr string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	addrs := c.serviceMap[serviceName]
	for i, v := range addrs {
		if v == addr {
			c.serviceMap[serviceName] = append(addrs[:i], addrs[i+1:]...)
			return true
		}
	}
	return false
}

// watch follows the registration keys of serviceName in etcd and applies
// every PUT and DELETE event to the local cache. It blocks until the watch
// channel is closed and is meant to run in its own goroutine.
func (c *DiscoveryClient) watch(serviceName string) {
	c.mu.Lock()
	c.watcher[serviceName] = true
	c.mu.Unlock()
	defer func() {
		// Once this function exits a new watcher may be started.
		c.mu.Lock()
		c.watcher[serviceName] = false
		c.mu.Unlock()
	}()

	// prefix is the etcd key prefix to watch.
	prefix := c.keyPrefix(serviceName)
	log.Printf("Watch service : %s", serviceName)

	rch := c.etcdClient.Watch(context.Background(), prefix, etcd3.WithPrefix())
	for wresp := range rch {
		if err := wresp.Err(); err != nil {
			log.Printf("Watch service failed: %s, error: %s", serviceName, err.Error())
		}
		for _, ev := range wresp.Events {
			log.Printf("Catch changes: %s, event: %d", serviceName, ev.Type)
			switch ev.Type {
			case mvccpb.PUT:
				addr := string(ev.Kv.Value)
				if c.addAddress(serviceName, addr) {
					log.Printf("Add new service: %s, value: %s", serviceName, addr)
				}
			case mvccpb.DELETE:
				// The address is taken from the key suffix rather than
				// ev.Kv.Value; this relies on the registered value being
				// identical to that suffix ("host:port").
				addr := strings.TrimPrefix(string(ev.Kv.Key), prefix)
				if c.removeAddress(serviceName, addr) {
					log.Printf("Delete service: %s, value: %s", serviceName, addr)
				}
			}
		}
	}
}
  

相关文章