Golang etcd 服务发现与负载均衡
- 注册: 同一服务下的所有节点注册到相同目录下,节点启动后将自己的信息注册到所属服务的目录中。
- 健康: 服务节点定时发送心跳,注册到服务目录中的信息设置一个较短的 TTL,运行正常的服务节点每隔一段时间会去更新信息的 TTL。
- 发现: 通过名称能查询到服务提供外部访问的 IP 和端口号。比如网关代理服务时能够及时的发现服务中新增节点、丢弃不可用的服务节点,同时各个服务间也能感知对方的存在。
- 是由CoreOS开发,用于可靠地存储集群的配置数据的一种持久性,轻量型的,分布式的键-值数据存储组件。该组件可表示在任何给定时间点处的集群的整体状态。其他组件在注意到存储的变化之后,会变成相应的状态。采用raft协议作为一致性算法,etcd基于Go语言实现。
服务注册
go get go.etcd.io/etcd/clientv3
type Registry interface {
RegistryNode(node PutNode) error
UnRegistry()
}
var prefix = "/registry/server/"
type Registry interface {
RegistryNode(node PutNode) error
UnRegistry()
}
type registryServer struct {
cli *clientv3.Client
stop chan bool
isRegistry bool
options Options
leaseID clientv3.LeaseID
}
type PutNode struct {
Addr string `json:"addr"`
}
type Node struct {
Id uint32 `json:"id"`
Addr string `json:"addr"`
}
type Options struct {
name string
ttl int64
config clientv3.Config
}
func NewRegistry(options Options) (Registry, error) {
cli, err := clientv3.New(options.config)
if err != nil {
return nil, err
}
return ®istryServer{
stop: make(chan bool),
options: options,
isRegistry: false,
cli: cli,
}, nil
}
func (s *registryServer) RegistryNode(put PutNode) error {
if s.isRegistry {
return errors.New("only one node can be registered")
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.options.ttl)*time.Second)
defer cancel()
grant, err := s.cli.Grant(context.Background(), s.options.ttl)
if err != nil {
return err
}
var node = Node{
Id: s.HashKey(put.Addr),
Addr: put.Addr,
}
nodeVal, err := s.GetVal(node)
if err != nil {
return err
}
_, err = s.cli.Put(ctx, s.GetKey(node), nodeVal, clientv3.WithLease(grant.ID))
if err != nil {
return err
}
s.leaseID = grant.ID
s.isRegistry = true
go s.KeepAlive()
return nil
}
func (s *registryServer) UnRegistry() {
s.stop <- true
}
func (s *registryServer) Revoke() error {
_, err := s.cli.Revoke(context.TODO(), s.leaseID)
if err != nil {
log.Printf("[Revoke] err : %s", err.Error())
}
s.isRegistry=false
return err
}
func (s *registryServer) KeepAlive() error {
keepAliveCh, err := s.cli.KeepAlive(context.TODO(), s.leaseID)
if err != nil {
log.Printf("[KeepAlive] err : %s", err.Error())
return err
}
for {
select {
case <-s.stop:
_ = s.Revoke()
return nil
case _, ok := <-keepAliveCh:
if !ok {
_ = s.Revoke()
return nil
}
}
}
}
func (s *registryServer) GetKey(node Node) string {
return fmt.Sprintf("%s%s/%d", prefix, s.options.name, s.HashKey(node.Addr))
}
func (s *registryServer) GetVal(node Node) (string, error) {
data, err := json.Marshal(&node)
return string(data), err
}
func (e *registryServer) HashKey(addr string) uint32 {
return crc32.ChecksumIEEE([]byte(addr))
}
服务发现与负载均衡
func init() {
rand.Seed(time.Now().UnixNano())
}
type Selector interface {
Next() (Node, error)
}
type selectorServer struct {
cli *clientv3.Client
node []Node
options SelectorOptions
}
type SelectorOptions struct {
name string
config clientv3.Config
}
func NewSelector(options SelectorOptions) (Selector, error) {
cli, err := clientv3.New(options.config)
if err != nil {
return nil, err
}
var s = &selectorServer{
options: options,
cli: cli,
}
go s.Watch()
return s, nil
}
func (s *selectorServer) Next() (Node, error) {
if len(s.node) == 0 {
return Node{}, fmt.Errorf("no node found on the %s", s.options.name)
}
i := rand.Int() % len(s.node)
return s.node[i], nil
}
func (s *selectorServer) Watch() {
res, err := s.cli.Get(context.TODO(), s.GetKey(), clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil {
log.Printf("[Watch] err : %s", err.Error())
return
}
for _, kv := range res.Kvs {
node, err := s.GetVal(kv.Value)
if err != nil {
log.Printf("[GetVal] err : %s", err.Error())
continue
}
s.node = append(s.node, node)
}
ch := s.cli.Watch(context.TODO(), prefix, clientv3.WithPrefix())
for {
select {
case c := <-ch:
for _, e := range c.Events {
switch e.Type {
case clientv3.EventTypePut:
node, err := s.GetVal(e.Kv.Value)
if err != nil {
log.Printf("[EventTypePut] err : %s", err.Error())
continue
}
s.AddNode(node)
case clientv3.EventTypeDelete:
keyArray := strings.Split(string(e.Kv.Key), "/")
if len(keyArray) <= 0 {
log.Printf("[EventTypeDelete] key Split err : %s", err.Error())
return
}
nodeId, err := strconv.Atoi(keyArray[len(keyArray)-1])
if err != nil {
log.Printf("[EventTypePut] key Atoi : %s", err.Error())
continue
}
s.DelNode(uint32(nodeId))
}
}
}
}
}
func (s *selectorServer) DelNode(id uint32) {
var node []Node
for _, v := range s.node {
if v.Id != id {
node = append(node, v)
}
}
s.node = node
}
func (s *selectorServer) AddNode(node Node) {
var exist bool
for _, v := range s.node {
if v.Id == node.Id {
exist = true
}
}
if !exist {
s.node = append(s.node, node)
}
}
func (s *selectorServer) GetKey() string {
return fmt.Sprintf("%s%s", prefix, s.options.name)
}
func (s *selectorServer) GetVal(val []byte) (Node, error) {
var node Node
err := json.Unmarshal(val, &node)
if err != nil {
return node, err
}
return node, nil
}
结语
- 文中只实现了简单的服务注册,发现,负载均衡更加高级的用法欢迎添加QQ一起讨论
- 有任何疑问请联系我QQ
联系 QQ: 3355168235