Python10天突击--Day 2: 实现观察者模式
以下是 Python 实现观察者模式的完整方案,包含同步/异步支持、类型注解、线程安全等特性:
1. 经典观察者模式实现
from abc import ABC, abstractmethod
from typing import List, Anyclass Observer(ABC):"""观察者抽象基类"""@abstractmethoddef update(self, subject: Any) -> None:passclass Subject:"""被观察对象基类"""def __init__(self):self._observers: List[Observer] = []def attach(self, observer: Observer) -> None:if observer not in self._observers:self._observers.append(observer)def detach(self, observer: Observer) -> None:try:self._observers.remove(observer)except ValueError:passdef notify(self) -> None:"""同步通知所有观察者"""for observer in self._observers:observer.update(self)# 使用示例
class TemperatureSensor(Subject):"""具体被观察者:温度传感器"""def __init__(self):super().__init__()self._temperature = 0.0@propertydef temperature(self) -> float:return self._temperature@temperature.setterdef temperature(self, value: float) -> None:self._temperature = valueself.notify() # 温度变化时通知观察者class Display(Observer):"""具体观察者:显示屏"""def update(self, subject: TemperatureSensor) -> None:print(f"当前温度: {subject.temperature}°C")# 客户端代码
sensor = TemperatureSensor()
display = Display()
sensor.attach(display)sensor.temperature = 25.5 # 输出: 当前温度: 25.5°C
2. 线程安全增强版
import threading
from typing import List, Anyclass ThreadSafeSubject:"""线程安全的被观察对象"""def __init__(self):self._observers: List[Observer] = []self._lock = threading.RLock()def attach(self, observer: Observer) -> None:with self._lock:if observer not in self._observers:self._observers.append(observer)def detach(self, observer: Observer) -> None:with self._lock:try:self._observers.remove(observer)except ValueError:passdef notify(self) -> None:"""线程安全的通知"""with self._lock:observers = self._observers.copy()for observer in observers:observer.update(self)
3. 异步观察者模式
import asyncio
from abc import ABC, abstractmethod
from typing import List, Anyclass AsyncObserver(ABC):"""异步观察者接口"""@abstractmethodasync def update(self, subject: Any) -> None:passclass AsyncSubject:"""支持异步通知的被观察对象"""def __init__(self):self._observers: List[AsyncObserver] = []def attach(self, observer: AsyncObserver) -> None:if observer not in self._observers:self._observers.append(observer)async def notify(self) -> None:"""异步通知所有观察者"""await asyncio.gather(*[observer.update(self) for observer in self._observers])# 使用示例
class AsyncTemperatureSensor(AsyncSubject):def __init__(self):super().__init__()self._temp = 0.0async def set_temperature(self, value: float) -> None:self._temp = valueawait self.notify()class CloudLogger(AsyncObserver):async def update(self, subject: AsyncTemperatureSensor) -> None:print(f"云端记录温度: {subject._temp}°C")await asyncio.sleep(0.1) # 模拟网络请求async def main():sensor = AsyncTemperatureSensor()sensor.attach(CloudLogger())await sensor.set_temperature(28.5) # 输出: 云端记录温度: 28.5°Casyncio.run(main())
4. 事件总线实现(发布-订阅模式)
from typing import Dict, List, Callable, Any
import inspectclass EventBus:"""事件总线(高级观察者模式)"""_instance = Nonedef __new__(cls):if not cls._instance:cls._instance = super().__new__(cls)cls._instance._subscriptions: Dict[str, List[Callable]] = {}return cls._instancedef subscribe(self, event_type: str, callback: Callable) -> None:if not inspect.iscoroutinefunction(callback):callback = self._sync_to_async(callback)if event_type not in self._subscriptions:self._subscriptions[event_type] = []self._subscriptions[event_type].append(callback)async def publish(self, event_type: str, **data) -> None:if event_type in self._subscriptions:await asyncio.gather(*[callback(**data) for callback in self._subscriptions[event_type]])@staticmethoddef _sync_to_async(func: Callable) -> Callable:async def wrapper(*args, **kwargs):return func(*args, **kwargs)return wrapper# 使用示例
bus = EventBus()@bus.subscribe("temperature_change")
async def log_temp_change(value: float):print(f"温度变化记录: {value}°C")async def trigger_events():await bus.publish("temperature_change", value=30.0)asyncio.run(trigger_events()) # 输出: 温度变化记录: 30.0°C
5. 带过滤器的观察者模式
from typing import Callable, Anyclass FilteredObserver:"""带条件过滤的观察者"""def __init__(self, callback: Callable, filter_condition: Callable[[Any], bool]):self.callback = callbackself.filter = filter_conditiondef update(self, subject: Any) -> None:if self.filter(subject):self.callback(subject)# 使用示例
sensor = TemperatureSensor()def alert(temp: float):print(f"警报!当前温度过高: {temp}°C")# 只接收温度>30的通知
high_temp_observer = FilteredObserver(callback=alert,filter_condition=lambda s: s.temperature > 30
)sensor.attach(high_temp_observer)
sensor.temperature = 25 # 无输出
sensor.temperature = 35 # 输出: 警报!当前温度过高: 35°C
方案对比
实现方式 | 特点 | 适用场景 |
---|---|---|
经典实现 | 简单直接 | 单线程简单场景 |
线程安全版 | 避免竞态条件 | 多线程环境 |
异步实现 | 非阻塞通知 | I/O密集型应用 |
事件总线 | 松耦合,支持多对多 | 复杂事件系统 |
过滤观察者 | 条件触发 | 需要选择性通知的场景 |
最佳实践建议
-
生命周期管理:
# 使用上下文管理器自动取消注册 class ObserverContext:def __init__(self, subject: Subject, observer: Observer):self.subject = subjectself.observer = observerdef __enter__(self):self.subject.attach(self.observer)return selfdef __exit__(self, *args):self.subject.detach(self.observer)with ObserverContext(sensor, display):sensor.temperature = 20
-
性能优化:
- 对于高频事件,考虑使用弱引用(
weakref.WeakSet
) - 批量通知时使用
@dataclass
封装事件数据
- 对于高频事件,考虑使用弱引用(
-
异常处理:
def safe_notify(self):for observer in self._observers:try:observer.update(self)except Exception as e:print(f"Observer failed: {e}")
-
与Python生态集成:
- 使用
PyPubSub
等现成库 - 结合
asyncio.Queue
实现生产者-消费者模式
- 使用
根据项目复杂度选择合适实现,简单场景用经典模式即可,分布式系统建议使用事件总线架构。
相关文章:
Python10天突击--Day 2: 实现观察者模式
以下是 Python 实现观察者模式的完整方案,包含同步/异步支持、类型注解、线程安全等特性: 1. 经典观察者模式实现 from abc import ABC, abstractmethod from typing import List, Anyclass Observer(ABC):"""观察者抽象基类""…...
【C#】.NET 8适配器模式实战:用C#实现高可用系统集成与接口桥接艺术
系统集成挑战与适配器模式的价值 当需要整合不同架构或API的系统时,接口兼容性问题往往成为拦路虎。**适配器设计模式(Adapter Pattern)**通过转换接口形态,完美解决这种不兼容性问题。本文将通过C# .NET 8实战演示适配器模式的基…...
方案精读:51页 财政数据信息资源目录数据标准存储及大数据资产化规划方案【附全文阅读】
该方案聚焦财政数据信息资源管理,适用于财政部门工作人员、数据管理与分析人员以及关注财政大数据应用的相关人士。 方案旨在构建财政数据资源目录,推动大数据在财政领域的应用与落地。整体规划上,以 “金财工程” 应用支撑平台为基础,建立省、市、县三级目录体系,遵循相关…...
【CVE-2024-7881】ARM CPU漏洞安全通告
安全之安全(security)博客目录导读 目录 一、概述 二、CVE详情 三、受影响产品 四、修复建议 五、致谢 六、版本历史 一、概述 基于Arm架构的部分CPU中发现一个安全问题:非特权上下文可能触发数据内存依赖型预取引擎(data memory-dependent pref…...
idea中提高编译速度研究
探索过程: 有三种情况: 第一种: idea中用eclipse编译器编译springboot项目,然后debug启动Application报错找不到类。 有待继续研究。 第二种: idea中用javac编译器编译springboot项目,重新构建用时&a…...
基于Yocto构建Ubuntu 24.04 ARM64 Qt工具链
以下是基于Yocto构建Ubuntu 24.04 ARM64 Qt工具链的完整方案,综合多篇技术文档整理而成: 一、系统环境准备 Ubuntu基础系统 建议选择Ubuntu 24.04 LTS服务器版或桌面版,需满足至少300GB磁盘空间和16GB内存 若使用ARM64架构主机可直接运…...
如何使用阿里云邮件推送免费群发邮件
最近一直想利用自己的阿里云账号开一个邮件推送服务,同时还可以用python来实现邮件群发,之前没有成功,今天又尝试了一次终于成功了,现将过程记录如下,也便于网友们少走弯路。 一、申请阿里云账号 阿里云注册可以用淘…...
利用 Genspark 和 AI IDE 一键配置 Java 开发环境
以下是以 CSDN 博客风格撰写的文章,基于你提到的“利用 Genspark 和 AI IDE 实现 Java 环境一键配置”的流程。文章结构清晰,内容详实,符合 CSDN 技术博客的常见风格,包含标题、简介、目录、正文、代码示例和总结。 利用 Genspark…...
【软考系统架构设计师】计算机网络知识点
1、 TCP/IP协议族 2、 数据链路层 解决三个基本问题: 封装成帧(在⼀段数据的前后分别添加首部和尾部) 透明传输(发送⽅:若数据部分出现帧开始符或者帧结束符,会在其前面加转义字符;接收⽅&…...
RFID技术概览
一、RFID技术定义 RFID(Radio Frequency Identification,射频识别) 是一种通过无线电信号识别目标对象并获取相关数据的非接触式自动识别技术。它利用射频信号的空间耦合(电感或电磁耦合)实现无物理接触的信息传递与目…...
中间件--ClickHouse-2--OLAP和OLTP
OLAP(Online Analytical Processing,联机分析处理)和OLTP(Online Transaction Processing,联机事务处理)是两种不同类型的数据处理系统,它们分别针对不同的应用场景和需求。 1、OLTP࿰…...
使用ADB工具分析Android应用崩溃原因:以闪动校园为例
使用adb工具分析模拟器或手机里app出错原因以闪动校园为例 使用ADB工具分析Android应用崩溃原因:以闪动校园为例 前言 应用崩溃是移动开发中常见的问题,尤其在复杂的Android生态系统中,找出崩溃原因可能十分棘手。本文将以流行的校园应用&q…...
C++双链表介绍及实现
双链表详解 1. 基本概念 双链表(双向链表) 是一种链式数据结构,每个节点包含两个指针: 前驱指针(pre):指向直接前驱节点后继指针(next):指向直接…...
推流265视频,网页如何支持显示265的webrtc
科技发展真快,以前在网页上(一般指谷歌浏览器),要显示265的视频流,都是很鸡肋的办法,要么转码,要么用很慢的hls,体验非常不好,而今谷歌官方最新的浏览器已经支持265的web…...
linux多线(进)程编程——(6)共享内存
前言 话说进程君的儿子经过父亲点播后就开始闭关,它想要开发出一种全新的传音神通。他想,如果两个人的大脑生长到了一起,那不是就可以直接知道对方在想什么了吗,这样不是可以避免通过语言传递照成的浪费吗? 下面就是它…...
Allpairs工具下载及操作流程(联动Deepseek)
目录 一、Allpairs工具下载及操作流程二、Allpairs工具使用易错问题 Allpairs工具产生的原因 Allpairs工具的产生源于软件测试领域对高效组合测试方法的迫切需求,其核心目标是解决传统测试方法在多因素组合场景下用例数量爆炸和测试效率低下的问题。 一、Allpairs工…...
wkhtmltopdf 实现批量对网页转为图片的好工具,快速实现大量卡片制作
欢迎来到涛涛聊AI 1、需求痛点 在学习当中经常遇到一些知识点,想和大家分享。但只有文本形式,很多人不愿意去阅读,也看不到重点。 如果自己去单独设计页面版式,又太浪费时间。那就想着有没有一种方法,可以把一个知识…...
case客户续保预测中用到的特征工程、回归分析和决策树分析的总结
文章目录 [toc]1. 回归分析概述1.1 基本概念1.2 与分类的区别 2. 常见回归算法2.1 线性回归2.2 决策树回归2.3 逻辑回归(Logistic Regression)2.3 其他算法补充:通俗版:决策树 vs 随机森林🌳 决策树:像玩「…...
最新如何在服务器中解决FFmpeg下载、安装和配置问题教程(Linux|Windows|Mac|Ubuntu)
最新如何在服务器中解决FFmpeg下载、安装和配置问题教程(Linux|Windows|Mac|Ubuntu) 摘要: FFmpeg是一个强大的开源工具,广泛应用于音视频处理,支持格式转换、视频剪辑、流媒体推送…...
vue webSocket
vue webSocket 一、vue2 webSocketwebSocket.jsvue2 二、vue3 webSocket tswebSocket.tsvue3 一、vue2 webSocket webSocket.js export default {data() {return {websock: null, // 建立的连接,存websocket实例化的lockReconnect: false, // 是否真正建立连接…...
Flask+Influxdb+grafna构建电脑性能实时监控系统
Influx下载地址,这里下载了以下版本influxdb-1.8.5_windows_amd64.zip 运行前需要先启动Influx数据库: 管理员方式运行cmd->F:->cd F:\influxdb\influxdb-1.8.5-1->influxd -config influxdb.conf,以influxdb.conf配置文件启动数…...
【golang/jsonrpc】go-ethereum中json rpc初步使用(websocket版本)
说在前面 操作系统:win11 wsl2go-ethereum版本:1.15.8 关于json-rpc 官网 server 定义方法type CalculatorService struct{}func (s *CalculatorService) Add(a, b int) int {return a b }func (s *CalculatorService) Div(a, b int) (int, error) {…...
【C++】 —— 笔试刷题day_15
刷题day_15,继续加油!!! 一、平方数 题目解析 题目给出一个数,让我们找到离它最近的一个平方数,然后输出即可。 算法思路 这道题总体来说还是非常简单的。 这里先来看一种思路,就是从1开始找…...
网站备案详解
当小型网站开发完毕具备上线条件后,需要完成域名映射与相关备案,才能合法运维。就像婴儿出生后,要开出生证明并去派出所上户口一样,备案后就是有“户口”的网站啦。具体效果见:CodingLife 一:服务器部署 …...
IPV6应用最后的钥匙:DDNS-GO 动态域名解析工具上手指南--家庭云计算专家
DDNS-GO作为一款轻量级开源工具,其IPv6功能通过自动化动态域名解析,有效解决了家庭网络因运营商动态分配IPv6地址导致的访问难题。用户无需复杂配置,即可将冗长的IPv6地址绑定至易记域名,并实时同步IP变化,显著提升了N…...
ubuntu 系统安装Mysql
安装 mysql sudo apt update sudo apt install mysql-server 启动服务 sudo systemctl start mysql 设置为开机自启 sudo systemctl enable mysql 查看服务状态 (看到类似“active (running)”的状态信息代表成功) sudo systemctl status mysql …...
Go:方法
方法声明 type point struct { X, Y float64 }// 普通函数 func Distance(p, q Point) float64 {return math.Hypot(q.x - p.x, q.y - p.Y) }// Point类型的方法 func (p Point) Distance(q Point) float64 {return math.Hypot(q.x - p.x, q.y - p.Y) }方法声明与普通函数声…...
十四种逻辑器件综合对比——《器件手册--逻辑器件》
目录 逻辑器件 简述 按功能分类 按工艺分类 按电平分类 特殊功能逻辑器件 应用领域 详尽阐述 1 逻辑门 一、基本概念 二、主要类型 三、实现方式 四、应用领域 2 反相器 工作原理 基本功能 主要应用 常见类型 特点 未来发展趋势 3 锁存器 基本概念 工作原理 主要类型…...
[网鼎杯 2022 青龙组]fakeshell
这个题,我们查壳之后是upx壳。 但是当我们用upxunpack解包的时候我们解不出来。 说明有人动过这个包。 然后我们打开010eider,修改他的魔改 将此处,我们改成UPX我们在解包就可以了。然后我重新使用upxunpack 之后我们成功得到未加密的文件…...
vivado + modelsim 仿真:Post-Synthesis Timing Simulation
Vivado 结合Modelsim 实现综合后仿真的一种方法 Post-Synthesis Timing Simulation 使用Vivado 生成仿真所需文件创建Modelsim工程参考文档 使用Vivado 生成仿真所需文件 Vivado simulation 中可勾选Generate simulation scripts only;勾选-sdf_anno; 在testbanch文件中例化gl…...
可能存在特殊情况,比如控制台显示有延迟、缓冲问题等影响了显示顺序。
从控制台输出看,正常逻辑应是先执行 System.out.println(" 未处理异常演示 "); 输出对应文本,再因 arr 为 null 访问 length 触发 NullPointerException 输出异常信息。可能存在特殊情况,比如控制台显示有延迟、缓冲问题等影响…...
使用Python建模量子隧穿
引言 量子隧穿是量子力学中的一个非常有趣且令人神往的现象。在经典物理学中,我们通常认为粒子必须克服一个势垒才能通过它。但是,在量子力学中,粒子有时可以“穿越”一个势垒,即使它的能量不足以克服这个势垒。这种现象被称为“量子隧穿”。今天,我们将通过 Python 来建…...
Python-控制语句
控制语句 控制语句和逻辑思维 控制语句:把语句组合成能完成一定功能的小逻辑模块分类:顺序、选择、循环“顺序结构”:代表“先执行a,再执行b”的逻辑“条件判断结构”:代表“如果…,则…”的逻辑“循环结构”:代表“如果…则重复执行…”的逻辑条件判断结构 选择结构通…...
库学习04——numpy
一、基本属性 二、 创建数组 (一)arange a np.arange(10,20,2) # [10,12,14,16,18] 只有一个参数n的话,默认是从0到n-1的一维数组。 (二)自定义reshape a np.arange(12).reshape((3,4)) [[ 0 1 2 3][ 4 5 …...
DeepSeek在应急救援领域的应用解决方案
DeepSeek在应急救援领域的应用解决方案 一、引言 1.1 应急救援领域现状 近年来,我国应急管理工作全面加强,取得了显著成效。然而,一系列重特大灾害事故暴露出我国应急管理体系存在诸多问题短板。例如,在责任落实、应急处突、法…...
【HCIP】GRE VPN实验笔记
一、实验拓扑 二、实验要求 1、按照图示配置IP地址 2、在R1和R3上配置默认路由使公网区域互通 3、在R1和R3上配置GRE VPN,使两端私网能够互相访问,Tunnel口IP地址如图 4、在R1和R3上配置RIPv2或者ospf或者静态,来传递两端私网路由 三、实…...
ChatRex: Taming Multimodal LLM for Joint Perception and Understanding 论文理解和翻译
一、TL;DR MLLM在感知方面存在不足(远远比不上专家模型),比如Qwen2-VL在coco上recall只有43.9%提出了ChatRex,旨在从模型设计和数据开发两个角度来填补这一感知能力的缺口ChatRex通过proposal边界框输入到LLM中将其转…...
10min速通Linux文件传输
实验环境 在Linux中传输文件需要借助网络以及sshd,我们可通过systemctl status sshd来查看sshd状态 若服务未开启我们可通过systemctl enable --now sshd来开启sshd服务 将/etc/ssh/sshd_config中的PermitRootLogin 状态修改为yes 传输文件 scp scp (Sec…...
CE、NCE、InfoNCE的演变过程
CE、NCE、InfoNCE的演变过程及数学推导和关系 在机器学习和深度学习中,交叉熵( C E CE CE)、噪声对比估计( N C E NCE NCE)和信息噪声对比估计( I n f o N C E InfoNCE InfoNCE)是三个紧密相关…...
在Vue项目的引入meting-js音乐播放器插件
开源项目:https://github.com/swzaaaaaaa/NBlog 1、开源项目中音乐播放插件的使用流程 步骤1:下载meting-js相关文件 在MetingJS官方仓库或其他可靠的CDN获取meting-js的JavaScript文件以及相关依赖(如APlayer的文件)。将它们下…...
rapidocr 2.0 在线demo来了
引言 今日北京大风,大家都窝在家里,自己也趁着周末更新了RapidOCR在线demo,适配rapidocr2.0系列。 rapidocr2.0支持4个推理引擎(ONNRuntime、OpenVino、PaddlePaddle和PyTorch),且整理了文本检测和文本识…...
Compose笔记(十五)--进度条
这一节了解一下Compose中的进度条,有两种类型的进度条可供使用,分别是线性进度条(LinearProgressIndicator)和圆形进度条(CircularProgressIndicator),每种进度条又可分为确定模式和不确定模式。…...
图谱可视化的海洋生物信息查询网站的设计与实现(springboot+ssm+vue)含文档
图谱可视化的海洋生物信息查询网站的设计与实现(springbootssmvue)含文档 该系统是一个图谱可视化的海洋生物信息查询网站,主要功能包括海洋动物、海洋植物、生物图鉴、保护生物和海洋生物分布等模块;用户可以通过系统首页访问这些模块;在海…...
目标追踪Hyperspectral Adapter for Object Tracking based on Hyperspectral Video
论文作者:Long Gao,Yunhe Zhang,Langkun Chen,Yan Jiang,Weiying Xie,Yunsong Li 作者单位:Xidian University;the University of Sheffield 论文链接:http://arxiv.org/abs/2503.22199v1 内容简介: 1)方向&#x…...
【HD-RK3576-PI】Linux制作deb包的方法
1.什么是deb包 ? DEB包是Debian及其衍生Linux发行版(如Ubuntu、Linux Mint等)使用的软件包格式。DEB包主要用于简化软件的安装、更新和卸载过程。它实际上是一个归档文件,通常包含了两个主要部分: 数据压缩包…...
FileInputStream 详解与记忆方法
FileInputStream 详解与记忆方法 一、FileInputStream 核心概念 FileInputStream 是 Java 中用于从文件读取原始字节的类,继承自 InputStream 抽象类。 1. 核心特点 特性说明继承关系InputStream → FileInputStream数据单位字节(8bit)用…...
什么是回表?哪些数据库存在回表?
目录 一、什么是回表1. 回表的核心流程2. 示例说明3. 回表的性能问题4. 总结 二、哪些数据库会有回表1. MySQL(InnoDB)2. Oracle3. 其他数据库(如 SQL Server、PostgreSQL)4. 总结 三、非聚集索引与聚集索引的区别及产生原因1. 聚…...
跨平台开发的挑战与突破:Java开发工具的探索与实践!
全文目录: 开篇语前言摘要概述源码解析代码实例代码解析代码解析1. import java.io.File;2. public class CrossPlatformFileManager3. public static void main(String[] args)4. String filePath "example.txt";5. File file new File(filePath);6. *…...
JDK的卸载与安装
卸载JDK 删除java的1安装目录 卸载JAVA_HOME 删除path下关于java的路径 java -version查看 安装JDK 百度搜索JDK,找到下载地址 同意协议 下载电脑对应版本 双击安装 记住安装路径 配置环境变量 我的电脑–>右键–>属性–>高级系统设置 环境变…...
CyclicBarrier 基本用法
CyclicBarrier 基本用法 简介 CyclicBarrier 是 Java 并发包(java.util.concurrent)中的一个同步辅助类。它允许一组线程相互等待,直到到达某个公共屏障点(common barrier point)。只有当所有参与的线程都到达屏障点…...