day10_Structured Steaming
文章目录
- Structured Steaming
- 一、结构化流介绍(了解)
- 1、有界和无界数据
- 2、基本介绍
- 3、使用三大步骤(掌握)
- 4.回顾sparkSQL的词频统计案例
- 二、结构化流的编程模型(掌握)
- 1、数据结构
- 2、读取数据源
- 2.1 File Source
- 2.2 Socket Source
- 2.3 Rate Source
- 3、数据处理
- 4、数据输出
- 4.1 输出模式
- 4.1.1 append 模式
- 4.1.2 complete模式
- 4.1.3 update模式
- 4.2 输出终端/位置
- 5、综合案例(练习)
- 词频统计_读取文件方式
- 词频统计_Socket方式
- 自动生成数据_Rate方式
- 6、设置触发器Trigger
- 7、CheckPoint检查点目录设置
- JSON是什么?
- 三、Spark 和 Kafka 整合(掌握)
- 0、整合Kafka准备工作
- 1.spark和kafka集成
- 1.1 官网文档链接:
- 1.2 常见选项:
- 1.3 常见参数
- 2、从kafka中读取数据
- 2.1 流式处理
- 官方示例:
- 练习示例
- 2.2 批处理
- 官方示例:
- 演示示例
- 3、数据写入Kafka中
- 3.1 流式处理
- 官方示例:
- 练习示例
- 3.2 批处理
- 官方示例:
- 演示示例
- 01_回顾sparkSQL词频统计过程.py
- 02_结构化流词频统计案例_读取文件方式.py
- 03_结构化流词频统计案例_socket方式.py
- 04_结构化流词频统计案例_设置触发器和检查点.py
- 05_流方式读取kafka数据.py
- 06_流方式写数据到kafka.py
Structured Steaming
简单来说:Structured Streaming是Spark提供的一种流处理引擎,就像是“实时数据处理的流水线”,能够以批处理的方式处理实时数据流。
具体而言:
- 核心概念:
- 流式DataFrame:将实时数据流视为一个无限扩展的DataFrame,支持类似批处理的API。
- 触发器:控制流处理的时间间隔,如每1秒处理一次数据。
- 输出模式:支持多种输出模式,如
append
(追加)、update
(更新)和complete
(完整输出)。- 特点:
- 易用性:提供与Spark SQL一致的API,降低学习成本。
- 容错性:通过检查点机制(Checkpoint)确保数据处理的可靠性。
- 扩展性:支持从Kafka、文件系统等多种数据源读取数据,并输出到多种目标系统。
实际生产场景:
- 在实时监控中,使用Structured Streaming处理传感器数据,实时生成报警。
- 在用户行为分析中,使用Structured Streaming处理点击流数据,实时更新用户画像。
总之:Structured Streaming通过易用的API和强大的容错机制,为实时数据处理提供了高效、可靠的解决方案,广泛应用于实时监控、用户行为分析等场景。
一、结构化流介绍(了解)
1、有界和无界数据
简单来说:有界数据就像是“有限的书本”,数据量固定且已知;无界数据则像是“无限的河流”,数据持续生成且量未知。
具体而言:
- 有界数据:
- 定义:数据量固定且已知,处理完成后任务结束。
- 示例:存储在文件或数据库中的历史数据。
- 处理方式:适合批处理(Batch Processing),如使用Spark的RDD或DataFrame处理。
- 无界数据:
- 定义:数据持续生成且量未知,处理任务通常不会结束。
- 示例:实时日志流、传感器数据、用户点击流。
- 处理方式:适合流处理(Stream Processing),如使用Spark的Structured Streaming或Flink处理。
实际生产场景:
- 在历史数据分析中,使用有界数据进行批处理,生成报表和洞察。
- 在实时监控中,使用无界数据进行流处理,实时生成报警和推荐。
总之:有界数据和无界数据分别适合批处理和流处理,根据数据特点选择合适的处理方式,能够高效地完成数据分析和处理任务。
- 有界数据:
有界数据: 指的数据有固定的开始和固定的结束,数据大小是固定。我们称之为有界数据。对于有界数据,一般采用批处理方案(离线计算)特点:1-数据大小是固定2-程序处理有界数据,程序最终一定会停止
- 无界数据:
无界数据: 指的数据有固定的开始,但是没有固定的结束。我们称之为无界数据
注意: 对于无界数据,我们一般采用流式处理方案(实时计算)特点:1-数据没有明确的结束,也就是数据大小不固定2-数据是源源不断的过来3-程序处理无界数据,程序会一直运行不会结束
2、基本介绍
结构化流是构建在Spark SQL处理引擎之上的一个流式的处理引擎,主要是针对无界数据的处理操作。对于结构化流同样也支持多种语言操作的API:比如 Python Java Scala SQL …
Spark的核心是RDD。RDD出现主要的目的就是提供更加高效的离线的迭代计算操作,RDD是针对的有界的数据集,但是为了能够兼容实时计算的处理场景,提供微批处理模型,本质上还是批处理,只不过批与批之间的处理间隔时间变短了,让我们感觉是在进行流式的计算操作,目前默认的微批可以达到100毫秒一次
真正的流处理引擎: Storm(早期流式处理引擎)、Flink、Flume(流式数据采集)
3、使用三大步骤(掌握)
StructuredStreaming在进行数据流开发时的三个步骤
- 1、读取数据流数据 : 指定数据源模式
- sparksession对象.readStream.format(指定读取的数据源).option(指定读取的参数).load()
- 2、数据处理: 使用dsl或者sql方式计算数据和SparkSQL操作一样
- 3、将计算的结果保存 : 指定输出模式,指定位置
- writeStream.
outputMode
(输出模式).option(输出的参数配置).format
(指定输出位置).start
().awaitTermination()
- writeStream.
4.回顾sparkSQL的词频统计案例
# 导包
import os
from pyspark.sql import SparkSession,functions as F# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.数据输入df = spark.read\.format('text')\.load('file:///export/data/spark_project/structured_Streaming/data/w1.txt')# 查看数据类型print(type(df))# 3.数据处理(切分,转换,分组聚合)# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式dsl_df = df.select(F.explode(F.split('value',' ')).alias('words')).groupBy('words').agg(F.count('words').alias('cnt'))# 4.数据输出sql_df.show()dsl_df.show()# 5.关闭资源spark.stop()
二、结构化流的编程模型(掌握)
1、数据结构
在结构化流中,我们可以将DataFrame称为无界的DataFrame或者无界的二维表
2、读取数据源
对应官网文档内容:
https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html#input-sources
结构化流默认提供了多种数据源,从而可以支持不同的数据源的处理工作。目前提供了如下数据源:
-
File Source:文件数据源。读取文件系统,一般用于测试。如果文件夹下发生变化,有新文件产生,那么就会触发程序的运行
-
Socket Source:网络套接字数据源,一般用于测试。也就是从网络上消费/读取数据
-
Rate Source:速率数据源。了解即可,一般用于基准测试。通过配置参数,由结构化流自动生成测试数据。
-
Kafka Source:Kafka数据源。也就是作为消费者来读取Kafka中的数据。一般用于生产环境。
2.1 File Source
相关的参数:
option参数 | 描述说明 |
---|---|
maxFilesPerTrigger | 每次触发时要考虑的最大新文件数 (默认: no max) |
latestFirst | 是否先处理最新的新文件, 当有大量文件积压时有用 (默认: false) |
fileNameOnly | 是否检查新文件只有文件名而不是完整路径(默认值:false)将此设置为 true 时,以下文件将被视为同一个文件,因为它们的文件名“dataset.txt”相同: “file:///dataset.txt” “s3://a/dataset.txt " “s3n://a/b/dataset.txt” “s3a://a/b/c/dataset.txt” |
将目录中写入的文件作为数据流读取,支持的文件格式为:text、csv、json、orc、parquet。。。。
文件数据源特点:
1- 只能监听目录,不能监听具体的文件
2- 可以通过*通配符的形式监听目录中满足条件的文件
3- 如果监听目录中有子目录,那么无法监听到子目录的变化情况
读取代码通用格式:
# 原生API
sparksession.readStream.format('CSV|JSON|Text|Parquet|ORC...').option('参数名1','参数值1').option('参数名2','参数值2').option('参数名N','参数值N').schema(元数据信息).load('需要监听的目录地址')# 简化API
针对具体数据格式,还有对应的简写API格式,例如:sparksession.readStream.csv(path='需要监听的目录地址',schema=元数据信息。。。)
可能遇到的错误一:
原因: 如果是文件数据源,需要手动指定schema信息
可能遇到的错误二:
原因: File source只能监听目录,不能监听具体文件
2.2 Socket Source
首先: 先下载一个 nc(netcat) 命令. 通过此命令打开一个端口号, 并且可以向这个端口写入数据下载命令: yum -y install nc执行nc命令, 开启端口号, 写入数据: nc -lk 端口号查看端口号是否被使用命令: netstat -nlp | grep 要查询的端口
注意: 要先启动nc,再启动我们的程序
代码格式:df = spark.readStream \.format('socket') \.option('host', '主机地址') \.option('port', '端口号') \.load()
2.3 Rate Source
此数据源的提供, 主要是用于进行基准测试
option参数 | 描述说明 |
---|---|
rowsPerSecond | 每秒应该生成多少行 : (例如 100,默认值:1) |
rampUpTime | 在生成速度变为rowsPerSecond之前应该经过多久的加速时间(例如5 s,默认0) |
numPartitions | 生成行的分区: (例如 10,默认值:Spark 的默认并行度) |
3、数据处理
指的是数据处理部分,该操作和Spark SQL中是完全一致。可以使用SQL方式进行处理,也可以使用DSL方式进行处理。
4、数据输出
在结构化流中定义好DataFrame或者处理好DataFrame之后,调用**writeStream()**方法完成数据的输出操作。在输出的过程中,我们可以设置一些相关的属性,然后启动结构化流程序运行。
4.1 输出模式
可能遇到的错误:
原因: 在结构化流中不能调用show()方法
解决办法: 需要使用writeStream().start()进行结果数据的输出
在进行数据输出的时候,必须通过outputMode来设置输出模式。输出模式提供了3种不同的模式:
append模式:
- 定义:只输出新增的数据,适用于不需要更新历史结果的场景。
- 示例:实时日志处理中,只输出新产生的日志记录。
update模式:
- 定义:输出新增或更新的数据,适用于需要更新历史结果的场景。
- 示例:实时用户行为分析中,输出用户的最新行为数据。
complete模式:
- 定义:输出完整的结果集,适用于需要全局统计结果的场景。
- 示例:实时销售统计中,输出所有销售数据的汇总结果。实际生产场景:
- 在实时日志处理中,使用
append
模式输出新日志记录。- 在实时用户行为分析中,使用
update
模式输出用户的最新行为数据。- 在实时销售统计中,使用
complete
模式输出所有销售数据的汇总结果。
-
1- append模式:增量模式 (默认)
特点:当结构化程序处理数据的时候,如果有了新数据,才会触发执行。而且该模式只支持追加。不支持数据处理阶段有聚合的操作。如果有了聚合操作,直接报错。而且也不支持排序操作。如果有了排序,直接报错。
-
2- complete模式:完全(全量)模式
特点:当结构化程序处理数据的时候,每一次都是针对全量的数据进行处理。由于数据越来越多,所以在数据处理阶段,必须要有聚合操作。如果没有聚合操作,直接报错。另外还支持排序,但是不是强制要求。
-
3- update模式:更新模式
特点:支持聚合操作。当结构化程序处理数据的时候,如果处理阶段没有聚合操作,该模式效果和append模式是一致。如果有了聚合操作,只会输出有变化和新增的内容。但是不支持排序操作,如果有了排序,直接报错。
4.1.1 append 模式
1- append模式:增量模式
特点:当结构化程序处理数据的时候,如果有了新数据,才会触发执行。而且该模式只支持追加。不支持数据处理阶段有聚合的操作。如果有了聚合操作,直接报错。而且也不支持排序操作。如果有了排序,直接报错。
如果有了聚合操作,会报如下错误:
如果有了排序操作,会报如下错误:
4.1.2 complete模式
2- complete模式:完全(全量)模式
特点:当结构化程序处理数据的时候,每一次都是针对全量的数据进行处理。由于数据越来越多,所以在数据处理阶段,必须要有聚合操作。如果没有聚合操作,直接报错。另外还支持排序,但是不是强制要求。
如果没有聚合操作,会报如下错误:
4.1.3 update模式
3- update模式:更新模式
特点:支持聚合操作。当结构化程序处理数据的时候,如果处理阶段没有聚合操作,该模式效果和append模式是一致。如果有了聚合操作,只会输出有变化和新增的内容。但是不支持排序操作,如果有了排序,直接报错。
如果有了排序操作,会报如下错误:
4.2 输出终端/位置
默认情况下,Spark的结构化流支持多种输出方案:
1- console sink: 将结果数据输出到控制台。主要是用在测试中,并且支持3种输出模式2- File sink: 输出到文件。将结果数据输出到某个目录下,形成文件数据。只支持append模式3- foreach sink 和 foreachBatch sink: 将数据进行遍历处理。遍历后输出到哪里,取决于自定义函数。并且支持3种输出模式4- memory sink: 将结果数据输出到内存中。主要目的是进行再次的迭代处理。数据大小不能过大。支持append模式和complete模式5- Kafka sink: 将结果数据输出到Kafka中。类似于Kafka中的生产者角色。并且支持3种输出模式
5、综合案例(练习)
需求: 已知文件中存储了多个单词,要求计算统计出现的次数
词频统计_读取文件方式
# 导包
import os
from pyspark.sql import SparkSession,functions as F# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder\.config('spark.sql.shuffle.partitions',1)\.appName('pyspark_demo')\.master('local[*]')\.getOrCreate()# 2.数据输入# 注意: 路径必须是目录路径,因为readStream会自动读取此目录下的所有文件,有新增会触发接着读df = spark.readStream\.format('text')\.load('file:///export/data/spark_project/structured_Streaming/data/')# 查看数据类型print(type(df))# 3.数据处理(切分,转换,分组聚合)# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式dsl_df = df.select(F.explode(F.split('value',' ')).alias('words')).groupBy('words').agg(F.count('words').alias('cnt'))# 4.数据输出# 注意: 输出不能使用原来sparksql的show()# 注意: 如果需要多开启多个输出,.awaitTermination()只需要在最后一个出现即可sql_df.writeStream.format('console').outputMode('complete').start()dsl_df.writeStream.format('console').outputMode('complete').start().awaitTermination()# 5.关闭资源spark.stop()
词频统计_Socket方式
首先: 先下载一个 nc(netcat) 命令. 通过此命令打开一个端口号, 并且可以向这个端口写入数据下载命令: yum -y install nc# 注意: 端口号: 范围0-65535 但是0-1024都是知名端口号查看端口号是否被使用命令: netstat -nlp | grep 55555执行nc命令, 开启端口号(选择没有被占用), 写入数据: nc -lk 55555
注意: 要先启动nc,再启动我们的程序
代码格式:df = spark.readStream \.format('socket') \.option('host', '主机地址') \.option('port', '端口号') \.load()
# 导包
import os
from pyspark.sql import SparkSession,functions as F# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder\.config('spark.sql.shuffle.partitions',1)\.appName('pyspark_demo')\.master('local[*]')\.getOrCreate()# 2.数据输入# 注意: 路径必须是目录路径,因为readStream会自动读取此目录下的所有文件,有新增会触发接着读df = spark.readStream\.format('socket')\.option('host',"192.168.88.161")\.option('port',"55555")\.load()# 查看数据类型print(type(df))# 3.数据处理(切分,转换,分组聚合)# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式dsl_df = df.select(F.explode(F.split('value',' ')).alias('words')).groupBy('words').agg(F.count('words').alias('cnt'))# 4.数据输出# 注意: 输出不能使用原来sparksql的show()# 注意: 如果需要多开启多个输出,.awaitTermination()只需要在最后一个出现即可sql_df.writeStream.format('console').outputMode('complete').start()dsl_df.writeStream.format('console').outputMode('complete').start().awaitTermination()# 5.关闭资源spark.stop()
自动生成数据_Rate方式
from pyspark.sql import SparkSession
import osos.environ["SPARK_HOME"] = "/export/server/spark"
os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/bin/python"if __name__ == '__main__':# 1.创建SparkSession对象spark = SparkSession.builder \.appName("StructuredStream_rate") \.master('local[*]') \.getOrCreate()# 2。读取数据df = spark.readStream \.format('rate') \.option("rowsPerSecond", "5") \.option('numPartitions', 1) \.load()# 3.数据处理# 略# 4.数据输出:df.writeStream \.format('console') \.outputMode('update') \.option('truncate', 'false') \.start() \.awaitTermination()# 5.关闭资源spark.stop()
6、设置触发器Trigger
触发器Trigger:决定多久执行一次操作并且输出结果。也就是在结构化流中,处理完一批数据以后,等待一会,再处理下一批数据
主要提供如下几种触发器:
-
1- 默认方案:也就是不使用触发器的情况。如果没有明确指定,那么结构化流会自动进行决策每一个批次的大小。在运行过程中,会尽可能让每一个批次间的间隔时间变得更短
result_df.writeStream\.outputMode('append')\.start()\.awaitTermination()
-
2- 配置固定的时间间隔:在结构化流运行的过程中,当一批数据处理完以后,下一批数据需要等待一定的时间间隔才会进行处理**(常用,推荐使用)**
result_df.writeStream\.outputMode('append')\.trigger(processingTime='5 seconds')\.start()\.awaitTermination()情形说明: 1- 上一批次的数据在时间间隔内处理完成了,那么会等待我们配置触发器固定的时间间隔结束,才会开始处理下一批数据 2- 上一批次的数据在固定时间间隔结束的时候才处理完成,那么下一批次会立即被处理,不会等待 3- 上一批次的数据在固定时间间隔内没有处理完成,那么下一批次会等待上一批次处理完成以后立即开始处理,不会等待
-
3- 仅此一次:在运行的过程中,程序只需要执行一次,然后就退出。这种方式适用于进行初始化操作,以及关闭资源等
result_df.writeStream.foreachBatch(func)\.outputMode('append')\.trigger(once=True)\.start()\.awaitTermination()
7、CheckPoint检查点目录设置
设置检查点,目的是为了提供容错性。当程序出现失败了,可以从检查点的位置,直接恢复处理即可。避免出现重复处理的问题
默认位置: hdfs的/tmp/xxx
如何设置检查点:
1- SparkSession.conf.set("spark.sql.streaming.checkpointLocation", "检查点路径")
2- option("checkpointLocation", "检查点路径")推荐: 检查点路径支持本地和HDFS。推荐使用HDFS路径
检查点目录主要包含以下几个目录位置:
1-偏移量offsets: 记录每个批次中的偏移量。为了保证给定的批次始终包含相同的数据。在处理数据之前会将offset信息写入到该目录2-提交记录commits: 记录已经处理完成的批次。重启任务的时候会检查完成的批次和offsets目录中批次的记录进行对比。确定接下来要处理的批次3-元数据文件metadata: 和整个查询关联的元数据信息,目前只保留当前的job id4-数据源sources: 是数据源(Source)各个批次的读取的详情5-数据接收端sinks: 是数据接收端各个批次的写出的详情6-状态state: 当有状态操作的时候,例如:累加、聚合、去重等操作场景,这个目录会用来记录这些状态数据。根据配置周期性的生成。snapshot文件用于记录状态
JSON是什么?
简单来说:JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,就像是“数据的通用语言”,易于人阅读和编写,也易于机器解析和生成。
具体而言:
- 结构:
- 对象:用花括号
{}
表示,包含键值对,键和值之间用冒号:
分隔,键值对之间用逗号,
分隔。- 数组:用方括号
[]
表示,包含多个值,值之间用逗号,
分隔。- 值:可以是字符串、数字、布尔值、对象、数组或
null
。- 示例:
{"name": "Alice","age": 30,"isStudent": false,"courses": ["Math", "Science"],"address": {"city": "Beijing","zip": "100000"} }
- 特点:
- 轻量级:相比于XML,JSON格式更简洁,数据量更小。
- 易读性:结构清晰,易于人阅读和编写。
- 跨平台:支持多种编程语言,如JavaScript、Python、Java等。
实际生产场景:
- 在Web开发中,使用JSON作为前后端数据交换的格式。
- 在API设计中,使用JSON作为请求和响应的数据格式。
- 在配置文件中,使用JSON存储配置信息。
总之:JSON是一种轻量级、易读、跨平台的数据交换格式,广泛应用于Web开发、API设计和配置文件等领域。
三、Spark 和 Kafka 整合(掌握)
Spark天然支持集成Kafka, 基于Spark读取Kafka中的数据, 同时可以实施精准一次(仅且只会处理一次)的语义, 作为程序员, 仅需要关心如何处理消息数据即可, 结构化流会将数据读取过来, 转换为一个DataFrame的对象, DataFrame就是一个无界的DataFrame, 是一个无限增大的表
0、整合Kafka准备工作
说明: Jar包上传的位置说明
如何放置相关的Jar包? 1- 放置位置一: 当spark-submit提交的运行环境为Spark集群环境的时候,以及运行模式为local, 默认从 spark的jars目录下加载相关的jar包,目录位置: /export/server/spark/jars2- 放置位置二: 当我们使用pycharm运行代码的时候, 基于python的环境来运行的, 需要在python的环境中可以加载到此jar包目录位置: /root/anaconda3/lib/python3.8/site-packages/pyspark/jars/3- 放置位置三: 当我们提交选择的on yarn模式 需要保证此jar包在HDFS上对应目录下hdfs的spark的jars目录下: hdfs://node1:8020/spark/jars请注意: 以上三个位置, 主要是用于放置一些 spark可能会经常使用的jar包, 对于一些不经常使用的jar包, 在后续spark-submit 提交运行的时候, 会有专门的处理方案: spark-submit --jars jar包路径jar包下载地址: https://mvnrepository.com/
1.spark和kafka集成
1.1 官网文档链接:
https://spark.apache.org/docs/3.1.2/structured-streaming-kafka-integration.html
1.2 常见选项:
选项 | 值 | 解释 |
---|---|---|
kafka.bootstrap.servers | 以英文逗号分隔的host:port列表 | 指定kafka服务的地址 |
subscribe | 以逗号分隔的Topic主题列表 | 订阅一个主题topic1或者多个主题topic1,topic2 |
subscribePattern | 正则表达式字符串 | 订阅主题的模式。可以用 topic.* 代表多个主题 |
assign | 通过一个Json 字符串的方式来表示: {“topicA”:[0,1],“topicB”:[2,4]} | 要使用的特定TopicPartitions |
includeHeaders | 默认false | 是否在行中包含Kafka headers。 |
startingOffsets | 流或者批的查询开始时的起始点: “earliest”(批默认), “latest” (流默认), or json string json串格式如下 { “topicA”: {“0”:23,“1”:-1}, “topicB”:{“0”:-2} } | “earliest”表示最早的偏移量, “latest”表示最近的偏移量, 或每个TopicPartition起始偏移量的json字符串。在json中,-2作为偏移量表示最早,-1表示最晚。注意: 对于批量查询:不允许使用latest(无论是隐式查询还是在json中使用-1)。 对于流查询: 这只适用于新查询开始时,恢复总是从查询结束的地方继续。在查询期间新发现的分区将最早开始。 |
endingOffsets | 批量查询结束时的结束点: latest(默认) , or json string {“topicA”:{“0”:23,“1”:-1},“topicB”:{“0”:-1}} | “latest”,指的是最新的, 或每个TopicPartition结束偏移量的json字符串。在json中,-1可以用来表示最近的偏移量,-2(最早的)是不允许的! |
1.3 常见参数
参数 | 类型 | 解释 |
---|---|---|
topic | string | 表示消息是从哪个Topic中消费出来 |
value | binary | 最重要的字段。发送数据的value值,也就是消息内容。如果没有,就为null |
key | binary | 发送数据的key值。如果没有,就为null |
partition | int | 分区编号。表示消费到的该条数据来源于Topic的哪个分区 |
offset | long | 表示消息偏移量 |
timestamp | timestamp | 接收的时间戳 |
2、从kafka中读取数据
2.1 流式处理
官方示例:
# 订阅Kafka的一个Topic,从最新的消息数据开始消费
df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribe", "topic1") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 订阅Kafka的多个Topic,多个Topic间使用英文逗号进行分隔。从最新的消息数据开始消费
df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribe", "topic1,topic2") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 订阅符合规则的Topic,从最新的数据开始消费
df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribePattern", "topic.*") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 订阅一个Topic,并且指定header信息
df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribe", "topic1") \.option("includeHeaders", "true") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
练习示例
从某一个Topic中读取消息数据
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('ss_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费,可以使用startingoffsets从最早的地方开始消费df = spark.readStream\.format("kafka")\.option("kafka.bootstrap.servers","node1:9092")\.option("subscribe","itheima")\.option("startingoffsets","earliest")\.load()# 查看类型print(type(df))# 注意: 字符串需要解码!!!etl_df = df.select(df.topic,F.decode(df.value, 'utf8').alias('key'),F.decode(df.key,'utf8').alias('value'),df.partition,df.offset,df.timestamp,df.timestampType)# 获取数据etl_df.writeStream.format("console").outputMode("append").start().awaitTermination()# 3- 数据处理# result_df1 = df.select(F.expr("cast(value as string) as value"))# # selectExpr = select + F.expr# result_df2 = df.selectExpr("cast(value as string) as value")# result_df3 = df.withColumn("value",F.expr("cast(value as string)"))# 4- 数据输出# 5- 启动流式任务"""如果有多个输出,那么只能在最后一个start的后面写awaitTermination()"""# result_df1.writeStream.queryName("result_df1").format("console").outputMode("append").start()# result_df2.writeStream.queryName("result_df2").format("console").outputMode("append").start()# result_df3.writeStream.queryName("result_df3").format("console").outputMode("append").start().awaitTermination()
2.2 批处理
官方示例:
# 订阅一个Topic主题, 默认从最早到最晚的偏移量范围
df = spark \.read \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribe", "topic1") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 通过正则匹配多个Topic, 默认从最早到最晚的偏移量范围
df = spark \.read \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("subscribePattern", "topic.*") \.option("startingOffsets", "earliest") \.option("endingOffsets", "latest") \.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 订阅多个主题,明确指定Kafka偏移量
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
演示示例
订阅一个Topic
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('sparksql_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从Topic开头一直消费到结尾df = spark.read\.format("kafka")\.option("kafka.bootstrap.servers","node1:9092,node2:9092")\.option("subscribe","itheima")\.load()# 查看类型print(type(df))# 注意: 字符串需要解码!!!etl_df = df.select(F.expr("cast(key as string) as key"),F.decode(df.key,'utf8'),F.expr("cast(value as string) as value"),F.decode(df.value, 'utf8'),df.topic,df.partition,df.offset)# 获取数据etl_df.show()# # 3- 数据处理# result_df1 = init_df.select(F.expr("cast(value as string) as value"))# # selectExpr = select + F.expr# result_df2 = init_df.selectExpr("cast(value as string) as value")# result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))# # 4- 数据输出# print("result_df1")# result_df1.show()# print("result_df2")# result_df2.show()# print("result_df3")# result_df3.show()# # 5- 释放资源# spark.stop()
3、数据写入Kafka中
3.1 流式处理
官方示例:
# 将Key和Value的数据都写入到Kafka当中
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()# 将Key和Value的数据都写入到Kafka当中。使用DataFrame数据中的Topic字段来指定要将数据写入到Kafka集群
# 的哪个Topic中。这种方式适用于消费多个Topic的情况
ds = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()
练习示例
写出到指定Topic
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('ss_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费init_df = spark.readStream\.format("kafka")\.option("kafka.bootstrap.servers","node1:9092,node2:9092")\.option("subscribe","itheima")\.load()# 3- 数据处理result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))# 4- 数据输出# 注意: 咱们修改完直接保存到kafka的itcast主题中,所以控制台没有数据,这是正常的哦!!!# 5- 启动流式任务result_df.writeStream\.format("kafka")\.option("kafka.bootstrap.servers","node1:9092,node2:9092")\.option("topic","itcast")\.option("checkpointLocation", "hdfs://node1:8020/ck")\.start()\.awaitTermination()
3.2 批处理
官方示例:
# 从DataFrame中写入key-value数据到一个选项中指定的特定Kafka topic中
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \.write \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.option("topic", "topic1") \.save()# 使用数据中指定的主题将key-value数据从DataFrame写入Kafka
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \.write \.format("kafka") \.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \.save()
演示示例
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('ss_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费init_df = spark.read\.format("kafka")\.option("kafka.bootstrap.servers","node1:9092,node2:9092")\.option("subscribe","itheima")\.load()# 3- 数据处理result_df = init_df.select(F.expr("concat(cast(value as string),'_666') as value"))# 4- 数据输出# 5- 启动流式任务result_df.write.format("kafka")\.option("kafka.bootstrap.servers","node1:9092,node2:9092")\.option("topic","itcast")\.option("checkpointLocation", "hdfs://node1:8020/ck")\.save()
01_回顾sparkSQL词频统计过程.py
# 导包
import os
from pyspark.sql import SparkSession, functions as F# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 1.读取文件生成dfdf = spark.read.text("file:///export/data/spark_project/09_结构化流/data3.txt")# df.show()# 2.数据处理etl_df = df.dropDuplicates().fillna('未知')# 3.数据分析# 需求: 统计每个单词出现的次数# 方式1: sql方式etl_df.createTempView("word_tb")sql_result_df = spark.sql("""with t as (select explode(split(value," ")) as wordfrom word_tb)select word,count(*) as cnt from t group by word""")# 方式2: dsl方式dsl_result_df = etl_df.select(F.explode(F.split("value", " ")).alias("word")).groupby("word").agg(F.count("word").alias("cnt"))# 4.数据展示/导出sql_result_df.show()dsl_result_df.show()# 注意: 最后一定释放资源spark.stop()
02_结构化流词频统计案例_读取文件方式.py
# 导包
import os
from pyspark.sql import SparkSession, functions as F# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder \.config('spark.sql.shuffle.partitions', 1) \.appName('pyspark_demo') \.master('local[*]') \.getOrCreate()# 2.TODO 数据输入# 注意: 路径必须是目录路径,因为readStream会自动读取此目录下的所有文件,有新增会触发接着读df = spark.readStream \.format('text') \.load('file:///export/data/spark_project/09_结构化流/data/')# 查看数据类型print(type(df))# 3.数据处理(切分,转换,分组聚合)# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式 略# 4.数据输出# 注意: 输出不能使用原来sparksql的show(),否则报错# 注意: 如果需要多开启多个输出,.awaitTermination()只需要在最后一个出现即可sql_df.writeStream.format('console').outputMode('complete').start().awaitTermination()# 注意: 最后一定释放资源spark.stop()
03_结构化流词频统计案例_socket方式.py
# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 1.读取socket发来的消息df = spark.readStream \.format('socket') \.option('host', '192.168.88.161') \.option('port', '55555') \.load()# 2.数据处理# 3.数据分析# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式 略# 4.数据输出sql_df.writeStream.format('console').outputMode('complete').start().awaitTermination()# 注意: 最后一定释放资源spark.stop()
04_结构化流词频统计案例_设置触发器和检查点.py
# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# TODO: 设置检查点路径spark.conf.set("spark.sql.streaming.checkpointLocation", "hdfs://node1:8020/ckpt2")# 1.读取socket发来的消息df = spark.readStream \.format('socket') \.option('host', '192.168.88.161') \.option('port', '55555') \.load()# 2.数据处理# 3.数据分析# 和SparkSQL操作一模一样,支持sql和dsl两种风格# SQL方式df.createTempView('tb')sql_df = spark.sql("""select words,count(1) as cntfrom (select explode(split(value,' ')) as words from tb) t group by words""")# DSL方式 略# 4.数据输出# TODO: .trigger(processingTime='5 seconds')添加触发器sql_df.writeStream.format('console').outputMode('complete').trigger(processingTime='5 seconds').start().awaitTermination()# 注意: 最后一定释放资源spark.stop()
05_流方式读取kafka数据.py
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('ss_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费,可以使用startingoffsets从最早的地方开始消费df = spark.readStream\.format("kafka")\.option("kafka.bootstrap.servers","node2:9092")\.option("subscribe","kafka_spark1")\.option("startingoffsets","earliest")\.load()# 查看类型print(type(df))# 注意: 字符串需要解码!!!etl_df = df.select(df.topic,F.decode(df.key, 'utf8').alias('key'),F.decode(df.value,'utf8').alias('value'),df.partition,df.offset,df.timestamp,df.timestampType)# 展示数据# 直接展示到控制台etl_df.writeStream.format("console").outputMode("append").start().awaitTermination()
06_流方式写数据到kafka.py
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('ss_read_kafka_1_topic')\.master('local[*]')\.getOrCreate()# 2- 数据输入# 默认从最新的地方开始消费,可以使用startingoffsets从最早的地方开始消费df = spark.readStream\.format("kafka")\.option("kafka.bootstrap.servers","node2:9092")\.option("subscribe","kafka_spark1")\.option("startingoffsets","earliest")\.load()# 查看类型print(type(df))# 注意: 字符串需要解码!!!etl_df = df.select(F.decode(df.value,'utf8').alias('value'))# TODO: 原来默认展示到控制台,接下来演示如何把数据存储到kafka中etl_df.writeStream\.format("kafka")\.option("kafka.bootstrap.servers","node2:9092")\.option("topic","kafka_spark2")\.option("checkpointLocation", "hdfs://node1:8020/ckpt3")\.start()\.awaitTermination()
相关文章:
day10_Structured Steaming
文章目录 Structured Steaming一、结构化流介绍(了解)1、有界和无界数据2、基本介绍3、使用三大步骤(掌握)4.回顾sparkSQL的词频统计案例 二、结构化流的编程模型(掌握)1、数据结构2、读取数据源2.1 File Source2.2 Socket Source…...
设计模式-工厂模式/抽象工厂模式
工厂模式 定义 定义一个创建对象的接口,让子类决定实列化哪一个类,工厂模式使一个类的实例化延迟到其子类; 工厂方法模式是简单工厂模式的延伸。在工厂方法模式中,核心工厂类不在负责产品的创建,而是将具体的创建工作…...
【算法学习】——整数划分问题详解(动态规划)
🧮整数划分问题是一个较为常见的算法题,很多问题从整数划分这里出发,进行包装,形成新的题目,所以完全理解整数划分的解决思路对于之后的进一步学习算法是很有帮助的。 「整数划分」通常使用「动态规划」解决࿰…...
【新教程】Ubuntu 24.04 单节点安装slurm
背景 网上教程老旧,不适用。 详细步骤 1、安装slurm sudo apt install slurm-wlm slurm-wlm-doc -y检查是否安装成功: slurmd --version如果得到slurm-wlm 23.11.4,表明安装成功。 2、配置slurm。 使用命令: sudo vi /etc/s…...
window下用vim
Windows 默认不支持 vim 命令,需要手动安装后才能使用。以下是解决方案: 1. 安装 Vim 编辑器 方法 1:通过 Scoop 或 Chocolatey 安装 使用 Scoop: 安装 Scoop(如果尚未安装):iwr -useb get.sco…...
citrix netscaler13.1 重写负载均衡响应头(基础版)
在 Citrix NetScaler 13.1 中,Rewrite Actions 用于对负载均衡响应进行修改,包括替换、删除和插入 HTTP 响应头。这些操作可以通过自定义策略来完成,帮助你根据需求调整请求内容。以下是三种常见的操作: 1. Replace (替换响应头)…...
使用PWM生成模式驱动BLDC三相无刷直流电机
引言 在 TI 的无刷直流 (BLDC) DRV8x 产品系列使用的栅极驱动器应用中,通常使用一些控制模式来切换MOSFET 开关的输出栅极。这些控制模式包括:1x、3x、6x 和独立脉宽调制 (PWM) 模式。 不过,DRV8x 产品系列(例如 DRV8311&…...
常见的php框架有哪几个?
一直以来,PHP作为一种广泛使用的编程语言,拥有着许多优秀的框架来帮助开发人员快速构建稳定的Web应用程序。本文降为大家介绍几种常见的PHP的主流框架,以及它们相关的特点和使用场景。如有问题,欢迎指正! 1.Laravel&a…...
机器学习(2):线性回归Python实现
1 概念回顾 1.1 模型假设 线性回归模型假设因变量y yy与自变量x xx之间的关系可以用以下线性方程表示: y β 0 β 1 ⋅ X 1 β 2 ⋅ X 2 … β n ⋅ X n ε y 是因变量 (待预测值);X1, X2, ... Xn 是自变量(特征)β0, β1,…...
Unity-Mirror网络框架-从入门到精通之RigidbodyPhysics示例
文章目录 前言示例一、球体的基础配置二、三个球体的设置差异三、示例意图LatencySimulation前言 在现代游戏开发中,网络功能日益成为提升游戏体验的关键组成部分。本系列文章将为读者提供对Mirror网络框架的深入了解,涵盖从基础到高级的多个主题。Mirror是一个用于Unity的开…...
【Unity-Animator】通过 StateMachineBehaviour 实现回调
StateMachineBehaviour 简介 StateMachineBehaviour是一个基类,所有状态脚本都派生自该类。它可以在状态机进入、退出或更新状态时执行代码,而无需编写自己的逻辑来测试和检测状态的变化。这使得开发者可以更方便地处理状态转换时的逻辑,例…...
并行服务、远程SSH无法下载conda,报错404
原下载代码无效,报错404 wget -c https://repo.anaconda.com/archive/Anaconda3-2023.03-1-Linux-x86_64.sh 使用下面代码下载 wget --user-agent"User-Agent: Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.12) Gecko/20101026 Firefox/3.6.12…...
cuquantum 简介
1. 关于 cuquantum 概述 官方文档: https://docs.nvidia.com/cuda/cuquantum/latest/appliance/overview.html#prerequisites NVIDIA 的 cuQuantum 是一个专门用于量子计算的高性能库,旨在加速量子电路的模拟和量子算法的执行。cuQuantum 提供了一系列…...
小程序如何引入腾讯位置服务
小程序如何引入腾讯位置服务 1.添加服务 登录 微信公众平台 注意:小程序要企业版的 第三方服务 -> 服务 -> 开发者资源 -> 开通腾讯位置服务 在设置 -> 第三方设置 中可以看到开通的服务,如果没有就在插件管理中添加插件 2.腾讯位置服务…...
【react】使用antd Table渲染数据遇到的报错问题
记录自己在开发过程中遇到的报错问题: 目录 原本写法:错误分析:解决方案: 原本写法: render: (text) > {console.log(text, "111111text");console.log(typeof text, "111111text");return t…...
55_OpenResty开发入门
Nginx编程需要用到Lua语言,因此我们必须先学习Lua的基本语法。Nginx本身也是C语言开发,因此也允许基于Lua做拓展。多级缓存的实现离不开Nginx编程,而Nginx编程又离不开OpenResty。 1.OpenResty概述 OpenResty是一款基于NGINX和LuaJIT的Web平台。通过Lua扩展NGINX实现的可伸…...
(即插即用模块-Attention部分) 四十四、(ICIP 2022) HWA 半小波注意力
文章目录 1、Half Wavelet Attention2、代码实现 paper:HALFWAVELET ATTENTION ON M-NET FOR LOW-LIGHT IMAGE ENHANCEMENT Code:https://github.com/FanChiMao/HWMNet 1、Half Wavelet Attention 传统的图像增强方法主要关注图像在空间域的特征信息&am…...
链家房价数据爬虫和机器学习数据可视化预测
完整源码项目包获取→点击文章末尾名片!...
全网首发:编译libssh,产生类似undefined reference to `EVP_aes_256_ctr@OPENSSL_1_1_0‘的大量错误
具体错误 前面和后面的: /opt/linux/x86-arm/aarch64-mix210-linux/host_bin/../lib/gcc/aarch64-linux-gnu/7.3.0/../../../../aarch64-linux-gnu/bin/ld: warning: libcrypto.so.1.1, needed by ../lib/libssh.so.4.10.1, not found (try using -rpath or -rpat…...
springboot 集成javaFx 两个面板之间如何进行跳转
1.创建两个面板 可参考博主的 java8 springboot 集成javaFx 实现一个客户端程序 文章来实现 2.完善代码 以下是博主创建的两个模板 博主在这里实现的是登录跳转功能,注意:这里登录按钮的触发实现方式做了以下小小的改动,也可根据自己的习惯来处理 相较第一篇文章,博主在Lo…...
vue-cli项目配置使用unocss
在了解使用了Unocss后,就完全被它迷住了。接手过的所有项目都配置使用了它,包括一些旧项目,也跟同事分享了使用Unocss的便捷性。 这里分享一下旧项目如何配置和使用Unocss的,项目是vue2vue-cli构建的,node<20平常开…...
ASP.NET Core - IStartupFilter 与 IHostingStartup
ASP.NET Core - IStartupFilter 与 IHostingStartup 1. IStartupFilter2 IHostingStartup2.5.1 创建外部程序集2.5.2 激活外部程序集 1. IStartupFilter 上面讲到的方式虽然能够根据不同环境将Startup中的启动逻辑进行分离,但是有些时候我们还会可以根据应用中的功能…...
学习ASP.NET Core的身份认证(基于JwtBearer的身份认证5)
用户在前端页面登录成功后会从服务端获取Token,后续调用服务器的服务接口时都得带着Token,否则就会验证失败。之前使用postman测试的时候,获取Token后再调用其它服务都是人工将Token添加到Header中,网页中没法这么做,只…...
【Vue】let、const、var的区别、适用场景
let、const、var,有哪些区别,适用场景 var 特点: var 是 JavaScript 中最传统的变量声明方式。具有函数作用域,即在函数内声明的 var 变量,在整个函数内都可以访问。变量提升:使用 var 声明的变量会被提升…...
【llama_factory】qwen2_vl训练与批量推理
训练llama factory配置文件 文件:examples/train_lora/qwen2vl_lora_sft.yaml ### model model_name_or_path: qwen2_vl/model_72b trust_remote_code: true### method stage: sft do_train: true finetuning_type: lora lora_target: all### dataset dataset: ca…...
计算机视觉与深度学习:使用深度学习训练基于视觉的车辆检测器(MATLAB源码-Faster R-CNN)
在人工智能领域,计算机视觉是一个重要且充满活力的研究方向。它使计算机能够理解和分析图像和视频数据,从而做出有意义的决策。其中,目标检测是计算机视觉中的一项关键技术,它旨在识别并定位图像中的多个目标对象。车辆检测作为目标检测的一个重要应用,在自动驾驶、智能交…...
Python 扫描枪读取发票数据导入Excel
财务需要一个扫描枪扫描发票文件,并将主要信息录入Excel 的功能。 文件中sheet表的列名称,依次为:发票编号、发票编码、日期、金额、工号、扫描日期。 扫描的时候,Excel 文件需要关闭,否则会报错。 import openpyxl …...
SpringMVC复习笔记
文章目录 SpringMVC 概念和基本使用SpringMVC 简介SpringMVC 核心组件和调用流程SpringMVC 基本使用第一步:导入依赖第二步:Controller 层开发第三步:SpringMVC 配置类配置核心组件第四步:SpringMVC 环境搭建第五步:部…...
arcgis提取不规则栅格数据的矢量边界
效果 1、准备数据 栅格数据:dem或者dsm 2、栅格重分类 分成两类即可 3、新建线面图层 在目录下选择预先准备好的文件夹,点击右键,选择“新建”→“Shapefile”,新建一个Shapefile文件。 在弹出的“新建Shapefile”对话框内“名称”命名为“折线”,“要素类型”选…...
【机器学习】零售行业的智慧升级:机器学习驱动的精准营销与库存管理
我的个人主页 我的领域:人工智能篇,希望能帮助到大家!!!👍点赞 收藏❤ 在当今数字化浪潮汹涌澎湃的时代,零售行业正站在转型升级的十字路口。市场竞争的白热化使得企业必须另辟蹊径࿰…...
链路追踪SkyWalking
链路追踪 链路追踪作用链路追踪的关键概念链路追踪的工作原理常用链路追踪工具链路追踪的实现步骤链路追踪的典型场景 SkyWalkingSkyWalking 的主要功能SkyWalking 的架构安装 SkyWalking从 SkyWalking 的官方 GitHub 仓库 下载最新版本。配置后端存储SkyWalking使用࿰…...
linux下的线程
一、pthread 线程 线程可以说是轻量级的进程,一般是一个进程中的多个任务。 进程:系统中的最小资源分配单元 线程:系统中最小执行单元 二、线程的特征 1、共享资源 2、效率高30% 3.使用第三方库(头文件加pthread.h 编译时添加 -lpthre…...
《研发管理 APQP 软件系统》——汽车电子行业的应用收益分析
全星研发管理 APQP 软件系统在汽车电子行业的应用收益分析 在汽车电子行业,技术革新迅猛,市场竞争激烈。《全星研发管理 APQP 软件系统》的应用,为企业带来了革命性的变化,诸多收益使其成为行业发展的关键驱动力。 《全星研发管理…...
mysql、oracle、sqlserver的区别
一、保存数据的持久性: MySQL:是在数据库更新或者重启,则会丢失数据。 Oracle:把提交的sql操作线写入了在线联机日志文件中,保持到了磁盘上,可以随时恢复。 SqlServer:2…...
CV(10)--目标检测
前言 仅记录学习过程,有问题欢迎讨论 目标检测 object detection,就是在给定的图片中精确找到物体所在位置,并标注出物体的类别;输出的是分类类别label物体的外框(x, y, width, height)。 目标检测算法:…...
SQL LAST()
SQL中的LAST()函数是一个用于返回指定列中最后一个记录值的函数。然而,需要注意的是,这个函数并不是SQL标准的一部分,因此并不是所有数据库系统都支持它。具体来说,只有MS Access直接支持LAST()函数【0†source】。 在其他数据库…...
传统以太网问题与VLAN技术详解
传统以太网的问题 广播域:在网络中能接收同一广播信息的所有设备(计算机、交换机)等的集合 说明:在一个广播域内,当一个设备发送广播帧时,该域内的所有设备都能接收到这个广播帧。工作原理:在以…...
Java 面试题 - ArrayList 和 LinkedList 的区别,哪个集合是线程安全的?
Java 面试题 - ArrayList 和 LinkedList 的区别,哪个集合是线程安全的? 在 Java 开发中,ArrayList和LinkedList是两个常用的集合类,它们在数据结构和性能上有诸多不同,同时线程安全性也各有特点。深入理解这些差异&am…...
flutter 安卓端打包
在 Flutter 中打包 Android 应用程序是一个相对简单的过程。你可以使用 Flutter 的命令行工具来构建并打包你的 APK 或 AAB(Android App Bundle)。以下是打包 Flutter Android 应用的步骤: 1. 安装 Flutter 环境 确保你已经安装了 Flutter SDK,并且正确配置了 Android 开…...
前端开发:CSS背景属性
1.背景颜色 background-color: [ 指定颜色 ] background-color :blue; background-color : transparent //设置背景是透明的 2.背景图片 background-image : url ( ... ) 1. url 不要遗漏 . 2. url 可以是绝对路径 , 也可以是相对路径 3. url 上可以…...
【Python通过UDP协议传输视频数据】(界面识别)
提示:界面识别项目 前言 随着网络通信技术的发展,视频数据的实时传输在各种场景中得到了广泛应用。UDP(User Datagram Protocol)作为一种无连接的协议,凭借其低延迟、高效率的特性,在实时性要求较高的视频…...
centos 8 中安装Docker
注:本次样式安装使用的是centos8 操作系统。 1、镜像下载 具体的镜像下载地址各位可以去官网下载,选择适合你们的下载即可! 1、CentOS官方下载地址:https://vault.centos.org/ 2、阿里云开源镜像站下载:centos安装包…...
leetcode hot 100 -划分字母区间
给你一个字符串 s 。我们要把这个字符串划分为尽可能多的片段,同一字母最多出现在一个片段中。 注意,划分结果需要满足:将所有划分结果按顺序连接,得到的字符串仍然是 s 。 返回一个表示每个字符串片段的长度的列表。 示例 1&am…...
CSS 元素的显示模式(块元素,行内元素,行内块元素)
一. 块元素(block) 又称:块级元素 特点: 1. 在页面中独占一行,不会与任何元素共用一行,是从上到下排列的。 2. 默认宽度:撑满父元素。 3. 默认高度:由内容撑开。 4. 可以通过 CSS 设…...
鸿蒙UI开发——键盘弹出避让模式设置
1、概 述 我们在鸿蒙开发时,不免会遇到用户输入场景,当用户准备输入时,会涉及到输入法的弹出,我们的界面针对输入法的弹出有两种避让模式:上抬模式、压缩模式。 下面针对输入法的两种避让模式的设置做简单介绍。 2、…...
Multi-Agent如何设计
文章小结 研究背景和目的 在单一大语言模型长期主导人工智能领域的背景下,多智能体系统在对话任务解决中逐渐崭露头角。 虽然先前的研究已经展示了多智能体系统在推理任务和创造性工作中的潜力,但对于其在对话范式方面的局限性以及单个智能体的影响&am…...
git操作(bitbucket仓库)
在代码远程版本控制和提交过程中需要经常使用git命令,熟练使用git是一个软件工程师必备的技能之一。 将主版本代码fork到自己的 bitbucket 子仓库中 克隆到本地 利用ssh链接进行克隆,将 fork 的子仓库克隆到本地。 git clone ssh://{$你fork的子bitbu…...
【MySQL | 三、 表的约束介绍】
举例表结构 字段名称字段类型是否NULL键值默认值附加信息FieldTypeNullKeyDefaultExtraidintNoUNI(唯一键)Nullauto_incrementnameverchar(10)NoPRI(主键)NullgenderenumNomanagetinyintYesNulltelphoneintYesNullhome_idintyesMUL(外键)Null 目录 举例表结构1. 空…...
音频DSP的发展历史
音频数字信号处理(DSP)的发展历史是电子技术、计算机科学和音频工程共同进步的结果。这个领域的进展不仅改变了音乐制作、音频后期制作和通信的方式,也影响了音频设备的设计和功能。以下是对音频DSP发展历史的概述: 早期概念和理论…...
学习笔记-Kotlin
准备工作 下载安装kotlin编译器,记录此笔记时的版本是v1.2.10,目前最新发布版本是v2.1.10解压下载的安装包,配置环境变量指向bin目录(官方指导). 如果不想在本地安装环境,也可以使用在线编辑器 尝试编写第一个helloWorld 新建一个名为hello.kt的文件,内容如下: f…...