当前位置: 首页 > news >正文

Spark-Streaming核心编程(四)总结

有状态转化操作 - UpdateStateByKey

功能描述

UpdateStateByKey原语用于在DStream中跨批次维护状态,例如流计算中的累加wordcount

它允许对一个状态变量进行访问和更新,适用于键值对形式的DStream

工作原理

给定一个由(键,事件)对构成的DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数。

构建出一个新的DStream,其内部数据为(键,状态)对。

使用步骤

定义状态:状态可以是一个任意的数据类型。

定义状态更新函数:使用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。

配置检查点目录:updateStateByKey需要使用检查点来保存状态。

示例代码

scalaCopy Code

val updateFunc = (values: Seq[Int], state: Option[Int]) => {

val currentCount = values.foldLeft(0)(_ + _)

val previousCount = state.getOrElse(0)

Some(currentCount + previousCount)

}

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("update")

val ssc = new StreamingContext(sparkConf, Seconds(5))

ssc.checkpoint("./ck")

val lines = ssc.socketTextStream("node01", 9999)

val words = lines.flatMap(_.split(" "))

val pairs = words.map((_, 1))

val stateDStream = pairs.updateStateByKeyInt](updateFunc)

stateDStream.print()

ssc.start()

ssc.awaitTermination()

窗口操作 - Window Operations

功能描述

窗口操作允许设置窗口的大小和滑动窗口的间隔,以动态地获取当前Streaming的状态。

参数说明

窗口时长:计算内容的时间范围。

滑动步长:触发计算的间隔。

这两者都必须为采集周期大小的整数倍。

示例代码

scalaCopy Code

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("window")

val ssc = new StreamingContext(sparkConf, Seconds(3))

ssc.checkpoint("./ck")

val lines = ssc.socketTextStream("node01", 9999)

val words = lines.flatMap(_.split(" "))

val pairs = words.map((_, 1))

val wordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(12), Seconds(6))

wordCounts.print()

ssc.start()

ssc.awaitTermination()

DStream输出操作

输出操作的重要性

输出操作指定了对流数据经转化操作得到的数据所要执行的操作。

RDD中的惰性求值类似,如果没有执行输出操作,DStream将不会被求值。

常见的输出操作

print():在驱动结点上打印DStream中每一批次数据的最开始10个元素,用于开发和调试。

saveAsTextFiles(prefix, [suffix]):以text文件形式存储DStream的内容。

saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式存储数据。

saveAsHadoopFiles(prefix, [suffix]):将数据保存为Hadoop文件。

foreachRDD(func):最通用的输出操作,对DStream中的每个RDD运行任意计算。可以将数据推送到外部系统,如MySQL数据库。

使用注意事项

连接操作不能写在driver层面(序列化问题)。

避免在foreach中对每个RDD中的每条数据都创建连接,效率较低。

可以使用foreachPartition在分区层面创建连接。

相关文章:

Spark-Streaming核心编程(四)总结

有状态转化操作 - UpdateStateByKey ‌功能描述‌ UpdateStateByKey原语用于在DStream中跨批次维护状态,例如流计算中的累加wordcount。 它允许对一个状态变量进行访问和更新,适用于键值对形式的DStream。 ‌工作原理‌ 给定一个由(键,事…...

关系型数据库PostgreSQL for Mac 保姆级使用教程

第一部分:安装PostgreSQL 方法一:使用Postgres.app(最简单) 访问 Postgres.app官网 下载最新版本,将 Postgres.app 移动到 “Applications” 文件夹。 双击Postgres.app打开应用,点击"Initialize&q…...

新增 29 个专业,科技成为关键赛道!

近日,教育部正式发布《普通高等学校本科专业目录(2025年)》,新增 29 个本科专业,包括区域国别学、碳中和科学与工程、海洋科学与技术、健康与医疗保障、智能分子工程、医疗器械与装备工程、时空信息工程、国际邮轮管理…...

云计算市场的重新分类研究

云计算市场传统分类方式,比如按服务类型分为IaaS、PaaS、SaaS,或者按部署模式分为公有云、私有云、混合云。主要提供计算资源、存储和网络等基础设施。 但随着AI大模型的出现,云计算市场可以分为计算云和智算云,智算云主要是AI模…...

大模型时代的具身智能:从虚拟到现实的智能体进化革命

一、具身智能:重新定义 AI 与物理世界的交互范式 (一)概念解析:从 "离身" 到 "具身" 的认知革命 具身智能(Embodied AI)是融合大模型决策能力与物理实体执行能力的新型智能系统&…...

鸿蒙NEXT开发正则工具类(ArkTs)

import { FormatUtil } from ./FormatUtil;/*** 正则工具类* author CSDN-鸿蒙布道师* since 2025/04/27*/ export class RegexUtil {/*** 英文字母、数字和下划线*/static readonly REG_GENERAL "^\\w$";/*** 数字*/static readonly REG_NUMBERS "^\\d$"…...

Flink维表深度解析

一、维表的概念与作用 维表(Dimension Table) 是数据仓库中的核心概念,通常用于存储静态或缓慢变化的业务实体信息(如用户资料、商品信息、地理位置等)。在实时流处理场景中,维表的作用是为主数据流&#…...

基于ArcGIS的洪水灾害普查、风险评估及淹没制图技术研究​

一、洪水普查技术规范解读 1.1 全国水旱灾害风险普查实施方案解读 1.2 洪水风险区划及防治区划编制技术要求解读 1.3 山丘区中小河流洪水淹没图编制技术要求解读 二、ArcGIS介绍及数据管理 2.1 ArcGIS界面及数据加载 2.2 ArcGIS常见数据格式 2.3基于Geodatabase的洪水灾…...

初识数据结构——二叉树从基础概念到实践应用

数据结构专栏 ⬅(click) 初识二叉树:从基础概念到实践应用🌳 一、树型结构基础 1.1 树的基本概念 树是一种非线性的数据结构,由n(n>0)个有限节点组成一个具有层次关系的集合。它看起来像一棵倒挂的树,根朝上而叶朝下。 关键特…...

手搓传染病模型(SEIR)

先看模型 在本模型中,人群有四种自然史状态:易感者(S),暴露者(E),感染者(I)以及康复者(R) 2.模型假设人群分布是同质均匀的,未考虑人群出生、死亡、迁入迁出对疾病传播的影响 3.康复者永久免疫:康复者永久免…...

企业数据赋能 | 应用模板分享:汽车销售仪表板

实时监控销售数据,比较车型、地区业绩~ 今天,小编向大家分享 Tableau 应用分析模板:由 Imran Shaikh 搭建的汽车销售仪表板。借助此仪表板,企业可以实时跟踪销售情况,了解市场趋势,并比较不同车型、地区和销…...

C++?动态内存管理!!!

一、引言 之前我们一起讨论了类和对象的相关知识,接下来我们将继续完善我们的知识体系,为以后继续深入学习C知识添砖加瓦,在本期我们将一起学习C中关于动态内存管理的相关知识,在学习之前将要先回顾C语言中是如何进行动态内存管理…...

MCP协议:AI生态的统一标准

MCP(Model Context Protocol,模型上下文协议)是人工智能领域的革命性协议标准,被广泛类比为“AI世界的USB-C接口”。它通过统一模型、算力和数据的交互方式,解决了AI生态中的碎片化问题,重构了智能协作的技术范式。以下是其核心解析与技术哲学: 一、MCP协议的核心定位与…...

在 UniApp 中实现 App 与 H5 页面的跳转及通信

在移动应用开发中,内嵌 H5 页面或与外部网页交互是常见需求。UniApp 作为跨平台框架,提供了灵活的方式实现 App 与 H5 的跳转和双向通信。本文将详细讲解实现方法,并提供可直接复用的代码示例。 文章目录 一、 App 内嵌 H5 页面(使…...

目标跟踪最新文章阅读列表

AAAI2025 TrackFormer: Multi-Object Tracking with Transformers 论文:https://arxiv.org/abs/2101.02702 代码:https://github.com/timmeinhardt/trackformer AAAI2025 SUTrack 单目标跟踪 论文:https://pan.baidu.com/s/10cR4tQt3lSH5V2RNf7-3gg?pwd=pks2 代码:htt…...

Spark RDD行动算子与共享变量实战:从数据聚合到分布式通信

RDD行动算子: 行动算子就是会触发action的算子,触发action的含义就是真正的计算数据。 1、reduce import org.apache.spark.{SparkConf, SparkContext} object value11 { def main(args: Array[String]): Unit { // 创建 SparkConf 对象并设置应用…...

《2025全球机器学习技术大会:阿里云讲师张玉明深度剖析通义灵码AI程序员》

4 月 18 日 - 19 日,由 CSDN & Boolan 联合举办的 2025 全球机器学习技术大会(ML-Summit)于上海顺利举行。大会聚焦人工智能与机器学习前沿技术,汇聚了来自科技与人工智能领域的数位顶尖专家以及数千名开发者和研究者&#xf…...

python+adafruit_pca9685 测试舵机存储当前角度

测试代码如下: # -*- coding: UTF-8 -*- import time from board import SCL, SDA import busio from adafruit_pca9685 import PCA9685 from adafruit_motor import servo 测试控制1块驱动板或者多块 pip install Adafruit-PCA9685 --break-system-packages pip i…...

视觉/深度学习/机器学习相关面经总结(2)(持续更新)

目录 1、跨模态对齐的方案2、位置编码方式1. **正弦和余弦位置编码(Sinusoidal Positional Encoding)**2. **可学习的位置编码(Learnable Positional Encoding)**3. **相对位置编码(Relative Positional Encoding&…...

缓存并发更新的挑战

缓存并发更新的挑战 1. 引言:并发更新的挑战2. 并发场景下的常见“坑”最后写入胜出 (Last-Write-Wins)脏读 (Dirty Read)丢失更新 (Lost Update)不可重复读 (Non-repeatable Read)幻读 (Phantom Read)写偏斜 (Write Skew)缓存与数据库不一致分布式系统中的时序问题…...

LeetCode题解1297. 子串的最大出现次数

(好久没写题解了,忙着学ai去了) 先来看题目 很显然,题目就是要我们在给定的字符串中找到一个满足要求的连续子串。 首先,要求子串中不同字母的数目得小于等于maxLetters,我们可以用一个DifLettsers函数来…...

零基础小白如何上岸数模国奖

零基础小白如何上岸数模国奖 我自己本人第一次参加数模国赛顺利上岸国奖,当然那段经历也是比较痛苦了,差不多也是从当年四月开始接触数学建模,第一次参加妈妈杯成绩并不理想,后面不断参加数模比赛进行模拟,最后顺利上岸…...

Redux-Saga vs Redux-Thunk

Redux-Saga与Redux-Thunk对比 #mermaid-svg-zbDYIbzoVqlMJXiE {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-zbDYIbzoVqlMJXiE .error-icon{fill:#552222;}#mermaid-svg-zbDYIbzoVqlMJXiE .error-text{fill:#55222…...

Win11 配置 Git 绑定 Github 账号的方法与问题汇总

目录 一、创建 Github 项目库(远程仓库)二、配置安装好的 Git1. 设置用户信息2. 查看已配置的信息3. 建立本地仓库4. Git 常用命令速查表 三、配置 SSH 公钥1. 生成 ssh key出现的问题 2. 启动 ssh-agent 并添加秘钥3. 在 Github 上绑定 ssh 公钥出现的问…...

李臻20242817_安全文件传输系统项目报告_第9周

安全文件传输系统项目报告(第 9 周) 1. 代码链接 Gitee 仓库地址:https://gitee.com/li-zhen1215/homework/tree/master/Secure-file 代码结构说明: project-root/├── src/ # 源代码目录│ ├── main.c # 主程序入口│ ├…...

OceanBase TPCC测试常见报错汇总

OceanBase TPCC测试常见报错汇总 报错1:加载测试数据时创建tablegroup失败报错2:加载测试数据时执行超时报错3:加载测试数据时funcs.sh函数找不到报错4:加载数据时报错超过租户内存上限办法一:增加租户内存办法二:调高转储线程数办法三:调整MemStore内存占比和冻结触发阈…...

文心一言开发指南06——千帆大模型平台新手指南

版权声明 本文原创作者:谷哥的小弟作者博客地址:http://blog.csdn.net/lfdfhl 千帆大模型平台为新手用户提供了一个全面的入门指南,以便用户能够快速熟悉平台的操作和功能。千帆大模型平台通过提供详细的新手指南,确保用户能够顺…...

解决SSLError: [SSL: DECRYPTION_FAILED_OR_BAD_RECORD_MAC] decryption faile的问题

问题描述: 在pip安装第三方库时,出现SSL的问题。 传输层安全性协议(TLS)及其前身安全套接层(SSL)是现在的 HTTPS 协议中的一种安全协议,目的是为互联网通信提供安全及数据完整性保障。而较新版…...

Clang中ext_vector_type和address_space __attribute__的使用

Clang中ext_vector_type和address_space __attribute__的使用 一.背景二. 关键概念详解三.示例代码与验证四. 总结 一.背景 在使用 Clang 编译可执行程序时,遇到类似下面的链接错误: undefined reference to atomic_add(int volatile AS4*, int) undef…...

echarts自定义图表

普通柱状图 自定义柱状图 实现思路 4个图层 背景 边框实体 内部透明地步透明间隙红色柱形数据数据使用自定义字体倾斜柱形上方扁平矩形矩形颜色透明 label字体颜色和背景色相同实现矩形块 上下靠padding实现 向下 paddingBottom使用负数 完整代码 <!DOCTYPE html> <…...

【应用密码学】实验二 分组密码(2)

一、实验要求与目的 1&#xff09; 学习AES密码算法原理 2&#xff09; 学习AES密码算法编程实现 二、实验内容与步骤记录&#xff08;只记录关键步骤与结果&#xff0c;可截图&#xff0c;但注意排版与图片大小&#xff09; 字符串加解密 运行python程序&#xff0c;输入…...

【深度学习】多头注意力机制的实现|pytorch

博主简介&#xff1a;努力学习的22级计算机科学与技术本科生一枚&#x1f338;博主主页&#xff1a; Yaoyao2024往期回顾&#xff1a;【深度学习】注意力机制| 基于“上下文”进行编码,用更聪明的矩阵乘法替代笨重的全连接每日一言&#x1f33c;: 路漫漫其修远兮&#xff0c;吾…...

OceanBase数据库磁盘空间管理

OceanBase数据库磁盘空间管理 日志盘空间管理日志盘容量参数日志盘空间满应急处理 数据盘空间管理数据盘容量参数数据文件自动扩展数据盘空间满应急处理表占用的磁盘空间 日志盘空间管理 日志盘容量参数 &#x1f42f; 与日志盘redo_dir相关的四个重要参数&#xff1a; log_…...

自然语言处理之机器翻译:Statistical Machine Translation(SMT)的评估方法解析与创新实践

## 机器翻译与评估的重要性 机器翻译(Machine Translation, MT)作为自然语言处理(NLP)的核心任务之一,旨在通过计算机实现跨语言的信息传递。随着全球化进程加速,机器翻译在商业、科研、社交等领域的应用愈发广泛。然而,翻译质量直接决定了其实际价值,因此**翻译质量…...

数据集下载(AER 和causaldata R包)

1.AER #install.packages("AER") library(AER)# 引用R包 citation("AER") # 参考文献&#xff1a;Kleiber, Christian, and Achim Zeileis. Applied econometrics with R. Springer Science & Business Media, 2008.# 查看有哪些数据集 data(package …...

【Linux系统】详解Linux权限

文章目录 前言一、学习Linux权限的铺垫知识1.Linux的文件分类2.Linux的用户2.1 Linux下用户分类2.2 创建普通用户2.3 切换用户2.4 sudo&#xff08;提升权限的指令&#xff09; 二、Linux权限的概念以及修改方法1.权限的概念2.文件访问权限 和 访问者身份的相关修改&#xff08…...

Go语言--语法基础4--基本数据类型--字符串类型

在 Go 语言中&#xff0c;字符串也是一种基本类型。相比之下&#xff0c; C/C 语言中并不存在原 生的字符串类型&#xff0c; 通常使用字符数组来表示&#xff0c;并以字符指针来传递。 Go 语言中字符串的声明和初始化非常简单&#xff0c;举例如下&#xff1a; var str st…...

分布式GPU上计算长向量模的方法

分布式GPU上计算长向量模的方法 当向量分布在多个GPU卡上时&#xff0c;计算向量模(2-范数)需要以下步骤&#xff1a; 在每个GPU上计算本地数据的平方和跨GPU通信汇总所有平方和在根GPU上计算总和的平方根 实现方法 下面是一个完整的CUDA示例代码&#xff0c;使用NCCL进行多…...

项目班——0422——日志

...

【音视频】音频编码实战

FFmpeg流程 从本地⽂件读取PCM数据进⾏AAC格式编码&#xff0c;然后将编码后的AAC数据存储到本地⽂件。 示例的流程如下所示。 关键函数说明&#xff1a; avcodec_find_encoder&#xff1a;根据指定的AVCodecID查找注册的编码器。avcodec_alloc_context3&#xff1a;为AVCod…...

Git Bash 下使用 SSH 连接出现 “Software caused connection abort” 问题

目录 一、检查网络环境和防火墙设置&#xff08;失败&#xff09;二、尝试使用 GitHub 的备用 SSH 端口 443&#xff08;成功&#xff09;三、检查 SSH Key 是否被正确加载四、检查是否多个 SSH 进程干扰或者服务异常五、使用 HTTPS 方式临时解决&#xff08;非 SSH&#xff09…...

K8S Pod 常见数据存储方案

假设有如下三个节点的 K8S 集群&#xff1a; k8s31master 是控制节点 k8s31node1、k8s31node2 是工作节点 容器运行时是 containerd 一、理论介绍 1.1、Volumes 卷 Kubernetes 的卷是 pod 的⼀个组成部分&#xff0c;因此像容器⼀样在 pod 的规范&#xff08;pod.spec&#x…...

JavaScript 模板字符串:更优雅的字符串处理方式

什么是模板字符串&#xff1f; 模板字符串&#xff08;Template Literals&#xff09;是 ES6&#xff08;ES2015&#xff09;引入的一种新的字符串表示方式&#xff0c;它提供了更强大、更灵活的字符串拼接功能。与传统的字符串使用单引号&#xff08;&#xff09;或双引号&am…...

DeepSeek智能时空数据分析(五):基于区域人口数量绘制地图散点-大模型搜集数据NL2SQL加工数据

序言&#xff1a;时空数据分析很有用&#xff0c;但是GIS/时空数据库技术门槛太高 时空数据分析在优化业务运营中至关重要&#xff0c;然而&#xff0c;三大挑战仍制约其发展&#xff1a;技术门槛高&#xff0c;需融合GIS理论、SQL开发与时空数据库等多领域知识&#xff1b;空…...

PostSwigger 的 CSRF 漏洞总结

本文所提供的关于 web 安全的相关信息、技术讲解及案例分析等内容&#xff0c;仅用于知识分享与学术交流目的&#xff0c;旨在提升读者对 web 安全领域的认知与理解。以下仅仅是作者对 PostSwigger Web 安全的知识整理和分享&#xff0c;严禁任何非法犯罪活动。 限制 CSRF 的三…...

vue项目页面适配

vue项目页面适配 目的&#xff1a;结合动态设置根字体大小的脚本&#xff08;如通过 JavaScript 监听屏幕尺寸变化&#xff09;&#xff0c;实现页面元素在不同设备上的自适应缩放 1、安装postcss-pxtorem ### 若项目未集成 postcss&#xff0c;需同步安装&#xff1a; npm …...

AI-Browser适用于 ChatGPT、Gemini、Claude、DeepSeek、Grok的客户端开源应用程序,集成了 Monaco 编辑器。

一、软件介绍 文末提供程序和源码下载学习 AI-Browser适用于 ChatGPT、Gemini、Claude、DeepSeek、Grok、Felo、Cody、JENOVA、Phind、Perplexity、Genspark 和 Google AI Studio 的客户端应用程序&#xff0c;集成了 Monaco 编辑器。使用 Electron 构建的强大桌面应用程序&a…...

Flutter Dart新特性NulI safety late 关键字、空类型声明符?、非空断言!、required 关键字

目录 late 关键字 required关键词: 常用的Model对象使用required Null safety翻译成中文的意思是空安全 null safety 可以帮助开发者避免一些日常开发中很难被发现的错误&#xff0c;并且额外的好处是可以改善性能后的版本都要求使用nul1 safety。Flutter2.2.0(2021年5月19日…...

CF2096G Wonderful Guessing Game 构造

题解 首先考虑没有 ? ? ? 回答的时候&#xff0c;答案是多少。 猜猜需要多少个询问。 ⌈ log ⁡ 2 n ⌉ ? ⌈ log ⁡ 3 n ⌉ ? \lceil \log_2n\rceil ? \lceil \log_3n\rceil ? ⌈log2​n⌉?⌈log3​n⌉? 可以构造一个表&#xff0c;行表示不同的询问&#xff0c;…...

制作一款打飞机游戏26:精灵编辑器

虽然我们基本上已经重建了Axel编辑器&#xff0c;但我不想直接使用它。我想创建一个真正适合我们当前目的的编辑器&#xff0c;那就是编辑精灵&#xff08;sprites&#xff09;。这将是今天的一个大目标——创建一个基于模板的编辑器&#xff0c;用它作为我们实际编辑器的起点。…...