Flume 与 Kafka 整合实战
目录
一、Kafka 作为 Source【数据进入到kafka中,抽取出来】
(一)环境准备与配置文件创建
(二)创建主题
(三)测试步骤
二、Kafka 作为 Sink数据从别的地方抽取到kafka里面】
(一)编写配置脚本
(二)创建 topic
(三)测试过程
三、应用场景示例
四、总结
在大数据处理的生态系统中,Flume 和 Kafka 都是非常重要的组件。Flume 擅长收集、聚合和传输大量的日志数据等,而 Kafka 则是一个高性能的分布式消息队列,能够处理海量的实时数据。将 Flume 和 Kafka 进行整合,可以构建强大的数据处理管道,实现数据的高效采集、传输和处理。本文将详细介绍 Flume 和 Kafka 整合的两种常见方式:Kafka 作为 Source 和 Kafka 作为 Sink。
一、Kafka 作为 Source【数据进入到kafka中,抽取出来】
(一)环境准备与配置文件创建
在 Flume 的 conf 文件夹下,创建一个名为 kafka - memory - logger.conf 的脚本文件。这里需要注意,在实际操作中可能会遇到错误,例如 kafka 的每一批次的读取数量大于了 channel 的容量。这种情况下的解决方案是要么降低 kafka 的每一批次读取的容量,要么提高 channel 的容量。
https://flume.liyifeng.org/#kafka-source
kafka-memory-logger.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = five
a1.sources.r1.kafka.consumer.group.id = qiaodaohu# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 128
(二)创建主题
接着创建一个 topic,名字可以叫做 kafka - flume,当然也可以直接使用以前创建好的主题。
kafka-topics.sh --create --topic kafka-flume --bootstrap-server bigdata01:9092 --partitions 3 --replication-factor 1
(三)测试步骤
首先启动一个消息生产者,向 topic 中发送消息。
kafka-console-producer.sh --topic kafka-flume --bootstrap-server bigdata01:9092
然后启动 Flume,接收消息并查看 log 日志,这样就可以验证数据是否能够从 Kafka 成功抽取到 Flume 中并进行后续处理。
在flume的flumeconf 文件夹下
flume-ng agent -n a1 -c ../conf -f ./kafka-memory-logger.conf -Dflume.root.logger=INFO,console
二、Kafka 作为 Sink数据从别的地方抽取到kafka里面】
(一)编写配置脚本
编写一个名为 flume - kafka - sink.conf 的脚本,内容如下:
##a1就是flume agent的名称
## source r1
## channel c1
## sink k1
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 44444# 修改sink为kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092
a1.sinks.k1.kafka.topic = five
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
这里的流程是 netcat(模拟数据源)→ memory(内存通道)→ kafka。
(二)创建 topic
使用以下命令创建 topic(flume - kafka):
kafka-topics.sh --create --topic flume-kafka --bootstrap-server bigdata01:9092 --partitions 3 --replication-factor 1
(三)测试过程
启动 Flume:
flume-ng agent -n a1 -c conf -f $FLUME_HOME/job/flume-kafka-sink.conf -Dflume.root.logger=INFO,console
使用 telnet 命令,向端口发送消息:
yum -y install telnettelnet bigdata01 44444
在窗口不断地发送文本数据,数据就会被抽取到 Kafka 中。
使用消费者获取 Kafka 数据:
kafka-console-consumer.sh --topic flume-kafka --bootstrap-server bigdata01:9092 --from-beginning
三、应用场景示例
假定有这样一个场景:Flume 可以抽取不断产生的日志,抽取到的日志数据,发送给 Kafka,Kafka 经过处理,可以展示在页面上,或者进行汇总统计。这样就实现了一定的实时效果,在实际的大数据处理流程中,这种整合方式能够有效地处理海量的实时数据,提高数据处理的效率和可靠性。
四、总结
通过 Flume 和 Kafka 的整合,我们能够构建更加灵活、高效的数据处理架构,满足不同场景下的大数据处理需求,为后续的数据挖掘、分析等提供坚实的数据基础。
相关文章:
Flume 与 Kafka 整合实战
目录 一、Kafka 作为 Source【数据进入到kafka中,抽取出来】 (一)环境准备与配置文件创建 (二)创建主题 (三)测试步骤 二、Kafka 作为 Sink数据从别的地方抽取到kafka里面】 (…...
Web开发技术栈选择指南
互联网时代的蓬勃发展,让越来越多人投身软件开发领域。面对前端和后端的选择,很多初学者往往陷入迷茫。让我们一起深入了解这两个领域的特点,帮助你做出最适合自己的选择。 在互联网发展的早期,前端开发主要负责页面布局和简单的…...
社群赋能电商:小程序 AI 智能名片与 S2B2C 商城系统的整合与突破
摘要:本文聚焦于社群在电商领域日益凸显的关键地位,深入探讨在社群粉丝经济迅猛发展背景下,小程序 AI 智能名片与 S2B2C 商城系统如何与社群深度融合,助力电商突破传统运营局限,挖掘新增长点。通过分析社群对电商的价值…...
C++11 http服务端和客户端库cpp-httplib
C11 http服务端和客户端库cpp-httplib 环境: http: yhirose/cpp-httplib v0.18.1 json: nlohmann/json v3.11.31. 简介 cpp-httplib 是一个轻量级且易于使用的 C11 HTTP 库,由 yhirose 开发和维护,开源协议为MIT。它支持 HTTP/HTTPS 协议&…...
spring-boot自定义ApplicationListener及源码分析
ApplicationListener是spring boot应用启动时的事件监听器。监听的事件有(包括但不限于): (1)接下来,我们先通过一个例子实现自定义ApplicationListener: 监听器需要实现ApplicationListener<…...
打造双层环形图:基础与高级渐变效果的应用
在数据可视化领域,环形图因其独特的展示方式而广受欢迎。今天,我们将通过ECharts库来创建一个具有双层渐变效果的高级环形图。本文将详细介绍如何实现这种视觉效果。 1. 环形图基础 首先,我们需要了解环形图的基本构成。环形图由内外两个圆…...
BUGKU printf
整体思路 实现循环-->获取libc版本和system函数地址->将strcpy的got表项修改为system并获得shell 第一步:实现循环 从汇编语句可以看出,在每次循环结束时若0x201700处的值是否大于1则会继续循环。 encode1会将编码后的结果保存至0x2015c0处&am…...
spring boot2.7集成OpenFeign 3.1.7
1.Feign Feign是一个声明式web服务客户端。它使编写web服务客户端更容易。要使用Feign,请创建一个接口并对其进行注释。它具有可插入注释支持,包括Feign注释和JAX-RS注释。Feign还支持可插拔编码器和解码器。Spring Cloud增加了对Spring MVC注释的支持&…...
SSM相关面试题01
目录 1.何为Spring Bean容器?Spring Bean容器与Spring IOC 容器有什么不同吗? 2.Spring IOC 如何理解? 3.Spring DI 如何理解? 4.Spring 中基于注解如何配置对象作用域?以及如何配置延迟加载机制? 5.Spring 工厂底层构建Bean对象借助什么机制?当对象不使用了要释放…...
Python websocket
router.websocket(/chat/{flow_id}) 接口代码,并了解其工作流程、涉及的组件以及如何基于此实现你的新 WebSocket 接口。以下内容将分为几个部分进行讲解: 接口整体概述代码逐行解析关键组件和依赖关系如何基于此实现新功能示例:创建一个新的…...
正则表达式
正则表达式: 正则表达式区别于通配符,正则表达式是用来匹配文本的内容,命令的输出结果也属于文本内容。也可以使用正则表达式。 通配符用来匹配文件名和目录名。 grep用来过滤文本内容,以匹配要查询的结果。 linux的文本三剑客…...
机器学习——生成对抗网络(GANs):原理、进展与应用前景分析
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一. 生成对抗网络的基本原理二. 使用步骤2.1 对抗性训练2.2 损失函数 三. GAN的变种和进展四. 生成对抗网络的应用五. 持续挑战与未来发展方向六. 小结 前言 生…...
HTTPS 加密
HTTPS 加密技术 1. HTTPS 概述 HTTPS(HyperText Transfer Protocol Secure)是 HTTP 协议的安全版本,利用 SSL/TLS 协议对通信进行加密,确保数据的机密性、完整性和身份认证。HTTPS 在保护敏感数据的传输(如登录凭证、…...
Golang 构建学习
Golang 构建学习 如何搭建Golang开发环境 1. 下载GOlang包 https://golang.google.cn/dl/ 在地址上下载Golang 2. 配置包环境 修改全局环境变量,GOPROXY,GOPATH,GOROOT GOPROXYhttps://goproxy.cn,direct GOROOT"" // go二进…...
OpenCV相机标定与3D重建(7)鱼眼镜头立体校正的函数stereoRectify()的使用
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 cv::fisheye::stereoRectify 是 OpenCV 中用于鱼眼镜头立体校正的函数。该函数计算两个相机之间的校正变换,使得从两个相机拍摄的图像…...
JVM_垃圾收集器详解
1、 前言 JVM就是Java虚拟机,说白了就是为了屏蔽底层操作系统的不一致而设计出来的一个虚拟机,让用户更加专注上层,而不用在乎下层的一个产品。这就是JVM的跨平台,一次编译,到处运行。 而JVM中的核心功能其实就是自动…...
数据结构4——栈和队列
目录 1.栈 1.1.栈的概念及结构 1.2栈的实现 2.队列 2.1队列的概念及结构 2.2队列的实现 1.栈 1.1.栈的概念及结构 栈:一种特殊的线性表,其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一段称为栈顶,另一端称为…...
【AIGC】大模型面试高频考点-数据清洗篇
【AIGC】大模型面试高频考点-数据清洗篇 (一)常用文本清洗方法1.去除无用的符号2.去除表情符号3.文本只保留汉字4.中文繁体、简体转换5.删除 HTML 标签和特殊字符6.标记化7.小写8.停用词删除9.词干提取和词形还原10.处理缺失数据11.删除重复文本12.处理嘈…...
Java基于SSM框架的跑腿平台小程序【附源码、文档】
博主介绍:✌IT徐师兄、7年大厂程序员经历。全网粉丝15W、csdn博客专家、掘金/华为云//InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇dz…...
数据库连接池
在Java的多线程中,有线程池负责线程管理,类似线程池,在数据库中也有数据库连接池,负责数据库连接的管理。数据库连接池是一个容器。负责分配、管理数据库连接(Connection)。它允许应用程序重复使用一个现有…...
本地部署 WireGuard 无需公网 IP 实现异地组网
WireGuard 是一个高性能、极简且易于配置的开源虚拟组网协议。使用路由侠内网穿透使其相互通讯。 第一步,服务端(假设为公司电脑)和客户端(假设为公司外的电脑)安装部署 WireGuard 1,点此下载(…...
Educator头歌:离散数学 - 图论
第1关:图的概念 任务描述 本关任务:学习图的基本概念,完成相关练习。 相关知识 为了完成本关任务,你需要掌握:图的概念。 图的概念 1.一个图G是一个有序三元组G<V,R,ϕ>,其中V是非空顶点集合&am…...
axios的认识与基本使用
axios简介 Axios 是一个基于 promise 网络请求库,作用于node.js 和浏览器中。 它是 isomorphic 的(即同一套代码可以运行在浏览器和node.js中)。在服务端它使用原生 node.js http 模块, 而在客户端 (浏览端) 则使用 XMLHttpRequests。 主要特点 从浏览器创建 XML…...
springboot358智慧社区居家养老健康管理系统(论文+源码)_kaic
毕 业 设 计(论 文) 智慧社区居家养老健康管理系统设计与实现 摘 要 传统办法管理信息首先需要花费的时间比较多,其次数据出错率比较高,而且对错误的数据进行更改也比较困难,最后,检索数据费事费力。因此&…...
java-a+b 开启java语法学习
代码 (ab) import java.util.Scanner; //导入 java.util包中的Scanner 类,允许读取键盘输入数据public class Main { // 创建一个公共类 Mainpublic static void main(String[] args) {//程序入口点,main方法Scanner scanner new Scanner(…...
SpringAi整合免费大模型(NVIDIA)
接上回,发布了springAI整合本地大模型之后,我们来看看怎么去利用别人已经训练好的大模型。 如果对整合本地大模型感兴趣的,请阅读: SpringAI集成本地AI大模型ollama(调用篇)非常简单!…...
Flutter中的Future和Stream
在 Flutter 中,Future 和 Stream 都是用于处理异步操作的类,它们都基于 Dart 的异步编程模型,但是它们的使用场景和工作方式有所不同。以下是它们的区别以及各自适用的场景。 目录 一、Future1、基本使用2、异常处理1. catchError2. onError…...
Python将Excel文件转换为JSON文件
工作过程中,需要从 Excel 文件中读取数据,然后交给 Python 程序处理数据,中间需要把 Excel 文件读取出来转为 json 格式,再进行下一步数据处理。 这里我们使用pandas库,这是一个强大的数据分析工具,能够方便地读取和处理各种数据格式。需要注意的是还需要引入openpyxl库,…...
MySQL中EXPLAIN的介绍、作用、字段含义
MySQL中EXPLAIN的介绍、作用、字段含义 在MySQL中,EXPLAIN 是一个非常有用的命令,它可以帮助开发者和DBA理解查询执行计划,从而优化查询性能。EXPLAIN 可以模拟优化器执行SQL查询语句,而不真正执行这条语句,从而帮助用…...
Socket编程:UDP网络编程项目
目录 一、回显服务器 二、翻译器 三、聊天室 一、回显服务器 项目介绍:使用UDPIPv4协议进行Linux网络编程,实现回显服务器和客户端 功能介绍:客户端发送数据,经过服务端再返回到客户端,输出数据 源代码࿱…...
uniapp echarts tooltip formation 不识别html
需求: echarts 的tooltip 的域名太长,导致超出屏幕 想要让他换行 思路一: 用formation自定义样式实现换行 但是: uniapp 生成微信小程序, echart种的tooltip 的formation 识别不了html ,自定义样式没办…...
从0开始linux(39)——线程(2)线程控制
欢迎来到博主的专栏:从0开始linux 博主ID:代码小豪 文章目录 线程创建线程标识符线程参数多线程竞争资源 回收线程detach 线程退出pthread_cancel 线程创建 线程创建的函数为pthread_create。该函数是包含在posix线程库当中,posix线程是C语言…...
对载入的3dtiles进行旋转、平移和缩放变换。
使用 params: {tx: 129.75845, //模型中心X轴坐标(经度,单位:十进制度)//小左ty: 46.6839, //模型中心Y轴坐标(纬度,单位:十进制度)//小下tz: 28, //模型中心Z轴坐标(高…...
YOLO模型训练后的best.pt和last.pt区别
在选择YOLO模型训练后的权重文件best.pt和last.pt时,主要取决于具体的应用场景:12 best.pt:这个文件保存的是在训练过程中表现最好的模型权重。通常用于推理和部署阶段,因为它包含了在验证集上表现最好的模型权重&#x…...
Qt 项目中同时使用 CMAKE_AUTOUIC 和 UiTools 的注意事项
在 Qt 项目开发中,.ui 文件是界面设计的重要组成部分。开发者可以通过两种主要方式使用 .ui 文件: 编译期处理:通过 Qt 的 uic 工具将 .ui 文件转化为 C 代码(ui_xxx.h),静态绑定到项目中。运行时动态加载…...
不玩PS抠图了,改玩Python抠图
网上找了两个苏轼的印章图片: 把这两个印章抠出来的话,对于不少PS高手来说是相当容易,但是要去掉其中的水印,可能要用仿制图章慢慢描绘,图章的边缘也要慢慢勾画或者用通道抠图之类来处理,而且印章的红色也不…...
ubuntu 22.04 mini 安装,在配置网络时重启后配置文件被重置原因与解决方法
在 /etc/netplan/50-cloud-init.yaml 配置文件中有一段注释中有说明 rootlocalhost:/etc/netplan# cat 50-cloud-init.yaml # This file is generated from information provided by the datasource. Changes # to it will not persist across an instance reboot. To disab…...
【Go底层】time包Ticker定时器原理
目录 1、背景2、go版本3、源码解释【1】Ticker结构【2】NewTicker函数解释 4、代码示例5、总结 1、背景 说到定时器我们一般想到的库是cron,但是对于一些简单的定时任务场景,标准库time包下提供的定时器就足够我们使用,本篇文章我们就来研究…...
mac下Gpt Chrome升级成GptBrowser书签和保存的密码恢复
cd /Users/自己的用户名/Library/Application\ Support/ 目录下有 GPT\ Chrome/ Google/ GptBrowser/ GPT\ Chrome 为原来的chrome浏览器的文件存储目录. GptBrowser 为升级后chrome浏览器存储目录 书签所在的文件 Bookmarks 登录账号Login 相关的文件 拷贝到GptBrow…...
[Redis#6] list | 命令 | 应用 | 消息队列 | 微博 Timeline
目录 List 列表 特点 2. 命令 头插和尾插 下标 range 查询 头删和尾删 LINSERT LLEN LREM LTRIM LSET 阻塞命令 BLPOP BRPOP 操作 总结 3. 内部编码 ziplist(压缩列表) linkedlist(链表) ✔️quicklist(快速链…...
服务器数据恢复—raid6阵列硬盘被误重组为raid5阵列的数据恢复案例
服务器存储数据恢复环境: 存储中有一组由12块硬盘组建的RAID6阵列,上层linux操作系统EXT3文件系统,该存储划分3个LUN。 服务器存储故障&分析: 存储中RAID6阵列不可用。为了抢救数据,运维人员使用原始RAID中的部分…...
Xcode15(iOS17.4)打包的项目在 iOS12 系统上启动崩溃
0x00 启动崩溃 崩溃日志,只有 2 行,看不出啥来。 0x01 默认配置 由于我开发时,使用的 Xcode 14.1,打包在另外一台电脑 Xcode 15.3 Xcode 14.1 Build Settings -> Asset Catalog Compliter - Options Xcode 15.3 Build S…...
Netty的心跳机制怎么实现的?
大家好,我是锋哥。今天分享关于【Netty的心跳机制怎么实现的?】面试题。希望对大家有帮助; Netty的心跳机制怎么实现的? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 Netty 的心跳机制用于维持客户端和服务器之间的…...
常用函数的使用错题汇总
目录 new/delete malloc/free1. 语言和类型2. 内存分配3. 内存释放4. 安全性和类型安全5. 其他特性总结 线程停止文件流 new/delete malloc/free malloc/free 和 new/delete 是 C/C 中用于动态内存管理的两种方式,它们有一些重要的区别。以下是这两种方式的比较&…...
使用 `aircrack-ng`扫描、获取握手包
使用 aircrack-ng 工具集来扫描 5GHz WiFi 网络的过程与扫描 2.4GHz 网络类似,但需要注意一些特定的配置和命令。以下是一个详细的步骤指南,帮助你在 5GHz 频段上扫描 WiFi 网络并捕获握手包。 ### 前提条件 1. **操作系统**:通常在 Linux 系…...
css—轮播图实现
一、背景 最近和朋友在一起讨论的时候,我们提出了这样的一个提问,难道轮播图的效果只能通过js来实现吗?经过我们的一系列的争论,发现了这是可以通过纯css来实现这一效果的,CSS轮播图也是一种常见的网页展示方式&#x…...
Ardusub源码剖析(1)——AP_Arming_Sub
代码 AP_Arming_Sub.h #pragma once#include <AP_Arming/AP_Arming.h>class AP_Arming_Sub : public AP_Arming { public:AP_Arming_Sub() : AP_Arming() { }/* Do not allow copies */CLASS_NO_COPY(AP_Arming_Sub);bool rc_calibration_checks(bool display_failure)…...
ESP32-S3模组上跑通ES8388(10)
接前一篇文章:ESP32-S3模组上跑通ES8388(9) 二、利用ESP-ADF操作ES8388 2. 详细解析 上一回解析了es8388_init函数中的第3段代码(也是实际与ES8388寄存器打交道的第1段代码),本回继续往下解析。为了便于理…...
AI/ML 基础知识与常用术语全解析
目录 一.引言 二.AI/ML 基础知识 1.人工智能(Artificial Intelligence,AI) (1).定义 (2).发展历程 (3).应用领域 2.机器学习(Machine Learning,ML) (1).定义 (2).学习方式 ①.监督学习 ②.无监督…...
【C#设计模式(15)——命令模式(Command Pattern)】
前言 命令模式的关键通过将请求封装成一个对象,使命令的发送者和接收者解耦。这种方式能更方便地添加新的命令,如执行命令的排队、延迟、撤销和重做等操作。 代码 #region 基础的命令模式 //命令(抽象类) public abstract class …...