Nacos2.1.0源码解析-附带全流程图

Nacos2.1.0源码解析-附带全流程图

首页枪战射击源代码删除官网更新时间:2024-04-26

一、概括

上篇文章解析了一下Nacos1.4.1的源码流程及全流程图,包括客户端注册/发现/删除/集群通信/心跳等功能。本次拿Nacos2.1.0与其做个对比,最大的区别就是Nacos由http通信变为rpc通信,且增加很多事件处理功能。

二、客户端服务注册

服务注册功能大部分跟1.4.1很像可以参考https://www.toutiao.com/article/7174284508034974240/?log_from=0128e8ae14ea9_1670654861328,不同的地方是进入NacosNamingService的registerInstance函数后调用clientProxy.registerService(此函数就是通过发送RPC注册服务请求给server端注册服务),代码及流程图(流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:

@Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); // clientProxy是NamingClientProxyDelegate,在NacosNamingService初始化时调用它的init时创建的 clientProxy.registerService(serviceName, groupName, instance);//进行service注册 }

@Override public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { // 临时实例 getExecuteClientProxy(instance)=NamingGRPCClientProxy getExecuteClientProxy(instance).registerService(serviceName, groupName, instance); }

@Override public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName, instance); //将serviceName/groupNmae/instance通过InstanceRedoData对象关联并放到registeredInstances缓存中 redoService.cacheInstanceForRedo(serviceName, groupName, instance); doRegisterService(serviceName, groupName, instance);//执行注册操作 }

public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException { //封装 InstanceRequest 对象 RPC调用服务端交互对象 InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.REGISTER_INSTANCE, instance); requestToServer(request, Response.class);//服务注册请求通过rpc发送给客户端,最后会封装Payload对象通过存根对象grpcFutureServiceStub发送给server redoService.instanceRegistered(serviceName, groupName);//从registeredInstances缓存中获取InstanceRedoData对象并设置注册状态为true }

客户端服务注册流程

三、客户端服务发现

服务发现是服务第一次调用客户端时根据服务名获取,会先从缓存serviceInfoMap中获取,若缓存中为空,则通过RPC发送SubscribeServiceRequest对象,请求服务端获取服务实例列表,代码和流程图(流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:

@Override public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException { ServiceInfo serviceInfo; String clusterString = StringUtils.join(clusters, ","); if (subscribe) {//是否订阅,默认是true //从缓存中获取服务实例列表,第一次肯定是空,具体逻辑见下面代码 serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString); if (null == serviceInfo) { //服务订阅,大致分三步,1、执行定时任务,定时从注册中心获取服务实例,并存到本地缓存serviceInfoMap中 //2、发送RPC订阅请求server 3、缓存第二步得到的信息到本地缓存map中,并发布事件缓存到本地文件 serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString); } } else { serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false); } return selectInstances(serviceInfo, healthy);//获取ServiceInfo中instance实例集合,循环排除掉不满足healthy的实例 }

public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) { NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch()); String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);//获取组名 String key = ServiceInfo.getKey(groupedServiceName, clusters);//通过组名和集群名组装成缓存的key if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key);//如果开启了failover 策略 就从failoverReactor获取服务信息 } return serviceInfoMap.get(key);//从缓存中获取 }

@Override public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException { NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters); String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName); String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);//service名、组名、集群名组合成key // 执行定时任务UpdateTask,定时从注册中心获取服务实例,并存到本地缓存serviceInfoMap中 serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters); ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);//再次从缓存中获取有没有要订阅的服务信息 if (null == result || !isSubscribed(serviceName, groupName, clusters)) { //发送RPC订阅请求server,获取服务信息 result = grpcClientProxy.subscribe(serviceName, groupName, clusters); } //缓存result的信息到本地缓存serviceInfoMap中,并发布事件InstancesChangeEvent,缓存到本地文件DiskCache.write serviceInfoHolder.processServiceInfo(result); return result; }

public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException { if (NAMING_LOGGER.isDebugEnabled()) { NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters); } //缓存SubscriberRedoData信息到订阅者缓存subscribes中 redoService.cacheSubscriberForRedo(serviceName, groupName, clusters); return doSubscribe(serviceName, groupName, clusters);//发送RPC请求 }

public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException { SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters, true);//封装SubscribeServiceRequest对象,为了rpc请求服务端订阅服务信息 //下面这一步跟注册是一样的,只不过请求对象换成SubscribeServiceRequest SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class); redoService.subscriberRegistered(serviceName, groupName, clusters);//从服务端获取完信息后,更改SubscriberRedoData对象Registered状态为true return response.getServiceInfo(); }

客户端订阅服务流程图

四、服务端注册流程

自改为RPC调用后,服务端处理的逻辑处理类改为handler类(rpcn内部原理,有很多个handler对应客户端传的request对象),因客户端注册时发送的InstanceRequest故服务端在InstanceRequestHandler,执行handle方法,分别获取service对象Client对象及InstancePublishInfo对象,放服务信息到publishers缓存,发布客户端变化事件,源代码和流程图(流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:

private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) { clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());//注册表注册实例 return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE); }

@Override public void registerInstance(Service service, Instance instance, String clientId) { //获取Service对象,singletonRepository缓存中获得service对象,没有的话放入,namespaceSingletonMaps放入service,key是服务对应的命名空间名称 Service singleton = ServiceManager.getInstance().getSingleton(service); if (!singleton.isEphemeral()) {//必须是临时实例 throw new NacosRuntimeException(NacosException.INVALID_PARAM, String.format("Current service %s is persistent service, can't register ephemeral instance.", singleton.getGroupedServiceName())); } Client client = clientManager.getClient(clientId);//通过clientid(也就是两个客户端和服务端的连接id)获取Client对象,此对象是rpc必须封装的,通过EphemeralIpPortClientManager的clients来get的 if (!clientIsLegal(client, clientId)) { return; } InstancePublishInfo instanceInfo = getPublishInfo(instance);//封装InstancePublishInfo信息 //缓存服务Service和InstancePublishInfo到publishers缓存,其中key是service,然后发布客户端变化事件 client.addServiceInstance(singleton, instanceInfo); client.setLastUpdatedTime(); //发布客户端服务注册事件 NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId)); //发布实例元数据变动事件 NotifyCenter .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false)); }

//调用IpPortBasedClient的addServiceInstance方法,super再掉其父类AbstractClient public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) { return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo)); }

public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) { ///缓存服务Service和InstancePublishInfo到publishers注册者缓存 if (null == publishers.put(service, instancePublishInfo)) { MetricsMonitor.incrementInstanceCount();//监控器,自加1,统计注册实例个数 } //发布客户端变化事件,细节流程见 六、流程中的事件处理 NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this)); Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId()); return true; }

服务端注册流程

五、服务端订阅流程(服务查询流程跟这个类似)

服务端订阅流程通过SubscribeServiceRequestHandler处理,进入handle方法,大致分两步,第一步,从缓存serviceDataIndexes中获取,缓中没有时从注册表获得服务信息,获取实例放到service中,缓存集群信息,最后返回实例集合并且缓存到serviceDataIndexes中,第二步,把service和订阅者放到subscribers缓存中并且发布订阅事件,代码和流程图(流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:

@Override @Secured(action = ActionTypes.READ) public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException { String namespaceId = request.getNamespace();//获取命名空间名称 String serviceName = request.getServiceName();//获取服务名字 String groupName = request.getGroupName();//获取组名 String app = request.getHeader("app", "unknown"); String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);//整合service名和组名 Service service = Service.newService(namespaceId, groupName, serviceName, true); //封装订阅者对象 Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(), namespaceId, groupedServiceName, 0, request.getClusters()); //获取ServiceInfo对象信息,主要看serviceStorage.getData(service) ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service), metadataManager.getServiceMetadata(service).orElse(null), subscriber); if (request.isSubscribe()) {//是否是订阅请求 //把service和订阅者放到subscribers缓存中并且发布订阅事件ClientSubscribeServiceEvent clientOperationService.subscribeService(service, subscriber, meta.getConnectionId()); } else { clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId()); } return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo); }

public ServiceInfo getData(Service service) { //先从serviceDataIndexes缓存中获取ServiceInfo信息,没有的话调用getPushData(service)获取 return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service); }

public ServiceInfo getPushData(Service service) { ServiceInfo result = emptyServiceInfo(service);//创建ServiceInfo if (!ServiceManager.getInstance().containSingleton(service)) { return result; } result.setHosts(getAllInstancesFromIndex(service));//从注册表获得服务信息,获取实例放到service中,缓存集群信息,最后返回实例集合 serviceDataIndexes.put(service, result);//缓存服务信息到serviceDataIndexes return result; }

public void subscribeService(Service service, Subscriber subscriber, String clientId) { //从singletonRepository中获取Service对象 Service singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service); Client client = clientManager.getClient(clientId);//从clients缓存中获取Client对象 if (!clientIsLegal(client, clientId)) { return; } client.addServiceSubscriber(singleton, subscriber);//添加服务和订阅者 client.setLastUpdatedTime();//添加上次更新时间 //发布客户端订阅服务事件 NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId)); }

服务端订阅流程

六、流程中的事件处理

1、发布客户端变化事件:调用DistroClientDataProcessor的onEvent函数,分两部分,第一,向定时任务NacosDelayTaskExecuteEngine的tasks集合中添加定时任务DistroDelayTask,第二,调用集群的其他节点发送DistroDataRequest,用来同步数据,发布发布客户端注册或注销事件、发布客户端断连事件,源码和流程图(流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:

private void syncToAllServer(ClientEvent event) { Client client = event.getClient(); // Only ephemeral data sync by Distro, persist client should sync by raft. if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) { return; } if (event instanceof ClientEvent.ClientDisconnectEvent) { DistroKey distroKey = new DistroKey(client.getClientId(), TYPE); distroProtocol.sync(distroKey, DataOperation.DELETE);//删除 } else if (event instanceof ClientEvent.ClientChangedEvent) { DistroKey distroKey = new DistroKey(client.getClientId(), TYPE); distroProtocol.sync(distroKey, DataOperation.CHANGE);//客户端改变 } }

public void sync(DistroKey distroKey, DataOperation action, long delay) { for (Member each : memberManager.allMembersWithoutSelf()) { syncToTarget(distroKey, action, each.getAddress(), delay);//同步数据给其他集群 } }

public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) { DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), targetServer); DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay); //向定时任务NacosDelayTaskExecuteEngine的tasks集合中添加定时任务DistroDelayTask,最终会执行 //NacosDelayTaskExecuteEngine是Nacos的定时任务引擎,最终会执行子类的process方法 //这一步会执行DistroDelayTaskProcessor.process distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask); if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer); } }

public boolean process(NacosTask task) { if (!(task instanceof DistroDelayTask)) { return true; } DistroDelayTask distroDelayTask = (DistroDelayTask) task; DistroKey distroKey = distroDelayTask.getDistroKey(); switch (distroDelayTask.getAction()) { case DELETE://删除 DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder); distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask); return true; case CHANGE://修改 case ADD://添加 //执行DistroSyncChangeTask父类的run,然后执行DistroSyncChangeTask的doExecuteWithCallback方法 DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder); distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask); return true; default: return false; } }

protected void doExecuteWithCallback(DistroCallback callback) { String type = getDistroKey().getResourceType(); DistroData distroData = getDistroData(type);//根据类型获取Distro协议数据 if (null == distroData) { Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString()); return; } getDistroComponentHolder().findTransportAgent(type) .syncData(distroData, getDistroKey().getTargetServer(), callback);//发送给其他集群节点数据 }

public void syncData(DistroData data, String targetServer, DistroCallback callback) { if (isNoExistTarget(targetServer)) { callback.onSuccess(); return; } DistroDataRequest request = new DistroDataRequest(data, data.getType());//封装DistroDataRequest Member member = memberManager.find(targetServer); try { //集群间RPC通信发送DistroDataRequest,再进入DistroDataRequestHandler.handle方法,再根据不同的操作到不同的方法,改变是handleSyncData方法 //然后DistroProtocol的执行onReceive方法,再执行DistroClientDataProcessor的processData方法 clusterRpcClientProxy.asyncRequest(member, request, new DistroRpcCallbackWrapper(callback, member)); } catch (NacosException nacosException) { callback.onFailed(nacosException); } }

public boolean processData(DistroData distroData) { switch (distroData.getType()) { case ADD: case CHANGE: ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class) .deserialize(distroData.getContent(), ClientSyncData.class); handlerClientSyncData(clientSyncData);//同步客户端连接,更新clients注册表数据,发布客户端注册或注销事件,详见下面源码及流程图 return true; case DELETE: String deleteClientId = distroData.getDistroKey().getResourceKey(); Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId); clientManager.clientDisconnected(deleteClientId);//clients.remove移除客户端,并发布发布客户端断连事件,详见下面源码及流程图 return true; default: return false; } }

客户端变化事件流程图

2、客户端注册事件和客户端订阅事件和客户端断连事件:前两个事件都属于是ClientOperationEvent事件,后面的是ClientDisconnectEvent事件,都会执行ClientServiceIndexesManager.onEvent方法,然后根据不同的操作进入不同的方法,注册、注销、订阅、取消订阅、断连,源码和流程图(流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:

public void onEvent(Event event) { if (event instanceof ClientEvent.ClientDisconnectEvent) { handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);//执行客户端断连事件 } else if (event instanceof ClientOperationEvent) { handleClientOperation((ClientOperationEvent) event);//注册、注销、订阅、取消订阅事件 } }

//执行客户端断连事件 private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) { Client client = event.getClient(); for (Service each : client.getAllSubscribeService()) { removeSubscriberIndexes(each, client.getClientId());//注册表中移除客户端id,下面会解析 } for (Service each : client.getAllPublishedService()) { removePublisherIndexes(each, client.getClientId());//订阅表也移除客户端id,下面会解析 } } //根据事件具体分类区分注册、注销、订阅、取消订阅事件 private void handleClientOperation(ClientOperationEvent event) { Service service = event.getService(); String clientId = event.getClientId(); if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) { addPublisherIndexes(service, clientId);//注册服务 } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) { removePublisherIndexes(service, clientId);//注销服务 } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) { addSubscriberIndexes(service, clientId);//添加订阅 } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) { removeSubscriberIndexes(service, clientId);//取消订阅 } } //注册表publisherIndexes添加客户端id,发布服务改变事件 private void addPublisherIndexes(Service service, String clientId) { publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>()); publisherIndexes.get(service).add(clientId); NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true)); } //注册表publisherIndexes移除客户端id,发布服务改变事件 private void removePublisherIndexes(Service service, String clientId) { if (!publisherIndexes.containsKey(service)) { return; } publisherIndexes.get(service).remove(clientId); NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true)); } //订阅表subscriberIndexes添加客户端id,发布服务订阅事件 private void addSubscriberIndexes(Service service, String clientId) { subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>()); if (subscriberIndexes.get(service).add(clientId)) { NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId)); } } //订阅表subscriberIndexes移除客户端id private void removeSubscriberIndexes(Service service, String clientId) { if (!subscriberIndexes.containsKey(service)) { return; } subscriberIndexes.get(service).remove(clientId); if (subscriberIndexes.get(service).isEmpty()) { subscriberIndexes.remove(service); } }

客户端注册事件事件和客户端订阅事件和客户端断连事件流程

3、服务改变事件和服务订阅事件:向定时任务NacosDelayTaskExecuteEngine的tasks集合中添加定时任务PushDelayTask,通过参数区分,订阅只推送给当前订阅者,注册时推送给所有订阅者,都会执行NamingSubscriberServiceV2Impl.onEvent方法,然后执行PushExecuteTask.run,最后会通过rpc请求客户端推送最新的信息,客户端缓存起来,请求对象是ServerRequest,源码和流程图(流程图是完整流程图中截取的一部分,完整流程图参考七、流程图 )如下:

public void onEvent(Event event) { if (!upgradeJudgement.isUseGrpcFeatures()) { return; } if (event instanceof ServiceEvent.ServiceChangedEvent) { //如果 service发生变化, 推送给所有的订阅者.new PushDelayTask只有两个参数 ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event; Service service = serviceChangedEvent.getService(); //添加任务 delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay())); } else if (event instanceof ServiceEvent.ServiceSubscribedEvent) { //如果订阅变化, 推送给当前的订阅者.new PushDelayTask有3个参数,最终subscribedEvent.getClientId()会一直传下去,有一个地方有区分 ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event; Service service = subscribedEvent.getService(); //添加任务 delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(), subscribedEvent.getClientId())); } }

NacosDelayTaskExecuteEngine初始化时产生定时任务,在processTasks()取任务处理,这儿代码相对简单不在分析,最终会进入PushDelayTaskExecuteEngine.PushDelayTaskProcessor.process方法。

public boolean process(NacosTask task) { PushDelayTask pushDelayTask = (PushDelayTask) task; Service service = pushDelayTask.getService(); //执行任务PushExecuteTask,执行PushExecuteTask.run,循环得到ClientID,获取订阅者对象,推送给客户端变化数据 //在进入PushExecutorRpcImpl.doPushWithCallback,再到RpcPushService.pushWithCallback //通过connection.asyncRequest发送ServerRequest到客户端,然后客户端获取service信息放到本地缓存serviceInfoMap中 //并发布服务改变InstancesChangeEvent事件,最终写到本地缓存文件中 NamingExecuteTaskDispatcher.getInstance() .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask)); return true; }

服务改变事件和服务订阅事件流程图

七、流程图

完整流程图链接:https://kdocs.cn/l/cucHfYowAItw

Nacos2.1.0源码解析完整流程图

查看全文
大家还看了
也许喜欢
更多游戏

Copyright © 2024 妖气游戏网 www.17u1u.com All Rights Reserved