使用FastAPI和Apache Flink构建跨环境数据管道
系统概述
本文介绍如何使用FastAPI微服务、Apache Flink和AWS ElastiCache Redis构建一个可扩展的数据管道,实现本地Apache Hive数据仓库与AWS云上Redis之间的数据交互。
该架构通过FastAPI提供RESTful接口,Apache Flink处理数据流,实现了本地Hive与云上Redis的高效数据交互。部署时需特别注意网络配置和安全设置,确保各组件间通信顺畅。
架构设计
系统架构分为三个主要组件:
+-------------------+ +-------------------+ +-------------------+
| 本地环境 | | Apache Flink | | AWS环境 |
| Apache Hive数据仓库 | <---> | 流处理引擎 | <---> | ElastiCache Redis |
+-------------------+ +-------------------+ +-------------------+
详细设计
1. FastAPI微服务
作为API层,提供与Redis交互的端点:
关键组件:
- 使用
aioredis
实现异步Redis操作 - 提供三种核心端点:GET/POST/DELETE
示例代码:
from fastapi import FastAPI, HTTPException
import aioredis
import jsonapp = FastAPI()
REDIS_URL = "redis://your-elasticache-endpoint:6379"
redis = aioredis.from_url(REDIS_URL, encoding="utf-8", decode_responses=True)@app.get("/data/{key}")
async def get_data(key: str):value = await redis.get(key)if value is None:raise HTTPException(status_code=404, detail="Item not found")return json.loads(value)@app.post("/data/{key}")
async def set_data(key: str, value: dict):await redis.set(key, json.dumps(value))return {"message": "Data stored successfully"}@app.delete("/data/{key}")
async def delete_data(key: str):await redis.delete(key)return {"message": "Data deleted successfully"}
部署方式:
- 使用Uvicorn在EC2实例上运行
- 或通过AWS Elastic Beanstalk部署
- 配置安全组开放8000端口
- 使用AWS Secrets Manager管理Redis凭证
2. Apache Flink流处理器
功能:
- 从本地Hive数据仓库读取数据
- 处理后写入AWS ElastiCache Redis
关键组件:
- Hive Catalog配置
- Redis Sink连接器
示例代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
HiveCatalog hive = new HiveCatalog("my_catalog", "default", hiveConf);
env.addSource(new FlinkHiveSource(...)).addSink(new RedisSink<>(new RedisSinkFunction(...)));
env.execute("Flink Streaming Job");
部署方式:
- 使用Amazon Kinesis Data Analytics
- 或在EC2上自管理
- 配置网络访问权限
- 建议使用AWS Direct Connect确保安全连接
部署步骤
FastAPI部署
- 设置EC2实例或Elastic Beanstalk环境
- 安装依赖:
pip install fastapi aioredis uvicorn
- 运行应用:
uvicorn main:app --host 0.0.0.0 --port 8000
Flink部署
- 在AWS上设置Flink环境
- 配置Hive Catalog和Redis Sink
- 提交并监控Flink作业
Redis配置
- 在AWS创建ElastiCache Redis集群
- 配置安全组和VPC设置
测试用例
FastAPI端点测试
- 测试GET /data/{key}(存在/不存在的键)
- 测试POST /data/{key}(有效/无效数据)
- 测试DELETE /data/{key}(存在/不存在的键)
Flink流处理测试
验证数据能正确从Hive读取
关键Python代码
# FastAPI主程序
from fastapi import FastAPI, HTTPException
import aioredis
import jsonapp = FastAPI()
REDIS_URL = "redis://your-elasticache-endpoint:6379"
redis = aioredis.from_url(REDIS_URL, encoding="utf-8", decode_responses=True)@app.get("/data/{key}")
async def get_data(key: str):value = await redis.get(key)if value is None:raise HTTPException(status_code=404, detail="Item not found")return json.loads(value)@app.post("/data/{key}")
async def set_data(key: str, value: dict):await redis.set(key, json.dumps(value))return {"message": "Data stored successfully"}@app.delete("/data/{key}")
async def delete_data(key: str):await redis.delete(key)return {"message": "Data deleted successfully"}
相关文章:
使用FastAPI和Apache Flink构建跨环境数据管道
系统概述 本文介绍如何使用FastAPI微服务、Apache Flink和AWS ElastiCache Redis构建一个可扩展的数据管道,实现本地Apache Hive数据仓库与AWS云上Redis之间的数据交互。 该架构通过FastAPI提供RESTful接口,Apache Flink处理数据流,实现了本…...
解决 SQL Server 2008 导入 Excel 表卡在“正在初始化数据流”问题
在使用 SQL Server 2008 导入 Excel 表时,可能会遇到卡在“正在初始化数据流”这一令人困扰的情况。笔者近期也遭遇了同样的问题,尝试了多种常规方法均未解决,最终通过特定命令成功化解难题,在此分享解决过程与经验。 一、问题描…...
【Linux系统】从零开始构建简易 Shell:从输入处理到命令执行的深度剖析
文章目录 前言一、打印命令行提示符代码功能概述 二、读取键盘输入的指令2.1 为什么不继续使用scanf()而换成了fgets()?2.2 调试输出的意义2.3 为什么需要去掉换行符? 三、指令切割补充知识: strtok 的函数原型 四、普通命令的执行代码功能概…...
SSRF服务端请求伪造
SSRF:服务端请求伪造 危害:任意文件读取、任意服务探测(通过端口来探测) 例:探测3306端口,看mysql服务是否开启,再通过文件读取,获得mysql配置文件 例:当我们点击链接…...
LVGL的三层屏幕结构
文章目录 🌟 LVGL 的三层屏幕架构1. **Top Layer(顶层)**2. **System Layer(系统层)**3. **Active Screen(当前屏幕层)** 🧠 总结对比🔍 整体作用✅ 普通屏幕层对象&…...
使用互斥锁保护临界
Linux线程互斥及相关概念解析 1. 临界资源(Critical Resource) 定义:被多个线程共享的资源(如变量、文件、内存区域等),需通过互斥访问确保数据一致性。特点: 共享性:多个线程可能…...
5.8线性动态规划2
P1004 [NOIP 2000 提高组] 方格取数 做法1:DFS剪枝 #include<bits/stdc.h> using namespace std; int n, a[10][10], maxs, minx 11, miny 11, maxx, maxy; void dfs(int x, int y, int s, int type){if(type 1 && x minx && y miny){…...
linux系统Ubuntn界面更改为中文显示,配置流程
Linux 系统是一种开源的、多用户的、多任务的操作系统,具有高度的稳定性、安全性和灵活性,被广泛应用于服务器、嵌入式系统、科研、教育以及个人电脑等领域。以下是关于 Linux 系统的一些基本信息: 发展历程:Linux 的发展始于 19…...
Looper死循环阻塞为什么没有ANR
Looper 死循环阻塞没有 ANR 的原因在于 ANR (Application Not Responding) 的检测机制依赖于特定线程的事件处理超时。以下是详细解释: 1. ANR 的触发机制: 主线程 (UI 线程) 阻塞: ANR 最常见的情况是主线程阻塞。Android 系统会监控主线程…...
数字孪生陆上风电场可视化管理系统
图扑软件搭建陆上风电场数字孪生平台,通过高精度建模与实时数据采集,1:1 还原风机设备、输电网络及场区环境。动态展示风机运行参数、发电量、设备健康状态等信息,实现风电场运维管理的智能化、可视化与高效化。...
图像处理篇---MJPEG视频流处理
文章目录 前言一、MJPEG流基础概念MJPEG流特点格式简单无压缩时序HTTP协议传输边界标记 常见应用场景IP摄像头视频流嵌入式设备(如ESP32)视频输出简单视频监控系统 二、基础处理方法方法1:使用OpenCV直接读取优点缺点 方法2:手动解…...
ensp的华为小实验
1.先进行子网划分 2.进行接口的IP地址配置和ospf的简易配置,先做到全网小通 3.进行ospf优化 对区域所有区域域间路由器进行一个汇总 对区域1进行优化 对区域2.3进行nssa设置 4.对ISP的路由进行协议配置 最后ping通5.5.5.5...
webpack和vite区别
webpack将文件视为模块打包 ,从入口文件递归解析依赖,生成依赖图,使用loader处理非JS模块,最终输出到dist目录 因为要解析所有依赖,所以他启动慢 vite利用浏览器对于es模块的原生支持,利用ESM能力&#x…...
从父类到子类:C++ 继承的奇妙旅程(2)
前言: 各位代码航海家,欢迎回到C继承宇宙!上回我们解锁了继承的「基础装备包」,成功驯服了public、protected和花式成员隐藏术。但—— ⚠️前方高能预警: 继承世界的暗流涌动远不止于此!今天我们将勇闯三大…...
ScaleTransition 是 Flutter 中的一个动画组件,用于实现缩放动画效果。
ScaleTransition 是 Flutter 中的一个动画组件,用于实现缩放动画效果。它允许你对子组件进行动态的缩放变换,从而实现平滑的动画效果。ScaleTransition 通常与 AnimationController 和 Tween 一起使用,以控制动画的开始、结束和过渡效果。 基…...
部署RocketMQ
部署环境:jdk8以上,Linux系统 下载和安装指令: wget https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip 显示下载成功: --2025-05-10 11:34:46-- https://archive.apache.org/dist/rocketm…...
从爬虫到网络---<基石9> 在VPS上没搞好Docker项目,把他卸载干净
1.停止并删除所有正在运行的容器 docker ps -a # 查看所有容器 docker stop $(docker ps -aq) # 停止所有容器 docker rm $(docker ps -aq) # 删除所有容器如果提示没有找到容器,可以忽略这些提示。 2.删除所有镜像 docker images # 查看所有镜像 dock…...
每日c/c++题 备战蓝桥杯(P2241 统计方形(数据加强版))
洛谷P2241 统计方形(数据加强版)题解 题目描述 给定一个 n m n \times m nm 的方格棋盘,要求统计其中包含的正方形数量和长方形数量(不包含正方形)。输入为两个正整数 n n n 和 m m m,输出两个整数分…...
LLaVA:开源多模态大语言模型深度解析
一、基本介绍 1.1 项目背景与定位 LLaVA(Large Language and Vision Assistant)是由Haotian Liu等人开发的开源多模态大语言模型,旨在实现GPT-4级别的视觉-语言交互能力。该项目通过视觉指令微调技术,将预训练的视觉编码器与语言模型深度融合,在多个多模态基准测试中达到…...
基于Spring Boot + Vue的母婴商城系统( 前后端分离)
一、项目背景介绍 随着母婴行业在互联网平台的快速发展,越来越多的家庭倾向于在线选购母婴产品。为了提高商品管理效率和用户购物体验,本项目开发了一个基于 Spring Boot Vue 技术栈的母婴商城系统,实现了商品分类、商品浏览、资讯展示、评…...
HNUST湖南科技大学-软件测试期中复习考点(保命版)
使用说明:本复习考点仅用于及格保命。软件测试和其他专业课不太一样,记忆的太多了,只能说考试的时候,想到啥就写啥,多写一点!多写一点!多写一点!(重要事情说三遍…...
【AI智能推荐系统】第七篇:跨领域推荐系统的技术突破与应用场景
第七篇:跨领域推荐系统的技术突破与应用场景 提示语:🔥 “打破数据孤岛,实现1+1>2的推荐效果!深度解析美团、亚马逊如何用跨领域推荐技术实现业务协同,知识迁移核心技术全公开!” 目录 跨领域推荐的商业价值跨领域推荐技术体系 2.1 基于共享表征的学习2.2 迁移学习…...
【现代深度学习技术】注意力机制04:Bahdanau注意力
【作者主页】Francek Chen 【专栏介绍】 ⌈ ⌈ ⌈PyTorch深度学习 ⌋ ⌋ ⌋ 深度学习 (DL, Deep Learning) 特指基于深层神经网络模型和方法的机器学习。它是在统计机器学习、人工神经网络等算法模型基础上,结合当代大数据和大算力的发展而发展出来的。深度学习最重…...
使用FastAPI和React以及MongoDB构建全栈Web应用01 概述
Are you ready to craft digital experiences that captivate and convert? 您准备好打造令人着迷并能带来转变的数字体验了吗? In a world driven by innovation, the demand for robust and scalable web applications has never been higher. Whether you’re…...
Flutter - UIKit开发相关指南 - 概览
环境 Flutter 3.29 macOS Sequoia 15.4.1 Xcode 16.3 概览 UIView与Widgets的比较 在UIKit使用UIView类的对象进行页面开发,布局也是UIView类的对象,在Flutter中使用的是Widget,在概念上Widget可以理解成UIView。 差异: 有效期: Widgets是不可变的,它的生存期只…...
扩容 QCOW2 磁盘镜像文件
🌈 个人主页:Zfox_ 目录 ✅ 一、扩展 QCOW2 文件大小✅ 二、启动虚拟机后扩展分区和文件系统方式一:如果使用的是标准分区(如 /dev/vda1)方式二:使用 gparted(图形工具) ✅ 总结 &am…...
【ts】for in对象时,ts如何正确获取对应的属性值
第一种:for…in keyof:适合需要遍历对象属性键并动态访问值的场景。 keyof typeof obj是ts的类型操作符,用于获取对象obj的所有属性键的联合类型(“name” | “age” | “city”)通过obj[key keyof typeof obj]&…...
软考 系统架构设计师系列知识点之杂项集萃(55)
接前一篇文章:软考 系统架构设计师系列知识点之杂项集萃(54) 第89题 某软件公司欲开发一个Windows平台上的公告板系统。在明确用户需求后,该公司的架构师决定采用Command模式实现该系统的界面显示部分,并设计UML类图如…...
绑定 SSH key(macos)
在 macOS 上绑定 Gitee 或 GitHub 的 SSH Key,通常分为以下几步操作,包括生成 SSH key、添加到 ssh-agent,并配置到 Gitee 或 GitHub 平台。 1. 检查是否已有 SSH Key ls -al ~/.ssh 看看是否已有 id_rsa 或 id_ed25519 等文件。如果没有就…...
PyTorch API 6 - 编译、fft、fx、函数转换、调试、符号追踪
文章目录 torch.compiler延伸阅读 torch.fft快速傅里叶变换辅助函数 torch.func什么是可组合的函数变换?为什么需要可组合的函数变换?延伸阅读 torch.futurestorch.fx概述编写转换函数图结构快速入门图操作直接操作计算图使用 replace_pattern() 进行子图…...
Unreal 从入门到精通之VR常用操作
文章目录 前言1.如何设置VRPawn视角的位置。2.如何播放视频3.如何播放VR全景视频。4.如何打开和关闭VR模式。前言 我们使用Unreal5 开发VR 项目的时候,会遇到很多常见问题。 比如: 1.如何设置VRPawn视角的位置。 2.如何播放视频。 3.如何播放VR全景视频。 4.如何打开和关闭V…...
Dify使用总结
最近完成了一个Dify的项目简单进行总结下搭建服务按照官方文档操作就行就不写了。 进入首页之后由以下组成: 探索、工作室、知识库、工具 探索: 可以展示自己创建的所有应用,一个应用就是一个APP,可以进行测试使用 工作室包含…...
事务连接池
一、事务概述 (一)事务的定义 事务是数据库提供的一种特性,用于确保数据操作的完整性和一致性。事务将多个数据操作组合成一个逻辑单元,这些操作要么全部成功,要么全部失败。 (二)事务的特性…...
如何用AWS Lambda构建无服务器解决方案:实战经验与场景解析
一、为什么开发者都在关注Serverless? 一、为什么开发者都在关注Serverless? 在云计算高速发展的今天,“无服务器架构”正成为技术新宠。根据Gartner预测,到2025年全球将有50%企业采用Serverless技术。而作为无服务器领域的领头…...
Android Compose 框架物理动画之捕捉动画深入剖析(29)
Android Compose 框架物理动画之捕捉动画深入剖析 一、引言 在 Android 应用开发中,动画是提升用户体验的关键元素之一。它能够让界面更加生动、交互更加自然。Android Compose 作为新一代的声明式 UI 框架,为开发者提供了强大且灵活的动画能力。其中&…...
Jmeter中的Json提取器如何使用?
在JMeter中使用JSON提取器可以方便地从JSON格式的响应数据中提取特定字段的值。以下是详细步骤和示例: 1. 添加JSON提取器 右击目标HTTP请求 -> 选择 添加 -> 后置处理器 -> JSON提取器。 2. 配置JSON提取器参数 变量名称(Names of created…...
STM32中断
STM32 GPIO外部中断简图 中断向量表 定义一块固定的内存,以4字节对齐,存放各个中断服务函数程序的首地址 中断向量表定义在启动文件,当发生中断,CPU会自动执行对应的中断服务函数 中断向量表以及中断函数 NVIC嵌套向量中断控制…...
navicat 如何导出数据库表 的这些信息 字段名 类型 描述
navicat 如何导出数据库表 的这些信息 字段名 类型 描述 数据库名字 springbootmt74k 表名字 address SELECT COLUMN_NAME AS 字段名,COLUMN_TYPE AS 类型,COLUMN_COMMENT AS 描述 FROM information_schema.COLUMNS WHERE TABLE_SCHEMA springbootmt74k AND TABLE_NAME a…...
LangGraph(三)——添加记忆
目录 1. 创建MemorySaver检查指针2. 构建并编译Graph3. 与聊天机器人互动4. 问一个后续问题5. 检查State参考 1. 创建MemorySaver检查指针 创建MemorySaver检查指针: from langgraph.checkpoint.memory import MemorySavermemory MemorySaver()这是位于内存中的检…...
数仓-可累计,半累加,不可累加指标,是什么,举例说明及解决方案
目录 1. 可累计指标定义:举例:解决方案: 2. 半累加指标定义:举例:解决方案: 3. 不可累加指标定义:举例:解决方案: 4. 总结对比5. 实际场景中的注意事项 这是数据仓库设计…...
Java ClassLoader双亲委派机制
Java ClassLoader双亲委派机制 1 什么是双亲委派模型 “类加载体系”及ClassLoader双亲委派机制。java程序中的 .java文件编译完会生成 .class文件,而 .class文件就是通过被称为类加载器的ClassLoader加载的,而ClassLoder在加载过程中会使用“双亲委派…...
upload-labs靶场通关详解:第四关
一、分析源代码 可以看出这一关仍然是黑名单验证,但是它禁止了更多的后缀。像php3,php4这类后缀也被加入了黑名单,第三关的方法在这里显然就失效了。那么我们想一想,既然配置文件中存在将php3当作php来执行的功能,那么…...
Webug4.0通关笔记25- 第30关SSRF
目录 一、SSRF简介 1.SSRF原理 2.渗透方法 二、第30关SSRF渗透实战 1.打开靶场 2.渗透实战 (1)Windows靶场修复 (2)Docker靶场修复 (3)获取敏感文件信息 (4)内网端口与服务…...
【 Redis | 实战篇 缓存 】
目录 前言: 1.认识缓存 2.添加Redis缓存 2.1.根据id查询商铺缓存 2.2.优化根据id查询商铺缓存 3.缓存更新策略 3.1.三种策略 3.2.策略选择 3.3.主动更新的方案 3.4. Cache Aside的模式选择 3.5.最佳实践方案 4.缓存三大问题 4.1.缓存穿透 4.1.1.介绍 …...
数字果园管理系统的设计与实现(Tensorflow的害虫识别结合高德API的害虫定位与Websocket的在线聊天室)
文章目录 技术栈主要功能害虫识别与定位害虫识别的实现训练与测试评估代码模型转化为TFLite预测脚本PredictController预测控制器害虫识别过程展示 害虫定位实现害虫定位代码害虫定位过程展示 专家咨询功能在线咨询聊天室主要前端代码如下主要后端代码如下 技术栈 Spring Boot…...
信息检索(包含源码)
实验目的 掌握逻辑回归模型在二分类问题中的应用方法熟悉机器学习模型评估指标PR曲线(精确率-召回率曲线)和ROC曲线(受试者工作特征曲线)的绘制与分析学习使用Python的scikit-learn库进行数据预处理、模型训练与评估理解特征选择…...
【金仓数据库征文】金仓数据库KingbaseES: 技术优势与实践指南(包含安装)
目录 前言 引言 一 : 关于KingbaseES,他有那些优势呢? 核心特性 典型应用场景 政务信息化 金融核心系统: 能源通信行业: 企业级信息系统: 二: 下载安装KingbaseES 三:目录一览表: 四:常用SQL语句 创建表: 修改表结构…...
Java数据结构——二叉树
二叉树 树的概念二叉树满二叉树和完全二叉树二叉树的性质二叉树的遍历 题目练习前序遍历中序遍历后序遍历 前言 已经知道了数据结构中的线性结构,那有没有非线性结构呢? 当然有就像我们文件夹,一个文件夹中有有另一个文件夹,这就是…...
用go从零构建写一个RPC(仿gRPC,tRPC)--- 版本2
在版本1中,虽然系统能够满足基本需求,但随着连接数的增加和处理请求的复杂度上升,性能瓶颈逐渐显现。为了进一步提升系统的稳定性、并发处理能力以及资源的高效利用,版本2引入了三个重要功能:客户端连接池、服务器长连…...
drf 使用jwt
安装jwt pip install pyJwt 添加登录url path("jwt/login",views.JwtLoginView.as_view(),namejwt-login),path("jwt/order",views.JwtOrderView.as_view(),namejwt-order), 创建视图 from django.contrib.auth import authenticateimport jwt from jw…...