go-kit中使用consul服务注册与发现、健康检测
lookupman Lv2

  本章介绍在go-kit中进行使用服务发现和注册,以及健康检测

首先使用kit提供的库,定义对consul的访问

  首先定义服务发现的客户端接口,接口需要提供3个方法,服务注册、服务注销以及服务发现。

1
2
3
4
5
6
7
8
9
10
11
12
package discovery

import "log"

type DiscoverClient interface{
Register(serviceName, instanceId, healthCheckUrl string,
instanceHost string, instancePort int, meta map[string]string, logger *log.Logger) bool

DeRegister(instanceId string, logger *log.Logger) bool

DiscoveryServices(serviceName string, logger *log.Logger) []interface{}
}

  然后定义对接口的实现,在此处使用go-kit封装的库对接口进行实现,同时在服务发现部分通过map对服务进行缓存,在服务发生变化时,通过获取consul的通知进行更新map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package discovery

import (
"github.com/go-kit/kit/sd/consul"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
"log"
"strconv"
"sync"
)

type KitDiscoverClient struct {
consulHost string //consul的地址
consulPort int //consul的端口
client consul.Client //kit封装consul的客户端
config *api.Config //kit封装的consul配置
mutex sync.Mutex
instanceMap sync.Map
}

func NewKitDiscoverClient(consulHost string, consulPort int) (DiscoverClient, error) {
consulConfig := api.DefaultConfig()
consulConfig.Address = consulHost + ":" + strconv.Itoa(consulPort)
apiClient, err := api.NewClient(consulConfig)
if err != nil {
return nil, err
}
client := consul.NewClient(apiClient)
return &KitDiscoverClient{
consulHost: consulHost,
consulPort: consulPort,
client: client,
config: consulConfig,
}, nil

}

func (consulClient *KitDiscoverClient) Register(serviceName, instanceId, healthCheckUrl string,
instanceHost string, instancePort int, meta map[string]string, logger *log.Logger) bool {
serviceRegistration := &api.AgentServiceRegistration{
ID: instanceId,
Name: serviceName,
Address: instanceHost,
Port: instancePort,
Meta: meta,
Check: &api.AgentServiceCheck{
DeregisterCriticalServiceAfter: "30s",
HTTP: "http://" + instanceHost + ":" + strconv.Itoa(instancePort) + healthCheckUrl,
Interval: "15s",
},
}

err := consulClient.client.Register(serviceRegistration)
if err != nil {
log.Println("Register Service Error!")
return false
}
log.Println("Register Service Success!")
return true
}

func (consulClient *KitDiscoverClient) DeRegister(instanceId string, logger *log.Logger) bool {
serviceRegistration := &api.AgentServiceRegistration{
ID: instanceId,
}

err := consulClient.client.Deregister(serviceRegistration)
if err != nil {
logger.Println("Deregister Service Error!")
return false
}

log.Println("Deregister Service Success!")
return true
}

func (consulClient *KitDiscoverClient) DiscoveryServices(serviceName string, logger *log.Logger) []interface{} {
//判断服务是否已缓存
instanceList, ok := consulClient.instanceMap.Load(serviceName)
if ok {
return instanceList.([]interface{})
}

consulClient.mutex.Lock()
defer consulClient.mutex.Unlock()
//加锁后在判断一次,服务是否已缓存
instanceList, ok = consulClient.instanceMap.Load(serviceName)
if ok {
return instanceList.([]interface{})
}

//响应服务变更通知,更新服务map
go func() {
params := make(map[string]interface{})
params["type"] = "service"
params["service"] = serviceName
plan, _ := watch.Parse(params)
plan.Handler = func(u uint64, i interface{}) {
if i == nil {
return
}

v, ok := i.([]*api.ServiceEntry)
if !ok {
return
}

if len(v) == 0 {
consulClient.instanceMap.Store(serviceName, []interface{}{})
}

var healthServices []interface{}
for _, service := range v {
if service.Checks.AggregatedStatus() == api.HealthPassing {
healthServices = append(healthServices, service)
}
}
consulClient.instanceMap.Store(serviceName, healthServices)
}
defer plan.Stop()
plan.Run(consulClient.config.Address)
}()

//调用go-kit库向consul获取服务
entries, _, err := consulClient.client.Service(serviceName, "", false, nil)
if err != nil {
consulClient.instanceMap.Store(serviceName, []interface{}{})
logger.Println("Discover Service Error")
return nil
}

instances := make([]interface{}, 0, len(entries))
for _, instance := range entries {
instances = append(instances, instance)
}

consulClient.instanceMap.Store(serviceName, instances)
return instances
}

定义服务

  定义服务,本文实例定义两个服务:SayHello和HealthCheck。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package service

import (
"context"
"errors"
"github.com/lookupman/service_discovery/config"
"github.com/lookupman/service_discovery/discovery"
)

//定义服务接口
type Service interface {
HealthCheck() bool
SayHello() string
}

//实现SayHello服务接口
type DiscoveryServiceImpl struct {
discoveryClient discovery.DiscoverClient
}

func NewDiscoveryServiceImpl(discoveryClient discovery.DiscoverClient) Service {
return &DiscoveryServiceImpl{discoveryClient: discoveryClient}
}

func (service *DiscoveryServiceImpl) HealthCheck() bool {
return true
}

func (service DiscoveryServiceImpl) SayHello() string {
return "Hello World!"
}

定义服务端点(Endpoint)

  endpoint是go-kit定义的一个函数类型,定义返回这个类型的函数,传给NewServer作为参数实现对http handler的注册,另外需要定义request解码和response编码的函数。

1
type Endpoint func(ctx context.Context, request interface{}) (response interface{}, err error)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package endpoint

import (
"context"
"github.com/go-kit/kit/endpoint"
"github.com/lookupman/service_discovery/service"
)

type DiscoveryEndpoints struct {
SayHelloEndpoint endpoint.Endpoint
HealthCheckEndpoint endpoint.Endpoint
}

type SayHelloRequest struct {
}

type SayHelloResponse struct {
Message string `json:"message"`
}

func MakeSayHelloEndpoint(svc service.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
message := svc.SayHello()

return SayHelloResponse{Message: message}, nil
}
}

type HealthRequest struct{}

type HealthResponse struct {
Status bool `json:"status"`
}

func MakeHealthCheckEndpoint(svc service.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
status := svc.HealthCheck()
return HealthResponse{Status: status}, nil
}
}

定义http层handler

  在这里做的是,使用go-kit的库实现对endpoint、编解码函数封装,实现http的handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package transport

import (
"context"
"encoding/json"
"errors"
"net/http"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/transport"
kithttp "github.com/go-kit/kit/transport/http"
"github.com/gorilla/mux"
"github.com/lookupman/service_discovery/endpoint"
)

var (
ErrorRequest = errors.New("invalid request parameter")
)

func MakeHttpHandler(ctx context.Context, endpoints endpoint.DiscoveryEndpoints, logger log.Logger) http.Handler {
r := mux.NewRouter()

options := []kithttp.ServerOption{
kithttp.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
kithttp.ServerErrorEncoder(encodeError),
}

r.Methods("GET").Path("/SayHello").Handler(kithttp.NewServer(
endpoints.SayHelloEndpoint,
decodeSayHelloRequest,
encodeJsonResponse,
options...,
))

r.Methods("GET").Path("/health").Handler(kithttp.NewServer(
endpoints.HealthCheckEndpoint,
decodeHealthRequest,
encodeJsonResponse,
options...,
))

return r
}

func decodeSayHelloRequest(ctx context.Context, req *http.Request) (request interface{}, err error) {
return endpoint.SayHelloRequest{}, nil
}

func decodeHealthRequest(ctx context.Context, req *http.Request) (request interface{}, err error) {
return endpoint.HealthRequest{}, nil
}

func encodeJsonResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
w.Header().Set("Content-Type", "application/json;charset=utf-8")
return json.NewEncoder(w).Encode(response)
}

func encodeError(ctx context.Context, err error, w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/json;charset=utf-8")
switch err {
default:
w.WriteHeader(http.StatusInternalServerError)
}
json.NewEncoder(w).Encode(map[string]interface{}{
"error": err.Error(),
})
}

主函数

  最后是主函数,在这里绑定监听端口以及将服务注册到consul

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package main

import (
"context"
"fmt"
"github.com/lookupman/service_discovery/config"
"github.com/lookupman/service_discovery/discovery"
"github.com/lookupman/service_discovery/endpoint"
"github.com/lookupman/service_discovery/service"
"github.com/lookupman/service_discovery/transport"
uuid "github.com/satori/go.uuid"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
)

var logger log.Logger

func main() {
serviceHost := "127.0.0.1"
servicePort := 9999
serviceName := "SayHello"


consulHost := "127.0.0.1"
consulPort := 8500


ctx := context.Background()
errChan := make(chan error)



var discoverClient discovery.DiscoverClient
discoverClient, err := discovery.NewKitDiscoverClient(consulHost, consulPort)
if err != nil {
log.Println("Get Consul Client failed")
return
}

svc := service.NewDiscoveryServiceImpl(discoverClient)

sayHelloEndpoint := endpoint.MakeSayHelloEndpoint(svc)
healthCheckEndpoint := endpoint.MakeHealthCheckEndpoint(svc)

endpoints := endpoint.DiscoveryEndpoints{
SayHelloEndpoint: sayHelloEndpoint,
HealthCheckEndpoint: healthCheckEndpoint,
}

router := transport.MakeHttpHandler(ctx, endpoints, config.KitLogger)

instanceId := serviceName + "-" + uuid.NewV4().String()


go func() {
config.Logger.Println("Http Server start at port:" + strconv.Itoa(servicePort))
if !discoverClient.Register(serviceName, instanceId, "/health",
serviceHost, servicePort, nil, config.Logger) {
config.Logger.Printf("string-service for service %s failed.", serviceName)
// 注册失败,服务启动失败
os.Exit(-1)
}


handler := router
errChan <- http.ListenAndServe(":"+strconv.Itoa(servicePort), handler)

}()

go func() {
// 监控系统信号,等待 ctrl + c 系统信号通知服务关闭
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
errChan <- fmt.Errorf("%s", <-c)
}()

err = <-errChan

//服务退出取消注册
discoverClient.DeRegister(instanceId, config.Logger)
config.Logger.Println(err)
}


参考文章:
Go语言高并发与微服务实战

  • 本文标题:go-kit中使用consul服务注册与发现、健康检测
  • 本文作者:lookupman
  • 创建时间:2021-01-03 15:50:42
  • 本文链接:https://lookupman.cn/2021/01/03/go-kit中使用consul服务注册与发现、健康检测/
  • 版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!