微服务之服务注册和服务发现篇

有了服务注册和发现机制,微服务之务消费者不需要知道具体服务提供者的服务真实物理地址就可以进行调用,也无须知道具体有多少个服务者可用;而服务提供者只需要注册到注册中心,注册就可以对外提供服务,和服在对外服务时不需要知道具体是现篇哪些服务调用了自己。

RPC 配置Etcd:

Hosts:

- 127.0.0.1:2379

Key: user.rpc

这里分析go-zero 的微服务之务etcd 部分源码, 源码引用https://github.com/zeromicro/go-zero-demo/tree/master/mall

被调方-服务注册mall/user/rpc/user.go 源码如下package main

import (

"flag"

"fmt"

"go-zero-demo-rpc/mall/user/rpc/internal/config"

"go-zero-demo-rpc/mall/user/rpc/internal/server"

"go-zero-demo-rpc/mall/user/rpc/internal/svc"

"go-zero-demo-rpc/mall/user/rpc/types/user"

"github.com/zeromicro/go-zero/core/conf"

"github.com/zeromicro/go-zero/core/service"

"github.com/zeromicro/go-zero/zrpc"

"google.golang.org/grpc"

"google.golang.org/grpc/reflection"

)

var configFile = flag.String("f", "etc/user.yaml", "the config file")

func main() {

flag.Parse()

var c config.Config

conf.MustLoad(*configFile, &c)

ctx := svc.NewServiceContext(c)

svr := server.NewUserServer(ctx)

s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {

user.RegisterUserServer(grpcServer, svr)

if c.Mode == service.DevMode || c.Mode == service.TestMode {

reflection.Register(grpcServer)

}

})

defer s.Stop()

fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)

s.Start()

}MustNewServer 内部实现调用了NewServer 方法, 这里我们关注NewServer 通过internal.NewRpcPubServer 方法实例化了internal.Serverif c.HasEtcd() {

server, err = internal.NewRpcPubServer(c.Etcd, c.ListenOn, serverOptions...)

if err != nil {

return nil, err

}

}internal.NewRpcPubServer 中的registerEtcd 会调用Publisher.KeepAlive 方法// KeepAlive keeps key:value alive.

func (p *Publisher) KeepAlive() error {

// 这里获取 etcd 的连接

cli, err := internal.GetRegistry().GetConn(p.endpoints)

if err != nil {

return err

}

p.lease, err = p.register(cli)

if err != nil {

return err

}

proc.AddWrapUpListener(func() {

p.Stop()

})

return p.keepAliveAsync(cli)

}p.register 这里把自己注册到服务中func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {

// 这里新建一个租约

resp, err := client.Grant(client.Ctx(), TimeToLive)

if err != nil {

return clientv3.NoLease, err

}

// 得到租约的 ID

lease := resp.ID

// 这里拼接出实际存储的 key

if p.id > 0 {

p.fullKey = makeEtcdKey(p.key, p.id)

} else {

p.fullKey = makeEtcdKey(p.key, int64(lease))

}

// p.value 是前面的 figureOutListenOn 方法获取到自己的地址

_, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))

return lease, err

}注册完之后,keepAliveAsync 开了一个协程保活这个服务当这个服务意外宕机时, 就不会再向etcd 保活,etcd 就会删除这个key注册好的服务如图

1.png

调用方-服务发现order/api/order.go 源码如下package main

import (

"flag"

"fmt"

"go-zero-demo-rpc/order/api/internal/config"

"go-zero-demo-rpc/order/api/internal/handler"

"go-zero-demo-rpc/order/api/internal/svc"

"github.com/zeromicro/go-zero/core/conf"

"github.com/zeromicro/go-zero/rest"

)

var configFile = flag.String("f", "etc/order.yaml", "the config file")

func main() {

flag.Parse()

var c config.Config

conf.MustLoad(*configFile, &c)

server := rest.MustNewServer(c.RestConf)

defer server.Stop()

ctx := svc.NewServiceContext(c)

handler.RegisterHandlers(server, ctx)

fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)

server.Start()

}在svc.NewServiceContext 方法内部又调用了zrpc.MustNewClient ,zrpc.MustNewClient 主要实现在zrpc.NewClientfunc NewServiceContext(c config.Config) *ServiceContext {

return &ServiceContext{

Config: c,

UserRpc: user.NewUser(zrpc.MustNewClient(c.UserRpc)),

}

}最后实际调用了internal.NewClient 去实例化rpc clientfunc NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {

var opts []ClientOption

if c.HasCredential() {

opts = append(opts, WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{

App: c.App,

Token: c.Token,

})))

}

if c.NonBlock {

opts = append(opts, WithNonBlock())

}

if c.Timeout > 0 {

opts = append(opts, WithTimeout(time.Duration(c.Timeout)*time.Millisecond))

}

opts = append(opts, options...)

target, err := c.BuildTarget()

if err != nil {

return nil, err

}

client, err := internal.NewClient(target, opts...)

if err != nil {

return nil, err

}

return &RpcClient{

client: client,

}, nil

}在zrpc/internal/client.go 文件里, 包含一个init 方法, 这里就是实际发现服务的地方, 在这里注册服务发现者func init() {

resolver.Register()

}resolver.Register 方法实现package resolver

import (

"github.com/zeromicro/go-zero/zrpc/resolver/internal"

)

// Register registers schemes defined zrpc.

// Keep it in a separated package to let third party register manually.

func Register() {

internal.RegisterResolver()

}最后又回到interval 包的internal.RegisterResolver 方法, 这里我们关注etcdResolverBuilderfunc RegisterResolver() {

resolver.Register(&directResolverBuilder)

resolver.Register(&discovResolverBuilder)

resolver.Register(&etcdResolverBuilder)

resolver.Register(&k8sResolverBuilder)

}etcdBuilder 的内嵌了discovBuilder 结构体,Build 方法调用过程:实例化服务端:internal.NewClient ->client.dial ->grpc.DialContext由于etcd 是resolver.BuildDiscovTarget 生成的taget 所以是类似这样子的:discov://127.0.0.1:2379/user.rpc解析服务发现:ClientConn.parseTargetAndFindResolver ->grpc.parseTarget ->ClientConn.getResolver然后在grpc.newCCResolverWrapper 调用resolver.Builder.Build 方法去发现服务我们着重关注discovBuilder.Build 方法type etcdBuilder struct {

discovBuilder

}

type discovBuilder struct{ }

func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (

resolver.Resolver, error) {

hosts := strings.FieldsFunc(targets.GetAuthority(target), func(r rune) bool {

return r == EndpointSepChar

})

sub, err := discov.NewSubscriber(hosts, targets.GetEndpoints(target))

if err != nil {

return nil, err

}

update := func() {

var addrs []resolver.Address

for _, val := range subset(sub.Values(), subsetSize) {

addrs = append(addrs, resolver.Address{

Addr: val,

})

}

if err := cc.UpdateState(resolver.State{

Addresses: addrs,

}); err != nil {

logx.Error(err)

}

}

sub.AddListener(update)

update()

return &nopResolver{ cc: cc}, nil

}

func (b *discovBuilder) Scheme() string {

return DiscovScheme

}discov.NewSubscriber 方法调用internal.GetRegistry().Monitor 最后调用Registry.monitor 方法进行监视

cluster.getClient 拿到etcd 连接

cluster.load 作为第一次载入数据

cluster.watch 去watch 监听etcd 前缀key 的改动

func (c *cluster) monitor(key string, l UpdateListener) error {

c.lock.Lock()

c.listeners[key] = append(c.listeners[key], l)

c.lock.Unlock()

cli, err := c.getClient()

if err != nil {

return err

}

c.load(cli, key)

c.watchGroup.Run(func() {

c.watch(cli, key)

})

return nil

}如下图是云服务器cluster.load 的实现, 就是根据前缀拿到user.prc 服务注册的所有地址

2.png

Q

为什么不用Redis 做注册中心(反正只是把被调方的地址存储, 过期Redis 也能胜任), 找了很久找到这个说法

简单从以下几个方面说一下瑞迪斯为啥在微服务中不能取代 etcd:

1、redis 没有版本的服务概念,历史版本数据在大规模微服务中非常有必要,注册对于状态回滚和故障排查,和服甚至定锅都很重要

2、现篇redis 的微服务之务注册和发现目前只能通过 pub 和 sub 来实现,这两个命令完全不能满足生产环境的服务要求,具体原因可以 gg 或看源码实现

3、注册etcd 在 2.+版本时,和服watch 到数据官方文档均建议再 get 一次,现篇因为会存在数据延迟,3.+版本不再需要,可想 redis 的 pub 和 sub 能否达到此种低延迟的要求

4、楼主看到的微服务架构应该都是b2b供应网将 etcd 直接暴露给 client 和 server 的,etcd 的性能摆在那,能够承受多少的 c/s 直连呢,更好的做法应该是对 etcd 做一层保护,当然这种做法会损失一些功能

5、redis 和 etcd 的集群实现方案是不一致的,etcd 采用的是 raft 协议,一主多从,只能写主,底层采用 boltdb 作为 k/v 存储,直接落盘

6、redis 的持久化方案有 aof 和 rdb,这两种方案在宕机的时候都或多或少的免费信息发布网会丢失数据

引用自https://www.v2ex.com/t/520367

滇ICP备2023000592号-31