Python FastAPI + Celery + RabbitMQ 分布式图片水印处理系统
- FastAPI 服务器
- Celery 任务队列
- RabbitMQ 作为消息代理
- 定时任务处理
首先创建项目结构:
c:\Users\Administrator\Desktop\meitu\
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── celery_app.py
│ ├── tasks.py
│ └── config.py
├── requirements.txt
└── celery_worker.py
- 首先创建 requirements.txt:
fastapi==0.104.1
uvicorn==0.24.0
celery==5.3.4
python-dotenv==1.0.0
requests==2.31.0
- 创建配置文件:
from dotenv import load_dotenv
import osload_dotenv()# RabbitMQ配置
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = os.getenv("RABBITMQ_PORT", "5672")
RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest")# Celery配置
CELERY_BROKER_URL = f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}//"
CELERY_RESULT_BACKEND = "rpc://"# 定时任务配置
CELERY_BEAT_SCHEDULE = {'process-images-every-hour': {'task': 'app.tasks.process_images','schedule': 3600.0, # 每小时执行一次},'daily-cleanup': {'task': 'app.tasks.cleanup_old_images','schedule': 86400.0, # 每天执行一次}
}
- 创建 Celery 应用:
from celery import Celery
from app.config import CELERY_BROKER_URL, CELERY_RESULT_BACKEND, CELERY_BEAT_SCHEDULEcelery_app = Celery('image_processing',broker=CELERY_BROKER_URL,backend=CELERY_RESULT_BACKEND,include=['app.tasks']
)# 配置定时任务
celery_app.conf.beat_schedule = CELERY_BEAT_SCHEDULE
celery_app.conf.timezone = 'Asia/Shanghai'
- 创建任务文件:
from app.celery_app import celery_app
from app.watermark import ImageWatermarker
import os
from datetime import datetime, timedelta@celery_app.task
def add_watermark_task(image_path, text, position='center'):"""异步添加水印任务"""watermarker = ImageWatermarker()try:result_path = watermarker.add_watermark(image_path=image_path,text=text,position=position)return {"status": "success", "output_path": result_path}except Exception as e:return {"status": "error", "message": str(e)}@celery_app.task
def process_images():"""定时处理图片任务"""image_dir = "images/pending"if not os.path.exists(image_dir):return {"status": "error", "message": "Pending directory not found"}processed = 0for image in os.listdir(image_dir):if image.lower().endswith(('.png', '.jpg', '.jpeg')):add_watermark_task.delay(os.path.join(image_dir, image),"自动处理水印",'center')processed += 1return {"status": "success", "processed": processed}@celery_app.task
def cleanup_old_images():"""清理旧图片任务"""output_dir = "images/processed"if not os.path.exists(output_dir):return {"status": "error", "message": "Output directory not found"}threshold_date = datetime.now() - timedelta(days=7)cleaned = 0for image in os.listdir(output_dir):image_path = os.path.join(output_dir, image)if os.path.getctime(image_path) < threshold_date.timestamp():os.remove(image_path)cleaned += 1return {"status": "success", "cleaned": cleaned}
- 创建 FastAPI 应用:
from fastapi import FastAPI, File, UploadFile, BackgroundTasks
from fastapi.responses import JSONResponse
import os
from app.tasks import add_watermark_task
from app.celery_app import celery_appapp = FastAPI(title="图片水印处理服务")@app.post("/upload/")
async def upload_image(file: UploadFile = File(...),text: str = "水印文本",position: str = "center"
):# 保存上传的文件file_path = f"images/uploads/{file.filename}"os.makedirs(os.path.dirname(file_path), exist_ok=True)with open(file_path, "wb") as buffer:content = await file.read()buffer.write(content)# 创建异步任务task = add_watermark_task.delay(file_path, text, position)return JSONResponse({"status": "success","message": "图片已上传并加入处理队列","task_id": task.id})@app.get("/task/{task_id}")
async def get_task_status(task_id: str):task = celery_app.AsyncResult(task_id)if task.ready():return {"status": "completed", "result": task.result}return {"status": "processing"}@app.get("/tasks/scheduled")
async def get_scheduled_tasks():return {"tasks": celery_app.conf.beat_schedule}
- 创建 Celery worker 启动文件:
from app.celery_app import celery_appif __name__ == '__main__':celery_app.start()
使用说明:
- 首先安装依赖:
pip install -r requirements.txt
-
确保 RabbitMQ 服务已启动
-
启动 FastAPI 服务器:
uvicorn app.main:app --reload
- 启动 Celery worker:
celery -A celery_worker.celery_app worker --loglevel=info
- 启动 Celery beat(定时任务):
celery -A celery_worker.celery_app beat --loglevel=info
这个系统提供以下功能:
- 通过 FastAPI 接口上传图片并异步处理水印
- 使用 Celery 处理异步任务队列
- 使用 RabbitMQ 作为消息代理
- 支持定时任务:
- 每小时自动处理待处理图片
- 每天清理一周前的旧图片
- 支持任务状态查询
- 支持查看计划任务列表
API 端点:
- POST /upload/ - 上传图片并创建水印任务
- GET /task/{task_id} - 查询任务状态
- GET /tasks/scheduled - 查看计划任务列表
相关文章:
Python FastAPI + Celery + RabbitMQ 分布式图片水印处理系统
FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理 首先创建项目结构: c:\Users\Administrator\Desktop\meitu\ ├── app/ │ ├── __init__.py │ ├── main.py │ ├── celery_app.py │ ├── tasks.py │ └── config.py…...
阶段项目:Windows 服务器的组建与管理
项目概述 公司简介 创鑫公司是一家新成立的小型 IT 公司 公司决定组建部署一个小型的企业网络 员工人数不到20人 使用一台独立的 Windows 服务器提供各种网络服务 网络拓扑 设计需求 权限部分 权限部分要求 公司的网络管理员对办公计算机和服务器分别进行独立管理ÿ…...
【408】26考研-王道计算机408
王道408考研全套视频资料: 讲义01.26考研王道计算机【C语言督学营】02.【408领学班】26考研王道计算机B站独家03.26考研王道计算机【组成原理领学班】04.26王道计算机【计算机网络领学班】05.26考研王道计算机【数据结构领学班】06.26王道计算机【操作系统领学班】…...
数据分析问题思考路径
一、思考问题 1. 确认问题 因为背景: 因为5月1日的营业额突然下滑了10%,而历史从未出现过类似的跌幅 我想目的: 我想知道本次下滑的原因以此避免再出现这样的异常情况 现在思路: 现在能想到是原因是节假日和产品环节转化异常 最后感谢: 想请你帮我取数分析一下,…...
vue省市区懒加载,用el-cascader 新增和回显
el-cascader对于懒加载有支持方法,小难点在于回显的时候,由于懒加载第一次只有一层,所以要根据选中id数组一层层的加载。 子组件 <template><el-cascaderref"cascaderRef"v-model"selectedValue":props"…...
从零构建大语言模型全栈开发指南:第三部分:训练与优化技术-3.3.3领域适配案例:医疗文本分类与法律合同生成
👉 点击关注不迷路 👉 点击关注不迷路 👉 点击关注不迷路 文章大纲 从零构建大语言模型全栈开发指南-第三部分:训练与优化技术-3.3.3 领域适配案例:医疗文本分类与法律合同生成1. 领域适配的核心挑战与解决方案2. 医疗文本分类:从通用到专业的跃迁2.1 医疗领域适配的技…...
Web网页内嵌 Adobe Pdf Reader 谷歌Chrome在线预览编辑PDF文档
随着数字化办公的普及,PDF文档已成为信息处理的核心载体,虽然桌面端有很多软件可以实现预览编辑PDF文档,而在线在线预览编辑PDF也日益成为一个难题。 作为网页内嵌本地程序的佼佼者——猿大师中间件,之前发布的猿大师办公助手&am…...
Python WebSockets 库详解:从基础到实战
1. 引言 WebSocket 是一种全双工、持久化的网络通信协议,适用于需要低延迟的应用,如实时聊天、股票行情推送、在线协作、多人游戏等。相比传统的 HTTP 轮询方式,WebSocket 减少了带宽开销,提高了实时性。 在 Python 中ÿ…...
php根据一个数组里面的元素顺序来排序另外一个数组的的顺序
根据arr2的顺序来排序arr $arr [[size_id > 9],[size_id > 1],[size_id > 1],[size_id > 6],[size_id > 6],[size_id > 8],];$arr2 [1,9,6,8];usort($arr, function ($item1, $item2) use ($arr2) {return array_search($item1[size_id], $arr2) - array_s…...
从JVM到分布式锁:高并发架构设计的六把密钥
【300秒速览分布式核心技术栈】 作为十年架构老兵,今天用一张图说透高并发系统的底层逻辑: 🔑 JVM锁:synchronized与AQS构筑单机防线,却难逃分布式困局 🔑 数据库锁:MySQL行锁/间隙锁守住…...
《深度剖析SQL游标:复杂数据处理场景下的智慧抉择》
在数据库领域的广袤天地中,SQL游标宛如一把独特的钥匙,为复杂数据处理场景开启了一扇充满可能的大门。它以一种细腻且精准的方式,穿梭于数据库的记录之间,为众多棘手的数据处理难题提供了解决之道。 复杂数据处理场景的挑战 随着…...
【数据分享】中国3254座水库集水区特征数据集(免费获取)
水库在水循环、碳通量、能量平衡中扮演关键角色,实实在在地影响着我们的生活。其功能和环境影响高度依赖于地理位置、上游流域属性(如地形、气候、土地类型)和水库自身的动态特征(如水位、蒸发量)。但在此之前一直缺乏…...
【蓝桥杯每日一题】4.1
🏝️专栏: 【蓝桥杯备篇】 🌅主页: f狐o狸x "今日秃头刷题,明日荣耀加冕!" 今天我们来练习二分算法 不熟悉二分算法的朋友可以看:【C语言刷怪篇】二分法_编程解决算术问题-CSDN博客 …...
PHY——LAN8720A 代码解析 (三)
文章目录 PHY——LAN8720A 代码解析 (三)PHY 源码解析ETH_PHY_IO_InitETH_PHY_IO_DeInitETH_PHY_IO_WriteRegETH_PHY_IO_ReadRegETH_PHY_IO_GetTick LAN8720 源码解析LAN8720_RegisterBusIOLAN8720_InitLAN8720_DisablePowerDownModeLAN8720_EnablePowerDownMode PHY——LAN872…...
【工具】BioPred一个用于精准医疗中生物标志物分析的 R 软件包
介绍 R 语言包 BioPred 提供了一系列用于精准医疗中的亚组分析和生物标志物分析的工具。它借助极端梯度提升(XGBoost)算法,并结合倾向得分加权和 A 学习方法,帮助优化个体化治疗规则,从而简化亚组识别过程。BioPred 还…...
如何修复 SQL Server 数据库中的恢复挂起状态?
原文:如何修复 SQL Server 数据库中的恢复挂起状态? | w3cschool笔记 当我们想与关系数据库交互时,SQL 就会出现并帮助用户与数据库进行交互。SQL 从高级语言中获取用户的输入,然后访问将代码转换为机器可理解的形式。SQL 确实会…...
C++11QT复习 (十)
基类与派生类之间的转换 **Day7-4 基类与派生类之间的转换****一、问题回顾****二、基类与派生类间的转换****1. 类型适应(Upcasting)****2. 逆向转换(Downcasting)** **三、代码示例****四、派生类间的复制控制****五、总结****1…...
Linux——冯 • 诺依曼体系结构操作系统初识
目录 1. 冯 • 诺依曼体系结构 1.1 冯•诺依曼体系结构推导 1.2 内存提高冯•诺依曼体系结构效率的方法 1.3 理解数据流动 2. 初步认识操作系统 2.1 操作系统的概念 2.2 设计OS的目的 3. 操作系统的管理精髓 1. 冯 • 诺依曼体系结构 1.1 冯•诺依曼体系结构推导 计算…...
JVM 学习计划表(2025 版)
JVM 学习计划表(2025 版) 📚 基础阶段(2 周) 1. JVM 核心概念 JVM 作用与体系结构 理解 JVM 在 Java 跨平台运行中的核心作用,掌握类加载子系统、运行时数据区、执行引擎的交互流程内存结构与数据存…...
arm_mat_init_f32用法 dsp库
arm_mat_init_f32 是 CMSIS DSP 库中的一个函数,用于初始化一个浮点矩阵结构体。以下是其使用方法: 函数原型 c复制 void arm_mat_init_f32(arm_matrix_instance_f32 * S,uint16_t nRows,uint16_t nColumns,float32_t * pData ); 参数说明 S…...
【蓝桥杯14天冲刺课题单】Day3
1. 题目链接:1025 答疑 贪心类型的题目做法很简单,只需要保证局部解最优即可保证整体解最优。 这里的思路就是第i个学生前面的人答疑所用的时间最短,那么他所发送短信的时间节点越小。这道题目有个需要注意的点是:要先将前i-1个…...
基于开源AI大模型与S2B2C模式的线下服务型门店增长策略研究——以AI智能名片与小程序源码技术为核心
摘要 在传统零售行业中,商品零售可通过无限流量实现销量增长,但服务型门店(如餐饮、医疗、美容等)因受限于地理位置、服务承载能力及非标化服务特性,需从“流量驱动”转向“复购驱动”增长模式。本研究以“开源AI大…...
批量修改图像命名
打开存放图片的文件 ctrA全选 找到功能栏上的三个点的位置,点击选择复制路径 打开一个Excel表格 将复制的图片路径复制到Excel表格中 选中刚复制的图片路径,点击选择数据->分列->分列 在打开的窗口中选中分隔符号,在点击下一步 选中…...
linux-- 0. C语言过、Java半静对、Python纯动和C++对+C
学习目标: java,CPYTHONC 学习内容: java,CPYTHONC 目录 学习目标: 学习内容: java 纯解释型语言(如 Python)的对比 C语言与Java的核心区别 java,C PYTHON C 学习时间: 学习产出…...
程序化广告行业(50/89):Cookie映射技术深度剖析
程序化广告行业(50/89):Cookie映射技术深度剖析 大家好!一直以来,我都希望能和大家一起深入探索程序化广告行业,共同学习进步。在之前的分享中,我们已经了解了程序化广告的很多关键内容&#x…...
大语言模型智体的综述:方法论、应用和挑战(下)
25年3月来自北京大学、UIC、广东大亚湾大学、中科院计算机网络信息中心、新加坡南阳理工、UCLA、西雅图华盛顿大学、北京外经贸大学、乔治亚理工和腾讯优图的论文“Large Language Model Agent: A Survey on Methodology, Applications and Challenges”。 智体时代已经到来&a…...
【操作系统】Linux进程管理和调试
在 Linux 中,可以通过以下方法查看 PID(进程ID)对应的进程名称和详细信息: 1. 使用 ps 命令(最直接) ps -p <PID> -o pid,comm,cmd示例: ps -p 1234 -o pid,comm,cmd输出: P…...
C++---RAII模式
一、RAII模式概述 1. 定义 RAII(Resource Acquisition Is Initialization)即资源获取即初始化,是C中用于管理资源生命周期的一种重要编程模式。其核心在于将资源的获取和释放操作与对象的生命周期紧密绑定。当对象被创建时,资源…...
Clion刷题攻略-配置Cmake
使用Clion刷题,在一个项目中创建多个main函数,每一个文件对应一道题目,将Clion作为题目管理系统使用,并且cpp文件允许使用中文名,exe文件统一输出到runtime目录,防止污染根目录,CmakeLists文件如…...
DEBUG:file命令
file 命令详解 file 是 Linux/Unix 系统中用于检测文件类型的实用工具。它通过检查文件的**魔数(magic number)**和内容结构来判断文件类型,而不是依赖文件扩展名。 1. 基本语法 file [选项] 文件名... 常用选项 选项说明-b (--brief)简洁…...
hackmyvn-casino
arp-scan -l nmap -sS -v 192.168.255.205 目录扫描 dirsearch -u http://192.168.255.205/ -e * gobuster dir -u http://192.168.255.205 -w /usr/share/wordlists/dirbuster/directory-list-2.3-medium.txt -x php -b 301,401,403,404 80端口 随便注册一个账号 玩游戏时的…...
Elasticsearch笔记
官网 https://www.elastic.co/docs 简介 Elasticsearch 是一个分布式、开源的搜索引擎,专门用于处理大规模的数据搜索和分析。它基于 Apache Lucene 构建,具有实时搜索、分布式计算和高可扩展性,广泛用于 全文检索、日志分析、监控数据分析…...
在Windows下使用Docker部署Nacos注册中心(基于MySQL容器)
需要两个容器Nacos容器和MySQL容器,MySQL容器专注数据存储,Nacos容器专注服务发现/配置管理 准备工作 确保已安装Docker Desktop for Windows确保已启用WSL 2(推荐)或Hyper-V确保Docker服务正在运行 部署步骤 1. 拉取所需镜像 # 拉取MySQL镜像(这里…...
去中心化自治组织(DAO):革新未来治理的下一站
去中心化自治组织(DAO):革新未来治理的下一站 引言 去中心化自治组织(DAO)的诞生,像是互联网时代的一道新曙光。它打破了传统组织的等级壁垒,以去中心化和智能合约为核心,让社区成员能够直接参与决策并共享收益。从NFT社区到投资基金,DAO的应用场景正以前所未有的速…...
ideal自动生成类图的方法
在 IntelliJ IDEA 中,“**在项目资源管理器中选择以下类**” 是指通过 **项目资源管理器(Project Tool Window)** 找到并选中你需要生成类图的类文件(如 .java 文件),然后通过右键菜单或快捷键操作生成类图…...
爬虫获取1688关键字搜索接口的实战指南
在当今电商行业竞争激烈的环境下,数据的重要性不言而喻。1688作为国内领先的B2B电商平台,拥有海量的商品信息,这些数据对于商家的市场分析、选品决策、价格策略制定等都有着重要的价值。本文将详细介绍如何通过爬虫技术获取1688关键字搜索接口…...
视频设备轨迹回放平台EasyCVR渡口码头智能监控系统方案,确保港口安全稳定运行
一、背景 近年来,随着水上交通运输业的快速发展,辖区内渡口码头数量持续增加,船舶运营规模不断扩大,各类船舶活动频繁,给水上交通安全监管带来了巨大挑战。近期发生的多起村民使用无证木船捕鱼导致的伤亡事故…...
使用 Sales_data 类实现交易合并(三十)
1. Sales_data 类定义 假设 Sales_data 类定义在头文件 Sales_data.h 中,其基本定义如下: // Sales_data.h #ifndef SALES_DATA_H #define SALES_DATA_H#include <string>struct Sales_data {std::string bookNo; // ISBN 编号unsigned uni…...
电力系统惯量及其作用解析
电力系统中的惯量是指由同步发电机的旋转质量提供的惯性,用于抵抗系统频率变化的能力。其核心作用及要点如下: 1. 物理基础 转动惯量:同步发电机的转子具有质量,其转动惯量()决定了转子抵抗转速变化的能力…...
HNSW(Hierarchical Navigable Small World,分层可导航小世界)用来高效搜索高维向量的最近邻
HNSW(Hierarchical Navigable Small World,分层可导航小世界)是一种用于 高效最近邻搜索(ANN, Approximate Nearest Neighbors) 的索引结构,专门用于在 高维向量(比如文本、图像、音频的嵌入向量…...
STM32 CAN学习(一)
CAN总线应用最多的是汽车领域。 CAN(Controller Area Network)控制器 局域 网 局域网:把几台电脑连接到一台路由器上,这几台电脑就可以进行通讯了。 控制器在汽车中的专业术语叫做ECU(Electronic Control Unit&…...
高效内存位操作:如何用C++实现数据块交换的性能飞跃?
「性能优化就像考古,每一层都有惊喜」—— 某匿名C工程师 文章目录 问题场景:当内存操作成为性能瓶颈性能深潜:揭开内存操作的面纱内存访问的三重代价原始方案的性能缺陷 性能突破:从编译器视角重构代码方案一:指针魔法…...
Spring Boot向Vue发送消息通过WebSocket实现通信
后端实现步骤 添加Spring Boot WebSocket依赖配置WebSocket端点和消息代理创建控制器,使用SimpMessagingTemplate发送消息 前端实现步骤 安装sockjs-client和stompjs库封装WebSocket连接工具类在Vue组件中建立连接,订阅主题 详细实现步骤 后端&…...
USB转串口数据抓包--Bus hound
Bus Hound是一款强大的总线分析工具。 Bus Hound 支持哪些设备 ? 所有的 IDE , SCSI , USB 和 1394 设备都得到支持,包括磁盘驱动器,鼠 标、扫描仪,网络摄像头,等等。只要是枚举成以上所列的总线类型的…...
Android 使用CameraX实现预览、拍照、录制视频(Java版)
Android 官方关于相机的介绍如下: https://developer.android.google.cn/media/camera/get-started-with-camera?hlzh_cn 一、开始使用 Android 相机 Android相机一般包含前置摄像头和后置摄像头,使用相机可以开发一系列激动人心的应用,例…...
【已解决】Javascript setMonth跨月问题;2025-03-31 setMonth后变成 2025-05-01
文章目录 bug重现解决方法:用第三方插件来实现(不推荐原生代码来实现)。项目中用的有dayjs。若要自己实现,参考 AI给出方案: bug重现 今天(2025-04-01)遇到的一个问题。原代码逻辑大概是这样的…...
DeepSeek技术架构解析:MLA多头潜在注意力
一、前言 我们上一篇已经讲了 DeepSeek技术架构解析:MoE混合专家模型 这一篇我们来说一说DeepSeek的创新之一:MLA多头潜在注意力。 MLA主要通过优化KV-cache来减少显存占用,从而提升推理性能。我们知道这个结论之前,老周带大家…...
02.02、返回倒数第 k 个节点
02.02、[简单] 返回倒数第 k 个节点 1、题目描述 实现一种算法,找出单向链表中倒数第 k 个节点。返回该节点的值。 2、题解思路 本题的关键在于使用双指针法,通过两个指针(fast 和 slow),让 fast 指针比 slow 指针…...
剑指Offer(数据结构与算法面试题精讲)C++版——day2
剑指Offer(数据结构与算法面试题精讲)C++版——day2 题目一:只出现一次的数据题目二:单词长度的最大乘积题目三:排序数组中的两个数字之和题目一:只出现一次的数据 一种很简单的思路是,使用数组存储出现过的元素,比如如果0出现过,那么arr[0]=1,但是有个问题,题目中没…...
nginx的自动跳转https
mkdir /usr/local/nginx/certs/ 创建一个目录 然后用openssl生成证书 编辑nginx的配置文件 自动跳转成功 做一个优化,如果访问的时候后面加了其他的uri也一起自动跳转了...