1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
| package common
import ( "context" "errors" "github.com/go-kit/kit/sd" "log" "strconv" "strings" "sync" "time"
kitlog "github.com/go-kit/kit/log" "github.com/go-kit/kit/sd/etcdv3" )
type EtcdV3DiscoverClient struct { client etcdv3.Client registrar *etcdv3.Registrar
instanceMutex sync.Mutex instances map[string]*etcdv3.Instancer
serviceMutex sync.Mutex serviceUrls map[string][]string }
func NewEctdV3DiscoverClient(ectdHost string, ectdPort int) *EtcdV3DiscoverClient { ctx := context.Background()
clientOption := etcdv3.ClientOptions{ DialTimeout: 3 * time.Second, DialKeepAlive: 3 * time.Second, }
addr := ectdHost + ":" + strconv.Itoa(ectdPort) client, err := etcdv3.NewClient(ctx, []string{addr}, clientOption) if err != nil { log.Println(err) return nil }
return &EtcdV3DiscoverClient{ client: client, instances: make(map[string]*etcdv3.Instancer), serviceUrls: make(map[string][]string), } }
func (cli *EtcdV3DiscoverClient) Register(serviceName string, serviceHost string, servicePort int) { addr := serviceHost + ":" + strconv.Itoa(servicePort) key := strings.TrimRight(serviceName, "/") + "/" + addr value := "http://" + addr
cli.registrar = etcdv3.NewRegistrar(cli.client, etcdv3.Service{Key: key, Value: value}, kitlog.NewNopLogger()) cli.registrar.Register() }
// DeRegister removes the entry previously published by Register.
//
// It is a no-op when Register has not been called yet: the registrar is only
// assigned inside Register, so without this guard the call would be made on a
// nil registrar.
func (cli *EtcdV3DiscoverClient) DeRegister() {
	if cli.registrar == nil {
		return
	}
	cli.registrar.Deregister()
}
func (cli *EtcdV3DiscoverClient) DiscoveryServices(serviceName string) ([]string, error) { var err error
prefix := strings.TrimRight(serviceName, "/") + "/"
cli.instanceMutex.Lock() instancer, ok := cli.instances[serviceName] cli.instanceMutex.Unlock() if !ok { instancer, err = etcdv3.NewInstancer(cli.client, prefix, kitlog.NewNopLogger()) if err != nil { return nil, err }
cli.serviceMutex.Lock() cli.instances[serviceName] = instancer cli.serviceMutex.Unlock()
cli.watch(instancer, serviceName)
return cli.client.GetEntries(prefix) }
cli.serviceMutex.Lock() serviceUrl, ok := cli.serviceUrls[serviceName] cli.serviceMutex.Unlock() if !ok { return nil, errors.New("no service url") } return serviceUrl, nil }
func (cli *EtcdV3DiscoverClient) watch(instancer *etcdv3.Instancer , serviceName string) { ch := make(chan sd.Event)
go func(){ instancer.Register(ch) }()
go func() { for event := range ch { if event.Err != nil {
}
cli.serviceMutex.Lock() cli.serviceUrls[serviceName] = event.Instances log.Println(time.Now()) cli.serviceMutex.Unlock() } instancer.Deregister(ch) cli.instanceMutex.Lock() delete(cli.instances, serviceName) cli.instanceMutex.Unlock() }() }
|