Azure Delta Lake、Databricks和Event Hubs实现实时欺诈检测
设计Azure云架构方案实现Azure Delta Lake和Azure Databricks,结合 Azure Event Hubs/Kafka 摄入实时数据,通过 Delta Lake 实现 Exactly-Once 语义,实时欺诈检测(流数据写入 Delta Lake,批处理模型实时更新),以及具体实现的详细步骤和关键PySpark代码。
完整实现代码需要根据具体数据格式和业务规则进行调整,建议通过Databricks Repos进行CI/CD管理。
一、架构设计
- 数据摄入层:Azure Event Hubs/Kafka接收实时交易数据
- 流处理层:Databricks Structured Streaming处理实时数据流
- 存储层:Delta Lake实现ACID事务和版本控制
- 模型服务层:MLflow模型注册+批处理模型更新
- 计算层:Databricks自动伸缩集群
二、关键实现步骤
1. 环境准备
# 创建Azure资源
az eventhubs namespace create --name fraud-detection-eh --resource-group myRG --location eastus
az storage account create --name deltalakedemo --resource-group myRG --location eastus
2. 实时数据摄入(PySpark)
from pyspark.sql.streaming import StreamingQueryevent_hub_conf = {"eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt("<CONNECTION_STRING>")
}raw_stream = (spark.readStream.format("eventhubs").options(**event_hub_conf).load())# Schema示例
from pyspark.sql.types import *
transaction_schema = StructType([StructField("transaction_id", StringType()),StructField("user_id", StringType()),StructField("amount", DoubleType()),StructField("timestamp", TimestampType()),StructField("location", StringType())
])parsed_stream = raw_stream.select(from_json(col("body").cast("string"), transaction_schema).alias("data")
).select("data.*")
3. Exactly-Once实现
delta_path = "abfss://delta@deltalakedemo.dfs.core.windows.net/transactions"
checkpoint_path = "/delta/checkpoints/fraud_detection"(parsed_stream.writeStream.format("delta").outputMode("append").option("checkpointLocation", checkpoint_path).trigger(processingTime="10 seconds").start(delta_path))
4. 实时欺诈检测
from pyspark.ml import PipelineModel# 加载预训练模型
model = PipelineModel.load("dbfs:/models/fraud_detection/v1")def predict_batch(df, epoch_id):# 去重处理df = df.dropDuplicates(["transaction_id"])# 特征工程df = feature_engineering(df)# 模型预测predictions = model.transform(df)# 写入警报表(predictions.filter(col("prediction") == 1).write.format("delta").mode("append").saveAsTable("fraud_alerts"))return dfstreaming_query = (parsed_stream.writeStream.foreachBatch(predict_batch).trigger(processingTime="30 seconds").start())
5. 模型更新(批处理)
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorAssemblerdef retrain_model():# 读取增量数据latest_data = spark.read.format("delta").load(delta_path)# 特征工程train_df = feature_engineering(latest_data)# 定义模型assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")gbt = GBTClassifier(maxIter=10)pipeline = Pipeline(stages=[assembler, gbt])# 训练model = pipeline.fit(train_df)# 版本控制model.write().overwrite().save("dbfs:/models/fraud_detection/v2")# 注册到MLflowmlflow.spark.log_model(model, "fraud_detection", registered_model_name="Fraud_GBT")# 每天调度执行
spark.sparkContext.addPyFile("retrain.py")
dbutils.library.restartPython()
6. 动态模型加载(流处理增强)
model_version = 1 # 初始版本def predict_batch(df, epoch_id):global model_versiontry:# 检查模型更新latest_model = get_latest_model_version()if latest_model > model_version:model = PipelineModel.load(f"dbfs:/models/fraud_detection/v{latest_model}")model_version = latest_modelexcept:pass# 剩余预测逻辑保持不变
三、关键技术点
-
Exactly-Once保障:
- 通过Delta Lake事务日志保证原子性写入
- 检查点机制+唯一transaction_id去重
- 使用Event Hubs的epoch机制避免重复消费
-
流批统一架构:
- 使用Delta Time Travel实现增量处理
latest_data = spark.read.format("delta") \.option("timestampAsOf", last_processed_time) \.table("transactions")
-
性能优化:
- Z-Order优化加速特征查询
spark.sql("OPTIMIZE fraud_alerts ZORDER BY (user_id)")
- 自动压缩小文件
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
-
监控告警:
display(streaming_query.lastProgress)
四、部署建议
- 使用Databricks Jobs调度批处理作业
- 通过Cluster Policy控制计算资源
- 启用Delta Lake的Change Data Feed
- 使用Azure Monitor进行全链路监控
五、扩展建议
- 添加特征存储(Feature Store)
- 实现模型A/B测试
- 集成Azure Synapse进行交互式分析
- 添加实时仪表板(Power BI)
该方案特点:
- 利用Delta Lake的ACID特性保证端到端的Exactly-Once
- 流批统一架构减少维护成本
- 模型热更新机制保证检测实时性
- 自动伸缩能力应对流量波动
相关文章:
Azure Delta Lake、Databricks和Event Hubs实现实时欺诈检测
设计Azure云架构方案实现Azure Delta Lake和Azure Databricks,结合 Azure Event Hubs/Kafka 摄入实时数据,通过 Delta Lake 实现 Exactly-Once 语义,实时欺诈检测(流数据写入 Delta Lake,批处理模型实时更新࿰…...
【从零开始学习计算机科学】软件测试(十)嵌入式系统测试、游戏开发与测试过程、移动应用软件测试 与 云应用软件测试
【从零开始学习计算机科学】软件测试(十)嵌入式系统测试、游戏开发与测试过程、移动应用软件测试 与 云应用软件测试 嵌入式系统测试测试策略及测试流程嵌入式软件测试问题及测试方法嵌入式软件的测试流程游戏开发与测试过程游戏开发与通用软件的开发过程区别游戏测试主要内容…...
C#零基础入门篇(18. 文件操作指南)
## 一、文件操作基础 在C#中,文件操作主要通过System.IO命名空间中的类来实现,例如File、FileStream、FileInfo等。 ## 二、常用文件操作方法 ### (一)文件读取 1. **使用File.ReadAllText方法读取文件内容为字符串** …...
深入探究 JVM 堆的垃圾回收机制(一)— 判活
垃圾回收分为两步:1)判定对象是否存活。2)将“消亡”的对象进行内存回收。 1 判定对象存活 可达性分析算法:通过一系列“GC Roots”对象作为起始节点集,从这些节点开始,根据引用关系向下搜索,…...
SQL优化主要有哪些方式
对经常查询的区分度高的条件字段建立索引,也就是用在where条件里的字段。使用没有建立索引的非主键字段作为条件查询时,会进行全表扫描,因为这个字段的数据分步是不规律的,但是需要避免在频繁更新的字段上建立索引,因为…...
基于Spring Boot的公司资产网站的设计与实现(LW+源码+讲解)
专注于大学生项目实战开发,讲解,毕业答疑辅导,欢迎高校老师/同行前辈交流合作✌。 技术范围:SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容:…...
笔记本电脑关不了机是怎么回事 这有解决方法
在快节奏的现代生活中,笔记本电脑已成为我们工作、学习和娱乐的得力助手。在使用电脑的过程中,笔记本电脑突然关不了机了,怎么回事?下面驱动人生就来讲一讲笔记本电脑不能正常关机的解决方法,有需要的可以来看看。 一、…...
OSPF 协议详解:从概念原理到配置实践的全网互通实现
什么是OSPF OSPF(开放最短路径优先)是由IETF开发的基于链路状态的自治系统内部路由协议,用来代替存在一些问题的RIP协议。与距离矢量协议不同,链路状态路由协议关心网络中链路活接口的状态(包括UP、DOWN、IP地址、掩码…...
【C++】多态
目录 文章目录 前言 一、多态的概念 二、多态的定义及实现 三、重载/重写/隐藏的对比 四、纯虚函数和抽象类 五、多态的原理 总结 前言 本文主要讲述C中的多态,涉及的概念有虚函数、协变、纯虚函数、抽象类、虚表指针和虚函数表等。 一、多态的概念 多态分…...
CentOS 8 停止维护后通过 rpm 包手动安装 docker
根据 Docker官方文档 的指引,进入 Docker rpm 包下载的地址,根据自己系统的架构和具体版本选择对应的路径 这里使用 Index of linux/centos/7/x86_64/stable/ 版本,根据 docker 官方的给出的安装命令选择性的下载对应的 rpm 包 最终使用 yum …...
STT-MRAM CIM 赋能边缘 AI:高性能噪声鲁棒贝叶斯神经网络宏架构详解
引言 近年来,基于卷积神经网络(CNN)和视觉转换器(ViT)的存算一体(CIM)边缘AI设备因其低延迟、高能效、低成本等性能受到越来越广泛的关注。然而,当环境中存在噪声时(例如…...
Performance Hub Active Report
Performance Hub 是 Oracle Enterprise Manager Database Express (EM Express) 中的一项功能,可提供给定时间范围内所有性能数据的新整合视图。用户可以使用 Database Express 页面顶部的时间选择器选择时间范围,详细信息选项卡将…...
小白闯AI:Llama模型Lora中文微调实战
文章目录 0、缘起一、如何对大模型进行微调二、模型微调实战0、准备环境1、准备数据2、模型微调第一步、获取基础的预训练模型第二步:预处理数据集第三步:进行模型微调第四步:将微调后的模型保存到本地4、模型验证5、Ollama集成部署6、结果测试三、使用总结AI是什么?他应该…...
【数学建模】TOPSIS法简介及应用
文章目录 TOPSIS法的基本原理TOPSIS法的基本步骤TOPSIS法的应用总结 在 多目标决策分析中,我们常常需要在多个选择中找到一个最优解。 TOPSIS(Technique for Order Preference by Similarity to Ideal Solution)法是一个广泛应用的决策方法…...
优选算法训练篇08--力扣15.三数之和(难度中等)
目录 1.题目链接:15.三数之和 2.题目描述: 3.解法(排序双指针) 1.题目链接:15.三数之和 2.题目描述: 给你一个整数数组 nums ,判断是否存在三元组 [nums[i], nums[j], nums[k]] 满足 i ! j、i ! k 且 j ! k &…...
【docker】--- 详解 WSL2 中的 Ubuntu 和 Docker Desktop 的区别和关系!
在编程的艺术世界里,代码和灵感需要寻找到最佳的交融点,才能打造出令人为之惊叹的作品。而在这座秋知叶i博客的殿堂里,我们将共同追寻这种完美结合,为未来的世界留下属于我们的独特印记。【WSL 】--- Windows11 迁移 WSL 超详细指南 —— 给室友换一个宿舍! 开发环境一、引…...
RAG 架构地基工程-Retrieval 模块的系统设计分享
目录 一、知识注入的关键前奏——RAG 系统中的检索综述 (一)模块定位:连接语言模型与知识世界的桥梁 (二)核心任务:四大关键问题的协调解法 (三)系统特征:性能、精度…...
解决stm32引脚如果选择输入模式
1. 输入模式分类 STM32的GPIO输入模式主要分为以下四种: 浮空输入(Floating Input / Input Floating) 上拉输入(Input Pull-Up) 下拉输入(Input Pull-Down) 模拟输入(Analog Inp…...
Java 填充 PDF 模版
制作 PDF 模版 安装 OnlyOffice 从 OnlyOffice 官网下载 OnlyOffice Desktop,安装过程很简单,一路下一步即可。用 OnlyOffice 制作 PDF 模版(表单) 使用 OnlyOffice 表单设计器,制作表单,如下图 注意命名…...
Maven安装与环境配置
首先我们先介绍一些关于Maven的知识,如果着急直接看下面的安装教程。 目录 Maven介绍 Maven模型 Maven仓库 Maven安装 下载 安装步骤 Maven介绍 Apache Maven是一个项目管理和构建工具,它基于项目对象模型(Project Object Model , 简称: POM)的概念…...
鸿蒙HarmonyOS NEXT应用崩溃分析及修复
鸿蒙HarmonyOS NEXT应用崩溃分析及修复 如何保证应用的健壮性,其中一个指标就是看崩溃率,如何降低崩溃率,就需要知道存在哪些崩溃,然后对症下药,解决崩溃。那么鸿蒙应用中存在哪些崩溃类型呢?又改如何解决…...
基于PySide6的CATIA自动化工具开发实战——空几何体批量清理系统
一、功能概述 本工具通过PySide6构建用户界面,结合PyCATIA库实现CATIA V5的自动化操作,提供两大核心功能: 空几何体清理:智能识别并删除零件文档中的无内容几何体(Bodies)空几何图形集清理࿱…...
【CSS文字渐变动画】
CSS文字渐变动画 HTML代码CSS代码效果图 HTML代码 <div class"title"><h1>今天是春分</h1><p>正是春天到来的日子,花都开了,小鸟也飞回来了,大山也绿了起来,空气也有点嫩嫩的气息了</p>…...
Mysql深分页的解决方案
在数据量非常大的情况下,深分页查询则变得很常见,深分页会导致MySQL需要扫描大量前面的数据,从而效率低下。例如,使用LIMIT 100000, 10时,MySQL需要扫描前100000条数据才能找到第10000页的数据。 在MySQL中解决深分页…...
使用pycel将Excel移植到Python
1.适用需求 有些工作可能长期适用excel来进行公式计算,当需要把工作流程转换为可视化界面时,开发人员不懂专业逻辑,手动摸索公式很大可能出错,而且费时费力 2.可用工具及缺点 pandas 方便进行数据处理,支持各种格…...
Apache Tomcat CVE-2025-24813 安全漏洞
Apache Tomcat CVE-2025-24813被广泛利用,但是他必须要满足两个点: 1.被广泛的使用,并且部署在服务器中。 2.漏洞必须依赖在服务器中的配置。 并且漏洞补丁已经发布。 漏洞攻击方式: CVE-2025-24813 是 Apache Tomcat 部分 PUT…...
Spring常用注解汇总
1. IOC容器与Bean管理 注解说明示例Component通用注解,标记类为Spring Bean Component public class MyService { ... } Controller标记Web控制器(应用在MVC的控制层) Controller public class UserController { ... } Service标记业务逻辑层…...
【CXX-Qt】2.1.1 为 WebAssembly 构建
CXX-Qt 及其编写的应用程序可以编译为 WebAssembly,但存在一些限制。以下是关于如何为 WASM 目标构建的详细说明。 你需要安装 Qt for WebAssembly。下一篇将展示已测试的版本。 此外,如果尚未完成,请从此处克隆 emsdk git 仓库。 使用正确…...
MySql创建分区表并且按月分区
前言 在mysql中,按月份分区,再使用分区字段时间来查询数据将会很快,因为这样只需要扫描指定的分区。因此,在处理大量数据时,使用分区表是一个非常好的选择。 1、创建表,并使用RANGE COLUMNS分区 按创建时间…...
YOLO-UniOW: 高效通用开放世界目标检测模型【附论文与源码】
《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发】2.【车牌识别与自动收费管理系统开发】3.【手势识别系统开发】4.【人脸面部活体检测系统开发】5.【图片风格快速迁移软件开发】6.【人脸表表情识别系统】7.【…...
Flink实战教程从入门到精通(基础篇)(一)Flink简介
目录 一、Flink 二、谁在用Flink? 三、Flink特点 1、批流统一 2、性能卓越 3、规模计算 4、生态兼容性 5、高容错性 四、Flink介绍 1、无界数据 2、有界数据流 3、有状态流处理 五、Flink的发展历史 六、Flink的核心特点 1、高吞吐和低延迟 2、结果的准确性 …...
C/C++编程:Openssl使用 Windows安装包32和64位 RSA加密/解密、AES-GCM加密/解密以及ECDSA签名/验证示例
Openssl的头文件和库 C/C使用openssl,需要openssl的头文件和库,这些都在安装包里。从http://slproweb.com/products/Win32OpenSSL.html下载已经编译好的包含 lib 和 include 文件的安装包。 也可以从官网下载源码,再编译成安装包࿰…...
Es6新特性
1. let 和 const 概念 let:用于声明 块级作用域 的变量。const:用于声明 块级作用域 的常量,声明后不可重新赋值(但可以修改对象的属性或数组的内容)。 原理 JavaScript 在 ES5 中只有全局作用域和函数作用域&…...
大数据学习(80)-数仓分层
🍋🍋大数据学习🍋🍋 🔥系列专栏: 👑哲学语录: 用力所能及,改变世界。 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言📝支持一…...
StarRocks 升级注意事项
前段时间升级了生产环境的 StarRocks,从 3.3.3 升级到了 3.3.9,期间还是踩了不少坑所以在这里记录下。 因为我们的集群使用的是存算分离的版本,也是使用官方提供的 operator 部署在 kubernetes 里的,所以没法按照官方的流程进入虚…...
Java 大视界 -- Java 大数据分布式计算中的通信优化与网络拓扑设计(145)
💖亲爱的朋友们,热烈欢迎来到 青云交的博客!能与诸位在此相逢,我倍感荣幸。在这飞速更迭的时代,我们都渴望一方心灵净土,而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识,也…...
LabVIEW软件长时间运行导致蓝屏问题排查与优化
计算机在长时间运行LabVIEW或其他软件后出现蓝屏(BSOD),通常由硬件资源耗尽、驱动冲突或软件内存泄漏引发。本文提供从日志分析到根本性优化的全流程解决方案,确保系统稳定运行。 一、蓝屏记录查询方法 1. 查看Windows事件日志 操…...
【机密计算顶会解读】11:ACAI——使用 Arm 机密计算架构保护加速器执行
导读:本文介绍ACAI,其构建一个基于CCA的解决方案,使得机密虚拟机能够安全地使用加速器,同时保持与现有应用程序的兼容性和安全性,能够实现对加速器的安全访问。 原文链接:ACAI: Protecting Accelerator Ex…...
【WRF模拟】WPS预处理设置生成文件地址
目录 WPS 运行 geogrid.exe在 namelist.wps 中指定 geogrid.exe 输出路径WPS 运行 ungrid.exe方法 1:在 namelist.wps 中指定输出路径方法 2:手动移动 FILE:* 文件方法 3:使用环境变量 WPS_UNGRIB_OUTPUT(不推荐)另:设置文件链接地址WPS 运行 metgrid.exe方法 1:在 name…...
Midjourney使用教程—2.作品修改
当您已生成第一张Midjourney图像的时候,接下来该做什么?了解我们用于修改图像的工具!使用 Midjourney 制作图像后,您的创意之旅就不会止步于此。您可以使用各种工具来修改和增强图像。 一、放大操作 Midjourney每次会根据提示词…...
基于 ABAP RESTful 应用程序编程模型开发 OData V4 服务
一、概念 以个人图书管理为例,创建一个ABAP RESTful 应用程序编程模型项目。最终要实现的效果: 用于管理书籍的程序。读取、修改和删除书籍。 二、Data Model-数据模型 2.1 创建项目基础数据库表 首先,创建一个图书相关的表,点…...
微信小程序登陆之反向代理
一.背景 在互联网架构中,反向代理是连接客户端与后端服务的核心组件。它的核心价值在于: 安全性:隐藏内部服务细节,防止直接暴露到公网。 负载均衡:分散请求到多个后端实例,提升吞吐量。 SSL终止&#x…...
[解决] PDF转图片,中文乱码或显示方框的解决方案
在Java开发中,将PDF文件转换为图片是一项常见的需求,但过程中可能会遇到中文乱码或显示方框的问题。本文将深入探讨这一问题,并提供详细的解决方案,帮助开发者顺利地完成PDF到图片的转换。 一、问题现象 在使用Java库(如Apache PDFBox)将PDF转换为图片时,如果PDF文件中…...
面试康复训练-SQL语句
一,数据库操作 1查看所有库 show databases; --查看所有库2使用数据库 use 数据库名; --使用数据库; 3查看当前使用数据库 select database(); --查看当前使用的数据库 4 创建数据库 create databse 数据库名 charsetutf8; --创建数据库 5删…...
经典面试题:C/C++中static关键字的三大核心作用与实战应用
一、修饰局部变量:改变生命周期,保留跨调用状态 核心作用: 延长生命周期:将局部变量从栈区移至静态存储区(数据段或BSS段),生命周期与程序一致保留状态:变量在函数多次调用间保…...
Linux固定IP方法(RedHat+Net模式)
1、查看当前网关 ip route | grep default 2、配置静态IP 双击重启 3、验证...
JVM 学习前置知识
JVM 学习前置知识 Java 开发环境层次结构解析 下图展示了 Java 开发环境的层级关系及其核心组件,从底层操作系统到上层开发工具,逐步构建完整的开发与运行环境: 1. 操作系统(Windows, MacOS, Linux, Solaris) 作用&…...
数据结构---图的深度优先遍历(DFS)
一、与树的深度优先遍历之间的联系 1.类似于树的先根遍历。 递归访问各个结点: 2.图的深度优先遍历 先设置一个数组,初始值全部设置为false,先访问一个结点,在用一个循环,依次检查和这个结点相邻的其他结点,…...
QPrintDialog弹出慢的问题
开发环境 操作系统: openkylin2qt版本 : 5.15.10排查过程 首先看下问题的现象, 问题现象 复现问题的demo很简单,只能是从跟踪qt代码方面入手 void MainWindow::on_pushButton_clicked(){QPrinter printer;QPrintDialog dialog(&printer,this);dialog.exec();} 现在需要找一…...
QT-LINUX-Bluetooth蓝牙开发
BlueToothAPI QT-BlueToothApi Qt Bluetooth 6.8.2 官方提供的蓝牙API不支持linux。 D-Bus的API实现蓝牙 确保系统中安装了 BlueZ(版本需≥5.56),并且 Qt 已正确安装并配置了 D-Bus 支持。 默默看了下自己的版本.....D-BUS的API也不支持。 在 D-Bus 中,org 目录是 D-Bus…...