当前位置: 首页 > news >正文

Flink源码解析之:Flink On Yarn模式任务提交部署过程解析

Flink源码解析之:Flink On Yarn模式任务提交部署过程解析

一、Flink on Yarn部署模式概述

Apache Hadoop YARN 在许多数据处理框架中都很流行。 Flink 服务提交给 YARN 的 ResourceManager,后者会在 YARN NodeManagers 管理的机器上生成容器。 Flink 将其 JobManager 和 TaskManager 实例部署到这些容器中。

Flink 可根据在 JobManager 上运行的作业所需的处理插槽数量,动态分配和取消分配 TaskManager 资源。

Flink on Yarn的部署模式包括三种方式,Application Mode、Per-Job Mode、Session Mode。对于生成环境来说,更推荐使用Application Mode或Per-Job Mode,因为这两种模式能够提供更好的应用隔离性。

  1. Application Mode
    Application Mode模式将在 YARN 上启动一个 Flink 集群,应用程序 jar 的 main() 方法将在 YARN 的 JobManager 上执行。 应用程序一旦完成,群集就会关闭。 该种方式相比Per-Job模式来说,将应用main()方法的执行,StreamGraph、JobGraph的生成放在了Flink集群侧来实现。
  2. Per-Job Mode
    Per-job 模式将在 YARN 上启动一个 Flink 集群,在客户端生成StreamGraph、JobGraph,并上传依赖项。最后将 JobGraph 提交给 YARN 上的 JobManager。 如果通过—detached参数配置了分离模式,则客户端将在提交被接受后立即停止。
  3. Session Mode
    Session部署模式会在YARN上部署一个长期运行的Flink集群会话,该会话可以接受并执行多个Flink作业。
    Session部署模式包含两种操作模式:
    • attach mode(default):执行yarn-session.sh文件在Yarn上启动Flink集群,启动后客户端会一致运行,来追踪/监听集群状态。一旦集群异常,客户端会获取异常信息并展示。如果客户端异常终止了,则会发送signal到Flink集群,此时Flink集群同样也会终止。
    • detach mode :使用-d or --detached参数设置。在这种模式下,当执行yarn-session.sh文件在Yarn上启动Flink集群后,客户端会直接返回。要停止 Flink 群集,需要再次调用客户端或 YARN 工具。

三种提交模式的对比:
bin/flink.sh脚本可知,客户端提交过程统一由org.apache.flink.client.cli.CliFronted入口类触发。Per-Job模式和Session模式下Flink应用main方法都会在客户端执行。客户端解析生成JobGraph后会将依赖项和JobGraph序列化后的二进制数据一起发往集群上。当客户端机器上有大量作业提交时,需要大量的网络带宽下载依赖项并将二进制文件发送到集群,会造成客户端消耗大量的资源。尤其在大量用户共享客户端时,问题更加突出。为解决该问题,社区提出了Application模式将Flink应用main方法触发过程后置到了JobManager生成过程中,以此将带宽压力分散到集群各个节点上。

鉴于Application部署模式的优势,本文会以Application部署模式的源码来进行解析,探究Flink以Application模式提交任务到Yarn集群中所经过的大致流程,为我们理解Flink On Yarn的部署有一个更深入和清晰的认识。

二、Flink Application部署模式源码解析

(一)CliFronted入口类

本节以Application部署模式为例,介绍Flink On Yarn的客户端提交源码流程。正如上文说的,由bin/flink.sh脚本可知,客户端提交过程统一由org.apache.flink.client.cli.CliFronted入口类触发,为此,我们首先进入到该方法的源码中,来观察下该入口类的实现逻辑:

/** Submits the job based on the arguments. */
public static void main(final String[] args) {EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);// 1. find the configuration directory// 用来获取配置目录,该目录通常包含Flink的配置文件,如flink-conf.yaml。final String configurationDirectory = getConfigurationDirectoryFromEnv();// 2. load the global configuration// 加载flink的全局配置final Configuration configuration =GlobalConfiguration.loadConfiguration(configurationDirectory);// 3. load the custom command lines// 加载自定义的命令行配置final List<CustomCommandLine> customCommandLines =loadCustomCommandLines(configuration, configurationDirectory);int retCode = 31;try {// 实例化了CliFronted对象,CliFronted是Flink为CLI客户端提供的API,它提供了一系列的操作,例如作业的提交,取消,以及打印job的状态等。final CliFrontend cli = new CliFrontend(configuration, customCommandLines);SecurityUtils.install(new SecurityConfiguration(cli.configuration));// 启动Flink作业的入口,parseAndRun方法会解析命令行参数,并启动Flink作业。retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));} catch (Throwable t) {final Throwable strippedThrowable =ExceptionUtils.stripException(t, UndeclaredThrowableException.class);LOG.error("Fatal error while running command line interface.", strippedThrowable);strippedThrowable.printStackTrace();} finally {System.exit(retCode);}
}

上述代码是CliFronted的入口main方法,该方法首先根据Flink的配置路径加载全局配置,比如flink-conf.xml配置文件,接着加载自定义命令行配置,并实例化了CliFronted对象。CliFronted是Flink为CLI客户端提供的API,它提供了一系列的操作,例如作业的提交,取消,以及打印job的状态等。最后,调用cli.parseAndRun(args)方法,该方法会解析命令行参数,并启动Flink作业。

parseAndRun方法中,会根据传入参数的第一个参数值来决定Flink集群的部署模式:

  • run-application:则会进入到CliFronted类的runApplication方法中,执行Application部署流程。
  • run:则会进入到CliFronted类的run方法中,在客户端执行作业的main方法(利用反射来执行)

这也是为什么我在使用命令行以Application模式部署Flink集群时,命令的开始要用以下形式:

/bin/flink run-application -t yarn-application...

(二)runApplication

接下来,我们继续进入到runApplicaiton方法来看看它的实现逻辑:

protected void runApplication(String[] args) throws Exception {LOG.info("Running 'run-application' command.");// 解析传入的命令行参数final Options commandOptions = CliFrontendParser.getRunCommandOptions();final CommandLine commandLine = getCommandLine(commandOptions, args, true);// 如果命令行参数中包含帮助选项(-h/--help),则调用下述方法打印帮助信息并返回if (commandLine.hasOption(HELP_OPTION.getOpt())) {CliFrontendParser.printHelpForRunApplication(customCommandLines);return;}// 验证并获取激活的自定义命令行, CustonCommandLine是Flink用来处理不同部署模式的工具(例如Yarn,Standlone等),以便针对不同模式解析对应的特定设置和参数final CustomCommandLine activeCommandLine =validateAndGetActiveCommandLine(checkNotNull(commandLine));// 初始化ApplicationClusterDeployer实例, 这是Flink用来启动Application的工具final ApplicationDeployer deployer =new ApplicationClusterDeployer(clusterClientServiceLoader);final ProgramOptions programOptions;final Configuration effectiveConfiguration;// No need to set a jarFile path for Pyflink job.if (ProgramOptionsUtils.isPythonEntryPoint(commandLine)) {programOptions = ProgramOptionsUtils.createPythonProgramOptions(commandLine);effectiveConfiguration =getEffectiveConfiguration(activeCommandLine,commandLine,programOptions,Collections.emptyList());} else {programOptions = new ProgramOptions(commandLine);programOptions.validate();final URI uri = PackagedProgramUtils.resolveURI(programOptions.getJarFilePath());effectiveConfiguration =getEffectiveConfiguration(activeCommandLine,commandLine,programOptions,Collections.singletonList(uri.toString()));}// 根据programOptions获取程序参数和入口类名来创建ApplicationConfiguration实例final ApplicationConfiguration applicationConfiguration =new ApplicationConfiguration(programOptions.getProgramArgs(), programOptions.getEntryPointClassName());// 最后调用deployer.run()来运行应用。这一步通常包括联系Flink集群,提交应用程序并安排其在集群中执行。deployer.run(effectiveConfiguration, applicationConfiguration);
}

上述代码的实现流程与原理如下所示:

  • 解析命令行参数:首先,调用getCommandLine函数解析传入的命令行参数args。

  • 处理帮助选项:如果命令行参数中包含帮助选项(-h/–help),则调用CliFrontendParser.printHelpForRunApplication打印帮助信息并返回。

  • 获取激活的CustomCommandLine:通过validateAndGetActiveCommandLine函数获取激活的自定义命令行(CustomCommandLine)。CustomCommandLine是Flink用来处理不同部署模式的工具(例如Yarn,Standalone等),以便于针对不同模式解析对应的特定设置和参数。

  • 部署器配置:初始化ApplicationClusterDeployer实例,这是Flink用来启动Application的工具。

  • 提取程序选项和计算有效配置:区分Python作业和其他作业,生成对应的ProgramOptions并验证其有效性。此外,根据激活的命令行、解析得到的命令行参数和程序选项计算出有效的配置(effectiveConfiguration)。

  • 构造应用配置:使用从ProgramOptions中获取的程序参数和入口点类名创建ApplicationConfiguration实例。

  • 运行应用:最后,调用deployer.run()来运行应用。这一步通常包括联系Flink集群,提交应用程序并安排其在集群中执行。

ProgramOptions.entryPointClass的成员值是flink命令行 -c 选项指定的Flink应用入口类com.xxx.xxx.FlinkApplicationDemo,后续会以反射的形式触发main()方法的执行。

(三)ClusterDescriptor.deployApplicationCluster

上面代码中deployer.run(...)方法负责加载Yarn Application模式客户端信息等。

首先代码会根据configuration配置信息来获取ClusterClientFactory对象,获取的逻辑过程是根据configuration配置中的execution.target参数来决定的。

当执行命令行bin/flink run时, execution.target参数对应的枚举值可以如下:

  • remote
  • local
  • yarn-per-job
  • yarn-session
  • kubernetes-session
    当执行命令行bin/flink run-application时,execution.target参数对应的枚举值可以如下:
  • yarn-application
  • kubernetes-application

execution.target参数为yarn-application时,Flink便会生成相应的YarnClusterClientFactory客户端工厂类,然后调用该工厂类的createClusterDescriptor方法,该方法中会新建YarnClient实例,YarnClient实例负责在客户端提交Flink应用程序,并最终生成ClusterDescriptor实例,该实例包含用于在Yarn上部署Flink集群的部署信息Descriptor。

@Override
public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {checkNotNull(configuration);final String configurationDirectory = configuration.get(DeploymentOptionsInternal.CONF_DIR);YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);return getClusterDescriptor(configuration);
}private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {final YarnClient yarnClient = YarnClient.createYarnClient();final YarnConfiguration yarnConfiguration =Utils.getYarnAndHadoopConfiguration(configuration);yarnClient.init(yarnConfiguration);yarnClient.start();return new YarnClusterDescriptor(configuration,yarnConfiguration,yarnClient,YarnClientYarnClusterInformationRetriever.create(yarnClient),false);
}

有了该实例后,会调用deployApplicationCluster方法来部署Application模式的Flink集群。集群将在提交应用程序时创建,并在应用程序终止时拆除。此外,应用程序用户代码的{@code main()}将在集群上执行,而不是在客户端上执行。

YarnClusterDescriptor.deployApplicationCluster(…)方法调用过程如下:
(1)、YarnClusterDescriptor.deployApplicationCluster(…);进行一些配置和检查,并调用deployInternal(…)方法。
(2)、YarnClusterDescriptor.deployInternal(…);

其中,最重要的方法是deployInternal方法

(四)YarnClusterDescriptor.deployInternal

在该方法中,首先会判断Hadoop集群是否启用了Kerberos安全认证,如果开启了,则Flink会首先确认当前用户是否拥有有效的kerberos凭证。如果无效,则会抛出异常,部署作业失败。

紧接着,进行资源检查和部署模式判断。

validateClusterResources方法中,会根据配置的JobManager和TaskManager的资源大小与集群资源进行比对。

  1. 如果JobManager的配置内存大小 < Yarn配置的最小调度分配内存(yarn.scheduler.minimum-allocation-mb参数,默认1024MB),则JobManager的内存大小会设置为该配置值。
  2. 如果JobManager大小 > YARN 集群能够提供的单个容器的最大资源,则抛出异常:The cluster does not have the requested resources for the JobManager available!
  3. 如果TaskManager大小 > YARN 集群能够提供的单个容器的最大资源,则抛出异常:The cluster does not have the requested resources for the TaskManagers available!
  4. 如果TaskManager大小 > 当前YARN集群剩余资源单个任务容器分配的最大资源容量,则会打印告警日志:The requested amount of memory for the TaskManagers is more than the largest possible YARN container: freeClusterResources.containerLimit
  5. 如果JobManager大小 > 当前YARN集群剩余资源单个任务容器分配的最大资源容量,则会打印告警日志:The requested amount of memory for the JobManager is more than the largest possible YARN container: freeClusterResources.containerLimit

经过资源检查后,会将最后确定的JobManager和TaskManager资源保存在ClusterSpecification对象中。

在部署模式决定中,Flink 提供了两种部署模式:Detach模式和Non-Detach模式。如果是 Detach模式,Flink 作业提交到YARN后,客户端可以直接退出,而作业将继续在YARN集群上运行。而在 Non-Detach模式下,客户端将持续等待作业执行完成。

然后就到了一个非常重要的方法中:startAppMaster

会根据上面决定的ClusterSpecification资源实例,启动用于管理Flink作业的Application Master。

startAppMaster方法比较长。

这段代码主要是用于启动Flink在YARN集群上的Application Master的过程,代码中包含了几个主要部分:

  1. 首先,核心的首步骤是初始化文件系统并获取对应的 FileSystem 实例。代码检查了文件系统的类型,如果是本地文件系统类型(file://开头),会抛出警告,因为Flink在YARN上运行需要分布式文件系统来存储文件。
  2. 然后,获取了用于提交应用程序的 ApplicationSubmissionContext,并将 Flink 应用所需的各种文件如jar包、配置文件等上传到HDFS,并将这些文件的HDFS路径作为本地资源 (LocalResources)添加到ApplicationSubmissionContext里。
  3. 在文件上传阶段,包括了一系列复杂的步骤,首先是将 flink 配置、job graph、用户 jar、依赖库等上传到HDFS,并将这些文件的路径添加到应用的classpath;其次,如果设置了 security options(例如,Kerberos认证信息),会将相关文件也上传到HDFS;并且,对配置了Kerberos认证的 flink 应用,会从 YARN 获取 HDFS delegation tokens。
  4. 在收集完上述一系列依赖文件后,final ContainerLaunchContext amContainer = setupApplicationMasterContainer(yarnClusterEntrypoint, hasKrb5, processSpec) 负责设置启动ApplicationMaster的命令操作。
  5. 设置ApplicationMaster的环境变量,诸如_FLINK_CLASSPATH、_FLINK_DIST_JAR(Flink jar resource location (in HDFS))、KRB5_PATH、_YARN_SITE_XML_PATH等环境变量。最后调用amContainer.setEnvironment(appMasterEnv);方法进行设置。
  6. 接着,会将上述配置好的amContainer实例放入ApplicationSubmissionContext对象中,以及ApplicationName和所需的资源大小,最终交给交给YarnClient去提交,并随后通过周期性地获取应用状态,来等待应用处于RUNNING或FINISHED状态,完成应用的提交过程。
  7. 如果在这一系列操作中有任何异常或错误发生,会触发失败保护钩子 DeploymentFailureHook,进行必要的清理工作。

上面这段代码体现了 Flink on YARN 的工作原理,Flink 通过 YARN Client 提交应用,启动 Application Master 来进行资源申请和任务调度,这是典型的 YARN 应用程序模型。各种文件(包括 flink 本身、用户 jar、配置文件等)都被上传到HDFS,然后再从HDFS分发到运行任务的 YARN 容器中,这样做是为了实现文件的分布式共享,并且利用了 YARN 的 LocalResource 机制来进行文件的分发。

对于第四点中的setupApplicationMasterContainer方法,该方法构造了ApplicationMaster的命令行启动命令,如下所示:

ContainerLaunchContext setupApplicationMasterContainer(String yarnClusterEntrypoint, boolean hasKrb5, JobManagerProcessSpec processSpec) {// ------------------ Prepare Application Master Container  ------------------------------// respect custom JVM options in the YAML fileString javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);}// krb5.conf file will be available as local resource in JM/TM containerif (hasKrb5) {javaOpts += " -Djava.security.krb5.conf=krb5.conf";}// Set up the container launch context for the application masterContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);final Map<String, String> startCommandValues = new HashMap<>();startCommandValues.put("java", "$JAVA_HOME/bin/java");String jvmHeapMem =JobManagerProcessUtils.generateJvmParametersStr(processSpec, flinkConfiguration);startCommandValues.put("jvmmem", jvmHeapMem);startCommandValues.put("jvmopts", javaOpts);startCommandValues.put("logging", YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));startCommandValues.put("class", yarnClusterEntrypoint);startCommandValues.put("redirects","1> "+ ApplicationConstants.LOG_DIR_EXPANSION_VAR+ "/jobmanager.out "+ "2> "+ ApplicationConstants.LOG_DIR_EXPANSION_VAR+ "/jobmanager.err");String dynamicParameterListStr =JobManagerProcessUtils.generateDynamicConfigsStr(processSpec);startCommandValues.put("args", dynamicParameterListStr);final String commandTemplate =flinkConfiguration.getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);final String amCommand =BootstrapTools.getStartCommand(commandTemplate, startCommandValues);amContainer.setCommands(Collections.singletonList(amCommand));LOG.debug("Application Master start command: " + amCommand);return amContainer;
}

启动命令的参数包括以下部分:

  • “java”:Java二进制文件的路径。一般来说,在YARN容器中,Java的路径会被设置为$JAVA_HOME/bin/java。
  • “jvmmem”:JVM参数,主要是内存参数,比如最大堆内存、最小堆内存等。这些参数会基于Flink配置以及JobManager的内存配置来生成。
  • “jvmopts”:JVM选项。这些选项来自Flink配置文件中设置的JVM选项,以及若存在Kerberos krb5.conf文件,还会添加-Djava.security.krb5.conf=krb5.conf。
  • “logging”:日志配置项,用于配置Flink的日志选项。
  • “class”:启动类,即YARN集群入口点类名(yarnClusterEntrypoint)。
  • “redirects”:输出重定向的参数,将stdout(输出流)和stderr(错误流)重定向到日志文件中。
  • “args”:传递给启动类的参数,主要是JobManager的动态配置参数。

⠀这些参数最后会填入一个启动命令模板(通常为"%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%"),来生成实际启动Flink应用的命令。

启动后的ApplicationMaster,在YARN集群上起着以下的关键作用:

  • 作为应用程序的主控制器,管理和监视应用程序的执行。
  • 负责请求YARN ResourceManager分配所需的资源(例如容器)。
  • 启动和监视任务执行器(TaskExecutor),它们在分配的容器中运行。
  • 与Flink的client(例如命令行界面或Web界面)以及ResourceManager进行交互,提供应用程序的状态和进度信息。
  • 在应用程序出现异常或失败时,它可以选择重新请求资源并重启失败的任务,提供了一定程度的错误恢复能力。

⠀因此,Application Master是Flink在YARN上运行的关键组件,它负责管理Flink应用程序的生命周期和资源。

(五)YarnApplicationClusterEntryPoint

在上面的setupApplicationMasterContainer方法中,我们说该方法构建了ApplicationMaster的启动命令。从该命令行中可以看到,命令行的启动入口类为yarnClusterEntrypoint参数,对于Yarn Application部署模式来说,参数对应的入口类即为YarnApplicationClusterEntryPoint。在第四部分的分析中,当通过yarnClient将ApplicationMaster提交到Yarn集群后,便会申请Container来执行ApplicationMaster,执行该入口类。
为此,接下来我们来分析一下,YarnApplicationClusterEntryPoint入口类的执行逻辑。

public static void main(final String[] args) {// startup checks and loggingEnvironmentInformation.logEnvironmentInfo(LOG, YarnApplicationClusterEntryPoint.class.getSimpleName(), args);SignalHandler.register(LOG);JvmShutdownSafeguard.installAsShutdownHook(LOG);Map<String, String> env = System.getenv();// 获取工作路径final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());Preconditions.checkArgument(workingDirectory != null,"Working directory variable (%s) not set",ApplicationConstants.Environment.PWD.key());try {YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);} catch (IOException e) {LOG.warn("Could not log YARN environment information.", e);}final Configuration dynamicParameters =ClusterEntrypointUtils.parseParametersOrExit(args,new DynamicParametersConfigurationParserFactory(),YarnApplicationClusterEntryPoint.class);final Configuration configuration =YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);PackagedProgram program = null;try {// 获取用户应用程序jar,程序参数、入口类名等信息,封装为PackagedProgram实例program = getPackagedProgram(configuration);} catch (Exception e) {LOG.error("Could not create application program.", e);System.exit(1);}try {configureExecution(configuration, program);} catch (Exception e) {LOG.error("Could not apply application configuration.", e);System.exit(1);}YarnApplicationClusterEntryPoint yarnApplicationClusterEntrypoint =new YarnApplicationClusterEntryPoint(configuration, program);// 	执行Application Cluster
ClusterEntrypoint.runClusterEntrypoint(yarnApplicationClusterEntrypoint);
}

上面这段代码中,使用getPackagedProgram(configuration)方法获取用户应用程序jar,程序参数、入口类名等信息,封装为PackagedProgram实例,便于后续调用。

最后,调用runClusterEntrypoint方法,启动执行Application Cluster集群。

ClusterEntrypoint.runClusterEntrypoint(...)方法的调用链路如下:

  • ClusterEntrypoint.runClusterEntrypoint(...)
  • ClusterEntrypoint.startCluster(...)
  • ClusterEntrypoint.runCluster(...)
  • DispatcherResourceManagerComponentFactory.create(…)

DispatcherResourceManagerComponentFactory.create方法中,启动了一系列服务,比如:

  • LeaderRetrievalService
  • WebMonitorEndpoint
  • ResourceManagerService
  • DispatcherRunner

本流程中主要需要关注的服务是DispatcherRunner,该方法中,会调用dispatcherRunnerFactory.createDispatcherRunner来初始化dispatchRunner实例,dispatcherRunner实例负责dispatcher组件的高可用leader选举操作,同时dispatcher组件负责触发Flink用户应用main(…)方法执行。

在创建DispatchRunner的过程中,包含高可用Leader选举过程,经过一系列的方法链调用,会选举出一个Leader DispatchRunner服务来负责后续的处理流程。

  • DispatcherResourceManagerComponentFactory.createDispatcherRunner
  • DefaultDispatcherRunner.create()
  • DispatcherRunnerLeaderElectionLifecycleManager.createFor()
  • DefaultLeaderElectionService.start()
  • LeaderElectionDriverFactory.createLeaderElectionDriver()
  • new ZooKeeperLeaderElectionDriver
  • LeaderLatch.start()
  • LeaderLatch.internalStart()
  • LeaderLatch.reset()
  • LeaderLatch.setLeadership()
  • ZooKeeperLeaderElectionDriver.isLeader()
  • DefaultLeaderElectionService.onGrantLeadership()
  • DefaultDispatcherRunner.grantLeadership()
  • DefaultDispatcherRunner.startNewDispatcherLeaderProcess()

选举为leader的DefaultDispatcherRunner实例候选者在回调动作过程中会一直调用到上面的grantLeadership(…)方法,并在startNewDispatcherLeaderProcess(…)方法中生成dispatcherLeaderProcess,表示一个Ledaer Dispatcher进程来提供服务,并通过newDispatcherLeaderProcess::start方法来启动执行该服务的后续处理流程。Leader候选者回调动作触发过程会另起篇幅详细讲解,此处先这样理解。

在后续的处理流程中,我们需要关注的点是在何时触发用户应用程序的main方法执行,为此,继续深入以下调用链:

  • AbstractDispatcherLeaderProcess.startInternal()
  • SessionDispatcherLeaderProcess.onStart()
  • SessionDispatcherLeaderProcess.createDispatcherIfRunning()
  • SessionDispatcherLeaderProcess.createDispatcher()
  • ApplicationDispatcherGatewayServiceFactory.create()
  • new ApplicationDispatcherBootstrap(...)

上述调用链中,createDispatcher(…)方法会调用dispatcherGatewayServiceFactory.create(…)方法,dispatcherGatewayServiceFactory实际类型是ApplicationDispatcherGatewayServiceFactory。在dispatcherGatewayServiceFactory.create(…)方法中新建ApplicationDispatcherBootstrap实例。

在ApplicationDispatcherBootstrap实例中,继续通过以下方法调用链fixJobIdAndRunApplicationAsync(…) -> runApplicationAsync(…) -> runApplicationEntryPoint(…) -> ClientUtils.executeProgram(…) -> program.invokeInteractiveModeForExecution() -> callMainMethod(mainClass, args) -> mainMethod.invoke(null, (Object) args)触发Flink应用main(…)方法的执行。

  • ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync()
  • ApplicationDispatcherBootstrap.runApplicationAsync()
  • ApplicationDispatcherBootstrap.runApplicationEntryPoint()
  • ClientUtils.executeProgram()
  • PackagedProgram.invokeInteractiveModeForExecution()
  • PackagedProgram.callMainMethod()
  • mainMethod.invoke(null, (Object) args);

最终,在ApplicationDispatcherBootstrap类的实现中,我们找到了用户应用程序的main方法执行入口。

三、回顾与总结

回顾一下上面的整体流程,首先,我们通过ApplicationMaster的启动命令,找到AM组建执行的入口类为YarnApplicationClusterEntryPoint,接着,在启动集群时,我们发现Flink会初始化一些诸如LeaderRetrievalService、WebMonitorEndpoint、ResourceManagerService、DispatcherRunner的服务,这些服务分别发挥不同的用途,与Yarn和Flink集群进行交互。在本次分析过程中,我们着重探究了DispatcherRunner服务的创建流程。

首先,会执行高可用的选举流程,最终选举出一个Leader DispatcherRunner来执行服务。选举完成后,该Leader DispatchRunner会调用ClientUtils.executeProgram方法,从封装好的PackagedProgram实例中,获取用户应用程序的入口类mainClass以及程序入参,并最终利用反射触发mainClass的main方法的执行,完成用户自定义Flink应用的执行。

以上就是主要的Flink On Yarn客户端作业的提交过程解析。这个提交过程相对来说还是比较复杂的,包含着很多部署配置参数,资源以及权限的校验和分配,ApplicationMaster的提交启动,并伴随AM启动后执行的一系列Flink服务初始化,以及我们关心的用户应用程序的调用入口,发现了在Application的部署模式下,用户应用程序的调用是在集群侧,也就是Leader DispatchRunner服务中完成的。

当然,DispatchRunner服务负责的任务远不止于此,上述流程中还有更多的细节等待我们去挖掘和学习,这篇文章可能只是让我们对提交流程有了一个初步的大体认识,对于更多深入的部分,需要我们不断思考不断挖掘,也欢迎大家交流观点和看法,感谢!

相关文章:

Flink源码解析之:Flink On Yarn模式任务提交部署过程解析

Flink源码解析之&#xff1a;Flink On Yarn模式任务提交部署过程解析 一、Flink on Yarn部署模式概述 Apache Hadoop YARN 在许多数据处理框架中都很流行。 Flink 服务提交给 YARN 的 ResourceManager&#xff0c;后者会在 YARN NodeManagers 管理的机器上生成容器。 Flink 将…...

C++算法20例

1、求两个数的最大公约数 int gcd(int a, int b) { 2 return b 0 ? a : gcd(b, a % b); 3} 2、判断素数 bool isPrime(int n) {if (n < 1) return false; for (int i 2; i * i < n; i) {if (n % i 0) return false;}return true; } 3、冒泡排序 void bubbleSort…...

雷军:科技传奇的逐梦之旅

亲爱的小伙伴们&#x1f618;&#xff0c;在求知的漫漫旅途中&#xff0c;若你对深度学习的奥秘、Java 与 Python 的奇妙世界&#xff0c;亦或是读研论文的撰写攻略有所探寻&#x1f9d0;&#xff0c;那不妨给我一个小小的关注吧&#x1f970;。我会精心筹备&#xff0c;在未来…...

python版本的Selenium的下载及chrome环境搭建和简单使用

针对Python版本的Selenium下载及Chrome环境搭建和使用&#xff0c;以下将详细阐述具体步骤&#xff1a; 一、Python版本的Selenium下载 安装Python环境&#xff1a; 确保系统上已经安装了Python 3.8及以上版本。可以从[Python官方网站]下载并安装最新版本的Python&#xff0c;…...

linux tar 文件解压压缩

文件压缩和解压 tar -c: 建立压缩档案 -x&#xff1a;解压 -t&#xff1a;查看内容 -r&#xff1a;向压缩归档文件末尾追加文件 -u&#xff1a;更新原压缩包中的文件 -z&#xff1a;有gzip属性的 -j&#xff1a;有bz2属性的 -v&#xff1a;显示所有过程 -O&#xff1a;…...

Razzashi Raptor

拉扎什迅猛龙 Razzashi Raptor 2024.12.24 无论是工作、游戏&#xff0c;除了坚持&#xff0c;还需要一点运气&#xff0c;2024年跨年啦。 World of Warcraft [CLASSIC]80猎人[Grandel][祖尔格拉布][血领主曼多基尔][拉扎什迅猛龙]20241231跨年回报_哔哩哔哩bilibili_魔兽 Ra…...

Fetch处理大模型流式数据请求与解析

为什么有的大模型可以一次返回多个 data&#xff1f; Server-Sent Events (SSE)&#xff1a;允许服务器连续发送多个 data: 行&#xff0c;每个代表一个独立的数据块。 流式响应&#xff1a;大模型服务通常以流式响应方式返回数据&#xff0c;提高响应速度。 批量处理&#x…...

【网络安全实验室】脚本关实战详情

难道向上攀爬的那条路&#xff0c;不是比站在顶峰更让人热血澎湃吗 1.key又又找不到了 点击链接&#xff0c;burp抓包&#xff0c;发送到重放模块&#xff0c;点击go 得到key 2.快速口算 python3脚本 得到key 3.这个题目是空的 试了一圈最后发现是 4.怎么就是不弹出key呢…...

怎么配置每一次重启服务器后,自动启动Tocmat

前言 宝子们&#xff0c;今天来给大家详细讲讲服务器如何配置每次重启后自动启动 Tomcat&#xff0c;让你的服务器应用始终保持在线状态&#xff0c;高效运行&#xff01; windows版本 在 Windows 系统下&#xff0c;有两种常用的方法可以实现这个目标。 第一种方法是利用服…...

《机器学习》——利用OpenCV库中的KNN算法进行图像识别

文章目录 KNN算法介绍下载OpenCV库实验内容实验结果完整代码手写数字传入模型训练 KNN算法介绍 一、KNN算法的基本要素 K值的选择&#xff1a;K值代表选择与新测试样本距离最近的前K个训练样本数&#xff0c;通常K是不大于20的整数。K值的选择对算法结果有重要影响&#xff0c…...

D3.js

d3是用于数据可视化 可用于处理数据、创建图表、实现动画效果和交互功能应用场景: 数据可视&#xff1a;将复杂的数据以图表的形式展示出来&#xff0c;便于用户理解和分析。交互式图&#xff1a;支持事件处理和动画效果&#xff0c;提升用户体验。仪表盘和报&#xff1a;广泛…...

Windows onnxruntime编译openvino

理论上来说&#xff0c;可以直接访问 ONNXRuntime Releases 下载 dll 文件&#xff0c;然后从官方文档中下载缺少的头文件以直接调用&#xff0c;但我没有尝试过。 1. 下载 OpenVINO 包 从官网下载 OpenVINO 的安装包并放置在 C:\Program Files (x86) 路径下&#xff0c;例如…...

Python中的sqlite3模块:SQLite数据库接口详解

Python中的sqlite3模块&#xff1a;SQLite数据库接口详解 主要功能sqlite3.connect(database)connection.cursor()cursor.execute(sql)connection.commit()cursor.fetchall()connection.close() 使用示例执行结果总结 在Python中&#xff0c;sqlite3模块提供了一个与SQLite数据…...

Unity功能模块一对话系统(4)实现个性文本标签

本期我们将了解如何在TMPro中自定义我们的标签样式&#xff0c;并实现两种有趣的效果。 一.需求描述 1.定义<float>格式的标签&#xff0c;实现标签处延迟打印功能 2.定义<r" "></r>格式的标签&#xff0c;实现标签区间内文本片段的注释显示功能…...

Vue.js前端框架教程15:Vue父子组件之间的通信ref、emits

文章目录 1. 属性传递(Props)2. 事件监听( Emits)3. `ref` 引用4. `provide` 和 `inject`5. 插槽(Slots)在 Vue 3 中,父子组件之间的通信可以通过多种方式实现,包括属性传递、事件监听、插槽以及 ref 和 provide/inject。以下是这些通信方式的详解: 1. 属性传递(Pro…...

Python 实现 冒泡排序算法示例

冒泡排序算法示例 冒泡排序&#xff08;Bubble Sort&#xff09;是一种简单的排序算法。它重复地遍历要排序的列表&#xff0c;比较相邻的元素&#xff0c;并交换它们的位置&#xff0c;如果它们的顺序错误。这个过程会重复进行&#xff0c;直到没有需要交换的元素为止&#x…...

《机器学习》--线性回归模型详解

线性回归模型是机器学习中的一种重要算法&#xff0c;以下是对其的详细解释&#xff1a; 一、定义与原理 线性回归&#xff08;Linear Regression&#xff09;是利用数理统计中回归分析&#xff0c;来确定两种或两种以上变量间相互依赖的定量关系的一种统计分析方法。线性回归…...

Django项目部署到服务器

文章目录 django项目部署到服务器在服务器上安装Django和依赖&#xff1a;项目代码上传配置数据库收集静态文件配置Web服务器配置Gunicorn&#xff08;WSGI服务器&#xff09;启动/停止/重载systemd服务。 django项目部署到服务器 在服务器上安装Django和依赖&#xff1a; su…...

CSDN编辑器

这里写自定义目录标题 欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants 创建一个自定义列表如何创建一个…...

如何免费解锁 IPhone 网络

您是否担心 iPhone 上的网络锁定&#xff1f;如果您的 iPhone 被锁定到特定运营商&#xff0c;解锁它可以连接到不同的运营商。好吧&#xff0c;我们为您准备了一份指南。 iPhone运营商免费解锁将是小菜一碟。在我们的解锁运营商 iphone 免费指南中。我们为您提供了一份简介&am…...

Trimble天宝X9三维扫描仪为建筑外墙检测提供了全新的解决方案【沪敖3D】

随着城市化进程的快速推进&#xff0c;城市高层建筑不断增多&#xff0c;对建筑质量的要求也在不断提高。建筑外墙检测&#xff0c;如平整度和垂直度检测&#xff0c;是衡量建筑质量的重要指标之一。传统人工检测方法不仅操作繁琐、效率低下&#xff0c;还难以全面反映墙体的真…...

微服务のGeteWay

目录 概念&#xff1a; 三大核心&#xff1a; 工作流程&#xff1a; 9527网关如何做路由映射&#xff1a; GetWay高级特性&#xff1a; 按服务名动态路由服务&#xff1a; 断言Route Predicate Factories &#xff1a; 获取当前时区时间&#xff1a; After Route &…...

Golang 中 Goroutine 的调度

Golang 中 Goroutine 的调度 Golang 中的 Goroutine 是一种轻量级的线程&#xff0c;由 Go 运行时&#xff08;runtime&#xff09;自动管理。Goroutine 的调度基于 M:N 模型&#xff0c;即多个 Goroutine 可以映射到多个操作系统线程上执行。以下是详细的调度过程和策略&…...

MyBatis使用的设计模式

目录 1. 工厂模式&#xff08;Factory Pattern&#xff09; 2. 单例模式&#xff08;Singleton Pattern&#xff09; 3. 代理模式&#xff08;Proxy Pattern&#xff09; 4. 装饰器模式&#xff08;Decorator Pattern&#xff09; 5. 观察者模式&#xff08;Observer Patt…...

淺談Cocos2djs逆向

前言 簡單聊一下cocos2djs手遊的逆向&#xff0c;有任何相關想法歡迎和我討論^^ 一些概念 列出一些個人認為比較有用的概念&#xff1a; Cocos遊戲的兩大開發工具分別是CocosCreator和CocosStudio&#xff0c;區別是前者是cocos2djs專用的開發工具&#xff0c;後者則是coco…...

选择器(结构伪类选择器,伪元素选择器),PxCook软件,盒子模型

结构为类选择器 伪元素选择器 PxCook 盒子模型 (内外边距&#xff0c;边框&#xff09; 内外边距合并&#xff0c;塌陷问题 元素溢出 圆角 阴影: 模糊半径&#xff1a;越大越模糊&#xff0c;也就是越柔和 案例一&#xff1a;产品卡片 <!DOCTYPE html> <html lang&q…...

CentOS 7系统 OpenSSH和OpenSSL版本升级指南

文章目录 CentOS 7系统 OpenSSH和OpenSSL版本升级指南环境说明当前系统版本当前组件版本 现存安全漏洞升级目标版本升级准备工作OpenSSL升级步骤1. 下载和解压2. 编译安装3. 配置环境 OpenSSH升级步骤1. 下载和解压2. 编译安装3. 创建systemd服务配置4. 更新SSH配置文件5. 设置…...

使用 Comparable 和 Comparator 接口对集合排序

使用 Comparable 和 Comparator 接口对集合排序&#xff1a; 1. 使用 Comparable 接口&#xff1a; 当你希望一个类的对象能够按照某种自然顺序进行排序时&#xff0c;可以实现 Comparable 接口 并重写 compareTo() 方法。 实现步骤&#xff1a; 1.1 实现 Comparable<T&g…...

最新常见的图数据库对比,选型,架构,性能对比

图数据库排名 地址&#xff1a;https://db-engines.com/en/ranking/graphdbms 知识图谱查询语言 SPARQL、Cypher、Gremlin、PGQL 和 G-CORE 语法 / 语义 / 特性 SPARQL Cypher Gremlin PGQL G-CORE 图模式匹配查询 语法 CGP CGP CGP(无可选)1 CGP CGP 语义 子…...

混合合并两个pdf文件

混合两个pdf 1、在线免费交替和混合奇数和偶数PDF页面2、有什么软件把两个 PDF 交叉合并&#xff1f;3、pdfsam本地合并 如何Google翻译的原文和译文合并&#xff0c;&#xff08;沉浸式翻译效果相对较好&#xff09; 1、在线免费交替和混合奇数和偶数PDF页面 https://deftpd…...

OpenCV-Python实战(9)——滤波降噪

一、均值滤波器 cv2.blur() img cv2.blur(src*,ksize*,anchor*,borderType*)img&#xff1a;目标图像。 src&#xff1a;原始图像。 ksize&#xff1a;滤波核大小&#xff0c;&#xff08;width&#xff0c;height&#xff09;。 anchor&#xff1a;滤波核锚点&#xff0c…...

uniapp——微信小程序读取bin文件,解析文件的数据内容(三)

微信小程序读取bin文件内容 读取用户选择bin文件&#xff0c;并解析数据内容&#xff0c;分包发送给蓝牙设备&#xff1b; 文章目录 微信小程序读取bin文件内容读取文件读取内容返回格式 API文档&#xff1a; getFileSystemManager 关于App端读取bin文件&#xff0c;请查看&…...

Python 中常用的算法

1. 排序算法 用于将数据按特定顺序排列。 冒泡排序&#xff08;Bubble Sort&#xff09;选择排序&#xff08;Selection Sort&#xff09;插入排序&#xff08;Insertion Sort&#xff09;快速排序&#xff08;Quick Sort&#xff09;归并排序&#xff08;Merge Sort&#xf…...

xadmin后台首页增加一个导入数据按钮

xadmin后台首页增加一个导入数据按钮 效果 流程 1、在添加小组件中添加一个html页面 2、写入html代码 3、在urls.py添加导入数据路由 4、在views.py中添加响应函数html代码 <!DOCTYPE html> <html lang...

Kubernetes: NetworkPolicy 的实践应用

一、Network Policy 是什么,在云原生领域有和作用 Network Policy 是 Kubernetes 官方提出来的一种网络策略的规范&#xff0c;用户通过编写符合对应规范的规则来控制 k8s 集群内 L3 和 L4 层的网络流量。 NetworkPolicy 主要的功能就是实现在云原生领域的容器网络管控它给用…...

计算机体系结构期末复习3:GPU架构及控制流问题

目录 一、GPU设计思路 1.简化流水线、增加核数 2.单指令多线程&#xff08;SIMT&#xff09; 3.同时驻留大量线程 4.总思路&#xff1a;多线程单指令多线程 二、GPU的控制流问题 1.什么是控制流问题 2.怎么应对分支分歧 一、GPU设计思路 1.简化流水线、增加核数 2.单指…...

excel怎么删除右边无限列(亲测有效)

excel怎么删除右边无限列&#xff08;亲测有效&#xff09; 网上很多只用第1步的&#xff0c;删除了根本没用&#xff0c;还是存在&#xff0c;但是隐藏后取消隐藏却是可以的。 找到右边要删除的列的第一个空白列&#xff0c;选中整个列按“ctrlshift>(向右的小箭头)”&am…...

ChatGPT-4助力学术论文提升文章逻辑、优化句式与扩充内容等应用技巧解析。附提示词案例

目录 1.扩写&#xff08;expansion/paraphrasing&#xff09; 2.优化&#xff08;optimization&#xff09; 3.缩写&#xff08;optimization&#xff09; 4.提取关键词&#xff08;keyword extraction&#xff09; 5.短语转换&#xff08;phrase transformation&#xff…...

C++和OpenGL实现3D游戏编程【连载19】——着色器光照初步(平行光和光照贴图)(附源码)

1、本节要实现的内容 我们在前期的教程中,讨论了在即时渲染模式下的光照内容。但在我们后期使用着色器的核心模式下,会经常在着色器中使光照,我们这里就讨论一下着色器光照效果,以及光照贴图效果,同时这里知识会为后期的更多光照效果做一些铺垫。本节我们首先讨论冯氏光照…...

html+css网页制作 美食 美食网5个页面

htmlcss网页制作 美食 美食网5个页面 网页作品代码简单&#xff0c;可使用任意HTML辑软件&#xff08;如&#xff1a;Dreamweaver、HBuilder、Vscode 、Sublime 、Webstorm、Text 、Notepad 等任意html编辑软件进行运行及修改编辑等操作&#xff09;。 获取源码 1&#xff0…...

Mac 12.1安装tiger-vnc问题-routines:CRYPTO_internal:bad key length

背景&#xff1a;因为某些原因需要从本地mac连接远程linxu桌面查看一些内容&#xff0c;必须使用桌面查看&#xff0c;所以ssh无法满足&#xff0c;所以决定安装vnc客户端。 问题&#xff1a; 在mac上通过 brew install tiger-vnc命令安装, 但是报错如下&#xff1a; > D…...

遥感图像车辆检测-目标检测数据集

遥感图像车辆检测-目标检测数据集&#xff08;包括VOC格式、YOLO格式&#xff09; 数据集&#xff1a; 链接: https://pan.baidu.com/s/1XVlRTVWpXZFi6ZL_Xcs7Rg?pwdaa6g 提取码: aa6g 数据集信息介绍&#xff1a; 共有 1035 张图像和一一对应的标注文件 标注文件格式提供了…...

51c自动驾驶~合集43

我自己的原文哦~ https://blog.51cto.com/whaosoft/12930230 #ChatDyn 上交大最新ChatDyn&#xff1a;一句话操纵三维动态 理解和生成真实的三维虚拟世界是空间智能的核心。所生成的三维虚拟世界能够为自动驾驶、具身智能等AI系统提供高质量闭环仿真训练场&#xff0c;高效…...

随机变量是一个函数-如何理解

文章目录 一. 随机变量二. 随机变量是一个函数-栗子(一对一)1. 掷骰子的随机变量2. 掷骰子的随机变量&#xff08;求点数平方&#xff09;3. 抛硬币的随机变量4. 学生考试得分的随机变量 三. 随机变量是一个函数-理解(多对一) 一. 随机变量 随机变量就是定义在样本空间上的函数…...

云计算在医疗行业的应用

云计算在医疗行业的应用广泛而深入&#xff0c;为医疗服务带来了前所未有的变革。以下是对云计算在医疗行业应用的详细解析&#xff1a; ### 一、医疗数据共享与整合 云计算平台具有强大的数据存储和处理能力&#xff0c;使得医疗数据共享与整合成为可能。通过云计算平台&…...

Cursor提示词

你是一位经验丰富的项目经理&#xff0c;对于用户每一次提出的问题&#xff0c;都不急于编写代码&#xff0c;更多是通过深思熱虑、结构化的推理以产生高质量的回答&#xff0c;探索更多的可能方案&#xff0c;并从中寻找最佳方案。 约束 代码必须可以通过编译回答尽量使用中…...

C++ 设计模式:单例模式(Singleton Pattern)

链接&#xff1a;C 设计模式 链接&#xff1a;C 设计模式 - 享元模式 单例模式&#xff08;Singleton Pattern&#xff09;是创建型设计模式&#xff0c;它确保一个类只有一个实例&#xff0c;并提供一个全局访问点来访问这个实例。单例模式在需要全局共享资源或控制实例数量的…...

C++中生成0到180之间的随机数

在C中生成0到180之间的随机数&#xff0c;可以使用标准库中的和头文件。提供了rand()函数来生成随机数&#xff0c;而提供了time()函数来设置随机数生成的种子。这样每次运行程序时&#xff0c;生成的随机数序列都会不同。 以下是一个简单的示例代码&#xff0c;展示了如何生成…...

[.闲于修.]Autosar_UDS_笔记篇_ISO14229-1

前言&#xff1a;闲来无事&#xff0c;摸鱼无趣&#xff0c;准备细读一下14229&#xff0c;记录一些容易被忽略掉的内容 正文&#xff1a;&#xff08;以下数字代表章节&#xff09; 7、Application layer protocol 7.5.6 多个并发请求消息 常见的服务器实现在服务器中只有一…...

如何利用云计算进行灾难恢复?

云计算环境下的灾难恢复实践指南 天有不测风云&#xff0c;企业的IT系统也一样&#xff0c;我见过太多因为没有做好灾备而吃大亏的案例。今天就和大家聊聊如何用云计算来做灾难恢复。 一个惊心动魄的真实案例&#xff1a;某电商平台的主数据中心因为市政施工不小心挖断了光纤…...