当前位置: 首页 > news >正文

Spark-Streaming核心编程

以下是今天所学的知识点与代码测试:

Spark-Streaming

DStream实操

案例一:WordCount案例

需求:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并统计不同单词出现的次数

实验步骤:

  1. 添加依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

  1. 编写代码

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
val ssc = new StreamingContext(sparkConf,Seconds(3))

val lineStreams = ssc.socketTextStream("node01",9999)
val wordStreams = lineStreams.flatMap(_.split(" "))
val wordAndOneStreams = wordStreams.map((_,1))
val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)
wordAndCountStreams.print()

ssc.start()
ssc.awaitTermination()

  1. 启动netcat发送数据

nc -lk 9999

 Spark-Streaming核心编程(一)

DStream 创建

创建DStream的三种方式:RDD队列、自定义数据源、kafka数据源

RDD队列

可以通过使用 ssc.queueStream(queueOfRDDs)来创建 DStream,每一个推送到这个队列中的 RDD,都会作为一个DStream 处理。

案例:

需求:循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算 WordCount

代码:

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
val ssc = new StreamingContext(sparkConf, Seconds(4))

val rddQueue = new mutable.Queue[RDD[Int]]()
val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
val mappedStream = inputStream.map((_,1))
val reducedStream = mappedStream.reduceByKey(_ + _)
reducedStream.print()

ssc.start()
for (i <- 1 to 5) {
  rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
  Thread.sleep(2000)
}
ssc.awaitTermination()

结果展示:

自定义数据源

自定义数据源需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。

案例:自定义数据源,实现监控某个端口号,获取该端口号内容。

  1. 自定义数据源

class CustomerReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  override def onStart(): Unit = {
    new Thread("Socket Receiver"){
      override def run(): Unit ={
        receive()
      }
    }.start()
  }

  def receive(): Unit ={
    var socket:Socket = new Socket(host,port)
    var input :String = null
    var reader = new BufferedReader(new InputStreamReader(socket.getInputStream,StandardCharsets.UTF_8))

    input = reader.readLine()
    while(!isStopped() && input != null){
      store(input)
      input = reader.readLine()
    }
    reader.close()
    socket.close()

    restart("restart")
  }

  override def onStop(): Unit = {}
}

  1. 使用自定义的数据源采集数据

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")
val ssc = new StreamingContext(sparkConf,Seconds(5))

val lineStream = ssc.receiverStream(new CustomerReceiver("node01",9999))

val wordStream = lineStream.flatMap(_.split(" "))
val wordAndOneStream = wordStream.map((_,1))
val wordAndCountStream = wordAndOneStream.reduceByKey(_+_)
wordAndCountStream.print()

ssc.start()
ssc.awaitTermination()

相关文章:

Spark-Streaming核心编程

以下是今天所学的知识点与代码测试&#xff1a; Spark-Streaming DStream实操 案例一&#xff1a;WordCount案例 需求&#xff1a;使用 netcat 工具向 9999 端口不断的发送数据&#xff0c;通过 SparkStreaming 读取端口数据并统计不同单词出现的次数 实验步骤&#xff1a;…...

深度剖析神经网络:从基础原理到面试要点(二)

引言 在人工智能蓬勃发展的今天&#xff0c;神经网络作为其核心技术之一&#xff0c;广泛应用于图像识别、自然语言处理、语音识别等众多领域。深入理解神经网络的数学模型和结构&#xff0c;对于掌握人工智能技术至关重要。本文将对神经网络的关键知识点进行详细解析&#xf…...

c#操作excel

说明 vs2022开发&#xff0c;调用excel 代码 using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; using Sy…...

MQTTX + MCP:MQTT 客户端秒变物联网 Agent

引言&#xff1a;MQTTX 与 MCP 的融合 作为最受欢迎的 MQTT 客户端工具&#xff0c;MQTTX 在 1.12.0 beta 版本中集成了模型上下文协议&#xff08;MCP&#xff09;到 Copilot AI 功能中&#xff0c;显著提升了服务能力。这一融合让 MQTTX 转变为 MCP Host&#xff08;也就是发…...

GSAP 动画引擎实战:打造丝滑动效交互组件库

目录 一、前言二、项目初始化三、核心动效组件实战1. 元素淡入组件&#xff1a;FadeIn.vue2. 列表级联动画&#xff1a;SlideList.vue3. 滚动触发 Reveal 动画&#xff1a;ScrollReveal.vue4. 拖拽盒子组件&#xff1a;DraggableBox.vue5. 打字机效果组件&#xff1a;Typewrite…...

[OpenGL] Lambertian材质漫反射BRDF方程的解释与推导

一、简介 本文简单的介绍了 Physical Based Rendering, PBR 中的 Lambertian 材质漫反射BRDF公式 f r l a m b e r t i a n c d i f f π fr_{lambertian}\frac{c_{diff}}{\pi} frlambertian​πcdiff​​的推导。 二、漫反射项 根据 渲染方程&#xff1a; L o ( v ) ∫ …...

网易云音乐如何修改缓存地址到D盘

你可以通过创建 符号链接&#xff08;Symbolic Link&#xff09; 将网易云音乐的缓存目录转移到D盘&#xff0c;无需修改软件设置。以下是具体步骤&#xff1a; 操作步骤 关闭网易云音乐 确保程序完全退出&#xff08;任务栏右下角无残留进程&#xff09;。 备份并移动原缓存文…...

react使用01

React.cloneElement(element,props,…children) 这个是React的官方API&#xff0c;&#xff0c;主要用于克隆并修改React元素&#xff0c;&#xff0c; 本质&#xff1a; 复制一个已有的React元素&#xff0c;并允许你修改他的props element : 必须是一个有效的element元素p…...

yooAsset打包后材质丢失

以安卓为目标平台打出的AssetBundle包&#xff08;尤其是YooAsset打出的&#xff09;&#xff0c;在Window下Unity编辑器以HostPlayMode运行&#xff0c;有时显示会丢失部分材质。 这是因为安卓目标的AssetBundle包适合OpenglES&#xff0c;而window下Unity编辑器模式是Dx11&a…...

Codeforces Round 1019 (Div. 2)

A. Common Multiple 找不同的数字 #include<iostream> #include<vector> #include<algorithm> using namespace std; int main() {int t; cin >> t;while (t--) {int n;cin >> n;vector<int> a(n);for (int i 0; i < n; i)cin >&…...

【Spring Boot】MyBatis多表查询的操作:注解和XML实现SQL语句

1.准备工作 1.1创建数据库 &#xff08;1&#xff09;创建数据库&#xff1a; CREATE DATABASE mybatis_test DEFAULT CHARACTER SET utf8mb4;&#xff08;2&#xff09;使用数据库 -- 使⽤数据数据 USE mybatis_test;1.2 创建用户表和实体类 创建用户表 -- 创建表[⽤⼾表…...

Docker离线安装与配置指南

Docker离线安装与配置指南 离线安装步骤 1. 下载离线安装包 官方下载地址&#xff1a; https://download.docker.com/linux/static/stable/x86_64/注意&#xff1a;国内用户若无法访问&#xff0c;可能需要使用科学上网工具。本文档以Docker 20.10.23版本为例。 2. 安装与部…...

N8N 官方 MCP 节点实战指南:AI 驱动下的多工具协同应用场景全解析

在低代码自动化领域&#xff0c;N8N 凭借其强大的节点扩展能力和灵活的工作流编排&#xff0c;成为企业构建复杂自动化流程的首选工具。随着 AI Agent 技术的兴起&#xff0c;通过 MCP&#xff08;Multi-Tool Coordination Protocol&#xff09;实现 AI 与外部工具的协同调用&a…...

v-html 显示富文本内容

返回数据格式&#xff1a; 只有图片名称 显示不出完整路径 解决方法&#xff1a;在接收数据后手动给img格式的拼接vite.config中的服务器地址 页面&#xff1a; <el-button click"">获取信息<el-button><!-- 弹出层 --> <el-dialog v-model&…...

UWB与GPS技术融合的室内外无缝定位方案

‌ 一、技术原理与互补性‌ ‌双模定位机制‌ ‌室外场景‌&#xff1a;GPS/北斗提供‌10-30厘米级定位精度‌&#xff08;RTK技术辅助&#xff09;&#xff0c;覆盖露天区域。‌室内场景‌&#xff1a;UWB通过‌TOF/TDOA算法‌实现‌10-50厘米级定位精度‌&#xff0c;穿透金…...

AiEditor v1.3.8 发布

2025 年 4 月 22 日&#xff0c;AI 富文本编辑器 AiEditor 发布了 v1.3.8 版本。 AiEditor 是一个面向 AI 的下一代富文本编辑器&#xff0c;基于 Web Component 开发&#xff0c;支持 Layui、Vue、React、Angular 等几乎任何前端框架&#xff0c;适配 PC Web 端和手机端&#…...

从零学会epoll的使用和原理

从零学会epoll的使用和原理 第一步&#xff1a;理解 select / poll 的缺陷 一、select 和 poll 是什么&#xff1f; 它们是 Linux 提供的 I/O 多路复用机制&#xff0c;可以让我们同时监听多个文件描述符&#xff08;fd&#xff09;&#xff0c;比如 socket&#xff0c;来等…...

XHTMLConverter把docx转换html报java.lang.NullPointerException异常

一.报错 1.报错信息 org.apache.poi.xwpf.converter.core.XWPFConverterException: java.lang.NullPointerExceptionat org.apache.poi.xwpf.converter.xhtml.XHTMLConverter.convert(XHTMLConverter.java:77)at org.apache.poi.xwpf.converter.xhtml.XHTMLConverter.doConve…...

教育科技质检的三重挑战 质检LIMS系统在教育技术研发的应用

在教育技术研发领域&#xff0c;实验室作为产品验证的核心环节&#xff0c;其质检效率与数据安全性直接关乎企业的创新竞争力。LIMS&#xff08;实验室信息管理系统&#xff09;作为贯穿检测全流程的数字化中枢&#xff0c;正在成为教育科技企业的"质量守护者"。本文…...

MySQL最左前缀原则深度解析:优化索引设计的核心法则

一、什么是最左前缀原则&#xff1f; 最左前缀原则&#xff08;Leftmost Prefix Principle&#xff09; 指在使用复合索引&#xff08;Composite Index&#xff09;时&#xff0c;MySQL会按照索引定义的列顺序&#xff0c;从左到右匹配查询条件。只有连续且从最左侧开始的列组…...

多模态大语言模型arxiv论文略读(三十五)

On the Out-Of-Distribution Generalization of Multimodal Large Language Models ➡️ 论文标题&#xff1a;On the Out-Of-Distribution Generalization of Multimodal Large Language Models ➡️ 论文作者&#xff1a;Xingxuan Zhang, Jiansheng Li, Wenjing Chu, Junjia…...

Linux 安装pm2并全局可用

前言 本文基于&#xff1a;操作系统 CentOS Stream 8 使用工具&#xff1a;Xshell8、Xftp8 服务器基础环境&#xff1a; node - 请查看 Linux安装node并全局可用 所需服务器基础环境&#xff0c;请根据提示进行下载、安装。 1.安装依赖 npm install pm2 -g2.配置全局软链…...

39.剖析无处不在的数据结构

数据结构是计算机中组织和存储数据的特定方式&#xff0c;它的目的是方便且高效地对数据进行访问和修改。数据结构表述了数据之间的关系&#xff0c;以及操作数据的一系列方法。数据又是程序的基本单元&#xff0c;因此无论是哪种语言、哪种领域&#xff0c;都离不开数据结构&a…...

基于 Vue 的Tiptap 富文本编辑器使用指南

目录 &#x1f9f0; 技术栈 &#x1f4e6; 所需依赖 &#x1f4c1; 文件结构 &#x1f9f1; 编辑器组件实现&#xff08;components/Editor.vue&#xff09; ✨ 常用操作指令 &#x1f9e0; 小贴士 &#x1f9e9; Tiptap 扩展功能使用说明&#xff08;含快捷键与命令&am…...

【音视频】AAC-ADTS分析

AAC-ADTS 格式分析 AAC⾳频格式&#xff1a;Advanced Audio Coding(⾼级⾳频解码)&#xff0c;是⼀种由MPEG-4标准定义的有损⾳频压缩格式&#xff0c;由Fraunhofer发展&#xff0c;Dolby, Sony和AT&T是主 要的贡献者。 ADIF&#xff1a;Audio Data Interchange Format ⾳…...

vue中将elementUI和echarts转成pdf文件

若要将包含 ElementUI 组件数据和多个 ECharts 图表的数据转换为 PDF 文档&#xff0c;可结合 html2canvas、jspdf 以及 dom-to-image 来实现。其中&#xff0c;html2canvas 和 dom-to-image 可将 ECharts 图表转换为图片&#xff0c;jspdf 则用于生成 PDF 文档。对于 ElementU…...

基于 Electron、Vue3 和 TypeScript 的辅助创作工具全链路开发方案:涵盖画布系统到数据持久化的完整实现

基于 Electron、Vue3 和 TypeScript 的辅助创作工具全链路开发方案&#xff1a;涵盖画布系统到数据持久化的完整实现 引言 在数字内容创作领域&#xff0c;高效的辅助工具是连接创意与实现的关键桥梁。创作者需要一款集可视化画布、节点关系管理、数据持久化于一体的专业工具&…...

本地部署DeepSeek-R1模型接入PyCharm

以下是DeepSeek-R1本地部署及接入PyCharm的详细步骤指南,整合了视频内容及官方文档核心要点: 一、本地部署DeepSeek-R1模型 1. 安装Ollama框架 ​下载安装包 访问Ollama官网(https://ollama.com/download)或通过视频提供的百度云盘链接下载对应系统的安装包。Windows用户…...

基于LightGBM-TPE算法对交通事故严重程度的分析与可视化

基于LightGBM-TPE算法对交通事故严重程度的分析与可视化 原文&#xff1a; Analysis and visualization of accidents severity based on LightGBM-TPE 1. 引言部分 文章开篇强调了道路交通事故作为意外死亡的主要原因&#xff0c;引起了多学科领域的关注。分析事故严重性特…...

音视频小白系统入门课-3

本系列笔记为博主学习李超老师课程的课堂笔记&#xff0c;仅供参阅 往期课程笔记传送门&#xff1a; 音视频小白系统入门笔记-0音视频小白系统入门笔记-1音视频小白系统入门笔记-2 视频&#xff1a; 由一组图像组成&#xff1a;像素、分辨率、RGB 8888(24位) 、RGBA(32位)为…...

考研系列-计算机网络-第五章、传输层

一、传输层提供的服务 1.重点知识...

将Ubuntu系统中已有的Python环境迁移到Anaconda的虚拟环境中

需求&#xff1a;关于如何将Ubuntu系统中已有的Python环境迁移到Anaconda的虚拟环境test2里&#xff0c;而且他们提到用requirements.txt 安装一直报错&#xff0c;所以想尝试直接拷贝的方法。 可以尝试通过直接拷贝移植的方式迁移Python环境到Anaconda虚拟环境&#xff0c;但…...

AI 数字短视频数字人源码开发:多维赋能短视频生态革新​

在短视频行业深度发展的进程中&#xff0c;AI 数字短视频数字人源码开发凭借其独特的技术优势&#xff0c;从多个维度为行业生态带来了革命性的变化&#xff0c;重塑短视频创作、传播与应用的格局。 数据驱动&#xff0c;实现内容精准化创作 AI 数字短视频数字人源码开发能够深…...

ffmpeg 硬解码相关知识

一&#xff1a;FFMPEG 支持的硬解方式&#xff1a;如下都是了解知识 DXVA2 - windows DXVA2 硬件加速技术解析‌ ‌一、核心特性与适用场景‌ ‌技术定义‌&#xff1a;DXVA2&#xff08;DirectX Video Acceleration 2&#xff09;是微软推出的基于 DirectX 的硬件加速标准…...

Ubuntu数据连接访问崩溃问题

目录 一、分析问题 1、崩溃问题本地调试gdb调试&#xff1a; 二、解决问题 1. 停止 MySQL 服务 2. 卸载 MySQL 相关包 3. 删除 MySQL 数据目录 4. 清理依赖和缓存 5.重新安装mysql数据库 6.创建程序需要的数据库 三、验证 1、动态库更新了 2、头文件更新了 3、重新…...

边缘计算全透视:架构、应用与未来图景

边缘计算全透视&#xff1a;架构、应用与未来图景 一、产生背景二、本质三、特点&#xff08;一&#xff09;位置靠近数据源&#xff08;二&#xff09;分布式架构&#xff08;三&#xff09;实时性要求高 四、关键技术&#xff08;一&#xff09;硬件技术&#xff08;二&#…...

迅为iTOP-RK3576开发板/核心板6TOPS超强算力NPU适用于ARM PC、边缘计算、个人移动互联网设备及其他多媒体产品

迅为iTOP-3576开发板采用瑞芯微RK3576高性能、低功耗的应用处理芯片&#xff0c;集成了4个Cortex-A72和4个Cortex-A53核心&#xff0c;以及独立的NEON协处理器。它适用于ARM PC、边缘计算、个人移动互联网设备及其他多媒体产品。 支持INT4/INT8/INT16/FP16/BF16/TF32混合运算&a…...

前沿分享|技术雷达202504月刊精华

本期雷达 ###技术部分 7. GraphRAG 试验 在上次关于 检索增强生成&#xff08;RAG&#xff09;的更新中&#xff0c;我们已经介绍了GraphRAG。它最初在微软的文章中被描述为一个两步的流程&#xff1a; &#xff08;1&#xff09;对文档进行分块&#xff0c;并使用基于大语言…...

[创业之路-380]:企业法务 - 企业经营中,企业为什么会虚开増值税发票?哪些是虚开増值税发票的行为?示例?风险?

一、动机与风险 1、企业虚开增值税发票的动机 利益驱动 骗抵税款&#xff1a;通过虚开发票虚增进项税额&#xff0c;减少应纳税额&#xff0c;降低税负。公司套取国家的利益。非法套现&#xff1a;虚构交易开具发票&#xff0c;将资金从公司账户转移至个人账户&#xff0c;用…...

嵌入式:ARM公司发展史与核心技术演进

一、发展历程&#xff1a;从Acorn到全球算力基石 1. ​起源&#xff08;1978-1990&#xff09;​ 1978年​&#xff1a;奥地利物理学家Hermann Hauser与工程师Chris Curry创立剑桥处理器公司&#xff08;CPU Ltd.&#xff09;&#xff0c;后更名为**艾康电脑&#xff08;Acor…...

ubuntu的各种工具配置

1.nfs&#xff1a;虚拟机桥接模式下&#xff0c;开发板和虚拟机保持在同一网段下&#xff0c;开发板不要直连电脑 挂载命令&#xff1a;mount -v -t nfs 192.168.110.154:/home/lhj /mnt -o nolock (1) 安装 NFS 服务器 sudo apt update sudo apt install nfs-kernel-server -y…...

Go 剥离 HTML 标签的三把「瑞士军刀」——从正则到 Bluemonday

1 为什么要「剥皮」&#xff1f; 安全&#xff1a;去掉潜在的 <script onload…> 等恶意标签&#xff0c;防止存储型 XSS。可读性&#xff1a;日志、消息队列、搜索索引里往往只需要纯文本。一致性&#xff1a;不同富文本编辑器生成的 HTML 五花八门&#xff0c;统一成「…...

【Java面试笔记:基础】6.动态代理是基于什么原理?

1. 反射机制 定义&#xff1a;反射是 Java 语言提供的一种基础功能&#xff0c;允许程序在运行时自省&#xff08;introspect&#xff09;&#xff0c;直接操作类或对象。功能&#xff1a; 获取类定义、属性和方法。调用方法或构造对象。运行时修改类定义。 应用场景&#xff…...

docker容器中uv的使用

文章目录 TL;DRuv简介uv管理项目依赖step 1step 2WindowsLinux/Mac step 3依赖包恢复 在Docker容器中使用uv TL;DR 本文记录uv在docker容器中使用注意点, uv简介 uv是用rust编写的一个python包管理器&#xff0c;特点是速度快&#xff0c;且功能强大&#xff0c;目标是替代p…...

分部积分选取u、v的核心是什么?

分部积分选取u、v的核心是什么&#xff1f;是反对幂指三吗&#xff1f; 不全是&#xff0c;其实核心是&#xff1a;v要比u更容易积分&#xff0c;也就是更容易求得原函数&#xff0c;来看一道例题&#xff1a;...

Android Studio调试中的坑二

下载新的Android studio Meerkat后&#xff0c;打开发现始终无法更新对应的SDK&#xff0c;连Android 15的SDK也无法在SDK Manger中显示出来&#xff0c;但是Meerkat必须要使用新版本SDK。 Android studio下载地址 命令行工具 | Android Studio | Android Developers 解决…...

【Redis】缓存三剑客问题实践(上)

本篇对缓存三剑客问题进行介绍和解决方案说明&#xff0c;下篇将进行实践&#xff0c;有需要的同学可以跳转下篇查看实践篇&#xff1a;&#xff08;待发布&#xff09; 缓存三剑客是什么&#xff1f; 缓存三剑客指的是在分布式系统下使用缓存技术最常见的三类典型问题。它们分…...

2025年4月22日(平滑)

在学术和工程语境中&#xff0c;表达“平滑”需根据具体含义选择术语。以下是专业场景下的精准翻译及用法解析&#xff1a; 1. 数学/信号处理中的「平滑」&#xff08;消除噪声&#xff09; Smooth (verb/noun/adjective) “Apply a Gaussian filter to smooth the noisy signa…...

给vue-admin-template菜单栏 sidebar-item 添加消息提示

<el-badge :value"200" :max"99" class"item"><el-button size"small">评论</el-button> </el-badge> <!-- 在 SidebarItem.vue 中 --> <template><div v-if"!item.hidden" class&q…...

C++(初阶)(十二)——stack和queue

十二&#xff0c;stack和queue 十二&#xff0c;stack和queueStackQueuepriority_queue 简单使用模拟实现deque Stack 函数说明stack()构造空栈empty()判断栈是否为空size()返回栈的有效元素个数top()返会栈顶元素的引用push()将所给元素val压入栈中pop()将栈的尾部元素弹出 …...