在智能证券投资的课程体系中4S不仅仅指选股Stock Selection更包含择时Market Timing、仓位管理Position Sizing和风控Stop Loss。很多开发者在编写回测程序时容易陷入一个误区过度拟合单一环节忽略了策略的系统完整性。今天我将结合Python编程实践为你拆解一个完整的4S策略回测系统。我们将不掺杂任何营销话术中立地探讨如何通过代码将选股、择时、仓位、风控四大模块工程化并输出年化收益CAGR与最大回撤Max Drawdown这两个核心评价指标。一、 实际应用场景描述在量化投研的实际工作中我们面临的真实场景往往不是简单的低买高卖而是一个复杂的系统工程1. 多因子选股面对全市场5000只股票如何根据4S标准如基本面、动量、质量因子每日筛选出符合条件的股票池2. 动态仓位管理选出了5只股票是等权分配还是根据评分加权总资金如何在进攻和防守之间切换3. 严格的止损机制当单只个股亏损触及总资金的2%时如何强制平仓这2%是总资金的2%而非个股成本的2%这中间存在杠杆撬动的差异。4. 完整的绩效评估策略跑完3年到底赚了多少最大回撤发生在什么时候本篇将解决的核心痛点如何通过模块化代码实现选股→择时→仓位→风控的闭环回测并精准计算年化收益与最大回撤。二、 引入痛点问题结构化在编写4S完整策略回测时开发者通常会遇到以下三大痛点1. 模块耦合度高选股逻辑、交易逻辑、风控逻辑写在一个大循环里代码超过500行就难以维护更无法灵活调整参数。2. 收益率计算失真简单使用(期末-期初)/期初 无法反映资金曲线的波动且忽略了资金利用率非满仓状态下的真实收益。3. 回撤计算盲区只计算了单日回撤忽略了历史最高点到当前点的累计回撤导致对风险的描述过于乐观。三、 核心逻辑讲解1. 4S策略的闭环逻辑我们的回测引擎将遵循以下执行顺序每日选股Select → 市场择时Time → 资金分配Size → 风险检查Stop → 记录净值Record2. 年化收益CAGR的核心算法公式CAGR \left(\frac{V_{final}}{V_{start}}\right)^{\frac{1}{years}} - 1代码实现要点需要精确计算回测天数处理闰年和非交易日避免用年份差取整导致的误差。3. 最大回撤Max Drawdown的核心算法公式MDD \max\left(\frac{V_{peak} - V_{trough}}{V_{peak}}\right)代码实现要点需要维护一个rolling_max至今最高净值变量。当当前净值 rolling_max时计算回撤比并持续更新全局最大回撤值。四、 代码模块化实现我们将代码分为5个模块结构清晰注释详尽。1. 配置文件 (config.yaml)# 回测全局配置backtest:start_date: 2020-01-01end_date: 2023-12-31initial_capital: 1000000 # 初始资金100万# 4S 选股参数select:momentum_days: 20 # 20日动量volatility_lookback: 20 # 20日波动率# 4S 择时参数time:ma_short: 5ma_long: 20# 4S 仓位参数size:max_positions: 5 # 最多持有5只single_position_pct: 0.20 # 单只最大20%# 4S 风控参数stop:max_loss_of_total_capital: 0.02 # 单只亏总资金2%强制平仓2. 数据加载与预处理 (data_loader.py)import pandas as pdimport numpy as npdef load_price_data(path: str) - pd.DataFrame:加载日频行情数据CSV格式: date, code, open, high, low, close, volume, amountdf pd.read_csv(path, parse_dates[date])df[code] df[code].astype(str).str.zfill(6)# 确保按日期和代码排序return df.sort_values([date, code]).reset_index(dropTrue)def calculate_technical_factors(df: pd.DataFrame) - pd.DataFrame:计算技术因子动量、均线等这是4S选股的核心输入df df.copy()# 20日动量因子df[momentum] df.groupby(code)[close].pct_change(periods20)# 20日波动率因子df[volatility] df.groupby(code)[close].pct_change().rolling(window20).std().reset_index(0, dropTrue)# 5日均线df[ma5] df.groupby(code)[close].transform(lambda x: x.rolling(window5).mean())# 20日均线df[ma20] df.groupby(code)[close].transform(lambda x: x.rolling(window20).mean())return df3. 4S策略引擎 (strategy_engine.py)import pandas as pdimport numpy as npfrom typing import Dict, List, Tupleclass FourSStrategy:★ 4S完整策略引擎Select - Time - Size - Stopdef __init__(self, config: dict):self.config configself.capital config[backtest][initial_capital]self.initial_capital self.capital# 持仓状态: {code: {qty: int, cost: float, entry_date: Timestamp}}self.positions: Dict[str, dict] {}# 净值曲线记录self.nav_history: List[Tuple[pd.Timestamp, float]] []# 交易记录self.trade_log: List[dict] []def run_daily(self, date: pd.Timestamp, daily_data: pd.DataFrame):★ 核心逻辑每日执行4S策略# S1: 选股 (Select) candidates self._select_stocks(daily_data)# S2: 择时 (Time) final_buy, final_sell self._time_market(date, candidates, daily_data)# S3: 仓位 (Size) S4: 风控 (Stop) self._execute_trades(date, final_buy, final_sell, daily_data)# 记录当日净值 self._record_nav(date, daily_data)def _select_stocks(self, df: pd.DataFrame) - pd.DataFrame:选股逻辑动量 0 且 波动率 阈值 (避免高波动陷阱)if len(df) 0:return pd.DataFrame()# 简单4S选股示例动量排名前20且波动率低于80%分位df df.dropna(subset[momentum, volatility, close])if len(df) 5:return pd.DataFrame()# 动量筛选df df[df[momentum] 0].sort_values(momentum, ascendingFalse)# 波动率过滤排除过高波动的标的vol_threshold df[volatility].quantile(0.8)df df[df[volatility] vol_threshold]return df.head(self.config[size][max_positions] * 2) # 预选池def _time_market(self, date, candidates: pd.DataFrame, daily_data: pd.DataFrame) - Tuple[List[str], List[str]]:择时逻辑均线金叉买入死叉卖出buy_list []sell_list []if len(candidates) 0:return [], []for _, row in candidates.iterrows():code row[code]ma5 row.get(ma5, 0)ma20 row.get(ma20, 0)close row.get(close, 0)if pd.isna(ma5) or pd.isna(ma20) or ma5 0 or ma20 0:continue# ★ 买入信号短均 长均 (金叉区域)if ma5 ma20 * 1.005: # 0.5%缓冲带防止噪音if code not in self.positions:buy_list.append(code)# ★ 卖出信号短均 长均 (死叉区域) 或 持有标的不再在候选池if code in self.positions:if ma5 ma20 * 0.995:sell_list.append(code)# 检查不在候选池的持仓基本面恶化加入卖出列表current_codes set(candidates[code].tolist())for code in list(self.positions.keys()):if code not in current_codes:if code not in sell_list:sell_list.append(code)return buy_list, sell_listdef _execute_trades(self, date, buy_list, sell_list, daily_data: pd.DataFrame):执行交易包含仓位管理和风控检查# ★ 先执行卖出释放资金for code in sell_list:if code in self.positions:self._sell_stock(date, code, daily_data)# ★ 再执行买入使用释放的资金available_codes [c for c in buy_list if c not in self.positions]if available_codes and len(self.positions) self.config[size][max_positions]:self._buy_stock(date, available_codes, daily_data)def _buy_stock(self, date, codes: List[str], daily_data: pd.DataFrame):买入逻辑等权分配可用资金n_slots self.config[size][max_positions]current_pos len(self.positions)remaining_slots n_slots - current_posif remaining_slots 0 or not codes:return# 等权分配alloc_per_stock self.capital / n_slotsfor code in codes[:remaining_slots]:row daily_data[daily_data[code] code]if len(row) 0:continueprice row.iloc[0][close]if price 0:continueqty int(alloc_per_stock / price / 100) * 100 # 整百股if qty 0:continuecost qty * price * 1.0003 # 佣金if cost self.capital:continueself.capital - costself.positions[code] {qty: qty,cost: price,entry_date: date}self.trade_log.append({date: date, code: code, action: BUY,price: price, qty: qty, cost: cost})def _sell_stock(self, date, code: str, daily_data: pd.DataFrame):卖出逻辑含风控检查if code not in self.positions:returnpos self.positions[code]row daily_data[daily_data[code] code]if len(row) 0:returnprice row.iloc[0][close]if price 0:price pos[cost] * 0.95 # 极端情况用成本价估算revenue pos[qty] * price * (1 - 0.0003 - 0.001) # 佣金印花税self.capital revenuepnl (price - pos[cost]) / pos[cost] * 100self.trade_log.append({date: date, code: code, action: SELL,price: price, qty: pos[qty], pnl_pct: pnl})del self.positions[code]def _record_nav(self, date, daily_data: pd.DataFrame):记录当日净值market_value self.capitalfor code, pos in self.positions.items():row daily_data[daily_data[code] code]if len(row) 0:price row.iloc[0][close]if price 0:market_value pos[qty] * priceself.nav_history.append((date, market_value))def risk_check(self, daily_data: pd.DataFrame) - List[str]:★ 风控检查单只亏损超总资金2%强制调出这是4S中Stop的核心实现total_capital self.initial_capitalforced_sell []for code, pos in self.positions.items():row daily_data[daily_data[code] code]if len(row) 0:continuecurrent_price row.iloc[0][close]if current_price 0:continue# ★ 核心公式个股亏损对总资金的影响loss_impact (current_price - pos[cost]) * pos[qty] / total_capital# 触发风控亏损超过总资金的2%if loss_impact -self.config[stop][max_loss_of_total_capital]:forced_sell.append(code)pnl (current_price - pos[cost]) / pos[cost] * 100print(f [风控] {code} 触发强制平仓: f开仓 ¥{pos[cost]:.2f} → 当前 ¥{current_price:.2f}, f个股亏损 {pnl:.1f}%, 占总资金 {loss_impact*100:.2f}%)return forced_sell4. 绩效计算模块 (metrics.py)import pandas as pdimport numpy as npdef calculate_cagr(nav_series: pd.Series) - float:★ 计算年化收益率 (CAGR)CAGR (V_final / V_start)^(1/years) - 1if len(nav_series) 2:return 0.0start_val nav_series.iloc[0]end_val nav_series.iloc[-1]if start_val 0:return 0.0# ★ 精确计算年数考虑实际天数days (nav_series.index[-1] - nav_series.index[0]).daysif days 0:return 0.0years days / 365.25cagr (end_val / start_val) ** (1.0 / years) - 1.0return round(cagr * 100, 2)def calculate_max_drawdown(nav_series: pd.Series) - Tuple[float, str, str]:★ 计算最大回撤 (Max Drawdown)MDD max((Peak - Trough) / Peak)返回: (最大回撤百分比, 回撤开始日期, 回撤结束日期)if len(nav_series) 2:return 0.0, , nav nav_series.valuesdates nav_series.indexmax_dd 0.0peak_date trough_date peak_val nav[0]peak_idx 0for i in range(1, len(nav)):if nav[i] peak_val:peak_val nav[i]peak_idx idd (peak_val - nav[i]) / peak_valif dd max_dd:max_dd ddpeak_date str(dates[peak_idx].strftime(%Y-%m-%d))trough_date str(dates[i].strftime(%Y-%m-%d))return round(max_dd * 100, 2), peak_date, trough_datedef calculate_sharpe(nav_series: pd.Series, risk_free_rate: float 0.025) - float:计算夏普比率if len(nav_series) 2:return 0.0ret nav_series.pct_change().dropna()if len(ret) 2:return 0.0days (nav_series.index[-1] - nav_series.index[0]).daysyrs max(days / 365.25, 1/365.25)ann_ret (nav_series.iloc[-1] / nav_series.iloc[0]) ** (1/yrs) - 1ann_vol ret.std() * np.sqrt(252)if ann_vol 1e-10:return 0.0sharpe (ann_ret - risk_free_rate) / ann_volreturn round(sharpe, 3)def print_performance_report(nav_series: pd.Series, strategy_name: str 4S策略):打印完整绩效报告cagr calculate_cagr(nav_series)mdd, peak_d, trough_d calculate_max_drawdown(nav_series)sharpe calculate_sharpe(nav_series)total_ret (nav_series.iloc[-1] / nav_series.iloc[0] - 1) * 100print(f\n{*60})print(f {strategy_name} 绩效报告)print(f{*60})print(f 期初净值: ¥{nav_series.iloc[0]:,.2f})print(f 期末净值: ¥{nav_series.iloc[-1]:,.2f})print(f 累计收益: {total_ret:.2f}%)print(f ★ 年化收益: {cagr:.2f}%)print(f ★ 最大回撤: {mdd:.2f}%)print(f ├─ 回撤开始: {peak_d})print(f └─ 回撤探底: {trough_d})print(f ★ 夏普比率: {sharpe:.3f})print(f{*60}\n)return {cagr: cagr,max_drawdown: mdd,sharpe: sharpe,total_return: round(total_ret, 2)}5. 主回测入口 (main.py)import yamlimport pandas as pdfrom pathlib import Pathfrom data_loader import load_price_data, calculate_technical_factorsfrom strategy_engine import FourSStrategyfrom metrics import print_performance_reportdef load_config(path: str config.yaml) - dict:with open(path, r, encodingutf-8) as f:return yaml.safe_load(f)def main():config load_config()# 1. 加载数据print( 加载行情数据...)df load_price_data(data/daily_prices.csv)df calculate_technical_factors(df)# 2. 初始化策略strategy FourSStrategy(config)# 3. 按日循环回测unique_dates df[date].unique()print(f\n{*60})print(f 开始回测: {unique_dates[0].strftime(%Y-%m-%d)} ~ f{unique_dates[-1].strftime(%Y-%m-%d)})print(f 初始资金: ¥{config[backtest][initial_capital]:,.0f})print(f{*60}\n)for i, date in enumerate(unique_dates):daily df[df[date] date]# ★ 先检查风控针对持仓forced_sell strategy.risk_check(daily)if forced_sell:for code in forced_sell:strategy._sell_stock(date, code, daily)# ★ 再执行正常4S策略strategy.run_daily(date, daily)if i % 100 0:nav strategy.nav_history[-1][1] if strategy.nav_history else 0pos_count len(strategy.positions)print(f [{i1}/{len(unique_dates)}] {date.strftime(%Y-%m-%d)} | f持仓:{pos_count} | 净值:¥{nav:,.0f})# 4. 生成净值序列dates [d for d, v in strategy.nav_history]values [v for d, v in strategy.nav_history]nav_series pd.Series(values, indexdates)# 5. 输出核心指标metrics print_performance_report(nav_series, 4S完整策略)# 6. 保存结果Path(output).mkdir(exist_okTrue)nav_series.to_csv(output/nav_curve.csv, header[nav], index_labeldate)print(f✅ 回测完成)print(f 年化收益: {metrics[cagr]:.2f}%)print(f 最大回撤: {metrics[max_drawdown]:.2f}%)print(f 夏普比率: {metrics[sharpe]:.3f})if __name__ __main__:main()五、 README 文件与使用说明# 4S 完整策略历史回测系统## 项目简介本系统实现选股(Select) → 择时(Time) → 仓位(Size) → 风控(Stop)完整闭环输出年化收益、最大回撤、夏普比率三大核心指标。## 安装依赖bashpip install pandas numpy pyyaml matplotlib## 使用说明1. 准备数据: data/daily_prices.csv (date, code, open, high, low, close, volume)2. 修改配置: config.yaml (调整选股因子、仓位上限、风控线)3. 运行回测: python main.py## 核心输出- 年化收益率 (CAGR)- 最大回撤 (Max Drawdown) 及起止日期- 夏普比率 (Sharpe Ratio)- 净值曲线 CSV## 参数调优建议- 选股因子周期: 20日动量适合中线5日适合短线- 风控线: 2%为中性激进可放宽至3%保守收紧至1%- 仓位上限: 5只为宜过多分散收益过少集中风险六、 核心知识点卡片┌──────────────────────────────────────────────────────┐│ 4S 完整策略回测 — 核心知识 │├──────────────┬───────────────────────────────────────┤│ 选股(Select) │ 动量因子波动率过滤筛选优质标的 ││ 择时(Time) │ 均线金叉/死叉决定买卖时机 ││ 仓位(Size) │ 等权/加权分配控制单只最大占比 ││ 风控(Stop) │ 个股亏损超总资金2% → 强制平仓 ││ 年化收益(CAGR)│ (期末/期初)^(1/年数) - 1 ││ 最大回撤(MDD) │ max((峰值-谷值)/峰值) ││ 夏普比率 │ (年化收益-无风险利率) / 年化波动 ││ 回测陷阱 │ 未来函数、幸存者偏差、过拟合 ││ 核心原则 │ 风控线不是限制收益是保住本金 │└──────────────┴───────────────────────────────────────┘七、 免责声明与风险提示⚠️ 免责声明本代码仅供学习、研究与量化教学用途不构成任何投资建议或投资决策依据。模拟数据为随机数生成不代表任何真实标的历史或未来表现。⚠️ 风险提示- 回测结果不代表实盘表现存在滑点、冲击成本等未建模因素- 2%风控线为经验值不同策略/资金规模需动态调整- 均线择时存在滞后性单边下跌市中可能无法及时止损- 动量因子在震荡市中容易频繁切换产生大量交易成本- 本系统未进行参数优化直接用于实盘可能导致亏损八、 总结编写一个完整的4S策略回测系统核心不在于写出多复杂的数学公式而在于工程化的闭环思维1. 模块化选股、择时、仓位、风控四权分立便于单独调参和单元测试2. 年化收益用(期末/期初)^(1/实际年数) - 1而非简单除法才是真实的CAGR3. 最大回撤维护rolling_max变量追踪历史最高点→当前的回撤幅度而非单日波动4. 风控2%计算的是个股亏损/总资金不是个股亏损/个股成本这是大资金管理的核心视角核心原则回测的意义不是证明策略能赚钱而是量化你在不同市场环境下的风险暴露。一个能清晰输出年化收益和最大回撤的系统才是合格的策略研发基础设施。本文代码仅供学习与技术交流不构成任何投资建议股市有风险入市需谨慎利用AI解决实际问题如果你觉得这个工具好用欢迎关注长安牧笛
实现4S完整策略历史回测程序,输出年化收益,最大回撤核心指标。
发布时间:2026/6/21 15:26:30
在智能证券投资的课程体系中4S不仅仅指选股Stock Selection更包含择时Market Timing、仓位管理Position Sizing和风控Stop Loss。很多开发者在编写回测程序时容易陷入一个误区过度拟合单一环节忽略了策略的系统完整性。今天我将结合Python编程实践为你拆解一个完整的4S策略回测系统。我们将不掺杂任何营销话术中立地探讨如何通过代码将选股、择时、仓位、风控四大模块工程化并输出年化收益CAGR与最大回撤Max Drawdown这两个核心评价指标。一、 实际应用场景描述在量化投研的实际工作中我们面临的真实场景往往不是简单的低买高卖而是一个复杂的系统工程1. 多因子选股面对全市场5000只股票如何根据4S标准如基本面、动量、质量因子每日筛选出符合条件的股票池2. 动态仓位管理选出了5只股票是等权分配还是根据评分加权总资金如何在进攻和防守之间切换3. 严格的止损机制当单只个股亏损触及总资金的2%时如何强制平仓这2%是总资金的2%而非个股成本的2%这中间存在杠杆撬动的差异。4. 完整的绩效评估策略跑完3年到底赚了多少最大回撤发生在什么时候本篇将解决的核心痛点如何通过模块化代码实现选股→择时→仓位→风控的闭环回测并精准计算年化收益与最大回撤。二、 引入痛点问题结构化在编写4S完整策略回测时开发者通常会遇到以下三大痛点1. 模块耦合度高选股逻辑、交易逻辑、风控逻辑写在一个大循环里代码超过500行就难以维护更无法灵活调整参数。2. 收益率计算失真简单使用(期末-期初)/期初 无法反映资金曲线的波动且忽略了资金利用率非满仓状态下的真实收益。3. 回撤计算盲区只计算了单日回撤忽略了历史最高点到当前点的累计回撤导致对风险的描述过于乐观。三、 核心逻辑讲解1. 4S策略的闭环逻辑我们的回测引擎将遵循以下执行顺序每日选股Select → 市场择时Time → 资金分配Size → 风险检查Stop → 记录净值Record2. 年化收益CAGR的核心算法公式CAGR \left(\frac{V_{final}}{V_{start}}\right)^{\frac{1}{years}} - 1代码实现要点需要精确计算回测天数处理闰年和非交易日避免用年份差取整导致的误差。3. 最大回撤Max Drawdown的核心算法公式MDD \max\left(\frac{V_{peak} - V_{trough}}{V_{peak}}\right)代码实现要点需要维护一个rolling_max至今最高净值变量。当当前净值 rolling_max时计算回撤比并持续更新全局最大回撤值。四、 代码模块化实现我们将代码分为5个模块结构清晰注释详尽。1. 配置文件 (config.yaml)# 回测全局配置backtest:start_date: 2020-01-01end_date: 2023-12-31initial_capital: 1000000 # 初始资金100万# 4S 选股参数select:momentum_days: 20 # 20日动量volatility_lookback: 20 # 20日波动率# 4S 择时参数time:ma_short: 5ma_long: 20# 4S 仓位参数size:max_positions: 5 # 最多持有5只single_position_pct: 0.20 # 单只最大20%# 4S 风控参数stop:max_loss_of_total_capital: 0.02 # 单只亏总资金2%强制平仓2. 数据加载与预处理 (data_loader.py)import pandas as pdimport numpy as npdef load_price_data(path: str) - pd.DataFrame:加载日频行情数据CSV格式: date, code, open, high, low, close, volume, amountdf pd.read_csv(path, parse_dates[date])df[code] df[code].astype(str).str.zfill(6)# 确保按日期和代码排序return df.sort_values([date, code]).reset_index(dropTrue)def calculate_technical_factors(df: pd.DataFrame) - pd.DataFrame:计算技术因子动量、均线等这是4S选股的核心输入df df.copy()# 20日动量因子df[momentum] df.groupby(code)[close].pct_change(periods20)# 20日波动率因子df[volatility] df.groupby(code)[close].pct_change().rolling(window20).std().reset_index(0, dropTrue)# 5日均线df[ma5] df.groupby(code)[close].transform(lambda x: x.rolling(window5).mean())# 20日均线df[ma20] df.groupby(code)[close].transform(lambda x: x.rolling(window20).mean())return df3. 4S策略引擎 (strategy_engine.py)import pandas as pdimport numpy as npfrom typing import Dict, List, Tupleclass FourSStrategy:★ 4S完整策略引擎Select - Time - Size - Stopdef __init__(self, config: dict):self.config configself.capital config[backtest][initial_capital]self.initial_capital self.capital# 持仓状态: {code: {qty: int, cost: float, entry_date: Timestamp}}self.positions: Dict[str, dict] {}# 净值曲线记录self.nav_history: List[Tuple[pd.Timestamp, float]] []# 交易记录self.trade_log: List[dict] []def run_daily(self, date: pd.Timestamp, daily_data: pd.DataFrame):★ 核心逻辑每日执行4S策略# S1: 选股 (Select) candidates self._select_stocks(daily_data)# S2: 择时 (Time) final_buy, final_sell self._time_market(date, candidates, daily_data)# S3: 仓位 (Size) S4: 风控 (Stop) self._execute_trades(date, final_buy, final_sell, daily_data)# 记录当日净值 self._record_nav(date, daily_data)def _select_stocks(self, df: pd.DataFrame) - pd.DataFrame:选股逻辑动量 0 且 波动率 阈值 (避免高波动陷阱)if len(df) 0:return pd.DataFrame()# 简单4S选股示例动量排名前20且波动率低于80%分位df df.dropna(subset[momentum, volatility, close])if len(df) 5:return pd.DataFrame()# 动量筛选df df[df[momentum] 0].sort_values(momentum, ascendingFalse)# 波动率过滤排除过高波动的标的vol_threshold df[volatility].quantile(0.8)df df[df[volatility] vol_threshold]return df.head(self.config[size][max_positions] * 2) # 预选池def _time_market(self, date, candidates: pd.DataFrame, daily_data: pd.DataFrame) - Tuple[List[str], List[str]]:择时逻辑均线金叉买入死叉卖出buy_list []sell_list []if len(candidates) 0:return [], []for _, row in candidates.iterrows():code row[code]ma5 row.get(ma5, 0)ma20 row.get(ma20, 0)close row.get(close, 0)if pd.isna(ma5) or pd.isna(ma20) or ma5 0 or ma20 0:continue# ★ 买入信号短均 长均 (金叉区域)if ma5 ma20 * 1.005: # 0.5%缓冲带防止噪音if code not in self.positions:buy_list.append(code)# ★ 卖出信号短均 长均 (死叉区域) 或 持有标的不再在候选池if code in self.positions:if ma5 ma20 * 0.995:sell_list.append(code)# 检查不在候选池的持仓基本面恶化加入卖出列表current_codes set(candidates[code].tolist())for code in list(self.positions.keys()):if code not in current_codes:if code not in sell_list:sell_list.append(code)return buy_list, sell_listdef _execute_trades(self, date, buy_list, sell_list, daily_data: pd.DataFrame):执行交易包含仓位管理和风控检查# ★ 先执行卖出释放资金for code in sell_list:if code in self.positions:self._sell_stock(date, code, daily_data)# ★ 再执行买入使用释放的资金available_codes [c for c in buy_list if c not in self.positions]if available_codes and len(self.positions) self.config[size][max_positions]:self._buy_stock(date, available_codes, daily_data)def _buy_stock(self, date, codes: List[str], daily_data: pd.DataFrame):买入逻辑等权分配可用资金n_slots self.config[size][max_positions]current_pos len(self.positions)remaining_slots n_slots - current_posif remaining_slots 0 or not codes:return# 等权分配alloc_per_stock self.capital / n_slotsfor code in codes[:remaining_slots]:row daily_data[daily_data[code] code]if len(row) 0:continueprice row.iloc[0][close]if price 0:continueqty int(alloc_per_stock / price / 100) * 100 # 整百股if qty 0:continuecost qty * price * 1.0003 # 佣金if cost self.capital:continueself.capital - costself.positions[code] {qty: qty,cost: price,entry_date: date}self.trade_log.append({date: date, code: code, action: BUY,price: price, qty: qty, cost: cost})def _sell_stock(self, date, code: str, daily_data: pd.DataFrame):卖出逻辑含风控检查if code not in self.positions:returnpos self.positions[code]row daily_data[daily_data[code] code]if len(row) 0:returnprice row.iloc[0][close]if price 0:price pos[cost] * 0.95 # 极端情况用成本价估算revenue pos[qty] * price * (1 - 0.0003 - 0.001) # 佣金印花税self.capital revenuepnl (price - pos[cost]) / pos[cost] * 100self.trade_log.append({date: date, code: code, action: SELL,price: price, qty: pos[qty], pnl_pct: pnl})del self.positions[code]def _record_nav(self, date, daily_data: pd.DataFrame):记录当日净值market_value self.capitalfor code, pos in self.positions.items():row daily_data[daily_data[code] code]if len(row) 0:price row.iloc[0][close]if price 0:market_value pos[qty] * priceself.nav_history.append((date, market_value))def risk_check(self, daily_data: pd.DataFrame) - List[str]:★ 风控检查单只亏损超总资金2%强制调出这是4S中Stop的核心实现total_capital self.initial_capitalforced_sell []for code, pos in self.positions.items():row daily_data[daily_data[code] code]if len(row) 0:continuecurrent_price row.iloc[0][close]if current_price 0:continue# ★ 核心公式个股亏损对总资金的影响loss_impact (current_price - pos[cost]) * pos[qty] / total_capital# 触发风控亏损超过总资金的2%if loss_impact -self.config[stop][max_loss_of_total_capital]:forced_sell.append(code)pnl (current_price - pos[cost]) / pos[cost] * 100print(f [风控] {code} 触发强制平仓: f开仓 ¥{pos[cost]:.2f} → 当前 ¥{current_price:.2f}, f个股亏损 {pnl:.1f}%, 占总资金 {loss_impact*100:.2f}%)return forced_sell4. 绩效计算模块 (metrics.py)import pandas as pdimport numpy as npdef calculate_cagr(nav_series: pd.Series) - float:★ 计算年化收益率 (CAGR)CAGR (V_final / V_start)^(1/years) - 1if len(nav_series) 2:return 0.0start_val nav_series.iloc[0]end_val nav_series.iloc[-1]if start_val 0:return 0.0# ★ 精确计算年数考虑实际天数days (nav_series.index[-1] - nav_series.index[0]).daysif days 0:return 0.0years days / 365.25cagr (end_val / start_val) ** (1.0 / years) - 1.0return round(cagr * 100, 2)def calculate_max_drawdown(nav_series: pd.Series) - Tuple[float, str, str]:★ 计算最大回撤 (Max Drawdown)MDD max((Peak - Trough) / Peak)返回: (最大回撤百分比, 回撤开始日期, 回撤结束日期)if len(nav_series) 2:return 0.0, , nav nav_series.valuesdates nav_series.indexmax_dd 0.0peak_date trough_date peak_val nav[0]peak_idx 0for i in range(1, len(nav)):if nav[i] peak_val:peak_val nav[i]peak_idx idd (peak_val - nav[i]) / peak_valif dd max_dd:max_dd ddpeak_date str(dates[peak_idx].strftime(%Y-%m-%d))trough_date str(dates[i].strftime(%Y-%m-%d))return round(max_dd * 100, 2), peak_date, trough_datedef calculate_sharpe(nav_series: pd.Series, risk_free_rate: float 0.025) - float:计算夏普比率if len(nav_series) 2:return 0.0ret nav_series.pct_change().dropna()if len(ret) 2:return 0.0days (nav_series.index[-1] - nav_series.index[0]).daysyrs max(days / 365.25, 1/365.25)ann_ret (nav_series.iloc[-1] / nav_series.iloc[0]) ** (1/yrs) - 1ann_vol ret.std() * np.sqrt(252)if ann_vol 1e-10:return 0.0sharpe (ann_ret - risk_free_rate) / ann_volreturn round(sharpe, 3)def print_performance_report(nav_series: pd.Series, strategy_name: str 4S策略):打印完整绩效报告cagr calculate_cagr(nav_series)mdd, peak_d, trough_d calculate_max_drawdown(nav_series)sharpe calculate_sharpe(nav_series)total_ret (nav_series.iloc[-1] / nav_series.iloc[0] - 1) * 100print(f\n{*60})print(f {strategy_name} 绩效报告)print(f{*60})print(f 期初净值: ¥{nav_series.iloc[0]:,.2f})print(f 期末净值: ¥{nav_series.iloc[-1]:,.2f})print(f 累计收益: {total_ret:.2f}%)print(f ★ 年化收益: {cagr:.2f}%)print(f ★ 最大回撤: {mdd:.2f}%)print(f ├─ 回撤开始: {peak_d})print(f └─ 回撤探底: {trough_d})print(f ★ 夏普比率: {sharpe:.3f})print(f{*60}\n)return {cagr: cagr,max_drawdown: mdd,sharpe: sharpe,total_return: round(total_ret, 2)}5. 主回测入口 (main.py)import yamlimport pandas as pdfrom pathlib import Pathfrom data_loader import load_price_data, calculate_technical_factorsfrom strategy_engine import FourSStrategyfrom metrics import print_performance_reportdef load_config(path: str config.yaml) - dict:with open(path, r, encodingutf-8) as f:return yaml.safe_load(f)def main():config load_config()# 1. 加载数据print( 加载行情数据...)df load_price_data(data/daily_prices.csv)df calculate_technical_factors(df)# 2. 初始化策略strategy FourSStrategy(config)# 3. 按日循环回测unique_dates df[date].unique()print(f\n{*60})print(f 开始回测: {unique_dates[0].strftime(%Y-%m-%d)} ~ f{unique_dates[-1].strftime(%Y-%m-%d)})print(f 初始资金: ¥{config[backtest][initial_capital]:,.0f})print(f{*60}\n)for i, date in enumerate(unique_dates):daily df[df[date] date]# ★ 先检查风控针对持仓forced_sell strategy.risk_check(daily)if forced_sell:for code in forced_sell:strategy._sell_stock(date, code, daily)# ★ 再执行正常4S策略strategy.run_daily(date, daily)if i % 100 0:nav strategy.nav_history[-1][1] if strategy.nav_history else 0pos_count len(strategy.positions)print(f [{i1}/{len(unique_dates)}] {date.strftime(%Y-%m-%d)} | f持仓:{pos_count} | 净值:¥{nav:,.0f})# 4. 生成净值序列dates [d for d, v in strategy.nav_history]values [v for d, v in strategy.nav_history]nav_series pd.Series(values, indexdates)# 5. 输出核心指标metrics print_performance_report(nav_series, 4S完整策略)# 6. 保存结果Path(output).mkdir(exist_okTrue)nav_series.to_csv(output/nav_curve.csv, header[nav], index_labeldate)print(f✅ 回测完成)print(f 年化收益: {metrics[cagr]:.2f}%)print(f 最大回撤: {metrics[max_drawdown]:.2f}%)print(f 夏普比率: {metrics[sharpe]:.3f})if __name__ __main__:main()五、 README 文件与使用说明# 4S 完整策略历史回测系统## 项目简介本系统实现选股(Select) → 择时(Time) → 仓位(Size) → 风控(Stop)完整闭环输出年化收益、最大回撤、夏普比率三大核心指标。## 安装依赖bashpip install pandas numpy pyyaml matplotlib## 使用说明1. 准备数据: data/daily_prices.csv (date, code, open, high, low, close, volume)2. 修改配置: config.yaml (调整选股因子、仓位上限、风控线)3. 运行回测: python main.py## 核心输出- 年化收益率 (CAGR)- 最大回撤 (Max Drawdown) 及起止日期- 夏普比率 (Sharpe Ratio)- 净值曲线 CSV## 参数调优建议- 选股因子周期: 20日动量适合中线5日适合短线- 风控线: 2%为中性激进可放宽至3%保守收紧至1%- 仓位上限: 5只为宜过多分散收益过少集中风险六、 核心知识点卡片┌──────────────────────────────────────────────────────┐│ 4S 完整策略回测 — 核心知识 │├──────────────┬───────────────────────────────────────┤│ 选股(Select) │ 动量因子波动率过滤筛选优质标的 ││ 择时(Time) │ 均线金叉/死叉决定买卖时机 ││ 仓位(Size) │ 等权/加权分配控制单只最大占比 ││ 风控(Stop) │ 个股亏损超总资金2% → 强制平仓 ││ 年化收益(CAGR)│ (期末/期初)^(1/年数) - 1 ││ 最大回撤(MDD) │ max((峰值-谷值)/峰值) ││ 夏普比率 │ (年化收益-无风险利率) / 年化波动 ││ 回测陷阱 │ 未来函数、幸存者偏差、过拟合 ││ 核心原则 │ 风控线不是限制收益是保住本金 │└──────────────┴───────────────────────────────────────┘七、 免责声明与风险提示⚠️ 免责声明本代码仅供学习、研究与量化教学用途不构成任何投资建议或投资决策依据。模拟数据为随机数生成不代表任何真实标的历史或未来表现。⚠️ 风险提示- 回测结果不代表实盘表现存在滑点、冲击成本等未建模因素- 2%风控线为经验值不同策略/资金规模需动态调整- 均线择时存在滞后性单边下跌市中可能无法及时止损- 动量因子在震荡市中容易频繁切换产生大量交易成本- 本系统未进行参数优化直接用于实盘可能导致亏损八、 总结编写一个完整的4S策略回测系统核心不在于写出多复杂的数学公式而在于工程化的闭环思维1. 模块化选股、择时、仓位、风控四权分立便于单独调参和单元测试2. 年化收益用(期末/期初)^(1/实际年数) - 1而非简单除法才是真实的CAGR3. 最大回撤维护rolling_max变量追踪历史最高点→当前的回撤幅度而非单日波动4. 风控2%计算的是个股亏损/总资金不是个股亏损/个股成本这是大资金管理的核心视角核心原则回测的意义不是证明策略能赚钱而是量化你在不同市场环境下的风险暴露。一个能清晰输出年化收益和最大回撤的系统才是合格的策略研发基础设施。本文代码仅供学习与技术交流不构成任何投资建议股市有风险入市需谨慎利用AI解决实际问题如果你觉得这个工具好用欢迎关注长安牧笛