Golang etcd 服务发现与负载均衡

  1. 注册: 同一服务下的所有节点注册到相同目录下,节点启动后将自己的信息注册到所属服务的目录中。
  2. 健康: 服务节点定时发送心跳,注册到服务目录中的信息设置一个较短的 TTL,运行正常的服务节点每隔一段时间会去更新信息的 TTL。
  3. 发现: 通过名称能查询到服务提供外部访问的 IP 和端口号。比如网关代理服务时能够及时的发现服务中新增节点、丢弃不可用的服务节点,同时各个服务间也能感知对方的存在。

etcd

  • 是由CoreOS开发,用于可靠地存储集群的配置数据的一种持久性,轻量型的,分布式的键-值数据存储组件。该组件可表示在任何给定时间点处的集群的整体状态。其他组件在注意到存储的变化之后,会变成相应的状态。采用raft协议作为一致性算法,etcd基于Go语言实现。

服务注册

  • 下载依赖
go get go.etcd.io/etcd/clientv3
  • 创建注册接口
type Registry interface {
	RegistryNode(node PutNode) error
	UnRegistry()
}
  • 具体代码实现
var prefix = "/registry/server/"
type Registry interface {
	RegistryNode(node PutNode) error
	UnRegistry()
}
type registryServer struct {
	cli        *clientv3.Client
	stop       chan bool
	isRegistry bool
	options    Options
	leaseID    clientv3.LeaseID
}
type PutNode struct {
	Addr string `json:"addr"`
}
type Node struct {
	Id   uint32 `json:"id"`
	Addr string `json:"addr"`
}
type Options struct {
	name   string
	ttl    int64
	config clientv3.Config
}
func NewRegistry(options Options) (Registry, error) {
	cli, err := clientv3.New(options.config)
	if err != nil {
		return nil, err
	}
	return &registryServer{
		stop:       make(chan bool),
		options:    options,
		isRegistry: false,
		cli:        cli,
	}, nil
}

func (s *registryServer) RegistryNode(put PutNode) error {
	if s.isRegistry {
		return errors.New("only one node can be registered")
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(s.options.ttl)*time.Second)
	defer cancel()
	grant, err := s.cli.Grant(context.Background(), s.options.ttl)
	if err != nil {
		return err
	}
	var node = Node{
		Id:   s.HashKey(put.Addr),
		Addr: put.Addr,
	}
	nodeVal, err := s.GetVal(node)
	if err != nil {
		return err
	}
	_, err = s.cli.Put(ctx, s.GetKey(node), nodeVal, clientv3.WithLease(grant.ID))
	if err != nil {
		return err
	}
	s.leaseID = grant.ID
	s.isRegistry = true
	go s.KeepAlive()
	return nil
}
func (s *registryServer) UnRegistry() {
	s.stop <- true
}
func (s *registryServer) Revoke() error {
	_, err := s.cli.Revoke(context.TODO(), s.leaseID)
	if err != nil {
		log.Printf("[Revoke] err : %s", err.Error())
	}
	s.isRegistry=false
	return err
}
func (s *registryServer) KeepAlive() error {
	keepAliveCh, err := s.cli.KeepAlive(context.TODO(), s.leaseID)
	if err != nil {
		log.Printf("[KeepAlive] err : %s", err.Error())
		return err
	}
	for {
		select {
		case <-s.stop:
			_ = s.Revoke()
			return nil
		case _, ok := <-keepAliveCh:
			if !ok {
				_ = s.Revoke()
				return nil
			}
		}
	}
}
func (s *registryServer) GetKey(node Node) string {
	return fmt.Sprintf("%s%s/%d", prefix, s.options.name, s.HashKey(node.Addr))
}
func (s *registryServer) GetVal(node Node) (string, error) {
	data, err := json.Marshal(&node)
	return string(data), err
}
func (e *registryServer) HashKey(addr string) uint32 {
	return crc32.ChecksumIEEE([]byte(addr))
}

服务发现与负载均衡

func init() {
	rand.Seed(time.Now().UnixNano())
}
type Selector interface {
	Next() (Node, error)
}
type selectorServer struct {
	cli     *clientv3.Client
	node    []Node
	options SelectorOptions
}
type SelectorOptions struct {
	name   string
	config clientv3.Config
}
func NewSelector(options SelectorOptions) (Selector, error) {
	cli, err := clientv3.New(options.config)
	if err != nil {
		return nil, err
	}
	var s = &selectorServer{
		options: options,
		cli:     cli,
	}
	go s.Watch()
	return s, nil
}
func (s *selectorServer) Next() (Node, error) {
	if len(s.node) == 0 {
		return Node{}, fmt.Errorf("no node found on the %s", s.options.name)
	}
	i := rand.Int() % len(s.node)
	return s.node[i], nil
}
func (s *selectorServer) Watch() {
	res, err := s.cli.Get(context.TODO(), s.GetKey(), clientv3.WithPrefix(), clientv3.WithSerializable())
	if err != nil {
		log.Printf("[Watch] err : %s", err.Error())
		return
	}
	for _, kv := range res.Kvs {
		node, err := s.GetVal(kv.Value)
		if err != nil {
			log.Printf("[GetVal] err : %s", err.Error())
			continue
		}
		s.node = append(s.node, node)
	}
	ch := s.cli.Watch(context.TODO(), prefix, clientv3.WithPrefix())
	for {
		select {
		case c := <-ch:
			for _, e := range c.Events {
				switch e.Type {
				case clientv3.EventTypePut:
					node, err := s.GetVal(e.Kv.Value)
					if err != nil {
						log.Printf("[EventTypePut] err : %s", err.Error())
						continue
					}
					s.AddNode(node)
				case clientv3.EventTypeDelete:
					keyArray := strings.Split(string(e.Kv.Key), "/")
					if len(keyArray) <= 0 {
						log.Printf("[EventTypeDelete] key Split err : %s", err.Error())
						return
					}
					nodeId, err := strconv.Atoi(keyArray[len(keyArray)-1])
					if err != nil {
						log.Printf("[EventTypePut] key Atoi : %s", err.Error())
						continue
					}
					s.DelNode(uint32(nodeId))
				}
			}
		}
	}
}
func (s *selectorServer) DelNode(id uint32) {
	var node []Node
	for _, v := range s.node {
		if v.Id != id {
			node = append(node, v)
		}
	}
	s.node = node
}
func (s *selectorServer) AddNode(node Node) {
	var exist bool
	for _, v := range s.node {
		if v.Id == node.Id {
			exist = true
		}
	}
	if !exist {
		s.node = append(s.node, node)
	}
}
func (s *selectorServer) GetKey() string {
	return fmt.Sprintf("%s%s", prefix, s.options.name)
}
func (s *selectorServer) GetVal(val []byte) (Node, error) {
	var node Node
	err := json.Unmarshal(val, &node)
	if err != nil {
		return node, err
	}
	return node, nil
}

结语

  • 文中只实现了简单的服务注册,发现,负载均衡更加高级的用法欢迎添加QQ一起讨论
  • 有任何疑问请联系我QQ

完整代码地址

联系 QQ: 3355168235