当前位置: 首页 > news >正文

Flink源码解析之:如何根据JobGraph生成ExecutionGraph

Flink源码解析之:如何根据JobGraph生成ExecutionGraph

在上一篇Flink源码解析中,我们介绍了Flink如何根据StreamGraph生成JobGraph的流程,并着重分析了其算子链的合并过程和JobGraph的构造流程。

对于StreamGraph和JobGraph的生成来说,其都是在客户端生成的,本文将会讲述JobGraph到ExecutionGraph的生成过程,而这一过程会在Flink JobManager的服务端来完成。当JobGraph从客户端提交到JobManager后,JobManager会根据JobGraph生成对应的ExecutionGraph,而ExecutionGraph就是Flink作业调度时使用的核心数据结构。 本篇将会详细介绍JobGraph转换为ExecutionGraph的流程。

主体流程梳理

Flink在将JobGraph转换成ExecutionGraph后,便可以开始执行真正的任务。这一转换流程主要在Flink源码中的DefaultExecutionGraphBuilder类中的buildGraph方法中实现的。在转换过程中,涉及到了一些新的基本概念,先来简单介绍一下这些概念,对于理解ExecutionGraph有较大的帮助:

  • ExecutionJobVertex: 在ExecutionGraph中表示执行顶点,与JobGraph中的JobVertex一一对应。实际上,每个ExecutionJobVertex也是依赖JobVertex来创建的。
  • ExecutionVertex: 在ExecutionJobVertex类中创建,每个并发度都对应了一个ExecutionVertex对象,每个ExecutionVertex都代表JobVertex在某个特定并行子任务中的执行。在实际执行时,每个ExecutionVertex实际上就是一个Task,是ExecutionJobVertex并行执行的一个子任务。
  • Execution: Execution表示ExecutionVertex的一次执行。由于ExecutionVertex可以被执行多次(用于恢复、重新计算、重新分配),这个类用于跟踪该ExecutionVertex的单个执行状态和资源。
  • IntermediateResult: 在JobGraph中用IntermediateDataSet表示上游JobVertex的输出数据流,而在ExecutionGraph中,则用IntermediateResult来表示ExecutionJobVertex的输出数据流。
  • IntermediateResultPartition:这是IntermediateResult的一部分或一个分片。由于有多个并行任务(ExecutionVertex)执行相同的操作,每个任务都会产生一部分IntermediateResult。这些结果在物理存储和计算过程中,可能会被进一步划分成多个分区,每个分区对应一个 IntermediateResultPartition对象。

从上面的基本概念也可以看出,在ExecutionGraph中:

  • 相比StreamGraph和JobGraph,ExecutionGraph是实际根据任务并行度来生成拓扑结构的,在ExecutionGraph中,每个并行子任务都对应一个ExecutionVertex顶点和IntermediateResultPartition输出数据流分区。
  • 在ExecutionGraph中,上下游节点之间的连接是通过ExecutionVertex -> IntermediateResultPartition -> ExecutionVertex 对象来完成的。

整体的执行流程图如下所示:
在这里插入图片描述

入口方法:DefaultExecutionGraphBuilder.buildGraph

ExecutionGraph的生成是在DefaultExecutionGraphBuilder类的buildGraph方法中实现的:

public class DefaultExecutionGraphBuilder {public static DefaultExecutionGraph buildGraph(JobGraph jobGraph,Configuration jobManagerConfig,ScheduledExecutorService futureExecutor,Executor ioExecutor,ClassLoader classLoader,CompletedCheckpointStore completedCheckpointStore,CheckpointsCleaner checkpointsCleaner,CheckpointIDCounter checkpointIdCounter,Time rpcTimeout,MetricGroup metrics,BlobWriter blobWriter,Logger log,ShuffleMaster<?> shuffleMaster,JobMasterPartitionTracker partitionTracker,TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,ExecutionDeploymentListener executionDeploymentListener,ExecutionStateUpdateListener executionStateUpdateListener,long initializationTimestamp,VertexAttemptNumberStore vertexAttemptNumberStore,VertexParallelismStore vertexParallelismStore)throws JobExecutionException, JobException {checkNotNull(jobGraph, "job graph cannot be null");final String jobName = jobGraph.getName();final JobID jobId = jobGraph.getJobID();// 创建JobInformationfinal JobInformation jobInformation =new JobInformation(jobId,jobName,jobGraph.getSerializedExecutionConfig(),jobGraph.getJobConfiguration(),jobGraph.getUserJarBlobKeys(),jobGraph.getClasspaths());// Execution 保留的最大历史数final int maxPriorAttemptsHistoryLength =jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);// IntermediateResultPartitions的释放策略final PartitionGroupReleaseStrategy.Factory partitionGroupReleaseStrategyFactory =PartitionGroupReleaseStrategyFactoryLoader.loadPartitionGroupReleaseStrategyFactory(jobManagerConfig);// create a new execution graph, if none exists so farfinal DefaultExecutionGraph executionGraph;try {// 创建默认的ExecutionGraph执行图对象,最后会返回该创建好的执行图对象executionGraph =new DefaultExecutionGraph(jobInformation,futureExecutor,ioExecutor,rpcTimeout,maxPriorAttemptsHistoryLength,classLoader,blobWriter,partitionGroupReleaseStrategyFactory,shuffleMaster,partitionTracker,partitionLocationConstraint,executionDeploymentListener,executionStateUpdateListener,initializationTimestamp,vertexAttemptNumberStore,vertexParallelismStore);} catch (IOException e) {throw new JobException("Could not create the ExecutionGraph.", e);}// set the basic propertiestry {executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));} catch (Throwable t) {log.warn("Cannot create JSON plan for job", t);// give the graph an empty planexecutionGraph.setJsonPlan("{}");}// initialize the vertices that have a master initialization hook// file output formats create directories here, input formats create splitsfinal long initMasterStart = System.nanoTime();log.info("Running initialization on master for job {} ({}).", jobName, jobId);for (JobVertex vertex : jobGraph.getVertices()) {String executableClass = vertex.getInvokableClassName();if (executableClass == null || executableClass.isEmpty()) {throw new JobSubmissionException(jobId,"The vertex "+ vertex.getID()+ " ("+ vertex.getName()+ ") has no invokable class.");}try {vertex.initializeOnMaster(classLoader);} catch (Throwable t) {throw new JobExecutionException(jobId,"Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(),t);}}log.info("Successfully ran initialization on master in {} ms.",(System.nanoTime() - initMasterStart) / 1_000_000);// topologically sort the job vertices and attach the graph to the existing one// 这里会先做一个排序,source源节点会放在最前面,接着开始遍历// 必须保证当前添加到集合的节点的前置节点都已经添加进去了List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();if (log.isDebugEnabled()) {log.debug("Adding {} vertices from job graph {} ({}).",sortedTopology.size(),jobName,jobId);}// 构建执行图的重点方法。生成具体的ExecutionGraphexecutionGraph.attachJobGraph(sortedTopology);if (log.isDebugEnabled()) {log.debug("Successfully created execution graph from job graph {} ({}).", jobName, jobId);}// configure the state checkpointing// checkpoint的相关配置if (isCheckpointingEnabled(jobGraph)) {JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();// Maximum number of remembered checkpointsint historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);CheckpointStatsTracker checkpointStatsTracker =new CheckpointStatsTracker(historySize,snapshotSettings.getCheckpointCoordinatorConfiguration(),metrics);// load the state backend from the application settingsfinal StateBackend applicationConfiguredBackend;final SerializedValue<StateBackend> serializedAppConfigured =snapshotSettings.getDefaultStateBackend();if (serializedAppConfigured == null) {applicationConfiguredBackend = null;} else {try {applicationConfiguredBackend =serializedAppConfigured.deserializeValue(classLoader);} catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId, "Could not deserialize application-defined state backend.", e);}}// StateBackend配置final StateBackend rootBackend;try {rootBackend =StateBackendLoader.fromApplicationOrConfigOrDefault(applicationConfiguredBackend,snapshotSettings.isChangelogStateBackendEnabled(),jobManagerConfig,classLoader,log);} catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);}// load the checkpoint storage from the application settingsfinal CheckpointStorage applicationConfiguredStorage;final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =snapshotSettings.getDefaultCheckpointStorage();if (serializedAppConfiguredStorage == null) {applicationConfiguredStorage = null;} else {try {applicationConfiguredStorage =serializedAppConfiguredStorage.deserializeValue(classLoader);} catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId,"Could not deserialize application-defined checkpoint storage.",e);}}final CheckpointStorage rootStorage;try {rootStorage =CheckpointStorageLoader.load(applicationConfiguredStorage,null,rootBackend,jobManagerConfig,classLoader,log);} catch (IllegalConfigurationException | DynamicCodeLoadingException e) {throw new JobExecutionException(jobId, "Could not instantiate configured checkpoint storage", e);}// instantiate the user-defined checkpoint hooks// 示例化用户自定义的cp hookfinal SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =snapshotSettings.getMasterHooks();final List<MasterTriggerRestoreHook<?>> hooks;if (serializedHooks == null) {hooks = Collections.emptyList();} else {final MasterTriggerRestoreHook.Factory[] hookFactories;try {hookFactories = serializedHooks.deserializeValue(classLoader);} catch (IOException | ClassNotFoundException e) {throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);}final Thread thread = Thread.currentThread();final ClassLoader originalClassLoader = thread.getContextClassLoader();thread.setContextClassLoader(classLoader);try {hooks = new ArrayList<>(hookFactories.length);for (MasterTriggerRestoreHook.Factory factory : hookFactories) {hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));}} finally {thread.setContextClassLoader(originalClassLoader);}}final CheckpointCoordinatorConfiguration chkConfig =snapshotSettings.getCheckpointCoordinatorConfiguration();// 创建CheckpointCoordinator对象executionGraph.enableCheckpointing(chkConfig,hooks,checkpointIdCounter,completedCheckpointStore,rootBackend,rootStorage,checkpointStatsTracker,checkpointsCleaner);}// create all the metrics for the Execution Graph// 添加metrics指标metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));return executionGraph;}

在这个方法里,会先创建一个 ExecutionGraph 对象,然后对 JobGraph 中的 JobVertex 列表做一下排序(先把有 source 节点的 JobVertex 放在最前面,然后开始遍历,只有当前 JobVertex 的前置节点都已经添加到集合后才能把当前 JobVertex 节点添加到集合中),最后通过过 attachJobGraph() 方法生成具体的ExecutionGraph。

在上面的代码中,最需要核心关注的方法是:executionGraph.attachJobGraph(sortedTopology);。该方法是创建ExecutionGraph的核心方法,包括了创建上面我们说的各种ExecutionGraph中涉及的对象,以及连接它们来形成ExecutionGraph拓扑结构。

接下来我们进入该方法来一探究竟。

生成ExecutionGraph:attachJobGraph

先来看下attachJobGraph方法的实现:

public void attachJobGraph(List<JobVertex> topologicallySorted) throws JobException {assertRunningInJobMasterMainThread();LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} "+ "vertices and {} intermediate results.",topologicallySorted.size(),tasks.size(),intermediateResults.size());final long createTimestamp = System.currentTimeMillis();// 遍历排序好的拓扑JobVertexfor (JobVertex jobVertex : topologicallySorted) {if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {this.isStoppable = false;}// 获取节点并行度信息VertexParallelismInformation parallelismInfo =parallelismStore.getParallelismInfo(jobVertex.getID());// create the execution job vertex and attach it to the graph// 创建ExecutionJobVertexExecutionJobVertex ejv =new ExecutionJobVertex(this,jobVertex,maxPriorAttemptsHistoryLength,rpcTimeout,createTimestamp,parallelismInfo,initialAttemptCounts.getAttemptCounts(jobVertex.getID()));// 重要方法!!!// 构建ExecutionGraph,连接上下游节点ejv.connectToPredecessors(this.intermediateResults);ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);if (previousTask != null) {throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",jobVertex.getID(), ejv, previousTask));}// 遍历ExecutionJobVertex的输出IntermediateResultfor (IntermediateResult res : ejv.getProducedDataSets()) {IntermediateResult previousDataSet =this.intermediateResults.putIfAbsent(res.getId(), res);if (previousDataSet != null) {throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",res.getId(), res, previousDataSet));}}this.verticesInCreationOrder.add(ejv);this.numVerticesTotal += ejv.getParallelism();}//将所有的执行顶点和结果分区注册到分布式资源管理系统中,以便能够进行分布式调度。
registerExecutionVerticesAndResultPartitions(this.verticesInCreationOrder);// the topology assigning should happen before notifying new vertices to failoverStrategy// 转换执行拓扑executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);partitionGroupReleaseStrategy =// 创建部分组释放策略的方法,依赖于当前的调度的拓扑结构,这决定了当何时释放特定的中间数据结果所需的策略。
partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());
}

在上面attchGraph方法中,首先遍历输入的排序后的JobVertex列表,对每一个JobVertex:

  • 判断是否停止: 对于单个 JobVertex,如果它是一个输入顶点且不可停止,则整个 Job 不可停止。这在流处理任务中是常见的,一些输入数据源可能无法停止(如Kafka)。
  • 获取并行信息并创建执行的顶点: 根据JobVertex的ID,从parallelismStore中获取并行信息。利用这些信息创建ExecutionJobVertex实例,它代表运行在特定TaskManager上的taskId,可以是待调度、运行或已完成的。
  • 判断新添加的顶点是否已经存在: 如果试图添加一个已经存在的顶点,这意味着存在程序错误,因为每个JobVertex应当有唯一的ID。这将抛出异常。
  • 判断数据集是否已经存在: 同样。如果试图添加一个已经存在的IntermediateResult,这将抛出异常。
  • 添加执行顶点到创建顺序列表和增加总的顶点数量: 记录创建顶点的顺序能够确保在执行时能够按照正确的依赖关系进行。并同时更新总的顶点数量。

遍历完成后, 注册执行顶点和结果分区,将所有的执行顶点和结果分区注册到分布式资源管理系统中,以便能够进行分布式调度。

利用DefaultExecutionTopology工具类将ExecutionGraph转换为SchedulingTopology,这样便于任务调度器进行处理。

最后,调用partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology())根据当前的调度的拓扑结构来创建组释放策略,这决定了当何时释放特定的中间数据结果所需的策略。

上面流程中,最需要关注的方法就是new ExecutionJobVertexejv.connectToPredecessors(this.intermediateResults);

接下来,我们分别对其进行探究。

创建 ExecutionJobVertex 对象

进入到该方法的源码中:

@VisibleForTesting
public ExecutionJobVertex(InternalExecutionGraphAccessor graph,JobVertex jobVertex,int maxPriorAttemptsHistoryLength,Time timeout,long createTimestamp,VertexParallelismInformation parallelismInfo,SubtaskAttemptNumberStore initialAttemptCounts)throws JobException {if (graph == null || jobVertex == null) {throw new NullPointerException();}this.graph = graph;this.jobVertex = jobVertex;this.parallelismInfo = parallelismInfo;// verify that our parallelism is not higher than the maximum parallelismif (this.parallelismInfo.getParallelism() > this.parallelismInfo.getMaxParallelism()) {throw new JobException(String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",jobVertex.getName(),this.parallelismInfo.getParallelism(),this.parallelismInfo.getMaxParallelism()));}this.resourceProfile =ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), MemorySize.ZERO);this.taskVertices = new ExecutionVertex[this.parallelismInfo.getParallelism()];this.inputs = new ArrayList<>(jobVertex.getInputs().size());// take the sharing groupthis.slotSharingGroup = checkNotNull(jobVertex.getSlotSharingGroup());this.coLocationGroup = jobVertex.getCoLocationGroup();// create the intermediate resultsthis.producedDataSets =new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);this.producedDataSets[i] =new IntermediateResult(result.getId(),this,this.parallelismInfo.getParallelism(),result.getResultType());}// create all task verticesfor (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {ExecutionVertex vertex =new ExecutionVertex(this,i,producedDataSets,timeout,createTimestamp,maxPriorAttemptsHistoryLength,initialAttemptCounts.getAttemptCount(i));this.taskVertices[i] = vertex;}// sanity check for the double referencing between intermediate result partitions and// execution verticesfor (IntermediateResult ir : this.producedDataSets) {if (ir.getNumberOfAssignedPartitions() != this.parallelismInfo.getParallelism()) {throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");}}final List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders =getJobVertex().getOperatorCoordinators();if (coordinatorProviders.isEmpty()) {this.operatorCoordinators = Collections.emptyList();} else {final ArrayList<OperatorCoordinatorHolder> coordinators =new ArrayList<>(coordinatorProviders.size());try {for (final SerializedValue<OperatorCoordinator.Provider> provider :coordinatorProviders) {coordinators.add(OperatorCoordinatorHolder.create(provider, this, graph.getUserClassLoader()));}} catch (Exception | LinkageError e) {IOUtils.closeAllQuietly(coordinators);throw new JobException("Cannot instantiate the coordinator for operator " + getName(), e);}this.operatorCoordinators = Collections.unmodifiableList(coordinators);}// set up the input splits, if the vertex has anytry {@SuppressWarnings("unchecked")InputSplitSource<InputSplit> splitSource =(InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();if (splitSource != null) {Thread currentThread = Thread.currentThread();ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();currentThread.setContextClassLoader(graph.getUserClassLoader());try {inputSplits =splitSource.createInputSplits(this.parallelismInfo.getParallelism());if (inputSplits != null) {splitAssigner = splitSource.getInputSplitAssigner(inputSplits);}} finally {currentThread.setContextClassLoader(oldContextClassLoader);}} else {inputSplits = null;}} catch (Throwable t) {throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);}
}

在上面这段代码中,主要实现了ExecutionVertex的创建和IntermediateResult对象的创建:

  • 遍历当前JobVertex的输出IntermediateDataSet列表,并根据IntermediateDataSet来创建相应的IntermediateResult对象。每个IntermediateDataSet都会对应一个IntermediateResult
  • 根据当前JobVertex的并发度,来创建相同数量的ExecutionVertex对象,每个ExecutionVertex对象代表一个并行计算任务,在实际执行时就是一个Task任务。

创建ExecutionVertex对象

进一步地,我们观察创建ExecutionVertex对象的实现逻辑如下所示:

public ExecutionVertex(ExecutionJobVertex jobVertex,int subTaskIndex,IntermediateResult[] producedDataSets,Time timeout,long createTimestamp,int maxPriorExecutionHistoryLength,int initialAttemptCount) {this.jobVertex = jobVertex;this.subTaskIndex = subTaskIndex;this.executionVertexId = new ExecutionVertexID(jobVertex.getJobVertexId(), subTaskIndex);this.taskNameWithSubtask =String.format("%s (%d/%d)",jobVertex.getJobVertex().getName(),subTaskIndex + 1,jobVertex.getParallelism());this.resultPartitions = new LinkedHashMap<>(producedDataSets.length, 1);// 根据IntermediateResult创建当前subTaskIndex分区下的IntermediateResultPartitonfor (IntermediateResult result : producedDataSets) {IntermediateResultPartition irp =new IntermediateResultPartition(result,this,subTaskIndex,getExecutionGraphAccessor().getEdgeManager());// 记录当前分区的irp到ir中result.setPartition(subTaskIndex, irp);// 记录分区ip与irp的对应关系resultPartitions.put(irp.getPartitionId(), irp);}this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);// 创建对应的Execution对象,初始化时initialAttempCount为0,如果后面重新调度这个task,它会自增加1this.currentExecution =new Execution(getExecutionGraphAccessor().getFutureExecutor(),this,initialAttemptCount,createTimestamp,timeout);getExecutionGraphAccessor().registerExecution(currentExecution);this.timeout = timeout;this.inputSplits = new ArrayList<>();
}

上述创建ExecutionVertex的过程主要实现了以下步骤:

  1. 生成中间结果分区IntermediateResultPartition
    中间结果分区代表一个并行任务产生的输出,同一并行任务可能会有多个输出(对应多个后续任务),也就是多个中间结果分区。
  • 基于 result,在相应的索引 subTaskIndex 上创建一个 IntermediateResultPartition 并给它赋值。IntermediateResultPartition 提供了并行任务的输出数据,对应于某个特定执行顶点 ExecutionVertex 的并行子任务。
  • 在创建过程中,需要使用 getExecutionGraphAccessor().getEdgeManager() 获取边管理器,边管理器是用于维护这个分区与其它 ExecutionVertex 之间的连接关系。
  • 记录这个 IntermediateResultPartition 到 result 中的相应索引位置,并在 resultPartitions 映射表中保存 IntermediateResultPartition。
  1. 创建执行(Execution)对象:
    这一过程是基于 Execution 的构造函数引发的。它用于代表该 ExecutionVertex 在某一特定点时间的一次尝试执行。创建 Execution 实例后,会将其注册到执行图(ExecutionGraph)中,以便于后续调度和执行任务。

通过以上流程,生成了中间结果分区,映射了每一个分区和其对应的任务关系,并且创建了 Execution 对象用于管理并跟踪任务的执行状态。

在创建好ExecutionVertex和IntermediateResultPartition后,根据上面的流程图,就是考虑如何将它们进行连接生成ExecutionGraph了。

这部分的实现逻辑就在attachJobGraph方法的ejv.connectToPredecessors(this.intermediateResults);方法中实现的。

生成ExecutionGraph

同样地,我们进入源码来深入观察一下实现逻辑:

public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets)throws JobException {List<JobEdge> inputs = jobVertex.getInputs();if (LOG.isDebugEnabled()) {LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.",jobVertex.getID(), jobVertex.getName(), inputs.size()));}for (int num = 0; num < inputs.size(); num++) {JobEdge edge = inputs.get(num);if (LOG.isDebugEnabled()) {if (edge.getSource() == null) {LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",num,jobVertex.getID(),jobVertex.getName(),edge.getSourceId()));} else {LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).",num,jobVertex.getID(),jobVertex.getName(),edge.getSource().getProducer().getID(),edge.getSource().getProducer().getName()));}}// fetch the intermediate result via ID. if it does not exist, then it either has not// been created, or the order// in which this method is called for the job vertices is not a topological orderIntermediateResult ires = intermediateDataSets.get(edge.getSourceId());if (ires == null) {throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "+ edge.getSourceId());}this.inputs.add(ires);EdgeManagerBuildUtil.connectVertexToResult(this, ires, edge.getDistributionPattern());}
}

这段代码主要完成了将当前的ExecutionJobVertex与其前置任务(predecessors)连接的流程。传入的参数intermediateDatasets包含了JobGraph中所有的中间计算结果,这些结果是由上游前置任务产生的。

需要注意的是,该过程要求连接操作的执行顺序应遵循任务的拓扑顺序。Flink的计算任务通常由多个阶段组成,每个阶段的输出是下一个阶段的输入,每个阶段(JobVertex)都处理一种类型的计算,例如map或reduce。

流程大致如下:

  • 获取输入: 首先获取jobVertex的输入,输入是JobEdge列表,每一条JobEdge都代表一个上游产生的中间数据集和连接上下游的方式(例如HASH, BROADCAST)。
  • 循环处理每个输入: 然后遍历这些inputs,对于每一条JobEdge:
    • 基于edge.getSourceId()从intermediateDatasets获取IntermediateResult,这是一个中间计算结果。
    • 检查该中间结果是否存在,如果不存在,则表示这不是一个拓扑排序,因为预期的情况是当你尝试访问一个中间结果时,它应该已经被创建了。如果找不到,抛出一个异常。
    • 如果存在(没有异常被抛出),将找到的IntermediateResult添加到ExecutionJobVertex的inputs(List类型)中,这样当前任务就知道它的输入来自哪些中间结果。
    • 调用EdgeManagerBuildUtil.connectVertexToResult方法来建立当前ExecutionJobVertex与找到的IntermediateResult之间的连接。 EdgeManager是Flink中负责管理输入输出边的组件,它显示地记录了发送端的分区和接收端的分区对应关系。

这个流程重要的是建立了Job中每个任务的执行依赖关系,并明确了数据传输的方式,让任务在执行时清楚自己的输入来自哪里,当任务执行完成后,它产生的输出会通过何种方式被发送到哪些任务。

具体的连接方式,我们需要继续进入到EdgeManagerBuildUtil.connectVertexToResult方法中。其源码如下所示:

/*** Calculate the connections between {@link ExecutionJobVertex} and {@link IntermediateResult} ** based on the {@link DistributionPattern}.** @param vertex the downstream consumer {@link ExecutionJobVertex}* @param intermediateResult the upstream consumed {@link IntermediateResult}* @param distributionPattern the {@link DistributionPattern} of the edge that connects the*     upstream {@link IntermediateResult} and the downstream {@link IntermediateResult}*/
static void connectVertexToResult(ExecutionJobVertex vertex,IntermediateResult intermediateResult,DistributionPattern distributionPattern) {switch (distributionPattern) {// 点对点的连接方式case POINTWISE:connectPointwise(vertex.getTaskVertices(), intermediateResult);break;// 全连接的方式case ALL_TO_ALL:connectAllToAll(vertex.getTaskVertices(), intermediateResult);break;default:throw new IllegalArgumentException("Unrecognized distribution pattern.");}
}

会根据DistributionPattern选择不同的连接方式,这里主要分两种情况,DistributionPattern是跟Partitioner的配置有关。

这里以POINTWISE的连接方式来举例,看一下其是如何在构造ExecutionGraph时连接上下游节点的。

private static void connectPointwise(ExecutionVertex[] taskVertices, IntermediateResult intermediateResult) {final int sourceCount = intermediateResult.getPartitions().length;final int targetCount = taskVertices.length;if (sourceCount == targetCount) {for (int i = 0; i < sourceCount; i++) {ExecutionVertex executionVertex = taskVertices[i];IntermediateResultPartition partition = intermediateResult.getPartitions()[i];ConsumerVertexGroup consumerVertexGroup =ConsumerVertexGroup.fromSingleVertex(executionVertex.getID());partition.addConsumers(consumerVertexGroup);ConsumedPartitionGroup consumedPartitionGroup =createAndRegisterConsumedPartitionGroupToEdgeManager(partition.getPartitionId(), intermediateResult);executionVertex.addConsumedPartitionGroup(consumedPartitionGroup);}} else if (sourceCount > targetCount) {for (int index = 0; index < targetCount; index++) {ExecutionVertex executionVertex = taskVertices[index];ConsumerVertexGroup consumerVertexGroup =ConsumerVertexGroup.fromSingleVertex(executionVertex.getID());int start = index * sourceCount / targetCount;int end = (index + 1) * sourceCount / targetCount;List<IntermediateResultPartitionID> consumedPartitions =new ArrayList<>(end - start);for (int i = start; i < end; i++) {IntermediateResultPartition partition = intermediateResult.getPartitions()[i];partition.addConsumers(consumerVertexGroup);consumedPartitions.add(partition.getPartitionId());}ConsumedPartitionGroup consumedPartitionGroup =createAndRegisterConsumedPartitionGroupToEdgeManager(consumedPartitions, intermediateResult);executionVertex.addConsumedPartitionGroup(consumedPartitionGroup);}} else {for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {IntermediateResultPartition partition =intermediateResult.getPartitions()[partitionNum];ConsumedPartitionGroup consumedPartitionGroup =createAndRegisterConsumedPartitionGroupToEdgeManager(partition.getPartitionId(), intermediateResult);int start = (partitionNum * targetCount + sourceCount - 1) / sourceCount;int end = ((partitionNum + 1) * targetCount + sourceCount - 1) / sourceCount;List<ExecutionVertexID> consumers = new ArrayList<>(end - start);for (int i = start; i < end; i++) {ExecutionVertex executionVertex = taskVertices[i];executionVertex.addConsumedPartitionGroup(consumedPartitionGroup);consumers.add(executionVertex.getID());}ConsumerVertexGroup consumerVertexGroup =ConsumerVertexGroup.fromMultipleVertices(consumers);partition.addConsumers(consumerVertexGroup);}}
}

上面这段代码的目的是通过“点对点”的方式(即每个生产者产生的数据只被一个消费者消费)建立任务节点(ExecutionVertex)与中间结果集(IntermediateResultPartition)之间的连接关系。

这个方法的逻辑主要是根据上游任务产生的IntermediateResultPartition的数量(源)和下游ExecutionVertex节点数量(目标)的比例关系,做不同的操作:

  • 源和目标数量相等:方法会将每个源中间结果分区与对应的下游ExecutionVertex节点连接。这种情况下,每个任务都完全独立,只会消费一个特定的上游中间结果分区
  • 源数量大于目标数量:源中间结果分区会被尽可能平均地分配给下游ExecutionVertex节点,即每个ExecutionVertex可能会消费多个源中间结果分区数据。
  • 源数量小于目标数量:每个源中间结果分区可能会被分配给多个下游ExecutionVertex节点消费,即多个ExecutionVertex节点可能消费同一个源中间结果分区数据。

⠀在执行连接的过程中,会创建ConsumerVertexGroup和ConsumedPartitionGroup:

  • ConsumerVertexGroup包含一组接收同一个中间结果分区(IntermediateResultPartition)的顶点集合。
  • ConsumedPartitionGroup包含顶点要消费的一组中间结果分区。

⠀注意,当源数量小于目标数量时,会有多个任务消费同一个源数据,所以需要使用ConsumerVertexGroup.fromMultipleVertices(consumers)来创建ConsumerVertexGroup。

几种连接情况的示例图如下所示:
在这里插入图片描述

到这里,这个作业的 ExecutionGraph 就创建完成了,有了 ExecutionGraph,JobManager 才能对这个作业做相应的调度。

总结

本文详细介绍了JobGraph生成ExecutionGraph的流程,介绍了ExecutionJobVertex、ExecutionVertex、IntermediateResult、IntermediateResultPartition相关概念的原理和生成过程。最后我们介绍了Flink在生成ExecutionGraph时是如何实现IntermediateResultPartition和ExecutionVertex的连接的。

到这里,StreamGraph、JobGraph和Execution的生成过程,在最近的三篇文章中都已经详细讲解完成了,当然除了我们介绍的内容外,还有更多的实现细节没有介绍,有兴趣的读者可以参考文本来阅读源码,以此加深自己的理解和对更多实现细节的挖掘。

最后,再对StreamGraph、JobGraph和ExecutionGraph做一个总结:

  • StreamGraph. StreamGraph 是表示 Flink 流计算的图模型,它是用户定义的计算逻辑的内部表示形式,是最原始的用户逻辑,一个没有做任何优化的DataFlow;
  • JobGraph. JobGraph 由一个或多个 JobVertex 对象和它们之间的 JobEdge 对象组成,包含并行任务的信息。在JobGraph中对StreamGraph进行了优化,将能够合并在同个算子链中的操作符进行合并,以此减少任务执行时的上下文切换,提任务执行性能。
  • ExecutionGraph. ExecutionGraph是 JobGraph 的并发执行版本,由 ExecutionVertex 和 IntermediateResultPartition 组成。每个 JobVertex 会被转换为一个或多个 ExecutionVertex,ExecutorGraph 包含了每个任务的全部实例,包含它们的状态、位置、输入输出结果。ExecutionGraph 是 Flink 中最核心的部分,它用于任务的调度、失败恢复等。

参考:
https://matt33.com/2019/12/20/flink-execution-graph-4/

相关文章:

Flink源码解析之:如何根据JobGraph生成ExecutionGraph

Flink源码解析之&#xff1a;如何根据JobGraph生成ExecutionGraph 在上一篇Flink源码解析中&#xff0c;我们介绍了Flink如何根据StreamGraph生成JobGraph的流程&#xff0c;并着重分析了其算子链的合并过程和JobGraph的构造流程。 对于StreamGraph和JobGraph的生成来说&…...

活动预告 |【Part2】Microsoft 安全在线技术公开课:安全性、合规性和身份基础知识

课程介绍 通过参加“Microsoft 安全在线技术公开课&#xff1a;安全性、合规性和身份基础知识”活动提升你的技能。在本次免费的介绍性活动中&#xff0c;你将获得所需的安全技能和培训&#xff0c;以创造影响力并利用机会推动职业发展。你将了解安全性、合规性和身份的基础知…...

网络基础入门到深入(2):网络协议-TCP/IP协议栈

目录 一.TCP/IP协议栈的四层结构 二.每一层的作用与协议 1.作用层 作用&#xff1a; 常见协议: 示例: 2.传输层 作用: 核心功能: 3.网络层 作用: 核心功能: 常见协议: 示例: 4.数据链路层(物理层) 作用: 核心功能: 常见技术: 示例: 三.TCP/IP协议栈的分层…...

美畅物联丨视频上云网关获取视频流地址供第三方调用的方法

在视频监控与流媒体传输领域&#xff0c;视频流地址的获取与调用是极为关键的环节。视频上云网关作为一款高效且稳定的视频传输设备&#xff0c;为获取视频流地址提供了便捷途径&#xff0c;从而使外部系统或平台能够方便地进行调用。今天我们就来讨论一下如何在视频上云网关上…...

【Cesium】一、cesium简介

文章目录 前言1.什么是Cesium&#xff1f;2.Cesium能做什么&#xff1f;3.Cesium的依赖性4.Cesium学习参考 前言 本人是前端&#xff0c;主要是开发web&#xff0c;使用技术栈Vue、Js。最近因工作需要开始学习使用Cesium&#xff0c;找到一位博主的文章很好&#xff0c;一边学…...

微服务架构介绍

微服务架构是一种现代化的软件架构风格&#xff0c;它将应用程序构建为一组小型、自治的服务&#xff0c;每个服务都运行在其独立的进程中&#xff0c;服务与服务之间通过轻量级通信机制&#xff08;通常是HTTP/RESTful API&#xff09;进行通信。 1. 服务&#xff08;Service&…...

SOLID-开闭原则

单一职责原则&#xff1a;https://blog.csdn.net/dmk877/article/details/143447010 在前面我们学习了单一职责原则&#xff0c;今天来一起学习一下SOLID原则中的开闭原则(Open-Closed Principle, OCP) 通过本篇博客你将学到到以下内容 ①什么是开闭原则 ②如何实现开闭原则 ③…...

Mac 安装 Flutter 提示 A network error occurred while checking

错误信息 A network error occurred while checking "https://maven.google.com/": Operation timed out原因 在中国大陆(由于访问 Google 服务器的限制导致超时),无法连接到 https://maven.google.com/ 解决方案 需要使用镜像网站 #flutter 使用国内的镜像 export …...

Rocky Linux下安装meld

背景介绍&#xff1a; meld是一款Linux系统下的用于 文件夹和文件的比对软件&#xff0c;非常常用&#xff1b; 故障现象&#xff1a; 输入安装命令后&#xff0c;sudo yum install meld&#xff0c;报错。 12-31 22:12:17 ~]$ sudo yum install meld Last metadata expirat…...

Sentinel 介绍与使用指南:构建高可用、可靠的微服务架构

在微服务架构中&#xff0c;服务间的依赖和调用非常复杂&#xff0c;这也带来了高并发、大流量等挑战。 如何确保系统在高负载情况下仍能稳定运行&#xff0c;如何避免某个服务的故障影响整个系统的稳定性&#xff1f;Sentinel&#xff0c;作为一个轻量级的、专为分布式系统设计…...

异步请求在TypeScript网络爬虫中的应用

异步请求的重要性 异步请求是现代网络应用中不可或缺的一部分&#xff0c;特别是在网络爬虫领域。它允许爬虫在等待网络响应的同时继续执行其他任务&#xff0c;从而提高效率和性能。在JavaScript和TypeScript中&#xff0c;异步请求可以通过多种方式实现&#xff0c;包括回调…...

智能商业分析 Quick BI

Quick BI 是阿里云提供的一款智能商业分析&#xff08;BI&#xff09;工具&#xff0c;旨在帮助企业快速获取业务洞察、优化决策过程、提升数据分析效率。通过强大的数据可视化和分析功能&#xff0c;Quick BI 能够帮助用户轻松连接多种数据源、创建多维度的报表和仪表盘&#…...

[算法] [leetcode-75] 颜色分类

75 颜色分类 给定一个包含红色、白色和蓝色、共 n 个元素的数组 nums &#xff0c;原地 对它们进行排序&#xff0c;使得相同颜色的元素相邻&#xff0c;并按照红色、白色、蓝色顺序排列。 我们使用整数 0、 1 和 2 分别表示红色、白色和蓝色。 必须在不使用库内置的 sort 函…...

抖音短视频矩阵系统源码开发技术解析

开发概览&#xff1a; 抖音短视频矩阵系统的构建基于一系列现代技术栈&#xff0c;主要包括VUE, Spring Boot和Django。本文档旨在为开发者提供关于短视频矩阵系统源代码的开发与部署指南。 技术框架分析&#xff1a; 前端技术选型&#xff1a; 对于前端界面的构建&#xf…...

Linux(CentOS)安装 MySQL

CentOS版本&#xff1a;CentOS 7 三种安装方式&#xff1a; 一、通过 yum 安装&#xff0c;最简单&#xff0c;一键安装&#xff0c;全程无忧。 二、通过 rpm 包安装&#xff0c;需具备基础概念及常规操作。 三、通过 gz 包安装&#xff0c;需具备配置相关操作。 --------…...

头歌实训数据结构与算法-二叉树及其应用(第9关:二叉树的顺序存储及基本操作)

任务描述 本关任务&#xff1a;以顺序结构存储二叉树&#xff0c;编写前序、中序、后序及层次顺序遍历二叉树的算法&#xff0c;并计算二叉树深度、所有结点总数。 相关知识 二叉树的定义 二叉树的递归定义&#xff1a; 二叉树或者是一棵空树。 或者是一棵由一个根结点和两…...

打印进度条

文章目录 1.Python语言实现(1)黑白色(2)彩色&#xff1a;蓝色 2.C语言实现(1)黑白颜色(2)彩色版&#xff1a;红绿色 1.Python语言实现 (1)黑白色 import sys import timedef progress_bar(percentage, width50):"""打印进度条:param percentage: 当前进度百分比…...

【LLM】Langflow 的简单使用

(PS&#xff1a;爆肝整理&#xff0c;请不要吝啬你的点赞和收藏。) 什么是 Langflow &#xff1f;Langflow 是一种用于构建多智能体和RAG应用的可视化框架。它提供了个无需编码的 AI 生态系统&#xff0c;能够无缝集成各种常用工具和技术栈。Langflow 以 Python 为基础&#x…...

探索 DC-SDK:强大的 3D 地图开发框架

在现代 Web 开发中&#xff0c;地理信息系统&#xff08;GIS&#xff09;和 3D 地图可视化变得越来越重要。dc-sdk 是一个基于 Cesium 的开源 WebGL 地图开发框架&#xff0c;它提供了丰富的地图可视化功能和简单易用的 API&#xff0c;使开发者能够轻松地在 Web 应用中集成 3D…...

3.5mm耳机接口硬件连接

结构 以最复杂的结构为例 简单的结构无非就是没有MIC&#xff08;麦克风&#xff09;接口 上图的5就是Detect的作用 上面这两款产品都为3.5mm的音频插座&#xff0c;图一 为连接4节的音频座&#xff0c;而且有两个开关&#xff0c;1接地&#xff0c;2接MIC&#xff0c;3接左声…...

nvidia_gpu_exporter 显卡监控

导入 grafana/dashboard.json https://github.com/utkuozdemir/nvidia_gpu_exporter/blob/master/grafana/dashboard.json参考 nvidia_gpu_exporter...

聊聊 Mongod 以及 MongoDB 常用命令

Mongod mongod 是 MongoDB 数据库服务器的核心守护进程&#xff0c;它负责启动并管理 MongoDB 数据库实例。简单来说&#xff0c;mongod 是 MongoDB 数据库服务器程序&#xff0c;它负责处理数据存储、数据请求、数据复制等后台服务。运行 mongod 是启动 MongoDB 数据库的第一…...

webrtc 源码阅读 make_ref_counted模板函数用法

目录 1. 模板参数解析 1.1 typename T 1.2 typename... Args 1.3 typename std::enable_if::value, T>::type* nullptr 2. scoped_refptr 3. new RefCountedObject(std::forward(args)...); 4. 综合说明 5.在webrtc中的用法 5.1 peerConnectionFactory对象的构建过…...

僵尸进程,孤儿进程、守护进程以及wait函数,waitpid函数

僵尸进程 如果子进程退出&#xff0c;但是父进程没有调用 wait 或 waitpid 获取子进程的状态信息&#xff0c;那么子进程的进程描述符&#xff08;task_struct&#xff09;仍然保存在系统中&#xff0c;那么该子进程叫做僵尸进程 #include<iostream> #include<pthre…...

Kafka消息不丢失与重复消费问题解决方案总结

1. 生产者层面 异步发送与回调处理 异步发送方式&#xff1a;生产者一般使用异步方式发送消息&#xff0c;异步发送有消息和回调接口两个参数。在回调接口的重写方法中&#xff0c;可通过异常参数判断消息发送状态。若消息发送成功&#xff0c;异常参数为null&#xff1b;若发…...

Docker新手:在tencent云上实现Python服务打包到容器

1 使用docker的原因 一致性和可移植性&#xff1a;Docker 容器可以在任何支持 Docker 的环境中运行&#xff0c;无论是开发者的笔记本电脑、测试服务器还是生产环境。这确保了应用在不同环境中的行为一致&#xff0c;减少了“在我的机器上可以运行”的问题。 隔离性&#xff…...

什么是 Spring 的组件(Bean)

什么是 Spring 的组件&#xff08;Bean&#xff09;&#xff1f; Spring 会自动创建、初始化、装配和销毁这些对象。Spring 使用 IoC&#xff08;控制反转&#xff09; 和 DI&#xff08;依赖注入&#xff09; 的理念&#xff0c;将应用程序的对象交给 Spring 容器统一管理&am…...

PawSQL性能巡检平台 (3) - 慢查询采集和优化

在数据库运维管理中&#xff0c;慢查询一直是影响系统性能的重要因素。本文将详细介绍PawSQL数据库性能巡检平台在慢查询管理和优化方面的功能特性&#xff0c;帮助数据库管理员更好地应对性能挑战。 一、PawSQL巡检平台慢查询管理概述 PawSQL平台提供了全面的慢查询管理功能&…...

虚拟机Centos下安装Mysql完整过程(图文详解)

目录 一. 准备工作 1. 设置虚拟机静态IP 2. 卸载Mysql 3. 给CentOS添加rpm源 二. 安装MySQL 1. 安装mysql服务 2. 启动mysql服务 3. 开启MySQL开机自启动 4. 查看mysql服务状态 5. 查看mysql初始密码 6. 登录mysql &#xff0c;修改密码 7. 允许外部访问MySQL数据库…...

微服务保护-sentinel

为什么要有微服务保护&#xff1f; 微服务保护是为了避免微服务雪崩而出现的&#xff0c;每个微服务能处理的请求是有限的&#xff0c;如果一个微服务出现问题导致一个请求进入微服务的时间太久&#xff0c;就会导致大量去请求停滞在微服务内部&#xff0c;这样就会过分占用系统…...

Redis Java 集成到 Spring Boot

Hi~&#xff01;这里是奋斗的明志&#xff0c;很荣幸您能阅读我的文章&#xff0c;诚请评论指点&#xff0c;欢迎欢迎 ~~ &#x1f331;&#x1f331;个人主页&#xff1a;奋斗的明志 &#x1f331;&#x1f331;所属专栏&#xff1a;Redis &#x1f4da;本系列文章为个人学习笔…...

RabbitMQ实现生产者消费者

一.启动MQ 注意管理员身份进入cmd才行,我这里是在本地安装的MQ,推荐使用虚拟机安装 二.思路 官方解释RabbitMQ结构: 自我理解RabbitMQ结构: 其实RabbitMQ的服务器就像邮局一样,我们的生产者和消费者对于这个服务器来说都是消费者,因为服务器都可以向两者发送消息 环境准备 …...

stm32f103zet6 ds18b20

main.c // main.c #include "sys.h" #include "ds18b20.h"int main(void){ uart_init(9600);delay_init();while(DS18B20_Init()) //DS18B20初始化 {printf("error");delay_ms(200);}while(1){printf("%4.2f\r\n",Get_Temp());}}ds18…...

期权懂|期权入门知识:开通50ETF期权需要什么条件?

锦鲤三三每日分享期权知识&#xff0c;帮助期权新手及时有效地掌握即市趋势与新资讯&#xff01; 开通50ETF期权需要什么条件&#xff1f; 一、基本资格要求 &#xff08;1&#xff09;年龄限制&#xff1a;投资者必须年满18周岁&#xff0c;具备完全民事行为能力。 &#…...

Linux day 1129

家人们今天继续学习Linux&#xff0c;ok话不多说一起去看看吧 三.Linux常用命令 3.1 Linux命令体验 3.1.1 常用命令演示 在这一部分中&#xff0c;我们主要介绍几个常用的命令&#xff0c;让大家快速感 受以下 Linux 指令的操作方式。主要包含以下几个指令&#xff1a; ls命…...

智能家居体验大变革 博联 AI 方案让智能不再繁琐

1. 全球AI技术发展背景及智能家居市场趋势 人工智能&#xff08;AI&#xff09;技术的飞速发展正在推动全球各行业的数字化转型。国际电信联盟与德勤联合发布《人工智能向善影响》报告指出&#xff0c;全球94%的商界领袖认为&#xff0c;人工智能技术对于其企业在未来5年内的发…...

git使用

git初始化 git init 指定要添加的文件 git add [文件名1] [文件名2] [文件名3] // 添加指定文件 git add . // 添加当前目录所有文件 将文件提交到本地仓库 git commit -m "备注信息" 添加远程仓库 git remote add origin [远程仓库地址] git remote -v // …...

嵌入科技的温情

嵌入式世界&#xff0c;是一个微小却无比精妙的宇宙。晶体管之间的脉动&#xff0c;仿佛是心跳的回响&#xff1b;代码中跳跃的逻辑&#xff0c;犹如人生中不可预知的转折。每一个嵌入式系统&#xff0c;都像是一个看不见的灵魂&#xff0c;将冰冷的机器唤醒&#xff0c;为生活…...

python利用selenium实现大麦网抢票

大麦网&#xff08;damai.cn&#xff09;是中国领先的现场娱乐票务平台&#xff0c;涵盖演唱会、音乐会、话剧、歌剧、体育赛事等多种门票销售。由于其平台上经常会有热门演出&#xff0c;抢票成为许多用户关注的焦点。然而&#xff0c;由于票务资源的有限性&#xff0c;以及大…...

PS等软件学习笔记

目录 一、ps基础操作快捷键 1、快速打开图片 2、屏幕画布变大变小 3、移动画布 4、CTRL回车&#xff0c;快速完成更改 5、还原 6、创建画布&#xff0c;CTRLN 7、复制图层&#xff0c;CTRLJ 8、一段文字行间距调整 9、反向选择&#xff0c;CTRLSHIFTI 10、抠图 二、…...

vue3学习笔记(9)-pinia、storeToRefs、getters

1.新的集中式状态&#xff08;数据&#xff09;管理库&#xff0c;redux vuex pinia 搭建 2.ref拆包 如果在reactive里面定义ref&#xff0c;则打印c时&#xff0c;无需.value 他自动拆包&#xff0c;如果直接在外面定义的ref则需要.value,他没有拆包 3.pinia存储读取数据 存…...

数据库基础知识---以MySQL为例

一、什么是MySQL 数据保存在不同的表中&#xff0c;而不是将所有数据放在一个大仓库内 二、特点 开源--免费下载跨平台--可以在多个操作系统进行运行性能好--可以出来大量数据简单--安装配置简单支持多种编程语言--可以与多种编程语言进行无缝集成 三、分类 DDL--数据定义…...

013-spring的注解整合第三方框架

给spring的ioc容器中添加对象 常用这3个方法...

使用ForceBindIP绑定应用到指定IP

前言 使用ForceBindIP工具&#xff0c;用户可以轻松地将特定应用程序绑定到指定的IP地址&#xff0c;从而确保应用程序的网络连接通过指定的网络适配器进行。通过在命令提示符下运行ForceBindIP并指定IP地址和应用程序的完整路径&#xff0c;用户能够控制应用程序的网络流量&a…...

python-LeetCode-两数之和

1. 两数之和 - 力扣&#xff08;LeetCode&#xff09; class Solution:def twoSum(self, nums: List[int], target: int) -> List[int]:# 创建一个哈希表用于存储值和索引num_to_index {}for i, num in enumerate(nums):# 计算目标值需要的补数complement target - num# 如…...

项目开发实践——基于SpringBoot+Vue3实现的在线考试系统(四)

文章目录 一、管理员角色功能实现1、添加教师功能实现1.1 页面设计1.2 前端功能实现1.3 后端功能实现1.4 效果展示2、教师管理功能实现2.1 页面设计2.2 前端功能实现2.3 后端功能实现2.3.1 后端查询接口实现2.3.2 后端编辑接口实现2.3.3 后端删除接口实现2.4 效果展示二、代码下…...

大语言模型的token和向量

现在大语言模型火了&#xff0c;像 ChatGPT 什么的&#xff0c;能回答问题、写文章&#xff0c;。但它们为啥这么聪明呢&#xff1f;这就和向量、Token 有关系。那怎么通过向量、Token来理解我们的问题呢。看完这篇文章就知道了 token Token 就像是语言里的小积木&#xff0c…...

Hyperledger Fabric有那些核心技术,和其他区块链对比Hyperledger Fabric有那些优势

Hyperledger Fabric是一个模块化、权限化的企业级区块链平台&#xff0c;与比特币、以太坊等公有链相比&#xff0c;Fabric主要为私有链或联盟链设计&#xff0c;适用于企业应用。它包含多项核心技术&#xff0c;使其在企业级区块链应用中具有独特优势。以下是Fabric的核心技术…...

ThinkPHP 8高效构建Web应用-第一个简单的MVC应用示例

【图书介绍】《ThinkPHP 8高效构建Web应用》-CSDN博客 《2025新书 ThinkPHP 8高效构建Web应用 编程与应用开发丛书 夏磊 清华大学出版社教材书籍 9787302678236 ThinkPHP 8高效构建Web应用》【摘要 书评 试读】- 京东图书 使用VS Code开发ThinkPHP项目-CSDN博客 我们先实现一…...

【Goland】怎么执行 go mod download

1、终端的执行 go mod tidy 2、终端执行不行的话&#xff0c;就可以通过右击go.mod文件来执行&#xff1b; 3、也可以按住Ctrl点击这个包安装&#xff1b;...