Nacos源码—1.Nacos服务注册发现分析二
大纲
1.客户端如何发起服务注册 + 发送服务心跳
2.服务端如何处理客户端的服务注册请求
3.注册服务—如何实现高并发支撑上百万服务注册
4.内存注册表—如何处理注册表的高并发读写冲突
2.服务端如何处理客户端的服务注册请求
(1)客户端自动发送服务注册请求梳理
(2)Nacos服务端处理服务请求的代码入口
(3)Nacos服务端处理服务注册请求的源码分析
(4)服务端接收到服务实例注册请求后的处理总结
(1)客户端自动发送服务注册请求梳理
首先,从spring-cloud-starter-alibaba-nacos-discovery中,发现在spring.factories文件定义了很多Configuration配置类,其中就包括了NacosServiceRegistryAutoConfiguration配置类。这个配置类会创建三个Bean对象,其中有个Bean对象便实现了一个监听事件方法。
然后,Spring容器启动时,会发布一个事件。这个事件会被名为NacosAutoServiceRegistration的Bean对象监听到,从而自动发起Nacos服务注册。在注册时会开启心跳健康延时任务,每隔5s执行一次。不管是服务注册还是心跳检查,都是通过HTTP方式调用Nacos服务端。
客户端向服务端发起服务注册请求是通过HTTP接口"/nacos/v1/ns/instance"来实现的,客户端向服务端发起心跳请求是通过HTTP接口"/nacos/v1/ns/instance/beat"来实现的。
(2)Nacos服务端处理服务注册请求的代码入口
Nacos服务端有一个叫nacos-naming的模块,这个nacos-naming模块其实就是一个Spring Boot项目,模块中的controllers包则是用来处理服务相关的HTTP请求。
由于服务端处理服务注册请求的地址是"/nacos/v1/ns/instance",所以对服务实例进行处理的入口是controllers包下的InstanceController。InstanceController的代码很好地遵守了Restful风格,其中的regsiter()方法注册新服务实例对应@PostMapping、deregister()方法注销服务实例对应@DeleteMapping、update()方法修改服务实例对应@PutMapping。虽然都可以使用@PostMapping,但Nacos就严格按照了Restful标准。
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {...//Register new instance.@CanDistro@PostMapping@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)public String register(HttpServletRequest request) throws Exception {...}//Deregister instances.@CanDistro@DeleteMapping@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)public String deregister(HttpServletRequest request) throws Exception {...}//Update instance.@CanDistro@PutMapping@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)public String update(HttpServletRequest request) throws Exception {...}...
}public class UtilsAndCommons {// ********************** Nacos HTTP Context ************************ \\public static final String NACOS_SERVER_CONTEXT = "/nacos";public static final String NACOS_SERVER_VERSION = "/v1";public static final String DEFAULT_NACOS_NAMING_CONTEXT = NACOS_SERVER_VERSION + "/ns";public static final String NACOS_NAMING_CONTEXT = DEFAULT_NACOS_NAMING_CONTEXT;...
}
(3)Nacos服务端处理服务注册请求的源码分析
对于Nacos客户端的服务实例注册请求,会由InstanceController的register()方法进行处理。该方法首先会从请求参数中获取Instance服务实例,然后调用ServiceManager的registerInstance()方法来进行服务实例注册。ServiceManager是Nacos的服务管理者,拥有所有的服务列表,可以通过它来管理所有服务的注册、销毁、修改等。
在ServiceManager的registerInstance()方法中:首先会通过调用ServiceManager的createEmptyService()方法创建一个空服务,然后通过ServiceManager的addInstance()方法添加注册请求中的服务实例。
在ServiceManager的addInstance()方法中:首先构建出要注册的服务实例对应的服务的key,然后使用synchronized锁住要注册的服务实例对应的服务,接着获取要注册的服务实例对应的服务的最新服务实例列表,最后执行DelegateConsistencyServiceImpl的put()方法更新服务实例列表。
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {@Autowiredprivate ServiceManager serviceManager;...//Register new instance.@CanDistro@PostMapping@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)public String register(HttpServletRequest request) throws Exception {//从request中获取命名空间、服务名称final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);//从request中获取Instance服务实例final Instance instance = parseInstance(request);//调用ServiceManager的注册实例方法serviceManager.registerInstance(namespaceId, serviceName, instance);return "ok";}...
}//服务管理者,拥有所有的服务列表,用于管理所有服务的注册、销毁、修改等
@Component
public class ServiceManager implements RecordListener<Service> {//注册表,Map(namespace, Map(group::serviceName, Service)).private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();@Resource(name = "consistencyDelegate")private ConsistencyService consistencyService;private final Object putServiceLock = new Object();...//Register an instance to a service in AP mode.//This method creates service or cluster silently if they don't exist.public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {//1.创建一个空的服务createEmptyService(namespaceId, serviceName, instance.isEphemeral());//2.根据命名空间ID、服务名获取一个服务,如果获取结果为null则抛异常Service service = getService(namespaceId, serviceName);if (service == null) {throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName);}//3.添加服务实例addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);}...//1.创建一个空服务public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {createServiceIfAbsent(namespaceId, serviceName, local, null);}//Create service if not exist.public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {Service service = getService(namespaceId, serviceName);if (service == null) {Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);service = new Service();service.setName(serviceName);service.setNamespaceId(namespaceId);service.setGroupName(NamingUtils.getGroupName(serviceName));//now validate the service. if failed, exception will be thrownservice.setLastModifiedMillis(System.currentTimeMillis());service.recalculateChecksum();if (cluster != null) {cluster.setService(service);service.getClusterMap().put(cluster.getName(), cluster);}service.validate();putServiceAndInit(service);if (!local) {addOrReplaceService(service);}}}private void putServiceAndInit(Service service) throws NacosException {//把Service放入注册表serviceMap中putService(service);service.init();//把Service作为监听器添加到consistencyService的listeners中consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());}//Put service into manager.public void putService(Service service) {if (!serviceMap.containsKey(service.getNamespaceId())) {synchronized (putServiceLock) {if (!serviceMap.containsKey(service.getNamespaceId())) {serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());}}}serviceMap.get(service.getNamespaceId()).put(service.getName(), service);}public void addOrReplaceService(Service service) throws NacosException {consistencyService.put(KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName()), service);}...//2.根据命名空间ID、服务名获取一个服务public Service getService(String namespaceId, String serviceName) {if (serviceMap.get(namespaceId) == null) {return null;}return chooseServiceMap(namespaceId).get(serviceName);}public Map<String, Service> chooseServiceMap(String namespaceId) {return serviceMap.get(namespaceId);}...//3.添加服务实例public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {//构建要注册的服务实例对应的服务的keyString key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);//根据命名空间以及服务名获取要注册的服务实例对应的服务Service service = getService(namespaceId, serviceName);//使用synchronized锁住要注册的服务实例对应的服务synchronized (service) {//由于一个服务可能存在多个服务实例,所以需要根据当前注册请求的服务实例ips,获取对应服务的最新服务实例列表List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);//Instances实现了用于在Nacos集群进行网络传输的Record接口Instances instances = new Instances();instances.setInstanceList(instanceList);//执行DelegateConsistencyServiceImpl的put()方法consistencyService.put(key, instances);}}private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {//更新对应服务的服务实例列表return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);}//Compare and get new instance list.public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {//先获取已经注册到Nacos的、当前要注册的服务实例对应的服务的、所有服务实例Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));List<Instance> currentIPs = service.allIPs(ephemeral);Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());Set<String> currentInstanceIds = Sets.newHashSet();for (Instance instance : currentIPs) {//把instance实例的IP当作key,instance实例当作value,放入currentInstancescurrentInstances.put(instance.toIpAddr(), instance);//把实例唯一编码添加到currentInstanceIds中currentInstanceIds.add(instance.getInstanceId());}//用来存放当前要注册的服务实例对应的服务的、所有服务实例Map<String, Instance> instanceMap;if (datum != null && null != datum.value) {instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);} else {instanceMap = new HashMap<>(ips.length);}for (Instance instance : ips) {if (!service.getClusterMap().containsKey(instance.getClusterName())) {Cluster cluster = new Cluster(instance.getClusterName(), service);cluster.init();service.getClusterMap().put(instance.getClusterName(), cluster);Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson());}if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {instanceMap.remove(instance.getDatumKey());} else {Instance oldInstance = instanceMap.get(instance.getDatumKey());if (oldInstance != null) {instance.setInstanceId(oldInstance.getInstanceId());} else {instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));}//instanceMap的key与IP和端口有关instanceMap.put(instance.getDatumKey(), instance);}}if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils.toJson(instanceMap.values()));}//最后instanceMap里肯定会包含新注册的Instance实例//并且如果不是第一次注册,里面还会包含之前注册的Instance实例信息return new ArrayList<>(instanceMap.values());}...
}//Package of instance list.
public class Instances implements Record {private List<Instance> instanceList = new ArrayList<>();...
} public class KeyBuilder {public static final String INSTANCE_LIST_KEY_PREFIX = "com.alibaba.nacos.naming.iplist.";private static final String EPHEMERAL_KEY_PREFIX = "ephemeral.";public static final String NAMESPACE_KEY_CONNECTOR = "##";...public static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {return ephemeral ? buildEphemeralInstanceListKey(namespaceId, serviceName) : buildPersistentInstanceListKey(namespaceId, serviceName);}//返回的key形如:"com.alibaba.nacos.naming.iplist.ephemeral." + namespaceId + " + "##" + serviceNameprivate static String buildEphemeralInstanceListKey(String namespaceId, String serviceName) {return INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX + namespaceId + NAMESPACE_KEY_CONNECTOR + serviceName;}public static boolean matchEphemeralKey(String key) {//currently only instance list has ephemeral type:return matchEphemeralInstanceListKey(key);}public static boolean matchEphemeralInstanceListKey(String key) {//判定key是否是以这样的字符串开头:"com.alibaba.nacos.naming.iplist.ephemeral."return key.startsWith(INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX);}...
}
DelegateConsistencyServiceImpl的put()方法更新服务实例列表存储时:首先会根据表示服务的key来选择不同的ConsistencyService。如果是临时服务实例,则调用DistroConsistencyServiceImpl的put()方法。如果是持久化服务实例,则调用PersistentConsistencyServiceDelegateImpl的put()方法。
在DistroConsistencyServiceImpl的put()方法中:首先会调用DistroConsistencyServiceImpl的onPut()方法,把包含当前注册的服务实例的、最新服务实例列表存储到DataStore中,然后调用DistroProtocol的sync()方法进行集群节点间的服务实例数据同步,其中DataStore用于存储所有已注册的服务实例数据。
而在DistroConsistencyServiceImpl的onPut()方法中:会先创建Datum对象,注入服务key和服务的所有服务实例Instances,然后才将Datum对象添加到DataStore的Map对象里。最后调用Notifier的addTask()方法添加一个数据变更的任务,也就是把key、action封装成Pair对象,放入一个Notifier的阻塞队列中。
注意:在DistroConsistencyServiceImpl初始化完成后,会提交一个进行无限for循环的任务给一个单线程的线程池来执行。无限for循环中会不断从阻塞队列中获取Pair对象进行处理。而在进行服务实例注册时,会往该任务的阻塞队列添加Pair对象。
//Consistency delegate.
@DependsOn("ProtocolManager")
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {private final PersistentConsistencyServiceDelegateImpl persistentConsistencyService;private final EphemeralConsistencyService ephemeralConsistencyService;...@Overridepublic void put(String key, Record value) throws NacosException {//如果是临时实例,则调用DistroConsistencyServiceImpl.put()方法//如果是持久化实例,则调用PersistentConsistencyServiceDelegateImpl.put()方法mapConsistencyService(key).put(key, value);}private ConsistencyService mapConsistencyService(String key) {//根据不同的key选择不同的ConsistencyServicereturn KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;}...
}@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {private final GlobalConfig globalConfig;private final DistroProtocol distroProtocol;private final DataStore dataStore;//用于存储所有已注册的服务实例数据private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();private volatile Notifier notifier = new Notifier();...@PostConstructpublic void init() {//初始化完成后,会将notifier任务提交给GlobalExecutor来执行GlobalExecutor.submitDistroNotifyTask(notifier);}@Overridepublic void put(String key, Record value) throws NacosException {//把包含了当前注册的服务实例的、最新的服务实例列表,存储到DataStore对象中onPut(key, value);//在集群架构下,DistroProtocol.sync()方法会进行集群节点的服务实例数据同步distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);}public void onPut(String key, Record value) {if (KeyBuilder.matchEphemeralInstanceListKey(key)) {//创建Datum对象,把服务key和服务的所有服务实例Instances放入Datum对象中Datum<Instances> datum = new Datum<>();datum.value = (Instances) value;datum.key = key;datum.timestamp.incrementAndGet();//添加到DataStore的Map对象里dataStore.put(key, datum);} if (!listeners.containsKey(key)) {return;}//添加处理任务notifier.addTask(key, DataOperation.CHANGE);}...public class Notifier implements Runnable {private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);//Add new notify task to queue.public void addTask(String datumKey, DataOperation action) {if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {return;}if (action == DataOperation.CHANGE) {services.put(datumKey, StringUtils.EMPTY);}//tasks是一个阻塞队列,把key、action封装成Pair对象,放入队列中tasks.offer(Pair.with(datumKey, action));}public int getTaskSize() {return tasks.size();}@Overridepublic void run() {Loggers.DISTRO.info("distro notifier started");for (; ; ) {try {Pair<String, DataOperation> pair = tasks.take();handle(pair);} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);}}}private void handle(Pair<String, DataOperation> pair) {try {String datumKey = pair.getValue0();DataOperation action = pair.getValue1();services.remove(datumKey);int count = 0;if (!listeners.containsKey(datumKey)) {return;}for (RecordListener listener : listeners.get(datumKey)) {count++;try {if (action == DataOperation.CHANGE) {listener.onChange(datumKey, dataStore.get(datumKey).value);continue;}if (action == DataOperation.DELETE) {listener.onDelete(datumKey);continue;}} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);}}if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name());}} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);}}}
}//用于存储所有已注册的服务实例数据
@Component
public class DataStore {private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);public void put(String key, Datum value) {dataMap.put(key, value);}...
}
(4)服务端接收到服务实例注册请求后的处理总结
register()注册方法会先从Request对象中获取从客户端传过来的参数,然后在addInstance()方法中会创建一个可以表示服务的key,接着调用DelegateConsistencyServiceImpl的put()方法,根据这个key可以选择具体的ConsistencyService实现类。
在这个put()方法中,通过key选择的是EphemeralConsistencyService,所以会调用DistroConsistencyServiceImpl的put()方法处理服务实例列表。
在DistroConsistencyServiceImpl的put()方法中又调用了onPut()方法,即把key、Instances封装成Datum对象,放入到DataStore的Map里。最后调用addTask()方法,将本次服务实例数据的变更包装成Pair对象,然后放入到一个阻塞队列里,由一个执行无限for循环的线程处理队列。
3.注册服务—如何实现高并发支撑上百万服务注册
(1)服务端处理客户端的服务注册请求梳理
(2)Nacos的异步任务设计思想
(3)异步任务和内存队列源码分析
(1)服务端处理客户端的服务注册请求梳理
Nacos客户端自动注册服务实例时,会通过HTTP的方式,请求"/nacos/v1/ns/instance"地址来调用Nacos服务端的实例注册接口。通过该地址可以找到Nacos服务端naming模块的InstanceController类。在这个类中有个register()方法,它就是服务端处理服务注册请求的入口。在这个register()方法的最后,会调用Notifier的addTask()方法,也就是把key、action包装成Pair对象,放入到一个BlockingQueue里。至此,InstanceController类中register()方法的注册逻辑就执行完了。
(2)Nacos的异步任务设计思想
一.Nacos服务实例注册的压测性能
二.Nacos服务端添加和处理异步任务的流程
三.Nacos采用异步任务来处理服务注册的好处—支撑高并发
一.Nacos服务实例注册的压测性能
参考服务发现性能测试报告。通过对3节点的集群进行服务发现性能压测,可得到接口性能负载和容量。压测容量服务数可达60W,实例注册数达110W,集群运行持续稳定。注册/查询实例TPS达到13000以上,接口达到预期。
二.Nacos服务端添加和处理异步任务的流程
首先客户端发起服务实例注册,服务端把接收的参数包装成一个Pair对象,最后放入到一个BlookingQueue里。这时对服务实例注册接口的处理已结束,服务端返回客户端响应消息了。
然后Nacos服务端会在后台开启一个单线程异步任务,这个任务会不断地获取BlookingQueue队列中的Pair对象。从这个队列获取出Pair对象后,会把信息写入注册表,从而完成服务注册。
三.Nacos采用异步任务来处理服务注册的好处—支撑高并发
好处一:接口响应时效更快
其实Nacos服务端处理服务实例注册的接口,并没有执行真正注册的动作。只是把信息包装好,放入到队列中,接口就结束返回响应给客户端了。由于代码逻辑非常简单,所以响应时效会更快。
好处二:保证服务稳定性
哪怕同时有1千个、1万个客户端同时发起实例注册请求接口,最后只是把服务实例注册任务放入到一个阻塞队列中。这就相当于使用消息队列进行流量削峰一样,后续复杂的处理逻辑,由消费者慢慢处理,异步任务就相当于消费者。
好处三:解决写时并发冲突
Nacos服务端,只有一个单线程在处理队列中的任务。也就是把阻塞队列中的服务实例注册信息,同步到Nacos的注册表中。既然是单线程进行写操作,所以就不用考虑多线程并发写的问题。虽然只会有一个线程在进行写,但是可能会有其他线程在进行读。所以会存在读写并发冲突,此时Nacos会使用写时复制策略来处理。
(3)异步任务和内存队列源码分析
一.异步任务的初始化和处理流程
二.关于无限for循环的问题
一.异步任务的初始化和处理流程
在创建DistroConsistencyServiceImpl类实例时,会直接创建一个实现了Runnable接口的Notifier类实例。
在DistroConsistencyServiceImpl类中有个init()方法。由于这个init()方法上加了@PostConstruct注解,所以在Spring创建这个类实例时会自动调用这个init()方法。init()方法会提交这个实现了Runnable接口的Notifier任务给线程池运行。
而在Notifier类的run()方法中,会通过无限for循环不断从tasks阻塞队列中获取任务来进行处理。获取出任务后,如果判断出action类型为CHANGE类型,则先把Instances对象从DataStore类中取出来,再调用listener的onChange()方法来将服务实例信息写入到注册表中。
二.关于无限for循环的问题
无限循环是否合理、是否会占用CPU资源、如果异常是否会导致循环结束?
因为Nacos服务端要一直处理Nacos客户端所发起的服务实例注册请求,而Nacos服务端它是不知道到底有多少个客户端需要进行服务注册的,所以只能写一个无限for循环一直不断重复地去执行。
既然是无限循环,就要考虑是否占用CPU资源的问题。tasks是一个阻塞队列BlockingQueue:第一.阻塞队列的特点就是不会占用CPU的资源,第二.tasks的take()方法会一直阻塞直到取得元素或当前线程中断。
在处理过程中,如果抛出未知异常,会直接被for循环中的try catch掉,继续循环处理下一个任务。
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {private final GlobalConfig globalConfig;private final DistroProtocol distroProtocol;private final DataStore dataStore;//用于存储所有已注册的服务实例数据private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();private volatile Notifier notifier = new Notifier();...@PostConstructpublic void init() {//初始化完成后,会将notifier任务提交给GlobalExecutor来执行GlobalExecutor.submitDistroNotifyTask(notifier);}...public class Notifier implements Runnable {private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);//Add new notify task to queue.public void addTask(String datumKey, DataOperation action) {if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {return;}if (action == DataOperation.CHANGE) {services.put(datumKey, StringUtils.EMPTY);}//tasks是一个阻塞队列,把key、action封装成Pair对象,放入队列中tasks.offer(Pair.with(datumKey, action));}@Overridepublic void run() {Loggers.DISTRO.info("distro notifier started");//无限循环for (; ;) {try {//从阻塞队列中获取任务Pair<String, DataOperation> pair = tasks.take();//处理任务handle(pair);} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);}}}private void handle(Pair<String, DataOperation> pair) {try {//把在DistroConsistencyServiceImpl.onPut()方法创建的key和action取出来String datumKey = pair.getValue0();DataOperation action = pair.getValue1();services.remove(datumKey);int count = 0;if (!listeners.containsKey(datumKey)) {return;}for (RecordListener listener : listeners.get(datumKey)) {count++;try {if (action == DataOperation.CHANGE) {//把Instances信息写到注册表里去listener.onChange(datumKey, dataStore.get(datumKey).value);continue;}if (action == DataOperation.DELETE) {listener.onDelete(datumKey);continue;}} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);}}if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name());}} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);}}}...@Overridepublic void put(String key, Record value) throws NacosException {//把包含了当前注册的服务实例的、最新的服务实例列表,存储到DataStore对象中onPut(key, value);//在集群架构下,DistroProtocol.sync()方法会进行集群节点的服务实例数据同步distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);}public void onPut(String key, Record value) {if (KeyBuilder.matchEphemeralInstanceListKey(key)) {//创建Datum对象,把服务key和服务的所有服务实例Instances放入Datum对象中Datum<Instances> datum = new Datum<>();datum.value = (Instances) value;datum.key = key;datum.timestamp.incrementAndGet();//添加到DataStore的Map对象里dataStore.put(key, datum);} if (!listeners.containsKey(key)) {return;}//添加处理任务notifier.addTask(key, DataOperation.CHANGE);}
}public class GlobalExecutor {private static final ScheduledExecutorService DISTRO_NOTIFY_EXECUTOR = ExecutorFactory.Managed.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),new NameThreadFactory("com.alibaba.nacos.naming.distro.notifier"));...public static void submitDistroNotifyTask(Runnable runnable) {//向线程池提交任务,让线程池执行任务DISTRO_NOTIFY_EXECUTOR.submit(runnable);}...
}public class NameThreadFactory implements ThreadFactory {private final AtomicInteger id = new AtomicInteger(0);private String name; public NameThreadFactory(String name) {if (!name.endsWith(StringUtils.DOT)) {name += StringUtils.DOT;}this.name = name;}@Overridepublic Thread newThread(Runnable r) {String threadName = name + id.getAndDecrement();Thread thread = new Thread(r, threadName);thread.setDaemon(true);return thread;}
}public final class ExecutorFactory {...public static final class Managed {private static final String DEFAULT_NAMESPACE = "nacos";private static final ThreadPoolManager THREAD_POOL_MANAGER = ThreadPoolManager.getInstance();...//Create a new single scheduled executor service with input thread factory and register to manager.public static ScheduledExecutorService newSingleScheduledExecutorService(final String group, final ThreadFactory threadFactory) {ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, threadFactory);//注册到ThreadPoolManager可以方便管理ScheduledExecutorService,比如注销、销毁THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);return executorService;}...}...
}public final class ThreadPoolManager {private Map<String, Map<String, Set<ExecutorService>>> resourcesManager;private Map<String, Object> lockers = new ConcurrentHashMap<String, Object>(8);...//Register the thread pool resources with the resource manager.public void register(String namespace, String group, ExecutorService executor) {if (!resourcesManager.containsKey(namespace)) {synchronized (this) {lockers.put(namespace, new Object());}}final Object monitor = lockers.get(namespace);synchronized (monitor) {Map<String, Set<ExecutorService>> map = resourcesManager.get(namespace);if (map == null) {map = new HashMap<String, Set<ExecutorService>>(8);map.put(group, new HashSet<ExecutorService>());map.get(group).add(executor);resourcesManager.put(namespace, map);return;}if (!map.containsKey(group)) {map.put(group, new HashSet<ExecutorService>());}map.get(group).add(executor);}}//Cancel the uniform lifecycle management for all threads under this resource.public void deregister(String namespace, String group) {if (resourcesManager.containsKey(namespace)) {final Object monitor = lockers.get(namespace);synchronized (monitor) {resourcesManager.get(namespace).remove(group);}}}...
}
总结:异步任务是提升性能的一种方式。很多开源框架为了提升自身处理性能,都会采利用异步任务 + 内存队列。
4.内存注册表—如何处理注册表的高并发读写冲突
(1)服务实例注册的客户端源码和服务端源码梳理
(2)Nacos注册表结构
(3)写时复制机制介绍
(4)Nacos服务注册写入注册表源码分析
(1)服务实例注册的客户端源码和服务端源码梳理
一.客户端发起服务注册的源码梳理
订单服务、库存服务的项目引入nacos-discovery服务注册中心依赖后,当项目启动时,就会扫描到依赖中的spring.factories文件,然后去创建spring.factories文件中定义的配置类。
在spring.factories文件中:有一个名为NacosServiceRegistryAutoConfiguration配置类,在这个配置类定义了三个Bean对象:NacosServiceRegistry、NacosRegistration和NacosAutoServiceRegistration。
NacosAutoServiceRegistration类的父类实现了ApplicationListener接口,也就是实现了onApplicationEvent()这个监听事件方法。当Spring容器启动时,会发布WebServerInitializedEvent监听事件,从而被Nacos客户端即NacosAutoServiceRegistration的监听方法监听到。
这个监听事件方法会调用NacosServiceRegistry类中的register()方法,register()方法又会调用Nacos服务端实例注册的HTTP接口完成服务注册。
在发起服务实例注册接口的调用前,客户端还会开启一个BeatTask任务,这个BeatTask任务会每隔5秒向Nacos服务端发送心跳检查请求。
二.服务端处理服务注册的源码梳理
Nacos服务端处理服务注册的HTTP接口是:/nacos/v1/ns/instance。由于Nacos服务端也是个Spring Boot项目,所以通过架构图找到Nacos源码的naming模块,然后就可以通过请求地址定位到InstanceController类。
在InstanceController类中会有对应HTTP接口的register()方法,该方法最终会把客户端的实例对象包装成Datum对象放入DataStore类中,然后再包装一个Pair对象,放入Notifier的tasks内存阻塞队列。
DistroConsistencyServiceImpl中有个@PostConstruct修饰的init()方法。在该类被实例化后,这个init()方法会把一个Notifier任务提交给一个线程池执行。
Notifier的run()方法,首先会不断循环从tasks阻塞队列中获取Pair对象,然后调用Notifier的handle()方法把Instances对象从DataStore类中取出来,接着调用listener.onChange()方法把服务实例数据写入到注册表中。
(2)Nacos注册表结构
一.Nacos注册表的使用
在ServiceManager类中有一个serviceMap属性,它就是Nacos的内存注册表,Nacos注册表就是用来存放微服务实例注册信息的地方。客户端在调用其他微服务时,会先调用Nacos查询实例列表接口,查询当前可用服务,从而发起微服务调用。
//Core manager storing all services in Nacos.@Component
public class ServiceManager implements RecordListener<Service> {//注册表,Map(namespace, Map(group::serviceName, Service)). private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();...public Service getService(String namespaceId, String serviceName) {if (serviceMap.get(namespaceId) == null) {return null;}return chooseServiceMap(namespaceId).get(serviceName);}public Map<String, Service> chooseServiceMap(String namespaceId) {return serviceMap.get(namespaceId);}...
}@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {private Map<String, Cluster> clusterMap = new HashMap<>();...
}public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {//持久化实例列表@JsonIgnoreprivate Set<Instance> persistentInstances = new HashSet<>();//临时实例列表@JsonIgnoreprivate Set<Instance> ephemeralInstances = new HashSet<>();...
}
二.Nacos注册表的结构分析
ServiceManager的serviceMap属性,即注册表结构由两层Map组合而成。也就是:Map(namespace, Map(group::serviceName, Service))。
Nacos支持对服务进行分类,最上层是一个命名空间Namespace。命名空间Namespace默认是public,也可以自定义为dev、test等。
在public命名空间下,可以包含不同的分组Group。比如定义两个分组Group:DEFAULT_GROUP_1、DEFAULT_GROUP_2。这样命名空间Namespace和分组Group就对应注册表最外层的两个Map。
在ServiceManager.serviceMap的内层Map中,其value是个Service对象。在Service类中,有一个clusterMap属性。clusterMap的key是对应的集群名字,如北京集群、广州集群等。clusterMap的value是个Cluster对象,用来存放某集群下的所有实例对象。
在Cluster类中,存在两个不同实例类型的Set集合,这两个集合就会存储具体的Instance实例对象,Instance实例对象里会包含实例的IP、Port等信息。
三.Nacos注册表的设计原因
之所以Nacos要这么设计注册表,那是为了灵活应对不同的使用场景。如果项目简单,测试、预发、生产不同环境都使用同一个Nacos服务端,那么可以通过命名空间来区分。
如果项目复杂,不同环境使用不同的Nacos服务端,那么可以通过命名空间来区分不同的模块。而订单模块下可以细分很多微服务,然后通过分组来区分不同的环境。包括在Service对象里,同一个服务也可能在多个地区都有部署。比如北京服务器部署2台、广州服务器部署2台等。
(3)写时复制机制介绍
Nacos服务端把新注册的实例写入到注册表中,用的就是写时复制机制。写时复制机制,能够很好地避免读写并发冲突。
写时复制:Copy On Write。在数据写入到某存储位置时,首先将原有内容拷贝出来,写到另一处地方,然后再将原来的引用地址修改成新对象的地址。
下面展示了一个并发冲突的例子:
public static void main(String[] args) {//假设objectSet是用来存放实例信息Set<Object> objectSet = new HashSet<>();//模拟异步任务,写入数据new Thread(new Runnable() {@Overridepublic void run() {try {//先睡眠一下,否则还没开始读,就已经写完了Thread.sleep(100L);} catch (InterruptedException e) {e.printStackTrace();}//写入10w条数据for (int i = 0; i < 100000; i++) {objectSet.add(i);}}}).start();//死循环一直读取数据,模拟高并发场景for (; ;) {for (Object o : objectSet) {System.out.println(o);}}
}
运行上面的代码就会抛出如下异常信息:
Exception in thread "main" java.util.ConcurrentModificationException
意思是在对集合迭代、读取时,如果同时对其进行修改,就会抛出ConcurrentModificationException异常。
这时候就可以采用写时复制来避免这个问题。先创建一个复制对象,把原来的数据复制一份到该复制对象上。然后在复制对象上进行新增、修改的操作,这时是不会影响原来数据的。等到在复制对象上进行的操作完成之后,再把原来对象的引用地址直接修改为复制对象的引用。
(4)Nacos服务注册写入注册表源码分析
在执行Notifier的handle()方法时,核心的代码是:
//把Instances信息写到注册表里去
listener.onChange(datumKey, dataStore.get(datumKey).value);
dataStore.get(datumKey).value就是从DataStore中获取Instances对象。listener.onChange()其实就是调用Service的onChange()方法更新注册表。
因为在注册某个服务的第一个实例时,创建的服务Service会作为Listener添加到ConsistencyService的listeners,并且已经将新创建的服务Service放入到了ServiceManager的注册表中了。所以线程池执行Notifier的handle()方法时,就能遍历所有Service进行更新。
其实注册表serviceMap只是存放了Service对象的引用,而ConsistencyService的listeners也存放了Service对象的引用。当遍历ConsistencyService的listeners,执行Service.onChange()方法时,更新的就是JVM在堆内存中的Service实例对象,也就更新了注册表。因为注册表是一个Map,最终都是引用到对内存中的Service实例对象。
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {private final DataStore dataStore;private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();...public class Notifier implements Runnable {private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);...@Overridepublic void run() {Loggers.DISTRO.info("distro notifier started");//无限循环for (; ;) {try {//从阻塞队列中获取任务Pair<String, DataOperation> pair = tasks.take();//处理任务handle(pair);} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);}}}private void handle(Pair<String, DataOperation> pair) {try {//把在DistroConsistencyServiceImpl.onPut()方法创建的key和action取出来String datumKey = pair.getValue0();DataOperation action = pair.getValue1();services.remove(datumKey);int count = 0;if (!listeners.containsKey(datumKey)) {return;}for (RecordListener listener : listeners.get(datumKey)) {count++;try {if (action == DataOperation.CHANGE) {//把Instances信息写到注册表里去listener.onChange(datumKey, dataStore.get(datumKey).value);continue;}if (action == DataOperation.DELETE) {listener.onDelete(datumKey);continue;}} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);}}if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name());}} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);}}}...@Overridepublic void put(String key, Record value) throws NacosException {//把包含了当前注册的服务实例的、最新的服务实例列表,存储到DataStore对象中onPut(key, value);//在集群架构下,DistroProtocol.sync()方法会进行集群节点的服务实例数据同步distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);}//Put a new record.public void onPut(String key, Record value) {if (KeyBuilder.matchEphemeralInstanceListKey(key)) {//创建Datum对象,把服务key和服务的所有服务实例Instances放入Datum对象中Datum<Instances> datum = new Datum<>();datum.value = (Instances) value;datum.key = key;datum.timestamp.incrementAndGet();//添加到DataStore的Map对象里dataStore.put(key, datum);}if (!listeners.containsKey(key)) {return;}//添加处理任务notifier.addTask(key, DataOperation.CHANGE);}...
}//Store of data.
@Component
public class DataStore {private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);...public Datum get(String key) {return dataMap.get(key);}...
}public class Datum<T extends Record> implements Serializable {public String key;public T value;...
}//Package of instance list.
public class Instances implements Record {private List<Instance> instanceList = new ArrayList<>();...
}//服务管理者,拥有所有的服务列表,用于管理所有服务的注册、销毁、修改等
@Component
public class ServiceManager implements RecordListener<Service> {//注册表,Map(namespace, Map(group::serviceName, Service)).private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();@Resource(name = "consistencyDelegate")private ConsistencyService consistencyService;...//Register an instance to a service in AP mode.//This method creates service or cluster silently if they don't exist.public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {//1.创建一个空的服务createEmptyService(namespaceId, serviceName, instance.isEphemeral());//2.根据命名空间ID、服务名获取一个服务,如果获取结果为null则抛异常Service service = getService(namespaceId, serviceName);if (service == null) {throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName);}//3.添加服务实例addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);}...//1.创建一个空服务public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {createServiceIfAbsent(namespaceId, serviceName, local, null);}//Create service if not exist.public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {Service service = getService(namespaceId, serviceName);if (service == null) {Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);service = new Service();service.setName(serviceName);service.setNamespaceId(namespaceId);service.setGroupName(NamingUtils.getGroupName(serviceName));//now validate the service. if failed, exception will be thrownservice.setLastModifiedMillis(System.currentTimeMillis());service.recalculateChecksum();if (cluster != null) {cluster.setService(service);service.getClusterMap().put(cluster.getName(), cluster);}service.validate();putServiceAndInit(service);if (!local) {addOrReplaceService(service);}}}private void putServiceAndInit(Service service) throws NacosException {//把Service放入注册表serviceMap中putService(service);service.init();//把Service作为监听器添加到consistencyService的listeners中consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());}//Put service into manager.public void putService(Service service) {if (!serviceMap.containsKey(service.getNamespaceId())) {synchronized (putServiceLock) {if (!serviceMap.containsKey(service.getNamespaceId())) {serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());}}}serviceMap.get(service.getNamespaceId()).put(service.getName(), service);}...
}
其中从DataStore中获取出来的Instances对象的来源如下:
//服务管理者,拥有所有的服务列表,用于管理所有服务的注册、销毁、修改等
@Component
public class ServiceManager implements RecordListener<Service> {//Map(namespace, Map(group::serviceName, Service)).private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();@Resource(name = "consistencyDelegate")private ConsistencyService consistencyService;...//添加服务实例public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {//构建要注册的服务实例对应的服务的keyString key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);//根据命名空间以及服务名获取要注册的服务实例对应的服务Service service = getService(namespaceId, serviceName);//使用synchronized锁住要注册的服务实例对应的服务synchronized (service) {//由于一个服务可能存在多个服务实例,所以需要根据当前注册请求的服务实例ips,获取对应服务的最新服务实例列表List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);//Instances实现了用于在Nacos集群进行网络传输的Record接口Instances instances = new Instances();instances.setInstanceList(instanceList);//执行DelegateConsistencyServiceImpl的put()方法consistencyService.put(key, instances);}}private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {//更新对应服务的服务实例列表return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);}//Compare and get new instance list.public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {//先获取已经注册到Nacos的、当前要注册的服务实例对应的服务的、所有服务实例Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));List<Instance> currentIPs = service.allIPs(ephemeral);Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());Set<String> currentInstanceIds = Sets.newHashSet();for (Instance instance : currentIPs) {//把instance实例的IP当作key,instance实例当作value,放入currentInstancescurrentInstances.put(instance.toIpAddr(), instance);//把实例唯一编码添加到currentInstanceIds中currentInstanceIds.add(instance.getInstanceId());}//用来存放当前要注册的服务实例对应的服务的、所有服务实例Map<String, Instance> instanceMap;if (datum != null && null != datum.value) {instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);} else {instanceMap = new HashMap<>(ips.length);}for (Instance instance : ips) {if (!service.getClusterMap().containsKey(instance.getClusterName())) {Cluster cluster = new Cluster(instance.getClusterName(), service);cluster.init();service.getClusterMap().put(instance.getClusterName(), cluster);Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson());}if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {instanceMap.remove(instance.getDatumKey());} else {Instance oldInstance = instanceMap.get(instance.getDatumKey());if (oldInstance != null) {instance.setInstanceId(oldInstance.getInstanceId());} else {instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));}//instanceMap的key与IP和端口有关instanceMap.put(instance.getDatumKey(), instance);}}if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils.toJson(instanceMap.values()));}//最后instanceMap里肯定会包含新注册的Instance实例//并且如果不是第一次注册,里面还会包含之前注册的Instance实例信息return new ArrayList<>(instanceMap.values());}...
}//Package of instance list.
public class Instances implements Record {private List<Instance> instanceList = new ArrayList<>();...
}
接下来是Service的onChange()方法的详情:
Service的onChange()方法需要传入两个参数:参数一是key,这个key是由KeyBuilder的buildInstanceListKey()代码创建出来的。参数二是Instances,里面有个InstanceList属性,可以存放多个Instance实例对象。实际上Instances参数可能会包含之前多个已经注册的Instance实例信息,并且一定会包含当前新注册的Instance实例信息。
Service的onChange()方法,最后会调用Service的updateIPs()方法。Service的updateIPs()方法又会调用Cluster的updateIps()方法,会把新注册的Instance更新到Cluster对象实例中。
在Cluster的updateIps()方法中,便会通过写时复制机制来更新实例Set。如果不用写时复制,那么就会并发读写同一个Set对象。如果使用写时复制,那么同一时间的读和写都是不同的Set对象。即使用新对象替换旧对象那一刻还有线程没迭代读完旧对象,也不影响。因为没有迭代读完旧对象的线程继续进行迭代读,替换的只是对象引用。ephemeralInstances变量只是引用了Set对象的地址而已。这里说的替换,只是让ephemeralInstances变量引用另外Set对象的地址。
//Service of Nacos server side
//We introduce a 'service --> cluster --> instance' model,
//in which service stores a list of clusters, which contain a list of instances.
//his class inherits from Service in API module and stores some fields that do not have to expose to client.
@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {private Map<String, Cluster> clusterMap = new HashMap<>();...@Overridepublic void onChange(String key, Instances value) throws Exception {Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);for (Instance instance : value.getInstanceList()) {if (instance == null) {//Reject this abnormal instance list:throw new RuntimeException("got null instance " + key);}if (instance.getWeight() > 10000.0D) {instance.setWeight(10000.0D);}if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {instance.setWeight(0.01D);}}updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));recalculateChecksum();}//Update instances. 这里的instances里就包含了新注册的实例对象public void updateIPs(Collection<Instance> instances, boolean ephemeral) {//clusterMap表示的是该服务的集群Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());for (String clusterName : clusterMap.keySet()) {ipMap.put(clusterName, new ArrayList<>());}//遍历全部实例对象:包括已经注册过的实例对象 和 新注册的实例对象//这里的作用就是对相同集群下的instance进行分类for (Instance instance : instances) {try {if (instance == null) {Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");continue;}//判定客户端传过来的instance实例中,是否设置了ClusterNameif (StringUtils.isEmpty(instance.getClusterName())) {//如果否,就设置instance实例的ClusterName为DEFAULTinstance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);}//判断之前是否存在对应的CLusterName,如果没有则需要创建新的Cluster对象if (!clusterMap.containsKey(instance.getClusterName())) {Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson());//创建新的Cluster集群对象Cluster cluster = new Cluster(instance.getClusterName(), this);cluster.init();//将新创建的Cluster对象放入到集群clusterMap中getClusterMap().put(instance.getClusterName(), cluster);}//根据集群名字,从ipMap里面获取集群下的所有实例List<Instance> clusterIPs = ipMap.get(instance.getClusterName());if (clusterIPs == null) {clusterIPs = new LinkedList<>();ipMap.put(instance.getClusterName(), clusterIPs);}//将客户端传过来的新注册的instance实例,添加到clusterIPs,也就是ipMap中clusterIPs.add(instance);} catch (Exception e) {Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);}}//对所有的服务实例分好类之后,按照ClusterName来更新注册表for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {//entryIPs已经是根据ClusterName分好组的实例列表了List<Instance> entryIPs = entry.getValue();//调用Cluster.updateIps()方法,根据写时复制,对注册表中的每一个Cluster对象进行更新clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);}setLastModifiedMillis(System.currentTimeMillis());getPushService().serviceChanged(this);StringBuilder stringBuilder = new StringBuilder();for (Instance instance : allIPs()) {stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");}Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(), stringBuilder.toString());}...
}public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {@JsonIgnoreprivate Set<Instance> persistentInstances = new HashSet<>();@JsonIgnoreprivate Set<Instance> ephemeralInstances = new HashSet<>();@JsonIgnoreprivate Service service;...//Update instance list.public void updateIps(List<Instance> ips, boolean ephemeral) {//先判定是否是临时实例,然后把对应的实例数据取出来,放入到新创建的toUpdateInstances集合中Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;//将老的实例列表toUpdateInstances复制一份到oldIpMap中HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());for (Instance ip : toUpdateInstances) {oldIpMap.put(ip.getDatumKey(), ip);}...//最后把传入进来的实例列表,重新初始化一个HaseSet,赋值给toUpdateInstancestoUpdateInstances = new HashSet<>(ips);//判断是否是临时实例,将Cluster的persistentInstances或ephemeralInstances替换为toUpdateInstancesif (ephemeral) {//直接把之前的实例列表替换成新的ephemeralInstances = toUpdateInstances;} else {//直接把之前的实例列表替换成新的persistentInstances = toUpdateInstances;}}...
}
从这部分源码中就可以看出,全程都没有对之前注册表中的数据进行操作。而是先拿出来,最后直接把新的数据替换过去,这样就完成了注册表修改。从而避免了对Set的并发读写冲突。
相关文章:
Nacos源码—1.Nacos服务注册发现分析二
大纲 1.客户端如何发起服务注册 发送服务心跳 2.服务端如何处理客户端的服务注册请求 3.注册服务—如何实现高并发支撑上百万服务注册 4.内存注册表—如何处理注册表的高并发读写冲突 2.服务端如何处理客户端的服务注册请求 (1)客户端自动发送服务注册请求梳理 (2)Nacos…...
设备指纹护航电商和金融反欺诈体系建设
众所周知,人的指纹具有唯一性,可以作为人的身份识别标识。对于设备而言,也有可以用于识别的特征。设备指纹是指可以用于唯一标识出某一设备的特征或者独特的设备标识,具有固定性、较难篡改性、唯一性等特质。 设备指纹是金融机构…...
FFmpeg源码学习---ffmpeg
1、ffmpeg源码主函数 ┌────────────────────┐ │ main() │ └─────────┬───────────┘ ↓ ┌────────────────────┐ │ 初始化 (日志/网络等) │ │ init_dynload() │ │ avf…...
leetcode 206. 反转链表
题目描述: 迭代法: /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode() : val(0), next(nullptr) {}* ListNode(int x) : val(x), next(nullptr) {}* ListNode(int x, ListNode …...
NVIDIA新模型DAM-3B:描述一切,图像视频局部描述新突破
在数字时代,图像和视频内容爆炸式增长,如何让AI像人类一样精准描述画面中的特定区域,成为计算机视觉领域的核心挑战。传统模型要么丢失细节,要么缺乏上下文,而NVIDIA与UC Berkeley联合团队提出的DAM(Descri…...
7、langChain和RAG实战:基于LangChain和RAG的常用案例实战
PDF 文档问答ChatBot 本地上传文档 支持 pdf支持 txt支持 doc/docx问答页面 python环境 新建一个requirements.txt文件streamlit python-docx PyPDF2 faiss-cpu langchain langchain-core langchain-community langchain-openai然后安装相应的包pip install -r requirements.t…...
c++11: 类型转换
目录 一 C语言中的类型转换 二 . C强制类型转换 1. static_cast 2. reinterpret_cast 3. const_cast 4. dynamic_cast 三 explicit 关键字 一 C语言中的类型转换 在C语言中,如果赋值运算符左右两侧类型不同,或者形参与实参类型不匹配ÿ…...
Matlab自学笔记五十二:变量名称:检查变量名称是否存在或是否与关键字冲突
1.变量名称的命名规则 有效的变量名称以字母开头,后跟字母、数字或下划线,Matlab变量名称对字母大小写是区分的,A和a是不相同的变量,不能使用与Matlab关键字冲突的变量名称,例如if、end等,判断一个字符是不…...
西门子PLC结构化编程_水处理系统水泵多备多投
文章目录 前言一、功能概述二、程序编写1. 需求分析2. 编写运行时间累计功能块3. 创建自定义数据类型1. 时间排序数据类型2. 多备多投数据类型3. 多备多投切换数据类型 4. 编程1. 创建DB数据块1. 多备多投数据块2. 多备多投切换数据块 2. 创建FB功能块 三、程序调用总结 前言 …...
AutoGen 框架深度解析:构建多智能体协作的事件驱动架构
在当下多智能体(Multi-Agent)AI系统快速发展的背景下,AutoGen 作为微软研究院开源的编程框架,为构建可扩展、灵活且可调试的智能体协作应用提供了完备的工具与最佳实践。本文将从设计动机、核心架构、关键概念、安装与快速上手、典型场景、进阶特性、生态与扩展、最佳实践,…...
算法相关概念
1 算法概述 1.1 算法概念 算法是特定问题求解步骤的描述,也是独立存在的一种解决问题的思想和方法 对于算法而言,实现他的编程语言无关紧要,重要的是思想和方法!!! 公式:程序算法数据结构&a…...
《Astro 3.0岛屿架构让内容网站“脱胎换骨”》
内容优先的网站越来越成为主流。无论是新闻资讯、知识博客,还是电商产品展示,用户都希望能快速获取所需内容,这对网站的性能和体验提出了极高要求。而Astro 3.0的岛屿架构,就像是为内容优先网站量身定制的一把神奇钥匙,…...
Vue3 + Element-Plus + 阿里云文件上传
Element-Plus 阿里云文件上传 1、选择文件夹方法2、Chrome 浏览器查看 input typefile 元素上传的文件方法3、上传文件4、FormDataFormData 是什么创建 FormDataFormData 常用方法FormData 的实际应用性能与注意事项总结 1、选择文件夹方法 input typefile 元素想要上传文件夹…...
【Linux】第十一章 管理网络
目录 1.TCP/IP网络模型 物理层(Physical) 数据链路层(Date Link) 网络层(Internet) 传输层(Transport) 应用层(Application) 2. 对于 IPv4 地址&#…...
用vite动态导入vue的路由配置
在Vue应用中,通过路由可以实现不同页面之间的切换,同时也可以实现页面之间的传参和控制页面的显示与隐藏。但是我们在开发的过程中,会发现在路由配置中的路由配置和我们的项目结构高度重复,在我们修改页面文件结构时非常的麻烦与复…...
sources.list.d目录
sources.list可能大家很熟悉,是配置镜像链接的地方。 sources.list.d其实就是一个目录,在linux系统中.d后缀一般定义为一个目录,且很喜欢用这种方式。 这种方式有一个好处,就是修改不会影响到sources.list文件, 在这里…...
【C语言】文件操作
目录 一为什么使用文件 二什么是文件 程序文件 数据文件 文件名 二进制文件和文本文件? 三文件的打开与关闭 流的概念 标准流 文件指针 指针的声明 指针的初始化 四文件的打开与关闭 打开 fopen()函数 五总结: 前言: …...
静态库与动态库简介
静态库与动态库简介 基本概念 静态库 静态库是在编译链接阶段被直接整合到可执行文件中的代码集合。链接器会从静态库中提取程序所需的所有对象,并将它们复制到最终的可执行文件中。 特点: 可执行文件包含了所有代码,运行时无需外部依赖…...
02《小地图实时》Unity
创建一个新的项目 创建一个球体 作为主角 重命名为Player 在主角上创建空的子物体 重命名为MiniMapIcon 增加一个精灵图片 并设置为绿色 增加一个层(目的是在小地图中看的到 而在场景中看不到这个绿色Icon) 命名为MiniMap 在主摄像机中设置剔除遮罩Culli…...
【Redis】基础4:作为分布式锁
文章目录 1. 一些概念2. MySQL方案2.1 方案一:事务特性2.1.1 存在的问题2.1.2 解决方案 2.2 方案二:乐观锁2.3 方案三:悲观锁 3. Redis3.1 实现原理3.2 实现细节3.2.1 问题1:持有期间锁过期问题3.2.2 问题2:判断和释放…...
迭代器与生成器
目录 Iterator 的作用 Iterator 的遍历过程 Symbol.iterator方法 实现iterator接口的自定义类示例 Generator函数 迭代器对象的next方法的运行逻辑 迭代器对象除了具有next方法,还可以具有return方法。 Iterator 的作用 为各种数据结构,提供一个统…...
Python 实现的运筹优化系统数学建模详解(动态规划模型)
相关代码链接:https://download.csdn.net/download/heikediguoshinib/90713747?spm1001.2014.3001.5503 一、引言 在计算机科学与数学建模的广阔领域中,算法如同精密的齿轮,推动着问题的解决与系统的运行。当面对复杂的优化问题时&…...
miniconda在ARM64位芯片上面的安装
文章目录 前言一、特点二、适用场景三、下载安装及使用1.下载脚本文件2.安装命令3.常见用法 总结 前言 Miniconda 是一个轻量级的 Python 发行版,它是 Anaconda 的一个简化版本。Anaconda 是一个广泛使用的数据科学平台,包含了众多的 Python 包和工具&a…...
vue跨域问题总结笔记
目录 一、Websocket跨域问题 1.nginx配置 2.VUE CLI代理 3.env.development配置 4.nginx日志 5.解决 一、解决跨域的几种常用方法 1.Vue CLI代理 2.JSONP 3.WebSocket 4.NGINX解决跨域问题 6.Java解决跨域 二、Vue跨域问题详解 1. 什么是跨域 2. 跨域的例子 3.…...
自动驾驶领域专业词汇(专业术语)整理
以下是分类整理的自动驾驶领域专业词汇表,涵盖 AI、芯片、传感器、自动驾驶核心、辅助驾驶、安全、通信、车灯、泊车、测试标准 等类别: AI相关 缩写英文全称中文解释AIArtificial Intelligence人工智能,模拟人类智能的技术体系NNNeural Ne…...
说一下react更新的流程
beginWork 使用v-dom和current fiber去生成子节点的workInProgress Fiber 期间会执行函数组件、类组件、diff子节点 给我需要变更的节点,打赏effectTag 增placement 2 0010 删deletion 8 1000 改 update 4 0100 增和改 placementAndUpdate…...
C 语言函数指针与指针函数详解
一、引言 在 C 语言的编程世界中,函数指针和指针函数是两个既强大又容易混淆的概念。它们为 C 语言带来了更高的灵活性和可扩展性,广泛应用于回调函数、动态链接库、状态机等多种场景。深入理解和掌握函数指针与指针函数,对于提升 C 语言编程…...
政策支持与市场驱动:充电桩可持续发展的双轮引擎
随着全球能源转型加速,新能源汽车成为实现低碳交通的重要方向。然而,充电基础设施不足仍是制约其普及的关键瓶颈。当前,国际主流的充电桩运营模式包括政府推动、电网企业推动及汽车厂商推动三种模式,但单一模式均存在显著局限性。…...
在 Ubuntu 22.04 x64 系统安装/卸载 1Panel 面板
一、 1Panel 是什么? 1Panel 是一款基于 Go 语言开发的现代化开源服务器管理面板(类似宝塔面板),专注于容器化(Docker)和云原生环境管理,提供可视化界面简化服务器运维操作。 1. 1Panel主要功…...
dummy cli-tool ubuntu22.04使用
项目场景:dummy cli-tool ubuntu22.04使用 提示:这里简述项目相关背景:执行python3 run_shell.py时报错 例如:项目场景:示例:通过蓝牙芯片(HC-05)与手机 APP 通信,每隔 5s 传输一批传感器数据(不是很大) …...
厚铜板的镀前处理差异:工艺参数与成本影响
在现代电子设备中,厚铜电路板因其优异的导电性能和良好的热管理能力而备受青睐。生产过程中,对铜层进行电镀加厚是一个关键步骤,它涉及到一系列复杂的化学和物理过程。在进行电镀之前,必须对电路板进行适当的准备工作,…...
【C到Java的深度跃迁:从指针到对象,从过程到生态】第四模块·Java特性专精 —— 第十六章 多线程:从pthread到JMM的升维
一、并发编程的范式革命 1.1 C多线程的刀耕火种 C语言通过POSIX线程(pthread)实现并发,需要开发者直面底层细节: 典型pthread实现: #include <pthread.h> int counter 0; pthread_mutex_t lock PTHREAD…...
数据库学习笔记(十三)---存储过程
前言: 学习和使用数据库可以说是程序员必须具备能力,这里将更新关于MYSQL的使用讲解,大概应该会更新30篇,涵盖入门、进阶、高级(一些原理分析);这一篇存储过程,下一篇是存储函数;虽然MYSQL命令很多,但是自…...
JWT(JSON Web Token)源码分析
Java - JWT的简单介绍和使用 Java JWT:原理、机制及案例示范 什么是JWT? 1.1 JWT的基本概念 JWT(JSON Web Token)是一种用于在各方之间传递JSON格式信息的紧凑、URL安全的令牌(Token)。JWT的主要作用是验…...
Vue 3 中通过 createApp 创建的 app 实例的所有核心方法,包含完整示例、使用说明及对比表格
以下是 Vue 3 中通过 createApp 创建的 app 实例的所有核心方法,包含完整示例、使用说明及对比表格: 1. app.component() 作用:注册全局组件 参数: name:组件名称(字符串)componentÿ…...
Hadoop 单机模式(Standalone Mode)部署与 WordCount 测试
通过本次实验,成功搭建了 Hadoop 单机环境并运行了基础 MapReduce 程序,为后续分布式计算学习奠定了基础。 掌握 Hadoop 单机模式的安装与配置方法。 熟悉 Hadoop 环境变量的配置及 Java 依赖管理。 使用 Hadoop 自带的 WordCount 示例程序进行简单的 …...
线段树合并与分解
合并 #include <bits/stdc.h> using namespace std; #define asd(i,a,b) for(int ia;i<b;i) #define int long long const int inf 0x3f3f3f3f, N 1e5 5, Z 1e5; int n, m, fa[N], o[N][25], dep[N], tot, root[N], ans[N]; vector<int> g[N]; struct node…...
驱动开发硬核特训 │ 深度解析 fixed regulator 驱动与 regulator_ops
一、引言:本次目标 本篇聚焦于: Regulator 子系统基础概念设备树节点与驱动代码的对应关系regulator_desc、regulator_ops、regulator_dev 的完整讲解驱动端的实际注册与管理流程 通过一个实际案例,系统掌握 regulator 子系统 的全貌。 二…...
Linux中的shell脚本练习
1.判断字符串是否为空 #!/usr/bin/bash while : #:默认值为真 do read -p "请输入你的密码: " a pass123456 if [ -z $a ];thenecho "您输入的密码不能为空"exit 1 elseif [ $a $pass ];thenecho "登录成功"breakelseecho "您的密码输入有…...
MySQL基础篇 | 1-数据库概述与MySQL安装
【MySQL基础篇-1】数据库概述与MySQL安装 1. 数据库概述2. MySQL环境搭建2.1. MySQL的四大版本2.2. 软件下载1. 数据库概述 MySQL官网网站:https://dev.mysql.com/doc/relnotes/mysql/8.0/en/ SQL Server:SQL Server是微软开发的大型商业数据库。C#、.net等语言常使用,与wi…...
JVM 自动内存管理
一、运行时数据区域详解 Java 虚拟机在运行 Java 程序时,会将所管理的内存划分为多个不同的数据区域,各区域有着独特的用途、创建和销毁时间。 程序计数器:作为线程私有的较小内存空间,它是当前线程执行字节码的行号指示器。字节…...
InitializingBean接口和@PostConstruct-笔记
1. InitializingBean 简介 1.1 功能简介 InitializingBean 是 Spring 框架中的一个接口,用在 Bean 初始化后执行自定义逻辑。它提供了 afterPropertiesSet() 方法,该方法在以下时机被 Spring 容器自动调用: 属性注入完成后(即所…...
考研408-计算机组成原理冲刺考点(1-3章)
第一章 计算机系统概述 1.计算机核心 早期的冯诺依曼计算机是以运算器为中心的,而现在的计算机是以存储器为中心的 2.五大部件 3.汇编程序、编译程序、解释程序的辨析...
模板方法模式(Template Method Pattern)
模板方法模式(Template Method Pattern)是一种行为型设计模式,它定义了一个操作中的算法骨架,将一些步骤的实现延迟到子类中。模板方法使得子类可以在不改变算法结构的前提下,重新定义算法中的某些步骤。 一、基础 1. 意图 定义一个操作中的算法骨架,将某些步骤延迟到…...
一文了解无人机系统
无人机系统,又称无人驾驶航空器系统(Remotely Piloted Aircraft System,RPAS),作为一个由无人机平台、遥控站、指令与控制数据链及其他部件构成的完整技术体系,其系统架构包含多个核心分系统。具体而言&…...
系统架构师2025年论文《论软件的设计模式》
论软件的设计模式 摘要: 2016 年,我所在的公司承担了某市医院预约挂号系统的研发任务。我作为公司的技术总监,希望能打造基于该系统的系列产品,参与到项目的设计中,以期开发扩展性和可维护性良好的预约挂号系统,为以后的产品开发打下基础。网络靶场是网络安全技术研究的…...
集成电路流片随笔19:full_handshake
全双工握手接收模块 (full_handshake_rx),它的功能是接收来自发送端 (tx) 的数据,并对发送端进行应答(ACK)。模块实现了基于握手的通信机制,以确保数据的可靠传输。模块的输入输出分别连接于发送端和接收端,…...
Android Framework 探秘
以下文字来源AI,准确性不敢保证! 安卓Framework层概述 安卓的 Framework(框架层) 是安卓系统的核心组成部分,位于应用层和系统底层(如Linux内核)之间,负责为应用提供统一的接口和功…...
亚马逊云科技2025战略解析:AI驱动下的全球生态重塑
一、战略转向:从“云优先”到“AI优先”的核心逻辑 1. 千亿美元资本投入AI基建 芯片自研突破:2025年资本支出70%投向AI芯片与液冷数据中心。自研芯片矩阵全面升级,包括3纳米工艺的Trainium3(算力提升4倍)、单核性能…...
NGINX ngx_http_addition_module 模块响应体前后注入内容
一、模块概述 模块名称:ngx_http_addition_module引入版本:自 0.7.9 起支持 addition_types,0.8.29 起支持“*”通配;功能:对符合 MIME 类型的响应,在响应体前后分别插入指定子请求 URI 返回的内容&#x…...