「一发入魂」Nacos服务注册源码解析

「一发入魂」Nacos服务注册源码解析

首页枪战射击源代码删除官网更新时间:2024-04-30
本章内容

阅读说明:本文主要剖析Nacos服务注册的关键源码,为了讲清楚源码中的关键步骤与实现细节,会从三个维度进行解析:

由于本文篇幅较长,各位看官可以通过时序图 方法说明的方式快速了解Nacos服务注册的实现步骤以及设计思想,源码当作补充说明。

版本说明:服务注册源码分析基于Nacos-1.4.3版本。

服务注册客户端入口

Nacos客户端依赖:

<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency>

基于Springboot自动装备原理,服务启动时会自动加载Nacos客户端相关的配置类。

如图所示:

其中,与服务注册相关的配置类为:NacosServiceRegistryAutoConfiguration。

如图所示:

NacosServiceRegistryAutoConfiguration是Nacos服务注册的自动配置类,该类的主要功能是按条件注入NacosAutoServiceRegistration、NacosServiceRegistry、NacosRegistration等服务注册相关的Bean。

客户端发送服务注册请求

服务注册客户端交互逻辑,如图所示:

NacosAutoServiceRegistration

NacosAutoServiceRegistration继承了AbstractAutoServiceRegistration,该类实现了ApplicationListener接口的onApplicationEvent()方法。

服务启动,Spring初始化完成后,发布对应的初始化完成事件(WebServerInitializedEvent),触发AbstractAutoServiceRegistration.onApplicationEvent()方法调用逻辑,开启注册流程。

源码如下:

@Override @SuppressWarnings("deprecation") public void onApplicationEvent(WebServerInitializedEvent event) { bind(event); } @Deprecated public void bind(WebServerInitializedEvent event) { ... // 通过CAS方式设置服务端口 this.port.compareAndSet(0, event.getWebServer().getPort()); // 开启服务注册流程 this.start(); }

start()方法主要做以下事情:

源码如下:

public void start() { ... // 服务启动时,服务运行状态为false if (!this.running.get()) { // 发布服务预注册事件 this.context.publishEvent( new InstancePreRegisteredEvent(this, getRegistration())); // 执行服务注册 register(); if (shouldRegisterManagement()) { registerManagement(); } // 发布服务注册完成事件 this.context.publishEvent( new InstanceRegisteredEvent<>(this, getConfiguration())); // 更新服务运行状态为true this.running.compareAndSet(false, true); } }

register()方法主要做以下事情:

源码如下:

protected void register() { this.serviceRegistry.register(getRegistration()); }

其中,getRegistration()主要作用是从NacosRegistration中获取Nacos服务注册基本信息。

NacosRegistration

NacosRegistration主要用于管理Nacos服务注册基本信息,该类主要封装了NacosDiscoveryProperties的配置信息(application.yml),同时,该类实现了Registration接口(继承自ServiceInstance),Registration是Springcloud提供的服务注册信息通用接口,用于管理服务注册的基本信息(如:服务ID、服务IP、端口等)。

NacosServiceRegistry

NacosServiceRegistry类实现了ServiceRegistry接口,该接口是Springcloud提供的服务注册通用接口,声明了服务注册、取消注册等方法。

NacosServiceRegistry#register方法主要做以下事情:

源码如下:

// com.alibaba.cloud.nacos.registry.NacosServiceRegistry.register @Override public void register(Registration registration) { // ... // 获取Nacos命名服务 NamingService namingService = namingService(); // 获取服务名,默认为应用名 String serviceId = registration.getServiceId(); // 获取服务分组名,默认为DEFAULT_GROUP String group = nacosDiscoveryProperties.getGroup(); // 创建服务实例,封装服务注册信息(如:服务IP、端口、权重、服务节点类型等) Instance instance = getNacosInstanceFromRegistration(registration); try { // 调用NamingService.registerInstance()方法进行服务注册 namingService.registerInstance(serviceId, group, instance); // ... }catch (Exception e) { // ... } }

NamingService

NamingService接口定义了服务注册、服务发现等服务注册相关的核心方法,它的默认实现为NacosNamingService。

NacosNamingService#registerInstance方法主要做以下事情:

源码如下:

// com.alibaba.nacos.client.naming.NacosNamingService#registerInstance public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); // 判断是否为临时实例 if (instance.isEphemeral()) { // 构建服务心跳信息 BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance); // 构建心跳信息,并向Nacos服务端定时(5秒/次)发送心跳请求 beatReactor.addBeatInfo(groupedServiceName, beatInfo); } // 向Nacos服务端注册服务实例 serverProxy.registerService(groupedServiceName, groupName, instance); }

NamingProxy#registerService方法主要做以下事情:

源码如下:

// com.alibaba.nacos.client.naming.net.NamingProxy.registerService public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { // ... // 组装请求参数 final Map<String, String> params = new HashMap<String, String>(16); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); // 调用Nacos服务端服务注册接口(/v1/ns/instance)注册服务实例。 reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); } public String reqApi(String api, Map<String, String> params, String method) throws NacosException { return reqApi(api, params, Collections.EMPTY_MAP, method); } public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException { return reqApi(api, params, body, getServerList(), method); } public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException { // ... if (StringUtils.isNotBlank(nacosDomain)) { for (int i = 0; i < maxRetry; i ) { try { // 调用服务注册接口(/v1/ns/instance)向Nacos服务端注册服务实例 return callServer(api, params, body, nacosDomain, method); } catch (NacosException e) { // ... } } } else { Random random = new Random(System.currentTimeMillis()); int index = random.nextInt(servers.size()); for (int i = 0; i < servers.size(); i ) { String server = servers.get(index); try { // 调用服务注册接口(/v1/ns/instance)向Nacos服务端注册服务实例 return callServer(api, params, body, server, method); } catch (NacosException e) { // ... } index = (index 1) % servers.size(); } } // ... }服务端服务注册入口

Nacos服务端启动类为nacos-console模块中的com.alibaba.nacos.Nacos,在nacos-console模块中引入了nacos-naming模块,该模块中的com.alibaba.nacos.naming.controllers包中提供了服务注册、服务发现、健康检查等核心接口。如图所示:

服务注册与同步

获取服务最新服务实例列表交互逻辑,如图所示:

InstanceController

Nacos服务端接收客户端发送的服务注册请求后,调用InstanceController#register方法处理服务注册请求。

InstanceController#register方法主要做以下事情:

源码如下:

// com.alibaba.nacos.naming.controllers.InstanceController.register @CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception { // 根据客户端请求参数获取namespaceId,默认为:public final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); // 根据客户端请求参数获取serviceName,格式:groupName@@serviceName final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); // 根据客户端请求参数构建服务实例 final Instance instance = parseInstance(request); // 调用ServiceManage#registerInstance方法注册服务实例 serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok"; }

ServiceManage

ServiceManage是Nacos服务的核心管理类,其中定义了服务注册表以及服务相关的各种核心API。如图所示:

ServiceManager#registerInstance方法主要做以下事情:

源码如下:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { // 创建并初始化服务,将其添加到注册表中(第一次注册服务) createEmptyService(namespaceId, serviceName, instance.isEphemeral()); // 根据namespaceId,、serviceName获取对应的服务 Service service = getService(namespaceId, serviceName); // ... // 向服务中添加要注册的服务实例 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }

ServiceManager#createEmptyService方法主要做以下事情:

源码如下:

public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException { createServiceIfAbsent(namespaceId, serviceName, local, null); } // com.alibaba.nacos.naming.core.ServiceManager#createServiceIfAbsent public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException { // 根据namespaceId、serviceName从注册表中获取对应的服务 Service service = getService(namespaceId, serviceName); // 如果服务不存在,则创建并初始化服务 if (service == null) { Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); // 创建服务 service = new Service(); // 设置服务名 service.setName(serviceName); // 设置namespaceId service.setNamespaceId(namespaceId); // 设置分组名 service.setGroupName(NamingUtils.getGroupName(serviceName)); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); // 根据服务实例列表生成MD5校验值(如:3c84b68316cb8449ca2842a990e2dd7d) service.recalculateChecksum(); if (cluster != null) { cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } // 校验服务名和服务集群名格式 service.validate(); // 该方法主要做一下事情: // 1.将服务添加到注册表中 // 2.初始化服务并启动心跳检测(5秒/次) // 3.监听服务实例变更 putServiceAndInit(service); // 不是创建本地服务 if (!local) { // 根据服务实例类型(临时、持久)选择不同协议进行服务注册与同步: // 临时实例:基于Distro协议将服务信息同步给集群中的其他Nacos节点(即:AP模式) // 持久实例:基于Raft协议将服务信息同步给集群中的其他Nacos节点(即:CP模式) addOrReplaceService(service); } } }

ServiceManager#addInstance方法主要做以下事情:

源码如下:

// com.alibaba.nacos.naming.core.ServiceManager#addInstance public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { // 根据namespaceId、serviceName、实例类型(临时、持久)构建服务唯一标识: // 临时实例:com.alibaba.nacos.naming.iplist.ephemeral.namespaceId##groupName@@serviceName // 持久实例:com.alibaba.nacos.naming.iplist.namespaceId##groupName@@serviceName String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); // 根据namespaceId,、serviceName获取对应的服务 Service service = getService(namespaceId, serviceName); // 对服务对象添加同步锁 synchronized (service) { // 通过对比待注册实例列表与旧实例列表,获取最新的实例列表(旧实例列表 待注册实例列表)。 // 注意:此处会将当前服务实例列表拷贝到一个新的服务实例列表中,基于CopyOnWrite思想 List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); // 将新的服务实例列表封装为Instances Instances instances = new Instances(); instances.setInstanceList(instanceList); // 根据服务实例类型(临时、持久)选择不同协议进行服务注册与同步: // 临时实例:基于Distro协议将服务信息同步给集群中的其他Nacos节点(即:AP模式) // 持久实例:基于Raft协议将服务信息同步给集群中的其他Nacos节点(即:CP模式) consistencyService.put(key, instances); } }

ServiceManager#addIpAddresses->updateIpAddresses方法主要做以下事情:

源码如下:

private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException { return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips); } public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException { // 根据服务唯一标识获取当前数据集(dataStore)中旧的服务实例列表,返回值是Datum(key:服务唯一标识,value:服务实例列表) Datum datum = consistencyService .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral)); // 根据实例类型(临时、持久)获取现有服务实例列表 List<Instance> currentIPs = service.allIPs(ephemeral); // 创建Map来保存现有服务实例列表,key:服务实例IP Port,value:服务实例 Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size()); // 创建Set来保存现有服务实例的instanceId(实例唯一标识) Set<String> currentInstanceIds = Sets.newHashSet(); // 遍历现有服务实例列表 for (Instance instance : currentIPs) { // 将现有服务实例保存到新创建的Map中 currentInstances.put(instance.toIpAddr(), instance); // 将现有服务实例的instanceId保存到新创建的Set中 currentInstanceIds.add(instance.getInstanceId()); } // 定义新的Map来保存更新后的服务实例列表 Map<String, Instance> instanceMap; if (datum != null && null != datum.value) { // 如果服务存在旧的实例列表,则更新旧实例的健康状态和心跳时间并将其添加到新的Map中 instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances); } else { // 如果服务不存在旧的实例列表,则创建新Map instanceMap = new HashMap<>(ips.length); } // 遍历待处理服务实例列表 for (Instance instance : ips) { // 如果待处理服务对应的集群信息不存在,则创建并初始化服务对应的集群信息 if (!service.getClusterMap().containsKey(instance.getClusterName())) { Cluster cluster = new Cluster(instance.getClusterName(), service); cluster.init(); service.getClusterMap().put(instance.getClusterName(), cluster); // ... } if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { // 如果是删除操作,则将待处理服务实例从Map中移除 instanceMap.remove(instance.getDatumKey()); } else { // 根据待处理服务信息组成服务唯一标识从旧的服务实例列表中查找 Instance oldInstance = instanceMap.get(instance.getDatumKey()); // 如果是旧的服务实例,则更新服务实例信息 if (oldInstance != null) { instance.setInstanceId(oldInstance.getInstanceId()); } else { // 如果是新的服务实例,则新增服务实例信息(即:生成新的instanceId) instance.setInstanceId(instance.generateInstanceId(currentInstanceIds)); } // 将新的服务实例加入Map中 instanceMap.put(instance.getDatumKey(), instance); } } // ... // 将最新服务实例列表对应的Map转为对应的List,并返回 return new CopyOnWriteArrayList<>(instanceMap.values()); }

ConsistencyService#put方法为服务注册与同步核心逻辑,该方法主要做以下事情:

根据服务唯一标识前缀判断服务实例类型(临时、持久):

源码如下:

// com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put public void put(String key, Record value) throws NacosException { // 根据服务唯一标识前缀判断服务实例类型(临时、持久) mapConsistencyService(key).put(key, value); } // com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put @Override public void put(String key, Record value) throws NacosException { // 1.将服务对应的实例列表信息封装成Datum加入到DataStore(作用是根据服务唯一标识获取对应的服务实例列表)中。 // 2.将服务唯一标识、操作类型封装成一个Task任务加入到DistroConsistencyServiceImpl.Notifier的任务队列中。 onPut(key, value); // 遍历集群中除自己以外的其他Nacos节点: // 先将服务信息(资源)、资源类型(如:服务信息同步、元数据同步)以及目标节点封装成DistroKey。 // 再将DistroKey、操作类型(如:新增、删除实例)封装成延时任务DelayTask(延时时长:1秒)添加到Nacos延时任务执行器的任务集合中。 distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2); } public void onPut(String key, Record value) { // 判断是否为临时实例 if (KeyBuilder.matchEphemeralInstanceListKey(key)) { // 将服务信息封装成Datum(key:服务唯一标识,value:服务实例列表) Datum<Instances> datum = new Datum<>(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); // 将Datum添加到DataStore中 dataStore.put(key, datum); } if (!listeners.containsKey(key)) { return; } // 向Notifier的任务队列中添加任务 notifier.addTask(key, DataOperation.CHANGE); } public void addTask(String datumKey, DataOperation action) { // 如果服务集合中存在服务并且服务操作类型为更新,则直接返回 if (services.containsKey(datumKey) && action == DataOperation.CHANGE) { return; } // 如果服务操作类型为更新,则将服务添加到服务集合中 if (action == DataOperation.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } // 向Notifier的任务队列中添加任务 tasks.offer(Pair.with(datumKey, action)); } public void sync(DistroKey distroKey, DataOperation action, long delay) { // 遍历集群中除自己以外的其他Nacos节点 for (Member each : memberManager.allMembersWithoutSelf()) { // 构建DistroKey,其中: // resourceKey为服务唯一标识 // resourceType为com.alibaba.nacos.naming.iplist.(表示同步服务信息) // targetServer为Nacos节点 DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress()); // 根据创建的DistroKey和操作类型构建Distro延时任务(延时时长:1秒) DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay); // 向延时任务执行器(NacosDelayTaskExecuteEngine)中添加延时任务 distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask); if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress()); } } }服务注册

服务注册交互逻辑,如图所示:

DistroConsistencyServiceImpl初始化时会开启一个线程池DISTRO_NOTIFY_EXECUTOR,其中只有一个线程(即:DistroConsistencyServiceImpl.Notifier),主要作用是通过死循环的方式从任务队列(Notifier.tasks)中取出任务进行处理(即:将服务实例列表更新到注册表中)。

源码如下:

// com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#init @PostConstruct public void init() { GlobalExecutor.submitDistroNotifyTask(notifier); } // com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier public class Notifier implements Runnable { private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024); private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024); // ... @Override public void run() { Loggers.DISTRO.info("distro notifier started"); for (; ; ) { try { // 从任务队列中取出任务 Pair<String, DataOperation> pair = tasks.take(); // 任务处理 handle(pair); } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } } } //... }

DistroConsistencyServiceImpl.Notifier#handle方法主要作用是遍历任务中的服务信息:

源码如下:

private void handle(Pair<String, DataOperation> pair) { try { // ... // 遍历任务中服务信息(Service继承来RecordListener) for (RecordListener listener : listeners.get(datumKey)) { count ; try { // 更新操作 if (action == DataOperation.CHANGE) { // 更新服务对应的实例列表 listener.onChange(datumKey, dataStore.get(datumKey).value); continue; } // 删除操作 if (action == DataOperation.DELETE) { // 从服务对应的实例列表中删除实例 listener.onDelete(datumKey); continue; } } catch (Throwable e) { // ... } } // ... } catch (Throwable e) { // ... } }

更新操作

更新操作由Service#onChange方法进行处理,该方法主要做以下事情:

源码如下:

public void onChange(String key, Instances value) throws Exception { // ... // 更新服务对应的实例列表信息 updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key)); // 更新服务MD5校验码 recalculateChecksum(); } public void updateIPs(Collection<Instance> instances, boolean ephemeral) { // 创建Map:用于保存服务对应的Cluster信息 Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size()); for (String clusterName : clusterMap.keySet()) { ipMap.put(clusterName, new ArrayList<>()); } // 遍历服务实例列表 for (Instance instance : instances) { try { if (instance == null) { continue; } if (StringUtils.isEmpty(instance.getClusterName())) { instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME); } // 如果服务实例对应的集群信息不存在,则创建并初始化集群信息 if (!clusterMap.containsKey(instance.getClusterName())) { // ... 日志信息 Cluster cluster = new Cluster(instance.getClusterName(), this); cluster.init(); getClusterMap().put(instance.getClusterName(), cluster); } // 如果服务对应的集群实例列表不存在,则创建集群实例列表 List<Instance> clusterIPs = ipMap.get(instance.getClusterName()); if (clusterIPs == null) { clusterIPs = new LinkedList<>(); ipMap.put(instance.getClusterName(), clusterIPs); } // 将服务实例添加到服务集群实例列表中 clusterIPs.add(instance); } catch (Exception e) { // ... 日志信息 } } for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) { //make every ip mine List<Instance> entryIPs = entry.getValue(); // 更新集群实例列表 clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); } // 设置服务最后更新时间 setLastModifiedMillis(System.currentTimeMillis()); // 发布服务信息变更事件(即:将服务变更通知订阅该服务的客户端) getPushService().serviceChanged(this); // ... } // com.alibaba.nacos.naming.core.Cluster#updateIps public void updateIps(List<Instance> ips, boolean ephemeral) { // 根据实例类型(临时、持久)获取旧的服务实例列表 Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances; // 将旧的服务实例列表转储到新创建Map中(key:服务唯一标识,value:服务实例) HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size()); for (Instance ip : toUpdateInstances) { oldIpMap.put(ip.getDatumKey(), ip); } // 对比待处理服务实例列表和旧服务实例列表,获取需要更新的服务实例列表 List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values()); if (updatedIPs.size() > 0) { for (Instance ip : updatedIPs) { Instance oldIP = oldIpMap.get(ip.getDatumKey()); if (!ip.isMarked()) { ip.setHealthy(oldIP.isHealthy()); } // ... } } // 获取待处理服务实例列表中不存在于旧服务实例列表中的实例(即:新增实例),更新该实例的健康状态为健康 List<Instance> newIPs = subtract(ips, oldIpMap.values()); if (newIPs.size() > 0) { // ... 日志信息 for (Instance ip : newIPs) { // 更新服务实例健康状态为健康 HealthCheckStatus.reset(ip); } } // 获取旧服务实例列表中不存在于待处理实例列表中的实例(即:移除实例),更新该实例的健康状态为死亡 List<Instance> deadIPs = subtract(oldIpMap.values(), ips); if (deadIPs.size() > 0) { // ... 日志信息 for (Instance ip : deadIPs) { // 更新服务实例健康状态为死亡 HealthCheckStatus.remv(ip); } } toUpdateInstances = new HashSet<>(ips); // 用最新的服务实例列表覆盖旧的服务实例列表(CopyOnWrite思想) if (ephemeral) { ephemeralInstances = toUpdateInstances; } else { persistentInstances = toUpdateInstances; } } /** * 获取需要更新的服务实例列表 * @param newInstance 待处理服务实例列表 * @param oldInstance 旧服务实例列表 * @return */ private List<Instance> updatedIps(Collection<Instance> newInstance, Collection<Instance> oldInstance) { // 获得待处理服务实例列表和旧服务实例列表的交集 List<Instance> intersects = (List<Instance>) CollectionUtils.intersection(newInstance, oldInstance); // 将交集中的实例存储到stringIpAddressMap中,key:ip port,value:instance Map<String, Instance> stringIpAddressMap = new ConcurrentHashMap<>(intersects.size()); for (Instance instance : intersects) { stringIpAddressMap.put(instance.getIp() ":" instance.getPort(), instance); } // 定义交集Map,key:服务实例,value:标识 // 1-表示实例只存在于待处理服务实例列表或者旧服务实例列表中 // 2-表示实例同时存在于待处理服务实例列表和旧服务实例列表中 Map<String, Integer> intersectMap = new ConcurrentHashMap<>(newInstance.size() oldInstance.size()); // 定义需要更新的服务实例Map Map<String, Instance> updatedInstancesMap = new ConcurrentHashMap<>(newInstance.size()); // 定义待处理的服务实例Map Map<String, Instance> newInstancesMap = new ConcurrentHashMap<>(newInstance.size()); // 将旧服务实例列表中实例加入交集Map中,并设置实例的标识为1 for (Instance instance : oldInstance) { if (stringIpAddressMap.containsKey(instance.getIp() ":" instance.getPort())) { intersectMap.put(instance.toString(), 1); } } // 将待处理服务实例列表中实例加入交集Map中,如果实例在交集Map中已经存在,则设置为2(即:替换),否则设置为1(即:新增) for (Instance instance : newInstance) { if (stringIpAddressMap.containsKey(instance.getIp() ":" instance.getPort())) { if (intersectMap.containsKey(instance.toString())) { intersectMap.put(instance.toString(), 2); } else { intersectMap.put(instance.toString(), 1); } } // 将待处理实例加入newInstancesMap中(这一步很重要,后续用于新实例覆盖旧实例) newInstancesMap.put(instance.toString(), instance); } // 遍历交集Map for (Map.Entry<String, Integer> entry : intersectMap.entrySet()) { String key = entry.getKey(); Integer value = entry.getValue(); // 将交集Map中所有标识为1(注意:标识为1的实例包含待处理实例和旧实例)且存在于待处理实例列表中的实例加入最新的服务实例列表中(即:需要更新的旧实例 新实例) if (value == 1) { if (newInstancesMap.containsKey(key)) { updatedInstancesMap.put(key, newInstancesMap.get(key)); } } } return new ArrayList<>(updatedInstancesMap.values()); }

删除操作

更新操作由Service#onDelete方法进行处理,该方法主要做以下事情:

源码如下:

@Override public void onDelete(String key) throws Exception { boolean isEphemeral = KeyBuilder.matchEphemeralInstanceListKey(key); for (Cluster each : clusterMap.values()) { each.updateIps(Collections.emptyList(), isEphemeral); } } // 其中each.updateIps方法就是更新操作源码中的updateIps方法。服务信息同步

在前面分析ConsistencyService#put方法时,会将服务信息封装成延时任务DelayTask(延时时长:1秒)添加到Nacos延时任务执行器(NacosDelayTaskExecuteEngine)的任务集合中。

NacosDelayTaskExecuteEngine在创建时会创建一个定时任务去执行NacosDelayTaskExecuteEngine#processTasks方法。

源码如下:

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { super(logger); tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity); // 创建一个单线程的线程池 processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); // 创建定时任务处理延时任务(延时时长:100ms,时间间隔:100ms) processingExecutor .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); } // com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine.ProcessRunnable // 延时任务处理线程 private class ProcessRunnable implements Runnable { @Override public void run() { try { // 处理任务,如:Nacos集群中节点之间的数据同步 processTasks(); } catch (Throwable e) { getEngineLog().error(e.toString(), e); } } }

NacosDelayTaskExecuteEngine#processTasks方法主要做以下事情:

源码如下:

protected void processTasks() { Collection<Object> keys = getAllTaskKeys(); for (Object taskKey : keys) { // 根据taskKey(DistroKey)获取对应的延时任务 AbstractDelayTask task = removeTask(taskKey); if (null == task) { continue; } // 根据taskKey获取对应的任务处理器,默认为:TaskExecuteWorker NacosTaskProcessor processor = getProcessor(taskKey); if (null == processor) { getEngineLog().error("processor not found for task, so discarded. " task); continue; } try { // ReAdd task if process failed // 任务处理失败,则将任务重新加入到任务集合中进行重试。 if (!processor.process(task)) { retryFailedTask(taskKey, task); } } catch (Throwable e) { getEngineLog().error("Nacos task execute error : " e.toString(), e); retryFailedTask(taskKey, task); } } } private void retryFailedTask(Object key, AbstractDelayTask task) { task.setLastProcessTime(System.currentTimeMillis()); addTask(key, task); }

【阅读推荐】

更多精彩内容(如:Redis、数据结构与算法、Kafka等)请移步【南秋同学】个人主页进行查阅。

【作者简介】

一枚热爱技术和生活的老贝比,专注于Java领域,关注【南秋同学】带你一起学习成长~

,
大家还看了
也许喜欢
更多游戏

Copyright © 2024 妖气游戏网 www.17u1u.com All Rights Reserved