Flink架构概览,Flink DataStream API 的使用,FlinkCDC的使用
一、Flink与其他组件的协同
Flink 是一个分布式、高性能、始终可用、准确一次(Exactly-Once)语义的流处理引擎,广泛应用于大数据实时处理场景中。它与 Hadoop 生态系统中的组件可以深度集成,形成完整的大数据处理链路。下面我们从 Flink 的 核心架构 出发,结合与 Hadoop 组件协同方式,详细剖析 Flink 的作用。
1. Flink 核心架构详解
1)架构组件图概览
+-------------------------+
| Client |
+-------------------------+|v
+-------------------------+
| JobManager (JM) | <-- Master 负责调度
+-------------------------+|v
+-------------------------+
| TaskManagers (TM) | <-- Worker 执行算子任务
+-------------------------+|v
+-------------------------+
| Slot | <-- 执行资源单位
+-------------------------+
2)核心组件职责
组件 | 描述 |
---|---|
Client | 提交作业到 Flink 集群,触发作业执行。 |
JobManager (JM) | 管理作业生命周期,负责调度任务、故障恢复、协调检查点(Checkpoint)等。 |
TaskManager (TM) | 具体执行作业的物理任务(算子),负责数据交换、状态管理等。 |
Slot | TaskManager 内部的资源单位,用于任务部署。每个 TaskManager 有多个 Slot。 |
3)状态管理与容错
-
Checkpoint/Savepoint:可恢复一致性状态(Exactly Once)
-
State Backend:保存状态(如 RocksDB、FsStateBackend)
-
Recovery:通过重放 Checkpoint 恢复任务
2. Flink 与 Hadoop 各组件的协同关系
Flink 虽然是独立系统,但能与 Hadoop 生态的多个关键组件协同工作,构建完整的大数据平台。
1)与 HDFS(Hadoop Distributed File System)
协同方式 | 描述 |
---|---|
输入源 | Flink 可直接读取 HDFS 中的批量数据(如 ORC、Parquet、Text 等格式) |
状态后端 | Flink Checkpoint/Savepoint 可存储到 HDFS 上,保证高可用与容灾 |
输出目标 | Flink 作业可以将计算结果输出到 HDFS,作为后续离线处理的数据 |
fs.defaultFS: hdfs://namenode:8020
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints/
2)与 Hive
协同方式 | 描述 |
---|---|
读取表数据 | Flink 可通过 Hive Catalog 与 Hive 元数据打通,直接读取 Hive 表 |
写入表 | Flink SQL 可将流式数据写入 Hive(使用 INSERT INTO) |
统一元数据 | Flink + Hive Catalog 支持表结构共享,便于湖仓一体实践 |
CREATE CATALOG my_hive WITH ('type' = 'hive','hive-conf-dir' = '/etc/hive/conf'
);
3)与 Kafka(实时采集)
协同方式 | 描述 |
---|---|
实时数据源 | Flink 通过 Kafka Source 接收实时数据流(如日志、订单等) |
下游结果写入 | Flink 可将流式计算结果写入 Kafka(供下游消费) |
Exactly Once 语义 | Flink + Kafka + Checkpoint 可实现端到端的精确一次语义 |
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
consumer.setStartFromGroupOffsets();
consumer.setCommitOffsetsOnCheckpoints(true);
4)与 HBase(实时查询)
协同方式 | 描述 |
---|---|
维表关联 | Flink 可使用 HBase 作为维表进行流批 Join,实时补充维度数据 |
实时写入 | 计算结果可实时写入 HBase,支持下游查询系统使用(如用户画像等) |
tableEnv.executeSql("CREATE TABLE hbase_dim (...) WITH ('connector' = 'hbase-2.2', ...)");
5)与 YARN
协同方式 | 描述 |
---|---|
资源调度 | Flink 可部署在 YARN 上,利用 Hadoop 的资源调度管理能力 |
Session / Per-Job 模式 | 支持多租户资源隔离或每个作业独立资源隔离部署 |
flink run -m yarn-cluster -ynm my-flink-job myjob.jar
6)与 Zookeeper
协同方式 | 描述 |
---|---|
高可用 JobManager | 使用 Zookeeper 实现 JobManager 的 leader election |
Checkpoint HA 元数据存储 | 配合 HDFS 存储 Checkpoint 元数据路径 |
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs://namenode:8020/flink/ha/
3. Flink 的作用总结
模块 | Flink 的角色 |
---|---|
实时数据处理 | 核心组件,进行低延迟、高吞吐流处理计算 |
数据清洗与 ETL | 提供强大 SQL / DataStream API 进行多源数据处理与聚合 |
实时指标计算 | 支持实时 KPI、UV/PV、订单流等分析 |
数据湖构建 | 可作为流式数据入湖的计算引擎(结合 Hudi/Iceberg) |
实时监控预警 | 搭配 Kafka + Prometheus,构建告警与监控系统 |
实时数仓建设 | 联合 Kafka + Hive + HDFS + HBase 构建流批一体数仓体系 |
4. Flink 架构在 Hadoop 平台的实际部署图
+-------------+| Flume/Nginx|+------+------+|Kafka集群|+-------------------+--------------------+| |+---v---+ +----v----+| Flink |--> 清洗 → 维表 Join → 计算 | Spark |+---+---+ +----+----+| |
+-------v---------+ +--------v--------+
| HBase/Redis | | HDFS / Hive |
+-----------------+ +-----------------+
二、Flink DataStream API的使用
现在以 Flink DataStream API 为核心,深入剖析一个真实生产场景的 从 Kafka 到 Kafka 的流式处理全流程,包括:
-
项目结构与依赖
-
数据模型与清洗
-
水位线与乱序处理
-
异步维表查询(HBase/MySQL/Redis)
-
窗口聚合逻辑
-
数据下发(Kafka Sink)
-
容错机制与 Checkpoint 配置
1. 项目结构与依赖
1)Maven 依赖(pom.xml
)
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.0.1-1.17</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.14.2</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hbase-2.2</artifactId><version>1.17.1</version></dependency>
</dependencies>
2. 数据模型定义
1)订单数据结构(OrderEvent)
public class OrderEvent {public String orderId;public String userId;public String productId;public double price;public int quantity;public long orderTime; // epoch millis
}
2) 商品维度(ProductInfo)
public class ProductInfo {public String productId;public String categoryId;public String productName;
}
3)聚合结果结构(OrderStat)
public class OrderStat {public String categoryId;public long windowStart;public long windowEnd;public double totalAmount;
}
3. Kafka Source + JSON 反序列化
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("kafka:9092").setTopics("order_events").setGroupId("flink-consumer").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStream<OrderEvent> orderStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "KafkaSource").map(json -> new ObjectMapper().readValue(json, OrderEvent.class)).returns(OrderEvent.class);
4. 水位线处理(乱序数据支持)
WatermarkStrategy<OrderEvent> watermarkStrategy = WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, ts) -> event.orderTime);DataStream<OrderEvent> orderStreamWithWM = orderStream.assignTimestampsAndWatermarks(watermarkStrategy);
5. 异步维表关联(以 HBase 为例)
使用 AsyncFunction
实现异步查询(支持 Redis/HBase/MySQL)
示例实现:AsyncProductEnrichmentFunction
public class AsyncProductEnrichmentFunction extends RichAsyncFunction<OrderEvent, Tuple2<OrderEvent, ProductInfo>> {private transient HBaseClient hBaseClient;@Overridepublic void open(Configuration parameters) throws Exception {hBaseClient = new HBaseClient("hbase.zookeeper.quorum");}@Overridepublic void asyncInvoke(OrderEvent input, ResultFuture<Tuple2<OrderEvent, ProductInfo>> resultFuture) {CompletableFuture.supplyAsync(() -> hBaseClient.queryProductInfo(input.productId)).thenAccept(productInfo -> resultFuture.complete(Collections.singletonList(Tuple2.of(input, productInfo))));}@Overridepublic void close() throws Exception {hBaseClient.close();}
}
应用异步函数
DataStream<Tuple2<OrderEvent, ProductInfo>> enrichedStream = AsyncDataStream.unorderedWait(orderStreamWithWM,new AsyncProductEnrichmentFunction(),5, TimeUnit.SECONDS, 100
);
6. 按类目 ID 滚动窗口聚合
DataStream<OrderStat> resultStream = enrichedStream.map(tuple -> new Tuple3<>(tuple.f1.categoryId, tuple.f0.orderTime, tuple.f0.price * tuple.f0.quantity)).returns(Types.TUPLE(Types.STRING, Types.LONG, Types.DOUBLE)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Long, Double>>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((t, ts) -> t.f1)).keyBy(t -> t.f0).window(TumblingEventTimeWindows.of(Time.minutes(1))).aggregate(new AggregateFunction<Tuple3<String, Long, Double>, Double, OrderStat>() {private long windowStart, windowEnd;private String categoryId;public Double createAccumulator() { return 0.0; }public Double add(Tuple3<String, Long, Double> value, Double acc) {categoryId = value.f0;return acc + value.f2;}public OrderStat getResult(Double acc) {return new OrderStat(categoryId, windowStart, windowEnd, acc);}public Double merge(Double acc1, Double acc2) {return acc1 + acc2;}}, new ProcessWindowFunction<OrderStat, OrderStat, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<OrderStat> elements, Collector<OrderStat> out) {OrderStat stat = elements.iterator().next();stat.windowStart = context.window().getStart();stat.windowEnd = context.window().getEnd();out.collect(stat);}});
7. 写入 Kafka Sink
KafkaSink<OrderStat> kafkaSink = KafkaSink.<OrderStat>builder().setBootstrapServers("kafka:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("order_stats").setValueSerializationSchema(stat -> {ObjectMapper mapper = new ObjectMapper();return mapper.writeValueAsBytes(stat);}).build()).setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE).build();resultStream.sinkTo(kafkaSink);
8. 容错与 HA 配置(关键)
1)Checkpoint 配置
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode/flink/checkpoints"));
2)高可用配置(flink-conf.yaml)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181
state.checkpoints.dir: hdfs://namenode/flink/checkpoints
state.savepoints.dir: hdfs://namenode/flink/savepoints
9. 运行命令(on YARN)
flink run -m yarn-cluster -c com.company.OrderRealtimeJob your-job.jar
10. 监控与排障建议
工具 | 功能 |
---|---|
Flink Web UI | 监控 Task、Checkpoint、Watermark |
Prometheus | 指标采集 |
Grafana | 可视化 |
AlertManager | 告警配置 |
Savepoint | 容错恢复点 |
三、FlinkCDC实时采集数据入湖
解析Flink CDC(Change Data Capture)在大数据体系中的使用方法,并结合 Kafka、Hudi、Iceberg、Hive、HDFS 等大数据组件,提供一套 可落地、可执行、可扩展的完整集成方案。
1. Flink CDC 简介
Flink CDC 是 Apache Flink + Debezium 的组合,用于实时采集 MySQL/PostgreSQL 等数据库的变更数据(INSERT/UPDATE/DELETE),并以 流式方式传递到下游系统(Kafka、Hudi、Iceberg、HBase 等)。
2. 典型架构场景:Flink CDC + Hudi + Hive 实时数据湖方案
+-------------+ +---------------------+| MySQL/Postgres | || Source DB +--------> | Flink CDC Connector |+-------------+ | |+----------+----------+|| Row-level ChangeLogv+----------+----------+| Flink Job || (数据清洗/处理) |+----------+----------+|v+----------+----------+| Hudi Sink (Flink) |+----------+----------+|v+-------------+-------------+| Hive / Presto / Trino || 实时查询(支持 ACID) |+---------------------------+
3. 方案目标
-
实时采集 MySQL 数据(基于 Binlog)
-
支持变更(Insert/Update/Delete)语义
-
数据存入 Hudi 表(支持 MOR/COW 格式)
-
Hive/Presto 端可直接查询
4. 组件版本建议
组件 | 版本建议 |
---|---|
Flink | 1.17.x 或 1.18.x |
Flink CDC | 2.4.1 |
Debezium | 内嵌于 Flink CDC |
Hudi | 0.13.1+ |
Hive | 2.3.x / 3.1.x |
Hadoop/HDFS | 3.x |
5. 部署准备
1)安装 Kafka(可选)
用于做 CDC 中转(可选,支持 Flink 直接接 Hudi)
2)安装 Hive Metastore + Hadoop HDFS
用于管理 Hudi 表元数据和 HDFS 存储
3)准备 MySQL 源数据库
配置 binlog,设置 binlog_format = ROW
,并开启 server_id
、binlog_row_image = full
6. 关键配置代码与步骤
1)添加 Maven 依赖
<dependencies><!-- Flink CDC --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.4.1</version></dependency><!-- Hudi Sink --><dependency><groupId>org.apache.hudi</groupId><artifactId>hudi-flink-bundle_2.12</artifactId><version>0.13.1</version></dependency>
</dependencies>
2)Flink SQL 示例(CDC → Hudi)
-- 1. 源表:MySQL CDC 表
CREATE TABLE ods_orders (id STRING,user_id STRING,amount DOUBLE,ts TIMESTAMP(3),PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = 'mysql-host','port' = '3306','username' = 'flink','password' = 'flink123','database-name' = 'srm','table-name' = 'orders','scan.startup.mode' = 'initial'
);-- 2. 目标表:Hudi 表(MOR 模式)
CREATE TABLE dwd_orders (id STRING PRIMARY KEY NOT ENFORCED,user_id STRING,amount DOUBLE,ts TIMESTAMP(3)
) PARTITIONED BY (`user_id`)
WITH ('connector' = 'hudi','path' = 'hdfs://namenode/data/hudi/dwd_orders','table.type' = 'MERGE_ON_READ','hoodie.datasource.write.recordkey.field' = 'id','write.tasks' = '4','compaction.async.enabled' = 'true','hive_sync.enabled' = 'true','hive_sync.mode' = 'hms','hive_sync.metastore.uris' = 'thrift://hive-metastore:9083','hive_sync.db' = 'ods','hive_sync.table' = 'dwd_orders'
);-- 3. 实时写入
INSERT INTO dwd_orders
SELECT * FROM ods_orders;
7. 关键功能说明
功能 | 配置字段 | 说明 |
---|---|---|
主键变更支持 | PRIMARY KEY ... NOT ENFORCED | 支持 upsert |
增量采集模式 | scan.startup.mode = initial | 首次全量 + 后续增量 |
实时 compaction | compaction.async.enabled = true | MOR 表性能保障 |
Hive 数据同步 | hive_sync.enabled = true | Hudi 自动注册 Hive 元数据 |
8. 整合优化建议
1)多表 CDC 同步统一处理
使用 Flink CDC 的 schema-name.table-name
通配符:
'database-name' = 'srm',
'table-name' = '.*',
配合 Flink SQL Catalog + Dynamic Table Factory,可实现一拖 N 的多表处理逻辑。
2)增加清洗逻辑(如空值过滤、转换)
SELECTid,user_id,amount * 1.13 AS amount_tax,ts
FROM ods_orders
WHERE amount IS NOT NULL;
3)写入 Kafka(替代 Hudi) → 用于事件总线或下游消费
CREATE TABLE kafka_sink (id STRING,user_id STRING,amount DOUBLE,ts TIMESTAMP(3)
) WITH ('connector' = 'kafka','topic' = 'ods.orders','properties.bootstrap.servers' = 'kafka:9092','format' = 'json','scan.startup.mode' = 'latest-offset'
);
9. Flink CDC 整合场景汇总
场景 | 描述 | 推荐组件 |
---|---|---|
实时数据入湖 | MySQL → Hudi | Flink CDC + Hudi |
数据仓库加速 | Oracle → Iceberg | Flink CDC + Iceberg |
数据中台构建 | MySQL → Kafka → 多下游 | Flink CDC + Kafka |
数据回流校验 | Kafka → Flink → MySQL | Flink SQL + JDBC Sink |
DWD建模 | ODS → DWD/DWM → ADS | Flink SQL + 维表 JOIN |
10. 可视化监控
工具 | 功能 |
---|---|
Flink UI | Checkpoint、Watermark、吞吐 |
Prometheus | 指标采集 |
Grafana | 监控仪表盘 |
Hive | SQL 查询验证 |
四、自定义 Flink CDC Job 的完整实现
自定义 Flink CDC Job 的完整实现,采用 Java DataStream API 编写,支持:
-
多表接入(MySQL 为例)
-
自定义清洗、转换逻辑
-
支持写入 Kafka、Hudi、Iceberg 等下游系统
-
可部署为标准 Flink 应用(
flink run
执行)
1. 自定义 Flink CDC Job 场景说明
目标:
-
从 MySQL 采集订单表
srm.orders
-
做清洗(如金额换算、字段过滤)
-
输出到 Hudi 表(或 Kafka/Console)
2. 依赖配置(Maven)
<dependencies><!-- Flink CDC --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.4.1</version></dependency><!-- Flink 通用 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.2</version></dependency><!-- 可选:Sink 依赖,如 Kafka、Hudi、Iceberg -->
</dependencies>
3. 完整代码示例:CustomCdcJob.java
public class CustomCdcJob {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2. 配置 CDC 源:MySQLMySqlSource<Order> mysqlSource = MySqlSource.<Order>builder().hostname("mysql-host").port(3306).databaseList("srm").tableList("srm.orders").username("flink").password("flink123").deserializer(new OrderDeserializationSchema()) // 自定义反序列化.build();// 3. 接入 SourceDataStreamSource<Order> orderStream = env.fromSource(mysqlSource,WatermarkStrategy.noWatermarks(),"MySQL CDC Source");// 4. 数据清洗/转换SingleOutputStreamOperator<Order> cleaned = orderStream.filter(order -> order.amount > 0).map(order -> {order.amount = order.amount * 1.13; // 加税return order;});// 5. Sink:控制台 / Kafka / Hudicleaned.print();env.execute("Custom Flink CDC Job");}
}
4. 自定义反序列化器:OrderDeserializationSchema
public class OrderDeserializationSchema implements DebeziumDeserializationSchema<Order> {@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<Order> collector) {Struct value = (Struct) sourceRecord.value();if (value == null) return;Struct after = value.getStruct("after");if (after != null) {Order order = new Order();order.id = after.getString("id");order.userId = after.getString("user_id");order.amount = after.getFloat64("amount");order.ts = Instant.ofEpochMilli(after.getInt64("ts")).atZone(ZoneId.of("UTC")).toLocalDateTime();collector.collect(order);}}@Overridepublic TypeInformation<Order> getProducedType() {return TypeInformation.of(Order.class);}
}
5. 定义 POJO 类:Order.java
public class Order implements Serializable {public String id;public String userId;public Double amount;public LocalDateTime ts;@Overridepublic String toString() {return String.format("[Order] id=%s, user=%s, amt=%.2f, ts=%s",id, userId, amount, ts.toString());}
}
6. Sink 可选方案
1)控制台输出(开发调试)
cleaned.print();
2)Kafka Sink(事件总线)
KafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("kafka:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("srm.orders.cdc").setValueSerializationSchema(new SimpleStringSchema()).build()).build();cleaned.map(order -> JSON.toJSONString(order)).sinkTo(kafkaSink);
3)写入 Hudi 表(通过 Flink Hudi Sink)
cleaned.addSink(HudiSinkUtil.getSink());
自定义 Hudi Sink 工具类可基于 HoodieSink
封装。
七、打包部署方式
1)使用 maven-shade-plugin
打 fat-jar:
mvn clean package -DskipTests
输出:custom-cdc-job-1.0-SNAPSHOT.jar
2)提交到 Flink 集群
flink run -m yarn-cluster -c com.example.CustomCdcJob custom-cdc-job.jar
8. 扩展功能(可选)
功能 | 实现方式 |
---|---|
多表同步 | .tableList("srm.orders,srm.invoice") |
动态 schema 推导 | 使用 JsonDebeziumDeserializationSchema |
维表 join | Flink SQL / Broadcast Join |
自定义状态存储 | Flink KeyedState |
exactly-once 写入 Kafka/Hudi | 使用 checkpoint 支持 |
相关文章:
Flink架构概览,Flink DataStream API 的使用,FlinkCDC的使用
一、Flink与其他组件的协同 Flink 是一个分布式、高性能、始终可用、准确一次(Exactly-Once)语义的流处理引擎,广泛应用于大数据实时处理场景中。它与 Hadoop 生态系统中的组件可以深度集成,形成完整的大数据处理链路。下面我们从…...
AI加速芯片全景图:主流架构和应用场景详解
目录 一、为什么AI芯片如此重要? 二、主流AI芯片架构盘点 三、不同芯片在训练与推理中的部署逻辑 四、真实应用案例解读 五、AI芯片发展趋势预测 AI芯片的选择,是AI系统能否高效运行的关键。今天笔者就从架构角度出发,带你系统了解主流AI加速芯片的种类、优劣对比及实际…...
Ubuntu22.04 系统安装Docker教程
1.更新系统软件包 #确保您的系统软件包是最新的。这有助于避免安装过程中可能遇到的问题 sudo apt update sudo apt upgrade -y 2.安装必要的依赖 sudo apt install apt-transport-https ca-certificates curl software-properties-common -y 3.替换软件源 原来/etc/apt/s…...
更新ubuntu软件源遇到GPG error
BUG背景 执行sudo apt update后遇到类似下列报错: E: The repository https://download.docker.com/linux/ubuntu bionic Release no longer has a Release file. N: Updating from such a repository cant be done securely, and is therefore disabled by defau…...
vue调后台接口
1.1 什么是 axios Axios 是一个基于 promise 的 HTTP 库,可以用来发送网络请求。它可以在浏览器和 node.js 中使用,本质上是对原生 XMLHttpRequest 的封装,符合最新的 ES 规范,支持 Promise API,能够拦截请求和响应&am…...
Ubuntu学习记录
冷知识补充 1.VMware官网安装后,会有两个软件,一个收费(pro)(功能更多,可以一次运行多个虚拟机)(尽管2024年最新版本的也免费了)一个免费(player)。 2.ubuntu打开终端快捷键:ctrlal…...
【音频】如何解析mp3文件
解析和播放MP3文件涉及两个主要步骤:解码(将MP3压缩数据转换为原始PCM音频)和播放(将PCM数据通过音频设备输出)。以下是不同平台和编程语言的实现方法: 一、MP3文件结构基础 MP3文件由多个**帧(Frame)**组成,每帧包含固定时长的音频数据(通常为26ms)。每个帧包含:…...
学习笔记:黑马程序员JavaWeb开发教程(2025.4.9)
12.16 异常处理 定义一个类,加上注解RestControllerAdvice,即定义了一个全局异常处理器 再方法上加上注解ExceptionHandler,通过注解当中的value属性来指定捕获那个类型的异常 完成Filter、interceptor、异常处理代码实操 Filter Filter里…...
【音频】wav文件如何解析编码格式(压缩格式)?
要确定一个WAV文件的编码格式,可以通过以下几种方法实现,包括使用操作系统自带工具、专业音频软件或编程解析文件头信息。以下是详细说明: 一、通过文件属性查看(Windows/macOS) 1. Windows系统 步骤: 右…...
【Django系统】Python+Django携程酒店评论情感分析系统
Python Django携程酒店评论情感分析系统 项目概述 这是一个基于 Django 框架开发的酒店评论情感分析系统。系统使用机器学习技术对酒店评论进行情感分析,帮助酒店管理者了解客户反馈,提升服务质量。 主要功能 评论数据导入:支持导入酒店…...
OpenCv高阶(十六)——Fisherface人脸识别
文章目录 前言一、Fisherface人脸识别原理1. 核心思想:LDA与Fisher准则2. 实现步骤(1) 数据预处理(2) 计算类内散布矩阵 SW对每个类别(每个人)计算均值向量 μi:(3) 计算类间散布矩阵 SB(4) 求解投影矩阵 W(5) 降维与分类 3. Fish…...
数据库与Redis数据一致性解决方案
在写数据时保证 Redis 和数据库数据一致,可采用以下方案,需根据业务场景权衡选择: 1. 先更新数据库,再更新 Redis 步骤: 写入 / 更新数据库数据。删除或更新 Redis 缓存。适用场景:读多写少,对缓存一致性要求不高(短暂不一致可接受)。风险:若第二步失败,导致缓存与…...
Python面试题
Python面试题 Python面试题回答1. Python面向对象的三个特征?多态如何实现和使用2. is 和 的区别?3. GIL了解吗?说说4. 可变类型和不可变类型?5. yield用法?6. 深拷贝和浅拷贝区别?7. Python中的线程8. 生…...
力扣周赛置换环的应用,最少交换次数
置换环的基本概念 置换环是排列组合中的一个概念,用于描述数组元素的重排过程。当我们需要将一个数组转换为另一个数组时,可以把这个转换过程分解为若干个 “环”。每个环代表一组元素的循环交换路径。 举个简单例子 假设原数组 A [3, 2, 1, 4]&…...
差分数组 - 对区间内元素的统一操作
目录 概念 题单 1 拼车 2 将区间分为最少组数 3 字母移位 4 使数组中的所有元素都等于零 5 零数组变换Ⅰ 6 最大化城市的最小电量 概念 差分数组,顾名思义,就是由原数组的相邻元素作差而得到的差值组成的新的数组。 对于原数组 a [ 1 , 3 , 5 …...
线上问题排查
一:CPU飙高问题排查过程 遇到这种问题,首先是登录到服务器,看一下具体情况。 定位进程:top命令,查看CPU占用情况定位线程:top -Hp 1893命令,查看各个线程的CPU使用情况定位代码:pr…...
计及可再生能源不确定性的经济优化调度方法
目前,计及可再生能源不确定性的经济调度方法主要有随机优化、鲁棒优化和区间优化。 随机优化:可再生能源输出被定义为一个已知概率分布的随机变量。 难以同时保证计算精度和效率。 1-场景法 场景生成 基于随机变量概率分布进行采样:蒙特…...
支持向量机(SVM):分类与回归的数学之美
在机器学习的世界里,支持向量机(Support Vector Machine,简称 SVM)是一种极具魅力且应用广泛的算法。它不仅能有效解决分类问题,在回归任务中也有着出色的表现。下面,就让我们深入探索 SVM 如何在分类和回归…...
用户刷题记录日历——签到表功能实现
MySQL实现 在数据库中设计一张签到表,记录用户每次签到的日期及其他相关信息。然后通过时间范围查询得到用户的签到记录。 CREATE TABLE user_sign_in (id BIGINT AUTO_INCREMENT PRIMARY KEY, -- 主键,自动递增userId BIGINT NOT NULL, …...
C语言中的内存函数
目录 1 memcpy()函数的基本信息及功能(1) void * destination(2) const void * source(3) size_t num 1.2 memcpy()函数实战演练1.3 memcpy()函数的模拟实现1.3.1 my_memcpy()函数定义及参数1.3.2 my_memcp…...
本特利内华达330103-00-03-05-02-05毫米接近传感器
描述 3300 XL 8 mm近程传感器系统包括:一个3300 XL 8 mm探头、一根3300 XL延长电缆1和一个3300 XL近程传感器2。 该系统提供的输出电压与探针尖端和观察到的导电表面之间的距离成正比,可以测量静态(位置)和动态(振动)值。该系统的主要应用是流体膜轴承机器的振动和位…...
啤酒游戏与系统思考
今天,与上海地产集团的伙伴们一同体验经典的系统思考沙盘模拟——“啤酒游戏”。虽然大家身处房地产行业,但也会惊讶地发现,啤酒游戏的核心理念对任何行业都适用,尤其是站在全局的角度,做出精准决策。 每次进行啤酒游戏…...
id分页遍历数据漏行问题
令入参id为0 while(true){ select * from table where id>#{id} order by id asc limit 100; 取结果集中最大id作为下次查询的入参 其他操作 } 这个算法一般没问题,但在主从数据系统中,主库写,查询从库遍历数据时,出现了…...
【Vue3】Vue3工程的创建 及 开发者工具的安装
目录 一、创建Vue3工程的方式 方法一 方法二 二、区分Vue3 和 Vue2的构建 观察main.js vue3不向下兼容,也就是说Vue3不支持Vue2的写法! JavaScript 的模块导入有两种常见写法: 三、安装Vue3的开发者工具 总结不易~本章节对我有很大的…...
docker exec -it abc bash
当然可以!让我们详细解析一下 docker exec -it abc bash 这个命令的各个部分及其作用。 命令概述 docker exec -it abc bash这个命令用于在已经运行的 Docker 容器 abc 中启动一个新的交互式终端会话。具体来说,它会执行容器内的 bash 命令,…...
基于AI大语言模型的历史文献分析在气候与灾害重建中的技术-以海南岛千年台风序列重建为例
随着人工智能技术的飞速发展,大语言模型如GPT、BERT等在自然语言处理领域取得了显著成果。这些模型不仅提高了文本数据的处理和理解效率,还为历史灾害研究提供了全新的视角和方法。本文将深入探讨基于AI大语言模型的历史文献分析在气候与灾害重建领域中的…...
【最细】自动化测试-解决日志问题,一文贯通...
目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 持续集成的自动化…...
PromptIDE:一款强大的AI提示词优化工具
今天向大家推荐一款专业且免费的AI提示词工具——PromptIDE。作为AI领域从业者,我们都深知提示词质量对模型输出的重要性,而这款工具正是为解决这一痛点而生。 核心功能解析 1、提示词优化 简单输入你的需求描述,点击AI生成提示词…...
f-string 高效的字符串格式化
f-string,称为格式化字符串常量(formatted string literals),是Python3.6新引入的一种字符串格式化方法,该方法源于PEP 498 – Literal String Interpolation,主要目的是使格式化字符串的操作更加简便。 p…...
Powershell及命令行文章合集(不定期更新)
一、Powershell: 1.Powershell中常用命令和常用属性:https://blog.csdn.net/humors221/article/details/147978718 2.Powershell数值应用讲解:https://blog.csdn.net/humors221/article/details/142897029 3.PowerShell 抓取网络日志:https://blog.csdn.net/humors221/artic…...
leetcode hot100刷题日记——8.合并区间
class Solution { public:vector<vector<int>> merge(vector<vector<int>>& intervals) {if(intervals.empty()){//复习empty函数啊,日记1有的return {};}// 按照区间的起始位置进行排序sort(intervals.begin(), intervals.end());vect…...
基于moonshot模型的Dify大语言模型应用开发核心场景
基于moonshot模型的Dify大语言模型应用开发核心场景学习总结 一、Dify环境部署 1.Docker环境部署 这里使用vagrant部署,下载vagrant之后,vagrant up登陆,vagrant ssh,在vagrant 中使用 vagrant centos/7 init 快速创建虚拟机 安装…...
系统设计应优先考虑数据流还是控制流?为什么优先考虑数据流?数据流为主、控制流为辅的架构原则是什么?控制流优先会导致哪些问题?
在当代软件工程的复杂演化中,每个现代系统,不论是处理金融交易的平台、智能家居系统,还是自动驾驶系统,都面临同一个核心问题:设计者该以“数据流”为主导,还是以“控制流”为主导? 在系统设计过程中,工程师所面对的核心问题不仅是代码的堆叠与组织,更是信息流动模式…...
Redis Cluster动态扩容:架构原理与核心机制解析
一、哈希槽的数学本质与拓扑重构 核心图示:哈希槽分配演变 #mermaid-svg-YmcBfipoPA8LvxYF {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-YmcBfipoPA8LvxYF .error-icon{fill:#552222;}#mermaid-svg-Y…...
高考之外,英国国际大一:开启本科留学新征程
在竞争激烈的教育赛道上,高考一直被视为通向高等学府的重要关卡。然而,当千军万马挤在国内升学这座独木桥时,越来越多有远见的学生和家长将目光投向海外,英国本科留学凭借其灵活的录取机制和多元的升学路径,成为众多学…...
UML 图的细分类别及其应用
统一建模语言(UML,Unified Modeling Language)是一种用于软件系统建模的标准化语言,广泛应用于软件工程领域。UML 图分为多种类别,每种图都有其特定的用途和特点。本文将详细介绍 UML 图的细分类别,包括 类…...
Android10如何设置ro.debuggable=1?
说明:仅供学习使用,请勿用于非法用途,若有侵权,请联系博主删除 作者:zhu6201976 目录 一、背景 二、如何解决? 三、操作步骤 一、背景 Android 10 开始的限制:ro.debuggable 是只读属性 从 …...
每日算法刷题计划Day12 5.21:leetcode不定长滑动窗口求最短/最长3道题,,用时1h40min(有点长了)
求最短/最小 一般题目都有「至少」的要求。 想窗口成立的条件,right右移增强条件,然后while循环left右移最终破坏条件 模版套路 在while循环内更新答案 class Solution { public:int minSubArrayLen(int target, vector<int>& nums) {int …...
JUC高并发编程
1. JUC概述 1.1 什么是JUC JUC时java.util.concurrent工具包的简称。这是一个处理线程的工具包,JDK1.5开始出现的。 1.2 进程和线程的概念 1.2.1 进程与线程 打开一个软件,就开启了一个进程,一个进程会包括很多个线程,线程是…...
算法--js--电话号码的字母组合
题:给定一个仅包含数字 2-9 的字符串,返回所有它能表示的字母组合。答案可以按 任意顺序 返回。给出数字到字母的映射如下(与电话按键相同)。注意 1 不对应任何字母。 function letterCombinations (digits){if (!digits.length)…...
数据库blog4_数据库软件的设计方法与实际架构
🌿数据库的设计 由上一章可以得出数据库着重关注数据的逻辑结构和存储结构。即这是数据库设计的核心,但详细的设计结构也要研究。以下是介绍 🍂数据库架构思路 ● 数据库本身 数据(Data) 数据:数据库中存储的实际信息,是用户存储…...
Kubernetes中runnable接口的深度解析与应用
在 Kubernetes 或其他 Go 项目中,runnable 接口定义了一个通用的运行契约,允许不同类型的组件通过统一的接口启动和管理生命周期。以下是详细解析: 1. 接口定义分析 type runnable interface {RunWithContext(ctx context.Context) error }关…...
curl: (35) Peer reports incompatible or unsupported protocol version.
这个错误信息表明在使用 curl 命令时遇到了 TLS 协议版本不兼容的问题。这通常是因为 curl 和服务器之间在协商 TLS 协议版本时出现了问题。在 CentOS 7 中,你可以尝试以下解决方案: 一、使用--tlsv1.2选项 尝试在 curl 命令中添加 --tlsv1.2 选项&…...
算法竞赛板子
算法竞赛板子 目录 1. ST表_区间最值_gcd_按位与_按位或 2. 树状数组 3. 快读 4. 带权并查集 5. 欧拉筛 6. 组合数 7. lucas定理求组合数 8. 离散化 9. 线形基 10. 主席树 11. 约瑟夫环 12. tarjan 求静态LCA 13. tarjan 求无向图割点 14. tarjan 求无向图割点后的连通块 15.…...
Vulkan 动态渲染
前言 开发环境:Vulkan 1.3.2 Vulkan SDK VS 2022。语言 C vulkan.hpp。依赖vk-bootstrap,SDL3。 很久以前学Vulkan学得不彻底,写引擎的时候才发现那么困难,于是重新回来巩固一下Vulkan基础。并发现了很多小细节大学问。 动态渲…...
【亲测有效】Ubuntu22.04安装黑屏重启进入系统卡死
一:进入U盘安装引导时黑屏 问题描述:选择 ‘try or install ubuntu’ ,开始安装,出现黑屏。 解决方案: 1.安装时,先选择" try or install ubuntu", 此时不要按enter,按"e&quo…...
wps编辑技巧
1、编辑模式 2、图片提取方法:右键保存图片 可以直接右键保存下来看看是否是原始图,如果歪着的图,可能保存下来是正的,直接保存试下 3、加批注...
磁盘分区与挂载——笔记
1.磁盘分区 磁盘分区是将物理磁盘划分为多个逻辑区域的过程。每个分区可视为独立的存储单元,拥有独立的文件系统,可安装不同操作系统或存放不同类型数据。例如,将硬盘分为系统盘(存放操作系统)、数据盘(存…...
安卓基础(代码解析)
Build.VERSION.SDK_INT > Build.VERSION_CODES.M && !Settings.canDrawOverlays(this) Build.VERSION.SDK_INT > Build.VERSION_CODES.M Build.VERSION.SDK_INT:获取当前Android系统的API版本号,每个Android版本都有一个对应的API版本号…...
基于Android的XX校园交流APP
开发语言:Java框架:ssmAndroidJDK版本:JDK1.8服务器:tomcat7数据库:mysql 5.7数据库工具:Navicat12开发软件:eclipse/myeclipse/ideaMaven包:Maven3.3.9 系统展示 APP登录 APP首页…...