当前位置: 首页 > news >正文

Flink深入浅出之01:应用场景、基本架构、部署模式

Flink

1️⃣ 一 、知识要点

📖 1. Flink简介

  • Apache Flink® — Stateful Computations over Data Streams
  • Apache Flink 是一个分布式大数据处理引擎,可对有界数据流无界数据流进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算
    • 官网地址:http://flink.apache.org

在这里插入图片描述

1.1 处理无界和有界数据
  • 任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。

  • 数据可以被作为无界或者有界流来处理。

(1) 无界流 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。(2) 有界流 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理

在这里插入图片描述

  • Apache Flink 擅长处理无界和有界数据集 精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。
1.2 部署应用到任意地方
  • Apache Flink 是一个分布式系统,它需要计算资源来执行应用程序。Flink 集成了所有常见的集群资源管理器,例如 Hadoop YARN、 Apache Mesos 和 Kubernetes,但同时也可以作为独立集群运行。

    	Flink 被设计为能够很好地工作在上述每个资源管理器中,这是通过资源管理器特定(resource-manager-specific)的部署模式实现的。Flink 可以采用与当前资源管理器相适应的方式进行交互。部署 Flink 应用程序时,Flink 会根据应用程序配置的并行性自动标识所需的资源,并从资源管理器请求这些资源。在发生故障的情况下,Flink 通过请求新资源来替换发生故障的容器。提交或控制应用程序的所有通信都是通过 REST 调用进行的,这可以简化 Flink 与各种环境中的集成
    
1.3 运行任意规模应用
  • Flink 旨在任意规模上运行有状态流式应用。因此,应用程序被并行化为可能数千个任务,这些任务分布在集群中并发执行。所以应用程序能够充分利用无尽的 CPU、内存、磁盘和网络 IO。而且 Flink 很容易维护非常大的应用程序状态。其异步和增量的检查点算法对处理延迟产生最小的影响,同时保证精确一次状态的一致性。

    Flink 用户报告了其生产环境中一些令人印象深刻的扩展性数字:每天处理数万亿的事件可以维护几TB大小的状态可以部署上千个节点的集群
    
1.4 利用内存性能
  • 有状态的 Flink 程序针对本地状态访问进行了优化。任务的状态始终保留在内存中,如果状态大小超过可用内存,则会保存在能高效访问的磁盘数据结构中。任务通过访问本地(通常在内存中)状态来进行所有的计算,从而产生非常低的处理延迟。Flink 通过定期和异步地对本地状态进行持久化存储来保证故障场景下精确一次的状态一致性。

在这里插入图片描述

📖 2. Flink 的应用场景

  在实际生产的过程中,大量数据在不断地产生,例如金融交易数据、互联网订单数据、 GPS 定位数据、传感器信号、移动终端产生的数据、通信信号数据等,以及我们熟悉的网络 流量监控、服务器产生的日志数据,这些数据最大的共同点就是实时从不同的数据源中产生, 然后再传输到下游的分析系统。针对这些数据类型主要包括实时智能推荐、复杂事件处理、 实时欺诈检测、实时数仓与 ETL 类型、流数据分析类型、实时报表类型等实时业务场景,而 Flink 对于这些类型的场景都有着非常好的支持。
  • 1、实时智能推荐

    	智能推荐会根据用户历史的购买行为,通过推荐算法训练模型,预测用户未来可能会购 买的物品。对个人来说,推荐系统起着信息过滤的作用,对 Web/App 服务端来说,推荐系统 起着满足用户个性化需求,提升用户满意度的作用。推荐系统本身也在飞速发展,除了算法 越来越完善,对时延的要求也越来越苛刻和实时化。利用 Flink 流计算帮助用户构建更加实 时的智能推荐系统,对用户行为指标进行实时计算,对模型进行实时更新,对用户指标进行 实时预测,并将预测的信息推送给 Wep/App 端,帮助用户获取想要的商品信息,另一方面也 帮助企业提升销售额,创造更大的商业价值。
    
  • 2、复杂事件处理

    	对于复杂事件处理,比较常见的案例主要集中于工业领域,例如对车载传感器、机械设 备等实时故障检测,这些业务类型通常数据量都非常大,且对数据处理的时效性要求非常高。 通过利用 Flink 提供的 CEP(复杂事件处理)进行事件模式的抽取,同时应用 Flink 的 Sql 进行事件数据的转换,在流式系统中构建实时规则引擎,一旦事件触发报警规则,便立即将 告警结果传输至下游通知系统,从而实现对设备故障快速预警监测,车辆状态监控等目的。
    
  • 3、实时欺诈检测

    	在金融领域的业务中,常常出现各种类型的欺诈行为,例如信用卡欺诈、信贷申请欺诈等,而如何保证用户和公司的资金安全,是来近年来许多金融公司及银行共同面对的挑战。 随着不法分子欺诈手段的不断升级,传统的反欺诈手段已经不足以解决目前所面临的问题。 以往可能需要几个小时才能通过交易数据计算出用户的行为指标,然后通过规则判别出具有 欺诈行为嫌疑的用户,再进行案件调查处理,在这种情况下资金可能早已被不法分子转移, 从而给企业和用户造成大量的经济损失。而运用 Flink 流式计算技术能够在毫秒内就完成对 欺诈判断行为指标的计算,然后实时对交易流水进行规则判断或者模型预测,这样一旦检测 出交易中存在欺诈嫌疑,则直接对交易进行实时拦截,避免因为处理不及时而导致的经济损失。
    
  • 4、实时数仓与 ETL

    	结合离线数仓,通过利用流计算诸多优势和 SQL 灵活的加工能力,对流式数据进行实时 清洗、归并、结构化处理,为离线数仓进行补充和优化。另一方面结合实时数据 ETL 处理能力,利用有状态流式计算技术,可以尽可能降低企业由于在离线数据计算过程中调度逻辑的 复杂度,高效快速地处理企业需要的统计结果,帮助企业更好地应用实时数据所分析出来的结果。
    
  • 5、流数据分析

    	实时计算各类数据指标,并利用实时结果及时调整在线系统相关策略,在各类内容投放、 无线智能推送领域有大量的应用。流式计算技术将数据分析场景实时化,帮助企业做到实时化分析 Web 应用或者 App 应用的各项指标,包括 App 版本分布情况、Crash 检测和分布等, 同时提供多维度用户行为分析,支持日志自主分析,助力开发者实现基于大数据技术的精细 化运营、提升产品质量和体验、增强用户黏性。
    
  • 6、实时报表分析

    	实时报表分析是近年来很多公司采用的报表统计方案之一,其中最主要的应用便是实时 大屏展示。利用流式计算实时得出的结果直接被推送到前端应用,实时显示出重要指标的变 换情况。最典型的案例便是淘宝的双十一活动,每年双十一购物节,除疯狂购物外,最引人 注目的就是天猫双十一大屏不停跳跃的成交总额。在整个计算链路中包括从天猫交易下单购 买到数据采集、数据计算、数据校验,最终落到双十一大屏上展现的全链路时间压缩在 5 秒以内,顶峰计算性能高达数三十万笔订单/秒,通过多条链路流计算备份确保万无一失。 而在其他行业,企业也在构建自己的实时报表系统,让企业能够依托于自身的业务数据,快速提取出更多的数据价值,从而更好地服务于企业运行过程中。
    

📖 3. Flink基本技术栈

在这里插入图片描述

在flink整个软件架构体系中。同样遵循着分层的架构设计理念,在降低系统耦合度的同时,也为上层用户构建flink应用提供了丰富且友好的接口。

  • API & libraries层
	作为分布式数据处理框架,fink同时提供了支撑流计算和批计算的接口,同时在此基础之上抽象出不同的应用类型的组件库。
如:基于流处理的CEP(复杂事件处理库)、SQL&Table库、FlinkML(机器学习库)、Gelly(图处理库)
有流式处理API,批处理API。流式处理的支持事件处理,表操作。批处理的,支持机器学习,图计算,也支持表操作。
  • Runtime核心层
	该层主要负责对上层的接口提供基础服务,也就是flink分布式计算的核心实现。flink底层的执行引擎。
  • 物理部署层
该层主要涉及到flink的部署模式,目前flink支持多种部署模式:
本地 local
集群 standalone/yarn
云 GCE/EC2  谷歌云、亚马逊云
kubenetes

📖 4. Flink基本架构

在这里插入图片描述

​ Flink 整个系统主要由两个组件组成,分别为 JobManager 和 TaskManager,Flink 架构也遵循 Master-Slave 架构设计原则,JobManager 为 Master 节点,TaskManager 为 Worker(Slave)节点。所有组件之间的通信都是借助于 Akka Framework,包括任务的状态以及 Checkpoint 触发等信息

  • Client

    	客户端负责将任务提交到集群,与 JobManager 构建 Akka 连接,然后将任务提交JobManager,通过和 JobManager 之间进行交互获取任务执行状态。客户端提交任务可以采用CLI 方式或者通过使用 Flink WebUI 提交,也可以在应用程序中指定 JobManager 的 RPC 网络端口构建 ExecutionEnvironment 提交 Flink 应用。
    
  • JobManager

    	JobManager 负责整个 Flink 集群任务的调度以及资源的管理,从客户端中获取提交的应用,然后根据集群中 TaskManager 上 TaskSlot 的使用情况,为提交的应用分配相应的 TaskSlot 资源并命令 TaskManager 启动从客户端中获取的应用。JobManager 相当于整个集群的 Master 节点,且整个集群有且只有一个活跃的 JobManager ,负责整个集群的任务管理和资源管理。JobManager 和 TaskManager 之间通过 Actor System 进行通信,获取任务执行的情况并通过 Actor System 将应用的任务执行情况发送给客户端。同时在任务执行的过程中,Flink JobManager 会触发 Checkpoint 操作,每个 TaskManager 节点 收到 Checkpoint 触发指令后,完成 Checkpoint 操作,所有的 Checkpoint 协调过程都是在 Fink JobManager 中完成。当任务完成后,Flink 会将任务执行的信息反馈给客户端,并且释放掉 TaskManager 中的资源以供下一次提交任务使用。
    
  • TaskManager

    	TaskManager 相当于整个集群的 Slave 节点,负责具体的任务执行和对应任务在每个节点上的资源申请和管理。客户端通过将编写好的 Flink 应用编译打包,提交到 JobManager,然后 JobManager 会根据已注册在 JobManager 中 TaskManager 的资源情况,将任务分配给有资源的 TaskManager节点,然后启动并运行任务。TaskManager 从 JobManager 接收需要部署的任务,然后使用 Slot 资源启动 Task,建立数据接入的网络连接,接收数据并开始数据处理。同时 TaskManager 之间的数据交互都是通过数据流的方式进行的。可以看出,Flink 的任务运行其实是采用多线程的方式,这和 MapReduce 多 JVM 进行的方式有很大的区别,Flink 能够极大提高 CPU 使用效率,在多个任务和 Task 之间通过 TaskSlot 方式共享系统资源,每个 TaskManager 中通过管理多个 TaskSlot 资源池进行对资源进行有效管理。
    

📖 5. Flink的源码编译(了解)

  • 我们可以对flink的源码进行编译,方便对我们各种hadoop的版本进行适配

  • 参见:https://blog.csdn.net/h335146502/article/details/96483310

cd /kkb/soft
编译flink-shaded包
wget  https://github.com/apache/flink-shaded/archive/release-7.0.tar.gz
tar -zxvf flink-shaded-release-7.0.tar.gz -C /kkb/install/
cd /kkb/install/flink-shaded-release-7.0/
mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.14.2编译flink源码
wget http://archive.apache.org/dist/flink/flink-1.9.2/flink-1.9.2-src.tgztar -zxf flink-1.9.2-src.tgz -C /kkb/install/
cd /kkb/install/flink-1.9.2/
mvn -T2C clean install -DskipTests -Dfast -Pinclude-hadoop -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.14.2

📖 6. Local模式安装

  • 1、安装jdk,配置JAVA_HOME,建议使用jdk1.8以上

  • 2、安装包下载地址:

    https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.9.2/flink-1.9.2-bin-scala_2.11.tgz

  • 3、直接上传安装包到服务器

  • 4、解压安装包并配置环境变量

    tar -zxf flink-1.9.2-bin-scala_2.11.tgz -C /kkb/install/配置环境变量
    sudo vim /etc/profile
    export FLINK_HOME=/kkb/install/flink-1.9.2
    export PATH=:$FLINK_HOME/bin:$PATH
    
  • 5、启动服务

    • local模式,什么配置项都不需要配,直接启动服务器即可

      cd /kkb/install/flink-1.9.2
      #启动flink
      bin/start-cluster.sh 
      #停止flink
      bin/stop-cluster.sh 
      
  • 6、Web页面浏览

    • http://node01:8081/#/overview

在这里插入图片描述

📖 7. Standalone模式安装

  • (1)集群规划
主机名JobManagerTaskManager
node01
node02
node03
  • (2)依赖

    • jdk1.8以上,配置JAVA_HOME
    • 主机之间免密码
  • (3)安装步骤

node01修改以下配置文件(a) 修改conf/flink-conf.yaml#jobmanager地址
jobmanager.rpc.address: node01
#使用zookeeper搭建高可用
high-availability: zookeeper
##存储JobManager的元数据到HDFS
high-availability.storageDir: hdfs://node01:8020/flink
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181(b) 修改conf/slaves
node01
node02
node03(c) 修改conf/masters
node01:8081
node02:8081(d)上传flink-shaded-hadoop-2-uber-2.7.5-10.0.jar到flink的lib目录下
将flink-shaded-hadoop-2-uber-2.7.5-10.0.jar 这个jar包上传到flink的安装目录的lib下(e) 拷贝到其他节点
scp -r /kkb/install/flink-1.9.2 node02:/kkb/install
scp -r /kkb/install/flink-1.9.2 node03:/kkb/install(f):node01(JobMananger)节点启动
注意:启动之前先启动hadoop和zookeeper集群
cd /kkb/install/flink-1.9.2
bin/start-cluster.sh(g):访问
http://node01:8081
http://node02:8081(h):关闭flink集群, 在主节点上执行
cd /kkb/install/flink-1.9.2
bin/stop-cluster.sh
  • (4) StandAlone模式需要考虑的参数
jobmanager.heap.mb:     		jobmanager节点可用的内存大小
taskmanager.heap.mb:    		taskmanager节点可用的内存大小
taskmanager.numberOfTaskSlots:   每台taskmanager节点可用的cpu数量
parallelism.default:             默认情况下任务的并行度
taskmanager.tmp.dirs:            taskmanager的临时数据存储目录

📖 8. Flink on Yarn模式安装

  1. 首先安装好Hadoop(yarn)
  2. 上传一个flink的包,配置好hadoop的环境变量就可以了
  • flink on yarn有两种方式
8.1 第一种方式
  • 内存集中管理模式(Yarn Session

    • 在Yarn中初始化一个Flink集群,开辟指定的资源,之后我们提交的Flink Jon都在这个Flink yarn-session中,也就是说不管提交多少个job,这些job都会共用开始时在yarn中申请的资源。这个Flink集群会常驻在Yarn集群中,除非手动停止。

在这里插入图片描述

8.2 第二种方式
  • 内存Job管理模式==【yarn-cluster 推荐使用】==
    • 在Yarn中,每次提交job都会创建一个新的Flink集群,任务之间相互独立,互不影响并且方便管理。任务执行完成之后创建的集群也会消失。
      在这里插入图片描述
8.3 不同模式的任务提交
  • 第一种模式

    • 【yarn-session.sh(开辟资源) + flink run(提交任务)】

      • 1、在flink目录启动yarn-session

        bin/yarn-session.sh -n 2 -tm 1024 -s 1 -d# -n 表示申请2个容器,
        # -s 表示每个容器启动多少个slot
        # -tm 表示每个TaskManager申请1024M内存
        # -d 表示以后台程序方式运行
        
      • 2、使用 flink 脚本提交任务

        bin/flink run examples/batch/WordCount.jar \
        -input  hdfs://node01:8020/words.txt \
        -output hdfs://node01:8020/output/result.txt##说明:如果启动了很多的yarn-session, 在提交任务的时候可以通过参数 -yid 指定作业提交到哪一个yarn-session中运行
        ##例如:
        bin/flink run \
        -yid application_1597295374041_0008 \
        examples/batch/WordCount.jar \
        -input  hdfs://node01:8020/words.txt \
        -output hdfs://node01:8020/output/result.txt
        • 3、停止任务
      yarn application -kill application_1587024622720_0001
      
  • 第二种模式

    • 【flink run -m yarn-cluster (开辟资源+提交任务)】

      • 1、启动集群,执行任务

        bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 \
        examples/batch/WordCount.jar \
        -input hdfs://node01:8020/words.txt \
        -output hdfs://node01:8020/output1注意:client端必须要设置YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_HOME环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败。
        
  • help信息

    • yarn-session.sh 脚本参数

      用法:  必选  -n,--container <arg>   分配多少个yarn容器 (=taskmanager的数量)  可选  -D <arg>                        动态属性  -d,--detached                   独立运行  -jm,--jobManagerMemory <arg>    JobManager的内存 [in MB]  -nm,--name                     在YARN上为一个自定义的应用设置一个名字  -q,--query                      显示yarn中可用的资源 (内存, cpu核数)  -qu,--queue <arg>               指定YARN队列.  -s,--slots <arg>                每个TaskManager使用的slots数量  -tm,--taskManagerMemory <arg>   每个TaskManager的内存 [in MB]  -z,--zookeeperNamespace <arg>   针对HA模式在zookeeper上创建NameSpace -id,--applicationId <yarnAppId> YARN集群上的任务id,附着到一个后台运行的yarn    session中
      
    • flink run 脚本参数

      run [OPTIONS] <jar-file> <arguments>  "run" 操作参数:  
      -c,--class <classname>  如果没有在jar包中指定入口类,则需要在这里通过这个参数指定  
      -m,--jobmanager <host:port>  指定需要连接的jobmanager(主节点)地址,使用这个参数可以指定一个不同于配置文件中的jobmanager  
      -p,--parallelism <parallelism>   指定程序的并行度。可以覆盖配置文件中的默认值。默认查找当前yarn集群中已有的yarn-session信息中的jobmanager【/tmp/.yarn-properties-root】:
      ./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1连接指定host和port的jobmanager:
      ./bin/flink run -m node01:6123 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1启动一个新的yarn-session:
      ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1注意:yarn session命令行的选项也可以使用./bin/flink 工具获得。它们都有一个y或者yarn的前缀
      例如:flink run -m yarn-cluster -yn 2 examples/batch/WordCount.jar 
8.4 Flink on YARN集群部署
  • (1) flink on yarn运行原理

在这里插入图片描述

在这里插入图片描述

  • 其实Flink on YARN部署很简单,就是只要部署好hadoop集群即可,我们只需要部署一个Flink客户端,然后从flink客户端提交Flink任务即可。类似于spark on yarn模式。

📖 9. 入门案例演示

9.1 实时需求分析
实时统计每隔1秒统计最近2秒单词出现的次数
  • 创建maven工程,添加pom依赖
 <properties><flink.version>1.9.2</flink.version><scala.version>2.11.8</scala.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.8</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass></mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
9.1.1 实时代码开发(scala版本,flink完全倒向了java,别用scala了)
  • 代码开发

    package com.kaikeba.demo1import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.windowing.time.Time/*** 使用滑动窗口* 每隔1秒钟统计最近2秒钟的每个单词出现的次数*/
    object FlinkStream {def main(args: Array[String]): Unit = {//构建流处理的环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//从socket获取数据val sourceStream: DataStream[String] = env.socketTextStream("node01",9999)//导入隐式转换的包import org.apache.flink.api.scala._//对数据进行处理val result: DataStream[(String, Int)] = sourceStream.flatMap(x => x.split(" ")) //按照空格切分.map(x => (x, 1))           //每个单词计为1.keyBy(0)                   //按照下标为0的单词进行分组      .timeWindow(Time.seconds(2),Time.seconds(1)) //每隔1s处理2s的数据.sum(1)            //按照下标为1累加相同单词出现的次数//对数据进行打印result.print()//开启任务env.execute("FlinkStream")}}
  • 发送 socket 数据

    ##在node01上安装nc服务
    sudo yum -y install nc
    nc -lk 9999
    
  • 打成jar包提交到yarn中运行

    flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 -c com.kaikeba.demo1.FlinkStream original-flink_study-1.0-SNAPSHOT.jar 
    
9.1.2 实时代码开发(java版本)
  • 代码开发

    package com.kaikeba.demo1;import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;/*** java代码开发实时统计每隔1秒统计最近2秒单词出现的次数*/
    public class WindowWordCountJava {public static void main(String[] args) throws Exception {//步骤一:获取流式处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//步骤二:获取socket数据DataStreamSource<String> sourceDstream = env.socketTextStream("node01", 9999);//步骤三:对数据进行处理DataStream<WordCount> wordAndOneStream = sourceDstream.flatMap(new FlatMapFunction<String, WordCount>() {public void flatMap(String line, Collector<WordCount> collector) throws Exception {String[] words = line.split(" ");for (String word : words) {collector.collect(new WordCount(word, 1L));}}});DataStream<WordCount> resultStream = wordAndOneStream.keyBy("word")  //按照单词分组.timeWindow(Time.seconds(2), Time.seconds(1)) //每隔1s统计2s的数据.sum("count");   //按照count字段累加结果//步骤四:结果打印resultStream.print();//步骤五:任务启动env.execute("WindowWordCountJava");}public static class WordCount{public String word;public long count;//记得要有这个空构建public WordCount(){}public WordCount(String word,long count){this.word = word;this.count = count;}@Overridepublic String toString() {return "WordCount{" +"word='" + word + '\'' +", count=" + count +'}';}}
    }
  • 发送socket数据

    #在node01上执行命令,发送数据
    nc -lk 9999
    
9.2 离线需求分析
对文件进行单词计数,统计文件当中每个单词出现的次数。
9.2.1 离线代码开发(scala)
package com.kaikeba.demo1import org.apache.flink.api.scala.{ DataSet, ExecutionEnvironment}/*** scala开发flink的批处理程序*/
object FlinkFileCount {def main(args: Array[String]): Unit = {//todo:1、构建Flink的批处理环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//todo:2、读取数据文件val fileDataSet: DataSet[String] = env.readTextFile("d:\\words.txt")import org.apache.flink.api.scala._//todo: 3、对数据进行处理val resultDataSet: AggregateDataSet[(String, Int)] = fileDataSet.flatMap(x=> x.split(" ")).map(x=>(x,1)).groupBy(0).sum(1)//todo: 4、打印结果resultDataSet.print()//todo: 5、保存结果到文件resultDataSet.writeAsText("d:\\result")env.execute("FlinkFileCount")}
}

📖 10. Flink并行度&Slot&Task

	Flink的每个TaskManager为集群提供solt。每个task slot代表了TaskManager的一个固定大小的资源子集。 solt的数量通常与每个TaskManager节点的可用CPU内核数成比例。一般情况下你的slot数是你每个节点的cpu的核数。

在这里插入图片描述

10.1 并行度

一个Flink程序由多个任务组成(source、transformation和 sink)。 一个任务由多个并行的实例(线程)来执行, 一个任务的并行实例 (线程) 数目就被称为该任务的并行度。

10.2 并行度的设置
  • 一个任务的并行度设置可以从多个级别指定
    • Operator Level(算子级别)
    • Execution Environment Level(执行环境级别)
    • Client Level(客户端级别)
    • System Level(系统级别)
  • 这些并行度的优先级为
    • Operator Level > Execution Environment Level > Client Level > System Level
10.2.1 算子级别

在这里插入图片描述

10.2.2 执行环境级别

在这里插入图片描述

10.2.3 客户端级别
  • 并行度可以在客户端将job提交到Flink时设定,对于CLI客户端,可以通过-p参数指定并行度

    bin/flink run -p 10 examples/batch/WordCount.jar
    
10.2.4 系统级别
  • 在系统级可以通过设置flink-conf.yaml文件中的parallelism.default属性来指定所有执行环境的默认并行度

    parallelism.default: 1
    
10.3 并行度操作演示
  • 为了方便在本地测试观察任务并行度信息,可以在本地工程添加以下依赖

    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.11</artifactId><version>${flink.version}</version>
    </dependency>
    
  • 案例

    • 注意获取程序的执行环境发生变化了
    • val environment=StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    package com.kaikeba.demo1import org.apache.flink.core.fs.FileSystem.WriteMode
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.windowing.time.Time/*** 本地调试并行度*/
    object TestParallelism {def main(args: Array[String]): Unit = {//使用createLocalEnvironmentWithWebUI方法,构建本地流式处理环境val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()//执行环境级别//environment.setParallelism(4)import org.apache.flink.api.scala._//接受socket数据val sourceStream: DataStream[String] = environment.socketTextStream("node01",9999)val countStream: DataStream[(String, Int)] = sourceStream.flatMap(x => x.split(" ")).setParallelism(5) //算子级别.map(x => (x, 1)).keyBy(0).timeWindow(Time.seconds(2), Time.seconds(1)).sum(1)countStream.print()environment.execute()}
    }
    • 设置并行度,观察localhost:8081界面

    在这里插入图片描述

在这里插入图片描述

2️⃣ 总结

  • 掌握Flink的编程规范

  • 了解Flink的集群模式

相关文章:

Flink深入浅出之01:应用场景、基本架构、部署模式

Flink 1️⃣ 一 、知识要点 &#x1f4d6; 1. Flink简介 Apache Flink — Stateful Computations over Data StreamsApache Flink 是一个分布式大数据处理引擎&#xff0c;可对有界数据流和无界数据流进行有状态的计算。Flink 能在所有常见集群环境中运行&#xff0c;并能以…...

react脚手架(creat-react-app)

安装 react脚手架 React官方提供的脚手架工程Create React App&#xff1a;https://github.com/facebook/create-react-app npm install create-react-app -g 全局安装 create-react-app my-react (my-react为项目名称&#xff0c;可以自定义) cd my-react 启动项目&#xff1a…...

TypeError: Cannot set properties of undefined (setting ‘xxx‘)

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》、《前端求职突破计划》 &#x1f35a; 蓝桥云课签约作者、…...

使用Node.js从零搭建DeepSeek本地部署(Express框架、Ollama)

目录 1.安装Node.js和npm2.初始化项目3.安装Ollama4.下载DeepSeek模型5.创建Node.js服务器6.运行服务器7.Web UI对话-Chrome插件-Page Assist 1.安装Node.js和npm 首先确保我们机器上已经安装了Node.js和npm。如果未安装&#xff0c;可以通过以下链接下载并安装适合我们操作系…...

考网络安全工程师证要什么条件才能考?

在当今数字化时代&#xff0c;网络安全问题日益凸显&#xff0c;网络安全工程师成为了一个备受瞩目的职业。许多有志于投身这一行业的学子或职场人士&#xff0c;都希望通过考取网络安全工程师证书来提升自己的专业素养和竞争力。那么&#xff0c;考网络安全工程师证需要具备哪…...

【情境领导者】评估情境——准备度水平

本系列是看了《情境领导者》一书&#xff0c;结合自己工作的实践经验所做的学习笔记。 在文章【情境领导者】评估情境——什么是准备度-CSDN博客我们提到准备度是由能力和意愿两部分组成的。 准备度水平 而我们要怎么去评估准备度呢&#xff1f;准备度水平是指人们在每项工作中…...

一套企业级智能制造云MES系统源码, vue-element-plus-admin+springboot

MES应该是继ERP之后制造企业信息化最热门的管理软件&#xff0c;它适应产品个性化与敏捷化制造需求&#xff0c;满足生产过程精益管理而产生和发展起来的信息系统。 作为企业实现数字化与智能化的核心支撑技术与重要组成部分&#xff0c;MES在帮助制造企业走向数字化、智能化等…...

蓝桥杯备考:动态规划线性dp之传球游戏

按照动态规划的做题顺序 step1&#xff1a;定义状态表示 f[i][j] 表示 第i次传递给了第j号时一共有多少种方案 step2: 推到状压公式 step3:初始化 step4:最终结果实际上就是f[m][1] #include <iostream> #include <cstring> using namespace std;const int N …...

网络编程 day05

网络编程 day05 12. SQL 数据库概念常用数据库MySQL与SQLite的区别 SQL基础SQL语句使用基本语句的使用—命令行操作sqlite3系统命令sqlite命令 sqlite3编程—函数接口 13. setsockopt&#xff1a;设置套接字属性 12. SQL 数据库 概念 数据库是“按照数据结构来组织、存储和管理…...

Excel中COUNTIF用法解析

COUNTIF 是 Excel 中一个非常实用的函数&#xff0c;用于统计满足某个条件的单元格数量。它的基本语法如下&#xff1a; 基本语法 COUNTIF(范围, 条件) 范围&#xff1a;需要统计的单元格区域&#xff0c;例如 A1:A10 或整列 A:A。 条件&#xff1a;用于判断哪些单元格需要被…...

使用XShell连接RHEL9并配置yum阿里源

目录 1.先在终端查看本地IP 2.打开XShell进行连接 方法一&#xff1a; 方法二&#xff1a; 3.关闭防火墙及SElinux 4.更改主机名为node2 5.修改YUM源为阿里源&#xff08;将系统中国外的yum文件换成国内的阿里镜像文件&#xff09; 1.找到本机的yum配置文件 2.删除原有…...

FPGA时序约束的几种方法

一,时钟约束 时钟约束是最基本的一个约束,因为FPGA工具是不知道你要跑多高的频率的,你必要要告诉工具你要跑的时钟频率。时钟约束也就是经常看到的Fmax,因为Fmax是针对“最差劲路径”,也就是说,如果该“最差劲路径”得到好成绩,那些不是最差劲的路径的成绩当然比…...

C# 在Excel中插入和操作切片器-详解

目录 使用工具 C# 在Excel中插入切片器 插入切片器到透视表 插入切片器到表格 C# 在Excel中修改切片器 C# 删除Excel中的切片器 切片器&#xff08;Slicer&#xff09;是Excel中的一个强大工具&#xff0c;它提供了直观且交互式的方式来过滤数据。通过切片器&#xff0c;…...

新编大学应用英语综合教程3 U校园全套参考答案

获取全套答案&#xff1a; 链接&#xff1a;https://pan.quark.cn/s/abaa0338724e...

Kubernetes中的 iptables 规则介绍

#作者&#xff1a;邓伟 文章目录 一、Kubernetes 网络模型概述二、iptables 基础知识三、Kubernetes 中的 iptables 应用四、查看和调试 iptables 规则五、总结 在 Kubernetes 集群中&#xff0c;iptables 是一个核心组件&#xff0c; 用于实现服务发现和网络策略。iptables 通…...

操作系统 2.2-多进程总体实现

多个进程使用CPU的图像 如何使用CPU呢&#xff1f; 通过让程序执行起来来使用CPU。 如何充分利用CPU呢&#xff1f; 通过启动多个程序&#xff0c;交替执行来充分利用CPU。 启动了的程序就是进程&#xff0c;所以是多个进程推进 操作系统需要记录这些进程&#xff0c;并按照…...

基于SeaShips数据集的yolov8训练教程

之前已经试过在yolov3和faster-rcnn上训练SeaShips数据集&#xff0c;本次在yolov8上进行训练。 yolov8的训练有两种方式&#xff0c;一种是在mmdetection框架下下载mmyolo运行&#xff0c;另一种是直接采用ultralytics。本文两种方法都会介绍。 目录 一、mmyolo 1.1 创建环…...

【时间序列聚类】从数据中发现隐藏的模式

在大数据时代&#xff0c;时间序列数据无处不在。无论是股票市场的价格波动、天气的变化趋势&#xff0c;还是用户的点击行为&#xff0c;这些数据都随着时间推移而产生。然而&#xff0c;面对海量的时间序列数据&#xff0c;我们如何从中提取有价值的信息&#xff1f;答案之一…...

在线研讨会 | 加速游戏和AI应用,全面认识Imagination DXTP GPU

近日&#xff0c;Imagination宣布推出 Imagination DXTP GPU IP&#xff0c;该产品重新定义了智能手机和其他功耗受限设备的图形和计算加速。它专为高效的效率而设计&#xff0c;能够提供运行AI、游戏和用户界面体验所需的性能&#xff0c;确保这些体验可以全天候流畅且持续地运…...

百度SEO关键词布局从堆砌到场景化的转型指南

百度SEO关键词布局&#xff1a;从“堆砌”到“场景化”的转型指南 引言 在搜索引擎优化&#xff08;SEO&#xff09;领域&#xff0c;关键词布局一直是核心策略之一。然而&#xff0c;随着搜索引擎算法的不断升级和用户需求的多样化&#xff0c;传统的“关键词堆砌”策略已经…...

数据库基础练习1

目录 1.创建数据库和表 2.插入数据 创建一个数据库&#xff0c;在数据库种创建一张叫heros的表&#xff0c;在表中插入几个四大名著的角色&#xff1a; 1.创建数据库和表 #创建表 CREATE DATABASE db_test;#查看创建的数据库 show databases; #使用db_test数据库 USE db_te…...

UVC for USBCamera in Android

基于UVC 协议&#xff0c;完成USBCamera 开发 文章目录 一、目的&#xff1a;二、USBCamera 技术实现方案难点 三、误区&#xff1a;四、基础补充、资源参考架构图了解Camera相关专栏零散知识了解部分相机源码参考&#xff0c;学习API使用&#xff0c;梳理流程&#xff0c;偏应…...

C++学习之路,从0到精通的征途:入门基础

目录 一.C的第一个程序 二.命名空间 1.namespace的价值 2.命名空间的定义 3.命名空间使用 三.C的输入与输出 1.<iostream> 2.流 3.std(standard) 四.缺省参数 1.缺省参数的定义 2.全缺省/半缺省 3.声明与定义 ​五.函数重载 1.参数个数不同 2.参数类型不…...

RSA-OAEP填充方案与定时攻击防护

目录 RSA-OAEP填充方案与定时攻击防护一、前言二、RSA 与 OAEP 填充方案概述2.1 RSA 加密算法基础2.2 OAEP 填充方案的引入2.3 数学公式推导 三、定时攻击原理与防护策略3.1 定时攻击的基本原理3.2 防护定时攻击的策略 四、基于 Python 的 RSA-OAEP 与定时攻击防护实现五、完整…...

探索高性能AI识别和边缘计算 | NVIDIA Jetson Orin Nano 8GB 开发套件测评总结

# NVIDIA Jetson Orin Nano 8GB测评&#xff1a;当边缘计算遇上"性能暴徒"&#xff0c;树莓派看了想转行 引言&#xff1a;比咖啡机还小的"AI超算"&#xff0c;却让开发者集体沸腾 2025年的某个深夜&#xff0c;程序员老王盯着工位上巴掌大的NVIDIA Jets…...

Seata

Seata是一款开源的分布式事务解决方案&#xff0c;由阿里巴巴发起并维护&#xff0c;旨在帮助应用程序管理和协调分布式事务。以下是对Seata的详细介绍&#xff1a; 一、概述 Seata致力于提供高性能和简单易用的分布式事务服务&#xff0c;它为用户提供了AT、TCC、SAGA和XA等…...

STM32之Unix时间戳

时间戳按秒计时&#xff0c;可转换成年月日时分。32有符号存储时间戳&#xff0c;2的32次/2-1到2038年&#xff0c;STM32是2的32次方-1&#xff0c;到2106年溢出。所有时区共用一个时间戳秒计数器&#xff0c;在伦敦和北京都是0&#xff0c;不同经度加上小时即可。...

告别手动复制粘贴:可定时自动备份的实用软件解析

软件介绍 此前不少小伙伴都在找备份工具&#xff0c;其实复制文件用fastcopy就可以&#xff0c;但它需要手动操作。 今天介绍的简易备份工具则能实现定时备份。 这款软件有个小问题&#xff0c;当源目录和目标目录路径太长时&#xff0c;【立即备份】按钮可能会超出软件界面范…...

Django下防御Race Condition

目录 漏洞原因 环境搭建 复现 A.无锁无事务时的竞争攻击 B.无锁有事务时的竞争攻击 防御 A.悲观锁加事务防御 B.乐观锁加事务防御 总结 漏洞原因 Race Condition 发生在多个执行实体&#xff08;如线程、进程&#xff09;同时访问共享资源时&#xff0c;由于执行顺序…...

python从入门到精通(二十三):文件操作和目录管理难度分级练习题

文件操作和目录管理 文件操作基础难度1. 简单文件写入2. 简单文件读取3. 追加内容到文件 中级难度4. 逐行读取文件并统计行数5. 读取文件并提取特定信息6. 复制文件内容到新文件 高级难度7. 处理二进制文件8. 批量文件处理9. 日志文件分析 参考答案示例1. 简单文件写入2. 简单文…...

揭开AI-OPS 的神秘面纱 第二讲-技术架构与选型分析 -- 数据采集层技术架构与组件选型分析

基于上一讲预设的架构图&#xff0c;深入讨论各个组件所涉及的技术架构、原理以及选型策略。我将逐层、逐组件地展开分析&#xff0c;并侧重于使用数据指标进行技术选型的对比。 我们从 数据采集层 开始&#xff0c;进行最细粒度的组件分析和技术选型比对。 数据采集层技术架构…...

jupyter配置多个核心

CMD输入 先创建虚拟环境 "D:\Program Files\Python37\python.exe" -m venv myenv激活虚拟环境 myenv\Scripts\activate"D:\Program Files\Python37\python.exe" -m pip install ipykernel "D:\Program Files\Python37\python.exe" -m ipykern…...

如何优化FFmpeg拉流性能及避坑指南

FFmpeg作为流媒体处理的核心工具&#xff0c;其拉流性能直接影响直播/点播体验。本文从协议优化、硬件加速、网络策略三大维度切入&#xff0c;结合实战案例与高频踩坑点&#xff0c;助你突破性能瓶颈&#xff01; 一、性能优化进阶&#xff1a;从协议到硬件的全链路调优 协议选…...

机器学习:线性回归,梯度下降,多元线性回归

线性回归模型 (Linear Regression Model) 梯度下降算法 (Gradient Descent Algorithm) 的数学公式 多元线性回归&#xff08;Multiple Linear Regression&#xff09;...

笔记五:C语言编译链接

Faye&#xff1a;孤独让我们与我们所爱的人相处的每个瞬间都无比珍贵&#xff0c;让我们的回忆价值千金。它还驱使你去寻找那些你在我身边找不到的东西。 ---------《寻找天堂》 目录 一、编译和链接的介绍 1.1 程序的翻译环境和执行环境 1.1.1 翻译环境 1.1.2 运行环境 …...

SpringUI:打造高质量Web交互设计的首选元件库

SpringUI作为一个专为Web设计与开发领域打造的高质量交互元件库&#xff0c;确实为设计师和开发者提供了极大的便利。以下是对SpringUI及其提供的各类元件的详细解读和一些建议&#xff1a; SpringUI概述 SpringUI集合了一系列预制的、高质量的交互组件&#xff0c;旨在帮助设…...

LeetCode - 神经网络的 反向传播(Sigmoid + MSE) 教程

欢迎关注我的CSDN:https://spike.blog.csdn.net/ 本文地址:https://spike.blog.csdn.net/article/details/146085177 使用 Python + Numpy,设计带有 Sigmoid 激活函数 的神经网络,实现反向传播以更新神经元的权重和偏置。函数输入:特征向量(Input)、真实标签(Label)、初始…...

Elastic如何获取当前系统时间

文章目录 1. 使用 _ingest.timestamp 在 Ingest Pipeline 中获取当前时间2. 使用 Painless Script 获取当前时间3. 使用 now 关键字在查询中获取当前时间4. 使用 date 类型字段的默认值5. 使用 Kibana 的 Dev Tools 查看当前时间6. 使用 date 聚合获取当前时间7. 使用 Elastics…...

腾讯云对象存储服务(COS)

腾讯云对象存储服务&#xff08;COS&#xff09; 安全、可扩展、低成本的云存储解决方案 腾讯云 对象存储服务&#xff08;COS&#xff0c;Cloud Object Storage&#xff09; 是一种高可靠、高性能、可扩展的云存储服务&#xff0c;专为海量非结构化数据&#xff08;如图片、…...

力扣35.搜索插入位置-二分查找

class Solution:def searchInsert(self, nums: List[int], target: int) -> int:# 初始化左右指针left, right 0, len(nums) - 1# 当左指针小于等于右指针时&#xff0c;继续循环while left < right:# 计算中间位置mid (left right) // 2# 如果中间元素等于目标值&…...

SSLScan实战指南:全面检测SSL/TLS安全配置

SSLScan是一款开源的SSL/TLS安全扫描工具,用于检测服务器的加密协议、支持的加密套件、证书信息以及潜在的安全漏洞。本指南将详细介绍如何安装、使用SSLScan,并结合实战案例帮助您全面评估服务器的安全性。 一、SSLScan简介 功能特性: 检测支持的SSL/TLS协议版本(如TLS 1.…...

Linux 进程管理

一.进程 1.基本介绍 在Linux中每一个执行的程序都称之为进程&#xff0c;每一个进程都会分配一个进程号&#xff08;PID&#xff09;。进程以前台和后台两种方式存在&#xff0c;前台进程就是我们可以在屏幕上操作的&#xff0c;后台进程我们无法在屏幕上看到。 程序是静态的…...

mfc140u.dll是什么?当程序遭遇mfc140u.dll问题:快速恢复正常的秘诀

在使用Windows操作系统运行某些软件时&#xff0c;不少用户会遇到令人头疼的mfc140u.dll文件丢失错误。mfc140u.dll这个错误一旦出现&#xff0c;往往导致相关程序无法正常启动或运行&#xff0c;给用户带来诸多不便。这天的这篇文章将给大家分析mfc140u.dll是什么&#xff1f;…...

日新F1、瑞研F600P 干线光纤熔接(熔接损耗最大0.03DB)

Ⅰ. 设备特性对比与实测验证 1. 日新F1&#xff08;两马达&#xff09;极限参数 切割角度&#xff1a;必须≤0.3&#xff08;双边累计误差&#xff1c;0.6&#xff09; ▶ 实测案例&#xff1a;切割0.35时&#xff0c;损耗波动达0.05-0.08dB&#xff08;超干线标准&#xff09…...

【我的待办(MyTodolists)-免费无内购的 IOS 应用】

我的待办&#xff08;MyTodolists&#xff09; 我的待办&#xff1a;智能任务管理助手应用说明主要功能为什么选择"我的待办"&#xff1f;隐私保障使用截图 我的待办&#xff1a;智能任务管理助手 应用说明 "我的待办"是一款智能化的任务管理应用&#x…...

微信小程序+SpringBoot的单词学习小程序平台(程序+论文+讲解+安装+修改+售后)

感兴趣的可以先收藏起来&#xff0c;还有大家在毕设选题&#xff0c;项目以及论文编写等相关问题都可以给我留言咨询&#xff0c;我会一一回复&#xff0c;希望帮助更多的人。 系统背景 &#xff08;一&#xff09;社会需求背景 在全球化的大背景下&#xff0c;英语作为国际…...

测试直播web自动化所学

web框架封装 web自动化开始&#xff1a;用电脑替代人工测试。 日常人工测试 —— 先点击XX 输入XXX 。。。页面是否符合预期 自动化测试的编码&#xff1a; web自动化&#xff0c;Selenium[常用测试库] Selenium&#xff0c;每个页面&#xff0c;是由元素组成的。html构成。 …...

Vue+Ant Design搭建AI聊天对话

今天在这里介绍一下 Ant Design X&#xff0c;这是蚂蚁设计团队推出的一款专注于人工智能&#xff08;AI&#xff09;领域的组件库&#xff0c;主要面向 React 生态系统(目前支持Openai&#xff0c;通义千问)。官方也推出了ant-design-x-vue 面向 Vue。当然我们今天的主题也是使…...

应用案例 | 精准控制,高效运行—宏集智能控制系统助力SCARA机器人极致性能

概述 随着工业4.0的深入推进&#xff0c;制造业对自动化和智能化的需求日益增长。传统生产线面临空间不足、效率低下、灵活性差等问题&#xff0c;尤其在现有工厂改造项目中&#xff0c;如何在有限空间内实现高效自动化成为一大挑战。 此次项目的客户需要在现有工厂基础上进行…...

JavaScript基础-运算符的分类

在JavaScript编程中&#xff0c;运算符是构建表达式和执行操作的基础工具。了解不同类型的运算符以及它们的工作原理对于编写高效且无误的代码至关重要。本文将介绍JavaScript中的主要运算符类型&#xff0c;并通过实例展示它们的用法。 一、算术运算符 算术运算符用于执行基…...