当前位置: 首页 > news >正文

day08_Kafka

文章目录

  • day08_Kafka课程笔记
    • 一、今日课程内容
    • 一、消息队列(了解)
      • **为什么消息队列就像是“数据的快递员”?**
      • **实际意义**
      • 1、产生背景
      • 2、消息队列介绍
        • 2.1 常见的消息队列产品
        • 2.2 应用场景
        • 2.3 消息队列中两种消息模型
    • 二、Kafka的基本介绍
      • 1、Kafka基本介绍
      • **为什么 Kafka 就像是“数据的高速公路”?**
      • **实际意义**
      • 2、回顾zookeeper知识
        • 回顾启动zookeeper服务
        • 回顾zookeeper工具连接
      • 3、Kafka的架构(掌握)
    • 三、Kafka的集群搭建(操作)
      • 1、软件安装
      • 2、安装易错点
      • 3、配置Kafka的一键化启动
      • 4、启动服务
      • 5、操作kafka的多种方式
    • 四、Kafka的shell命令使用
        • topics操作
        • producer和consumer操作
        • bootstrap-server和zookeeper以及broker-list的区别:
    • 五、kafka tools工具使用(熟悉)
      • 3-1 连接配置
      • 3-2 创建主题
      • 3-3 删除主题
      • 3-4 主题下的数据查看
      • 3-5 数据发送和接收
    • 六、Kafka的Python API的操作(熟悉)
        • 模块安装
        • 模块使用
          • 3.1 完成生产者代码
          • 3.2 完成消费者代码
  • 01_生产者代码入门.py
  • 02_消费者代码入门.py

day08_Kafka课程笔记

一、今日课程内容

  • 1- 消息队列(了解)
  • 2- Kafka的基本介绍(掌握架构,其他了解)
  • 3- Kafka的集群搭建(操作)
  • 4- Kafka的相关使用(掌握kafka常用shell命令)

今日目的:掌握Kafka架构

在这里插入图片描述

一、消息队列(了解)

  1. 简单来说:消息队列就像是“数据的快递员”,在应用程序之间传递消息,确保数据能够可靠、高效地传输和处理。

  2. 具体而言

    • 核心概念
      • 消息:需要传递的数据单元,可以是文本、JSON、二进制等格式。
      • 队列:存储消息的容器,遵循先进先出(FIFO)的原则。
      • 生产者:发送消息的应用程序。
      • 消费者:接收和处理消息的应用程序。
    • 特点
      • 异步通信:生产者和消费者不需要同时在线,消息可以暂存于队列中。
      • 解耦:生产者和消费者之间无需直接交互,降低了系统耦合度。
      • 可靠性:消息队列通常支持持久化,确保消息不会丢失。
      • 扩展性:支持多个生产者和消费者,便于系统扩展。
  3. 实际生产场景

    • 在电商系统中,使用消息队列处理订单,确保订单数据可靠传输。
    • 在日志收集中,使用消息队列缓冲日志数据,避免数据丢失。
    • 在微服务架构中,使用消息队列实现服务间的异步通信。
  4. 总之:消息队列是分布式系统中重要的组件,通过异步通信和解耦,提高了系统的可靠性、扩展性和灵活性。

为什么消息队列就像是“数据的快递员”?

  1. 传递数据:可靠传输

    • 快递员:将包裹从发件人送到收件人,确保包裹安全到达。
    • 消息队列:将消息从生产者传递到消费者,确保数据可靠传输。
  2. 异步通信:无需实时交互

    • 快递员:发件人和收件人不需要同时在场,包裹可以暂存在快递点。
    • 消息队列:生产者和消费者不需要同时在线,消息可以暂存于队列中,实现异步通信。
  3. 解耦系统:降低依赖

    • 快递员:发件人和收件人无需直接联系,通过快递员完成交互。
    • 消息队列:生产者和消费者之间无需直接交互,降低了系统耦合度。
  4. 可靠性:确保数据不丢失

    • 快递员:通过物流系统确保包裹不丢失。
    • 消息队列:通过持久化机制确保消息不丢失,即使系统故障也能恢复。
  5. 扩展性:支持多对多通信

    • 快递员:可以同时为多个发件人和收件人服务。
    • 消息队列:支持多个生产者和消费者,便于系统扩展和负载均衡。

实际意义

消息队列就像“数据的快递员”,通过异步通信、解耦系统、可靠传输和扩展性,确保数据能够在分布式系统中高效、可靠地传递和处理。

1、产生背景

消息队列:指的数据在一个容器中,从容器中一端传递到另一端的过程

消息(message): 指的是数据,只不过这个数据存在一定流动状态
队列(queue): 指的容器,可以存储数据,只不过这个容器具备FIFO(先进先出)特性
思考: 公共容器需要具备什么特点?
1- 公共性: 各个程序都可以与之对接
2- FIFO特性: 先进先出
3- 具备高效的并发能力: 能够承载海量数据
4- 具备一定的容错能力: 比如支持重新读取消息方案

2、消息队列介绍

2.1 常见的消息队列产品

MQ:message queue消息队列

activeMQ: 出现时期比较早的一款消息队列的中间件产品,在早期使用人群是非常多,目前整个社区活跃度严重下降,使用人群很少了
rabbitMQ: 此款是目前使用人群比较多的一款消息队列的中间件的产品,社区活跃度比较高,主要是应用传统业务领域中
rocketMQ: 是阿里推出的一款消息队列的中间件的产品,目前主要是在阿里系环境中使用,目前支持的客户端比较少,主要是Java中应用较多
Kafka: Apache旗下的顶级开源项目,是一款消息队列的中间件产品项目来源于领英,是大数据体系中目前为止最为常用的一款消息队列的产品

在这里插入图片描述

2.2 应用场景
  • 应用解耦合
  • 异步处理
  • 限流削峰
  • 消息驱动系统
2.3 消息队列中两种消息模型
在Java中, 为了能够集成消息队列的产品, 专门提供了一个消息队列的协议: JMS(Java Message Server)  java消息服务消息队列中两个角色: 生产者(producer) 和 消费者(consumer)
生产者: 生产/发送消息到消息队列中
消费者: 从消息队列中获取消息在JMS规范中, 专门规定了两种消息消费模型: 
1- 点对点消费模型: 指的一条消息最终只能被一个消费者所消费。微信聊天的私聊
2- 发布订阅消费模型: 指的一条消息最终被多个消费者所消费。微信聊天的群聊

二、Kafka的基本介绍

1、Kafka基本介绍

​ Kafka是一款消息队列的中间件产品, 来源于领英公司, 后期贡献给了Apache, 目前是Aapche旗下的顶级开源项目, 采用语言是Scala

​ 官方地址: http://kafka.apache.org

kafka的特点:

  • 可靠性:Kafka集群是分布式的,并且有多副本的机制。数据可以自动复制
  • 可扩展性:Kafka集群可以灵活的调整,在线扩容
  • 耐用性:Kafka数据保存在磁盘上面,数据并且有多副本的机制。数据持久化,而且可以一定程度上防止数据丢失
  • 高性能:Kafka可以存储海量的数据,虽然是使用磁盘进行数据存储,但是Kafka有各种优化手段(例如:磁盘的顺序读写、零拷贝等)提高数据的读写速度(吞吐量)
  1. 简单来说:Kafka是一个分布式的流数据平台,就像是“数据的高速公路”,能够高效地处理实时数据流,支持数据的发布、订阅、存储和处理。

  2. 具体而言

    • 核心概念
      • Topic:数据流的分类,类似于消息队列的主题。
      • Producer:数据生产者,将数据发布到Topic。
      • Consumer:数据消费者,从Topic订阅数据。
      • Broker:Kafka集群中的服务器节点,负责存储和转发数据。
      • Partition:Topic的分区,支持并行处理和水平扩展。
    • 特点
      • 高吞吐量:支持每秒处理数百万条消息。
      • 持久化:数据持久化到磁盘,支持数据重放。
      • 分布式:支持集群部署,具备高可用性和容错能力。
      • 实时性:支持实时数据流的处理和分析。
  3. 实际生产场景

    • 在日志收集中,使用Kafka收集和传输分布式系统的日志数据。
    • 在实时推荐系统中,使用Kafka处理用户行为数据,实时更新推荐结果。
    • 在金融领域,使用Kafka处理交易数据,进行实时风险监控。
  4. 总之:Kafka是一个强大的分布式流数据平台,广泛应用于实时数据处理、日志收集和消息传递等场景,为大数据生态系统提供了高效、可靠的数据传输和处理能力。

为什么 Kafka 就像是“数据的高速公路”?

  1. 高速传输:高效处理数据流

    • 高速公路:车辆可以快速、高效地通行。
    • Kafka:支持高吞吐量,每秒可以处理数百万条消息,确保数据流的高速传输。
  2. 多车道并行:分区与扩展

    • 高速公路:多车道设计,支持车辆并行通行。
    • Kafka:通过Partition(分区)实现数据的并行处理和水平扩展,提高数据处理能力。
  3. 持久化存储:数据不丢失

    • 高速公路:有完善的基础设施,确保车辆安全通行。
    • Kafka:数据持久化到磁盘,支持数据重放,确保数据不丢失。
  4. 分布式架构:高可用性与容错

    • 高速公路:有多条备用路线,避免交通堵塞或事故中断。
    • Kafka:支持集群部署,具备高可用性和容错能力,即使部分节点故障,数据仍能正常传输。
  5. 实时性:快速响应

    • 高速公路:车辆可以快速到达目的地。
    • Kafka:支持实时数据流的处理和分析,确保数据能够快速传递到消费者。

实际意义

Kafka就像“数据的高速公路”,通过高吞吐量、分区并行、持久化存储和分布式架构,确保数据流能够高效、可靠地传输和处理,为实时数据应用提供了强大的支持。

2、回顾zookeeper知识

Kafka需要使用到zookeeper服务!

回顾启动zookeeper服务
# 三台都需要启动zookeeper服务
[root@node1 ~]# /export/server/zookeeper/bin/zkServer.sh start
[root@node2 ~]# /export/server/zookeeper/bin/zkServer.sh start
[root@node3 ~]# /export/server/zookeeper/bin/zkServer.sh start
回顾zookeeper工具连接

使用hadoop阶段发的ZooInspector软件,双击zookeeper-dev-ZooInspector.jar启动

在这里插入图片描述
在这里插入图片描述

3、Kafka的架构(掌握)

回顾HDFS写入过程:

在这里插入图片描述

Kafka架构:
在这里插入图片描述

1- Kafka中集群节点叫broker,节点和节点之间没有主从之分,地位是完全一样
2- Topic:主题/话题,是业务层面对消息进行分类的。
3- 一个Topic可以设置多个Partition分区。
4- 同一个Partition分区可以设置多个副本,但是副本数不能超过(>)集群broker节点的个数
5- 虽然broker节点间没有主从之分,但是同一个Partition分区的不同副本间有主从之分,分为了Leader主副本和Follower从副本
6- 生产者将数据首先发送给到Leader主副本,接着是Leader主副本主动的往Follower从副本上同步消息
7- Zookeeper用来管理集群,以及管理元数据信息
8- ISR同步列表。该列表中存放的是与Leader主副本消息同步程度最接近的Follower从副本,也就是消息最小的一个列表。该列表作用,当Leader主副本无法对外提供服务的时候,会从该ISR列表中选择一个Follower从副本变成Leader主副本,对外提供服务相关名词:
Kafka Cluster: Kafka集群
Topic: 主题/话题
Broker: Kafka中的节点
Producer: 生产者,负责生产/发送消息到Kafka中
Consumer: 消费者,负责从Kafka中获取消息
Partition: 分区。一个Topic可以设置多个分区,没有数量限制

三、Kafka的集群搭建(操作)

1、软件安装

环境搭建,参考【Spark课程阶段_部署文档.doc】的9.1章节内容。

2、安装易错点

  • 1- 配置文件中监听地址前面的注释,记得打开。也就是删除最前面的#

  • 2- 分发之后,记得要修改每个server.sql的 id 和 监听地址

  • 3- 分发之后,记得source /etc/profile让环境变量生效

  • 4- 没有启动zookeeper,或者仅仅启动了其中一台

  • 5- 启动的时候server.sql中路径,不要写错了

3、配置Kafka的一键化启动

注意:使用一键化脚本,也得需要先启动zookeeper

环境搭建,参考【Spark课程阶段_部署文档.doc】的9.4章节内容。

4、启动服务

方式1: 正常启动

# 1.先在三台机器都输入以下命令,启动ZooKeeper
/export/server/zookeeper/bin/zkServer.sh start# 2.再在三台集群上都输入以下命令,启动Kafka
# 注意:下面是一条命令!!!
nohup /export/server/kafka/bin/kafka-server-start.sh /export/server/kafka/config/server.sql 2>&1 &

方式2: 使用kafka的onekey脚本

# 1.先在三台机器都输入以下命令,启动ZooKeeper
/export/server/zookeeper/bin/zkServer.sh start# 2.只在node1上一键启动所有kafka服务
/export/onekey/start-kafka.sh

5、操作kafka的多种方式

在这里插入图片描述

四、Kafka的shell命令使用

​ Kafka本质上就是一个消息队列的中间件的产品,主要负责消息数据的传递。也就说学习Kafka 也就是学习如何使用Kafka生产数据,以及如何使用Kafka来消费数据

topics操作

注意:

创建topic不指定分区数和副本数,默认都是1个

分区数可以后期通过alter增大,但是不能减小

副本数一旦确定,不能修改!

参数如下:

cd /export/server/kafka/bin./kafka-topics.sh 参数说明:--bootstrap-server: Kafka集群中broker服务器--topic: 指定Topic名称--partitions: 设置Topic的分区数,可以省略不写--replication-factor: 设置Topic分区的副本数,可以省略不写--create: 指定操作类型。这里是新建Topic--delete: 指定操作类型。这里是删除Topic--alter: 指定操作类型。这里是修改Topic--list: 指定操作类型。这里是查看所有Topic列表--describe: 指定操作类型。这里是查看详细且具体的Topic信息
  • 1- 创建Topic
# 创建topic,默认1个分区,1个副本
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic itcast # 可以使用以下参数提前查看是否是默认1个
--list: 指定操作类型。这里是查看所有Topic列表
--describe: 指定操作类型。这里是查看详细且具体的Topic信息
# 注意: 如果副本数超过了集群broker节点个数,就会报错
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic itheima --partitions 4 --replication-factor 4

在这里插入图片描述

# 把replication-factor改成3以内就能创建成功了
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic itheima --partitions 4 --replication-factor 3
  • 2- 查看Topic
# --list查看所有topic  只有名称信息
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --list
# --describe 可以查看详细Topic信息
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --describe # --describe 可以查看具体Topic信息
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --describe --topic itheima

在这里插入图片描述

当然也可使用zookeeper客户端查看
在这里插入图片描述

  • 3- 修改Topic

    本质就是扩容分区!!!

    因为分区不能减小,副本不能修改

# 增大topic分区
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic itcast --partitions 4
# 注意: partitions分区,只能增大,不能减小。而且没有数量限制
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic itcast --partitions 1

在这里插入图片描述

# 注意: 副本既不能增大,也不能减小
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic itcast --partitions 4 --replication-factor 2

在这里插入图片描述

  • 4- 删除Topic
# 再创建一个spark主题
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic spark/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --list# 删除spark主题/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --delete --topic spark/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --list
producer和consumer操作

消费者要和生产者指定是同一个topic主题,才能接收到消息

参数如下:

cd /export/server/kafka/bin./kafka-console-producer.sh 参数说明--broker-list: Kafka集群中broker服务器--topic: 指定Topic./kafka-console-consumer.sh 参数说明--bootstrap-server: Kafka集群中broker连接信息--topic: 指定Topiclatest: 消费者(默认)从最新的地方开始消费--from-beginning: 指定该参数以后,会从最旧的地方开始消费--max-messages: 最多消费的条数。
  • 1- 模拟生产者Producer
# 为了方便演示再创建一个topic,名称为spark
/export/server/kafka/bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic spark# 模拟生产者给spark发送消息
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark-- 注意: 上述命令执行完后,出现 >,可以输入对应的消息了
  • 2- 模拟消费者Consumer

    注意: 可以右键CRT客户端连接->克隆会话来模拟多个消费者

# 模拟消费者从spark获取消息,默认每次拿最新的
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark -- 注意: 输入完上述命令后,自动接收最新的消息(因为默认latest),还可以持续接收...# --from-beginning 会从最旧的地方开始消费
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark --from-beginning-- 注意: 输入完上述命令后,自动接收了生产者发送的所有消息,还可以持续接收...# --max-messages x 可以设置从最旧的地方最大消费次数x
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic spark --from-beginning --max-messages 2-- 注意: 输入完上述命令后,只接收了前2个消息就结束了,不会持续接收

注意:

我们有时候发现消费者打印出来的消息和生产者生产的顺序不一致,是乱序的。原因如下:

topic有多个分区,底层是多线程来读取数据并进行打印输出。因此会存在乱序现象

bootstrap-server和zookeeper以及broker-list的区别:
旧版(<v2.2): kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --create --topic ..
注意: 旧版用--zookeeper参数,主机名(或IP)和端口用ZooKeeper的2181,也就是server.sql文件中zookeeper.connect属性的配置值.新版(>v2.2): kafka-topics.sh --bootstrap-server node1:9092 --create --topic ..
注意: 新版用--bootstrap-server参数,主机名(或IP)和端口用某个节点的即可,即主机名(或主机IP):9092。9092是Kafka的监听端口broker-list:broker指的是kafka的服务端,可以是一个服务器也可以是一个集群。producer和consumer都相当于这个服务端的客户端。一般我们在使用console producer的时候,broker-list参数是必备参数,另外一个必备的参数是topicbootstrap-servers: 指的是kafka集群的服务器地址,这个和broker-list功能是一样的,只不过我们在console producer要求用broker-list,其他地方都采用bootstrap-servers。
  1. 简单来说bootstrap-serverzookeeperbroker-list是Kafka中用于连接和协调的核心概念,分别像是“导航员”、“协调员”和“服务列表”,各自承担不同的角色和功能。

  2. 具体而言

    • bootstrap-server

      • 功能:用于客户端(Producer和Consumer)连接Kafka集群的入口点。客户端通过bootstrap-server获取集群的元数据(如Topic分区信息),并自动发现其他Broker。
      • 特点:支持自动发现集群中的其他Broker,简化了配置和管理。
      • 使用场景:适用于Kafka 0.9及以上版本,推荐在新版本中使用。
      • 示例
        kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
        
    • zookeeper

      • 功能:在早期版本(Kafka 0.8及以前)中,Zookeeper用于管理Kafka集群的元数据(如Broker状态、Topic分区信息)和消费者偏移量(offset)。
      • 特点:依赖Zookeeper会增加系统复杂性和性能开销,因此在Kafka 0.9及以后版本中,逐渐被bootstrap-server取代。
      • 使用场景:适用于旧版本Kafka,或需要与Zookeeper集成的场景。
      • 示例
        kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
        
    • broker-list

      • 功能:指定Kafka集群中的一个或多个Broker地址,主要用于Producer连接集群并发送消息。
      • 特点:需要手动配置Broker地址列表,不支持自动发现其他Broker。
      • 使用场景:适用于Producer的配置,尤其是在旧版本或特定场景下。
      • 示例
        kafka-console-producer.sh --broker-list localhost:9092 --topic test
        
  3. 实际生产场景

    • 在新版Kafka中,推荐使用bootstrap-server,因为它简化了配置并支持自动发现集群中的其他Broker。
    • 在旧版Kafka中,可能需要使用zookeeper来管理消费者偏移量和集群元数据。
    • 对于Producer,broker-list仍然是一个常用的配置参数,尤其是在需要手动指定Broker地址时。
  4. 总之bootstrap-serverzookeeperbroker-list在Kafka中各有其作用,bootstrap-server是新版本的推荐选择,zookeeper适用于旧版本,而broker-list主要用于Producer的配置。根据Kafka版本和具体需求选择合适的配置方式,可以提高系统的性能和可维护性。

如果需要更详细的信息,可以参考相关文档或搜索来源。

五、kafka tools工具使用(熟悉)

可以在可视化的工具通过点击来操作kafka完成主题的创建,分区等操作
在这里插入图片描述
注意: 安装完后桌面不会有快捷方式,需要去电脑上搜索,或者去自己选的安装位置找到发送快捷方式到桌面!
在这里插入图片描述
在这里插入图片描述

3-1 连接配置

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  • 修改工具的数据显示类型字符串类型

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

3-2 创建主题

在这里插入图片描述
在这里插入图片描述

3-3 删除主题

在这里插入图片描述
在这里插入图片描述

3-4 主题下的数据查看

在这里插入图片描述

3-5 数据发送和接收

  • 3-5 发送消息数据到kafka

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

六、Kafka的Python API的操作(熟悉)

模块安装

纯Python的方式操作Kafka。

准备工作:在node1的节点上安装一个python用于操作Kafka的库

安装kafka-python 模模块 ,模块中提供了操作kafka的方法

在线安装

在node1上安装就可以,需要保证服务器能够连接网络

安装命令: python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

在这里插入图片描述

离线安装

将kafka_python-2.0.2-py2.py3-none-any.whl安装包上传服务器software目录下进行安装

安装命令: pip install kafka_python-2.0.2-py2.py3-none-any.whl
模块使用

API使用的参考文档: https://kafka-python.readthedocs.io/en/master/usage.html#kafkaproducer

模块中封装了两个类,

一个是生成者类KafkaProducer,提供了向kafka写数据的方法

另一个是消费者类KafkaConsumer,提供了读取kafka数据的方法

3.1 完成生产者代码

生成者类KafkaProducer,提供了向kafka写数据的方法

send(topic,valu)方法: 发送消息
topic参数:指定向哪个主题发送消息
value参数:指定发送的消息数据 ,数据类型要求是bytes类型

示例:

# 导包
from kafka import KafkaProducer# 编写代码
if __name__ == '__main__':# 创建生产者对象并指定对应服务器producer = KafkaProducer(bootstrap_servers=['node1:9092'])# 发送消息for i in range(1,101):future = producer.send('kafka', f'hi_kafka_{i}'.encode())# 获取元数据record_metadata = future.get()# 从元数据中获取主题,分区,偏移print(record_metadata.topic)print(record_metadata.partition)print(record_metadata.offset)
3.2 完成消费者代码

消费者类KafkaConsumer,提供了读取kafka数据的方法

KafkaConsumer(topic,bootstrap_servers)
第一个参数:指定消费者连接的主题,
第二个参数:指定消费者连接的kafka服务器

示例:

# 导包
from kafka import KafkaConsumer# 编写代码
if __name__ == '__main__':# 创建消费者对象consumer = KafkaConsumer('kafka',bootstrap_servers=['node1:9092'])# 遍历对象for message in consumer:# 格式化打印,设置相关参数# 因为value是二进制,需要decode解码print ("主题:%s,分区:%d,偏移:%d : key=%s value=%s"% (message.topic, message.partition,message.offset, message.key, message.value.decode('utf8')))

可能遇到的错误:

在这里插入图片描述

原因: 服务器环境有问题。是因为服务器上既安装了kafka-python的第三方依赖,同时还安装kafka的第三方依赖。可以通过pip list | grep kafka进行确定
解决办法: 先将这两个第三方依赖全部卸载,然后再重新执行如下命令
python -m pip install kafka-python -i https://pypi.tuna.tsinghua.edu.cn/simple

01_生产者代码入门.py

# 导包
import timefrom kafka import KafkaProducer# 异步发送:生产者发送消息后不会等待 Kafka 的确认,这种方式可以提高吞吐量,但可能会牺牲一定的可靠性,因为生产者无法立即知道消息是否成功发送。通常可以通过回调函数来处理发送结果。
def yibu():# 创建生产者对象producer = KafkaProducer(bootstrap_servers=['node1:9092'])# 发送消息到kafka# 发送1到10数字到test_python主题中for i in range(1, 11):# 如果主题不存在就会自动创建对应主题(默认分区数是1),存在就使用producer.send(topic='python_test2', value=f'你好啊~{i}'.encode('utf-8'))# 最后一定用close()释放资源一次性把对应消息发送到kafka中或者让程序多等待一会儿# producer.close()time.sleep(3)# 同步发送:生产者发送消息后会阻塞等待 Kafka 确认消息已成功写入,这种方式可以确保消息发送的可靠性,但会降低吞吐量。
def tongbu():# 创建生产者对象producer = KafkaProducer(bootstrap_servers=['node1:9092'])# 发送消息到kafka# 发送1到10数字到test_python主题中for i in range(1, 11):# 如果主题不存在就会自动创建对应主题(默认分区数是1),存在就使用f = producer.send(topic='python_test2', value=f'测试积压问题5~{i}'.encode('utf-8'))# 获取消息的元数据meta = f.get(timeout=10)print(f"主题:{meta.topic},分区:{meta.partition},消息偏移量:{meta.offset}")# main程序入口
if __name__ == '__main__':# 抽取方法的快捷键: 先选中代码, 然后按住ctrl+alt+M,最后起名字确认就行tongbu()

02_消费者代码入门.py

# 导包
from kafka import KafkaConsumer# main程序入口
if __name__ == '__main__':# 创建消费者对象consumer = KafkaConsumer('python_test2', group_id='g_1', bootstrap_servers=['node1:9092'],  auto_offset_reset='earliest')# 获取消息# 注意: 默认是获取最新的消息for msg in consumer:print(f"主题:{msg.topic},分区:{msg.partition},消息内容:{msg.value.decode('utf-8')}")

相关文章:

day08_Kafka

文章目录 day08_Kafka课程笔记一、今日课程内容一、消息队列&#xff08;了解&#xff09;**为什么消息队列就像是“数据的快递员”&#xff1f;****实际意义**1、产生背景2、消息队列介绍2.1 常见的消息队列产品2.2 应用场景2.3 消息队列中两种消息模型 二、Kafka的基本介绍1、…...

fbx 环境搭建

python 3.7 3.8 版本支持 https://github.com/Shiiho11/FBX-Python-SDK-for-Python3.x 只有python3.7 https://www.autodesk.com/developer-network/platform-technologies/fbx-sdk-2020-3 scipy安装&#xff1a; python安装scipy_scipy1.2.1库怎么安装-CSDN博客 smpl 2 fbx…...

【大数据】机器学习------神经网络模型

一、神经网络模型 1. 基本概念 神经网络是一种模拟人类大脑神经元结构的计算模型&#xff0c;由多个神经元&#xff08;节点&#xff09;组成&#xff0c;这些节点按照不同层次排列&#xff0c;通常包括输入层、一个或多个隐藏层和输出层。每个神经元接收来自上一层神经元的输…...

YOLOv5训练长方形图像详解

文章目录 YOLOv5训练长方形图像详解一、引言二、数据集准备1、创建文件夹结构2、标注图像3、生成标注文件 三、配置文件1、创建数据集配置文件2、选择模型配置文件 四、训练模型1、修改训练参数2、开始训练 五、使用示例1、测试模型2、评估模型 六、总结 YOLOv5训练长方形图像详…...

【Vim Masterclass 笔记11】S06L24 + L25:Vim 文本的插入、变更、替换与连接操作同步练习(含点评课)

文章目录 S06L24 Exercise 06 - Inserting, Changing, Replacing, and Joining1 训练目标2 操作指令2.1. 打开 insert-practice.txt 文件2.2. 练习 i 命令2.3. 练习 I 命令2.4. 练习 a 命令2.5. 练习 A 命令2.6. 练习 o 命令2.7. 练习 O 命令2.8. 练习 j 命令2.9. 练习 R 命令2…...

【计算机网络】深入浅出计算机网络

第一章 计算机网络在信息时代的作用 计算机网络已由一种通信基础设施发展成一种重要的信息服务基础设施 CNNIC 中国互联网网络信息中心 因特网概述 网络、互联网和因特网 网络&#xff08;Network&#xff09;由若干结点&#xff08;Node&#xff09;和连接这些结点的链路…...

HTTP详解——HTTP基础

HTTP 基本概念 HTTP 是超文本传输协议 (HyperText Transfer Protocol) 超文本传输协议(HyperText Transfer Protocol) HTTP 是一个在计算机世界里专门在 两点 之间 传输 文字、图片、音视频等 超文本 数据的 约定和规范 1. 协议 约定和规范 2. 传输 两点之间传输&#xf…...

ubuntu 安装 python

一、安装python依赖的包。 sudo apt-get install -y make zlib1g zlib1g-dev build-essential libbz2-dev libsqlite3-dev libssl-dev libxslt1-dev libffi-dev openssl python3-tklibsqlite3-dev需要在python安装之前安装&#xff0c;如果用户操作系统已经安装python环境&…...

CSS:定位

CSS定位核心知识点详解 CSS定位是网页布局中的重要概念&#xff0c;它允许开发者将元素放置在页面的指定位置。以下是对CSS定位所有相关详细重要知识点的归纳&#xff1a; 为什么要使用定位&#xff1a; 小黄色块在图片上移动&#xff0c;吸引用户的眼球。 当我们滚动窗口的…...

ros2笔记-7.1 机器人导航介绍

7.1 机器人导航介绍 7.1.1 同步定位与地图构建 想要导航&#xff0c;就是要确定当前位置跟目标位置。确定位置就是定位问题。 手机的卫星导航在室内 受屏蔽&#xff0c;需要其他传感器获取位置信息。 利用6.5 章节的仿真&#xff0c;打开并运行 会发现轨迹跟障碍物都被记录…...

一些常见的Java面试题及其答案

Java基础 1. Java中的基本数据类型有哪些&#xff1f; 答案&#xff1a;Java中的基本数据类型包括整数类型&#xff08;byte、short、int、long&#xff09;、浮点类型&#xff08;float、double&#xff09;、字符类型&#xff08;char&#xff09;和布尔类型&#xff08;boo…...

今日总结 2025-01-14

学习目标 掌握运用 VSCode 开发 uni - app 的配置流程。学会将配置完善的项目作为模板上传至 Git&#xff0c;实现复用。项目启动 创建项目&#xff1a;借助 Vue - Cli 方式创建项目&#xff0c;推荐从国内地址 https://gitee.com/dcloud/uni - preset - vue/repository/archiv…...

图像处理|开运算

开运算是图像形态学中的一种基本操作&#xff0c;通常用于去除小的噪声点&#xff0c;同时保留目标物体的整体形状。它结合了 腐蚀 和 膨胀 操作&#xff0c;且顺序为 先腐蚀后膨胀&#xff08;先腐蚀后膨胀&#xff0c;胀开了&#xff0c;开运算&#xff09;。 开运算的作用 …...

基于当前最前沿的前端(Vue3 + Vite + Antdv)和后台(Spring boot)实现的低代码开发平台

项目是一个基于当前最前沿的前端技术栈&#xff08;Vue3 Vite Ant Design Vue&#xff0c;简称Antdv&#xff09;和后台技术栈&#xff08;Spring Boot&#xff09;实现的低代码开发平台。以下是对该项目的详细介绍&#xff1a; 一、项目概述 项目名称&#xff1a;lowcode-s…...

ASP.NET Core - 依赖注入(三)

ASP.NET Core - 依赖注入&#xff08;三&#xff09; 4. 容器中的服务创建与释放 4. 容器中的服务创建与释放 我们使用了 IoC 容器之后&#xff0c;服务实例的创建和销毁的工作就交给了容器去处理&#xff0c;前面也讲到了服务的生命周期&#xff0c;那三种生命周期中对象的创…...

Unity 视频导入unity后,播放时颜色变得很暗很深,是什么原因导致?

视频正常播放时的颜色&#xff1a; 但是&#xff0c;当我在unity下&#xff0c;点击视频播放按钮时&#xff0c;视频的颜色立马变得十分昏暗&#xff1a; 解决办法&#xff1a; 将File—BuildSettings—PlayerSettings—OtherSettings下的Color Space改为&#xff1a;Gamma即可…...

数仓建模(五)选择数仓技术栈:Hive ClickHouse 其它

在大数据技术的飞速发展下&#xff0c;数据仓库&#xff08;Data Warehouse&#xff0c;简称数仓&#xff09;成为企业处理和分析海量数据的核心工具。市场上主流数仓技术栈丰富&#xff0c;如Hive、ClickHouse、Druid、Greenplum等&#xff0c;对于初学者而言&#xff0c;选择…...

MySQL主从:如何处理“Got Fatal Error 1236”或 MY-013114 错误(percona译文)

错误的 GTID 如今&#xff0c;典型的复制设置使用 GTID 模式&#xff0c;完整的错误消息可能如下所示&#xff1a; mysql > show replica status\G *************************** 1. row ***************************Replica_IO_Running: NoReplica_SQL_Running: YesLast_I…...

01.14周二F34-Day55打卡

文章目录 1. Jim 在看电视的时候他的老婆正在做饭。(两个动作同时进行)2. 他刚睡着电话就响了。3. 我正在想事情,这时忽然有人从后面抓我胳膊。4. 我们总是边吃火锅边唱歌。5. 他一听说出了事故,马上就来了现场。6. He entered the room until I returned. (英译汉)7.直到…...

Docker 部署 Typecho

1. 官网 https://typecho.org/插件 & 主题 https://github.com/typecho-fans/plugins https://typechx.com/ https://typecho.work/2. 通过 compose 文件安装 github官网&#xff1a; https://github.com/typecho/Dockerfile 新建一个目录&#xff0c;存放 typecho 的相…...

electron 打包后的 exe 文件,运行后是空白窗口

一、代码相关问题 1. 页面加载失败 1.1 原因 在 Electron 应用中&#xff0c;若loadFile或loadURL方法指定的页面路径或 URL 错误&#xff0c;就无法正确加载页面&#xff0c;导致窗口空白。 1.2. 解决 仔细检查loadFile或loadURL方法中传入的路径或 URL 是否正确&#xf…...

《leetcode-runner》如何手搓一个debug调试器——架构

本文主要聚焦leetcode-runner对于debug功能的整体设计&#xff0c;并讲述设计原因以及存在的难点 设计引入 让我们来思考一下&#xff0c;一个最简单的调试器需要哪些内容 首先&#xff0c;它能够接受用户的输入 其次&#xff0c;它能够读懂用户想让调试器干嘛&#xff0c;…...

matlab实现了一个优化的遗传算法,用于求解注汽站最优位置的问题

function [best_chromosome, best_fitness] optimized_genetic_algorithm()%% 遗传算法参数初始化% 定义井信息&#xff0c;包括坐标、管道长度、流量、压力等wells defineWells(); % 返回井的结构体数组N length(wells); % 注汽井数量% 遗传算法相关参数L_chromosome 20; …...

八股学习 Redis

八股学习 Redis 使用场景常见问题问题1、2示例场景缓存穿透解决方案一解决方案二 问题3示例场景缓存击穿解决方案 问题4示例场景缓存雪崩解决方案 问题5示例场景双写一致性强一致方案允许延时一致方案 问题6RDB方式AOF方式两种方式对比 问题7示例场景惰性删除定期删除 使用场景…...

C++ 中 :: 的各种用法

C 中 :: 的各种用法 文章目录 C 中 :: 的各种用法1. 全局作用域解析示例&#xff1a;访问全局变量 2. 类作用域2.1. 访问类的静态成员示例&#xff1a;访问静态成员2.2. 定义类外成员函数示例&#xff1a;定义类外成员函数 3. 命名空间作用域3.1. 访问命名空间中的成员示例&…...

【Redis】初识分布式系统

目录 单机架构 分布式系统 应用数据分离架构 应用服务集群架构 读写分离/主从分离架构 冷热分离架构 垂直分库 微服务架构 分布式名词概念 本篇博文&#xff0c;将根据分布式系统的演进一步一步介绍每一种架构的形式&#xff0c;最后为大家总结了一些分布式中常用的…...

【EI 会议征稿】第四届材料工程与应用力学国际学术会议(ICMEAAE 2025)

2025 4th International Conference on Materials Engineering and Applied Mechanics 重要信息 大会官网&#xff1a;www.icmeaae.com 大会时间&#xff1a;2025年3月7-9日 大会地点&#xff1a;中国西安 截稿时间&#xff1a;2025年1月24日23:59 接受/拒稿通知&#xf…...

redisson 连接 redis5报错 ERR wrong number of arguments for ‘auth‘ command

依赖版本 org.redisson:redisson-spring-boot-starter:3.25.2 现象 启动报错 org.redisson.client.RedisException: ERR wrong number of arguments for ‘auth’ command. channel: [xxx] command: (AUTH), params: (password masked) 原因 redis6以下版本认证参数不包含用…...

GPT(General Purpose Timer)定时器

基本概念&#xff1a; 在嵌入式系统中&#xff0c;General Purpose Timer&#xff08;GPT&#xff09;是一种非常重要的硬件组件&#xff0c;用于提供定时功能。 定义&#xff1a;通用定时器是一种能够提供精确时间测量和控制功能的电子设备或电路模块。它可以产生周期性的时…...

Node.js - HTTP

1. HTTP请求 HTTP&#xff08;Hypertext Transfer Protocol&#xff0c;超文本传输协议&#xff09;是客户端和服务器之间通信的基础协议。HTTP 请求是由客户端&#xff08;通常是浏览器、手机应用或其他网络工具&#xff09;发送给服务器的消息&#xff0c;用来请求资源或执行…...

Vue数据响应式,reaction,ref的使用

目录 数据响应式 如何使用 reactionyu区别 数据响应式 什么是数据响应式 简单来说就是当数据变了的时候&#xff0c;页面的展示也会跟着发生变化。 在Vue当中我们有两个函数可以实现这个功能reaction&#xff0c;ref 如何使用 首先这两个函数在是在Vue对象中我们可以先对…...

【Vue实战】Vuex 和 Axios 拦截器设置全局 Loading

目录 1. 效果图 2. 思路分析 2.1 实现思路 2.2 可能存在的问题 2.2.1 并发请求管理 2.2.2 请求快速响应和缓存带来的问题 3. 代码实现 4. 总结 1. 效果图 如下图所示&#xff0c;当路由变化或发起请求时&#xff0c;出现 Loading 等待效果&#xff0c;此时页面不可见。…...

JVM:ZGC详解(染色指针,内存管理,算法流程,分代ZGC)

1&#xff0c;ZGC&#xff08;JDK21之前&#xff09; ZGC 的核心是一个并发垃圾收集器&#xff0c;所有繁重的工作都在Java 线程继续执行的同时完成。这极大地降低了垃圾收集对应用程序响应时间的影响。 ZGC为了支持太字节&#xff08;TB&#xff09;级内存&#xff0c;设计了基…...

在 Ubuntu 上安装和配置 Redis

在 Ubuntu 上安装和配置 Redis&#xff0c;并使用发布-订阅&#xff08;Pub/Sub&#xff09;功能&#xff0c;可以按照以下步骤进行&#xff1a; 一、安装 Redis 1. 更新包列表 首先&#xff0c;更新本地的包列表以确保获取到最新的软件包信息&#xff1a; sudo apt update…...

【WPS】【WORDEXCEL】【VB】实现微软WORD自动更正的效果

1. 代码规范方面 添加 Option Explicit&#xff1a;强制要求显式声明所有变量&#xff0c;这样可以避免因变量名拼写错误等情况而出现难以排查的逻辑错误&#xff0c;提高代码的健壮性。使用 On Error GoTo 进行错误处理&#xff1a;通过设置错误处理机制&#xff0c;当代码执行…...

相机SD卡照片数据不小心全部删除了怎么办?有什么方法恢复吗?

前几天&#xff0c;小编在后台友收到网友反馈说他在整理相机里的SD卡&#xff0c;原本是想把那些记录着美好瞬间的照片导出来慢慢欣赏。结果手一抖&#xff0c;不小心点了“删除所有照片”&#xff0c;等他反应过来&#xff0c;屏幕上已经显示“删除成功”。那一刻&#xff0c;…...

【机器学习:十四、TensorFlow与PyTorch的对比分析】

1. 发展背景与社区支持 1.1 TensorFlow的背景与发展 TensorFlow是Google于2015年发布的开源深度学习框架&#xff0c;基于其前身DistBelief系统。作为Google大规模深度学习研究成果的延续&#xff0c;TensorFlow从一开始就定位为生产级框架&#xff0c;强调跨平台部署能力和性…...

从零搭建一个Vue3 + Typescript的脚手架——day1

1.开发环境搭建 (1).配置vite vite简介 Vite 是一个由尤雨溪开发的现代化前端构建工具&#xff0c;它利用了浏览器对 ES 模块的原生支持&#xff0c;极大地提升了开发服务器的启动速度和热更新效率。Vite 不仅适用于 Vue.js&#xff0c;还支持 React、Svelte 等多种框架&…...

unity打包sdk热更新笔记

基础打包需要知识&#xff1a; 安装包大小不要超过2G&#xff0c;AB包数量过多会影响加载和构建&#xff0c;多次IO&#xff0c;用Gradle打包&#xff0c;要支持64位系统&#xff0c;不同的渠道包&#xff1a;让做sdk的人支持&#xff0c;提供渠道包的打包工具 配置系统环境变量…...

算法-贪心算法简单介绍

下面是贪心算法视频课的导学内容. 目录 1. 什么是贪心算法?2. 贪心算法简单的三个例子:1. 找零问题2. 最小路径和问题3. 背包问题 3. 贪心算法的特点4. 贪心算法学习的方式? 1. 什么是贪心算法? 简单来说, 我们称以局部最优进而使得全局最优的一种思想实现出来的算法为贪心…...

1Hive概览

1Hive概览 1hive简介2hive架构3hive与Hadoop的关系4hive与传统数据库对比5hive的数据存储 1hive简介 Hive是基于Hadoop的一个数据仓库工具&#xff0c;可以将结构化的数据文件映射为一张数据库表&#xff0c;并提供类SQL查询功能。 其本质是将SQL转换为MapReduce/Spark的任务进…...

Linux SUID提权

文章目录 1. SUID/SGID2. 查找SUID文件3. SUID/SGID提权3.1 SUID配置不当3.2 SUID systemctl提权3.3 $PATH变量劫持 参考 1. SUID/SGID SUID&#xff08;Set User ID&#xff09;意味着如果某个用户对属于自己的文件设置了这种权限&#xff0c;那么其他用户在执行这一脚本时也…...

RabbitMQ确保消息可靠性

消息丢失的可能性 支付服务先扣减余额和更新支付状态&#xff08;这俩是同步调用&#xff09;&#xff0c;然后通过RabbitMq异步调用支付服务更新订单状态。但是有些情况下&#xff0c;可能订单已经支付 &#xff0c;但是更新订单状态却失败了&#xff0c;这就出现了消息丢失。…...

用plotly制作一条带颜色的时间轴,显示学习情况

前一篇文章我写到用matplotlib制作一条带颜色的时间轴&#xff0c;显示学习情况-CSDN博客&#xff0c;这是我在工作地方写的程序&#xff0c;我回家后发现家里的笔记本用不了matplotlib&#xff0c;所以我尝试用plotly这另外的模块也写一段程序&#xff0c;让我的程序能够回家使…...

MySQL:索引

目录 1.MySQL索引是干什么的 2.铺垫知识 3.单个page的理解 4.页目录 单页情况 多页情况 1.MySQL索引是干什么的 MySQL的索引是提高查询效率&#xff0c;主要提高海量数据的检索速度。 2.铺垫知识 操作系统与磁盘之间IO的基本单位是4kb。 数据库是一个应用层软件&#…...

Kylin: `GLIBC_2.34‘ not found

需要查看服务器GLIBC版本 strings /lib64/libc.so.6 |grep GLIBC_如果没有&#xff0c;有两种办法&#xff0c;一种是libc.so.6降级&#xff0c;但是这样很容易将服务器搞崩溃 所以可以尝试下载对应版本 glibc 打包编译&#xff0c;重新建立软连&#xff0c;下列是RPM资源可以…...

ASP.NET Core - 依赖注入(四)

ASP.NET Core - 依赖注入&#xff08;四&#xff09; 4. ASP.NET Core默认服务5. 依赖注入配置变形 4. ASP.NET Core默认服务 之前讲了中间件&#xff0c;实际上一个中间件要正常进行工作&#xff0c;通常需要许多的服务配合进行&#xff0c;而中间件中的服务自然也是通过 Ioc…...

【全套】基于分类算法的学业警示预测信息管理系统

【全套】基于分类算法的学业警示预测信息管理系统 【摘 要】 随着网络技术的发展基于分类算法的学业警示预测信息管理系统是一种新的管理方式&#xff0c;同时也是现代学业预测信息管理的基础&#xff0c;利用互联网的时代与实际情况相结合来改变过去传统的学业预测信息管理中…...

《零基础Go语言算法实战》【题目 2-25】goroutine 的执行权问题

《零基础Go语言算法实战》 【题目 2-25】goroutine 的执行权问题 请说明以下这段代码为什么会卡死。 package main import ( "fmt" "runtime" ) func main() { var i byte go func() { for i 0; i < 255; i { } }() fmt.Println("start&quo…...

回归预测 | MATLAB实RVM相关向量机多输入单输出回归预测

回归预测 | MATLAB实RVM相关向量机多输入单输出回归预测 目录 回归预测 | MATLAB实RVM相关向量机多输入单输出回归预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 RVM-Adaboost相关向量机集成学习多输入单输出回归预测是一种先进的机器学习方法&#xff0c;用于处理…...