day09_实时类标签/指标
文章目录
- day09_实时类标签/指标
- 一、日志数据实时采集
- 2、Flume简介
- 2.3 项目日志数据采集Flume配置
- 2.3.1 涉及的Flume组件和参数
- 2.3.2 Nginx日志采集
- 2.3.3 用户行为日志采集
- 二、Nginx日志数据统计
- 1、日志格式说明
- 2、数据ETL
- 2.1 日志抽取
- 2.1.1 正则表达式
- 2.1.2 基于Spark实现Nginx数据匹配
- 2.2 字段解析
- 2.2.1 日期格式转换
- 2.2.2 IP解析地理位置(了解)
- 2.2.3 UA解析
- 2.3 完整代码
- 2.4 使用Hive读取HDFS数据
- 3、指标统计
- 1、尝试进行用户行为日志的数据ETL、指标统计
day09_实时类标签/指标
一、日志数据实时采集
2、Flume简介
2.3 项目日志数据采集Flume配置
zookeeper、Kafka的启动命令
启动zookeeper(没有启动的,才需要执行)
/export/server/zookeeper/bin/zkServer.sh start启动Kafka
cd /export/server/kafka/bin
nohup ./kafka-server-start.sh ../config/server.sql 2>&1 &Kafka其他的相关命令
cd /export/server/kafka/bin
查看当前集群有哪些Topic
./kafka-topics.sh --list --bootstrap-server up01:9092
新建Topic(分区数没要求,副本数<=broker节点个数)
./kafka-topics.sh --create --bootstrap-server up01:9092 --topic xtzg_nginx_log
参看Topic的详细信息
./kafka-topics.sh --describe --bootstrap-server up01:9092 --topic xtzg_nginx_log注意: 要提前创建好Kafka的Topic
2.3.1 涉及的Flume组件和参数
- source
type: 类型,固定值TAILDIR。能同时监控一个目录或者多个文件,也能动态监控每个文件的变化,还支持断点续传,不会出现重复消费问题。
fiilegroups: 以空格分隔的文件组列表。每个文件组表示一组要跟踪的文件。
filegroups.<filegroupName>: 文件组的绝对路径。正则表达式(而不是文件系统模式)只能用于文件名。
positionFile: JSON格式的文件,记录每个文件的inode、绝对路径和最后位置。注意: type的TAILDIR大小写不能随便写
- channel
type: 类型,固定值 org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers: Kafka集群中的broker列表。格式:hostname:port,多个用逗号隔开。
kafka.topic: channel要用的topic
parseAsFlumeEvent: 是否需要对采集到的数据解析为Event对象,然后在内容前面增加topic前缀,会导致后续的内容会有部分缺失的情况。一般是false
补充:
如果采集到的数据最终想要输出到Kafka中,可以直接选择使用Kafka Channel。
注意: Kafka Channel和Kafka Sink,虽然都是将数据输出到Kafka中,但是两者的配置参数有区别
2.3.2 Nginx日志采集
- 创建nginx_to_kafka.conf文件
- nginx_to_kafka.conf配置文件内容如下
#定义组件
a1.sources = r1
a1.channels = c1#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /export/data/workspace/user_profile/log_generate/datacollection/source_data/access-nginx.*
a1.sources.r1.positionFile = /export/data/flume/nginx_position.json#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = up01:9092
a1.channels.c1.kafka.topic = xtzg_nginx_log
a1.channels.c1.parseAsFlumeEvent = false#组装
a1.sources.r1.channels = c1注意: 1- a1.sources.r1.filegroups.f1该参数值要改成你自己的路径2- 文件的模糊匹配的正则表达式中写的是.*表示匹配任意内容
将上面的配置文件复制到/export/server/flume/conf
cp /export/data/workspace/user_profile/scripts/flume/nginx_to_kafka.conf /export/server/flume/conf
- 在Kafka上创建topic(前提开启zk,kafka)
cd /export/server/kafka/bin./kafka-topics.sh --create --bootstrap-server up01:9092 --topic xtzg_nginx_log
- 启动Flume
cd /export/server/flumebin/flume-ng agent -n a1 -c conf/ -f conf/nginx_to_kafka.conf
- 查看Kafka中的数据
cd /export/server/kafka/bin./kafka-console-consumer.sh --bootstrap-server up01:9092 --topic xtzg_nginx_log
- 启动
运行python中的NginxLogSimulationData.py。查看kafka中数据变化,如果看到新增数据则配置成功。确认无误后关停Flume采集任务。
2.3.3 用户行为日志采集
-
创建user_event_to_kafka.conf文件
-
user_event_to_kafka.conf配置文件内容如下
#定义组件
a1.sources = r1
a1.channels = c1#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /export/data/workspace/user_profile/log_generate/datacollection/source_data/user-event.*
a1.sources.r1.positionFile = /export/data/flume/user_event_position.json#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = up01:9092
a1.channels.c1.kafka.topic = xtzg_user_event
a1.channels.c1.parseAsFlumeEvent = false#组装
a1.sources.r1.channels = c1
- 在Kafka上创建topic(前提开启zk,kafka)
cd /export/server/kafkabin/kafka-topics.sh --create --bootstrap-server up01:9092 --topic xtzg_user_event --partitions 1 --replication-factor 1
- 启动Flume
cd /export/server/flumebin/flume-ng agent -n a1 -c conf/ -f conf/user_event_to_kafka.conf
- 查看Kafka中的数据
cd /export/server/kafkabin/kafka-console-consumer.sh --bootstrap-server up01:9092 --from-beginning --topic xtzg_user_event
- 启动
运行python中的EventSimulationJsonData.py。查看kafka中数据变化,如果看到新增数据则配置成功。确认无误后关停Flume采集任务。
二、Nginx日志数据统计
1、日志格式说明
Nginx(发音 恩几可使)是异步框架的网页服务器,也可以用作反向代理、负载平衡器和HTTP缓存。该软件由俄罗斯程序员伊戈尔·赛索耶夫(Игорь Сысоев)开发并于2004年首次公开发布
- Nginx日志包含access_log和error_log两种类型日志数据。项目中分析的数据为:access_log
- Nginx开源官网:https://nginx.org/
- 项目采集Nginx数据格式。以下为一条Nginx日志:
116.85.48.25 - - [12/Nov/2024:11:36:46 +0800] "GET /login.html HTTP/1.1" 404 729 "https://xtx.itcast.cn/referAFriend.html" "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/7.0.18(0x17001233) NetType/WIFI Language/zh_CN" "-"Nginx日志格式说明:116.85.48.25: 用户访问IP地址- - : 用户标识(cookie信息)[14/Jul/2022:17:40:41 +0800]: 访问时间 + 时区GET : 请求方式/css/40.30d6d2b.css: 请求资源HTTP/1.1 : 请求的协议500 : 请求的状态码 (500 服务器错误, 200 成功 302 重定向 404 访问到未知资源)951 : 响应返回的字节大小"https://www.htv.com/official/component?WT.mc_id=3" : 来源的URL(从那个地方跳转到此页面)"Mozilla/5......: 浏览器标识
2、数据ETL
2.1 日志抽取
2.1.1 正则表达式
Java版本:
(?<ip>\d+\.\d+\.\d+\.\d+) (- - \[)(?<datetime>[\s\S]+)(?<t1>\][\s"]+)(?<request>[A-Z]+) (?<url>[\S]*) (?<protocol>[\S]+)["] (?<code>\d+) (?<sendbytes>\d+) ["](?<refferer>[\S]*) ["](?<useragent>[\S\s]+)["] ["](?<proxyaddr>[\S\s]+)["]Python版本:
(?P<ip>.*?) - - \[(?P<time>.*?)\] "(?P<request>.*?)" (?P<status>.*?) (?P<bytes>.*?) "(?P<referer>.*?)" "(?P<ua>.*?)" "(?P<proxy_address>.*)"
2.1.2 基于Spark实现Nginx数据匹配
代码实现:
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StringTypeos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.appName("nginx_etl")\.master("local[*]")\.config("spark.sql.shuffle.partitions",2)\.getOrCreate()# 2- 数据输入:读取Kafka中的数据""""startingOffsets","earliest":该配置,在实际工作中一般不需要配置。这里是为了开发代码方便"""init_df = spark.readStream.format("kafka")\.option("kafka.bootstrap.servers","192.168.88.166:9092")\.option("subscribe","xtzg_nginx_log")\.option("startingOffsets","earliest")\.load()# 结构化流中不能以show()方式打印数据数据内容# init_df.show()# 3- 数据ETL处理# 3.1- value字段解码的操作"""cast(StringType()):将字段数据类型强制转换为字符串。等同于SQL语句中的cast(value as string)下面两种方式都可以,推荐使用第一种,因为性能更好"""# type_cast_df = init_df.select(init_df.value.cast(StringType()).alias("value"))type_cast_df = init_df.selectExpr("cast(value as string) as value")# 3.2- 通过正则表达式提取Nginx的字段pattern = '(?<ip>\d+\.\d+\.\d+\.\d+) (- - \[)(?<datetime>[\s\S]+)(?<t1>\][\s"]+)(?<request>[A-Z]+) (?<url>[\S]*) (?<protocol>[\S]+)["] (?<code>\d+) (?<sendbytes>\d+) ["](?<refferer>[\S]*) ["](?<useragent>[\S\s]+)["] ["](?<proxyaddr>[\S\s]+)["]'regexp_df = type_cast_df.select(F.regexp_extract("value",pattern,1).alias("ip"),F.regexp_extract("value",pattern,3).alias("datetime"),F.regexp_extract("value",pattern,4).alias("t1"),F.regexp_extract("value",pattern,5).alias("request"),F.regexp_extract("value",pattern,6).alias("url"),F.regexp_extract("value",pattern,7).alias("protocol"),F.regexp_extract("value",pattern,8).alias("code"),F.regexp_extract("value",pattern,9).alias("sendbytes"),F.regexp_extract("value",pattern,10).alias("refferer"),F.regexp_extract("value",pattern,11).alias("useragent"),F.regexp_extract("value",pattern,12).alias("proxyaddr"))# 4- 数据输出,启动流式任务regexp_df.writeStream.format("console").outputMode("append").start().awaitTermination()
运行结果截图:
可能遇到的错误:
原因: regexp_extract函数只能传递Java版的正则表达式,不能用Python的
2.2 字段解析
需求:根据nginx日志,ip标识唯一的用户,需要ip分组,统计得到用户访问的pv、uv、区域、状态码、终端设备的操作系统、设备品牌、浏览器、访问时间(年-月-日 时:分:秒)
2.2.1 日期格式转换
Python的datetime函数库
- 相关函数:
- strftime(): 把日期对象转成指定的时间格式的字符串
- strptime(): 把指定格式的日期字符串转换为日期对象
- 参考文档: https://docs.python.org/zh-cn/3/library/datetime.html#strftime-strptime-behavior
- 解析格式: %d/%b/%Y:%H:%M:%S %z => %Y-%m-%d %H:%M:%S
- 28/Jul/2022:16:22:07 +0800 => 日期对象 => 2022-07-28 16:22:07
- 测试代码
Python方式
from datetime import datetimeif __name__ == '__main__':date_str = "11/Feb/2025:14:34:49 +0800"print(datetime.strptime(date_str, "%d/%b/%Y:%H:%M:%S %z").strftime("%Y-%m-%d %H:%M:%S"))
SparkSQL方式(重点掌握)
regexp_df.withColumn("datetime",F.from_unixtime(F.unix_timestamp("datetime","dd/MMM/yyyy:HH:mm:ss Z"),"yyyy-MM-dd HH:mm:ss"))
2.2.2 IP解析地理位置(了解)
根据IP解析地理位置
- 方式一: 使用ip解析地理位置API
- ip地址:http://opendata.baidu.com/api.php?query=117.136.12.79&co=&resource_id=6006&oe=utf8
- 像百度地图开发平台 / 高德地图开放平台 … 都会提供IP解析的服务接口
- 百度地图:https://lbs.baidu.com/faq/api?title=webapi/ip-api-base
- 高德地图:https://lbs.amap.com/api/webservice/guide/api/ipconfig
- 其他平台:https://www.nowapi.com/
- 方式二: (了解)使用geo_ip依赖包和GeoLite2-City.mmdb库
- 依赖包:geoip2~=4.5.0
- 下载地址:https://gitcode.com/crownp/geolite2_demo/blob/master/src/main/resources/GeoLite2-City.mmdb
- IP在线解析测试代码
Python的Requests库的介绍:https://requests.readthedocs.io/en/latest/
#!/usr/bin/env python
# @desc :
__coding__ = "utf-8"
__author__ = "bytedance"import requestsdef parse_ip(ip_str):params = {"query": ip_str,"co": "","resource_id": "6006","oe": "utf8",}# 发送请求response = requests.get(url="https://opendata.baidu.com/api.php", params=params)# 解析响应内容result = response.json()status = result['status']if status == '0':# 正常try:return result['data'][0]['location'].split(" ")[0]except:return "未知区域"else:return "未知区域"if __name__ == '__main__':ip_str = "127.0.0.1"ip_str = "10.254.1.97"ip_str = "157.148.69.76"area = parse_ip(ip_str)print(area)
2.2.3 UA解析
UA说明
- UA为useragent简称,特指用户访问系统使用的客户端信息,一般包含操作系统,浏览器,设备品牌信息等
- UA字符串信息:http://useragentstring.com/
- 使用,需导入UA解析依赖包:from user_agents import parse
- UA的作用
- 1.客户端识别:通过User-Agent,服务器能够识别客户端的类型和版本,从而提供相应的内容和服务。比如,在移动设备上展示适合屏幕大小的网页布局,或在不同浏览器上提供兼容性优化。
- 2.统计分析:网站和应用开发者可以利用User-Agent来收集客户端的信息,进行用户行为分析和统计。这有助于了解用户使用的设备和偏好,以便进行产品和服务的改进。
- 3.安全性:User-Agent也可以用于安全验证和防止恶意行为。通过分析User-Agent,服务器可以检测到异常或伪造的请求,并采取相应的安全措施。
- 测试代码:
from user_agents import parseif __name__ == '__main__':ua_str = "Mozilla/5.0 (iPhone; CPU iPhone OS 13_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0 MQQBrowser/11.0.7 Mobile/15E148 Safari/604.1 QBWebViewUA/2 QBWebViewType/1 WKType/1"result = parse(ua_str)# os操作系统信息print("os----------")print(result.os.family)print(result.os.version)print(result.os.version_string)# brower浏览器信息print("browser----------")print(result.browser.family)print(result.browser.version)print(result.browser.version_string)# device设备信息print("device----------")print(result.device.family)print(result.device.model)
2.3 完整代码
需要将结果数据同时写入到Kafka和HDFS。清洗后的日志,可以用于其他业务分析,具有一定的价值。因为Kafka不能永久保存数据,所以需要把数据存储到HDFS一份。
因为每天都有很多日志,所以需要对日志进行分区。可以通过partitionBy()方法进行分区写入到HDFS。分区的字段需要进行计算。
另外,为了减少小文件生成,可以使用trigger来指定写入的时间间隔。
- 先创建Kafka的Topic
cd /export/server/kafka/bin
./kafka-topics.sh --create --bootstrap-server up01:9092 --topic dwd_nginx_etl_result
- 完整代码
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, MapType
import requests
from user_agents import parseos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.appName("nginx_etl")\.master("local[*]")\.config("spark.sql.shuffle.partitions",2)\.getOrCreate()# 配置checkpointLocation路径,推荐使用HDFS路径spark.conf.set("spark.sql.streaming.checkpointLocation", "hdfs://192.168.88.166:8020/xtzg/chk")# 2- 数据输入:读取Kafka中的数据""""startingOffsets","earliest":该配置,在实际工作中一般不需要配置。这里是为了开发代码方便"""init_df = spark.readStream.format("kafka")\.option("kafka.bootstrap.servers","192.168.88.166:9092")\.option("subscribe","xtzg_nginx_log")\.option("startingOffsets","earliest")\.load()# 结构化流中不能以show()方式打印数据数据内容# init_df.show()# 3- 数据ETL处理# 3.1- value字段解码的操作"""cast(StringType()):将字段数据类型强制转换为字符串。等同于SQL语句中的cast(value as string)下面两种方式都可以,推荐使用第一种,因为性能更好"""# type_cast_df = init_df.select(init_df.value.cast(StringType()).alias("value"))type_cast_df = init_df.selectExpr("cast(value as string) as value")# 3.2- 通过正则表达式提取Nginx的字段pattern = '(?<ip>\d+\.\d+\.\d+\.\d+) (- - \[)(?<datetime>[\s\S]+)(?<t1>\][\s"]+)(?<request>[A-Z]+) (?<url>[\S]*) (?<protocol>[\S]+)["] (?<code>\d+) (?<sendbytes>\d+) ["](?<refferer>[\S]*) ["](?<useragent>[\S\s]+)["] ["](?<proxyaddr>[\S\s]+)["]'# 这里不允许使用Python正则表达式,只能使用Java正则表达式# pattern = '(?P<ip>.*?) - - \[(?P<time>.*?)\] "(?P<request>.*?)" (?P<status>.*?) (?P<bytes>.*?) "(?P<referer>.*?)" "(?P<ua>.*?)" "(?P<proxy_address>.*)"'regexp_df = type_cast_df.select(F.regexp_extract("value",pattern,1).alias("ip"),F.regexp_extract("value",pattern,3).alias("datetime"),F.regexp_extract("value",pattern,4).alias("t1"),F.regexp_extract("value",pattern,5).alias("request"),F.regexp_extract("value",pattern,6).alias("url"),F.regexp_extract("value",pattern,7).alias("protocol"),F.regexp_extract("value",pattern,8).alias("code"),F.regexp_extract("value",pattern,9).alias("sendbytes"),F.regexp_extract("value",pattern,10).alias("refferer"),F.regexp_extract("value",pattern,11).alias("useragent"),F.regexp_extract("value",pattern,12).alias("proxyaddr"))# 3.3- 日期时间格式转换datetime_df = regexp_df.withColumn("datetime",F.from_unixtime(F.unix_timestamp("datetime","dd/MMM/yyyy:HH:mm:ss Z"),"yyyy-MM-dd HH:mm:ss"))# 3.4- IP地理位置解析@F.udf(returnType=StringType())def parse_ip(ip_str):params = {"query": ip_str,"co": "","resource_id": "6006","oe": "utf8",}# 发送请求response = requests.get(url="https://opendata.baidu.com/api.php", params=params)# 解析响应内容result = response.json()status = result['status']if status == '0':# 正常try:return result['data'][0]['location'].split(" ")[0]except:return "未知区域"else:return "未知区域"area_df = datetime_df.withColumn("area",parse_ip("ip"))# 3.5- UA解析"""为什么这里用户自定义函数推荐返回字典?方便后续取值"""@F.udf(returnType=MapType(keyType=StringType(),valueType=StringType()))def parse_ua(ua_str):result = parse(ua_str)os = result.os.familybrowser = result.browser.familydevice = result.device.modelreturn {"os":os,"browser":browser,"device":device}ua_df = area_df.withColumn("os",parse_ua("useragent")['os'])\.withColumn("browser", parse_ua("useragent")['browser'])\.withColumn("device", parse_ua("useragent")['device'])# 4- 数据输出,启动流式任务# 4.1- 输出到HDFS# 新增一个分区字段dt_df = ua_df.withColumn("dt",F.split("datetime"," ")[0])# partitionBy表示按照哪个字段进行分区dt_df.writeStream.format("orc").partitionBy("dt")\.option("path","hdfs://192.168.88.166:8020/xtzg/etl/dwd_nginx_etl_result")\.start()# 4.2- 输出到Kafka# 注意:一般将数据内容转换为JSON格式输出到Kafka中,为了后续使用方便# 注意:输出到Kafka中的字段名称只能叫valuekafka_df = ua_df.select(F.to_json(F.struct("ip","datetime","t1","request","url","protocol","code","sendbytes","refferer","useragent","proxyaddr","area","os","browser","device")).alias("value"))kafka_df.writeStream.format("kafka")\.option("kafka.bootstrap.servers","192.168.88.166:9092")\.option("topic","dwd_nginx_etl_result")\.start()# 4.3- 输出到控制台(为了测试)# awaitTermination()只能加在最后一个start()的后面dt_df.writeStream.format("console").outputMode("append").start().awaitTermination()
可能遇到的错误一:
原因: 结构化流中将数据输出到文件系统中,需要配置checkpointLocation
可能遇到的错误二:
原因: 输出到Kafka中的字段名称只能叫value
2.4 使用Hive读取HDFS数据
- 创建表
CREATE external TABLE dwd.dwd_nginx_etl_result (ip string,`datetime` string,t1 string,request string,url string,protocol string,code string,sendbytes string,refferer string,useragent string,proxyaddr string,area string,os string,browser string,device string
)COMMENT 'nginx日志'PARTITIONED BY (dt string)STORED AS ORCLOCATION '/xtzg/etl/dwd_nginx_etl_result'TBLsql ('orc.compress' = 'SNAPPY')
;
- 同步分区
MSCK REPAIR TABLE dwd.dwd_nginx_etl_result;
3、指标统计
- 需求
统计实时请求总数(pv)
统计用户数(uv)
统计用户访问所在区域省(类似抖音的位置显示)
统计用户响应状态码
统计用户使用设备终端信息
统计用户操作系统信息
统计用户首次访问系统的时间
统计用户末次访问系统的时间ip: 用户访问系统的唯一地址
pv:访问系统的页面次数
uv:访问系统的用户数
area:访问系统用户来自的区域,根据ip解析出地址位置
status_code:访问系统用户请求http协议响应状态码
device_os:设备终端,从ua中提取手机或电脑的系统
device_brand:设备品牌名称,从ua中提取手机或电脑的品牌
browser_name:访问系统用户使用的浏览器名称
first_access_time:用户首次访问系统的时间
last_access_time:用户首次访问系统的时间
- Doris建表语句
使用unique模型。
CREATE DATABASE IF NOT EXISTS log_analysis_db;
CREATE TABLE IF NOT EXISTS log_analysis_db.nginx_log_result
(ip varchar(15) comment 'ip地址',pv int comment 'pv数',uv int comment 'uv数',area varchar(50) comment '用户所在区域,根据ip解析',status_code varchar(10) comment '请求响应状态码',device_os varchar(50) comment '设备系统,从ua中提取手机或电脑使用的系统',device_brand varchar(50) comment ',从ua中提取手机或电脑的品牌',browser_name varchar(50) comment '电脑和手机,使用浏览器,记录浏览器简称',first_access_time datetime comment 'nginx日志记录首次访问时间',last_access_time datetime comment 'nginx日志记录末次访问时间'
)
UNIQUE KEY(ip)
DISTRIBUTED BY HASH(ip) BUCKETS 10
sql("replication_num" = "1");
- 完整代码
from pyspark.sql import SparkSession, DataFrame
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StringTypeos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder \.appName("nginx_analysis") \.master("local[*]") \.config("spark.sql.shuffle.partitions", 2) \.getOrCreate()# 2- 数据输入:读取Kafka中的数据init_df = spark.readStream.format("kafka") \.option("kafka.bootstrap.servers", "192.168.88.166:9092") \.option("subscribe", "dwd_nginx_etl_result") \.option("startingOffsets", "earliest") \.load()# 3- 数据处理# 3.1- value字段类型转换type_cast_df = init_df.select(init_df.value.cast(StringType()).alias("value"))# 3.2- 从JSON中提取一个个字段"""json_tuple与get_json_object的区别get_json_object:优点:同时能够解析嵌套的JSON缺点:一次只能得到一个字段json_tuple:优点:一次能得到多个字段缺点:针对嵌套JSON,只能一层层解析"""parse_json_df = type_cast_df.select(F.json_tuple("value","ip","datetime","code","area","os","browser","device")\.alias("ip","datetime","status_code","area","device_os","browser_name","device_brand"))# 3.3- 指标统计# F.lit(1)生成一列,每行的数据内容一样,全都是1。与F.col函数作用类似# 因为类似area的这些字段的数据类型是字符串,聚合函数没有太适合的,因此使用firstresult_df = parse_json_df.groupBy("ip").agg(F.count("ip").alias("pv"),F.lit(1).alias("uv"),F.first("area").alias("area"),F.first("status_code").alias("status_code"),F.first("device_os").alias("device_os"),F.first("device_brand").alias("device_brand"),F.first("browser_name").alias("browser_name"),F.min("datetime").alias("first_access_time"),F.max("datetime").alias("last_access_time"))# 4- 数据输出# 4.1- 输出到Dorisdef write_2_doris(batch_df:DataFrame, batch_id):"""将DataFrame输出到Doris中:param batch_df: 有界的DataFrame:param batch_id: 批次ID:return:"""# 注意:一般先用append。如果明确知道要怎么做,那可以再使用overwritebatch_df.write.jdbc(url="jdbc:mysql://192.168.88.166:9030/log_analysis_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",table="nginx_log_result",mode="append",sql={ 'user' : 'root', 'password' : '123456' })result_df.writeStream.foreachBatch(write_2_doris).outputMode("update").start()# 4.2- 输出到控制台result_df.writeStream.format("console").outputMode("update").start().awaitTermination()
- 结果数据核对
./kafka-console-producer.sh --broker-list up01:9092 --topic dwd_nginx_etl_result
{"ip":"210.27.147.62","cookie":"- - [","datetime":"2024-11-14 11:11:11","t1":"] \"","request":"GET","url":"/search.html","protocol":"HTTP/1.1","code":"401","sendbytes":"58840","refferer":"https://www.douyin.com/goods-recommend/search.html?keyword=美味\"","useragent":"Mozilla/5.0 (Linux; U; Android 9; zh-CN; MI 9 Build/PKQ1.181121.001) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 UCBrowser/13.1.6.1096 Mobile Safari/537.36","proxyaddr":"-","area":"广东省广州市","os":"Android","browser":"UC Browser","device":"XiaoMi MI 9","dt":"2024-11-12"}
1、尝试进行用户行为日志的数据ETL、指标统计
提示:核心是如何解析JSON格式,得到一个个独立的字段
相关文章:
day09_实时类标签/指标
文章目录 day09_实时类标签/指标一、日志数据实时采集2、Flume简介2.3 项目日志数据采集Flume配置2.3.1 涉及的Flume组件和参数2.3.2 Nginx日志采集2.3.3 用户行为日志采集 二、Nginx日志数据统计1、日志格式说明2、数据ETL2.1 日志抽取2.1.1 正则表达式2.1.2 基于Spark实现Ngi…...
AWTK-WEB 快速入门(4) - JS Http 应用程序
XMLHttpRequest 改变了 Web 应用程序与服务器交换数据的方式,fetch 是 XMLHttpRequest 继任者,具有更简洁的语法和更好的 Promise 集成。本文介绍一下如何使用 JS 语言开发 AWTK-WEB 应用程序,并用 fetch 访问远程数据。 用 AWTK Designer 新…...
github不翻墙就可以访问
目录 简介资料准备windows平台设置下载运行git设置firefox设置 ubuntu平台设置下载启动服务设置系统代理git设置firefox设置证书 注意事项 简介 由于github访问不稳定,严重影响了国内软件开发,在网上搜索并验证了一些方法.现在整理出来一个可以正常使用的方法, 在windows和Lin…...
【AI-32】浅显易懂地说一下LangChain
好的!我来用最通俗的方式解释一下 LangChain 是什么,以及它为什么在 AI 开发中如此重要。 一句话理解 LangChain LangChain 是一个帮你快速搭建AI应用的工具箱,它把大型语言模型(如 GPT)和外部数据、计算工具、业务流…...
使用 Nginx 搭建代理服务器(正向代理 HTTPS 网站)指南
在网络应用中,代理服务器是用于中转用户请求和服务端响应的工具。正向代理主要用于客户端与外部服务器之间的访问代理,帮助客户端隐藏其 IP 地址或访问受限资源。本文将详细介绍如何使用 Nginx 搭建正向代理服务器,特别是针对 HTTPS 网站的代…...
Linux学习笔记之进程
进程 进程的定义 进程是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配的基本单位,也是操作系统结构的基础。 例如当QQ程序运行的时候,计算机会先从磁盘读取QQ程序到内存,然后OS管理这个程序,…...
win11 终端乱码导致IDE 各种输出也乱码
因为 win11 终端乱码导致IDE 各种输出也乱码导致作者对此十分头大。所以研究了各种方法。 单独设置终端编码对 HKEY_CURRENT_USER\Console 注册表进行修改对 HKEY_LOCAL_MACHINE\Software\Microsoft\Command Processo 注册表进行修改使用命令[Console]::OutputEncoding [Syst…...
iOS主要知识点梳理回顾-5-运行时方法交换
方法交换可以放在 load 或 initialize 方法中,也可以自己根据时机来空,比如开启某个开关后才需要交换方法。如果是在load中调用,交换工作会在类加载时(程序启动)自动调用;如果是在initialize中调用…...
python后端调用Deep Seek API
python后端调用Deep Seek API 需要依次下载 ●Ollama ●Deepseek R1 LLM模型 ●嵌入模型nomic-embed-text / bge-m3 ●AnythingLLM 参考教程: Deepseek R1打造本地化RAG知识库:安装部署使用详细教程 手把手教你:deepseek R1基于 AnythingLLM API 调用本地…...
Unity开发播放视频
Unity开发播放视频 介绍VideoPlayer原理步骤VideoPlayer动态加载 总结 介绍 原生Unity播放视频VideoPlayer播放视频,这里我没有选择使用插件,还有一个播放视频的插件也可以AVPro Video,这里不过多介绍就说一下原生VideoPlayer的基础用法。 …...
【练习】图论
F. Friendly Group 图中选择一个点-1 边两端点都选择1 边一个端点选择-1 添加链接描述 #include<iostream> using namespace std; #include<vector> #include<cstring> const int N300010; int n,m; vector<int> G[N]; int temp1,temp2; bool vis[N…...
gis风场加载
https://gitee.com/openGPL json数据格式使用v1 “sakitam-gis/mapbox-wind”: “1.1.2” import { WindLayer } from "sakitam-gis/mapbox-wind"; windLayer new WindLayer("wind", res.data, {windOptions: {// colorScale: (m) > {// // consol…...
大疆无人机需要的kml文件如何制作kml导出(大疆KML文件)
大疆无人机需要的轨迹kml文件,是一种专门的格式,这个kml里面只有轨迹点,其它的属性信息都不需要。 BigemapPro提供了专门的大疆格式输出, 软件这里下载 www.bigemap.com 安装后,kml导入如下图: 然后选择…...
【CXX】2 CXX blobstore客户端说明
本示例演示了一个调用blobstore服务的C客户端的Rust应用程序。事实上,我们会看到两个方向的调用:Rust到C以及C到Rust。对于您自己的用例,您可能只需要其中一个方向。 示例中涉及的所有代码都显示在此页面上,但它也以可运行的形式提…...
图数据库neo4j进阶(一):csv文件导入节点及关系
CSV 一、load csv二、neo4j-admin import<一>、导入入口<二>、文件准备<三>、命令详解 一、load csv 在neo4j Browser中使用Cypher语句LOAD CSV,对于数据量比较大的情况,建议先运行create constraint语句来生成约束 create constraint for (s:Student) req…...
3.3 学习UVM中的uvm_driver 类分为几步?
文章目录 前言1. 定义2. 核心功能3. 适用场景4. 使用方法5. 完整代码示例5.1 事务类定义5.2 Driver 类定义5.3 Sequencer 类定义5.4 测试平台 6. 代码说明7. 总结 前言 以下是关于 UVM 中 uvm_driver 的详细解释、核心功能、适用场景、使用方法以及一个完整的代码示例ÿ…...
Docker 常用命令基础详解(一)
一、Docker 初相识 在当今数字化时代,软件开发和部署的效率与灵活性成为了关键因素。Docker,作为一款开源的应用容器引擎,犹如一颗璀璨的明星,照亮了软件开发与部署的道路,为开发者们带来了前所未有的便利。它就像是一…...
机器学习之AAE(Adversarial Autoencoder,对抗自编码器)生成数据样本
对抗自编码器(Adversarial Autoencoder,AAE)是一种结合了自编码器(Autoencoder)和生成对抗网络(GAN)的方法,用于生成新数据样本。它的目标是学习到数据的潜在分布,并通过生成对抗训练来优化潜在空间,使其接近一个已知的简单分布(例如,高斯分布)。 AAE的结构和训练…...
用大模型学大模型03-数学基础 概率论 最大似然估计(MLE)最大后验估计(MAP)
https://metaso.cn/s/r4kq4Ni 什么是最大似然估计(MLE)最大后验估计(MAP)?深度学习中如何应用,举例说明。 好的,我现在需要回答关于最大似然估计(MLE)和最大后验估计&…...
名词解释:npm,cnpm,yarn,vite,vue,electron
1. npm (Node Package Manager) 读音: “N-P-M” 或者直接读作 “npm”。 npm 是 Node.js 的官方包管理器,用于安装、发布和管理 JavaScript 软件包。它允许开发者轻松地共享代码,并且可以通过命令行工具来管理依赖关系。通过 npm init 命令可以交互式…...
ESXi安装【真机和虚拟机】(超详细)
项目简介: ESXi(Elastic Sky X Integrated)是VMware公司开发的一种裸机虚拟化管理程序,允许用户在单一物理服务器上运行多个虚拟机(VM)。它直接安装在服务器硬件上,而不是操作系统之上ÿ…...
每日一题——矩阵最长递增路径
矩阵最长递增路径问题 题目描述数据范围:进阶要求:示例示例 1示例 2 题解思路算法步骤:代码实现代码解释复杂度分析总结 题目描述 给定一个 n 行 m 列的矩阵 matrix,矩阵内所有数均为非负整数。你需要在矩阵中找到一条最长路径&a…...
设置ollama接口能外部访问
为了配置Ollama以允许外网访问,你可以按照以下步骤进行操作: 确认Ollama服务已正确安装并运行: 使用以下命令检查Ollama服务的状态: bash Copy Code systemctl status ollama如果服务未运行,使用以下命令启动它&…...
TOML介绍
0 Preface/Foreword TOML,一种配置文件格式。Toms Obvious Minimal Language. 1 介绍 TOML: Toms Obvious Minimal Language,“显而易见的最小化语言 ” JSON:不支持注释 YAML:过于复杂...
macOS部署DeepSeek-r1
好奇,跟着网友们的操作试了一下 网上方案很多,主要参考的是这篇 DeepSeek 接入 PyCharm,轻松助力编程_pycharm deepseek-CSDN博客 方案是:PyCharm CodeGPT插件 DeepSeek-r1:1.5b 假设已经安装好了PyCharm PyCharm: the Pyth…...
从云原生到 AI 原生,谈谈我经历的网关发展历程和趋势
作者:谢吉宝(唐三) 编者按: 云原生 API 网关系列教程即将推出,欢迎文末查看教程内容。本文整理自阿里云智能集团资深技术专家,云原生产品线中间件负责人谢吉宝(唐三) 在云栖大会的精…...
京东 旋转验证码 分析
声明: 本文章中所有内容仅供学习交流使用,不用于其他任何目的,抓包内容、敏感网址、数据接口等均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关! 逆向分析 使用的第三方接码平台识别…...
R18 XR L1 enhancement
这篇是R18 XR的最后一部分,主要是L1方面的增强。 这部分增强大概的背景如下。 一些 XR 应用(例如增强现实)不仅在 DL 方向需要高数据速率,在 UL 方向也需要高数据速率。如果应用需要在 UL 方向传输视频流量,则 UL 中支持的 XR 用户数量可能非常有限。因此,增加有限的时间…...
利用Java爬虫按图搜索1688商品(拍立淘):实战案例指南
在电商领域,按图搜索功能(如1688的“拍立淘”)为用户提供了更直观、便捷的购物体验。通过上传图片,用户可以快速找到与图片相似的商品。本文将详细介绍如何利用Java爬虫技术实现按图搜索1688商品,并获取其详情数据。 …...
算法-计算字符的最短距离
力扣题目:821. 字符的最短距离 - 力扣(LeetCode) 给你一个字符串 s 和一个字符 c ,且 c 是 s 中出现过的字符。 返回一个整数数组 answer ,其中 answer.length s.length 且 answer[i] 是 s 中从下标 i 到离它 最近 …...
sqlilabs--小实验
一、先盲注判断 ?id1 and sleep(2)-- 如果发现页面存在注点,使用时间盲注脚本进行注入 import requestsdef inject_database(url):name for i in range(1, 20): # 假设数据库名称长度不超过20low 48 # 0high 122 # zmiddle (low high) // 2while low &l…...
【JavaScript爬虫记录】记录一下使用JavaScript爬取m4s流视频过程(内含ffmpeg合并)
前言 前段时间发现了一个很喜欢的视频,可惜网站不让下载,简单看了一下视频是被切片成m4s格式的流文件,初步想法是将所有的流文件下载下来然后使用ffmpeg合并成一个完整的mp4,于是写了一段脚本来实现一下,电脑没有配python环境,所以使用JavaScript实现,合并功能需要安装ffmpeg,…...
腿足机器人之一- 机械与电子组件概览
腿足机器人之一机械与电子组件概览 引言机械组件骨架材料关节设计关节机械组件轴承(ings)连杆(Linkages)齿轮(Gears) 电气组件电机控制器传感器 四足机器人设计双足机器人设计波士顿Atlas机器人 引言 腿足…...
利用二分法+布尔盲注、时间盲注进行sql注入
一、布尔盲注: import requestsdef binary_search_character(url, query, index, low32, high127):while low < high:mid (low high 1) // 2payload f"1 AND ASCII(SUBSTRING(({query}),{index},1)) > {mid} -- "res {"id": payloa…...
本地部署DeepSeek Nodejs版
目录 1.下载 Ollama 2.下载DeepSeek模型 3.下载 ollama.js 1.下载 Ollama https://ollama.com/ 下载之后点击安装,等待安装成功后,打开cmd窗口,输入以下指令: ollama -v 如果显示了版本号,则代表已经下载成功了。…...
mapbox进阶,添加绘图扩展插件,绘制任意方向矩形
👨⚕️ 主页: gis分享者 👨⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨⚕️ 收录于专栏:mapbox 从入门到精通 文章目录 一、🍀前言1.1 ☘️mapboxgl.Map 地图对象1.2 ☘️mapboxgl.Map style属性1.3 ☘️MapboxDraw 绘图控件二、🍀添加绘图扩…...
哈希槽算法与一致性哈希算法比较
Redis 集群模式使用的 哈希槽(Hash Slot) 算法与传统的 一致性哈希(Consistent Hashing) 算法在数据分布和节点管理上有显著的区别。以下是两者的详细比较: 1. Redis 哈希槽算法 1.1 基本原理 Redis 集群将整个数据集…...
DeepSeek+Excel 效率翻倍
2025年初,DeepSeek以惊人的效率突破技术壁垒,用极低的成本实现了与行业顶尖AI相媲美的性能,瞬间成为全球科技领域的热门话题。 那么AI工具的普及将如何改变我们的工作方式?Excel会被取代吗? 今天,珠珠带你…...
【个人开发】cuda12.6安装vllm安装实践【内含踩坑经验】
1. 背景 vLLM是一个快速且易于使用的LLM推理和服务库。企业级应用比较普遍,尝试安装相关环境,尝试使用。 2. 环境 模块版本python3.10CUDA12.6torch2.5.1xformers0.0.28.post3flash_attn2.7.4vllm0.6.4.post1 2.1 安装flash_attn 具体选择什么版本&…...
Prompt通用技巧
Prompt 的典型构成 角色:给 AI定义一个最匹配任务的角色,比如:「你是一位软件工程师」「你是一位小学老师」指示:对任务进行描述上下文: 给出与任务相关的其它背景信息(尤其在多轮交互中)。例子 : 必要时给出举例,学术中称为 one-shot learning,few-sho…...
【R语言】方差分析
一、基本术语 在R语言以及更广泛的统计学领域中,方差分析(ANOVA,即Analysis of Variance)是一种用于比较两个或更多组数据的均值是否存在显著差异的统计方法。可以使用aov()函数或其他相关函数(如anova())…...
XSS 常用标签及绕过姿势总结
XSS 常用标签及绕过姿势总结 一、xss 常见标签语句 0x01. 标签 <a href"javascript:alert(1)">test</a> <a href"x" onfocus"alert(xss);" autofocus"">xss</a> <a href"x" onclickeval(&quo…...
haproxy详解笔记
一、概述 HAProxy(High Availability Proxy)是一款开源的高性能 TCP/HTTP 负载均衡器和代理服务器,用于将大量并发连接分发到多个服务器上,从而提高系统的可用性和负载能力。它支持多种负载均衡算法,能够根据服务器的…...
「软件设计模式」工厂方法模式 vs 抽象工厂模式
前言 在软件工程领域,设计模式是解决常见问题的经典方案。本文将深入探讨两种创建型模式:工厂方法模式和抽象工厂模式,通过理论解析与实战代码示例,帮助开发者掌握这两种模式的精髓。 一、工厂方法模式(Factory Metho…...
Flutter_学习记录_数据更新的学习
Flutter 如果界面上有数据更新时,目前学习到的有3种: 第一种: 直接用 StatefulWidget组件,然后当数据更新时,调用setState的方法更新数据,页面上的数据会直接更新;第二种: 用 State…...
淘宝订单列表Fragment转场动画卡顿解决方案
如何应对产品形态与产品节奏相对确定情况下转变为『在业务需求与产品形态高度不确定性的情况下,如何实现业务交付时间与交付质量的确定性』。我们希望通过混合架构(Native 业务容器 Weex 2.0)作为未来交易终端架构的重要演进方向,…...
【状态空间方程】对于状态空间方程矩阵D≠0时的状态反馈与滑模控制
又到新的一年啦,2025新年快乐~。前几个月都没更新,主要还是因为不能把项目上的私密工作写进去,所以暂时没啥可写的。最近在山里实习,突然想起年前遗留了个问题一直没解决,没想到这两天在deepseek的加持下很快解决了&am…...
优雅的git log输出内容更加醒目
执行命令 git config --global alias.lg "log --graph --prettyformat:%C(red)%h%C(reset) - %C(yellow)%d%C(reset) %C(magenta)<%an>%C(reset) %C(cyan)(%ad)%C(reset) %C(green)%s%C(reset) (%cr) --abbrev-commit --dateformat:%Y-%m-%d %H:%M:%S"...
PySide(PyQT)使用场景(QGraphicsScene)进行动态标注的一个demo
用以标注图像的一个基本框架demo import sys from PySide6.QtWidgets import QApplication, QGraphicsView, QGraphicsScene, QMainWindow, QLabel, QGraphicsPixmapItem from PySide6.QtGui import QPixmap, QPainter, QTransform from PySide6.QtCore import Qt, QPointF, S…...
LeetCode每日精进:876.链表的中间结点
题目链接:876.链表的中间结点 题目描述: 给你单链表的头结点 head ,请你找出并返回链表的中间结点。 如果有两个中间结点,则返回第二个中间结点。 示例 1: 输入:head [1,2,3,4,5] 输出:[3,4,5…...