# Demo script: forcibly stop a ProcessPoolExecutor task after a timeout.
#
# The parent submits a long-running task, watches the shared state for five
# seconds through multiprocessing.Manager proxies and, if the task has not
# finished by then, sends SIGTERM to the pool's worker processes.

import multiprocessing as mp
import os
import signal
import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError

from pydantic import BaseModel


class SharedData(BaseModel):
    """Pydantic model with a single integer field ``value`` (default 0)."""

    value: int = 0


def worker(shared_value, data_dict):
    """Run a 30-step task that publishes its progress via shared objects.

    Stands in for a real-world task that never checks a stop event, so it
    can only be interrupted by terminating the process that runs it.

    Args:
        shared_value: Object with a readable/writable ``value`` attribute
            (the script passes a ``Manager().Value('i', 0)`` proxy).
        data_dict: Mapping with a readable/writable ``'value'`` key (the
            script passes a ``Manager().dict`` proxy).

    Returns:
        ``data_dict['value'] + shared_value.value`` as read after the last
        step.
    """
    for i in range(30):
        data_dict['value'] = i
        shared_value.value = i * 10
        print(f'子进程 data_dict:{data_dict["value"]} shared_value:{shared_value.value}')
        time.sleep(1)
    return data_dict['value'] + shared_value.value


def init_worker():
    """Set up signal handling for a pool worker process.

    Installs a SIGTERM handler that calls ``os._exit(0)``, so the worker
    exits immediately (status 0) when the parent sends SIGTERM.
    """
    signal.signal(signal.SIGTERM, lambda sig, frame: os._exit(0))


if __name__ == '__main__':
    # Use the manager as a context manager so its server process is shut
    # down deterministically instead of lingering until interpreter exit.
    with mp.Manager() as manager:
        shared_value = manager.Value('i', 0)
        data_dict = manager.dict({'value': 0})

        with ProcessPoolExecutor(initializer=init_worker) as executor:
            future = executor.submit(worker, shared_value, data_dict)
            try:
                # The parent monitors the task for 5 seconds.
                for count in range(5):
                    print(f'\n第 {count + 1} 次检查 - 状态: {"运行中" if future.running() else "完成" if future.done() else "等待"}')
                    print(f'主进程 shared_value:{shared_value.value} data_dict:{data_dict["value"]}')
                    time.sleep(1)

                # Check whether the task finished within the monitoring window.
                if not future.done():
                    print("\n任务超时,强制终止子进程...")
                    # Collect the worker PIDs and send each live one SIGTERM.
                    # NOTE: ``_processes`` is a private executor attribute
                    # (there is no public accessor used here). Iterate over a
                    # snapshot because the executor's own management thread
                    # may modify the dict while workers are dying.
                    for pid, process in list(executor._processes.items()):
                        if process.is_alive():
                            os.kill(pid, signal.SIGTERM)
                    raise TimeoutError("任务超时")

                result = future.result()
                print(f"Result: {result}")
            except TimeoutError:
                print("子进程已被强制终止")
                # Best effort only: cancel() returns False for a call that
                # is already running or finished.
                future.cancel()
            finally:
                executor.shutdown(wait=True)

        # Read the final values while the manager is still alive.
        print(f"最终值 - shared: {shared_value.value}, dict: {data_dict['value']}")
# Demo script: cooperatively stop a ProcessPoolExecutor task with a shared
# stop event.
#
# The parent submits a long-running task, watches the shared state for five
# seconds through multiprocessing.Manager proxies and, if the task has not
# finished by then, sets a stop event that the worker polls.

import multiprocessing as mp
import os
import signal
import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError

from pydantic import BaseModel


class SharedData(BaseModel):
    """Pydantic model with a single integer field ``value`` (default 0)."""

    value: int = 0


def worker(shared_value, data_dict, stop_event):
    """Run up to 30 one-second steps, stopping early once asked to.

    Args:
        shared_value: Object with a readable ``value`` attribute (the script
            passes a ``Manager().Value('i', 0)`` proxy).
        data_dict: Mapping with a readable ``'value'`` key (the script
            passes a ``Manager().dict`` proxy).
        stop_event: Event-like object providing ``is_set()`` and
            ``wait(timeout)`` (the script passes a ``Manager().Event()``
            proxy). Setting it asks the worker to stop.

    Returns:
        ``data_dict['value'] + shared_value.value`` as read when the loop
        ends.
    """
    print('子进程PID', os.getpid())
    for _ in range(30):
        if stop_event.is_set():
            break
        # The writes to the shared objects are left disabled, as in the
        # original; this worker only reads the shared state.
        # data_dict['value'] = i
        # shared_value.value = i * 10
        print(f'子进程 data_dict:{data_dict["value"]} shared_value:{shared_value.value}')
        # Pause for up to one second, but wake up as soon as the stop event
        # is set. A plain time.sleep(1) would delay the reaction to a stop
        # request by up to a full second.
        stop_event.wait(timeout=1)
    return data_dict['value'] + shared_value.value


if __name__ == '__main__':
    print('主进程PID', os.getpid())
    # Use the manager as a context manager so its server process is shut
    # down deterministically instead of lingering until interpreter exit.
    with mp.Manager() as manager:
        shared_value = manager.Value('i', 0)
        data_dict = manager.dict({'value': 0})
        stop_event = manager.Event()

        with ProcessPoolExecutor() as executor:
            future = executor.submit(worker, shared_value, data_dict, stop_event)
            try:
                # The parent monitors the task for 5 seconds.
                for _ in range(5):
                    print("future.running() =>", future.running())
                    print(f'主进程 shared_value:{shared_value.value} data_dict:{data_dict["value"]}')
                    time.sleep(1)

                # Check whether the worker has finished.
                result = future.result(timeout=0.1)
                print(f"Result: {result}")
            except TimeoutError:
                print("任务超时,通知子进程停止...")
                stop_event.set()  # ask the worker to stop gracefully
                try:
                    # Give the worker some time to wind down. The grace
                    # period must exceed the worker's one-second poll
                    # interval; the previous 0.1 s could practically never
                    # be met, so the graceful result was always discarded.
                    result = future.result(timeout=2)
                    print(f"子进程已优雅停止,Result: {result}")
                except TimeoutError:
                    print("子进程未及时响应,强制取消...")
                    # Best effort only: cancel() returns False for a call
                    # that is already running.
                    future.cancel()

            # Shut the executor down normally, waiting for all workers.
            executor.shutdown(wait=True)
            print("future.running() =>", future.running())

        # Read the final values while the manager is still alive.
        print(f"最终值 - shared: {shared_value.value}, dict: {data_dict['value']}")