当前位置: 首页 > news >正文

day07_Spark SQL

文章目录

  • day07_Spark SQL课程笔记
    • 一、今日课程内容
    • 二、Spark SQL函数定义(掌握)
      • 1、窗口函数
      • 2、自定义函数背景
        • 2.1 回顾函数分类标准:
          • SQL最开始是_内置函数&自定义函数_两种
        • 2.2 自定义函数背景
      • 3、Spark原生自定义UDF函数
        • 3.1 自定义函数流程:
        • 3.2 自定义演示一:
        • 3.3 自定义演示二:
        • 3.4 自定义演示三:
    • 4、Pandas的自定义函数
        • 4.1 Apache Arrow框架
        • 4.2 基于Arrow完成Pandas和Spark的DataFrame互转
        • 4.3 基于Pandas自定义函数
          • 4.3.1 自定义函数流程
          • 4.3.2 自定义UDF函数
          • 4.3.3 自定义UDAF函数
    • 三、Spark on Hive(操作)
      • 1、集成原理
      • 2、集成环境配置
      • 3、启动metastore服务
      • 4、SparkOnHive操作
        • 4.1 黑窗口测试spark-sql
        • 4.2 python代码测试spark-sql
    • 四、SparkSQL的分布式执行引擎(了解)
      • 1、启动Thrift服务
      • 2、beeline连接Thrift服务
      • 3、开发工具连接Thrift服务
      • 4、控制台编写SQL代码
    • 五、Spark SQL的运行机制(掌握)
      • 5.1 **Catalyst**内部具体的执行流程:
      • **为什么 SparkSQL 的执行流程就像是“从 SQL 语句到结果的流水线”?**
      • **实际意义**
      • 5.2 SparkSQL的执行流程总结:
  • 01_spark原生自定义UDF函数_返回字符串.py
    • 结果
  • 02_spark原生自定义UDF函数_返回列表.py
    • 结果
  • 03_spark原生自定义UDF函数_返回字典.py
    • 结果
  • 04_sparkSQL和pandas中df对象互转操作.py
  • 05_spark基于pandas定义udf函数_s到s.py
  • 06_spark基于pandas定义udaf函数_s到标量.py
  • 07_spark_sql操作数据库.py

day07_Spark SQL课程笔记

在这里插入图片描述

一、今日课程内容

  • 1- Spark SQL函数定义(掌握)
  • 2- Spark On Hive(操作)
  • 3- Spark SQL的分布式执行引擎(了解)
  • 4- Spark SQL的运行机制(掌握)

今日目的:掌握Spark SQL函数定义的两种方式;理解->掌握Spark SQL的运行机制

二、Spark SQL函数定义(掌握)

1、窗口函数

回顾之前学习过的窗口函数:

分析函数 over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])分析函数可以大致分成如下3类:
1- 第一类: 聚合函数 sum() count() avg() max() min()
2- 第二类: 排序函数 row_number() rank() dense_rank() 
3- 第三类: 其他函数 ntile()  first_value() last_value() lead() lag() 三个排序函数的区别?
row_number(): 巧记 1234  特点: 唯一且连续
rank(): 巧记 1224 特点: 并列不连续
dense_rank(): 巧记 1223  特点: 并列且连续

在Spark SQL中使用窗口函数案例:

已知数据如下:

cookie1,2018-04-10,1
cookie1,2018-04-11,5
cookie1,2018-04-12,7
cookie1,2018-04-13,3
cookie1,2018-04-14,2
cookie1,2018-04-15,4
cookie1,2018-04-16,4
cookie2,2018-04-10,2
cookie2,2018-04-11,3
cookie2,2018-04-12,5
cookie2,2018-04-13,6
cookie2,2018-04-14,3
cookie2,2018-04-15,9
cookie2,2018-04-16,7

需求: 要求找出每个cookie中pv排在前3位的数据,也就是分组取TOPN问题

# 导包
import os
from pyspark.sql import SparkSession,functions as F,Window as W# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.数据输入df = spark.read.csv(path='file:///export/data/spark_project/spark_sql/data/cookie.txt',sep=',',schema='cookie string,datestr string,pv int')# 3.数据处理(切分,转换,分组聚合)# 4.数据输出etldf = df.dropDuplicates().dropna()# SQL方式etldf.createTempView('cookie_logs')spark.sql("""select cookie,datestr,pvfrom (select cookie,datestr,pv,dense_rank() over(partition by cookie order by pv desc) as rnfrom cookie_logs) temp where rn <=3 """).show()# DSL方式etldf.select('cookie', 'datestr', 'pv',F.dense_rank().over( W.partitionBy('cookie').orderBy(F.desc('pv')) ).alias('rn')).where('rn <=3').select('cookie', 'datestr', 'pv').show()# 5.关闭资源spark.stop()

运行结果截图:
在这里插入图片描述

2、自定义函数背景

2.1 回顾函数分类标准:
SQL最开始是_内置函数&自定义函数_两种

SQL函数,主要分为以下三大类:

  • UDF函数:普通函数
    • 特点:一对一,输入一个得到一个
    • 例如:split() …
  • UDAF函数:聚合函数
    • 特点:多对一,输入多个得到一个
    • 例如:sum() avg() count() min() max() …
  • UDTF函数:表生成函数
    • 特点:一对多,输入一个得到多个
    • 例如:explode() …

在SQL中提供的所有的内置函数,都是属于以上三类中某一类函数

  1. 简单来说:UDF、UDAF和UDTF是Spark SQL中用于扩展SQL功能的三种自定义函数,分别像是“单兵作战”、“团队协作”和“多面手”,满足不同的数据处理需求。

  2. 具体而言

    • UDF(用户自定义函数)
      • 功能:对单行数据进行操作,输入一行输出一行。
      • 场景:适合简单的数据转换,比如将字符串转换为大写。
      • 示例spark.udf.register("to_upper", lambda x: x.upper())
    • UDAF(用户自定义聚合函数)
      • 功能:对多行数据进行聚合操作,输入多行输出一行。
      • 场景:适合复杂的聚合计算,比如自定义加权平均。
      • 示例:继承UserDefinedAggregateFunction类,实现initializeupdatemerge等方法。
    • UDTF(用户自定义表生成函数)
      • 功能:对单行数据进行操作,输入一行输出多行。
      • 场景:适合数据展开操作,比如将JSON数组拆分为多行。
      • 示例:继承GenericUDTF类,实现initializeprocessclose等方法。
  3. 实际生产场景

    • 在数据清洗中,使用UDF将日期格式统一为标准格式。
    • 在数据分析中,使用UDAF计算复杂的业务指标,如客户生命周期价值(CLV)。
    • 在数据展开中,使用UDTF将嵌套的JSON数据拆分为多行,便于后续分析。
  4. 总之:UDF、UDAF和UDTF是Spark SQL中强大的扩展工具,能够满足从简单转换到复杂聚合、数据展开的多种需求,为数据处理提供了极大的灵活性。

2.2 自定义函数背景

思考:有这么多的内置函数,为啥还需要自定义函数呢?

	为了扩充函数功能。在实际使用中,并不能保证所有的操作函数都已经提前的内置好了。很多基于业务处理的功能,其实并没有提供对应的函数,提供的函数更多是以公共功能函数。此时需要进行自定义,来扩充新的功能函数

​ 在Spark SQL中,针对Python语言,对于自定义函数,原生支持的并不是特别好。目前原生仅支持自定义UDF函数,而无法自定义UDAF函数和UDTF函数。

​ 在1.6版本后,Java 和scala语言支持自定义UDAF函数,但Python并不支持。

1- SparkSQL原生的时候,Python只能开发UDF函数
2- SparkSQL借助其他第三方组件(Arrow,pandas...),Python可以开发UDF、UDAF函数,同时也提升效率

在这里插入图片描述

Spark SQL原生UDF函数存在的问题:大量的序列化和反序列

	虽然Python支持自定义UDF函数,但是其效率并不是特别的高效。因为在使用的时候,传递一行处理一行,返回一行的方式。这样会带来非常大的序列化的开销的问题,导致原生UDF函数效率不好早期解决方案: 基于Java/Scala来编写自定义UDF函数,然后基于python调用即可目前主要的解决方案: 引入Arrow框架,可以基于内存来完成数据传输工作,可以大大的降低了序列化的开销,提供传输的效率,解决原生的问题。同时还可以基于pandas的自定义函数,利用pandas的函数优势完成各种处理操作

在这里插入图片描述

3、Spark原生自定义UDF函数

3.1 自定义函数流程:
第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可第二步: 将Python函数注册到Spark SQL中注册方式一: udf对象 = sparkSession.udf.register(参数1,参数2,参数3)参数1: 【UDF函数名称】,此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范参数2: 【自定义的Python函数】,表示将哪个Python的函数注册为Spark SQL的函数参数3: 【UDF函数的返回值类型】。用于表示当前这个Python的函数返回的类型udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用说明: 如果通过方式一来注册函数, 【可以用在SQL和DSL】注册方式二:  udf对象 = F.udf(参数1,参数2)参数1: Python函数的名称,表示将那个Python的函数注册为Spark SQL的函数参数2: 返回值的类型。用于表示当前这个Python的函数返回的类型udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用说明: 如果通过方式二来注册函数,【仅能用在DSL中】注册方式三:  语法糖写法  @F.udf(returnType=返回值类型)  放置到对应Python的函数上面说明: 实际是方式二的扩展。如果通过方式三来注册函数,【仅能用在DSL中】第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可
3.2 自定义演示一:

需求1: 请自定义一个函数,完成对 数据 统一添加一个后缀名的操作 , 例如后缀名 ‘_itheima’

效果如下:

在这里插入图片描述

# 导包
import os
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import StringType# 绑定指定的python解释器os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.数据输入df = spark.createDataFrame(data=[(1,'张三','广州'),(2,'李四','深圳')],schema='id int,name string,address string')df.show()# 3.SparkSQL自定义udf函数# 第一步.自定义python函数def add_suffix(data):return data+'_itheima'# 第二步.把python函数注册到SparkSQL# ① spark.udf.register注册dsl1_add_suffix = spark.udf.register('sql_add_suffix',add_suffix,StringType())# ②F.udf注册dsl2_add_suffix = F.udf(add_suffix, StringType())# ③@F.udf注册@F.udf( StringType())def candy_add_suffix(data):return data+'_itheima'# 第三步.在SparkSQL中调用自定义函数# SQL方式df.createTempView('temp')spark.sql("""select id,name,sql_add_suffix(address) as new_address from temp""").show()# DSL方式# 调用dsl1_add_suffixdf.select('id', 'name', dsl1_add_suffix('address').alias('new_address')).show()# 调用dsl2_add_suffixdf.select('id', 'name', dsl2_add_suffix('address').alias('new_address')).show()# 调用candy_add_suffixdf.select('id', 'name', candy_add_suffix('address').alias('new_address')).show()# 4.关闭资源spark.stop()

斌哥友情提醒: 可能遇到的问题如下

在这里插入图片描述

原因: 在错误的地方调用了错误的函数。spark.udf.register参数1取的函数名只能在SQL中使用,不能在DSL中用。
3.3 自定义演示二:

需求2: 请自定义一个函数,返回值类型为复杂类型: 列表

效果如下:

在这里插入图片描述

参考代码:

# 导包
import os
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import StringType, ArrayType# 绑定指定的python解释器os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.数据输入df = spark.createDataFrame(data=[(1,'张三_广州'),(2,'李四_深圳')],schema='id int,name_address string')df.show()# 3.SparkSQL自定义udf函数# 第一步.自定义python函数def my_split(data:str):list1 = data.split('_')return list1# 第二步.把python函数注册到SparkSQL# ① spark.udf.register注册dsl1_add_suffix = spark.udf.register('sql_add_suffix',my_split,ArrayType(StringType()))# ②F.udf注册dsl2_add_suffix = F.udf(my_split, ArrayType(StringType()))# ③@F.udf注册@F.udf(ArrayType(StringType()))def candy_add_suffix(data):list1 = data.split('_')return list1# 第三步.在SparkSQL中调用自定义函数# SQL方式df.createTempView('temp')spark.sql("""select id,sql_add_suffix(name_address) as new_address from temp""").show()# DSL方式# 调用dsl1_add_suffixdf.select('id',  dsl1_add_suffix('name_address').alias('new_name_address')).show()# 调用dsl2_add_suffixdf.select('id',dsl2_add_suffix('name_address').alias('new_name_address')).show()# 调用candy_add_suffixdf.select('id',candy_add_suffix('name_address').alias('new_name_address')).show()# 4.关闭资源spark.stop()
3.4 自定义演示三:

需求3: 请自定义一个函数,返回值类型为复杂类型: 字典

效果如下:

在这里插入图片描述

注意: 注意: 如果是字典类型,StructType中列名需要和字典的key值一致,否则是null补充
# 导包
import os
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import StringType, ArrayType, StructType# 绑定指定的python解释器os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.数据输入df = spark.createDataFrame(data=[(1,'张三_广州'),(2,'李四_深圳')],schema='id int,name_address string')df.show()# 3.SparkSQL自定义udf函数# 第一步.自定义python函数def my_split(data:str):list1 = data.split('_')return {'name':list1[0],'address':list1[1]}# 第二步.把python函数注册到SparkSQL# 注意: 如果是字典类型,StructType中列名需要和字典的key值一致,否则是nullt = StructType().add('name',StringType()).add('address',StringType())# ① spark.udf.register注册dsl1_add_suffix = spark.udf.register('sql_add_suffix',my_split,t)# ②F.udf注册dsl2_add_suffix = F.udf(my_split, t)# ③@F.udf注册@F.udf(t)def candy_add_suffix(data):list1 = data.split('_')return {'name':list1[0],'address':list1[1]}# 第三步.在SparkSQL中调用自定义函数# SQL方式df.createTempView('temp')spark.sql("""select id,sql_add_suffix(name_address) as new_name_address from temp""").show()# DSL方式# 调用dsl1_add_suffixdf.select('id', dsl1_add_suffix('name_address').alias('new_name_address')).show()# 调用dsl2_add_suffixdf.select('id',dsl2_add_suffix('name_address').alias('new_name_address')).show()# 调用candy_add_suffixdf.select('id',candy_add_suffix('name_address').alias('new_name_address')).show()# 4.关闭资源spark.stop()

4、Pandas的自定义函数

2-如果不是3.1.2版本,那么先卸载pyspark

命令: pip uninstall pyspark

3- 再按照【Spark课程阶段_部署文档.doc】中重新安装3.1.2版本pyspark

命令: pip install -i https://pypi.tuna.tsinghua.edu.cn/simple
pyspark==3.1.2

4.1 Apache Arrow框架

​ Apache Arrow是Apache旗下的一款顶级的项目。是一个跨平台的在内存中以列式存储的数据层,它的设计目标就是作为一个跨平台的数据层,来加快大数据分析项目的运行效率

​ Pandas 与 Spark SQL 进行交互的时候,建立在Apache Arrow上,带来低开销 高性能的UDF函数

如何安装? 三个节点建议都安装

检查服务器上是否有安装pyspark
pip list | grep pyspark  或者 conda list | grep pysparkpip list | grep pyarrow  
如果服务器已经安装了pyspark的库,那么仅需要执行以下内容,即可安装。例如在 node1安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark[sql]如果服务器中python环境中没有安装pyspark,建议执行以下操作,即可安装。例如在 node2 和 node3安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyarrow==10.0.0

在这里插入图片描述

Arrow并不会自动使用,在某些情况下,需要配置 以及在代码中需要进行小的更改才可以使用

如何使用呢? 默认不会自动启动的, 一般建议手动配置

spark.conf.set('spark.sql.execution.arrow.pyspark.enabled',True)
4.2 基于Arrow完成Pandas和Spark的DataFrame互转

Pandas中DataFrame:

DataFrame:表示一个二维表对象,就是表示整个表

字段、列、索引;Series表示一列
在这里插入图片描述

Spark SQL中DataFrame:
在这里插入图片描述

使用场景:

1- Spark的DataFrame -> Pandas的DataFrame:当大数据处理到后期的时候,可能数据量会越来越少,这样可以考虑使用单机版的Pandas来做后续数据的分析

2- Pandas的DataFrame -> Spark的DataFrame:当数据量达到单机无法高效处理的时候,或者需要和其他大数据框架集成的时候,可以转成Spark中的DataFrame

Pandas的DataFrame -> Spark的DataFrame: spark.createDataFrame(data=pandas_df)
Spark的DataFrame -> Pandas的DataFrame: init_df.toPandas()

示例:

# 导包
import os
from pyspark.sql import SparkSession# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# TODO: 手动开启arrow框架spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)# 2.数据输入df = spark.createDataFrame(data=[(1,'张三_广州'),(2,'李四_深圳')],schema='id int ,name_address string')df.show()print(type(df))print('------------------------')# 3.数据处理(切分,转换,分组聚合)# 4.数据输出# spark->pandaspd_df = df.toPandas()print(pd_df)print(type(pd_df))print('------------------------')# pandas->sparkdf2 = spark.createDataFrame(pd_df)df2.show()print(type(df2))# 5.关闭资源spark.stop()
4.3 基于Pandas自定义函数

​ 基于Pandas的UDF函数来转换为Spark SQL的UDF函数进行使用。底层是基于Arrow框架来完成数据传输,允许向量化(可以充分利用计算机CPU性能)操作。

​ Pandas的UDF函数其实本质上就是Python的函数,只不过函数的传入数据类型为Pandas的类型

基于Pandas的UDF可以使用自定义UDF函数和自定义UDAF函数

4.3.1 自定义函数流程
第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可第二步: 将Python函数包装成Spark SQL的函数注册方式一: udf对象 = spark.udf.register(参数1, 参数2)参数1: UDF函数名称。此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范参数2: Python函数的名称。表示将哪个Python的函数注册为Spark SQL的函数使用: udf对象只能在DSL中使用。参数1指定的名称只能在SQL中使用注册方式二: udf对象 = F.pandas_udf(参数1, 参数2)参数1: 自定义的Python函数。表示将哪个Python的函数注册为Spark SQL的函数参数2: UDF函数的返回值类型。用于表示当前这个Python的函数返回的类型对应到Spark SQL的数据类型udf对象: 返回值对象,是一个UDF对象。仅能用在DSL中使用注册方式三: 语法糖写法  @F.pandas_udf(returnType)  放置到对应Python的函数上面说明: 实际是方式二的扩展。仅能用在DSL中使用第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可基于pandas方式还支持自定义UDAF函数
注意: 如果要用于自定义UDAF函数,理论上只能用上述注册方式三语法糖方式,也就意味着理论只能DSL使用
注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式register注册,就可以使用了!
4.3.2 自定义UDF函数
  • 自定义Python函数的要求:SeriesToSeries

    在这里插入图片描述

# 导包
import os
from pyspark.sql import SparkSession,functions as F
import pandas as pd# 绑定指定的python解释器
from pyspark.sql.types import LongType, IntegerTypeos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# TODO: 开启Arrow的使用spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'True')# 2.数据输入df = spark.createDataFrame(data = [(1,1),(2,2),(3,3)],schema= 'num1 int,num2 int')df.show()# 3.基于pandas自定义函数 :SeriesTOSeries# 第一步: 自定义python函数def multiply(num1:pd.Series,num2:pd.Series)->pd.Series:return num1*num2# 第二步: 把python注册为SparkSQL函数# ①spark.udf.register注册dsl1_multiply = spark.udf.register('sql_multiply',multiply)# ②F.pandas_udf注册dsl2_multiply = F.pandas_udf(multiply,IntegerType())# ③@F.pandas_udf注册@F.pandas_udf(IntegerType())def candy_multiply(num1: pd.Series, num2: pd.Series) -> pd.Series:return num1 * num2# 第三步: 在SparkSQL中调用注册后函数# SQL方式df.createTempView('temp')spark.sql("""select num1,num2,sql_multiply(num1,num2) as result from temp""").show()# DSL方式#调用dsl1_multiplydf.select('num1','num2',dsl1_multiply('num1','num2').alias('result')).show()# 调用dsl2_multiplydf.select('num1', 'num2', dsl2_multiply('num1', 'num2').alias('result')).show()# 调用candy_multiplydf.select('num1', 'num2', candy_multiply('num1', 'num2').alias('result')).show()# 4.关闭资源spark.stop()
4.3.3 自定义UDAF函数
  • 自定义Python函数的要求:Series To 标量
  1. 简单来说:Series To 标量是指将一个Pandas Series(一维数组)转换为一个标量值(单个值),就像是“把一串数据浓缩成一个结果”。

  2. 具体而言

    • Series:Pandas中的一维数据结构,类似于带标签的数组,可以存储任意类型的数据。
    • 标量:单个值,比如整数、浮点数、字符串等。
    • 转换场景
      • 聚合操作:将Series中的所有值通过某种计算(如求和、平均值)转换为一个标量值。
      • 提取操作:从Series中提取某个特定位置的值作为标量。
    • 示例
      import pandas as pd# 创建一个Series
      s = pd.Series([1, 2, 3, 4, 5])# 聚合操作:求和
      sum_result = s.sum()  # 输出:15# 提取操作:获取第一个值
      first_value = s[0]    # 输出:1
      
  3. 实际生产场景

    • 在数据分析中,使用聚合操作将一列数据(如销售额)转换为总销售额或平均销售额。
    • 在数据处理中,从时间序列数据中提取某个时间点的值作为标量。
  4. 总之:Series To 标量是Pandas中常见的操作,通过聚合或提取,将一维数据转换为单个值,为数据分析和处理提供了便利。

表示:自定义函数的输入数据类型是Pandas中的Series对象,返回值数据类型是标量数据类型。也就是Python中的数据类型,例如:int、float、bool、list…

在这里插入图片描述

基于pandas方式还支持自定义UDAF函数
注意: 如果要用于自定义UDAF函数,理论上只能用上述注册方式三语法糖方式,也就意味着理论只能DSL使用
注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式register注册,就可以使用了!
# 导包
import os
from pyspark.sql import SparkSession, functions as F
import pandas as pd# 绑定指定的python解释器
from pyspark.sql.types import LongType, IntegerType, FloatTypeos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# TODO: 开启Arrow的使用spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'True')# 2.数据输入df = spark.createDataFrame(data=[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],schema='id int,value float')df.show()# 3.基于pandas自定义函数 :SeriesTOSeries# 第一步: 自定义python函数# ③@F.pandas_udf注册  注意: 理论上UDAF只能用注册方式三语法糖方式,也就意味着只能DSL使用@F.pandas_udf(FloatType())def candy_mean_v(value: pd.Series) -> float:return value.mean()# 第二步: 注意: 如果还想同时用SQL方式和DSL方式,可以把加了语法糖的函数,再传入到方式一register注册# ①spark.udf.register注册dsl1_mean_v = spark.udf.register('sql_mean_v', candy_mean_v)# 第三步: 在SparkSQL中调用注册后函数# DSL方式# 调用candy_mean_vdf.groupBy('id').agg(candy_mean_v('value').alias('result')).show()# 调用dsl1_mean_vdf.groupBy('id').agg(dsl1_mean_v('value').alias('result')).show()# SQL方式df.createTempView('temp')spark.sql("""select id,sql_mean_v(value) as result from temp group by id""").show()# 4.关闭资源spark.stop()

三、Spark on Hive(操作)

1、集成原理

在这里插入图片描述

HiveServer2的主要作用: 接收SQL语句,进行语法检查;解析SQL语句;优化;将SQL转变成MapReduce程序,提交到Yarn集群上运行SparkSQL与Hive集成,实际上是替换掉HiveServer2。是SparkSQL中的HiveServer2替换掉了Hive中的HiveServer2。集成以后优点如下:
1- 对于SparkSQL来说,可以避免在代码中编写schema信息。直接向MetaStore请求元数据信息
2- 对于SparkSQL来说,多个人可以共用同一套元数据信息,避免每个人对数据理解不同造成代码功能兼容性问题
3- 对于Hive来说,底层执行引擎由之前的MapReduce变成了Spark Core,能够提升运行效率
4- 对于使用者/程序员来说,SparkSQL与Hive集成,对于上层使用者来说,是完全透明的。

2、集成环境配置

环境搭建,参考【Spark课程阶段_部署文档.doc】的7章节内容。

1-node1上将hive-site.xml拷贝到spark安装路径conf目录

cd /export/server/hive/confcp hive-site.xml /export/server/spark/conf/

2-node1上执行以下命令将mysql的连接驱动包拷贝到spark的jars目录下

注意: 之前拷贝过的可以忽略此操作

cd /export/server/hive/libcp mysql-connector-java-5.1.32.jar  /export/server/spark/jars/

3、启动metastore服务

# 注意: 
# 启动 hadoop集群
start-all.sh# 启动hive的metastore
nohup /export/server/hive/bin/hive --service metastore &# 测试spark-sql
/export/server/spark/bin/spark-sql

4、SparkOnHive操作

4.1 黑窗口测试spark-sql
[root@node1 bin]# /export/server/spark/bin/spark-sql
...
spark-sql>show databases;
...
spark-sql>create database if not exists spark_demo;
...
spark-sql>create table if not exists spark_demo.stu(id int,name string);
...
spark-sql>insert into  spark_demo.stu values(1,'张三'),(2,'李四');
...
4.2 python代码测试spark-sql

SparkOnHive配置:

spark.sql.warehouse.dir: 告知Spark数据表存放的地方。推荐使用HDFS。如果不配置,默认使用本地磁盘存储。
hive.metastore.uris: 告知Spark,MetaStore元数据管理服务的连接信息
enableHiveSupport() : 开启Spark和Hive的集成使用格式如下:spark = SparkSession.builder\.config('spark.sql.warehouse.dir','hdfs://node1:8020/user/hive/warehouse')\.config('hive.metastore.uris','thrift://node1.itcast.cn:9083')\.appName('pyspark_demo')\.master('local[1]')\.enableHiveSupport()\.getOrCreate()

示例:

# 导包
import os
import timefrom pyspark.sql import SparkSession# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder\.config('spark.sql.warehouse.dir','hdfs://node1:8020/user/hive/warehouse')\.config('hive.metastore.uris','thrift://node1.itcast.cn:9083')\.appName('pyspark_demo')\.master('local[1]')\.enableHiveSupport()\.getOrCreate()# 2.执行sql# 查看所有库spark.sql( "show databases").show()# 查看demo1的student表内容spark.sql("select * from demo1.student").show()# 测试是否能建库: 可以spark.sql( "create database if not exists spark_demo" )# 测试是否能在spark_demo建表: 可以spark.sql("""create table if not exists spark_demo.stu(id int,name string)""")# 测试是否可以往spark_demo.stu表插入数据: 可以spark.sql("""insert into  spark_demo.stu values(1,'张三'),(2,'李四')""")# 为了方便查看web页面time.sleep(500)# 3.关闭资源spark.stop()

四、SparkSQL的分布式执行引擎(了解)

分布式执行引擎 == Thrift服务 == ThriftServer == SparkSQL中的Hiveserver2

1、启动Thrift服务

​ 目前,我们已经完成Spark集成Hive的配置。但是目前集成后,如果需要连接Hive,此时需要启动一个Spark的客户端(spark-sql、代码)才可以。这个客户端底层相当于启动服务项,用于连接Hive的metastore的服务,进行处理操作。一旦退出客户端,相当于这个服务也就没有了,无法再使用

​ 目前的情况非常类似于在Hive部署的时候,有一个本地模式部署(在启动Hive客户端的时候,内部自动启动一个Hive的hiveserver2服务项)

大白话: 目前在Spark后台,并没有一个长期挂载的Spark的服务(Spark HiveServer2服务)。导致每次启动Spark客户端,都需要在内部启动一个服务项。这种方式,不适合测试使用,不合适后续的快速开发

​ 如何启动Spark 提供的分布式的执行引擎呢? 这个引擎大家完全可以将其理解为Spark的HiveServer2服务,实际上就是Spark的Thrift服务项

# 注意: 要启动sparkThriftServer2服务,必须要保证先启动好Hadoop以及Hive的metastore,不能启动Hive的hiveserver2服务!
# 启动 hadoop集群
start-all.sh# 启动hive的metastore
nohup /export/server/hive/bin/hive --service metastore &# 最后执行以下命令启动sparkThriftServer2:
/export/server/spark/sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=10000 \
--hiveconf hive.server2.thrift.bind.host=node1 \
--hiveconf spark.sql.warehouse.dir=hdfs://node1:8020/user/hive/warehouse \
--master local[2]

校验是否成功:
在这里插入图片描述

访问界面:默认4040

在这里插入图片描述

2、beeline连接Thrift服务

启动后,可以通过spark提供beeline的方式连接这个服务。连接后,直接编写SQL即可

相当于模拟了一个Hive的客户端,但是底层执行的是Spark SQL,最终将其转换为Spark RDD的程序

启动命令:/export/server/spark/bin/beeline然后输入:!connect jdbc:hive2://node1:10000继续输入用户名: root
注意密码: 不需要写,直接回车

在这里插入图片描述

3、开发工具连接Thrift服务

如何通过DataGrip或者PyCharm连接Spark进行操作
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

4、控制台编写SQL代码

进入以下页面就可以愉快的编写sql了,再也不用担心在spark.sql()中编写没有提示了:)
在这里插入图片描述

五、Spark SQL的运行机制(掌握)

Spark SQL底层依然运行的是Spark RDD的程序,所以说Spark RDD程序的运行的流程,在Spark SQL中依然是存在的,只不过在这个流程的基础上增加了从SQL翻译为RDD的过程

​ Spark SQL的运行机制,其实就是在描述如何将Spark SQL翻译为RDD程序:
在这里插入图片描述

​ 整个Spark SQL 转换为RDD 是基于Catalyst 优化器实施,基于这个优化器即可完成整个转换操作

5.1 Catalyst内部具体的执行流程:

在这里插入图片描述
在这里插入图片描述

大白话:

SQL执行顺序: from->join on->where->groupby->聚合操作->having->select [distinct] ->order by ->limit

1- 接收客户端提交过来的SQL/DSL代码,首先会校验SQL/DSL的语法是否正常。如果通过校验,根据SQL/DSL的执行顺序,生成未解析的逻辑计划,也叫做AST抽象语法树2- 对于AST抽象语法树加入元数据信息,确定一共涉及到哪些字段、字段的数据类型是什么,以及涉及到的表的其他相关元数据信息。加入元数据信息以后,就得到了(已经解析但是未优化的)逻辑计划3- 对(未优化的)逻辑计划执行优化操作,整个优化通过优化器来执行。在优化器匹配相对应的优化规则,实时具体的优化。SparkSQL底层提供了一两百中优化规则,得到优化的逻辑计划。例如: 谓词下推(断言下推)、列值裁剪3.1- 谓词下推: 也叫做断言下推。将数据过滤操作提前到数据扫描的时候执行,减少后续处理数据量,提升效率。3.2- 列值裁剪: 在表中只加载数据分析用到的字段,不相关的字段不加载进来。减少后续处理数据量,提升效率。4- 由于优化规则很多,导致会得到多个优化的逻辑计划。在转换成物理执行计划的过程中,会根据 成本模型(对比每个计划运行的耗时、资源消耗等)得到最优的一个物理执行计划5- 将物理执行计划通过code generation(代码生成器),转变成Spark RDD的代码6- 最后就是将Spark RDD代码部署到集群上运行。后续过程与Spark内核调度中Job的调度流程完全一致。

专业的术语:

1- Spark SQL底层解析是由RBO(基于规则的优化器)和CBO(基于代价的优化器)优化完成的2- RBO是基于规则优化,对于SQL或DSL的语句通过执行引擎得到未执行逻辑计划,在根据元数据得到逻辑计划,之后加入列值裁剪或谓词下推等优化手段形成优化的逻辑计划3- CBO是基于优化的逻辑计划得到多个物理执行计划,根据 代价函数(成本模型) 选择出最优的物理执行计划4- 通过code genaration代码生成器完成RDD的代码构建5- 底层依赖于DAGScheduler和TaskScheduler完成任务计算执行后续过程与Spark内核调度中Job的调度流程完全一致。
  1. 简单来说:SparkSQL的执行流程就像是“从SQL语句到结果的流水线”,通过解析、优化和执行,将SQL查询转化为分布式计算任务,最终返回结果。

  2. 具体而言

    • SQL解析
      • 将SQL语句解析为抽象语法树(AST)。
      • 使用ANTLR工具将AST转换为逻辑计划(Logical Plan)。
    • 逻辑优化
      • 对逻辑计划进行优化,如谓词下推、列剪裁等。
      • 生成优化后的逻辑计划。
    • 物理计划生成
      • 将逻辑计划转换为物理计划(Physical Plan),选择最优的执行策略。
      • 物理计划包括RDD转换、数据源读取等具体操作。
    • 任务调度与执行
      • 将物理计划分解为多个Stage和Task。
      • 通过DAGScheduler和TaskScheduler将Task分配到集群节点上执行。
    • 结果返回
      • 将计算结果返回给客户端,如DataFrame或直接输出。
  3. 实际生产场景

    • 在数据仓库中,使用SparkSQL查询海量数据,生成报表和洞察。
    • 在实时分析中,结合Structured Streaming,使用SparkSQL处理实时数据流。
  4. 总之:SparkSQL的执行流程通过解析、优化和执行,将SQL查询高效地转化为分布式计算任务,为大规模数据处理提供了强大的支持。

为什么 SparkSQL 的执行流程就像是“从 SQL 语句到结果的流水线”?

  1. 流水线:分阶段处理

    • 流水线:将复杂任务分解为多个阶段,每个阶段专注于特定任务。
    • SparkSQL:将SQL查询分解为SQL解析逻辑优化物理计划生成任务调度与执行结果返回等多个阶段,每个阶段完成特定任务。
  2. 高效流转:逐步优化和执行

    • 流水线:每个阶段完成后,数据会流转到下一个阶段,逐步完成最终目标。
    • SparkSQL:SQL语句经过解析、优化、物理计划生成等步骤,逐步转化为分布式计算任务,最终高效执行并返回结果。
  3. 自动化:无需手动干预

    • 流水线:自动化完成每个阶段的任务,无需人工干预。
    • SparkSQL:通过Catalyst优化器和Tungsten引擎,自动优化查询计划并执行,开发者只需关注SQL语句和结果。
  4. 结果导向:最终输出

    • 流水线:最终输出成品。
    • SparkSQL:最终输出查询结果(如DataFrame或报表),为业务决策提供支持。

实际意义

SparkSQL的执行流程就像“从SQL语句到结果的流水线”,通过分阶段、高效流转和自动化的方式,将SQL查询转化为分布式计算任务,最终返回结果,为大规模数据处理提供了强大的支持。

5.2 SparkSQL的执行流程总结:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

01_spark原生自定义UDF函数_返回字符串.py

# 导包
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 创建DF对象df = spark.createDataFrame([(1, "张三", '广州'),(2, "李四", '深圳'),(3, "王五", '上海')], schema=["id", "name", "address"])# 测试是否有数据df.show()# 需求: 自定义函数,功能是给df的所有地址都添加一个后缀'_itheima'# 一.自定义能添加后缀'_itheima'功能的python函数def add_suffix(address):return address + '_itheima'# 二.将python函数注册为spark的UDF函数(SQL风格和DSL风格)# 注册方式1: 适用于sql和dsl风格dsl_add_suffix = spark.udf.register('sql_add_suffix', add_suffix, StringType())# 三.使用UDF函数# 方式1: SQL风格# 先有临时表,再调用sql执行df.createTempView("stu_tb")spark.sql("""select *,sql_add_suffix(address) as address_newfrom stu_tb""").show()# 方式2: DSL风格# df.select(#     "*",#     dsl_add_suffix("address").alias("address_new")# ).show()# 注意: 最后一定释放资源spark.stop()

在这里插入图片描述

结果

在这里插入图片描述

02_spark原生自定义UDF函数_返回列表.py

# 导包
import os
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType, ArrayType# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 创建DF对象df = spark.createDataFrame([(1, "张三_广州"),(2, "李四_深圳"),(3, "王五_上海")], schema=["id", "name_address"])# 测试是否有数据df.show()# 需求: 自定义函数# 一.自定义能返回列表的功能的python函数def my_split(name_address):return name_address.split('_')# 二.将python函数注册为spark的UDF函数(SQL风格和DSL风格)# 注册方式1: 适用于sql和dsl风格dsl_my_split = spark.udf.register('sql_my_split', my_split, ArrayType(StringType()))# 三.使用UDF函数# 方式1: SQL风格# 先有临时表,再调用sql执行df.createTempView("stu_tb")spark.sql("""select *,sql_my_split(name_address)[0] as name,sql_my_split(name_address)[1] as addressfrom stu_tb""").show()# 方式2: DSL风格df.select("*",dsl_my_split("name_address")[0].alias("name"),dsl_my_split("name_address")[1].alias("address")).show()# 注意: 最后一定释放资源spark.stop()

结果

在这里插入图片描述

03_spark原生自定义UDF函数_返回字典.py

# 导包
import os
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType, ArrayType, StructType# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 创建DF对象df = spark.createDataFrame([(1, "张三_广州"),(2, "李四_深圳"),(3, "王五_上海")], schema=["id", "name_address"])# 测试是否有数据df.show()# 需求: 自定义函数# 一.自定义能返回字典的功能的python函数def my_split(name_address):list1 = name_address.split('_')dict1 = {"name": list1[0], "address": list1[1]}return dict1# 二.将python函数注册为spark的UDF函数(SQL风格和DSL风格)# 注册方式1: 适用于sql和dsl风格# 注意: 如果原始函数返回的是字典,就必须用StructType()且字段名必须和原生字典的key值一样,否则null补充t = StructType().add("name", StringType()).add("address", StringType())dsl_my_split = spark.udf.register('sql_my_split', my_split, t)# 三.使用UDF函数# 方式1: SQL风格# 先有临时表,再调用sql执行df.createTempView("stu_tb")spark.sql("""select *,sql_my_split(name_address)['name'] as name,sql_my_split(name_address)['address'] as addressfrom stu_tb""").show()# 方式2: DSL风格df.select("*",dsl_my_split("name_address")['name'].alias("name"),dsl_my_split("name_address")['address'].alias("address")).show()# 注意: 最后一定释放资源spark.stop()

结果

在这里插入图片描述

04_sparkSQL和pandas中df对象互转操作.py

# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 注意: 如果想优化createDataFrame()效率可以手动开启arrow设置# TODO: 手动开启arrow设置spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)# 1.先创建sparkSQL的df对象spark_df = spark.createDataFrame([(1, "张三"),(2, "李四"),(3, "王五")], schema=["id", "name"])# 查看数据类型print(type(spark_df))  # <class 'pyspark.sql.dataframe.DataFrame'>spark_df.show()# 2.把saprk_df转换为pandas的df对象pd_df = spark_df.toPandas()# 查看数据类型print(type(pd_df))  # <class 'pandas.core.frame.DataFrame'>print(pd_df)# 3.把pandas的df对象转换为sparkSQL的df对象spark_df2 = spark.createDataFrame(pd_df)# 查看数据类型print(type(spark_df2))  # <class 'pyspark.sql.dataframe.DataFrame'>spark_df2.show()# 注意: 最后一定释放资源spark.stop()

05_spark基于pandas定义udf函数_s到s.py

# 导包
import os
from pyspark.sql import SparkSession, functions as F
import pandas as pd
from pyspark.sql.types import DoubleType, IntegerType# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# TODO: 手动开启arrow设置spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)# 创建DF对象df = spark.createDataFrame([(1, 1), (2, 2), (3, 3)], schema=["n1", "n2"])df.show()# 一.自定义python函数# 功能:输入两列,输出对应乘积1列def mul(n1: pd.Series, n2: pd.Series) -> pd.Series:return n1 * n2# 二.把python函数包装成spark的UDF函数(sql和dsl风格)# 注册方式1: 适用于sql和dsl风格dsl_mul = spark.udf.register("sql_mul", mul)# 注册方式2: 仅适用于dsl风格dsl2_mul = F.pandas_udf(mul, IntegerType())# 注册方式3: 仅适用于dsl风格@F.pandas_udf(IntegerType())def candy_mul(n1: pd.Series, n2: pd.Series) -> pd.Series:return n1 * n2# 三.使用UDF函数# 方式1: SQL风格# 先有临时表,再调用sql执行df.createTempView("nums_tb")spark.sql("""select n1,n2,sql_mul(n1, n2) as n3 from nums_tb""").show()# 方式2: DSL风格df.select("n1", "n2",dsl_mul("n1", "n2").alias("n3"),dsl2_mul("n1", "n2").alias("n4"),candy_mul("n1", "n2").alias("n5")).show()# 注意: 最后一定释放资源spark.stop()

06_spark基于pandas定义udaf函数_s到标量.py

# 导包
import osimport pandas as pd
from pyspark.sql import SparkSession, functions as F# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 创建DF对象df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],schema="id int, value float")df.show()# 二.使用语法糖方式注册原始函数为udaf函数@F.pandas_udf("float")# 一.定义原始python函数def candy_my_avg(values: pd.Series) -> float:return values.mean()# 三.使用自定义的udaf函数# dsl方式df.groupby("id").agg(candy_my_avg("value").alias("avg_value")).show()# 如果想用sql方式怎么办?把添加了语法糖的函数,再注册为udaf函数dsl_my_avg = spark.udf.register("sql_my_avg", candy_my_avg)df.createTempView('nums_tb')spark.sql("""select id,sql_my_avg(value) as avg_valuefrom nums_tbgroup by id""").show()# 注意: 最后一定释放资源spark.stop()

07_spark_sql操作数据库.py

# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = (SparkSession.builder.config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse").config("hive.metastore.uris", "thrift://node1.itcast.cn:9083").appName("spark_demo").master("local[1]").enableHiveSupport().getOrCreate())spark.sql("""create database if not exists spark_demo2""")spark.sql("""create table if not exists spark_demo2.stu(id int,name string,age int);""")spark.sql("""insert into spark_demo2.stu values(1,'张三',18),(2,'李四',28)""")spark.sql("""select * from  spark_demo2.stu""").show()# 注意: 最后一定释放资源spark.stop()

相关文章:

day07_Spark SQL

文章目录 day07_Spark SQL课程笔记一、今日课程内容二、Spark SQL函数定义&#xff08;掌握&#xff09;1、窗口函数2、自定义函数背景2.1 回顾函数分类标准:SQL最开始是_内置函数&自定义函数_两种 2.2 自定义函数背景 3、Spark原生自定义UDF函数3.1 自定义函数流程&#x…...

【LC】2270. 分割数组的方案数

题目描述&#xff1a; 给你一个下标从 0 开始长度为 n 的整数数组 nums 。 如果以下描述为真&#xff0c;那么 nums 在下标 i 处有一个 合法的分割 &#xff1a; 前 i 1 个元素的和 大于等于 剩下的 n - i - 1 个元素的和。下标 i 的右边 至少有一个 元素&#xff0c;也就是…...

Docker 容器通信的网络模式详解

Docker 的网络模式是容器化技术中非常重要的一部分&#xff0c;它决定了容器之间以及容器与外部世界如何通信。Docker 提供了多种网络模式&#xff0c;每种模式都有其特定的使用场景和优势。本文将深入探讨 Docker 的网络模式&#xff0c;包括桥接模式、主机模式、覆盖网络模式…...

Apache和PHP:构建动态网站的黄金组合

在当今的互联网世界&#xff0c;网站已经成为了企业、个人和机构展示自己、与用户互动的重要平台。而在这些动态网站的背后&#xff0c;Apache和PHP无疑是最受开发者青睐的技术组合之一。这一组合提供了高效、灵活且可扩展的解决方案&#xff0c;帮助您快速搭建出强大的网站&am…...

一个简单的html5导航页面

一个简单的 HTML5 导航页面的示例代码&#xff1a; html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><ti…...

积木仪表盘 出现 “没有权限,请联系管理员分配权限“ 解决方法

目录 前言1. 问题所示2. 解决方法前言 🤟 找工作,来万码优才:👉 #小程序://万码优才/r6rqmzDaXpYkJZF 原先写过报表的错误!但错误解决方式不一样:jmreport测试数据库出现 权限不足,此功能需要分配角色 解决方法 1. 问题所示 出现 没有权限,请联系管理员分配权限 的…...

C++语言的循环实现

C语言中的循环实现 引言 在程序设计中&#xff0c;循环是一个至关重要的概念。它允许我们在满足某种条件时重复执行某段代码&#xff0c;从而实现复杂的逻辑和算法。C作为一种强大的编程语言&#xff0c;提供了多种循环结构来满足不同的需求。本文将深入探讨C中的循环实现&am…...

高级java每日一道面试题-2025年01月13日-框架篇[Spring篇]-Spring 是怎么解决循环依赖的?

如果有遗漏,评论区告诉我进行补充 面试官: Spring 是怎么解决循环依赖的? 我回答: 在Java高级面试中&#xff0c;Spring框架如何解决循环依赖是一个重要且常见的问题。以下是对Spring解决循环依赖的详细解释&#xff1a; 循环依赖的定义与类型 循环依赖是指两个或多个Bea…...

.Net Core Record 类型

public class Person { public string id {get;init;} public string code {get;init;} public string name {get;init;} } //Person 属性不可单独赋值&#xff0c;相当于使用record定义 public record Person string id,string code,string name&#xff09; //record类…...

GitLab CI/CD使用runner实现自动化部署前端Vue2 后端.Net 7 Zr.Admin项目

1、查看gitlab版本 建议安装的runner版本和gitlab保持一致 2、查找runner 执行 yum list gitlab-runner --showduplicates | sort -r 找到符合gitlab版本的runner&#xff0c;我这里选择 14.9.1版本 如果执行出现找不到下载源&#xff0c;添加官方仓库 执行 curl -L &quo…...

重邮+数字信号处理实验七:用 MATLAB 设计 IIR 数字滤波器

一、实验目的 1 、加深对窗函数法设计 FIR 数字滤波器的基本原理的理解。 2 、学习用 Matlab 语言的窗函数法编写设计 FIR 数字滤波器的程序。 3 、了解 Matlab 语言有关窗函数法设计 FIR 数字滤波器的常用函数用法。 4 、掌握 FIR 滤波器的快速卷积实现原理。…...

CES 2025:INAIR 推出“另类”AR电脑,重新定义移动计算体验

在2025年国际消费类电子产品展览会(CES)上,INAIR公司凭借其创新的AR电脑产品吸引了众多目光。这款设备不仅融合了增强现实(AR)技术与传统个人电脑的功能,还通过独特的设计理念为用户带来了前所未有的移动计算体验。本文将详细介绍INAIR AR电脑的特点、技术创新及其对未来…...

了解 ASP.NET Core 中的中间件

在 .NET Core 中&#xff0c;中间件&#xff08;Middleware&#xff09; 是处理 HTTP 请求和响应的核心组件。它们被组织成一个请求处理管道&#xff0c;每个中间件都可以在请求到达最终处理程序之前或之后执行操作。中间件可以用于实现各种功能&#xff0c;如身份验证、路由、…...

数据结构与算法之链表: LeetCode 234. 回文链表 (Ts版)

回文链表 https://leetcode.cn/problems/palindrome-linked-list/description/ 描述 给你一个单链表的头节点 head &#xff0c;请你判断该链表是否为回文链表如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 示例 1 输入&#xff1a;head [1,2,2,1]…...

DVWA靶场CSRF漏洞通关教程及源码审计

目录标题 CSRFlow源码审计 medium源码审计 high源码审计 impossible源码审计 CSRF low 先修改密码 看到地址栏 复制在另一个网页打开 成功登录 源码审计 没有任何过滤措施&#xff0c;很危险&#xff0c;并且采用了不安全的md5加密 <?phpif( isset( $_GET[ Change ] )…...

支持Google Analytics快捷添加的CMS:费用与部署形式详解

CMS 的费用和部署形式是选择平台的重要参考因素&#xff0c;不同的业务需求需要不同的解决方案。本文将从费用和部署形式两个角度&#xff0c;详细分析支持 Google Analytics 快捷集成的 CMS 和工具&#xff0c;帮助您更好地了解这些平台的特点。 1. BigCommerce 费用&#xff…...

Kibana操作ES基础

废话少说&#xff0c;开干&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;截图更清晰&#xff0c;复制在下面 #库操作#创建索引【相当于数据库的库】 PUT /first_index#获…...

如何在Ubuntu上安装和配置Git

版本控制系统&#xff08;VCS&#xff09;是软件开发过程中不可或缺的工具之一&#xff0c;它帮助开发者跟踪代码变更、协作开发以及管理不同版本的项目。Git作为当前最流行的分布式版本控制系统&#xff0c;因其高效性和灵活性而广受青睐。本文将指导你如何在Ubuntu操作系统上…...

基于springboot+vue+微信小程序的宠物领养系统

基于springbootvue微信小程序的宠物领养系统 一、介绍 本项目利用SpringBoot、Vue和微信小程序技术&#xff0c;构建了一个宠物领养系统。 本系统的设计分为两个层面&#xff0c;分别为管理层面与用户层面&#xff0c;也就是管理者与用户&#xff0c;管理权限与用户权限是不…...

HTB:Driver[WriteUP]

目录 连接至HTB服务器并启动靶机 信息收集 使用rustscan对靶机TCP端口进行开放扫描 将靶机TCP开放端口号提取并保存 使用nmap对靶机TCP开放端口进行脚本、服务扫描 使用nmap对靶机TCP开放端口进行漏洞、系统扫描 使用nmap对靶机常用UDP端口进行开放扫描 使用smbclient尝…...

Require:利用MySQL binlog实现闪回操作

1&#xff0c;闪回原理 【binlog】MySQL binlog以event的形式&#xff0c;记录了MySQL server从启用binlog以来所有的变更信息&#xff0c;能够帮助重现这之间的所有变化。MySQL引入binlog主要有两个目的&#xff1a;一是为了主从复制&#xff1b;二是某些备份还原操作后需要重…...

黑马linux笔记(03)在Linux上部署各类软件 MySQL5.7/8.0 Tomcat(JDK) Nginx RabbitMQ

文章目录 实战章节&#xff1a;在Linux上部署各类软件tar -zxvf各个选项的含义 为什么学习各类软件在Linux上的部署 一 MySQL数据库管理系统安装部署【简单】MySQL5.7版本在CentOS系统安装MySQL8.0版本在CentOS系统安装MySQL5.7版本在Ubuntu&#xff08;WSL环境&#xff09;系统…...

FFmpeg入门

在音视频处理领域&#xff0c;有一款神器级的工具横扫开发者圈&#xff0c;那就是 FFmpeg。它被誉为“音视频处理的瑞士军刀”&#xff0c;凭借强大的功能和开源的特性成为众多开发者和媒体从业者的首选。今天&#xff0c;我们就来聊聊 FFmpeg 的入门使用&#xff0c;带你轻松开…...

如何将 sqlserver 数据迁移到 mysql

文章目录 前言一、导出SQL Server 数据二、转换数据格式为MySQL兼容格式三、导入数据到MySQL数据库五、使用ETL工具六、通过 navicat 工具七、总结 前言 将 SQL Server 数据迁移到 MySQL 是一个常见的数据库迁移任务&#xff0c;通常涉及以下几个关键步骤&#xff1a;导出 SQL…...

【leetcode 13】哈希表 242.有效的字母异位词

原题链接 题解链接 一般哈希表都是用来快速判断一个元素是否出现集合里。 当我们想使用哈希法来解决问题的时候&#xff0c;我们一般会选择如下三种数据结构。 数组 set &#xff08;集合&#xff09; map(映射) 如果在做面试题目的时候遇到需要判断一个元素是否出现过的场景…...

git - 用SSH方式迁出远端git库

文章目录 git - 用SSH方式迁出远端git库概述笔记以gitee为例产生RSA密钥对 备注githubEND git - 用SSH方式迁出远端git库 概述 最近一段时间&#xff0c;在网络没问题的情况下&#xff0c;用git方式直接迁出git库总是会失败。 失败都是在远端, 显示RPC错误。 但是git服务器端…...

21天学通C++——9.5复制构造函数

浅复制 复制类对象时只是单纯的复制所有的值&#xff0c;如指针只会复制指针的大小&#xff0c;而不会再开辟同一空间大小的内存&#xff0c;即两个指针指向同一片内存空间。 伪代码&#xff1a; class MyString { private:char*buffer; public:MyString(const char* initStri…...

GPT 系列论文精读:从 GPT-1 到 GPT-4

学习 & 参考资料 前置文章 Transformer 论文精读 机器学习 —— 李宏毅老师的 B 站搬运视频 自监督式学习(四) - GPT的野望[DLHLP 2020] 來自猎人暗黑大陆的模型 GPT-3 论文逐段精读 —— 沐神的论文精读合集 GPT&#xff0c;GPT-2&#xff0c;GPT-3 论文精读【论文精读】…...

【python】OpenCV—Local Translation Warps

文章目录 1、功能描述2、原理分析3、代码实现4、效果展示5、完整代码6、参考 1、功能描述 利用液化效果实现瘦脸美颜 交互式的液化效果原理来自 Gustafsson A. Interactive image warping[D]. , 1993. 2、原理分析 上面描述很清晰了&#xff0c;鼠标初始在 C&#xff0c;也即…...

elasticsearch集群部署

一、创建 elasticsearch-cluster 文件夹 创建 elasticsearch-7.6.2-cluster文件夹 修改服务es服务文件夹为node-001 修改config/elasticsearch.yml 配置文件 # Elasticsearch Configuration # # NOTE: Elasticsearch comes with reasonable defaults for most settings. # …...

python调用window库全屏截图生成bmp位图学习

import io import time import struct import ctypes s time.time() gdi32 ctypes.windll.gdi32 user32 ctypes.windll.user32# 定义常量 SM_CXSCREEN 0 SM_CYSCREEN 1# 缩放比例 zoom 1 screenWidth int(user32.GetSystemMetrics(SM_CXSCREEN) * zoom) screenHeight i…...

Wireshark使用

1.抓包过滤器--BPF语法 类型Type&#xff1a;主机&#xff08;host&#xff09;、网段&#xff08;net&#xff09;、端口&#xff08;port&#xff09; 方向Dir&#xff1a;源地址&#xff08;src&#xff09;、目标地址&#xff08;dst&#xff09; 协议Proto&#xff1a;各种…...

FLASK 上传文件

HTML form enctype"multipart/form-data" 编码类型说明application/x-www-form-urlencoded表单数据编码为名称/值对。 这是标准编码格式。multipart/form-data表单数据编码为消息&#xff0c;页面上每个控件都有单独的部分。text/plain表单数据以纯文本编码&#x…...

卷积神经网络

卷积神经网络 随着输入数据规模的增大&#xff0c;计算机视觉的处理难度也大幅增加。 64 64 3 64 \times 64 \times 3 64643 的图片特征向量维度为12288&#xff0c;而 1000 1000 3 1000 \times 1000 \times 3 100010003 的图片数据量达到了300万。随着数据维度的增加&am…...

SparrowRTOS系列:链表版本内核

前言 Sparrow RTOS是笔者之前写的一个极简性RTOS&#xff0c;初代版本只有400行&#xff0c;后面笔者又添加了消息队列、信号量、互斥锁三种IPC机制&#xff0c;使之成为一个较完整、堪用的内核&#xff0c;初代版本以简洁为主&#xff0c;使用数组和表作为任务挂载的抽象数据…...

【redis初阶】环境搭建

​​​​​​​ 目录 ​​​​​​​ 一、Ubuntu 安装 redis 二、Centos7 安装 redis 三、Centos8 安装 redis 四、redis客户端介绍 redis学习&#x1f973; 一、Ubuntu 安装 redis 使用 apt 安装 apt install redis -y 查看redis版本 redis-server --version 支持远程连接…...

OpenCV相机标定与3D重建(54)解决透视 n 点问题(Perspective-n-Point, PnP)函数solvePnP()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 根据3D-2D点对应关系找到物体的姿态。 cv::solvePnP 是 OpenCV 库中的一个函数&#xff0c;用于解决透视 n 点问题&#xff08;Perspective-n-Po…...

shell脚本回顾1

1、shell 脚本写出检测 /tmp/size.log 文件如果存在显示它的内容&#xff0c;不存在则创建一个文件将创建时间写入。 一、 ll /tmp/size.log &>/dev/null if [ $? -eq 0 ];then cat /tmp/size.log else touch /tmp/size.log echo date > /tmp/size.log fi二、 if …...

HarmonyOS命令行工具

作为一个从Android转过来的鸿蒙程序猿&#xff0c;在开发过程中不由自主地想使用类似adb命令的命令行工具去安装/卸载应用&#xff0c;往设备上推或者拉去文件&#xff0c;亦或是抓一些日志。但是发现在鸿蒙里边&#xff0c;华为把命令行工具分的很细&#xff0c;种类相当丰富 …...

V少JS基础班之第四弹

一、 前言 第四弹内容是操作符。 本章结束。第一个月的内容就完成了&#xff0c; 是一个节点。 下个月我们就要开始函数的学习了。 我们学习完函数之后。很多概念就可以跟大家补充说明了。 OK&#xff0c;那我们就开始本周的操作符学习 本系列为一周一更&#xff0c;计划历时6…...

从前端视角看设计模式之创建型模式篇

设计模式简介 "设计模式"源于GOF&#xff08;四人帮&#xff09;合著出版的《设计模式&#xff1a;可复用的面向对象软件元素》&#xff0c;该书第一次完整科普了软件开发中设计模式的概念&#xff0c;他们提出的设计模式主要是基于以下的面向对象设计原则&#xff…...

网络应用技术 实验七:实现无线局域网

一、实验简介 在 eNSP 中构建无线局域网&#xff0c;并实现全网移动终端互相通信。 二、实验目的 1 、理解无线局域网的工作原理&#xff1b; 2 、熟悉无线局域网的规划与构建过程&#xff1b; 3 、掌握无线局域网的配置方法&#xff1b; 三、实验学时 2 学时 四、实…...

Kotlin 循环语句详解

文章目录 循环类别for-in 循环区间整数区间示例1&#xff1a;正向遍历示例2&#xff1a;反向遍历 示例1&#xff1a;遍历数组示例2&#xff1a;遍历区间示例3&#xff1a;遍历字符串示例4&#xff1a;带索引遍历 while 循环示例&#xff1a;计算阶乘 do-while 循环示例&#xf…...

B+树的原理及实现

文章目录 B树的原理及实现一、引言二、B树的特性1、结构特点2、节点类型3、阶数 三、B树的Java实现1、节点实现2、B树操作2.1、搜索2.2、插入2.3、删除2.4、遍历 3、B树的Java实现示例 四、总结 B树的原理及实现 一、引言 B树是一种基于B树的树形数据结构&#xff0c;它在数据…...

ArkTS 基础语法:声明式 UI 描述与自定义组件

1. ArkTS 简介 ArkTS 是 HarmonyOS 应用开发中的一种编程语言&#xff0c;它结合了 TypeScript 的类型检查和声明式 UI 描述方式&#xff0c;帮助开发者更高效地构建用户界面。 2. 声明式 UI 描述 ArkTS 使用声明式语法来定义 UI 结构&#xff0c;通过组件、属性和事件配置实…...

list的模拟实现详解

文章目录 list的模拟实现list的迭代器begin()和end() list的模拟实现 #pragma once #include<iostream> #include<list>using namespace std;namespace wbc {// 类模版template<class T>struct list_node // 链表的节点{T _data;list_node<T>* _next;…...

图解Git——分支的新建与合并《Pro Git》

⭐分支的新建与合并 先引入一个实际开发的工作流&#xff1a; 开发某个网站。为实现某个新的需求&#xff0c;创建一个分支。在这个分支上开展工作。 正在此时&#xff0c;你突然接到一个电话说有个很严重的问题需要紧急修补。你将按照如下方式来处理&#xff1a; 切换到你…...

SQLite 语法快速入门

SQLite 是一个软件库&#xff0c;实现了自给自足的、无服务器的、零配置的、事务性的 SQL 数据库引擎。 提供一个免费的在线SQLite编辑器 &#xff08;0&#xff09;常用命令 # 格式化 .header on .mode column .timer on# 查看表格 .tables# 查看表结构(建表语句) .schema …...

高速光电探测器设计 PIN APD铟镓砷TIA放大脉冲误码测试800-1700nm

高速光电探测器PIN APD铟镓砷TIA放大脉冲误码测试800-1700nm &#xff08;对标:索雷博APD431A&#xff09; &#xff08;对标:索雷博APD431A&#xff09; &#xff08;对标:索雷博APD431A&#xff09; 规格参数: 波长范围:800-1700nm 输出带宽:DC-400MHz&#xff08;-3dB&…...

【Linux】【内存】Buddy内存分配基础 NUMA架构

【Linux】【内存】Buddy内存分配基础 NUMA架构 NUMA架构 在 NUMA 架构中&#xff0c;计算机的多个 CPU 被划分为不同的处理单元&#xff0c;每个处理单元有一个本地内存。这些内存被称为内存节点&#xff08;memory node&#xff09;。处理器尽量访问自己的本地内存 node_da…...