1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
| package discovery
import ( "github.com/go-kit/kit/sd/consul" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api/watch" "log" "strconv" "sync" )
type KitDiscoverClient struct { consulHost string consulPort int client consul.Client config *api.Config mutex sync.Mutex instanceMap sync.Map }
func NewKitDiscoverClient(consulHost string, consulPort int) (DiscoverClient, error) { consulConfig := api.DefaultConfig() consulConfig.Address = consulHost + ":" + strconv.Itoa(consulPort) apiClient, err := api.NewClient(consulConfig) if err != nil { return nil, err } client := consul.NewClient(apiClient) return &KitDiscoverClient{ consulHost: consulHost, consulPort: consulPort, client: client, config: consulConfig, }, nil
}
func (consulClient *KitDiscoverClient) Register(serviceName, instanceId, healthCheckUrl string, instanceHost string, instancePort int, meta map[string]string, logger *log.Logger) bool { serviceRegistration := &api.AgentServiceRegistration{ ID: instanceId, Name: serviceName, Address: instanceHost, Port: instancePort, Meta: meta, Check: &api.AgentServiceCheck{ DeregisterCriticalServiceAfter: "30s", HTTP: "http://" + instanceHost + ":" + strconv.Itoa(instancePort) + healthCheckUrl, Interval: "15s", }, }
err := consulClient.client.Register(serviceRegistration) if err != nil { log.Println("Register Service Error!") return false } log.Println("Register Service Success!") return true }
func (consulClient *KitDiscoverClient) DeRegister(instanceId string, logger *log.Logger) bool { serviceRegistration := &api.AgentServiceRegistration{ ID: instanceId, }
err := consulClient.client.Deregister(serviceRegistration) if err != nil { logger.Println("Deregister Service Error!") return false }
log.Println("Deregister Service Success!") return true }
func (consulClient *KitDiscoverClient) DiscoveryServices(serviceName string, logger *log.Logger) []interface{} { instanceList, ok := consulClient.instanceMap.Load(serviceName) if ok { return instanceList.([]interface{}) }
consulClient.mutex.Lock() defer consulClient.mutex.Unlock() instanceList, ok = consulClient.instanceMap.Load(serviceName) if ok { return instanceList.([]interface{}) }
go func() { params := make(map[string]interface{}) params["type"] = "service" params["service"] = serviceName plan, _ := watch.Parse(params) plan.Handler = func(u uint64, i interface{}) { if i == nil { return }
v, ok := i.([]*api.ServiceEntry) if !ok { return }
if len(v) == 0 { consulClient.instanceMap.Store(serviceName, []interface{}{}) }
var healthServices []interface{} for _, service := range v { if service.Checks.AggregatedStatus() == api.HealthPassing { healthServices = append(healthServices, service) } } consulClient.instanceMap.Store(serviceName, healthServices) } defer plan.Stop() plan.Run(consulClient.config.Address) }()
entries, _, err := consulClient.client.Service(serviceName, "", false, nil) if err != nil { consulClient.instanceMap.Store(serviceName, []interface{}{}) logger.Println("Discover Service Error") return nil }
instances := make([]interface{}, 0, len(entries)) for _, instance := range entries { instances = append(instances, instance) }
consulClient.instanceMap.Store(serviceName, instances) return instances }
|