大数据技术之SparkCore
RDD概述
什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
RDD五大特性
RDD编程
RDD的创建
在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。
从集合中创建
- 1)从集合中创建RDD:parallelize
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;import java.util.Arrays;
import java.util.List;public class Test01_List {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("hello", "spark"));List<String> result = stringRDD.collect();for (String s : result) {System.out.println(s);}// 4. 关闭scsc.stop();}
}
从外部存储系统的数据集创建
由外部存储系统的数据集创建RDD包括:本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、HBase等。
- 1)数据准备
在新建的SparkCore项目名称上右键=》新建input文件夹=》在input文件夹上右键=》分别新建1.txt和2.txt。每个文件里面准备一些word单词。 - 2)创建RDD
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;import java.util.List;public class Test02_File {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<String> lineRDD = sc.textFile("input");List<String> result = lineRDD.collect();for (String s : result) {System.out.println(s);}// 4. 关闭scsc.stop();}
}
从其他RDD创建
主要是通过一个RDD运算完后,再产生新的RDD。
分区规则
从集合创建RDD
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;import java.util.Arrays;public class Test01_ListPartition {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码// 默认环境的核数,local环境中,分区数量和环境核数相关,但是一般不推荐// 可以手动填写参数控制分区的个数JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("hello", "spark", "hello", "spark", "hello"),2);// 数据分区的情况// 0 => 1,2 1 => 3,4,5// 利用整数除机制 左闭右开// 0 => start 0*5/2 end 1*5/2// 1 => start 1*5/2 end 2*5/2stringRDD.saveAsTextFile("output");// 4. 关闭scsc.stop();}
}
从文件创建RDD
- 1)分区测试
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;import java.util.List;public class Test02_FilePartition {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码// 默认填写的最小分区数 2和环境的核数取小的值 一般为2JavaRDD<String> lineRDD = sc.textFile("input/1.txt");// 具体的分区个数需要经过公式计算// 首先获取文件的总长度 totalSize// 计算平均长度 goalSize = totalSize / numSplits// 获取块大小 128M// 计算切分大小 splitSize = Math.max(minSize, Math.min(goalSize, blockSize));// 最后使用splitSize 按照1.1倍原则切分整个文件 得到几个分区就是几个分区// 实际开发中 只需要看文件总大小 / 填写的分区数 和块大小比较 谁小拿谁进行切分lineRDD.saveAsTextFile("output");// 数据会分配到哪个分区// 如果切分的位置位于一行的中间 会在当前分区读完一整行数据// 0 -> 1,2 1 -> 3 2 -> 4 3 -> 空// 4. 关闭scsc.stop();}
}
注意:getSplits文件返回的是切片规划,真正读取是在compute方法中创建LineRecordReader读取的,有两个关键变量: start = split.getStart(), end = start + split.getLength
①分区数量的计算方式:
totalSize = 10
goalSize = 10 / 3 = 3(byte) 表示每个分区存储3字节的数据
分区数= totalSize/ goalSize = 10 /3 => 3,3,4
4子节大于3子节的1.1倍,符合hadoop切片1.1倍的策略,因此会多创建一个分区,即一共有4个分区 3,3,3,1
②Spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,跟字节数没有关系
③数据读取位置计算是以偏移量为单位来进行计算的。
Transformation转换算子
Value类型
map()映射
参数f是一个函数可以写作匿名子类,它可以接收一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并依次应用f函数,从而产生一个新的RDD。即,这个新RDD中的每一个元素都是原来RDD中每一个元素依次应用f函数而得到的。
- 1)具体实现
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;public class Test01_Map {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<String> lineRDD = sc.textFile("input/1.txt");// 需求:每行结尾拼接||// 两种写法 lambda表达式写法(匿名函数) JavaRDD<String> mapRDD = lineRDD.map(s -> s + "||");// 匿名函数写法 JavaRDD<String> mapRDD1 = lineRDD.map(new Function<String, String>() {@Overridepublic String call(String v1) throws Exception {return v1 + "||";}});for (String s : mapRDD.collect()) {System.out.println(s);}// 输出数据的函数写法mapRDD1.collect().forEach(a -> System.out.println(a));mapRDD1.collect().forEach(System.out::println);// 4. 关闭scsc.stop();}
}
flatMap()扁平化
- 1)功能说明
与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。
区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。 - 2)需求说明:创建一个集合,集合里面存储的还是子集合,把所有子集合中数据取出放入到一个大的集合中。
- 4)具体实现:
import org.apache.commons.collections.ListUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;public class Test02_FlatMap {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码ArrayList<List<String>> arrayLists = new ArrayList<>();arrayLists.add(Arrays.asList("1","2","3"));arrayLists.add(Arrays.asList("4","5","6"));JavaRDD<List<String>> listJavaRDD = sc.parallelize(arrayLists,2);// 对于集合嵌套的RDD 可以将元素打散// 泛型为打散之后的元素类型JavaRDD<String> stringJavaRDD = listJavaRDD.flatMap(new FlatMapFunction<List<String>, String>() {@Overridepublic Iterator<String> call(List<String> strings) throws Exception {return strings.iterator();}});stringJavaRDD. collect().forEach(System.out::println);// 通常情况下需要自己将元素转换为集合JavaRDD<String> lineRDD = sc.textFile("input/2.txt");JavaRDD<String> stringJavaRDD1 = lineRDD.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String s) throws Exception {String[] s1 = s.split(" ");return Arrays.asList(s1).iterator();}});stringJavaRDD1. collect().forEach(System.out::println);// 4. 关闭scsc.stop();}
}
groupBy()分组
- 1)功能说明:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
- 2)需求说明:创建一个RDD,按照元素模以2的值进行分组。
- 3)具体实现
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;import java.util.Arrays;public class Test03_GroupBy {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);// 泛型为分组标记的类型JavaPairRDD<Integer, Iterable<Integer>> groupByRDD = integerJavaRDD.groupBy(new Function<Integer, Integer>() {@Overridepublic Integer call(Integer v1) throws Exception {return v1 % 2;}});groupByRDD.collect().forEach(System.out::println);// 类型可以任意修改JavaPairRDD<Boolean, Iterable<Integer>> groupByRDD1 = integerJavaRDD.groupBy(new Function<Integer, Boolean>() {@Overridepublic Boolean call(Integer v1) throws Exception {return v1 % 2 == 0;}});groupByRDD1. collect().forEach(System.out::println);Thread.sleep(600000);// 4. 关闭scsc.stop();}
}
groupBy会存在shuffle过程
shuffle:将不同的分区数据进行打乱重组的过程
shuffle一定会落盘。可以在local模式下执行程序,通过4040看效果。
filter()过滤
- 1)功能说明
接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中。 - 2)需求说明:创建一个RDD,过滤出对2取余等于0的数据
- 3)代码实现
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;import java.util.Arrays;public class Test04_Filter {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);JavaRDD<Integer> filterRDD = integerJavaRDD.filter(new Function<Integer, Boolean>() {@Overridepublic Boolean call(Integer v1) throws Exception {return v1 % 2 == 0;}});filterRDD. collect().forEach(System.out::println);// 4. 关闭scsc.stop();}
}
distinct()去重
- 1)功能说明:对内部的元素去重,并将去重后的元素放到新的RDD中。
- 2)代码实现
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;import java.util.Arrays;public class Test05_Distinct {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);// 底层使用分布式分组去重 所有速度比较慢,但是不会OOMJavaRDD<Integer> distinct = integerJavaRDD.distinct();distinct. collect().forEach(System.out::println);// 4. 关闭scsc.stop();}
}
注意:distinct会存在shuffle过程。
sortBy()排序
- 1)功能说明
该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。Spark的排序结果是全局有序。 - 2)需求说明:创建一个RDD,按照数字大小分别实现正序和倒序排序
- 3)代码实现:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;import java.util.Arrays;public class Test6_SortBy {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(5, 8, 1, 11, 20), 2);// (1)泛型为以谁作为标准排序 (2) true为正序 (3) 排序之后的分区个数JavaRDD<Integer> sortByRDD = integerJavaRDD.sortBy(new Function<Integer, Integer>() {@Overridepublic Integer call(Integer v1) throws Exception {return v1;}}, true, 2);sortByRDD. collect().forEach(System.out::println);// 4. 关闭scsc.stop();}
}
Key-Value类型
要想使用Key-Value类型的算子首先需要使用特定的方法转换为PairRDD
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;import java.util.Arrays;public class Test01_pairRDD{public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);JavaPairRDD<Integer, Integer> pairRDD = integerJavaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {@Overridepublic Tuple2<Integer, Integer> call(Integer integer) throws Exception {return new Tuple2<>(integer, integer);}});pairRDD. collect().forEach(System.out::println);// 4. 关闭scsc.stop();}
}
mapValues()只对V进行操作
- 1)功能说明:针对于(K,V)形式的类型只对V进行操作
- 2)需求说明:创建一个pairRDD,并将value添加字符串"|||"
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;import java.util.Arrays;public class Test02_MapValues {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaPairRDD<String, String> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>("k", "v"), new Tuple2<>("k1", "v1"), new Tuple2<>("k2", "v2")));// 只修改value 不修改keyJavaPairRDD<String, String> mapValuesRDD = javaPairRDD.mapValues(new Function<String, String>() {@Overridepublic String call(String v1) throws Exception {return v1 + "|||";}});mapValuesRDD. collect().forEach(System.out::println);// 4. 关闭scsc.stop();}
}
groupByKey()按照K重新分组
- 1)功能说明
groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。
该操作可以指定分区器或者分区数(默认使用HashPartitioner) - 2)需求说明:统计单词出现次数
- 4)代码实现:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;import java.util.Arrays;public class Test03_GroupByKey {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<String> integerJavaRDD = sc.parallelize(Arrays.asList("hi","hi","hello","spark" ),2);// 统计单词出现次数JavaPairRDD<String, Integer> pairRDD = integerJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String s) throws Exception {return new Tuple2<>(s, 1);}});// 聚合相同的keyJavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = pairRDD.groupByKey();// 合并值JavaPairRDD<String, Integer> result = groupByKeyRDD.mapValues(new Function<Iterable<Integer>, Integer>() {@Overridepublic Integer call(Iterable<Integer> v1) throws Exception {Integer sum = 0;for (Integer integer : v1) {sum += integer;}return sum;}});result. collect().forEach(System.out::println);// 4. 关闭scsc.stop();}
}}
reduceByKey()按照K聚合V
- 1)功能说明:该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。其存在多种重载形式,还可以设置新RDD的分区数。
- 2)需求说明:统计单词出现次数
- 3)代码实现:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;import java.util.Arrays;public class Test04_ReduceByKey {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<String> integerJavaRDD = sc.parallelize(Arrays.asList("hi","hi","hello","spark" ),2);// 统计单词出现次数JavaPairRDD<String, Integer> pairRDD = integerJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String s) throws Exception {return new Tuple2<>(s, 1);}});// 聚合相同的keyJavaPairRDD<String, Integer> result = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}});result. collect().forEach(System.out::println);// 4. 关闭scsc.stop();}
}
reduceByKey和groupByKey区别
- 1)reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[K,V]。
- 2)groupByKey:按照key进行分组,直接进行shuffle。
- 3)开发指导:在不影响业务逻辑的前提下,优先选用reduceByKey。求和操作不影响业务逻辑,求平均值影响业务逻辑。影响业务逻辑时建议先对数据类型进行转换再合并。
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;import java.util.Arrays;public class Test06_ReduceByKeyAvg {public static void main(String[] args) throws InterruptedException {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>("hi", 96), new Tuple2<>("hi", 97), new Tuple2<>("hello", 95), new Tuple2<>("hello", 195)));// ("hi",(96,1))JavaPairRDD<String, Tuple2<Integer, Integer>> tuple2JavaPairRDD = javaPairRDD.mapValues(new Function<Integer, Tuple2<Integer, Integer>>() {@Overridepublic Tuple2<Integer, Integer> call(Integer v1) throws Exception {return new Tuple2<>(v1, 1);}});// 聚合RDDJavaPairRDD<String, Tuple2<Integer, Integer>> reduceRDD = tuple2JavaPairRDD.reduceByKey(new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {@Overridepublic Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {return new Tuple2<>(v1._1 + v2._1, v1._2 + v2._2);}});// 相除JavaPairRDD<String, Double> result = reduceRDD.mapValues(new Function<Tuple2<Integer, Integer>, Double>() {@Overridepublic Double call(Tuple2<Integer, Integer> v1) throws Exception {return (new Double(v1._1) / v1._2);}});result. collect().forEach(System.out::println);// 4. 关闭scsc.stop();}
}
sortByKey()按照K进行排序
- 1)功能说明
在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD。 - 2)需求说明:创建一个pairRDD,按照key的正序和倒序进行排序
- 3)代码实现:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;import java.util.Arrays;public class Test05_SortByKey {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaPairRDD<Integer, String> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>(4, "a"), new Tuple2<>(3, "c"), new Tuple2<>(2, "d")));// 填写布尔类型选择正序倒序JavaPairRDD<Integer, String> pairRDD = javaPairRDD.sortByKey(false);pairRDD. collect().forEach(System.out::println);// 4. 关闭scsc.stop();}
}
Action行动算子
行动算子是触发了整个作业的执行。因为转换算子都是懒加载,并不会立即执行。
collect()以数组的形式返回数据集
1)功能说明:在驱动程序中,以数组Array的形式返回数据集的所有元素。
注意:所有的数据都会被拉取到Driver端,慎用。
- 2)需求说明:创建一个RDD,并将RDD内容收集到Driver端打印
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;import java.util.Arrays;
import java.util.List;public class Test01_Collect {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);List<Integer> collect = integerJavaRDD.collect();for (Integer integer : collect) {System.out.println(integer);}// 4. 关闭scsc.stop();}
}
count()返回RDD中元素个数
- 1)功能说明:返回RDD中元素的个数
- 2)需求说明:创建一个RDD,统计该RDD的条数
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;import java.util.Arrays;public class Test02_Count {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);long count = integerJavaRDD.count();System.out.println(count);// 4. 关闭scsc.stop();}
}
first()返回RDD中的第一个元素
- 1)功能说明:返回RDD中的第一个元素
- 2)需求说明:创建一个RDD,返回该RDD中的第一个元素
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;import java.util.Arrays;public class Test03_First {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);Integer first = integerJavaRDD.first();System.out.println(first);// 4. 关闭scsc.stop();}
}
take()返回由RDD前n个元素组成的数组
- 1)功能说明:返回一个由RDD的前n个元素组成的数组
- 2)需求说明:创建一个RDD,取出前两个元素
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;import java.util.Arrays;
import java.util.List;public class Test04_Take {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);List<Integer> list = integerJavaRDD.take(2);list.forEach(System.out::println);// 4. 关闭scsc.stop();}
}
countByKey()统计每种key的个数
- 1)功能说明:统计每种key的个数
- 2)需求说明:创建一个PairRDD,统计每种key的个数
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;import java.util.Arrays;
import java.util.Map;public class Test05_CountByKey {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>("a", 8), new Tuple2<>("b", 8), new Tuple2<>("a", 8), new Tuple2<>("d", 8)));Map<String, Long> map = pairRDD.countByKey();System.out.println(map);// 4. 关闭scsc.stop();}
}
save相关算子
- 1)saveAsTextFile(path)保存成Text文件
功能说明:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 - 2)saveAsObjectFile(path) 序列化成对象保存到文件
功能说明:用于将RDD中的元素序列化成对象,存储到文件中。 - 3)代码实现
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;import java.util.Arrays;public class Test06_Save {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);integerJavaRDD.saveAsTextFile("output");integerJavaRDD.saveAsObjectFile("output1");// 4. 关闭scsc.stop();}
}
foreach()遍历RDD中每一个元素
- 2)需求说明:创建一个RDD,对每个元素进行打印
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;import java.util.Arrays;public class Test07_Foreach {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),4);integerJavaRDD.foreach(new VoidFunction<Integer>() {@Overridepublic void call(Integer integer) throws Exception {System.out.println(integer);}});// 4. 关闭scsc.stop();}
}
foreachPartition ()遍历RDD中每一个分区
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;import java.util.Arrays;
import java.util.Iterator;public class Test08_ForeachPartition {public static void main(String[] args) {// 1. 创建配置对象SparkConf conf = new SparkConf().setAppName("core").setMaster("local[*]");// 2. 创建sc环境JavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);// 多线程一起计算 分区间无序 单个分区有序parallelize.foreachPartition(new VoidFunction<Iterator<Integer>>() {@Overridepublic void call(Iterator<Integer> integerIterator) throws Exception {// 一次处理一个分区的数据while (integerIterator.hasNext()) {Integer next = integerIterator.next();System.out.println(next);}}});// 4. 关闭scsc.stop();}
}
WordCount案例实操
- 1)导入项目依赖
下方的是scala语言打包插件 只要使用scala语法打包运行到linux上面 必须要有
<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.3.1</version></dependency>
</dependencies>
本地调试
本地Spark程序调试需要使用Local提交模式,即将本机当做运行环境,Master和Worker都为本机。运行时直接加断点调试即可。如下:
- 1)代码实现
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;import java.util.Arrays;
import java.util.Iterator;
import java.util.List;public class WordCount {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码// 读取数据JavaRDD<String> javaRDD = sc.textFile("input/2.txt");// 长字符串切分为单个单词JavaRDD<String> flatMapRDD = javaRDD.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String s) throws Exception {List<String> strings = Arrays.asList(s.split(" "));return strings.iterator();}});// 转换格式为 (单词,1)JavaPairRDD<String, Integer> pairRDD = flatMapRDD.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String s) throws Exception {return new Tuple2<>(s, 1);}});// 合并相同单词JavaPairRDD<String, Integer> javaPairRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}});javaPairRDD. collect().forEach(System.out::println);// 4. 关闭scsc.stop();}
}
- 2)调试流程
debug
集群运行
- 1)修改代码,修改运行模式,将输出的方法修改为落盘,同时设置可以自定义的传入传出路径
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;import java.util.Arrays;
import java.util.Iterator;
import java.util.List;public class WordCount {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("yarn").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码// 读取数据JavaRDD<String> javaRDD = sc.textFile(args[0]);// 长字符串切分为单个单词JavaRDD<String> flatMapRDD = javaRDD.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String s) throws Exception {List<String> strings = Arrays.asList(s.split(" "));return strings.iterator();}});// 转换格式为 (单词,1)JavaPairRDD<String, Integer> pairRDD = flatMapRDD.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String s) throws Exception {return new Tuple2<>(s, 1);}});// 合并相同单词JavaPairRDD<String, Integer> javaPairRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}});javaPairRDD.saveAsTextFile(args[1]);// 4. 关闭scsc.stop();}
}
- 2)打包到集群测试
(1)点击package打包,然后,查看打完后的jar包
(2)将WordCount.jar上传到/opt/module/spark-yarn目录
(3)在HDFS上创建,存储输入文件的路径/input
[jjm@hadoop102 spark-yarn]$ hadoop fs -mkdir /input
(4)上传输入文件到/input路径
[jjm@hadoop102 spark-yarn]$ hadoop fs -put /opt/software /input
(5)执行任务
[jjm@hadoop102 spark-yarn]$ bin/spark-submit \
--class com.atguigu.spark.WordCount \
--master yarn \
./WordCount.jar \
/input \
/output
注意:input和ouput都是HDFS上的集群路径。
(6)查询运行结果
[jjm@hadoop102 spark-yarn]$ hadoop fs -cat /output/*
RDD序列化
在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要注意的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。下面我们看几个例子:
序列化异常
- 0)创建包名:com.atguigu.serializable
- 1)创建使用的javaBean:User
import java.io.Serializable;public class User implements Serializable {private String name;private Integer age;public User(String name, Integer age) {this.name = name;this.age = age;}public User() {}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;}@Overridepublic String toString() {return "User{" +"name='" + name + '\'' +", age=" + age +'}';}
}
- 2)创建类:Test01_Ser测试序列化
import com.atguigu.bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;import java.util.Arrays;public class Test01_Ser {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码User zhangsan = new User("zhangsan", 13);User lisi = new User("lisi", 13);JavaRDD<User> userJavaRDD = sc.parallelize(Arrays.asList(zhangsan, lisi), 2);JavaRDD<User> mapRDD = userJavaRDD.map(new Function<User, User>() {@Overridepublic User call(User v1) throws Exception {return new User(v1.getName(), v1.getAge() + 1);}});mapRDD. collect().forEach(System.out::println);// 4. 关闭scsc.stop();}
}
Kryo序列化框架
参考地址: https://github.com/EsotericSoftware/kryo
Java的序列化能够序列化任何的类。但是比较重,序列化后对象的体积也比较大。
Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。
import com.atguigu.bean.User;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;import java.util.Arrays;public class Test02_Kryo {public static void main(String[] args) throws ClassNotFoundException {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore")// 替换默认的序列化机制.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 注册需要使用kryo序列化的自定义类.registerKryoClasses(new Class[]{Class.forName("com.atguigu.bean.User")});// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码User zhangsan = new User("zhangsan", 13);User lisi = new User("lisi", 13);JavaRDD<User> userJavaRDD = sc.parallelize(Arrays.asList(zhangsan, lisi), 2);JavaRDD<User> mapRDD = userJavaRDD.map(new Function<User, User>() {@Overridepublic User call(User v1) throws Exception {return new User(v1.getName(), v1.getAge() + 1);}});mapRDD. collect().forEach(System.out::println);// 4. 关闭scsc.stop();}
}
RDD依赖关系
查看血缘关系
RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
- 0)创建包名:com.atguigu.dependency
- 1)代码实现
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;import java.util.Arrays;
import java.util.Iterator;
import java.util.List;public class Test01_Dep {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<String> lineRDD = sc.textFile("input/2.txt");System.out.println(lineRDD.toDebugString());System.out.println("-------------------");JavaRDD<String> wordRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String s) throws Exception {List<String> stringList = Arrays.asList(s.split(" "));return stringList.iterator();}});System.out.println(wordRDD.toDebugString());System.out.println("-------------------");JavaPairRDD<String, Integer> tupleRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String s) throws Exception {return new Tuple2<>(s, 1);}});System.out.println(tupleRDD.toDebugString());System.out.println("-------------------");JavaPairRDD<String, Integer> wordCountRDD = tupleRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}});System.out.println(wordCountRDD.toDebugString());System.out.println("-------------------");// 4. 关闭scsc.stop();}
}
窄依赖
窄依赖表示每一个父RDD的Partition最多被子RDD的一个Partition使用(一对一or多对一),窄依赖我们形象的比喻为独生子女。
宽依赖
宽依赖表示同一个父RDD的Partition被多个子RDD的Partition依赖(只能是一对多),会引起Shuffle,总结:宽依赖我们形象的比喻为超生。
具有宽依赖的transformations包括:sort、reduceByKey、groupByKey、join和调用rePartition函数的任何操作。
宽依赖对Spark去评估一个transformations有更加重要的影响,比如对性能的影响。
在不影响业务要求的情况下,要尽量避免使用有宽依赖的转换算子,因为有宽依赖,就一定会走shuffle,影响性能。
Stage任务划分
- 1)DAG有向无环图
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG记录了RDD的转换过程和任务的阶段。
- 2)任务运行的整体流程
- 3)RDD任务切分中间分为:Application、Job、Stage和Task
(1)Application:初始化一个SparkContext即生成一个Application;
(2)Job:一个Action算子就会生成一个Job;
(3)Stage:Stage等于宽依赖的个数加1;
(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。
注意:Application->Job->Stage->Task每一层都是1对n的关系。
- 4)代码实现
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;import java.util.Arrays;
import java.util.Iterator;
import java.util.List;public class Test02_Dep {public static void main(String[] args) throws InterruptedException {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<String> lineRDD = sc.textFile("input/2.txt");System.out.println(lineRDD.toDebugString());System.out.println("-------------------");JavaRDD<String> wordRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String s) throws Exception {List<String> stringList = Arrays.asList(s.split(" "));return stringList.iterator();}});System.out.println(wordRDD);System.out.println("-------------------");JavaPairRDD<String, Integer> tupleRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String s) throws Exception {return new Tuple2<>(s, 1);}});System.out.println(tupleRDD.toDebugString());System.out.println("-------------------");// 缩减分区JavaPairRDD<String, Integer> coalesceRDD = tupleRDD.coalesce(1);JavaPairRDD<String, Integer> wordCountRDD = coalesceRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}},4);System.out.println(wordCountRDD.toDebugString());System.out.println("-------------------");wordCountRDD. collect().forEach(System.out::println);
wordCountRDD. collect().forEach(System.out::println);Thread.sleep(600000);// 4. 关闭scsc.stop();}
}
RDD持久化
RDD Cache缓存
RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
- 1)代码实现
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;import java.util.Arrays;
import java.util.Iterator;
import java.util.List;public class Test01_Cache {public static void main(String[] args) throws InterruptedException {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<String> lineRDD = sc.textFile("input/2.txt");//3.1.业务逻辑JavaRDD<String> wordRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String s) throws Exception {List<String> stringList = Arrays.asList(s.split(" "));return stringList.iterator();}});JavaRDD<Tuple2<String, Integer>> tuple2JavaRDD = wordRDD.map(new Function<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> call(String v1) throws Exception {System.out.println("*****************");return new Tuple2<>(v1, 1);}});//3.5 cache缓存前打印血缘关系System.out.println(tuple2JavaRDD.toDebugString());//3.4 数据缓存。
//cache底层调用的就是persist方法,缓存级别默认用的是MEMORY_ONLYtuple2JavaRDD.cache();//3.6 persist方法可以更改存储级别// wordToOneRdd.persist(StorageLevel.MEMORY_AND_DISK_2)//3.2 触发执行逻辑tuple2JavaRDD. collect().forEach(System.out::println);//3.5 cache缓存后打印血缘关系
//cache操作会增加血缘关系,不改变原有的血缘关系System.out.println(tuple2JavaRDD.toDebugString());System.out.println("=====================");//3.3 再次触发执行逻辑tuple2JavaRDD. collect().forEach(System.out::println);Thread.sleep(1000000);// 4. 关闭scsc.stop();}
}
- 2)源码解析
mapRdd.cache()
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)object StorageLevel {val NONE = new StorageLevel(false, false, false, false)val DISK_ONLY = new StorageLevel(true, false, false, false)val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)val MEMORY_ONLY = new StorageLevel(false, true, false, true)val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
注意:默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“_2”表示持久化的数据存为两份。SER:表示序列化。
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
- 3)自带缓存算子
Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。
RDD CheckPoint检查点
- 1)检查点:是通过将RDD中间结果写入磁盘。
- 2)为什么要做检查点?
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。 - 3)检查点存储路径:Checkpoint的数据通常是存储在HDFS等容错、高可用的文件系统
- 4)检查点数据存储格式为:二进制的文件
- 5)检查点切断血缘:在Checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。
- 6)检查点触发时间:对RDD进行Checkpoint操作并不会马上被执行,必须执行Action操作才能触发。但是检查点为了数据安全,会从血缘关系的最开始执行一遍。
- 7)设置检查点步骤
(1)设置检查点数据存储路径:sc.setCheckpointDir(“./checkpoint1”)
(2)调用检查点方法:wordToOneRdd.checkpoint() - 8)代码实现
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;public class Test02_CheckPoint {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);sc.setCheckpointDir("ck");// 3. 编写代码JavaRDD<String> lineRDD = sc.textFile("input/2.txt");JavaPairRDD<String, Long> tupleRDD = lineRDD.mapToPair(new PairFunction<String, String, Long>() {@Overridepublic Tuple2<String, Long> call(String s) throws Exception {return new Tuple2<String, Long>(s, System.currentTimeMillis());}});// 查看血缘关系System.out.println(tupleRDD.toDebugString());// 增加检查点避免计算两次tupleRDD.cache();// 进行检查点tupleRDD.checkpoint();tupleRDD. collect().forEach(System.out::println);System.out.println(tupleRDD.toDebugString());// 第二次计算tupleRDD. collect().forEach(System.out::println);// 第三次计算tupleRDD. collect().forEach(System.out::println);// 4. 关闭scsc.stop();}
}
- 9)执行结果
访问http://localhost:4040/jobs/页面,查看4个job的DAG图。其中第2个图是checkpoint的job运行DAG图。第3、4张图说明,检查点切断了血缘依赖关系。
(1)只增加checkpoint,没有增加Cache缓存打印
第1个job执行完,触发了checkpoint,第2个job运行checkpoint,并把数据存储在检查点上。第3、4个job,数据从检查点上直接读取。
(hadoop,1577960215526)
。。。。。。
(hello,1577960215526)
(hadoop,1577960215609)
。。。。。。
(hello,1577960215609)
(hadoop,1577960215609)
。。。。。。
(hello,1577960215609)
(2)增加checkpoint,也增加Cache缓存打印
第1个job执行完,数据就保存到Cache里面了,第2个job运行checkpoint,直接读取Cache里面的数据,并把数据存储在检查点上。第3、4个job,数据从检查点上直接读取。
(hadoop,1577960642223)
。。。。。。
(hello,1577960642225)
(hadoop,1577960642223)
。。。。。。
(hello,1577960642225)
(hadoop,1577960642223)
。。。。。。
(hello,1577960642225)
缓存和检查点区别
(1)Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。
(2)Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。
(3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。
(4)如果使用完了缓存,可以通过unpersist()方法释放缓存。
检查点存储到HDFS集群
如果检查点数据存储到HDFS集群,要注意配置访问集群的用户名。否则会报访问权限异常。
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;public class Test02_CheckPoint2 {public static void main(String[] args) {// 修改用户名称System.setProperty("HADOOP_USER_NAME","jjm");// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 需要设置路径.需要提前在HDFS集群上创建/checkpoint路径sc.setCheckpointDir("hdfs://hadoop102:8020/checkpoint");// 3. 编写代码JavaRDD<String> lineRDD = sc.textFile("input/2.txt");JavaPairRDD<String, Long> tupleRDD = lineRDD.mapToPair(new PairFunction<String, String, Long>() {@Overridepublic Tuple2<String, Long> call(String s) throws Exception {return new Tuple2<String, Long>(s, System.currentTimeMillis());}});// 查看血缘关系System.out.println(tupleRDD.toDebugString());// 增加检查点避免计算两次tupleRDD.cache();// 进行检查点tupleRDD.checkpoint();tupleRDD. collect().forEach(System.out::println);System.out.println(tupleRDD.toDebugString());// 第二次计算tupleRDD. collect().forEach(System.out::println);// 第三次计算tupleRDD. collect().forEach(System.out::println);// 4. 关闭scsc.stop();}
}
键值对RDD数据分区
Spark目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。
- 1)注意:
(1)只有Key-Value类型的pairRDD才有分区器,非Key-Value类型的RDD分区的值是None
(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。 - 2)获取RDD分区
(1)创建包名:com.atguigu.partitioner
(2)代码实现
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;import java.util.Arrays;public class Test01_Partitioner {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaPairRDD<String, Integer> tupleRDD = sc.parallelizePairs(Arrays.asList(new Tuple2<>("s", 1), new Tuple2<>("a", 3), new Tuple2<>("d", 2)));// 获取分区器Optional<Partitioner> partitioner = tupleRDD.partitioner();System.out.println(partitioner);JavaPairRDD<String, Integer> reduceByKeyRDD = tupleRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}});// 获取分区器Optional<Partitioner> partitioner1 = reduceByKeyRDD.partitioner();System.out.println(partitioner1);// 4. 关闭scsc.stop();}
}
Hash分区
Ranger分区
广播变量
广播变量:分布式共享只读变量。
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark Task操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来会很顺手。在多个Task并行操作中使用同一个变量,但是Spark会为每个Task任务分别发送。
- 1)使用广播变量步骤:
(1)调用SparkContext.broadcast(广播变量)创建出一个广播对象,任何可序列化的类型都可以这么实现。
(2)通过广播变量.value,访问该对象的值。
(3)广播变量只会被发到各个节点一次,作为只读值处理(修改这个值不会影响到别的节点)。 - 2)原理说明
- 3)创建包名:com.atguigu.broadcast
- 4)代码实现
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;import java.util.Arrays;
import java.util.List;public class Test02_Broadcast {public static void main(String[] args) {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(4, 56, 7, 8, 1, 2));// 幸运数字List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);// 找出幸运数字// 每一个task都会创建一个list浪费内存/*JavaRDD<Integer> result = intRDD.filter(new Function<Integer, Boolean>() {@Overridepublic Boolean call(Integer v1) throws Exception {return list.contains(v1);}});*/// 创建广播变量// 只发送一份数据到每一个executorBroadcast<List<Integer>> broadcast = sc.broadcast(list);JavaRDD<Integer> result = intRDD.filter(new Function<Integer, Boolean>() {@Overridepublic Boolean call(Integer v1) throws Exception {return broadcast.value().contains(v1);}});result. collect().forEach(System.out::println);// 4. 关闭scsc.stop();}
}
SparkCore实战
数据准备
- 1)数据格式
编号 | 字段名称 | 字段类型 | 字段含义 |
---|---|---|---|
1 | date | String | 用户点击行为的日期 |
2 | user_id | Long | 用户的ID |
3 | session_id | String | Session的ID |
4 | page_id | Long | 某个页面的ID |
5 | action_time | String | 动作的时间点 |
6 | search_keyword | String | 用户搜索的关键词 |
7 | click_category_id | Long | 点击某一个商品品类的ID |
8 | click_product_id | Long | 某一个商品的ID |
9 | order_category_ids | String | 一次订单中所有品类的ID集合 |
10 | order_product_ids | String | 一次订单中所有商品的ID集合 |
11 | pay_category_ids | String | 一次支付中所有品类的ID集合 |
12 | pay_product_ids | String | 一次支付中所有商品的ID集合 |
13 | city_id | Long | 城市 id |
需求:Top10热门品类
需求说明:品类是指产品的分类,大型电商网站品类分多级,咱们的项目中品类只有一级,不同的公司可能对热门的定义不一样。我们按照每个品类的点击、下单、支付的量(次数)来统计热门品类。
鞋 点击数 下单数 支付数
衣服 点击数 下单数 支付数
电脑 点击数 下单数 支付数
例如,综合排名 = 点击数20% + 下单数30% + 支付数*50%
为了更好的泛用性,当前案例按照点击次数进行排序,如果点击相同,按照下单数,如果下单还是相同,按照支付数。
需求分析
采用样例类的方式实现,聚合算子使用reduceByKey。
需求实现
- 1)添加lombok的插件
- 2)添加依赖lombok可以省略getset代码
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version>
</dependency>
- 0)创建两个存放数据的类
package com.atguigu.spark.bean;import lombok.Data;
import java.io.Serializable;@Data
public class UserVisitAction implements Serializable {private String date;private String user_id;private String session_id;private String page_id;private String action_time;private String search_keyword;private String click_category_id;private String click_product_id;private String order_category_ids;private String order_product_ids;private String pay_category_ids;private String pay_product_ids;private String city_id;public UserVisitAction() {}public UserVisitAction(String date, String user_id, String session_id, String page_id, String action_time, String search_keyword, String click_category_id, String click_product_id, String order_category_ids, String order_product_ids, String pay_category_ids, String pay_product_ids, String city_id) {this.date = date;this.user_id = user_id;this.session_id = session_id;this.page_id = page_id;this.action_time = action_time;this.search_keyword = search_keyword;this.click_category_id = click_category_id;this.click_product_id = click_product_id;this.order_category_ids = order_category_ids;this.order_product_ids = order_product_ids;this.pay_category_ids = pay_category_ids;this.pay_product_ids = pay_product_ids;this.city_id = city_id;}}
package com.atguigu.spark.bean;import lombok.Data;import java.io.Serializable;@Data
public class CategoryCountInfo implements Serializable, Comparable<CategoryCountInfo> {private String categoryId;private Long clickCount;private Long orderCount;private Long payCount;public CategoryCountInfo() {}public CategoryCountInfo(String categoryId, Long clickCount, Long orderCount, Long payCount) {this.categoryId = categoryId;this.clickCount = clickCount;this.orderCount = orderCount;this.payCount = payCount;}@Overridepublic int compareTo(CategoryCountInfo o) {// 小于返回-1,等于返回0,大于返回1if (this.getClickCount().equals(o.getClickCount())) {if (this.getOrderCount().equals(o.getOrderCount())) {if (this.getPayCount().equals(o.getPayCount())) {return 0;} else {return this.getPayCount() < o.getPayCount() ? -1 : 1;}} else {return this.getOrderCount() < o.getOrderCount() ? -1 : 1;}} else {return this.getClickCount() < o.getClickCount() ? -1 : 1;}}
}
- 4)核心业务代码实现
import com.atguigu.spark.bean.CategoryCountInfo;
import com.atguigu.spark.bean.UserVisitAction;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;import java.util.ArrayList;
import java.util.Iterator;public class Test06_Top10 {public static void main(String[] args) throws ClassNotFoundException {// 1.创建配置对象SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore")// 替换默认的序列化机制.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 注册需要使用kryo序列化的自定义类.registerKryoClasses(new Class[]{Class.forName("com.atguigu.spark.bean.CategoryCountInfo"),Class.forName("com.atguigu.spark.bean.UserVisitAction")});// 2. 创建sparkContextJavaSparkContext sc = new JavaSparkContext(conf);// 3. 编写代码// 分三次进行wordCount 最后使用cogroup合并三组数据JavaRDD<String> lineRDD = sc.textFile("input/user_visit_action.txt");// 转换为对象JavaRDD<UserVisitAction> actionRDD = lineRDD.map(new Function<String, UserVisitAction>() {@Overridepublic UserVisitAction call(String v1) throws Exception {String[] data = v1.split("_");return new UserVisitAction(data[0],data[1],data[2],data[3],data[4],data[5],data[6],data[7],data[8],data[9],data[10],data[11],data[12]);}});JavaRDD<CategoryCountInfo> categoryCountInfoJavaRDD = actionRDD.flatMap(new FlatMapFunction<UserVisitAction, CategoryCountInfo>() {@Overridepublic Iterator<CategoryCountInfo> call(UserVisitAction userVisitAction) throws Exception {ArrayList<CategoryCountInfo> countInfos = new ArrayList<>();if (!userVisitAction.getClick_category_id().equals("-1")) {// 当前为点击数据countInfos.add(new CategoryCountInfo(userVisitAction.getClick_category_id(), 1L, 0L, 0L));} else if (!userVisitAction.getOrder_category_ids().equals("null")) {// 当前为订单数据String[] orders = userVisitAction.getOrder_category_ids().split(",");for (String order : orders) {countInfos.add(new CategoryCountInfo(order, 0L, 1L, 0L));}} else if (!userVisitAction.getPay_category_ids().equals("null")) {// 当前为支付数据String[] pays = userVisitAction.getPay_category_ids().split(",");for (String pay : pays) {countInfos.add(new CategoryCountInfo(pay, 0L, 0L, 1L));}}return countInfos.iterator();}});// 合并相同的idJavaPairRDD<String, CategoryCountInfo> countInfoJavaPairRDD = categoryCountInfoJavaRDD.mapToPair(new PairFunction<CategoryCountInfo, String, CategoryCountInfo>() {@Overridepublic Tuple2<String, CategoryCountInfo> call(CategoryCountInfo categoryCountInfo) throws Exception {return new Tuple2<>(categoryCountInfo.getCategoryId(), categoryCountInfo);}});JavaPairRDD<String, CategoryCountInfo> countInfoPairRDD = countInfoJavaPairRDD.reduceByKey(new Function2<CategoryCountInfo, CategoryCountInfo, CategoryCountInfo>() {@Overridepublic CategoryCountInfo call(CategoryCountInfo v1, CategoryCountInfo v2) throws Exception {v1.setClickCount(v1.getClickCount() + v2.getClickCount());v1.setOrderCount(v1.getOrderCount() + v2.getOrderCount());v1.setPayCount(v1.getPayCount() + v2.getPayCount());return v1;}});JavaRDD<CategoryCountInfo> countInfoJavaRDD = countInfoPairRDD.map(new Function<Tuple2<String, CategoryCountInfo>, CategoryCountInfo>() {@Overridepublic CategoryCountInfo call(Tuple2<String, CategoryCountInfo> v1) throws Exception {return v1._2;}});// CategoryCountInfo需要能够比较大小JavaRDD<CategoryCountInfo> result = countInfoJavaRDD.sortBy(new Function<CategoryCountInfo, CategoryCountInfo>() {@Overridepublic CategoryCountInfo call(CategoryCountInfo v1) throws Exception {return v1;}}, false, 2);result. collect().forEach(System.out::println);// 4. 关闭scsc.stop();
}
}
相关文章:
大数据技术之SparkCore
RDD概述 什么是RDD RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。 RDD五大特性 RDD编程 RDD的创…...
element-plus的组件数据配置化封装 - table
目录 一、封装的table、table-column组件以及相关ts类型的定义 1、ATable组件的封装 - index.ts 2、ATableColumn组件的封装 - ATableColumn.ts 3、ATable、ATableColumn类型 - interface.ts 二、ATable、ATableColumn组件的使用 三、相关属性、方法的使用以及相关说明 1. C…...
蓝桥杯每日真题 - 第15天
题目:(钟表) 题目描述(13届 C&C B组B题) 解题思路: 理解钟表指针的运动: 秒针每分钟转一圈,即每秒转6度。 分针每小时转一圈,即每分钟转6度。 时针每12小时转一圈…...
c#:winform调用bartender实现打印(学习整理笔记)
效果 学习路径 C# winform调用Bartender进行自定义打印、批量打印、检索文件夹中的模板_哔哩哔哩_bilibili 一、初始环境搭建见: c#:winform引入bartender-CSDN博客https://blog.csdn.net/weixin_46001736/article/details/143989473?sharetypeblogdetail&s…...
周期法频率计的设计
目录 周期法频率计 分析: 设计过程: 周期法频率计 对于低频信号,应用周期法进行测频。周期法测频的基本原理是:应用标准频率信号统计被测信号两个相邻脉冲之间的脉冲数,然后通过脉冲数计算出被测信号的周期ÿ…...
【2024亚太杯亚太赛APMCM C题】数学建模竞赛|宠物行业及相关产业的发展分析与策略|建模过程+完整代码论文全解全析
第一个问题是:请基于附件 1 中的数据以及你的团队收集的额外数据,分析过去五年中国宠物行业按宠物类型的发展情况。并分析中国宠物行业发展的因素,预测未来三年中国宠物行业的发展。 第一个问题:分析中国宠物行业按宠物类型的发展…...
uniapp的renderjs使用
uniapp中的RenderJS主要服务于APP和H5平台,其作用包括降低逻辑层和视图层的通讯损耗,提供高性能视图交互能力,以及在视图层操作DOM和运行Web的JS库。 RenderJS是uni-app中一个特性,它允许开发者在页面中使用JavaScript直接渲…...
CPU命名那些事
一、Intel CPU命名 1. 命名结构 Intel CPU 的命名通常包含以下几个部分: 品牌 产品线 系列 代数 具体型号 后缀 例如:Intel Core i7-13700K 2. 各部分含义 品牌 Intel:表示厂商(几乎所有命名中都有)。不同品…...
【LLM】一文学会SPPO
博客昵称:沈小农学编程 作者简介:一名在读硕士,定期更新相关算法面试题,欢迎关注小弟! PS:哈喽!各位CSDN的uu们,我是你的小弟沈小农,希望我的文章能帮助到你。欢迎大家在…...
鸿蒙多线程开发——线程间数据通信对象03(sendable)
1、简 介 在传统JS引擎上,对象的并发通信开销的优化方式只有一种,就是把实现下沉到Native侧,通过Transferable对象的转移或共享方式降低并发通信开销。而开发者仍然还有大量对象并发通信的诉求,这个问题在业界的JS引擎实现上并没…...
web前端开发--动画效果
1、3D旋转 <!DOCTYPE html> <html><head><meta charset"UTF-8"><title>3D旋转</title><style type"text/css">div{/*设置大盒子样式*/width: 100px;height: 100px;/*background-color: rgba(255,0,0,0.5);*/ba…...
【数据分享】中国价格统计年鉴(2013-2024) PDF
数据介绍 犹如一座珍贵的宝库,全面而系统地记录了中国在这一时期的价格变动情况。它涵盖了丰富的内容,包括宏观经济指标、商品价格、居民消费价格以及城市物价监测等多个方面。 在宏观经济指标方面,年鉴中收录了 GDP、CPI、PPI 等重要数据&…...
鸿蒙NEXT开发案例:字数统计
【引言】 本文将通过一个具体的案例——“字数统计”组件,来探讨如何在鸿蒙NEXT框架下实现这一功能。此组件不仅能够统计用户输入文本中的汉字、中文标点、数字、以及英文字符的数量,还具有良好的用户界面设计,使用户能够直观地了解输入文本…...
TritonServer中加载模型,并在Gunicorn上启动Web服务调用模型
TritonServer中加载模型,并在Gunicorn上启动Web服务调用模型 一、TritonServer中加载模型1.1 搭建本地仓库1.2 配置文件1.3 服务端代码1.4 启动TritonServer二、Gunicorn上启动Web服务2.1 安装和配置Gunicorn2.2 启动Gunicorn三、调用模型四、性能优化与监控五、总结在深度学习…...
[UE5学习] 一、使用源代码安装UE5.4
一、简介 本文介绍了如何使用源代码安装编译UE5.4,并且新建简单的项目,打包成安卓平台下的apk安装包。 二、使用源代码安装UE5.4 注意事项: 请保证可以全程流畅地科学上网。请保证C盘具有充足的空间。请保证接下来安装下载的visual studi…...
2023AE软件、Adobe After Effects安装步骤分享教程
2023AE软件是一款由Adobe公司开发的视频编辑软件,也被称为Adobe After Effects。它在广告、电影、电视和网络视频等领域广泛应用,用于制作动态图形、特效、合成和其他视觉效果。该软件支持多种视频和音频文件格式,具有丰富的插件和预设&#…...
Xilinx 7 系列 FPGA的各引脚外围电路接法
Xilinx 7系列FPGA的外围电路接法涉及到多个方面,包括电源引脚、时钟输入引脚、FPGA配置引脚、JTAG调试引脚,以及其他辅助引脚。 本文参考资料: ds180 - 7 Series FPGAs Data Sheet - Overview ds181 - Artix 7 FPGAs Data Sheet - DC and AC…...
【LeetCode热题100】队列+宽搜
这篇博客是关于队列宽搜的几道题,主要包括N叉树的层序遍历、二叉树的锯齿形层序遍历、二叉树最大宽度、在每个数行中找最大值。 class Solution { public:vector<vector<int>> levelOrder(Node* root) {vector<vector<int>> ret;if(!root) …...
<Sqlite><websocket>使用Sqlite与websocket,实现网页端对数据库的【读写增删】操作
前言 本文是在websocket进行通讯的基础,添加数据库进行数据的存储,数据库软件使用的是sqlite。 环境配置 系统:windows 平台:visual studio code 语言:javascript、html 库:nodejs、sqlite 概述 此前,我们实现在利用websocket和socket,将网页端与下位控制器如PLC进行…...
summernote富文本批量上传音频,视频等附件
普通项目,HTML的summernote富文本批量上传音频,视频等附件(其他附件同理) JS和CSS的引入 <head><th:block th:include"include :: summernote-css" /> </head> <body><th:block th:include"include :: summernote-js" /> …...
第六十五周周报 UP2ME
文章目录 week 65 UP2ME摘要Abstract1. 题目2. Abstract3. 文献解读3.1 Introduction3.2 创新点 4. 网络结构4.1 单变量预训练4.1.1 样例生成4.1.2 掩码自动编码器预训练4.1.3 即时反应模式 4.2 多元微调4.2.1 稀疏依赖图构造4.2.2 时域频道层 5. 实验结果6. 结论7. 部分关键代…...
Unity 使用 Excel 进行配置管理(读Excel配置表、Excel转保存Txt 文本、读取保存的 Txt 文本配置内容)
Unity 使用 Excel 进行配置管理(读Excel配置表、Excel转保存Txt 文本、读取保存的 Txt 文本配置内容) 目录 Unity 使用 Excel 进行配置管理(读Excel配置表、Excel转保存Txt 文本、读取保存的 Txt 文本配置内容) 一、简单介绍 二、实现原理 三、注意事项 四、案例简单步…...
【STM32】MPU6050简介
文章目录 MPU6050简介MPU6050关键块带有16位ADC和信号调理的三轴MEMS陀螺仪具有16位ADC和信号调理的三轴MEMS加速度计I2C串行通信接口 MPU6050对应的数据手册:MPU6050 陀螺仪加速度计 链接: https://pan.baidu.com/s/13nwEhGvsfxx0euR2hMHsyw?pwdv2i6 提取码: v2i6…...
学习日记_20241123_聚类方法(MeanShift)
前言 提醒: 文章内容为方便作者自己后日复习与查阅而进行的书写与发布,其中引用内容都会使用链接表明出处(如有侵权问题,请及时联系)。 其中内容多为一次书写,缺少检查与订正,如有问题或其他拓展…...
Qt常用控件 按钮
文章目录 1. QAbstractButton 简介2. QPushButton2.1 例子1,设置按钮的图标2.2 例子2,设置按钮快捷键 3. QRadioButton3.1 介绍3.2 例子1,选择性别3.3 例子2,试试其他的信号3.3 例子3,分组 4. QCheckBox4.1 介绍4.2 例…...
医院信息化与智能化系统(22)
医院信息化与智能化系统(22) 这里只描述对应过程,和可能遇到的问题及解决办法以及对应的参考链接,并不会直接每一步详细配置 如果你想通过文字描述或代码画流程图,可以试试PlantUML,告诉GPT你的文件结构,让他给你对应…...
嵌入式硬件实战基础篇(二)-稳定输出3.3V的太阳能电池-无限充放电
引言:本内容主要用作于学习巩固嵌入式硬件内容知识,用于想提升下述能力,针对学习稳压芯片和电容以及电池之间的运用,对于硬件PCB以及原理图的练习和前面硬件篇的实际运用;太阳能是一种清洁、可再生的能源,广…...
UE5材质篇5 简易水面
不得不说,UE5里搞一个水面实在是相比要自己写各种反射来说太友好了,就主要是开启一堆开关,lumen相关的,然后稍微连一些蓝图就几乎有了 这里要改一个shading model,要这个 然后要增加一个这个node 并且不需要连接base …...
Rollup配置实战:多产物与多入口的完美结合 (2)
常用配置 多产物配置 我们可以将 output 改造成一个数组,对外暴露出不同格式的产物供他人使用,不仅包括 ESM,也需要包括诸如CommonJS、UMD等格式,保证良好的兼容性 import { defineConfig } from rollupexport default defineC…...
iced源码分析
前言 iced是一个比较流行的UI库,设计思路还是挺有意思的,不过因为rust复杂的语法,这个库确实很难让一个不精通rust的开发者那么容易理解。这里记录下这几天的阅读源码心得。 正文 iced核心包括四个模块。 iced库,主要控制应用…...
Hadoop的MapReduce详解
文章目录 Hadoop的MapReduce详解一、引言二、MapReduce的核心概念1、Map阶段1.1、Map函数的实现 2、Reduce阶段2.1、Reduce函数的实现 三、MapReduce的执行流程四、MapReduce的使用实例Word Count示例1. Mapper类2. Reducer类3. 执行Word Count 五、总结 Hadoop的MapReduce详解…...
【Python系列】字典灵活的数据存储与操作
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...
【MCU】微控制器的编程技术:ISP 与 IAP
在嵌入式领域中,将程序下载到内置 Flash 有两种技术 ISP (In-system programming) ISP 即在系统编程,是指一些可编程逻辑器件、微控制器、芯片组和其他嵌入式设备在安装到完整嵌入式系统后能够进行编程,而不需要在将芯片安装到系统中之前对…...
TCP/IP 协议:网络世界的基石(2/10)
一、引言 在当今数字化时代,互联网已经成为人们生活中不可或缺的一部分。而在互联网的背后,TCP/IP 协议扮演着至关重要的角色,堪称互联网的基石。 TCP/IP 协议是一组用于数据通信的协议集合,它的名字来源于其中最重要的两个协议…...
小R的二叉树探险 | 模拟
问题描述 在一个神奇的二叉树中,结构非常独特: 每层的节点值赋值方向是交替的,第一层从左到右,第二层从右到左,以此类推,且该二叉树有无穷多层。 小R对这个二叉树充满了好奇,她想知道…...
Redis ⽀持哪⼏种数据类型?适⽤场景,底层结构
目录 Redis 数据类型 一、String(字符串) 二、Hash(哈希) 三、List(列表) 四、Set(集合) 五、ZSet(sorted set:有序集合) 六、BitMap 七、HyperLogLog 八、GEO …...
十、事件类型(鼠标事件、焦点.. 、键盘.. 、文本.. 、滚动..)、事件对象、事件流(事件捕获、事件冒泡、阻止冒泡和默认行为、事件委托)
1. 事件类型 1.1 鼠标事件 1.1.1 click 鼠标点击 1.1.2 mouseenter 鼠标进入 1.1.3 mouseleave 鼠标离开 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widt…...
RabbitMQ学习-One
同步调用和异步调用 1.假设我们现在又两个服务,分别是修改商品服务和查询商品服务,每个服务都有自己的数据库; 2.左侧的流程假设我们总共需要耗时40ms; 3.因为不同服务数据库不一样,所以我们就要考虑修改了左边服务的…...
蓝队基础,网络七杀伤链详解
声明! 学习视频来自B站up主 泷羽sec 有兴趣的师傅可以关注一下,如涉及侵权马上删除文章,笔记只是方便各位师傅的学习和探讨,文章所提到的网站以及内容,只做学习交流,其他均与本人以及泷羽sec团队无关&#…...
机器学习实战:银行客户是否认购定期存款
项目结构与步骤 1. 项目概述 项目名称:葡萄牙银行电话营销活动分析与定期存款认购预测目标:通过分析银行的电话营销数据,构建模型预测客户是否会认购定期存款。数据来源:葡萄牙银行营销活动数据集关键挑战:数据不平衡…...
【一篇搞定配置】网络分析工具WireShark的安装与入门使用
🌈 个人主页:十二月的猫-CSDN博客 🔥 系列专栏: 🏀各种软件安装与配置_十二月的猫的博客-CSDN博客 💪🏻 十二月的寒冬阻挡不了春天的脚步,十二点的黑夜遮蔽不住黎明的曙光 目录 1.…...
气膜场馆照明设计:科技与环保的完美结合—轻空间
气膜场馆的照明设计,选用高效节能的400瓦LED灯具,结合现代节能技术,提供强大而均匀的光照。LED灯具在光效和寿命方面优势显著,不仅降低运营能耗,还有效减少碳排放,为绿色场馆建设贡献力量。 科学分布&…...
C语言程序编译和链接
编译环境和运行 编译环境也可以称为翻译环境,是将源代码转换为机器可以识别的二进制指令; 运行环境也可以称为执行环境,用于实际执行代码; 翻译环境 翻译环境由编译和链接两个部分组成,而编译又可以分解为&#x…...
springBoot整合 Tess4J实现OCR识别文字(图片+PDF)
1. 环境准备 JDK 8 或更高版本Maven 3.6 或更高版本Spring Boot 2.4 或更高版本Tesseract OCR 引擎Tess4J 库 2. 安装 Tesseract OCR 引擎 下载地址: Home UB-Mannheim/tesseract Wiki GitHub linux直接安装:sudo apt-get install tesseract-ocr 3.…...
阿里数字人工作 Emote Portrait Alive (EMO):基于 Diffusion 直接生成视频的数字人方案
TL;DR 2024 年 ECCV 阿里智能计算研究所的数字人工作,基于 diffusion 方法来直接的从音频到视频合成数字人,避免了中间的三维模型或面部 landmark 的需求,效果很好。 Paper name EMO: Emote Portrait Alive - Generating Expressive Portra…...
Java将PDF保存为图片
将 PDF 文件转换为图片是常见的需求之一,特别是在需要将 PDF 内容以图像形式展示或处理时。其中最常用的是 Apache PDFBox。 使用 Apache PDFBox Apache PDFBox 是一个开源的 Java 库,可以用来处理 PDF 文档。它提供了将 PDF 页面转换为图像的功能。 …...
医药企业的终端市场营销策略
近年来,随着医药行业的快速发展,终端市场逐渐成为企业竞争的关键领域。在政策趋严、市场环境变化以及数字化转型的大背景下,医药企业如何在终端市场中立于不败之地?本文结合我们在医药数字化领域的经验,为大家剖析终端…...
使用EFK收集k8s日志
首先我们使用EFK收集Kubernetes集群中的日志,本次实验讲解的是在Kubernetes集群中启动一个Elasticsearch集群,如果企业内已经有了Elasticsearch集群,可以直接将日志输出至已有的Elasticsearch集群。 文章目录 部署elasticsearch创建Kibana创建…...
Vue3 + TypeScript 项目搭建
Vue3 TypeScript 项目搭建 环境准备 首先确保你的开发环境满足以下要求: # 检查 Node.js 版本 (需要 14.0 或更高版本) node -v# 检查 npm 版本 npm -v# 安装或更新 Vue CLI npm install -g vue/cli创建项目 使用 Vue CLI 创建项目: # 创建项目 np…...
Python操作neo4j库py2neo使用(一)
Python操作neo4j库py2neo使用(一) 安装(只用于测试) docker-compose .yml 文件 version: 3.8 services:neo4j:image: neo4j:5.6.0-enterprise #商业版镜像hostname: neo4jcontainer_name: neo4jports:- "7474:7474"-…...