Spark Streaming实时数据处理实战:从DStream基础到自定义数据源集成
park-Streaming概述
Spark-Streaming是什么
Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter等,以及和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。
和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。
DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。
所以简单来将,DStream 就是对 RDD 在实时数据处理场景的一种封装。
Spark-Streaming的特点:易用、容错、易整合到spark体系。
易用性:Spark Streaming支持Java、Python、Scala等编程语言,可以像编写离线程序一样编写实时计算的程序
容错:Spark Streaming在没有额外代码和配置的情况下,可以恢复丢失的数据。对于实时计算来说,容错性至关重要。
易整合:Spark Streaming可以在Spark上运行,并且还允许重复使用相同的代码进行批处理。也就是说,实时处理可以与离线处理相结合,实现交互式的查询操作。
Spark-Streaming架构
背压机制:
在Spark 1.5 以前版本,用户如果要限制 Receiver 的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer 数据生产高于 maxRate,当前集群处理能力也高于 maxRate,这就会造成资源利用率下降等问题。
为了更好的协调数据接收速率与资源处理能力,1.5 版本开始 Spark Streaming 可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即 Spark Streaming Backpressure): 根据JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。通过属性“spark.streaming.backpressure.enabled”来控制是否启用 backpressure 机制,默认值为false,即不启用。
DStream实操
案例一:WordCount案例
需求:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并统计不同单词出现的次数
实验步骤:
1.添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
2.编写代码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object value26 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val lineStreams = ssc.socketTextStream("node01", 9999)
val wordStreams = lineStreams.flatMap(_.split(" "))
val wordAndOneStreams = wordStreams.map((_, 1))
val wordAndCountStreams = wordAndOneStreams.reduceByKey(_ + _)
wordAndCountStreams.print()
ssc.start()
ssc.awaitTermination()
3.启动netcat发送数据
nc -lk 9999
案例解析:
Discretized Stream 是 Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有 一段时间间隔内的数据。
对数据的操作也是按照 RDD 为单位来进行的
DStream 创建
创建DStream的三种方式:RDD队列、自定义数据源、kafka数据源
RDD队列
可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到这个队列中的 RDD,都会作为一个DStream 处理。
案例:
需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算 WordCount
代码:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable.Queue
object value27 {
def main(args: Array[String]): Unit = {
// 创建Spark配置,设置应用在本地模式运行,使用所有可用核心,应用名为"RDDStream"
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
// 创建StreamingContext,批处理间隔为4秒
val ssc = new StreamingContext(sparkConf, Seconds(4))
// 创建一个可变队列用于存放RDD
val rddQueue = new mutable.Queue[org.apache.spark.rdd.RDD[Int]]()
// 通过队列创建DStream,设置不每次处理一个RDD
val inputStream = ssc.queueStream(rddQueue, oneAtATime = false)
// 将DStream中的元素映射为键值对,值为1
val mappedStream = inputStream.map((_, 1))
// 按键对值进行累加
val reducedStream = mappedStream.reduceByKey(_ + _)
// 打印结果
reducedStream.print()
// 启动StreamingContext
ssc.start()
// 循环创建5个RDD并放入队列,每个RDD包含1到300的数字,分区数为10
for (i <- 1 to 5) {
rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
try {
Thread.sleep(2000)
} catch {
case e: InterruptedException => e.printStackTrace()
}
}
// 等待StreamingContext终止
ssc.awaitTermination()
}
}
自定义数据源
自定义数据源需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。
案例:自定义数据源,实现监控某个端口号,获取该端口号内容。
1)自定义数据源
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel
import java.net.Socket
import java.io.{BufferedReader, InputStreamReader}
import java.nio.charset.StandardCharsets
Class CustomerReceiver(host:String,port:Int)extends Receiver[String](StorageLevel.MEMORY_ONLY) {
override def onStart(): Unit = {
new Thread("Socket - Receiver") {
override def run(): Unit = {
receive()
}
}.start()
}
def receive(): Unit = {
var socket: Socket = null
var input: String = null
var reader: BufferedReader = null
try {
socket = new Socket(host, port)
reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
while (!isStopped && {input = reader.readLine(); input != null}) {
store(input)
}
} catch {
case e: Exception =>
restart("Error receiving data", e)
} finally {
if (reader != null) reader.close()
if (socket != null) socket.close()
restart("restart")
}
}
override def onStop(): Unit = {
// 可添加停止相关逻辑,目前为空实现
}
}
2)使用自定义的数据源采集数据
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object value28 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")
val ssc = new StreamingContext(sparkConf, Seconds(3))
// 使用自定义数据源创建DStream
val customStream = ssc.receiverStream(new CustomerReceiver("localhost", 9999))
customStream.print()
ssc.start()
ssc.awaitTermination()
}
}
相关文章:
Spark Streaming实时数据处理实战:从DStream基础到自定义数据源集成
park-Streaming概述 Spark-Streaming是什么 Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter等,以及和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如:…...
微软GraphRAG的安装和在RAG中的使用体会
文章目录 0. 简介(1)**技术原理**(2)**优势**(3)**开源与演进** 1. 下载graphrag.git2.安装 poetry3.初始化项目:建立cases目录4. 修改.env5.修改settings.yaml,将两处 api_base改成中转站地址:…...
Python学习记录7——集合set()的使用指南
文章目录 引言一、集合特性二、创建方式三、元素操作1、添加元素(1)add(element)(2)update(iterables) 2、删除元素(1)remove(element)(2)discard(element)(3)…...
apkpure 谷歌插件 下载的apk包
谷歌插件市场搜索 apkpure 然后直接搜索下载就行了 想看apk包中的静态资源,直接改apk 为zip后缀解压就行了 apple的ipa包也是相同的道理...
Android四大核心组件
目录 一、为什么需要四大组件? 二、Activity:看得见的界面 核心功能 生命周期图解 代码示例 三、Service:看不见的劳动者 两大类型 生命周期对比 注意陷阱 四、BroadcastReceiver:消息传递专员 两种注册方式 广播类型 …...
WSL2里手动安装Docker 遇坑
在 WSL2 里手动安装 Docker Engine 时遇坑:systemctl 和 service 命令在默认的 WSL2 Ubuntu 中 无法使用,因为 WSL2 没有 systemd。怎么办? 自己操作让 Docker Engine(dockerd)直接跑起来,挂到 /var/run/do…...
【ROS2】ROS开发环境配置——vscode和git
古月21讲-ROS2/1.系统架构/1.5_ROS2开发环境配置/ ROS机器人开发肯定离不开代码编写,课程中会给大家提供大量示例源码,这些代码如何查看、编写、编译 安Linux中安装装git sudo apt install git下载教程源码 《ROS2入门21讲》课程源码的下载方式&#x…...
django.db.models.query_utils.DeferredAttribute object
在 Django 中,当你看到 django.db.models.query_utils.DeferredAttribute 对象时,通常是因为你在查询时使用了 only() 或 defer() 方法来延迟加载某些字段。这两个方法允许你控制数据库查询中的字段加载方式,从而优化查询性能。 only() 方法…...
Linux内核中的编译时安全防护:以网络协议栈控制块校验为例
引言:内存安全的无声守卫者 在操作系统内核开发中,内存溢出引发的错误往往具有极高的隐蔽性和破坏性。Linux内核作为承载全球数十亿设备的基石,其网络协议栈的设计尤其注重内存安全性。本文通过分析一段看似简单的内核代码,揭示Linux如何通过编译时静态检查(Compile-Time…...
第11章 安全网络架构和组件(一)
11.1 OSI 模型 协议可通过网络在计算机之间进行通信。 协议是一组规则和限制,用于定义数据如何通过网络介质(如双绞线、无线传输等)进行传输。 国际标准化组织(ISO)在20世纪70年代晚期开发了开放系统互连(OSI)参考模型。 11.1.1 OSI模型的…...
Git常用命令简明教程
本教程整合并优化了Git核心命令,涵盖初始化、配置、文件操作、分支管理、远程仓库操作及常见场景,适合快速入门和日常参考。命令按使用流程分组,简洁明了,包含注意事项和最佳实践。 1. 初始化与配置 初始化Git仓库并设置基本配置…...
在 Ubuntu 24.04 系统上安装和管理 Nginx
1、安装Nginx 在Ubuntu 24.04系统上安装Nginx,可以按照下面的步骤进行: 1.1、 更新系统软件包列表 在安装新软件之前,需要先更新系统的软件包列表,确保获取到最新的软件包信息。打开终端,执行以下命令: …...
数据结构——二叉树和堆(万字,最详细)
目录 1.树 1.1 树的概念与结构 1.2 树相关的术语 1.3 树的表示法 2.二叉树 2.1 概念与结构 2.2 特殊的二叉树 2.2.1 满二叉树 2.2.2 完全二叉树 2.3 二叉树存储结构 2.3.1 顺序结构 2.3.2 实现顺序结构二叉树 2.3.2.1 堆的概念与结构 2.3.2. 2 堆的插入与删除数据…...
IdeaVim 配置与使用指南
一、什么是 IdeaVim? IdeaVim 是 JetBrains 系列 IDE(如 IntelliJ IDEA, WebStorm, PyCharm 等)中的一个插件,让你在 IDE 里使用 Vim 的按键习惯,大大提升效率。 安装方法: 在 IDE 中打开 设置(Settings) →…...
前端浏览器窗口交互完全指南:从基础操作到高级控制
浏览器窗口交互是前端开发中构建复杂Web应用的核心能力,本文深入探讨23种关键交互技术,涵盖从传统API到最新的W3C提案,助您掌握跨窗口、跨标签页的完整控制方案。 一、基础窗口操作体系 1.1 窗口创建与控制 // 新窗口创建(现代浏…...
考研系列-计算机组成原理第五章、中央处理器
一、CPU的功能及结构 1.运算器的基本结构 2.控制器结构...
python+flask+flask-sockerio,部署后sockerio通信异常
前言 用python开发了一个flask web服务,前端用html,前后端通过socketio通信,开发环境,windowsminicondavscode,开发完成后本地运行没有问题,然后就开始部署,噩梦就开始了。 问题描述 程序是部…...
深度解析:TextRenderManager——Cocos Creator艺术字体渲染核心类
一、类概述 TextRenderManager 是 Cocos Creator 中实现动态艺术字体渲染的核心单例类。它通过整合资源加载、缓存管理、异步队列和自动布局等功能,支持普通字符模式和图集模式两种渲染方案,适用于游戏中的动态文本(如聊天内容、排行榜&…...
同样开源的自动化工作流工具n8n和Dify对比
n8n和Dify作为两大主流工具,分别专注于通用自动化和AI应用开发领域,选择哪个更“好用”需结合具体需求、团队能力及业务场景综合判断。以下是核心维度的对比分析: 一、核心定位与适用场景 维度n8nDify核心定位开源全场景自动化工具ÿ…...
设计模式每日硬核训练 Day 16:责任链模式(Chain of Responsibility Pattern)完整讲解与实战应用
🔄 回顾 Day 15:享元模式小结 在 Day 15 中,我们学习了享元模式(Flyweight Pattern): 通过共享对象,分离内部状态与外部状态,大量减少内存开销。适用于字符渲染、游戏场景、图标缓…...
基于边缘人工智能的AI无人机-更高效更安全的飞行任务执行
基于边缘人工智能的AI无人机-更高效更安全的飞行任务执行 人工智能有可能改变人们的生活和工作方式。人工智能和无人机是近年来发展迅速的两项技术。当这两种技术结合在一起时,它们会创造出许多以前不可能的应用。基于人工智能的无人机旨在独立执行任务,…...
30、不是说字符串是不可变的吗,string s=“abc“;s=“123“不就是变了吗?
一、核心概念澄清:不可变性的真实含义 1、不可变性的定义 字符串不可变性指对象内容不可修改,而非变量不可修改。 类比: 不可变字符串 装在密封信封里的信纸(内容不可更改)变量赋值 更换信封的指向(从…...
线上查询车辆出险记录:快速掌握事故情况!
在如今汽车成为人们日常不可或缺的交通工具之际,车辆出险记录成为了许多车主关注的焦点之一。为了帮助车主们快速了解车辆出险、理赔、事故记录,现在有了一种便捷的方式,那就是通过API接口在线查询。本文将介绍如何利用API接口,通…...
Python爬虫课程实验指导书
1.1Requests类库的认知 1.1.1 认识请求类库 Requests是用Python语言编写,基于,采用Apache2 Licensed开源协议的。它比urllib更加方便,可以节约我们大量的工作,完全满足HTTP测试需求。urllibHTTP库 Requests官网地址:ht…...
streamlit实现非原生的按钮触发效果 + flask实现带信息的按钮触发
目录 简介不携带信息的触发隐藏指定st.button(label, key)触发button的html代码汇总 携带信息的触发为什么需要携带信息前端JavaScript修改flask处理总代码 简介 由于streamlit可以同时在实现前后端结合,非常方便,但是这也造成了user难以方便的对页面的…...
机器学习基础——Seaborn使用
1.使用tips数据集,创建一个展示不同时间段(午餐/晚餐)账单总额分布的箱线图 import matplotlib.pyplot as plt import numpy as np import pandas as pd import seaborn as snstips pd.read_csv(./tips.csv)sns.boxplot(data tips,x time,y total_bill, )plt.show() 2.使用…...
Godot开发2D冒险游戏——第三节:游戏地图绘制
一、初步构建游戏地图 在游戏场景当中添加一个新的子节点:TileMapLayer 这一层称为瓦片地图层 根据提示,下一步显然是添加资源 为TileMapLayer节点添加一个TileSet 将地板添加进来,然后选择自动分割图集 自定义时要确保大小合适 让Godot自…...
Spark Mllib 机器学习
概述 机器学习是什么 根据百度百科的定义: 机器学习是一种通过算法和模型使计算机从数据中自动学习并进行预测或决策的技术。 定义比较抽象,根据常见的机器学习可以总结出三个关键字: 算法、经验、性能。 机器学习的过程可以抽象成一个pipel…...
在windows使用docker打包springboot项目镜像并上传到阿里云
1、前提:已有spring项目 2、在项目根目录下创建Dockerfile文件 FROM openjdk:11 WORKDIR /ruoyi-admin COPY ruoyi-admin/build/libs/lifecolor-web.jar lifecolor-web.jar CMD ["java", "-jar", "lifecolor-web.jar"] 3、选…...
前端高频面试题day3
JavaScript作用域理解 核心概念 作用域:定义变量/函数的可见范围及生命周期,分为 全局作用域、函数作用域、块级作用域。作用域链:变量查找从当前作用域逐级向上直至全局,遵循词法作用域(静态作用域)。闭…...
时空特征如何融合?LSTM+Resnet有奇效,SOTA方案预测准确率超91%
LSTM有着不错的时序信息提取能力,ResNet有着不错的空间特征信息提取能力。如果现在有时空特征融合的创新需求,我们是否能将LSTM和ResNet两者的优点融合起来呢? 随着这个思路下去,LSTM ResNet混合模型横空出世,在各个…...
蓝桥杯Java全攻略:从零到一掌握竞赛与企业开发实战
蓝桥杯Java软件开发竞赛已成为全国高校学生展示编程能力的重要舞台,本指南将带您从零开始构建完整的Java知识体系,不仅覆盖蓝桥杯高频考点,还延伸至企业级开发实战,助您在竞赛中脱颖而出并为未来职业发展奠定坚实基础。 一、Java基础语法与数据结构 竞赛解题流程图设计 蓝…...
【Nginx】负载均衡配置详解
Nginx作为高性能的HTTP服务器和反向代理服务器,提供了强大的负载均衡功能。本文将详细介绍Nginx负载均衡的配置方法和相关策略。 一、基础负载均衡配置 1.单服务示例配置 配置nginx.conf模块 在Nginx配置文件中定义upstream模块: worker_processes a…...
打造企业级AI文案助手:GPT-J+Flask全栈开发实战
一、智能文案革命的序幕:为什么需要AI文案助手? 在数字化营销时代,内容生产效率成为企业核心竞争力。据统计,营销人员平均每天需要撰写3.2篇文案,而传统人工创作存在三大痛点: 效率瓶颈:创意构…...
【文献速递】snoRNA-SNORD113-3/ADAR2通过对PHKA2的A-to-I编辑影响胶质母细胞瘤糖脂代谢
Cui等人于2025年在Cellular & Molecular Biology Letters上的发表一篇研究论文,题目为“Effect of SNORD113-3/ADAR2 on glycolipid metabolism in glioblastoma via A-to-I editing of PHKA2”。这篇文章的核心内容是研究胶质母细胞瘤(GBMÿ…...
视频HLS分片与关键帧优化深度解析
视频HLS分片与关键帧优化深度解析 🌐 HLS基础架构 #mermaid-svg-OQmrXfradiCv3EGC {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-OQmrXfradiCv3EGC .error-icon{fill:#552222;}#mermaid-svg-OQmrXfrad…...
再谈从视频中学习:从给视频打字幕的Humanoid-X、UH-1到首个人形VLA Humanoid-VLA:迈向整合第一人称视角的通用人形控制
前言 本博客内,之前写了比较多的大脑相关的,或者上肢操作,而解读运动控制、规划的虽也有,但相对少 故近期 准备多写写双足人形的运动控制相关 一方面,我们有些客户订单涉及这块二方面,想让双足人形干好活…...
Ubuntu下MySQL的安装
Ubuntu下MySQL的安装 1. 查看当前操作系统版本2. 添加MySQL APT源2.1 访问下载页面,并下载发布包2.2 执行安装指令2.3 安装MySQL 3. 查看MySQL状态4. 设置开机自启动 1. 查看当前操作系统版本 通过命令lsb_release -a查看: 2. 添加MySQL APT源 2.1 访问下…...
DataStreamAPI实践原理——快速上手
引入 通过编程模型,我们知道Flink的编程模型提供了多层级的抽象,越上层的API,其描述性和可阅读性越强,越下层API,其灵活度高、表达力越强,多数时候上层API能做到的事情,下层API也能做到&#x…...
《数据结构初阶》【顺序表 + 单链表 + 双向链表】
《数据结构初阶》【顺序表 单链表 顺序表】 前言:先聊些其他的东西!!!什么是线性表?什么是顺序表?顺序表的种类有哪些? 什么是链表?链表的种类有哪些? ---------------…...
【JS-Leetcode】2621睡眠函数|2629复合函数|2665计数器||
文章目录 2621睡眠函数2629复合函数2665计数器|| 这三个题目涉及setTimeout、promise、数组reduce方法,闭包。 2621睡眠函数 请你编写一个异步函数,它接收一个正整数参数 millis ,并休眠 millis 毫秒。要求此函数可以解析任何值。 原理&am…...
全国各地级城市月度平均房价统计数据2009-2021年
全国各地级城市月度平均房价统计数据2009-2021年.ziphttps://download.csdn.net/download/2401_84585615/90259770 https://download.csdn.net/download/2401_84585615/90259770 来源:安居客,本数据以excel格式展示,列举2.5万多条样本数据。总…...
ElasticSearch从入门到精通-覆盖DSL操作和Java实战
一、ElasticSearch基础概念 1.1 认识elasticSearch ElasticSearch(简称ES)是一款开源的、分布式的搜索引擎,它建立在Apache Lucene之上。简单来说,ElasticSearch就是一个能让你以极快速度进行数据搜索、存储和分析的系统。它不仅…...
SHCTF-REVERSE
前言 之前写的,一直没发,留个记录吧,万一哪天记录掉了起码在csdn有个念想 1.ezapk 反编译 快速定位关键函数 package com.mycheck.ezjv;import adrt.ADRTLogCatReader; import android.app.Activity; import android.content.Context; impo…...
C++学习:六个月从基础到就业——模板编程:模板特化
C学习:六个月从基础到就业——模板编程:模板特化 本文是我C学习之旅系列的第三十四篇技术文章,也是第二阶段"C进阶特性"的第十二篇,主要介绍C中的模板特化技术。查看完整系列目录了解更多内容。 目录 引言模板特化基础…...
【中级软件设计师】编译和解释程序的翻译阶段、符号表 (附软考真题)
【中级软件设计师】编译和解释程序的翻译阶段、符号表 (附软考真题) 目录 【中级软件设计师】编译和解释程序的翻译阶段、符号表 (附软考真题)一、历年真题二、考点:编译和解释程序的翻译阶段1、解释2、编译3、解释和编译的异同之处4、符号表 三、真题的答案与解析答…...
G1(Garbage-First)垃圾回收器与JVM内存
G1垃圾回收器简介 G1(Garbage-First)是Java虚拟机(JVM)中的一种垃圾回收器,它是针对服务器端应用设计的,旨在提供高吞吐量和低延迟的垃圾回收性能。G1垃圾回收器的主要目标是高效地管理JVM的堆内存,同时尽量减少垃圾回收(GC)过程对应用程序性能的影响。 特点 分区回收…...
STM32 驱动 INA226 测量电流电压功率
文章目录 一、INA226简介二、引脚功能三、寄存器介绍1.配置寄存器 0x002.分流电压寄存器 0x013.总线电压寄存器 0x024.功率寄存器 0x035.电流寄存器 0x046.基准寄存器 0x05 四、IIC 时序说明1.写时序2.读时序 五、程序六、实验现象1.线路图2.输出数据 一、INA226简介 INA226 是…...
解决新搭建的centos虚拟器,yum下载不了的问题
1. 检查网络连接 确保虚拟机可以访问互联网: ping 8.8.8.8 # 测试基础网络连通性若不通: 检查网卡 IP 配置(参考之前的 IP 恢复步骤)。 确认虚拟机网络模式(如 NAT 或桥接模式)是否允许访问外网。 检查网…...
python连接Elasticsearch并完成增删改查
python库提供了elasticsearch模块,可以通过以下命令进行快速安装,但是有个细节需要注意一下,安装的模块版本要跟es软件版本一致,此处举例:7.8.1 pip install elasticsearch==7.8.1 首先连接elasticsearch,以下是免密示例 from elasticsearch import Elasticsearch# El…...