当前位置: 首页 > news >正文

Spark生态圈

Spark 主要用于替代Hadoop中的 MapReduce 计算模型。存储依然可以使用 HDFS,但是中间结果可以存放在内存中;调度可以使用 Spark 内置的,也可以使用更成熟的调度系统 YARN 等。

Spark有完善的生态圈:

  • Spark Core:实现了 Spark 的基本功能,包含 RDD、任务调度、内存管理、错误恢复、与存储系统交互等模块。
  • Spark SQL:Spark 用来操作结构化数据的程序包。通过 Spark SQL,我们可以使用 SQL 操作数据。
  • Spark Streaming:Spark 提供的对实时数据进行流式计算的组件。提供了用来操作数据流的 API。
  • Spark MLlib:提供常见的机器学习(ML)功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据导入等额外的支持功能。
  • GraphX(图计算):Spark 中用于图计算的 API,性能良好,拥有丰富的功能和运算符,能在海量数据上自如地运行复杂的图算法。
  • 集群管理器:Spark 设计为可以高效地在一个计算节点到数千个计算节点之间伸缩计算。
  • Structured Streaming:处理结构化流,统一了离线和实时的 API。

Spark 特点

快:

与 Hadoop 的 MapReduce 相比,Spark 基于内存的运算要快 100 倍以上,基于硬盘的运算也要快 10 倍以上。Spark 通过基于内存来高效处理数据流。

易用:

Spark 支持 Java、Python、R 和 Scala 的 API,还支持超过 80 种高级算法,使用户可以快速构建不同的应用。而且 Spark 支持交互式的 Python 和 Scala 的 shell,可以非常方便地在这些 shell 中使用 Spark 集群来验证解决问题的方法。

通用:

Spark 提供了统一的解决方案。Spark 可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX),这些不同类型的处理都可以在同一个应用中无缝使用。

兼容性:

Spark 可以非常方便地与其他的开源产品进行融合。比如,Spark 可以使用 Hadoop 的 YARN 和 Apache Mesos 作为它的资源管理和调度器,并且可以处理所有 Hadoop 支持的数据,包括 HDFS、HBase 和 Cassandra 等。这对于已经部署 Hadoop 集群的用户特别重要,因为不需要做任何数据迁移就可以使用 Spark 的强大处理能力。

Spark Core

RDD

MapReduce 框架采用非循环式的数据流模型,会把中间结果写入到 HDFS 中,带来了大量的数据复制、磁盘 IO 和序列化开销。且这些框架只能支持一些特定的计算模式(map/reduce),并没有提供一种通用的数据抽象。因此出现了RDD这个概念。

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。

RDD 不实际存储真正要计算的数据,而是记录了数据的位置在哪里,数据的转换关系(调用了什么方法,传入什么函数)。

RDD单词拆解:

  • Resilient :它是弹性的,RDD 里面的中的数据可以保存在内存中或者磁盘里面(RDD的数据默认存放在内存中,但是当内存资源不足时,spark会自动将RDD数据写入磁盘。比如某结点内存只能处理20W数据,那么这20W数据就会放入内存中计算,剩下10W放到磁盘中。RDD的弹性体现在于RDD上自动进行内存和磁盘之间权衡和切换的机制);
  • Distributed : 它里面的元素是分布式存储的,可以用于分布式计算;
  • Dataset:  它是一个集合,可以存放很多元素。

RDD 属性


RDD 的源码描述如下:

其含义如下:

  • A list of partitions :一组分片(Partition)/一个分区(Partition)列表,即数据集的基本组成单位。对于 RDD 来说,每个分片都会被一个计算任务处理,分片数决定并行度。用户可以在创建 RDD 时指定 RDD 的分片个数,如果没有指定,那么就会采用默认值。
  • A function for computing each split :一个函数会被作用在每一个分区。Spark 中 RDD 的计算是以分片为单位的,compute 函数会被作用到每个分区上。
  • A list of dependencies on other RDDs :一个 RDD 会依赖于其他多个 RDD。RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算。(Spark 的容错机制)
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned):可选项,对于 KV 类型的 RDD 会有一个 Partitioner,即 RDD 的分区函数,默认为 HashPartitioner。
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file):可选项,一个列表,存储存取每个 Partition 的优先位置(preferred location)。对于一个 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照"移动数据不如移动计算"的理念,Spark 在进行任务调度的时候,会尽可能选择那些存有数据的 worker 节点来进行任务计算。

总结:

RDD 是一个数据集的表示,不仅表示了数据集,还表示了这个数据集从哪来,如何计算,主要属性包括:分区列表、计算函数、依赖关系、分区函数(默认是 hash)、最佳位置。分区列表、分区函数、最佳位置,这三个属性其实说的就是数据集在哪,在哪计算更合适,如何分区;计算函数、依赖关系,这两个属性其实说的是数据集怎么来的。

RDD API

RDD 的创建方式
① 由外部存储系统的数据集创建,包括本地的文件系统,还有所有 Hadoop 支持的数据集,比如 HDFS、Cassandra、HBase 等:

val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")

② 通过已有的 RDD 经过算子转换生成新的 RDD:

val rdd2=rdd1.flatMap(_.split(" "))

③ 由一个已经存在的 Scala 集合创建:

val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
或者
val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))

makeRDD 方法底层调用了 parallelize 方法:

RDD 算子

RDD 的算子分为两类:

  1. Transformation转换操作:返回一个新的 RDD
  2. Action动作操作:返回值不是 RDD(无返回值或返回其他的)

Transformation转换算子


Action 动作算子:

统计操作:

Transformation和action算子有什么区别?

Transformation 变换/转换:这种变换并不触发提交作业,完成作业中间过程处理。Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算

Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。

RDD 持久化/缓存

某些 RDD 的计算或转换可能会比较耗费时间,如果这些 RDD 后续还会频繁的被使用到,那么可以将这些 RDD 进行持久化/缓存:

val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //缓存/持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了

RDD 通过 persist 或 cache 方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。通过查看 RDD 的源码发现 cache 最终也是调用了 persist 无参方法(默认存储只存在内存中)。

存储级别
默认的存储级别都是仅在内存存储一份,Spark 的存储级别还有好多种,存储级别在 object StorageLevel 中定义的。


总结:

  • RDD 持久化/缓存的目的是为了提高后续操作的速度
  • 缓存的级别有很多,默认只存在内存中,开发中使用 memory_and_disk
  • 只有执行 action 操作的时候才会真正将 RDD 数据进行持久化/缓存
  • 实际开发中如果某一个 RDD 后续会被频繁的使用,可以将该 RDD 进行持久化/缓存
  • 这里当小数据量的时候缓存能提升效率,但数据大的时候内存放不下就会报溢出

RDD 容错机制Checkpoint

持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。

怎么解决?

Checkpoint 的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在在 HDFS 上,这就天然的借助了 HDFS 天生的高容错、高可靠来实现数据最大程度上的安全,实现了 RDD 的容错和高可用。用法如下:

SparkContext.setCheckpointDir("目录") //HDFS的目录RDD.checkpoint

总结:

开发中如何保证数据的安全性性及读取效率:可以对频繁使用且重要的数据,先做缓存/持久化,再做 checkpint 操作。


持久化和 Checkpoint 的区别:

  • 位置:Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存–实验中) Checkpoint 可以保存数据到 HDFS 这类可靠的存储上。
  • 生命周期:Cache 和 Persist 的 RDD 会在程序结束后会被清除或者手动调用 unpersist 方法 Checkpoint 的 RDD 在程序结束后依然存在,不会被删除。

RDD 的依赖关系

RDD有两种依赖,分别为宽依赖(wide dependency/shuffle dependency)和窄依赖(narrow dependency) :

从上图可以看到:

  • 窄依赖:父 RDD 的一个分区只会被子 RDD 的一个分区依赖;
  • 宽依赖:父 RDD 的一个分区会被子 RDD 的多个分区依赖(涉及到 shuffle)


对于窄依赖:窄依赖的多个分区可以并行计算;窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了。

对于宽依赖:划分 Stage(阶段)的依据:对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段


DAG 的生成和划分 Stage

DAG

DAG(Directed Acyclic Graph 有向无环图):指的是数据转换执行的过程,有方向,无闭环(其实就是 RDD 执行的流程);

原始的 RDD 通过一系列的转换操作就形成了 DAG 有向无环图,任务执行时,可以按照 DAG 的描述,执行真正的计算(数据被操作的一个过程)。

DAG 的边界:

  • 开始:通过 SparkContext 创建的 RDD;
  • 结束:触发 Action,一旦触发 Action 就形成了一个完整的 DAG。
DAG 划分Stage

从上图可以看出:

  • 一个 Spark 程序可以有多个 DAG(有几个 Action,就有几个 DAG,上图最后只有一个 Action(图中未表现),那么就是一个 DAG);
  • 一个 DAG 可以有多个 Stage(根据宽依赖/shuffle 进行划分);
  • 同一个 Stage 可以有多个 Task 并行执行(task 数=分区数,如上图,Stage1 中有三个分区 P1、P2、P3,对应的也有三个 Task);
  • 可以看到这个 DAG 中只 reduceByKey 操作是一个宽依赖,Spark 内核会以此为边界将其前后划分成不同的 Stage;
  • 在图中 Stage1 中,从 textFile 到 flatMap 到 map 都是窄依赖,这几步操作可以形成一个流水线操作,通过 flatMap 操作生成的 partition 可以不用等待整个 RDD 计算结束,而是继续进行 map 操作,这样大大提高了计算的效率。

所以总结下stage是如何划分的:

  1. 从hdfs中读取文件后,创建 RDD 对象
  2. DAGScheduler模块介入运算,计算RDD之间的依赖关系。RDD之间的依赖关系就形成了DAG

每一个JOB被分为多个Stage,划分Stage的一个主要依据是当前计算因子的输入是否是确定的,如果是则将其分在同一个Stage,避免多个Stage之间的消息传递开销。
因此spark划分stage的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中

为什么要划分 Stage?  

为了并行计算。

一个复杂的业务逻辑如果有 shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照 shuffle 进行划分(也就是按照宽依赖就行划分),就可以将一个 DAG 划分成多个 Stage/阶段,在同一个 Stage 中,会有多个算子操作,可以形成一个 pipeline 流水线,流水线内的多个平行的分区可以并行执行。

总结:

Spark 会根据 shuffle/宽依赖使用回溯算法来对 DAG 进行 Stage 划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的 RDD 加入到当前的 stage/阶段中。

RDD累加器和广播变量

在默认情况下,当 Spark 在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。

为了满足这种需求,Spark 提供了两种类型的变量:

  • 累加器 (accumulators):累加器支持在所有不同节点之间进行累加计算(比如计数或者求和)。
  • 广播变量 (broadcast variables):广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。

累加器:通常在向 Spark 传递函数时,比如使用 map() 函数或者用filter()传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果:

语法:

val xx: Accumulator[Int] = sc.accumulator(0)

示例代码:

import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}object AccumulatorTest {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")//使用scala集合完成累加var counter1: Int = 0;var data = Seq(1,2,3)data.foreach(x => counter1 += x )println(counter1)//6println("+++++++++++++++++++++++++")//使用RDD进行累加var counter2: Int = 0;val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3]dataRDD.foreach(x => counter2 += x)println(counter2)//0//注意:上面的RDD操作运行结果是0//因为foreach中的函数是传递给Worker中的Executor执行,用到了counter2变量//而counter2变量在Driver端定义的,在传递给Executor的时候,各个Executor都有了一份counter2//最后各个Executor将各自个x加到自己的counter2上面了,和Driver端的counter2没有关系//那这个问题得解决啊!不能因为使用了Spark连累加都做不了了啊!//如果解决?---使用累加器val counter3: Accumulator[Int] = sc.accumulator(0)dataRDD.foreach(x => counter3 += x)println(counter3)//6}
}


广播变量
关键词:sc.broadcast()

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object BroadcastVariablesTest {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")//不使用广播变量val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")))val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap//scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana)val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3))//根据水果编号取水果名称val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x))fruitNames.foreach(println)//注意:以上代码看似一点问题没有,但是考虑到数据量如果较大,且Task数较多,//那么会导致,被各个Task共用到的fruitMap会被多次传输//应该要减少fruitMap的传输,一台机器上一个,被该台机器中的Task共用即可//如何做到?---使用广播变量//注意:广播变量的值不能被修改,如需修改可以将数据存到外部数据源,如MySQL、Redisprintln("=====================")val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap)val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x))fruitNames2.foreach(println)}
}

Spark SQL

Hive 是将 SQL 转为 MapReduce。

SparkSQL 可以理解成是将 SQL 解析成:“RDD + 优化” 再执行。

在学习Spark SQL前,需要了解数据分类。

数据分为如下几类:


总结:

  • RDD 主要用于处理非结构化数据 、半结构化数据、结构化
  • SparkSQL 主要用于处理结构化数据(较为规范的半结构化数据也可以处理)

Spark SQL 数据抽象

DataFrame 和 DataSet

Spark SQL数据抽象可以分为两类:

① DataFrame:

DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库的二维表格,带有 Schema 元信息(可以理解为数据库的列名和类型)。DataFrame = RDD + 泛型 + SQL 的操作 + 优化

② DataSet:

DataSet是DataFrame的进一步发展,它比RDD保存了更多的描述信息,概念上等同于关系型数据库中的二维表,它保存了类型信息,是强类型的,提供了编译时类型检查。调用 Dataset 的方法先会生成逻辑计划,然后被 spark 的优化器进行优化,最终生成物理计划,然后提交到集群中运行!DataFrame = Dateset[Row]

RDD、DataFrame、DataSet的关系如下:

  • RDD[Person]:以 Person 为类型参数,但不了解其内部结构。
  • DataFrame:提供了详细的结构信息 schema 列的名称和类型。这样看起来就像一张表了。
  • DataSet[Person]:不光有 schema 信息,还有类型信息。

举例
假设 RDD 中的两行数据长这样:

RDD[Person]:

那么 DataFrame 中的数据长这样:

因为DataFrame = RDD[Person] + 泛型 + Schema + SQL 操作 + 优化

那么 Dataset 中的数据长这样:

因为Dataset[Person] = DataFrame + 泛型

Dataset 也可能长这样:Dataset[Row]:

即 DataFrame = DataSet[Row]

总结

  • DataFrame = RDD - 泛型 + Schema + SQL + 优化
  • DataSet = DataFrame + 泛型
  • DataSet = RDD + Schema + SQL + 优化

Spark SQL 应用

创建 DataFrame/DataSet

方式一:读取本地文件

① 在本地创建一个文件,有 id、name、age 三列,用空格分隔,然后上传到 hdfs 上。

vim /root/person.txt

内容如下:

1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40

② 打开 spark-shell

spark/bin/spark-shell
##创建 RDDval lineRDD= sc.textFile("hdfs://node1:8020/person.txt").map(_.split(" ")) //RDD[Array[String]]

③ 定义 case class(相当于表的 schema)

case class Person(id:Int, name:String, age:Int)

④ 将 RDD 和 case class 关联

val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //RDD[Person]

⑤ 将 RDD 转换成 DataFrame

val personDF = personRDD.toDF //DataFrame

也可以通过 SparkSession 构建 DataFrame

val dataFrame=spark.read.text("hdfs://node1:8020/person.txt")
dataFrame.show //注意:直接读取的文本文件没有完整schema信息
dataFrame.printSchema

两种查询风格:DSL 和 SQL
DSL风格示例:

personDF.select(personDF.col("name")).show
personDF.select(personDF("name")).show
personDF.select(col("name")).show
personDF.select("name").show

SQL 风格示例:

spark.sql("select * from t_person").show

总结:

  • DataFrame 和 DataSet 都可以通过RDD来进行创建;
  • 不管是 DataFrame 还是 DataSet 都可以注册成表,之后可以使用 SQL 进行查询,也可以使用 DSL

Spark Streaming

Spark Streaming 是一个基于 Spark Core 之上的实时计算框架,可以从很多数据源消费数据并对数据进行实时的处理,具有高吞吐量和容错能力强等特点。

Spark streaming内部的基本工作原理是:接受实时输入数据流,然后将数据拆分成batch,比如每收集一秒的数据封装成一个batch,然后将每个batch交给spark的计算引擎进行处理,最后会生产处一个结果数据流,其中的数据也是一个一个的batch组成的。

Spark Streaming 的特点:

  • 易用:可以像编写离线批处理一样去编写流式程序,支持 java/scala/python 语言。
  • 容错:SparkStreaming 在没有额外代码和配置的情况下可以恢复丢失的工作。
  • 易整合到 Spark 体系:流式处理与批处理和交互式查询相结合。

整体流程

  • ① Spark Streaming 中,会有一个接收器组件 Receiver,作为一个长期运行的 task 跑在一个 Executor 上,Receiver 接收外部的数据流形成 input DStream。
  • ② DStream 会被按照时间间隔划分成一批一批的 RDD,当批处理间隔缩短到秒级时,便可以用于处理实时数据流(时间间隔的大小可以由参数指定,一般设在 500 毫秒到几秒之间)。
  • ③ 对 DStream 进行操作就是对 RDD 进行操作,计算处理的结果可以传给外部系统。
  • ④ 接受到实时数据后,给数据分批次,然后传给 Spark Engine 处理最后生成该批次的结果。

数据抽象

Spark Streaming 的基础抽象是 DStream(Discretized Stream,离散化数据流,连续不断的数据流),代表持续性的数据流和经过各种 Spark 算子操作后的结果数据流。

可以从以下多个角度深入理解 DStream:

① DStream 本质上就是一系列时间上连续的 RDD,每个RDD包含了一个时间段的数据:


② 对 DStream 的数据的进行操作也是按照 RDD 为单位来进行的:


③ 容错性,底层 RDD 之间存在依赖关系,DStream 直接也有依赖关系,RDD 具有容错性,那么 DStream 也具有容错性。

④ 准实时性/近实时性

  • Spark Streaming 将流式计算分解成多个 Spark Job,对于每一时间段数据的处理都会经过 Spark DAG 图分解以及 Spark 的任务集的调度过程。
  • 对于目前版本的 Spark Streaming 而言,其最小的 Batch Size 的选取在 0.5~5 秒钟之间。

所以 Spark Streaming 能够满足流式准实时计算场景,对实时性要求非常高的如高频实时交易场景则不太适合。

总结: 简单来说 DStream 就是对 RDD 的封装,你对 DStream 进行操作,就是对 RDD 进行操作。对于 DataFrame/DataSet/DStream 来说本质上都可以理解成 RDD。

Spark 两种核心 Shuffle

在 MapReduce 框架中,Shuffle 阶段是连接 Map 与 Reduce 之间的桥梁, Map 阶段通过 Shuffle 过程将数据输出到 Reduce 阶段中。由于 Shuffle 涉及磁盘的读写和网络 I/O,因此 Shuffle 性能的高低直接影响整个程序的性能。Spark 也有 Map 阶段和 Reduce 阶段,因此也会出现Shuffle。

哪些spark算子会有shuffle?

  1. 去重,distinct
  2. 排序,groupByKey,reduceByKey等
  3. 重分区,repartition,coalesce
  4. 集合或者表操作,interection,joi

Spark Shuffle 分为两种:

  • 一种是基于 Hash 的 Shuffle;
  • 另一种是基于 Sort 的 Shuffle。

在spark-1.6版本之前,一直使用HashShuffle,在spark-1.6版本之后使用Sort-Base Shuffle,因为HashShuffle存在的不足所以就替换了HashShuffle.

Hash Shuffle

HashShuffleManager
shuffle write 阶段,主要就是在一个 stage 结束计算之后,为了下一个 stage 可以执行 shuffle 类的算子(比如 reduceByKey),而将每个 task 处理的数据按 key 进行“划分”。所谓“划分”,就是对相同的 key 执行 hash 算法,从而将相同 key 都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游 stage 的一个 task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

下一个 stage 的 task 有多少个,当前 stage 的每个 task 就要创建多少份磁盘文件。比如:

  • 下一个 stage 总共有 100 个 task,那么当前 stage 的每个 task 都要创建 100 份磁盘文件。
  • 如果当前 stage 有 50 个 task,总共有 10 个 Executor,每个 Executor 执行 5 个 task,那么每个 Executor 上总共就要创建 500 个磁盘文件,所有 Executor 上会创建 5000 个磁盘文件。

由此可见,未经优化的 shuffle write 操作所产生的磁盘文件的数量是极其惊人的。

shuffle read 阶段,通常就是一个 stage 刚开始时要做的事情。此时该 stage 的每一个 task 就需要将上一个 stage 的计算结果中的所有相同 key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行 key 的聚合或连接等操作。由于 shuffle write 的过程中,map task 给下游 stage 的每个 reduce task 都创建了一个磁盘文件,因此 shuffle read 的过程中,每个 reduce task 只要从上游 stage 的所有 map task 所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read 的拉取过程是一边拉取一边进行聚合的。每个 shuffle read task 都会有一个自己的 buffer 缓冲,每次都只能拉取与 buffer 缓冲相同大小的数据,然后通过内存中的一个 Map 进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到 buffer 缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

HashShuffleManager 工作原理如下图所示:


优化的HashShuffleManager
为了优化 HashShuffleManager 我们可以设置一个参数:spark.shuffle.consolidateFiles,该参数默认值为 false,将其设置为 true 即可开启优化机制,通常来说,如果我们使用 HashShuffleManager,那么都建议开启这个选项。

开启 consolidate 机制之后,在 shuffle write 过程中,task 就不是为下游 stage 的每个 task 创建一个磁盘文件了,此时会出现 shuffleFileGroup 的概念,每个 shuffleFileGroup 会对应一批磁盘文件,磁盘文件的数量与下游 stage 的 task 数量是相同的。一个 Executor 上有多少个 cpu core,就可以并行执行多少个 task。而第一批并行执行的每个 task 都会创建一个 shuffleFileGroup,并将数据写入对应的磁盘文件内。

当 Executor 的 cpu core 执行完一批 task,接着执行下一批 task 时,下一批 task 就会复用之前已有的 shuffleFileGroup,包括其中的磁盘文件,也就是说,此时 task 会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate 机制允许不同的 task 复用同一批磁盘文件,这样就可以有效将多个 task 的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升 shuffle write 的性能。

假设第二个 stage 有 100 个 task,第一个 stage 有 50 个 task,总共还是有 10 个 Executor(Executor CPU 个数为 1),每个 Executor 执行 5 个 task。那么原本使用未经优化的 HashShuffleManager 时,每个 Executor 会产生 500 个磁盘文件,所有 Executor 会产生 5000 个磁盘文件的。但是此时经过优化之后,每个 Executor 创建的磁盘文件的数量的计算公式为:cpu core的数量 * 下一个stage的task数量,也就是说,每个 Executor 此时只会创建 100 个磁盘文件,所有 Executor 只会创建 1000 个磁盘文件。

优化后的 HashShuffleManager 工作原理如下图所示:

优点:

  • 可以省略不必要的排序开销。
  • 避免了排序所需的内存开销。

缺点:

  • 生产的文件过多,会对文件系统造成压力。
  • 大量小文件的随机读写带来一定的磁盘开销。
  • 数据块写入时所需的缓存空间也会随之增加,对内存造成压力。

SortShuffle

SortShuffleManager 的运行机制主要分成三种:

  • 普通运行机制;
  • bypass 运行机制:当 shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold的值时(默认为 200),就会启用 bypass 机制;
  • Tungsten Sort 运行机制:开启此运行机制需设置配置项 spark.shuffle.manager=tungsten-sort。开启此项配置也不能保证就一定采用此运行机制(后面会解释)。

普通运行机制

在该模式下,数据会先写入一个内存数据结构中,此时根据不同的 shuffle 算子,可能选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存;如果是 join 这种普通的 shuffle 算子,那么会选用 Array 数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件是通过 Java 的 BufferedOutputStream 实现的。BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能。

一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge 过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个 task 就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件 ,其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。

SortShuffleManager 由于有一个磁盘文件 merge 的过程,因此大大减少了文件数量。比如第一个 stage 有 50 个 task,总共有 10 个 Executor,每个 Executor 执行 5 个 task,而第二个 stage 有 100 个 task。由于每个 task 最终只有一个磁盘文件,因此此时每个 Executor 上只有 5 个磁盘文件,所有 Executor 只有 50 个磁盘文件。

普通运行机制的 SortShuffleManager 工作原理如下图所示:

bypass 运行机制

Reducer 端任务数比较少的情况下,基于 Hash Shuffle 实现机制明显比基于 Sort Shuffle 实现机制要快,因此基于 Sort huffle 实现机制提供了一个回退方案,就是 bypass 运行机制。对于 Reducer 端任务数少于配置属性spark.shuffle.sort.bypassMergeThreshold设置的个数时,使用带 Hash 风格的回退计划。

bypass 运行机制的触发条件如下:

shuffle map task 数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。
不是聚合类的 shuffle 算子。
此时,每个 task 会为每个下游 task 都创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据 key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。

而该机制与普通 SortShuffleManager 运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

bypass 运行机制的 SortShuffleManager 工作原理如下图所示:

Tungsten Sort Shuffle 运行机制

基于 Tungsten Sort 的 Shuffle 实现机制主要是借助 Tungsten 项目所做的优化来高效处理 Shuffle。

Spark 提供了配置属性,用于选择具体的 Shuffle 实现机制,但需要说明的是,虽然默认情况下 Spark 默认开启的是基于 SortShuffle 实现机制,但实际上,参考 Shuffle 的框架内核部分可知基于 SortShuffle 的实现机制与基于 Tungsten Sort Shuffle 实现机制都是使用 SortShuffleManager,而内部使用的具体的实现机制,是通过提供的两个方法进行判断的:

对应非基于 Tungsten Sort 时,通过 SortShuffleWriter.shouldBypassMergeSort 方法判断是否需要回退到 Hash 风格的 Shuffle 实现机制,当该方法返回的条件不满足时,则通过 SortShuffleManager.canUseSerializedShuffle 方法判断是否需要采用基于 Tungsten Sort Shuffle 实现机制,而当这两个方法返回都为 false,即都不满足对应的条件时,会自动采用普通运行机制。

因此,当设置了 spark.shuffle.manager=tungsten-sort 时,也不能保证就一定采用基于 Tungsten Sort 的 Shuffle 实现机制。

要实现 Tungsten Sort Shuffle 机制需要满足以下条件:

Shuffle 依赖中不带聚合操作或没有对输出进行排序的要求。

Shuffle 的序列化器支持序列化值的重定位(当前仅支持 KryoSerializer Spark SQL 框架自定义的序列化器)。

Shuffle 过程中的输出分区个数少于 16777216 个。

实际上,使用过程中还有其他一些限制,如引入 Page 形式的内存管理模型后,内部单条记录的长度不能超过 128 MB (具体内存模型可以参考 PackedRecordPointer 类)。另外,分区个数的限制也是该内存模型导致的。

所以,目前使用基于 Tungsten Sort Shuffle 实现机制条件还是比较苛刻的。

Spark 底层执行原理

Spark有哪些组件?

  • master:管理集群和节点,不参与计算。
  • worker:计算节点,进程本身不参与计算,和master汇报。
  • Driver:运行程序的main方法,创建spark context对象。
  • spark context:控制整个application的生命周期,包括dagsheduler和task scheduler等组件。
  • client:用户提交程序的入口。

Spark 运行流程

具体运行流程如下:

  1. 构建Application的运行环境,Driver创建一个SparkContext

  2. SparkContext 向资源管理器(Standalone、Mesos、Yarn)注册并申请运行 Executor
  3. 资源管理器分配 Executor,然后资源管理器启动 Executor
  4. Executor 发送心跳至资源管理器
  5. SparkContext 构建 DAG 有向无环图,DAGScheduler将DAG图解析成Stage,,由task Scheduler将Task发送给Executor运行
  6. DAGScheduler将DAG图解析成Stage,每个Stage有多个task,形成(TaskSet)发送给task Scheduler
  7. Executor 向 SparkContext 申请 Task
  8. TaskScheduler 将 Task 发送给 Executor 运行
  9. 同时 SparkContext 将应用程序代码发放给 Executor
  10. Task 在 Executor 上运行,运行完毕释放所有资源

从代码角度看 DAG 图的构建

Val lines1 = sc.textFile(inputPath1).map(...).map(...)
Val lines2 = sc.textFile(inputPath2).map(...)
Val lines3 = sc.textFile(inputPath3)
Val dtinone1 = lines2.union(lines3)
Val dtinone = lines1.join(dtinone1)
dtinone.saveAsTextFile(...)
dtinone.filter(...).foreach(...)

上述代码的 DAG 图如下所示:

Spark 内核会在需要计算发生的时刻绘制一张关于计算路径的有向无环图,也就是如上图所示的 DAG。Spark 的计算发生在 RDD 的 Action 操作,而对 Action 之前的所有 Transformation,Spark 只是记录下 RDD 生成的轨迹,而不会触发真正的计算。

将 DAG 划分为 Stage 核心算法

一个 Application 可以有多个 job 多个 Stage:

Spark Application 中可以因为不同的 Action 触发众多的 job,一个 Application 中可以有很多的 job,每个 job 是由一个或者多个 Stage 构成的,后面的 Stage 依赖于前面的 Stage,也就是说只有前面依赖的 Stage 计算完毕后,后面的 Stage 才会运行。

划分依据:Stage 划分的依据就是宽依赖,像 reduceByKey,groupByKey 等算子,会导致宽依赖的产生。


核心算法:回溯算法

从后往前回溯/反向解析,遇到窄依赖加入本 Stage,遇见宽依赖进行 Stage 切分。

Spark 内核会从触发 Action 操作的那个 RDD 开始从后往前推,首先会为最后一个 RDD 创建一个 Stage,然后继续倒推,如果发现对某个 RDD 是宽依赖,那么就会将宽依赖的那个 RDD 创建一个新的 Stage,那个 RDD 就是新的 Stage 的最后一个 RDD。然后依次类推,继续倒推,根据窄依赖或者宽依赖进行 Stage 的划分,直到所有的 RDD 全部遍历完成为止。

将 DAG 划分为 Stage 剖析

一个 Spark 程序可以有多个 DAG(有几个 Action,就有几个 DAG,上图最后只有一个 Action(图中未表现),那么就是一个 DAG)。

一个 DAG 可以有多个 Stage(根据宽依赖/shuffle 进行划分)。

同一个 Stage 可以有多个 Task 并行执行(task 数=分区数,如上图,Stage1 中有三个分区 P1、P2、P3,对应的也有三个 Task)。

可以看到这个 DAG 中只 reduceByKey 操作是一个宽依赖,Spark 内核会以此为边界将其前后划分成不同的 Stage。

同时我们可以注意到,在图中 Stage1 中,从 textFile 到 flatMap 到 map 都是窄依赖,这几步操作可以形成一个流水线操作,通过 flatMap 操作生成的 partition 可以不用等待整个 RDD 计算结束,而是继续进行 map 操作,这样大大提高了计算的效率。

提交 Stages

调度阶段的提交,最终会被转换成一个任务集的提交,DAGScheduler 通过 TaskScheduler 接口提交任务集,这个任务集最终会触发 TaskScheduler 构建一个 TaskSetManager 的实例来管理这个任务集的生命周期,对于 DAGScheduler 来说,提交调度阶段的工作到此就完成了。

而 TaskScheduler 的具体实现则会在得到计算资源的时候,进一步通过 TaskSetManager 调度具体的任务到对应的 Executor 节点上进行运算。

任务调度总体诠释

补充一、scala 脚本 模版

SourceTable.scala

trait SourceTable {val BASIC_INFO = "basic_info"
}

OutputTable.scala

trait OutputTable {val TARGET_TABLE = "target_table"
}

GeneratorDataTask.scala

object GeneratorDataTask extend SourceTable with OutputTable {private val argsKeys = Set("dt", "env")val INDEX_DT: String = DuccUtil.getLastDt("last_dt");def main(args: Array[String]): Unit = {val spark = SparkSessionUtil.getSession("GeneratorDataTask")try {val properties = TaskUtil.getArgs(args, argsKeys)if (!properties.contains("dt") || !properties.contains("env")) {throw new Exception("必要参数为空")}val selectDF = spark.read.table(BASIC_INFO).where(s"dt = '$INDEX_DT'") selectDF.na.fill(0).write.mode(SaveMode.Overwrite).insertInto(TARGET_TABLE)   } catch {case e: Exception => {println("Excetion detail:", e)System.exit(1)}} finally {spark.stop()    }}

补充二、Spark任务运行流程

简单的说:

用户在client端提交作业后,会由Driver运行main方法并创建spark context上下文。调用RDD上的方法,形成dag图输入dagscheduler,按照rdd之间的依赖关系划分stage输入task scheduler。task scheduler会将stage划分为task set分发到各个节点的executor中执行。

再具体的说:

  1. 客户端提交作业
  2. Driver启动流程
  3. Driver申请资源并启动其余Executor(即Container)
  4. Executor启动流程
  5. 作业调度,生成stages与tasks。
  6. Task调度到Executor上,Executor启动线程执行Task逻辑
  7. Driver管理Task状态
  8. Task完成,Stage完成,作业完成

在工厂环境下,Spark 集群的部署方式一般为 YARN-Cluster 模式:

时序图:

  1. 提交一个Spark应用程序,首先通过Client向 ResourceManager 请求启动一个Application,同时检查是否有足够的资源满足 Application 的需求,如果资源条件满足,则准备 ApplicationMaster的启动上下文,交给ResourceManager,并循环监控Application状态。
  2. 当提交的资源队列中有资源时,ResourceManager 会在某个 NodeManager 上启动 ApplicationMaster 进程,ApplicationMaster 会单独启动 Driver 后台线程,当 Driver启动后,ApplicationMaster 会通过本地的 RPC 连接 Driver,并开始向 ResourceManager申请 Container 资源运行 Executor 进程(一个 Executor 对应与一个 Container),当ResourceManager 返回 Container 资源,ApplicationMaster 则在对应的 Container 上启动 Executor。
  3. Driver 线程主要是初始化 SparkContext 对象,准备运行所需的上下文,然后一方面保持与 ApplicationMaster 的 RPC 连接,通过 ApplicationMaster 申请资源,另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲 Executor 上。Driver 初始化 SparkContext 过 程 中 , 会 分 别 初 始 化 DAGScheduler 、TaskScheduler、SchedulerBackend 以及 HeartbeatReceiver,并启动 SchedulerBackend以及 HeartbeatReceiver。SchedulerBackend 通过 ApplicationMaster 申请资源,并不断从 TaskScheduler 中拿到合适的 Task 分发到 Executor 执行。HeartbeatReceiver 负责接收 Executor 的心跳信息,监控 Executor 的存活状况,并通知到 TaskScheduler。
  4. 当 ResourceManager 向 ApplicationMaster 返回 Container 资源时,ApplicationMaster 就尝试在对应的 Container 上启动 Executor 进程,Executor 进程起来后,会向 Driver 反向注册,注册成功后保持与 Driver 的心跳,同时等待 Driver分发任务,当分发的任务执行完毕后,将任务状态上报给 Driver。

从任务调度的阶段来看四个步骤:

  • 1.构建DAG(调用RDD上的方法)
  • 2.DAGScheduler将DAG切分Stage(切分的依据是Shuffle),将Stage中生成的Task以TaskSet的形式给TaskScheduler
  • 3.TaskScheduler调度Task(根据资源情况将Task调度到相应的Executor中)
  • 4.Executor接收Task,然后将Task丢入到线程池中执行

补充三、为什么说与MapReduce相比,Spark运行效率更高?

reduceByKey的结果是一个新的RDD,其中每个键都唯一,与每个键相关联的值经过了聚合操作。

groupByKey的结果是一个新的RDD,其中每个键都与一个迭代器相关联,迭代器包含了与该键关联的所有值。 

spark借鉴了Mapreduce,继承了其分布式计算的优点并进行了改进,spark生态更为丰富,功能更为强大,性能更加适用范围广,mapreduce更简单,稳定性好。主要区别:

(1)spark把运算的中间数据(shuffle阶段产生的数据)存放在内存,迭代计算效率更高,mapreduce的中间结果需要落地,保存到磁盘

(2)Spark容错性高,它通过弹性分布式数据集RDD来实现高效容错,RDD是一组分布式的存储在 节点内存中的只读性的数据集,这些集合石弹性的,某一部分丢失或者出错,可以通过整个数据集的计算流程的血缘关系来实现重建,mapreduce的容错只能重新计算

(3)Spark更通用,提供了transformation和action这两大类的多功能api,另外还有流式处理spark streaming模块、图计算等等,mapreduce只提供了map和reduce两种操作,流计算及其他的模块支持比较缺乏

(4)Spark框架和生态更为复杂,有RDD,血缘lineage、执行时的有向无环图DAG,stage划分等,很多时候spark作业都需要根据不同业务场景的需要进行调优以达到性能要求,mapreduce框架及其生态相对较为简单,对性能的要求也相对较弱,运行较为稳定,适合长期后台运行。

(5)Spark计算框架对内存的利用和运行的并行度比mapreduce高,Spark运行容器为executor,内部ThreadPool中线程运行一个Task,mapreduce在线程内部运行container,container容器分类为MapTask和ReduceTask.程序运行并行度高

(6)Spark对于executor的优化,在JVM虚拟机的基础上对内存弹性利用:storage memory与Execution memory的弹性扩容,使得内存利用效率更高

补充四、RDD中reduceBykey与groupByKey哪个性能好?

reduceByKey是一个转换操作(transformation),它会对具有相同键的值进行聚合操作。这个操作会遍历数据集中的每个键值对,并将具有相同键的值传递给一个减少函数(reduce function),该函数会将这些值组合成一个单一的值。示例:

from pyspark import SparkContextsc = SparkContext("local", "ReduceByKey Example")data = [("a", 1), ("b", 1), ("a", 1), ("a", 1), ("b", 1), ("c", 1)]
rdd = sc.parallelize(data)# 使用reduceByKey进行聚合
result = rdd.reduceByKey(lambda x, y: x + y)print(result.collect())
# 输出: [('a', 3), ('b', 2), ('c', 1)]

groupByKey也是一个转换操作,它会将具有相同键的所有值组合在一起,并返回一个键值对,其中键是原始键,值是一个迭代器,包含了所有具有该键的值。 示例:

from pyspark import SparkContextsc = SparkContext("local", "GroupByKey Example")data = [("a", 1), ("b", 1), ("a", 1), ("a", 1), ("b", 1), ("c", 1)]
rdd = sc.parallelize(data)# 使用groupByKey进行分组
grouped = rdd.groupByKey()# 对分组后的结果进行转换,以便查看
result = grouped.mapValues(lambda values: list(values))print(result.collect())
# 输出: [('a', [1, 1, 1]), ('b', [1, 1]), ('c', [1])]

reduceByKey:reduceByKey会在结果发送至reducer之前会对每个mapper在本地进行merge,有点类似于在MapReduce中的combiner。这样做的好处在于,在map端进行一次reduce之后,数据量会大幅度减小,从而减小传输,保证reduce端能够更快的进行结果计算。

groupByKey:groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),此操作发生在reduce端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成OutOfMemoryError。

所以在进行大量数据的reduce操作时候建议使用reduceByKey。不仅可以提高速度,还可以防止使用groupByKey造成的内存溢出问题。

补充五、Shuffle数据块有多少种不同的存储方式?

  1. RDD数据块:用来存储所缓存的RDD数据。
  2. Shuffle数据块:用来存储持久化的Shuffle数据。
  3. 广播变量数据块:用来存储所存储的广播变量数据。
  4. 任务返回结果数据块:用来存储在存储管理模块内部的任务返回结果。通常情况下任务返回结果随任务一起通过Akka返回到Driver端。但是当任务返回结果很大时,会引起Akka帧溢出,这时的另一种方案是将返回结果以块的形式放入存储管理模块,然后在Driver端获取该数据块即可,因为存储管理模块内部数据块的传输是通过Socket连接的,因此就不会出现Akka帧溢出了。
  5. 流式数据块:只用在Spark Streaming中,用来存储所接收到的流式数据块

补充六、Scala里trait有什么功能,与class有何异同?什么时候用trait什么时候该用class?

trait可以被继承,而且支持多重继承,其实它更像我们熟悉的接口(interface),但它与接口又有不同之处是:trait中可以写方法的实现,interface不可以(java8开始支持接口中允许写方法实现代码了),这样看起来trait又很像抽象类。

补充七、Scala 语法中to 和 until有啥区别?

to 包含上界,until不包含上界。

补充八、Scala伴生对象和伴生类

单例对象与类同名时,这个单例对象被称为这个类的伴生对象,而这个类被称为这个单例对象的伴生类。伴生类和伴生对象要在同一个源文件中定义,伴生对象和伴生类可以互相访问其私有成员。不与伴生类同名的单例对象称为孤立对象。

import scala.collection.mutable.Mapclass ChecksumAccumulator {private var sum = 0def add(b: Byte) {sum += b}def checksum(): Int = ~(sum & 0xFF) + 1
}object ChecksumAccumulator {private val cache = Map[String, Int]()def calculate(s: String): Int =if (cache.contains(s))cache(s)else {val acc = new ChecksumAccumulatorfor (c <- s)acc.add(c.toByte)val cs = acc.checksum()cache += (s -> cs)println("s:"+s+" cs:"+cs)cs}def main(args: Array[String]) {println("Java 1:"+calculate("Java"))println("Java 2:"+calculate("Java"))println("Scala :"+calculate("Scala"))}
}

补充九、Spark 中 Partition,Task,core,Executor的个数决定因素和关系?

Partition 是 Spark RDD 计算的最小单元,决定了计算的并发度。分区数如果远小于集群可用的 CPU 数,不利于发挥 Spark 的性能,还容易导致数据倾斜等问题。分区数如果远大于集群可用的 CPU 数,会导致资源分配的时间过长,从而影响性能。

默认情况下,Task 的数量是由 Partition 决定的,RDD 计算时,每个分区会启一个 Task,所以 RDD 的分区数决定了总 Task 数。

补充十、介绍一下join操作优化经验?

join其实常见的就分为两类: map join 和 reduce join。

当大表和小表join时,用map join能显著提高效率。将多份数据进行关联是数据处理过程中非常普遍的用法,不过在分布式计算系统中,这个问题往往会变的非常麻烦,因为框架提供的 join 操作一般会将所有数据根据 key 发送到所有的 reduce 分区中去,也就是 shuffle 的过程。造成大量的网络以及磁盘IO消耗,运行效率极其低下,这个过程一般被称为 reduce join。如果其中有张表较小的话,我们则可以自己实现在 map 端实现数据关联,跳过大量数据进行 shuffle 的过程,运行时间得到大量缩短,根据不同数据可能会有几倍到数十倍的性能提升。

原则就是尽量不进行reduce join。

两个大表之间进行join操作,影响性能的主要因素是数据倾斜,我们要进行尽量保证join的两张表发送到executor的数据的数量是一样的,而这个可以通过distributed by join(条件列)进行,这样可以提前把两个表的数据按照条件列分布好,在进行join操作时就不会发生数据倾斜的问题了

ps: distributed by 条件列 是把数据按照条件列进行分区,分区的数量由set spark.sql.shuffle.partitions=600; 进行控制,此外,即使不是用于join操作,遇到表数据倾斜是我们也可以使用,例如:select * from Table distribute by rand(); 这样就可以保证每个分区的数据基本一致了。

补充十一、Spark的数据本地性有哪几种?

Spark中的数据本地性有三种:

1)PROCESS_LOCAL是指读取缓存在本地节点的数据

2)NODE_LOCAL是指读取本地节点硬盘数据

3)ANY是指读取非本地节点数据

通常读取数据PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以PROCESS_LOCAL或NODE_LOCAL方式读取。其中PROCESS_LOCAL还和cache有关,如果RDD经常用的话将该RDD cache到内存中,注意,由于cache是lazy的,所以必须通过一个action的触发,才能真正的将该RDD cache到内存中。

补充十二、scala代码实现WordCount

val conf = new SparkConf()   
val sc = new SparkContext(conf)   
val line = sc.textFile("xxxx.txt") 
line.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_). collect().foreach(println)
sc.stop()

相关文章:

Spark生态圈

Spark 主要用于替代Hadoop中的 MapReduce 计算模型。存储依然可以使用 HDFS&#xff0c;但是中间结果可以存放在内存中&#xff1b;调度可以使用 Spark 内置的&#xff0c;也可以使用更成熟的调度系统 YARN 等。 Spark有完善的生态圈&#xff1a; Spark Core&#xff1a;实现了…...

如何计算相位差

如何计算相位差 假设我们有两个同频率的正弦信号&#xff1a; 这里两个信号的角频率w2πf是相同的&#xff0c;根据同频正弦信号相位差的计算方法&#xff0c;直接用两个信号的相位相减。 再来看利用波形图计算相位差的例子&#xff1a; 另一种计算方式&#xff1a;...

Bash Shell知识合集

1. chmod命令 创建一个bash shell脚本 hello.sh ~script $ touch hello.sh脚本创建完成后并不能直接执行&#xff0c;我们要用chmod命令授予它可执行的权限&#xff1a; ~script $ chmod 755 hello.sh授权后的脚本可以直接执行&#xff1a; ~script $ ./hello.sh2.指定运行…...

《信管通低代码信息管理系统开发平台》Windows环境安装说明

1 简介 《信管通低代码信息管理系统应用平台》提供多环境软件产品开发服务&#xff0c;包括单机、局域网和互联网。我们专注于适用国产硬件和操作系统应用软件开发应用。为事业单位和企业提供行业软件定制开发&#xff0c;满足其独特需求。无论是简单的应用还是复杂的系统&…...

如何查看服务器内存占用情况?

如何查看服务器的内存占用情况&#xff1f;你知道内存使用情况对服务器性能的重要性吗&#xff1f;内存是服务器运行的核心资源之一&#xff0c;了解内存的占用情况可以帮助你优化系统性能。 要查看服务器的内存占用情况&#xff0c;首先需要确定你使用的是哪种操作系统。不同…...

【源码】Sharding-JDBC源码分析之SQL中影子库ShadowSQLRouter路由的原理

Sharding-JDBC系列 1、Sharding-JDBC分库分表的基本使用 2、Sharding-JDBC分库分表之SpringBoot分片策略 3、Sharding-JDBC分库分表之SpringBoot主从配置 4、SpringBoot集成Sharding-JDBC-5.3.0分库分表 5、SpringBoot集成Sharding-JDBC-5.3.0实现按月动态建表分表 6、【…...

OCR实践-Table-Transformer

前言 书接上文 OCR实践—PaddleOCR Table-Transformer 与 PubTables-1M table-transformer&#xff0c;来自微软&#xff0c;基于Detr&#xff0c;在PubTables1M 数据集上进行训练&#xff0c;模型是在提出数据集同时的工作&#xff0c; paper PubTables-1M: Towards comp…...

代码随想录五刷day6

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、力扣144. 二叉树的前序遍历(递归)二、力扣144. 二叉树的前序遍历(迭代)三、力扣145. 二叉树的后序遍历(递归)四、力扣145. 二叉树的后序遍历(迭代)五、力扣…...

【自信息、信息熵、联合熵、条件熵、互信息】

文章目录 一、自信息 I(X)二、信息熵&#xff1a;衡量系统的混乱程度信息熵 H(X)联合熵 H(X,Y) 三、条件熵H(Y|X) 联合熵H(X,Y) - 信息熵H(X)四、互信息 I(X,Y)五、总结References 一、自信息 I(X) 自信息(Self-information) 是由香农提出的&#xff0c;用来衡量单一事件发生…...

我的秋招总结

我的秋招总结 个人背景 双非本&#xff0c;985硕&#xff0c;科班 准备情况 以求职为目的学习Java的时间大概一年。 八股&#xff0c;一开始主要是看B站黑马的八股文课程&#xff0c;背JavaGuide和小林coding还有面试鸭。 算法&#xff0c;250&#xff0c;刷了3遍左右 项目&…...

page_ref_freeze浅析

最近在研究struct page的引用计数refcount&#xff0c;看到有个page_ref_freeze()特性很有意思。用这篇博客记录一下。 本文分析基于linux4.19.195 static inline int page_ref_freeze(struct page *page, int count) {int ret likely(atomic_cmpxchg(&page->_refcoun…...

Python毕业设计选题:基于python的酒店推荐系统_django+hadoop

开发语言&#xff1a;Python框架&#xff1a;djangoPython版本&#xff1a;python3.7.7数据库&#xff1a;mysql 5.7数据库工具&#xff1a;Navicat11开发软件&#xff1a;PyCharm 系统展示 管理员登录 管理员功能界面 用户管理 酒店客房管理 客房类型管理 客房预定管理 用户…...

选择 SquashFS 作为启动分区的文件系统格式:详细教程

SquashFS 是一种高压缩率的只读文件系统,广泛用于嵌入式系统、Linux 发行版以及其他需要节省存储空间的场景。它特别适合用于启动分区、只读根文件系统(rootfs)等应用,因为它通过压缩技术极大地节省了存储空间。在本博客中,我们将详细介绍如何在 RK3568 等嵌入式设备上使用…...

Unity 读Excel,读取xlsx文件解决方案

Unity读取表格数据 效果&#xff1a; 思路&#xff1a; Unity可以解析Json&#xff0c;但是读取Excel需要插件的帮助&#xff0c;那就把这个功能分离开&#xff0c;读表插件就只管读表转Json&#xff0c;Unity就只管Json解析&#xff0c;中间需要一个存储空间&#xff0c;使用…...

【C语言】指针数组、数组指针、函数指针、指针函数、函数指针数组、回调函数

【C语言】函数指针与指针函数 文章目录 [TOC](文章目录) 前言一、指针数组二、数组指针三、函数指针四、指针函数五、函数指针数组六、回调函数七、参考资料总结 前言 使用工具&#xff1a; 1.DEVC 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、…...

EasyExcel 模板+公式填充

使用 CellWriteHandler 的实现类来实现公式写入 Data NoArgsConstructor public class CustomCellWriteHandler implements CellWriteHandler {private int maxRowNum 2000;// 动态传入列表数量public CustomCellWriteHandler(int maxRowNum) {this.maxRowNum maxRowNum;}Ov…...

vue最新源码探索分析

我在github上fork了最新版本vue3.5版本的源码并做了大幅删除&#xff0c;保留最核心的代码&#xff0c;有兴趣的可以看看&#xff0c;欢迎大家提出PR 仓库地址 https://github.com/greatanimalion/core 本项目vue版本3.5.13 为了方便查看与分析&#xff0c;减少心智负担 已…...

产品初探Devops!以及AI如何赋能Devops?

DevOps源自Development&#xff08;开发&#xff09;和Operations&#xff08;运维&#xff09;的组合&#xff0c;是一种新的软件工程理念&#xff0c;旨在打破传统软件工程方法中“开发->测试->运维”的割裂模式&#xff0c;强调端到端高效一致的交付流程&#xff0c;实…...

深入解析MVCC中Undo Log版本底层存储读取逻辑

一、引言 多版本并发控制&#xff08;MVCC&#xff0c;Multi-Version Concurrency Control&#xff09;是一种广泛应用于关系数据库管理系统中的并发控制技术。它通过保存数据的历史版本&#xff0c;使得在事务并发执行时&#xff0c;每个事务都能看到数据的一致性视图。在MVC…...

【生产问题记录-Mysql分区】

描述 月报是根据日报的数据统计出来的&#xff0c;但是今天早上发现月报没有数据&#xff0c;日报是有数据的&#xff0c;那么为什么会导致这个结果呢&#xff1f; 问题解决 设计 因为日报table_day每天的数据量都在60w&#xff0c;所以我们采用了分区的形式&#xff0c;进…...

大型语言模型(LLMs)演化树 Large Language Models

大型语言模型&#xff08;LLMs&#xff09;演化树 Large Language Models flyfish 下面的图来自论文地址 Transformer 模型&#xff08;如 BERT 和 GPT-3&#xff09;已经给自然语言处理&#xff08;NLP&#xff09;领域带来了革命性的变化。这得益于它们具备并行化能力&…...

【LeetCode: 3159. 查询数组中元素的出现位置 + 统计下标】

&#x1f680; 算法题 &#x1f680; &#x1f332; 算法刷题专栏 | 面试必备算法 | 面试高频算法 &#x1f340; &#x1f332; 越难的东西,越要努力坚持&#xff0c;因为它具有很高的价值&#xff0c;算法就是这样✨ &#x1f332; 作者简介&#xff1a;硕风和炜&#xff0c;…...

git自动压缩提交的脚本

可以将当前未提交的代码自动执行 git addgit commitgit squash Git 命令安装指南 1. 创建脚本目录 如果目录不存在&#xff0c;创建它&#xff1a; mkdir -p ~/.local/bin2. 创建脚本文件 vim ~/.local/bin/git-squash将完整的脚本代码复制到此文件中。 3. 设置脚本权限…...

uniapp中Nvue白屏问题 ReferenceError: require is not defined

uniapp控制台输出如下 exception function:createInstanceContext, exception:white screen cause create instanceContext failed,check js stack ->Uncaught ReferenceError: require is not defined 或者 exception function:createInstanceContext, exception:white s…...

Centos8安装图形化界面

由于Centos8已经停止维护&#xff0c;所以在使用的时候会遇到yum元数据找不到的情况 1、更新yum数据源 进入目录&#xff1a; cd /etc/yum.repos.d/ 修改文件&#xff1a; sed -i s/mirrorlist/#mirrorlist/g /etc/yum.repos.d/CentOS-* sed -i s|#baseurlhttp://mirror.cent…...

2023年厦门市第30届小学生C++信息学竞赛复赛上机操作题(三、2023C. 太空旅行(travel))

#include <bits/stdc.h>using namespace std;struct Ship {int u; // 从地球到火星的时间int v; // 从火星到天王星的时间 };// 自定义比较函数 bool cmp(const Ship &a, const Ship &b) {return a.u max(a.v, b.u) b.v < b.u max(b.v, a.u) a.v; }int ma…...

Doris的SQL原理解析

今天来介绍下Doris的SQL原理解析&#xff0c;主要从语法、解析、分析、执行等几个方面来介绍&#xff0c;可以帮助大家对Doris底层有个清晰的理解~ 一、Doris简介 Apache Doris是一个基于MPP架构的高性能、实时的分析型数据库&#xff0c;能够较好的满足报表分析、即席查询、…...

【RAG实战】语言模型基础

语言模型赋予了计算机理解和生成人类语言的能力。它结合了统计学原理和深度神经网络技术&#xff0c;通过对大量的样本数据进行复杂的概率分布分析来学习语言结构的内在模式和相关性。具体地&#xff0c;语言模型可根据上下文中已出现的词序列&#xff0c;使用概率推断来预测接…...

探索 .idea 文件夹:Java Maven 工程的隐形守护者

一、.idea文件夹深度解析&#xff1a;IntelliJ IDEA项目配置的核心 在Java Maven工程的开发环境中&#xff0c;.idea文件夹扮演着举足轻重的角色。这是IntelliJ IDEA项目特有的一个配置文件夹&#xff0c;它包含了项目所需的各种配置信息&#xff0c;以确保项目能够在不同的开…...

JAVA代理模式和适配器模式

文章目录 Java 代理模式和适配器模式代理模式&#xff08;Proxy Pattern&#xff09;适配器模式&#xff08;Adapter Pattern&#xff09;代理模式和适配器模式的区别 代理模式的使用举例静态代理实现:用代理模式记录方法调用日志动态代理实现:使用 Java 动态代理记录方法调用日…...

Python大数据可视化:基于python大数据的电脑硬件推荐系统_flask+Hadoop+spider

开发语言&#xff1a;Python框架&#xff1a;flaskPython版本&#xff1a;python3.7.7数据库&#xff1a;mysql 5.7数据库工具&#xff1a;Navicat11开发软件&#xff1a;PyCharm 系统展示 管理员登录 管理员功能界面 价格区间界面 用户信息界面 品牌管理 笔记本管理 电脑主机…...

【YOLOv3】源码(train.py)

概述 主要模块分析 参数解析与初始化 功能&#xff1a;解析命令行参数&#xff0c;设置训练配置项目经理制定详细的施工计划和资源分配日志记录与监控 功能&#xff1a;初始化日志记录器&#xff0c;配置监控系统项目经理使用监控和记录工具&#xff0c;实时跟踪施工进度和质量…...

一维、线性卡尔曼滤波的例程(MATLAB)

这段 MATLAB 代码实现了一维线性卡尔曼滤波器的基本功能&#xff0c;用于估计在存在噪声的情况下目标状态的真实值 文章目录 一维线性卡尔曼滤波代码运行代码介绍1. **初始化部分**2. **数据生成**3. **卡尔曼滤波器实现**4. **结果可视化**5. **统计输出** 源代码 总结 一维线…...

【Rust自学】6.2. Option枚举

喜欢的话别忘了点赞、收藏加关注哦&#xff0c;对接下来的教程有兴趣的可以关注专栏。谢谢喵&#xff01;(&#xff65;ω&#xff65;) 6.2.1. 什么是Option枚举 它定义于标准库中&#xff0c;在Prelude&#xff08;预导入模块&#xff09;中&#xff0c;负责描述这样的场景…...

unity学习1:第1个template的项目platformer 学习

目录 0 教训&#xff0c;不要学生思路&#xff1a;路径依赖 1 从unity的编辑器里直接下载一个template 2 第一个下马威&#xff1a;下载到本地的这个模板项目第一次运行就报错, 其次关了重进就好了 2.1 报错 2.2 解决 2.3 解决 3 第2个拦路虎&#xff1a; 项目的声音大小…...

初识 Conda:一站式包管理和环境管理工具

文章目录 1. 什么是 Conda&#xff1f;2. 为什么选择 Conda&#xff1f;3. Conda 的安装3.1 安装步骤&#xff08;以 Miniconda 为例&#xff09; 4. Conda 的核心功能4.1 包管理4.2 环境管理4.3 Conda Forge4.4 设置国内镜像 5. 常见使用场景5.1 数据科学项目5.2 离线安装5.3 …...

vue.js 组件化开发 根组件

Vue.js是一个用于构建用户界面的渐进式JavaScript框架。组件化开发是Vue.js的核心理念之一&#xff0c;它允许开发者将部分代码封装为可重用的组件&#xff0c;从而提高代码的复用性和可维护性。而根组件是Vue.js应用的最顶层组件&#xff0c;它包含了其他所有的组件。 下面详…...

ASP.NET WebForms:实现全局异常捕获与处理的最佳实践

在ASP.NET WebForms中&#xff0c;你可以通过以下方法来统一捕获后台异常&#xff1a; 1. 在Global.asax中使用Application_Error Global.asax文件允许你处理应用程序级别的异常。你可以在Application_Error事件中捕获所有未处理的异常&#xff0c;并根据需要记录或处理它们。…...

vue3配置测试环境、开发环境、生产环境

第一步&#xff1a;在src同级新建 .env.production 、.env.test 、.env.development文件 第二步&#xff1a;在文件中配置开发环境、生产环境、测试环境 // 开发环境 .env.developmentNODE_ENV developmentVUE_APP_MODE development outputDir dist_dev // 打出包的名称VUE_…...

农历节日倒计时:基于Python的公历与农历日期转换及节日查询小程序(升级版)

农历节日倒计时&#xff1a;基于Python的公历与农历日期转换及节日查询小程序升级版 调整的功能 上一个小程序只是能计算当年的农历节日的间隔时间&#xff0c;那么这次修改一下&#xff0c;任意年份的农历节日都可以&#xff0c;并且能输出农历节日对应的阳历日期&#xff0…...

linux Python环境部署

登录Python官网去下载对应的版本&#xff1a;Python下载地址 在data目录下创建python文件夹 mkdir python上传下载的安装包 Python-3.8.18.tgz解压 tar -xf Python-3.8.18.tgz进入解压后的目录 cd Python-3.8.18/编译安装 ./configure --prefix/data/python38 make &&…...

Python基础语法知识——数据类型的查询、数据类型转化

今天第一次学习python&#xff0c;之前学习过C&#xff0c;感觉学习起来还可以&#xff0c;就是刚用的时候有点手残&#xff0c;想的是python代码&#xff0c;结果写出来就是C,本人决定每天抽出时间写点。同时继续更新NX二次开发专栏学习&#xff0c;话不多说&#xff0c;晚上的…...

命令行之巅:Linux Shell编程的至高艺术(中)

文章一览 前言一、输入/输出及重定向命令1.1 输入/输出命令1.1.1 read命令1.1.2 echo命令 1.2 输入/输出重定向1.3 重定向深入讲解1.4 Here Document1.4.1 /dev/null 文件 二、shell特殊字符和命令语法2.1 引号2.1.1 双引号2.1.2 单引号2.1.3 倒引号 2.2 注释、管道线和后台命令…...

利用Gurobi追溯模型不可行原因的四种方案及详细案例

文章目录 1. 引言2. 追溯不可行集的四种方法2.1 通过约束增减进行判断2.2 通过computeIIS函数获得冲突集2.3 利用 feasRelaxS() 或 feasRelax() 函数辅助排查2.4 利用 IIS Force 属性1. 引言 模型不可行是一个让工程师头疼的问题,对于复杂模型而言,导致模型不可行的原因可能…...

「matplotlib」绘制图线和数据点的样式风格和颜色表大全

绘制图线和数据点的样式风格和颜色表大全 显示图例 legend() 属性 linestyle 属性 marker 属性color 1、legend() 显示坐标轴中图线的对应标注的显示位置plt.legend(loc) locloc codebest &#xff08;default&#xff09;0upper right1upper left2lower left3lower rig…...

【基础还得练】 KKT 条件

优秀教程-真正理解拉格朗日乘子法和 KKT 条件&#xff1a; link优秀教程-最优化(6)&#xff1a;一般约束优化问题的最优性理论&#xff1a; link KKT条件&#xff08;Karush-Kuhn-Tucker条件&#xff09;是非线性规划中的一组必要条件&#xff0c;在某些情况下也是最优解的充分…...

Node.JS 版本管理工具 Fnm 安装及配置(Windows)

安装流程可参考&#xff1a;fnm 安装及配置(Windows)_fnm安装-CSDN博客 然后就是在git bash如何生效 在 Git Bash 中使用 fnm 需要确保你正确设置了环境变量。你可以按照以下步骤进行配置&#xff1a; 1. **打开 Git Bash**&#xff1a; 启动 Git Bash。 2. **编辑 .bas…...

STM32-笔记11-手写带操作系统的延时函数

1、为什么带操作系统的延时函数&#xff0c;和笔记10上的延时函数不能使用同一种&#xff1f; 因为笔记10的延时函数在每次调用的时候&#xff0c;会一直开关定时器&#xff0c;而在FreeRTOS操作系统中&#xff0c;SysTick定时器当作时基使用。 时基是一个时间显示的基本单位。…...

CCNP_SEC_ASA 第六天作业

实验需求&#xff1a; 为保障内部用户能够访问Internet&#xff0c;请把10.1.1.0/24网络动态转换到外部地址池202.100.1.100-202.100.1.200&#xff0c;如果地址池耗尽后&#xff0c;PAT到Outside接口 提示&#xff1a;需要看到如下输出信息 Inside#telnet 202.100.1.1 Trying …...

Python小括号( )、中括号[ ]和大括号{}代表什么

python语言最常见的括号有三种&#xff0c;分别是&#xff1a;小括号( )、中括号[ ]和大括号也叫做花括号{ }&#xff0c;分别用来代表不同的python基本内置数据类型。 小括号&#xff08;&#xff09;&#xff1a;struct结构体&#xff0c;但不能改值 python中的小括号( )&am…...