Pulsar客户端如何控制内存使用
Pulsar客户端如何控制内存使用
一、使用场景
在实际应用中,Pulsar客户端的内存使用控制是一个重要的性能优化点。假设有一个搜索类业务需要记录用户搜索请求,以便后续分析搜索热点和优化搜索效果。以下是一个简化的代码示例:
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<byte[]> producer = pulsarClient.newProducer().topic("search-activities").create();
try {MessageId messageId = producer.send(/* message payload here */);log.debug("Search activity messageId={}", messageId);
} catch (Exception e) {log.error("Failed to record search activity", e);
}
在这个场景中,pulsarClient
和 producer
支持复用,推荐这么做,这里只是为了演示写到了一起。producer.send
是阻塞方式发送消息,线程会卡在这里等待发送结果返回。在现实中,根据消息在实际业务中的需要,可以选择阻塞和非阻塞两种方式。例如,业务上对搜索请求事件并无强依赖,因此使用阻塞方式发消息不太适合,从性能上考虑会加长整体的搜索延迟,从稳定性上考虑会增加搜索执行过程中的不确定性。因此,可以优化为非阻塞方式,将记录搜索事件放到其他线程中完成:
producer.sendAsync(/* message payload here */).whenComplete((msgId, ex) -> {if (ex != null) {log.error("Failed to record search activity", ex);} else {log.debug("Search activity messageId={}", msgId);}
});
在高TPS(例如单实例超过1000QPS)和大消息内容(例如100KB甚至1MB)的情况下,上述代码可能会遇到 MemoryBufferIsFullError
异常:
org.apache.pulsar.client.api.PulsarClientException$MemoryBufferIsFullError: Client memory buffer is full
此外,如果服务与Pulsar的broker之间出现网络波动,或者Pulsar服务内部组件之间出现网络波动,导致整体producer写入延迟升高,亦或是短时间出现大量写入,还可能会遇到 ProducerQueueIsFullError
异常:
org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full
二、Producer的内存控制
1. 配置项分析
在构建Producer时,ProducerBuilder
中与内存使用有关的配置项包括:
maxPendingMessages(int maxPendingMessages)
:控制producer内部队列中正在发送但还没有接收到broker确认的消息数量。若队列大小超出这个限制,默认行为是抛出ProducerQueueIsFullError
异常。可以通过设置blockIfQueueFull=true
调整为阻塞等待队列中空出新的空间。maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions)
:控制整个topic所有分区的总pending消息数量。最终到各个分区内部producer取maxPendingMessages
和maxPendingMessagesAcrossPartitions / partitions
的较小值。
2. 内存限制配置
在现实应用场景中,不同业务的消息大小差异很大,单纯基于消息数量控制内存使用是不切实际的。因此,在 PIP-74 中,ClientBuilder
提供了一个面向整个client实例统一的内存限制配置:
ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);
当客户端所有producer中所有pending的消息大小总和超过这个限制时,默认会抛出 MemoryBufferIsFullError
异常。若同时配置了 blockIfQueueFull=true
,则当前线程会阻塞等待前面pending的消息发送完成。
3. blockIfQueueFull
配置的使用
blockIfQueueFull
配置是为了限制客户端producer内存使用的同时,让开发者简化处理队列或者内存buffer满了的情况可以继续发送消息。然而,一旦配置为 true
,不论是应用发送消息调用的是阻塞的 Producer.send
方法还是非阻塞的 Producer.sendAsync
方法都会出现阻塞等待,这可能会阻塞当前线程,对于某些业务场景是不可接受的。
4. 默认配置
PIP-120 对 2.10.0
以及之后版本的客户端中,默认启用了 memoryLimit
配置,其默认值为 64MB
,同时默认禁用了 maxPendingMessages
和 maxPendingMessagesAcrossPartitions
配置(默认值修改为0),并将 maxPendingMessagesAcrossPartitions
配置标记为 Deprecated
。
三、Consumer的内存控制
1. 配置项分析
在构造一个Consumer时,ConsumerBuilder
提供的与内存使用有关的选项包括:
receiverQueueSize(int receiverQueueSize)
:控制每个分区consumer的接收队列大小。maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions)
:控制所有分区consumer和parent consumer的接收队列总大小。
Pulsar客户端通过预接收队列临时存放broker推送过来的消息,以便应用程序调用 Consumer#receive
或者 Consumer#receiveAsync
方法时直接从内存中返回消息,这是出于消费吞吐的考虑,本质上是一种以空间换取时间的策略。
2. 自动扩展接收队列
在 PIP-74 中提出了一个新的控制Consumer内存使用的方案,即 autoScaledReceiverQueueSizeEnabled
:
ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled);
当启用这个特性后,receiverQueueSize
会从1开始呈2的指数倍增长,直至达到 receiverQueueSize
的限制或达到client的 memoryLimit
限制,其目标是在有限制的内存使用下,达到最大的吞吐效率。
四、番外:ackTimeout 和 ackTimeoutTickTime 的配置
除了Producer和Consumer在生产和消费过程中的内存使用之外,还有一个容易被忽视的点是创建Consumer时 ackTimeout
和 ackTimeoutTickTime
的配置如果不匹配,会消耗较多堆内内存。
ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);
若Consumer配置了 ackTimeout
并且配置了较大的时间窗口(例如1小时或者更长),应适当调大 ackTimeoutTickTime
,这是因为Consumer内部使用了一个简单时间轮的算法对消息的处理时间计时,若 ackTimeout
时间窗口很大,ackTimeoutTickTime
仍然使用其默认值 1s
,时间轮本身将会占用大量堆内存空间。具体细节可参考客户端源码 UnAckedMessageTracker.java
。
五、总结
- 使用
sendAsync
非阻塞方法要注意其不能保证消息一定发送成功,特别是开启了blockIfQueueFull
之后,它会在特定情况下演变成阻塞方法。 - 对于同时使用到了Producer和Consumer的应用,推荐创建两个client,分别用来创建Producer和Consumer,做读写分离,避免由于共用
memoryLimit
导致相互影响
相关文章:
Pulsar客户端如何控制内存使用
Pulsar客户端如何控制内存使用 一、使用场景 在实际应用中,Pulsar客户端的内存使用控制是一个重要的性能优化点。假设有一个搜索类业务需要记录用户搜索请求,以便后续分析搜索热点和优化搜索效果。以下是一个简化的代码示例: PulsarClient…...
接口测试总结(http与rpc)
🍅 点击文末小卡片,免费获取软件测试全套资料,资料在手,涨薪更快 接口测试是测试系统组件间接口的一种测试。接口测试主要用于检测外部系统与系统之间以及内部各个子系统之间的交互点。测试的重点是要检查数据的交换,传…...
Linux:进程概念(二.查看进程、父进程与子进程、进程状态详解)
目录 1. 查看进程 1.1 准备工作 1.2 指令:ps—显示当前系统中运行的进程信息 1.3 查看进程属性 1.4 通过 /proc 系统文件夹看进程 2. 父进程与子进程 2.1 介绍 2.2 getpid() \getppid() 2.3 fork()函数—通过系统调用创建进程 fork()函数疑问 3. 进程状态…...
ubuntu22.04 编译安装libvirt 10.x
环境安装 sudo apt-get update -y sudo apt-get install qemu-system-x86 bridge-utils libyajl-dev -y sudo apt-get install build-essential autoconf automake libtool -y sudo apt-get install libxml2-dev libxslt1-dev libgnutls28-dev libpciaccess-dev libnl-3-de…...
Ubuntu 下载安装 Consul1.17.1
下载 来到 Consul 的下载页面:https://developer.hashicorp.com/consul/install?product_intentconsul 上面标注的地方可以切换你想要的版本,复制下载链接,使用 wget 下载这个文件: wget https://releases.hashicorp.com/consu…...
怎么实现Redis的高可用?
大家好,我是锋哥。今天分享关于【怎么实现Redis的高可用?】面试题。希望对大家有帮助; 怎么实现Redis的高可用? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 为了实现 Redis 的高可用性,我们需要保证在发…...
Domain Adaptation(李宏毅)机器学习 2023 Spring HW11 (Boss Baseline)
1. 领域适配简介 领域适配是一种迁移学习方法,适用于源领域和目标领域数据分布不同但学习任务相同的情况。具体而言,我们在源领域(通常有大量标注数据)训练一个模型,并希望将其应用于目标领域(通常只有少量或没有标注数据)。然而,由于这两个领域的数据分布不同,模型在…...
Chatper 4: mplementing a GPT model from Scratch To Generate Text
4 Implementing a GPT model from Scratch To Generate Text 本章节包含 编写一个类似于GPT的大型语言模型(LLM),这个模型可以被训练来生成类似人类的文本。Normalizing layer activations to stabilize neural network training在深度神经网…...
websocket股票行情接口
股票行情区别 交易所出来的数据,不管通过什么渠道,延时一般都不会差太远,估计一般也就几十ms的差别。 但是如果是通过http轮询,不太可能几十ms全部轮询一次。所以,做量化的话,用http协议是最次的选择。 …...
一键部署Netdata系统无需公网IP轻松实现本地服务器的可视化监控
文章目录 前言1.关于Netdata2.本地部署Netdata3.使用Netdata4.cpolar内网穿透工具安装5.创建远程连接公网地址6.固定Netdata公网地址 💡 推荐 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。…...
概率图模型01
机器学习中,线性回归、树、集成和概率图都属于典型的统计学习方法,概率图模型会更深入地体现出‘统计’两字 概率图模型的常见算法 概率图模型中的图 概率图模型如图主要分为两种,即贝叶斯网络和马尔可夫网络,有向图与无向图&…...
oxml中创建CT_Document类
概述 本文基于python-docx源码,详细记录CT_Document类创建的过程,以此来加深对Python中元类、以及CT_Document元素类的认识。 元类简介 元类(MetaClass)是Python中的高级特性。元类是什么呢?Python是面向对象编程…...
YARN 集群
一、集群角色 1.1 概述 Apache Hadoop YARN是一个标准的Master/Slave集群(主从架构)。其中ResourceManager(RM) 为Master, NodeManager(NM) 为 Slave。常见的是一主多从集群,也可以…...
电机控制的数字化升级:基于DSP和FPGA的仿真与实现
数字信号处理器(DSP,Digital Signal Processor)在工业自动化领域的应用日益广泛。DSP是一种专门用于将模拟信号转换成数字信号并进行处理的技术,能够实现信号的数字滤波、重构、调制和解调等多项功能,确保信号处理的精…...
homework 2025.01.11 math 6
homework 2025.01.11 math 6 小学6年级数学...
【会话详解】
会话详解 概述 会话: 用户通过浏览器访问多个Web资源的过程,从打开浏览器开始访问特定网站,直到关闭浏览器的过程称为会话(Session)。会话管理是Web应用中跟踪和存储用户状态的重要机制。 有状态会话: …...
Unity 的 Vector3 与 Babylon.js 的 Vector3:使用上的异同
在 3D 开发中,向量是不可或缺的数学工具,用于表示位置、方向、速度等物理量。Unity 和 Babylon.js 都提供了 Vector3 类来处理三维向量,但它们在实现和使用上有一些异同。本文将详细对比 Unity 的 Vector3 和 Babylon.js 的 Vector…...
【2024年华为OD机试】(A卷,100分)- 单词倒序(Java JS PythonC/C++)
一、问题描述 题目描述 输入单行英文句子,里面包含英文字母,空格以及,.?三种标点符号,请将句子内每个单词进行倒序,并输出倒序后的语句。 输入描述 输入字符串S,S的长度 1 ≤ N ≤ 100 输出描述 输出倒序后的字…...
芯片:CPU和GPU有什么区别?
CPU(中央处理器)和GPU(图形处理单元)是计算机系统中两种非常重要的处理器,它们各自有不同的设计理念、架构特点以及应用领域。下面是它们之间的一些主要差异: 1. 设计目的与应用领域 CPU:设计…...
springboot整合mysql
1.首先在pom.xml中添加依赖: <!-- MySQL Driver --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><!-- Druid连接池 -->…...
复合机器人助力手机壳cnc加工向自动化升级
在当今竞争激烈的制造业领域,如何提高生产效率、降低成本、提升产品质量,成为众多企业面临的关键挑战。尤其是在手机壳 CNC 加工这一细分行业,随着市场需求的持续增长,对生产效能的要求愈发严苛。而复合机器人的出现,正…...
深入浅出负载均衡:理解其原理并选择最适合你的实现方式
负载均衡是一种在多个计算资源(如服务器、CPU核心、网络链接等)之间分配工作负载的技术,旨在优化资源利用率、提高系统吞吐量和降低响应时间。负载均衡的实现方式多种多样,以下是几种常见的实现方式: 1. 硬件负载均衡&…...
征程 6X release版本内核模块安全加载
1.概述 征程 6X 系统在 release 编译时支持内核模块签名验证,仅加载使用正确密钥进行数字签名的内核模块。禁止加载未签名的内核模块或使用错误密钥签名的内核模块,客户需要替换成自己的 key 进行签名。 模块签名启用后,Linux 内核将仅加载…...
uni-app的学习
uni-app 有着跨平台支持、丰富的插件和生态系统、高性能、集成开发工具HBuilderX的配合使用。允许使用者仅通过一套代码发布到多平台使用。 uni-app官网 uni-app 是一个适合开发跨平台移动应用和小程序的框架,能够大幅提高开发效率。 一、了解 1.1 工具准备 从Git…...
国产信创实践(国能磐石服务器操作系统CEOS +东方通TongHttpServer)
替换介绍: 国能磐石服务器操作系统CEOS 对标 Linux 服务器操作系统(Ubuntu, CentOS) 东方通TongHttpServer 对标 Nginx 负载均衡Web服务器 第一步: 服务器安装CEOS映像文件,可直接安装,本文采用使用VMware …...
前端实时显示当前在线人数的实现
实时显示当前在线人数的实现 本文档提供了在网页上实时显示当前在线人数的多种实现方法,包括使用 WebSocket 实现实时更新和轮询方式实现非实时更新。 方法一:使用 WebSocket 实现实时更新 服务器端设置 通过 Node.js 和 WebSocket 库(如 …...
为AI聊天工具添加一个知识系统 之27 支持边缘计算设备的资源存储库及管理器
本文问题 现在我们回到 ONE/TWO/TREE 的资源存储库 的设计--用来指导 足以 支持 本项目(为AI聊天工具增加一套知识系统)的 核心能力 “语言处理” 中 最高难度系数的“自然语言处理” 中最具挑战性的“含糊性” 问题的解决。--因为足以解决 自然语言中最…...
继续坚持与共勉
经过期末考试后,又要开始学习啦。 当时一直在刷算法题就很少写博客了,现在要继续坚持写博客,将每天对于题的感悟记录下来。 同时我将会在学习Linux操作系统,对于过去学习的内容进行回顾!! 在此ÿ…...
PHP的扩展Imagick的安装
windows下的安装 下载:Imagick扩展 PECL :: Package :: imagick 3.7.0 for Windows 下载:ghostscript(PDF提取图片时用到,不处理PDF可以不安装) Ghostscript : Downloads 安装扩展 Imagick解压后&…...
【2024年华为OD机试】 (A卷,100分)- 租车骑绿岛(Java JS PythonC/C++)
一、问题描述 题目描述 部门组织绿岛骑行团建活动。租用公共双人自行车,每辆自行车最多坐两人,最大载重 M。 给出部门每个人的体重,请问最多需要租用多少双人自行车。 输入描述 第一行两个数字 m、n,分别代表自行车限重&#…...
Solidity入门: 函数
函数 Solidity语言的函数非常灵活,可以进行各种复杂操作。在本教程中,我们将会概述函数的基础概念,并通过一些示例演示如何使用函数。 我们先看一下 Solidity 中函数的形式: function <function name>(<parameter types>) {in…...
1、docker概念和基本使用命令
docker概念 微服务:不再是以完整的物理机为基础的服务软件,而是借助于宿主机的性能。以小量的形式,单独部署的应用。 docker:是一个开源的应用容器引擎,基于go语言开发的,使用时apache2.0的协议。docker是…...
【Python】深入Python元类:动态生成类与对象的艺术
在Python中,元类(Metaclass)是一个强大且高级的特性,允许开发者在类创建时控制其行为与属性。通过元类,开发者可以动态生成类和对象,实现自定义的类行为,进而增强代码的灵活性和可扩展性。本文将…...
数字孪生可视化在各个行业的应用场景
数字孪生技术,作为新一代信息技术的集大成者,正在深刻改变着我们对物理世界的认知和管理方式。本文将探讨数字孪生可视化在不同行业的应用场景,以及它们如何赋能行业数字化转型。 1. 智慧城市与交通 在智慧城市领域,数字孪生技术…...
CES Asia 2025科技盛宴,AI智能体成焦点
2025第七届亚洲消费电子技术展(CES Asia赛逸展)将在北京拉开帷幕,AI智能体有望成为展会的核心亮点。 深圳市人工智能行业协会发文表示全力支持CES Asia 2025(赛逸展),称其为人工智能领域的创新发展提供了强…...
【第04阶段-机器学习深度学习篇-1-深度学习基础-深度学习介绍】
1 深度学习概念 深度学习是基于机器学习延伸出来的一个新的领域,由以人大脑结构为启发的神经网络算法为起源加之模型结构深度的增加发展,并伴随大数据和计算能力的提高而产生的一系列新的算法。 2 深度学习发展 其概念由著名科学家Geoffrey Hinton等人…...
android framework.jar 在应用中使用
在开发APP中,有时会使用系统提供的framework.jar 来替代 android.jar, 在gradle中配置如下: 放置framework.jar 依赖配置 3 优先级配置 gradle.projectsEvaluated {tasks.withType(JavaCompile) {Set<File> fileSet options.bootstrapClasspat…...
带格式 pdf 翻译
支持 openAI 接口,国内 deepseek 接口兼容 openAI 接口, deepseek api 又非常便宜 https://pdf2zh.com/ https://github.com/Byaidu/PDFMathTranslate...
Flutter项目适配鸿蒙
Flutter项目适配鸿蒙 前言Flutter项目适配鸿蒙新工程直接支持ohos构建新项目编译运行 适配已有的Flutter项目 前言 目前市面上使用Flutter技术站的app不在少数,对于Flutter的项目,可能更多的是想直接兼容Harmonyos,而不是直接在重新开发一个…...
轻量自高斯注意力机制LSGAttention模型详解及代码复现
模型背景 近年来,卷积神经网络(CNN)在高光谱图像分类领域取得了显著进展。然而,CNN面临 长距离关系建模 和 计算成本 增加的挑战。为解决这些问题,研究人员提出了基于 轻量自高斯注意(Light Self-Gaussian-Attention, LSGA) 机制的视觉转换器(Vision Transformer, VIT),旨…...
vue事件对象$event
事件参数可以获取event对象和通过事件传递数据 获取 event 对象 <template><h1>Hello world</h1><button click"addCount">Add</button><p>{{ count }}</p> </template> <script>export default{data(){ret…...
PyCharm文档管理
背景:使用PyCharmgit做文档管理 需求:需要PyCharm自动识别docx/xslx/vsdx等文件类型,并在PyCharm内点击文档时唤起系统内关联应用(如word、excel、visio) 设置步骤: 1、file -》 settings -》file types 2、在Files opened i…...
Windows下调试Dify相关组件(2)--后端Api
1.部署依赖的服务(代码最外层的docker目录) 1.1 将middleware.env.example复制,并改名为middleware.env。 1.2 查看docker-compose.middleware.yaml,有5个服务 db:postgres数据库。 redis:redis缓存。 sa…...
Flask----前后端不分离-登录
文章目录 扩展模块flask-wtf 的简单使用定义用户数据模型flask-login完成用户登录 扩展模块 flask-sqlalchmy,连接数据库flask-login,处理用户的登录,认证flask-session,会话保持,默认对用户数据加密,存储…...
Group3r:一款针对活动目录组策略安全的漏洞检测工具
关于Group3r Group3r是一款针对活动目录组策略安全的漏洞检测工具,可以帮助广大安全研究人员迅速枚举目标AD组策略中的相关配置,并识别其中的潜在安全威胁。 Group3r专为红蓝队研究人员和渗透测试人员设计,该工具可以通过将 LDAP 与域控制器…...
ElasticSearch 认识和安装ES
文章目录 一、为什么学ElasticSearch?1.ElasticSearch 简介2.ElasticSearch 与传统数据库的对比3.ElasticSearch 应用场景4.ElasticSearch 技术特点5.ElasticSearch 市场表现6.ElasticSearch 的发展 二、认识和安装ES1.认识 Elasticsearch(简称 ES)2.El…...
CNN Test Data
由于数据量过大,打不开了 搞一组小的吧。收工睡觉 https://download.csdn.net/download/spencer_tseng/90256048...
git 转移文件夹
打开终端或命令行界面:首先,确保你的电脑上安装了 Git,并打开终端或命令行界面。 导航到你的仓库目录:使用 cd 命令来切换到包含你想要移动文件夹的仓库的目录。 cd /path/to/your/repository使用 git mv 命令移动文件夹&#x…...
计算机网络学习
网络安全:前端开发者必知:Web安全威胁——XSS与CSRF攻击及其防范-CSDN博客 三次握手四次挥手:前端网络---三次握手四次挥手_前端三次握手-CSDN博客 http协议和https协议的区别:前端网络---http协议和https协议的区别-CSDN博客 …...
Android车机DIY开发之学习篇(二)编译Kernel以正点原子为例
Android车机DIY开发之学习篇(二)编译Kernel以正点原子为例 1.代码在/kernel-5.10文件夹下 2.在kernel-5.10目录下执行如下命令编译 : 编译之前,需要将 clang 导出到 PATH 环境变量: 如果是 Android12 执行下面这条命令 export PATH../pr…...