一起玩Dubbo,万字长文揭秘服务暴露
日常写组件,起玩最近又接了一个需求,万文揭务暴让我负责实现一个rpc组件,字长提高公司游戏跨服开发的秘服效率,为了写好这个组件,起玩算是万文揭务暴将dubbo里里外外研究了一波,目前组件的字长实现也接近尾声了,因此打算给dubbo的秘服学习做个总结,并穿插说说rpc实现的起玩心路历程,同样需要实现rpc的万文揭务暴朋友,或者对dubbo有兴趣的字长朋友可以关注这个系列。
在写rpc组件之前,秘服我先提了几个灵魂疑问,起玩并从dubbo中找到了答案。万文揭务暴
服务是字长啥?一个模块,一种玩法,只要是需要进行远程调度的都可以用服务的概念进行包装,我这边简单包装了一个副本服务,类情况如下:
平平无奇,等等,我们来看看提供方如何标记服务
到了这一步,服务已经完成了基本定义。
服务最终被注册到了那里?在xml配置上,香港云服务器我们已经看到了有注册中心的配置
没错,最后提供方定义好的服务会注册到注册中心,目前支持的类型有多种
具体可以查看里边提供的demo实例,那么注册中心有什么作用呢?
简单点描述就是注册中心就是管理服务的地方,提供方将服务放到了这个管理处,而订阅方要用的话则从这个管理处将服务拿过来用,通过注册中心实现了服务的感知。
服务谁来消费?消费方来使用,我们可以看到
同样也是平平无奇的代码,就是消费方拿到boss接口后,直接调用对应接口即可。
对应提供方有xml去定义服务的注册,同样消费方也是有xml去定义服务的订阅信息,可以看到
简单来说就是,提供方将服务放到注册中心,订阅方从注册中心拿来用。
接下来会涉及到源码部分,网站模板以下源码的示例接来自dubbo2.6x,源码方面的注释都已经提交到github上,有需要的可以clone:
https://github.com/wiatingpub/dubbo/tree/2.6.x
什么时候触发的服务暴露
在设计rpc组件的时候,不得不面对这个问题,本着抄dubbo的想法,研究了下dubbo的实现方案
dubbo采用了比较经典的xml配置,并理所当然的使用了NamespaceHandlerSupport将xml中的节点配置映射成了对应对象
可以看到在dubbo-config-spring包底下有个spring.handlers的配置,通过该配置指定了DubboNamespaceHandler
DubboNamespaceHandler会将xml配置对应标签的配置映射成对象,比如service
看看ServiceBean在映射成对象后做了啥,先看看ServiceBean结构
自身是一个监听器,再通过CTRL+F12看看有哪些方法
看到export暴露这个方法后,ALT+F7反调下发现除了注解Annoatition外有两个地方调用,分别是
第一种是在属性被设置后调用,可以看到如果是延迟函数则不会调用。
第二种是看到isDelay的时候才会调用export,站群服务器也就是说延迟暴露的服务是在监听到ContextRefreshedEvent事件后进行调用的。
在export方法内可以看到
可以针对不同的服务配置配置delay延迟时间,具体的肯定是在xml上配置了。
触发机制到这里基本就结束了,总结一下dubbo的触发机制就是建立在NamespaceHandlerSupport上,将xml中的标签实例化,并通过在afterPropertiesSet或者在监听到Spring容器抛出的容器刷新事件后,触发服务的暴露。
画个流程图总结下
由于我司这边的服务配置最终落地在使用yaml方案上,不引入xml,最终我并没有使用NamespaceHandlerSupport去实例化,而是模仿dubbo3.0的方案包装了一个ServiceBootstrap对象,依赖SmartLifeCycle的生命周期,在start的时候取到yaml的配置,遍历进行服务暴露。dubbo3.0做了比较大调整,后续会专门讲,有兴趣的持续关注该系列。
提一波URL
在说服务暴露之前必须先提一波URL,否则主线没了,后续不好讲。
在我没有接触到dubbo之前,我对URL的定位是指网络地址,而在dubbo中,可以认为是一种约定,几乎dubbo的所有模块都是通过URL来传参,这有什么好处呢?
我们可以想想,如果没有约定好,那么不同的接口之间进行交互的参数便会乱掉,一会是字符串,一会是map,而有了统一的约定后,代码便会更加的规范和统一,我们在看代码的时候也会比较清晰,也容易拓展,比如如果你想拓展什么东西,直接往URL上拼接参数就可以了。
我们可以看到,除了几个基础的参数外,很多参数其实最终都放到了parameters中。
而在我司项目中,我们参考了URL的设计,构建了元数据的结构,也就是map,将服务的部分动态参数通过map进行传递。
服务暴露过程
在深入源码之前先大概总结下服务暴露的几个步骤,分别是:
配置的构建、合并、检查。 URL的组装。 服务的暴露、注册。我将这三个主要的过程放入流程图内
继续跟进服务暴露的具体逻辑,也就是doExport后
protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("Already unexported!"); } if (exported) { return; } exported = true; if (interfaceName == null || interfaceName.length() == 0) { throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!"); } // TODO: 2021/5/27 检查provider是否为空,为空则创建一个,并通过系统变量为其初始化 checkDefault(); /** 各种初始值的设置 **/ // TODO: 2021/5/27 检查Application是否为空 checkApplication(); // TODO: 2021/5/27 检查注册中心是否为空 checkRegistry(); // TODO: 2021/5/27 检查protocols是否为空 checkProtocol(); // TODO: 2021/5/27 补充各种参数 appendProperties(this); // TODO: 2021/5/27 Stub合法性检查 checkStub(interfaceClass); // TODO: 2021/5/27 mock合法性检查 checkMock(interfaceClass); if (path == null || path.length() == 0) { path = interfaceName; } // TODO: 2021/5/27 多协议多注册中心暴露服务 doExportUrls(); ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref); ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel); }总结下来不外乎两步:
对各类配置进行校验,并且更新部分配置; 多协议多注册中心暴露服务;其中检查的细节暂时不铺开,因为服务暴露整个过程才是重点,后续服务治理了再重新讲这块,接下来继续讲重点doExportUrls方法
@SuppressWarnings({ "unchecked", "rawtypes"}) private void doExportUrls() { // TODO: 2021/5/27 加载注册中心URL List<URL> registryURLs = loadRegistries(true); for (ProtocolConfig protocolConfig : protocols) { // TODO: 2021/5/27 根据不同协议进行服务暴露 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }loadRegistries也很简单,其实就是根据注册中心的配置组装成URL,这里多个注册中心比较好理解,多个protocols是什么鬼呢?
其实是这样的,一个服务如果有多个协议那么就都需要暴露,比如同时支持 dubbo 协议和 hessian 协议,那么需要将这个服务用两种协议分别向多个注册中心暴露注册。
参考了这块逻辑,在我司项目中,我们规范了注册中心的接口,允许注册中心有多种实现, 甚至是本地注册中心,但是并不允许有多个注册中心,目前来说是没有这种需求,而要选择哪个注册中心,只需要在yaml文件上进行配置即可
接下来看doExportUrlsFor1Protocol方法
在分析服务暴露流程之前便有提到过,dubbo内部使用URL来携带各类数据,从而贯穿整个生命周期的,而入口其实就是从这个方法开始的,等下我们便可以看到该方法可以分为两个步骤,前个步骤是组装URL的逻辑,后个步骤是真正实现暴露dubbo服务等逻辑的地方,不说了,继续code
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { /**组装服务的URL开始**/ // TODO: 2021/5/27 获取协议名 String name = protocolConfig.getName(); // TODO: 2021/5/27 如果为空,则默认是dubbo if (name == null || name.length() == 0) { name = "dubbo"; } // TODO: 2021/5/27 设置map等各种参数 Map<String, String> map = new HashMap<String, String>(); map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } // TODO: 2021/5/27 添加application、module、provider等信息到map中 appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, protocolConfig); appendParameters(map, this); // TODO: 2021/5/27 如果methods的配置列表不为空,则遍历methods配置列表 if (methods != null && !methods.isEmpty()) { for (MethodConfig method : methods) { // TODO: 2021/5/27 把方法名加入map appendParameters(map, method, method.getName()); // TODO: 2021/5/27 添加methodconfig对象的字段信息到map中 String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } // TODO: 2021/5/27 添加ArgumentConfig列表 List<ArgumentConfig> arguments = method.getArguments(); if (arguments != null && !arguments.isEmpty()) { for (ArgumentConfig argument : arguments) { // convert argument type if (argument.getType() != null && argument.getType().length() > 0) { // TODO: 2021/5/27 利用反射拿到接口类的所有方法 Method[] methods = interfaceClass.getMethods(); if (methods != null && methods.length > 0) { // TODO: 2021/5/27 遍历methods for (int i = 0; i < methods.length; i++) { String methodName = methods[i].getName(); // TODO: 2021/5/27 找到目标方法 if (methodName.equals(method.getName())) { // TODO: 2021/5/27 通过反射拿到方法参数类型 Class<?>[] argtypes = methods[i].getParameterTypes(); // TODO: 2021/5/27 如果下表为-1 if (argument.getIndex() != -1) { // TODO: 2021/5/27 检测argtypes的名称与ArgumentConfig中的type是否一致 if (argtypes[argument.getIndex()].getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { // TODO: 2021/5/27 不一致则抛出异常 throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } else { // TODO: 2021/5/27 遍历参数,查找argument.type的类型 for (int j = 0; j < argtypes.length; j++) { Class<?> argclazz = argtypes[j]; // TODO: 2021/5/27 如果找得到则将ArgumentConfig字段添加map中 if (argclazz.getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + j); if (argument.getIndex() != -1 && argument.getIndex() != j) { throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); } } } } } } } } else if (argument.getIndex() != -1) { // TODO: 2021/5/27 用户未配置type属性,但配置了index属性,且index != -1,则直接添加到map中 appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index=0 .../> or <dubbo:argument type=xxx .../>"); } } } } // end of methods for } // TODO: 2021/5/27 如果是泛化调用,则在map中设置generic和methods if (ProtocolUtils.isGeneric(generic)) { map.put(Constants.GENERIC_KEY, generic); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { // TODO: 2021/5/27 获得版本号 String revision = Version.getVersion(interfaceClass, version); // TODO: 2021/5/27 放入map中 if (revision != null && revision.length() > 0) { map.put("revision", revision); } // TODO: 2021/5/27 获得方法集合 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("NO method found in service interface " + interfaceClass.getName()); // TODO: 2021/5/27 设置方法为* map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { // TODO: 2021/5/27 否则加入方法集合中 map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } // TODO: 2021/5/27 将token加入map if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(Constants.TOKEN_KEY, token); } } if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) { protocolConfig.setRegister(false); map.put("notify", "false"); } String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length() == 0) && provider != null) { contextPath = provider.getContextpath(); } // TODO: 2021/5/27 获得地址、端口号 String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); // TODO: 2021/5/27 组装生成URL URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); /**组装服务的URL结束**/ /* * 后续讲解服务暴露 */ }这个方法实在是又臭又长,我特意分成两部分,目前这部分是组装服务的URL部分,其实简单点说就是:
先将provider、applicaiton、module等各种基础配置直接放入map中,再针对method配置等进行校验,查看该配置是否有配置方法存在,并进行方法签名的校验,如果是才放入map中,然后还额外将一些多余数据,比如泛化调用、版本号等加入map中,最终根据host和port,结合map组装成URL,貌似还是有点长。
总归就是结合服务自身的各种配置放入map中,然后根据host和port以及map等生成URL就是了。
接下来看看后续服务暴露部分
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { /* * 前面URL组装 */ // TODO: 2021/5/27 加载ConfiguratorFactory,并生成Configurator实例,判断是否有该协议的实现存在 if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { // TODO: 2021/5/27 通过SPI机制配置URL url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(Constants.SCOPE_KEY); // TODO: 2021/5/27 如果scope为none,则什么都不做 if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { // TODO: 2021/5/27 如果scope不是远程,则暴露到本地 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { /** 本地服务暴露 **/ exportLocal(url); } // TODO: 2021/5/27 如果不是local,则暴露到远程 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && !registryURLs.isEmpty()) { // TODO: 2021/5/27 遍历注册中心 for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); // TODO: 2021/5/27 加载监视器连接 URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { // TODO: 2021/5/27 如果没有则添加一个 url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } // TODO: 2021/5/27 根据URL拿到代理方式 String proxy = url.getParameter(Constants.PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { // TODO: 2021/5/27 给注册中心的URL添加代理方式 registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy); } // TODO: 2021/5/24 通过SPI机制拿到对应的proxyFactory /** 根据proxyFactory拿到Invoker **/ Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // TODO: 2021/5/24 通过SPI机制拿到对应的protocol,先是RegistryProtocol,再被AOP强化 /** 服务暴露 **/ Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { // TODO: 2021/5/24 通过SPI机制拿到对应的proxyFactory /** 根据proxyFactory拿到Invoker **/ Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // TODO: 2021/5/24 通过SPI机制拿到对应的protocol /** 服务暴露 **/ Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } } this.urls.add(url); }后续重要的地方可以认为其实就是遍历注册中心进行服务暴露,只是会根据服务配置域scope来针对性做一些暴露处理,比如如果scope不是远程,则暴露到本地,如果不是local,则暴露到远程。
该方法中又包含了几个核心的拓展实现,包括:
本地服务暴露 根据proxyFactory拿到Invoker 远程服务暴露、注册继续补充流程图,整理思路
首先第1点,看看本地服务暴露逻辑
private void exportLocal(URL url) { if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(LOCALHOST) .setPort(0); StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).put(url.getServiceKey(), getServiceClass(ref)); // TODO: 2021/5/27 根据SPI拿到了InjvmProtocol调用了export方 Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); // 放入集合中缓存 exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry"); } } @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap); }暴露到本地的大致逻辑其实就是根据SPI机制拿到了InjvmProtocol生成了InjvmExporter,之后放入集合缓存中,至于SPI机制,后续需要开个文章专门讲讲,有兴趣持续关注该系列。
为啥要有本地服务暴露?大致原因应该是因为可能存在同一个 JVM 内部引用自身服务的情况,因此暴露的本地服务在内部调用的时候可以直接消费同一个 JVM 的服务避免了网络间的通信。
继续看第2点,根据proxyFactory拿到Invoker部分,首先我们看ProxyFactory类名就大概可以猜到该类具备生成代理对象的能力,我们看proxyFactory的生成模式
可以看到,该对象也是通过SPI机制生成的,由于SPI机制也是比较庞大的,为了避免混淆,后续再开篇文章讲解,有兴趣的持续关注。
通过SPI机制拿到了ProxyFactory的实现对象JavassisProxyFactory,最终调用的代码
@Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO: 2021/5/23 为目标类创建Wrapper final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf($) < 0 ? proxy.getClass() : type); // TODO: 2021/5/23 创建匿名的Invoker对象,并实现doInvoker方法 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { // TODO: 2021/5/23 调用Wrapper的invokeMethod方法,invokeMethod最终会调用目标方法 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }该方法就是创建了一个匿名的Inovker对象,在doInvker方法中调用wrapper.invokeMethod方法,invokeMethod最终会调用目标方法。
那么wrapper又是啥?Wrapper是一个抽象类,在调用Wrapper.getWrapper创建子类的时候,会根据目标Class对象进行解析,拿到各种方法、类成员变量等信息,以及生成invokeMethod方法等代码,在代码生成完毕后,通过Javassist生成Class对象,可以理解为该Class对象就是BossServiceImpl的代理实例,有兴趣了解生成过程的可以看Wrapper.makeWrapper方法。
为啥一定要封装Invoker?其实就是为了屏蔽本地调用或者远程调用或者集群调用的细节,统一暴露出一个可执行体,方便调用者调用,而不管怎么封装,其实最终都是调向目标方法。
为啥要封装Exporter?这个涉及到后续服务被具体调用,后面会开一篇文章专门讲这个,有兴趣的可以持续关注。
在我司的rpc框架中,倒是没有使用Javassist去生成代理对象,而是选择了使用jdk提供的Proxy生成机制。
继续补充流程图,整理思路
接下来说说远程服务暴露
远程服务暴露要比本地复杂的多,在doExportUrlsFor1Protocol后半部分,通过proxyFactory生成Inovker后,就需要调用protocol.export做真的服务暴露了,我们可以看到protocol是如何实例化的
又是通过SPI实例化的,通过断点可以看到会先被AOP切面拦截额外做了一些其他的操作,不过最终走向的RegisterProtocol,AOP这块后续再分析,有兴趣的持续关注。
接下来继续看RegisterProtocol.export做了啥
@Override public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // TODO: 2021/5/29 服务暴露 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); // TODO: 2021/5/23 获得注册中心的URL URL registryUrl = getRegistryUrl(originInvoker); final Registry registry = getRegistry(originInvoker); // TODO: 2021/5/23 获得已经注册的服务提供者URL final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); boolean register = registeredProviderUrl.getParameter("register", true); ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); if (register) { // TODO: 2021/5/29 真正做服务注册的地方 register(registryUrl, registeredProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // TODO: 2021/5/23 获取override订阅URL final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); // TODO: 2021/5/23 创建override的监听器 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); // TODO: 2021/5/23 缓存监听器到集合中 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); // TODO: 2021/5/23 向注册中心订阅override数据 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // TODO: 2021/5/23 创建并返回DestroyableExporter return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); }从代码上看,该方法其实做了两件事情,分别是服务暴露和注册:
执行了doLocalExport进行服务暴露 加载注册中心实现类,向注册中心注册服务 向注册中心订阅override数据 创建并返回DestroyableExporter接下来继续看看doLocalExport做了啥
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { // TODO: 2021/5/24 创建Invoker为委托对象 final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); // TODO: 2021/5/24 调用protocol的export方法暴露服务 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter; }看逻辑比较简单,主要是根据不同协议配置,根据SPI调用不同的protocol实现,跟暴露到本地时实现的InjvmPortocol一样,默认这里调用的是DubboProtocol.export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // TODO: 2021/5/29 得到服务key,格式:group+"/"+serviceName+":"+serviceVersion+":"+port String key = serviceKey(url); // TODO: 2021/5/29 创建exporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } // TODO: 2021/5/24 开启服务器 openServer(url); // TODO: 2021/5/29 序列化 optimizeSerialization(url); return exporter; }可以到export先是new了一个DubboExporter对象, 后续打开了服务,接下来继续看openServer做了啥
private void openServer(URL url) { String key = url.getAddress(); boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { // TODO: 2021/5/24 启动一个服务实例 serverMap.put(key, createServer(url)); } else { // server supports reset, use together with override server.reset(url); } } } private ExchangeServer createServer(URL url) { // TODO: 2021/5/29 服务器关闭是发送readonly时间 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // TODO: 2021/5/29 心跳默认时间 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // TODO: 2021/5/29 获得远程通讯服务端实现方式 String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); // TODO: 2021/5/29 添加编解码器DubboCodec实现 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); ExchangeServer server; try { // TODO: 2021/5/29 启动服务器 server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }可以看到最终还是依赖URL携带的远程通讯实现方法创建了一个服务器对象。
总结一下:doLocalExport最终其实就是根据URL开启了服务器,并返回了Exporter。
接下来继续看注册服务部分
public void register(URL registryUrl, URL registedProviderUrl) { // TODO: 2021/5/29 获取注册中心实例 Registry registry = registryFactory.getRegistry(registryUrl); // TODO: 2021/5/29 调用register registry.register(registedProviderUrl); }Regsitry的生成最终也是依赖了SPI机制,最终走向FailbackRegistry.register
@Override public void register(URL url) { super.register(url); // TODO: 2021/5/24 从失败的集合中移除 failedRegistered.remove(url); failedUnregistered.remove(url); try { // TODO: 2021/5/24 向注册中心发起注册请求 doRegister(url); } catch (Exception e) { Throwable t = e; boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // TODO: 2021/5/29 发生异常则放入failedRegistered failedRegistered.add(url); } }可以看到注册的核心实现是在doRegister中,不过通过代码机制我们也可以看出,在注册报错的时候会被trycatch拦截,然后放入failedRegistered容器中,结合FailbackRegistry该类名可以推测应该是有个重试机制存在,看看构造方法
// TODO: 2021/5/24 从url中获取重试频率参数,启动定时器进行重试逻辑 public FailbackRegistry(URL url) { super(url); this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { // TODO: 2021/5/29 定时重试 retry(); } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); }果不其然,最终如果注册发生了异常,则会进行定时重试。
关于重试机制也是要有的,在我司的rpc框架中,我们将重试时间放在yaml上去配置,不过定时器并没有采用Executor机制,而是模仿了dubbo3.0的写法,也就是时间轮的机制,性能更好。
接下来看注册核心部分doRegister,可以看到该方法是一个抽象方法,由于我在xml配置中配置的注册中心是Zookeeper,因而最终走向ZookeeperRegistry
@Override protected void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }服务注册走到这里基本到头了,再深入便是看注册中心的实现了。
接下来看看向注册中心订阅override数据部分
上面有说过registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener)最终走向的方法是FailbackRegistry.subscribe
public void subscribe(URL url, NotifyListener listener) { super.subscribe(url, listener); removeFailedSubscribed(url, listener); try { // TODO: 2021/5/29 真正做订阅的地方 doSubscribe(url, listener); } catch (Exception e) { Throwable t = e; List<URL> urls = getCacheUrls(url); if (urls != null && !urls.isEmpty()) { notify(url, listener, urls); logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t); } else { // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } } // TODO: 2021/5/29 订阅失败,则放入失败容器中 addFailedSubscribed(url, listener); } }同样,订阅失败后也是放入失败容器中,定时重试进行订阅。
再看看核心实现方法doSubscribe方法,最终走向ZookeeperRegistry.doSubscribe中
@Override protected void doSubscribe(final URL url, final NotifyListener listener) { try { // TODO: 2021/5/29 处理URL参数中interface为*的订阅,例如监控中心的订阅 if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { /** 先无视 **/ } else { List<URL> urls = new ArrayList<URL>(); // TODO: 2021/5/29 遍历分类数组 for (String path : toCategoriesPath(url)) { // TODO: 2021/5/29 获得监听器集合 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); // TODO: 2021/5/29 如果没有则创建 if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } // TODO: 2021/5/29 获得监听器 ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { @Override public void childChanged(String parentPath, List<String> currentChilds) { // TODO: 2021/5/29 通知服务变化,回调NotifyListener ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } }); zkListener = listeners.get(listener); } // TODO: 2021/5/29 创建节点,如:/dubbo/com.alibaba.dubbo.demo.DemoService/providers zkClient.create(path, false); List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } // TODO: 2021/5/29 通知数据变更,如RegistryDirectory notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }这个方法主要做了订阅和监听触发逻辑,具体逻辑就是订阅了某个服务的URL,在服务变更的时候触发逻辑变化。其实此处已经是可以归纳入服务治理模块了,后续会有专门的文章分享服务治理,有兴趣可以持续关注。
画个流程图,整理下思路
看到这里服务暴露流程基本理完了,还是有点东西在里面的,并且还需要掌握 Dubbo SPI,不然有些点例如自适应什么的还是很难理解的,为了写这篇文章,我前前后后也是花了不少的时间。
最后我再来一张完整的流程图带大家再过一遍,具体还是有很多细节,不过不是主干我就不做分析了,不然文章散掉了。
后续服务治理、APO、SPI机制也会在该流程图上进行拓展,有兴趣的也可以关注流程图链接:
https://www.processon.com/view/link/60b25f275653bb3c7e646934
总结
虽然看完了该篇文章,但是还是建议大家自己打断点过一遍,可以更加清晰,而如果是为了应付面试官提问的话,基本上记住上面流程图的内容就差不多了,当你研究完了dubbo后,其实会发现dubbo有很多东西可以写,比如服务应用、SPI、dubbo中的AOP机制、服务治理等好几个模块,最后就是带大家撸一个RPC框架了,还是那句话,想学dubbo的可以持续关注这一系列。
本文转载自微信公众号「 稀饭下雪」,可以通过以下二维码关注。转载本文请联系 稀饭下雪公众号。
原文链接:https://mp.weixin.qq.com/s/gsPa2KHS1ZqxU6z3_wI46w