当前位置: 首页 > news >正文

9. Flink的性能优化

1. Flink的资源和代码优化

1.1 slot资源配置

Flink中具体跑任务的进程叫TaskManager,TM进程又会根据配置划分出诺干个TaskSlot,它是具体运行SubTask的地方。slot是Flink用来隔离各个subtask的资源集合,这里的资源一把指内存,TCP连接和CPU是共享的
关于Slot在TM中的个数设置,可以参考如下

  • 如果是Standalone模式,建议Slot的数量和TM的Cpu Core一致
  • 如果是Yarn模式,会根据配置动态申请TM,此时TM就是Yarn的Container,所有对应Slot的设置要根据yarn中Container最大可申请的Core一致,默认是4个

1.2 并行度设置

通常说的并行度是指一个job中并行度最大的那个算子的并行度,设置合适的并行度最关键的还是靠多次的调试,但可以从以下几个方面考虑设置初始并行度

  • 数据源的并行度:对于source/sink端的并行度,一般设置成跟source本身的并行度一直即可,在消费速度跟不上时(出现背压)可以减少source的并行度,减缓背压。但是不建议source端的并行度大于source本身的并行度,这样就会出现并行度空闲的情况,浪费资源
  • 算子逻辑的复杂度:这个比较抽象,一般如果处理逻辑较多时或是需要连接外部资源的时候,都会提高算子的复杂度,此时可适当提高并行度,这一块也是需要多次调试的地方
  • 系统资源可用性:在考虑并行度的时候可以考虑集群的可用资源多少

1.3 SlotSharingGroup

SlotSharingGroup是实现taskslot共享的一种机制,在程序中会有一个默认的“default” 共享组,不同的subtask可以组成一个完整的job管道,放在同一个TaskSlot中运行,他们共享这个taskslot中的所有资源,这些subtask的数据传递就变的更加的简单,提高了运行的效率。不同并行度的相同subtask,会被分配到不同的taskslot中,这样会分散各个taskslot的运行压力,合理利用集群资源

如果一个job中有多个SSG,这个job所需要的taskslot就不是等于它最大并行度,而是等于各个SSG中各自最大并行度的和

在这里插入图片描述

1.4 细粒度资源管理

slot是Flink运行时资源调度和分配的基本单元,Flink1.14之后出现细粒度资源管理概念,之前的版本中对于TM的资源会平均分配给自己拥有的所有slot,这种分配方式会导致一些计算密集性和内存需求大的subtask“挤在”同一个slot中,从而影响作业的运行效率,而且有些subtask可能需要类似GPU这样的贵重资源,之前的粗粒度资源管理方式是不能解决的。细粒度的资源管理是指,可以动态的剪切一个slot专门给到对应的subtask运行,如下图所示
在这里插入图片描述
具体是实现案例如下

//创建SSG共享组对象指定资源配置
SlotSharingGroup ssgB = SlotSharingGroup.newBuilder("b").setCpuCores(0.5).setTaskHeapMemoryMB(10).build();
//直接指定SSG共享组对象名称来使用SSG共享组资源
DataStream<...> ds1 = someStream.filter(...).slotSharingGroup(ssgB)

1.5 Flink的异步IO

Flink中默认是同步IO,向外部组件发布一条查询命令必须要等到第一条回复了,才可以继续发送,这种情况要是在有大批量数据的时候就会出现阻塞的情况,通过异步的IO就可以一次将多个查询发送到外部系统,统一得到回复,用这个方法来提高效率
在这里插入图片描述
需要用异步IO需要外部系统支持异步请求的客户端(如:Java的Vertx),如果不支持可以用线程池模拟异步客户端

1.6 设置barrier对齐和非对齐

checkpoint机制中用barrier来衡量此时是否可以做算子本地的快照存储,在多并行度的情况下,barrier在上下游传递时会涉及到barrier广播和barrier对齐机制。当上游数据向下游多个并行度发送barrier时,需要对barrier进行广播,保证下游各个并行度barrier一致,当上游多个并行度向下游少量并行度传递barrier时,需要对barrier进行对齐,因为每个并行度在上游算子处理完对应数据的时间时不一样的,所有往下游发barrier的时间也不一样
在这里插入图片描述
如上图所示,barrier对齐机制中快的并行度会等待慢的并行度到达之后才能开始做本地的状态快照,在等待的过程中来的数据就会被缓存起来,如果并行度之间相差很大,就会占用很大的内存资源,可能会导致内存溢出,同时它的等待也会迟缓checkpoint的整体时间,还有当数据流被阻塞住处理来不及时会触发反压机制,进而更加限制数据流的流动,导致barrier在的流速更慢,进一步导致checkpoint时间变长,进入恶性循环
在这里插入图片描述
针对上面的问题,Flink中提出barrier的不对齐机制解决这个问题,它的运行过程中,并不会等待那些barrier流速慢的并行度,最快的那个并行度到达之后,对应的算子就会做本地的状态快照,并把barrier继续往下游发,并移除慢的并行度的barrier,从细节上来说,不对齐机制会有额外的input buffer和output buffer用来缓存流中未处理的数据,当barrier达到下游算子的input buffer后,Flink会将该barrier插入到该下游算子的output buffer的最前面,并把该barrier发送给后面的算子,同时当前算子做快照,其中包括当前算子的状态,已经input/output buffer 已经流式慢的barrier之前的数据全部都会保存在状态后端。
两种机制的优缺点对比

  • barrier机制
    • 优点:状态后端保存的数据少
    • 缺点:缓存的数据多,对Flink的内存压力大,checkpoint时间长,易触发反压机制
  • barrier不对其机制
    • 优点:大大加快了checkpoint的进程,不容易出现反压问题
    • 缺点:状态后端保存的数据多,状态恢复时间长
      建议简单的数据处理作业使用轻量级的barrier对齐机制,但对于计算复杂,容易出现反压,checkpoint经常会超时的作业用barrier不对其机制

2. Flink的内存优化

Flink的内存分为Flink的总内存和JVM自身需要的内存,Flink总内存又分为对堆内存和堆外内存,首先,堆堆和堆外都有Framework和Task内存,其中堆外还包含,托管内存和Network内存,这就是Flink内存的模型。其中堆内存是平时跑task的时候用的,堆外内存总的来说,是为了job的优化,提升运行效率而存在的,其中托管内存是用于状态后端数据存储等功能使用,Network用于各个TM之间网络数据的交互使用。
主要优化方式如下

一般需要注意的是TaskManager的内存,因为job的运行主要是在TM上。通常是在程序的配置文件中配置,taskmanager.memory.process.size=2G,jobmanager.memory.process.size=1G,这样的默认配置即可,然后就会到预发布环境去测试运行的效率,主要的标准有这么几个,一个是测试程序在历史峰值的时候会不会出现反压,另一个是再提升到历史峰值的20%的数据量,如果这两个都不会出现反压,就可以。如果出现反压的情况,但是不是很严重的情况,我们一般会通过调整一些参数再观察结果,这个就需要根据自己的程序具体分析,比如程序中开启了checkpoint,而且状态数据也比较大的程序,可以试着调大,taskmanager.memory.managed.fraction(它是控制托管内存占Flink总内存的比例的),增大用于状态后端的内存。如果程序中需要大量数据在各个TM之间传递的话,就试着调大,taskmanager.memory.network.fraction(它是控制network内存在flink内存的占比)。到此如果还是被压严重,就会试着加大TM的整体内存大小

3. Checkpoint和大状态优化

3.1 Checkpoint的优化

3.1.1 在webui中查看checkpoint的情况

在这里插入图片描述

点击running的job,再点击checkpoint标签,主要是看end to end Duration ,checkpointd data size ,full checkpoint data size,processed(persisted) in-flight data,这几个重要指标,他们分别表示,此次CK花的时间,产生多少数据,缓存了多少数据(0表示没有缓存数据)

3.1.2 优化
  • 设置CK的存储位置: 推荐是写入外部的文件系统中,默认是JM的堆内存中
  • CK的模式: 主要看任务的需求,堆数据的一致性要求高可以使用默认的exactly-once,如果堆时延和吞吐要求较高,可以使用at-least-once
  • 指定CK之间的最小间隔时间: 通过观察webui上CK的启动时间,如果经常超过CK的设置时间,说明CK很繁忙,同时也意味这过多的资源被用于CK的过程,为了防止这种情况,可以试着调整两个CK之间的等待时间,让更多的资源用于算子计算来缓解CK的超时问题
  • 设置CK并行度: 这种方案需要保证Flink集群资源充足才可以,不然只会使情况更加恶化,设置多并行度的CK,意味着CK需要占用更多的资源
  • 设置增量的CK: 这点对于大状态的作业效果比较明显,Flink提供了两类状态后端,其中HashMapStateBackend是基于JM的内存存储的全量状态存储,还有就是基于EmbeddedRockDBStateBackend的增量状态后端,它的数据是存储在TM的本地数据目录
  • 开启不对齐CK: 当Flink处于严重背压的情况下,同时它会缓存大量的数据导致CK的周期加长,这种情况下可以试着开启非对齐的CK,它允许第一个barrier到达之后就开始做本地快照不等慢的并行度,同时放弃落后的并行度往下游传barrier,直接把它拉齐,这样很大程度上加快了checkpoint的速度
  • 开启Changelog: Changlog是为了优化增量快照的一种机制,在增量快照中,RocketDB会定期的对本地全量快照做压缩,压缩出来的新数据肯定会大于增量的数据,而且上传也是随着checkpoint机制一起执行的,如果对于大状态且task比较多的job来说,这样压缩后的大状态上传就会变的频繁,这就会导致checkpoint整体的效率变低。
    • Changlog的解决方案: 它主要思路是通过增加Changlog记录实时的状态变更日志来辅助增量状态的更新,压缩后的大状态,它会独立于checkpoint机制单独周期性的上传。具体的实现是,在task做本地快照的时候,它会往Changlog里写,同时也往state table(RocketDB)中写,然后state table中的数据会周期性的独立上传,changlog是实时的上传,等state table把对应的状态上传之后,就会截断changlog中的数据,保证checkpoint Storage只存储一份完整的状态数据
    • Changelog的缺点: 由它的机制可以看出,会在状态存储系统创建更多的文件,同时changlog会给TM带来额外的内存开销,在状态恢复的过程中,需要额外的重放changlog

3.2 RocksDB优化

在对RocketDB进行优化时,我一般会分如下几个层级

  • 调整Flink的manage memory占比: Flink中的manage memory主要是给状态后端使用,所以可以调大它在Flink总内存中的占比(taskmanage.memory.managed.fraction=0.4这是默认值),并观察调优效果
  • 调整RocketDB读写的性能: 经过调整托管内存的效果不明显,可以手动控制RocketDB的参数(state.backend.rocksdb.memory.managed=true默认)修改成false,可以先优化它的读写性能上入手,RocketDB的架构跟Hbase的类似,一共有memtable,blockbuffer,sstfile三层架构组成,数据会先写入memtable中,也就是内存中,等内存的数据达到一定量时会刷写到磁盘中。在读数据的时候,优先会到memtable中读取,如果没有会到blockbuffer中读取(它时通过布隆过滤器的形式得到的一个缓冲数据),如果还是没有找到会到sstfile也就是磁盘上查找。通过以上读写的原理了解,可以增加memtable的内存来优化写的性能(state.backend.rocksdb.memory.write-buffer-ratio=0.5),可以通过增大索引和布隆过滤器的内存来优化读的性能(stae.backend.rocksdb.memory.high-prio-ratio=0.1默认)
  • 调整RocketDB更底层的参数: 如果上面的两个方式效果都不明显,那只能对RocketDB更底层的参数进行调整,一般不太建议,要进行它的调整,建议先开启rocketDB的监控,通过监控更好的观察到问题,再去调整(开启RocketDB监控需要损耗job的整体性能),具体的可参照官网慢慢调整

3.3 Task本地恢复

Task本地恢复功能默认是禁止的,需要配置“state.backend.local-recovery=true”来开启,Flink默认情况下,task是通过到远程持久化存储中拿到对应task的状态用于恢复,这样做的优势是状态具备天然的容错性和各个节点都可以访问获取状态信息,但是缺点也存在,那就是从远程读取效率比较低下,这会导致大状态的task恢复时间长。而且很多时候,task失败重试时,Flink会把task分配到原先节点上继续运行,这就为task本地恢复提供了可行性。task本地恢复的原理是,对于每个checkpoint,每个task不仅将状态写入分布式存储中,同时还在task本地存储一份相同的备份,对于那些重启的task,并且还在被分配在之前的taskmanager节点上运行的task来说,不需要从远程读task状态,直接可以在本地拿到状态数据,用于恢复。

需注意以下几点:
1. 如果对应的taskmanager丢失,那么task的本地状态也会丢失
2. task本地恢复仅涵盖keyed state,不支持算子和定时器状态
3. unaligned checkpoint目前不支持task本地恢复

4. 网络内存优化

4.1 Flink网络传输的过程

Flink网络传输是通过类似生产者-消费者的模式实现,如图上游的TM1生产好数据之后,会写入ResultPartition中,然后会通知Jobmanager,上游数据准备好了,JM会通知到下游的TM2,并找到对应接收此RP缓冲区中的InputChannel,然后InputChannel会返回去通知RP可以启动网络传输,之后RP会把缓冲区数据交给TM1的网络堆栈通过Netty进行传输,到此就完成了上下游数据的传输。

这里需要注意:TM1和TM2之间的Netty网络连接是共享的,而且是长期存在的
在这里插入图片描述

4.2 数据反压机制

Flink的反压机制有两种实现方式,一种是老版本的基于TCP的反压机制,一种是基于Credit的反压机制。下面介绍下这两种
基于TCP的反压机制:
下游的消费者处理数据的速度慢下来之后,刚开始的时候上游感知不是很强,还是依旧源源不断的产生新的buffer数据,然后通过netty往下游传递,下游的接收模块InputGate会一直接收,直到把InputGate模块下的所有缓冲区占满,此时缓冲区无法接收数据,发过来的数据就会堵塞在netty底层socket缓冲区中,由于产生的数据无法发送,就又会把生产端的缓冲区占满到此,上游就会感知到来之下游的反压。

需要注意的:TCP的反压机制,有这么几个缺点
1.上游要感知到下游的反压,需要把上下游的数据缓冲区,socket缓冲区都占满之后才能知道,导致反压链路过长
2.由于socket网络连接是TM级别的,一旦被一个task占满之后,其它消费正常的task也会被阻塞

在这里插入图片描述
基于credit的翻页机制: 它主要是解决TCP反压带来的问题的,从Flink的1.5版本开始因引入,主要的思路如下图所示,它通过让接收模块的InputGate和发送模块的ResultPartition直接沟通感知下游此时的消费速度,来判断上游是否要触发反压机制。具体实现上来说,当ResultPartition模块准备好数据之后,会告知下游此次需要传输的数据量是多少个buffer(每个buffer32kb),Input Gate模块接收到会返回此时自己最多能接收多少个buffer,RP接收到反馈之后,就会按照要求给下游发数据。此时如果下游可接收的buffer数量为0,就说明下游的消费已经停止,上游就需要触发反压机制
在这里插入图片描述

4.3 网络内存优化

  • 缓存销胀机制: 根据当前消费数据的速率来自动的计算一个合理的缓冲区数量来保障Flink在做checkpoint时的效率和job运行整体的性能,通过配置:taskmanager.network.memory.buffer-debloat.enabled=true来开启,开启销胀机制是需要消耗job的资源的,所有如果吞吐不佳的时候,可以关闭它
  • 缓冲区数量设置建议: 如果吞吐不佳时关闭销账机制,需要手动调整网络缓冲区个数,可以从如下方面入手,缓冲区分为独占缓冲区和流动缓冲区,独占缓冲区是单独给task用的,如果不够程序会报错,流动缓冲区是为了处理数据倾斜,提供作业性能的,缓冲区的计算公式:number_of_buffers = expected_throughput * buffer_roundtrip / buffer_size,expected_throughput表示希望的吞吐量,比如320MB/s,buffer_roundtrip表示数据在节点之间的延时,一般默认1ms,buffer_size表示每个buffer的容量,默认是32kb,所有想要吞吐量达到320MB/s,需要的缓存区数量是number_of_buffers=320MB/s*1ms/32KB=10

5. Flink的反压优化

反压是指Flink程序下游的消费数据跟不上上游的数据产生速度,而触发的一种警告机制,它会直接给job带来如下的影响:

  • Flink性能下降: 由于下游的消费速度跟不上,直接迫使上游的数据来源端产生数据的速度变慢,从而导致Flink整体的吞吐量下降,性能下降
  • checkpoint时间变成或失败: 反压导致整个任务的流速变慢,这也将导致checkpoint barrier流经整个数据管道的数据变长,从而增加CK的总体时间,严重的可能会超时,如多次超时则会导致失败
  • 内存OOM: 在checkpoint的对齐机制中,反压会导致部分并发度的barrier到达时间更晚,这就意味这快的并发度在等待的过程中需要缓存大量的数据,这就有可能导致OOM的出现
  • 任务卡住: 在下游有窗口计算的时候,反压会导致watermark往下游流速变量,这会导致窗口迟迟不触发,就会出现卡顿现象

5.1 Flink反压问题的定位

  1. 禁用Flink任务算子链后再次运行任务
  2. 根据JobGrap定位出现反压的位置
  3. 结合webui task的执行情况定位具体的问题点
  4. 解决对应的问题
    一般先根据webui大致定位是哪个算子的问题,再webui上先找到没有出现反压的第一个算子,它往往是处于繁忙状态颜色是红色的,程序中如果出现性能问题,往往就是这个算子导致的,如下图所示,可以具体结合代码进行处理
    在这里插入图片描述
    结合webui执行情况定位具体问题,如下图所示,1号subtask接收的处理的数据几十倍与其他的subtask,这明显就是数据倾斜导致的问题。
    在这里插入图片描述
    此外我们还可以通过火焰图来查看Flink业务逻辑是否出现性能问题,它可以把Flink中出现的对象占用CPU的时长大小显示出来,我们可以根据业务情况,判断此对象是否正常
    在这里插入图片描述

5.2 Flink反压的原因及优化

  1. 资源设置不合理: 数据源生产数据速度很快,下游消费跟不上,此时容易产生反压。
    • 可以通过webui查看每个task内存使用情况,适当增大Flink任务资源及并行度来解决问题
  2. 突发性数据量激增: 数据流突增很可能会触发反压机制,如果频繁的激增的话,可以适当增加Flink任务的并行度来分摊
  3. 数据倾斜问题: 数据倾斜会导致其中一两个task处理很大量的数据,这就容易导致处理不过来触发反压,这种情况需要根据业务数据查找倾斜问题点,如:是因为聚合之后出现大量相同的key导致,可以使用多阶段聚合把对应的key打散来解决
  4. 代码执行效率问题: 这个问题可以通过查看火焰图的方式观察到(筛选出自己写的类,观看使用CPU的时长就能发现问题),一般在代码优化方面可以从,源头数据采用多并行方式,读取外部数据库使用异步IO的方式,提高算子并行度,避免打状态,checkpoint使用不对齐机制,多步骤分散业务避免在一个算子内实现复杂逻辑

6. 数据倾斜

数据倾斜往往是由于个别subtask处理绝大多数数据,在Flink中往往以反压的形式表现出来,然后我们可以通过查看webui里的subtask的指标,可以具体观察出哪些发生数据倾斜的subtask,然后继续向上游找,一直找到发生倾斜的源头,分析原因,具体解决

6.1 数据倾斜原因和处理方案

数据倾斜一般有两种原因,一中是数据源本身有数据倾斜问题,比如kafka的某个partition的数据量特别多,还有一种是因为进行了KeyBy操作后,导致的数据倾斜。

  • 数据本身倾斜: 这个可以通过修改Flink上下游的分区策略即可,Flink上下游之间并行度一致时,默认用forward的分区策略,forward分区策略会保持上下游一比一的数据流流转方式,这对于那些有复杂逻辑的subtask来说很容易就会出现反压等一些问题。可以通过shuffle()或是rebalance()算子来打散那些出现数据倾斜的分区
  • KeyBy导致数据倾斜: 这种倾斜,一般使用多阶段聚合的方式来处理,首先导致这种倾斜的原因是个别key的数据量特别大,通过KeyBy时这些数据都被分配到一个分区里,由一个subtask来处理,这就会导致反压甚至OOM等一些列问题。然后我们解决这个问题的思路就是把这些大数据量的key打散到多个分区中由多个subtask去处理,再在下游对这些subtask的结果再进行处理,这样就达到了最终目的。

    需要注意的点:Flink是实时数据流,在进行多阶段聚合的时候,不能简单的用聚合函数直接聚合,因为它每次会把聚合后的结果往下游传递,这样得到的结果是无效的甚至统计的结果会不准,比如用sum算子,它每次往下游传递的是sum之后的结果,等到再次聚合的时候算的是sum结果的结果,并不是原来数据的结果了

Flink中的多阶段聚合操作可以用“攒批”聚合之后再发往下游处理,或是用开窗的方式,把数据聚集在窗口中计算好再往下游发送

7. Tabla和SQL优化

7.1 使用Flilnk SQL的累计窗口

Flink中的滑动窗口和滚动窗口比较适合周期不长要求输出结果的场景,比如每10s统计当前的在线人数(滚动窗口),每10s统计近一个小时的用户在线人数(滑动窗口),如果要求统计一天内的在线人数,每20秒输出一次。这种需求,上面的两种窗口就不太合适,FlinkSQL提供一种累计窗口是专门解决这个问题的,具体实现如下

Table result = tableEnv.sqlQuery("select " +"sid,window_start,window_end,sum(duration) as sum_dur " +"from TABLE(" +"   CUMULATE(TABLE stationlog_tbl,DESCRIPTOR(time_ltz), INTERVAL '5' SECOND , INTERVAL '1' DAY)" +") " +"group by sid,window_start,window_end");

其CUMULATE是个表值函数,里面的TABLE是对应的表,descriptor指定表中的时间列,剩下的两个超参数分别指定窗口累积的步长和指定窗口的长度

7.2 利用MiniBatch优化聚合

默认情况下Flink是来一笔数据就聚合一次,聚合数据时会读取状态值,然后进行聚合操作,之后把结果又写回状态中,每笔数据都要进行这些操作,对整个job的StateBacken性能有一定的要求,比如RockDBBackend,MinniBatch会在算子的内存中先把数据先存起来,等达到一定条件后做触发处理,触发前会进行本地局部聚合,这样只在需要聚合这批数据的时候才去访问和写入一次状态数据。显然这种方式保证了吞吐量,但损失了实时性,这就需要开发者自己权衡了
在这里插入图片描述
具体实现如下

//3.开启minibatch
//通过flink configuration进行参数设置
TableConfig configuration = tableEnv.getConfig();
//开启MiniBatch 优化,默认false
configuration.set("table.exec.mini-batch.enabled", "true");
//设置5秒时间处理缓冲数据,默认0s
configuration.set("table.exec.mini-batch.allow-latency", "5 s");
//设置每个聚合操作可以缓冲的最大记录数,默认-1,开启MiniBatch后必须设置为正值
configuration.set("table.exec.mini-batch.size", "5000");

7.3 使用Local-Global优化聚合

Local-global操作类似于MapReduce中的Combine操作,在聚合之前会在上游的算子中先进行本地局部聚合,把聚合之后的结果往下游发送,这样既减少了Shuffle操作时网络拉取的数据量,也较少了对状态后端的操作次数,当然了这个优化方式是依赖上面提到的MiniBatch优化的,要先开启MiniBatch操作才行
在这里插入图片描述
具体实现如下

//2.创建TableEnv
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//3.开启Local-Global 聚合
//通过flink configuration进行参数设置
TableConfig configuration = tableEnv.getConfig();
//开启MiniBatch 优化,默认false
configuration.set("table.exec.mini-batch.enabled", "true");
//设置5秒时间处理缓冲数据,默认0s
configuration.set("table.exec.mini-batch.allow-latency", "5 s");
//设置每个聚合操作可以缓冲的最大记录数,默认-1,开启MiniBatch后必须设置为正值
configuration.set("table.exec.mini-batch.size", "5000");
//设置Local-Global 聚合
configuration.set("table.optimizer.agg-phase-strategy", "TWO_PHASE");

Local-Global能有效的解决常规的数据倾斜问题

7.4 拆分distinct聚合

distinct是针对去重的聚合操作,如果distinct key的值分布稀疏且数据倾斜,用Local-Global的优化方式性能提升并不明显,因为每个并行可能仍然包含几乎所有的原始记录,并且全局聚合将成为瓶颈。这种情况就可以使用distinct聚合来优化,其原理如下:

它会把聚合分为两个阶段,第一个阶段由bucket key组成的key进行group by操作,bucket key是使用 hash_code(key)%bucket_num计算所得,相当于是给数据加随机前缀进行聚合。第二阶段是由原始的group key进行Shuffle,并使用sum聚合来自不同的buckets的值,相当于把sql

-- 原始sql
SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day-- 语句改写成如下形式
SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)GROUP BY day

7.5 用Filter替代Case when操作

如下的sql是统计总的UV,android和iPhone的UV,wep和other的UV,在Flink中三个查询指标都是针对user_id进行去重统计,每个查询指标都会维护一个状态实例,这样会导致Flink维护状态实例过大

SELECTday,COUNT(DISTINCT user_id) AS total_uv,COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day

如果修改成用Filter的方式,Flink Sql的优化器可以识别到相distinct key上不同过滤器参数,如上三个count distinct 都在user_id一列上,Flink可以只使用一个共享状态实例,而不是三个状态实例

SELECTday,COUNT(DISTINCT user_id) AS total_uv,COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day

可以在webui中对比修改前后checkpoint的大小,可以明显的看到filter的方式会少很多

相关文章:

9. Flink的性能优化

1. Flink的资源和代码优化 1.1 slot资源配置 Flink中具体跑任务的进程叫TaskManager&#xff0c;TM进程又会根据配置划分出诺干个TaskSlot&#xff0c;它是具体运行SubTask的地方。slot是Flink用来隔离各个subtask的资源集合&#xff0c;这里的资源一把指内存&#xff0c;TCP…...

DeepSeek安装部署笔记(二)

Bat批处理文件的编写 第五步 启动openWebUI的批处理编写1、下面的代码&#xff0c;复制到文本文件&#xff0c;再改扩展名2、这样&#xff0c;在桌面直接双击此文件运行 第五步 启动openWebUI的批处理编写 1、下面的代码&#xff0c;复制到文本文件&#xff0c;再改扩展名 ec…...

【谷粒商城踩坑记】第二坑 renren-fast-vue的node-sass问题

第二坑 renren-fast-vue的node-sass问题 前端开始&#xff0c;第一关就是源码运行&#xff0c;直接报错以下内容&#xff1a; node-sass6.0.1 postinstall: node scripts/build.js 这个在老的前端项目中&#xff0c;特别是使用了sass的项目中经常会出现。 其实当时没有记录下具…...

【Linux-网络】HTTP的清风与HTTPS的密语

&#x1f3ac; 个人主页&#xff1a;谁在夜里看海. &#x1f4d6; 个人专栏&#xff1a;《C系列》《Linux系列》《算法系列》 ⛰️ 道阻且长&#xff0c;行则将至 目录 &#x1f4da; 引言 &#x1f4da; 一、HTTP &#x1f4d6; 1.概述 &#x1f4d6; 2.URL &#x1f5…...

【SpringBoot】数据访问技术spring Data、 JDBC、MyBatis、JSR-303校验

Spring Boot 数据访问技术及特性 目录标题 Spring Boot 数据访问技术及特性摘要1. 引言2. Spring Data架构与原理2.1 Spring Data概述2.2 Spring Data核心组件2.3 Spring Boot与Spring Data的集成机制 3. Spring Boot与JDBC的整合3.1 JDBC整合流程3.2 数据源自动配置3.3 JdbcTe…...

直装永久授权,最新专业版集成VB7

无论是学生、教师还是职场工作人员&#xff0c;办公软件在日常工作和学习中都是不可或缺的重要工具。在众多办公软件中&#xff0c;微软的Microsoft Office和金山软件的WPS Office最常用的选择。对于许多使用要求不高的小伙伴而言&#xff0c;WPS Office因其易用性和免费版本的…...

Ollama 框架本地部署教程:开源定制,为AI 项目打造专属解决方案!

Ollama 是一款开源的本地大语言模型&#xff08;LLM&#xff09;运行框架&#xff0c;用于管理和运行语言模型。具有以下核心特点&#xff1a; 开源可定制&#xff1a;采用 MIT 开源协议&#xff0c;开发者能自由使用、阅读源码并定制&#xff0c;可根据自身需求进行功能扩展和…...

【单片机项目】电源如何扩展、电源模块、电池模块如何接线

一、前言 解决2个关键问题&#xff1a; 【1】如果项目编号小于172之前的项目。 可能会遇到电源模块不够接&#xff0c;需要扩展电源的问题。 【2】如果项目编号是大于 172之后项目&#xff0c;部分项目用到了稳压电源模块或者是电池模块。 这篇文章单独讲解一下如何接线。 …...

汽车智能钥匙中PKE低频天线的作用

PKE&#xff08;Passive Keyless Entry&#xff09;即被动式无钥匙进入系统&#xff0c;汽车智能钥匙中PKE低频天线在现代汽车的智能功能和安全保障方面发挥着关键作用&#xff0c;以下是其具体作用&#xff1a; 信号交互与身份认证 低频信号接收&#xff1a;当车主靠近车辆时…...

mongodb安装教程以及mongodb的使用

MongoDB是由C语言编写的一种面向文档的NoSQL数据库&#xff0c;旨在为WEB应用提供可扩展的高性能数据存储解决方案。与传统的关系型数据库&#xff08;如 MySQL 或 PostgreSQL&#xff09;不同&#xff0c;MongoDB 存储数据的方式是以 BSON&#xff08;类似于 JSON 的二进制格式…...

SpringMVC学习(controller层加载控制与(业务、功能)bean加载控制、Web容器初始化配置类)(3)

目录 一、SpringMVC、Spring的bean加载控制。 &#xff08;1&#xff09;实际开发的包结构层次。 &#xff08;2&#xff09;如何"精准"控制两个容器分别加载各自bean。(分析) <1>SpringMVC相关bean加载控制。(方法) <2>Spring相关bean加载控制。(方法) …...

redis基础结构

title: redis基础结构 date: 2025-03-04 08:39:12 tags: redis categories: redis笔记 Redis入门 &#xff08;NoSQL, Not Only SQL&#xff09; 非关系型数据库 关系型数据库&#xff1a;以 表格 的形式存在&#xff0c;以 行和列 的形式存取数据&#xff0c;一系列的行和列被…...

树莓派学习(一)——3B+环境配置与多用户管理及编程实践

树莓派学习&#xff08;一&#xff09;——3B环境配置与多用户管理及编程实践 一、实验目的 掌握树莓派3B无显示器安装与配置方法。学习Linux系统下多用户账号的创建与管理。熟悉在树莓派上使用C语言和Python3编写简单程序的方法。 二、实验环境 硬件设备&#xff1a;树莓派…...

【音视频】ffplay播放控制

一、ffplay播放控制 1.1、ffplay打开视频 比如我当前目录下现在有一个1.mp4的视频&#xff0c;可以使用下面的命令用ffplay打开并播放它 ffplay 1.mp4输入后回车即可打开相应的视频 1.2 ffplay播放控制 使用q、ESC退出播放按f、双击切换全屏状态按m切换为静音按9减少音量&a…...

【智能体Agent】ReAct智能体的实现思路和关键技术

基于ReAct&#xff08;Reasoning Acting&#xff09;框架的自主智能体 import re from typing import List, Tuplefrom langchain_community.chat_message_histories.in_memory import ChatMessageHistory from langchain_core.language_models.chat_models import BaseChatM…...

Redis系列之慢查询分析与调优

Redis 慢查询分析与优化&#xff1a;提升性能的实战指南 Redis 作为一款高性能的内存数据库&#xff0c;因其快速的数据读写能力和灵活的数据结构&#xff0c;被广泛应用于缓存、消息队列、排行榜等多种业务场景。然而&#xff0c;随着业务规模的扩大和数据量的增加&#xff0…...

分布式锁—2.Redisson的可重入锁一

大纲 1.Redisson可重入锁RedissonLock概述 2.可重入锁源码之创建RedissonClient实例 3.可重入锁源码之lua脚本加锁逻辑 4.可重入锁源码之WatchDog维持加锁逻辑 5.可重入锁源码之可重入加锁逻辑 6.可重入锁源码之锁的互斥阻塞逻辑 7.可重入锁源码之释放锁逻辑 8.可重入锁…...

大模型巅峰对决:DeepSeek vs GPT-4/Claude/PaLM-2 全面对比与核心差异揭秘

文章目录 一、架构设计深度解剖1.1 核心架构对比图谱1.2 动态MoE架构实现架构差异分析表 二、训练策略全面对比2.1 训练数据工程对比2.2 分布式训练代码对比DeepSeek混合并行实现GPT-4 Megatron实现对比 2.3 关键训练参数对比 三、性能表现多维评测3.1 基准测试全景对比3.2 推理…...

解决各大浏览器中http地址无权限调用麦克风摄像头问题(包括谷歌,Edge,360,火狐)后续会陆续补充

项目场景&#xff1a; 在各大浏览器中http地址调用电脑麦克风摄像头会没有权限&#xff0c;http协议无法使用多媒体设备 原因分析&#xff1a; 为了用户的隐私安全&#xff0c;http协议无法使用多媒体设备。因为像摄像头和麦克风属于可能涉及重大隐私问题的API&#xff0c;ge…...

Linux - 网络套接字

一、网络编程 1&#xff09;地址结构 1. IP地址结构 struct in_addr&#xff1a;是用于表示 IPv4 地址 的结构体&#xff0c;定义在头文件 <netinet/in.h> 中。它的主要作用是存储一个 32 位的 IPv4 地址&#xff0c;通常与 struct sockaddr_in 一起使用。 struct in_a…...

Oracle数据库监听学习

官方文档&#xff1a; Net Services Administrators Guide Net Services Reference 一、动态注册 1.实例启动后&#xff0c;LREG 进程每分钟自动将服务名&#xff08;service_name&#xff09;注册到监听器中 也可以通过 alter system register 命令实现立刻注册。&#x…...

利率债、信用债、可转债区别与优势

利率债、信用债、城投债和可转债是债券市场的主要品种&#xff0c;它们在发行主体、风险收益特征和投资优势上各有不同。以下是它们的区别和优势&#xff1a; 1. 利率债 定义&#xff1a;利率债是由政府或政府支持的机构发行的债券&#xff0c;主要包括国债、政策性金融债&…...

C语言番外篇(4)------------------>VS环境下源码的隐藏

假设你是一个优秀的程序员&#xff0c;开发了一款功能十分强大的计算器。现在有一家做计算器的公司看上了你的功能&#xff0c;想通过每一年给你几万块钱使用这个功能。那我们是只提供一个头文件和静态库给他们使用这个功能就行呢&#xff1f;还是连同源代码一起给这家公司呢&a…...

Java集合

写在前面 本人在学习JUC过程中学习到集合和并发时有许多稀碎知识点 需要总结梳理思路与知识点 本文内容会涉及到ArrayList&#xff0c;HashMap以及扩容机制&#xff0c;ConcurrentHashMap&#xff0c;Synchronized&#xff0c;Volatile&#xff0c;ReentrantLock&#xff0c;…...

el-input 设置类型为number时,输入中文后光标会上移,并且会出现上下箭头

光标上移 设置 el-input 的 typenumber后&#xff0c;只能输入数字&#xff0c;输入中文后会自动清空&#xff0c;但是会出现一个问题&#xff1a;【光标会上移&#xff0c;如下图】 解决方法&#xff1a;修改样式 注意&#xff1a;需要使用样式穿透 :deep( ) /*解决el-in…...

迷你世界脚本自定义UI接口:Customui

自定义UI接口&#xff1a;Customui 彼得兔 更新时间: 2024-11-07 15:12:42 具体函数名及描述如下:&#xff08;除前两个&#xff0c;其余的目前只能在UI编辑器内部的脚本使用&#xff09; 序号 函数名 函数描述 1 openUIView(...) 打开一个UI界面&#xff08;注意…...

解决windows npm无法下载electron包的问题

1.将nsis.zip解压到C:\Users\XXX\AppData\Local\electron-builder\Cache 2.将winCodeSign.zip解压到C:\Users\XXX\AppData\Local\electron-builder\Cache 3.将electron-v20.3.8-win32-ia32.zip复制到C:\Users\XXX\AppData\Local\electron\Cache 4.将electron-v20.3.8-win32-…...

Notepad++ 8.6.7 安装与配置全攻略(Windows平台)

一、软件定位与核心优势 Notepad 是开源免费的代码/文本编辑器&#xff0c;支持超过80种编程语言的高亮显示&#xff0c;相比系统自带记事本具有以下优势&#xff1a; 轻量高效&#xff1a;启动速度比同类软件快30%插件扩展&#xff1a;支持NppExec、JSON Viewer等200插件跨文…...

Unity InputField + ScrollRect实现微信聊天输入框功能

1、实现动态高度尺寸的的InputField 通过这两个部件就可以实现inputField的动态改变尺寸。 将inputField放入到scrollview当中作为子类 将scrollview 链接到UIChatInputField脚本中。 2、实现UIChatInputField //聊天输入框&#xff08;类似wechat&#xff09; [RequireComp…...

Java-servlet(三)Java-servlet-Web环境搭建(下)详细讲解利用maven和tomcat搭建Java-servlet环境

Java-servlet&#xff08;三&#xff09;Java-servlet-Web环境搭建&#xff08;下&#xff09;利用maven和tomcat搭建Java-servlet环境 前言一、配置maven阿里镜像二、利用IDEA创建maven文件创建maven文件删除src文件创建新的src模版删除example以及org文件 三、在第二个xml文件…...

在 CLion 中使用 Boost.Test 进行 C++ 单元测试

1. 安装 Boost.Test Boost.Test 是 Boost C 库的一部分&#xff0c;因此需要安装完整的 Boost 库。 方法 1&#xff1a;使用包管理器安装&#xff08;推荐&#xff09; Windows&#xff08;vcpkg&#xff09; 直接使用 CLion 集成的 vcpkg安装 boost-test&#xff1a; 也可…...

极狐GitLab 17.9 正式发布,40+ DevSecOps 重点功能解读【二】

GitLab 是一个全球知名的一体化 DevOps 平台&#xff0c;很多人都通过私有化部署 GitLab 来进行源代码托管。极狐GitLab 是 GitLab 在中国的发行版&#xff0c;专门为中国程序员服务。可以一键式部署极狐GitLab。 学习极狐GitLab 的相关资料&#xff1a; 极狐GitLab 官网极狐…...

文本处理Bert面试内容整理-BERT的预训练任务是什么?

BERT的预训练任务主要有两个,分别是 Masked Language Model (MLM) 和 Next Sentence Prediction (NSP)。这两个任务帮助BERT学习从大规模未标注文本中提取深层次的语义和上下文信息。 1. Masked Language Model (MLM)(掩码语言模型)...

【蓝桥杯】每天一题,理解逻辑(3/90)【Leetcode 快乐数】

闲话系列&#xff1a;每日一题&#xff0c;秃头有我&#xff0c;Hello&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;,我是IF‘Maxue&#xff0c;欢迎大佬们来参观我写的蓝桥杯系列&#xff0c;我好久没有更新博客了&#xff0c;因为up猪我寒假用自己的劳动换了…...

“深入浅出”系列之Linux篇:(10)基于C++实现分布式网络通信RPC框架

分布式网络通信rpc框架 项目是分布式网络通信rpc框架&#xff0c; 文中提到单机服务器的缺点&#xff1a; 硬件资源的限制影响并发&#xff1a;受限于硬件资源&#xff0c;聊天服务器承受的用户的并发有限 模块的编译部署难&#xff1a;任何模块小的修改&#xff0c;都导致整…...

Python的那些事第四十一篇:简化数据库交互的利器Django ORM

Django ORM:简化数据库交互的利器 摘要 随着互联网技术的飞速发展,Web开发越来越受到重视。Django作为一款流行的Python Web框架,以其高效、安全、可扩展等特点受到了广大开发者的喜爱。其中,Django ORM(对象关系映射)是Django框架的核心组件之一,它为开发者提供了一种…...

[自动驾驶-传感器融合] 多激光雷达的外参标定

文章目录 引言外参标定原理ICP匹配示例参考文献 引言 多激光雷达系统通常用于自动驾驶或机器人&#xff0c;每个雷达的位置和姿态不同&#xff0c;需要将它们的数据统一到同一个坐标系下。多激光雷达外参标定的核心目标是通过计算不同雷达坐标系之间的刚性变换关系&#xff08…...

初学STM32之简单认识IO口配置(学习笔记)

在使用51单片机的时候基本上不需要额外的配置IO&#xff0c;不过在使用特定的IO的时候需要额外的设计外围电路&#xff0c;比如PO口它是没有内置上拉电阻的。因此若想P0输出高电平&#xff0c;它就需要外接上拉电平。&#xff08;当然这不是说它输入不需要上拉电阻&#xff0c;…...

【长安大学】苹果手机/平板自动连接认证CHD-WIFI脚本(快捷指令)

背景&#xff1a; 已经用这个脚本的记得设置Wifi时候&#xff0c;关闭“自动登录” 前几天实在忍受不了CHD-WIFI动不动就断开&#xff0c;一天要重新连接&#xff0c;点登陆好几次。试了下在网上搜有没有CHD-WIFI的自动连接WIFI自动认证脚本&#xff0c;那样我就可以解放双手&…...

powermock,mock使用笔记

介于日本的形式主义junit4单体测试&#xff0c;特记笔记&#xff0c;以下纯用手机打出来&#xff0c;因为电脑禁止复制粘贴。 pom文件 powermock-module-junit1.7.4 powermock-api-mokcito 1.7.4 spring-test 8 1&#xff0c;测试类头部打注解 RunWith(PowerMockRunner.class…...

大模型微调实战指南

1. 引言 在人工智能领域&#xff0c;大模型&#xff08;如GPT、BERT、DeepSeek等&#xff09;已经展现出了强大的通用能力。然而&#xff0c;要让这些模型在特定任务或领域中发挥最佳性能&#xff0c;微调&#xff08;Fine-tuning&#xff09;是必不可少的一步。本文将带你从零…...

计算机毕业设计Python+Django+Vue3微博数据舆情分析平台 微博用户画像系统 微博舆情可视化(源码+ 文档+PPT+讲解)

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…...

HTML第四节

一.复合选择器 1.后代选择器 注&#xff1a;1.后代选择器会选中后代所有的要选择的标签 2.儿子选择器 3.并集选择器 注&#xff1a;1.注意换行&#xff0c;同时选中多种标签 4.交集选择器 注&#xff1a;1.标签选择器放在最前面&#xff0c;例如放在类选择器的前面 2.两个选择…...

Kubernetes 的正式安装

1.基础的网络结构说明 软件路由器 ikuai 当然同一个仅主机模式 相当于在 同一个我们所谓的广播域内 所以相当于它们的几张网卡 是被连接起来的 为了防止出现问题 我们可以把第二块网卡临时关闭一下 2.准备路由器 ikuai 爱快 iKuai-商业场景网络解决方案提供商 (ikuai8.com)…...

VS2022C#windows窗体应用程序调用DeepSeek API

目录 一、创建DeepSeek API Key 二、创建窗体应用程序 三、设计窗体 1、控件拖放布局‌‌ 2、主窗体【Form1】设计 3、多行文本框【tbContent】 4、提交按钮【btnSubmit】 5、单行文字框 四、撰写程序 五、完整代码 六、运行效果 七、其它 一、创建DeepSeek API Ke…...

7. 机器人记录数据集(具身智能机器人套件)

1. 树莓派启动机器人 conda activate lerobotpython lerobot/scripts/control_robot.py \--robot.typelekiwi \--control.typeremote_robot2. huggingface平台配置 huggingface官网 注册登录申请token&#xff08;要有写权限&#xff09;安装客户端 # 安装 pip install -U …...

阿里云操作系统控制台——ECS操作与性能优化

引言&#xff1a;在数字化时代&#xff0c;云服务器作为强大的计算资源承载平台&#xff0c;为企业和开发者提供了灵活且高效的服务。本文将详细介绍如何一步步操作云服务器 ECS&#xff0c;从开通到组件安装&#xff0c;再到内存全景诊断&#xff0c;帮助快速上手&#xff0c;…...

在飞腾E2000Q开发板上,基于RT-Thread操作系统,实现DeepSeek语音交互

目录 一 &#xff0c;简介 二 &#xff0c;流程与结果分享 1. Phytium E2000q demo开发板连接 2. RT-Thread Kconfig 配置选择 &#xff08;1&#xff09;驱动 &#xff08;2&#xff09;软件包 3. 主要代码 &#xff08;1&#xff09;录音功能&#xff0c;将录音结果保存…...

navicat导出postgresql的数据库结构、字段名、备注等等

1、执行sql语句 SELECT A.attnum AS "序号",C.relname AS "表名",CAST ( obj_description ( relfilenode, pg_class ) AS VARCHAR ) AS "表名描述",A.attname AS "字段名称",A.attnotnull as "是否不为null",(case when A…...

K8s 1.27.1 实战系列(三)安装网络插件

Kubernetes 的网络插件常见的有 Flannel 和 Calico ,这是两种主流的 CNI(容器网络接口)解决方案,它们在设计理念、实现方式、性能特征及适用场景上有显著差异。以下是两者的综合对比分析: 一、Flannel 和 Calico 1. 技术基础与网络实现 Flannel 核心机制:基于 Overlay …...