当前位置: 首页 > news >正文

Spark核心之02:常用算子详解

1、RDD操作详解

# 启动spark-shell
spark-shell --master local[2] 

1.1 基本转换

1) map

map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

举例:

scala> val a = sc.parallelize(1 to 9, 3)
scala> val b = a.map(x => x2)
scala> a.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> b.collect
res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

上述例子中把原RDD中每个元素都乘以2来产生一个新的RDD。

2) filter

filter 是对RDD中的每个元素都执行一个指定的函数来过滤产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

val rdd = sc.parallelize(List(1,2,3,4,5,6))  
val filterRdd = rdd.filter(_ > 5)
filterRdd.collect() //返回所有大于5的数据的一个Array, Array(6,8,10,12)

3) flatMap

与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。 举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)

scala> val a = sc.parallelize(1 to 4, 2)
scala> val b = a.flatMap(x => 1 to x)
scala> b.collect
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

⭐️4) mapPartitions

mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。 它的函数定义为:

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]

f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数ff的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。

举例:

scala> val a = sc.parallelize(1 to 9, 3)
scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {var res = List[(T, T)]() var pre = iter.next while (iter.hasNext) {val cur = iter.nextres.::=(pre, cur)pre = cur  } res.iterator}
scala> a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。 mapPartitions还有些变种,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有mapPartitionsWithIndex,它能把分区的index传递给用户指定的输入函数。

5) mapPartitionsWithIndex

def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
//rdd1有两个分区
scala> var rdd2 = rdd1.mapPartitionsWithIndex{|     (x,iter) => {|       var result = List[String]()|       var i = 0|       while(iter.hasNext){|        i += iter.next()|       }|       result.::(x + "|" + i).iterator|     }| }//rdd2将rdd1中每个分区的数字累加,并在每个分区的累加结果前面加了分区索引
scala> rdd2.collect
res13: Array[String] = Array(0|3, 1|12)  //p-0(1,2) p-1(3,4,5)

🈂️好像没用了🈂️6) mapWith

mapWith是map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数。它的定义如下:

def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]

第一个函数constructA是把RDD的partition index(index从0开始)作为输入,输出为新类型A;

第二个函数f是把二元组(T, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。

举例:把partition index 乘以10加2,作为新的RDD的元素。

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)
x.mapWith(a => a * 10)((b, a) => (b,a + 2)).collect 

结果:

(1,2)

(2,2)

(3,2)

(4,12)

(5,12)

(6,12)

(7,22)

(8,22)

(9,22)

(10,22)

🈂️好像没用了🈂️7) flatMapWith

flatMapWith与mapWith很类似,都是接收两个函数,一个函数把partitionIndex作为输入,输出是一个新类型A;另外一个函数是以二元组(T,A)作为输入,输出为一个序列,这些序列里面的元素组成了新的RDD。它的定义如下:

def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]

举例:

scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect
res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)

⭐️8) coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]

该函数用于将RDD进行重分区,使用HashPartitioner。

第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false;

以下面的例子来看:

scala> var data = sc.parallelize(1 to 12, 3) 
scala> data.collect 
scala> data.partitions.size 
scala> var rdd1 = data.coalesce(1) 
scala> rdd1.partitions.size 
scala> var rdd1 = data.coalesce(4) 
scala> rdd1.partitions.size
res2: Int = 1   //如果重分区的数目大于原来的分区数,那么必须指定shuffle参数为true,//否则,分区数不便
scala> var rdd1 = data.coalesce(4,true) 
scala> rdd1.partitions.size
res3: Int = 4

⭐️9) repartition

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

该函数其实就是coalesce函数第二个参数为true的实现

scala> var data = sc.parallelize(1 to 12, 3) 
scala> data.collect 
scala> data.partitions.size 
scala> var rdd1 = data. repartition(1) 
scala> rdd1.partitions.size 
scala> var rdd1 = data. repartition(4) 
scala> rdd1.partitions.size
res3: Int = 4

10) randomSplit

def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]]

该函数根据weights权重,将一个RDD切分成多个RDD。

该权重参数为一个Double数组

第二个参数为random的种子,基本可忽略。

scala> var rdd = sc.makeRDD(1 to 12,12)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at makeRDD at :21scala> rdd.collect
res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)  scala> var splitRDD = rdd.randomSplit(Array(0.5, 0.1, 0.2, 0.2))
splitRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[17] at randomSplit at :23, 
MapPartitionsRDD[18] at randomSplit at :23, 
MapPartitionsRDD[19] at randomSplit at :23, 
MapPartitionsRDD[20] at randomSplit at :23)//这里注意:randomSplit的结果是一个RDD数组
scala> splitRDD.size
res8: Int = 4
//由于randomSplit的第一个参数weights中传入的值有4个,因此,就会切分成4个RDD,
//把原来的rdd按照权重0.5, 0.1, 0.2, 0.2,随机划分到这4个RDD中,权重高的RDD,划分到//的几率就大一些。
//注意,权重的总和加起来为1,否则会不正常 
scala> splitRDD(0).collect
res10: Array[Int] = Array(1, 4)scala> splitRDD(1).collect
res11: Array[Int] = Array(3)                                                    scala> splitRDD(2).collect
res12: Array[Int] = Array(5, 9)scala> splitRDD(3).collect
res13: Array[Int] = Array(2, 6, 7, 8, 10)

11) glom

def glom(): RDD[Array[T]]

该函数是将RDD中每一个分区中类型为T的元素转换成Array[T],这样每一个分区就只有一个数组元素

scala> var rdd = sc.makeRDD(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at makeRDD at :21
scala> rdd.partitions.size
res33: Int = 3  //该RDD有3个分区
scala> rdd.glom().collect
res35: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
//glom将每个分区中的元素放到一个数组中,这样,结果就变成了3个数组

⭐️12) union并集

val rdd1 = sc.parallelize(List(5, 6, 4, 3))val rdd2 = sc.parallelize(List(1, 2, 3, 4))//求并集val rdd3 = rdd1.union(rdd2)rdd3.collect

⭐️13) distinct去重

val rdd1 = sc.parallelize(List(5, 6, 4, 3))val rdd2 = sc.parallelize(List(1, 2, 3, 4))//求并集val rdd3 = rdd1.union(rdd2)//去重输出rdd3.distinct.collect

⭐️14) intersection交集

val rdd1 = sc.parallelize(List(5, 6, 4, 3))val rdd2 = sc.parallelize(List(1, 2, 3, 4))//求交集val rdd4 = rdd1.intersection(rdd2) rdd4.collect

⭐️15) subtract

def subtract(other: RDD[T]): RDD[T]def subtract(other: RDD[T], numPartitions: Int): RDD[T]def subtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]

该函数返回在RDD中出现,并且不在otherRDD中出现的元素,不去重。

val rdd1 = sc.parallelize(List(5, 6, 6, 4, 3))val rdd2 = sc.parallelize(List(1, 2, 3, 4))//求差集val rdd4 = rdd1.subtract(rdd2)rdd4.collect

16) subtractByKey

def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)]def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int)(implicit arg0: ClassTag[W]): RDD[(K, V)]def subtractByKey[W](other: RDD[(K, W)], p: Partitioner)(implicit arg0: ClassTag[W]): RDD[(K, V)]

subtractByKey和基本转换操作中的subtract类似,只不过这里是针对K的,返回在主RDD中出现,并且不在otherRDD中出现的元素。

参数numPartitions用于指定结果的分区数

参数partitioner用于指定分区函数

var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2) scala> rdd1.subtractByKey(rdd2).collectres13: Array[(String, String)] = Array((B,2))

17) groupbyKey

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求并集
val rdd4 = rdd1 union rdd2
//按key进行分组
val rdd5 = rdd4.groupByKe
rdd5.collect

18) reduceByKey

顾名思义,reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

举例:

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
//求并集
val rdd4 = rdd1 union rdd2
//按key进行分组
val rdd6 = rdd4.reduceByKey(_ + _)
rdd6.collect()

19) sortByKey

将List((“tom”, 1), (“jerry”, 3), (“kitty”, 2), (“shuke”, 1))和List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5))做wordcount,并按名称排序

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))val rdd3 = rdd1.union(rdd2)//按key进行聚合val rdd4 = rdd3.reduceByKey(_ + _)//false降序val rdd5 = rdd4.sortByKey(false)rdd5.collect

20) sortBy

将List((“tom”, 1), (“jerry”, 3), (“kitty”, 2), (“shuke”, 1))和List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5))做wordcount,并按数值排序

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2), ("shuke", 1)))val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 3), ("shuke", 2), ("kitty", 5)))val rdd3 = rdd1.union(rdd2)//按key进行聚合val rdd4 = rdd3.reduceByKey(_ + _)//false降序val rdd5 = rdd4.sortBy(_._2, false)rdd5.collect

21) zip

def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

scala> var rdd1 = sc.makeRDD(1 to 5,2)rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at :21scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at :21scala> rdd1.zip(rdd2).collectres0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))      scala> rdd2.zip(rdd1).collectres1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))scala> var rdd3 = sc.makeRDD(Seq("A","B","C","D","E"),3)rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at :21scala> rdd1.zip(rdd3).collectjava.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions//如果两个RDD分区数不同,则抛出异常

22) zipPartitions

zipPartitions函数将多个RDD按照partition组合成为新的RDD,该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求。

该函数有好几种实现,可分为三类:

1. 参数是一个RDD
def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

这两个区别就是参数preservesPartitioning,是否保留父RDD的partitioner分区信息

映射方法f参数为两个RDD的迭代器。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at :21scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at :21//rdd1两个分区中元素分布:
scala> rdd1.mapPartitionsWithIndex{|     (x,iter) => {|      var result = List[String]()|       while(iter.hasNext){|        result ::= ("part_" + x + "|" + iter.next())|       }|       result.iterator|       |     }|    }.collect
res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)//rdd2两个分区中元素分布
scala> rdd2.mapPartitionsWithIndex{|     (x,iter) => {|      var result = List[String]()|       while(iter.hasNext){|        result ::= ("part_" + x + "|" + iter.next())|       }|       result.iterator|       |     }|    }.collect
res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)//rdd1和rdd2做zipPartition
scala> rdd1.zipPartitions(rdd2){|    (rdd1Iter,rdd2Iter) => {|     var result = List[String]()|     while(rdd1Iter.hasNext && rdd2Iter.hasNext) {|      result::=(rdd1Iter.next() + "_" + rdd2Iter.next())|     }|     result.iterator|    }|   }.collect
res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)
2. 参数是两个RDD
def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

用法同上面,只不过该函数参数为两个RDD,映射方法f输入参数为两个RDD的迭代器。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at :21scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at :21scala> var rdd3 = sc.makeRDD(Seq("a","b","c","d","e"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[29] at makeRDD at :21//rdd3中个分区元素分布
scala> rdd3.mapPartitionsWithIndex{|     (x,iter) => {|      var result = List[String]()|       while(iter.hasNext){|        result ::= ("part_" + x + "|" + iter.next())|       }|       result.iterator|       |     }|    }.collect
res21: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c)//三个RDD做zipPartitions
scala> var rdd4 = rdd1.zipPartitions(rdd2,rdd3){|    (rdd1Iter,rdd2Iter,rdd3Iter) => {|     var result = List[String]()|     while(rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) {|      result::=(rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next())|     }|     result.iterator|    }|   }
rdd4: org.apache.spark.rdd.RDD[String] = ZippedPartitionsRDD3[33] at zipPartitions at :27scala> rdd4.collect
res23: Array[String] = Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c)
3. 参数是三个RDD
def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

用法同上面,只不过这里又多了个一个RDD而已。

23) zipWithIndex

def zipWithIndex(): RDD[(T, Long)]

该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。

scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at :21scala> rdd2.zipWithIndex().collectres27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))

24) zipWithUniqueId

def zipWithUniqueId(): RDD[(T, Long)]

该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下:

每个分区中第一个元素的唯一ID值为:该分区索引号,

每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数)

看下面的例子:

scala> var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at :21//rdd1有两个分区,scala> rdd1.zipWithUniqueId().collectres32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))//总分区数为2//第一个分区第一个元素ID为0,第二个分区第一个元素ID为1//第一个分区第二个元素ID为0+2=2,第一个分区第三个元素ID为2+2=4//第二个分区第二个元素ID为1+2=3,第二个分区第三个元素ID为3+2=5

1.2 键值转换

⭐️25) partitionBy

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

该函数根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区。

scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[23] at makeRDD at :21scala> rdd1.partitions.size
res20: Int = 2//查看rdd1中每个分区的元素
scala> rdd1.mapPartitionsWithIndex{|     (partIdx,iter) => {|      var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()|       while(iter.hasNext){|        var part_name = "part_" + partIdx;|        var elem = iter.next()|        if(part_map.contains(part_name)) {|         var elems = part_map(part_name)|         elems ::= elem|         part_map(part_name) = elems|        } else {|         part_map(part_name) = List[(Int,String)]{elem}|        }|       }|       part_map.iterator|       |     }|    }.collect
res22: Array[(String, List[(Int, String)])] = Array((part_0,List((2,B), (1,A))), (part_1,List((4,D), (3,C))))//(2,B),(1,A)在part_0中,(4,D),(3,C)在part_1中//使用partitionBy重分区
scala> var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at partitionBy at :23scala> rdd2.partitions.size
res23: Int = 2//查看rdd2中每个分区的元素
scala> rdd2.mapPartitionsWithIndex{|     (partIdx,iter) => {|      var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()|       while(iter.hasNext){|        var part_name = "part_" + partIdx;|        var elem = iter.next()|        if(part_map.contains(part_name)) {|         var elems = part_map(part_name)|         elems ::= elem|         part_map(part_name) = elems|        } else {|         part_map(part_name) = List[(Int,String)]{elem}|        }|       }|       part_map.iterator|     }|    }.collect
res24: Array[(String, List[(Int, String)])] = Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A))))//(4,D),(2,B)在part_0中,(3,C),(1,A)在part_1中

26) mapValues

mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。

举例:

scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)scala> val b = a.map(x => (x.length, x))scala> b.mapValues("x" + _ + "x").collectres5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))

27) flatMapValues

flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。

举例

val a = sc.parallelize(List((1, 2), (3, 4), (5, 6)))val b = a.flatMapValues(x => 1.to(x))b.collect.foreach(println)

28) combineByKey

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

该函数用于将RDD[K,V]转换成RDD[K,C],这里的V类型和C类型可以相同也可以不同。

其中的参数:

createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C ,分区内相同的key做一次

mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C,分区内相同的key循环做

mergeCombiners:分区合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C,分区之间循环做

numPartitions:结果RDD分区数,默认保持原有的分区数

partitioner:分区函数,默认为HashPartitioner

mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true

看下面例子:

scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21 scala> rdd1.combineByKey(|    (v : Int) => v + "_",  |    (c : String, v : Int) => c + "@" + v,  |    (c1 : String, c2 : String) => c1 + "$" + c2|   ).collect
res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_))

其中三个映射函数分别为:

createCombiner: (V) => C

(v : Int) => v + “” //在每一个V值后面加上字符,返回C类型(String)

mergeValue: (C, V) => C

(c : String, v : Int) => c + “@” + v //合并C类型和V类型,中间加字符@,返回C(String)

mergeCombiners: (C, C) => C

(c1 : String, c2 : String) => c1 + “ ” + c 2 / / 合并 C 类型和 C 类型,中间加 ” + c2 //合并C类型和C类型,中间加 +c2//合并C类型和C类型,中间加,返回C(String)

其他参数为默认值。

最终,将RDD[String,Int]转换为RDD[String,String]。

再看例子:

rdd1.combineByKey((v : Int) => List(v),(c : List[Int], v : Int) => v :: c,(c1 : List[Int], c2 : List[Int]) => c1 ::: c2
).collect
res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))

最终将RDD[String,Int]转换为RDD[String,List[Int]]。

29) foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] 

该函数用于RDD[K,V]根据K将V做折叠、合并处理,其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V.

例子:

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))scala> rdd1.foldByKey(0)(_+_).collectres75: Array[(String, Int)] = Array((A,2), (B,3), (C,1)) //将rdd1中每个key对应的V进行累加,注意zeroValue=0,需要先初始化V,映射函数为+操//作,比如("A",0), ("A",2),先将zeroValue应用于每个V,得到:("A",0+0), ("A",2+0),即://("A",0), ("A",2),再将映射函数应用于初始化后的V,最后得到(A,0+2),即(A,2)

再看:

scala> rdd1.foldByKey(2)(_+_).collectres76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))//先将zeroValue=2应用于每个V,得到:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再将映射函//数应用于初始化后的V,最后得到:(A,2+4),即:(A,6)

再看乘法操作:

scala> rdd1.foldByKey(0)(__).collectres77: Array[(String, Int)] = Array((A,0), (B,0), (C,0))//先将zeroValue=0应用于每个V,注意,这次映射函数为乘法,得到:("A",00), ("A",20),//即:("A",0), ("A",0),再将映射函//数应用于初始化后的V,最后得到:(A,00),即:(A,0)//其他K也一样,最终都得到了V=0scala> rdd1.foldByKey(1)(__).collectres78: Array[(String, Int)] = Array((A,0), (B,2), (C,1))//映射函数为乘法时,需要将zeroValue设为1,才能得到我们想要的结果。

在使用foldByKey算子时候,要特别注意映射函数及zeroValue的取值。

30) reduceByKeyLocally

def reduceByKeyLocally(func: (V, V) => V): Map[K, V]

该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[91] at makeRDD at :21scala> rdd1.reduceByKeyLocally((x,y) => x + y)res90: scala.collection.Map[String,Int] = Map(B -> 3, A -> 2, C -> 1)

31) cogroup和groupByKey的区别

val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))//cogroupval rdd3 = rdd1.cogroup(rdd2)//groupbykeyval rdd4 = rdd1.union(rdd2).groupByKey//注意cogroup与groupByKey的区别rdd3.foreach(println)rdd4.foreach(println)

⭐️32) join

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))//求jionval rdd3 = rdd1.join(rdd2)rdd3.collect

33) leftOuterJoin

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]

leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。

参数numPartitions用于指定结果的分区数

参数partitioner用于指定分区函数

var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)scala> rdd1.leftOuterJoin(rdd2).collectres11: Array[(String, (String, Option[String]))] = Array((B,(2,None)), (A,(1,Some(a))), (C,(3,Some(c))))

34) rightOuterJoin

def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))] 

rightOuterJoin类似于SQL中的有外关联right outer join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。

参数numPartitions用于指定结果的分区数

参数partitioner用于指定分区函数

var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)var rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)scala> rdd1.rightOuterJoin(rdd2).collectres12: Array[(String, (Option[String], String))] = Array((D,(None,d)), (A,(Some(1),a)), (C,(Some(3),c)))

1.3 Action操作

35) first

def first(): T

first返回RDD中的第一个元素,不排序。

scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21scala> rdd1.firstres14: (String, String) = (A,1)scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21scala> rdd1.firstres8: Int = 10

36) count

def count(): Long

count返回RDD中的元素数量。

scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at :21scala> rdd1.countres15: Long = 3

36) reduce

def reduce(f: (T, T) ⇒ T): T

根据映射函数f,对RDD中的元素进行二元计算,返回计算结果。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21scala> rdd1.reduce(_ + _)
res18: Int = 55scala> var rdd2 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at makeRDD at :21scala> rdd2.reduce((x,y) => {|    (x._1 + y._1,x._2 + y._2)|   })
res21: (String, Int) = (CBBAA,6)

37) collect

def collect(): Array[T]

collect用于将一个RDD转换成数组。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21scala> rdd1.collect
res23: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

38) take

def take(num: Int): Array[T]

take用于获取RDD中从0到num-1下标的元素,不排序。

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21scala> rdd1.take(1)res0: Array[Int] = Array(10)                           scala> rdd1.take(2)res1: Array[Int] = Array(10, 4)

39) top

def top(num: Int)(implicit ord: Ordering[T]): Array[T]

top函数用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21scala> rdd1.top(1)
res2: Array[Int] = Array(12)scala> rdd1.top(2)
res3: Array[Int] = Array(12, 10)//指定排序规则
scala> implicit val myOrd = implicitly[Ordering[Int]].reverse
myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@767499efscala> rdd1.top(1)
res4: Array[Int] = Array(2)scala> rdd1.top(2)
res5: Array[Int] = Array(2, 3)

40) takeOrdered

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

takeOrdered和top类似,只不过以和top相反的顺序返回元素。

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21scala> rdd1.top(1)
res4: Array[Int] = Array(12)scala> rdd1.top(2)
res5: Array[Int] = Array(12, 10)scala> rdd1.takeOrdered(1)
res6: Array[Int] = Array(2)scala> rdd1.takeOrdered(2)
res7: Array[Int] = Array(2, 3)

⭐️41) aggregate

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U

aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U。

var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.mapPartitionsWithIndex{(partIdx,iter) => {var part_map = scala.collection.mutable.Map[String,List[Int]]()while(iter.hasNext){var part_name = "part_" + partIdx;var elem = iter.next()if(part_map.contains(part_name)) {var elems = part_map(part_name)elems ::= elempart_map(part_name) = elems} else {part_map(part_name) = List[Int]{elem}}}part_map.iterator}}.collect
res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))##第一个分区中包含5,4,3,2,1
##第二个分区中包含10,9,8,7,6scala> rdd1.aggregate(1)(|      {(x : Int,y : Int) => x + y}, |      {(a : Int,b : Int) => a + b}|   )
res17: Int = 58

结果为什么是58,看下面的计算过程:

##先在每个分区中迭代执行 (x : Int,y : Int) => x + y 并且使用zeroValue的值1##即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16##part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41##再将两个分区的结果合并(a : Int,b : Int) => a + b ,并且使用zeroValue的值1##即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58

再比如:

scala> rdd1.aggregate(2)(|      {(x : Int,y : Int) => x + y}, |      {(a : Int,b : Int) => a  b}|   )
res18: Int = 1428
##这次zeroValue=2
##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17
##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42
##最后:zeroValuepart_0part_1 = 2  17  42 = 1428

因此,zeroValue即确定了U的类型,也会对结果产生至关重要的影响,使用时候要特别注意。

42) fold

def fold(zeroValue: T)(op: (T, T) ⇒ T): T

fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。

var rdd1 = sc.makeRDD(1 to 10, 2)
scala> rdd1.fold(1)(|    (x,y) => x + y   |   )
res19: Int = 58
##结果同上面使用aggregate的第一个例子一样,即:
scala> rdd1.aggregate(1)(|      {(x,y) => x + y}, |      {(a,b) => a + b}|   )
res20: Int = 58

43) lookup

def lookup(key: K): Seq[V]

lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21scala> rdd1.lookup("A")
res0: Seq[Int] = WrappedArray(0, 2)scala> rdd1.lookup("B")
res1: Seq[Int] = WrappedArray(1, 2)

44) countByKey

def countByKey(): Map[K, Long]

countByKey用于统计RDD[K,V]中每个K的数量。

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :21scala> rdd1.countByKey
res5: scala.collection.Map[String,Long] = Map(A -> 2, B -> 3)

45) foreach

def foreach(f: (T)Unit): Unit

foreach用于遍历RDD,将函数f应用于每一个元素。

但要注意,如果对RDD执行foreach,只会在Executor端有效,而并不是Driver端。

比如:rdd.foreach(println),只会在Executor的stdout中打印出来,Driver端是看不到的。

我在Spark1.4中是这样,不知道是否真如此。

这时候,使用accumulator共享变量与foreach结合,倒是个不错的选择。

scala> var cnt = sc.accumulator(0)
cnt: org.apache.spark.Accumulator[Int] = 0scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21scala> rdd1.foreach(x => cnt += x)scala> cnt.value
res51: Int = 55scala> rdd1.collect.foreach(println) 

⭐️46) foreachPartition

def foreachPartition(f: (Iterator[T])Unit): Unit

foreachPartition和foreach类似,只不过是对每一个分区使用f。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21scala> var allsize = sc.accumulator(0)
size: org.apache.spark.Accumulator[Int] = 0scala>   rdd1.foreachPartition { x => {|    allsize += x.size|   }}
scala> println(allsize.value)
#10

⭐️47) sortBy

def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

sortBy根据给定的排序k函数将RDD中的元素进行排序。

scala> var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2)scala> rdd1.sortBy(x => x).collectres1: Array[Int] = Array(0, 1, 2, 3, 6, 7) //默认升序scala> rdd1.sortBy(x => x,false).collectres2: Array[Int] = Array(7, 6, 3, 2, 1, 0)  //降序//RDD[K,V]类型scala>var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))scala> rdd1.sortBy(x => x).collectres3: Array[(String, Int)] = Array((A,1), (A,2), (B,3), (B,6), (B,7))//按照V进行降序排序scala> rdd1.sortBy(x => x._2,false).collectres4: Array[(String, Int)] = Array((B,7), (B,6), (B,3), (A,2), (A,1))

48) saveAsTextFile

def saveAsTextFile(path: String): Unitdef saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit

saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中。

codec参数可以指定压缩的类名。

var rdd1 = sc.makeRDD(1 to 10,2)
scala> rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/") //保存到HDFShadoop fs -ls /tmp/lxw1234.com
Found 2 items
-rw-r--r--  2 lxw1234 supergroup     0 2015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS
-rw-r--r--  2 lxw1234 supergroup     21 2015-07-10 09:15 /tmp/lxw1234.com/part-00000hadoop fs -cat /tmp/lxw1234.com/part-00000

注意:如果使用rdd1.saveAsTextFile(“file:///tmp/lxw1234.com”)将文件保存到本地文件系统,那么只会保存在Executor所在机器的本地目录。

## 指定压缩格式保存rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/",classOf[com.hadoop.compression.lzo.LzopCodec])hadoop fs -ls /tmp/lxw1234.com
-rw-r--r--  2 lxw1234 supergroup   0 2015-07-10 09:20 /tmp/lxw1234.com/_SUCCESS
-rw-r--r--  2 lxw1234 supergroup   71 2015-07-10 09:20 /tmp/lxw1234.com/part-00000.lzohadoop fs -text /tmp/lxw1234.com/part-00000.lzo

49) saveAsSequenceFile

saveAsSequenceFile用于将RDD以SequenceFile的文件格式保存到HDFS上。

用法同saveAsTextFile。

50) saveAsObjectFile

def saveAsObjectFile(path: String): Unit

saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。

对于HDFS,默认采用SequenceFile保存。

var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.saveAsObjectFile("hdfs://cdh5/tmp/lxw1234.com/")hadoop fs -cat /tmp/lxw1234.com/part-00000
SEQ !org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableT
  1. saveAsHadoopFile
def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unitdef saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf =, codec: Option[Class[_ <: CompressionCodec]] = None): Unit

saveAsHadoopFile是将RDD存储在HDFS上的文件中,支持老版本Hadoop API。

可以指定outputKeyClass、outputValueClass以及压缩格式。

每个分区输出一个文件。

var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritablerdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],classOf[com.hadoop.compression.lzo.LzopCodec])

52) saveAsHadoopDataset

def saveAsHadoopDataset(conf: JobConf): Unit

saveAsHadoopDataset用于将RDD保存到除了HDFS的其他存储中,比如HBase。

在JobConf中,通常需要关注或者设置五个参数:

文件的保存路径、key值的class类型、value值的class类型、RDD的输出格式(OutputFormat)、以及压缩相关的参数。

##使用saveAsHadoopDataset将RDD保存到HDFS中import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConfvar rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
var jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]])
jobConf.setOutputKeyClass(classOf[Text])
jobConf.setOutputValueClass(classOf[IntWritable])
jobConf.set("mapred.output.dir","/tmp/lxw1234/")
rdd1.saveAsHadoopDataset(jobConf)

结果:

hadoop fs -cat /tmp/lxw1234/part-00000
A    2
A    1hadoop fs -cat /tmp/lxw1234/part-00001
B    6
B    3
B    7

##保存数据到HBASE

##HBase建表:

create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritablevar conf = HBaseConfiguration.create()var jobConf = new JobConf(conf)jobConf.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3")jobConf.set("zookeeper.znode.parent","/hbase")jobConf.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")jobConf.setOutputFormat(classOf[TableOutputFormat])var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))rdd1.map(x => {var put = new Put(Bytes.toBytes(x._1))put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))(new ImmutableBytesWritable,put)}).saveAsHadoopDataset(jobConf)
##结果:
hbase(main):005:0> scan 'lxw1234'
ROW   COLUMN+CELL                                                 A    column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02                        B    column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06                        C    column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07                        
3 row(s) in 0.0550 seconds

注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。

53) saveAsNewAPIHadoopFile

def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unitdef saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit

saveAsNewAPIHadoopFile用于将RDD数据保存到HDFS上,使用新版本Hadoop API。

用法基本同saveAsHadoopFile。

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritablevar rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])

54) saveAsNewAPIHadoopDataset

def saveAsNewAPIHadoopDataset(conf: Configuration): Unit

作用同saveAsHadoopDataset,只不过采用新版本Hadoop API。

以写入HBase为例:

HBase建表:

create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}

完整的Spark应用程序:

package com.lxw1234.testimport org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Putobject Test {def main(args : Array[String]) {val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com")val sc = new SparkContext(sparkConf);var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))sc.hadoopConfiguration.set("hbase.zookeeper.quorum ","zkNode1,zkNode2,zkNode3")sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase")sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")var job = new Job(sc.hadoopConfiguration)job.setOutputKeyClass(classOf[ImmutableBytesWritable])job.setOutputValueClass(classOf[Result])job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])rdd1.map(x => {var put = new Put(Bytes.toBytes(x._1))put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))(new ImmutableBytesWritable,put)}   ).saveAsNewAPIHadoopDataset(job.getConfiguration)sc.stop()  }
}

注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。

相关文章:

Spark核心之02:常用算子详解

1、RDD操作详解 # 启动spark-shell spark-shell --master local[2] 1.1 基本转换 1) map map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。 举例&#xff1a; scala> val a sc.parallelize(1 …...

【CSS—前端快速入门】CSS 常用样式

CSS 常用 CSS 样式 1. 前端样式查询网站&#xff1a; MDN Web Docs (mozilla.org) w3school 2. border 2.1 借助 MDN 了解 border 我们借助 MDN 网站来学习 border 样式的使用&#xff1a; 2.2 border 常见属性 保存代码&#xff0c;打开页面&#xff1a; 对于标签不同样式的…...

探索DeepSeek-R1的核心秘诀:突破SFT技术的新篇章

摘要 近期&#xff0c;一种显著超越SFT&#xff08;Sequence-to-Sequence with Teacher Forcing&#xff09;的技术成为研究焦点。作为o1/DeepSeek-R1的核心秘诀&#xff0c;该技术不仅提升了模型性能&#xff0c;还成功应用于多模态大型模型中&#xff0c;实现了功能扩展。与传…...

DailyNotes 增加提醒功能

TODO&#xff1a;准备给 DailyNotes 增加一个提醒功能&#xff0c;准备接入 AI 来做一些事情。试了一下&#xff0c;非常靠谱。 具体 DailyNotes 和 Ollama 的交互方式&#xff0c;可以直接调用命令行&#xff0c;也可以走网络API。 rayuK2CD9WCYN4 ~ % ollama run deepseek-…...

[Computer Vision]实验六:视差估计

目录 一、实验内容 二、实验过程 2.1.1 test.py文件 2.1.2 test.py文件结果与分析 2.2.1 文件代码 2.2.2 结果与分析 一、实验内容 给定左右相机图片&#xff0c;估算图片的视差/深度&#xff1b;体现极线校正&#xff08;例如打印前后极线对&#xff09;、同名点匹配…...

软件测试基础:功能测试知识总结

&#x1f345; 点击文末小卡片 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 一、测试项目启动与研读需求文档 &#xff08;一&#xff09; 组建测试团队 1、测试团队中的角色 2、测试团队的基本责任 尽早地发现软件程序、系统或产品中…...

171. Excel 表列序号

Excel 表列序号 题目描述尝试做法推荐做法 题目描述 给你一个字符串 columnTitle &#xff0c;表示 Excel 表格中的列名称。返回 该列名称对应的列序号 。 例如&#xff1a; A -> 1 B -> 2 C -> 3 … Z -> 26 AA -> 27 AB -> 28 … 示例 1: 输入: colum…...

03.05 QT事件

实现一个绘图工具&#xff0c;具备以下功能&#xff1a; 鼠标绘制线条。 实时调整线条颜色和粗细。 橡皮擦功能&#xff0c;覆盖绘制内容。 撤销功能&#xff0c;ctrl z 快捷键撤销最后一笔 程序代码&#xff1a; <1> Widget.h: #ifndef WIDGET_H #define WIDGET…...

es如何进行refresh?

在 Elasticsearch 中,refresh 操作的作用是让最近写入的数据可以被搜索到。以下为你介绍几种常见的执行 refresh 操作的方式: 1. 使用 RESTful API 手动刷新 你可以通过向 Elasticsearch 发送 HTTP 请求来手动触发 refresh 操作。可以针对单个索引、多个索引或者所有索引进…...

unity6 打包webgl注意事项

webgl使用资源需要异步加载 使用localization插件时要注意&#xff0c;webgl不支持WaitForCompletion&#xff0c;LocalizationSettings.InitializationOperation和LocalizationSettings.StringDatabase.GetTable都不能用 web里想要看到具体的报错信息调试开启这两个&#xf…...

【前端基础】Day 9 PC端品优购项目

目录 1. 品优购项目规划 1.1 网站制作流程 1.2 品优购项目整体介绍 1.3 学习目的 1.4 开发工具以及技术栈 1.5 项目搭建工作 1.6 网站favicon图标 1.7 网站TDK三大标签SEO优化 2. 品优购首页制作 2.1 常见模块类命名 2.2 快捷导航shortcut制作 2.3 header制作 2.4…...

【django初学者项目】

下面为你详细介绍如何创建一个简单有趣的 Django 项目——博客系统。这个项目允许用户创建、查看、编辑和删除博客文章。 步骤 1&#xff1a;环境准备 首先&#xff0c;确保你已经安装了 Python 和 pip。然后&#xff0c;创建一个虚拟环境并激活它&#xff0c;接着安装 Django…...

自学微信小程序的第十三天

DAY13 1、使用map组件在页面中创建地图后&#xff0c;若想在JS文件中对地图进行控制&#xff0c;需要通过地图API来完成。先通过wx.createMapContext()方法创建MapContext&#xff08;Map上下文&#xff09;实例&#xff0c;然后通过该实例的相关方法来操作map组件。 const m…...

gitbash忽略未追踪文件的解决方式

文章目录 问题描述&#xff0c;如下图解决方式 问题描述&#xff0c;如下图 因为这些事项目本地运行或者IDE环境配置时产生的文件或目录&#xff0c;手动删除后还来出现&#xff0c;怎么实现忽略不显示呢&#xff1f; 解决方式 查看项目的根目录下是否存在.gitignore文件&…...

React生态、Vue生态与跨框架前端解决方案

React生态系统 1 基础框架 React.js 是一个用于构建UI的JavaScript库。 2 应用框架 Next.js 是基于React.js的完整应用框架。主要负责应用如何工作&#xff1a; 应用架构&#xff1a;路由系统、页面结构渲染策略&#xff1a;服务端渲染(SSR)、静态生成(SSG)、客户端渲染性…...

GPPT: Graph Pre-training and Prompt Tuning to Generalize Graph Neural Networks

GPPT: Graph Pre-training and Prompt Tuning to Generalize Graph Neural Networks KDD22 推荐指数&#xff1a;#paper/⭐⭐#​ 动机 本文探讨了图神经网络&#xff08;GNN&#xff09;在迁移学习中“预训练-微调”框架的局限性及改进方向。现有方法通过预训练&#xff08…...

【Elasticsearch】Elasticsearch 的`path.settings`是用于配置 Elasticsearch 数据和日志存储路径的重要设置

Elasticsearch 的path.settings是用于配置 Elasticsearch 数据和日志存储路径的重要设置&#xff0c;这些路径在elasticsearch.yml配置文件中定义。以下是关于 Elasticsearch 的路径设置&#xff08;path.data和path.logs&#xff09;以及快照存储库配置的详细说明&#xff1a;…...

使用vue3+element plus 的table自制的穿梭框(支持多列数据)

目录 一、效果图 二、介绍 三、代码区 一、效果图 话不多说&#xff0c;先上图 二、介绍 项目需要&#xff1a;通过穿梭框选择人员信息&#xff0c;可以根据部门、岗位进行筛选&#xff0c;需要显示多列&#xff08;不光显示姓名&#xff0c;还包括人员的一些基础信息&…...

学习笔记:IC存储总结(ROM,RAM, EEPROM, Flash, SRAM, DRAM, DDL)

一&#xff0c;概述 半导体存储器是一种可以存储大量二值信息的半导体器件。在电子计算机及一些其他的数字系统的工作过程中&#xff0c;需要对大量的数据进行储存。由于数据处理的数据量和运算速度的要求&#xff0c;因此把存储量和存取速度作为衡量存储器的重要指标。 在电子…...

本地部署pangolin获取谱系,从而达到预测新冠的流行趋势

步骤 1&#xff1a;安装Docker 注&#xff1a;此步骤忽略&#xff0c;可通过Docker官网对于文档进行安装,地址如下 Docker: Accelerated Container Application Developmenthttps://www.docker.com/ 步骤 2&#xff1a;拉取Pangolin镜像 docker pull staphb/pangolin:latest 步…...

【Python】文件File处理详细解释,附示例(文件操作、模式、编码、指针、调试、大文件处理、文件管理等)

文件 File 处理方法 1. 前言2. 文件基础操作2.1 文件打开与关闭2.2 with 语句(上下文管理器)3. 文件模式详解3.1 基础模式3.2 扩展模式4. 文件读写操作4.1 读取内容4.2 写入内容5. 文件指针和随机访问5.1 seek(offset.whence)5.2 二进制模式下的指针操作6. 文件编码处理6.1 指定…...

Windows 10/11 系统下 Git 的详细安装步骤和基础设置指南

以下是 Windows 10/11 系统下 Git 的详细安装步骤和基础设置指南&#xff1a; Windows 10/11 系统下 Git 的详细安装步骤和基础设置指南 一、详细安装步骤1. 下载 Git 安装包2. 运行安装程序1. 双击安装包&#xff0c;按以下选项配置&#xff1a;2. 点击 Install 完成安装。 二…...

RabbitMQ的四种交换机

RabbitMQ交换机 什么是RabbitMQ RabbitMQ 是一个开源的消息代理和队列服务器&#xff0c;用于在分布式系统中存储和转发消息。它基于 AMQP&#xff08;高级消息队列协议&#xff09;实现&#xff0c;支持多种消息传递模式&#xff0c;广泛应用于异步通信、应用解耦、负载均衡…...

探秘基带算法:从原理到5G时代的通信变革【一】引言

文章目录 一、引言1.1 研究背景与意义1.2 研究目的与方法1.3 研究内容与创新点 本博客为系列博客&#xff0c;主要讲解各基带算法的原理与应用&#xff0c;包括&#xff1a;viterbi解码、Turbo编解码、Polar编解码、CORDIC算法、CRC校验、FFT/DFT、QAMtiaozhi/解调、QPSK调制/解…...

ES中数据刷新策略refresh

在 Elasticsearch 中&#xff0c;插入数据时的 refresh 参数控制文档在写入后何时对搜索可见&#xff0c;其行为直接影响数据可见性和系统性能。以下是 refresh 参数的三个可选值&#xff08;true、false、wait_for&#xff09;的详细说明及适用场景&#xff1a; 1. refreshtr…...

【向量数据库Weaviate】 和Elasticsearch的区别

Weaviate 和 Elasticsearch 是两种不同类型的数据库&#xff0c;设计目标和应用场景有显著差异。以下是它们的核心区别和适用场景的详细对比&#xff1a; 1. 设计目标与核心能力 维度WeaviateElasticsearch核心能力向量数据库 图数据库&#xff08;语义搜索优先&#xff09;全…...

【Wireshark 02】抓包过滤方法

一、官方教程 Wireshark 官网文档 &#xff1a; Wireshark User’s Guide 二、显示过滤器 2.1、 “数据包列表”窗格的弹出过滤菜单 例如&#xff0c;源ip地址作为过滤选项&#xff0c;右击源ip->prepare as filter-> 选中 点击选中完&#xff0c;显示过滤器&#…...

【零基础到精通Java合集】第十五集:Map集合框架与泛型

课程标题:Map集合框架与泛型(15分钟) 目标:掌握泛型在Map中的键值类型约束,理解类型安全的键值操作,熟练使用泛型Map解决实际问题 0-1分钟:泛型Map的意义引入 以“字典翻译”类比泛型Map:明确键和值的类型(如英文→中文)。说明泛型Map的作用——确保键值对的类型一…...

三参数水质在线分析仪:从源头保障饮用水安全

【TH-ZS03】饮用水安全是人类健康的重要保障&#xff0c;其质量直接关系到人们的生命健康。随着工业化、城市化的快速发展&#xff0c;水体污染问题日益严峻&#xff0c;饮用水安全面临着前所未有的挑战。为了从源头保障饮用水安全&#xff0c;科学、高效的水质监测手段必不可少…...

Java8-Stream流介绍和使用案例

Java 8 引入了 Stream API&#xff0c;它提供了一种高效且声明式的方式来处理集合数据。Stream 的核心思想是将数据的操作分为中间操作&#xff08;Intermediate Operations&#xff09;和终端操作&#xff08;Terminal Operations&#xff09;&#xff0c;并通过流水线&#x…...

FieldFox 手持射频与微波分析仪

FieldFox 手持射频与微波分析仪 简述 Keysight FieldFox 便携式分析仪可以在非常恶劣的工作环境中&#xff0c;轻松完成从日常维护到深入故障诊断的各项工作。 选择最适合您需求且有强大软件支持的 Keysight FieldFox 配置。 主要特性 特点&#xff1a; FieldFox 分析仪可…...

JQuery学习笔记,点击按钮加载更多的图片

利用点击按钮模拟某京&#xff0c;某宝&#xff0c;滚动页面加载图片的效果&#xff0c;代码&#xff1a; <!DOCTYPE html> <html><head><meta charset"utf-8"><title>Ajax请求</title></head><body><button id…...

swift4-汇编分析枚举内存布局

一、枚举的内存原理 1.1 常规case enum TestEnum { case test1, test2, test3 } var t TestEnum.test1 t .test2 t .test3枚举是常规的case的情况-是采取一个字节来存枚举变量通过拿到枚举的内存地址&#xff0c;看地址里面存的枚举值情况窥探枚举内存存储情况 var t Te…...

Vue 3 整合 WangEditor 富文本编辑器:从基础到高级实践

本文将详细介绍如何在 Vue 3 项目中集成 WangEditor 富文本编辑器&#xff0c;实现图文混排、自定义扩展等高阶功能。 一、为什么选择 WangEditor&#xff1f; 作为国内流行的开源富文本编辑器&#xff0c;WangEditor 具有以下优势&#xff1a; 轻量高效&#xff1a;压缩后仅…...

Docker安装嵌入框架Text Embeddings Inference (TEI)

Docker安装Text Embeddings Inference (TEI) 1 简单介绍 文本嵌入推理&#xff08;TEI&#xff0c;Text Embeddings Inference &#xff09;是HuggingFace研发的一个用于部署和服务开源文本嵌入和序列分类模型的工具包。TEI兼容OpenAI的嵌入模型的规范。 # 官网地址 https:/…...

【HeadFirst系列之HeadFirst设计模式】第14天之与设计模式相处:真实世界中的设计模式

与设计模式相处&#xff1a;真实世界中的设计模式 设计模式是软件开发中的经典解决方案&#xff0c;它们帮助我们解决常见的设计问题&#xff0c;并提高代码的可维护性和可扩展性。在《Head First设计模式》一书中&#xff0c;作者通过生动的案例和通俗的语言&#xff0c;深入…...

java后端开发day27--常用API(二)正则表达式爬虫

&#xff08;以下内容全部来自上述课程&#xff09; 1.正则表达式&#xff08;regex&#xff09; 可以校验字符串是否满足一定的规则&#xff0c;并用来校验数据格式的合法性。 1.作用 校验字符串是否满足规则在一段文本中查找满足要求的内容 2.内容定义 ps&#xff1a;一…...

【UCB CS 61B SP24】Lecture 22 23: Tree and Graph Traversals, DFS, BFS 学习笔记

本文讲解了二叉树的四种遍历方式&#xff0c;以及如何通过前/后序遍历与中序遍历重建出二叉树&#xff0c;接着介绍了新的非线性数据结构&#xff1a;图&#xff0c;详细讲解了图的存储方式与遍历方式&#xff0c;最后使用 Java 基于邻接表的存储方式实现了图与 DFS、BFS 两种遍…...

Redis100道高频面试题

一、Redis基础 Redis是什么&#xff1f;主要应用场景有哪些&#xff1f; Redis 是一个开源的、基于内存的数据结构存储系统&#xff0c;支持多种数据结构&#xff08;如字符串、哈希、列表、集合等&#xff09;&#xff0c;可以用作数据库、缓存和消息中间件。 主要应用场景&…...

Mac OS Homebrew更换国内镜像源(中科大;阿里;清华)

omebrew官方的源一般下载包之类的会很慢&#xff0c;所以通常我们都是用国内的镜像源来代替&#xff0c;这样会提高我们的效率。Homebrew主要有四个部分组成: brew、homebrew-core 、homebrew-bottles、homebrew-cask。 代码语言&#xff1a;javascript 代码运行次数&#xf…...

excel vlookup的精确查询、模糊查询、反向查询、多列查询

目录 入门 精确查询 模糊查询 反向查询 (搭配 if 函数) 多列查询 (搭配 match 函数) 入门 精确查询 需求: 查找 学生编号是008 所在的班级 操作: 在I2单元格输入公式如下,VLOOKUP(H2,B1:E12,4,FALSE), 得出结果 看一下vlookup 公式每一个参数应该怎么写? 语法: vlookup…...

linux的文件系统及文件类型

目录 一、Linux支持的文件系统 二、linux的文件类型 2.1、普通文件 2.2、目录文件 2.3、链接文件 2.4、字符设备文件: 2.5、块设备文件 2.6、套接字文件 2.7、管道文件 三、linux的文件属性 3.1、关于权限部分 四、Linux的文件结构 五、用户主目录 5.1、工作目录…...

MySQL 安装配置(完整教程)

文章目录 一、MySQL 简介二、下载 MySQL三、安装 MySQL四、配置环境变量五、配置 MySQL 5.1 初始化 MySQL5.2 启动 MySQL 服务 六、修改 MySQL 密码七、卸载 MySQL八、结语 一、MySQL 简介 MySQL 是一款广泛使用的开源关系型数据库管理系统&#xff08;RDBMS&#xff09;&am…...

C# Unity 唐老狮 No.4 模拟面试题

本文章不作任何商业用途 仅作学习与交流 安利唐老狮与其他老师合作的网站,内有大量免费资源和优质付费资源,我入门就是看唐老师的课程 打好坚实的基础非常非常重要: 全部 - 游习堂 - 唐老狮创立的游戏开发在线学习平台 - Powered By EduSoho 如果你发现了文章内特殊的字体格式,…...

给没有登录认证的web应用添加登录认证(openresty lua实现)

这阵子不是deepseek火么&#xff1f;我也折腾了下本地部署&#xff0c;ollama、vllm、llama.cpp都弄了下&#xff0c;webui也用了几个&#xff0c;发现nextjs-ollama-llm-ui小巧方便&#xff0c;挺适合个人使用的。如果放在网上供多人使用的话&#xff0c;得接入登录认证才好&a…...

R语言绘图:韦恩图

韦恩分析 韦恩分析&#xff08;Venn Analysis&#xff09;常用于可视化不同数据集之间的交集和并集。维恩图&#xff08;Venn diagram&#xff09;&#xff0c;也叫文氏图、温氏图、韦恩图、范氏图&#xff0c;用于显示元素集合重叠区域的关系型图表&#xff0c;通过图形与图形…...

STM32——串口通信 UART

一、基础配置 Universal Asynchronous Receiver Transmitter 异步&#xff0c;串行&#xff0c;全双工 TTL电平 &#xff1a;高电平1 低电平0 帧格式&#xff1a; 起始位1bit 数据位8bit 校验位1bit 终止位1bit NVIC Settings一栏使能接受中断。 之前有设置LCD&#xff0c;…...

【大模型基础_毛玉仁】1.3 基于Transformer 的语言模型

【大模型基础_毛玉仁】1.3 基于Transformer 的语言模型 1.3 基于Transformer 的语言模型1.3.1 Transformer1&#xff09;注意力层&#xff08;AttentionLayer&#xff09;2&#xff09;全连接前馈层&#xff08;Fully-connected Feedforwad Layer&#xff09;3&#xff09;层正…...

靶场(二)---靶场心得小白分享

开始&#xff1a; 看一下本地IP 21有未授权访问的话&#xff0c;就从21先看起 PORT STATE SERVICE VERSION 20/tcp closed ftp-data 21/tcp open ftp vsftpd 2.0.8 or later | ftp-anon: Anonymous FTP login allowed (FTP code 230) |_Cant get dire…...

大学至今的反思与总结

现在是2025年的3月5日&#xff0c;我大三下学期。 自大学伊始&#xff0c;我便以考研作为自己的目标&#xff0c;有时还会做自己考研上岸头部985,211&#xff0c;offer如潮水般涌来的美梦。 但是我却忽略了一点&#xff0c;即便我早早下定了决心去考研&#xff0c;但并没有早…...