nacos1.4.1注册中心源码解析(临时实例AP模式)-含流程图

nacos1.4.1注册中心源码解析(临时实例AP模式)-含流程图

首页枪战射击源代码删除官网更新时间:2024-04-17

一、Nacos核心功能点

服务注册:Nacos Client会通过发送REST请求的方式向Nacos Server注册自己的服务,提供自身的元数据,比如ip地址、端口等信息。Nacos Server接收到注册请求后,就会把这些元数据信息存储在一个双层的内存Map中。

服务心跳:在服务注册后,Nacos Client会维护一个定时心跳来持续通知Nacos Server,说明服务一直处于可用状态,防止被剔除。默认5s发送一次心跳。

服务健康检查:Nacos Server会开启一个定时任务用来检查注册服务实例的健康情况,对于超过15s没有收到客户端心跳的实例会将它的healthy属性置为false(客户端服务发现时不会发现),如果某个实例超过30秒没有收到心跳,直接剔除该实例(被剔除的实例如果恢复发送心跳则会重新注册)

服务发现:服务消费者(Nacos Client)在调用服务提供者的服务时,会发送一个REST请求给Nacos Server,获取上面注册的服务清单,并且缓存在Nacos Client本地,同时会在Nacos Client本地开启一个定时任务定时拉取服务端最新的注册表信息更新到本地缓存

服务同步:Nacos Server集群之间会互相同步服务实例,用来保证服务信息的一致性。

二、Nacos 核心概念

Namespace:用于进行租户粒度的配置隔离。不同的命名空间下,可以存在相同的 Group 或 Data ID 的配置。Namespace 的常用场景之一是不同环境的配置的区分隔离,例如开发测试环境和生产环境的资源(如配置、服务)隔离等。如果不配置默认为:public

Group:服务分组,不同的服务可以归类到同一分组。

Service:服务提供的标识,通过该标识可以唯一确定其指代的服务。默认获取的${spring.application.name}作为服务名。

Cluster:同一个服务下的所有服务实例组成一个默认集群, 集群可以被进一步按需求划分,划分的单位可以是虚拟集群。

Instance:提供一个或多个服务的具有可访问网络地址(IP:Port)的进程

三、Nacos服务注册表结构:Map<namespace, Map<group::serviceName, Service>>

Nacos服务注册表源码中是多层map嵌套结构,对应图示如下图:

Nacos服务注册表结构图

四、Nacos1.4.1注册中心源码解析流程图

源码流程图链接如下(红色字体部分,下图看不清可以去链接查看看):
https://kdocs.cn/l/ceHmB9MF2TuD

Nacos1.4.1注册中心源码流程图

五、客户端源码解析

1、客户端服务注册源码解析

springboot启动过程中会进行自动配置流程(可参考我的这篇文章
https://www.toutiao.com/article/7169764929400996360/?log_from=93d2be341a552_1670061351848
),寻找nacos依赖包下的META-INF下的spring.factories文件,解析到
NacosServiceRegistryAutoConfiguration.java类,并生成NacosAutoServiceRegistration的bean对象,如下图、源代码所示:

nacos服务自动注入-自动配置

@Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosAutoServiceRegistration nacosAutoServiceRegistration( NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) { return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);//nacos服务注测配置类 }

由于
NacosAutoServiceRegistration其父类继承AbstractAutoServiceRegistration,且父类实现了ApplicationListener,则会在spring容器初始化完成后执行父类的onApplicationEvent方法,然后执行bind方法,再调用start(),后调用NacosAutoServiceRegistration的register(),最后又调到父类的register方法,父类的register()如下:

protected void register() { //this.serviceRegistry就是NacosServiceRegistryAutoConfiguration.java类中的NacosServiceRegistry对象 //getRegistration()获得的就是NacosServiceRegistryAutoConfiguration.java类中的NacosRegistration对象 this.serviceRegistry.register(getRegistration()); }

调用NacosServiceRegistry对象的register方法进行NamingService注册实例
namingService.registerInstance(serviceId, group, instance),源码如下,如果是临时实例会添加心跳定时任务BeatTask,然后调nacos服务端的/instance/beat接口,最后一步发送发送HttpMethod.POST调服务端/instance接口,详细见流程图:

@Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance);//校验实例对象是否存活状态 String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);//得到group名 if (instance.isEphemeral()) { //如果是临时实例 BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);//创建心跳信息 beatReactor.addBeatInfo(groupedServiceName, beatInfo);//添加心跳信息到定时任务,最后执行心跳任务的run方法,调nacos服务端/instance/beat接口 } //注册实例给服务端,发送发送HttpMethod.POST调服务端/instance接口 serverProxy.registerService(groupedServiceName, groupName, instance); }

2、客户端服务发现源码解析

服务发现是通过NacosNamingService.java调用getAllInstances方法实现的,再调用HostReactor的getServiceInfo方法,从nacos服务端(调/instance/list接口)获取所有实例信息放到本地map缓存中,添加定时更新service的任务UpdateTask,定时更新service。流程可具体参考流程图示意,方法注释如下:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters);//根据是否集群获取key if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } //先从缓存中获取service信息,没有则先创建 ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); if (null == serviceObj) { serviceObj = new ServiceInfo(serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); updatingMap.put(serviceName, new Object()); //从nacos服务端获取所有的服务实例,并放到缓存中,调/instance/list接口,发送HttpMethod.GET请求 updateServiceNow(serviceName, clusters); updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error("[getServiceInfo] serviceName:" serviceName ", clusters:" clusters, e); } } } } //添加定时更新service的任务UpdateTask,定时更新service,调/instance/list接口 scheduleUpdateIfAbsent(serviceName, clusters); return serviceInfoMap.get(serviceObj.getKey()); }

六、服务端源码解析

1、注册一个新实例

InstanceController的register方法用来注册实例信息,第一步创建服务端service对象并初始化,并且执行心跳检测定时任务(如果实例超过15s没有心跳则把实例状态置为false,并且发布实例状态更新事件;如果某个实例超过30s没有心跳,则调用服务端的/instance接口发送HttpMethod.DELETE请求;最后DistroTaskEngineHolder会执行定时任务处理service变更的信息,可以参考流程图,流程图及其详细,在这只做简单描述)

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { //如果实例超过15s没有心跳则把实例状态置为false,并且发布实例状态更新事件;如果某个实例超过30s没有心跳,则调用服务端的/instance接口发送HttpMethod.DELETE请求 createEmptyService(namespaceId, serviceName, instance.isEphemeral()); Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " namespaceId ", service: " serviceName); } //调用addIpAddresses,从clusterMap中获取所有的实例信息;把获取到的实例信息放到队列当中,后续会由定时任务从队列当中取任务 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }

2、心跳接口

调用InstanceController的beat方法,从客户端集群表中获取实例信息,如果实例为空,则调注册方法registerInstance注册,把客户端心跳处理器ClientBeatProcessor放到定时任务中(最后会执行其run方法,发布事件),做了两件事,第一,更新客户端实例的最新心跳时间。第二、发布Service改变事件;也可以参考流程图。

@CanDistro @PutMapping("/beat") @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public ObjectNode beat(HttpServletrequest request) throws Exception { ObjectNode result = JacksonUtils.createEmptyJsonNode(); result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval()); String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY); RsInfo clientBeat = null; if (StringUtils.isNotBlank(beat)) { clientBeat = JacksonUtils.toObj(beat, RsInfo.class); } String clusterName = WebUtils .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME); String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY); int port = Integer.parseInt(WebUtils.optional(request, "port", "0")); if (clientBeat != null) { if (StringUtils.isNotBlank(clientBeat.getcluster())) { clusterName = clientBeat.getCluster(); } else { clientBeat.setCluster(clusterName); } ip = clientBeat.getIp(); port = clientBeat.getPort(); } String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName); //从集群列表clusterMap中获取实例信息 Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port); if (instance == null) { if (clientBeat == null) { result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND); return result; } Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, " "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName); instance = new Instance(); instance.setPort(clientBeat.getPort()); instance.setIp(clientBeat.getIp()); instance.setWeight(clientBeat.getWeight()); instance.setMetadata(clientBeat.getMetadata()); instance.setClusterName(clusterName); instance.setServiceName(serviceName); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(clientBeat.isEphemeral()); //注册实例 serviceManager.registerInstance(namespaceId, serviceName, instance); } //获取service信息 Service service = serviceManager.getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.SERVER_ERROR, "service not found: " serviceName "@" namespaceId); } if (clientBeat == null) { clientBeat = new RsInfo(); clientBeat.setIp(ip); clientBeat.setPort(port); clientBeat.setCluster(clusterName); } service.processClientBeat(clientBeat);//更新客户端实例的最新心跳时间。发布Service改变事件 result.put(CommonParams.CODE, NamingResponseCode.OK); if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) { result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval()); } result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled()); return result; }

3、集群同步、实例状态同步、删除实例等模块源码分析参考流程图-流程图详细的(一步步画的)

查看全文
大家还看了
也许喜欢
更多游戏

Copyright © 2024 妖气游戏网 www.17u1u.com All Rights Reserved