【大数据学习 | Spark-SQL】定义UDF和DUAF,UDTF函数
1. UDF函数(用户自定义函数)
一般指的是用户自己定义的单行函数。一进一出,函数接受的是一行中的一个或者多个字段值,返回一个值。比如MySQL中的,日期相关的dateDiff函数,字符串相关的substring函数。
先准备数据:
1.1 导入必要的包
首先,确保导入必要的Spark包:
import org.apache.spark.sql.SparkSession
1.2 创建SparkSession
创建一个SparkSession对象,这是与Spark交互的入口。
1.3 定义UDF并注册到SparkSQL
定义一个Scala函数,并将其注册为UDF。示例
1.4 使用UDF在SQL查询中:
调用udf的register方法,第一个参数是udf函数的函数名,第二个参数是要注册为UDF的函数。
session.udf.register("all_income",(sal:Int,bonus:Int)=>{sal*12 + bonus})
1.5 代码:
尽量使用SparkSQL的sql形式的写法,api写法太麻烦了。
object TestUDF{def main(args: Array[String]): Unit = {val session = SparkSession.builder().master("local[*]").appName("testUDF").getOrCreate()import session.implicits._val df = session.sparkContext.textFile("D:\\software\\Spark\\SparkProgram1\\atguigu-classes\\data\\a.txt").map(t => {val strs = t.split(" ")(strs(0), strs(1), strs(2).toInt, strs(3).toInt)}).toDF("id", "name", "salary", "bonus")session.udf.register("all_income",(sal:Int,bonus:Int)=>{sal*12 + bonus})import org.apache.spark.sql.functions
// df.withColumn("all",functions.callUDF("all_income",$"salary",$"bonus"))
// .select("id","name","all")
// .show()df.createTempView("salary")session.sql("""|select id,name,all_income(salary,bonus) all from salary|""".stripMargin).show()}
}
输出:
2. UDAF(用户自定义的聚合函数)
指的是用户自定义的聚合函数,多进一出,比如MySQL中的,count函数,avg函数。
以学生信息为主进行统计,所有人员的年龄的总和
或者每个性别的年龄的平均值
计算所有人的年龄之和:
package com.atguigu.bigdata.testimport org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator/*** ClassName : TestUDAF* Package : com.atguigu.bigdata.test* Description** @Author HeXua* @Create 2024/11/29 19:09* Version 1.0*/
object TestUDAF {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("test udaf").master("local[*]").getOrCreate()import session.implicits._val df = session.sparkContext.textFile("D:\\software\\Spark\\SparkProgram1\\atguigu-classes\\data\\a.txt").map(t => {val strs = t.split(" ")(strs(0), strs(1), strs(2).toInt, strs(3))}).toDF("id", "name", "age", "gender")import org.apache.spark.sql.functions._// 注册udaf函数session.udf.register("mysum",udaf(new MySum))df.createTempView("student")session.sql("""|select mysum(age) from student|""".stripMargin).show()}
}
// udaf的类继承Aggregator抽象类
class MySum extends Aggregator[Int,Int,Int]{//初始化def zero: Int = 0//聚合逻辑def reduce(b: Int, a: Int): Int = a+b//整体聚合def merge(b1: Int, b2: Int): Int = b1+b2//最终返回值def finish(reduction: Int): Int = reduction//累加值的类型def bufferEncoder: Encoder[Int] = Encoders.scalaInt//输出结果的类型def outputEncoder: Encoder[Int] = Encoders.scalaInt
}
定义用户自定义聚合函数时,继承Aggregator类需要指定三个泛型参数。这三个泛型参数分别代表不同的概念。
泛型参数解释:
1. 输入类型(IN)
这是聚合函数的输入类型,即每次调用reduce方法时传入的单个元素的类型。例如你要计算一组整数的平均值,输入类型就是int。
2. 缓冲区类型(BUFFER)
这是聚合函数的中间状态类型,也称为缓冲区类型。
例如你要计算一组整数的平均值,缓冲区可能包含两个字段:总和和计数,因为iBUF可能是一个元组。
3. 输出类型(OUT)
这是聚合函数的最终输出类型,即finish方法返回的类型。例如你要计算平均值,最终输出类型是Double。
方法解释:
zero:初始化缓冲区的值,对于平均值计算,初始化和计数都是0。
reduce:更新缓冲区,每次传入一个新的输入值时,更新总和和计数。
finish:计算最终结果,根据缓冲区中的总和和计数,计算平均值。
bufferEncoder:定义缓冲区类型的编码器,用于序列化和反序列化缓冲区。
outputEncoder:定义最终输出类型的编码器,用于序列化和反序列化输出结果。
计算每个性别的年龄的平均值:
case class AggragateVo(var cnt:Int,var sum:Int)
object MyAvg extends Aggregator[Int,AggragateVo,Double]{override def zero: AggragateVo = AggragateVo(0,0)override def reduce(b: AggragateVo, a: Int): AggragateVo = {b.cnt += 1b.sum += ab}override def merge(b1: AggragateVo, b2: AggragateVo): AggragateVo = {b1.cnt += b2.cntb1.sum += b2.sumb1}override def finish(reduction: AggragateVo): Double = {reduction.sum.toDouble /reduction.cnt}override def bufferEncoder: Encoder[AggragateVo] = Encoders.productoverride def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
3. UDTF(用户自定义炸裂函数)
拆分函数,进入的是一行内容出现的结果是多行内容。
spark中并不直接支持UDTF函数。但可以使用hive中的炸裂函数达到效果。
import org.apache.spark.sql.SparkSessionobject TestUDTF {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("test udtf").master("local[*]").getOrCreate()import session.implicits._val df = session.sparkContext.textFile("file:///headless/workspace/spark/data/m.txt").map(t => {val strs = t.split(",")(strs(0), strs(1), strs(2))}).toDF("id", "name", "actors")//explode map arraydf.createTempView("movies")session.sql("""|select id,name,actor from movies lateral view explode(split(actors,'\\|')) t as actor|""".stripMargin).createTempView("movies1")session.sql("""|select count(1),actor from movies1 group by actor|""".stripMargin).show()}
}
相关文章:
【大数据学习 | Spark-SQL】定义UDF和DUAF,UDTF函数
1. UDF函数(用户自定义函数) 一般指的是用户自己定义的单行函数。一进一出,函数接受的是一行中的一个或者多个字段值,返回一个值。比如MySQL中的,日期相关的dateDiff函数,字符串相关的substring函数。 先…...
Springboot集成通义大模型
1.先到阿里云平台开头阿里云白炼账号,创建apiKey 2. 引入maven依赖 <dependency><groupId>com.alibaba</groupId><artifactId>dashscope-sdk-java</artifactId><version>2.8.3</version></dependency><!-- htt…...
Xilinx PCIe高速接口入门实战(一)
引言:本文对Xilinx 7 Series Intergrated Block for PCI Express PCIe硬核IP进行简要介绍,主要包括7系列FPGA PCIe硬核资源支持、三IP硬核差异、PCIe硬核资源利用等相关内容。 1. 概述 1.1 7系列FPGA PCIe硬件资源支持 7系列FPGA对PCIe接口最大支持如…...
区块链游戏的新观察:自治世界能否成为未来链游的突破口?
区块链游戏(链游)作为加密领域的创新方向,一直被寄予厚望。然而,尽管各类链游层出不穷,大多只是靠代币激励一时爆火,缺乏持久吸引力。这种现象让人对链游未来的发展充满疑虑:是否有一种全新的设…...
Linq中的投影运算 (C#):Select、SelectMany、Zip
投影是指将对象转换为一种新形式的操作,该形式通常只包含那些将随后使用的属性。 通过使用投影,您可以构造从每个对象生成的新类型。 可以投影属性,并对该属性执行数学函数。 还可以在不更改原始对象的情况下投影该对象。 1、Select 下面的…...
GPU 服务器厂家:怎样铸就卓越 AI 算力?
文章来源于百家号:GPU服务器厂家 今天咱来聊聊 GPU 服务器厂家那些事儿,而这其中衡量 AI 算力的因素可是关键所在哦。 先讲讲计算速度这一块。咱都知道 AI 那复杂的活儿,像训练超厉害的图像识别模型,得处理海量图像数据&#x…...
数据结构与算法——N叉树(自学笔记)
本文参考 N 叉树 - LeetBook - 力扣(LeetCode)全球极客挚爱的技术成长平台 遍历 前序遍历:A->B->C->E->F->D->G后序遍历:B->E->F->C->G->D->A层序遍历:A->B->C->D->…...
浏览器的数据六种存储方法比较 :LocalStorage vs. IndexedDB vs. Cookies vs. OPFS vs. WASM-SQLite
在构建该 Web 应用程序,并且希望将数据存储在用户浏览器中。也许您只需要存储一些小标志,或者甚至需要一个成熟的数据库。 我们构建的 Web 应用程序类型发生了显着变化。在网络发展的早期,我们提供静态 html 文件。然后我们提供动态渲染的 h…...
Rust个人认为将抢占C和C++市场,逐渐成为主流的开发语言
本人使用C开发8年、C#开发15年、中间使用JAVA开发过项目、后期在学习过程中发现了Rust语言说它是最安全的语言,能够解决C、C的痛点、于是抽出一部分时间网上买书,看网上资料进行学习,这一学习起来发现和其它语言比较起来,在编码的…...
electron-updater软件自动检测更新 +无服务器本地测试
大家好,我是小黄。 今天分享一下如何0基础实现electron自动检测更新功能。 一. 安装 electron-updater 实现自动更新 安装依赖 electron-updater npm install electron-updater 二. 修改package.josn "publish": {"provider": "generi…...
Flink在Linux系统上的安装与入门
一、Flink的引入 这几年大数据的飞速发展,出现了很多热门的开源社区,其中著名的有Hadoop、Storm,以及后来的Spark,他们都有着各自专注的应用场景。Spark 掀开了内存计算的先河,也以内存为赌注,赢得了内存计…...
鸿蒙面试---都用过哪些装饰器
必答的 State装饰器:组件内状态 State装饰的变量,或称为状态变量,一旦变量拥有了状态属性,就可以触发其直接绑定UI组件的刷新。当状态改变时,UI会发生对应的渲染改变。Prop装饰器:父子单向同步Prop装饰的变…...
微信小游戏/抖音小游戏SDK接入踩坑记录
文章目录 前言问题记录1、用是否存在 wx 这个 API 来判断是微小平台还是抖小平台不生效2、微小支付的参数如何获取?3、iOS 平台不支持虚拟支付怎么办?微小 iOS 端支付时序图:抖小 iOS 端支付:4、展示广告时多次回调 onClose5、在使用单例时 this 引起的 bug6、使用 fetch 或…...
uniapp配置全局消息提醒
1.H5使用根标签插入dom的方式实现。 2.app端使用plus.nativeObj.View的方式绘制实现 H5端app端 H5端 创建组件orderAlert.vue <template><div class"view"><div class"content" v-if"visible"><div class"message&q…...
Docker学习
🎉Docker 简介和安装 Docker 是什么 Docker 是一个应用打包、分发、部署的工具 你也可以把它理解为一个轻量的虚拟机,它只虚拟你软件需要的运行环境,多余的一点都不要, 而普通虚拟机则是一个完整而庞大的系统,包含各…...
【Electron学习笔记(三)】Electron的主进程和渲染进程
Electron的主进程和渲染进程 Electron的主进程和渲染进程前言正文1、主进程2、渲染进程3、Preload 脚本3.1 在项目目录下创建 preload.js 文件3.2 在 main.js 文件下创建路径变量并将 preload.js 定义为桥梁3.3 在 preload.js 文件下使用 electron 提供的contextBridge 模块3.4…...
人工智能的微积分基础
目录 编辑 引言 微积分的基本概念 1. 导数 2. 积分 3. 微分方程 微积分在人工智能中的应用 1. 机器学习中的优化 2. 反向传播算法 3. 概率与统计 4. 控制理论 5. 自然语言处理中的梯度 6. 计算机视觉中的积分 7. 优化算法中的微积分 8. 微分几何在深度学习中的…...
关于BeanUtils.copyProperties是否能正常复制字段【详细版】
话不多说!先总结: 1、字段相同,类型不同(不复制,也不报错) 2、子类父类 (1)子类传给父类(可以正常复制) (2)父类传给子类(可以正常复制) 3、子类父类&#x…...
oracle将select作为字段查询
在Oracle中,如果你想将一个SELECT语句作为字段的值,你可以使用子查询或者使用WITH子句(也称为公用表表达式CTE)。以下是两种方法的示例: 方法1:使用子查询 语法如下: SELECTcolumn1,(SELECT …...
FFmpeg 简介与编译
1. ffmpeg 简介: FFmpeg是一套可以用来记录、转换数字音频、视频,并能将其转化为流的开源计算机程序。采用LGPL或GPL许可证。它提供了录制、转换以及流化音视频的完整解决方案。它包含了非常先进的音频/视频编解码库libavcodec,为了保证高可移…...
qt QLinearGradient详解
1、概述 QLinearGradient是Qt框架中QGradient的一个子类,用于创建线性渐变效果。线性渐变是一种颜色沿着一条直线平滑过渡到另一种颜色的效果。QLinearGradient允许你定义渐变的起点和终点,以及在这些点之间的颜色变化。你可以使用它来为图形、背景、边…...
点击A组件跳转到B页面的tab的某一列
1、使用vuex存储点击的数据; 点击A组件里面的button按钮: <div><button click"banli(first)">已办理</button><button click"banli(second)">未办理</button><button click"banli(third)&quo…...
图像小波去噪与总变分去噪详解与Python实现
目录 图像小波去噪与总变分去噪详解与实现1. 基础概念1.1 噪声类型及去噪问题定义1.2 小波去噪算法基础1.3 总变分去噪算法基础2. 小波去噪算法2.1 理论介绍2.2 Python实现及代码详解2.3 案例分析3. 总变分去噪算法3.1 理论介绍3.2 Python实现及代码详解3.3 案例分析4. 两种算法…...
mvn-mac操作小记
1.安装brew 如果报错,Warning: /opt/homebrew/bin is not in your PATH. vim ~/.zshrc,最后一行追加 export PATH“/opt/homebrew/bin:$PATH” source ~/.zshrc 2.安装brew install maven mvn -version查看路径 Maven home: /opt/homebrew/Cellar/mav…...
【娱乐项目】基于批处理脚本与JavaScript渲染视频列表的Web页面
Demo介绍 一个简单的视频播放器应用,其中包含了视频列表和一个视频播放区域。用户可以通过点击视频列表中的项来选择并播放相应的视频,播放器会自动播放每个视频并在播放完毕后切换到下一个视频。本项目旨在通过自动化脚本和动态网页渲染,帮助…...
Faster R-CNN (目标检测)
Faster R-CNN (Faster Region-based Convolutional Neural Networks) Faster R-CNN 是一种高效的目标检测模型,它是在 R-CNN 系列(包括 R-CNN 和 Fast R-CNN)的基础上发展而来的,能够实现对图像中多个对象的检测。Faster R-CNN 引…...
Diffusion中的Unet (DIMP)
针对UNet2DConditionModel模型 查看Unet的源码,得知Unet的down,mid,up blocks的类型分别是: down_block_types: Tuple[str] ("CrossAttnDownBlock2D","CrossAttnDownBlock2D","CrossAttnDownBlock2D","DownBlock2…...
docker服务容器化
docker服务容器化 1 引言2 多个容器间网络联通2.1 单独创建关联2.2 创建时关联 3 服务搭建3.1 镜像清单3.2 容器创建 4 联合实战4.2 flink_sql之kafka到starrocks4.2 flink_sql之mysql到starrocks 5 文献借鉴 1 引言 利用docker可以很效率地搭建服务,本文在win1…...
Flink 从入门到实战
Flink中的批和流 批处理的特点是有界、持久、大量,非常适合需要访问全部记录才能完成的计算工作,一般用于离线统计。 流处理的特点是无界、实时, 无需针对整个数据集执行操作,而是对通过系统 传输的每个数据项执行操作,一般用于实…...
ffmpeg安装(windows)
ffmpeg安装-windows 前言ffmpeg安装路径安装说明 前言 ffmpeg的安装也是开箱即用的,并没有小码哥说的那么难 ffmpeg安装路径 这就下载好了! 安装说明 将上面的bin目录加入到环境变量,然后在cmd中测试一下: C:\Users\12114\Desktop\test\TaskmgrPlayer\x64\Debug>ffmpe…...
深度解析:Android APP集成与拉起微信小程序开发全攻略
目录 一、背景以及功能介绍 二、Android开发示例 2.1 下载 SDK 2.2 调用接口 2.3 获取小程序原始Id 2.4 报错提示:bad_param 2.4.1 错误日志 2.4.2 解决方案 相关推荐 一、背景以及功能介绍 需求:产品经理需要APP跳转到公司的小程序(最好指定页…...
DeSTSeg: Segmentation Guided Denoising Student-Teacher for Anomaly Detection
DeSTSeg: Segmentation Guided Denoising Student-Teacher for Anomaly Detection 清华、苹果 个人感觉 Introduction 很自然的让读者理解作者问题的提出,也有例子直接证明了这个问题的存在,值得借鉴!! Related work写的也很不…...
Xilinx Blockset Gateway In 和Gateway out模块使用及参数配置
目录 一、Gateway InSimulink数据到System Generator数据的转换Gateway BlocksBlock Parameters(模块参数)Basic选项卡参数Implementation选项卡参数 二、Gateway OutGateway BlocksBlock Parameters(模块参数)Basic选项卡参数Imp…...
set up RAGFlow on your Mac
个人思考:这些仅仅是工具,和人的思维实际还是有很大差距。 可能是我认知片面,你需要投喂大量的内容给它,它自己其实并不会思考,只是从它的认知里告诉它他知道的东西。举个不太巧当的例子,和以往的方式恰恰相…...
SSM搭建(1)——配置MyBatis
目录 一、框架概述 1.什么是JDBC? 2.JDBC基本流程 3.JDBC的缺点 二、MyBatis的入门程序 1. 创建数据库和表结构 2. MyBatis入门流程总结 3. MyBatis的入门步骤 (1) 创建maven的项目,创建Java工程即可。 &…...
SickOs: 1.1靶场学习小记
学习环境 kali攻击机:Get Kali | Kali Linux vulnhub靶场:https://download.vulnhub.com/sickos/sick0s1.1.7z 靶场描述: 这次夺旗赛清晰地模拟了在安全环境下如何对网络实施黑客策略从而入侵网络的过程。这个虚拟机与我在进攻性安全认证专…...
Flume 监控配置和实践
要解释 Flume 的监控机制,需要了解 Flume 是如何设计其监控架构的,以及如何将性能指标暴露给用户或集成工具。下面我将详细分解 Flume 的监控机制,从基础架构、实现原理到源码解析,并提供非专业人也能理解的通俗解释。 Flume 的监…...
二分法算法
提示:文章 文章目录 前言一、背景二、二分法2.2 最坏情况下冒泡排序的比较次数 三、大算法之一:分治法总结 前言 前期疑问: 本文目标: 二分法 一、背景 问题来源是一个题目,在A[N]字符串数组中匹配长度为M的字符串&…...
3.27浮点数计算
-127就是说,有8位的数来表示指数,然后给他减去127就是这八位劈半,一半表示负数的指数,一半表示整数的指数;对于移码来说,最高位为1时表示为正,为0时表示为负 对阶是要小阶向大阶对齐,…...
存储过程与自然语言处理逻辑的不同与结合
在现代软件开发中,存储过程与自然语言处理(NLP)逻辑都发挥着重要作用。存储过程是一种在数据库内部运行的预编译程序,通常用于处理与数据相关的任务,例如插入、更新、删除数据以及复杂的查询操作。而自然语言处理&…...
数据集搜集器(百科)008
对数据集搜集器(百科)007进行一下改进: 错误处理:增加更多的错误处理,比如网络请求超时、解析错误等。 用户界面:增加一些提示信息,让用户更清楚当前的操作状态。 多线程处理:确保多…...
用Pycharm安装manim
由于版本和工具的差异,manim的安装方式不尽相同。本文用Pycharm来安装manim. 一、准备工作:安装相应版本的python、pycharm和ffmpeg. 此处提供一种安装ffmpeg的方式 下载地址:FFmpeg 下载后,解压到指定目录。 配置环境变量&am…...
HTB:Love[WriteUP]
目录 连接至HTB服务器并启动靶机 信息收集 使用rustscan对靶机TCP端口进行开放扫描 使用nmap对靶机开放端口进行脚本、服务扫描 使用浏览器访问靶机443端口 尝试利用该功能访问靶机自身80端口 使用ffuf对靶机80端口进行路径FUZZ 漏洞利用 使用searchsploit搜索靶机80端…...
程序设计 26种设计模式,如何分类?
1. 创建型模式 (Creational Patterns) 这些模式关注如何实例化对象。它们通过各种方式封装对象的创建过程,从而提供灵活性和可扩展性。 单例模式 (Singleton):确保某个类只有一个实例,并提供全局访问点。工厂方法模式 (Factory Method)&…...
Oracle对比表与表之间的结构
自己首先想到的就是,navicat有提供结构同步 但是有些时候情况不一样,比如我遇到的是连接不同,而且是互相同步,以最多的列的那个表为样 没有说一个固定的源 那么还可以通过导出表结构去另一个库中执行看是否报错,以此来判断结构的不同 但是我感觉有点儿麻烦 最后想到通过sql语…...
MySQL 查询 执行顺序
MySQL查询的执行顺序大致如下: FROM子句:确定要查询的表。 ON:对JOIN语句中的表进行关联条件指定。 JOIN:如果有的话,对表进行关联。 WHERE:对记录进行过滤。 GROUP BY:根据指定的列分组记录…...
Scala习题
姓名,语文,数学,英语 张伟,87,92,88 李娜,90,85,95 王强,78,90,82 赵敏,92,88,91 孙涛,…...
VSCode 使用教程:项目使用配置、使用哪些插件、Live Server使用问题及解决方案(你想要的,都在这里)
VSCode的配置: Ⅰ、VSCode 可能需要的项目配置:1、项目颜色主题的切换:其一、点击设置 -> 选择主题 -> 选择颜色主题:其二、通过上下键操作,选择想要的主题: 2、项目文件图标主题的切换:其…...
RPA:电商订单处理自动化
哈喽,大家好,我是若木,最近闲暇时间较多,于是便跟着教程做了一个及RPA,谈到这个,可能很多人并不是很了解,但是实际上,这玩意却遍布文末生活的边边角角。话不多说,我直接上…...
分布式协同 - 分布式锁一二事儿
文章目录 导图Pre概述概述1. 分布式互斥和临界资源的协调2. 分布式锁的基本原理3. 分布式锁的实现方式a. 基于数据库实现的分布式锁b. 基于Redis实现的分布式锁c. 基于Zookeeper实现的分布式锁 4. 高并发场景下的分布式锁优化a. 分段锁(Sharded Locks)b.…...