【Flink系列】5. DataStream API
5. DataStream API
DataStream API是Flink的核心层API。一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码基本上都由以下几部分构成:
5.1 执行环境(Execution Environment)
Flink程序可以在各种上下文环境中运行:我们可以在本地JVM中执行程序,也可以提交到远程集群上运行。
不同的环境,代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时,首先必须获取当前Flink的运行环境,从而建立起与Flink框架之间的联系。
5.1.1 创建执行环境
我们要获取的执行环境,是StreamExecutionEnvironment类的对象,这是所有Flink程序的基础。在代码中创建执行环境的方式,就是调用这个类的静态方法,具体有以下三种。
1)getExecutionEnvironment
最简单的方式,就是直接调用getExecutionEnvironment方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
这种方式,用起来简单高效,是最常用的一种创建执行环境的方式。
2)createLocalEnvironment
这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数。
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
3)createRemoteEnvironment
这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("host", // JobManager主机名1234, // JobManager进程端口号"path/to/jarFile.jar" // 提交给JobManager的JAR包);
在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。
5.1.2 执行模式(Execution Mode)
从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理。不建议使用DataSet API。
// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream API执行模式包括:流执行模式、批执行模式和自动模式。
- 流执行模式(Streaming)
这是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式。 - 批执行模式(Batch)
专门用于批处理的执行模式。 - 自动模式(AutoMatic)
在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。
批执行模式的使用。主要有两种方式:
(1)通过命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...
在提交作业时,增加execution.runtime-mode参数,指定值为BATCH。
(2)通过代码配置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.BATCH);
在代码中,直接基于执行环境调用setRuntimeMode方法,传入BATCH模式。
实际应用中一般不会在代码中配置,而是使用命令行,这样更加灵活。
5.1.3 触发程序执行
需要注意的是,写完输出(sink)操作并不代表程序已经结束。因为当main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”。
所以我们需要显式地调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)。
env.execute();
5.2 源算子(Source)
Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:
DataStream<String> stream = env.addSource(...);
方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。
从Flink1.12开始,主要使用流批统一的新Source架构:
DataStreamSource<String> stream = env.fromSource(…)
Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。
5.2.1 准备工作
为了方便练习,这里使用WaterSensor作为数据模型。
具体代码如下:
public class WaterSensor {public String id;public Long ts;public Integer vc;public WaterSensor() {}public WaterSensor(String id, Long ts, Integer vc) {this.id = id;this.ts = ts;this.vc = vc;}public String getId() {return id;}public void setId(String id) {this.id = id;}public Long getTs() {return ts;}public void setTs(Long ts) {this.ts = ts;}public Integer getVc() {return vc;}public void setVc(Integer vc) {this.vc = vc;}@Overridepublic String toString() {return "WaterSensor{" +"id='" + id + '\'' +", ts=" + ts +", vc=" + vc +'}';}@Overridepublic boolean equals(Object o) {if (this == o) {return true;}if (o == null || getClass() != o.getClass()) {return false;}WaterSensor that = (WaterSensor) o;return Objects.equals(id, that.id) &&Objects.equals(ts, that.ts) &&Objects.equals(vc, that.vc);}@Overridepublic int hashCode() {return Objects.hash(id, ts, vc);}
}
这里需要注意,我们定义的WaterSensor,有这样几个特点:
- 类是公有(public)的
- 有一个无参的构造方法
- 所有属性都是公有(public)的
- 所有属性的类型都是可以序列化的
Flink会把这样的类作为一种特殊的POJO(Plain Ordinary Java Object简单的Java对象,实际就是普通JavaBeans)数据类型来对待,方便数据的解析和序列化。另外我们在类中还重写了toString方法,主要是为了测试输出显示更清晰。
我们这里自定义的POJO类会在后面的代码中频繁使用,所以在后面的代码中碰到,把这里的POJO类导入就好了。
5.2.2 从集合中读取数据
最简单的读取数据的方式,就是在代码中直接创建一个Java集合,然后调用执行环境的fromCollection方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();List<Integer> data = Arrays.asList(1, 22, 3);DataStreamSource<Integer> ds = env.fromCollection(data);stream.print();env.execute();
}
5.2.3 从文件读取数据
真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。
读取文件,需要添加文件连接器依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version>
</dependency>
示例如下:
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/word.txt")).build();env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),"file").print();env.execute();
}
说明:
- 参数可以是目录,也可以是文件;还可以从HDFS目录下读取,使用路径hdfs://…;
- 路径可以是相对路径,也可以是绝对路径;
- 相对路径是从系统属性user.dir获取路径:idea下是project的根目录,standalone模式下是集群节点根目录;
5.2.4 从Socket读取数据
不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无界的。
我们之前用到的读取socket文本流,就是流处理场景。但是这种方式由于吞吐量小、稳定性较差,一般也是用于测试。
DataStream<String> stream = env.socketTextStream("localhost", 7777);
5.2.5 从Kafka读取数据
Flink官方提供了连接工具flink-connector-kafka,直接帮我们实现了一个消费者FlinkKafkaConsumer,它就是用来读取Kafka数据的SourceFunction。
所以想要以Kafka作为数据源获取数据,我们只需要引入Kafka连接器的依赖。Flink官方提供的是一个通用的Kafka连接器,它会自动跟踪最新版本的Kafka客户端。目前最新版本只支持0.10.0版本以上的Kafka。这里我们需要导入的依赖如下。
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version>
</dependency>
代码如下:
public class SourceKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setTopics("topic_1").setGroupId("atguigu").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()) .build();DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");stream.print("Kafka");env.execute();}
}
5.2.6 从数据生成器读取数据
Flink从1.11开始提供了一个内置的DataGen 连接器,主要是用于生成一些随机数,用于在没有数据源的时候,进行流任务的测试以及性能测试等。1.17提供了新的Source写法,需要导入依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-datagen</artifactId><version>${flink.version}</version></dependency>
代码如下:
public class DataGeneratorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataGeneratorSource<String> dataGeneratorSource =new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:"+value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(10),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator").print();env.execute();}
}
5.2.7 Flink支持的数据类型
1)Flink的类型系统
Flink使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
2)Flink支持的数据类型
对于常见的Java和Scala数据类型,Flink都是支持的。Flink在内部,Flink对支持不同的类型进行了划分,这些类型可以在Types工具类中找到:
(1)基本类型
所有Java基本类型及其包装类,再加上Void、String、Date、BigDecimal和BigInteger。
(2)数组类型
包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)。
(3)复合数据类型
- Java元组类型(TUPLE):这是Flink内置的元组类型,是Java API的一部分。最多25个字段,也就是从Tuple0~Tuple25,不支持空字段。
- Scala 样例类及Scala元组:不支持空字段。
- 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段。
- POJO:Flink自定义的类似于Java bean模式的类。
(4)辅助类型
Option、Either、List、Map等。
(5)泛型类型(GENERIC)
Flink支持所有的Java类和Scala类。不过如果没有按照上面POJO类型的要求来定义,就会被Flink当作泛型类来处理。Flink会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由Flink本身序列化的,而是由Kryo序列化的。
在这些类型中,元组类型和POJO类型最为灵活,因为它们支持创建复杂类型。而相比之下,POJO还支持在键(key)的定义中直接使用字段名,这会让我们的代码可读性大大增加。所以,在项目实践中,往往会将流处理程序中的元素类型定为Flink的POJO类型。
Flink对POJO类型的要求如下:
- 类是公有(public)的
- 有一个无参的构造方法
- 所有属性都是公有(public)的
- 所有属性的类型都是可以序列化的
3)类型提示(Type Hints)
Flink还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于Java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
为了解决这类问题,Java API提供了专门的“类型提示”(type hints)。
回忆一下之前的word count流处理程序,我们在将String类型的每个词转换成(word, count)二元组后,就明确地用returns指定了返回的类型。因为对于map里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
Flink还专门提供了TypeHint类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息。我们同样可以通过.returns()方法,明确地指定转换之后的DataStream里元素的类型。
returns(new TypeHint<Tuple2<Integer, SomeType>>(){})
5.3 转换算子(Transformation)
数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。
5.3.1 基本转换算子(map/ filter/ flatMap)
5.3.1.1 映射(map)
map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。
我们只需要基于DataStream调用map()方法就可以进行转换处理。方法需要传入的参数是接口MapFunction的实现;返回值类型还是DataStream,不过泛型(流中的元素类型)可能改变。
下面的代码用不同的方式,实现了提取WaterSensor中的id字段的功能。
public class TransMap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),new WaterSensor("sensor_2", 2, 2));// 方式一:传入匿名类,实现MapFunctionstream.map(new MapFunction<WaterSensor, String>() {@Overridepublic String map(WaterSensor e) throws Exception {return e.id;}}).print();// 方式二:传入MapFunction的实现类// stream.map(new UserMap()).print();env.execute();}public static class UserMap implements MapFunction<WaterSensor, String> {@Overridepublic String map(WaterSensor e) throws Exception {return e.id;}}
}
上面代码中,MapFunction实现类的泛型类型,与输入数据类型和输出数据的类型有关。在实现MapFunction接口的时候,需要指定两个泛型,分别是输入事件和输出事件的类型,还需要重写一个map()方法,定义从一个输入事件转换为另一个输出事件的具体逻辑。
5.3.1.2 过滤(filter)
filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。
进行filter转换之后的新数据流的数据类型与原数据流是相同的。filter转换需要传入的参数需要实现FilterFunction接口,而FilterFunction内要实现filter()方法,就相当于一个返回布尔类型的条件表达式。
案例需求:下面的代码会将数据流中传感器id为sensor_1的数据过滤出来。
public class TransFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3));// 方式一:传入匿名类实现FilterFunctionstream.filter(new FilterFunction<WaterSensor>() {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}}).print();// 方式二:传入FilterFunction实现类// stream.filter(new UserFilter()).print();env.execute();}public static class UserFilter implements FilterFunction<WaterSensor> {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}}
}
5.3.1.3 扁平映射(flatMap)
flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。
同map一样,flatMap也可以使用Lambda表达式或者FlatMapFunction接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。
案例需求:如果输入的数据是sensor_1,只打印vc;如果输入的数据是sensor_2,既打印ts又打印vc。
实现代码如下:
public class TransFlatmap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3));stream.flatMap(new MyFlatMap()).print();env.execute();}public static class MyFlatMap implements FlatMapFunction<WaterSensor, String> {@Overridepublic void flatMap(WaterSensor value, Collector<String> out) throws Exception {if (value.id.equals("sensor_1")) {out.collect(String.valueOf(value.vc));} else if (value.id.equals("sensor_2")) {out.collect(String.valueOf(value.ts));out.collect(String.valueOf(value.vc));}}}
}
5.3.2 聚合算子(Aggregation)
计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),类似于MapReduce中的reduce操作。
5.3.2.1 按键分区(keyBy)
对于Flink而言,DataStream是没有直接进行聚合的API的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在Flink中,要做聚合,需要先进行分区;这个操作就是通过keyBy来完成的。
keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。
基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区。
在内部,是通过计算key的哈希值(hash code),对分区数进行取模运算来实现的。所以这里key如果是POJO的话,必须要重写hashCode()方法。
keyBy()方法需要传入一个参数,这个参数指定了一个或一组key。有很多不同的方法来指定key:比如对于Tuple数据类型,可以指定字段的位置或者多个位置的组合;对于POJO类型,可以指定字段的名称(String);另外,还可以传入Lambda表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取key的逻辑。
我们可以以id作为key做一个分区操作,代码实现如下:
public class TransKeyBy {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3));// 方式一:使用Lambda表达式KeyedStream<WaterSensor, String> keyedStream = stream.keyBy(e -> e.id);// 方式二:使用匿名类实现KeySelectorKeyedStream<WaterSensor, String> keyedStream1 = stream.keyBy(new KeySelector<WaterSensor, String>() {@Overridepublic String getKey(WaterSensor e) throws Exception {return e.id;}});env.execute();}
}
需要注意的是,keyBy得到的结果将不再是DataStream,而是会将DataStream转换为KeyedStream。KeyedStream可以认为是“分区流”或者“键控流”,它是对DataStream按照key的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定key的类型。
KeyedStream也继承自DataStream,所以基于它的操作也都归属于DataStream API。但它跟之前的转换操作得到的SingleOutputStreamOperator不同,只是一个流的分区操作,并不是一个转换算子。KeyedStream是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如sum,reduce)。
5.3.2.2 简单聚合(sum/min/max/minBy/maxBy)
有了按键分区的数据流KeyedStream,我们就可以基于它进行聚合操作了。Flink为我们内置实现了一些最基本、最简单的聚合API,主要有以下几种:
- sum():在输入流上,对指定的字段做叠加求和的操作。
- min():在输入流上,对指定的字段求最小值。
- max():在输入流上,对指定的字段求最大值。
- minBy():与min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据。
- maxBy():与max()类似,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。
简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称。
对于元组类型的数据,可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称,是以f0、f1、f2、…来命名的。
如果数据流的类型是POJO类,那么就只能通过字段名称来指定,不能通过位置来指定了。
public class TransAggregation {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3));stream.keyBy(e -> e.id).max("vc"); // 指定字段名称env.execute();}
}
简单聚合算子返回的,同样是一个SingleOutputStreamOperator,也就是从KeyedStream又转换成了常规的DataStream。所以可以这样理解:keyBy和聚合是成对出现的,先分区、后聚合,得到的依然是一个DataStream。而且经过简单聚合之后的数据流,元素的数据类型保持不变。
一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作“状态”(state)。所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子,应该只用在含有有限个key的数据流上。
5.3.2.3 归约聚合(reduce)
reduce可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。
reduce操作也会将KeyedStream转换为DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
调用KeyedStream的reduce方法时,需要传入一个参数,实现ReduceFunction接口。接口在源码中的定义如下:
public interface ReduceFunction<T> extends Function, Serializable {T reduce(T value1, T value2) throws Exception;
}
ReduceFunction接口里需要实现reduce()方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件。在流处理的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。
我们可以单独定义一个函数类实现ReduceFunction接口,也可以直接传入一个匿名类。当然,同样也可以通过传入Lambda表达式实现类似的功能。
为了方便后续使用,定义一个WaterSensorMapFunction:
public class WaterSensorMapFunction implements MapFunction<String,WaterSensor> {@Overridepublic WaterSensor map(String value) throws Exception {String[] datas = value.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]) ,Integer.valueOf(datas[2]) );}
}
案例:使用reduce实现max和maxBy的功能。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).keyBy(WaterSensor::getId).reduce(new ReduceFunction<WaterSensor>(){@Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println("Demo7_Reduce.reduce");int maxVc = Math.max(value1.getVc(), value2.getVc());//实现max(vc)的效果 取最大值,其他字段以当前组的第一个为主//value1.setVc(maxVc);//实现maxBy(vc)的效果 取当前最大值的所有字段if (value1.getVc() > value2.getVc()){value1.setVc(maxVc);return value1;}else {value2.setVc(maxVc);return value2;}}}).print();
env.execute();
reduce同简单聚合算子一样,也要针对每一个key保存状态。因为状态不会清空,所以我们需要将reduce算子作用在一个有限key的流上。
5.3.3 用户自定义函数(UDF)
用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。
用户自定义函数分为:函数类、匿名函数、富函数类。
5.3.3.1 函数类(Function Classes)
Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。
需求:用来从用户的点击数据中筛选包含“sensor_1”的内容:
方式一:实现FilterFunction接口
public class TransFunctionUDF {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3));DataStream<String> filter = stream.filter(new UserFilter());filter.print();env.execute();}public static class UserFilter implements FilterFunction<WaterSensor> {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}}
}
方式二:通过匿名类来实现FilterFunction接口:
DataStream<String> stream = stream.filter(new FilterFunction< WaterSensor>() {@Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals("sensor_1");}
});
方式二的优化:为了类可以更加通用,我们还可以将用于过滤的关键字"home"抽象出来作为类的属性,调用构造方法时传进去。
DataStreamSource<WaterSensor> stream = env.fromElements(
new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3)
);DataStream<String> stream = stream.filter(new FilterFunctionImpl("sensor_1"));public static class FilterFunctionImpl implements FilterFunction<WaterSensor> {private String id;FilterFunctionImpl(String id) { this.id=id; }@Overridepublic boolean filter(WaterSensor value) throws Exception {return thid.id.equals(value.id);}
}
方式三:采用匿名函数(Lambda)
public class TransFunctionUDF {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<WaterSensor> stream = env.fromElements(new WaterSensor("sensor_1", 1, 1),
new WaterSensor("sensor_1", 2, 2),
new WaterSensor("sensor_2", 2, 2),
new WaterSensor("sensor_3", 3, 3)); //map函数使用Lambda表达式,不需要进行类型声明SingleOutputStreamOperator<String> filter = stream.filter(sensor -> "sensor_1".equals(sensor.id));filter.print();env.execute();}
}
5.3.3.2 富函数类(Rich Function Classes)
“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有生命周期的概念。典型的生命周期方法有:
- open()方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
- close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用。
来看一个例子说明:
public class RichFunctionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.fromElements(1,2,3,4).map(new RichMapFunction<Integer, Integer>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");}@Overridepublic Integer map(Integer integer) throws Exception {return integer + 1;}@Overridepublic void close() throws Exception {super.close();System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");}}).print();env.execute();}
}
5.3.4 物理分区算子(Physical Partitioning)
常见的物理分区策略有:随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)。
5.3.4.1 随机分区(shuffle)
最简单的重分区方式就是直接“洗牌”。通过调用DataStream的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去。
随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。
经过随机分区之后,得到的依然是一个DataStream。
我们可以做个简单测试:将数据读入之后直接打印到控制台,将输出的并行度设置为2,中间经历一次shuffle。执行多次,观察结果是否相同。
public class ShuffleExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> stream = env.socketTextStream("hadoop102", 7777);;stream.shuffle().print()env.execute();}
}
5.3.4.2 轮询分区(Round-Robin)
轮询,简单来说就是“发牌”,按照先后顺序将数据做依次分发。通过调用DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是Round-Robin负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。
stream.rebalance()
5.3.4.3 重缩放分区(rescale)
重缩放分区和轮询分区非常相似。当调用rescale()方法时,其实底层也是使用Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中。rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。
stream.rescale()
5.3.4.4 广播(broadcast)
这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。
stream.broadcast()
5.3.4.5 全局分区(global)
全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。
stream.global()
5.3.4.6 自定义分区(Custom)
当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略。
1)自定义分区器
public class MyPartitioner implements Partitioner<String> {@Overridepublic int partition(String key, int numPartitions) {return Integer.parseInt(key) % numPartitions;}
}
2)使用自定义分区
public class PartitionCustomDemo {public static void main(String[] args) throws Exception {
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(2);DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);DataStream<String> myDS = socketDS.partitionCustom(new MyPartitioner(),value -> value);myDS.print();env.execute();}
}
5.3.5 分流
所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
5.3.5.1 简单实现
其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。
案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。
代码实现:
public class SplitStreamByFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<Integer> ds = env.socketTextStream("hadoop102", 7777).map(Integer::valueOf);//将ds 分为两个流 ,一个是奇数流,一个是偶数流//使用filter 过滤两次SingleOutputStreamOperator<Integer> ds1 = ds.filter(x -> x % 2 == 0);SingleOutputStreamOperator<Integer> ds2 = ds.filter(x -> x % 2 == 1);ds1.print("偶数");ds2.print("奇数");env.execute();}
}
这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流stream复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?
5.3.5.2 使用侧输出流
关于处理函数中侧输出流的用法,我们已经在7.5节做了详细介绍。简单来说,只需要调用上下文ctx的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的id和类型。
代码实现:将WaterSensor按照Id类型进行分流。
public class SplitStreamByOutputTag {
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class)){};OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class)){};//返回的都是主流SingleOutputStreamOperator<WaterSensor> ds1 = ds.process(new ProcessFunction<WaterSensor, WaterSensor>(){@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {if ("s1".equals(value.getId())) {ctx.output(s1, value);} else if ("s2".equals(value.getId())) {ctx.output(s2, value);} else {//主流out.collect(value);}}});ds1.print("主流,非s1,s2的传感器");SideOutputDataStream<WaterSensor> s1DS = ds1.getSideOutput(s1);SideOutputDataStream<WaterSensor> s2DS = ds1.getSideOutput(s2);s1DS.printToErr("s1");s2DS.printToErr("s2");env.execute();}
}
5.3.6 基本合流操作
在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以Flink中合流的操作会更加普遍,对应的API也更加丰富。
5.3.6.1 联合(Union)
最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
在代码中,我们只要基于DataStream直接调用.union()方法,传入其他DataStream作为参数,就可以实现流的联合了;得到的依然是一个DataStream:
stream1.union(stream2, stream3, ...)
注意:union()的参数可以是多个DataStream,所以联合操作可以实现多条流的合并。
代码实现:我们可以用下面的代码做一个简单测试:
public class UnionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> ds1 = env.fromElements(1, 2, 3);DataStreamSource<Integer> ds2 = env.fromElements(2, 2, 3);DataStreamSource<String> ds3 = env.fromElements("2", "2", "3");ds1.union(ds2,ds3.map(Integer::valueOf)).print();env.execute();}
}
5.3.6.2 连接(Connect)
流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。除了联合(union),Flink还提供了另外一种方便的合流操作——连接(connect)。
1)连接流(ConnectedStreams)
代码实现:需要分为两步:首先基于一条DataStream调用.connect()方法,传入另外一条DataStream作为参数,将两条流连接起来,得到一个ConnectedStreams;然后再调用同处理方法得到DataStream。这里可以的调用的同处理方法有.map()/.flatMap(),以及.process()方法。
public class ConnectDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3);
// DataStreamSource<String> source2 = env.fromElements("a", "b", "c");SingleOutputStreamOperator<Integer> source1 = env.socketTextStream("hadoop102", 7777).map(i -> Integer.parseInt(i));DataStreamSource<String> source2 = env.socketTextStream("hadoop102", 8888);/*** TODO 使用 connect 合流* 1、一次只能连接 2条流* 2、流的数据类型可以不一样* 3、 连接后可以调用 map、flatmap、process来处理,但是各处理各的*/ConnectedStreams<Integer, String> connect = source1.connect(source2);SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<Integer, String, String>() {@Overridepublic String map1(Integer value) throws Exception {return "来源于数字流:" + value.toString();}@Overridepublic String map2(String value) throws Exception {return "来源于字母流:" + value;}});result.print();env.execute(); }
}
上面的代码中,ConnectedStreams有两个类型参数,分别表示内部包含的两条流各自的数据类型;由于需要“一国两制”,因此调用.map()方法时传入的不再是一个简单的MapFunction,而是一个CoMapFunction,表示分别对两条流中的数据执行map操作。这个接口有三个类型参数,依次表示第一条流、第二条流,以及合并后的流中的数据类型。需要实现的方法也非常直白:.map1()就是对第一条流中数据的map操作,.map2()则是针对第二条流。
2)CoProcessFunction
与CoMapFunction类似,如果是调用.map()就需要传入一个CoMapFunction,需要实现map1()、map2()两个方法;而调用.process()时,传入的则是一个CoProcessFunction。它也是“处理函数”家族中的一员,用法非常相似。它需要实现的就是processElement1()、processElement2()两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。
值得一提的是,ConnectedStreams也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个ConnectedStreams:
connectedStreams.keyBy(keySelector1, keySelector2);
这里传入两个参数keySelector1和keySelector2,是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的keyBy用法完全一致。ConnectedStreams进行keyBy操作,其实就是把两条流中key相同的数据放到了一起,然后针对来源的流再做各自处理,这在一些场景下非常有用。
案例需求:连接两条流,输出能根据id匹配上的数据(类似inner join效果)
public class ConnectKeybyDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(Tuple2.of(1, "a1"),Tuple2.of(1, "a2"),Tuple2.of(2, "b"),Tuple2.of(3, "c"));DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(Tuple3.of(1, "aa1", 1),Tuple3.of(1, "aa2", 2),Tuple3.of(2, "bb", 1),Tuple3.of(3, "cc", 1));ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);// 多并行度下,需要根据 关联条件 进行keyby,才能保证key相同的数据到一起去,才能匹配上ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKey = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);SingleOutputStreamOperator<String> result = connectKey.process(new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {// 定义 HashMap,缓存来过的数据,key=id,value=list<数据>Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();@Overridepublic void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;// TODO 1.来过的s1数据,都存起来if (!s1Cache.containsKey(id)) {// 1.1 第一条数据,初始化 value的list,放入 hashmapList<Tuple2<Integer, String>> s1Values = new ArrayList<>();s1Values.add(value);s1Cache.put(id, s1Values);} else {// 1.2 不是第一条,直接添加到 list中s1Cache.get(id).add(value);}//TODO 2.根据id,查找s2的数据,只输出 匹配上 的数据if (s2Cache.containsKey(id)) {for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {out.collect("s1:" + value + "<--------->s2:" + s2Element);}}}@Overridepublic void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;// TODO 1.来过的s2数据,都存起来if (!s2Cache.containsKey(id)) {// 1.1 第一条数据,初始化 value的list,放入 hashmapList<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();s2Values.add(value);s2Cache.put(id, s2Values);} else {// 1.2 不是第一条,直接添加到 list中s2Cache.get(id).add(value);}//TODO 2.根据id,查找s1的数据,只输出 匹配上 的数据if (s1Cache.containsKey(id)) {for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {out.collect("s1:" + s1Element + "<--------->s2:" + value);}}}});result.print();env.execute();}
}
5.4 输出算子(Sink)
Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。
5.4.1 连接到外部系统
Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
Flink1.12以前,Sink算子的创建是通过调用DataStream的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。
Flink1.12开始,同样重构了Sink架构,
stream.sinkTo(…)
当然,Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:
我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。
除Flink官方之外,Apache Bahir框架,也实现了一些其他第三方系统与Flink的连接器。
除此以外,就需要用户自定义实现sink连接器了。
5.4.2 输出到文件
Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。
FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:
- 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
- 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。
示例:
public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中,都有 并行度个数的 文件在写入env.setParallelism(2);// 必须开启checkpoint,否则一直都是 .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {@Overridepublic String map(Long value) throws Exception {return "Number:" + value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");// 输出到文件系统FileSink<String> fieSink = FileSink// 输出行式存储的文件,指定路径、指定编码.<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))// 输出文件的一些配置: 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("atguigu-").withPartSuffix(".log").build())// 按照目录分桶:如下,就是每个小时一个目录.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))// 文件滚动策略: 1分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(new MemorySize(1024*1024)).build()).build();dataGen.sinkTo(fieSink);env.execute();}
}
5.4.3 输出到Kafka
(1)添加Kafka 连接器依赖
由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
(2)启动Kafka集群
(3)编写输出到Kafka的示例代码
输出无key的record:
public class SinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是精准一次,必须开启checkpoint(后续章节介绍)env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop102", 7777);/*** Kafka Sink:* TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可* 1、开启checkpoint(后续介绍)* 2、设置事务前缀* 3、设置事务超时时间: checkpoint间隔 < 事务超时时间 < max的15分钟*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")// 指定序列化器:指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("ws").setValueSerializationSchema(new SimpleStringSchema()).build())// 写到kafka的一致性级别: 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("atguigu-")// 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}
自定义序列化器,实现带key的record:
public class SinkKafkaWithKey {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop102", 7777);/*** 如果要指定写入kafka的key,可以自定义序列化器:* 1、实现 一个接口,重写 序列化 方法* 2、指定key,转成 字节数组* 3、指定value,转成 字节数组* 4、返回一个 ProducerRecord对象,把key、value放进去*/KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092").setRecordSerializer(new KafkaRecordSerializationSchema<String>() {@Nullable@Overridepublic ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas = element.split(",");byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);byte[] value = element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord<>("ws", key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix("atguigu-").setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();sensorDS.sinkTo(kafkaSink);env.execute();}
}
(4)运行代码,在Linux主机启动一个消费者,查看是否收到数据
[atguigu@hadoop102 ~]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws
5.4.4 输出到MySQL(JDBC)
写入数据的MySQL的测试步骤如下。
(1)添加依赖
添加MySQL驱动:
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version>
</dependency>
官方还未提供flink-connector-jdbc的1.17.0的正式依赖,暂时从apache snapshot仓库下载,pom文件中指定仓库路径:
<repositories><repository><id>apache-snapshots</id><name>apache snapshots</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url></repository>
</repositories>
添加依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>1.17-SNAPSHOT</version>
</dependency>
如果不生效,还需要修改本地maven的配置文件,mirrorOf中添加如下标红内容:
<mirror><id>aliyunmaven</id><mirrorOf>*,!apache-snapshots</mirrorOf><name>阿里云公共仓库</name><url>https://maven.aliyun.com/repository/public</url></mirror>
(2)启动MySQL,在test库下建表ws
mysql>
CREATE TABLE `ws` (`id` varchar(100) NOT NULL,`ts` bigint(20) DEFAULT NULL,`vc` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
(3)编写输出到MySQL的示例代码
public class SinkMySQL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction());/*** TODO 写入mysql* 1、只能用老的sink写法: addsink* 2、JDBCSink的4个参数:* 第一个参数: 执行的sql,一般就是 insert into* 第二个参数: 预编译sql, 对占位符填充值* 第三个参数: 执行选项 ---》 攒批、重试* 第四个参数: 连接选项 ---》 url、用户名、密码*/SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink("insert into ws values(?,?,?)",new JdbcStatementBuilder<WaterSensor>() {@Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {//每收到一条WaterSensor,如何去填充占位符preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());}},JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小:条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8").withUsername("root").withPassword("000000").withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());sensorDS.addSink(jdbcSink);env.execute();}
}
(4)运行代码,用客户端连接MySQL,查看是否成功写入数据。
5.4.5 自定义Sink输出
如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。
stream.addSink(new MySinkFunction<String>());
在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。
相关文章:
【Flink系列】5. DataStream API
5. DataStream API DataStream API是Flink的核心层API。一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码基本上都由以下几部分构成: 5.1 执行环境(Execution Environment) Flink程序可以在各种上下文…...
【tailscale 和 ssh】当服务器建立好节点,但通过客户端无法通过 ssh 连接
背景 当服务器建立好节点,一切显示正常但通过客户端无法通过 vs code 中的 ssh 连接到服务器 问题解决 因为服务器是重装过的,所以忘记在服务器上下载 ssh 了。。。安装完成并启动 SSH 服务后便可正常连接! sudo apt update sudo apt in…...
TDengine 做 Apache SuperSet 数据源
Apache Superset 是一个现代的企业级商业智能(BI)Web 应用程序,主要用于数据探索和可视化。它由 Apache 软件基金会支持,是一个开源项目,它拥有活跃的社区和丰富的生态系统。Apache Superset 提供了直观的用户界面…...
PCL 新增自定义点类型【2025最新版】
目录 一、自定义点类型1、前言2、定义方法3、代码示例二、合并现有类型三、点云按时间渲染1、CloudCompare渲染2、PCL渲染博客长期更新,本文最近更新时间为:2025年1月18日。 一、自定义点类型 1、前言 PCL库自身定义了很多点云类型,但是在使用的时候时如果要使用自己定义的…...
【记录52】el-table-column 添加fixed属性 滚动条无法滑动
问题: el-table-column 添加fixed属性 滚动条无法滑动 使用element UI组件,用到el-table的el-table-column的fixed属性时,当滚动条长度小于固定列时,滚动条无法通过鼠标去点击滑动操作 原因 fixed是用来固定列的属性,其…...
华为OD机试E卷 ---最大值
一、题目描述 给定一组整数(非负),重排顺序后输出一个最大的整数。 二、示例1 用例1 输入 10 9输出 910说明:输出结果可能非常大,所以你需要返回一个 字符串只而不是整数。 三、输入描述 数字组合 四、输出描述 最大的整数 五、解题思路 字符…...
服务器迁移MySQL
由于公司原有的服务器不再使用,需要将老的服务器上的MySQL迁移到新的服务器上,因此需要对数据进行备份迁移,前提是两台服务器已安装相同版本的MySQL,这里就不再讲解MySQL的安装步骤了,可以安装包、可以在线下载、可以容…...
.Net Core微服务入门全纪录(二)——Consul-服务注册与发现(上)
系列文章目录 1、.Net Core微服务入门系列(一)——项目搭建 2、.Net Core微服务入门全纪录(二)——Consul-服务注册与发现(上) 3、.Net Core微服务入门全纪录(三)——Consul-服务注…...
【Linux系统】分区挂载
我们能够根据一个 inode 号在指定分区寻找目标文件的 struct inode,也能根据目录文件的内容,通过映射关系,找指定的 inode,可是,现在有个问题: 问题:inode 是不能跨分区使用的!Linu…...
进阶——十六届蓝桥杯嵌入式熟练度练习(按键+LCD)
高亮(一) 声明 char buf[21];unsigned char upled0x04;uint8_t key_val;uint8_t key_down,key_up,key_old;uint32_t key_time;uint8_t key_temp,key_flag;uint8_t line_flag; 按键代码 void key_proc(void) { key_valkey_scan();key_downkey_val&…...
简单介绍JSONStream的使用
地址 作用 这个模块是根据需要筛选出json数据中自己所需要的数据 使用 var JSONStream require("JSONStream"); var parse require("fast-json-parse"); var fs require("fs");fs.createReadStream("./time.json").pipe(JSONSt…...
LTX-Video 高效视频生成模型,一键处理图片文字
LTX-Video 是由 Lightricks 在 2024 年开发的一种视频生成模型,这种模型采用了 transformer 和 Video-VAE 技术,能够高效生成高分辨率视频。此外,LTX-Video 支持多种视频生成方式,包括从文本到视频和从图像到视频。 教程链接&…...
AUTOSAR从入门到精通专栏总目录
AUTOSAR通过分层、模块化和封装的特性,革命性地改变了汽车电子软件开发。它简化了软硬件接口,提高了软件复用性,降低了开发成本和周期。随着 AUTOSAR 在汽车电子系统开发领域的广泛应用,对其深入理解和熟练掌握已成为汽车电子软件…...
Windows环境本地配置pyspark环境详细教程
目录 一、背景简记二、本地单机spark环境配置详细步骤第一步:python环境安装第二步:安装jdk及配置环境变量安装包下载安装环境变量配置 第三步:安装Spark安装包下载安装配置环境变量 第四步:安装hadoop安装包下载安装配置环境变量…...
第8篇:从入门到精通:掌握Python异常处理
第8篇:异常处理 内容简介 本篇文章将深入探讨Python中的异常处理机制。您将学习异常的基本概念与类型,掌握使用try-except块处理异常的方法,了解finally语句的作用,以及如何抛出和定义自定义异常。通过丰富的代码示例࿰…...
聚铭网络6款产品入选CCIA《网络安全专用产品指南》
近日,中国网络安全产业联盟CCIA正式发布《网络安全专用产品指南》(第二版)(以下简称《指南》)。聚铭网络凭借突出技术优势、创新能力以及市场积累,旗下安全产品成功入选防火墙、网络安全审计、日志分析、网…...
高等数学学习笔记 ☞ 不定积分的积分方法
1. 第一换元积分法 1. 基础概念:形如的过程,称为第一换元积分法。 2. 核心思想:通过对被积函数的观察(把被积函数的形式与积分表的积分公式进行比较),把外部的部分项拿到的内部(求原函数), 然后进行拼凑,…...
08、如何预防SQL注入
目录 1、分析及其存在哪些危险 2、预防SQL注入 1、分析及其存在哪些危险 原理: SQL 注入是一种常见的网络攻击手段,攻击者通过在用户输入中插入恶意的 SQL 语句,利用程序对用户输入处理不当的漏洞,使恶意 SQL 语句被数据库服务器执行。 通常发生在应用程序将用户输入直接拼…...
如何发布自己的第一个Chrome扩展程序
如何发布自己的Chrome扩展程序 只需要六步即可完成Chrome扩展程序的发布 (1)首先打开google chrome 应用商城注册开发者账号的页面 (2)现在进行一个绑卡支付5美元的一次性注册费用即可。【不知道如何绑卡的支付的,文…...
ubuntu开机自启,其他方式
在 Ubuntu 22.04 中,如果不使用 .service 文件,仍然有其他方法可以让某个 .sh 文件随着系统启动而自动运行。以下是几种替代方法: 方法一:使用 crontab 的 reboot 选项 crontab 是 Linux 中用于设置周期性被执行的任务的工具。通…...
容器渗透横向
本质上要获得 1.获得容器IP段 2.获得主机IP段 3.获得本机IP 4.通过CNI或Docker0等扫描本机端口 Flannel 容器信息 rootubuntu-linux-22-04-desktop:/home/parallels/Desktop# k get po -A -o wide NAMESPACE NAME …...
软件测试—接口测试面试题及jmeter面试题
一,接口面试题 1.接口的作用 实现前后端的交互,实现数据的传输 2.什么是接口测试 接口测试就是对系统或组件之间的接口进行测试,主要是校验数据的交换、传递和控制管理过程,以及相互逻辑关系 3.接口测试必要性 1.可以发现很…...
ansible之playbook剧本
Playbook 1.playbook的组成部分 开头 用 --- 表示,表示是一个yaml文件,但是可以忽略不写 Tasks(任务) 包含了在目标主机上执行的操作,操作还是由模板来执行。每一个任务都是一个ansible的模块,调用系统命…...
基于 Python 的财经数据接口库:AKShare
AKShare 是基于 Python 的财经数据接口库,目的是实现对股票、期货、期权、基金、外汇、债券、指数、加密货币等金融产品的基本面数据、实时和历史行情数据、衍生数据从数据采集、数据清洗到数据落地的一套工具,主要用于学术研究目的。 安装 安装手册见…...
强网杯RS加密签名伪造及PyramidWeb利用栈帧打内存马
RS加密签名伪造及PyramidWeb利用栈帧打内存马 今年强网杯的一个新框架源代码如下 from wsgiref.simple_server import make_server from pyramid.config import Configurator from pyramid.events import NewResponse from pyramid.response import Response import utiluser…...
Java基础——概念和常识(语言特点、JVM、JDK、JRE、AOT/JIT等介绍)
我是一个计算机专业研0的学生卡蒙Camel🐫🐫🐫(刚保研) 记录每天学习过程(主要学习Java、python、人工智能),总结知识点(内容来自:自我总结网上借鉴࿰…...
搭建Node.js后端
从头开始搭建一个Node.js后端,并实现查询历史数据的功能,下面是详细的步骤说明,包括环境配置、项目初始化、代码编写、以及服务器启动。 1. 环境配置 1.1 安装 Node.js 和 npm 首先,你需要在你的电脑上安装 Node.js 和 npm&…...
RV1126+FFMPEG推流项目(9)AI和AENC模块绑定,并且开启线程采集
前面两篇已经交代AI和AENC模块的配置,这篇就让这两个模块绑定起来,绑定的原因是,Aenc从Ai模块拿到采集的原始数据进行编码。 使用 RK_MPI_SYS_Bind 把 AI 节点和 AENC 进行绑定,其中 enModId 是模块 ID 号选择的是 RK_ID_AI、s32C…...
vue编写一个可拖动的模块,并可以和任何其他组件组合使用
实现思路: 使用 Vue 的自定义指令(directive)来处理拖动逻辑。在 mounted 钩子中添加鼠标事件监听器,以实现拖动功能。在 unmounted 钩子中移除鼠标事件监听器,防止内存泄漏。 代码示例: <template&g…...
Spring6.0新特性-HTTP接口:使用@HttpExchange实现更优雅的Http客户端
文章目录 一、概述二、使用1、创建接口HttpExchange方法2、创建一个在调用方法时执行请求的代理3、方法参数4、返回值5、错误处理(1)为RestClient(2)为WebClient(3)为RestTemplate 注意 一、概述 官方文档…...
【蓝桥杯】Python算法——求逆元的两种算法
目录 零、前言一、逆元二、扩展欧几里得算法三、费马小定理四、总结 零、前言 距离25年蓝桥杯还有大概三个月时间,接下来重点应该会放在蓝桥杯备考方向,一起努力,一起加油 一、逆元 什么是逆元?这是数论中的一个基本概念。如果存…...
音频语言模型与多模态体系结构
音频语言模型与多模态体系结构 多模态模型正在创造语言、视觉和语音等以前独立的研究领域的协同效应。这些模型使用通用架构,将每种模式视为不同的“token”,使它们能够以一种与人类认知非常相似的方式联合建模和理解世界。 可以将多模态分为两个主要领域:输入空间(…...
Redis的安装和配置、基本命令
一、实验目的 本实验旨在帮助学生熟悉Redis的安装、配置和基本使用,包括启动Redis服务、使用命令行客户端进行操作、配置Redis、进行多数据库操作以及掌握键值相关和服务器相关的命令。 二、实验环境准备 1. JAVA环境准备:确保Java Development Kit …...
2025.1.17——三、SQLi regexp正则表达式|
题目来源:buuctf [NCTF2019]SQLi1 目录 一、打开靶机,整理信息 二、解题思路 step 1:正常注入 step 2:弄清关键字黑名单 1.目录扫描 2.bp爆破 step 3:根据过滤名单构造payload step 4:regexp正则注…...
flutter 常用UI组件
文章目录 1. Toast 文本提示框oktoastbot_toast2. loading 加载窗flutter_easyloading3. 对话框gex dialog4.下拉刷新pull_to_refresh5. pop 窗custom_pop_up_menu6. pin code 密码框pinput7. 二维码qr_flutter8. swiper 滚动组件carousel_sliderflutter_swiper_view9. Badge 角…...
基于微信小程序的健身房预约管理系统
作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏:…...
Windows远程桌面网关出现重大漏洞
微软披露了其Windows远程桌面网关(RD Gateway)中的一个重大漏洞,该漏洞可能允许攻击者利用竞争条件,导致拒绝服务(DoS)攻击。该漏洞被标识为CVE-2025-21225,已在2025年1月的补丁星期二更新中得到…...
如何学习Transformer架构
Transformer架构自提出以来,在自然语言处理领域引发了革命性的变化。作为一种基于注意力机制的模型,Transformer解决了传统序列模型在并行化和长距离依赖方面的局限性。本文将探讨Transformer论文《Attention is All You Need》与Hugging Face Transform…...
RPA赋能内容创作:打造小红书入门词语图片的全自动化流程
🌟 嗨,我是LucianaiB! 🌍 总有人间一两风,填我十万八千梦。 🚀 路漫漫其修远兮,吾将上下而求索。 用RPA全自动化批量生产【入门词语】图片做小红书商单,保姆级工具开发教程 最近由…...
Quantum supremacy using a programmable superconducting processor 全文翻译,配公式和图
2019年,谷歌的量子霸权文章, 原文链接: https://www.nature.com/articles/s41586-019-1666-5 第一稿,公式和视图后续再加 。。。 Quantum supremacy using a programmable superconducting processor 使用可编程超导处理器的量…...
Jupyter notebook中运行dos指令运行方法
Jupyter notebook中运行dos指令运行方法 目录 Jupyter notebook中运行dos指令运行方法一、DOS(磁盘操作系统)指令介绍1.1 DOS介绍1.2 DOS指令1.2.1 DIR - 显示当前目录下的文件和子目录列表。1.2.2 CD 或 CHDIR - 改变当前目录1.2.3 使用 CD .. 可以返回上一级目录1…...
个人vue3-学习笔记
声明:这只是我个人的学习笔记(黑马),供以后复习用 。一天学一点,随时学随时更新。明天会更好的! 这里只给代码,不给运行结果,看不出来代码的作用我也该进厂了。。。。。 Day1 使用create-vue创建项目。 1.检查版本。 node -v 2.创建项目 npm init vue@latest 可…...
Ubuntu安装搜狗输入法
Ubuntu安装搜狗输入法 官网完整安装步骤调整默认输入法 官网 搜狗输入法 - 官网:https://shurufa.sogou.com/ 搜狗输入法Linux版 - 首页:https://shurufa.sogou.com/linux 搜狗输入法Linux版 - 安装指南:https://shurufa.sogou.com/linux/gu…...
【C】链表算法题2 -- 反转链表
leetcode链接https://leetcode.cn/problems/reverse-linked-list/description/https://leetcode.cn/problems/reverse-linked-list/description/https://leetcode.cn/problems/reverse-linked-list/description/https://leetcode.cn/problems/reverse-linked-list/description/…...
1.7 ChatGPT:引领AI对话革命的致胜之道
ChatGPT:引领AI对话革命的致胜之道 随着人工智能(AI)技术的迅猛发展,特别是在自然语言处理(NLP)领域,OpenAI 的 ChatGPT 已经成为了举世瞩目的技术突破。从普通的自动化客服到深入的创作与协作,ChatGPT 通过其卓越的语言理解和生成能力,改变了人们与计算机交互的方式…...
当设置dialog中有el-table时,并设置el-table区域的滚动,看到el-table中多了一条横线
问题:当设置dialog中有el-table时,并设置el-table区域的滚动,看到el-table中多了一条横线; 原因:el-table有一个before的伪元素作为表格的下边框下,初始的时候已设置,在滚动的时候并没有重新设置…...
如何使用 useMemo 和 memo 优化 React 应用性能?
使用 useMemo 和 memo 优化 React 应用性能 在构建复杂的 React 应用时,性能优化是确保应用流畅运行的关键。React 提供了多种工具来帮助开发者优化组件的渲染和计算逻辑,其中 useMemo 和 memo 是两个非常有用的 Hook。本文将详细介绍这两个工具的使用方…...
turtle教学课程课堂学习考试在线网站
完整源码项目包获取→点击文章末尾名片!...
spark任务优化参数整理
以下参数中有sql字眼的一般只有spark-sql模块生效,如果你看过spark的源码,你会发现sql模块是在core模块上硬生生干了一层,所以反过来spark-sql可以复用core模块的配置,例外的时候会另行说明,此外由于总结这些参数是在不…...
TCP TIME-WAIT 状态为什么要坚持 2MSL
经常有人问这个问题,这种问题问我就对了。我准备了下面的一幅时序图来解释这个问题: 简单点说就是两个目的: 正常处理被动关闭方的重传 FIN;确保当前连接的所有报文全部消失。 也就是说,无论任何情况下,…...