当前位置: 首页 > news >正文

openEuler24.03 LTS下安装Flink

目录

    • Flink的安装模式
    • 下载Flink
    • 安装Local模式
      • 前提条件
      • 解压安装包
      • 启动集群
      • 查看进程
      • 提交作业
        • 文件WordCount
        • 持续流WordCount
      • 查看Web UI
        • 配置flink-conf.yaml
        • 简单使用
      • 关闭集群
    • Standalone Session模式
      • 前提条件
      • Flink集群规划
      • 解压安装包
      • 配置flink
        • 配置flink-conf.yaml
        • 配置workers
        • 配置masters
      • 分发到其他机器
        • 分发安装目录
        • 修改node3和node4的配置
      • 启动flink集群
      • 查看进程
      • Web UI
      • 提交应用测试
        • 文件WordCount
        • 持续流WordCount
      • 关闭flink集群
    • YARN模式
      • 前提条件
      • 解压安装包
      • 配置环境变量
      • 启动Hadoop集群
      • 提交应用测试
        • 会话模式
        • 单作业模式
        • 应用模式
      • 关闭Hadoop集群

Flink的安装模式

(1)Local模式: 所有Flink 组件(JobManager、TaskManager)都会在同一个 JVM 进程中运行,用于本地开发调试。

(2)Standalone Session模式:独立集群部署,需预先启动Flink进程,测试环境或小规模生产场景‌。

(3)Yarn模式:基于Hadoop YARN资源管理器,支持动态资源分配和自动容错‌,适合与Hadoop生态整合(如HDFS、YARN)的大规模生产环境‌。提供三种子模式:

  • **会话模式(Session Mode)**‌:创建并长期保持一个JobManager,当需要有作业提交,则动态创建TaskManager,作业完成回收TaskManager资源,多个作业共享一个JobManager集群资源。
  • **单作业模式(Per-Job Mode)**‌:每个作业单独启动一个集群,按作业隔离资源,避免资源竞争‌,作业完成后回收集群资源,应用代码在客户端节点运行。
  • **应用模式(Application Mode)**‌:与单作业模式类似,也是一个作业单独启动一个集群,作业完成后回收集群资源,但将应用代码提交至JobManager执行,减少客户端负载‌。

(4)K8S模式:基于容器化技术,支持自动扩缩容和弹性伸缩‌,每个JobManager/TaskManager以Pod形式运行,资源隔离性强‌,适合云原生环境‌,被认为是未来主流部署方式。

下载Flink

下载Flink,并上传到Linux /opt/software目录

# 国内下载
https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz# 或者官网下载
https://archive.apache.org/dist/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz

安装Local模式

在node2机器上进行,解压后无需任何配置即可直接使用。

前提条件

  • 有1台Linux机器,且安装好jdk,这里是jdk8,可参考:openEuler24.03 LTS下安装Hadoop3完全分布式-安装Java

解压安装包

解压并重命名

[liang@node2 spark-yarn]$ cd /opt/software/
[liang@node2 software]$ ls | grep flink
flink-1.17.2-bin-scala_2.12.tgz
[liang@node2 software]$ tar -zxvf flink-1.17.2-bin-scala_2.12.tgz -C /opt/module
[liang@node2 software]$ cd /opt/module
[liang@node2 software]$ mv flink-1.17.2 flink-local

启动集群

[liang@node2 software]$ cd /opt/module/flink-local
[liang@node2 flink-local]$ ls
bin  conf  examples  lib  LICENSE  licenses  log  NOTICE  opt  plugins  README.txt
[liang@node2 flink-local]$ ls bin/
bash-java-utils.jar  kubernetes-jobmanager.sh   start-zookeeper-quorum.sh
config.sh            kubernetes-session.sh      stop-cluster.sh
find-flink-home.sh   kubernetes-taskmanager.sh  stop-zookeeper-quorum.sh
flink                pyflink-shell.sh           taskmanager.sh
flink-console.sh     sql-client.sh              yarn-session.sh
flink-daemon.sh      sql-gateway.sh             zookeeper.sh
historyserver.sh     standalone-job.sh
jobmanager.sh        start-cluster.sh
[liang@node2 flink-local]$ bin/start-cluster.sh

查看进程

[liang@node2 flink-local]$ jps
7717 StandaloneSessionClusterEntrypoint
8006 TaskManagerRunner
8121 Jps

提交作业

文件WordCount
$ bin/flink run examples/streaming/WordCount.jar

输出过程

[liang@node2 flink-local]$ bin/flink run examples/streaming/WordCount.jar
Executing example with default input data.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID b4f0874e1e377c83fc830baf1767413a
Program execution finished
Job with JobID b4f0874e1e377c83fc830baf1767413a has finished.
Job Runtime: 886 ms

查看结果

查看输出的WordCount结果的末尾10行数据

[liang@node2 flink-local]$ ls log/
flink-liang-client-node2.log
flink-liang-standalonesession-0-node2.log
flink-liang-standalonesession-0-node2.log.1
flink-liang-standalonesession-0-node2.out
flink-liang-taskexecutor-0-node2.log
flink-liang-taskexecutor-0-node2.out
[liang@node2 flink-local]$ tail log/flink-*-taskexecutor-*.out

执行过程

[liang@node2 flink-local]$ tail log/flink-*-taskexecutor-*.out
(nymph,1)
(in,3)
(thy,1)
(orisons,1)
(be,4)
(all,2)
(my,1)
(sins,1)
(remember,1)
(d,4)
持续流WordCount

使用 SocketWindowWordCount 示例实时接收 Socket 输入

终端1:发送数据终端

$ nc -lk 9999 

终端2:打开新的终端,提交作业

$ cd /opt/module/flink-local
$ bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname localhost --port 9999

在发送数据终端发送数据

在这里插入图片描述

查看结果

打开新的终端,命令行查看结果,每行计算一次结果

[liang@node2 ~]$ cd /opt/module/flink-local
[liang@node2 flink-local]$ tail -f log/flink-liang-taskexecutor-0-node2.out

查看Web UI

local模式仅支持命令行查看,不支持浏览器访问。

[liang@node2 flink-local]$ curl localhost:8081

在这里插入图片描述

浏览器查看

node2:8081

在这里插入图片描述

要能查看到Web UI还需要配置Flink。

配置flink-conf.yaml
[liang@node2 flink-local]$ cd conf
[liang@node2 conf]$ vim flink-conf.yaml

找到相关配置项并修改,如下

jobmanager.rpc.address: node2
jobmanager.bind-host: 0.0.0.0
taskmanager.bind-host: 0.0.0.0
taskmanager.host: node2
rest.address: node2
rest.bind-address: 0.0.0.0

这样配置支持Web UI访问,也支持远程提交作业。

注意:配置flink-conf.yaml后,local模式就变成Flink伪分布了。

特性Local模式‌**伪分布模式(Standalone模式)**‌
适用场景单机本地快速测试,验证代码逻辑或简单性能‌单机模拟集群环境,用于开发调试或学习多组件交互(如JobManager与TaskManager)‌
资源管理无需启动独立进程,由本地JVM线程模拟所有组件(JobManager + TaskManager)‌需启动独立进程:JobManager(主节点)和至少1个TaskManager(工作节点)‌
Web UI默认不启用Web界面‌提供Web UI(默认端口8081),可监控任务状态、日志和资源配置‌
网络配置仅本地回环地址(127.0.0.1),无法远程访问‌可绑定真实IP地址(如0.0.0.0),支持远程提交作业和访问Web UI‌
容错性无高可用机制,进程崩溃则任务终止‌支持高可用配置(需依赖ZooKeeper),但单机伪分布下通常不启用‌

重启集群

[liang@node2 conf]$ cd ..
[liang@node2 flink-local]$ ls bin/
bash-java-utils.jar  kubernetes-jobmanager.sh   start-zookeeper-quorum.sh
config.sh            kubernetes-session.sh      stop-cluster.sh
find-flink-home.sh   kubernetes-taskmanager.sh  stop-zookeeper-quorum.sh
flink                pyflink-shell.sh           taskmanager.sh
flink-console.sh     sql-client.sh              yarn-session.sh
flink-daemon.sh      sql-gateway.sh             zookeeper.sh
historyserver.sh     standalone-job.sh
jobmanager.sh        start-cluster.sh
[liang@node2 flink-local]$ bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 8006) on host node2.
Stopping standalonesession daemon (pid: 7717) on host node2.
[liang@node2 flink-local]$
[liang@node2 flink-local]$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node2.
Starting taskexecutor daemon on host node2.
[liang@node2 flink-local]$ jps
9220 StandaloneSessionClusterEntrypoint
9509 TaskManagerRunner
9595 Jps

浏览器访问

node2:8081

在这里插入图片描述

简单使用

文件的WordCount

测试提交WordCount作业

$ bin/flink run examples/streaming/WordCount.jar

执行过程

[liang@node2 flink-local]$ bin/flink run examples/streaming/WordCount.jar
Executing example with default input data.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 3f939fb4b8d4c2dab4d47d7b5f4cc389
Program execution finished
Job with JobID 3f939fb4b8d4c2dab4d47d7b5f4cc389 has finished.
Job Runtime: 913 ms

查看输出的WordCount结果的末尾10行数据

$ tail log/flink-*-taskexecutor-*.out

执行过程

[liang@node2 flink-local]$ tail log/flink-*-taskexecutor-*.out
(nymph,1)
(in,3)
(thy,1)
(orisons,1)
(be,4)
(all,2)
(my,1)
(sins,1)
(remember,1)
(d,4)

查看Web UI

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

输入数据有限,所以计算会完成。

持续流WordCount

使用 SocketWindowWordCount 示例实时接收 Socket 输入

终端1:发送数据终端

$ nc -lk 9999 

终端2:打开新的终端,提交作业

$ cd /opt/module/flink-local
$ bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname localhost --port 9999

在发送数据终端发送数据

在这里插入图片描述

提交作业终端

在这里插入图片描述

打开新的终端,命令行查看结果,每行计算一次结果

[liang@node2 ~]$ cd /opt/module/flink-local/
[liang@node2 flink-local]$ ls
bin  conf  examples  lib  LICENSE  licenses  log  NOTICE  opt  plugins  README.txt
[liang@node2 flink-local]$ ls log/
flink-liang-client-node2.log                 flink-liang-taskexecutor-0-node2.log
flink-liang-standalonesession-0-node2.log    flink-liang-taskexecutor-0-node2.log.1
flink-liang-standalonesession-0-node2.log.1  flink-liang-taskexecutor-0-node2.log.2
flink-liang-standalonesession-0-node2.log.2  flink-liang-taskexecutor-0-node2.out
flink-liang-standalonesession-0-node2.out
[liang@node2 flink-local]$ tail -f log/flink-liang-taskexecutor-0-node2.out
hello : 1
flink : 1
hello : 1
hadoop : 1

在这里插入图片描述

Web UI查看结果

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

关闭集群

关闭集群

$ bin/stop-cluster.sh

jps查看进程

$ jps

执行过程

[liang@node2 flink-local]$ bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 9509) on host node2.
Stopping standalonesession daemon (pid: 9220) on host node2.
[liang@node2 flink-local]$ jps
10579 Jps

Standalone Session模式

安装Flink Standalone 完全分布集群

前提条件

有三台Linux机器

三台机器均安装好jdk,这里使用jdk8,可参考:openEuler24.03 LTS下安装Hadoop3完全分布式-安装Java

Flink集群规划

node2node3node4
JobManager
TaskManagerTaskManagerTaskManager

解压安装包

解压并重命名

[liang@node2 software]$ tar -zxvf flink-1.17.2-bin-scala_2.12.tgz -C /opt/module
[liang@node2 software]$ cd /opt/module
[liang@node2 module]$ mv flink-1.17.2 flink-standalone

进入到解压后的目录,查看包含的文件

[liang@node2 module]$ cd flink-standalone
[liang@node2 flink-standalone]$ ls
bin  conf  examples  lib  LICENSE  licenses  log  NOTICE  opt  plugins  README.txt

配置flink

进入flink配置目录,查看配置文件

[liang@node2 ~]$ cd conf
[liang@node2 conf]$ ls
flink-conf.yaml           log4j.properties          logback-session.xml  workers
log4j-cli.properties      log4j-session.properties  logback.xml          zoo.cfg
log4j-console.properties  logback-console.xml       masters
配置flink-conf.yaml
[liang@node2 conf]$ vim flink-conf.yaml

找到相关配置项并修改,如下

jobmanager.rpc.address: node2
jobmanager.bind-host: 0.0.0.0
taskmanager.bind-host: 0.0.0.0
taskmanager.host: node2
rest.address: node2
rest.bind-address: 0.0.0.0
配置workers
[liang@node2 conf]$ vim workers

把原有内容删除,添加内容如下:

node2
node3
node4
配置masters
[liang@node2 conf]$ vim masters 

修改后内容如下:

node2:8081

分发到其他机器

分发安装目录
[liang@node2 conf]$ xsync /opt/module/flink-standalone
修改node3和node4的配置

分别修改node3、node4机器的flink-conf.yaml文件,将taskmanager.host的值修改为所在机器的主机名。

node3机器

进入node3机器flink的配置目录

[liang@node3 ~]$ cd /opt/module/flink-standalone/conf/

配置flinke-conf.yaml文件

[liang@node3 conf]$ vim flink-conf.yaml

taskmanager.host的值修改为node3

taskmanager.host: node3

node4机器

进入node4机器flink的配置目录

[liang@node4 ~]$ cd /opt/module/flink-standalone/conf/

配置flinke-conf.yaml文件

[liang@node4 conf]$ vim flink-conf.yaml

taskmanager.host的值修改为node4

taskmanager.host: node4

启动flink集群

在node2机器,执行启动集群命令

[liang@node2 conf]$ cd ..
[liang@node2 flink-standalone]$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host node2.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.
Starting taskexecutor daemon on host node4.

查看进程

jps查看进程

[liang@node2 flink-standalone]$ jpsall
=============== node2 ===============
10116 Jps
9557 StandaloneSessionClusterEntrypoint
9977 TaskManagerRunner
=============== node3 ===============
2342 TaskManagerRunner
2431 Jps
=============== node4 ===============
2340 TaskManagerRunner
2430 Jps

注意:如果没有jpsall命令,就分别在node2、node3、node4执行jps命令

node2有StandaloneSessionClusterEntrypointTaskManagerRunner进程

node3有TaskManagerRunner进程

node4有TaskManagerRunner进程

看到如上进程,说明Flink Standalone集群配置成功。

Web UI

浏览器主机名:8081访问

node2:8081

在这里插入图片描述

提交应用测试

文件WordCount
[liang@node2 flink-standalone]$ bin/flink run examples/streaming/WordCount.jar
Executing example with default input data.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID ee8f5d8b493be0ad1463a072b11c4586
Program execution finished
Job with JobID ee8f5d8b493be0ad1463a072b11c4586 has finished.
Job Runtime: 1080 ms

查看结果有两种方式:

  • 命令行查看
  • Web UI查看

命令行查看结果

查看输出的wordcount结果的末尾10行数据

[liang@node2 flink-standalone]$ tail log/flink-*-taskexecutor-*.out
(nymph,1)
(in,3)
(thy,1)
(orisons,1)
(be,4)
(all,2)
(my,1)
(sins,1)
(remember,1)
(d,4)

Web UI查看作业结果

node2:8081

查看作业

在这里插入图片描述

查看作业结果

点击查看TaskManagers,点击其中一个TaskManager机器,点击Stdout能查看到作业结果

在这里插入图片描述

点击node2:38277-0fc879,点击Stdout

在这里插入图片描述

注意:如果node2查看不到,可以返回点击其他机器查看。

持续流WordCount

使用 SocketWindowWordCount 示例实时接收 Socket 输入

发送数据终端

$ nc -lk 9999 

打开新的终端,提交作业

$ cd /opt/module/flink-standalone
$ bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname node2 --port 9999

在发送数据终端发送数据

hello flink standalone mode
hello world

在这里插入图片描述

Web UI查看作业结果

查看作业

在这里插入图片描述

查看结果

在这里插入图片描述

点击node3:38219-8077d2,点击stdout

在这里插入图片描述

在node3查看到结果,说明是使用node3的Task Manager进行计算的。

关闭flink集群

在node2机器,执行关闭集群命令

[liang@node2 flink-standalone]$ bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 9977) on host node2.
Stopping taskexecutor daemon (pid: 2342) on host node3.
Stopping taskexecutor daemon (pid: 2340) on host node4.
Stopping standalonesession daemon (pid: 9557) on host node2.

jps查看进程

[liang@node2 flink-standalone]$ jpsall
=============== node2 ===============
11575 Jps
=============== node3 ===============
2935 Jps
=============== node4 ===============
2914 Jps

YARN模式

Flink作为客户端,把作业提交到Yarn中计算

只需在node2机器安装Flink

前提条件

  • 安装好Hadoop完全分布式集群,可参考:openEuler24.03 LTS下安装Hadoop3完全分布式

解压安装包

解压并重命名

[liang@node2 ~]$ cd /opt/software
[liang@node2 software]$ tar -zxvf flink-1.17.2-bin-scala_2.12.tgz -C /opt/module
[liang@node2 software]$ cd /opt/module
[liang@node2 module]$ mv flink-1.17.2 flink-yarn

配置环境变量

[liang@node2 module]$ sudo vim /etc/profile.d/my_env.sh

添加如下环境变量,让Flink能找到Hadoop

#FLINK YARN MODE NEED
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

让环境变量生效

[liang@node2 module]$ source /etc/profile

分发环境变量

[liang@node2 module]$ sudo /home/liang/bin/xsync /etc/profile.d/my_env.sh

根据提示输入node2机器root账号的密码

分别在node3、node4让环境变量生效

[liang@node3 conf]$ source /etc/profile
[liang@node4 conf]$ source /etc/profile

启动Hadoop集群

$ hdp.sh start

提交应用测试

会话模式

开启一个YARN会话来启动一个Flink session集群

[liang@node2 module]$ cd flink-yarn
[liang@node2 flink-yarn]$ bin/yarn-session.sh -nm test

输入部分日志如下

在这里插入图片描述

可以看到JobManager的Web UI地址为

http://node3:35665

注意:端口号是随机的

浏览器访问看到Web UI地址

在这里插入图片描述

说明在Yarn的Container中创建了一个Flink Session集群

提交应用集群

提交应用有两种方式:

  • 命令行方式
  • Web UI方式

命令行方式

打开新的终端

$ cd /opt/module/flink-yarn
$ bin/flink run examples/streaming/WordCount.jar

客户端可以自行确定JobManager的地址,也可以通过-m或者-jobmanager参数指定JobManager的地址,JobManager地址在YARN Session启动页面中找到。

Web UI查看作业

node3:35665

在这里插入图片描述

查看结果

在这里插入图片描述

查看Task Managers没有任何信息,原因是因为运行完成后,为了节约资源,释放了Task Manager

要看到结果,重新执行一次作业,快速地查看Web UI 的Task Managers,看到如下结果。

在这里插入图片描述

Web UI方式

准备jar包

下载Flink自带的example jar包

打开新的终端,执行nc命令发送数据

[liang@node3 module]$ nc -lk 9999

浏览器访问,基于Web UI提交应用jar包

node3:35665

在这里插入图片描述

选择SocketWjarindowWordCount.jar后,点击如下界面jar包名称设置作业

在这里插入图片描述

填写如下参数,点击Submit提交

--hostname node3 --port 9999

在这里插入图片描述

查看到正在运行的作业

在这里插入图片描述

在nc终端,发送数据,例如:

hello flink hello world

在这里插入图片描述

返回Web UI查看结果

点击Task Managers,点击container_1742653773403_0001_01_000009

在这里插入图片描述

在这里插入图片描述

取消作业

点击作业名称

在这里插入图片描述

点击Cancel Job,点击Yes

在这里插入图片描述

在这里插入图片描述

计算文件作业

重新提交另外一个jar:WordCount.jar

在这里插入图片描述

不填写任何参数,直接点击Submit

在这里插入图片描述

查看结果

在这里插入图片描述

结束Yarn Session会话

返回执行Yarn Session命令的终端,按Ctrl + c 返回命令行

在这里插入图片描述

Yarn会话模式总结:

Yarn 会话模式,通过yarn-session.sh命令在Yarn Container创建一个JobManager,并保持这个JobManager。当有作业提交时,会动态创建TaskManager,当作业完后后,会回收TaskManager。

单作业模式

Yarn模式下,需要使用HDFS文件

创建输入测试文件

[liang@node2 flink-yarn]$ ls
bin  conf  examples  lib  LICENSE  licenses  log  NOTICE  opt  plugins  README.txt
[liang@node2 flink-yarn]$ vim input.txt

内容如下

hello yarn mode
hello flink
hello hadoop

将input.txt上传到HDFS

$ hdfs dfs -put input.txt /

提交作业

使用单作业模式提交作业

$ bin/flink run -d -t yarn-per-job -Dclassloader.check-leaked-classloader=false examples/streaming/WordCount.jar \
--input hdfs://node2:8020/input.txt \
--output hdfs://node2:8020/output

注意:yarn模式下,输入数据都为HDFS的路径,output是一个输出目录,执行命令之前,输出目录不能存在。

查看结果

执行命令后,等待运行作业结束,查看输出目录

[liang@node2 flink-yarn]$ hdfs dfs -ls /
Found 8 items
-rw-r--r--   1 liang supergroup         25 2025-03-18 23:17 /1.txt
drwxr-xr-x   - liang supergroup          0 2025-03-21 23:21 /hbase
-rw-r--r--   1 liang supergroup         41 2025-03-22 23:54 /input.txt
drwxr-xr-x   - liang supergroup          0 2025-03-18 23:18 /out
drwxr-xr-x   - liang supergroup          0 2025-03-23 00:07 /output
drwxr-xr-x   - liang supergroup          0 2025-03-22 18:31 /spark-evenlog-directory
drwx------   - liang supergroup          0 2025-03-19 15:16 /tmp
drwxr-xr-x   - liang supergroup          0 2025-03-19 15:18 /user
[liang@node2 flink-yarn]$ hdfs dfs -ls /output
Found 1 items
drwxr-xr-x   - liang supergroup          0 2025-03-23 00:07 /output/2025-03-23--00
[liang@node2 flink-yarn]$ hdfs dfs -ls /output/2025-03-23--00
Found 1 items
-rw-r--r--   1 liang supergroup         69 2025-03-23 00:07 /output/2025-03-23--00/part-0d077c9e-7a99-4eca-8c91-7c21ffaa1cec-0
[liang@node2 flink-yarn]$ hdfs dfs -cat /output/2025-03-23--00/part-0d077c9e-7a99-4eca-8c91-7c21ffaa1cec-0
(hello,1)
(yarn,1)
(mode,1)
(hello,2)
(flink,1)
(hello,3)
(hadoop,1)
应用模式

文件Wordcount

执行如下命令

$ bin/flink run-application -t yarn-application -d examples/streaming/WordCount.jar \
--input hdfs://node2:8020/input.txt \
--output hdfs://node2:8020/output1 

在这里插入图片描述

等待运行结束,查看结果

[liang@node2 flink-yarn]$ hdfs dfs -ls /
Found 9 items
-rw-r--r--   1 liang supergroup         25 2025-03-18 23:17 /1.txt
drwxr-xr-x   - liang supergroup          0 2025-03-21 23:21 /hbase
-rw-r--r--   1 liang supergroup         41 2025-03-22 23:54 /input.txt
drwxr-xr-x   - liang supergroup          0 2025-03-18 23:18 /out
drwxr-xr-x   - liang supergroup          0 2025-03-23 00:07 /output
drwxr-xr-x   - liang supergroup          0 2025-03-23 00:12 /output1
drwxr-xr-x   - liang supergroup          0 2025-03-22 18:31 /spark-evenlog-directory
drwx------   - liang supergroup          0 2025-03-19 15:16 /tmp
drwxr-xr-x   - liang supergroup          0 2025-03-19 15:18 /user
[liang@node2 flink-yarn]$ hdfs dfs -ls /output1
Found 1 items
drwxr-xr-x   - liang supergroup          0 2025-03-23 00:12 /output1/2025-03-23--00
[liang@node2 flink-yarn]$ hdfs dfs -ls /output1/2025-03-23--00
Found 1 items
-rw-r--r--   1 liang supergroup         69 2025-03-23 00:12 /output1/2025-03-23--00/part-01662fc4-01e6-45a4-858c-5443fc2b31f0-0
[liang@node2 flink-yarn]$ hdfs dfs -cat /output1/2025-03-23--00/*
(hello,1)
(yarn,1)
(mode,1)
(hello,2)
(flink,1)
(hello,3)
(hadoop,1)

持续流WordCount

终端1:在node2 发送数据终端

[liang@node2 flink-yarn]$ nc -lk 9999 

终端2:打开新的终端,提交作业

$ cd /opt/module/flink-yarn
$ bin/flink run-application -t yarn-application -d examples/streaming/SocketWindowWordCount.jar --hostname node2 --port 9999

在这里插入图片描述

看到Web UI地址为

node3:37105 

浏览器访问以上查到的Web UI地址

在这里插入图片描述

返回nc终端发送数据

hello flink hello yarn

在这里插入图片描述

在Web UI查看结果

在这里插入图片描述

在这里插入图片描述

结束任务

在nc终端,按Ctrl + c 结束任务

关闭Hadoop集群

$ hdp.sh stop

如有需要,可点击查看:配套视频教程

完成!enjoy it!

相关文章:

openEuler24.03 LTS下安装Flink

目录 Flink的安装模式下载Flink安装Local模式前提条件解压安装包启动集群查看进程提交作业文件WordCount持续流WordCount 查看Web UI配置flink-conf.yaml简单使用 关闭集群 Standalone Session模式前提条件Flink集群规划解压安装包配置flink配置flink-conf.yaml配置workers配置…...

Redis 持久化

一、持久化 redis 虽然是个内存数据库,但是 redis 支持RDB 和 AOF 两种持久化机制, 将数据写往磁盘,可以有效的避免因进程退出造成的数据丢失问题,当下次重启时利用之前持久化的文件即可实现数据恢复。 二、Redis 支持RDB 和 AOF …...

塔能科技:智能路灯物联运维产业发展现状与趋势分析

随着智慧城市建设的推进,智能路灯物联运维产业正经历快速发展,市场规模持续扩大。文章探讨了智能路灯物联运维的技术体系、市场机遇和挑战,并预测了未来发展趋势,为行业发展提供参考。 关键词 智能路灯;物联运维&#…...

前端知识点---闭包(javascript)

文章目录 1.怎么理解闭包?2.闭包的特点3.闭包的作用?4 闭包注意事项&#xff1a;5 形象理解 1.怎么理解闭包? 函数里面包着另一个函数&#xff0c;并且内部函数可以访问外部函数的变量。 <script>function outer(){let count 0return functioninner (){countconsole.l…...

单次 CMS Old GC 耗时长问题分析与优化

目录 一、现象说明 二、CMS GC 机制简述 三、可能导致长时间停顿的原因详细分析 &#xff08;一&#xff09;Full GC&#xff08;完全垃圾回收&#xff09; 1. 主要原因 2.参数调整 &#xff08;二&#xff09;Promotion Failure&#xff08;晋升失败&#xff09; 1. 主…...

Python星球日记 - 第16天:爬虫基础(仅学习使用)

&#x1f31f;引言&#xff1a; 上一篇&#xff1a;Python星球日记 - 第15天&#xff1a;综合复习&#xff08;回顾前14天所学知识&#xff09; 名人说&#xff1a;不要人夸颜色好&#xff0c;只留清气满乾坤&#xff08;王冕《墨梅》&#xff09; 创作者&#xff1a;Code_流苏…...

【回眸】Linux 内核 (十四)进程间通讯 之 信号量

前言 信号量概念 信号量常用API 1.创建/获取一个信号量 2.改变信号量的值 3. 控制信号量 信号量函数调用 运行结果展示 前言 上一篇文章介绍的共享内存有局限性,如:同步与互斥问题、内存管理复杂性问题、数据结构限制问题、可移植性差问题、调试困难问题。本篇博文介…...

Python 字典和集合(字典的变种)

本章内容的大纲如下&#xff1a; 常见的字典方法 如何处理查找不到的键 标准库中 dict 类型的变种set 和 frozenset 类型 散列表的工作原理 散列表带来的潜在影响&#xff08;什么样的数据类型可作为键、不可预知的 顺序&#xff0c;等等&#xff09; 字典的变种 这一节总结了…...

LeetCode】寻找重复子树:深度解析与高效解法

&#x1f4d6; 问题描述 给定一棵二叉树的根节点 root &#xff0c;返回所有重复的子树。若两棵树结构相同且节点值相同&#xff0c;则认为它们是重复的。对于同类重复子树&#xff0c;只需返回其中任意一棵的根节点。 &#x1f330; 示例解析 示例1 输入&#xff1a; 1/ …...

[蓝桥杯] 挖矿(CC++双语版)

题目链接 P10904 [蓝桥杯 2024 省 C] 挖矿 - 洛谷 题目理解 我们可以将这道题中矿洞的位置理解成为一个坐标轴&#xff0c;以题目样例绘出坐标轴&#xff1a; 样例&#xff1a; 输入的5为矿洞数量&#xff0c;4为可走的步数。第二行输入是5个矿洞的坐标。输出结果为在要求步数…...

Appium如何实现移动端UI自动化测试?

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 Appium是一个开源跨平台移动应用自动化测试框架。 既然只是想学习下Appium如何入门&#xff0c;那么我们就直奔主题。文章结构如下&#xff1a; 为什么要使用A…...

在集合中哪些可以为null,哪些不能为null;Java 集合中 null 值允许情况总结与记忆技巧

Java 集合中 null 值允许情况总结与记忆技巧 一、核心集合对 null 的支持情况 集合类型Key 是否可为 nullValue 是否可为 null原因/备注HashMap✅ 是✅ 是对 null key 有特殊处理&#xff08;存放在数组第 0 个位置&#xff09;LinkedHashMap✅ 是✅ 是继承自 HashMapTreeMap…...

Python 并发编程指南:协程 vs 多线程及其他模型比较

Python 并发编程指南&#xff1a;协程 vs 多线程及其他模型比较 并发编程是指在单个程序中同时处理多个任务的能力&#xff0c;这些任务可以交替进行&#xff08;同一时刻并不一定真的同时运行&#xff09;&#xff0c;而并行则强调在同一时刻真正同时运行多个任务&#xff08…...

WPS JS宏编程教程(从基础到进阶)-- 第五部分:JS数组与WPS结合应用

目录 摘要第5章 JS数组与WPS结合应用5-1 JS数组的核心特性核心特性解析5-2 数组的两种创建方式(字面量与扩展操作符)1. 字面量创建2. 扩展操作符创建5-3 数组创建应用:提取字符串中的数字需求说明代码实现5-4 用函数创建数组(new Array、Array.of、Array.from)1. new Arra…...

STM32定时器完全指南:从基础原理到高级应用 | 零基础入门STM32第九十六步

主题内容教学目的/扩展视频TIM定时器重点课程定时器&#xff0c;捕获器&#xff0c;比较器&#xff0c;PWM&#xff0c;单脉冲。高级TIM。定时器中断。了解TIM使用 师从洋桃电子&#xff0c;杜洋老师 &#x1f4d1;文章目录 一、定时器核心原理1.1 硬件架构解析1.2 核心参数公式…...

Kafka分区机制详解:原理、策略与应用

#作者&#xff1a;张桐瑞 文章目录 一、分区的作用二、分区策略&#xff08;一&#xff09;轮询策略&#xff08;二&#xff09;随机策略&#xff08;三&#xff09;按消息键保序策略 三、实际案例&#xff1a;消息顺序问题的解决四、其他分区策略&#xff1a;基于地理位置的分…...

最小K个数

文章目录 题意思路代码 题意 题目链接 思路 代码 class Solution { public:vector<int> smallestK(vector<int>& arr, int k) {priority_queue<int> Q;for (auto &index:arr){Q.push(index);if (Q.size() > k)Q.pop();}vector<int> ans…...

【STL】list介绍(附与vector的比较)

文章目录 1.关于list2.使用2.1 list的构造2.2 list 迭代器的使用2.3 list 容量操作2.3.1 size()2.3.2 empty()2.3.3 resize() 2.4 list 元素访问2.4.1 front()2.4.2 back() 2.5 list 修改操作2.5.1 push_front()2.5.2 pop_front()2.5.3 push_back()2.5.4 pop_back()2.5.5 inser…...

音视频生命探测仪,救援现场的“视听先锋”|鼎跃安全

地震等自然灾害的突发性和破坏性对人类生命构成严重威胁。据统计&#xff0c;地震后的“黄金72小时”内&#xff0c;被困者的存活率随时间的推移急剧下降&#xff0c;因此快速、精准的搜救技术至关重要。 传统搜救手段依赖人耳识别呼救声或手动挖掘&#xff0c;效率低且易造成二…...

Arch视频播放CPU占用高

Arch Linux配置视频硬件加速 - DDoSolitary’s Blog 开源神器&#xff1a;加速你的视频体验 —— libvdpau-va-gl-CSDN博客 VDPAU&#xff08;Video Decode and Presentation API for Unix&#xff09; VA-API&#xff08;Video Acceleration API&#xff09; OpenGL 我的电…...

Python技巧:二维列表 和 二维矩阵 的区别

np.vstack 是 NumPy 中的一个函数&#xff0c;用于将多个数组沿垂直方向&#xff08;行方向&#xff09;堆叠。它可以处理 二维列表 和 二维矩阵&#xff0c;但它们之间有一些关键区别。以下是详细说明&#xff1a; 1. 二维列表 定义: 二维列表是 Python 原生的数据结构&#x…...

Linux 命令清单(Linux Command List)

测试人员必备的 Linux 命令清单文件管理 ls —— 显示目录内容。 ls -l 使用 -l 选项查看详细信息。 cd —— 改变当前工作目录。 cd /path/to/directory mkdir —— 创建新目录。 mkdir new_directory rm —— 删除文件或目录。 rm filename rm -r directory 使用 …...

Wallaby‘s: Nightmare (v1.0.2)靶场渗透

Wallabys: Nightmare (v1.0.2) 来自 <Wallabys: Nightmare (v1.0.2) ~ VulnHub> 1&#xff0c;将两台虚拟机网络连接都改为NAT模式 2&#xff0c;攻击机上做namp局域网扫描发现靶机 nmap -sn 192.168.23.0/24 那么攻击机IP为192.168.23.182&#xff0c;靶场IP192.168.23…...

java基础 可拆分迭代器 Spliterator<T>

Spliterator Spliterator介绍核心方法tryAdvanceforEachRemainingtrySplitestimateSizetrySplit 结合并行流&#xff08;Parallel Stream&#xff09;关键注意事项总结 Spliterator介绍 Spliterator&#xff08;Splittable Iterator&#xff09;是 Java 8 引入的接口&#xff…...

【AI提示词】决策专家

提示说明 决策专家可以帮助你进行科学决策&#xff0c;尽可能避免错误&#xff0c;提升决策成功的概率。 提示词 # Role : 决策专家决策&#xff0c;是面对不容易判断优劣的几个选项&#xff0c;做出正确的选择。说白了&#xff0c;决策就是拿个主意。决策专家是基于科学决策…...

VectorBT量化入门系列:第二章 VectorBT核心功能与数据处理

VectorBT量化入门系列&#xff1a;第二章 VectorBT核心功能与数据处理 本教程专为中高级开发者设计&#xff0c;系统讲解VectorBT技术在量化交易中的应用。通过结合Tushare数据源和TA-Lib技术指标&#xff0c;深度探索策略开发、回测优化与风险评估的核心方法。从数据获取到策略…...

Spring Boot 配置文件加载优先级全解析

精心整理了最新的面试资料和简历模板&#xff0c;有需要的可以自行获取 点击前往百度网盘获取 点击前往夸克网盘获取 Spring Boot 配置文件加载优先级全解析 Spring Boot 的配置文件加载机制是开发者管理不同环境配置的核心功能之一。其通过外部化配置&#xff08;Externaliz…...

System V 信号量:控制进程间共享资源的访问

System V 信号量&#xff1a;控制进程间共享资源的访问 在多进程操作系统中&#xff0c;当多个进程需要共享资源时&#xff0c;必须确保对资源的访问是有序的&#xff0c;以避免竞争条件&#xff08;Race Condition&#xff09;和数据不一致性问题。System V 信号量&#xff0…...

海运货代系统哪家好?能解决了哪些常见管理难题?

随着跨境电商的迅速发展&#xff0c;货代行业在全球供应链中扮演着越来越重要的角色。随着市场需求的多样化和国际运输环境的复杂化&#xff0c;货代企业面临的挑战也愈发复杂。为了应对这些挑战&#xff0c;数字化管理工具成为货代行业不可或缺的一部分。如今先进的海运货代系…...

预测性维护+智能优化:RK3568的储能双保险

在碳中和目标推动下&#xff0c;储能行业正经历前所未有的发展机遇。作为储能系统的核心组件&#xff0c;储能柜的智能化水平直接影响着整个系统的效率和安全性。RK3568智慧边缘控制器凭借其强大的计算能力、丰富的接口和高效的能源管理特性&#xff0c;正在成为工商储能柜的&q…...

蓝桥20257-元宵分配

#include <iostream> #include <bits/stdc.h> using namespace std; const int N1e910; typedef long long LL; int main() {// 请在此输入您的代码//将强其中的一碗全部倒进另一个中&#xff0c;将所有汤圆排序&#xff0c;最后选择前&#xff08;N/2&#xff09;…...

How to connect a mobile phone to your computer?

How to connect a mobile phone to your computer? 1. Background /ˈbkɡraʊnd/2. How to connect a mobile phone to your computer?References 1. Background /ˈbkɡraʊnd/ Let me introduce the background first. Today we will talk about this topic: How to conn…...

【力扣刷题实战】全排列II

大家好&#xff0c;我是小卡皮巴拉 文章目录 目录 力扣题目&#xff1a;全排列II 题目描述 解题思路 问题理解 算法选择 具体思路 解题要点 完整代码&#xff08;C&#xff09; 兄弟们共勉 &#xff01;&#xff01;&#xff01; 每篇前言 博客主页&#xff1a;小卡…...

题目练习之map的奇妙使用

♥♥♥~~~~~~欢迎光临知星小度博客空间~~~~~~♥♥♥ ♥♥♥零星地变得优秀~也能拼凑出星河~♥♥♥ ♥♥♥我们一起努力成为更好的自己~♥♥♥ ♥♥♥如果这一篇博客对你有帮助~别忘了点赞分享哦~♥♥♥ ♥♥♥如果有什么问题可以评论区留言或者私信我哦~♥♥♥ ✨✨✨✨✨✨ 个…...

Excel 日期值转换问题解析

目录 问题原因 解决方案 方法1&#xff1a;使用 DateTime.FromOADate 转换 方法2&#xff1a;处理可能为字符串的情况 方法3&#xff1a;使用 ExcelDataReader 时的处理 额外提示 当你在 Excel 单元格中看到 2024/12/1&#xff0c;但 C# 读取到 45627 时&#xff0c;这是…...

Linux--文件系统

ok&#xff0c;上次我们提到了硬件和inode&#xff0c;这次我们继续学习文件系统 ext2文件系统 所有的准备⼯作都已经做完&#xff0c;是时候认识下文件系统了。我们想要在硬盘上存储文件&#xff0c;必须先把硬盘格式化为某种格式的文件系统&#xff0c;才能存储文件。文件系…...

2025 年福建交安安全员考试:结合本省交通特点备考​

福建地处东南沿海&#xff0c;交通建设具有独特特点&#xff0c;这对交安安全员考试备考意义重大。在桥梁建设方面&#xff0c;由于面临复杂的海洋环境&#xff0c;桥梁的防腐、防台风等安全措施成为重点。考生在学习桥梁施工安全知识时&#xff0c;要特别关注福建本地跨海大桥…...

【项目管理】第6章 信息管理概论 --知识点整理

项目管理 相关文档&#xff0c;希望互相学习&#xff0c;共同进步 风123456789&#xff5e;-CSDN博客 &#xff08;一&#xff09;知识总览 项目管理知识域 知识点&#xff1a; &#xff08;项目管理概论、立项管理、十大知识域、配置与变更管理、绩效域&#xff09; 对应&…...

python-leetcode 66.寻找旋转排序数组中的最小值

题目&#xff1a; 已知一个长度为n的数组&#xff0c;预先按照升序排列&#xff0c;经由1到n次旋转后&#xff0c;得到输入数组&#xff0c;例如&#xff0c;原数组 nums [0,1,2,4,5,6,7] 在变化后可能得到&#xff1a; 若旋转 4 次&#xff0c;则可以得到 [4,5,6,7,0,1,2]若…...

WinMerge下载及使用教程(附安装包)

文章目录 一、WinMerge安装步骤1.WinMerge下载&#xff1a;2.解压&#xff1a;3.启动&#xff1a; 二、WinMerge使用步骤1.添加文件或文件夹2.查看差异3.格式选择 WinMerge v2.16.36 是一款免费开源的文件与文件夹比较、合并工具&#xff0c;能帮您快速找出差异&#xff0c;提高…...

Codeforces Round 1011 (Div. 2)

Dashboard - Codeforces Round 1011 (Div. 2) - Codeforces Problem - B - Codeforces 题目大意&#xff1a; 给你一个数组&#xff0c;你可以用一段子序列中没有出现的最小非负整数,替换数组中的组序列&#xff0c;经过若干操作&#xff0c;让数组变为长度为1&#xff0c;值…...

深度学习实战105-利用LSTM+Attention模型做生产车间中的铝合金生产时的合格率的预测应用

大家好,我是微学AI,今天给大家介绍一下深度学习实战105-利用LSTM+Attention模型做生产车间中的铝合金生产时的合格率的预测应用。 本项目利用LSTM+Attention模型对铝合金生产合格率进行预测,不仅在理论上具有创新性和可行性,而且在实际应用中也具有重要的价值和广阔的应用前…...

苹果内购支付 Java 接口

支付流程&#xff0c;APP支付成功后 前端调用后端接口&#xff0c;后端接口将前端支付成功后拿到的凭据传给苹果服务器检查&#xff0c;如果接口返回成功了&#xff0c;就视为支付。 代码&#xff0c;productId就是苹果开发者后台提前设置好的 产品id public CommonResult<S…...

Scrapy 是什么?Python 强大的爬虫框架详解

1. Scrapy 简介 Scrapy 是一个用 Python 编写的开源 网络爬虫框架&#xff0c;用于高效地从网站提取结构化数据。它提供了完整的爬虫开发工具&#xff0c;包括请求管理、数据解析、存储和异常处理等功能&#xff0c;适用于数据挖掘、监测和自动化测试等场景。 Scrapy 的核心特…...

一种用于基于扩散磁共振成像(MRI)的微观结构估计的外梯度与噪声调谐自适应迭代网络|文献速递-深度学习医疗AI最新文献

Title 题目 An extragradient and noise-tuning adaptive iterative network for diffusionMRI-based microstructural estimation 一种用于基于扩散磁共振成像&#xff08;MRI&#xff09;的微观结构估计的外梯度与噪声调谐自适应迭代网络 Background 背景 2.1. Advanced…...

需求的图形化分析-状态转换图

实时系统和过程控制应用程序可以在任何给定的时间内以有限的状态存在。当满足所定义的标准时&#xff0c;状态就会发生改变&#xff0c;例如在特定条件下&#xff0c;接收到一个特定的输入激励。这样的系统是有限状态机的例子。此外&#xff0c;许多业务对象&#xff08;如销售…...

3月AI论文精选十篇

1. Feature-Level Insights into Artificial Text Detection with Sparse Autoencoders[1] 核心贡献&#xff1a;通过稀疏自编码器揭示AI生成文本的检测特征&#xff0c;提出基于特征分布的鉴别方法。研究发现&#xff0c;AI文本在稀疏编码空间中呈现独特的"高频低幅"…...

【android bluetooth 框架分析 01】【关键线程 2】【bt_stack_manager_thread线程介绍】

1. bt_stack_manager_thread bt_stack_manager_thread 是蓝牙协议栈中的核心调度线程&#xff0c;负责串行化处理协议栈的生命周期事件&#xff0c;包括初始化、启动、关闭与清理操作。它确保这些状态切换在同一线程中按顺序执行&#xff0c;避免竞态和资源冲突。作为蓝牙栈的…...

GEO, TCGA 等将被禁用?!这40个公开数据库可能要小心使用了

GEO, TCGA 等将被禁用&#xff1f;&#xff01;这40个公开数据库可能要小心使用了 最近NIH公共数据库开始对中国禁用的消息闹得风风火火&#xff1a; 你认为研究者上传到 GEO 数据库上的数据会被禁用吗&#xff1f; 单选 会&#xff0c;毕竟占用存储资源 不会&#xff0c;不…...

matlab安装python API 出现Invalid version: ‘R2022a‘,

打开 setup.py 文件&#xff0c;找到设置版本号的部分 将 versionR2022a 修改为符合 Python 版本号规范的格式&#xff0c;例如 version2022.1 保存 setup.py 文件...