当前位置: 首页 > news >正文

VectorBT:使用PyTorch+Transformer训练和回测股票模型 进阶五

VectorBT:使用PyTorch+Transformer训练和回测股票模型 进阶五

本方案基于PyTorch框架与Transformer模型,结合VectorBT回测引擎构建多股票量化交易系统,采用滑动窗口技术构建时序特征,通过自注意力机制捕捉市场规律预测收益率,集成双EMA交叉策略动态生成交易信号,利用Optuna优化模型超参与策略参数,支持增量训练更新特征分布,结合波动率调整非线性仓位,并通过分组标准化与股票分组计算严格规避数据泄漏风险,实现端到端的量化策略研发闭环。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。适合量化新手建立系统认知,为策略开发打下基础。

Backtest Strategy

本文是进阶指南🚀,推荐先阅读了解基础知识‼️

  • VectorBT:Python量化交易策略开发与回测评估详解 🔥
  • VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶一 🔥
  • VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶二 🔥
  • VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶三 🔥
  • VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶四 🔥

1. 方案概述

本方案基于PyTorch框架与Transformer模型,结合VectorBT回测引擎构建多股票量化交易系统,采用滑动窗口技术构建时序特征,通过自注意力机制捕捉市场规律预测收益率,集成双EMA交叉策略动态生成交易信号,利用Optuna优化模型超参与策略参数,支持增量训练更新特征分布,结合波动率调整非线性仓位,并通过分组标准化与股票分组计算严格规避数据泄漏风险,实现端到端的量化策略研发闭环。

1.1 核心原理

  1. 多时序特征编码:通过滑动窗口技术构建三维特征矩阵(样本×时间步×特征)
  2. Transformer建模:利用自注意力机制捕捉时序依赖关系
  3. 动态仓位管理:结合波动率调整仓位大小(使用tanh函数压缩)
  4. 增量学习机制:支持在线更新模型参数和特征分布

1.2 关键特点

  • 多股票联合训练:共享特征表示,提升模型泛化能力
  • 非线性仓位控制:position_size = tanh(|return|/volatility)
  • 参数自动优化:使用Optuna进行双重优化(模型超参+策略参数)
  • 特征鲁棒处理:分组标准化(趋势类用RobustScaler,成交量用StandardScaler)

1.3 注意事项

  • 数据泄漏风险:严格按股票分组计算收益率和技术指标
  • 设备兼容性:支持CUDA/MPS/CPU多设备自动切换
  • 内存管理:滑动窗口生成时需控制窗口大小(默认5天)
  • 过拟合预防:采用早停机制和学习率动态调整

2. 系统架构

数据层
模型层
训练层
回测层
应用层
调用
调用
调用
加载数据
生成/筛选特征
全量/增量训练
加载/保存模型
构建
执行引擎
策略逻辑
核心组件
注意力机制
data_processing.py
load_data
model_definition.py
TransformerModel
MultiHeadAttention
training.py
OnlineFeatureEngineer
IncrementalTrainer
TrainingStateManager
backtesting.py
BacktestStrategy
DualEMACrossoverStrategy
main.py

架构说明:

  1. 应用层:通过main.py整合

    • 统一训练/回测接口
    • 设备自动检测
    • 全流程种子控制
  2. 数据层:通过data_processing.py实现

    • 多股票数据加载与合并
    • 收益率计算与异常值处理
    • 严格的时间序列管理
  3. 模型层:定义于model_definition.py

    • Transformer架构实现时序预测
    • 包含位置编码和多头注意力机制
    • 支持动态维度调整的Encoder结构
  4. 训练层:通过training.py实现

    • 增量式训练框架
    • 在线特征工程系统
    • 自适应特征选择机制
    • Optuna超参优化集成
  5. 回测层:定义于backtesting.py

    • 双EMA交叉策略引擎
    • 波动率自适应仓位管理
    • 策略参数动态优化模块

2.1 数据层(Data Layer)

对应代码data_processing.py

def load_data(ts_codes, data_path="./data", test=False):# 核心数据加载逻辑combined_df["returns"] = combined_df.groupby("ts_code")["close"].pct_change().shift(-1)
  • 数据源:本地存储的Parquet文件(含复权处理)
  • 关键处理
    • 跨股票数据合并与日期对齐
    • 严格避免未来信息:按股票分组计算次日收益率
    • 收益率截断(±10%边界)
  • 输出格式:带时间戳的DataFrame,包含open/high/low/close/vol等原始字段

2.2 特征工程(Feature Engineering)

对应代码training.py

class OnlineFeatureEngineer:def generate_features(self, df):# 生成8大类技术指标self.feature_groups = {"Trend": ["MA20","EMA12","MACD"...],"Momentum": ["RSI","KDJ_K"...],...}
  • 动态特征生成

    • 趋势类(MA20, MACD, …)
    • 动量类(RSI, KDJ, …)
    • 波动率(布林带, ATR, …)
    • 成交量(OBV, MFI, …)
  • 标准化策略

    self.scalers = {"Trend": RobustScaler(),"Momentum": MinMaxScaler(),"Volatility": RobustScaler(),...
    }
    
  • 特征选择

    selector = RandomForestRegressor(n_estimators=50)
    feature_selector = SelectFromModel(selector)
    

2.3 模型系统(Model System)

对应代码model_definition.py

class TransformerModel(nn.Module):def __init__(self, input_size, d_model=64, num_heads=4...):# Encoder-Only结构self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, dff) for _ in range(num_layers)])
  • 核心架构

    • 可配置的Encoder层数(2-4层)
    • 多头注意力(4-8头)
    • 位置编码使用正弦/余弦混合编码
  • 训练机制

    criterion = nn.HuberLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    scheduler = ReduceLROnPlateau(optimizer, 'min', patience=5)
    

2.4 交易策略(Trading Strategy)

对应代码backtesting.py

class DualEMACrossoverStrategy:def generate_signals(self):ema_fast = pred_returns.ewm(span=fast_span).mean()ema_slow = pred_returns.ewm(span=slow_span).mean()position_size = np.tanh(pred_returns.abs()/volatility)
  • 动态参数
    • 快线EMA周期:10-30日
    • 慢线EMA周期:50-100日
  • 仓位控制
    • 基于波动率的tanh函数非线性映射
    • 最大仓位限制在30%-80%之间

2.5 回测引擎(Backtesting)

对应代码backtesting.py

class BacktestStrategy:def _configure_vbt(self):vbt.settings.portfolio["fees"] = 0.0025vbt.settings.portfolio["slippage"] = 0.0025
  • 成本模型

    • 双向0.25%佣金
    • 0.25%滑点成本
  • 执行机制

    Portfolio.from_signals(entries=signals.shift(1) == 1,  # T+1信号执行size=position_size,size_type="percent"
    )
    

3. 系统流程

3.1 训练流程序列图

Main DataProcessing FeatureEngineer FeatureSelector Optuna Trainer Model StateManager load_data() generate_features() feature_selection() 传递特征矩阵 创建study 参数建议 训练评估 返回指标 loop [Optuna超参优化] 正式训练 保存状态 持久化存储 Main DataProcessing FeatureEngineer FeatureSelector Optuna Trainer Model StateManager

3.2 回测流程序列图

Main BacktestStrategy DualEMA Optuna VectorBT 初始化 模型预测 创建策略实例 生成参数组合 模拟交易 返回收益 loop [参数优化] 执行最终回测 返回组合结果 Main BacktestStrategy DualEMA Optuna VectorBT

3.3 流程关键点说明

  1. 训练流程:

    • 采用两阶段优化:特征选择 → 超参优化
    • 使用Optuna进行贝叶斯优化
    • 支持断点续训的模型保存机制
  2. 回测流程:

    • 策略参数动态搜索空间
    • 基于波动率的非线性仓位控制
    • 交易成本的双向收取模型
    • 结果可视化自动适配暗色主题
  3. 跨模块协作:

    • 特征工程与模型输入的维度一致性保证
    • 训练/推理的设备自动适配
    • 时间序列的严格对齐机制

4. 总结与优化建议

4.1 方案优势

  • 架构灵活性:模块化设计支持策略快速迭代
  • 计算效率:GPU加速训练+VectorBT高效回测
  • 风险控制:动态波动率调整+仓位限制(0.3-0.8)

4.2 优化方向

维度当前实现优化建议
特征工程8类技术指标增加市场情绪数据(新闻舆情、资金流向)
模型结构Encoder-only增加Decoder实现Seq2Seq预测
仓位策略固定比例引入强化学习动态调整
数据增强原始序列添加随机时频变换增强
风险控制波动率约束加入VaR压力测试模块

4.3 实践建议

  1. 增量更新频率:建议每月更新模型,每周更新特征分布
  2. 参数搜索空间:可扩展EMA参数范围(快线5-50,慢线30-200)
  3. 硬件配置:推荐使用至少16GB显存的GPU设备
  4. 回测验证:建议采用Walk-Forward分析法,避免过拟合

5. 工程代码

目录结构:

data/
├── processed_600000.SH.parquet
├── processed_600036.SH.parquet
├── processed_600519.SH.parquet
├── processed_000001.SZ.parquet
models/
├── vectorbt_5_model.pth
├── vectorbt_5_preprocessors.pkl
src/
└── vectorbt_5/├── data_processing.py├── model_definition.py├── training.py├── backtesting.py├── main.py└── __init__.py

5.1 data_processing.py

import pandas as pddef load_data(ts_codes, data_path="./data"):"""加载预处理后的股票数据:param ts_code: 股票代码(如["600000.SH", "600519.SH", "000001.SZ"]):param data_path: 数据存储路径):return: 合并后的DataFrame(含ts_code列标识股票)处理步骤:1. 读取parquet格式的本地数据2. 转换交易日期格式3. 计算次日收益率(目标变量)4. 删除缺失值"""dfs = []for ts_code in ts_codes:file_path = f"{data_path}/processed_{ts_code}.parquet"df = pd.read_parquet(file_path)df["ts_code"] = ts_codedfs.append(df)combined_df = pd.concat(dfs)combined_df["trade_date"] = pd.to_datetime(combined_df["trade_date"], format="%Y%m%d")combined_df.set_index("trade_date", inplace=True)combined_df.sort_index(inplace=True)# 按股票分组计算收益率,避免跨股票计算,严格避免未来信息combined_df["returns"] = combined_df.groupby("ts_code", group_keys=False)["close"].apply(lambda x: x.pct_change().shift(-1).clip(-0.1, 0.1)  # 添加收益率截断)combined_df.dropna(inplace=True)return combined_df

5.2 model_definition.py

import mathimport torch
import torch.nn as nnclass MultiHeadAttention(nn.Module):"""多头注意力机制。:param d_model: 输入和输出的维度:param num_heads: 注意力头的数量"""def __init__(self, d_model, num_heads):super().__init__()self.num_heads = num_heads  # 注意力头的数量self.d_model = d_model  # 输入和输出的维度self.depth = d_model // num_heads  # 每个头的维度self.wq = nn.Linear(d_model, d_model)  # 查询线性变换self.wk = nn.Linear(d_model, d_model)  # 键线性变换self.wv = nn.Linear(d_model, d_model)  # 值线性变换self.dense = nn.Linear(d_model, d_model)  # 输出线性变换def split_heads(self, x, batch_size):"""将输入张量分割成多个头。:param x: 输入张量 (batch_size, seq_len, d_model):param batch_size: 批次大小:return: 分割后的张量 (batch_size, num_heads, seq_len, depth)"""x = x.view(batch_size, -1, self.num_heads, self.depth)  # 将d_model维度拆分成num_heads和depthreturn x.permute(0, 2, 1, 3)  # 调整维度顺序为 (batch_size, num_heads, seq_len, depth)def forward(self, q, k, v):"""前向传播函数。:param q: 查询张量 (batch_size, seq_len, d_model):param k: 键张量 (batch_size, seq_len, d_model):param v: 值张量 (batch_size, seq_len, d_model):return: 输出张量 (batch_size, seq_len, d_model) 和注意力权重 (batch_size, num_heads, seq_len, seq_len)"""batch_size = q.size(0)  # 获取批次大小q = self.wq(q)  # 对查询进行线性变换k = self.wk(k)  # 对键进行线性变换v = self.wv(v)  # 对值进行线性变换q = self.split_heads(q, batch_size)  # 将查询张量分割成多个头k = self.split_heads(k, batch_size)  # 将键张量分割成多个头v = self.split_heads(v, batch_size)  # 将值张量分割成多个头# 计算注意力scaled_attention, attention_weights = self.scaled_dot_product_attention(q, k, v)scaled_attention = scaled_attention.permute(0, 2, 1, 3)  # 调整维度顺序为 (batch_size, seq_len, num_heads, depth)concat_attention = scaled_attention.reshape(batch_size, -1, self.d_model)  # 合并头,恢复原始维度output = self.dense(concat_attention)  # 进行最终的线性变换return output, attention_weightsdef scaled_dot_product_attention(self, q, k, v):"""计算缩放点积注意力。:param q: 查询张量 (batch_size, num_heads, seq_len, depth):param k: 键张量 (batch_size, num_heads, seq_len, depth):param v: 值张量 (batch_size, num_heads, seq_len, depth):return: 注意力输出 (batch_size, num_heads, seq_len, depth) 和注意力权重 (batch_size, num_heads, seq_len, seq_len)"""matmul_qk = torch.matmul(q, k.transpose(-1, -2))  # 计算Q和K的点积dk = torch.tensor(k.size(-1), dtype=torch.float32)  # 获取深度dkscaled_attention_logits = matmul_qk / torch.sqrt(dk)  # 缩放点积attention_weights = torch.softmax(scaled_attention_logits, dim=-1)  # 计算注意力权重output = torch.matmul(attention_weights, v)  # 应用注意力权重到值上return output, attention_weightsclass EncoderLayer(nn.Module):"""编码器层。:param d_model: 输入和输出的维度:param num_heads: 注意力头的数量:param dff: 前馈神经网络的中间维度:param dropout: dropout 概率,默认为 0.1"""def __init__(self, d_model, num_heads, dff, dropout=0.1):super().__init__()self.mha = MultiHeadAttention(d_model, num_heads)  # 多头注意力机制self.ffn = nn.Sequential(nn.Linear(d_model, dff),  # 线性变换到dff维度nn.GELU(),  # GELU激活函数nn.Linear(dff, d_model),  # 线性变换回d_model维度)self.layer_norm1 = nn.LayerNorm(d_model)  # 第一个层归一化self.layer_norm2 = nn.LayerNorm(d_model)  # 第二个层归一化self.dropout = nn.Dropout(dropout)  # Dropout层def forward(self, x):"""前向传播函数。:param x: 输入张量 (batch_size, seq_len, d_model):return: 输出张量 (batch_size, seq_len, d_model)"""# 多头注意力attn_output, _ = self.mha(x, x, x)  # 计算多头注意力attn_output = self.dropout(attn_output)  # 应用dropoutout1 = self.layer_norm1(x + attn_output)  # 残差连接和层归一化# 前馈神经网络ffn_output = self.ffn(out1)  # 前馈神经网络ffn_output = self.dropout(ffn_output)  # 应用dropoutout2 = self.layer_norm2(out1 + ffn_output)  # 残差连接和层归一化return out2class DecoderLayer(nn.Module):"""解码器层。:param d_model: 输入和输出的维度:param num_heads: 注意力头的数量:param dff: 前馈神经网络的中间维度:param dropout: dropout 概率,默认为 0.1"""def __init__(self, d_model, num_heads, dff, dropout=0.1):super().__init__()self.mha1 = MultiHeadAttention(d_model, num_heads)  # 掩码多头注意力self.mha2 = MultiHeadAttention(d_model, num_heads)  # 编码器-解码器注意力self.ffn = nn.Sequential(nn.Linear(d_model, dff),  # 线性变换到dff维度nn.GELU(),  # GELU激活函数nn.Linear(dff, d_model),  # 线性变换回d_model维度)self.layer_norm1 = nn.LayerNorm(d_model)  # 第一个层归一化self.layer_norm2 = nn.LayerNorm(d_model)  # 第二个层归一化self.layer_norm3 = nn.LayerNorm(d_model)  # 第三个层归一化self.dropout = nn.Dropout(dropout)  # Dropout层def forward(self, x, enc_output):"""前向传播函数。:param x: 输入张量 (batch_size, seq_len, d_model):param enc_output: 编码器输出 (batch_size, src_seq_len, d_model):return: 输出张量 (batch_size, seq_len, d_model) 和两个注意力权重"""# 掩码多头注意力attn1, attn_weights1 = self.mha1(x, x, x)  # 计算掩码多头注意力attn1 = self.dropout(attn1)  # 应用dropoutout1 = self.layer_norm1(x + attn1)  # 残差连接和层归一化# 编码器-解码器注意力attn2, attn_weights2 = self.mha2(out1, enc_output, enc_output)  # 计算编码器-解码器注意力attn2 = self.dropout(attn2)  # 应用dropoutout2 = self.layer_norm2(out1 + attn2)  # 残差连接和层归一化# 前馈神经网络ffn_output = self.ffn(out2)  # 前馈神经网络ffn_output = self.dropout(ffn_output)  # 应用dropoutout3 = self.layer_norm3(out2 + ffn_output)  # 残差连接和层归一化return out3, attn_weights1, attn_weights2class PositionalEncoding(nn.Module):"""位置编码。:param d_model: 输入和输出的维度:param max_len: 最大序列长度,默认为5000:param dropout: dropout 概率,默认为 0.1"""def __init__(self, d_model, max_len=5000, dropout=0.1):super().__init__()self.dropout = nn.Dropout(dropout)  # Dropout层pe = torch.zeros(max_len, d_model)  # 初始化位置编码position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # 位置索引div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))  # 用于计算正弦和余弦的位置项pe[:, 0::2] = torch.sin(position * div_term)  # 正弦位置编码pe[:, 1::2] = torch.cos(position * div_term)  # 余弦位置编码pe = pe.unsqueeze(0)  # 增加批次维度self.register_buffer("pe", pe)  # 注册位置编码为缓冲区def forward(self, x):"""前向传播函数。:param x: 输入张量 (batch_size, seq_len, d_model):return: 添加了位置编码的张量 (batch_size, seq_len, d_model)"""x = x + self.pe[:, : x.size(1)]  # 添加位置编码return self.dropout(x)  # 应用dropoutclass TransformerModel(nn.Module):"""Transformer模型。:param input_size: 输入特征的维度:param d_model: 输入和输出的维度:param num_heads: 注意力头的数量:param num_layers: 编码器层数:param dff: 前馈神经网络的中间维度:param dropout: dropout 概率,默认为 0.1"""def __init__(self, input_size, d_model, num_heads, num_layers, dff, dropout=0.1):super().__init__()self.embedding = nn.Linear(input_size, d_model)  # 输入嵌入self.pos_encoding = PositionalEncoding(d_model, dropout=dropout)  # 位置编码self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, dff, dropout)for _ in range(num_layers)]  # 编码器层)self.fc = nn.Linear(d_model, 1)  # 全连接层def forward(self, x):"""前向传播函数。:param x: 输入张量 (batch_size, seq_len, input_size):return: 输出张量 (batch_size, 1)"""x = self.embedding(x)  # 输入嵌入x = self.pos_encoding(x)  # 位置编码for enc_layer in self.encoder_layers:x = enc_layer(x)  # 通过每个编码器层x = self.fc(x.mean(dim=1))  # 使用全局平均池化并进行最终线性变换return x

5.3 training.py

import osimport joblib
import numpy as np
import optuna
import pandas as pd
import torch
import torch.nn as nn
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import SelectFromModel
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler
from torch.utils.data import DataLoader, Dataset
from tqdm.auto import tqdmfrom vectorbt_5.model_definition import TransformerModelclass SingleWindowDataset(Dataset):def __init__(self, X, y):"""单窗口数据集类,用于将时间序列数据转换为PyTorch数据集格式。:param X: 滑动窗口特征数据:param y: 目标变量"""self.X = X.astype(np.float32)  # 将滑动窗口特征数据转换为float32类型self.y = y.astype(np.float32)  # 将目标变量转换为float32类型def __len__(self):return len(self.y)  # 返回目标变量的长度def __getitem__(self, idx):x = torch.from_numpy(self.X[idx]).float()  # 将滑动窗口特征数据转换为PyTorch张量label = torch.tensor(self.y[idx], dtype=torch.float32)  # 将目标变量转换为PyTorch张量return x, label  # 返回特征和标签class TrainingStateManager:def __init__(self, model_dir="./models"):"""训练状态管理器类,负责模型和预处理器的保存和加载。:param model_dir: 模型保存目录,defaults to "./models""""self.model_dir = model_dir  # 设置模型保存目录self.model_path = f"{self.model_dir}/vectorbt_5_model.pth"  # 设置模型文件路径self.preprocessors_path = (f"{self.model_dir}/vectorbt_5_preprocessors.pkl"  # 设置预处理器文件路径)os.makedirs(model_dir, exist_ok=True)  # 创建模型保存目录(如果不存在)def save(self, model, feature_engineer, feature_selector, config):"""保存模型和预处理器。:param model: 训练好的模型:param feature_engineer: 特征工程对象:param feature_selector: 特征选择器:param config: 模型配置"""torch.save(model.state_dict(), self.model_path)  # 保存模型参数joblib.dump({"feature_engineer": feature_engineer,"feature_selector": feature_selector,"config": config,},self.preprocessors_path,)  # 保存预处理器和配置print(f"Model saved to {self.model_path}")  # 打印模型保存路径print(f"Preprocessor saved to {self.preprocessors_path}")  # 打印预处理器保存路径def load(self, device):"""加载模型和预处理器。:param device: 计算设备(CPU/GPU):return: 加载的模型、特征工程对象、特征选择器和模型配置"""model = None  # 初始化模型feature_engineer = None  # 初始化特征工程对象feature_selector = None  # 初始化特征选择器config = None  # 初始化配置if os.path.exists(self.preprocessors_path):  # 如果预处理器文件存在preprocess = joblib.load(self.preprocessors_path)  # 加载预处理器feature_engineer = preprocess["feature_engineer"]  # 获取特征工程对象feature_selector = preprocess["feature_selector"]  # 获取特征选择器config = preprocess["config"]  # 获取配置if os.path.exists(self.model_path):  # 如果模型文件存在model = TransformerModel(input_size=config["input_size"],d_model=config["d_model"],num_heads=config["num_heads"],num_layers=config["num_layers"],dff=config["dff"],dropout=config["dropout"],).to(device)model.load_state_dict(torch.load(self.model_path, weights_only=False, map_location=device))  # 加载模型参数return (model,feature_engineer,feature_selector,config,)  # 返回加载的模型、特征工程对象、特征选择器和配置class OnlineFeatureEngineer:def __init__(self, windows=[5]):"""在线特征工程生成器类,用于生成技术指标特征并进行标准化。:param windows: 滑动窗口列表(用于特征生成),defaults to [5]"""self.windows = windows  # 设置滑动窗口列表self.n_features = 10  # 设置特征数量self.scalers = {"Trend": RobustScaler(),  # 趋势类指标使用RobustScaler"Momentum": MinMaxScaler(),  # 动量类指标使用MinMaxScaler"Volatility": RobustScaler(),  # 波动率类指标使用RobustScaler"Volume": StandardScaler(),  # 成交量类指标使用StandardScaler# "Sentiment": StandardScaler(),  # 市场情绪类指标使用StandardScaler"SupportResistance": MinMaxScaler(),  # 支撑阻力类指标使用MinMaxScaler"Statistical": StandardScaler(),  # 统计类指标使用StandardScaler"Composite": RobustScaler(),  # 复合型指标使用RobustScaler}# "price": ["open", "high", "low", "close"],self.feature_groups = {# Trend-Following Indicators 趋势类指标"Trend": ["MA20", "EMA12", "MACD", "ADX", "SAR"],# Momentum Indicators 动量类指标"Momentum": ["RSI", "KDJ_K", "KDJ_D", "KDJ_J", "CCI", "WILLR"],# Volatility Indicators 波动率类指标"Volatility": ["BB_upper", "BB_middle", "BB_lower", "ATR", "STD20"],# Volume Indicators 成交量类指标"Volume": ["OBV", "MFI"],# Market Sentiment Indicators 市场情绪类指标 (需要外部数据)# "Sentiment": [],# Support/Resistance Indicators 支撑阻力类指标"SupportResistance": ["Fib_0.382", "Fib_0.618", "Pivot"],# Statistical Indicators 统计类指标"Statistical": ["LR_slope", "LR_angle"],# Composite Indicators 复合型指标"Composite": ["Ichimoku_tenkan","Ichimoku_kijun","Ichimoku_senkou_a","Ichimoku_senkou_b","Ichimoku_chikou",],}self.all_features = [f for sublist in self.feature_groups.values() for f in sublist]  # 生成所有特征列表def partial_fit(self, new_df):"""对新数据进行部分拟合。:param new_df: 新数据DataFrame"""new_features = self.generate_features(new_df, refit=True)  # 生成新数据的特征for group, features in self.feature_groups.items():  # 遍历每个特征组if hasattr(self.scalers[group], "partial_fit"):  # 如果该缩放器支持部分拟合self.scalers[group].partial_fit(new_features[features])  # 对新数据进行部分拟合def generate_features(self, df, refit=False):"""生成技术指标特征。:param df: 原始数据DataFrame:param refit: 是否重新拟合标准化器,defaults to False:return: 特征DataFrame生成8大类技术指标:1. 趋势类指标(MA, MACD等)2. 动量类指标(RSI, KDJ等)3. 波动率指标(布林带, ATR等)4. 成交量指标(OBV, MFI等)5. 市场情绪类指标 (需要外部数据) -- 忽略6. 支撑阻力指标(斐波那契回撤等)7. 统计指标(线性回归斜率等)8. 复合指标(Ichimoku云图等)"""processed = []  # 初始化处理后的特征列表for group, features in self.feature_groups.items():  # 遍历每个特征组scaler = self.scalers[group]  # 获取对应的缩放器if not refit:  # 如果不重新拟合scaler.fit(df[features])  # 拟合缩放器scaled = scaler.transform(df[features])  # 标准化特征processed.append(scaled)  # 添加到处理后的特征列表processed_df = pd.DataFrame(np.hstack(processed), index=df.index, columns=self.all_features)  # 将处理后的特征合并为DataFrameprocessed_df["ts_code"] = df["ts_code"]  # 添加股票代码processed_df["returns"] = df["returns"]  # 添加收益return processed_df.dropna()  # 删除缺失值def feature_selection(self, X, y):"""进行特征选择。:param X: 特征数据:param y: 目标变量:return: 选择后的特征数据"""selector = RandomForestRegressor(n_estimators=50, n_jobs=-1)  # 初始化随机森林回归器selector.fit(X, y)  # 拟合随机森林回归器feature_selector = SelectFromModel(selector, prefit=True)  # 初始化特征选择器return feature_selector  # 返回特征选择器class IncrementalDataHandler:def __init__(self, feature_engineer, feature_selector, window_size=5):"""增量数据处理器类,用于处理增量数据并生成滑动窗口数据。:param feature_engineer: 特征工程对象:param feature_selector: 特征选择器:param window_size: 滑动窗口大小,defaults to 5"""self.feature_engineer = feature_engineer  # 设置特征工程对象self.feature_selector = feature_selector  # 设置特征选择器self.window_size = window_size  # 设置滑动窗口大小self.buffer = pd.DataFrame()  # 初始化数据缓冲区def update_buffer(self, new_df):"""更新数据缓冲区。:param new_df: 新数据DataFrame"""self.buffer = pd.concat([self.buffer, new_df]).sort_index()  # 更新数据缓冲区并排序def prepare_incremental_data(self):"""准备增量数据。:return: 训练数据和测试数据"""processed_df = self.feature_engineer.generate_features(self.buffer)  # 生成特征X_selected = self.feature_selector.transform(processed_df[self.feature_engineer.all_features].values)  # 选择特征X, y = self.sliding_window(processed_df, X_selected)  # 生成滑动窗口数据X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=False)  # 划分训练集和测试集return (X_train, y_train), (X_test, y_test)  # 返回训练集和测试集def sliding_window(self, df, features):"""生成时序滑动窗口数据"""sequences = []  # 初始化序列列表labels = []  # 初始化标签列表for ts_code, group in df.groupby("ts_code"):  # 按股票代码分组group_features = features[df["ts_code"] == ts_code]  # 获取该股票的特征for i in range(self.window_size, len(group)):  # 生成滑动窗口sequences.append(group_features[i - self.window_size : i])  # 添加滑动窗口特征labels.append(group["returns"].iloc[i])  # 添加标签return np.array(sequences, dtype=np.float32), np.array(labels, dtype=np.float32)  # 返回滑动窗口特征和标签class IncrementalTrainer:def __init__(self, device, window_size=5):"""增量训练控制器类,负责模型的初始训练和增量更新。:param device: 计算设备(CPU/GPU)"""self.device = device  # 设置计算设备self.window_size = window_size  # 设置滑动窗口大小self.state_manager = TrainingStateManager()  # 初始化训练状态管理器self.model = None  # 初始化模型self.feature_engineer = None  # 初始化特征工程对象self.feature_selector = None  # 初始化特征选择器self.config = None  # 初始化配置self.load_state()  # 加载状态def load_state(self):"""加载模型和预处理器。"""(self.model,self.feature_engineer,self.feature_selector,self.config,) = self.state_manager.load(self.device)  # 加载模型、特征工程对象、特征选择器和配置def initial_train(self, df):"""初始训练模型。:param df: 原始数据DataFrame"""# 特征生成self.feature_engineer = OnlineFeatureEngineer(windows=[self.window_size])  # 初始化特征工程对象processed_df = self.feature_engineer.generate_features(df)  # 生成特征# 特征选择X, y = processed_df[self.feature_engineer.all_features], df["returns"].reindex(processed_df.index)  # 准备特征和标签self.feature_selector = self.feature_engineer.feature_selection(X, y)  # 选择特征# 准备训练数据data_handler = IncrementalDataHandler(feature_engineer=self.feature_engineer,feature_selector=self.feature_selector,window_size=self.window_size,)  # 初始化增量数据处理器data_handler.buffer = df  # 更新数据缓冲区(X_train, y_train), (X_test, y_test) = (data_handler.prepare_incremental_data())  # 准备训练数据# Optuna超参优化self._optimize_parameters(X_train, y_train, X_test, y_test)  # 优化超参数# 最佳模型训练print(f"Initial Model Config: {self.config}")  # 打印初始模型配置self._train(X_train, y_train)  # 训练模型print("Initial Model Evaluation:")  # 打印初始模型评估self._evaluate(X_test, y_test)  # 评估模型print("Initial Model Save:")  # 打印初始模型保存self.state_manager.save(self.model, self.feature_engineer, self.feature_selector, self.config)  # 保存模型def incremental_update(self, new_df):"""增量更新模型。:param new_df: 新数据DataFrame"""if not self.model:print("The model does not exist, please train it first.")  # 如果模型不存在,提示先训练模型returnself.feature_engineer.partial_fit(new_df)  # 对新数据进行部分拟合data_handler = IncrementalDataHandler(self.feature_engineer, self.feature_selector)  # 初始化增量数据处理器data_handler.update_buffer(new_df)  # 更新数据缓冲区(X_train, y_train), (X_test, y_test) = (data_handler.prepare_incremental_data())  # 准备增量数据print(f"Incremental Model Config: {self.config}")  # 打印增量模型配置self._train(X_train, y_train)  # 训练模型print("\nIncremental Model Evaluation:")  # 打印增量模型评估self._evaluate(X_test, y_test)  # 评估模型print("\nIncremental Model Save:")  # 打印增量模型保存self.state_manager.save(self.model, self.feature_engineer, self.feature_selector, self.config)  # 保存模型def _optimize_parameters(self, X_train, y_train, X_test, y_test):"""Optuna超参优化:param X_train: 训练集滑动窗口特征数据:param y_train: 训练集目标变量:param X_test: 测试集滑动窗口特征数据:param y_test: 测试集目标变量"""def objective(trial):self.config = {"num_heads": trial.suggest_categorical("num_heads", [4, 8]),"d_model": trial.suggest_int("d_model", 64, 256, step=32),  # 以32为步长"num_layers": trial.suggest_int("num_layers", 2, 4),"dff": trial.suggest_int("dff", 128, 512, step=64),"dropout": trial.suggest_float("dropout", 0.1, 0.3),"lr": trial.suggest_float("lr", 1e-4, 1e-3, log=True),"input_size": X_train[0].shape[-1],  # 输入特征维度"batch_size": trial.suggest_categorical("batch_size", [32, 64, 128]),"epochs": 100,  # 训练轮数"window_size": self.window_size,  # 滑动窗口大小}self._train(X_train, y_train)  # 训练模型val_loss = self._evaluate(X_test, y_test)  # 评估模型print(f"Val Loss: {val_loss:.4f}")  # 打印验证损失return val_loss  # 返回验证损失# 超参优化study = optuna.create_study(direction="minimize")  # 创建研究study.optimize(objective, n_trials=10, show_progress_bar=True)  # 优化目标函数print(f"Training Best params: {study.best_params}")  # 打印最佳参数# 最佳模型参数self.config.update(study.best_params)  # 更新配置def _train(self, X_train, y_train):"""模型训练内部方法。:param X_train: 训练集滑动窗口特征数据:param y_train: 训练集目标变量"""print(f"Training Model Config: {self.config}")  # 打印模型配置num_heads = self.config.get("num_heads", 8)d_model = self.config.get("d_model", 64)d_model = (d_model // num_heads) * num_headsnum_layers = self.config.get("num_layers", 3)dff = self.config.get("dff", 256)dropout = self.config.get("dropout", 0.1)lr = self.config.get("lr", 1e-4)input_size = self.config.get("input_dim", X_train[0].shape[-1])  # 获取输入特征维度epochs = self.config.get("epochs", 100)  # 获取训练轮数batch_size = self.config.get("batch_size", 128)# 强制维度约束if d_model % num_heads != 0:d_model = (d_model // num_heads) * num_heads  # 自动调整为最近的可整除数dataset = SingleWindowDataset(X_train, y_train)  # 初始化数据集loader = DataLoader(dataset,batch_size=batch_size,shuffle=False,)  # 初始化数据加载器# 初始化模型self.model = TransformerModel(input_size, d_model, num_heads, num_layers, dff, dropout).to(self.device)optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)  # 初始化优化器scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, "min", patience=5)  # 初始化学习率调度器criterion = nn.HuberLoss()  # 初始化损失函数# 训练循环for epoch in tqdm(range(epochs), desc="Training"):  # 进行训练self.model.train()  # 设置模型为训练模式total_loss = 0  # 初始化总损失for X_batch, y_batch in loader:  # 遍历数据加载器X_batch = X_batch.to(self.device)  # 将特征数据移动到指定设备y_batch = y_batch.to(self.device).unsqueeze(1)  # 将标签数据移动到指定设备并扩展维度optimizer.zero_grad()  # 清零梯度preds = self.model(X_batch)  # 前向传播loss = criterion(preds, y_batch)  # 计算损失loss.backward()  # 反向传播nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)  # 梯度裁剪optimizer.step()  # 更新参数total_loss += loss.item()  # 累加损失# 学习率调整avg_loss = total_loss / len(loader)  # 计算平均损失scheduler.step(avg_loss)  # 更新学习率def _evaluate(self, X_test, y_test):"""模型评估内部方法。:param X_test: 测试集滑动窗口特征数据:param y_test: 测试集目标变量:return: 平均损失"""test_dataset = SingleWindowDataset(X_test, y_test)  # 初始化测试数据集test_loader = DataLoader(test_dataset,batch_size=128,shuffle=False,)  # 初始化测试数据加载器self.model.eval()  # 设置模型为评估模式total_loss = 0  # 初始化总损失criterion = nn.HuberLoss()  # 初始化损失函数with torch.no_grad():  # 关闭梯度计算for X_batch, y_batch in test_loader:  # 遍历测试数据加载器X_batch = X_batch.to(self.device)  # 将特征数据移动到指定设备y_batch = y_batch.to(self.device).unsqueeze(1)  # 将标签数据移动到指定设备并扩展维度preds = self.model(X_batch)  # 前向传播loss = criterion(preds, y_batch)  # 计算损失total_loss += loss.item() * len(y_batch)  # 累加损失test_loss = total_loss / len(test_dataset)  # 计算平均损失print(f"Test Loss: {test_loss}")  # 打印测试损失return test_loss  # 返回测试损失

5.4 backtesting.py

import numpy as np
import optuna
import pandas as pd
import torch
import vectorbt as vbtclass DualEMACrossoverStrategy:def __init__(self, pred_returns, volatility, params):"""双EMA交叉策略类。:param pred_returns: 预测的收益率序列:param volatility: 波动率序列(用于仓位计算):param params: 参数字典,包含快慢EMA的时间跨度"""self.pred_returns = pred_returns  # 预测的收益率序列self.volatility = volatility.clip(lower=0.01)  # 防止波动率为0,导致除零错误self.fast_span = params["fast_span"]  # 快速EMA的时间跨度self.slow_span = params["slow_span"]  # 慢速EMA的时间跨度def generate_signals(self):"""生成交易信号。:return: (交易信号, 仓位大小)"""ema_fast = self.pred_returns.ewm(span=self.fast_span, min_periods=self.fast_span).mean()  # 计算快速EMAema_slow = self.pred_returns.ewm(span=self.slow_span, min_periods=self.slow_span).mean()  # 计算慢速EMAsignals = pd.Series(0, index=self.pred_returns.index)  # 初始化信号序列signals[(ema_fast > ema_slow) & (ema_fast.shift(1) <= ema_slow.shift(1))] = (1  # 买入信号)signals[(ema_fast < ema_slow) & (ema_fast.shift(1) >= ema_slow.shift(1))] = (-1)  # 卖出信号# 使用tanh函数压缩仓位大小,实现非线性映射position_size = np.tanh(self.pred_returns.abs() / self.volatility).clip(0.3, 0.8)return signals, position_size  # 返回信号和仓位大小class BacktestStrategy:def __init__(self, model, device):"""回测执行引擎。:param model: 训练好的预测模型:param device: 计算设备(CPU/GPU)"""self.model = model  # 训练好的预测模型self.device = device  # 计算设备self._configure_vbt()  # 配置VectorBT全局参数def _configure_vbt(self):"""配置VectorBT全局参数。"""vbt.settings.array_wrapper["freq"] = "D"  # 设置时间频率为日频vbt.settings.plotting["layout"]["template"] = "vbt_dark"  # 使用暗色主题vbt.settings.plotting["layout"]["width"] = 1200  # 设置图表宽度vbt.settings.portfolio["init_cash"] = 100000.0  # 初始资金10万元vbt.settings.portfolio["fees"] = 0.0025  # 交易成本(手续费)0.25%vbt.settings.portfolio["slippage"] = 0.0025  # 交易成本(滑点)0.25%def _optimize_parameters(self, result_df):"""优化参数逻辑调整。:param result_df: 包含预测收益率的结果DataFrame:return: 最优参数"""def objective(trial):params = {"fast_span": trial.suggest_int("fast_span", 10, 30),  # 建议快速EMA的时间跨度"slow_span": trial.suggest_int("slow_span", 50, 100),  # 建议慢速EMA的时间跨度}strategy = DualEMACrossoverStrategy(pred_returns=result_df["pred_returns"],volatility=result_df["volatility"],params=params,)  # 创建策略实例signals, position_size = strategy.generate_signals()  # 生成交易信号pf = vbt.Portfolio.from_signals(close=result_df["close"],entries=signals.shift(1) == 1,  # 买入信号exits=signals.shift(1) == -1,  # 卖出信号size=position_size,  # 固定仓位freq="D",)return pf.total_profit()  # 返回总利润study = optuna.create_study(direction="maximize")  # 创建Optuna研究study.optimize(objective, n_trials=10, show_progress_bar=True)  # 优化参数print(f"Strategy Best params: {study.best_params}")  # 打印最优参数return study.best_params  # 返回最优参数def run(self, test_data, df):"""执行完整回测流程。:param test_data: 测试数据集元组(X_test, y_test):param df: 原始数据DataFrame:return: (组合对象, 结果DataFrame)"""X_test = test_data  # 测试数据self.model.eval()  # 将模型设置为评估模式with torch.no_grad():  # 禁用梯度计算test_tensor = torch.FloatTensor(X_test).to(self.device)  # 转换为Tensor并移动到指定设备preds = (self.model(test_tensor).cpu().numpy().flatten())  # 获取预测值并转换为NumPy数组test_dates = df.index[-len(preds) :]  # 获取测试日期result_df = pd.DataFrame({"close": df["close"].values[-len(preds) :],"pred_returns": preds,  # 使用rolling窗口计算动态波动率"volatility": df["ATR"].values[-len(preds) :]/ df["close"].values[-len(preds) :],},index=test_dates,)  # 创建结果DataFramebest_params = self._optimize_parameters(result_df)  # 运行参数优化strategy = DualEMACrossoverStrategy(pred_returns=result_df["pred_returns"],volatility=result_df["volatility"],params=best_params,)  # 创建策略实例signals, position_size = strategy.generate_signals()  # 生成交易信号return vbt.Portfolio.from_signals(close=result_df["close"],entries=signals == 1,  # 买入信号exits=signals == -1,  # 卖出信号size=position_size,size_type="percent",freq="D",)  # 执行组合回测

5.5 main.py

import randomimport numpy as np
import torchfrom vectorbt_5.backtesting import BacktestStrategy
from vectorbt_5.data_processing import load_data
from vectorbt_5.training import IncrementalTrainer, TrainingStateManagerdef set_random_seed(seed=42):"""设置全局随机种子:param seed: 随机种子, defaults to 42影响范围:- Python内置随机模块- Numpy随机数生成- PyTorch CPU/CUDA随机种子"""random.seed(seed)np.random.seed(seed)torch.manual_seed(seed)if torch.cuda.is_available():torch.cuda.manual_seed(seed)torch.cuda.manual_seed_all(seed)  # 如果使用多个GPUtorch.backends.cudnn.deterministic = Truetorch.backends.cudnn.benchmark = Falsedef prepare_backtest_data(ts_code, device):"""准备回测数据。:param ts_code: 股票代码:param device: 计算设备(CPU/GPU):return: 滑动窗口特征数据、原始数据DataFrame、模型、特征工程对象、特征选择器和配置"""state_manager = TrainingStateManager()  # 初始化训练状态管理器model, feature_engineer, feature_selector, config = state_manager.load(device)  # 加载模型和预处理器print(f"Model Config: {config}")  # 打印模型配置# 加载并处理数据test_df = load_data([ts_code])  # 加载数据test_df = test_df[-300:]  # 取最近300条数据processed_df = feature_engineer.generate_features(test_df)  # 生成特征X_selected = feature_selector.transform(processed_df[feature_engineer.all_features].values)  # 选择特征# 构建滑动窗口window_size = config["window_size"]  # 获取滑动窗口大小sequences = []  # 初始化序列列表for i in range(window_size, len(X_selected)):  # 遍历数据sequences.append(X_selected[i - window_size : i])  # 添加滑动窗口特征return (np.array(sequences, dtype=np.float32),  # 返回滑动窗口特征数据test_df,  # 返回原始数据DataFramemodel,  # 返回模型feature_engineer,  # 返回特征工程对象feature_selector,  # 返回特征选择器config,  # 返回配置)if __name__ == "__main__":# 设置随机种子# 函数确保了整个训练过程的可重复性。# 通过设置相同的随机种子,可以保证每次运行时生成的随机数序列一致,这对于调试和实验验证非常重要。set_random_seed()# 检测可用计算设备(优先使用CUDA)device = torch.device("cuda"if torch.cuda.is_available()else "mps" if torch.backends.mps.is_available() else "cpu")# 股票行情# start_date = "20180101"# end_date = "20241231"trainer = IncrementalTrainer(device)# 多股票初始训练示例# 浦发银行(600000.SH)# 招商银行(600036.SH)# 平安银行(000001.SZ)train_codes = ["600000.SH", "600036.SH", "000001.SZ"]train_df = load_data(train_codes)model = trainer.initial_train(train_df)# 增量训练示例# 贵州茅台(600519.SH)# new_df = load_data(["600519.SH"])# model = trainer.incremental_update(new_df)# 单股票回测(test_data, test_df, model, feature_engineer, feature_selector, config) = (prepare_backtest_data("600036.SH", device))backtester = BacktestStrategy(model, device)pf = backtester.run(test_data, test_df)print("回测结果统计:")print(pf.stats())pf.plot().show()

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。

相关文章:

VectorBT:使用PyTorch+Transformer训练和回测股票模型 进阶五

VectorBT&#xff1a;使用PyTorchTransformer训练和回测股票模型 进阶五 本方案基于PyTorch框架与Transformer模型&#xff0c;结合VectorBT回测引擎构建多股票量化交易系统&#xff0c;采用滑动窗口技术构建时序特征&#xff0c;通过自注意力机制捕捉市场规律预测收益率&#…...

DP Alt Mode​​ 与 ​​USB​​ 的关系

DP Alt Mode​​ 与 ​​USB​​ 的关系 1. 物理接口的统一&#xff1a;USB-C 是“万能插座” [USB-C接口物理结构] |-----------------------------------------------| | USB 3.0数据引脚 | DP Alt Mode视频引脚 | 电源引脚 | |-------------------------------------…...

C#“与AI的奇妙结合”

原文&#xff1a;C# 使用通义灵码 - AI 助力 Visual Studio 开发_w3cschool &#xff08;注意&#xff1a;本文章中并不存在任何广告&#xff0c;也不存在任何盈利内容&#xff09; C# 使用通义灵码 C# 作为一种功能强大且灵活多变的编程语言&#xff0c;被广泛应用于各个领…...

企业ITR流程设计与执行详细介绍【附全文阅读】

该方案聚焦企业 ITR 流程,适用于企业的服务管理人员、流程优化负责人、技术支持团队以及中高层管理者等。 ITR 流程的重要性:企业服务面临客户不满、管理者焦虑、服务人员无奈等挑战,缺乏完善的 ITR 流程会影响品牌形象、客户满意度和产品竞争力。ITR 流程能够保障客户满意,…...

Ubuntu 无密码热点(Soft AP)完整配置方案

适用于 Jetson、嵌入式 Linux、RDK 平台。目标&#xff1a;配置一个无密码热点&#xff08;Soft AP&#xff09;&#xff0c;供手机等设备直接连接。实现开机自动启动热点&#xff0c;也支持后续一键切换回 WiFi 客户端模式。 平台&#xff1a;Yahboom RDK X3&#xff08;Jetso…...

【力扣hot100题】(063)搜索二维矩阵

看到这题我就想到之前被我当作这题做的【力扣hot100题】&#xff08;020&#xff09;搜索二维矩阵Ⅱ 其实是完全不一样的两题&#xff0c;个人觉得这道题更简单也更考验基础&#xff0c;那道题思路更难想到但代码更好写。 两个二分查找结束&#xff0c;要注意的是第一个二分查…...

瑞萨RA4M2使用心得-KEIL5的第一次编译

目录 前言 环境&#xff1a; 开发板&#xff1a;RA-Eco-RA4M2-100PIN-V1.0 IDE&#xff1a;keil5.35 一、软件的下载 编辑瑞萨的芯片&#xff0c;除了keil5 外还需要一个软件&#xff1a;RASC 路径&#xff1a;Releases renesas/fsp (github.com) 向下找到&#xff1a; …...

玄机-apache日志分析

靶场任务 1、提交当天访问次数最多的IP&#xff0c;即黑客IP&#xff1a; 查看apache日志 apache访问日志的位置是&#xff1a;/var/log/apache2/access.log.1 匹配正则算法 首先先cat看看 发现地址都在第一行&#xff0c;直接匹配计算输出 cat access.log.1 |grep -Eo &…...

[C++]洛谷B2119 删除单词后缀

题目与解析 题干题目描述输入格式输出格式样例样例输入样例输出 答案解析食用提示AC代码AC代码详细解析头文件部分主程序8~12行代码 12行以后的代码 题干 题目描述 给定一个单词&#xff0c;如果该单词以 er、ly 或者 ing 后缀结尾&#xff0c;则删除该后缀&#xff08;题目保…...

Ubuntu远程连接Mysql数据库(图文详解)

Ubuntu远程连接Mysql数据库 1、版本2、检查有没有Mysql2.1 查询是否安装了Mysql包2.2 查看Mysql版本2.3 查看Mysql运行状态 3、卸载Mysql4、安装4.1 更新4.2 开始安装4.3 安装完后查看状态 5、登录5.1、使用5.2、查看数据库权限5.3 更新权限5.4 再次查看数据库权限5.5 添加新用…...

回归预测 | Matlab实现NRBO-Transformer-GRU多变量回归预测

回归预测 | Matlab实现NRBO-Transformer-GRU多变量回归预测 目录 回归预测 | Matlab实现NRBO-Transformer-GRU多变量回归预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.【JCR一区级】Matlab实现NRBO-Transformer-GRU多变量回归预测&#xff0c;牛顿-拉夫逊算法优…...

leetcode122-买卖股票的最佳时机II

leetcode 122 思路 方法一的核心思想是简单的贪心策略。我们每天都看当前价格和下一个价格的差值。如果下一个价格高于当前价格&#xff08;即diff > 0&#xff09;&#xff0c;那么就认为当天可以买入并在第二天卖出&#xff0c;赚取利润。因此&#xff0c;方法一把所有…...

from PIL import Image 安装失败

正确安装 Pillow (PIL) # 通过 Conda 安装 conda install pillow -c conda-forge# 或通过 Pip 安装 pip install pillow验证安装 在 Python 中测试是否成功&#xff1a; from PIL import Image print(Image.__version__) # 应输出类似 "9.5.0" 的版本号常见问题说…...

DPFunc蛋白质功能预测模型复现报告

模型简介 模型的具体介绍见蛋白质功能预测论文阅读记录2025&#xff08;DPFunc、ProtCLIP&#xff09;_protein functions-CSDN博客 复现流程 仓库&#xff1a;CSUBioGroup/DPFunc 时间&#xff1a;2025.4.5 环境配置 python 3.9.21 & CUDA 11.6 Pytorch: 1.12.0 DG…...

在 Ubuntu24.04 LTS 上 Docker Compose 部署基于 Dify 重构二开的开源项目 Dify-Plus

一、安装环境信息说明 硬件资源&#xff08;GB 和 GiB 的主要区别在于它们的换算基数不同&#xff0c;GB 使用十进制&#xff0c;GiB 使用二进制&#xff0c;导致相同数值下 GiB 表示的容量略大于 GB&#xff1b;换算关系&#xff1a;1 GiB ≈ 1.07374 GB &#xff1b;1 GB ≈ …...

双系统ubuntu20.04不能外接显示器的解决办法

一&#xff0c;更换驱动 首先确定是不是英伟达显卡驱动&#xff0c;如果不是的话&#xff0c;设置里找到附加驱动&#xff0c;更改为NVIdia类型的驱动&#xff0c;更改完成之后重启 这里大部分电脑都可以了&#xff0c;如果不行 二、更改启动方式 重启之后进入BIOS设置&…...

高并发内存池:原理、设计与多线程性能优化实践

高并发内存池是一种专门为多线程环境设计的内存管理机制&#xff0c;其核心目标是通过优化内存分配和释放过程&#xff0c;解决传统内存分配器&#xff08;如malloc/free&#xff09;在高并发场景下的性能瓶颈&#xff0c;显著提升多线程程序的内存访问效率。 目录 一、核心设计…...

03.31-04.06 论文速递 聚焦具身智能、复杂场景渲染、电影级对话生成等五大前沿领域

&#x1f31f; 论文速递 | 2025.03.31-04.06 &#x1f4e2; 聚焦具身智能、复杂场景渲染、电影级对话生成等前沿领域 1️⃣ 具身智能体&#xff1a;从脑启发到安全协作系统 论文标题&#xff1a; Advances and Challenges in Foundation Agents: From Brain-Inspired Intellige…...

Django和Celery实现的异步任务案例

推荐超级课程: 本地离线DeepSeek AI方案部署实战教程【完全版】Docker快速入门到精通Kubernetes入门到大师通关课AWS云服务快速入门实战目录 先决条件步骤1:安装依赖项步骤2:配置Celery2.1 创建`celery.py`2.2 更新 `__init__.py`步骤3:配置Django设置步骤4:定义Celery任务…...

DAY 38 leetcode 15--哈希表.三数之和

题号15 给你一个整数数组 nums &#xff0c;判断是否存在三元组 [nums[i], nums[j], nums[k]] 满足 i ! j、i ! k 且 j ! k &#xff0c;同时还满足 nums[i] nums[j] nums[k] 0 。请你返回所有和为 0 且不重复的三元组。 未去重版本 整体思路&#xff1a;先排序再双指针遍…...

Java项目之基于ssm的个性化旅游攻略定制系统(源码+文档)

项目简介 个性化旅游攻略定制系统实现了以下功能&#xff1a; 个性化旅游攻略定制系统能够实现对用户上传信息&#xff0c;旅游路线信息&#xff0c;景点项目信息&#xff0c;景点信息&#xff0c;标签分类信息等信息的管理。 &#x1f495;&#x1f495;作者&#xff1a;落落…...

链表和数组的效率

访问元素 • 数组&#xff1a;通过索引直接访问元素&#xff0c;时间复杂度为O(1)&#xff0c;速度很快。例如arr[5]可以立即访问到数组arr中索引为5的元素。 • 链表&#xff1a;需要从链表头开始逐个遍历节点&#xff0c;直到找到目标元素&#xff0c;平均时间复杂度为O(n)…...

经典回溯问题———组合的输出

题目如下 思路 代码如下...

WPS宏开发手册——附录

目录 系列文章7、附录 系列文章 使用、工程、模块介绍 JSA语法 JSA语法练习题 Excel常用Api Excel实战 常见问题 附录 7、附录 颜色序列&#xff1a;在excel中设置颜色&#xff0c;只能设置颜色序号&#xff0c;不能直接设置rgb颜色 1、黑色 (Black)…...

【leetcode100】买卖股票的最佳时机

1、题目描述 给定一个数组 prices ,它的第 i 个元素 prices[i] 表示一支给定股票第 i 天的价格。 你只能选择 某一天 买入这只股票,并选择在 未来的某一个不同的日子 卖出该股票。设计一个算法来计算你所能获取的最大利润。 返回你可以从这笔交易中获取的最大利润。如果你…...

uniapp小程序登录失效后操作失灵问题

一开始我在请求返回失效验证时做了登录失效处理然后用uni.switchTab跳转主页的逻辑&#xff0c;结果发现在一天后重新打开小程序或者其他登录挤掉登录验证时有概率导致整个页面失灵无法操作。 经过排查发现&#xff0c;在小程序跳转新页面的时候如果遇到**(过快还是过多&#…...

找树左下角的值(DFS 深度优先搜索)| LeetCode 513

✨ 题目描述 给定一个二叉树的 根节点 root&#xff0c;请找出该二叉树的 最底层 最左边 节点的值。 提示&#xff1a; 二叉树中至少有一个节点。 &#x1f4c4; 示例 示例 1 输入: root [2,1,3] 输出: 1示例 2 输入: [1,2,3,4,null,5,6,null,null,7] 输出: 7&#x1f5…...

【力扣hot100题】(060)分割回文串

每次需要判断回文串&#xff0c;这点比之前几题回溯题目复杂一些。 还有我怎么又多写了循环…… class Solution { public:vector<vector<string>> result;string s;bool palindromic(string s){for(int i0;i<s.size()/2;i) if(s[i]!s[s.size()-1-i]) return …...

中国钧瓷收藏市场现状和风险警示

一、数据权威性与综合维度 本榜单由钧瓷联合体、钧瓷频道及钧瓷数据库三方协同制作&#xff0c;通过10项规则综合评估匠人影响力&#xff0c;涵盖知名度、用户评价、平台指数等多元维度&#xff0c;避免单一指标&#xff08;如拍卖价格&#xff09;的片面性。榜单每月更新&…...

forms实现任务文档功能

说明&#xff1a; forms实现任务文档功能 效果图&#xff1a; step1:C:\Users\wangrusheng\RiderProjects\WinFormsApp26\WinFormsApp26\Form1.cs using System; using System.Collections.Generic; using System.ComponentModel; using System.Drawing; using System.IO; u…...

字符串的replace、replaceAll、split()方法

参考 字符串的replace、replaceAll、split()方法_字符串replace-CSDN博客...

Kotlin语言进阶:协程、Flow、Channel详解(二)

Kotlin语言进阶:协程、Flow、Channel详解(二) 一、Flow基础 1.1 什么是Flow Flow是Kotlin提供的用于处理异步数据流的解决方案,它建立在协程之上,具有以下特点: 冷流特性:只有在收集时才会开始发射数据背压处理:自动处理生产者和消费者速度不匹配的问题组合操作:提…...

VBA知识学习

文章目录 打开开发工具编写第一个VBA使用VBA调试器 VBA语法 打开开发工具 文件->选项->自定义功能区->开发工具 编写第一个VBA 点击开发工具 ->点击Visual Basic 选择sheet1->右键->插入->模块 Sub 第一个VBA程序()MsgBox "Hello World!&quo…...

JAVA EE_多线程-初阶(二)

1.线程安全 1.1.观察线程不安全 实例&#xff1a; package thread; public class text18 {//定义一个成员变量count,初始值为0private static int count0;public static void main(String[] args) throws InterruptedException {Thread t1new Thread(()->{for(int i0;i&l…...

【Linux】进程预备知识——冯诺依曼、操作系统

目录&#xff1a; 一、冯诺依曼体系结构 &#xff08;一&#xff09;什么是冯诺依曼 &#xff08;二&#xff09;为什么需要冯诺依曼 &#xff08;三&#xff09;冯诺依曼如何操作 二、操作系统概念 &#xff08;一&#xff09;对下硬件管理 &#xff08;二&#xff09;…...

Java入门首周精要:从基础语法到面向对象核心解析

文章目录 Java入门首周精要&#xff1a;从基础语法到面向对象核心解析1.Java类名与文件名的一致性规则2.Java和C语言中char类型的区别3.Java中的注释4.Java中的‘’‘’运算符5.Java的输入输出6.方法&#xff08;重载&重写&#xff09;方法的重载方法的重写 7.面向对象&…...

嵌入式AI开源生态指南:从框架到应用的全面解析

嵌入式AI开源生态指南&#xff1a;从框架到应用的全面解析 引言 随着人工智能技术的迅速发展&#xff0c;将AI能力部署到边缘设备上的需求日益增长。嵌入式AI通过在资源受限的微控制器上运行机器学习模型&#xff0c;实现了无需云连接的本地智能处理&#xff0c;大幅降低了延…...

MCP server的stdio和SSE分别是什么?

文章目录 一、Stdio:本地进程间通信的核心二、SSE:远程通信与实时推送的利器三、Stdio vs SSE:关键差异对比四、如何选择?场景驱动的决策指南五、实战建议与避坑指南实际操作结语在AI应用开发中,MCP(Model Context Protocol)协议正成为连接大模型与外部资源的核心桥梁。…...

哈希表(闭散列)的实现

目录 概念及定义 闭散列的介绍 闭散列底层实现 哈希表的定义 哈希表的构造 哈希表扩容 哈希表插入 哈希表删除 哈希表查找 概念及定义 哈希表&#xff0c;也成为散列表&#xff0c;在C中unordered_set和unordered_map的底层实现依靠的都是哈希表。 map和set的底层是红…...

Shiro学习(六):Shiro整合CAS实现单点登录

一、单点登录介绍 单点登录&#xff08;Single Sign On&#xff09;&#xff0c;简称为 SSO&#xff0c;是比较流行的企业业务整合的解决方案之一。 SSO的定义是在多个[应用]&#xff0c;用户只需要登录一次就可以访问所有相互信任的应用系统。 一般这种单点登录的实现方案&…...

HAProxy-ACL实战篇

HAProxy-ACL实战篇 IP说明172.25.254.101客户端172.25.254.102haproxy服务器172.25.254.103web1172.25.254.104web2 ACL示例-域名匹配 # 172.25.254.102 [rootRocky ~]# cat /etc/haproxy/conf.d/test.cfg frontend magedu_http_portbind 172.25.254.102:80mode httpbalanc…...

以下是针对该 Ansible 任务的格式检查和优化建议

以下是针对该 Ansible 任务的格式检查和优化建议&#xff1a; 目录 一、格式检查原始代码问题分析修正后的标准格式 二、推荐增强功能1. 添加可执行权限2. 显式指定 Shell 解释器3. 添加错误处理 三、完整 Playbook 示例四、验证脚本兼容性五、常见错误总结 一、格式检查 原始…...

C++语言的测试覆盖率

C语言的测试覆盖率分析与实践 引言 在软件开发过程中&#xff0c;测试覆盖率是一项重要的质量指标&#xff0c;它帮助开发者评估代码的测试效果&#xff0c;确保软件的可靠性与稳定性。尤其在C语言的开发中&#xff0c;由于其复杂的特性和丰富的功能&#xff0c;测试覆盖率的…...

如何使用 DrissionPage 进行网页自动化和爬取

在这个技术博客中&#xff0c;我们将向大家展示如何使用 DrissionPage 进行网页自动化操作与数据爬取。DrissionPage 是一个基于 Playwright 的 Python 自动化工具&#xff0c;它允许我们轻松地控制浏览器进行网页爬取、测试以及自动化操作。与其他工具&#xff08;如 Selenium…...

设计模式 Day 3:抽象工厂模式(Abstract Factory Pattern)详解

经过前两天的学习&#xff0c;我们已经掌握了单例模式与工厂方法模式&#xff0c;理解了如何控制实例个数与如何通过子类封装对象的创建逻辑。 今天&#xff0c;我们将进一步深入“工厂”体系&#xff0c;学习抽象工厂模式&#xff08;Abstract Factory Pattern&#xff09;&a…...

TensorRT 有什么特殊之处

一、TensorRT的定义与核心功能 TensorRT是NVIDIA推出的高性能深度学习推理优化器和运行时库&#xff0c;专注于将训练好的模型在GPU上实现低延迟、高吞吐量的部署。其主要功能包括&#xff1a; 模型优化&#xff1a;通过算子融合&#xff08;合并网络层&#xff09;、消除冗余…...

SQL注入-盲注靶场实战(手写盲注payload)--SRC获得库名即可

布尔盲注 进入页面 注入点 ’ and 11 and 12 得知为布尔盲注 库名长度 and length(database()) 8 抓包&#xff08;浏览器自动进行了url编码&#xff09;爆破 得知为 12 库名字符 1 and ascii(substr(database(),1,1))112 – q &#xff08;这里如果不再次抓包…...

http://noi.openjudge.cn/_2.5基本算法之搜索_1804:小游戏

文章目录 题目深搜代码宽搜代码深搜数据演示图总结 题目 1804:小游戏 总时间限制: 1000ms 内存限制: 65536kB 描述 一天早上&#xff0c;你起床的时候想&#xff1a;“我编程序这么牛&#xff0c;为什么不能靠这个赚点小钱呢&#xff1f;”因此你决定编写一个小游戏。 游戏在一…...

Windows Flip PDF Plus Corporate PDF翻页工具

软件介绍 Flip PDF Plus Corporate是一款功能强大的PDF翻页工具&#xff0c;也被称为名编辑电子杂志大师。这款软件能够迅速将PDF文件转换为具有翻页动画效果的电子书&#xff0c;同时保留原始的超链接和书签。无论是相册、视频、音频&#xff0c;还是Flash、视频和链接&#…...

Java八股文-List

集合的底层是否加锁也就代表是否线程安全 (一)List集合 一、数组 array[1]是如何通过索引找到堆内存中对应的这块数据的呢? (1)数组如何获取其他元素的地址值 (2)为什么数组的索引是从0开始的&#xff0c;不可以从1开始吗 (3)操作数组的时间复杂度 ①查找 根据索引查询 未…...