Nacos源码—6.Nacos升级gRPC分析二
大纲
1.Nacos 2.x版本的一些变化
2.客户端升级gRPC发起服务注册
3.服务端进行服务注册时的处理
4.客户端服务发现和服务端处理服务订阅的源码分析
4.客户端服务发现和服务端处理服务订阅的源码分析
(1)Nacos客户端进行服务发现的源码
(2)Nacos服务端处理服务订阅请求的源码
(1)Nacos客户端进行服务发现的源码
一.nacos-discovery引入Ribbon实现服务调用时的负载均衡
二.nacos-discovery如何整合Ribbon实现服务调用时的负载均衡
三.nacos-client如何进行服务发现
一.nacos-discovery引入Ribbon实现服务调用时的负载均衡
Nacos客户端就是引入了nacos-discovery + nacos-client依赖的项目。由于nacos-discovery整合了Ribbon,所以Ribbon可以调用Nacos服务端的服务实例查询列表接口。于是Nacos客户端便借助Ribbon实现了服务调用时的负载均衡,即Ribbon会从服务实例列表中选择一个服务实例给客户端进行服务调用。
在nacos-discovery的pom.xml中,可以看到它引入了Ribbon依赖:
二.nacos-discovery如何整合Ribbon实现服务调用时的负载均衡
在Ribbon中会有一个ServerList接口,如下所示:ServerList就是一个扩展接口,这个接口的作用就是获取Server列表。然后nacos-discovery会针对这个接口进行实现,从而整合Ribbon。
从引入的包来看,loadbalancer是属于Ribbon源码包下的。而LoadBalancer则是Ribbon中的负载均衡器。负载均衡器会结合IRule负载均衡策略,从服务实例列表中选择一个实例。
package com.netflix.loadbalancer;import java.util.List;//Interface that defines the methods sed to obtain the List of Servers
public interface ServerList<T extends Server> {public List<T> getInitialListOfServers();//Return updated list of servers. This is called say every 30 secspublic List<T> getUpdatedListOfServers();
}
当Nacos客户端进行微服务调用时,会通过Ribbon来选出一个服务实例,此时Ribbon会调用NacosServerList的getUpdatedListOfServers()方法获取服务实例列表。
nacos-discovery的NacosServerList类继承了AbstractServerList类,而且实现了Ribbon的ServerList接口的两个方法,如下所示:
public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {......
}public class NacosServerList extends AbstractServerList<NacosServer> {private NacosDiscoveryProperties discoveryProperties;private String serviceId;public NacosServerList(NacosDiscoveryProperties discoveryProperties) {this.discoveryProperties = discoveryProperties;}@Overridepublic List<NacosServer> getInitialListOfServers() {return getServers();}@Overridepublic List<NacosServer> getUpdatedListOfServers() {return getServers();}private List<NacosServer> getServers() {try {//读取分组String group = discoveryProperties.getGroup();//通过服务名称、分组、true(表示只需要健康实例),//调用NacosNamingService.selectInstances()方法来查询服务实例列表List<Instance> instances = discoveryProperties.namingServiceInstance().selectInstances(serviceId, group, true);//把Instance转换成NacosServer类型return instancesToServerList(instances);} catch (Exception e) {throw new IllegalStateException("Can not get service instances from nacos, serviceId=" + serviceId, e);}}private List<NacosServer> instancesToServerList(List<Instance> instances) {List<NacosServer> result = new ArrayList<>();if (CollectionUtils.isEmpty(instances)) {return result;}for (Instance instance : instances) {result.add(new NacosServer(instance));}return result;}public String getServiceId() {return serviceId;}@Overridepublic void initWithNiwsConfig(IClientConfig iClientConfig) {this.serviceId = iClientConfig.getClientName();}
}
NacosServerList的核心方法是getServers(),因为nacos-discovery实现Ribbon的两个接口都调用到了该方法。
在nacos-discovery的NacosServerList的getServers()方法中,会调用nacos-client的NacosNamingService的selectInstances()方法,来获取服务实例列表。
三.nacos-client如何进行服务发现
在nacos-client的NacosNamingService的selectInstances()方法中,首先会调用ServiceInfoHolder的getServiceInfo()方法从本地缓存获取数据。ServiceInfoHolder的serviceInfoMap中的value是一个ServiceInfo对象,在ServiceInfo对象中会有一个Listhosts属性来存放实例数据。
如果ServiceInfoHolder中的本地缓存没有对应的ServiceInfo对象,那么就会调用NamingClientProxyDelegate的subscribe()方法。该方法首先会开启一个查询服务实例列表的延时执行的任务,然后通过Client对象发送订阅请求,去服务端实时获取服务实例数据。
具体来说就是先调用ServiceInfoUpdateService的scheduleUpdateIfAbsent()方法,开启一个延迟执行查询服务实例列表的UpdateTask任务,然后再次调用ServiceInfoHolder的getServiceInfoMap()方法查询本地缓存。如果本地缓存为空,则向服务端发起gRPC请求获取服务实例数据,也就是通过调用NamingGrpcClientProxy的subscribe()方法,触发调用NamingGrpcClientProxy的doSubscribe()方法,再触发调用NamingGrpcClientProxy的requestToServer()方法,接着调用RpcClient的request()方法发送gRPC请求给服务端,最后调用ServiceInfoHolder的processServiceInfo()方法更新本地缓存。
public class NacosNamingService implements NamingService {private ServiceInfoHolder serviceInfoHolder;private NamingClientProxy clientProxy;...private void init(Properties properties) throws NacosException {...this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);}@Overridepublic List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {ServiceInfo serviceInfo;String clusterString = StringUtils.join(clusters, ",");//判断是否需要订阅,默认为trueif (subscribe) {//查询Nacos本地缓存数据serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);if (null == serviceInfo) {//如果本地缓存数据为空,则调用Client代理NamingClientProxyDelegate的订阅方法subscribe(),通过Client对象请求服务端获取数据serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);}} else {serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);}//返回数据return selectInstances(serviceInfo, healthy);}...
}//Naming client service information holder.
public class ServiceInfoHolder implements Closeable {private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;...public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());//获取keyString groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);String key = ServiceInfo.getKey(groupedServiceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}//通过key从本地缓存中获取数据return serviceInfoMap.get(key);}...
}public class NamingClientProxyDelegate implements NamingClientProxy {private final ServiceInfoUpdateService serviceInfoUpdateService;private final ServiceInfoHolder serviceInfoHolder;private final NamingGrpcClientProxy grpcClientProxy;...public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties, InstancesChangeNotifier changeNotifier) throws NacosException {...this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this, changeNotifier);this.serviceInfoHolder = serviceInfoHolder;this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);}...@Overridepublic ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);//1.调用ServiceInfoUpdateService.scheduleUpdateIfAbsent()方法开启一个查询服务实例列表的定时任务serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);//再次查询本地的缓存数据ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (null == result || !isSubscribed(serviceName, groupName, clusters)) {//2.如果本地缓存还是为空,则使用gRPC来请求服务端,也就是调用NamingGrpcClientProxy.subscribe()方法result = grpcClientProxy.subscribe(serviceName, groupName, clusters);}//更新本地缓存serviceInfoHolder.processServiceInfo(result);return result;}...
}public class NamingGrpcClientProxy extends AbstractNamingClientProxy {...private final RpcClient rpcClient;private final NamingGrpcRedoService redoService;public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory, Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {...//通过RpcClientFactory.createClient()方法创建一个GrpcSdkClient对象实例,然后赋值给rpcClient属性this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);this.redoService = new NamingGrpcRedoService(this);start(serverListFactory, serviceInfoHolder);}private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {rpcClient.serverListFactory(serverListFactory);rpcClient.registerConnectionListener(redoService);rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));rpcClient.start();NotifyCenter.registerSubscriber(this);}...@Overridepublic ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);}redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);//执行订阅return doSubscribe(serviceName, groupName, clusters);}//Execute subscribe operation.public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {//构建订阅服务请求——SubscribeServiceRequest对象SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters, true);//向服务端发送SubscribeServiceRequest类型的请求SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);redoService.subscriberRegistered(serviceName, groupName, clusters);return response.getServiceInfo();}private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass) throws NacosException {try {request.putAllHeader(getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));//实际会调用RpcClient.request()方法发起gRPC请求Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {throw new NacosException(response.getErrorCode(), response.getMessage());}if (responseClass.isAssignableFrom(response.getClass())) {return (T) response;}NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'", response.getClass().getName(), responseClass.getName());} catch (Exception e) {throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);}throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");}...
}
其中UpdateTask任务的run()方法会先调用NamingClientProxy.queryInstancesOfService()方法,然后调用ServiceInfoHolder的processServiceInfo()方法向服务端查询服务实例列表以及更新本地服务实例缓存。当该任务执行完毕时,会继续向调度线程池提交一个延迟6s执行的任务,从而实现不断更新本地缓存的服务实例列表。
在ServiceInfoHolder的processServiceInfo()方法更新本地服务实例缓存中,会判断服务实例是否发生改变。如果有改变,那么客户端会先发布一个服务实例改变事件InstancesChangeEvent,然后把新的服务实例数据写入本地磁盘。
public class ServiceInfoUpdateService implements Closeable {private final NamingClientProxy namingClientProxy;private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();...//Schedule update if absent.public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {//生成一个serverKeyString serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);//判断当前serviceKey是否已经开启了对应的定时任务,如果已经开启就不开启了if (futureMap.get(serviceKey) != null) {return;}//加一把同步锁,避免并发冲突synchronized (futureMap) {//加锁之后再进行一次判断,双重检测Double Checkif (futureMap.get(serviceKey) != null) {return;}//添加UpdateTask任务到executor延迟执行ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));futureMap.put(serviceKey, future);}}private synchronized ScheduledFuture<?> addTask(UpdateTask task) {return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);}...public class UpdateTask implements Runnable {long lastRefTime = Long.MAX_VALUE;private boolean isCancel;private final String serviceName;private final String groupName;private final String clusters;private final String groupedServiceName;private final String serviceKey;//the fail situation. 1:can't connect to server 2:serviceInfo's hosts is emptyprivate int failCount = 0;public UpdateTask(String serviceName, String groupName, String clusters) {this.serviceName = serviceName;this.groupName = groupName;this.clusters = clusters;this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);}@Overridepublic void run() {long delayTime = DEFAULT_DELAY;try {if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);isCancel = true;return;}//获取本地缓存ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);if (serviceObj == null) {//如果本地缓存为空,则通过gRPC去查询服务端的服务实例数据serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);//更新本地缓存serviceInfoHolder.processServiceInfo(serviceObj);//更新UpdateTask的lastRefTime属性,即更新UpdateTask任务的数据获取时间lastRefTime = serviceObj.getLastRefTime();return;}//如果本地缓存不为空,则判断本地缓存最后一次刷新时间 是否小于等于 最后一次UpdateTask任务的数据获取时间if (serviceObj.getLastRefTime() <= lastRefTime) {//如果小于等于,则重新查询服务端的服务实例数据serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);//更新本地缓存serviceInfoHolder.processServiceInfo(serviceObj);}lastRefTime = serviceObj.getLastRefTime();if (CollectionUtils.isEmpty(serviceObj.getHosts())) {incFailCount();return;}//TODO multiple time can be configured.//计算下一次定时任务执行的时间,这个时间默认是6sdelayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;//请求成功后,重置错误次数为0resetFailCount();} catch (Throwable e) {//记录请求失败的次数incFailCount();NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);} finally {if (!isCancel) {//根据请求失败的次数,动态调整重新提交的UpdateTask定时任务的执行时间executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);}}}private void incFailCount() {int limit = 6;if (failCount == limit) {return;}failCount++;}private void resetFailCount() {failCount = 0;}}...
}//Naming client service information holder.
public class ServiceInfoHolder implements Closeable {private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;...//Process service info.public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {String serviceKey = serviceInfo.getKey();if (serviceKey == null) {return null;}//获取本地缓存中的服务实例ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());if (isEmptyOrErrorPush(serviceInfo)) {return oldService;}//更新本地缓存中的服务实例serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);//判断服务实例是否有改变boolean changed = isChangedServiceInfo(oldService, serviceInfo);if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));}MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());if (changed) {NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(), JacksonUtils.toJson(serviceInfo.getHosts()));//发布服务实例改变事件NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));//将服务实例信息写入本地磁盘DiskCache.write(serviceInfo, cacheDir);}return serviceInfo;}...
}
(2)Nacos服务端处理服务订阅请求的源码
一.根据要查询的Service对象读取缓存
二.添加订阅者即Subscriber对象
假设order-service需要调用stock-service的接口,那么order-service(Nacos客户端)就要向Nacos服务端订阅stock-service服务,也就是order-service需要从服务端获取到stock-service的所有服务实例。
客户端向服务端发起订阅请求的参数类型是SubscribeServiceRequest。服务订阅请求的处理方法是SubscribeServiceRequestHandler的handle(),该方法会先从SubscribeServiceRequest对象里获取信息构建Service对象,然后再根据RequestMeta请求元数据构建Subscriber对象,接着就会调用ServiceStorage的getData()方法读取缓存中的服务实例,以及通过clientOperationService的subscribeService()方法添加订阅者。
//Handler to handle subscribe service.
@Component
public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServiceRequest, SubscribeServiceResponse> {private final ServiceStorage serviceStorage;private final EphemeralClientOperationServiceImpl clientOperationService;...//假设order-service需要调用stock-service的接口,那么order-service(Nacos客户端)就要向服务端订阅stock-service服务//也就是order-service需要从服务端获取到(查询出)stock-service的所有服务实例@Override@Secured(action = ActionTypes.READ)public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {String namespaceId = request.getNamespace();String serviceName = request.getServiceName();String groupName = request.getGroupName();String app = request.getHeader("app", "unknown");String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);//构建要查询的Service服务对象,对应的是stock-serivceService service = Service.newService(namespaceId, groupName, serviceName, true);//构建要订阅Service服务的订阅者,对应的是order-serviceSubscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(), namespaceId, groupedServiceName, 0, request.getClusters());//1.调用ServiceStorage.getData()方法读取缓存ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service), metadataManager.getServiceMetadata(service).orElse(null), subscriber);if (request.isSubscribe()) {//2.添加订阅者,如果订阅的服务有变动,则需要通知订阅者clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());} else {clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());}return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);}}
一.根据要查询的Service对象读取缓存
调用的方法是ServiceStorage.getData():
-> serviceDataIndexes.get()
-> ServiceStorage.getPushData()方法
-> ServiceStorage.emptyServiceInfo()方法
-> ServiceStorage.getAllInstancesFromIndex()方法
-> ClientServiceIndexesManager.getAllClientsRegisteredService()方法
-> ServiceStorage.getInstanceInfo()方法根据clientId获取Instance对象
-> EphemeralIpPortClientManager.getClient()方法
-> AbstractClient.getInstancePublishInfo()方法
-> ServiceStorage.parseInstance()方法
-> serviceDataIndexes.put() + serviceClusterIndex.put()
ServiceStorage的getData()方法在读取缓存时,获取要查询的Service服务对象下的全部Instance实例会分三步:一是从注册表中获取要查询的Service对象下的全部clientId,二是根据clientId获取对应的Client对象,三是根据Client对象获取对应的Instance信息。
@Component
public class ServiceStorage {//缓存要查询的Service服务对象对应的已注册的服务详情private final ConcurrentMap<Service, ServiceInfo> serviceDataIndexes;//缓存要查询的Service服务对象对应的服务集群private final ConcurrentMap<Service, Set<String>> serviceClusterIndex;...public ServiceInfo getData(Service service) {//判断缓存中是否有数据,传入的参数service就是要查询的Service服务对象,对应的是stock-serivce//如果有则直接读取缓存数据,如果没有则调用ServiceStorage.getPushData(service)return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);}public ServiceInfo getPushData(Service service) {//调用ServiceStorage.emptyServiceInfo()方法创建空的ServiceInfo对象ServiceInfo result = emptyServiceInfo(service);if (!ServiceManager.getInstance().containSingleton(service)) {return result;}//调用ServiceStorage.getAllInstancesFromIndex()方法获服务取实例列表//ServiceInfo的hosts属性就包含了该服务的所有Instance实例数据result.setHosts(getAllInstancesFromIndex(service));//将获取到的ServiceInfo对象放入到缓存中serviceDataIndexes.put(service, result);return result;}private ServiceInfo emptyServiceInfo(Service service) {ServiceInfo result = new ServiceInfo();result.setName(service.getName());result.setGroupName(service.getGroup());result.setLastRefTime(System.currentTimeMillis());result.setCacheMillis(switchDomain.getDefaultPushCacheMillis());return result;}private List<Instance> getAllInstancesFromIndex(Service service) {//传入的参数service就是要查询的Service服务对象,对应的是stock-serivceSet<Instance> result = new HashSet<>();Set<String> clusters = new HashSet<>();//each = clientId,调用ClientServiceIndexesManager.getAllClientsRegisteredService()方法获取Service服务对象的所有clientIdfor (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {//调用ServiceStorage.getInstanceInfo()方法,根据clientId查询出对应的Instance信息,即InstancePublishInfo对象Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);if (instancePublishInfo.isPresent()) {//对象转换,将InstancePublishInfo对象转换成Instance对象Instance instance = parseInstance(service, instancePublishInfo.get());result.add(instance);clusters.add(instance.getClusterName());}}//cache clusters of this service//缓存要查询的Service服务对象,对应有哪些集群serviceClusterIndex.put(service, clusters);return new LinkedList<>(result);}private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {//通过调用EphemeralIpPortClientManager.getClient()方法,根据clientId获取对应的Client连接对象Client client = clientManager.getClient(clientId);if (null == client) {return Optional.empty();}//调用AbstractClient.getInstancePublishInfo()方法,获取该Client客户端注册的服务实例信息InstancePublishInforeturn Optional.ofNullable(client.getInstancePublishInfo(service));}private Instance parseInstance(Service service, InstancePublishInfo instanceInfo) {Instance result = InstanceUtil.parseToApiInstance(service, instanceInfo);Optional<InstanceMetadata> metadata = metadataManager.getInstanceMetadata(service, instanceInfo.getMetadataId());metadata.ifPresent(instanceMetadata -> InstanceUtil.updateInstanceMetadata(result, instanceMetadata));return result;}...
}@Component
public class ClientServiceIndexesManager extends SmartSubscriber {//注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientIdprivate final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();...public Collection<String> getAllClientsRegisteredService(Service service) {//服务端在处理服务注册时,最后会调用ClientServiceIndexesManager.addPublisherIndexes()方法://将客户端对应的clientId放入到publisherIndexes注册表中//所有下面的代码会从注册表中获取要查询的Service服务对象对应的clientId集合return publisherIndexes.containsKey(service) ? publisherIndexes.get(service) : new ConcurrentHashSet<>();}private void addPublisherIndexes(Service service, String clientId) {//判断注册表是否存在该Service,不存在则创建一个空的ConcurrentHashSetpublisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());//把clientId放入到对应的Service中publisherIndexes.get(service).add(clientId);//发布服务改变事件NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));}...
}@Component("ephemeralIpPortClientManager")
public class EphemeralIpPortClientManager implements ClientManager {//key是请求参数中的connectionId即clientId,value是一个继承了实现Client接口的AbstractClient的IpPortBasedClient对象private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();...@Overridepublic Client getClient(String clientId) {//客户端和服务端建立长连接时,就会通过EphemeralIpPortClientManager.clientConnected()方法将clientId放入到clientsreturn clients.get(clientId);}...
}public abstract class AbstractClient implements Client {//publishers其实就是记录该客户端提供的服务和服务实例,一个客户端可提供多个服务//存储客户端发送过来的请求中的Instance信息,当然这些信息已封装为InstancePublishInfo对象//key为已注册的Service,value是根据请求中的instance实例信息封装的InstancePublishInfo对象protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);...@Overridepublic InstancePublishInfo getInstancePublishInfo(Service service) {return publishers.get(service);}...
}
二.添加订阅者即Subscriber对象
调用的方法是EphemeralClientOperationServiceImpl.subscribeService(),添加订阅者其实就是先根据clientId找出对应的客户端Client对象,然后往AbstractClient.subscribers属性放入服务对象和对应的订阅者对象,最后再发布一个客户端订阅服务事件ClientSubscribeServiceEvent。
这个事件会被ClientServiceIndexesManager的onEvent()方法处理,即调用ClientServiceIndexesManager的addSubscriberIndexes()方法,该方法会继续发布一个服务订阅事件ServiceSubscribedEvent。
//Operation service for ephemeral clients and services.
@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {...//添加订阅者//@param service service:要查询的Service对象,比如stock-service//@param subscriber subscribe:订阅者,比如对应order-service//@param clientId id of client:对应order-service与Nacos服务端的连接ID@Overridepublic void subscribeService(Service service, Subscriber subscriber, String clientId) {//传入的service是要查询的Service对象,比如stock-serviceService singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);//传入的clientId是代表着order-service的Client对象,调用EphemeralIpPortClientManager.getClient()方法Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}//往代表着order-service的Client对象中,添加订阅者client.addServiceSubscriber(singleton, subscriber);client.setLastUpdatedTime();//发布客户端订阅服务事件ClientSubscribeServiceEvent,也就是order-service客户端订阅了service服务NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));}...
}@Component("ephemeralIpPortClientManager")
public class EphemeralIpPortClientManager implements ClientManager {//key是请求参数中的connectionId即clientId,value是一个继承了实现Client接口的AbstractClient的IpPortBasedClient对象private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();...@Overridepublic Client getClient(String clientId) {//客户端和服务端建立长连接时,就会通过EphemeralIpPortClientManager.clientConnected()方法将clientId放入到clientsreturn clients.get(clientId);}...
}public abstract class AbstractClient implements Client {//subscribers存放着:订阅者Subscriber(其实可理解为当前客户端)订阅了的Service服务//subscribers的key=stock-service(要订阅的某个服务)、value=order-service(订阅者,某个具体的包含IP的服务实例)protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);...@Overridepublic boolean addServiceSubscriber(Service service, Subscriber subscriber) {//服务订阅时,添加订阅者//subscribers的key=stock-service(要订阅的某个服务)、value=order-service(订阅者,某个具体的包含IP的服务实例)if (null == subscribers.put(service, subscriber)) {MetricsMonitor.incrementSubscribeCount();}return true;}...
}@Component
public class ClientServiceIndexesManager extends SmartSubscriber {//注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientIdprivate final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();//订阅者列表(服务消费者),一个Service服务对象,对应多个订阅者的clientIdprivate final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();...//可以处理客户端注册事件ClientRegisterServiceEvent@Overridepublic void onEvent(Event event) {if (event instanceof ClientEvent.ClientDisconnectEvent) {handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);} else if (event instanceof ClientOperationEvent) {handleClientOperation((ClientOperationEvent) event);}}private void handleClientOperation(ClientOperationEvent event) {Service service = event.getService();String clientId = event.getClientId();if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {//处理客户端注册事件ClientRegisterServiceEventaddPublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {//处理客户端注销事件ClientDeregisterServiceEventremovePublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {//处理客户端订阅服务事件ClientSubscribeServiceEventaddSubscriberIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {//处理客户端取消订阅事件ClientUnsubscribeServiceEventremoveSubscriberIndexes(service, clientId);}}private void addSubscriberIndexes(Service service, String clientId) {//传入的service是要查询的Service对象stock-service,clientId是订阅者order-service对应的客户端连接对象IDsubscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());//Fix #5404, Only first time add need notify event. 只有第一次添加时需要发布通知事件if (subscriberIndexes.get(service).add(clientId)) {//发布服务订阅事件ServiceSubscribedEventNotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));}}...
}
(3)总结
相关文章:
Nacos源码—6.Nacos升级gRPC分析二
大纲 1.Nacos 2.x版本的一些变化 2.客户端升级gRPC发起服务注册 3.服务端进行服务注册时的处理 4.客户端服务发现和服务端处理服务订阅的源码分析 4.客户端服务发现和服务端处理服务订阅的源码分析 (1)Nacos客户端进行服务发现的源码 (2)Nacos服务端处理服务订阅请求的源…...
如何选择自己喜欢的cms
选择内容管理系统cms what is cms1.whatcms.org2.IsItWP.com4.Wappalyzer5.https://builtwith.com/6.https://w3techs.com/7. https://www.netcraft.com/8.onewebtool.com如何在不使用 CMS 检测器的情况下手动检测 CMS 结论 在开始构建自己的数字足迹之前,大多数人会…...
前端面经 作用域和作用域链
含义:JS中变量生效的区域 分类:全局作用域 或者 局部作用域 局部作用域:函数作用域 和 块级作用域ES6 全局作用域:在代码中任何地方都生效 函数中定义函数中生效,函数结束失效 块级作用域 使用let或const 声明 作用域链:JS查…...
开启智能Kubernetes管理新时代:kubectl-ai让操作更简单!
在如今的科技世界中,Kubernetes 已经成为容器编排领域的标杆,几乎所有现代应用的基础设施都离不开它。然而,面对复杂的集群管理和日常运维,许多开发者常常感到无所适从。今天,我们将为大家介绍一款结合了人工智能的强大工具——kubectl-ai。它不仅能帮助开发者更加顺畅地与…...
STM32 ADC
目录 ADC简介 逐次逼近型ADC STM32 ADC框图 输入通道 转换模式 •单次转换,非扫描模式 •连续转换,非扫描模式 •单次转换,扫描模式 •连续转换,扫描模式 触发控制 数据对齐 转换时间 校准 硬件电路 A…...
nextjs站点地图sitemap添加
app/sitemap.xml/route.ts (主站点地图索引) sitemap.xml 为文件夹名称 route.ts代码如下: import { NextResponse } from next/server; import { url } from /config/navigation; export async function GET() {// const entries generateMonthlyEntries();con…...
TCP/IP和OSI对比
TCP/IP模型的实际特性 网络层(IP层) 仅提供无连接的不可靠服务:TCP/IP模型的网络层核心协议是IP(Internet Protocol),其设计是无连接且不可靠的。IP数据包独立传输,不保证顺序、不确认交…...
【hadoop】Hbase java api 案例
代码实现: HBaseConnection.java package com.peizheng.bigdata;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client…...
深入理解Spring缓存注解:@Cacheable与@CacheEvict
在现代应用程序开发中,缓存是提升系统性能的重要手段。Spring框架提供了一套简洁而强大的缓存抽象,其中Cacheable和CacheEvict是两个最常用的注解。本文将深入探讨这两个注解的工作原理、使用场景以及最佳实践。 1. Cacheable注解 基本概念 Cacheable…...
[git]如何关联本地分支和远程分支
主题 本文总结如何关联git本地分支和远程分支的相关知识点。 详情 查看本地分支 git branch 查看远程分支 git branch -r 查看所有分支(本地远程) git branch -a 查看本地分支及其关联的远程分支(如有) git branch -vv 关联本地分支到远程分支: git branch …...
Linux58 ssh服务配置 jumpserver 测试双网卡 为何不能ping通ip地址
判断为NAT模式网卡 能ping 通外网 ens34为仅主机模式网卡 [rootlocalhost network-scripts]# ip route show default default via 10.1.1.254 dev ens33 proto static metric 100 10.0.0.0/8 dev ens33 proto kernel scope link src 10.1.1.37 metric 100 11.0.0.0/8 dev…...
chart.js 柱状图Y轴数据设置起始值
事情的起因, 我以为是: chart.js 柱状图Y轴数据显示不全, 因为数据是浮点数, 换了整数测试还不行, 多次更换数据, 数据显示不全仍然存在, 而且是不固定位置的不显示。 直到相同数据换了折…...
算法题(142):木材加工
审题: 本题需要我们找到可以将木头切割至少k段的单段长度最长值 思路: 方法一:暴力解法 首先我们知道单段长度的最长值就是数组中数据的最大值max,所以我们可以遍历1~max的数据,将他们确定为l,然后计算出当…...
嵌入式学习--江协51单片机day3
今天学的东西挺多的,包括:自己设计的小应用,矩阵键盘,矩阵键盘密码锁,控制按键led流水灯,定时器时钟 (那个视频真的煎熬,连续两个1小时的简直要命,那个时钟也是听的似懂…...
Linux命令行参数注入详解
本文主要聚焦 Linux 系统及其 ELF 二进制文件,深入探讨参数注入的原理与防范措施。 核心术语解析 在进入主题之前,我们先厘清几个关键术语,以确保理解的准确性: 参数解析器 当程序被调用时,操作系统会向程序的 main…...
【HCIP】----OSPF综合实验
实验题目 实验需求: 1,R5为ISP,其上只能配置IP地址;R4作为企业边界路由器, 出口公网地址需要通过PPP协议获取,并进行chap认证 2,整个OSPF环境IP基于172.16.0.0/16划分; 3࿰…...
C++模板笔记
Cpp模板笔记 文章目录 Cpp模板笔记1. 为什么要定义模板2. 模板的定义2.1 函数模板2.1.1 函数模板的重载2.1.2 头文件与实现文件形式(重要)2.1.3 模板的特化2.1.4 模板的参数类型2.1.5 成员函数模板2.1.6 使用模板的规则 2.2 类模板2.3 可变参数模板 模板…...
二极管的动态特性
主要内容 二极管的单向导电特性并不十分理想,这是因为二极管的本质是有P型半导体和N型半导体接触形成的PN结。 PN结除了除了构成单向到点的二极管外,还存在一个结电容,这个结电容会导致"双向"导电。 也就是说,这会让二…...
WSL(Windows Subsystem for Linux)入门
目录 1.简介2.安装与配置3.常用命令4.进阶使用4.1 文件系统交互4.2 网络互通4.3 配置代理4.4 运行 GUI 程序4.5 Docker 集成 1.简介 WSL 是 Windows 系统内置的 Linux 兼容层,允许直接在 Windows 中运行 Linux 命令行工具和应用程序,无需虚拟机或双系统…...
k8s术语之secret
在kubernetes中,还存在一种和ConfigMap非常类似的对象,称之为Secret对象。它主要用于存储敏感信息,例如密码、密钥、证书等等。 首先使用base64对数据进行编码 rootmaster pvs ]# echo -n admin | base64 YWRtaW4 实例:隐藏mysql密…...
Vue2 中 el-dialog 封装组件属性不生效的深度解析(附 $attrs、inheritAttrs 原理)
Vue2 中 el-dialog 封装组件属性不生效的深度解析(附 $attrs、inheritAttrs 原理) 在使用 Vue2 和 Element UI 进行组件封装时,我们常会遇到父组件传入的属性不生效的情况,比如在封装的 el-dialog 组件中传入 width"100%&qu…...
使用Compose编排工具搭建Ghost博客系统
序:需要提前自备一台部署好docker环境的虚拟机,了解并熟练compose编排工具 在Centos7中,在线/离线安装Docker:https://blog.csdn.net/2301_82085712/article/details/147140694 Docker编排工具---Compose的概述及使用࿱…...
车载网络TOP20核心概念科普
一、基础协议与总线技术 CAN总线 定义:控制器局域网,采用差分信号传输,速率最高1Mbps,适用于实时控制(如动力系统)。形象比喻:如同“神经系统”,负责传递关键控制信号。 LIN总线 定…...
机器学习第一讲:机器学习本质:让机器通过数据自动寻找规律
机器学习第一讲:机器学习本质:让机器通过数据自动寻找规律 资料取自《零基础学机器学习》。 查看总目录:学习大纲 一、从婴儿学说话说起 👶 想象你教1岁宝宝认「狗」: 第一阶段:指着不同形态的狗&#x…...
Android ImageView 加载 Base64编码图片
在 Android 中显示服务端返回的 Base64 编码的 GIF 图片(如 data:image/gif;base64,...),需要以下步骤: 首先从字符串中分离出纯 Base64 部分(去掉 data:image/gif;base64, 前缀)image/gif 表示图片是 gif…...
如何使用 QuickAPI 推动医院数据共享 —— 基于数据仓库场景的实践
目录 01 医疗行业面临的数据孤岛问题 02 QuickAPI:将 SQL 变为数据 API 的利器 03 快速落地的应用实践 ✅ 步骤一:统一 SQL 逻辑,模块化管理 ✅ 步骤二:配置权限与调用策略 ✅ 步骤三:上线并接入调用 04 核心收…...
【5G通信】bwp和redcap 随手记 2
好的,让我们重新解释Cell-Defined BWP (CD BWP) 和 Non-Cell-Defined BWP (NCD BWP),并结合RedCap终端和非RedCap终端的应用进行说明。 一、定义 Cell-Defined BWP (CD BWP) 定义:由网络(基站)通过RRC信令为终端配置的…...
AI时代企业应用系统架构的新思路与CIO变革指南
作为制造企业CIO,我们看问题需要有前瞻性,AI时代企业应用系统架构需要进行全面转型。 一、新思想与新技术 1. 核心新思想 可视化开发AI的融合模式:不再只依赖纯代码开发或传统低代码,而是两者结合,通过AI理解自然语…...
kotlin JvmName注解的作用和用途
1. JvmName 注解的作用 JvmName 是 Kotlin 提供的一个注解,用于在编译为 Java 字节码时自定义生成的类名或方法名。 作用对象: 文件级别(整个 .kt 文件)函数、属性、类等成员 主要用途: 控制 Kotlin 编译后生成的 JV…...
为什么强调 RESTful 的无状态性?-优雅草卓伊凡
为什么强调 RESTful 的无状态性?-优雅草卓伊凡 RESTful 架构的核心原则之一是 无状态性(Statelessness),它要求 每次客户端请求必须包含服务器处理该请求所需的所有信息,服务器不会存储客户端的状态(如会话…...
信息系统项目管理工程师备考计算类真题讲解十五
一、决策论问题 分析:首先要明白几个概念: 1)最大最大准则(Maxmax):也称乐观主义准则,其决策原则为“大中取大” 2)最大最小准则(Maxmin):也称悲观主义准则,…...
android-ndk开发(9): undefined reference to `__aarch64_ldadd4_acq_rel` 报错分析
1. 概要 基础库 libbase.a 基于 android ndk r18b 编译, 被算法库 libfoo.so 和算法库 libbar.a 依赖, 算法库则分别被 libapp1.so 和 libapp2.so 依赖。 libapp1.so 的开发者向 libfoo.so 的开发者反馈了链接报错: error: undefined symb…...
Java 对象克隆(Object Cloning)详解
Java 对象克隆(Object Cloning)详解 对象克隆是指创建一个对象的精确副本,Java 提供了两种克隆方式:浅克隆(Shallow Clone)和深克隆(Deep Clone)。下面从实现原理、使用场景到注意事项全面解析。 一、克隆的基本概念 1. 为什么要克隆? 需要对象副本时避免修改原始对…...
Asp.Net Core IIS发布后PUT、DELETE请求错误405
一、方案1 1、IIS管理器,处理程序映射。 2、找到aspNetCore,双击。点击请求限制...按钮,并在谓词选项卡上,添加两者DELETE和PUT. 二、方案2 打开web.config文件,添加<remove name"WebDAVModule" />&…...
AI搜索的未来:技术纵深发展与关键突破路径
一、模型架构的颠覆性重构 1、混合专家系统(MoE)的工程化突破 动态路径选择:Google Gemini 1.5 Pro采用MoE架构,其门控网络(Gating Network)通过实时计算输入token与128个专家模型的余弦相似度,动态分配计算资…...
从艾米・阿尔文看 CTO 的多面特质与成长路径
在《对话 CTO,驾驭高科技浪潮》的开篇,艾米・阿尔文的经历如同一扇窗,为我们展现出首席技术官丰富而立体的世界。通过深入探究这一章节,我们能洞察 CTO 在技术领域前行所需的特质、面临的挑战,以及成长发展的脉络。 一…...
记录阿里云服务器搭建FTP服务器的注意事项
在阿里云服务器上(centos)系统,使用vsftpd搭建了一台FTP服务器。 搭建过程中,也留意到了操作防火墙放行端口。但搭建成功后,仍无法访问。 问题是:还需要在阿里云控制台设置一下。 在此记录。 1、登陆账…...
第二章 Logback的架构(三)
Logger, Appenders 和 Layouts 工作原理概述 在介绍了基本的Logback组件之后,我们现在可以描述当用户调用Logger的打印方法时,Logback框架日志请求的执行步骤。 现在让我们分析一下当用户调用名为com.wombat的Logger的info()方法时,Logback…...
第十六届蓝桥杯大赛软件赛C/C++大学B组部分题解
第十六届蓝桥杯大赛软件赛C/C大学B组题解 试题A: 移动距离 问题描述 小明初始在二维平面的原点,他想前往坐标(233,666)。在移动过程中,他只能采用以下两种移动方式,并且这两种移动方式可以交替、不限次数地使用: 水平向右移动…...
服务器数据恢复—Linux操作系统服务器意外断电导致部分文件丢失的数据恢复
服务器数据恢复环境&故障: 一台安装linux系统的服务器意外断电。管理员重启服务器后进行检测,发现服务器上部分文件丢失。管理员没有进行任何操作,直接将服务器正常关机并切断电源。 服务器数据恢复过程: 1、北亚企安数据恢复…...
技术视界 | 青龙机器人训练地形详解(三):复杂地形精讲之台阶
在前两篇中,我们依次讲解了“如何创建一个地形”以及“如何将地形添加到训练环境中”。从基础出发,逐步构建机器人可交互的三维仿真环境。在机器人强化学习训练中,地形的复杂度决定了策略的泛化能力,仅靠 jump_plat 和 jump_pit 等…...
Android 位掩码操作(和~和|的二进制运算)
在 Android 开发中,位掩码操作通过二进制位的逻辑运算实现高效的状态管理。以下以 &(与)、|(或)和 ~(非)运算符为例,详细说明其二进制计算过程: 一、按位与 & 运…...
【JS逆向基础】前端基础-HTML与CSS
1,flask框架 以下是一个使用flask框架写成的serve程序 # noinspection PyUnresolvedReferences #Flash框架的基本内容from flask import Flask app Flask(__name__)app.route(/index) def index():return "hello index"app.route(/login) def login():re…...
高速供电,一步到位——以太联-Intellinet 9口2.5G PoE++非管理型交换机_562140:网络升级的理想之选
在数字化浪潮席卷全球的当下,高速稳定的网络连接已成为企业运营、家庭娱乐以及各类智能场景正常运转的基石。从企业办公场景中员工对高效协同办公的追求,到家庭环境里用户对流畅高清视频、在线游戏的渴望,再到智慧城市建设中大量监控设备、无…...
rom定制系列------红米note12 5G版miui14修改型号root版 原生安卓14批量线刷固件 原生安卓15等
红米Note 12 5G机型也称为 Note 12R Pro,机型代码:sunstone 高通骁龙4 Gen1八核处理器适用于以下型号的小米机型:22111317G, 22111317I, 22101317C miui14稳定版 14.0.10安卓13固件 根据客户需求,采用miui最后一个版本。修改以…...
机器学习 数据集
数据集 1. scikit-learn工具介绍1.1 scikit-learn安装1.2 Scikit-learn包含的内容 2 数据集2.1 sklearn玩具数据集介绍2.2 sklearn现实世界数据集介绍2.3 sklearn加载玩具数据集示例1:鸢尾花数据示例2:分析糖尿病数据集 2.4 sklearn获取现实世界数据集示…...
JVM运行时数据区域(Run-Time Data Areas)的解析
# JVM运行时数据区域(Run-Time Data Areas)的解析 欢迎来到我的博客:TWind的博客 我的CSDN::Thanwind-CSDN博客 我的掘金:Thanwinde 的个人主页 本文参考于:深入理解Java虚拟机:JVM高级特性与最佳实践 本文的JVM均…...
python基础:序列和索引-->Python的特殊属性
一.序列和索引 1.1 用索引检索字符串中的元素 # 正向递增 shelloworld for i in range (0,len(s)):# i是索引print(i,s[i],end\t\t) print(\n--------------------------) # 反向递减 for i in range (-10,0):print(i,s[i],end\t\t)print(\n--------------------------) print(…...
在k8s中,如何实现服务的访问,k8s的ip是变化的,怎么保证能访问到我的服务
在K8S中,Pod的IP动态变化确实无法直接通过固定IP限制访问,但可以通过标签(Label)、服务(Service)和网络策略(NetworkPolicy)的组合,实现动态身份识别的访问控制ÿ…...
用NVivo革新企业创新:洞悉市场情绪,引领金融未来
在当今快速变化的商业环境中,理解市场和客户的情感脉动是企业成功的关键。尤其在金融行业,无论是评估经济走势、股票市场波动,还是洞察消费者信心,精准把握隐藏在新闻报道、社交媒体和消费者反馈中的情感倾向至关重要。而NVivo这款…...