当前位置: 首页 > news >正文

Nacos源码—7.Nacos升级gRPC分析四

大纲

5.服务变动时如何通知订阅的客户端

6.微服务实例信息如何同步集群节点

6.微服务实例信息如何同步集群节点

(1)服务端处理服务注册时会发布一个ClientChangedEvent事件

(2)ClientChangedEvent事件的处理源码

(3)集群节点处理数据同步请求的源码

(1)服务端处理服务注册时会发布一个ClientChangedEvent事件

ClientChangedEvent事件的作用就是向集群节点同步服务实例数据的。

//Instance request handler.
@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {private final EphemeralClientOperationServiceImpl clientOperationService;public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {this.clientOperationService = clientOperationService;}@Override@Secured(action = ActionTypes.WRITE)public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {//根据请求信息创建一个Service对象,里面包含了:命名空间、分组名、服务名Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);switch (request.getType()) {case NamingRemoteConstants.REGISTER_INSTANCE://注册实例return registerInstance(service, request, meta);case NamingRemoteConstants.DE_REGISTER_INSTANCE://注销实例return deregisterInstance(service, request, meta);default:throw new NacosException(NacosException.INVALID_PARAM, String.format("Unsupported request type %s", request.getType()));}}private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {//调用EphemeralClientOperationServiceImpl的注册方法registerInstance(),这里需要注意如下参数;//参数service:根据请求信息创建的一个Service对象,里面有命名空间、分组名、服务名//参数request.getInstance():这个参数就对应了客户端的实例对象,里面包含IP、端口等信息//参数meta.getConnectionId():这个参数很关键,它是连接IDclientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);}...
}@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {private final ClientManager clientManager;public EphemeralClientOperationServiceImpl(ClientManagerDelegate clientManager) {this.clientManager = clientManager;}@Overridepublic void registerInstance(Service service, Instance instance, String clientId) {//从ServiceManager中根据由请求信息创建的Service对象获取一个已注册的Service对象Service singleton = ServiceManager.getInstance().getSingleton(service);if (!singleton.isEphemeral()) {throw new NacosRuntimeException(NacosException.INVALID_PARAM, String.format("Current service %s is persistent service, can't register ephemeral instance.", singleton.getGroupedServiceName()));}//从ClientManagerDelegate中根据请求参数中的connectionId获取一个Client对象,即IpPortBasedClient对象Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}//将请求中的instance实例信息封装为InstancePublishInfo对象InstancePublishInfo instanceInfo = getPublishInfo(instance);//往Client对象里添加已注册的服务对象Service,调用的是IpPortBasedClient对象的父类AbstractClient的addServiceInstance()方法client.addServiceInstance(singleton, instanceInfo);//设置IpPortBasedClient对象的lastUpdatedTime属性为最新时间client.setLastUpdatedTime();//发布客户端注册服务实例的事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));//发布服务实例元数据的事件NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));}...
}//Nacos naming client based ip and port.
//The client is bind to the ip and port users registered. It's a abstract content to simulate the tcp session client.
public class IpPortBasedClient extends AbstractClient {...@Overridepublic boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));}...
}//Abstract implementation of {@code Client}.
public abstract class AbstractClient implements Client {//publishers其实就是记录该客户端提供的服务和服务实例,一个客户端可提供多个服务//存储客户端发送过来的请求中的Instance信息,当然这些信息已封装为InstancePublishInfo对象//key为已注册的Service,value是根据请求中的instance实例信息封装的InstancePublishInfo对象protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);//subscribers存放着:订阅者Subscriber(其实可理解为当前客户端)订阅了的Service服务//subscribers的key=stock-service(要订阅的某个服务)、value=order-service(订阅者,某个具体的包含IP的服务实例)protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);...@Overridepublic boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {//服务注册时,如果是第一次put进去Service对象,会返回nullif (null == publishers.put(service, instancePublishInfo)) {//监视器记录MetricsMonitor.incrementInstanceCount();}//发布客户端改变事件,用于处理集群间的数据同步NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());return true;}...
}

(2)ClientChangedEvent事件的处理源码

DistroClientDataProcessor的onEvent()方法会响应ClientChangedEvent。该方法如果判断出事件类型为ClientChangedEvent事件,那么就会执行DistroClientDataProcessor的syncToAllServer()方法,然后调用DistroProtocol的sync()方法进行集群节点同步处理。

DistroProtocol的sync()方法会遍历集群中除自身节点外的其他节点,然后对遍历到的每个节点执行DistroProtocol的syncToTarget()方法。

在DistroProtocol的syncToTarget()方法中,首先把要同步的集群节点targetServer包装成DistroKey对象,然后根据DistroKey对象创建DistroDelayTask延迟任务,接着调用NacosDelayTaskExecuteEngine的addTask()方法,往延迟任务执行引擎的tasks中添加任务。

NacosDelayTaskExecuteEngine在初始化时会启动一个定时任务,这个定时任务会定时执行ProcessRunnable的run()方法。而ProcessRunnable的run()方法会不断从任务池tasks中取出延迟任务处理,处理DistroDelayTask任务时会调用DistroDelayTaskProcessor的process()方法。

在执行DistroDelayTaskProcessor的process()方法时,会先根据DistroDelayTask任务封装一个DistroSyncChangeTask任务,然后调用NacosExecuteTaskExecuteEngine的addTask()方法。也就是调用TaskExecuteWorker的process()方法,将DistroSyncChangeTask任务添加到TaskExecuteWorker的阻塞队列中,同时创建TaskExecuteWorker时会启动线程不断从队列中取出任务处理。因此最终会执行DistroSyncChangeTask的run()方法。

public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {private final ClientManager clientManager;private final DistroProtocol distroProtocol;...@Overridepublic void onEvent(Event event) {...if (event instanceof ClientEvent.ClientVerifyFailedEvent) {syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);} else {syncToAllServer((ClientEvent) event);}}private void syncToAllServer(ClientEvent event) {Client client = event.getClient();//Only ephemeral data sync by Distro, persist client should sync by raft.//临时实例使用Distro协议,持久化实例使用Raft协议//ClientManager.isResponsibleClient()方法,判断只有该client的责任节点才能进行集群数据同步if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {//如果event是客户端注销实例时需要进行集群节点同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {//如果event是客户端注册实例时需要进行集群节点同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}}...
}@Component
public class DistroProtocol {private final ServerMemberManager memberManager;private final DistroTaskEngineHolder distroTaskEngineHolder;...//Start to sync by configured delay.public void sync(DistroKey distroKey, DataOperation action) {sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());}//Start to sync data to all remote server.public void sync(DistroKey distroKey, DataOperation action, long delay) {//遍历集群中除自身节点外的其他节点for (Member each : memberManager.allMembersWithoutSelf()) {syncToTarget(distroKey, action, each.getAddress(), delay);}}//Start to sync to target server.public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {//先把要同步的集群节点targetServer包装成DistroKey对象DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), targetServer);//然后根据DistroKey对象创建DistroDelayTask任务DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);//接着调用NacosDelayTaskExecuteEngine.addTask()方法//往延迟任务执行引擎DistroDelayTaskExecuteEngine中添加延迟任务DistroDelayTaskdistroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);}}...
}//延迟任务执行引擎
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {private final ScheduledExecutorService processingExecutor;protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;//任务池public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {super(logger);tasks = new ConcurrentHashMap<>(initCapacity);processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));//开启定时任务,即启动ProcessRunnable线程任务processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);}...@Overridepublic void addTask(Object key, AbstractDelayTask newTask) {lock.lock();try {AbstractDelayTask existTask = tasks.get(key);if (null != existTask) {newTask.merge(existTask);}//最后放入到任务池中tasks.put(key, newTask);} finally {lock.unlock();}}protected void processTasks() {//获取tasks中所有的任务,然后进行遍历Collection<Object> keys = getAllTaskKeys();for (Object taskKey : keys) {//通过任务key,获取具体的任务,并且从任务池中移除掉AbstractDelayTask task = removeTask(taskKey);if (null == task) {continue;}//通过任务key获取对应的NacosTaskProcessor延迟任务处理器NacosTaskProcessor processor = getProcessor(taskKey);if (null == processor) {getEngineLog().error("processor not found for task, so discarded. " + task);continue;}try {//ReAdd task if process failed//调用获取到的NacosTaskProcessor延迟任务处理器的process()方法if (!processor.process(task)) {//如果失败了,会重试添加task回tasks这个map中retryFailedTask(taskKey, task);}} catch (Throwable e) {getEngineLog().error("Nacos task execute error ", e);retryFailedTask(taskKey, task);}}}...private class ProcessRunnable implements Runnable {@Overridepublic void run() {try {processTasks();} catch (Throwable e) {getEngineLog().error(e.toString(), e);}}}
}//Distro delay task processor.
public class DistroDelayTaskProcessor implements NacosTaskProcessor {...@Overridepublic boolean process(NacosTask task) {if (!(task instanceof DistroDelayTask)) {return true;}DistroDelayTask distroDelayTask = (DistroDelayTask) task;DistroKey distroKey = distroDelayTask.getDistroKey();switch (distroDelayTask.getAction()) {case DELETE://处理客户端注销实例时的延迟任务(同步数据到集群节点)//根据DistroDelayTask任务封装一个DistroSyncTask任务DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);//调用NacosExecuteTaskExecuteEngine.addTask()方法distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);return true;case CHANGE:case ADD://处理客户端注册实例时的延迟任务(同步数据到集群节点)//根据DistroDelayTask任务封装一个DistroSyncChangeTask任务DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);//调用NacosExecuteTaskExecuteEngine.addTask()方法distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);return true;default:return false;}}
}//任务执行引擎
public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {private final TaskExecuteWorker[] executeWorkers;public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {super(logger);executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];for (int mod = 0; mod < dispatchWorkerCount; ++mod) {executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());}}...@Overridepublic void addTask(Object tag, AbstractExecuteTask task) {//根据tag获取到TaskExecuteWorkerNacosTaskProcessor processor = getProcessor(tag);if (null != processor) {processor.process(task);return;}TaskExecuteWorker worker = getWorker(tag);//调用TaskExecuteWorker.process()方法把AbstractExecuteTask任务放入到队列当中去worker.process(task);}private TaskExecuteWorker getWorker(Object tag) {int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();return executeWorkers[idx];}    ...
}public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {private final BlockingQueue<Runnable> queue;//任务存储容器public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {...this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);new InnerWorker(name).start();}@Overridepublic boolean process(NacosTask task) {if (task instanceof AbstractExecuteTask) {//把NacosTask任务放入到阻塞队列中putTask((Runnable) task);}return true;}private void putTask(Runnable task) {try {//把NacosTask任务放入到阻塞队列中queue.put(task);} catch (InterruptedException ire) {log.error(ire.toString(), ire);}}...private class InnerWorker extends Thread {InnerWorker(String name) {setDaemon(false);setName(name);}@Overridepublic void run() {while (!closed.get()) {try {//一直取阻塞队列中的任务Runnable task = queue.take();long begin = System.currentTimeMillis();//调用NacosTask中的run方法task.run();long duration = System.currentTimeMillis() - begin;if (duration > 1000L) {log.warn("task {} takes {}ms", task, duration);}} catch (Throwable e) {log.error("[TASK-FAILED] " + e.toString(), e);}}}}
}

执行DistroSyncChangeTask的run()方法,其实就是执行AbstractDistroExecuteTask的run()方法。AbstractDistroExecuteTask的run()方法会先获取请求数据,然后调用DistroClientTransportAgent的syncData()方法同步集群节点,也就是调用ClusterRpcClientProxy的sendRequest()方法发送数据同步请求,最终会调用RpcClient的request()方法 -> GrpcConnection的request()方法。

public class DistroSyncChangeTask extends AbstractDistroExecuteTask {...@Overrideprotected void doExecuteWithCallback(DistroCallback callback) {String type = getDistroKey().getResourceType();//获取请求数据DistroData distroData = getDistroData(type);if (null == distroData) {Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());return;}//默认调用DistroClientTransportAgent.syncData()方法同步集群节点getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer(), callback);}private DistroData getDistroData(String type) {DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());if (null != result) {result.setType(OPERATION);}return result;}...
}public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask {...@Overridepublic void run() {//Nacos:Naming:v2:ClientDataString type = getDistroKey().getResourceType();//获取DistroClientTransportAgent对象DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);if (null == transportAgent) {Loggers.DISTRO.warn("No found transport agent for type [{}]", type);return;}Loggers.DISTRO.info("[DISTRO-START] {}", toString());//默认返回trueif (transportAgent.supportCallbackTransport()) {//默认执行子类的doExecuteWithCallback()方法doExecuteWithCallback(new DistroExecuteCallback());} else {executeDistroTask();}}protected abstract void doExecuteWithCallback(DistroCallback callback);...
}public class DistroClientTransportAgent implements DistroTransportAgent {private final ClusterRpcClientProxy clusterRpcClientProxy;private final ServerMemberManager memberManager;...@Overridepublic boolean syncData(DistroData data, String targetServer) {if (isNoExistTarget(targetServer)) {return true;}//创建请求对象DistroDataRequest request = new DistroDataRequest(data, data.getType());//找到集群节点Member member = memberManager.find(targetServer);if (checkTargetServerStatusUnhealthy(member)) {Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);return false;}try {//向集群节点发送RPC异步请求Response response = clusterRpcClientProxy.sendRequest(member, request);return checkResponse(response);} catch (NacosException e) {Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);}return false;}...
}@Service
public class ClusterRpcClientProxy extends MemberChangeListener {...//send request to member.public Response sendRequest(Member member, Request request, long timeoutMills) throws NacosException {RpcClient client = RpcClientFactory.getClient(memberClientKey(member));if (client != null) {//调用RpcClient.request()方法return client.request(request, timeoutMills);} else {throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);}}...
}public abstract class RpcClient implements Closeable {//在NamingGrpcClientProxy初始化 -> 调用RpcClient.start()方法时,//会将GrpcClient.connectToServer()方法的返回值赋值给currentConnection属性protected volatile Connection currentConnection;...//send request.public Response request(Request request, long timeoutMills) throws NacosException {int retryTimes = 0;Response response;Exception exceptionThrow = null;long start = System.currentTimeMillis();while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {...//发起gRPC请求,调用GrpcConnection.request()方法response = this.currentConnection.request(request, timeoutMills);...}...}...
}

(3)集群节点处理数据同步请求的源码

通过DistroClientTransportAgent的syncData()方法发送的数据同步请求,会被DistroDataRequestHandler的handle()方法处理。然后会调用DistroDataRequestHandler的handleSyncData()方法,接着调用DistroProtocol的onReceive()方法,于是最终会调用到DistroClientDataProcessor.processData()方法。

在执行DistroClientDataProcessor的processData()方法时,如果是同步服务实例新增、修改后的数据,则执行DistroClientDataProcessor的handlerClientSyncData()方法。该方法会和处理服务注册时一样,发布一个客户端注册服务实例的事件。如果是同步服务实例删除后的数据,则调用EphemeralIpPortClientManager的clientDisconnected()方法。首先移除客户端对象信息,然后发布一个客户端注销服务实例的事件。

其中客户端注销服务实例的事件ClientDisconnectEvent,首先会被ClientServiceIndexesManager的onEvent()方法进行处理,处理时会调用ClientServiceIndexesManager的handleClientDisconnect()方法,移除ClientServiceIndexesManager订阅者列表的元素和注册表的元素。然后会被DistroClientDataProcessor的onEvent()方法进行处理,进行集群节点之间的数据同步。

@Component
public class DistroDataRequestHandler extends RequestHandler<DistroDataRequest, DistroDataResponse> {private final DistroProtocol distroProtocol;...@Overridepublic DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {try {switch (request.getDataOperation()) {case VERIFY:return handleVerify(request.getDistroData(), meta);case SNAPSHOT:return handleSnapshot();case ADD:case CHANGE:case DELETE://服务实例新增、修改、删除的同步,都会由DistroDataRequestHandler.handleSyncData()方法处理return handleSyncData(request.getDistroData());case QUERY:return handleQueryData(request.getDistroData());default:return new DistroDataResponse();}} catch (Exception e) {Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);DistroDataResponse result = new DistroDataResponse();result.setErrorCode(ResponseCode.FAIL.getCode());result.setMessage("handle distro request with exception");return result;}}private DistroDataResponse handleSyncData(DistroData distroData) {DistroDataResponse result = new DistroDataResponse();//调用DistroProtocol.onReceive()方法if (!distroProtocol.onReceive(distroData)) {result.setErrorCode(ResponseCode.FAIL.getCode());result.setMessage("[DISTRO-FAILED] distro data handle failed");}return result;}...
}@Component
public class DistroProtocol {...//Receive synced distro data, find processor to process.public boolean onReceive(DistroData distroData) {Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(), distroData.getDistroKey());//Nacos:Naming:v2:ClientDataString resourceType = distroData.getDistroKey().getResourceType();//获取DistroClientDataProcessor处理对象DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);if (null == dataProcessor) {Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);return false;}//调用DistroClientDataProcessor.processData()方法return dataProcessor.processData(distroData);}...
}public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {...@Overridepublic boolean processData(DistroData distroData) {switch (distroData.getType()) {case ADD:case CHANGE://服务实例添加和改变时的执行逻辑ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncData.class);handlerClientSyncData(clientSyncData);return true;case DELETE://服务实例删除时的执行逻辑String deleteClientId = distroData.getDistroKey().getResourceKey();Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);//调用EphemeralIpPortClientManager.clientDisconnected()方法clientManager.clientDisconnected(deleteClientId);return true;default:return false;}}private void handlerClientSyncData(ClientSyncData clientSyncData) {Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());Client client = clientManager.getClient(clientSyncData.getClientId());upgradeClient(client, clientSyncData);}private void upgradeClient(Client client, ClientSyncData clientSyncData) {List<String> namespaces = clientSyncData.getNamespaces();List<String> groupNames = clientSyncData.getGroupNames();List<String> serviceNames = clientSyncData.getServiceNames();List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();Set<Service> syncedService = new HashSet<>();for (int i = 0; i < namespaces.size(); i++) {Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));Service singleton = ServiceManager.getInstance().getSingleton(service);syncedService.add(singleton);InstancePublishInfo instancePublishInfo = instances.get(i);//如果和当前不一样才发布事件if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {client.addServiceInstance(singleton, instancePublishInfo);//发布客户端注册服务实例的事件,与客户端进行服务注册时一样NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));}}for (Service each : client.getAllPublishedService()) {if (!syncedService.contains(each)) {client.removeServiceInstance(each);NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));}}}...
}@Component("ephemeralIpPortClientManager")
public class EphemeralIpPortClientManager implements ClientManager {//key是请求参数中的connectionId即clientId,value是一个继承了实现Client接口的AbstractClient的IpPortBasedClient对象private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();...@Overridepublic boolean clientDisconnected(String clientId) {Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);//移除客户端信息IpPortBasedClient client = clients.remove(clientId);if (null == client) {return true;}//发布客户端注销服务实例的事件NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));client.release();return true;}...
}@Component
public class ClientServiceIndexesManager extends SmartSubscriber {//注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientIdprivate final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();//订阅者列表(服务消费者),一个Service服务对象,对应多个订阅者的clientIdprivate final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();...@Overridepublic void onEvent(Event event) {if (event instanceof ClientEvent.ClientDisconnectEvent) {handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);} else if (event instanceof ClientOperationEvent) {handleClientOperation((ClientOperationEvent) event);}}private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {Client client = event.getClient();for (Service each : client.getAllSubscribeService()) {//移除订阅者列表的元素removeSubscriberIndexes(each, client.getClientId());}for (Service each : client.getAllPublishedService()) {//移除注册表的元素removePublisherIndexes(each, client.getClientId());}}...
}public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {...@Overridepublic void onEvent(Event event) {...if (event instanceof ClientEvent.ClientVerifyFailedEvent) {syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);} else {syncToAllServer((ClientEvent) event);}}private void syncToAllServer(ClientEvent event) {Client client = event.getClient();//Only ephemeral data sync by Distro, persist client should sync by raft.//临时实例使用Distro协议,持久化实例使用Raft协议//ClientManager.isResponsibleClient()方法,判断只有该client的责任节点才能进行集群数据同步if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {//如果event是客户端注销实例时需要进行集群节点同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {//如果event是客户端注册实例时需要进行集群节点同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}}...
}

(4)总结

一.执行引擎的总结

延时任务执行引擎的实现原理是引擎有一个Map类型的tasks任务池,这个任务池可以根据key映射对应的任务处理器。引擎会定时从任务池中获取任务,执行任务处理器的处理方法处理任务。

任务执行引擎的实现原理是会创建多个任务执行Worker,每个任务执行Worker都会有一个阻塞队列。向任务执行引擎添加任务时会将任务添加到其中一个Woker的阻塞队列中,Worker在初始化时就会启动一个线程不断取出阻塞队列中的任务来处理。所以任务执行引擎会通过阻塞队列 + 异步任务的方式来实现。

二.用于向集群节点同步数据的客户端改变事件的处理流程总结

步骤一:先创建DistroDelayTask延迟任务放入到延迟任务执行引擎的任务池,DistroDelayTask延迟任务会由DistroDelayTaskProcessor处理器处理。

步骤二:DistroDelayTaskProcessor处理时会创建DistroSyncChangeTask任务,然后再将任务分发添加到执行引擎中的任务执行Worker的阻塞队列中。

步骤三:任务执行Worker会从队列中获取并执行DistroSyncChangeTask任务,也就是执行引擎会触发调用AbstractDistroExecuteTask的run()方法,从而调用DistroSyncChangeTask的doExecuteWithCallback()方法。

步骤四:doExecuteWithCallback()方法会获取最新的微服务实例列表,然后通过DistroClientTransportAgent的syncData()方法发送数据同步请求。

相关文章:

Nacos源码—7.Nacos升级gRPC分析四

大纲 5.服务变动时如何通知订阅的客户端 6.微服务实例信息如何同步集群节点 6.微服务实例信息如何同步集群节点 (1)服务端处理服务注册时会发布一个ClientChangedEvent事件 (2)ClientChangedEvent事件的处理源码 (3)集群节点处理数据同步请求的源码 (1)服务端处理服务注册…...

Linux 学习笔记2

Linux 学习笔记2 一、定时任务调度操作流程注意事项 二、磁盘分区与管理添加新硬盘流程磁盘管理命令 三、进程管理进程操作命令服务管理&#xff08;Ubuntu&#xff09; 四、注意事项 一、定时任务调度 操作流程 创建脚本 vim /path/to/script.sh # 编写脚本内容设置可执行权…...

一、每日Github软件分享----QuickGo外链直达工具​

QuickGo 是一款专注于提升网页浏览效率的浏览器扩展工具&#xff0c;其核心功能是自动绕过网站的安全跳转限制&#xff0c;让用户点击外链时无需手动确认&#xff0c;直接跳转至目标页面。以下是详细功能介绍与分析&#xff1a; 一、核心功能与亮点 极速跳转 通过优化浏览器 AP…...

【软件测试】软件缺陷(Bug)的详细描述

目录 一、软件缺陷(Bug) 1.1 缺陷的判定标准 1.2 缺陷的生命周期 1.3 软件缺陷的描述 1.3.1 提交缺陷的要素 1.3.2 Bug 的级别 1.4 如何发现更多的 Bug? 1.5 缺陷的有效管理 1.5.1 缺陷的编写 1.5.2 缺陷管理工具 1.5.2.1 缺陷管理 1.5.2.2 用例管理 一、软件缺陷…...

C++ stl中的list的相关函数用法

文章目录 list的介绍list的使用定义方式 插入和删除迭代器的使用获取元素容器中元素个数和容量的控制其它操作函数 list的使用&#xff0c;首先要包含头文件 #include <list>list的介绍 1.list是一种可以在常数范围内在链表中的任意位置进行插入和删除的序列式容器&…...

Cmd命令大全,从入门到放弃

1、文件和目录操作 dir /p:分页显示目录内容。 dir /w:以单行显示目录内容。 dir /s:显示指定目录及子目录下的所有文件。 dir /b:仅显示文件和目录名称。 dir /a:显示具有特定属性的文件和目录。 cd /d:改变当前驱动器。 pushd:将当前目录压入堆栈,并切换到指定…...

数据同步选择推Push还是拉Pull

数据同步选择“推”&#xff08;Push&#xff09;还是“拉”&#xff08;Pull”&#xff0c;要根据实际场景、系统架构和对实时性、资源消耗、安全性的需求来决定。下面是两种方式的对比分析&#xff0c;帮你更好地判断&#xff1a; 文章目录 推模式&#xff08;Push&#xff…...

Kafka集群加入新Broker节点会发生什么

Kafka集群加入新Broker节点会发生什么 当向现有的Kafka集群添加新的Broker节点时&#xff0c;会触发一系列自动和手动的过程。以下是详细的流程和影响&#xff1a; 自动发生的流程 集群发现与注册 新Broker启动时会向ZooKeeper注册自己加入集群的/brokers/ids路径下其他Broke…...

【LeetCode Solutions】LeetCode 176 ~ 180 题解

CONTENTS LeetCode 176. 第二高的薪水&#xff08;SQL 中等&#xff09;LeetCode 177. 第 N 高的薪水&#xff08;SQL 中等&#xff09;LeetCode 178. 分数排名&#xff08;SQL 中等&#xff09;LeetCode 179. 最大数&#xff08;中等&#xff09;LeetCode 180. 连续出现的数字…...

Sourcetree安装使用的详细教程

Sourcetree 是由 Atlassian 推出的 免费 Git 图形化客户端&#xff0c;支持 Git 和 Mercurial 仓库管理&#xff0c;适用于 Windows 和 macOS。 一、安装教程 1. 下载 官网地址&#xff1a;Sourcetree | Free Git GUI for Mac and Windows 选择你的平台下载安装包&#xff0…...

对比学习入门

Yann Lecun在NIPS 2016上提出了著名的“蛋糕比喻”&#xff1a;如果智能是一个蛋糕&#xff0c;蛋糕上的大部分是无监督学习&#xff08;unsupervised learning&#xff09;&#xff0c;蛋糕上的糖霜是监督学习&#xff08;supervised learning&#xff09;&#xff0c;而蛋糕上…...

Starrocks 的 ShortCircuit短路径

背景 本文基于 Starrocks 3.3.5 本文主要来探索一下Starrocks在FE端怎么实现 短路径&#xff0c;从而加速点查查询速度。 在用户层级需要设置 enable_short_circuit 为true 分析 数据流&#xff1a; 直接到StatementPlanner.createQueryPlan方法&#xff1a; ... OptExpres…...

Windows远程访问Ubuntu的方法

要在Windows上远程访问Ubuntu系统&#xff0c;以下是几种常见的方法&#xff1a; SSH (Secure Shell):通过Windows命令行远程连接到Ubuntu。 在Ubuntu上&#xff0c;确保安装并启用了SSH服务&#xff08;通常可以通过sudo apt update && sudo apt install openssh-serv…...

23种设计模式-行为型模式之模板方法模式(Java版本)

Java 模板方法模式&#xff08;Template Method Pattern&#xff09;详解 &#x1f9e0; 什么是模板方法模式&#xff1f; 模板方法模式是一种行为型设计模式&#xff0c;定义了一个操作中的算法骨架&#xff0c;将一些步骤的实现延迟到子类中。通过模板方法模式&#xff0c;…...

(undone) MIT6.S081 Lec17 VM for APP 学习笔记

url: https://mit-public-courses-cn-translatio.gitbook.io/mit6-s081/lec17-virtual-memory-for-applications-frans/17.1-ying-yong-cheng-xu-shi-yong-xu-ni-nei-cun-suo-xu-yao-de-te-xing 17.1 应用程序使用虚拟内存所需要的特性 今天的话题是用户应用程序使用的虚拟内存…...

值拷贝、浅拷贝和深拷贝

✅ 一、基本概念 1. 值拷贝&#xff08;Value Copy&#xff09; 含义&#xff1a;将一个变量的值完整复制到另一个变量中。 对象级别表现&#xff1a;调用的是拷贝构造函数&#xff08;copy constructor&#xff09;。 特点&#xff1a;对基本类型或不含动态资源的对象&…...

JWT原理及工作流程详解

JSON Web Token&#xff08;JWT&#xff09;是一种开放标准&#xff08;RFC 7519&#xff09;&#xff0c;用于在各方之间安全传输信息。其核心原理是通过结构化、签名或加密的JSON对象实现无状态身份验证和授权。以下是JWT的工作原理和关键组成部分&#xff1a; 1. JWT结构 J…...

广西某建筑用花岗岩矿自动化监测

1. 项目简介 某矿业有限公司成立于2021年&#xff0c;是由某建筑材料有限公司与个人共同出资成立&#xff0c;矿区面积0.4069平方公里&#xff0c;可开采筑用花岗岩、建筑用砂岩。建筑用花岗岩、建筑用砂岩可利用资源量分别为6338.69万吨、303.39万吨&#xff0c;设计生产规模…...

100个思维模型系列更新完毕!

giszz的粉丝们&#xff0c;到今天为止&#xff0c;思维模型专栏已经更新了100期&#xff0c;正式告一段落了。 以下是为你列出的100个思维模型名字&#xff08;部分思维模型可能存在多种表述方式&#xff0c;但核心概念一致&#xff0c;这里尽量涵盖不同领域常见的思维模型&am…...

Bitcoin跨链协议Clementine的技术解析:重构DeFi生态的信任边界

2025年5月2日&#xff0c;比特币Rollup项目Citrea在测试网正式推出跨链协议Clementine&#xff0c;其基于BitVM2编程语言构建的信任最小化桥接技术&#xff0c;被视为解决比特币与DeFi生态融合难题的关键突破。本文从技术背景、核心机制、安全模型、应用场景四大维度&#xff0…...

北斗导航 | RTKLib中重难点技术,公式,代码

Rtklib 一、抗差自适应卡尔曼滤波1. **核心难点**2. **公式与代码实现**二、模糊度固定与LAMBDA算法1. **核心难点**2. **LAMBDA算法实现**3. **部分模糊度固定技术**三、伪距单点定位与误差修正1. **多系统多频点修正**2. **接收机钟差与系统间偏差**四、动态模型与周跳处理1.…...

C++学习之类和对象_1

1. 面向过程与面向对象 C语言是面向过程的&#xff0c;注重过程&#xff0c;通过调用函数解决问题。 比如做番茄炒蛋&#xff1a;买番茄和鸡蛋->洗番茄和打鸡蛋->先炒蛋->把蛋放碟子上->炒番茄->再把蛋倒回锅里->加调料->出锅 而C是面向对象的&#xff…...

Oracle OCP认证考试考点详解083系列14

题记&#xff1a; 本系列主要讲解Oracle OCP认证考试考点&#xff08;题目&#xff09;&#xff0c;适用于19C/21C,跟着学OCP考试必过。 66. 第66题&#xff1a; 题目 解析及答案&#xff1a; 当一个非常大的数据文件被划分为四个部分进行 RMAN 多区段备份时&#xff0c;以下…...

华为欧拉(EulerOS)系统全栈软件部署指南:从 Redis 到 MySQL 实战详解

前言 在国产化操作系统蓬勃发展的背景下&#xff0c;华为欧拉&#xff08;EulerOS&#xff09;凭借其稳定性与安全性&#xff0c;成为企业级服务器部署的重要选择。本文基于官方技术文档与最佳实践&#xff0c;详细梳理 Redis 集群、RabbitMQ、JDK、Tomcat 及 MySQL 在欧拉系统…...

YOLO使用CableInspect-AD数据集实现输电线路缺陷检测

输电线路缺陷检测是一个关键的任务&#xff0c;旨在确保电力传输系统的可靠性和安全性。今天我们使用CableInspect-AD数据集进行训练完成缺陷检测&#xff0c;下载的数据集格式如下&#xff1a; 我们需要经过以下步骤&#xff1a; COCO转YOLO 首先是将COCO格式的数据转换为YO…...

全国青少年信息素养大赛 Python编程挑战赛初赛 内部集训模拟试卷五及详细答案解析

博主推荐 所有考级比赛学习相关资料合集【推荐收藏】1、Python比赛 信息素养大赛Python编程挑战赛 蓝桥杯python选拔赛真题详解...

三个线程 a、b、c 并发运行,b,c 需要 a 线程的数据如何解决

说明&#xff1a; 开发中经常会碰到线程并发&#xff0c;但是后续线程需要等待第一个线程执行完返回结果后&#xff0c;才能再执行后面线程。 如何处理呢&#xff0c;今天就介绍两种方法 1、使用Java自有的API即CountDownLatch&#xff0c;进行实现 思考&#xff1a;CountDown…...

python实现点餐系统

使用python实现点餐系统的增加菜品及价格&#xff0c;删除菜品&#xff0c;查询菜单&#xff0c;点菜以及会员折扣价等功能。 代码&#xff1a; 下面展示一些 内联代码片。 # coding utf-8menu {拍黄瓜: 6, 小炒肉: 28, 西红柿炒蛋: 18, 烤鱼: 30, 红烧肉: 38, 手撕鸡: 45,…...

力扣26——删除有序数组中的重复项

目录 1.题目描述&#xff1a; 2.算法分析&#xff1a; 3.代码展示&#xff1a; 1.题目描述&#xff1a; 给你一个 非严格递增排列 的数组 nums &#xff0c;请你 原地 删除重复出现的元素&#xff0c;使每个元素 只出现一次 &#xff0c;返回删除后数组的新长度。元素的 相对…...

A2A与MCP定义下,User,Agent,api(tool)间的交互流程图

官方图&#xff1a; 流程图&#xff1a; #mermaid-svg-2smjE8VYydjtLH0p {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-2smjE8VYydjtLH0p .error-icon{fill:#552222;}#mermaid-svg-2smjE8VYydjtLH0p .error-tex…...

【大模型面试每日一题】Day 13:数据并行与模型并行的区别是什么?ZeRO优化器如何结合二者?

【大模型面试每日一题】Day 13&#xff1a;数据并行与模型并行的区别是什么&#xff1f;ZeRO优化器如何结合二者&#xff1f; &#x1f4cc; 题目重现 &#x1f31f;&#x1f31f; 面试官&#xff1a;数据并行与模型并行的区别是什么&#xff1f;ZeRO优化器如何结合二者&…...

各种注解含义及使用

RestController RestController注解 是Spring 4.0引入的一个组合注解&#xff0c;用于简化RESTful Web服务的开发。 RestController注解 相当于 Controller 和 ResponseBody 注解的组合&#xff0c;表示该类是一个控制器&#xff0c;并且所有的方法返回值都将直接写入HTTP响应体…...

amass:深入攻击面映射和资产发现工具!全参数详细教程!Kali Linux教程!

简介 OWASP Amass 项目使用开源信息收集和主动侦察技术执行攻击面网络映射和外部资产发现。 此软件包包含一个工具&#xff0c;可帮助信息安全专业人员使用开源信息收集和主动侦察技术执行攻击面网络映射并执行外部资产发现。 使用的信息收集技术 技术数据来源APIs&#xf…...

xxl-job简单入门使用教程

1、从git中拉取xxl-job代码 https://gitee.com/xuxueli0323/xxl-job HTTPS&#xff1a;git clone https://gitee.com/xuxueli0323/xxl-job.git SSH&#xff1a;git clone gitgitee.com:xuxueli0323/xxl-job.git 拉取本地后&#xff0c;使用Idea打开项目&#xff0c;当前使用…...

设置GO程序在离线情况下读取本地缓存的模块

在 Go 中&#xff0c;GOPROXY 环境变量用于指定模块代理服务器的地址。如果你想让 GOPROXY 读取本地的模块&#xff0c;可以通过以下几种方式实现&#xff1a; 1. 使用本地代理服务器 你可以搭建一个本地的 Go 模块代理服务器&#xff0c;将需要的模块代码推送到代理服务器中…...

Python 爬虫基础入门教程(超详细)

一、什么是爬虫&#xff1f; 网络爬虫&#xff08;Web Crawler&#xff09;&#xff0c;又称网页蜘蛛&#xff0c;是一种自动抓取互联网信息的程序。爬虫会模拟人的浏览行为&#xff0c;向网站发送请求&#xff0c;然后获取网页内容并提取有用的数据。 二、Python爬虫的基本原…...

C语言实现三子棋

目录 1.通过模块化来实现三子棋 2.三子棋代码基本逻辑 3.代码的书写 第一步&#xff1a; 第二步&#xff1a; 第三步&#xff1a; 第四步&#xff1a; gane.h gane.c main.c 第五步&#xff1a; game.h game.c main.c 第六步&#xff1a; game.h game.c main…...

SCDN是什么?

SCDN是安全内容分发网络的简称&#xff0c;它在传统内容分发网络&#xff08;CDN&#xff09;的基础上&#xff0c;集成了安全防护能力&#xff0c;旨在同时提升内容传输速度和网络安全性。 SCDN的核心功能有&#xff1a; DDoS防御&#xff1a;识别并抵御大规模分布式拒绝服务…...

Transformer编码器+SHAP分析,模型可解释创新表达!

目录 效果一览基本介绍程序设计参考资料 效果一览 基本介绍 基本介绍 基于SHAP分析的特征选择和贡献度计算&#xff0c;Matlab2023b代码实现&#xff1b;基于MATLAB的SHAP可解释Transformer编码器回归模型&#xff0c;敏感性分析方法。 详细介绍 引言 在正向渗透&#xff08…...

机器学习第三讲:监督学习 → 带答案的学习册,如预测房价时需要历史价格数据

机器学习第三讲&#xff1a;监督学习 → 带答案的学习册&#xff0c;如预测房价时需要历史价格数据 资料取自《零基础学机器学习》。 查看总目录&#xff1a;学习大纲 关于DeepSeek本地部署指南可以看下我之前写的文章&#xff1a;DeepSeek R1本地与线上满血版部署&#xff1…...

【TACD模拟】质子辐照对GaN器件临界电压增加的影响机制

2013 年,佛罗里达大学的 Erin Patrick 等人基于 TRIM 和 FLOODS 模型,研究了质子辐照对 AlGaN/GaN HEMTs 的影响。研究背景方面,AlGaN/GaN HEMTs 因其宽禁带、高击穿场、高电子迁移率以及优秀的热管理能力,在商业和军事领域应用前景广阔,但长期可靠性仍是问题,尤其是栅极…...

VBA高级应用30例应用4:利用屏蔽事件来阻止自动运行事件

《VBA高级应用30例》&#xff08;版权10178985&#xff09;&#xff0c;是我推出的第十套教程&#xff0c;教程是专门针对高级学员在学习VBA过程中提高路途上的案例展开&#xff0c;这套教程案例与理论结合&#xff0c;紧贴“实战”&#xff0c;并做“战术总结”&#xff0c;以…...

基于SSM实现的健身房系统功能实现八

一、前言介绍&#xff1a; 1.1 项目摘要 随着社会的快速发展和人们健康意识的不断提升&#xff0c;健身行业也在迅速扩展。越来越多的人加入到健身行列&#xff0c;健身房的数量也在不断增加。这种趋势使得健身房的管理变得越来越复杂&#xff0c;传统的手工或部分自动化的管…...

Java设计模式之原型模式详解:从入门到精通

1. 原型模式概述 1.1 定义与核心概念 **原型模式(Prototype Pattern)**是一种创建型设计模式,它通过复制现有对象来创建新对象,而不是通过new关键字实例化。这种模式特别适用于创建成本较高的对象。 专业术语解释表: 术语解释原型(Prototype)被复制的原始对象克隆(Clone)…...

鞋样设计软件

Sxy 64鞋样设计软件是一款专业级鞋类设计工具 专为鞋业设计师与制鞋企业开发 该软件提供全面的鞋样设计功能 包括二维开版 三维建模 放码排料等核心模块 支持从草图构思到成品输出的完整设计流程 内置丰富的鞋型数据库与部件库 可快速生成各种鞋款模板 软件采用智能放码技术 精…...

移动IP与手机移动数据流量的概念、原理、区别与联系

移动IP与手机移动数据流量的概念、原理、区别与联系 文章目录 移动IP与手机移动数据流量的概念、原理、区别与联系一、概念与原理二、核心区别三、联系与协作四、技术挑战与发展趋势五、总结 一、概念与原理 移动IP 定义&#xff1a;移动IP是一种网络层协议&#xff0c;允许设…...

Spring中除DI之外获取 BEAN 的方式​

前言 在 Spring 框架的开发实践中&#xff0c;获取 Bean 是极为重要的基础操作。我们熟知通过依赖注入能便捷地获取 Bean&#xff0c;不过在许多场景下&#xff0c;还需要借助其他方式来获取 Bean。本文将深入探讨在 Spring 中除了依赖注入之外获取 Bean 的多种方式&#xff0c…...

屎上雕花系列-2nd

以下为“屎上雕花”的尝试2nd 使用Deepseek扩容而来&#xff0c;我竟然没有找到明显的错误&#xff0c;太强大了&#xff0c;工作改变生活了 LeCroy 以太网与 SAN 网络测试解决方案 硬件平台一&#xff1a;Xena 以太网流量生成器 Xena 以太网流量生成器是一款高性能的网络测…...

第十五章,SSL VPN

前言 IPSec 和 SSL 对比 IPSec远程接入场景---client提前安装软件&#xff0c;存在一定的兼容性问题 IPSec协议只能够对感兴趣的流量进行加密保护&#xff0c;意味着接入用户需要不停的调整策略&#xff0c;来适应IPSec隧道 IPSec协议对用户访问权限颗粒度划分的不够详细&…...

第四天 从CAN总线到Spark/Flink实时处理

前言 在智能网联汽车快速发展的今天&#xff0c;每辆汽车每天产生的数据量高达数GB。这些数据蕴藏着驾驶行为、车辆健康、道路状况等宝贵信息。本文将带您从零开始&#xff0c;系统学习车辆数据采集与分析的全流程技术体系&#xff0c;包含&#xff1a; CAN总线数据解析与采集…...