Nacos源码—2.Nacos服务注册发现分析四
大纲
5.服务发现—服务之间的调用请求链路分析
6.服务端如何维护不健康的微服务实例
7.服务下线时涉及的处理
8.服务注册发现总结
7.服务下线时涉及的处理
(1)Nacos客户端服务下线的源码
(2)Nacos服务端处理服务下线的源码
(3)Nacos服务端发送服务变动事件给客户端的源码
(1)Nacos客户端服务下线的源码
Nacos客户端的Spring容器被销毁时,会通知Nacos服务端进行服务下线。首先会触发调用AbstractAutoServiceRegistration的destroy()方法。因为该类实现了Spring监听器,并且该方法被@PreDestroy注解修饰。@PreDestroy注解的作用是:Spring容器销毁时回调被该注解修饰的方法。
然后调用NacosServiceRegistry的deregister()方法 -> NamingService的deregisterInstance()方法 -> NamingProxy的deregisterService()方法,最后调用NamingProxy的reqApi()方法向"/nacos/v1/ns/instance"接口发起删除请求。
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {@Beanpublic NacosServiceRegistry nacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {return new NacosServiceRegistry(nacosDiscoveryProperties);}@Bean@ConditionalOnBean(AutoServiceRegistrationProperties.class)public NacosRegistration nacosRegistration(ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context);}@Bean@ConditionalOnBean(AutoServiceRegistrationProperties.class)public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);}
}public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> {......
}public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {private final ServiceRegistry<R> serviceRegistry;...@PreDestroypublic void destroy() {stop();}public void stop() {if (this.getRunning().compareAndSet(true, false) && isEnabled()) {deregister();if (shouldRegisterManagement()) {deregisterManagement();}this.serviceRegistry.close();}}protected void deregister() {//调用NacosServiceRegistry.deregister()方法this.serviceRegistry.deregister(getRegistration());}...
}public class NacosServiceRegistry implements ServiceRegistry<Registration> {...@Overridepublic void deregister(Registration registration) {...NamingService namingService = namingService();String serviceId = registration.getServiceId();String group = nacosDiscoveryProperties.getGroup();try {//调用NamingService.deregisterInstance()方法namingService.deregisterInstance(serviceId, group, registration.getHost(), registration.getPort(), nacosDiscoveryProperties.getClusterName());} catch (Exception e) {log.error("ERR_NACOS_DEREGISTER, de-register failed...{},", registration.toString(), e);}log.info("De-registration finished.");}private NamingService namingService() {return nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());}...
}//以上是nacos-discovery的,以下是nacos-client的
public class NacosNamingService implements NamingService {private BeatReactor beatReactor;private NamingProxy serverProxy;...@Overridepublic void deregisterInstance(String serviceName, String groupName, String ip, int port, String clusterName) throws NacosException {Instance instance = new Instance();instance.setIp(ip);instance.setPort(port);instance.setClusterName(clusterName);deregisterInstance(serviceName, groupName, instance);}@Overridepublic void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {if (instance.isEphemeral()) {beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(), instance.getPort());}//调用NamingProxy.deregisterService()方法serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);}...
}public class NamingProxy implements Closeable {...public void deregisterService(String serviceName, Instance instance) throws NacosException {final Map<String, String> params = new HashMap<String, String>(8);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, serviceName);params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());params.put("ip", instance.getIp());params.put("port", String.valueOf(instance.getPort()));params.put("ephemeral", String.valueOf(instance.isEphemeral()));reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.DELETE);}...
}
(2)Nacos服务端处理服务下线的源码
Nacos服务端处理服务下线的入口是InstanceController的deregister()方法,然后会调用ServiceManager的removeInstance()方法移除注册表里的实例,也就是调用ServiceManager的substractIpAddresses()方法。其中会传入remove参数执行ServiceManager的updateIpAddresses()方法,该方法的返回结果不会包含要删除的实例。
在ServiceManager的updateIpAddresses()方法中,判断入参action如果是remove,那么会把对应的Instance移除掉。但此时并不操作内存注册表,只是在返回的结果中删除对应的Instance实例。然后和注册逻辑一样,也是通过异步任务 + 内存队列的方式,去修改注册表。
//Instance operation controller.
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {@Autowiredprivate ServiceManager serviceManager;...//Deregister instances.@CanDistro@DeleteMapping@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)public String deregister(HttpServletRequest request) throws Exception {Instance instance = getIpAddress(request);String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);Service service = serviceManager.getService(namespaceId, serviceName);if (service == null) {Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName);return "ok";}//移除ServiceManager的注册表里的Instance实例serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);return "ok";}...
}//Core manager storing all services in Nacos.
@Component
public class ServiceManager implements RecordListener<Service> {//注册表,Map(namespace, Map(group::serviceName, Service)).private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();@Resource(name = "consistencyDelegate")private ConsistencyService consistencyService;...//Remove instance from service.public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {Service service = getService(namespaceId, serviceName);synchronized (service) {//移除InstanceremoveInstance(namespaceId, serviceName, ephemeral, service, ips);}}private void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service, Instance... ips) throws NacosException {//和注册一样,也是先构建keyString key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);//在这个instanceList中,不会包含需要删除的Instance实例了List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips);//包装成Instances对象Instances instances = new Instances();instances.setInstanceList(instanceList);//调用和注册一样的逻辑,把instanceList中的Instance,通过写时复制的机制,修改内存注册表consistencyService.put(key, instances);}private List<Instance> substractIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {//UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE传的Removereturn updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE, ephemeral, ips);}//Compare and get new instance list.public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {//先获取已经注册到Nacos的、当前要注册的服务实例对应的服务的、所有服务实例Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));List<Instance> currentIPs = service.allIPs(ephemeral);Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());Set<String> currentInstanceIds = Sets.newHashSet();for (Instance instance : currentIPs) {//把instance实例的IP当作key,instance实例当作value,放入currentInstancescurrentInstances.put(instance.toIpAddr(), instance);//把实例唯一编码添加到currentInstanceIds中currentInstanceIds.add(instance.getInstanceId());}//用来存放当前要注册的服务实例对应的服务的、所有服务实例Map<String, Instance> instanceMap;if (datum != null && null != datum.value) {instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);} else {instanceMap = new HashMap<>(ips.length);}for (Instance instance : ips) {if (!service.getClusterMap().containsKey(instance.getClusterName())) {Cluster cluster = new Cluster(instance.getClusterName(), service);cluster.init();service.getClusterMap().put(instance.getClusterName(), cluster);Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson());}if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {//移除Instance实例instanceMap.remove(instance.getDatumKey());} else {Instance oldInstance = instanceMap.get(instance.getDatumKey());if (oldInstance != null) {instance.setInstanceId(oldInstance.getInstanceId());} else {instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));}//instanceMap的key与IP和端口有关instanceMap.put(instance.getDatumKey(), instance);}}if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils.toJson(instanceMap.values()));}//最后instanceMap里肯定会包含新注册的Instance实例//并且如果不是第一次注册,里面还会包含之前注册的Instance实例信息return new ArrayList<>(instanceMap.values());}...
}
(3)Nacos服务端发送服务变动事件给客户端的源码
一.处理服务注册或服务下线时让客户端感知的方案
二.处理服务注册或服务下线时发布服务变动事件
三.监听服务变动事件并通过UDP发送推送给客户端
一.处理服务注册或服务下线时让客户端感知的方案
Nacos客户端进行服务注册或服务下线时,其他Nacos客户端如何感知。
方案一:其他Nacos客户端在服务发现时,会通过定时任务去更新客户端本地缓存,但是这样做会有几秒钟的延迟。
方案二:当Nacos服务端的注册表发生了变动,服务端主动通知客户端。其实Nacos服务端在处理服务注册或服务下线时的最后逻辑是一样的。即在通过写时复制修改完注册表后,服务端会发布一个变动事件。然后通过UDP方式通知每一个客户端,从而让客户端更快感知服务变动。
二.处理服务注册或服务下线时发布服务变动事件
服务注册或服务下线时,都会调用ConsistencyService的put()方法,将本次操作包装成Pair对象放入阻塞队列,然后由异步任务Notifier来处理阻塞队列中的Pair对象。
异步任务Notifier对阻塞队列中的Pair对象进行处理时,会调用Pair对象对应的Service服务的onChange()方法,而Service的onChange()方法又会调用Service的updateIPs()方法。
在Service的updateIPs()方法中:会先调用Cluster的updateIps()方法通过写时复制机制去修改注册表,然后调用PushService的serviceChanged()方法发布服务变动事件。
@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {private final GlobalConfig globalConfig;private final DistroProtocol distroProtocol;private final DataStore dataStore;//用于存储所有已注册的服务实例数据private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();private volatile Notifier notifier = new Notifier();...@PostConstructpublic void init() {//初始化完成后,会将notifier任务提交给GlobalExecutor来执行GlobalExecutor.submitDistroNotifyTask(notifier);}@Overridepublic void put(String key, Record value) throws NacosException {//把包含了当前注册的服务实例的、最新的服务实例列表,存储到DataStore对象中onPut(key, value);//在集群架构下,DistroProtocol.sync()方法会进行集群节点的服务实例数据同步distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);}public void onPut(String key, Record value) {if (KeyBuilder.matchEphemeralInstanceListKey(key)) {//创建Datum对象,把服务key和服务的所有服务实例Instances放入Datum对象中Datum<Instances> datum = new Datum<>();datum.value = (Instances) value;datum.key = key;datum.timestamp.incrementAndGet();//添加到DataStore的Map对象里dataStore.put(key, datum);} if (!listeners.containsKey(key)) {return;}//添加处理任务notifier.addTask(key, DataOperation.CHANGE);}...public class Notifier implements Runnable {private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);//Add new notify task to queue.public void addTask(String datumKey, DataOperation action) {if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {return;}if (action == DataOperation.CHANGE) {services.put(datumKey, StringUtils.EMPTY);}//tasks是一个阻塞队列,把key、action封装成Pair对象,放入队列中tasks.offer(Pair.with(datumKey, action));}public int getTaskSize() {return tasks.size();}@Overridepublic void run() {Loggers.DISTRO.info("distro notifier started");//无限循环for (; ;) {try {//从阻塞队列中获取任务Pair<String, DataOperation> pair = tasks.take();//处理任务handle(pair);} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);}}}private void handle(Pair<String, DataOperation> pair) {try {//把在DistroConsistencyServiceImpl.onPut()方法创建的key和action取出来String datumKey = pair.getValue0();DataOperation action = pair.getValue1();services.remove(datumKey);int count = 0;if (!listeners.containsKey(datumKey)) {return;}for (RecordListener listener : listeners.get(datumKey)) {count++;try {if (action == DataOperation.CHANGE) {//把Instances信息写到注册表里去,会调用Service.onChange()方法listener.onChange(datumKey, dataStore.get(datumKey).value);continue;}if (action == DataOperation.DELETE) {listener.onDelete(datumKey);continue;}} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);}}if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name());}} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);}}}
}//Service of Nacos server side
//We introduce a 'service --> cluster --> instance' model,
//in which service stores a list of clusters, which contain a list of instances.
//his class inherits from Service in API module and stores some fields that do not have to expose to client.
@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {private Map<String, Cluster> clusterMap = new HashMap<>();...@Overridepublic void onChange(String key, Instances value) throws Exception {Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);for (Instance instance : value.getInstanceList()) {if (instance == null) {//Reject this abnormal instance list:throw new RuntimeException("got null instance " + key);}if (instance.getWeight() > 10000.0D) {instance.setWeight(10000.0D);}if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {instance.setWeight(0.01D);}}updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));recalculateChecksum();}//Update instances. 这里的instances里就包含了新注册的实例对象public void updateIPs(Collection<Instance> instances, boolean ephemeral) {//clusterMap表示的是该服务的集群Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());for (String clusterName : clusterMap.keySet()) {ipMap.put(clusterName, new ArrayList<>());}//遍历全部实例对象:包括已经注册过的实例对象 和 新注册的实例对象//这里的作用就是对相同集群下的instance进行分类for (Instance instance : instances) {try {if (instance == null) {Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");continue;}//判定客户端传过来的instance实例中,是否设置了ClusterNameif (StringUtils.isEmpty(instance.getClusterName())) {//如果否,就设置instance实例的ClusterName为DEFAULTinstance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);}//判断之前是否存在对应的CLusterName,如果没有则需要创建新的Cluster对象if (!clusterMap.containsKey(instance.getClusterName())) {Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson());//创建新的Cluster集群对象Cluster cluster = new Cluster(instance.getClusterName(), this);cluster.init();//将新创建的Cluster对象放入到集群clusterMap中getClusterMap().put(instance.getClusterName(), cluster);}//根据集群名字,从ipMap里面获取集群下的所有实例List<Instance> clusterIPs = ipMap.get(instance.getClusterName());if (clusterIPs == null) {clusterIPs = new LinkedList<>();ipMap.put(instance.getClusterName(), clusterIPs);}//将客户端传过来的新注册的instance实例,添加到clusterIPs,也就是ipMap中clusterIPs.add(instance);} catch (Exception e) {Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);}}//对所有的服务实例分好类之后,按照ClusterName来更新注册表for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {//entryIPs已经是根据ClusterName分好组的实例列表了List<Instance> entryIPs = entry.getValue();//调用Cluster.updateIps()方法,根据写时复制,对注册表中的每一个Cluster对象进行更新clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);}setLastModifiedMillis(System.currentTimeMillis());//使用UDP方式通知Nacos客户端getPushService().serviceChanged(this);StringBuilder stringBuilder = new StringBuilder();for (Instance instance : allIPs()) {stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");}Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(), stringBuilder.toString());}@JsonIgnorepublic PushService getPushService() {return ApplicationUtils.getBean(PushService.class);}...
}public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {@JsonIgnoreprivate Set<Instance> persistentInstances = new HashSet<>();@JsonIgnoreprivate Set<Instance> ephemeralInstances = new HashSet<>();@JsonIgnoreprivate Service service;...//Update instance list.public void updateIps(List<Instance> ips, boolean ephemeral) {//先判定是否是临时实例,然后把对应的实例数据取出来,放入到新创建的toUpdateInstances集合中Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;//将老的实例列表toUpdateInstances复制一份到oldIpMap中HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());for (Instance ip : toUpdateInstances) {oldIpMap.put(ip.getDatumKey(), ip);}...//最后把传入进来的实例列表,重新初始化一个HaseSet,赋值给toUpdateInstancestoUpdateInstances = new HashSet<>(ips);//判断是否是临时实例,将CLuster的persistentInstances或ephemeralInstances替换为toUpdateInstancesif (ephemeral) {//直接把之前的实例列表替换成新的ephemeralInstances = toUpdateInstances;} else {//直接把之前的实例列表替换成新的persistentInstances = toUpdateInstances;}}...
}
三.监听服务变动事件并通过UDP发送通知给客户端
PushService的serviceChanged()方法发布服务变动事件。由于PushService实现了ApplicationListener,所以PushService的onApplicationEvent()方法会收到发布的服务变动事件,然后调用PushService的udpPush()方法通过UDP协议主动通知客户端。
总结:如果Nacos服务端的注册表发生变动,会通过UDP协议主动通知客户端。UDP协议比较轻量化,它无需建立连接就可以发送封装的IP数据包。虽然UDP协议下的传输不可靠,但是不可靠也没关系。因为每个客户端本地还有一个定时任务去更新本地实例列表缓存。
@Component
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {private ApplicationContext applicationContext;private static DatagramSocket udpSocket;private static volatile ConcurrentMap<String, Long> udpSendTimeMap = new ConcurrentHashMap<>();private static volatile ConcurrentMap<String, Receiver.AckEntry> ackMap = new ConcurrentHashMap<>();...//Service changed.public void serviceChanged(Service service) {//merge some change events to reduce the push frequency:if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {return;}//发布服务变动事件this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));}@Overridepublic void onApplicationEvent(ServiceChangeEvent event) {Service service = event.getService();String serviceName = service.getName();String namespaceId = service.getNamespaceId();Future future = GlobalExecutor.scheduleUdpSender(() -> {try {Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");//获取某服务下的所有Nacos客户端ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));if (MapUtils.isEmpty(clients)) {return;}Map<String, Object> cache = new HashMap<>(16);long lastRefTime = System.nanoTime();//遍历所有客户端for (PushClient client : clients.values()) {...//通过UDP进行通知udpPush(ackEntry);}} catch (Exception e) {Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);} finally {futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));}}, 1000, TimeUnit.MILLISECONDS);futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);}private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {if (ackEntry == null) {Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");return null;}if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);ackMap.remove(ackEntry.key);udpSendTimeMap.remove(ackEntry.key);failedPush += 1;return ackEntry;}try {if (!ackMap.containsKey(ackEntry.key)) {totalPush++;}ackMap.put(ackEntry.key, ackEntry);udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());Loggers.PUSH.info("send udp packet: " + ackEntry.key);//通过UDP协议发送消息udpSocket.send(ackEntry.origin);ackEntry.increaseRetryTime();GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);return ackEntry;} catch (Exception e) {Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e);ackMap.remove(ackEntry.key);udpSendTimeMap.remove(ackEntry.key);failedPush += 1;return null;}}...
}
(4)服务下线的处理总结
8.服务注册发现总结
一.客户端
nacos-discovery利用了Spring的事件监听机制,在Spring容器启动时的调用Nacos服务端提供的服务实例注册接口。在调用服务实例注册接口时,客户端会开启一个异步任务来做发送心跳。
在客户端进行微服务调用时,nacos-discovery会整合Ribbon,然后查询Nacos服务端的服务实例列表来维护本地缓存,从而通过Ribbon实现服务调用时的负载均衡。
在关闭Spring容器时,会触发Nacos客户端销毁的方法,然后调用Nacos服务端的服务下线接口,从而完成服务下线流程。
二.服务端
服务端的核心功能:服务注册、服务查询、服务下线、心跳健康。服务注册的实现要点:异步任务 + 内存阻塞队列、内存注册表、写时复制。
服务端也会开启心跳健康检查的定时任务来检查不健康的实例。如果发现Instance超过15秒没有心跳,则标记为不健康。如果发现Instance超过30秒没有心跳,则会直接删除。
进行服务查询时,是直接从内存注册表中获取Instance列表进行返回。
相关文章:
Nacos源码—2.Nacos服务注册发现分析四
大纲 5.服务发现—服务之间的调用请求链路分析 6.服务端如何维护不健康的微服务实例 7.服务下线时涉及的处理 8.服务注册发现总结 7.服务下线时涉及的处理 (1)Nacos客户端服务下线的源码 (2)Nacos服务端处理服务下线的源码 (3)Nacos服务端发送服务变动事件给客户端的源码…...
从Windows开发迁移到信创开发的指南:国产替代背景下的技术路径与实践
🧑 博主简介:CSDN博客专家、CSDN平台优质创作者,高级开发工程师,数学专业,10年以上C/C, C#, Java等多种编程语言开发经验,拥有高级工程师证书;擅长C/C、C#等开发语言,熟悉Java常用开…...
从数据到决策:安科瑞EIoT如何让每一度电“清晰可见”?
安科瑞顾强 在能源管理迈向精细化与数字化的今天,安科瑞EIoT能源物联网平台以“数据驱动能源价值”为核心理念,融合物联网、云计算与大数据技术,打通从设备感知到云端决策的全链路闭环,助力工商业企业、园区、物业等场景实现用电…...
10.学习笔记-MyBatisPlus(P105-P110)
1.MyBatisPlus入门案例 (1)MyBatisPlus(简称Mp)是基于MyBatis框架基础上开发的增强型工具,目的是简化开发,提高效率。 (2)开发方式:基于MyBatis使用MyBatisPlusÿ…...
LayerSkip: Enabling Early Exit Inference and Self-Speculative Decoding
TL;DR 2024 年 Meta FAIR 提出了 LayerSkip,这是一种端到端的解决方案,用于加速大语言模型(LLMs)的推理过程 Paper name LayerSkip: Enabling Early Exit Inference and Self-Speculative Decoding Paper Reading Note Paper…...
fastapi和flaskapi有什么区别
FastAPI 和 Flask 都是 Python 的 Web 框架,但设计目标和功能特性有显著差异。以下是它们的核心区别: 1. 性能与异步支持 FastAPI 基于 Starlette(高性能异步框架)和 Pydantic(数据校验库)…...
在 JMeter 中使用 BeanShell 获取 HTTP 请求体中的 JSON 数据
在 JMeter 中,您可以使用 BeanShell 处理器来获取 HTTP 请求体中的 JSON 数据。以下是几种方法: 方法一:使用前置处理器获取请求体 如果您需要在发送请求前访问请求体: 添加一个 BeanShell PreProcessor 到您的 HTTP 请求采样器…...
Go 1.25为什么要废除核心类型
关于核心类型为什么要1.25里要移除,作者Robert在博客Goodbye core types - Hello Go as we know and love it!里给了详细耐心的解答。 背景:Go 1.18 引入了泛型(generics),带来了类型参数…...
flask中的Response 如何使用?
在 Flask 中,Response 对象用于生成 HTTP 响应并返回给客户端。以下是其常见用法及示例: 1. 直接返回字符串或 HTML 视图函数返回的字符串会被自动包装为 Response 对象,默认状态码为 200,内容类型为 text/html: app…...
基于SpringAI实现简易聊天对话
简介 本文旨在记录学习和实践 Spring AI Alibaba 提供的 ChatClient 组件的过程。ChatClient 是 Spring AI 中用于与大语言模型(LLM)进行交互的高级 API,它通过流畅(Fluent)的编程接口,极大地简化了构建聊天…...
STM32单片机入门学习——第49节: [15-2] 读写内部FLASH读取芯片ID
写这个文章是用来学习的,记录一下我的学习过程。希望我能一直坚持下去,我只是一个小白,只是想好好学习,我知道这会很难,但我还是想去做! 本文写于:2025.04.29 STM32开发板学习——第49节: [15-2] 读写内部FLASH&读取芯片ID 前言开发板说…...
第14讲:科研图表的导出与排版艺术——高质量 PDF、TIFF 输出与投稿规范全攻略!
目录 📘 前言:导出,不只是“保存”! 🎯 一、你需要掌握的导出目标 🖼️ 二、TIFF / PNG 导出规范(适用于投稿) 🧲 三、PDF 矢量图导出(排版首选) 🧩 四、强烈推荐组合:showtext + Cairo 🧷 五、多个图的组合导出技巧 🧪 六、特殊投稿需求处理 �…...
SRIO IP调试问题记录(ready信号不拉高情况)
问题:调试过程中遇到有时写入数据后数据不发送,并且ready信号在写入一定数据后一直拉低的情况(偶发,不是每次必然出现)。buf空间设置为16时,写入15包数据,写完第16包包头后,ready信号…...
使用DDR4控制器实现多通道数据读写(十)
一、本章概述 本章节对目前单通道的读写功能进项测试,主要验证读写的数据是否正确,并观察该工程可以存储的最大容量。通过空满信号进行读写测试,根据ila抓取fifo和ddr4全部满的时刻,可以观察到最大容量。再通过debug逻辑可以测试读…...
从 BERT 到 GPT:Encoder 的 “全局视野” 如何喂饱 Decoder 的 “逐词纠结”
当 Encoder 学会 “左顾右盼”:Decoder 如何凭 “单向记忆” 生成丝滑文本? 目录 当 Encoder 学会 “左顾右盼”:Decoder 如何凭 “单向记忆” 生成丝滑文本?引言一、Encoder vs Decoder:核心功能与基础架构对比1.1 本…...
探寻软件稳定性的奥秘
在软件开发的广袤领域中,软件的稳定性宛如基石,支撑着整个软件系统的运行与发展。《发布!软件的设计与部署》这本书的第一部分,对软件稳定性进行了深入且全面的剖析,为软件开发人员、架构师以及相关从业者们提供了极具…...
Reverse-WP记录9
前言 之前写的,一直没发,留个记录吧,万一哪天记录掉了起码在csdn有个念想 1.easyre1 32位无壳elf文件 shiftF12进入字符串,发现一串数字,双击进入 进入main函数 int __cdecl main(int argc, const char **argv, const…...
日常开发小Tips:后端返回带颜色的字段给前端
一般来说,展示给用户的字体格式,都是由前端控制,展现给用户; 但是当要表示某些字段的数据为异常数据,或者将一些关键信息以不同颜色的形式呈现给用户时,而前端又不好判断,那么就可以由后端来控…...
partition_pdf 和chunk_by_title 的区别
from unstructured.partition.pdf import partition_pdf from unstructured.chunking.title import chunk_by_titlepartition_pdf 和 chunk_by_title 初看有点像,都在"分块",但是它们的本质完全不一样。 先看它们核心区别 partition_pdfchun…...
JAVA-使用Apache POI导出数据到Excel,并把每条数据的图片打包成zip附件项
最近项目要实现一个功能,就是在导出报表的时候 ,要把每条数据的所有图片都要打包成zip附件在excel里一起导出。 1. 添加依赖 <dependency><groupId>org.apache.poi</groupId><artifactId>poi</artifactId><version>…...
前端——CSS1
一,概述 CSS(Cascading Style Sheets)(级联样式表) css是一种样式表语言,为html标签修饰定义外观,分工不同 涉及:对网页的文字、背景、宽、高、布局进行修饰 分为内嵌样式表&…...
《AI大模型应知应会100篇》【精华】第40篇:长文本处理技巧:克服大模型的上下文长度限制
[精华]第40篇:长文本处理技巧:克服大模型的上下文长度限制 摘要 在大语言模型应用中处理超出其上下文窗口长度的长文本是一项挑战。本文面向初学者介绍长文本处理的常见难题,以及一系列有效策略和技巧,包括如何对文档进行合理分…...
开源模型应用落地-qwen模型小试-Qwen3-8B-快速体验(一)
一、前言 阿里云最新推出的 Qwen3-8B 大语言模型,作为国内首个集成“快思考”与“慢思考”能力的混合推理模型,凭借其 80 亿参数规模及 128K 超长上下文支持,正在重塑 AI 应用边界。该模型既可通过轻量化“快思考”实现低算力秒级响应,也能在复杂任务中激活深度推理模式,以…...
千问3(Qwen3)模型开源以及初体验
体验地址:百炼控制台 1 千问3模型:全球最强开源大模型震撼发布 2025年4月29日,阿里巴巴正式开源了新一代通义千问模型Qwen3(简称千问3),这一里程碑式的事件标志着中国开源大模型首次登顶全球性能榜首。千问…...
对 FormCalc 语言支持较好的 PDF 编辑软件综述
FormCalc是一种专为PDF表单计算设计的脚本语言,主要应用于Adobe生态及SAP相关工具。以下是对FormCalc支持较好的主流软件及其特点: 1. Adobe LiveCycle Designer 作为FormCalc的原生开发环境,LiveCycle Designer提供最佳支持: …...
20250429-李彦宏口中的MCP:AI时代的“万能接口“
目录 一、什么是MCP? 二、为什么需要MCP? 三、MCP的工作原理 3.1 核心架构 3.2 工作流程 四、MCP的应用场景 4.1 开发者工具集成 4.2 智能助手增强 4.3 企业应用集成 4.4 典型案例 五、MCP的技术特点 5.1 标准化接口 5.2 可扩展性设计 5.…...
汽车启动原理是什么?
好的!同学们,今天我们来讨论汽车的启动原理,重点分析其中的动力来源和摩擦力作用。我会结合物理概念,用尽量直观的方式讲解。 1. 汽车为什么会动?——动力的来源 汽车发动机(内燃机或电动机)工…...
LeetCode[347]前K个高频元素
思路: 使用小顶堆,最小的元素都出去了,省的就是大,高频的元素了,所以要维护一个小顶堆,使用map存元素高频变化,map存堆里,然后输出堆的东西就行了 代码: class Solution…...
《软件测试52讲》学习笔记:如何设计一个“好的“测试用例?
引言 在软件测试领域,设计高质量的测试用例是保证软件质量的关键。本文基于茹炳晟老师在《软件测试52讲》中关于测试用例设计的讲解,结合个人学习心得,系统总结如何设计一个"好的"测试用例。 一、什么是"好的"测试用例…...
【深度学习新浪潮】ISP芯片算法技术简介及关键技术分析
ISP芯片及其功能概述 ISP(Image Signal Processor)芯片作为现代影像系统的核心组件,负责对图像传感器输出的原始信号进行后期处理。ISP的主要功能包括线性纠正、噪声去除、坏点修复、色彩校正以及白平衡调整等,这些处理步骤对于提高图像质量和视觉效果至关重要。随着科技的…...
QtCreator Kits构建套件报错(红色、黄色感叹号)
鼠标移动上去,查看具体报错提示。 一.VS2022Qt5.14.2(MSVC2017) 环境VS2022Qt5.14.2(MSVC2017) 错误:Compilers produce code for different ABIs:x86-windows-msvc2005-pe-64bit,x86-windows-msvc2005-pe-32bit 错误࿱…...
天能资管(SkyAi):全球布局,领航资管新纪元
在全球化浪潮汹涌澎湃的今天,资管行业的竞争已不再是单一市场或区域的较量,而是跨越国界、融合全球资源的全面竞争。天能资管(SkyAi),作为卡塔尔投资局(Qatar Investment Authority,QIA)旗下的尖端科技品牌,正以其独特的全球视野和深远的战略眼光,积极布局资管赛道,力求在全球资…...
基于PHP的宠物用品商城
有需要请加文章底部Q哦 可远程调试 基于PHP的宠物用品商城 一 介绍 宠物用品商城系统基于原生PHP开发,数据库mysql,前端bootstrap,jquery.js等。系统角色分为用户和管理员。(附带参考文档) 技术栈:phpmysqlbootstrapphpstudyvsc…...
桂链:使用Fabric的测试网络
桂链是基于Hyperledger Fabric开源区块链框架扩展开发的区块链存证平台,是桂云网络(OSG)公司旗下企业供应链、流程审批等场景数字存证软件产品,与桂花流程引擎(Osmanthus)并列为桂云网络旗下的标准与可定制…...
k8s术语master,node,namepace,LABLE
1.Master Kubernetes中的master指的是集群控制节点,每个kubernetes集群里都需要有一个Master节点来负责整个集群的管理和控制,基本上kubernetes的所有控制命令都发给它,它来负责具体的执行过程。Master节点通常会占据一个独立的服务器(高可用建议3台服务器)。 Master节点…...
香港科技大学广州|智能制造学域硕、博研究生招生可持续能源与环境学域博士招生宣讲会—四川大学专场!
香港科技大学广州|智能制造学域硕、博研究生招生&可持续能源与环境学域博士招生宣讲会—四川大学专场!!! 两个学域代表教授亲临现场,面对面答疑解惑助攻申请!可带简历现场咨询和面试! &am…...
【Vue】 实现TodoList案例(待办事项)
目录 组件化编码流程(通用) 1.实现静态组件:抽取组件,使用组件实现静态页面效果 2.展示动态数据: 1. 常规 HTML 属性 3.交互——从绑定事件监听开始 什么时候要用 event: 什么时候不需要用 event&am…...
Ubuntu 20.04 安装 ROS 2 Foxy Fitzroy
目录 1,安装前须知 2,安装过程 2.1,设置语言环境 2.2,设置源 2.3,安装ROS 2软件包 2.4,环境设置 2.5,测试 2.6,不想每次执行source 检验是否成功(另…...
【Unity】使用LitJson保存和读取数据的例子
LitJson 是一个轻量级的 JSON 解析和生成库,广泛应用于 .NET 环境中。 优点:轻量级,易用,性能优秀,支持LINQ和自定义对象的序列化和反序列化。 public class LitJsonTest : MonoBehaviour { // Start is called before…...
飞蛾扑火算法优化+Transformer四模型回归打包(内含MFO-Transformer-LSTM及单独模型)
飞蛾扑火算法优化Transformer四模型回归打包(内含MFO-Transformer-LSTM及单独模型) 目录 飞蛾扑火算法优化Transformer四模型回归打包(内含MFO-Transformer-LSTM及单独模型)预测效果基本介绍程序设计参考资料 预测效果 基本介绍 …...
物联网平台厂商有哪些?2025物联网平台推荐?国内有哪些比较好的物联网平台?
评选维度: 技术实力:涵盖设备接入规模、数据处理效率、AI/边缘计算融合能力、协议兼容性及平台架构先进。 应用场景:包括垂直领域解决方案的成熟度、定制化能力、跨行业复用性及实际落地案例规模。 安全可靠:涉及数据传输加密、…...
瑞幸咖啡披露2025年Q1财报:门店净增1757家,营业利润率达8.3%
4月29日,瑞幸咖啡(OTC:LKNCY)公布2025年第一季度财报。数据显示,2025年第一季度总净收入88.65亿元人民币,同比增长41.2%,GMV达103.54亿元人民币。截止一季度末,门店总数达24097家。依…...
selenium IDE脚本如何转换为可运行的selenium webdriver java程序
上一篇博客(用selenium4 webdriver java 搭建并完成第一个自动化测试脚本-CSDN博客)介绍了如何创建一个selenium webdriver 的java工程。 之前博客(带你用selenium IDE的录制第一个自动化测试脚本也介绍了如何使用selenum ide …...
GA-Transformer遗传算法优化编码器多特征分类预测/故障诊断,作者:机器学习之心
GA-Transformer遗传算法优化编码器多特征分类预测/故障诊断 目录 GA-Transformer遗传算法优化编码器多特征分类预测/故障诊断效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.Matlab实现GA-Transformer遗传算法优化编码器多特征分类预测/故障诊断,运行环境M…...
LeetCode热题100--53.最大子数组和--中等
1. 题目 给你一个整数数组 nums ,请你找出一个具有最大和的连续子数组(子数组最少包含一个元素),返回其最大和。 子数组是数组中的一个连续部分。 示例 1: 输入:nums [-2,1,-3,4,-1,2,1,-5,4] 输出&…...
【计算机视觉】深度解析MediaPipe:谷歌跨平台多媒体机器学习框架实战指南
深度解析MediaPipe:谷歌跨平台多媒体机器学习框架实战指南 技术架构与设计哲学核心设计理念系统架构概览 核心功能与预构建解决方案1. 人脸检测2. 手势识别3. 姿势估计4. 物体检测与跟踪 实战部署指南环境配置基础环境准备获取源码 构建第一个示例(手部追…...
血管造影正常≠心脏没事!无创技术破解心肌缺血漏诊困局
提到冠心病检查,很多人会纠结:到底哪项检查能更全面地反映病情、精准得出结论? 从准确性来说,冠脉 CT 与冠脉造影是临床常用手段。二者虽然能够清晰显示血管大冠脉是否存在狭窄或斑块,但二者本质上有相同的 “局限性”…...
ClickHouse副本集群
每个节点安装clickhouse服务安装 zookeeper每个节点修改 /etc/clickhouse-server/config.xml 863行左右 <remote_servers><default><shard><replica><host>18.1.13.30</host><port>9000</port></replica><replica&g…...
Go 语言中的 `os.Truncate` 函数详解
os.Truncate 是 Go 标准库中用于修改文件大小的函数。下面我将全面解析这个函数的功能、用法和注意事项。 函数签名 func Truncate(name string, size int64) error核心功能 os.Truncate 用于: 将指定文件截断或扩展到指定大小处理符号链接时会操作链接指向的实…...
java 加入本地lib jar处理方案
在 Java 项目中,如果想将本地的 .jar 文件加入到 Maven 构建流程中,有以下几种常见方式可以选择: ✅ 推荐方式:将本地 JAR 安装到本地 Maven 仓库 这是最佳实践。通过 mvn install:install-file 命令把JAR 包安装到本地仓库&…...