go-kit/etcdv3库学习
lookupman Lv2

  本章介绍使用go-kit的工具包etcdv3进行对etcd做服务注册和发现操作

预备知识

etcd的key-value存储和查找方式

  etcd是使用key-value的形式进行存储的,与consul不同的是,etcd的查找是通过前缀进行查找的。举个例子,服务名为hellworld的服务部署了3个实例在不同的服务器上:
  ”http://192.168.10.111:8888"
  ”http://192.168.10.112:8888"
  ”http://192.168.10.113:8888"
那么在etcd服务器中存储的值一般的存储方式为:
key1 = “/service/helloworld/192.168.10.111:8888” value1 = “http://192.168.10.111:8888"
key2 = “/service/helloworld/192.168.10.112:8888” value2 = “http://192.168.10.112:8888"
key3 = “/service/helloworld/192.168.10.113:8888” value3 = “http://192.168.10.113:8888"
  那么在服务发现时,就通过查找前缀为”/service/helloworld/“的所有的key-value,etcd就会返回前缀为”/service/helloworld/“所有的值。

服务注册

  go-kit提供了对etcd进行服务注册的封装的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type ClientOptions struct {
Cert string //用于SSL
Key string //用于SSL
CACert string //用于SSL
DialTimeout time.Duration //建立连接失败的超时时间
DialKeepAlive time.Duration //存活检测时间
Username string //etcd的用户名,未在etcd中设置,则不需要
Password string //etcd的密码,未在etcd中设置,则不需要
}

//etcd创建客户端接口,machines为etcd的地址
func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error)

type Service struct {
Key string //存储在etcd的key
Value string //存储在etcd的value
TTL *TTLOption //心跳检测时间
}

//NewRegistrar
func NewRegistrar(client Client, service Service, logger log.Logger) *Registrar

使用etcdv3进行服务注册,etcdv3的接口中封装了对etcd存活检测的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func main(){
etcdHost := "127.0.0.1"
etcdPort := 2379

serviceHost := "127.0.0.1"
servicePort := 8888
serviceName := "service/hellworld"

ctx := context.Background()

clientOption := etcdv3.ClientOption{
DialTimeout: 3 * time.Second,
DialKeepAlive: 3* time.Second,
}

addr := etcdHost+":"+strconv.Itoa(etcdPort)
//创建etcd客户端
client, err := etcdv3.NewClient(ctx, []string{addr}, clientOption)
if err != nil{
log.Println(err)
return
}

addr := serviceHost + ":" + strconv.Itoa(servicePort)
key := string.TrimRight(serviceName, "/") + "/" + addr
value := "http://" + addr

//创建注册器
registrar := etcdv3.NewRegistrar(client, etcdv3.Service{Key: key, Value: value}, kitlog.NewNopLogger())
//服务注册
registrar.Registrar()

errChan := make(chan error)
go func(){
errChan <- http.ListenAndServe(:+strconv.Itoa(servicePort, nil))
}()

go func(){
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
errChan <- fmt.Errorf("%s", <-c)
}()

err := <-errChan
registrar.DeRegister()
}

服务发现

  etcdv3中同样封装了服务发现的接口,同时接口提供了服务变更时通知的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func main(){
etcdHost := "127.0.0.1"
etcdPort := 2379

prefix := "/service/helloworld/"

ctx := context.Background()

clientOption := etcdv3.ClientOption{
DialTimeout: 3 * time.Second,
DialKeepAlive: 3* time.Second,
}

addr := etcdHost+":"+strconv.Itoa(etcdPort)
//创建etcd客户端
client, err := etcdv3.NewClient(ctx, []string{addr}, clientOption)
if err != nil{
log.Println(err)
return
}

instancer, err := etcdv3.NewInstancer(client, prefix, kitlog.NewNopLogger())
if err != nil{
log.Println(err)
return
}

ch := make(chan sd.Event)

//将chan注册到instancer中,当服务发生变化时,会将新的数据更新到chan
go func() {
instancer.Register(ch)
}

//获取更新的值
go func() {
for event := range ch{
if event.Err != nil{
log.Println(event.Err)
}

fmt.Println("service url is: ", event.Instances)
}
}()

go func(){
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
errChan <- fmt.Errorf("%s", <-c)
}()

err := <-errChan
instancer.DeRegister(ch)

}

自己的接口封装

  为了简化以后的使用,自己封装了一个结构体,不足之处请指出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package common

import (
"context"
"errors"
"github.com/go-kit/kit/sd"
"log"
"strconv"
"strings"
"sync"
"time"

kitlog "github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd/etcdv3"
)

type EtcdV3DiscoverClient struct {
client etcdv3.Client
registrar *etcdv3.Registrar

instanceMutex sync.Mutex
instances map[string]*etcdv3.Instancer

serviceMutex sync.Mutex
serviceUrls map[string][]string
}

func NewEctdV3DiscoverClient(ectdHost string, ectdPort int) *EtcdV3DiscoverClient {
ctx := context.Background()

clientOption := etcdv3.ClientOptions{
DialTimeout: 3 * time.Second,
DialKeepAlive: 3 * time.Second,
}

addr := ectdHost + ":" + strconv.Itoa(ectdPort)
client, err := etcdv3.NewClient(ctx, []string{addr}, clientOption)
if err != nil {
log.Println(err)
return nil
}

return &EtcdV3DiscoverClient{
client: client,
instances: make(map[string]*etcdv3.Instancer),
serviceUrls: make(map[string][]string),
}
}

func (cli *EtcdV3DiscoverClient) Register(serviceName string, serviceHost string, servicePort int) {
addr := serviceHost + ":" + strconv.Itoa(servicePort)
key := strings.TrimRight(serviceName, "/") + "/" + addr
value := "http://" + addr

cli.registrar = etcdv3.NewRegistrar(cli.client, etcdv3.Service{Key: key, Value: value}, kitlog.NewNopLogger())
cli.registrar.Register()
}

func (cli *EtcdV3DiscoverClient) DeRegister() {
cli.registrar.Deregister()
}

func (cli *EtcdV3DiscoverClient) DiscoveryServices(serviceName string) ([]string, error) {
var err error

prefix := strings.TrimRight(serviceName, "/") + "/"

cli.instanceMutex.Lock()
instancer, ok := cli.instances[serviceName]
cli.instanceMutex.Unlock()
if !ok {
instancer, err = etcdv3.NewInstancer(cli.client, prefix, kitlog.NewNopLogger())
if err != nil {
return nil, err
}

cli.serviceMutex.Lock()
cli.instances[serviceName] = instancer
cli.serviceMutex.Unlock()

cli.watch(instancer, serviceName)

return cli.client.GetEntries(prefix)
}

cli.serviceMutex.Lock()
serviceUrl, ok := cli.serviceUrls[serviceName]
cli.serviceMutex.Unlock()
if !ok {
return nil, errors.New("no service url")
}
return serviceUrl, nil
}

func (cli *EtcdV3DiscoverClient) watch(instancer *etcdv3.Instancer , serviceName string) {
ch := make(chan sd.Event)

go func(){
instancer.Register(ch)
}()

go func() {
for event := range ch {
if event.Err != nil {

}

cli.serviceMutex.Lock()
cli.serviceUrls[serviceName] = event.Instances
log.Println(time.Now())
cli.serviceMutex.Unlock()
}
instancer.Deregister(ch)
cli.instanceMutex.Lock()
delete(cli.instances, serviceName)
cli.instanceMutex.Unlock()
}()
}

参考文章:

  • 本文标题:go-kit/etcdv3库学习
  • 本文作者:lookupman
  • 创建时间:2020-12-06 20:08:01
  • 本文链接:https://lookupman.cn/2020/12/06/go-kit-etcdv3库学习/
  • 版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!