当前位置: 首页 > news >正文

Flink中普通API的使用

本篇文章从Source、Transformation(转换因子)、sink这三个地方进行讲解

Source:

  1. 创建DataStream
  2. 本地文件
  3. Socket
  4. Kafka

Transformation(转换因子):

  1. map
  2. FlatMap
  3. Filter
  4. KeyBy
  5. Reduce
  6. Union和connect
  7. Side Outputs

sink:

  1. print 打印
  2. writerAsText 以文本格式输出
  3. writeAsCsv 以csv格式输出
  4. 输出到MySQL
  5. 输出到kafka
  6. 自定义输出

先准备一个模板方便后续使用


#if (${PACKAGE_NAME} && ${PACKAGE_NAME} != "")package ${PACKAGE_NAME};#end
#parse("File Header.java")
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**@基本功能:@program:${PROJECT_NAME}@author: ${USER}@create:${YEAR}-${MONTH}-${DAY} ${HOUR}:${MINUTE}:${SECOND}**/
public class ${NAME} {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据//3. transformation-数据处理转换//4. sink-数据输出//5. execute-执行env.execute();
}
}

Source:

预定义source

创建DataStream(四种)

  • 使用env.fromElements:类型要一致
  • 使用env.fromcollections:支持多种collection的具体类型
  • 使用env.generateSequence()方法创建基于Sequence的DataStream --已经废弃了
  • 使用env.fromSequence()方法创建基于开始和结束的DataStream
package com.bigdata.source;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;public class _01YuDingYiSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 各种获取数据的SourceDataStreamSource<String> dataStreamSource = env.fromElements("hello world txt", "hello nihao kongniqiwa");dataStreamSource.print();// 演示一个错误的//DataStreamSource<Object> dataStreamSource2 = env.fromElements("hello", 1,3.0f);//dataStreamSource2.print();DataStreamSource<Tuple2<String, Integer>> elements = env.fromElements(Tuple2.of("张三", 18),Tuple2.of("lisi", 18),Tuple2.of("wangwu", 18));elements.print();// 有一个方法,可以直接将数组变为集合  复习一下数组和集合以及一些非常常见的APIString[] arr = {"hello","world"};System.out.println(arr.length);System.out.println(Arrays.toString(arr));List<String> list = Arrays.asList(arr);System.out.println(list);env.fromElements(Arrays.asList(arr),Arrays.asList(arr),Arrays.asList(arr)).print();// 第二种加载数据的方式// Collection 的子接口只有 Set 和 ListArrayList<String> list1 = new ArrayList<>();list1.add("python");list1.add("scala");list1.add("java");DataStreamSource<String> ds1 = env.fromCollection(list1);DataStreamSource<String> ds2 = env.fromCollection(Arrays.asList(arr));// 第三种DataStreamSource<Long> ds3 = env.fromSequence(1, 100);ds3.print();// execute 下面的代码不运行,所以,这句话要放在最后。env.execute("获取预定义的Source");}
}

 本地文件

        File file = new File("datas/wc.txt");File file2 = new File("./");System.out.println(file.getAbsoluteFile());System.out.println(file2.getAbsoluteFile());DataStreamSource<String> ds1 = env.readTextFile("datas/wc.txt");ds1.print();// 还可以获取hdfs路径上的数据DataStreamSource<String> ds2 = env.readTextFile("hdfs://bigdata01:9820/home/a.txt");ds2.print();

Socket 

linux(socket命令)
下载:yum install -y nc   
nc -lk 8888   --向8888端口发送消息,这个命令先运行,如果先运行java程序,会报错!
如果端口被占用就换一个端口
本地(socket命令)
nc -lp 8888
或者 nc -l -p 8888
如果不是在exe端打开的用 -l -p 8888
java代码 
DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
Word Count案例 
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class SourceDemo02_Socket {public static void main(String[] args) throws Exception {//TODO 1.env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//TODO 2.source-加载数据DataStream<String> socketDS = env.socketTextStream("bigdata01", 8889);//TODO 3.transformation-数据转换处理//3.1对每一行数据进行分割并压扁DataStream<String> wordsDS = socketDS.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(word);}}});//3.2每个单词记为<单词,1>DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return Tuple2.of(value, 1);}});//3.3分组KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});//3.4聚合SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.sum(1);//TODO 4.sink-数据输出result.print();//TODO 5.execute-执行env.execute();}
}

 JDBC

 Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink", "root", "root");PreparedStatement preparedStatement = connection.prepareStatement("select monitor_id,speed_limit from t_monitor_info group by monitor_id, speed_limit");ResultSet resultSet = preparedStatement.executeQuery();ArrayList<Tuple2<String,Double>> arr = new ArrayList<>();while (resultSet.next()){String monitor_id = resultSet.getString("monitor_id");Double speed_limit = resultSet.getDouble("speed_limit");Tuple2 tuple2 = new Tuple2(monitor_id, speed_limit);arr.add(tuple2);}

Kafka

添加依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version>
</dependency>
package com.bigdata.day02;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class KafkaSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties properties = new Properties();properties.setProperty("bootstrap.servers", "bigdata01:9092");properties.setProperty("group.id", "g1");FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic1",new SimpleStringSchema(),properties);DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);// 以下代码跟flink消费kakfa数据没关系,仅仅是将需求搞的复杂一点而已// 返回true 的数据就保留下来,返回false 直接丢弃dataStreamSource.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String word) throws Exception {// 查看单词中是否包含success 字样return word.contains("success");}}).print();env.execute();}
}

自定义source

SourceFunction:非并行数据源(并行度只能=1) --接口

RichSourceFunction:多功能非并行数据源(并行度只能=1) --类

ParallelSourceFunction:并行数据源(并行度能够>=1) --接口

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

package com.bigdata.day02;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;
import java.util.UUID;/*** 需求: 每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)* 要求:* - 随机生成订单ID(UUID)* - 随机生成用户ID(0-2)* - 随机生成订单金额(0-100)* - 时间戳为当前系统时间*/@Data  // set get toString
@AllArgsConstructor
@NoArgsConstructor
class OrderInfo{private String orderId;private int uid;private int money;private long timeStamp;
}
// class MySource extends RichSourceFunction<OrderInfo> {
//class MySource extends RichParallelSourceFunction<OrderInfo> {
class MySource implements SourceFunction<OrderInfo> {boolean flag = true;@Overridepublic void run(SourceContext ctx) throws Exception {// 源源不断的产生数据Random random = new Random();while(flag){OrderInfo orderInfo = new OrderInfo();orderInfo.setOrderId(UUID.randomUUID().toString());orderInfo.setUid(random.nextInt(3));orderInfo.setMoney(random.nextInt(101));orderInfo.setTimeStamp(System.currentTimeMillis());ctx.collect(orderInfo);Thread.sleep(1000);// 间隔1s}}// source 停止之前需要干点啥@Overridepublic void cancel() {flag = false;}
}
public class CustomSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 将自定义的数据源放入到env中DataStreamSource dataStreamSource = env.addSource(new MySource())/*.setParallelism(1)*/;System.out.println(dataStreamSource.getParallelism());dataStreamSource.print();env.execute();}}

 如果代码换成ParallelSourceFunction,每次生成12个数据,假如是12核数的话(有多少核就生成多少个数据)。

 Rich 类型的Source可以比非Rich的多出有:
    - open方法,实例化的时候会执行一次,多个并行度会执行多次的哦(因为是多个实例了)
    - close方法,销毁实例的时候会执行一次,多个并行度会执行多次的哦
    - getRuntimeContext 方法可以获得当前的Runtime对象(底层API) 

rich模板
/*** 自定义一个RichParallelSourceFunction的实现*/
public class CustomerRichSourceWithParallelDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> mySource = env.addSource(new MySource()).setParallelism(6);mySource.print();env.execute();}/*Rich 类型的Source可以比非Rich的多出有:- open方法,实例化的时候会执行一次,多个并行度会执行多次的哦(因为是多个实例了)- close方法,销毁实例的时候会执行一次,多个并行度会执行多次的哦- getRuntime方法可以获得当前的Runtime对象(底层API)*/public static class MySource extends RichParallelSourceFunction<String> {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println("open......");}@Overridepublic void close() throws Exception {super.close();System.out.println("close......");}@Overridepublic void run(SourceContext<String> ctx) throws Exception {ctx.collect(UUID.randomUUID().toString());}@Overridepublic void cancel() {}}
}

Transformation(转换因子):

map算子(一变多)

package com.bigdata.day02;import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.text.SimpleDateFormat;
import java.util.Date;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2024-05-13 11:40:37**/
@Data
@AllArgsConstructor
class LogBean{private String ip;      // 访问ipprivate int userId;     // 用户idprivate long timestamp; // 访问时间戳private String method;  // 访问方法private String path;    // 访问路径
}
public class Demo04 {// 将数据转换为javaBeanpublic static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> streamSource = env.readTextFile("datas/a.log");//3. transformation-数据处理转换SingleOutputStreamOperator<LogBean> map = streamSource.map(new MapFunction<String, LogBean>() {@Overridepublic LogBean map(String line) throws Exception {String[] arr = line.split("\\s+");//时间戳转换  17/05/2015:10:06:53String time = arr[2];SimpleDateFormat format = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss");Date date = format.parse(time);long timeStamp = date.getTime();return new LogBean(arr[0],Integer.parseInt(arr[1]),timeStamp,arr[3],arr[4]);}});//4. sink-数据输出map.print();//5. execute-执行env.execute();}
}

FlatMap算子(类似于炸裂函数)

数据

张三,苹果手机,联想电脑,华为平板
李四,华为手机,苹果电脑,小米平板

package com.bigdata.day03;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @基本功能:* @program:FlinkDemo* @author: 闫哥* @create:2023-11-21 09:51:59**/
public class FlatMapDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据//2. source-加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\flatmap.log");//3. transformation-数据处理转换DataStream<String> flatMapStream = fileStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {//张三,苹果手机,联想电脑,华为平板String[] arr = line.split(",");String name = arr[0];for (int i = 1; i < arr.length; i++) {String goods = arr[i];collector.collect(name+"有"+goods);}}});//4. sink-数据输出flatMapStream.print();//5. execute-执行env.execute();}
}

 Filter(过滤)

package com.bigdata.day03;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Date;/*** @基本功能:* @program:FlinkDemo* @author: zxx* @create:2023-11-21 09:10:30**/public class FilterDemo {@Data@AllArgsConstructor@NoArgsConstructorstatic class LogBean{String ip;      // 访问ipint userId;     // 用户idlong timestamp; // 访问时间戳String method;  // 访问方法String path;    // 访问路径}public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStream<String> fileStream = env.readTextFile("F:\\BD230801\\FlinkDemo\\datas\\a.log");//3. transformation-数据处理转换// 读取第一题中 a.log文件中的访问日志数据,过滤出来以下访问IP是83.149.9.216的访问日志DataStream<String> filterStream = fileStream.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String line) throws Exception {String ip = line.split(" ")[0];return  ip.equals("83.149.9.216");}});//4. sink-数据输出filterStream.print();//5. execute-执行env.execute();}
}

 KeyBy(分组)

元组

//用字段位置
wordAndOne.keyBy(0, 1);//用KeySelector
wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> getKey(Tuple2<String, Integer> value) throws Exception {return Tuple2.of(value.f0, value.f1);}
});

POJO (Plain Old Java Object):普通Java对象

public class PeopleCount {private String province;private String city;private Integer counts;public PeopleCount() {}//省略其他代码。。。
}

多个字段keyBy

source.keyBy(new KeySelector<PeopleCount, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> getKey(PeopleCount value) throws Exception {return Tuple2.of(value.getProvince(), value.getCity());}
});

 Reduce --sum的底层是reduce(聚合)

 //  [ ("10.0.0.1",1),("10.0.0.1",1),("10.0.0.1",1) ]keyByStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {// t1 => ("10.0.0.1",10)// t2 => ("10.0.0.1",1)return Tuple2.of(t1.f0,  t1.f1 + t2.f1);}}).print();

union和connect-合并和连接 

Union

union可以合并多个同类型的流

将多个DataStream 合并成一个DataStream

connect

connect可以连接2个不同类型的流(最后需要处理后再输出)

DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化【一国两制】,两个流相互独立, 作为对比Union后是真的变成一个流了。

和union类似,但是connect只能连接两个流,两个流之间的数据类型可以同,对两个流的数据可以分别应用不同的处理逻辑.

package com.bigdata.day03;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;/*** @基本功能:* @program:FlinkDemo* @author: zxx* @create:2023-11-21 11:40:12**/
public class UnionConnectDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据DataStreamSource<String> stream1 = env.fromElements("hello", "nihao", "吃甘蔗的人");DataStreamSource<String> stream2 = env.fromElements("hello", "kong ni qi wa", "看电子书的人");DataStream<String> unionStream = stream1.union(stream2);unionStream.print();DataStream<Long> stream3 = env.fromSequence(1, 10);// stream1.union(stream3); 报错//3. transformation-数据处理转换ConnectedStreams<String, Long> connectStream = stream1.connect(stream3);// 此时你想使用这个流,需要各自重新处理// 处理完之后的数据类型必须相同DataStream<String> mapStream = connectStream.map(new CoMapFunction<String, Long, String>() {// string 类型的数据@Overridepublic String map1(String value) throws Exception {return value;}// 这个处理long 类型的数据@Overridepublic String map2(Long value) throws Exception {return Long.toString(value);}});//4. sink-数据输出mapStream.print();//5. execute-执行env.execute();}
}
package com.bigdata.transforma;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;/*** @基本功能:* @program:FlinkDemo* @author: zxx* @create:2024-11-22 10:50:13**/
public class _08_两个流join {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);DataStreamSource<String> ds1 = env.fromElements("bigdata", "spark", "flink");DataStreamSource<String> ds2 = env.fromElements("python", "scala", "java");DataStream<String> ds3 = ds1.union(ds2);ds3.print();// 接着演示 connectDataStreamSource<Long> ds4 = env.fromSequence(1, 10);ConnectedStreams<String, Long> ds5 = ds1.connect(ds4);ds5.process(new CoProcessFunction<String, Long, String>() {@Overridepublic void processElement1(String value, CoProcessFunction<String, Long, String>.Context ctx, Collector<String> out) throws Exception {System.out.println("String流:"+value);out.collect(value);}@Overridepublic void processElement2(Long value, CoProcessFunction<String, Long, String>.Context ctx, Collector<String> out) throws Exception {System.out.println("Long流:"+value);out.collect(String.valueOf(value));}}).print("合并后的打印:");//2. source-加载数据//3. transformation-数据处理转换//4. sink-数据输出//5. execute-执行env.execute();}
}

Side Outputs侧道输出(侧输出流) --可以分流

举例说明:对流中的数据按照奇数和偶数进行分流,并获取分流后的数据

package com.bigdata.day02;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/*** @基本功能:* @program:FlinkDemo* @author: zxx* @create:2024-05-13 16:19:56**/
public class Demo11 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 侧道输出流DataStreamSource<Long> streamSource = env.fromSequence(0, 100);// 定义两个标签OutputTag<Long> tag_even = new OutputTag<Long>("偶数", TypeInformation.of(Long.class));OutputTag<Long> tag_odd = new OutputTag<Long>("奇数", TypeInformation.of(Long.class));//2. source-加载数据SingleOutputStreamOperator<Long> process = streamSource.process(new ProcessFunction<Long, Long>() {@Overridepublic void processElement(Long value, ProcessFunction<Long, Long>.Context ctx, Collector<Long> out) throws Exception {// value 代表每一个数据if (value % 2 == 0) {ctx.output(tag_even, value);} else {ctx.output(tag_odd, value);}}});// 从数据集中获取奇数的所有数据DataStream<Long> sideOutput = process.getSideOutput(tag_odd);sideOutput.print("奇数:");// 获取所有偶数数据DataStream<Long> sideOutput2 = process.getSideOutput(tag_even);sideOutput2.print("偶数:");//3. transformation-数据处理转换//4. sink-数据输出//5. execute-执行env.execute();}
}

sink:

print 打印

writerAsText 以文本格式输出

dataStreamSource.writeAsText("F:\\BD230801\\FlinkDemo\\datas\\result", FileSystem.WriteMode.OVERWRITE);

writerAsText 以文本格式输出

 DataStreamSource<Tuple2<String, Integer>> streamSource = env.fromElements(Tuple2.of("篮球", 1),Tuple2.of("篮球", 2),Tuple2.of("篮球", 3),Tuple2.of("足球", 3),Tuple2.of("足球", 2),Tuple2.of("足球", 3));// writeAsCsv 只能保存 tuple类型的DataStream流,因为如果不是多列的话,没必要使用什么分隔符streamSource.writeAsCsv("datas/csv", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

输出到MySQL

 JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.cj.jdbc.Driver").withUrl("jdbc:mysql://localhost:3306/zuoye").withUsername("root").withPassword("123456").build();studentDataStreamSource.addSink(JdbcSink.sink("insert into stu values(?,?,?)",new JdbcStatementBuilder<Student>() {@Overridepublic void accept(PreparedStatement preparedStatement, Student student) throws SQLException {preparedStatement.setInt(1,student.getId());preparedStatement.setString(2,student.getName());preparedStatement.setInt(3,student.getAge());}},jdbcConnectionOptions));

 输出到kafka

FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<String>("topic2",new SimpleStringSchema(),properties);filterStream.addSink(kafkaProducer);

自定义Sink--模拟jdbcSink的实现 

package com.bigdata.day03;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;/*** @基本功能:* @program:FlinkDemo* @author: zxx* @create:2023-11-21 16:08:04**/public class CustomJdbcSinkDemo {@Data@AllArgsConstructor@NoArgsConstructorstatic class Student{private int id;private String name;private int age;}static class MyJdbcSink  extends RichSinkFunction<Student> {Connection conn =null;PreparedStatement ps = null;@Overridepublic void open(Configuration parameters) throws Exception {// 这个里面编写连接数据库的代码Class.forName("com.mysql.jdbc.Driver");conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test1", "root", "123456");ps = conn.prepareStatement("INSERT INTO `student` (`id`, `name`, `age`) VALUES (null, ?, ?)");}@Overridepublic void close() throws Exception {// 关闭数据库的代码ps.close();conn.close();}@Overridepublic void invoke(Student student, Context context) throws Exception {// 将数据插入到数据库中ps.setString(1,student.getName());ps.setInt(2,student.getAge());ps.execute();}}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Student> studentStream = env.fromElements(new Student(1, "马斯克", 51));studentStream.addSink(new MyJdbcSink());env.execute();}
}

相关文章:

Flink中普通API的使用

本篇文章从Source、Transformation&#xff08;转换因子&#xff09;、sink这三个地方进行讲解 Source&#xff1a; 创建DataStream本地文件SocketKafka Transformation&#xff08;转换因子&#xff09;&#xff1a; mapFlatMapFilterKeyByReduceUnion和connectSide Outpu…...

【人工智能】从零构建一个文本分类器:用Python和TF-IDF实现

《Python OpenCV从菜鸟到高手》带你进入图像处理与计算机视觉的大门! 文本分类是自然语言处理(NLP)领域的基础任务之一,广泛应用于垃圾邮件检测、情感分析和新闻分类等场景。本篇文章从零开始,通过详细讲解 TF-IDF 特征提取方法,以及如何将其与机器学习算法结合,实现一…...

原型模式

功能&#xff1a;复制一个运行时的对象&#xff0c;包括对象各个成员当前的值。并且能够通过父类的指针来克隆出子类的对象 主要解决&#xff1a;在运行期建立原型 优点&#xff1a;性能提高、避免了构造函数的约束 步骤&#xff1a; 1、定义抽象原型&#xff0c;声明纯虚接…...

基于FPGA的FM调制(载波频率、频偏、峰值、DAC输出)-带仿真文件-上板验证正确

基于FPGA的FM调制-带仿真文件-上板验证正确 前言一、FM调制储备知识载波频率频偏峰值个人理解 二、代码分析1.模块分析2.波形分析 总结 前言 FM、AM等调制是学习FPGA信号处理一个比较好的小项目&#xff0c;通过学习FM调制过程熟悉信号处理的一个简单流程&#xff0c;进而熟悉…...

open-instruct - 训练开放式指令跟随语言模型

文章目录 关于 open-instruct设置训练微调偏好调整RLVR 污染检查开发中仓库结构 致谢 关于 open-instruct github : https://github.com/allenai/open-instruct 这个仓库是我们对在公共数据集上对流行的预训练语言模型进行指令微调的开放努力。我们发布这个仓库&#xff0c;并…...

Java爬虫:获取1688商品详情接口的技术实现与代码示例

引言 1688作为中国领先的B2B电子商务平台&#xff0c;拥有海量的商品信息。对于商家和市场研究人员来说&#xff0c;能够从1688获取商品详情信息&#xff0c;对于市场分析、竞品研究等具有重要价值。本文将介绍如何使用Java编写爬虫&#xff0c;以合法、高效的方式获取1688商品…...

详解Rust泛型用法

文章目录 基础语法泛型与结构体泛型约束泛型与生命周期泛型与枚举泛型和Vec静态泛型(const 泛型)类型别名默认类型参数Sized Trait与泛型常量函数与泛型泛型的性能 Rust是一种系统编程语言&#xff0c;它拥有强大的泛型支持&#xff0c;泛型是Rust中用于实现代码复用和类型安全…...

Spring Boot拦截器(Interceptor)详解

拦截器Interceptor 拦截器我们主要分为三个方面进行讲解&#xff1a; 介绍下什么是拦截器&#xff0c;并通过快速入门程序上手拦截器拦截器的使用细节通过拦截器Interceptor完成登录校验功能 1. 快速入门 什么是拦截器&#xff1f; 是一种动态拦截方法调用的机制&#xff…...

STM32-- 看门狗--介绍、使用场景、失效场景

STM32 中的看门狗&#xff08;Watchdog Timer&#xff0c;简称 WDG&#xff09;有两种主要类型&#xff1a;独立看门狗&#xff08;IWDG&#xff09; 和 窗口看门狗&#xff08;WWDG&#xff09;。它们的喂狗机制各有特点&#xff0c;主要区别如下&#xff1a; 1. 独立看门狗&a…...

Perplexica - AI 驱动的搜索引擎

更多AI开源软件&#xff1a; AI开源 - 小众AIhttps://www.aiinn.cn/sources Perplexica 是一个开源的 AI 驱动的搜索工具或 AI 驱动的搜索引擎&#xff0c;可以深入互联网寻找答案。受 Perplexity AI 的启发&#xff0c;它是一个开源选项&#xff0c;不仅可以搜索网络&#xf…...

Linux笔记--基于OCRmyPDF将扫描件PDF转换为可搜索的PDF

1--官方仓库 https://github.com/ocrmypdf/OCRmyPDF 2--基本步骤 # 安装ocrmypdf库 sudo apt install ocrmypdf# 安装简体中文库 sudo apt-get install tesseract-ocr-chi-sim# 转换 # -l 表示使用的语言 # --force-ocr 防止出现以下错误&#xff1a;ERROR - PriorOcrFoundE…...

MySQL聚合查询分组查询联合查询

#对应代码练习 -- 创建考试成绩表 DROP TABLE IF EXISTS exam; CREATE TABLE exam ( id bigint, name VARCHAR(20), chinese DECIMAL(3,1), math DECIMAL(3,1), english DECIMAL(3,1) ); -- 插入测试数据 INSERT INTO exam (id,name, chinese, math, engli…...

ffmpeg 预设的值 加速

centos 安装ffmpeg 编译安装 官网获取最新的linux ffmpeg 代码 https://ffmpeg.org//releases/ mkdir -p /data/app/ffmpeg cd /data/app/ffmpeg wget http://www.ffmpeg.org/releases/ffmpeg-7.1.tar.gz tar -zxvf ffmpeg-7.1.tar.gz#安装所需的编译环境 yum install -y \…...

Spring Boot 与 Spring Cloud Alibaba 版本兼容对照

版本选择要点 Spring Boot 3.x 与 Spring Cloud Alibaba 2022.0.x Spring Boot 3.x 基于 Jakarta EE&#xff0c;javax.* 更换为 jakarta.*。 需要使用 Spring Cloud 2022.0.x 和 Spring Cloud Alibaba 2022.0.x。 Alibaba 2022.0.x 对 Spring Boot 3.x 的支持在其发行说明中…...

解决爬虫ConnectionResetError出现的问题

提问 使用python进行网络爬虫出现ConnectionResetError如何解决&#xff1f; 解答 遇到ConnectionResetError错误时&#xff0c;通常是因为远程服务器端主动重置了连接。常见原因包括请求频率过高、网络问题或触发了防爬虫机制。为解决该问题&#xff0c;可以采取以下方法&a…...

Rust学习笔记_03——元组

Rust学习笔记_01——基础 Rust学习笔记_02——数组 Rust学习笔记_03——元组 文章目录 Rust学习笔记_03——元组元组1. 定义元祖2. 访问元组中的元素3. 元组的解构4. 元组不可遍历和切片5. 元组作为函数返回值6. 单元元组7. 代码演示 元组 在Rust编程语言中&#xff0c;元组&a…...

win10安装MySQL8.0.40,含踩坑记录

这里写自定义目录标题 win10安装MySQL8下载安装包配置环境变量初始化MySQL创建data文件夹初始化配置文件安装MySQL服务初始化创建root用户启动服务设置root用户密码登录验证 踩坑&#xff1a;MySQL 服务正在启动 ...MySQL 服务无法启动。服务没有报告任何错误。请键入 NET HELP…...

python+django自动化平台(一键执行sql) 前端vue-element展示

一、开发环境搭建和配置 pip install mysql-connector-pythonpip install PyMySQL二、django模块目录 dbOperations ├── __init__.py ├── __pycache__ │ ├── __init__.cpython-313.pyc │ ├── admin.cpython-313.pyc │ ├── apps.cpython-313.pyc │ …...

【计算机网络】核心部分复习

目录 交换机 v.s. 路由器OSI七层更实用的TCP/IP四层TCPUDP 交换机 v.s. 路由器 交换机-MAC地址 链接设备和设备 路由器- IP地址 链接局域网和局域网 OSI七层 物理层&#xff1a;传输设备。原始电信号比特流。数据链路层&#xff1a;代表是交换机。物理地址寻址&#xff0c;交…...

urllib3只支持OpenSSL1.1.1

1 现象 urllib3 v2.0 only supports OpenSSL 1.1.1, currently the ssl module is compiled with OpenSSL 1.1.0j 20 Nov 2018.2 解决方法 降低urllib3的版本。 从pycharm中&#xff0c;先卸载原有的urllib3版本。 菜单“File|Settings|Project:python|Project Interprete…...

简单web项目自定义部署Dockerfile

本意就是弄清楚如何做web自定义项目的镜像。 基础镜像是java:8u261-jdk&#xff0c;其中java路径为/opt/java webdemo1.0.0.1-SNAPSHOT.jar文件里面已经包含了lib文件。 可以设置PATH也可以不设置&#xff0c;但是建议设置JAVA_HOME FROM swr.cn-north-4.myhuaweicloud.com…...

apache实现绑定多个虚拟主机访问服务

1个网卡绑定多个ip的命令 ip address add 192.168.45.140/24 dev ens33 ip address add 192.168.45.141/24 dev ens33 在linux服务器上&#xff0c;添加多个站点资料&#xff0c;递归创建三个文件目录 分别在三个文件夹下&#xff0c;建立测试页面 修改apache的配置文件http.…...

svn 崩溃、 cleanup失败 怎么办

在使用svn的过程中&#xff0c;可能出现整个svn崩溃&#xff0c; 例如cleanup 失败的情况&#xff0c;类似于 这时可以下载本贴资源文件并解压。 或者直接访问网站 SQLite Download Page 进行下载 解压后得到 sqlite3.exe 放到发生问题的svn根目录的.svn路径下 右键呼出pow…...

深度学习—BP算法梯度下降及优化方法Day37

梯度下降 1.公式 w i j n e w w i j o l d − α ∂ E ∂ w i j w_{ij}^{new} w_{ij}^{old} - \alpha \frac{\partial E}{\partial w_{ij}} wijnew​wijold​−α∂wij​∂E​ α为学习率 当α过小时&#xff0c;训练时间过久增加算力成本&#xff0c;α过大则容易造成越过最…...

python常见问题-pycharm无法导入三方库

1.运行环境 python版本&#xff1a;Python 3.9.6 需导入的greenlet版本&#xff1a;greenlet 3.1.1 2.当前的问题 由于需要使用到greenlet三方库&#xff0c;所以进行了导入&#xff0c;以下是我个人导入时的全过程 ①首先尝试了第1种导入方式&#xff1a;使用pycharm进行…...

虚幻引擎---目录结构篇

一、引擎目录 成功安装引擎后&#xff0c;在安装路径下的Epic Games目录中可以找到与引擎版本对应的文件夹&#xff0c;其中的内容如下&#xff1a; Engine&#xff1a;包含构成引擎的所有源代码、内容等。 Binaries&#xff1a;包含可执行文件或编译期间创建的其他文件。Bui…...

OpenCV相机标定与3D重建(6)将3D物体点投影到2D图像平面上函数projectPoints()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 cv::fisheye::projectPoints 是 OpenCV 库中用于鱼眼镜头模型的函数&#xff0c;它将3D物体点投影到2D图像平面上。这个函数对于模拟或者理解鱼眼…...

QINQ技术

定义 QINQ即802.1q in 802.1q&#xff0c;因为IEEE802.1Q中定义的Vlan Tag域只有12个比特&#xff0c;仅能表示4096个Vlan&#xff0c;随网络发展被用尽&#xff0c;于是在原有带vlan的数据上再携带一层vlan标签用于扩展vlan数目。一般来说外层vlan是公网&#xff0c;内层是私…...

COMSOL工作站:配置指南与性能优化

COMSOL Multiphysics 求解的问题类型相当广泛&#xff0c;提供了仿真单一物理场以及灵活耦合多个物理场的功能&#xff0c;供工程师和科研人员来精确分析各个工程领域的设备、工艺和流程。 软件内置的#模型开发器#包含完整的建模工作流程&#xff0c;可实现从几何建模、材料参数…...

一键生成唯美动漫图:ComfyUI-tPonynai详细搭建教程

tPonynai 是在 C 站上开源的动漫风格扩散模型&#xff0c;与其他基础大模型一样&#xff0c;只需要输入适当的正面和负面提示词就能够实现动漫图片的生成。截至目前已经有 12.9k 的下载量&#xff0c;生成效果也非常不错。本文将介绍ComfyUI-tPonynai在算家云搭建以及本地部署的…...

Python 3 教程第22篇(数据结构)

Python3 数据结构 本章节我们主要结合前面所学的知识点来介绍Python数据结构。 列表 Python中列表是可变的&#xff0c;这是它区别于字符串和元组的最重要的特点&#xff0c;一句话概括即&#xff1a;列表可以修改&#xff0c;而字符串和元组不能。 以下是 Python 中列表的方…...

Oracle RAC的DB未随集群自动启动

RDBMS 19.25 参考文档&#xff1a; Oracle Database 12c (12.1 and 12.2) How does one modify the database resource parameter AUTO_START How to Disable Auto Start of ASM From Cluster Resource (Doc ID 2016160.1) 实际操作&#xff1a; [rootnode19c01 ~]# crsc…...

深度学习-49-AI应用实战之基于HyperLPR的车牌识别

文章目录 1 车牌识别系统1.1 识别原理1.1.1 车牌定位1.1.2 字符识别2 实例应用2.1 安装hyperlpr32.2 识别结果2.3 可视化显示2.4 结合streamlit3 附录3.1 PIL.Image转换成OpenCV格式3.2 OpenCV转换成PIL.Image格式3.3 st.image嵌入图像内容3.4 参考附录1 车牌识别系统 车牌识别…...

Chrome插件(扩展)开发中对表单元素赋值操作

最近在写chrome插件时候&#xff0c;需要对vue开发登录界面中的表单进行赋值&#xff0c;最开始简单的以为&#xff0c;找到对应的元素&#xff0c;直接value"XXXX" document.querySelector(input).value"admin" 结果一运行&#xff0c;发现输入框的值确…...

详解MVC架构与三层架构以及DO、VO、DTO、BO、PO | SpringBoot基础概念

&#x1f64b;大家好&#xff01;我是毛毛张! &#x1f308;个人首页&#xff1a; 神马都会亿点点的毛毛张 今天毛毛张分享的是SpeingBoot框架学习中的一些基础概念性的东西&#xff1a;MVC结构、三层架构、POJO、Entity、PO、VO、DO、BO、DTO、DAO 文章目录 1.架构1.1 基本…...

QML学习 —— 30、图片翻转效果(附源码)

效果 说明 Flipable是一种可以在正面和背面之间明显“翻转”的物品,就像卡片一样。它可以与“旋转”、“状态”和“过渡”类型一起使用,以产生翻转效果。正面和背面属性用于固定分别显示在可翻转物品正面和背面的物品。 代码 import QtQuick 2.12 import QtQuick.Window 2.1…...

rk3588交叉编译opencv

基于forlinx开发板Linux5.10.66Qt5.15.2的环境 交叉编译工具链&#xff1a;aarch64-buildroot-linux-gnu-gcc、aarch64-buildroot-linux-gnu-g opencv版本&#xff1a;3.4.15 创建toolchain.cmake # 工具链路径 set(CMAKE_C_COMPILER /home/forlinx/aarch64-buildroot-linux…...

Kubernetes 之 Ingress 和 Service 的异同点

1. 概念与作用 1.1 Ingress Ingress 是什么&#xff1f; Ingress主要负责七层负载&#xff0c;将外部 HTTP/HTTPS 请求路由到集群内部的服务。它可以基于域名和路径定义规则&#xff0c;从而将外部请求分配到不同的服务。 ingress作用 提供 基于 HTTP/HTTPS 的路由。 支持 …...

Java 反射(Reflection)

Java 反射&#xff08;Reflection&#xff09; Java 反射&#xff08;Reflection&#xff09;是一个强大的特性&#xff0c;它允许程序在运行时查询、访问和修改类、接口、字段和方法的信息。反射提供了一种动态地操作类的能力&#xff0c;这在很多框架和库中被广泛使用&#…...

C语言刷题笔记3(7)

7.1 数组处理斐波那契数列 题目描述:用数组来处理Fibonacci数列并输出。 输入:一个不超过40且大于2的整数n&#xff0c;表示需要处理并输出的Fibonacci数个数。 输出:输出前n个Fibonacci数&#xff0c;每行输出5个值&#xff0c;按每12位向右对齐的方式输出。请注意不要在第…...

【新人系列】Python 入门(十四):文件操作

✍ 个人博客&#xff1a;https://blog.csdn.net/Newin2020?typeblog &#x1f4dd; 专栏地址&#xff1a;https://blog.csdn.net/newin2020/category_12801353.html &#x1f4e3; 专栏定位&#xff1a;为 0 基础刚入门 Python 的小伙伴提供详细的讲解&#xff0c;也欢迎大佬们…...

学成在线day06

上传视屏 断点续传 通常视频文件都比较大&#xff0c;所以对于媒资系统上传文件的需求要满足大文件的上传要求。http协议本身对上传文件大小没有限制&#xff0c;但是客户的网络环境质量、电脑硬件环境等参差不齐&#xff0c;如果一个大文件快上传完了网断了没有上传完成&…...

详细介绍HTTP与RPC:为什么有了HTTP,还需要RPC?

目录 一、HTTP 二、RPC 介绍 工作原理 核心功能 如何服务寻址 如何进行序列化和反序列化 如何网络传输 基于 TCP 协议的 RPC 调用 基于 HTTP 协议的 RPC 调用 实现方式 优点和缺点 使用场景 常见框架 示例 三、问题 问题一&#xff1a;是先有HTTP还是先有RPC&…...

ffmpeg 各版本号对应表格

想看看ffmpeg各个版本对应表&#xff0c; #! /bin/bashFF_PATH$1 CURRENTpwd RESULT"$CURRENT/test_version.txt"cd $FF_PATHif [ -f $RESULT ]; thenrm $RESULT fifor i in git branch -a | grep remotes/origin/release/ | grep -v HEAD | grep -v master; dogit…...

cesium 3Dtiles变量

原本有一个变亮的属性luminanceAtZenith&#xff0c;但是新版本的cesium没有这个属性了。于是 let lightColor 3.0result._customShader new this.ffCesium.Cesium.CustomShader({fragmentShaderText:void fragmentMain(FragmentInput fsInput, inout czm_modelMaterial mate…...

如何分析Windows防火墙日志

Windows防火墙&#xff0c;也被称为Windows Defender Firewall&#xff0c;是一种内置的安全功能&#xff0c;可以主动监控和分析运行Windows操作系统的计算机上通过Windows防火墙的网络流量&#xff0c;主要目的是作为计算机和互联网或其他网络之间的屏障&#xff0c;使管理员…...

Linux下 history 命令输出时间

在 Linux 中&#xff0c;查看每条命令的执行时间。 文章目录 [toc]**1. 配置 Shell 以记录命令执行时间****1.1 Bash Shell****步骤&#xff1a;****注意事项&#xff1a;** **1.2 Zsh Shell****步骤&#xff1a;****注意事项&#xff1a;** 1. 配置 Shell 以记录命令执行时间 …...

ChatGPT/AI辅助网络安全运营之-数据解压缩

在网络安全的世界中&#xff0c;经常会遇到各种压缩的数据&#xff0c;比如zip压缩&#xff0c;比如bzip2压缩&#xff0c;gzip压缩&#xff0c;xz压缩&#xff0c;7z压缩等。网络安全运营中需要对这些不同的压缩数据进行解压缩&#xff0c;解读其本意&#xff0c;本文将探索一…...

导入 OpenCV for Android 的技巧

下载了 OpenCV for Android Sdk 以后&#xff0c;一头雾水&#xff0c;不知道从哪里下手&#xff0c;既不是jar、也不是aar&#xff0c;没关系&#xff0c;简单几步即可使用 OpenCV。 1、使用 Android Studio 打开 samples &#xff08;示例&#xff09;项目 2、同步项目&…...

云原生时代的轻量级反向代理Traefik

Traefik 是一个用于路由和管理网络流量的反向代理&#xff0c;同时也是一个支持多种协议&#xff08;HTTP、HTTPS、TCP、UDP&#xff09;的负载均衡器。它通过自动服务发现和动态配置&#xff0c;帮助开发者和运维团队轻松管理复杂的应用架构。 Traefik 的主要特点如下&#x…...