Spark-Streaming
Spark-Streaming
一、Spark-Streaming简介
1、Spark-Streaming概述
1.1、Spark-Streaming是什么
Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter等,以及和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。
和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。
DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。
所以简单来将,DStream 就是对 RDD 在实时数据处理场景的一种封装。
Spark-Streaming的特点:易用、容错、易整合到spark体系。
①易用性:Spark Streaming支持Java、Python、Scala等编程语言,可以像编写离线程序一样编写实时计算的程序
②容错:Spark Streaming在没有额外代码和配置的情况下,可以恢复丢失的数据。对于实时计算来说,容错性至关重要。
③易整合:Spark Streaming可以在Spark上运行,并且还允许重复使用相同的代码进行批处理。也就是说,实时处理可以与离线处理相结合,实现交互式的查询操作。
1.2、Spark-Streaming架构
Spark-Streaming架构图:
背压机制:
在Spark 1.5 以前版本,用户如果要限制 Receiver 的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer 数据生产高于 maxRate,当前集群处理能力也高于 maxRate,这就会造成资源利用率下降等问题。
为了更好的协调数据接收速率与资源处理能力,1.5 版本开始 Spark Streaming 可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即 Spark Streaming Backpressure): 根据JobScheduler 反馈作业的执行信息来动态调整 Receiver 数据接收率。通过属性“spark.streaming.backpressure.enabled”来控制是否启用 backpressure 机制,默认值为false,即不启用。
2、DStream实操
案例一:WordCount案例
需求:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并统计不同单词出现的次数
实验步骤:
1)添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
2)编写代码
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
val ssc = new StreamingContext(sparkConf,Seconds(3))
val lineStreams = ssc.socketTextStream("node01",9999)
val wordStreams = lineStreams.flatMap(_.split(" "))
val wordAndOneStreams = wordStreams.map((_,1))
val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)
wordAndCountStreams.print()
ssc.start()
ssc.awaitTermination()
3)启动netcat发送数据
nc -lk 9999
案例解析:
Discretized Stream 是 Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有 一段时间间隔内的数据。
对数据的操作也是按照 RDD 为单位来进行的
计算过程由 Spark Engine 来完成
二、Spark-Streaming核心编程(一)
DStream 创建
创建DStream的三种方式:RDD队列、自定义数据源、kafka数据源
1、RDD队列
可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到这个队列中的 RDD,都会作为一个DStream 处理。
案例:
需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算 WordCount
代码:
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
val ssc = new StreamingContext(sparkConf, Seconds(4))
val rddQueue = new mutable.Queue[RDD[Int]]()
val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
val mappedStream = inputStream.map((_,1))
val reducedStream = mappedStream.reduceByKey(_ + _)
reducedStream.print()
ssc.start()
for (i <- 1 to 5) {
rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
Thread.sleep(2000)
}
ssc.awaitTermination()
结果展示:
2、自定义数据源
自定义数据源需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。
案例:自定义数据源,实现监控某个端口号,获取该端口号内容。
1) 自定义数据源
class CustomerReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
override def onStart(): Unit = {
new Thread("Socket Receiver"){
override def run(): Unit ={
receive()
}
}.start()
}
def receive(): Unit ={
var socket:Socket = new Socket(host,port)
var input :String = null
var reader = new BufferedReader(new InputStreamReader(socket.getInputStream,StandardCharsets.UTF_8))
input = reader.readLine()
while(!isStopped() && input != null){
store(input)
input = reader.readLine()
}
reader.close()
socket.close()
restart("restart")
}
override def onStop(): Unit = {}
}
2)使用自定义的数据源采集数据
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")
val ssc = new StreamingContext(sparkConf,Seconds(5))
val lineStream = ssc.receiverStream(new CustomerReceiver("node01",9999))
val wordStream = lineStream.flatMap(_.split(" "))
val wordAndOneStream = wordStream.map((_,1))
val wordAndCountStream = wordAndOneStream.reduceByKey(_+_)
wordAndCountStream.print()
ssc.start()
ssc.awaitTermination()
相关文章:
Spark-Streaming
Spark-Streaming 一、Spark-Streaming简介 1、Spark-Streaming概述 1.1、Spark-Streaming是什么 Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter等,以及和简单的 TCP 套接字等等…...
工程投标k值分析系统(基于微服务实现)
1 需求总括 2 企业管理模块: 新增、删除、修改企业/部门 <...
WebGL 工作原理
WebGL在GPU上的工作基本上分为两部分 第一部分是将顶点(或数据流)转换到裁剪空间坐标 就是将传入的位置坐标,转换为0-1的值,并绘制出来每个顶点的坐标(传入的值)通过顶点着色器计算转换为裁剪空间坐标转换…...
【 React 】重点知识总结 快速上手指南
react 是 facebook 出的一款针对视图层的库。react 使用的是单向数据流的机制 React 官方中文文档 基础 api 和语法 jsx 语法 就是在 js 中插入 html 片段 在 React 中所有的组件都是 function 组件定义 function 定义组件 就是使用 function 定义组件 任何一个 function …...
Docker 部署 Redis 缓存服务
Docker 部署 Redis 缓存服务 基于 Docker 部署 Redis 缓存服务一、拉取 Redis 镜像二、运行 Redis 容器三、运行命令参数详解四、查看容器运行状态 基于 Docker 部署 Redis 缓存服务 一、拉取 Redis 镜像 确保 Docker 环境已正确安装并运行,打开终端执行以下命令拉…...
A2A + MCP:构建实用人工智能系统的超强组合
构建真正有效的连接型人工智能系统的挑战 如果你正在构建人工智能应用,这种情况可能听起来很熟悉: 你需要特定的人工智能能力来解决业务问题。你找到了完成每个单独任务的出色工具。但把所有东西连接在一起却占据了大部分开发时间,还创建了…...
全能 Sui 技术栈,构建 Web3 的未来
本文翻译自:FourPillarsFP,文章仅代表作者观点。 2025 年,SuiNetwork正在以一套全栈区块链策略强势出击,彻底打破加密行业的传统范式。正如 Mysten Labs 联合创始人 Adeniyi Abiodun 所说:“Sui 不只是一条区块链&…...
企业微信私域运营,基于http协议实现SCRM+AI完整解决方案
1、方案介绍 基于企业微信原生功能已实现全场景的能力覆盖,并提供标准化可直接调用的API接口,可以帮助企业轻松实现上层应用的开发及落地,方案采用模拟通信技术可实现PC,手机,ipad三端的同时在线,单服务器…...
【C++】Json-Rpc框架项目介绍(1)
项目介绍 RPC(Remote Procedure Call)即远程过程调用,是一种通过网络从远程计算机程序中请求服务而不需要了解底层网络实现细节的一种 协议 。 RPC(Remote Procedure Call)可以使用多种网络协议进行通信,如…...
数据结构图论基础知识(一)
文章目录 1. 图的基本概念2. 图的一些现实的应用2.1 ABCDE各个城市之间的关系2.2 社交关系 3. 图的存储结构3.1邻接矩阵3.2 邻接矩阵的实现3.3 邻接表 1. 图的基本概念 1. (graph)图由边(edge)和顶点(Vertexÿ…...
安宝特案例 | AR如何大幅提升IC封装厂检测效率?
前言:如何提升IC封装厂检测效率? 在现代电子产品的制造过程中,IC封装作为核心环节,涉及到复杂处理流程和严格质量检测。这是一家专注于IC封装的厂商,负责将来自IC制造商的晶圆进行保护、散热和导通处理。整个制程繁琐…...
2024年ESWA SCI1区TOP:量子计算蜣螂算法QHDBO,深度解析+性能实测
目录 1.摘要2.蜣螂优化算法DBO原理3.改进策略4.结果展示5.参考文献6.代码获取 1.摘要 蜣螂优化算法是一种群体智能优化算法,具有较强的优化能力和快速收敛性,但容易在优化过程后期陷入局部最优解。本文提出了一种量子计算和多策略混合的蜣螂优化算法&am…...
数据结构*链表- LinkedList
什么是链表 相较于ArrayList顺序表来说,链表是物理存储结构上非连续存储结构(也就是地址不是连续的)。链表由一系列节点构成,每个节点包含数据和指向下一个节点的引用。链表的各个节点在内存里并非连续存储,而是通过引…...
WebRTC服务器Coturn服务器用户管理和安全性
1、概述 Coturn服务器对用户管理和安全方面也做了很多的措施,以下会介绍到用户方面的设置 1.1、相关术语 1.1.1 realm 在 coturn 服务器中,域(realm)是一种逻辑上的分组概念,用于对不同的用户群体、应用或者服务进行区…...
MySQL聚簇索引和非聚簇索引
聚簇索引(Clustered Index)和非聚簇索引(Non-Clustered Index)是数据库中常用的两种索引类型,它们在数据存储和检索方式上有显著的区别。 一、聚簇索引(Clustered Index) 聚簇索引是数据表中的…...
QT加入阿里云OSS上传图片SDK,编译报错:error LNK2019: 无法解析的外部符号 EVP_MD_CTX_init
参考此链接把OSS上传图片的SDK,cmake成lib库,如下图 将lib库放入自己的项目文件夹下,并在pro文件中添加此lib库。而解决 “无法解析的外部符号 EVP_MD_CTX_init” 问题,则需要将third_party文件夹下的一些依赖库和头文件都放到自己…...
正点原子TFTLCD扩展
声明:该文章代码是在正点原子教学下写出的LCD驱动代码上进行了修改能兼容更多字号( ˘ ˘)❤️ 如有侵权,请私信联系删除 文章目录 前言代码lcd.clcd.hfont.h 前言 由于TFTLCD4.3寸屏幕太大了,正点原子的代码只能显示12/16/24字号的字符或者…...
部署Megatron - LM,快速上手使用
安装Megatron - LM 首先检查一下当前环境是否已经有 NVIDIA 驱动和 CUDA: nvidia-smi 直接在当前环境安装运行 PyTorch 和 Megatron-LM不使用 Docker 之前我们看到目前的环境有 NVIDIA V100 GPU 和 CUDA 12.1,我们可以安装对应的 GPU 版本 PyTorch。…...
赛灵思 XC7K325T-2FFG900I FPGA Xilinx Kintex‑7
XC7K325T-2FFG900I 是 Xilinx Kintex‑7 系列中一款工业级 (I) 高性能 FPGA,基于 28 nm HKMG HPL 工艺制程,核心电压标称 1.0 V,I/O 电压可在 0.97 V–1.03 V 之间灵活配置,并可在 –40 C 至 100 C 温度范围内稳定运行。该器件提供…...
20.1Linux的PWM驱动实验(知识点)_csdn
PWM 是很常用到功能,我们可以通过 PWM 来控制电机速度,也可以使用 PWM 来控制LCD 的背光亮度。本章我们就来学习一下如何在 Linux 下进行 PWM 驱动开发。 在之前学过STM32的就知道这部分内容,利用占空比来调节电机的转速。 1、PWM 驱动简析 …...
如何在 Java 中从 PDF 文件中删除页面(教程)
由于 PDF 文件格式不是 Java 原生支持的,因此要从 PDF 中删除页面,你需要使用外部库。 本教程介绍如何使用 JPedal 来实现这一功能。 开始使用 • 将 JPedal 添加到你的类路径或模块路径中(可从官网下载安装试用版 JAR 文件) •…...
2026《数据结构》考研复习笔记五(栈、队列)
栈、队列 一、栈1.卡特兰数2.不合法的出栈序列 二、队列1.循环队列2.输入输出受限队列(四个数1234) 三、算法1.栈在括号匹配中的应用2.中缀表达式求值(通过转化为后缀表达式再后缀表达式求值)3.中缀表达式转化为后缀表达式4.后缀表…...
本地部署DeepSeek大模型
本地部署 ollama 下载Ollama ollama支持的模型:Ollama Search 直接下载,发现默认是下载C盘,并且不能选择安装目录,这对我C盘的压力太大了。 查阅资料:发现可以修改 参考将Ollama安装到非C盘路径_ollama不安装在c盘…...
C++ / 引用 | 类
引用本 作用: 给变量起别名 语法: 数据类型 & 别名 原名; 应用代码 #include <iostream>using namespace std;int main() {int a 10;int& b a;b 100;cout << "a " << a << endl;cout <…...
在任意路径下简单开启jupyter notebook
正常的开启方式为: 安装anaconda, 在anaconda界面开启jupyter. 但是启动后root的路径不好改变。 可以直接通过cmd方式打开jupyter. cd D: # cd到d盘 jupyter notebook # 启动此时开启jupyter notebook, root为D盘路径。 按此方式可以在任意路径启动jupyter noteb…...
2025年阿里云云计算ACP高级工程师认证模拟试题(附答案解析)
这篇文章的内容是阿里云云计算ACP高级工程师认证考试的模拟试题。 所有模拟试题由AI自动生成,主要为了练习和巩固知识,并非所谓的 “题库”,考试中如果出现同样试题那真是纯属巧合。 1、设计云上架构时,如果能充分了解云服务的特…...
锂电池4.2V升压24V推荐哪些升压芯片?高效率国产SL4013输入耐压2.7V-25V
SL4013作为一款高性能升压型DC-DC转换芯片,在单节锂电池(4.2V)升压至24V的应用中展现出以下核心优势: 一、宽输入电压适应性 SL4013支持2.7V-25V的输入范围,尤其适合单节锂电池(3.7V-4.2V)的宽…...
中电金信联合阿里云推出智能陪练Agent
在金融业加速数智化转型的今天,提升服务效率与改善用户体验已成为行业升级的核心方向。面对这一趋势,智能体与智能陪练的结合应用,正帮助金融机构突破传统业务模式,开拓更具竞争力的创新机遇。 在近日召开的阿里云AI势能大会期间&…...
基于扣子(Coze.cn)与火山引擎构建高性能智能体的实践指南
1. 引言 1.1. 背景与目标 人工智能(AI)智能体(Agent)平台的兴起,例如扣子(Coze),正显著改变我们构建复杂 AI 应用的方式 1。这些平台旨在降低技术门槛,让不同技能水平的…...
w~视觉~3D~合集2
我自己的原文哦~ https://blog.51cto.com/whaosoft/13766161 #Sin3DGen 最近有点忙 可能给忘了,贴了我只是搬运工 发这些给自己看, 还有下面不是隐藏是发布出去 ~ 北京大学xxx团队联合山东大学和xxx AI Lab的研究人员,提出了首个基于单样例场景无需训练便可生…...
SAS宏核心知识与实战应用
1. SAS宏基础 1.1 核心概念 1.1.1 宏处理器 宏处理器在SAS程序运行前执行,用于生成动态代码,可实现代码的灵活定制。 通过宏处理器,可基于输入参数动态生成不同的SAS代码,提高代码复用性。 1.1.2 宏变量 宏变量是存储文本值的容器,用&符号引用,如&var,用于存储…...
windows使用openssl生成IIS自签证书全流程
使用 OpenSSL 生成适用于 IIS 的证书,通常需要经过以下步骤:生成私钥、生成证书签名请求(CSR)、生成自签名证书或通过 CA 签名,最后将证书转换为 IIS 所需的 PFX 格式。以下是详细步骤: 1. 安装 OpenSSL …...
笔记本电脑研发笔记:BIOS,Driver,Preloader详记
在笔记本电脑的研发过程中,Driver(驱动程序)、BIOS(基本输入输出系统)和 Preloader(预加载程序)之间存在着密切的相互关系和影响,具体如下: 相互关系 BIOS 与 Preload…...
鸿蒙生态:鸿蒙生态校园行心得
(个人观点,仅供参考) 兄弟们,今天来浅浅聊一聊这次的设立在长沙的鸿蒙生态行活动。 老样子,我们先来了解一下这个活动: Harmon&#x…...
云原生周刊:KubeSphere 平滑升级
开源项目推荐 Kagent Kagent 是一个开源的 K8s 原生框架,旨在帮助 DevOps 和平台工程师在 K8s 环境中构建和运行 AI 代理(Agentic AI)。与传统的生成式 AI 工具不同,Kagent 强调自主推理和多步骤任务的自动化执行,适…...
Uniapp:swiper(滑块视图容器)
目录 一、基本概述二、属性说明三、基本使用 一、基本概述 一般用于左右滑动或上下滑动,比如banner轮播图 二、属性说明 属性名类型默认值说明平台差异说明indicator-dotsBooleanfalse是否显示面板指示点indicator-colorColorrgba(0, 0, 0, .3)指示点颜色indicat…...
开源的自动驾驶模拟器
以下是目前主流的 开源自动驾驶模拟器,适用于算法开发、测试和研究: 1. CARLA 官网/GitHub: carla.org | GitHub许可证: MIT特点: 基于虚幻引擎(Unreal Engine),提供高保真城市场景(支…...
Uniapp:scroll-view(区域滑动视图)
目录 一、基本概述二、属性说明三、基本使用3.1 纵向滚动3.2 横向滚动 一、基本概述 scroll-view,可滚动视图区域。用于区域滚动。 二、属性说明 属性名类型默认值说明平台差异说明scroll-xBooleanfalse允许横向滚动scroll-yBooleanfalse允许纵向滚动 三、基本使…...
【漏洞复现】Struts2系列
【漏洞复现】Struts2系列 1. 了解Struts21. Struts2 S2-061 RCE (CVE-2020-17530)1. 漏洞描述2. 影响版本3. 复现过程 1. 了解Struts2 Apache Struts2是一个基于MVC设计模式的Web应用框架,会对某些标签属性(比如 id)的…...
洗车小程序系统前端uniapp 后台thinkphp
洗车小程序系统 前端uniapp 后台thinkphp 支持多门店 分销 在线预约 套餐卡等...
【RuleUtil】适用于全业务场景的规则匹配快速开发工具
一、RuleUtil 开发背景 1.1 越来越多,越来越复杂的业务规则 1、规则的应用场景多 2、规则配置的参数类型多(ID、数值、文本、日期等等) 3、规则的参数条件多(大于、小于、等于、包含、不包含、区间等等) 4、规则的结…...
多表查询之嵌套查询
目录 引言 一、标量嵌套查询 二、列嵌套查询 三、行嵌套查询 四、表嵌套查询 引言 1、概念 SQL语句中嵌套 select 语句,称为嵌套查询,又称子查询。嵌套查询外部的语句可以是 insert / update / delete / select 的任何一个。 嵌套…...
js原型链prototype解释
function Person(){} var personnew Person() console.log(啊啊,Person instanceof Function);//true console.log(,Person.__proto__Function.prototype);//true console.log(,Person.prototype.__proto__ Object.prototype);//true console.log(,Function.prototype.__prot…...
RK3588 ubuntu20禁用自带的TF卡挂载,并设置udev自动挂载
禁用系统的自动挂载(udisks2) sudo vim /etc/udev/rules.d/80-disable-automount.rules添加 ACTION"add", KERNEL"mmcblk1p1", ENV{UDISKS_IGNORE}"1"KERNEL“mmcblk1p1”:匹配设备名(TF卡通常是…...
【Pytorch 中的扩散模型】去噪扩散概率模型(DDPM)的实现
介绍 广义上讲,扩散模型是一种生成式深度学习模型,它通过学习到的去噪过程来创建数据。扩散模型有很多变体,其中最流行的通常是文本条件模型,它可以根据提示生成特定的图像。一些扩散模型(例如 Control-Net࿰…...
AR/VR衍射光波导性能提升遇阻?OAS光学软件有方法
衍射波导准直系统设计案例 简介 在现代光学显示技术中,衍射光波导系统因其独特的光学性能和紧凑的结构设计,在增强现实(AR)、虚拟现实(VR)等领域展现出巨大的应用潜力。本案例聚焦于衍射波导准直系统&…...
联易融受邀参加上海审计局金融审计处专题交流座谈
近日,联易融科技集团受邀出席了由上海市审计局金融审计处组织的专题交流座谈,凭借其在供应链金融领域的深厚积累和创新实践,联易融为与会人员带来了精彩的分享,进一步加深现场对供应链金融等金融发展前沿领域的理解。 在交流座谈…...
【中级软件设计师】程序设计语言基础成分
【中级软件设计师】程序设计语言基础成分 目录 【中级软件设计师】程序设计语言基础成分一、历年真题二、考点:程序设计语言基础成分1、基本成分2、数据成分3、控制成分 三、真题的答案与解析答案解析 复习技巧: 若已掌握【程序设计语言基础成分】相关知…...
高并发抢券系统设计与落地实现详解
📚 目录 一、业务背景与系统目标 二、架构设计总览 三、热点数据预热与缓存设计 四、抢券逻辑核心 —— Redis Lua 脚本 五、抢券接口实现要点 六、结果同步机制设计 七、性能优化策略 八、总结 在电商系统中,抢券作为一种典型的秒杀业务场景&a…...
外商在国内宣传 活动|发布会|参展 邀请媒体
传媒如春雨,润物细无声,大家好,我是51媒体胡老师。 外商在国内开展宣传活动、发布会或参展时,邀请媒体是扩大影响力、提升品牌知名度的关键环节。 一、活动筹备阶段:选择具有实力且更有性价比的媒体服务商(…...