当前位置: 首页 > news >正文

使用 Node.js 从零搭建 Kafka 生产消费系统

目录

一、Kafka 核心概念速览

二、环境准备

三、生产者实现:发送消息

四、消费者实现:处理消息

五、高级配置与最佳实践

六、常见问题解决

七、应用场景示例

总结


Apache Kafka 作为高吞吐、分布式的消息队列系统,在实时数据流处理中占据重要地位。本文将以 Node.js 为例,从基础概念到代码实战,手把手教你实现 Kafka 的生产者与消费者。


一、Kafka 核心概念速览

1. Topic 与 Partition

- Topic:消息的分类(如 `userlogs`),生产者发送到 Topic,消费者订阅 Topic。

- Partition:每个 Topic 分为多个分区,实现并行处理。分区内有序,分区间无序

- 例如:将 `userlogs` 分为 3 个分区,可同时由 3 个消费者处理。

2. Producer 与 Consumer

- Producer:向 Kafka 发送消息的客户端。

- Consumer:订阅 Topic 并处理消息,消费者组(Consumer Group) 实现负载均衡。

3. Broker 与 Cluster

- Broker:Kafka 服务节点,负责存储和转发消息。

- Cluster:多个 Broker 组成集群,通过副本机制保障高可用。


二、环境准备

1. 安装 Kafka

参考 Kafka 官方文档 启动本地 Kafka 服务(需 Zookeeper 或 KRaft 模式)。

2. Node.js 客户端库

npm install kafkajs # 推荐:轻量、API 友好# 或使用 node-rdkafka(高性能,但配置复杂)

三、生产者实现:发送消息
// producer.js
const { Kafka } = require('kafkajs');
const kafka = new Kafka({clientId: 'node-producer',brokers: ['localhost:9092'],  // 替换为实际 Broker 地址
});
const producer = kafka.producer();
async function sendMessage() {await producer.connect();await producer.send({topic: 'user_actions',messages: [{ key: 'user1',  // 相同 Key 的消息分配到同一分区value: JSON.stringify({ action: 'click', page: 'home' })},],});console.log('✅ 消息发送成功');await producer.disconnect();
}
sendMessage().catch(console.error);

运行命令:`node producer.js`


四、消费者实现:处理消息
// consumer.js
const { Kafka } = require('kafkajs');
const kafka = new Kafka({clientId: 'node-consumer',brokers: ['localhost:9092'],
});
const consumer = kafka.consumer({ groupId: 'user-analytics-group' });
async function startConsumer() {await consumer.connect();await consumer.subscribe({ topic: 'user_actions', fromBeginning: true });await consumer.run({eachMessage: async ({ topic, partition, message }) => {console.log(`📩 收到消息: Topic: ${topic}Partition: ${partition}Key: ${message.key.toString()}Value: ${message.value.toString()}`);// 手动提交 Offset(确保消息处理完成)await consumer.commitOffsets([{ topic, partition, offset: message.offset }]);},});
}
startConsumer().catch(console.error);

运行命令:`node consumer.js`


五、高级配置与最佳实践

1. 生产者优化

const producer = kafka.producer({idempotent: true, // 开启幂等性,防止重复消息transactionTimeout: 30000,
});// 批量发送提升吞吐量
await producer.send({topic: 'logs',messages: batchMessages,acks: -1 // 所有副本确认后才返回成功
});

2. 消费者容错处理

consumer.on('consumer.crash', (error) => {console.error('消费者崩溃:', error);process.exit(1); // 重启或告警
});// 处理消息时捕获异常,避免 Offset 提交错误数据

3. 安全认证(生产环境必配)

new Kafka({brokers: ['kafka-server:9093'],ssl: { rejectUnauthorized: false },sasl: {mechanism: 'scram-sha-256',username: process.env.KAFKAUSER,password: process.env.KAFKAPASS}
});

六、常见问题解决

消息重复消费:消费者处理消息后崩溃,导致 Offset 未提交。

方案:实现业务逻辑的幂等性(如数据库唯一键)。

性能瓶颈:单个消费者处理速度慢。

方案:增加分区数,启动多个消费者实例(相同 Group ID)。

数据丢失风险:生产者配置 `acks: 0` 时,不等待 Broker 确认。

方案:生产环境至少设置 `acks: 1`(Leader 确认)。


七、应用场景示例

用户行为追踪:Web 端埋点数据实时发送到 Kafka,消费者计算点击率。

日志聚合:微服务日志统一写入 Kafka,供 ELK 系统分析。

订单状态通知:订单支付成功后,通过 Kafka 触发短信通知。


总结

通过 `kafkajs`,Node.js 可快速集成 Kafka 实现高可靠的消息处理。关键点:

1. 生产者关注消息分区策略与批量发送。

2. 消费者需处理 Offset 提交与容错。

3. 生产环境务必配置 SSL 和 SASL 认证。

进一步学习:Kafka 中文学习网

相关文章:

使用 Node.js 从零搭建 Kafka 生产消费系统

目录 一、Kafka 核心概念速览 二、环境准备 三、生产者实现:发送消息 四、消费者实现:处理消息 五、高级配置与最佳实践 六、常见问题解决 七、应用场景示例 总结 Apache Kafka 作为高吞吐、分布式的消息队列系统,在实时数据流处理中…...

【Linux系统】Linux权限讲解!!!超详细!!!

目录 Linux文件类型 区分方法 文件类型 Linux用户 用户创建与删除 用户之间的转换 su指令 普通用户->超级用户(root) 超级用户(root) ->普通用户 普通账户->普通账户 普通用户的权限提高 sudo指令 注: Linux权限 定义 权限操作 1、修改文…...

Ubuntu安装TensorFlow 2.13-GPU版全流程指南(anaconda)

目录 一、安装前准备​1.版本选择依据2.​创建独立环境​ 二、详细安装步骤1.​通过conda自动安装依赖2.​手动验证依赖版本 三、补充说明1.组件依赖关系表2.常见问题解决方案​3.性能验证脚本 一、安装前准备 ​1.版本选择依据 当前最新稳定版:TensorFlow 2.13&a…...

Spring事务管理

介绍了事务的概念,事务的特性,JDBC 事务管理的步骤和操作过程,以及Spring事务管理的两种实现方式:编程式事务管理和声明式事务管理。 1.事务的概念 事务(Transaction)就是将一系列的数据库操作作为一个整体…...

避雷 :C语言中 scanf() 函数的错误❌使用!!!

1. 返回值说明 scanf函数会返回成功匹配并赋值的输入项个数&#xff0c;而不是返回输入的数据。 可以通过检查返回值数量来确认输入是否成功。若返回值与预期不符&#xff0c;就表明输入存在问题。 #include <stdio.h>int main() {int num;if (scanf("%d", …...

判断一个操作是不是允许

一、目的 简单探索一个URL请求是不是允许的。 二、具体过程 (一)系统的初始化 系统数据库有账户"admin"&#xff0c;密码是"123"。 账号"admin"的角色是管理员"manager"。 假设管理员身份设定的权限是&#xff1a; 1、对于/user/开头的…...

【FPGA开发】Cordic原理推导、Xilinx PG105手册解读

目录 Cordic原理推导PG105手册解读IP核总览核心计算功能总览基本握手信号非阻塞模式 NonBlocking Mode阻塞模式 Block Mode 数据格式数据映射 本文针对Cordic算法本身&#xff0c;以及Xilinx官方CORDIC IP做学习记录&#xff0c;如有纰漏&#xff0c;欢迎指正&#xff01; Cord…...

数据结构与算法:宽度优先遍历

前言 进入图论部分难度明显提升了一大截&#xff0c;思路想不到一点…… 一、宽度优先遍历 1.内容 宽度优先遍历主要用于在图上求最短路。 &#xff08;1&#xff09;特点 宽度优先遍历的特点就是逐层扩展&#xff0c;最短路即层数 &#xff08;2&#xff09;使用条件 …...

PyTorch 面试题及参考答案(精选100道)

目录 PyTorch 的动态计算图与 TensorFlow 的静态计算图有何区别?动态图的优势是什么? 解释张量(Tensor)与 NumPy 数组的异同,为何 PyTorch 选择张量作为核心数据结构? 什么是 torch.autograd 模块?它在反向传播中的作用是什么? 如何理解 PyTorch 中的 nn.Module 类?…...

【数理基础】【概率论与数理统计】概率论与数理统计本科课程总结、资料汇总、个人理解

1 前言 概率论与数理统计是数学系核心的基础专业课&#xff0c;我本科的时候&#xff0c;是拆开上的&#xff0c;对应工科专业的高数中的概率论与数理统计&#xff0c;在量子力学&#xff0c;机器学习&#xff0c;计算机领域深度学习&#xff0c;大模型&#xff0c;机器人控制…...

美制 / 英制单位换算/公制/帝国制 单位转换速查表

文章目录 &#x1f4a1;Introduction&#x1f4cf; 英制&#xff08;美制&#xff09;单位与公制换算速查表&#x1f9f1; 一、长度&#xff08;Length&#xff09;&#x1f9f4; 二、体积&#xff08;Volume / Liquid Measure&#xff09;⚖️ 三、质量 / 重量&#xff08;Wei…...

ENSP学习day9

ACL访问控制列表实验 ACL&#xff08;Access Control List&#xff0c;访问控制列表&#xff09;是一种用于控制用户或系统对资源&#xff08;如文件、文件夹、网络等&#xff09;访问权限的机制。通过ACL&#xff0c;系统管理员可以定义哪些用户或系统可以访问特定资源&#x…...

我爱学算法之——滑动窗口攻克子数组和子串难题(中)

学习算法&#xff0c;继续加油&#xff01;&#xff01;&#xff01; 一、将 x 减到 0 的最小操作数 题目解析 来看这一道题&#xff0c;题目给定一个数组nums和一个整数x&#xff1b;我们可以在数组nums的左边或者右边进行操作&#xff08;x减去该位置的值&#xff09;&#…...

Linux centos 7 vsftp本地部署脚本

下面是脚本: #!/bin/bash #function:vsftpd脚本 #author: 20230323 IT 小旋风# 判断是否是root用户 if [ "$USER" ! "root" ]; thenecho "不是root 装个蛋啊"exit 1 fi# 关闭防火墙 systemctl stop firewalld && systemctl disable …...

编程考古-安德斯·海尔斯伯格(Anders Hejlsberg)回答离开Borland的原因

安德斯海尔斯伯格&#xff08;Anders Hejlsberg&#xff09;是著名的编程语言和工具开发者&#xff0c;曾主导开发了 Turbo Pascal、Delphi&#xff08;Borland 时期&#xff09;&#xff0c;以及加入微软后参与的 C# 和 TypeScript。关于他离开 Borland 的原因&#xff0c;可以…...

数据库数值函数详解

各类资料学习下载合集 ​​https://pan.quark.cn/s/8c91ccb5a474​​ 数值函数是数据库中用于处理数值数据的函数,可以用于执行各种数学运算、统计计算等。数值函数在数据分析及处理时非常重要,能够帮助我们进行数据的聚合、计算和转换。在本篇博客中,我们将详细介绍常用的…...

SpringBoot与Redisson整合,用注解方式解决分布式锁的使用问题

文章引用&#xff1a;https://mp.weixin.qq.com/s/XgdKE2rBKL0-nFk2NJPuyg 一、单个服务 1.代码 该接口的作用是累加一个值&#xff0c;访问一次该值加1 RestController public class LockController {Autowiredprivate StringRedisTemplate stringRedisTemplate;GetMappin…...

Bash 脚本基础

一、Bash 脚本基础 什么是 Bash 脚本&#xff1a;Bash 脚本是一种文本文件&#xff0c;其中包含了一系列的命令&#xff0c;这些命令可以被 Bash shell 执行。它用于自动化重复性的任务&#xff0c;提高工作效率。 Bash 脚本的基本结构&#xff1a;以 #!/bin/bash 开头&#x…...

【Linux】线程库

一、线程库管理 tid其实是一个地址 void* start(void* args) {const char* name (const char *)args;while(true){printf("我是新线程 %s &#xff0c;我的地址&#xff1a;0x%lx\n",name,pthread_self());sleep(1);}return nullptr; }int main() {pthread_t tid…...

Smith3.0 4.0的阻抗匹配操作方法

阅读了这篇文章中&#xff0c;我get到一些知识点的总结&#xff1a; 百度安全验证https://baijiahao.baidu.com/s?id1822624157494292625 1&#xff09;红色圆代表阻抗圆&#xff0c;绿色圆代表导纳圆。 2&#xff09;圆心位于50欧&#xff0c;最左侧为0欧&#xff0c;最右侧…...

装饰器模式 (Decorator Pattern)

装饰器模式 (Decorator Pattern) 是一种结构型设计模式,它动态地给一个对象添加一些额外的职责,就增加功能来说,装饰器模式相比生成子类更为灵活。 一、基础 1 意图 动态地给一个对象添加一些额外的职责。 就增加功能来说,装饰器模式相比生成子类更为灵活。 2 适用场景 当…...

生活电子类常识——搭建openMauns工作流+搭建易犯错解析

前言 小白一句话生成一个网站&#xff1f;小白一句话生成一个游戏&#xff1f;小白一句话生成一个ppt?小白一句话生成一个视频&#xff1f; 可以 原理 总体的执行流程是 1&#xff0c;用户下达指令 2&#xff0c;大模型根据用户指令&#xff0c;分解指令任务为多个细分步骤…...

题型笔记 | Apriori算法

目录 内容拓展知识 内容 其步骤如下&#xff1a; 扫描全部数据&#xff0c;产生候选项 1 1 1 项集的集合 C 1 C_1 C1​根据最小支持度&#xff0c;由候选 1 1 1 项集的集合 C 1 C_1 C1​ 产生频繁 1 1 1 项集的集合 L 1 L_1 L1​。若 k > 1 k > 1 k>1&#xf…...

雷电模拟器启动94%卡住不动解决方案

安卓模拟器启动失败/启动加载卡0-29%/启动卡50%/启动卡94%的解决方法 首先看官方论坛常见问题来尝试解决&#xff1a; 安卓模拟器启动失败/启动加载卡0-29%/启动卡50%/启动卡94%的解决方法-雷电安卓模拟器-手游模拟器安卓版_android手机模拟器电脑版_雷电模拟器帮助中心 所有…...

站群服务器是什么意思呢?

站群服务器是一种专门为托管和管理多个网站而设计的服务器&#xff0c;其核心特点是为每个网站分配独立的IP地址。这种服务器通常用于SEO优化、提高网站权重和排名&#xff0c;以及集中管理多个网站的需求。以下是站群服务器的详细解释&#xff1a; 一、站群服务器的定义 站群…...

靶场(十五)---小白心得思路分析---LaVita

启程&#xff1a; 扫描端口&#xff0c;发现开放22&#xff0c;80端口&#xff0c;发现ws.css可能存在exp&#xff0c;经查发现无可利用的exp PORT STATE SERVICE VERSION 22/tcp open ssh OpenSSH 8.4p1 Debian 5deb11u2 (protocol 2.0) | ssh-hostkey: | 3072 c9…...

第十六次CCF-CSP认证(含C++源码)

第十六次CCF-CSP认证 小中大满分思路遇到的问题 二十四点&#xff08;表达式求值&#xff09;难点满分思路 小中大 这次我觉得是非常难的 只有一道easy 做的时候看这个通过率就有点不对劲 上官网看了一眼平均分 106 就是人均A一道的水准 一开始看了半天 第三题几乎还是下不了手…...

大数据中的数据预处理:脏数据不清,算法徒劳!

大数据中的数据预处理&#xff1a;脏数据不清&#xff0c;算法徒劳&#xff01; 在大数据世界里&#xff0c;数据预处理是个让人又爱又恨的环节。爱它&#xff0c;是因为数据预处理做好了&#xff0c;后续的模型跑起来又快又准&#xff0c;仿佛给AI装上了火箭助推器&#xff1…...

17153 班级活动

17153 班级活动 ⭐️难度&#xff1a;简单 &#x1f31f;考点&#xff1a;2023、思维、国赛 &#x1f4d6; &#x1f4da; import java.util.Arrays; import java.util.LinkedList; import java.util.Queue; import java.util.Scanner;public class Main {static int N 10…...

算力100问☞第93问:算力资源为何更分散了?

目录 1、政策驱动与地方投资的盲目性 2、美国芯片断供与国产替代的阵痛 3、政企市场对私有云的偏好 4、技术标准与供需结构的失衡 5、产业生态与市场机制的滞后 6、破局路径与未来展望 在大模型和人工智能技术快速发展的背景下,算力资源已成为数字经济时代的核心基础设施…...

记录 macOS 上使用 Homebrew 安装的软件

Homebrew 是 macOS 上最受欢迎的软件包管理器之一&#xff0c;能够轻松安装各种命令行工具和 GUI 应用。本文记录了我通过 Homebrew 安装的各种软件&#xff0c;并对它们的用途和基本使用方法进行介绍。 &#x1f37a; Homebrew 介绍 Homebrew 是一个开源的包管理器&#xff…...

Java 安装开发环境(Mac Apple M1 Pro)

下载 Java Downloads 查看本地安装的 JDK 所在位置以及 JAVA 版本 系统默认的安装处 /Library/Java/JavaVirtualMachines配置Java 添加环境变量 vim &#xff5e;/.bash_profileAdd # 安装位置 export JAVA_11_HOME"/Library/Java/JavaVirtualMachines/zulu-11.jdk…...

微前端框架的实战demo

以下是基于主流微前端框架的实战 Demo 开发指南&#xff0c;综合多个开源项目与实践案例整理而成&#xff1a; 一、qiankun 框架实战 1. 核心架构 主应用&#xff1a;负责路由分发、子应用注册与生命周期管理子应用&#xff1a;独立开发部署&#xff0c;支持不同技术栈&#…...

win32汇编环境,网络编程入门之九

;在上一教程里&#xff0c;我们学习了在连接成功网站后&#xff0c;应该发送什么数据给网站 ;在前面的几个教程里&#xff0c;简单地运行了套接字机制连接网站的方式&#xff0c;这是字节级的网络连接&#xff0c;扩展几乎是无限的。 ;想了想&#xff0c;这个开个头就行了&…...

OceanBase 4.3.3 AP 解析:应用 RoaringBitmaps 类型处理海量数据的判重和基数统计

对于大数据开发人员而言&#xff0c;处理海量数据的判重操作和基数统计是常见需求&#xff0c;而 RoaringBitmap类型及其相关函数是当前非常高效的一种解决方案&#xff0c;许多大数据库产品已支持RoaringBitmap类型。OceanBase 4.3.3版本&#xff0c;作为专为OLAP场景设计的正…...

点亮STM32最小系统板LED灯

对于如何点亮板载LED灯只需要掌握如何初始化GPIO引脚&#xff0c;并改变GPIO引脚的电平即可实现点亮或者熄灭LED。 Led_INFO led_info {0}; led_info 是一个结构体变量&#xff0c;类型为 Led_INFO&#xff0c;用于存储LED的状态信息。这里初始化为 {0}&#xff0c;表示所有成…...

分区表的应用场景与优化实践

当表的数据量非常大,达到几千万甚至上亿行时,全表扫描会很慢,这时候分区可以帮助缩小扫描范围。比如,在一个电商系统中,订单表可能按月份分区,这样查询某个月的订单时,只需要扫描对应的分区,而不是整个表。或者在日志系统中,按天分区,方便归档和删除旧日志。 另外,如…...

如何高效参与 GitHub 知名项目开发并成为核心贡献者

参与知名 GitHub 项目开发不仅能提升你的编程能力&#xff0c;还能积累开源贡献经验&#xff0c;甚至为求职加分。以下是详细步骤&#xff1a; 1. 选择合适的 GitHub 项目 (1) 确定兴趣方向 后端开发&#xff1a;Spring、Spring Boot、Netty前端开发&#xff1a;React、Vue、…...

AIGC 新势力:探秘海螺 AI 与蓝耘 MaaS 平台的协同创新之旅

探秘海螺AI&#xff1a;多模态架构下的认知智能新引擎 在人工智能持续进阶的进程中&#xff0c;海螺AI作为一款前沿的多功能AI工具&#xff0c;正凭借其独特的多模态架构崭露头角。它由上海稀宇科技有限公司&#xff08;MiniMax&#xff09;精心打造&#xff0c;依托自研的万亿…...

银河麒麟桌面版包管理器(三)

以下内容摘自《银河麒麟操作系统进阶应用》一书 麒麟系统软件源配置 使用官方内置源时&#xff0c;无须任何操作。仅在使用其他镜像源&#xff08;Mirror&#xff09;时&#xff0c;需要修改/etc/apt/sources.list文件&#xff0c;根据不同版本&#xff0c;将原始sources.lis…...

在 .NET 9.0 Web API 中实现 Scalar 接口文档及JWT集成

示例代码&#xff1a;https://download.csdn.net/download/hefeng_aspnet/90408075 介绍 随着 .NET 9 的发布&#xff0c;微软宣布他们将不再为任何 .NET API 项目提供默认的 Swagger gen UI。以前&#xff0c;当我们创建 .NET API 项目时&#xff0c;微软会自动添加 Swagger…...

XSS跨站脚本攻击漏洞(Cross Site Scripting)

前提概要 本文章主要用于分享XSS跨站脚本攻击漏洞基础学习&#xff0c;以下是对XSS跨站脚本攻击漏洞的一些个人解析&#xff0c;请大家结合参考其他文章中的相关信息进行归纳和补充。 XSS跨站脚本攻击漏洞描述 跨站脚本攻击&#xff08;XSS&#xff09;漏洞是一种常见且危害较…...

Android:蓝牙设置配套设备配对

一、概述 在搭载 Android 8.0&#xff08;API 级别 26&#xff09;及更高版本的设备上&#xff0c;配套设备配对会代表您的应用对附近的设备执行蓝牙或 Wi-Fi 扫描&#xff0c;而不需要 ACCESS_FINE_LOCATION 权限。这有助于最大限度地保护用户隐私。使用此方法执行配套设备&am…...

MFC中CString类型是如何怎么转std::string的

文章目录 一、转换方法总结二、详细步骤1. Unicode 项目&#xff08;CStringW → std::string&#xff09;2. 多字节项目&#xff08;CStringA → std::string&#xff09; 三、注意事项四、总结更多信息(知识点存在重复&#xff0c;可跳过)方法 1&#xff1a;项目使用 Unicode…...

使用vscode搭建pywebview集成vue项目示例

文章目录 前言环境准备项目源码下载一、项目说明1 目录结构2 前端项目3 后端项目获取python安装包(选择对应版本及系统) 三、调试与生成可执行文件1 本地调试2 打包应用 四、核心代码说明1、package.json2、vite.config.ts设置3、main.py后端入口文件说明 参考文档 前言 本节我…...

JAVASCRIPT 基础 DOM元素,MAP方法,获取输入值

从输入框获取数据的一般写法是&#xff1a; javascript const w parseFloat(document.getElementById("weight").value); const h parseFloat(document.getElementById("height").value); 而从弹窗获取数据一般写法是&#xff1a; javascript const w …...

VLAN间通信

目录 第一步&#xff1a;配vlan 第二步&#xff1a;配置核心vlanif,MAC地址信息。 第三步&#xff1a;ospf协议 三层交换机&#xff08;汇聚层&#xff09;: 对于交换机、路由器、防火墙等网络设备而言&#xff0c;接口类型一般存在两种&#xff1a;二层接口&#xff0c;三…...

MySQL颠覆版系列————MySQL新特性(开启数据库的新纪元)上篇

文章目录 前言一、窗口函数&#xff08;Window Functions&#xff09;1.1 窗口函数概念1.2 常见的窗口函数 二、公用表表达式&#xff08;Common Table Expressions, CTEs&#xff09;2.1 公用表表达式的概念2.2 常见的公用表表达式 三、JSON增强3.1 JSON增强的概念3.2 常见的J…...

5.安全相关(双手启动、安全触边传感器)

一、关于双手启动按钮的使用规范 本文介绍双手启动按钮的使用。概括来讲&#xff1a; 双手按下之间的时间差间隔应该在0.5-2秒之间。一旦释放任何一个按钮&#xff0c;启动信号输出结束。只有两个按钮都被释放之后&#xff0c;才能再次触发双手启动信号。如果某按钮被按下超过…...

国产达梦(DM)数据库的安装(Linux系统)

目录 一、安装前的准备工作 1.1 导包 1.2 创建用户和组 1.3 修改文件打开最大数 1.4 目录规划 1.5 修改目录权限 二、安装DM8 2.1 挂载镜像 2.2 命令行安装 2.3 配置环境变量 2.4 启动图形化界面 三、配置实例 四、注册服务 五、启动 停止 查看状态 六、数据库客…...