大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(3)
Paimon的下载及安装,并且了解了主键表的引擎以及changelog-producer的含义参考:
- 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(1)
利用Paimon表做lookup join,集成mysql cdc等参考:
- 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(2)
利用Paimon的Tag兼容Hive,Branch管理等参考:
- 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(3)
大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(3)
今天,我们继续快速了解下最近比较火的Apache Paimon:
- 官方文档:https://paimon.apache.org/docs/1.0/
- 推荐阅读:当流计算邂逅数据湖:Paimon 的前生今世
1 利用Tag兼容Hive
- Paimon 的每一次写都会生成一个 Immutable 的快照,快照可以被 Time Travel 的读取。
- 但在大多数情况下,作业会生成过多的快照,所以根据表配置,快照会在合适的时间点被过期。
快照过期还会删除旧的数据文件,过期快照的历史数据将无法再查询。
- 要解决此问题,可以基于快照创建 Tag。Tag 将维护快照的清单和数据文件。
典型的用法是每天创建Tag(如下图所示),然后可以维护每天的历史数据以进行批式查询。
- 推荐在 ODS 层使用 Tag 来替代 Hive 的分区,但是后续的 DWD 和 DWS 不建议。
1.1 Tag创建
1.1.1 自动创建
-- Flink SQL
CREATE TABLE t (k INT PRIMARY KEY NOT ENFORCED,f0 INT,...
) WITH ('tag.automatic-creation' = 'process-time', -- 基于process-time自动创建'tag.creation-period' = 'daily', -- 创建间隔:每天'tag.creation-delay' = '10 m', -- 延迟10min'tag.num-retained-max' = '90' -- 最大保存90天
);
- 上面配置表明每天0点10分钟创建一个 Tag,最大保留3个月的 Tag,Flink 流式写入,自动创建 Tags,自动清理 Tags。
1.1.2 利用Action包创建Tag
<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-1.0.0.jar \create_tag \--warehouse <warehouse-path> \--database <database-name> \ --table <table-name> \--tag_name <tag-name> \[--snapshot <snapshot_id>] \[--time_retained <time-retained>] \[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
- 如果未设置
snapshot
, 那么默认snapshot_id默认为最新 - 当然,也能删除Tag、回滚Tag,可以参考官网命令:
- Manage Tags | Apache Paimon
1.2 利用Tag映射
-
有了 Tag 后,可以在 Flink SQL 或者 Spark SQL 里使用 Time Travel 来查询 Tags;
-
但是这给业务带来了一个问题,老的 Hive SQL 如何兼容?老的 Hive 可是一个全量分区表,而 Paimon 表是一个非分区主键表,Hive 数据仓库的传统使用更习惯于使用分区来指定查询的 Tag。
-
paimon引入了
metastore.tag-to-partition
和metastore.tag-to-partition.preview'
(配置此参数可以让 Hive SQL 查询到未 Tag 的分区,比如当前最新数据) 来将未分区的主键表映射到 Hive metastore 中的分区表,并映射分区字段为 Tag 查询。 -
Flink 结合 Paimon 打造的入湖架构如下:
- 通过 Flink CDC 一键全增量一体入湖到 Paimon,此任务可以配置 Tag 的自动创建,然后通过 Paimon 的能力,将 Tag 映射为 Hive 的分区,完全兼容原有 Hive SQL 的用法。
- 优势如下:
- 架构链路复杂度低,不再因为各种组件的问题导致链路延时,你只用运维这一个流作业,而且可以完全兼容原有 Hive SQL 用法。
- 时延低:延时取决于流作业的 Checkpoint Interval,
数据最低1分钟实时可见 (建议1-5分钟)
。不但如此,Paimon 也提供了流读的能力,让你完成分钟级的 Streaming 计算,也可以写到下游别的存储。 - 存储成本低:得益于湖格式的 Snapshot 管理,加上 LSM 的文件复用,比如同样是存储 100天的快照,原有 Hive 数仓 100 天需要 100 份的存储,Paimon 在某些增量数据不多的场景只需要 2 份的存储,大幅节省存储资源。
- 计算成本低:得益于 LSM 的增量合并能力,此条链路只有增量数据的处理,没有全量的合并。可能有用户会担心,常驻的流作业会消耗更多的资源,对 Paimon 来说,你可以打开纯异步 Compaction 的机制,以 Paimon 优异的性能表现,只用少量的资源即可完成同步,Paimon 另有整库同步等能力帮助节省资源。
1.2.1 tag-to-partition
-- 创建映射的paimon表
Flink SQL> drop table if exists mydb_t;
Flink SQL> CREATE TABLE mydb_t (pk INT,col1 STRING,col2 STRING
) WITH ('bucket' = '-1',-- Only Hive Engine can be used to query these upsert-to-partitioned tables.-- 将tag映射为hive分区'metastore.tag-to-partition' = 'dt'
);-- 插入数据
-- snapshot=1
Flink SQL> INSERT INTO mydb_t VALUES (1, '10', '100'), (2, '20', '200');
-- snapshot=2
Flink SQL> INSERT INTO mydb_t VALUES (3, '30', '300'), (4, '40', '400');
- 然后,利用action包创建Tag
# 利用action 包创建tag\
# 依旧利用hive元数据做catalog
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \create_tag \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table mydb_t \--tag_name '2025-02-18' \--catalog-conf metastore=hive \--catalog-conf uri=thrift://centos01:9083 \--snapshot 1[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \create_tag \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table mydb_t \--tag_name '2025-02-19' \--catalog-conf metastore=hive \--catalog-conf uri=thrift://centos01:9083 \--snapshot 2
- 我们就可以在hive中查看分区,且查询数据
0: jdbc:hive2://192.168.42.101:10000> show partitions mydb_t;
+----------------+
| partition |
+----------------+
| dt=2025-02-18 |
| dt=2025-02-19 |
+----------------+
2 rows selected (0.438 seconds)0: jdbc:hive2://192.168.42.101:10000> select * from mydb_t a where dt = '2025-02-18';
+-------+---------+---------+-------------+
| a.pk | a.col1 | a.col2 | a.dt |
+-------+---------+---------+-------------+
| 1 | 10 | 100 | 2025-02-18 |
| 2 | 20 | 200 | 2025-02-18 |
+-------+---------+---------+-------------+
2 rows selected (3.27 seconds)0: jdbc:hive2://192.168.42.101:10000> select * from mydb_t a where dt = '2025-02-19';
+-------+---------+---------+-------------+
| a.pk | a.col1 | a.col2 | a.dt |
+-------+---------+---------+-------------+
| 1 | 10 | 100 | 2025-02-19 |
| 2 | 20 | 200 | 2025-02-19 |
| 3 | 30 | 300 | 2025-02-19 |
| 4 | 40 | 400 | 2025-02-19 |
+-------+---------+---------+-------------+
1.2.2 tag-to-partition.preview
- 上述示例只能查询已经创建的tag,但Paimon是一个实时数据湖,您还需要查询最新的数据。因此,Paimon提供了一个预览功能
'metastore.tag-to-partition.preview'
可选值如下:- “none”:不自动创建标签;
- “process-time”:基于机器时间,当处理时间超过周期时间加上延迟时,创建标签;
- “watermark”:基于输入的watermark,当watermark超过周期时间加上延迟时,创建标签;
- “batch”:在批处理场景中,任务完成后生成当前快照对应的标签。
Flink SQL> drop table if exists mydb_preview;
Flink SQL> CREATE TABLE mydb_preview (pk INT,col1 STRING,col2 STRING
) WITH ('bucket' = '-1','metastore.tag-to-partition' = 'dt',-- paimon会基于process-time提前创建partitions'metastore.tag-to-partition.preview' = 'process-time'
);-- snapshot=1
Flink SQL> INSERT INTO mydb_preview VALUES (1, '10', '100'), (2, '20', '200');-- create tag '2025-02-19' for snapshot 1
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \create_tag \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table mydb_preview \--tag_name '2025-02-19' \--catalog-conf metastore=hive \--catalog-conf uri=thrift://centos01:9083 \--snapshot 10: jdbc:hive2://192.168.42.101:10000> show partitions mydb_preview;;
+----------------+
| partition |
+----------------+
| dt=2025-02-19 |
| dt=2025-02-20 |
+----------------+
2 rows selected (0.085 seconds)
0: jdbc:hive2://192.168.42.101:10000> select * from mydb_preview a where dt = '2025-02-19';
+-------+---------+---------+-------------+
| a.pk | a.col1 | a.col2 | a.dt |
+-------+---------+---------+-------------+
| 1 | 10 | 100 | 2025-02-19 |
| 2 | 20 | 200 | 2025-02-19 |
+-------+---------+---------+-------------+
2 rows selected (0.292 seconds)
0: jdbc:hive2://192.168.42.101:10000> select * from mydb_preview a where dt = '2025-02-20';
+-------+---------+---------+-------------+
| a.pk | a.col1 | a.col2 | a.dt |
+-------+---------+---------+-------------+
| 1 | 10 | 100 | 2025-02-20 |
| 2 | 20 | 200 | 2025-02-20 |
+-------+---------+---------+-------------+
2 rows selected (0.263 seconds)-- new data in '2025-02-20'
Flink SQL> INSERT INTO mydb_preview VALUES (3, '30', '300'), (4, '40', '400');0: jdbc:hive2://192.168.42.101:10000> select * from mydb_preview a where dt = '2025-02-20';
+-------+---------+---------+-------------+
| a.pk | a.col1 | a.col2 | a.dt |
+-------+---------+---------+-------------+
| 1 | 10 | 100 | 2025-02-20 |
| 2 | 20 | 200 | 2025-02-20 |
| 3 | 30 | 300 | 2025-02-20 |
| 4 | 40 | 400 | 2025-02-20 |
+-------+---------+---------+-------------+
2 Branch分支管理
- 在流式数据处理中,修正数据具有挑战性,因为它可能会影响现有数据,而用户会看到流式的临时结果,这是不期望的。
- 我们假设现有工作流正在处理的分支是
main分支
。通过创建自定义数据分支,可以在现有表上对新任务进行实验性测试和数据验证,而无需停止现有的读取/写入工作流,也无需从主分支复制数据。 - 通过合并或替换分支操作,用户可以完成数据的修正。
-- 1、创建paimon表
Flink SQL> drop table if exists flink_branch_demo;
Flink SQL> CREATE TABLE flink_branch_demo (dt STRING NOT NULL,name STRING NOT NULL,amount BIGINT,PRIMARY KEY (dt, name) NOT ENFORCED
) PARTITIONED BY (dt)
WITH ('connector' = 'paimon'
);-- 2、创建一个专门用于流写的分支streambranch, 这个分支将负责接收实时流入的数据。
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \create_branch \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table flink_branch_demo \--branch_name streambranch \--catalog-conf metastore=hive \--catalog-conf uri=thrift://centos01:9083-- 3、设置流写分支的属性
Flink SQL> ALTER TABLE `flink_branch_demo$branch_streambranch` SET ('bucket' = '4','changelog-producer' = 'lookup'
);-- 4、设置回滚分支(如果要实现分支回滚必须要设置该参数)
Flink SQL> ALTER TABLE flink_branch_demo SET ( 'scan.fallback-branch' = 'streambranch' );-- 5、写入数据
-- 5-1、主分支写入数据
Flink SQL> insert into flink_branch_demo values ('20240725', 'apple', 3), ('20240725', 'banana', 5);Flink SQL> select * from flink_branch_demo;
+----------+--------+--------+
| dt | name | amount |
+----------+--------+--------+
| 20240725 | apple | 3 |
| 20240725 | banana | 5 |
+----------+--------+--------+
2 rows in set-- 5-2、再往streambranch分支写入数据
Flink SQL> INSERT INTO `flink_branch_demo$branch_streambranch`
VALUES ('20240725', 'apple', 666), ('20240725', 'peach', 999), ('20240726', 'cherry', 33), ('20240726', 'pear', 88);-- 5-3、查询主分支
-- 20240725分区的新的数据没有生效! 那说明原表已经有的分区的数据,在streambranch写入这些分区的数据,原表是不会更新的,只要是往原表里面写了某个分区的数据,那么这个分区的数据以写入原表主分支的为准。
-- 原表主分支没有的分区的数据,则按照streambranch读取,因为设置了原表的 'scan.fallback-branch' = 'streambranch' ,读取原表可以查到streambranch这部分的数据。
Flink SQL> select * from flink_branch_demo;
+----------+--------+--------+
| dt | name | amount |
+----------+--------+--------+
| 20240726 | cherry | 33 | -- 26号分区主表没有,使用了分支表中的数据
| 20240726 | pear | 88 |
| 20240725 | apple | 3 | -- 25号的分区使用了主表中的数据
| 20240725 | banana | 5 |
+----------+--------+--------+-- 5-4、查询流分支
Flink SQL> select * from `flink_branch_demo$branch_streambranch` ;
+----------+--------+--------+
| dt | name | amount |
+----------+--------+--------+
| 20240726 | cherry | 33 |
| 20240726 | pear | 88 |
| 20240725 | apple | 666 |
| 20240725 | peach | 999 |
+----------+--------+--------+
4 rows in set-- 6、合并分支
-- 合并分支表操作(Fast Forward),即:删除主表的一切数据,并将分支表的一切数据拷贝到主表
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \fast_forward \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table flink_branch_demo \--branch_name streambranch \--catalog-conf metastore=hive \--catalog-conf uri=thrift://centos01:9083-- 再次查询主表
Flink SQL> select * from flink_branch_demo;
+----------+--------+--------+
| dt | name | amount |
+----------+--------+--------+
| 20240726 | cherry | 33 |
| 20240726 | pear | 88 |
| 20240725 | apple | 666 |
| 20240725 | peach | 999 |
+----------+--------+--------+-- 7、数据回归到主分支版本(注意:不进行上面合并操作)
Flink SQL> ALTER TABLE flink_branch_demo RESET( 'scan.fallback-branch');
3 追加表(Append table)
- 如果一个表没有定义主键,那它就是一个追加表(
Append Table
)。与主键表相比,追加表无法直接接收变更日志,也不能直接通过 upsert 更新数据,只能接收追加数据。
使用场景或优势 | 说明 |
---|---|
批量写入和批量读取 | 类似于常规的 Hive 分区表,适用于大规模数据的批量处理。 |
友好的对象存储 | 良好的兼容性和适应性,支持 S3、OSS 等对象存储。 |
时间穿越和回滚 | 支持数据的时间旅行和回滚功能,方便数据的历史查询和恢复。 |
低成本的删除和更新 | 在批量数据操作中,能够以较低的计算和资源成本进行删除和更新操作。 |
流式接收中的小文件自动合并 | 在流式写入过程中,自动处理小文件合并,减少存储碎片。 |
队列形式的流式读写 | 支持如队列般的流式读写操作,可以像消息队列一样处理数据。 |
高性能查询 | 通过顺序和索引实现的高效查询性能。 |
3.1 流式处理
- Append Table可以通过 Flink 进行非常灵活的流式写入,并可以像队列一样通过 Flink 进行读取。
- 唯一的区别是其延迟为分钟级别,但其优势在于非常低的成本以及能够进行过滤和投影下推。
3.1.1 小文件自动合并
- 在流式写入作业中,如果没有定义分桶(bucket),写入器不会进行压缩;
- 相反,将使用压缩协调器(Compact Coordinator)扫描小文件并将压缩任务传递给压缩工作者(Compact Worker)。
- 流式模式下,如果在 Flink 中运行插入 SQL,拓扑结构将如下所示:
- 注意:
- 上面的压缩任务不会引起反压。
- 如果设置 write-only 为 true,压缩协调器(Compact Coordinator)和压缩工作者(Compact Worker)将在拓扑中被移除。
- 自动压缩仅在 Flink 引擎的流模式下被支持。可以通过 Paimon 在 Flink 中启动压缩作业,并通过设置 write-only 禁用所有其他压缩。
3.1.2 流式查询
- 追加表可以像消息队列一样使用,进行流式查询,与主键表类似,有两个选项可以进行流式读取:
- 默认模式:流式读取在首次启动时生成表的最新快照,并继续读取最新的增量记录。
- 增量模式:可以指定 scan.mode 或 scan.snapshot-id 或 scan.timestamp-millis 或 scan.file-creation-time-millis 进行增量读取。
- 追加表的流式查询类似 Flink-Kafka,默认情况下不保证顺序。如果数据需要某种顺序,也需要考虑定义桶键(bucket-key),即
Bucketed Append
。
3.2 查询优化
3.2.1 按照顺序跳过查询
- Paimon 默认在清单文件中记录每个字段的最大值和最小值。
- 在查询时,根据查询的 WHERE 条件,通过清单中的统计信息进行文件过滤。如果过滤效果良好,查询时间可以从分钟级别加速到毫秒级别。
- 然而,数据分布并不总是能有效过滤,因此如果可以根据 WHERE 条件中的字段对数据进行排序,将会更高效。
- 具体可参考:Flink COMPACT Action or Flink COMPACT Procedure or Spark COMPACT Procedure.
3.2.2 按文件索引跳过数据
-
如下代码所示,可以使用文件索引,会在读取端通过索引过滤文件
- 定义
file-index.bloom-filter.columns
后,Paimon 将为每个文件创建相应的索引文件。 - 如果索引文件太小,它将直接存储在清单中,否则将存储在数据文件的目录中。
- 每个数据文件对应一个索引文件,该文件有独立的定义,可以包含不同类型的多列索引。
CREATE TABLE my_table (product_id BIGINT,price DOUBLE,sales BIGINT ) WITH ('file-index.bloom-filter.columns' = 'product_id','file-index.bloom-filter.product_id.items' = '200' );
- 定义
-
索引种类如下所示:
# 布隆过滤器索引 file-index.bloom-filter.columns:指定需要创建布隆过滤器索引的列。 file-index.bloom-filter.<column_name>.fpp:配置布隆过滤器的误报率(False Positive Probability)。 file-index.bloom-filter.<column_name>.items:配置每个数据文件中预期的唯一项数量。# Bitmap(位图索引): file-index.bitmap.columns:指定需要创建位图索引的列。# Bit-Slice Index Bitmap(位切片索引位图): file-index.bsi.columns:指定需要创建位切片索引(BSI)的列。如果想为现有表添加文件索引,且不进行任何数据重写,可以使用rewrite_file_index过程。 在使用该过程之前,可以使用ALTER子句来为表配置file-index.<filter-type>.columns。 可以参考: https://paimon.apache.org/docs/1.0/flink/procedures/#procedures
- 布隆过滤器索引和位图索引的区别
特性 布隆过滤器索引(Bloom Filter Index) 位图索引(Bitmap Index) 设计目标 快速判断某个值是否可能存在,减少磁盘 I/O 精确查询低基数列,支持多条件组合查询 实现原理 基于哈希函数的概率型数据结构 基于Bitmap的精确索引结构 适用数据类型 高基数列(如唯一 ID、字符串等) 低基数列(如性别、状态等) 查询类型 等值查询( =
)等值查询( =
)和多条件组合查询(AND
、OR
)存储效率 存储空间小,适合大规模数据集 低基数列存储效率高,高基数列存储开销大 查询效率 查询速度快,但存在误报率 查询效率高,无误报率 更新代价 较低 较高 适用场景 大数据集的快速过滤 低基数列的精确查询和多条件组合查询 - 位图索引和位切片索引的区别
特性 Bitmap Index(位图索引) Bit-Slice Index (BSI)(位切片索引) 适用数据类型 低基数(即列中唯一值的数量较少)的任意类型(如枚举、状态等) 高基数的数值型数据(如金额、时间戳等) 查询类型 等值查询、范围查询 范围查询、聚合查询(如 SUM
、MAX
等)存储效率 低基数列高效,高基数列存储开销大 高基数列存储效率高 实现复杂度 简单 复杂 更新代价 较高 较高
3.3 Bucketed Append
- 可以指定 bucket 和 bucket-key 以创建一个Bucketed Append表。
- 在Bucketed Append中,不同桶内的数据是严格有序的,流式读取将按写入顺序准确地传输记录。这样可以优化数据处理和查询性能。
-- 创建Bucketed Append表
CREATE TABLE my_table (product_id BIGINT,price DOUBLE,sales BIGINT
) WITH ('bucket' = '8','bucket-key' = 'product_id'
);
3.3.1 有界流
- 流式来源(Streaming Source)也可以是有界的,可以通过指定 scan.bounded.watermark 来定义有界流模式的结束条件。
- 例如,指定kafka源并声明watermark 的定义。当使用此kafka源写入Paimon表时,Paimon表的快照将生成相应的watermark,以便流式读取此Paimon表时可以使用有界watermark的功能。
-- 临时表
drop TEMPORARY table if exists order_from_kafka;
CREATE TEMPORARY TABLE order_from_kafka (`user` int,product STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '8' HOUR - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'orders_test','properties.bootstrap.servers' = 'localhost:9092','format' = 'json','properties.group.id' = 'tGroup','json.fail-on-missing-field' = 'false','scan.startup.mode' = 'earliest-offset','json.ignore-parse-errors' = 'true'
);-- 创建topic
/opt/apps/kafka_2.12-2.6.2/bin/kafka-topics.sh --create --topic orders_test --replication-factor 1 --partitions 1 --bootstrap-server centos01:9092 -- paimon追加表
drop table if exists paimon_r;
CREATE TABLE paimon_r (`user` int,product STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '8' HOUR - INTERVAL '5' SECOND
) WITH ('connector' = 'paimon'
);-- 将Kafka表中的数据实时插入到Paimon表中:
INSERT INTO paimon_r SELECT * FROM order_from_kafka;-- 启动有界流任务读取 Paimon 表
-- 1696126500000 2023-10-01 10:15:00-- 当Flink处理过程中遇到第一个水印值大于或等于这个时间点的记录时,
-- 它会停止继续读取后续的数据,即使数据源中还有更晚时间点的数据。
Flink SQL> SELECT * FROM paimon_r /*+ OPTIONS('scan.bounded.watermark'='1696126500000') */;
+----+-------------+--------------------------------+-------------------------+
| op | user | product | order_time |
+----+-------------+--------------------------------+-------------------------+
| +I | 1001 | iPhone 15 | 2023-10-01 10:00:00.000 |
| +I | 1002 | MacBook Pro | 2023-10-01 10:05:00.000 |
| +I | 1003 | AirPods Pro | 2023-10-01 10:10:00.000 |
| +I | 1004 | iPad Air | 2023-10-01 10:15:00.000 |
+----+-------------+--------------------------------+-------------------------+
Received a total of 4 rows-- 启动命令行生产者,模拟数据源源源不断地生产数据(每隔一段时间插入1条数据)
/opt/apps/kafka_2.12-2.6.2/bin/kafka-console-producer.sh --topic orders_test --bootstrap-server centos01:9092
{"user": 1001, "product": "iPhone 15", "order_time": "2023-10-01 10:00:00"}
{"user": 1002, "product": "MacBook Pro", "order_time": "2023-10-01 10:05:00"}
{"user": 1003, "product": "AirPods Pro", "order_time": "2023-10-01 10:10:00"}
-- "2023-10-01 10:15:00" 时候watermark是1696126495000,即:2023-10-01 10:14:55
-- 此时有界流并未结束
{"user": 1004, "product": "iPad Air", "order_time": "2023-10-01 10:15:00"}
-- "2023-10-01 10:20:00" 时候watermark是1696126795000
-- 即:2023-10-01 10:19:55 > 2023-10-01 10:15:00(1696126500000)
-- 停止继续读取后续的数据,即使数据源中还有更晚时间点的数据
{"user": 1005, "product": "Apple Watch", "order_time": "2023-10-01 10:20:00"}
{"user": 1006, "product": "Apple Watch", "order_time": "2023-10-02 08:00:00"}
{"user": 1007, "product": "Apple Watch", "order_time": "2023-10-03 08:20:00"}
{"user": 1008, "product": "Apple Watch", "order_time": "2024-10-03 08:20:00"}
{"user": 1009, "product": "Apple Watch", "order_time": "2025-10-03 08:20:00"}
3.3.2 批处理(Batch)
- 通过设置
spark.sql.sources.v2.bucketing.enabled
为 true,Spark 将识别 V2 数据源报告的特定分布,并在必要时尝试避免shuffle。 - 如下代码所示,如果两个表具有相同的分桶策略和相同数量的桶,昂贵的 join shuffle 操作将被避免。
-- 在必要时尝试避免shuffle
SET spark.sql.sources.v2.bucketing.enabled = true;-- 事实表
CREATE TABLE FACT_TABLE (order_id INT, f1 STRING
) TBLPROPERTIES ('bucket'='10', 'bucket-key' = 'order_id');-- 维度表
CREATE TABLE DIM_TABLE (order_id INT, f2 STRING
) TBLPROPERTIES ('bucket'='10', 'primary-key' = 'order_id');SELECT *
FROM FACT_TABLE
JOIN DIM_TABLE
ON FACT_TABLE.order_id = DIM_TABLE.order_id;
注:
-
Paimon还有其他功能,这里就不再介绍,可以参考官网自行了解。例如:
-
Paimon 在 Flink 1.17 及后续版本中支持使用 UPDATE 更新主键表记录、使用DELETE删除change-log数据;
-
流式读取表时指定consumer-id,防止快照因为过期而被删除;
-
paimon提供了包含有关每个表的元数据和信息的系统表,例如创建的快照和使用的选项。用户可以通过批量查询访问系统表。
-- 快照表 Snapshots Table SELECT * FROM ws_t$snapshots;-- 模式表 Schemas Table SELECT * FROM ws_t$schemas;-- 选项表 Options Table SELECT * FROM ws_t$options;-- 标签表 Tags Table SELECT * FROM ws_t$tags;-- 审计日志表 Audit log Table SELECT * FROM ws_t$audit_log; ......
-
可以集成其他引擎,如spark引擎等
-
Paimon表支持分区过期配置
-
缩放Bucket官方示例
- Rescale Bucket | Apache Paimon
-
相关文章:
大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(3)
Paimon的下载及安装,并且了解了主键表的引擎以及changelog-producer的含义参考: 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(1) 利用Paimon表做lookup join,集成mysql cdc等参考: 大数据组件(四)快速入门实时数据…...
【论文解读】《Training Large Language Models to Reason in a Continuous Latent Space》
论文链接 1. 背景与动机 语言空间与推理的矛盾 目前大多数大语言模型(LLMs)在解决复杂问题时采用链式思维(Chain-of-Thought, CoT)方法,即利用自然语言逐步推导出答案。然而,论文指出: 自然语言…...
Linux-CentOS 7安装
Centos 7镜像:https://pan.baidu.com/s/1fkQHYT64RMFRGLZy1xnSWw 提取码: q2w2 VMware Workstation:https://pan.baidu.com/s/1JnRcDBIIOWGf6FnGY_0LgA 提取码: w2e2 1、打开vmware workstation 2、选择主界面的"创建新的虚拟机"或者点击左上…...
【Web RCE 漏洞常见类型】
Web RCE 漏洞常见类型 1. 注入类漏洞2. 反序列化漏洞3. 文件处理漏洞4. 模板引擎漏洞5. 服务端请求伪造(SSRF)6. 框架/中间件漏洞7. 第三方组件漏洞8. 配置不当与协议滥用9. 其他边缘场景防御建议 以下是可以导致远程代码执行(RCE)…...
【蓝桥杯单片机】第十三届省赛第二场
一、真题 二、模块构建 1.编写初始化函数(init.c) void Cls_Peripheral(void); 关闭led led对应的锁存器由Y4C控制关闭蜂鸣器和继电器 2.编写LED函数(led.c) void Led_Disp(unsigned char ucLed); 将ucLed取反的值赋给P0 开启锁存器 关闭锁存…...
【够用就好006】-PC桌面管理ECS服务器的实操步骤
背景介绍解决思路拓展知识 背景介绍 #够用就好#知其然知其所以然#aigc创意人左边 我计划搭建个人网站,计划格式化我的ECS服务器,但是里面有我之前的实践项目,我舍不得删除,我想要保存到本地。 通常我都是在vscode中用remotes ssh…...
Spring Boot 2/3.x 中 MultipartFile 接收问题深度解析与实战解决方案
文章目录 引言:文件上传的暗礁与应对一、核心机制解析1.1 多部分请求处理流程1.2 关键配置参数演进 二、典型问题排查与修复2.1 文件接收为null问题2.2 大文件上传内存溢出 三、版本差异陷阱3.1 Jakarta Servlet API迁移影响3.2 默认配置变更对比 四、高级问题解决方…...
MySQL的三种并发问题和四种隔离级别
阅读之前,请心里默念,脏读、不可重复读、幻读是三种常见的并发问题,隔离级别是应对并发问题的四种隔离级别,隔离级别和并发问题是两个东西,不要混淆。 在数据库事务中,脏读(Dirty Readÿ…...
【复习】Redis
数据结构 Redis常见的数据结构 String:缓存对象Hash:缓存对象、购物车List:消息队列Set:点赞、共同关注ZSet:排序 Zset底层? Zset底层的数据结构是由压缩链表或跳表实现的 如果有序集合的元素 < 12…...
【Docker】如何在Linux、Windows、MacOS中安装Docker
Linux安装Docker 在终端中执行一键安装脚本命令安装dockersudo curl -fsSL https://gitee.com/tech-shrimp/docker_installer/releases/download/latest/linux.sh | bash -s docker --mirror Aliyun1.1 配置docker镜像源 在终端执行 一行命令,编辑配置文件sudo tee /etc/docke…...
Linux System V - 消息队列与责任链模式
概念 消息队列是一种以消息为单位的进程间通信机制,允许一个或多个进程向队列中发送消息,同时允许一个或多个进程从队列中接收消息。消息队列由内核维护,具有以下特点: 异步通信:发送方和接收方不需要同时运行&#x…...
k2路由器登录校园网
教程1刷入Breed,并手动刷入Padavan固件:斐讯K1、K2、K2P 刷机、刷入Breed 辅助工具 | tb (tbvv.net) Padavan下载网址: 我用的是: Padavan 登录的网址是 192.168.123.1 Padavan配置教程: 先用网线连上校园网&#…...
Docker基础实践与应用举例
Docker 是一个轻量级容器化平台,通过将应用及其依赖打包到容器中,实现快速部署和环境一致性。以下是 Docker 的实践与应用场景举例,结合具体操作步骤: 一、基础实践 1. 快速启动一个容器 # 运行一个Nginx容器,映射宿…...
EndNote与Word关联:科研写作的高效助力
在科研领域,文献管理与论文写作是紧密相连的重要环节。EndNote作为一款强大的文献管理工具,与Word实现有效关联后,能极大地提升科研写作效率。本文将详细介绍EndNote与Word关联的方法、关联后的优势、常见问题及解决办法,助力科研…...
用PyTorch从零构建 DeepSeek R1:模型架构和分步训练详解
DeepSeek R1 的完整训练流程核心在于,在其基础模型 DeepSeek V3 之上,运用了多种强化学习策略。 本文将从一个可本地运行的基础模型起步,并参照其技术报告,完全从零开始构建 DeepSeek R1,理论结合实践,逐步…...
SOME/IP-SD -- 协议英文原文讲解2
前言 SOME/IP协议越来越多的用于汽车电子行业中,关于协议详细完全的中文资料却没有,所以我将结合工作经验并对照英文原版协议做一系列的文章。基本分三大块: 1. SOME/IP协议讲解 2. SOME/IP-SD协议讲解 3. python/C举例调试讲解 5.1.2.2 S…...
Matlab——图像保存导出成好看的.pdf格式文件
点击图像的右上角,点击第一个保存按钮键。...
Mybatis常用动态 SQL 相关标签
1. <if> 用于条件判断,当满足条件时执行对应的 SQL 片段。 示例: <select id"findUser" resultType"User">SELECT * FROM usersWHERE 11<if test"name ! null and name ! ">AND name #{name}</if><if…...
计算机网络与通讯知识总结
计算机网络与通讯知识总结 基础知识总结 1)FTP:文件传输 SSH:远程登录 HTTP:网址访问 2)交换机 定义:一种基于MAC地址实现局域网(LAN)内数据高速转发的网络设备,可为接入设备提供独享通信通道。 - 核心功能: 1.数据链路层(OSI第二层)工作,通过MAC地址…...
Redis 通用命令
Redis 通用命令 文章目录 Redis 通用命令 1. 启动redis 1.1 前台启动1.2 后台启动1.3 开机自启 2.Redis命令行客户端3. 常见命令 3.1 help3.2 KEYS3.3 DEL3.4 EXISTS3.5 EXPIRE&TTL 1. 启动redis 1.1 前台启动 在安装好redis后,我们可以在任意目录输入以…...
【idea问题排查技巧】
以下是针对 IDEA 中 日志打标(动态标记) 和 全链路追踪 功能的分步详解,结合具体场景和操作截图说明,帮助快速掌握实战技巧。 一、动态日志打标:不修改代码输出关键信息 1. 断点日志打印(非侵入式打标) 场景:在调试时,需要临时查看某个变量的值,但不想修改代码添加…...
VSCode自定义快捷键和添加自定义快捷键按键到状态栏
VSCode自定义快捷键和添加自定义快捷键按键到状态栏 📄在VSCode中想实现快捷键方式执行与某些指令操作进行绑定,可以通过配置组合式的键盘按键映射来实现,另外一种方式就是将执行某些特定的指令嵌入在面板菜单上,在想要执行的时候…...
【Redis 原理】通信协议 内存回收
文章目录 通信协议--RESP内存回收内存过期策略惰性删除周期删除 内存淘汰策略 通信协议–RESP Redis是一个CS架构的软件,通信一般分两步(不包括pipeline和PubSub): 客户端(client)向服务端(se…...
AWS - Redshift - 外部表读取 Parquet 文件中 timestamp 类型的数据
问题: 通过 Redshift Spectrum 功能可以读取 S3 中的文件,当读取 Parquet 文件时,如果列格式设置为 timestamp, 通过 psql 客户端读取会出现以下错误: testdb# select * from myspectrum_schema_0219.test_ns; ERROR…...
H5--开发适配
在 H5 开发中,适配不同设备和屏幕尺寸至关重要,它能确保页面在各种环境下都有良好的显示效果和用户体验。以下介绍几种常见的 H5 开发适配方案: 视口(Viewport)设置 视口单位是相对于浏览器视口的尺寸进行度量的单位&…...
llama-factory部署微调方法(wsl-Ubuntu Windows)
llama-factory项目GitHub地址:GitHub - hiyouga/LLaMA-Factory: Unified Efficient Fine-Tuning of 100 LLMs & VLMs (ACL 2024) wsl-Ubuntu: 1.获取项目 git clone https://github.com/hiyouga/LLaMA-Factory.gitcd LLaMA-Factory/ 2.安装环境…...
【Unity】鱼群效果模拟
鱼群效果模拟 文章目录 鱼群效果模拟Boid算法实现方式version1_CPUversion2_GPUversion3_Multilaterationversion4_Bitonic_Sorting (GPU友好)version5_Skinning (TODO) 细节项优化项参考链接 Boid算法 Boid算法是一种模拟群体行…...
C++ 编程语言简介
C 是一种通用编程语言,它是作为 C 语言的增强而开发的,以包含面向对象的范例。它是一种命令式和编译语言。 C 是一种高级的通用编程语言,专为系统和应用程序编程而设计。它由贝尔实验室的 Bjarne Stroustrup 于 1983 年开发,作为…...
Day15-后端Web实战-登录认证——会话技术JWT令牌过滤器拦截器
目录 登录认证1. 登录功能1.1 需求1.2 接口文档1.3 思路分析1.4 功能开发1.5 测试 2. 登录校验2.1 问题分析2.2 会话技术2.2.1 会话技术介绍2.2.2 会话跟踪方案2.2.2.1 方案一 - Cookie2.2.2.2 方案二 - Session2.2.2.3 方案三 - 令牌技术 2.3 JWT令牌2.3.1 介绍2.3.2 生成和校…...
迪威模型:引领 3D 模型轻量化技术革新
在数字化时代,3D 模型的应用领域愈发广泛,从影视制作、游戏开发到工业设计、建筑仿真等,都离不开 3D 模型的支持。然而,随着模型复杂度的不断提高,文件体积也日益庞大,这给存储、传输和加载带来了极大的挑战…...
大学本科教务系统设计方案,涵盖需求分析、架构设计、核心模块和技术实现要点
以下是大学本科教务系统的设计方案,涵盖需求分析、架构设计、核心模块和技术实现要点: 大学本科教务系统设计方案 一、需求分析 1. 核心用户角色 角色功能需求学生选课/退课、成绩查询、课表查看、学分统计、考试报名、学业预警教师成绩录入、课程大纲上传、教学进度管理、…...
安装Liunx(CentOS-6-x86_64)系统
一:下载与安装Liunx(CentOS-7-x86_64) 1.下载: CentOS-6.10-x86_64-bin-DVD1.iso 2.安装: 按照自己的需求来 下载的镜像文件地址 加载完成后设置 查看网络和本地ip 3.配置仓库(用于yum下载࿰…...
DeepSeek开源周 Day01:从FlashMLA背后原理回顾KV Cache
FlashMLA 今天DeepSeek开源周第一天,开放了FlashMLA仓库,1小时内星标1.6k! FlashMLA 是一个高效的 MLA 解码内核,专为 Hopper GPU 优化,适用于可变长度序列。该项目目前发布了 BF16 和具有 64 块大小分页 kvcache 的功…...
java23种设计模式-工厂方法模式
工厂方法模式(Factory Method Pattern)学习笔记 🌟 定义 工厂方法模式属于创建型设计模式,定义一个创建对象的接口,但让子类决定实例化哪一个类。将类的实例化操作延迟到子类,是面向对象设计中"开闭…...
数据驱动未来!天合光能与永洪科技携手开启数字化新篇章
在信息化时代的今天,企业间的竞争早就超越了传统产品与服务的范畴,新的核心竞争力即——数据处理能力和信息技术的应用。作为数据技术领域的领军者,永洪科技凭借其深厚的技术积累和丰富的行业经验,成功助力天合光能实现数字化升级…...
【C++设计模式】工厂方法设计模式:深入解析从基础到进阶
1. 引言 在软件开发的世界里,设计模式如同巧妙的建筑蓝图,为解决常见问题提供了行之有效的方案。工厂方法模式作为一种广受欢迎的创建型设计模式,以其独特的优势在众多项目中得到广泛应用。它不仅能够为对象的创建提供通用且灵活的方式,还能有效隐藏实现细节,提升代码的可…...
Vue 3 + Vite 项目中配置代理解决开发环境中跨域请求问题
在 Vue 3 Vite 项目中,配置代理是解决开发环境中跨域请求问题的常见方法。通过在 Vite 的配置文件中设置代理,可以将前端请求转发到后端服务器,从而避免浏览器的同源策略限制。 1. 创建 Vue 3 Vite 项目 首先,确保你已经安装了…...
2.3 变量
版权声明:本文为博主原创文章,转载请在显著位置标明本文出处以及作者网名,未经作者允许不得用于商业目的。 变量是用来存放某个值的数据,它可以表示一个数字、一个字符串、一个结构、一个类等。变量包含名称、类型和值。在代码中…...
16、Python面试题解析:python中的浅拷贝和深拷贝
在 Python 中,浅拷贝(Shallow Copy) 和 深拷贝(Deep Copy) 是处理对象复制的两种重要机制,它们的区别主要体现在对嵌套对象的处理方式上。以下是详细解析: 1. 浅拷贝(Shallow Copy&a…...
BUUCTF-Web方向21-25wp
目录 [HCTF 2018]admin弱口令session伪造 [MRCTF2020]你传你🐎呢[护网杯 2018]easy_tornado[ZJCTF 2019]NiZhuanSiWei[MRCTF2020]Ez_bypass第一层第二层 [HCTF 2018]admin 打开环境,有三处提示,一个跳转链接,一个登录注册&#x…...
elementPlus 中表单验证方法(手机号、正整数、邮箱)
1、手机号验证 <el-form ref"formRef" :model"form" :rules"rule" label-width"100px"><el-form-item label"联系电话" prop"mobile"><el-input type"tel" v-model"form.mobile&q…...
阿里云 ACS:高效、弹性、低成本的容器计算解决方案
阿里云的 容器计算服务(Alibaba Cloud Container Service, ACS) 是一种 Serverless 容器计算 解决方案,提供高度弹性、低成本、易管理的 Kubernetes(K8s)容器运行环境。用户无需关注底层服务器资源,而是直接…...
启动Redis报错记录
突然启动Redis就报了个错:‘Could not create server TCP listening socket 127.0.0.1:6379: bind: 操作成功完成。‘ 查了下解决方案,应该是6379端口已绑定,服务没有关闭。 需要输入命令redis-cli 再输入shutdown 但又出现了新的问题&…...
springBoot统一响应类型2.0版本
前言: 通过实践而发现真理,又通过实践而证实真理和发展真理。从感性认识而能动地发展到理性认识,又从理性认识而能动地指导革命实践,改造主观世界和客观世界。实践、认识、再实践、再认识,这种形式,循环往…...
ubuntu离线安装Ollama并部署Llama3.1 70B INT4
文章目录 1.下载Ollama2. 下载安装Ollama的安装命令文件install.sh3.安装并验证Ollama4.下载所需要的大模型文件4.1 加载.GGUF文件(推荐、更容易)4.2 加载.Safetensors文件(不建议使用) 5.配置大模型文件 参考: 1、 如…...
Unity游戏制作中的C#基础(4)数组声明和使用
一、数组的声明 在 C# 中,声明数组有多种方式,每种方式都有其适用的场景,下面为你逐一详细介绍: 1. 直接初始化声明 这种方式直观且便捷,在声明数组的同时就为其赋初值,让数组从诞生之初就拥有了具体的数据…...
自定义SpringBoot Starter
✅自定义SpringBoot Starter SpringBoot 的 starter 可以帮我们简化配置,非常的方便,定义起来其实也不复杂,我的项目中定义了很多 starter,比如business-job就是一个 stater,以他为例,介绍下如何定义 star…...
电脑经常绿屏(蓝屏)怎么办(解决方法)?
一、排查系统与驱动问题 进入安全模式修复系统 强制重启电脑 3 次触发恢复环境,选择 疑难解答 > 高级选项 > 启动设置 > 重启,按 F5 或 5 进入带网络连接的安全模式3。 在安全模式下,尝试卸载最近安装的软件或更新,尤其…...
IO/网络IO基础全览
目录 IO基础CPU与外设1. 程序控制IO(轮询)2. 中断中断相关知识中断分类中断处理过程中断隐指令 3. DMA(Direct Memory Access) 缓冲区用户空间和内核空间IO操作的拷贝概念传统IO操作的4次拷贝减少一个CPU拷贝的mmap内存映射文件(m…...
DPVS-5: 后端服务监控原理与测试
后端监控原理 被动监测 DPVS自带了被动监控,通过监控后端服务对外部请求的响应情况,判断服务器是否可用。 DPVS的被动监测,并不能获取后端服务器的详细情况,仅仅通过丢包/拒绝情况来发觉后端服务是否可用。 TCP session state…...