Python异步爬虫编程技巧:从入门到高级实战指南
Python异步爬虫编程技巧:从入门到高级实战指南 🚀
📚 目录
- 前言:为什么要学异步爬虫
- 异步编程基础概念
- 异步爬虫核心技术栈
- 入门实战:第一个异步爬虫
- 进阶技巧:并发控制与资源管理
- 高级实战:分布式异步爬虫架构
- 反爬虫对抗策略
- 性能优化与监控
- 常见问题与解决方案
- 最佳实践总结
前言:为什么要学异步爬虫? 🤔
在我近几年的爬虫开发经验中,见证了从最初的单线程顺序爬取,到多线程并发,再到如今的异步编程范式的演进。记得在做新闻数据采集的时候,传统的同步爬虫需要2小时才能完成10万个新闻的数据采集,而改用异步方案后,同样的任务仅需15分钟就能完成。
异步爬虫的核心优势在于:
- 高并发能力:单进程可处理数千个并发请求
- 资源利用率高:避免了线程切换开销
- 响应速度快:非阻塞IO让程序更高效
- 扩展性强:更容易构建大规模爬虫系统
异步编程基础概念 📖
什么是异步编程?
异步编程是一种编程范式,允许程序在等待某个操作完成时继续执行其他任务,而不是阻塞等待。在爬虫场景中,当我们发送HTTP请求时,不需要傻等服务器响应,而是可以继续发送其他请求。
核心概念解析
协程(Coroutine):可以暂停和恢复执行的函数,是异步编程的基本单元。
事件循环(Event Loop):负责调度和执行协程的核心机制。
awaitable对象:可以被await关键字等待的对象,包括协程、Task和Future。
异步爬虫核心技术栈 🛠️
在实际项目中,我通常会使用以下技术组合:
组件 | 推荐库 | 用途 |
---|---|---|
HTTP客户端 | aiohttp | 异步HTTP请求 |
HTML解析 | BeautifulSoup4 | 页面内容提取 |
并发控制 | asyncio.Semaphore | 限制并发数量 |
数据存储 | aiofiles | 异步文件操作 |
代理管理 | aiohttp-proxy | 代理池管理 |
入门实战:第一个异步爬虫 🎯
让我们从一个简单但实用的例子开始,爬取多个网页的标题:
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import timeclass AsyncSpider:def __init__(self, max_concurrent=10):"""初始化异步爬虫:param max_concurrent: 最大并发数"""self.max_concurrent = max_concurrentself.session = Noneself.semaphore = asyncio.Semaphore(max_concurrent)async def __aenter__(self):"""异步上下文管理器入口"""# 创建aiohttp会话,设置连接池和超时参数connector = aiohttp.TCPConnector(limit=100, # 总连接池大小limit_per_host=20, # 单个host的连接数限制ttl_dns_cache=300, # DNS缓存时间)timeout = aiohttp.ClientTimeout(total=30, connect=10)self.session = aiohttp.ClientSession(connector=connector,timeout=timeout,headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'})return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):"""异步上下文管理器退出"""if self.session:await self.session.close()async def fetch_page(self, url):"""获取单个页面内容:param url: 目标URL:return: 页面标题和URL的元组"""async with self.semaphore: # 控制并发数try:async with self.session.get(url) as response:if response.status == 200:html = await response.text()soup = BeautifulSoup(html, 'html.parser')title = soup.find('title')title_text = title.get_text().strip() if title else "无标题"print(f"✅ 成功获取: {url} - {title_text}")return url, title_textelse:print(f"❌ HTTP错误 {response.status}: {url}")return url, Noneexcept asyncio.TimeoutError:print(f"⏰ 超时: {url}")return url, Noneexcept Exception as e:print(f"🚫 异常: {url} - {str(e)}")return url, Noneasync def crawl_urls(self, urls):"""批量爬取URL列表:param urls: URL列表:return: 结果列表"""print(f"🚀 开始爬取 {len(urls)} 个URL,最大并发数: {self.max_concurrent}")start_time = time.time()# 创建所有任务tasks = [self.fetch_page(url) for url in urls]# 并发执行所有任务results = await asyncio.gather(*tasks, return_exceptions=True)end_time = time.time()print(f"🎉 爬取完成,耗时: {end_time - start_time:.2f}秒")# 过滤掉异常结果valid_results = [r for r in results if isinstance(r, tuple) and r[1] is not None]print(f"📊 成功率: {len(valid_results)}/{len(urls)} ({len(valid_results)/len(urls)*100:.1f}%)")return valid_results# 使用示例
async def main():# 测试URL列表urls = ['https://httpbin.org/delay/1','https://httpbin.org/delay/2', 'https://httpbin.org/delay/1','https://httpbin.org/status/200','https://httpbin.org/status/404','https://httpbin.org/json',]# 使用异步上下文管理器确保资源正确释放async with AsyncSpider(max_concurrent=5) as spider:results = await spider.crawl_urls(urls)# 输出结果print("\n📋 爬取结果:")for url, title in results:print(f" {url} -> {title}")if __name__ == "__main__":asyncio.run(main())
这个基础版本展示了异步爬虫的核心要素:
- 使用
aiohttp
进行异步HTTP请求 - 通过
Semaphore
控制并发数量 - 使用异步上下文管理器管理资源
- 异常处理和超时控制
进阶技巧:并发控制与资源管理 ⚡
在实际项目中,合理的并发控制和资源管理至关重要。以下是一个更高级的实现:
import asyncio
import aiohttp
import aiofiles
import json
import random
from dataclasses import dataclass, asdict
from typing import List, Optional, Dict, Any
from urllib.parse import urljoin, urlparse
import logging# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)@dataclass
class CrawlResult:"""爬取结果数据类"""url: strstatus_code: inttitle: Optional[str] = Nonecontent_length: Optional[int] = Noneresponse_time: Optional[float] = Noneerror: Optional[str] = Noneclass AdvancedAsyncSpider:def __init__(self, config: Dict[str, Any]):"""高级异步爬虫初始化:param config: 配置字典,包含各种爬虫参数"""self.max_concurrent = config.get('max_concurrent', 10)self.retry_times = config.get('retry_times', 3)self.retry_delay = config.get('retry_delay', 1)self.request_delay = config.get('request_delay', 0)self.timeout = config.get('timeout', 30)self.session = Noneself.semaphore = asyncio.Semaphore(self.max_concurrent)self.results: List[CrawlResult] = []self.failed_urls: List[str] = []# 用户代理池self.user_agents = ['Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36','Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36','Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',]async def __aenter__(self):"""创建会话"""connector = aiohttp.TCPConnector(limit=200,limit_per_host=50,ttl_dns_cache=300,use_dns_cache=True,keepalive_timeout=60,enable_cleanup_closed=True)timeout = aiohttp.ClientTimeout(total=self.timeout)self.session = aiohttp.ClientSession(connector=connector,timeout=timeout,trust_env=True # 支持代理环境变量)return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):"""关闭会话"""if self.session:await self.session.close()# 等待连接完全关闭await asyncio.sleep(0.1)def get_random_headers(self) -> Dict[str, str]:"""生成随机请求头"""return {'User-Agent': random.choice(self.user_agents),'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8','Accept-Language': 'en-US,en;q=0.5','Accept-Encoding': 'gzip, deflate','Connection': 'keep-alive','Upgrade-Insecure-Requests': '1',}async def fetch_with_retry(self, url: str) -> CrawlResult:"""带重试机制的页面获取:param url: 目标URL:return: 爬取结果对象"""async with self.semaphore:for attempt in range(self.retry_times + 1):start_time = asyncio.get_event_loop().time()try:# 添加随机延迟,避免被反爬虫检测if self.request_delay > 0:await asyncio.sleep(random.uniform(0, self.request_delay))headers = self.get_random_headers()async with self.session.get(url, headers=headers) as response:response_time = asyncio.get_event_loop().time() - start_timecontent = await response.text()# 解析标题title = Noneif 'text/html' in response.headers.get('content-type', ''):from bs4 import BeautifulSoupsoup = BeautifulSoup(content, 'html.parser')title_tag = soup.find('title')if title_tag:title = title_tag.get_text().strip()result = CrawlResult(url=url,status_code=response.status,title=title,content_length=len(content),response_time=response_time)logger.info(f"✅ 成功: {url} (状态码: {response.status}, 耗时: {response_time:.2f}s)")return resultexcept asyncio.TimeoutError:error_msg = f"超时 (尝试 {attempt + 1}/{self.retry_times + 1})"logger.warning(f"⏰ {url} - {error_msg}")except aiohttp.ClientError as e:error_msg = f"客户端错误: {str(e)} (尝试 {attempt + 1}/{self.retry_times + 1})"logger.warning(f"🚫 {url} - {error_msg}")except Exception as e:error_msg = f"未知错误: {str(e)} (尝试 {attempt + 1}/{self.retry_times + 1})"logger.error(f"💥 {url} - {error_msg}")# 如果不是最后一次尝试,等待后重试if attempt < self.retry_times:await asyncio.sleep(self.retry_delay * (2 ** attempt)) # 指数退避# 所有重试都失败了result = CrawlResult(url=url,status_code=0,error=f"重试 {self.retry_times} 次后仍然失败")self.failed_urls.append(url)logger.error(f"❌ 最终失败: {url}")return resultasync def crawl_batch(self, urls: List[str], callback=None) -> List[CrawlResult]:"""批量爬取URL:param urls: URL列表:param callback: 可选的回调函数,处理每个结果:return: 爬取结果列表"""logger.info(f"🚀 开始批量爬取 {len(urls)} 个URL")start_time = asyncio.get_event_loop().time()# 创建所有任务tasks = [self.fetch_with_retry(url) for url in urls]# 使用as_completed获取完成的任务,可以实时处理结果results = []completed = 0for coro in asyncio.as_completed(tasks):result = await cororesults.append(result)completed += 1# 调用回调函数if callback:await callback(result, completed, len(urls))# 显示进度if completed % 10 == 0 or completed == len(urls):logger.info(f"📊 进度: {completed}/{len(urls)} ({completed/len(urls)*100:.1f}%)")total_time = asyncio.get_event_loop().time() - start_timesuccess_count = len([r for r in results if r.status_code > 0])logger.info(f"🎉 批量爬取完成")logger.info(f"📈 总耗时: {total_time:.2f}秒")logger.info(f"📊 成功率: {success_count}/{len(urls)} ({success_count/len(urls)*100:.1f}%)")self.results.extend(results)return resultsasync def save_results(self, filename: str):"""异步保存结果到JSON文件"""data = {'total_urls': len(self.results),'successful': len([r for r in self.results if r.status_code > 0]),'failed': len(self.failed_urls),'results': [asdict(result) for result in self.results],'failed_urls': self.failed_urls}async with aiofiles.open(filename, 'w', encoding='utf-8') as f:await f.write(json.dumps(data, ensure_ascii=False, indent=2))logger.info(f"💾 结果已保存到: {filename}")# 使用示例
async def progress_callback(result: CrawlResult, completed: int, total: int):"""进度回调函数"""if result.status_code > 0:print(f"✅ [{completed}/{total}] {result.url} - {result.title}")else:print(f"❌ [{completed}/{total}] {result.url} - {result.error}")async def advanced_demo():"""高级爬虫演示"""config = {'max_concurrent': 20, # 并发数'retry_times': 2, # 重试次数'retry_delay': 1, # 重试延迟'request_delay': 0.5, # 请求间隔'timeout': 15, # 超时时间}# 测试URL列表urls = ['https://httpbin.org/delay/1','https://httpbin.org/delay/2','https://httpbin.org/status/200','https://httpbin.org/status/404','https://httpbin.org/json','https://httpbin.org/xml','https://httpbin.org/html','https://httpbin.org/robots.txt',] * 3 # 重复3次,模拟更多URLasync with AdvancedAsyncSpider(config) as spider:# 批量爬取results = await spider.crawl_batch(urls, callback=progress_callback)# 保存结果await spider.save_results('crawl_results.json')# 统计信息successful = [r for r in results if r.status_code > 0]avg_response_time = sum(r.response_time or 0 for r in successful) / len(successful)print(f"\n📊 统计信息:")print(f" 成功: {len(successful)}")print(f" 失败: {len(spider.failed_urls)}")print(f" 平均响应时间: {avg_response_time:.2f}秒")if __name__ == "__main__":asyncio.run(advanced_demo())
这个进阶版本包含了:
- 完整的重试机制与指数退避
- 随机请求头和延迟,避免反爬虫检测
- 实时进度回调和详细日志
- 结构化的结果存储
- 更好的资源管理和异常处理
高级实战:分布式异步爬虫架构 🏗️
对于大规模爬虫项目,我们需要考虑分布式架构。以下是一个基于Redis的分布式爬虫实现:
import asyncio
import aiohttp
import aioredis
import json
import hashlib
import time
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from urllib.parse import urljoin, urlparse
import logginglogger = logging.getLogger(__name__)@dataclass
class Task:"""爬取任务数据结构"""url: strpriority: int = 0retry_count: int = 0max_retries: int = 3metadata: Dict[str, Any] = Nonedef __post_init__(self):if self.metadata is None:self.metadata = {}class DistributedSpider:"""分布式异步爬虫"""def __init__(self, worker_id: str, redis_config: Dict[str, Any], spider_config: Dict[str, Any]):self.worker_id = worker_idself.redis_config = redis_configself.spider_config = spider_config# Redis连接self.redis = None# 队列名称self.task_queue = "spider:tasks"self.result_queue = "spider:results"self.failed_queue = "spider:failed"self.duplicate_set = "spider:duplicates"# HTTP会话self.session = Noneself.semaphore = asyncio.Semaphore(spider_config.get('max_concurrent', 10))# 统计信息self.stats = {'processed': 0,'success': 0,'failed': 0,'start_time': time.time()}async def __aenter__(self):"""初始化连接"""# 连接Redisself.redis = aioredis.from_url(f"redis://{self.redis_config['host']}:{self.redis_config['port']}",password=self.redis_config.get('password'),db=self.redis_config.get('db', 0),encoding='utf-8',decode_responses=True)# 创建HTTP会话connector = aiohttp.TCPConnector(limit=100, limit_per_host=20)timeout = aiohttp.ClientTimeout(total=30)self.session = aiohttp.ClientSession(connector=connector, timeout=timeout)logger.info(f"🚀 Worker {self.worker_id} 已启动")return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):"""清理资源"""if self.session:await self.session.close()if self.redis:await self.redis.close()logger.info(f"🛑 Worker {self.worker_id} 已停止")def generate_task_id(self, url: str) -> str:"""生成任务唯一ID"""return hashlib.md5(url.encode()).hexdigest()async def add_task(self, task: Task) -> bool:"""添加任务到队列"""task_id = self.generate_task_id(task.url)# 检查是否重复is_duplicate = await self.redis.sismember(self.duplicate_set, task_id)if is_duplicate:logger.debug(f"⚠️ 重复任务: {task.url}")return False# 添加到去重集合await self.redis.sadd(self.duplicate_set, task_id)# 添加到任务队列(使用优先级队列)task_data = json.dumps(asdict(task))await self.redis.zadd(self.task_queue, {task_data: task.priority})logger.info(f"➕ 任务已添加: {task.url} (优先级: {task.priority})")return Trueasync def get_task(self) -> Optional[Task]:"""从队列获取任务"""# 使用BZPOPMAX获取最高优先级任务(阻塞式)result = await self.redis.bzpopmax(self.task_queue, timeout=5)if not result:return Nonetask_data = json.loads(result[1])return Task(**task_data)async def process_task(self, task: Task) -> Dict[str, Any]:"""处理单个任务"""async with self.semaphore:start_time = time.time()try:# 发送HTTP请求headers = {'User-Agent': 'DistributedSpider/1.0','Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'}async with self.session.get(task.url, headers=headers) as response:content = await response.text()response_time = time.time() - start_timeresult = {'task_id': self.generate_task_id(task.url),'url': task.url,'status_code': response.status,'content_length': len(content),'response_time': response_time,'worker_id': self.worker_id,'timestamp': time.time(),'content': content[:1000], # 只保存前1000字符'metadata': task.metadata}# 这里可以添加内容解析逻辑if response.status == 200:result['success'] = True# 解析页面,提取新的URL(示例)new_urls = await self.extract_urls(content, task.url)result['extracted_urls'] = new_urlselse:result['success'] = Falseresult['error'] = f"HTTP {response.status}"return resultexcept Exception as e:response_time = time.time() - start_timereturn {'task_id': self.generate_task_id(task.url),'url': task.url,'status_code': 0,'response_time': response_time,'worker_id': self.worker_id,'timestamp': time.time(),'success': False,'error': str(e),'metadata': task.metadata}async def extract_urls(self, content: str, base_url: str) -> List[str]:"""从页面内容中提取URL(示例实现)"""try:from bs4 import BeautifulSoupsoup = BeautifulSoup(content, 'html.parser')urls = []for link in soup.find_all('a', href=True):url = urljoin(base_url, link['href'])# 简单过滤if url.startswith('http') and len(urls) < 10:urls.append(url)return urlsexcept Exception:return []async def save_result(self, result: Dict[str, Any]):"""保存处理结果"""if result['success']:# 保存成功结果await self.redis.lpush(self.result_queue, json.dumps(result))self.stats['success'] += 1# 如果有提取的URL,添加为新任务if 'extracted_urls' in result:for url in result['extracted_urls']:new_task = Task(url=url, priority=0, metadata={'parent_url': result['url']})await self.add_task(new_task)else:# 处理失败的任务task_id = result['task_id']# 重试逻辑original_task = Task(url=result['url'],retry_count=result.get('retry_count', 0) + 1,metadata=result.get('metadata', {}))if original_task.retry_count <= original_task.max_retries:# 重新加入队列,降低优先级original_task.priority = -original_task.retry_countawait self.add_task(original_task)logger.info(f"🔄 重试任务: {original_task.url} (第{original_task.retry_count}次)")else:# 超过重试次数,保存到失败队列await self.redis.lpush(self.failed_queue, json.dumps(result))logger.error(f"💀 任务最终失败: {original_task.url}")self.stats['failed'] += 1async def run_worker(self, max_tasks: Optional[int] = None):"""运行工作进程"""logger.info(f"🏃 Worker {self.worker_id} 开始工作")processed = 0while True:if max_tasks and processed >= max_tasks:logger.info(f"🎯 达到最大任务数量: {max_tasks}")break# 获取任务task = await self.get_task()if not task:logger.debug("⏳ 暂无任务,等待中...")continue# 处理任务logger.info(f"🔧 处理任务: {task.url}")result = await self.process_task(task)# 保存结果await self.save_result(result)# 更新统计processed += 1self.stats['processed'] = processed# 定期输出统计信息if processed % 10 == 0:await self.print_stats()async def print_stats(self):"""打印统计信息"""elapsed = time.time() - self.stats['start_time']rate = self.stats['processed'] / elapsed if elapsed > 0 else 0logger.info(f"📊 Worker {self.worker_id} 统计:")logger.info(f" 已处理: {self.stats['processed']}")logger.info(f" 成功: {self.stats['success']}")logger.info(f" 失败: {self.stats['failed']}")logger.info(f" 速率: {rate:.2f} tasks/sec")logger.info(f" 运行时间: {elapsed:.2f}秒")# 使用示例
async def distributed_demo():"""分布式爬虫演示"""redis_config = {'host': 'localhost','port': 6379,'password': None,'db': 0}spider_config = {'max_concurrent': 5,'request_delay': 1,}worker_id = f"worker-{int(time.time())}"async with DistributedSpider(worker_id, redis_config, spider_config) as spider:# 添加一些初始任务initial_urls = ['https://httpbin.org/delay/1','https://httpbin.org/delay/2','https://httpbin.org/json','https://httpbin.org/html',]for url in initial_urls:task = Task(url=url, priority=10) # 高优先级await spider.add_task(task)# 运行工作进程await spider.run_worker(max_tasks=20)if __name__ == "__main__":logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')asyncio.run(distributed_demo())
这个分布式版本实现了:
- 基于Redis的任务队列和结果存储
- 优先级任务调度
- 自动去重机制
- 失败重试和降级处理
- 多工作进程协作
- 实时统计和监控
反爬虫对抗策略 🛡️
在实际爬虫开发中,反爬虫对抗是必须面对的挑战。以下是一些常用的策略:
import asyncio
import aiohttp
import random
import time
from typing import List, Dict, Optional
from urllib.parse import urlparse
import jsonclass AntiAntiSpider:"""反反爬虫工具类"""def __init__(self):# 用户代理池self.user_agents = ['Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36','Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36','Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36','Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0','Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:89.0) Gecko/20100101 Firefox/89.0',]# 代理池self.proxy_pool = [# 'http://proxy1:port',# 'http://proxy2:port',# 可以从代理服务商API动态获取]# 请求频率控制self.domain_delays = {} # 每个域名的延迟配置self.last_request_times = {} # 每个域名的最后请求时间def get_random_user_agent(self) -> str:"""获取随机User-Agent"""return random.choice(self.user_agents)def get_proxy(self) -> Optional[str]:"""获取代理(如果有的话)"""if self.proxy_pool:return random.choice(self.proxy_pool)return Noneasync def respect_robots_txt(self, session: aiohttp.ClientSession, url: str) -> bool:"""检查robots.txt(可选实现)"""try:parsed_url = urlparse(url)robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"async with session.get(robots_url) as response:if response.status == 200:robots_content = await response.text()# 这里可以实现robots.txt解析逻辑# 简化版本:检查是否包含Disallowreturn 'Disallow: /' not in robots_contentexcept:passreturn True # 默认允许async def rate_limit(self, url: str):"""频率限制"""domain = urlparse(url).netloc# 获取该域名的延迟配置(默认1-3秒)if domain not in self.domain_delays:self.domain_delays[domain] = random.uniform(1, 3)# 检查上次请求时间if domain in self.last_request_times:elapsed = time.time() - self.last_request_times[domain]required_delay = self.domain_delays[domain]if elapsed < required_delay:sleep_time = required_delay - elapsedawait asyncio.sleep(sleep_time)# 更新最后请求时间self.last_request_times[domain] = time.time()def get_headers(self, url: str, referer: Optional[str] = None) -> Dict[str, str]:"""生成请求头"""headers = {'User-Agent': self.get_random_user_agent(),'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8','Accept-Language': 'en-US,en;q=0.5','Accept-Encoding': 'gzip, deflate','Connection': 'keep-alive','Upgrade-Insecure-Requests': '1','Sec-Fetch-Dest': 'document','Sec-Fetch-Mode': 'navigate','Sec-Fetch-Site': 'none','Cache-Control': 'max-age=0',}# 添加Referer(如果提供)if referer:headers['Referer'] = referer# 根据域名添加特定头部domain = urlparse(url).netlocif 'github' in domain:headers['Accept'] = 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'return headersclass StealthSpider:"""隐蔽爬虫实现"""def __init__(self, max_concurrent: int = 3):self.max_concurrent = max_concurrentself.session = Noneself.semaphore = asyncio.Semaphore(max_concurrent)self.anti_anti = AntiAntiSpider()# 会话状态管理self.cookies = {}self.session_data = {}async def __aenter__(self):# 创建更真实的连接器配置connector = aiohttp.TCPConnector(limit=50,limit_per_host=10,ttl_dns_cache=300,use_dns_cache=True,keepalive_timeout=30,enable_cleanup_closed=True,# 模拟真实浏览器的连接行为family=0, # 支持IPv4和IPv6)# 设置合理的超时timeout = aiohttp.ClientTimeout(total=30,connect=10,sock_read=20)# 创建会话,支持自动重定向和cookieself.session = aiohttp.ClientSession(connector=connector,timeout=timeout,cookie_jar=aiohttp.CookieJar(),# 支持自动解压auto_decompress=True,# 信任环境变量中的代理设置trust_env=True)return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):if self.session:await self.session.close()await asyncio.sleep(0.1) # 确保连接完全关闭async def fetch_with_stealth(self, url: str, referer: Optional[str] = None) -> Dict:"""隐蔽模式获取页面"""async with self.semaphore:# 频率限制await self.anti_anti.rate_limit(url)# 生成请求头headers = self.anti_anti.get_headers(url, referer)# 获取代理proxy = self.anti_anti.get_proxy()try:# 发送请求async with self.session.get(url, headers=headers, proxy=proxy,allow_redirects=True,max_redirects=5) as response:content = await response.text()return {'url': str(response.url),'status': response.status,'headers': dict(response.headers),'content': content,'cookies': {cookie.key: cookie.value for cookie in response.cookies.values()},'final_url': str(response.url),'redirects': len(response.history),'success': True}except Exception as e:return {'url': url,'status': 0,'error': str(e),'success': False}async def crawl_with_session_management(self, urls: List[str]) -> List[Dict]:"""带会话管理的爬取"""results = []for i, url in enumerate(urls):# 使用前一个URL作为referer(模拟用户行为)referer = urls[i-1] if i > 0 else Noneresult = await self.fetch_with_stealth(url, referer)results.append(result)# 模拟用户阅读时间if result['success']:reading_time = random.uniform(2, 8)print(f"📖 模拟阅读时间: {reading_time:.1f}秒")await asyncio.sleep(reading_time)return results# 验证码处理示例(需要OCR服务)
class CaptchaHandler:"""验证码处理器"""async def solve_captcha(self, captcha_image_url: str, session: aiohttp.ClientSession) -> Optional[str]:"""解决验证码(示例实现)实际项目中可能需要:1. 第三方OCR服务2. 机器学习模型3. 人工打码平台"""try:# 下载验证码图片async with session.get(captcha_image_url) as response:if response.status == 200:image_data = await response.read()# 这里应该调用OCR服务# 示例:使用第三方服务# result = await self.call_ocr_service(image_data)# return result# 临时返回None,表示无法处理return Noneexcept Exception as e:print(f"验证码处理失败: {e}")return Noneasync def handle_captcha_page(self, session: aiohttp.ClientSession, response_text: str, current_url: str):"""处理包含验证码的页面"""# 检测是否包含验证码if 'captcha' in response_text.lower() or '验证码' in response_text:print("🤖 检测到验证码页面")# 提取验证码图片URL(需要根据具体网站实现)# 这里只是示例from bs4 import BeautifulSoupsoup = BeautifulSoup(response_text, 'html.parser')captcha_img = soup.find('img', {'id': 'captcha'})if captcha_img:captcha_url = captcha_img.get('src')if captcha_url:# 解决验证码captcha_result = await self.solve_captcha(captcha_url, session)if captcha_result:print(f"✅ 验证码识别结果: {captcha_result}")return captcha_resultprint("❌ 验证码处理失败")return None# 使用示例
async def stealth_demo():"""隐蔽爬虫演示"""urls = ['https://httpbin.org/user-agent','https://httpbin.org/headers','https://httpbin.org/cookies','https://httpbin.org/redirect/2',]async with StealthSpider(max_concurrent=2) as spider:results = await spider.crawl_with_session_management(urls)for result in results:if result['success']:print(f"✅ {result['url']} (状态: {result['status']})")if result['redirects'] > 0:print(f" 重定向次数: {result['redirects']}")else:print(f"❌ {result['url']} - {result['error']}")if __name__ == "__main__":asyncio.run(stealth_demo())
性能优化与监控 📊
性能优化是异步爬虫的关键环节,以下是一个完整的监控和优化方案:
import asyncio
import aiohttp
import time
import psutil
import gc
from dataclasses import dataclass
from typing import Dict, List, Optional
import logging
from collections import deque, defaultdict
import json@dataclass
class PerformanceMetrics:"""性能指标数据类"""timestamp: floatrequests_per_second: floataverage_response_time: floatmemory_usage_mb: floatcpu_usage_percent: floatactive_connections: intqueue_size: intsuccess_rate: floatclass PerformanceMonitor:"""性能监控器"""def __init__(self, window_size: int = 60):self.window_size = window_size # 监控窗口大小(秒)self.metrics_history = deque(maxlen=window_size)self.request_times = deque()self.response_times = deque()self.success_count = 0self.total_count = 0self.start_time = time.time()# 连接池监控self.active_connections = 0self.queue_size = 0def record_request(self, response_time: float, success: bool):"""记录请求指标"""current_time = time.time()self.request_times.append(current_time)self.response_times.append(response_time)if success:self.success_count += 1self.total_count += 1# 清理过期数据cutoff_time = current_time - self.window_sizewhile self.request_times and self.request_times[0] < cutoff_time:self.request_times.popleft()self.response_times.popleft()def get_current_metrics(self) -> PerformanceMetrics:"""获取当前性能指标"""current_time = time.time()# 计算RPSrps = len(self.request_times) / self.window_size if self.request_times else 0# 计算平均响应时间avg_response_time = sum(self.response_times) / len(self.response_times) if self.response_times else 0# 系统资源使用memory_usage = psutil.Process().memory_info().rss / 1024 / 1024 # MBcpu_usage = psutil.Process().cpu_percent()# 成功率success_rate = self.success_count / self.total_count if self.total_count > 0 else 0metrics = PerformanceMetrics(timestamp=current_time,requests_per_second=rps,average_response_time=avg_response_time,memory_usage_mb=memory_usage,cpu_usage_percent=cpu_usage,active_connections=self.active_connections,queue_size=self.queue_size,success_rate=success_rate)self.metrics_history.append(metrics)return metricsdef print_stats(self):"""打印统计信息"""metrics = self.get_current_metrics()uptime = time.time() - self.start_timeprint(f"\n📊 性能监控报告 (运行时间: {uptime:.1f}s)")print(f" RPS: {metrics.requests_per_second:.2f}")print(f" 平均响应时间: {metrics.average_response_time:.2f}s")print(f" 内存使用: {metrics.memory_usage_mb:.1f}MB")print(f" CPU使用: {metrics.cpu_usage_percent:.1f}%")print(f" 活跃连接: {metrics.active_connections}")print(f" 队列大小: {metrics.queue_size}")print(f" 成功率: {metrics.success_rate:.1%}")print(f" 总请求数: {self.total_count}")class OptimizedAsyncSpider:"""优化版异步爬虫"""def __init__(self, config: Dict):self.config = configself.session = Noneself.semaphore = Noneself.monitor = PerformanceMonitor()# 连接池优化配置self.connector_config = {'limit': config.get('max_connections', 100),'limit_per_host': config.get('max_connections_per_host', 30),'ttl_dns_cache': config.get('dns_cache_ttl', 300),'use_dns_cache': True,'keepalive_timeout': config.get('keepalive_timeout', 60),'enable_cleanup_closed': True,# 优化TCP socket选项'socket_options': [(1, 6, 1), # TCP_NODELAY] if hasattr(1, '__index__') else None}# 自适应并发控制self.adaptive_concurrency = config.get('adaptive_concurrency', True)self.min_concurrent = config.get('min_concurrent', 5)self.max_concurrent = config.get('max_concurrent', 50)self.current_concurrent = self.min_concurrent# 请求池self.request_pool = asyncio.Queue(maxsize=config.get('request_queue_size', 1000))# 响应时间统计(用于自适应调整)self.response_time_window = deque(maxlen=100)async def __aenter__(self):# 创建优化的连接器connector = aiohttp.TCPConnector(**self.connector_config)# 设置超时timeout = aiohttp.ClientTimeout(total=self.config.get('total_timeout', 30),connect=self.config.get('connect_timeout', 10),sock_read=self.config.get('read_timeout', 20))# 创建会话self.session = aiohttp.ClientSession(connector=connector,timeout=timeout,# 启用压缩auto_decompress=True,# 设置最大响应大小read_bufsize=self.config.get('read_bufsize', 64 * 1024),)# 初始化信号量self.semaphore = asyncio.Semaphore(self.current_concurrent)return selfasync def __aexit__(self, exc_type, exc_val, exc_tb):if self.session:await self.session.close()# 强制垃圾回收gc.collect()async def adjust_concurrency(self):"""自适应并发调整"""if not self.adaptive_concurrency or len(self.response_time_window) < 10:return# 计算最近的平均响应时间recent_avg = sum(list(self.response_time_window)[-10:]) / 10overall_avg = sum(self.response_time_window) / len(self.response_time_window)# 如果最近响应时间明显增加,降低并发if recent_avg > overall_avg * 1.5 and self.current_concurrent > self.min_concurrent:self.current_concurrent = max(self.min_concurrent, self.current_concurrent - 2)print(f"📉 降低并发数至: {self.current_concurrent}")# 如果响应时间稳定且较快,增加并发elif recent_avg < overall_avg * 0.8 and self.current_concurrent < self.max_concurrent:self.current_concurrent = min(self.max_concurrent, self.current_concurrent + 1)print(f"📈 提高并发数至: {self.current_concurrent}")# 更新信号量self.semaphore = asyncio.Semaphore(self.current_concurrent)async def fetch_optimized(self, url: str) -> Dict:"""优化的请求方法"""async with self.semaphore:start_time = time.time()try:# 更新连接数self.monitor.active_connections += 1async with self.session.get(url) as response:content = await response.text()response_time = time.time() - start_time# 记录响应时间self.response_time_window.append(response_time)self.monitor.record_request(response_time, response.status == 200)return {'url': url,'status': response.status,'content': content,'response_time': response_time,'content_length': len(content),'success': response.status == 200}except Exception as e:response_time = time.time() - start_timeself.monitor.record_request(response_time, False)return {'url': url,'status': 0,'error': str(e),'response_time': response_time,'success': False}finally:self.monitor.active_connections -= 1async def batch_crawl_optimized(self, urls: List[str]) -> List[Dict]:"""优化的批量爬取"""print(f"🚀 开始优化爬取 {len(urls)} 个URL")# 分批处理,避免内存过载batch_size = self.config.get('batch_size', 100)all_results = []for i in range(0, len(urls), batch_size):batch_urls = urls[i:i + batch_size]print(f"📦 处理批次 {i//batch_size + 1}/{(len(urls)-1)//batch_size + 1}")# 创建任务tasks = [self.fetch_optimized(url) for url in batch_urls]# 执行任务batch_results = await asyncio.gather(*tasks, return_exceptions=True)# 过滤异常valid_results = [r for r in batch_results if isinstance(r, dict)]all_results.extend(valid_results)# 自适应调整并发await self.adjust_concurrency()# 打印监控信息if i % (batch_size * 2) == 0: # 每2个批次打印一次self.monitor.print_stats()# 批次间短暂休息,避免过载if i + batch_size < len(urls):await asyncio.sleep(0.1)return all_resultsasync def memory_cleanup(self):"""内存清理"""# 手动触发垃圾回收gc.collect()# 清理过期的监控数据current_time = time.time()cutoff_time = current_time - 300 # 保留5分钟的数据while (self.monitor.metrics_history and self.monitor.metrics_history[0].timestamp < cutoff_time):self.monitor.metrics_history.popleft()# 使用示例
async def performance_demo():"""性能优化演示"""config = {'max_connections': 50,'max_connections_per_host': 15,'max_concurrent': 20,'min_concurrent': 5,'adaptive_concurrency': True,'batch_size': 50,'total_timeout': 15,'connect_timeout': 5,}# 生成大量测试URLtest_urls = []for i in range(200):delay = i % 5 + 1 # 1-5秒延迟test_urls.append(f'https://httpbin.org/delay/{delay}')async with OptimizedAsyncSpider(config) as spider:# 启动监控任务monitor_task = asyncio.create_task(periodic_monitoring(spider.monitor))try:# 执行爬取results = await spider.batch_crawl_optimized(test_urls)# 最终统计print(f"\n🎉 爬取完成!")print(f"📊 总URL数: {len(test_urls)}")print(f"📊 成功数: {len([r for r in results if r.get('success')])}")print(f"📊 失败数: {len([r for r in results if not r.get('success')])}")spider.monitor.print_stats()finally:monitor_task.cancel()async def periodic_monitoring(monitor: PerformanceMonitor):"""定期监控任务"""while True:await asyncio.sleep(10) # 每10秒监控一次try:metrics = monitor.get_current_metrics()# 检查异常情况if metrics.memory_usage_mb > 500: # 内存超过500MBprint("⚠️ 内存使用过高!")if metrics.requests_per_second < 1 and monitor.total_count > 0:print("⚠️ RPS过低,可能存在瓶颈!")if metrics.success_rate < 0.8 and monitor.total_count > 10:print("⚠️ 成功率过低!")except Exception as e:print(f"监控异常: {e}")if __name__ == "__main__":asyncio.run(performance_demo())
常见问题与解决方案 ❓
1. 内存泄露问题
问题:长时间运行后内存持续增长
解决方案:
# 正确的资源管理
async with aiohttp.ClientSession() as session:# 使用完自动关闭pass# 定期垃圾回收
import gc
gc.collect()# 限制并发数量
semaphore = asyncio.Semaphore(10)
2. 连接池耗尽
问题:大量并发请求导致连接池耗尽
解决方案:
connector = aiohttp.TCPConnector(limit=100, # 增加连接池大小limit_per_host=20, # 单host连接数限制ttl_dns_cache=300, # DNS缓存enable_cleanup_closed=True # 自动清理关闭的连接
)
3. 超时处理不当
问题:请求超时导致任务堆积
解决方案:
timeout = aiohttp.ClientTimeout(total=30, # 总超时connect=10, # 连接超时sock_read=20 # 读取超时
)
4. 异常传播
问题:单个任务异常影响整体爬取
解决方案:
# 使用return_exceptions=True
results = await asyncio.gather(*tasks, return_exceptions=True)# 过滤异常结果
valid_results = [r for r in results if not isinstance(r, Exception)]
最佳实践总结 🏆
经过近几年的异步爬虫开发经验,我总结出以下最佳实践:
1. 架构设计原则
- 单一职责:每个组件只负责特定功能
- 可扩展性:支持水平扩展和配置调整
- 容错性:优雅处理各种异常情况
- 监控性:完整的性能监控和日志记录
2. 性能优化要点
- 合理的并发数:根据目标网站和网络环境调整
- 连接复用:使用连接池减少握手开销
- 内存管理:及时清理资源,避免内存泄露
- 批处理:分批处理大量URL,避免过载
3. 反爬虫对抗策略
- 请求头伪装:模拟真实浏览器行为
- 频率控制:合理的请求间隔
- IP轮换:使用代理池分散请求
- 会话管理:维护登录状态和cookie
4. 错误处理机制
- 重试策略:指数退避重试
- 降级处理:失败任务的备用方案
- 监控告警:及时发现和处理问题
- 日志记录:详细的操作日志
5. 数据质量保证
- 去重机制:避免重复抓取
- 数据验证:确保抓取数据的完整性
- 增量更新:只抓取变化的数据
- 备份恢复:重要数据的备份策略
通过遵循这些最佳实践,你可以构建出高效、稳定、可维护的异步爬虫系统。记住,爬虫开发不仅仅是技术问题,还需要考虑法律合规、网站负载、数据质量等多个方面。
异步爬虫是一门实践性很强的技术,建议在实际项目中不断优化和完善。随着Python异步生态的不断发展,相信会有更多优秀的工具和库出现,让我们的爬虫开发更加高效和便捷。
希望这份指南能够帮助你在异步爬虫的道路上走得更远!🚀
相关文章:
Python异步爬虫编程技巧:从入门到高级实战指南
Python异步爬虫编程技巧:从入门到高级实战指南 🚀 📚 目录 前言:为什么要学异步爬虫异步编程基础概念异步爬虫核心技术栈入门实战:第一个异步爬虫进阶技巧:并发控制与资源管理高级实战:分布式…...
Redis哨兵模式深度解析与实战部署
Redis哨兵模式深度解析与实战部署 文章目录 Redis哨兵模式深度解析与实战部署一、Redis哨兵模式理论架构详解1.1 哨兵模式的核心架构组成基础架构拓扑图 1.2 哨兵节点的核心功能模块1.2.1 监控模块(Monitoring)1.2.2 决策模块(Decision Makin…...
【软考高级系统架构论文】论边缘计算及其应用
论文真题 边缘计算是在靠近物或数据源头的网络边缘侧,融合网络、计算、存储、应用核心能力的分布式开放平台(架构),就近提供边缘智能服务。边缘计算与云计算各有所长,云计算擅长全局性、非实时、长周期的大数据处理与分析,能够在长周期维护、业务决策支撑等领域发挥优势;…...
触摸屏(典型 I2C + Input 子系统设备)从设备树解析到触摸事件上报
触摸屏(典型 I2C Input 子系统设备)从设备树解析到触摸事件上报 以下是架构图,对触摸屏(典型I2C Input子系统设备)从设备树解析到触摸事件上报的全流程详细拆解,包含文字讲解和配套流程图: 注…...
Java中==与equals()方法的深度解析
作为Java后端开发者,我们经常会遇到需要比较两个对象是否相等的情况。在Java中,运算符和equals()方法都可以用于比较,但它们之间存在着本质的区别。 1. 运算符 是一个比较运算符,它的行为取决于比较的类型: 1.1 比较…...
qt常用控件--02
文章目录 qt常用控件--02toolTip属性focusPolicy属性styleSheet属性补充知识点按钮类控件QPushButton 结语 很高兴和大家见面,给生活加点impetus!!开启今天的编程之路!! 今天我们进一步c11中常见的新增表达 作者&…...
AI-Sphere-Butler之如何将豆包桌面版对接到AI全能管家~新玩法(一)
环境: AI-Sphere-Butler VBCABLE2.1.58 Win10专业版 豆包桌面版1.47.4 ubuntu22.04 英伟达4070ti 12G python3.10 问题描述: AI-Sphere-Butler之如何将豆包桌面版对接到AI全能管家~新玩法(一) 聊天视频: AI真…...
为什么android要使用Binder机制
1.linux中大多数标准 IPC 场景(如管道、消息队列、ioctl 等)的进程间通信机制 ------------------ ------------------ ------------------ | 用户进程 A | | 内核空间 | | 用户进程 B | | (User Spa…...
Apache SeaTunnel Flink引擎执行流程源码分析
目录 1. 任务启动入口 2. 任务执行命令类:FlinkTaskExecuteCommand 3. FlinkExecution的创建与初始化 3.1 核心组件初始化 3.2 关键对象说明 4. 任务执行:FlinkExecution.execute() 5. Source处理流程 5.1 插件初始化 5.2 数据流生成 6. Transform处理流程 6.1 插…...
XML读取和设置例子
在Qt C中,可以使用Qt的 QDomDocument类来读取、更新和保存XML文件。这个类提供了对XML文档的强大操作能力,支持通过DOM(文档对象模型)对XML进行读取、修改、添加和删除节点等操作。 下面是一个详细的例子,演示如何在Qt…...
数据标注师学习内容
目录 文本标注词性标注实体标注 图像标注语音标注 文本标注 词性标注 第一篇 第二篇 实体标注 点击这里 关系标注 事件标注 意图标注 关键词标注 分类标注 问答标注 对话标注 图像标注 拉框标注 关键点标注 2D标注 3D标注 线标注 目标跟踪标注 OCR标注 图像分类标注 语音…...
如何实现财务自由
如果有人告诉你,普通人也可以在5到10年内,而不是40到50年后实现财务自由、彻底退休,你会不会觉得对方在开玩笑?但这并非天方夜谭,《百万富翁快车道》的作者MJ德马科就是成功案例。他曾和多数人一样做底层工作ÿ…...
一些想法。。。
1.for里面的局部变量这种还是在for里面定义比较好 比如 for(int i 0;i<n;i){ int num; cin>>num; } 实不相瞒,有一次直接cin了i怎么都没看出来哪里错了。。。 2.关于long long 如果发现中间结果大约是10^9,就要考虑int 溢出 即用 long …...
基于分布式部分可观测马尔可夫决策过程与联邦强化学习的低空经济智能协同决策框架
基于分布式部分可观测马尔可夫决策过程与联邦强化学习的低空经济智能协同决策框架 摘要: 低空经济作为新兴战略产业,其核心场景(如无人机物流、城市空中交通、低空监测)普遍面临环境动态性强、个体观测受限、数据隐私敏感及多智能体协同复杂等挑战。本文创新性地提出一种深…...
github常用插件
一,文档辅助阅读系列:自动化wiki处理 1,deepwiki https://deepwiki.com/ 将我们看不懂的官方code文档转换为wiki,更加便于理解。 其实能够翻阅的仓库很有限,比如说: 但是有很多仓库并没有indexÿ…...
python3字典
1 字典简介 字典是一种可变容器模型,且可存储任意类型对象。字典每个基本元素都包括两个部分: 键(key)和键对应的值(value) 每个键值 key>value 对用冒号: 分割,每个对之间用逗号(,)分割&am…...
华为云 Flexus+DeepSeek 征文|增值税发票智能提取小工具:基于大模型的自动化信息解析实践
华为云 FlexusDeepSeek 征文|增值税发票智能提取小工具:基于大模型的自动化信息解析实践 前言背景 企业财务处理中,增值税发票信息手动提取存在效率低、易出错等痛点,华为云 Flexus 弹性算力联合 DeepSeek 大模型,通过…...
[特殊字符] OpenCV opencv_world 模块作用及编译实践完整指南
📌 什么是 opencv_world 模块? opencv_world 是 OpenCV 官方提供的一个 大型集成动态库。它将 OpenCV 所有启用的模块(例如 core, imgproc, highgui, videoio, dnn, photo 等)打包到一个单一的动态库文件(如 Linux 的…...
目标检测之YOLOv5到YOLOv11——从架构设计和损失函数的变化分析
YOLO(You Only Look Once)系列作为实时目标检测领域的标杆性框架,自2016年YOLOv1问世以来,已历经十余年迭代。本文将聚焦YOLOv5(2020年发布)到YOLOv11(2024年前后)的核心技术演进&am…...
Java的SpringAI+Deepseek大模型实战【二】
文章目录 背景交互方式1、等待式问答2、流式问答 设置角色环绕增强1)修改controller2)修改配置日志级别 处理跨域 背景 上篇【Java的SpringAIDeepseek大模型实战【一】】搭建起浏览器交互的环境,如何进行流式问答,控制台打印日志…...
OpenCV——霍夫变换
霍夫变换 一、霍夫变换原理二、霍夫线检测2.1、标准霍夫变换2.2、概率霍夫变换 三、霍夫圆检测3.1、霍夫圆检测的原理3.2、霍夫梯度法 一、霍夫变换原理 霍夫变换(Hough TRansform)是从图像中识别几何图形的基本方法,由Paul Hough于1962年提…...
线程池 JMM 内存模型
线程池 & JMM 内存模型 文章目录 线程池 & JMM 内存模型线程池线程池的创建ThreadPoolExecutor 七大参数饱和策略ExecutorService 提交线程任务对象执行的方法:ExecutorService 关闭线程池的方法:线程池最大线程数如何确定? volatile…...
PillarNet: Real-Time and High-PerformancePillar-based 3D Object Detection
ECCV 2022 paper:[2205.07403] PillarNet: Real-Time and High-Performance Pillar-based 3D Object Detection code:https://github.com/VISION-SJTU/PillarNet-LTS 纯点云基于pillar3D检测模型 网络比较 SECOND 基于vo…...
配电抢修场景案例
以配电抢修场景为例来展示关键业务活动。配电抢修愿景分成业务逻辑、业务活动、业务特征、技术支撑、KPI五个层次,分别从策略、执行、评价、资源、协同5个方面描述配电抢修愿景的关键业务活动。...
H5新增属性
✅ 一、表单相关新增属性(Form Attributes) 这些属性增强了表单功能,提升用户体验和前端验证能力。 1. placeholder 描述:在输入框为空时显示提示文本。示例: <input type"text" placeholder"请输…...
C# Task 模式实现 Demo(含运行、暂停、结束状态)
下面是一个完整的 C# Task 实现示例,包含运行(Running)、暂停(Paused)和结束(Completed)状态控制: 1. 基本实现(使用 CancellationToken 控制) using System; using System.Threading; using System.Threading.Tasks;public cla…...
Docker健康检查
目录 1.命令 2.验证 1.命令 docker run -itd --name nginx -v data:/etc/nginx/ -v log:/var/log/ -p 8080:80 \ --health-cmd"curl http://127.0.0.1:80" \ --health-interval30s \ --health-timeout5s \ --health-retries3 \ --health-start-period18s \ nginx:…...
Linux笔记---线程控制
1. 线程创建:pthread_create() pthread_create() 是 POSIX 线程库(pthread)中用于创建新线程的函数。调用该函数后系统就会启动一个与主线程并发的线程,并使其跳转到入口函数处执行。 #include <pthread.h>int pthread_cr…...
【AI论文】扩展大型语言模型(LLM)智能体在测试时的计算量
摘要:扩展测试时的计算量在提升大型语言模型(LLMs)的推理能力方面已展现出显著成效。在本研究中,我们首次系统地探索了将测试时扩展方法应用于语言智能体,并研究了该方法在多大程度上能提高其有效性。具体而言…...
Spring--IOC容器的一些扩展属性
一、BeanFactoryPostProcessor和BeanPostProcessor BeanFactoryPostProcessor的作用是在实例化前修改BeanDefinition的属性 BeanPostProcessor的作用是在bean完成创建实例、填充属性之后,初始化阶段的前后都会对bean进行操作,使用postProcessBeforeIni…...
WebClient 功能介绍,使用场景,完整使用示例演示
WebClient 功能介绍 WebClient 是 Spring 5 中引入的响应式 HTTP 客户端,用于替代已弃用的 RestTemplate,专为异步非阻塞编程设计,基于 Reactor 框架实现。其核心功能包括: 异步与非阻塞 通过 Mono 和 Flux 处理请求与响应&#…...
[Java 基础]ArrayList
ArrayList 类是一个可以动态修改的数组,与普通数组的区别就是它是没有固定大小的限制。 ArrayList 的示意可以看 VCR:https://visualgo.net/en/array 创建 ArrayList 对象 final ArrayList<String> strings new ArrayList<>();这里创建 …...
用无人机和AI守护高原净土:高海拔自然保护区的垃圾检测新方法
这篇题为《Automatic Detection of Scattered Garbage Regions Using Small Unmanned Aerial Vehicle Low-Altitude Remote Sensing Images for High-Altitude Natural Reserve Environmental Protection》的论文,发表于 Environmental Science & Technology&am…...
《Redis高并发优化策略与规范清单:从开发到运维的全流程指南》
Redis高并发优化策略与规范清单:从开发到运维的全流程指南 在互联网应用的后端架构中,Redis凭借其高性能、高并发的特性,成为缓存和数据存储的首选方案。无论是电商抢购、社交平台的点赞计数,还是在线旅游平台的实时数据查询&…...
Linux基本指令篇 —— man指令
man命令是Linux系统中最重要的命令之一,它是"manual"(手册)的缩写,用于查看Linux系统中命令、函数、配置文件等的详细说明文档。man命令是Linux系统管理员和开发者的必备工具,熟练掌握man命令可以大大提高工…...
Spring Boot使用MCP服务器
1、JDK版本17 2、pom文件 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apac…...
学习Linux进程冻结技术
原文:蜗窝科技Linux进程冻结技术 功耗中经常需要用到,但是linux这块了解甚少,看到这个文章还蛮适合我阅读的 1 什么是进程冻结 进程冻结技术(freezing of tasks)是指在系统hibernate或者suspend的时候,将…...
Docker基本概念——AI教你学Docker
1.1 Docker 概念详解 1. Docker 是什么? Docker 是一个开源的应用容器引擎,它让开发者可以将应用及其依赖打包到一个可移植的容器(Container)中,并在任何支持 Docker 的 Linux、Windows 或 macOS 系统上运行。这样做…...
第十六届蓝桥杯C/C++程序设计研究生组国赛 国二
应该是最后一次参加蓝桥杯比赛了,很遗憾,还是没有拿到国一。 大二第一次参加蓝桥杯,印象最深刻的是居然不知道1s是1000ms,花了很多时间在这题,后面节奏都乱了,抗压能力也不行,身体也不适。最后…...
Python 数据分析与可视化 Day 5 - 数据可视化入门(Matplotlib Seaborn)
🎯 今日目标 掌握 Matplotlib 的基本绘图方法(折线图、柱状图、饼图)掌握 Seaborn 的高级绘图方法(分类图、分布图、箱线图)熟悉图像美化(标题、标签、颜色、风格)完成一组学生成绩数据的可视化…...
WebRTC(八):SDP
SDP 概念 SDP 是一种描述多媒体通信会话的文本格式(基于 MIME,RFC 4566)。本身 不传输数据,仅用于在会话建立阶段传递信息。常与 SIP(VoIP)、RTSP、WebRTC 等协议配合使用。 用途 描述媒体类型…...
《哈希表》K倍区间(解题报告)
文章目录 零、题目描述一、算法概述二、算法思路三、代码实现四、算法解释五、复杂度分析 零、题目描述 题目链接:K倍区间 一、算法概述 计算子数组和能被k整除的子数组数量的算法。通过前缀和与哈希表的结合,高效地统计满足条件的子数组。 需要注…...
牛津大学开源视频中的开放世界目标计数!
视频中的开放世界目标计数 GitHub PaPer Niki Amini-Naieni nikianrobots.ox.ac.uk Andrew Zisserman azrobots.ox.ac.uk 视觉几何组(VGG),牛津大学,英国 图 1:视频中的目标计数:给定顶行的视频&#…...
1.2、CAN总线帧格式
1、帧类型 2、帧类型介绍 (1)数据帧 扩展格式是为了扩展ID,ID号每4位一个字节(11位最大ID号为0x7FF) (2)遥控帧 遥控帧由于没有Data,所以DLC可能没有意义,可给任意值&am…...
DeepSeek今天喝什么随机奶茶推荐器
用DeepSeek生成了一个随机奶茶推荐器-今天喝什么,效果非常棒!UI界面美观。 提示词prompt如下 用html5帮我生成一个今天喝什么的网页 点击按钮随机生成奶茶品牌等,要包括中国常见的知名的奶茶品牌 如果不满意还可以随机再次生成 ui界面要好看 …...
词编码模型怎么进行训练的,输出输入是什么,标签是什么
词编码模型怎么进行训练的,输出输入是什么,标签是什么 词编码模型的训练本质是通过数据驱动的方式,将离散的文本符号映射为连续的语义向量。 一、训练机制:从符号到向量的映射逻辑 1. 核心目标 将单词/子词(Token)映射为低维向量,使语义相关的词在向量空间中距离更近…...
LSTM、GRU 与 Transformer网络模型参数计算
参数计算公式对比 模型类型参数计算公式关键组成部分LSTM4 (embed_dim hidden_size hidden_size hidden_size)4个门控结构GRU3 (embed_dim hidden_size hidden_size hidden_size)3个门控结构Transformer (Encoder)12 embed_dim 9 embed_dim ff_dim 14 embed_dim…...
nnv开源神经网络验证软件工具
一、软件介绍 文末提供程序和源码下载 用于神经网络验证的 Matlab 工具箱,该工具箱实现了可访问性方法,用于分析自主信息物理系统 (CPS) 领域中带有神经网络控制器的神经网络和控制系统。 二、相关工具和软件 该工具箱利用神经…...
SQLite3 在嵌入式系统中的应用指南
SQLite3 在嵌入式系统中的应用指南 一、嵌入式系统中 SQLite3 的优势 SQLite3 是嵌入式系统的理想数据库解决方案,具有以下核心优势: 特性嵌入式系统价值典型指标轻量级适合资源受限环境库大小:500-700KB零配置无需数据库管理员开箱即用无…...
原生微信小程序网络请求与上传接口封装实战指南
本文基于微信小程序原生 API,封装 request 和 uploadFile 接口,最终实现统一请求管理、请求拦截、错误处理等能力。 📦 一、为什么要封装网络请求? 微信小程序提供了 wx.request 和 wx.uploadFile 原生 API,但直接使用…...