当前位置: 首页 > news >正文

Nacos源码—7.Nacos升级gRPC分析三

大纲

5.服务变动时如何通知订阅的客户端

6.微服务实例信息如何同步集群节点

5.服务变动时如何通知订阅的客户端

(1)服务注册和服务订阅时发布的客户端注册和订阅事件的处理

(2)延迟任务的执行引擎源码

(3)处理客户端注册和订阅事件时发布的服务变动和服务订阅事件的处理

(1)服务注册和服务订阅时发布的客户端注册和订阅事件的处理

一.服务注册

Nacos客户端注册服务实例时,Nacos服务端会发布ClientRegisterServiceEvent客户端注册服务实例事件。Nacos服务端在处理客户端注册服务实例事件时,会把clientId写入到注册表,然后接着发布ServiceChangedEvent服务改变事件。

//Instance request handler.
@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {private final EphemeralClientOperationServiceImpl clientOperationService;public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {this.clientOperationService = clientOperationService;}@Override@Secured(action = ActionTypes.WRITE)public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {//根据请求信息创建一个Service对象,里面包含了:命名空间、分组名、服务名Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);switch (request.getType()) {case NamingRemoteConstants.REGISTER_INSTANCE://注册实例return registerInstance(service, request, meta);case NamingRemoteConstants.DE_REGISTER_INSTANCE://注销实例return deregisterInstance(service, request, meta);default:throw new NacosException(NacosException.INVALID_PARAM, String.format("Unsupported request type %s", request.getType()));}}private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {//调用EphemeralClientOperationServiceImpl的注册方法registerInstance(),这里需要注意如下参数;//参数service:根据请求信息创建的一个Service对象,里面有命名空间、分组名、服务名//参数request.getInstance():这个参数就对应了客户端的实例对象,里面包含IP、端口等信息//参数meta.getConnectionId():这个参数很关键,它是连接IDclientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);}private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {//调用EphemeralClientOperationServiceImpl的注销方法deregisterInstance()clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);}
}//Operation service for ephemeral clients and services.
@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {private final ClientManager clientManager;public EphemeralClientOperationServiceImpl(ClientManagerDelegate clientManager) {this.clientManager = clientManager;}@Overridepublic void registerInstance(Service service, Instance instance, String clientId) {//从ServiceManager中根据由请求信息创建的Service对象获取一个已注册的Service对象Service singleton = ServiceManager.getInstance().getSingleton(service);if (!singleton.isEphemeral()) {throw new NacosRuntimeException(NacosException.INVALID_PARAM, String.format("Current service %s is persistent service, can't register ephemeral instance.", singleton.getGroupedServiceName()));}//从ClientManagerDelegate中根据请求参数中的connectionId获取一个Client对象,即IpPortBasedClient对象Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}//将请求中的instance实例信息封装为InstancePublishInfo对象InstancePublishInfo instanceInfo = getPublishInfo(instance);//往Client对象里添加已注册的服务对象Service,调用的是IpPortBasedClient对象的父类AbstractClient的addServiceInstance()方法client.addServiceInstance(singleton, instanceInfo);//设置IpPortBasedClient对象的lastUpdatedTime属性为最新时间client.setLastUpdatedTime();//发布客户端注册服务实例的事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));//发布服务实例元数据的事件NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));}...
}//Client and service index manager.
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {//注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientIdprivate final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();...//处理客户端注册事件ClientRegisterServiceEvent@Overridepublic void onEvent(Event event) {if (event instanceof ClientEvent.ClientDisconnectEvent) {handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);} else if (event instanceof ClientOperationEvent) {handleClientOperation((ClientOperationEvent) event);}}private void handleClientOperation(ClientOperationEvent event) {Service service = event.getService();String clientId = event.getClientId();if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {//处理客户端注册事件ClientRegisterServiceEventaddPublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {//处理客户端注销事件ClientDeregisterServiceEventremovePublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {//处理客户端订阅服务事件ClientSubscribeServiceEventaddSubscriberIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {//处理客户端取消订阅事件ClientUnsubscribeServiceEventremoveSubscriberIndexes(service, clientId);}}private void addPublisherIndexes(Service service, String clientId) {//判断注册表是否存在该Service,不存在则创建一个空的ConcurrentHashSetpublisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());//把clientId放入到对应的Service中publisherIndexes.get(service).add(clientId);//发布服务改变事件NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));}...
}

二.服务订阅

客户端查询微服务实例列表进行服务发现时,调用的是订阅接口。服务端处理客户端的订阅请求时会发布ClientSubscribeServiceEvent事件,这个事件的处理逻辑是先向订阅表添加clientId到所订阅服务对应的集合中,如果第一次添加clientId则发布一个ServiceSubscribedEvent服务订阅事件。

//Handler to handle subscribe service.
@Component
public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServiceRequest, SubscribeServiceResponse> {private final ServiceStorage serviceStorage;private final EphemeralClientOperationServiceImpl clientOperationService;...//假设order-service需要调用stock-service的接口,那么order-service(Nacos客户端)就要向服务端订阅stock-service服务//也就是order-service需要从服务端获取到(查询出)stock-service的所有服务实例@Override@Secured(action = ActionTypes.READ)public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {String namespaceId = request.getNamespace();String serviceName = request.getServiceName();String groupName = request.getGroupName();String app = request.getHeader("app", "unknown");String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);//构建要查询的Service服务对象,对应的是stock-serivceService service = Service.newService(namespaceId, groupName, serviceName, true);//构建要订阅Service服务的订阅者,对应的是order-serviceSubscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(), namespaceId, groupedServiceName, 0, request.getClusters());//1.调用ServiceStorage.getData()方法读取缓存ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service), metadataManager.getServiceMetadata(service).orElse(null), subscriber);if (request.isSubscribe()) {//2.添加订阅者,如果订阅的服务有变动,则需要通知订阅者clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());} else {clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());}return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);}
}//Operation service for ephemeral clients and services.
@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {...//添加订阅者//@param service    service:要查询的Service对象,比如stock-service//@param subscriber subscribe:订阅者,比如对应order-service//@param clientId   id of client:对应order-service与Nacos服务端的连接ID@Overridepublic void subscribeService(Service service, Subscriber subscriber, String clientId) {//传入的service是要查询的Service对象,比如stock-serviceService singleton = ServiceManager.getInstance().getSingletonIfExist(service).orElse(service);//传入的clientId是代表着order-service的Client对象,调用EphemeralIpPortClientManager.getClient()方法Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}//往代表着order-service的Client对象中,添加订阅者client.addServiceSubscriber(singleton, subscriber);client.setLastUpdatedTime();//发布客户端订阅服务事件ClientSubscribeServiceEvent,也就是order-service客户端订阅了service服务NotifyCenter.publishEvent(new ClientOperationEvent.ClientSubscribeServiceEvent(singleton, clientId));}...
}@Component
public class ClientServiceIndexesManager extends SmartSubscriber {//注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientIdprivate final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();//订阅者列表(服务消费者),一个Service服务对象,对应多个订阅者的clientIdprivate final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();...//可以处理客户端注册事件ClientRegisterServiceEvent@Overridepublic void onEvent(Event event) {if (event instanceof ClientEvent.ClientDisconnectEvent) {handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);} else if (event instanceof ClientOperationEvent) {handleClientOperation((ClientOperationEvent) event);}}private void handleClientOperation(ClientOperationEvent event) {Service service = event.getService();String clientId = event.getClientId();if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {//处理客户端注册事件ClientRegisterServiceEventaddPublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {//处理客户端注销事件ClientDeregisterServiceEventremovePublisherIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {//处理客户端订阅服务事件ClientSubscribeServiceEventaddSubscriberIndexes(service, clientId);} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {//处理客户端取消订阅事件ClientUnsubscribeServiceEventremoveSubscriberIndexes(service, clientId);}}private void addSubscriberIndexes(Service service, String clientId) {//传入的service是要查询的Service对象stock-service,clientId是订阅者order-service对应的客户端连接对象IDsubscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());// Fix #5404, Only first time add need notify event. 只有第一次添加时需要发布通知事件if (subscriberIndexes.get(service).add(clientId)) {//发布服务订阅事件ServiceSubscribedEventNotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));}}...
}

(2)延迟任务的执行引擎源码

一.什么是延迟任务执行引擎

延迟任务执行引擎就是可以往执行引擎中添加任务,该任务会被延时执行。Nacos的延迟任务执行引擎就是NacosDelayTaskExecuteEngine类。

Nacos会通过延迟任务执行引擎来处理服务改变事件和服务订阅事件,即ServiceChangedEvent和ServiceSubscribedEvent。

二.延迟任务执行引擎的执行原理

首先,Nacos会定义一个名为NacosTaskProcessor的任务处理器接口。NacosTaskProcessor是一个Interface ,它有很多个实现类。

然后,执行引擎会记录相关的任务处理器实现类。NacosDelayTaskExecuteEngine继承自AbstractNacosTaskExecuteEngine,AbstractNacosTaskExecuteEngine相当于任务执行引擎中心。AbstractNacosTaskExecuteEngine有两个属性来记录这些处理器实现类,并提供了两个方法可以向任务执行引擎中心添加处理器,这两个方法分别是addProcessor()方法和setDefaultTaskProcessor()方法。

接着,创建NacosDelayTaskExecuteEngine时会开启一个定时执行的任务,该定时执行的任务会定时执行ProcessRunnable的run()方法。

延时任务执行引擎有一个Map类型的tasks属性存放所有延迟执行的任务,而在ProcessRunnable的run()方法中,会触发调用其processTasks()方法。processTasks()方法会从tasks属性中获取全部的延迟任务,然后遍历处理。即先通过任务key获取具体的任务,再通过任务key获取对应的处理器,接着调用NacosTaskProcessor的process()方法,来完成延迟任务的执行。

最后,NacosDelayTaskExecuteEngine会提供一个addTask()方法,这个方法可以将延迟执行的任务添加到延时任务执行引擎的tasks属性中。

//Abstract nacos task execute engine. 任务执行引擎中心
public abstract class AbstractNacosTaskExecuteEngine<T extends NacosTask> implements NacosTaskExecuteEngine<T> {private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<>();private NacosTaskProcessor defaultTaskProcessor;...@Overridepublic void addProcessor(Object key, NacosTaskProcessor taskProcessor) {taskProcessors.putIfAbsent(key, taskProcessor);}@Overridepublic void removeProcessor(Object key) {taskProcessors.remove(key);}@Overridepublic NacosTaskProcessor getProcessor(Object key) {return taskProcessors.containsKey(key) ? taskProcessors.get(key) : defaultTaskProcessor;}@Overridepublic Collection<Object> getAllProcessorKey() {return taskProcessors.keySet();}@Overridepublic void setDefaultTaskProcessor(NacosTaskProcessor defaultTaskProcessor) {this.defaultTaskProcessor = defaultTaskProcessor;}...
}//Nacos delay task execute engine. 延迟任务执行引擎
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {private final ScheduledExecutorService processingExecutor;//任务池protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;protected final ReentrantLock lock = new ReentrantLock();...public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {super(logger);tasks = new ConcurrentHashMap<>(initCapacity);processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));//开启延时任务,即启动ProcessRunnable线程任务processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);}...   @Overridepublic AbstractDelayTask removeTask(Object key) {lock.lock();try {AbstractDelayTask task = tasks.get(key);if (null != task && task.shouldProcess()) {return tasks.remove(key);} else {return null;}} finally {lock.unlock();}}@Overridepublic Collection<Object> getAllTaskKeys() {Collection<Object> keys = new HashSet<Object>();lock.lock();try {keys.addAll(tasks.keySet());} finally {lock.unlock();}return keys;}@Overridepublic void shutdown() throws NacosException {tasks.clear();processingExecutor.shutdown();}@Overridepublic void addTask(Object key, AbstractDelayTask newTask) {lock.lock();try {AbstractDelayTask existTask = tasks.get(key);if (null != existTask) {newTask.merge(existTask);}//最后放入到任务池中tasks.put(key, newTask);} finally {lock.unlock();}}//process tasks in execute engine.protected void processTasks() {//获取tasks中所有的任务,然后进行遍历Collection<Object> keys = getAllTaskKeys();for (Object taskKey : keys) {//通过任务key,获取具体的任务,并且从任务池中移除掉AbstractDelayTask task = removeTask(taskKey);if (null == task) {continue;}//通过任务key获取对应的NacosTaskProcessor延迟任务处理器NacosTaskProcessor processor = getProcessor(taskKey);if (null == processor) {getEngineLog().error("processor not found for task, so discarded. " + task);continue;}try {// ReAdd task if process failed//调用获取到的NacosTaskProcessor延迟任务处理器的process()方法if (!processor.process(task)) {//如果失败了,会重试添加task回tasks这个map中retryFailedTask(taskKey, task);}} catch (Throwable e) {getEngineLog().error("Nacos task execute error ", e);retryFailedTask(taskKey, task);}}}private void retryFailedTask(Object key, AbstractDelayTask task) {task.setLastProcessTime(System.currentTimeMillis());addTask(key, task);}private class ProcessRunnable implements Runnable {@Overridepublic void run() {try {processTasks();} catch (Throwable e) {getEngineLog().error(e.toString(), e);}}}
}

(3)处理客户端注册和订阅事件时发布的服务变动和服务订阅事件的处理

一.服务端处理服务变动和服务订阅事件的入口

二.执行推送的任务PushExecuteTask说明

三.客户端收到服务端发送的Service服务实例数据推送的处理

一.服务端处理服务变动和服务订阅事件的入口

处理入口是:NamingSubscriberServiceV2Impl的onEvent()方法。其中,对事件的处理使用了双层内存队列(存储延迟任务 + 同步任务)的异步处理方式。

onEvent()方法主要会往延迟任务执行引擎中添加任务,也就是首先会根据不同的事件类型构建不同的PushDelayTask任务,然后调用延迟任务执行引擎NacosDelayTaskExecuteEngine的addTask()方法,把PushDelayTask延迟任务添加到PushDelayTaskExecuteEngine的任务池。

创建继承自NacosDelayTaskExecuteEngine的PushDelayTaskExecuteEngine延迟任务执行引擎时会创建一个定时任务,定时从任务池中取出任务,然后调用对应的任务处理器的process()方法。

PushDelayTask任务对应的任务处理器是PushDelayTaskProcessor,所以最终会触发执行PushDelayTaskProcessor的process()方法。

在执行PushDelayTaskProcessor的process()方法时,会调用NamingExecuteTaskDispatcher的dispatchAndExecuteTask()方法,提交由PushDelayTask任务封装的PushExecuteTask任务给NacosExecuteTaskExecuteEngine进行处理,此时会调用NacosExecuteTaskExecuteEngine的addTask()方法添加任务。

其中,PushExecuteTask任务会被分发到NacosExecuteTaskExecuteEngine执行引擎中的一个TaskExecuteWorker处理,TaskExecuteWorker的process()方法会把PushExecuteTask任务放入队列。由于TaskExecuteWorker初始化时会启动一个线程不断从队列中获取任务并执行,所以最终便会执行到PushExecuteTask的run()方法。

//Naming subscriber service for v2.x.
@org.springframework.stereotype.Service
public class NamingSubscriberServiceV2Impl extends SmartSubscriber implements NamingSubscriberService {private final PushDelayTaskExecuteEngine delayTaskEngine;...@Overridepublic void onEvent(Event event) {if (!upgradeJudgement.isUseGrpcFeatures()) {return;}if (event instanceof ServiceEvent.ServiceChangedEvent) {//If service changed, push to all subscribers.//如果服务变动,会向Service服务的所有订阅者推送Service服务的实例信息,让订阅者(客户端)更新本地缓存ServiceEvent.ServiceChangedEvent serviceChangedEvent = (ServiceEvent.ServiceChangedEvent) event;Service service = serviceChangedEvent.getService();//调用NacosDelayTaskExecuteEngine.addTask()方法,往延迟任务执行引擎添加任务delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay()));} else if (event instanceof ServiceEvent.ServiceSubscribedEvent) {//If service is subscribed by one client, only push this client.//如果Service服务被一个客户端订阅,则只推送Service服务的实例信息给该客户端ServiceEvent.ServiceSubscribedEvent subscribedEvent = (ServiceEvent.ServiceSubscribedEvent) event;Service service = subscribedEvent.getService();//调用NacosDelayTaskExecuteEngine.addTask()方法,往延迟任务执行引擎添加任务delayTaskEngine.addTask(service, new PushDelayTask(service, PushConfig.getInstance().getPushTaskDelay(), subscribedEvent.getClientId()));}}...
}public class PushDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {...private static class PushDelayTaskProcessor implements NacosTaskProcessor {    private final PushDelayTaskExecuteEngine executeEngine;public PushDelayTaskProcessor(PushDelayTaskExecuteEngine executeEngine) {this.executeEngine = executeEngine;}@Overridepublic boolean process(NacosTask task) {//任务类型转换PushDelayTask pushDelayTask = (PushDelayTask) task;//获取要推送的服务;比如某服务发生改变时,需要推送该服务的实例给订阅的客户端;比如某服务被订阅时,需要推送该服务的实例给对应的客户端;Service service = pushDelayTask.getService();//调用NamingExecuteTaskDispatcher.dispatchAndExecuteTask()方法//提交PushExecuteTask线程任务给NacosExecuteTaskExecuteEngine来处理NamingExecuteTaskDispatcher.getInstance().dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));return true;}}
}public class NamingExecuteTaskDispatcher {private static final NamingExecuteTaskDispatcher INSTANCE = new NamingExecuteTaskDispatcher();private final NacosExecuteTaskExecuteEngine executeEngine;private NamingExecuteTaskDispatcher() {executeEngine = new NacosExecuteTaskExecuteEngine(EnvUtil.FUNCTION_MODE_NAMING, Loggers.SRV_LOG);}public static NamingExecuteTaskDispatcher getInstance() {return INSTANCE;}public void dispatchAndExecuteTask(Object dispatchTag, AbstractExecuteTask task) {executeEngine.addTask(dispatchTag, task);}public String workersStatus() {return executeEngine.workersStatus();}
}public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {private final TaskExecuteWorker[] executeWorkers;public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {super(logger);executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];for (int mod = 0; mod < dispatchWorkerCount; ++mod) {executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());}}...@Overridepublic void addTask(Object tag, AbstractExecuteTask task) {//根据tag获取到TaskExecuteWorkerNacosTaskProcessor processor = getProcessor(tag);if (null != processor) {processor.process(task);return;}TaskExecuteWorker worker = getWorker(tag);//调用TaskExecuteWorker.process()方法把AbstractExecuteTask任务放入到队列当中去worker.process(task);}...
}public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {//任务存储容器private final BlockingQueue<Runnable> queue;public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {...this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);new InnerWorker(name).start();}...@Overridepublic boolean process(NacosTask task) {if (task instanceof AbstractExecuteTask) {//把NacosTask任务放入到阻塞队列中putTask((Runnable) task);}return true;}private void putTask(Runnable task) {try {//把NacosTask任务放入到阻塞队列中queue.put(task);} catch (InterruptedException ire) {log.error(ire.toString(), ire);}}...private class InnerWorker extends Thread {InnerWorker(String name) {setDaemon(false);setName(name);}@Overridepublic void run() {while (!closed.get()) {try {//一直取阻塞队列中的任务Runnable task = queue.take();long begin = System.currentTimeMillis();//调用NacosTask中的run方法task.run();long duration = System.currentTimeMillis() - begin;if (duration > 1000L) {log.warn("task {} takes {}ms", task, duration);}} catch (Throwable e) {log.error("[TASK-FAILED] " + e.toString(), e);}}}}
}

二.执行推送的任务PushExecuteTask说明

在PushExecuteTask的run()方法中,首先会从ServiceStorage获取要推送的服务Service最新的实例数据包装,然后调用PushExecuteTask的getTargetClientIds()方法获取要推送的clientId,接着根据clientId获取订阅了Service服务的的客户端订阅者对象,最后调用PushExecutorDelegate的doPushWithCallback()方法,也就是调用PushExecutorRpcImpl的doPushWithCallback()方法回调客户端,即调用RpcPushService的pushWithCallback()方法回调客户端,即调用GrpcConnection的asyncRequest()方法向客户端发送RPC请求。

执行PushExecuteTask的getTargetClientIds()方法获取要推送的clientId时,会根据PushDelayTask的pushToAll属性来获取对应的clientId。因为在NamingSubscriberServiceV2Impl的onEvent()方法中,如果处理的是服务改变事件,则构造的PushDelayTask是面向所有客户端。如果处理的是服务订阅事件,则构造的PushDelayTask是面向一个客户端。

所以如果PushDelayTask要面向所有客户端推送Service服务实例数据,那么就调用ClientServiceIndexesManager的getAllClientsSubscribeService()方法,从订阅者列表中获取订阅了Service服务的所有clientId。如果PushDelayTask要面向单个客户端推送Service服务实例数据,则通过PushDelayTask的getTargetClients()方法获取对应的clientId即可。

总结:服务变动需要通知全部订阅了该Service服务的客户端对象,服务订阅只需要通知当前订阅者客户端对象即可。

//Nacos naming push execute task.
public class PushExecuteTask extends AbstractExecuteTask {//要推送的Service服务//比如某服务发生改变时,需要推送该服务的实例给订阅的客户端;比如某服务被订阅时,需要推送该服务的实例给对应的客户端;private final Service service;private final PushDelayTaskExecuteEngine delayTaskEngine;private final PushDelayTask delayTask;public PushExecuteTask(Service service, PushDelayTaskExecuteEngine delayTaskEngine, PushDelayTask delayTask) {this.service = service;this.delayTaskEngine = delayTaskEngine;this.delayTask = delayTask;}@Overridepublic void run() {try {//从ServiceStorage获取要推送的服务Service最新的实例数据包装PushDataWrapper wrapper = generatePushData();ClientManager clientManager = delayTaskEngine.getClientManager();//遍历订阅了Service服务的、要推送Service服务实例数据的所有clientIdfor (String each : getTargetClientIds()) {//根据clientId获取客户端Client对象Client client = clientManager.getClient(each);if (null == client) {continue;}//调用AbstractClient.getSubscriber()方法//因为AbstractClient对象中存放着它订阅的服务与订阅者对象映射//所以可以根据要推送的Service服务,获取对应的客户端订阅者对象Subscriber subscriber = clientManager.getClient(each).getSubscriber(service);//传入订阅者subscriber,调用PushExecutorDelegate.doPushWithCallback()方法回调客户端delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,new NamingPushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));}} catch (Exception e) {Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));}}private PushDataWrapper generatePushData() {//调用ServiceStorage.getPushData()方法根据要推送的Service对象,获取包含所有实例信息的ServiceInfo对象ServiceInfo serviceInfo = delayTaskEngine.getServiceStorage().getPushData(service);ServiceMetadata serviceMetadata = delayTaskEngine.getMetadataManager().getServiceMetadata(service).orElse(null);return new PushDataWrapper(serviceMetadata, serviceInfo);}private Collection<String> getTargetClientIds() {//通过PushDelayTask的pushToAll属性控制是否对全部订阅了Service服务的客户端Client,进行推送//处理服务改变事件时,delayTask.isPushToAll()就是true//处理服务订阅事件时,delayTask.getTargetClients()就是指定的客户端Client//其中getAllClientsSubscribeService()会从订阅者列表中获取订阅了Service服务的所有clientIdreturn delayTask.isPushToAll() ? delayTaskEngine.getIndexesManager().getAllClientsSubscribeService(service): delayTask.getTargetClients();}...
}public class PushDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {private final ClientManager clientManager;private final ClientServiceIndexesManager indexesManager;private final ServiceStorage serviceStorage;private final NamingMetadataManager metadataManager;private final PushExecutor pushExecutor;...
}public class PushDelayTask extends AbstractDelayTask {    private final Service service;private boolean pushToAll;private Set<String> targetClients;//处理服务变动事件,创建PushDelayTask任务时所使用的构造方法public PushDelayTask(Service service, long delay) {this.service = service;pushToAll = true;targetClients = null;setTaskInterval(delay);setLastProcessTime(System.currentTimeMillis());}//处理服务订阅事件,创建PushDelayTask任务时所使用的构造方法public PushDelayTask(Service service, long delay, String targetClient) {this.service = service;this.pushToAll = false;this.targetClients = new HashSet<>(1);//把clientId添加到targetClients中,这个clientId就是发起服务订阅的客户端与服务端建立长连接后的客户端连接IDthis.targetClients.add(targetClient);setTaskInterval(delay);setLastProcessTime(System.currentTimeMillis());}...
}@Component
public class ServiceStorage {//缓存要查询的Service服务对象对应的已注册的服务详情private final ConcurrentMap<Service, ServiceInfo> serviceDataIndexes;...public ServiceInfo getPushData(Service service) {//调用ServiceStorage.emptyServiceInfo()方法创建空的ServiceInfo对象ServiceInfo result = emptyServiceInfo(service);if (!ServiceManager.getInstance().containSingleton(service)) {return result;}//调用ServiceStorage.getAllInstancesFromIndex()方法获服务取实例列表//ServiceInfo的hosts属性就包含了该服务的所有Instance实例数据result.setHosts(getAllInstancesFromIndex(service));//将获取到的ServiceInfo对象放入到缓存中serviceDataIndexes.put(service, result);return result;}...
}//Client and service index manager.
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {//注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientIdprivate final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();//订阅者列表(服务消费者),一个Service服务对象,对应多个订阅者的clientIdprivate final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();...public Collection<String> getAllClientsSubscribeService(Service service) {//从订阅者列表中获取订阅了Service服务的所有clientIdreturn subscriberIndexes.containsKey(service) ? subscriberIndexes.get(service) : new ConcurrentHashSet<>();}...
}public abstract class AbstractClient implements Client {//subscribers存放着:订阅者Subscriber(其实可理解为当前客户端)订阅了的Service服务//subscribers的key=stock-service(要订阅的某个服务)、value=order-service(订阅者,某个具体的包含IP的服务实例)protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);...@Overridepublic Subscriber getSubscriber(Service service) {return subscribers.get(service);}@Overridepublic boolean addServiceSubscriber(Service service, Subscriber subscriber) {//服务订阅时,添加订阅者//subscribers的key=stock-service(要订阅的某个服务)、value=order-service(订阅者,某个具体的包含IP的服务实例)if (null == subscribers.put(service, subscriber)) {MetricsMonitor.incrementSubscribeCount();}return true;}...
}@Component
public class PushExecutorDelegate implements PushExecutor {private final PushExecutorRpcImpl rpcPushExecuteService;private final PushExecutorUdpImpl udpPushExecuteService;...@Overridepublic void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data, PushCallBack callBack) {getPushExecuteService(clientId, subscriber).doPushWithCallback(clientId, subscriber, data, callBack);}private PushExecutor getPushExecuteService(String clientId, Subscriber subscriber) {Optional<SpiPushExecutor> result = SpiImplPushExecutorHolder.getInstance().findPushExecutorSpiImpl(clientId, subscriber);if (result.isPresent()) {return result.get();}return clientId.contains(IpPortBasedClient.ID_DELIMITER) ? udpPushExecuteService : rpcPushExecuteService;}...
}@Component
public class PushExecutorRpcImpl implements PushExecutor {private final RpcPushService pushService;...@Overridepublic void doPushWithCallback(String clientId, Subscriber subscriber, PushDataWrapper data, PushCallBack callBack) {pushService.pushWithCallback(clientId, NotifySubscriberRequest.buildNotifySubscriberRequest(getServiceInfo(data, subscriber)), callBack, GlobalExecutor.getCallbackExecutor());}...
}//push response  to clients.
@Service
public class RpcPushService {@Autowiredprivate ConnectionManager connectionManager;...//push response with no ack.public void pushWithCallback(String connectionId, ServerRequest request, PushCallBack requestCallBack, Executor executor) {Connection connection = connectionManager.getConnection(connectionId);if (connection != null) {try {//调用GrpcConnection.asyncRequest()方法向客户端发送推送请求connection.asyncRequest(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) {@Overridepublic Executor getExecutor() {return executor;}@Overridepublic void onResponse(Response response) {if (response.isSuccess()) {requestCallBack.onSuccess();} else {requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage()));}} @Overridepublic void onException(Throwable e) {requestCallBack.onFail(e);}});} catch (ConnectionAlreadyClosedException e) {connectionManager.unregister(connectionId);requestCallBack.onSuccess();} catch (Exception e) {Loggers.REMOTE_DIGEST.error("error to send push response to connectionId ={},push response={}", connectionId, request, e);requestCallBack.onFail(e);}} else {requestCallBack.onSuccess();}}...
}

三.客户端收到服务端发送的Service服务实例数据推送的处理

NamingPushRequestHandler的requestReply()方法会处理服务端的推送,即调用ServiceInfoHolder的processServiceInfo()方法更新本地缓存。

public class NamingPushRequestHandler implements ServerRequestHandler {private final ServiceInfoHolder serviceInfoHolder;public NamingPushRequestHandler(ServiceInfoHolder serviceInfoHolder) {this.serviceInfoHolder = serviceInfoHolder;}@Overridepublic Response requestReply(Request request) {if (request instanceof NotifySubscriberRequest) {//进行类型转换NotifySubscriberRequest notifyResponse = (NotifySubscriberRequest) request;//更新客户端本地缓存数据serviceInfoHolder.processServiceInfo(notifyResponse.getServiceInfo());return new NotifySubscriberResponse();}return null;}
}public class ServiceInfoHolder implements Closeable {private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;...public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {String serviceKey = serviceInfo.getKey();if (serviceKey == null) {return null;}//获取本地缓存中的服务实例ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());if (isEmptyOrErrorPush(serviceInfo)) {return oldService;}//更新本地缓存中的服务实例serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);//判断服务实例是否有改变boolean changed = isChangedServiceInfo(oldService, serviceInfo);if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));}MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());if (changed) {NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(), JacksonUtils.toJson(serviceInfo.getHosts()));//发布服务实例改变事件NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));//将服务实例信息写入本地磁盘DiskCache.write(serviceInfo, cacheDir);}return serviceInfo;}...
}

(4)总结

相关文章:

Nacos源码—7.Nacos升级gRPC分析三

大纲 5.服务变动时如何通知订阅的客户端 6.微服务实例信息如何同步集群节点 5.服务变动时如何通知订阅的客户端 (1)服务注册和服务订阅时发布的客户端注册和订阅事件的处理 (2)延迟任务的执行引擎源码 (3)处理客户端注册和订阅事件时发布的服务变动和服务订阅事件的处理 (…...

量化学习DAY2-开始批量提交alpha!

量化学习第二天笔记 一、World Quant平台的Alpha概念 在World Quant平台中&#xff0c;alpha本质上是一个数学公式&#xff0c;它是**operator&#xff08;操作&#xff09;与Data&#xff08;数据&#xff09;**的组合。 &#xff08;一&#xff09;Data相关 Data&#xf…...

【Qwen3_ 4b lora xinli 】 task完成实践记录

task 我需要 基于llamafactory框架选取基本上相同的数据集用lora微调Qwen3_ 4b两次并保存lora参数然后分别合并这两个lora参数到基座模型。再换个数据集上接着进行微调。并且保存新的lora参数&#xff0c;然后我们匹配这里面的特征值和特征向量&#xff0c;如果这两个新的lora…...

文旅田园康养小镇规划设计方案PPT(85页)

1. 项目背景与定位 背景&#xff1a;位于长三角经济圈&#xff0c;依托安吉丰富的自然与文化资源&#xff0c;旨在打造集康养、度假、文化体验于一体的综合小镇。 定位&#xff1a;成为浙北地区知名的康养旅游目的地&#xff0c;融合“一溪两岸”规划理念&#xff0c;实现全面…...

[Windows] 能同时打开多个图片的图像游览器JWSEE v2.0

[Windows] 能同时打开多个图片的图像游览器JWSEE 链接&#xff1a;https://pan.xunlei.com/s/VOPpO86Hu3dalYLaZ1ivcTGIA1?pwdhckf# 十多年前收藏的能同时打开多个图片的图像游览器JWSEE v2.0&#xff0c;官网已没有下载资源。 JWSEE v2.0是乌鲁木齐金维图文信息科技有限公司…...

低成本自动化改造技术锚点深度解析

执行摘要 本文旨在深入剖析四项关键的低成本自动化技术&#xff0c;这些技术为工业转型提供了显著的运营和经济效益。文章将提供实用且深入的指导&#xff0c;涵盖老旧设备联网、AGV车队优化、空压机系统智能能耗管控以及此类项目投资回报率&#xff08;ROI&#xff09;的严谨…...

23盘古石决赛

一&#xff0c;流量分析 1. 计算流量包文件的SHA256值是&#xff1f;[答案&#xff1a;字母小写][★☆☆☆☆] 答案&#xff1a;2d689add281b477c82b18af8ab857ef5be6badf253db1c1923528dd73b3d61a9 解压出来流量包计算 2. 流量包长度在“640 - 1279”之间的的数据包总共有多少…...

C语言—指针3

1. 数组名的理解 观察以下代码 可以观察到pa指向的地址与数组首元素地址相同&#xff0c;那么可以说明数组就是首元素地址吗&#xff1f; 这种说法是不严谨的&#xff0c;观察以下代码&#xff1a; 程序输出的结果为16&#xff0c;此时的arr表示的是整个数组的大小。 观察以…...

操作系统 第2章节 进程,线程和作业

一:多道程序设计 1-多道程设计的目的 for:提高吞吐量(作业道数/处理时间),我们可以从提高资源的利用率出发 2-单道程序设计缺点: 设备的利用率低,内存的利用率低,处理机的利用率低 比如CPU去访问内存,CPU空转.内存等待CPU访问也是没有任何操作的.要是有多个东西要去访问不冲…...

数字化转型-4A架构之数据架构

系列文章 数字化转型-4A架构&#xff08;业务架构、应用架构、数据架构、技术架构&#xff09; 数字化转型-4A架构之业务架构 数字化转型-4A架构之应用架构 数据架构 Data Architecture&#xff08;DA&#xff09; 1. 定义 数据架构&#xff0c;是组织管理数据资产的科学之…...

Java中的反射

目录 什么是反射 反射的核心作用 反射的核心类 反射的基本使用 获取Class对象 创建对象 操作字段&#xff08;Field&#xff09; 调用方法&#xff08;Method&#xff09; 反射的应用场景 反射的优缺点 优点 缺点 示例&#xff1a;完整反射操作 总结 什么是反射 …...

LINUX CFS算法解析

文章目录 1. Linux调度器的发展历程2. CFS设计思想3. CFS核心数据结构3.1 调度实体(sched_entity)3.2 CFS运行队列(cfs_rq)3.3 任务结构体中的调度相关字段 4. 优先级与权重4.1 优先级范围4.2 权重映射表 (prio_to_weight[])优先级计算4.3.1. static_prio (静态优先级)4.3.2. n…...

内网渗透——红日靶场三

目录 一、前期准备 二、外网探测 1.使用nmap进行扫描 2.网站信息收集 3.漏洞复现(CVE-2021-23132) 4.disable_function绕过 5.反弹shell&#xff08;也&#xff0c;并不是&#xff09; 6.SSH登录 7.权限提升&#xff08;脏牛漏洞&#xff09; 8.信息收集 9.上线msf 三…...

The 2024 ICPC Kunming Invitational Contest G. Be Positive

https://codeforces.com/gym/105386/problem/G 题目&#xff1a; 结论&#xff1a; 从0开始每四个相邻数的异或值为0 代码&#xff1a; #include<bits/stdc.h> using namespace std; #define int long long void solve() {int n;cin >> n;if(n1||n%40){cout &…...

CommunityToolkit.Mvvm详解

属性可视化 给一个属性添加ObservableProperty就可以可视化了 [ObservableProperty] private string currentNameInfo;[ObservableProperty] private string currentClassInfo;[ObservableProperty] private string currentPhoneInfo;xaml中只需要绑定大写的属性就可以了 &l…...

密码学--AES

一、实验目的 1、完成AES算法中1轮加密和解密操作 2、掌握AES的4个基本处理步骤 3、理解对称加密算法的“对称”思想 二、实验内容 1、题目内容描述 &#xff08;1&#xff09;利用C语言实现字节代换和逆向字节代换&#xff0c;字节查S盒代换 &#xff08;2&#xff09;利…...

操作系统的初步了解

目录 引言&#xff1a;什么是操作系统&#xff1f; 一、设计操作系统的目的 二、操作系统是做什么的&#xff1a; 操作系统主要有四大核心任务&#xff1a; 1. 管理硬件 2. 运行软件 3. 存储数据 4. 提供用户界面 如何理解操作系统的管理呢&#xff1f; 1. 什么是操作…...

边缘计算:技术概念与应用详解

引言 随着物联网&#xff08;IoT&#xff09;、5G 和人工智能&#xff08;AI&#xff09;的快速发展&#xff0c;传统的云计算架构在处理海量数据和实时计算需求时逐渐显现出瓶颈。边缘计算&#xff08;Edge Computing&#xff09;作为一种新兴的计算范式&#xff0c;通过将计…...

C++进阶--红黑树的实现

文章目录 红黑树的实现红黑树的概念红黑树的规则红黑树的效率 红黑树的实现红黑树的结构红黑树的插入变色单旋&#xff08;变色&#xff09;双旋&#xff08;变色&#xff09; 红黑树的查找红黑树的验证 总结&#xff1a;结语 很高兴和大家见面&#xff0c;给生活加点impetus&a…...

[C++类和对象]类和对象的引入

面向过程和面向对象 C语言是面向过程的,关注的是过程,分析出求解问题的步骤,通过函数调用来逐步解决问题 C是基于面向对象的,关注的是对象,将一件事情分成不同的对象,靠对象之间完成交互 类的引入 C语言结构体中只能定义变量,在C中,结构体不仅仅可以定义变量,而且可以定义函…...

YOLOv12云端GPU谷歌免费版训练模型

1.效果 2.打开 https://colab.research.google.com/?utm_sourcescs-index 3.上传代码 4.解压 !unzip /content/yolov12-main.zip -d /content/yolov12-main 5.进入yolov12-main目录 %cd /content/yolov12-main/yolov12-main 6.安装依赖库 !pip install -r requirements.…...

课程审核流程揭秘:确保内容合规与用户体验

业务流程 为什么课程审核通过才可以发布呢&#xff1f; 这样做为了防止课程信息有违规情况&#xff0c;课程信息不完善对网站用户体验也不好&#xff0c;课程审核不仅起到监督作用&#xff0c;也是 帮助教学机构规范使用平台的手段。 如果流程复杂用工作流 说明如下&#xff…...

【LangChain高级系列】LangGraph第一课

前言 我们今天直接通过一个langgraph的基础案例&#xff0c;来深入探索langgraph的核心概念和工作原理。 基本认识 LangGraph是一个用于构建具有LLMs的有状态、多角色应用程序的库&#xff0c;用于创建代理和多代理工作流。与其他LLM框架相比&#xff0c;它提供了以下核心优…...

ATH12K 驱动框架

ATH12K 驱动框架 ath12k驱动框架及模块交互逻辑详解1. 总体架构2. 关键数据结构2.1 核心数据结构2.2 虚拟接口数据结构3. 硬件抽象层(HAL)4. 无线管理接口(WMI)5. 主机目标通信(HTC)6. 数据路径(DP)6.1 发送路径(TX)6.2 接收路径(RX)7. 多链路操作(MLO)8. 初始化和工作流程8.1 …...

CMA认证对象?CMA评审依据,CMA认证好处

CMA认证对象 CMA&#xff08;中国计量认证&#xff0c;China Metrology Accreditation&#xff09;的认证对象主要是第三方检测机构和实验室&#xff0c;包括&#xff1a; 独立检测机构&#xff1a;如环境监测站、产品质量检验所、食品药品检测机构等。 企业内部实验室&#…...

依赖关系-根据依赖关系求候选码

关系模式R&#xff08;U, F&#xff09;, U{}&#xff0c;F是R的函数依赖集&#xff0c;可以将属性分为4类&#xff1a; L: 仅出现在依赖集F左侧的属性 R: 仅出现在依赖集F右侧的属性 LR: 在依赖集F左右侧都出现的属性 NLR: 在依赖集F左右侧都未出现的属性 结论1: 若X是L类…...

解决应用程序在JAR包中运行时无法读取类路径下文件的问题

问题情景 java应用程序在IDE运行正常&#xff0c;打成jar包后执行却发生异常&#xff1a; java.io.FileNotFoundException: class path resource [cert/sync_signer_pri_test.key] cannot be resolved to absolute file path because it does not reside in the file system:…...

第十六届蓝桥杯B组第二题

当时在考场的时候这一道题目 无论我是使用JAVA的大数&#xff08;BIGTHGER&#xff09;还是赛后 使用PY 都是没有运行出来 今天也是突发奇想在B站上面搜一搜 看了才知道这也是需要一定的数学思维 通过转换 设X来把运算式精简化 避免运行超时 下面则是代码 public class lanba…...

龙虎榜——20250509

上证指数今天缩量&#xff0c;整体跌多涨少&#xff0c;走势处于日线短期的高位~ 深证指数今天缩量小级别震荡&#xff0c;大盘股表现更好~ 2025年5月9日龙虎榜行业方向分析 一、核心行业方向 军工航天 • 代表个股&#xff1a;航天南湖、天箭科技、襄阳轴承。 • 驱动逻辑…...

node提示node:events:495 throw er解决方法

前言 之前开发的时候喜欢使用高版本&#xff0c;追求新的东西&#xff0c;然后回头运行一下之前的项目提示如下 项目技术栈&#xff1a;node egg 报错 node:events:495 throw er; // Unhandled error event ^ Error: ENOENT: no such file or directory, scandir F:\my\gi…...

OrangePi Zero 3学习笔记(Android篇)4 - eudev编译(获取libudev.so)

目录 1. Ubuntu中编译 2. NDK环境配置 3. 编译 4. 安装 这部分主要是为了得到libudev&#xff08;因为原来的libudev已经不更新了&#xff09;&#xff0c;eudev的下载地址如下&#xff1a; https://github.com/gentoo/eudev 相应的代码最好是在Ubuntu中先编译通过&#…...

[AI ][Dify] Dify Tool 插件调试流程详解

在使用 Dify 进行插件开发时,调试是必不可少的环节。Dify 提供了远程服务调试的能力,让开发者可以快速验证插件功能和交互逻辑。本文将详细介绍如何配置环境变量进行插件调试,并成功在插件市场中加载调试状态的插件。 一、调试环境配置 在 Dify 的插件调试过程中,我们需要…...

learning ray之ray强化学习/超参调优和数据处理

之前我们掌握了Ray Core的基本编程&#xff0c;我们已经学会了如何使用Ray API。现在&#xff0c;让我们将这些知识应用到一个更实际的场景中——构建一个强化学习项目&#xff0c;并且利用Ray来加速它。 我们的目标是&#xff0c;通过Ray的任务和Actor&#xff0c;将一个简单…...

gpu硬件,gpu驱动,cuda,CUDA Toolkit,cudatoolkit,cudnn,nvcc概念解析

组件角色依赖关系GPU硬件无CUDA编程模型/平台需NVIDIA GPU和驱动CUDA Toolkit开发工具包&#xff08;含NVCC、库等&#xff09;需匹配GPU驱动和CUDA版本cuDNN深度学习加速库需CUDA ToolkitNVCCCUDA代码编译器包含在CUDA Toolkit中 GPU硬件&#xff1a; 硬件层面的图形处理器&…...

【C/C++】范围for循环

&#x1f4d8; C 范围 for 循环详解&#xff08;Range-based for loop&#xff09; 一、什么是范围 for 循环&#xff1f; 范围 for 循环&#xff08;Range-based for loop&#xff09; 是 C11 引入的一种简化容器/数组遍历的方式。它通过自动调用容器的 begin() 和 end() 方法…...

嵌入式开发学习(第二阶段 C语言基础)

C语言&#xff1a;第4天笔记 内容提要 流程控制 C语句数据的输入与输出 流程控制 C语句 定义 C程序是以函数为基础单位的。一个函数的执行部分是由若干条语句构成的。C语言都是用来完成一定操作的任务。C语句必须依赖于函数存在。 C程序结构 C语句分类 1.控制语句 作…...

大物重修之浅显知识点

第一章 质点运动学 例1 知识点公式如下&#xff1a; 例2 例3 例4 例5 例6 第四章 刚体的转动 例1 例2 例3 例4 例5 例6 第五章 简谐振动 例1 例2 例3 第六章 机械波 第八章 热力学基础 第九章 静电场 第十一章 恒定磁场…...

随笔-近况

好久没写了&#xff0c;手都生了。 我写的东西可以分为两类&#xff1a;技术和随笔。当然技术没有我自己创新的&#xff0c;都是些在解决问题过程中查询了很多资料&#xff0c;经过验证后&#xff0c;可以在项目上使用的。但是自从 deepseek 出现后&#xff0c;问题一下子简单…...

赤色世界 陈默传 第一章 另一个陈默

赤色世界 陈默传 第一章 另一个陈默 陈默在一片纯白的空间中缓缓睁开眼睛。没有声音&#xff0c;没有光影的变化&#xff0c;只有无尽的空白包围着他&#xff0c;仿佛整个世界被擦去了所有的色彩和形状。他站在那里&#xff0c;赤脚踩在这片空无一物的地面上&#xff0c;却能感…...

isp流程介绍(yuv格式阶段)

一、前言介绍 前面两章里面&#xff0c;已经分别讲解了在Raw和Rgb域里面&#xff0c;ISP的相关算法流程&#xff0c;从前面文章里面可以看到&#xff0c;在Raw和Rgb域里面&#xff0c;很多ISP算法操作&#xff0c;更像是属于sensor矫正或者说sensor标定操作。本质上来说&#x…...

关于大数据的基础知识(一)——定义特征结构要素

成长路上不孤单&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a;&#x1f60a; 【14后&#x1f60a;///计算机爱好者&#x1f60a;///持续分享所学&#x1f60a;///如有需要欢迎收藏转发///&#x1f60a;】 今日分享关于大数据的基础知识&#xff08;一&a…...

C++(1):整数常量

目录 一、进制表示 二、后缀指定类型 三、示例与注意 四、总结表 一、进制表示 十进制&#xff08;Decimal&#xff09; 格式&#xff1a;无前缀&#xff0c;由数字 0-9 组成。 示例&#xff1a;123、42。 注意&#xff1a;不能以 0 开头&#xff08;否则会被视为八进制&a…...

C# NX二次开发:宏录制实战讲解(第一讲)

今天要讲的是关于NX软件录制宏操作的一些案例。 下面讲如何在NX软件中复制Part体的录制宏。 NXOpen.Session theSession NXOpen.Session.GetSession(); NXOpen.Part workPart theSession.Parts.Work; NXOpen.Part displayPart theSession.Parts.Display; NXOpe…...

C++:书架

【描述】 John最近买了一个书架用来存放奶牛养殖书籍&#xff0c;但书架很快被存满了&#xff0c;只剩最顶层有空余。 John共有N头奶牛(1 ≤ N ≤ 20,000)&#xff0c;每头奶牛有自己的高度Hi(1 ≤ Hi ≤ 10,000)&#xff0c;N头奶牛的总高度为S。书架高度为B(1 ≤ B ≤ S <…...

34.笔记1

今天&#xff0c;我们回顾回顾曾经的知识。 1.二分 还记得当初的二分吗&#xff1f; 1.一开始的二分 就像下面这个故事&#xff1a; 有一只老鼠&#xff0c;躲在10个大瓷瓶后面。你的任务就是抓住这只老鼠&#xff0c;但在抓的过程会导致你选择的大瓷瓶成为分子碎片。 如…...

智慧工会服务平台建设方案Word(23页)

1. 引言 随着信息技术的快速发展&#xff0c;传统工会服务模式面临挑战&#xff0c;智慧工会服务平台应运而生。该平台旨在通过数字化手段&#xff0c;整合工会资源&#xff0c;优化服务流程&#xff0c;提高工作效率&#xff0c;为会员提供更加便捷、高效、个性化的服务体验。…...

常见降维算法分析

一、常见的降维算法 LDA线性判别PCA主成分分析t-sne降维 二、降维算法原理 2.1 LDA 线性判别 原理 &#xff1a;LDA&#xff08;Linear Discriminant Analysis&#xff09;线性判别分析是一种有监督的降维方法。它的目标是找到一个投影方向&#xff0c;使得不同类别的数据在…...

洛谷 P1179【NOIP 2010 普及组】数字统计 —— 逐位计算

题面:P1179 [NOIP 2010 普及组] 数字统计 - 洛谷 一&#xff1a;题目解释&#xff1a; 需要求一区间内数字 2 的出现次数。注意22则记为 2 次&#xff0c;其它没别的... 二&#xff1a;思路、 思想可以考虑动态规划需要计算在每一位上数字 2 的出现次数&#xff0c;然后将这些…...

互联网大厂Java求职面试:基于RAG的智能问答系统设计与实现-1

互联网大厂Java求职面试&#xff1a;基于RAG的智能问答系统设计与实现-1 场景背景 在某互联网大厂的技术面试中&#xff0c;技术总监张总正在面试一位名为郑薪苦的求职者。郑薪苦虽然对技术充满热情&#xff0c;但回答问题时总是带着幽默感&#xff0c;有时甚至让人哭笑不得。…...

学习黑客5 分钟读懂什么是 CVE?

5 分钟读懂什么是 CVE&#xff1f; ⏱️&#x1f510; 目标读者&#xff1a; 安全小白 风格&#xff1a; ***式清晰、循序渐进 篇幅&#xff1a; 5 分钟速读 &#x1f4d6; 目录 &#x1f680; 什么是 CVE&#xff1f;&#x1f914; 为什么要关注 CVE&#xff1f;&#x1f50d…...