4S风控模块编写程序,单只个股亏损超总资金2%,强制调出持仓。 严格聚焦 4S 风控模块单只个股亏损超总资金 2%强制调出持仓4S 风控模块单只亏损 2% 强制调出一、实际应用场景描述在量化实盘中选股 → 仓位分配 → 风控 是完整闭环。即使前两步做得再好如果风控缺失一次黑天鹅就能让账户伤筋动骨。典型场景场景 问题某重仓股突发利空连续 3 个跌停 单只票亏掉总资金的 8%回本需要 50%持仓 10 只其中 2 只持续阴跌 亏损悄无声息地侵蚀总资金止损设个股 -5%但仓位 30% 单只 -5% 总资金 -1.5%看起来不多但 3 只同时触发 -4.5%盘后看账户才发现超亏 没有实时监控止损滞后核心矛盾个股的止损线 ≠ 对总资金的影响。必须用总资金视角设定风控红线。二、引入痛点问题结构化层级 痛点 后果概念层 止损 5% 是模糊表述 5% 的什么个股总资金计算层 亏损金额 vs 亏损比例混淆 大仓位 -3% 比小仓位 -8% 更致命执行层 触发止损后再等等看 纪律被情绪瓦解系统层 无集中监控 盘后才知道超亏教学层 讲义只讲要止损不讲怎么算、怎么执行 学生不知如何工程化本质结论亏损不超过总资金 2%不是一句口号它需要一个精确的实时计算引擎。三、核心逻辑讲解3.1 核心公式个股对总资金的亏损贡献 当前价 - 开仓价× 持仓数量亏损占比 个股亏损金额 / 总资金触发条件亏损占比 -2%即 -0.023.2 为什么是总资金的 2%参数 含义 推导逻辑2% 单只最大亏损占比 10 只等权单只 2% × 10 极端情况 -20%1% 保守风格 10 只 × 1% -10%3% 激进风格 适合高夏普策略0.5% 超保守大资金 单只 0.5% × 20 只 -10%3.3 风控检查时机┌──────────────────────────────────────────────────────┐│ 风控检查触发时机 │├──────────────────────────────────────────────────────┤│ ││ ① 盘前检查Pre-Market ││ 用昨日收盘价计算提前标记风险持仓 ││ ││ ② 盘中实时监控Intraday ││ 每分钟或每 tick重新计算 ││ 超 1.5% → 预警yellow ││ 超 2.0% → 强制卖出red ││ ││ ③ 盘后复盘After-Market ││ 用收盘价做最终清算生成风控报告 ││ │└──────────────────────────────────────────────────────┘3.4 执行机制强制调出流程个股亏损占比 -2%↓★ 触发风控卖出信号↓检查持仓数量- 持仓 ≥ 2 只 → 全部卖出该标的- 持仓 1 只 → 卖出 80%留 20% 观察可选↓释放的资金 → 进入待分配池↓下一交易日 → 重新参与仓位分配↓记录风控事件日志用于复盘3.5 进阶分级预警级别 阈值 动作 预警 亏损达 -1.5% 发通知标记关注 强制 亏损达 -2.0% 立即卖出不可撤销 紧急 单日跌超 -7% 盘中紧急卖出跳过 2% 阈值四、项目结构risk_control/├── README.md├── requirements.txt├── config.yaml├── data/│ ├── positions.csv # 持仓数据含开仓价│ └── daily_prices.csv # 日频行情├── src/│ ├── loss_calculator.py # ★ 亏损计算引擎│ ├── risk_monitor.py # ★ 实时监控 分级预警│ ├── force_exit.py # ★ 强制调出执行器│ ├── risk_reporter.py # 风控事件报告│ ├── risk_backtester.py # 风控回测对比│ └── visualizer.py # 可视化├── main.py└── output/├── risk_events.csv # 风控事件日志└── risk_report.txt # 风控报告五、完整代码模块化 清晰注释requirements.txtpandas1.5numpy1.21matplotlib3.5seaborn0.12pyyaml6.0config.yaml# 4S 风控模块配置# ★ 核心风控参数risk:max_loss_pct: 0.02 # 单只最大亏损占总资金 2%warning_pct: 0.015 # 预警线 1.5%emergency_drop_pct: 0.07 # 单日暴跌 7% 紧急卖出enabled: true# 执行参数execution:sell_all_on_trigger: true # 触发后全部卖出true或保留部分falsekeep_pct_if_single: 0.20 # 若只剩 1 只持仓保留 20%commission_rate: 0.0003stamp_tax_rate: 0.001# 回测对比backtest:initial_capital: 1000000compare_without_risk: true # 是否运行无风控对比组# 输出output:events_path: output/risk_events.csvreport_path: output/risk_report.txtsrc/loss_calculator.py★ 核心loss_calculator.py★ 亏损计算引擎核心功能1. 实时计算每只持仓的盈亏金额和占比2. 判断是否触发风控阈值3. 支持盘前 / 盘中 / 盘后三种模式import pandas as pdimport numpy as npfrom typing import Dict, List, Optional, Tuplefrom dataclasses import dataclassimport logginglogging.basicConfig(levellogging.INFO, format%(asctime)s [%(levelname)s] %(message)s)logger logging.getLogger(__name__)dataclassclass PositionInfo:持仓信息数据类code: stropen_date: pd.Timestampopen_price: floatqty: intcurrent_price: float 0.0capital: float 0.0 # 开仓时分配的资金class LossCalculator:★ 亏损计算引擎核心公式持仓市值 当前价 × 持仓数量开仓成本 开仓价 × 持仓数量浮动盈亏 持仓市值 - 开仓成本盈亏占比 浮动盈亏 / 总资金触发条件盈亏占比 -max_loss_pctdef __init__(self, max_loss_pct: float 0.02, warning_pct: float 0.015):参数:max_loss_pct: 最大亏损占比如 0.02 2%warning_pct: 预警线如 0.015 1.5%self.max_loss max_loss_pctself.warn_pct warning_pctlogger.info(f亏损计算引擎初始化: 风控线{max_loss_pct*100:.1f}%, f预警线{warning_pct*100:.1f}%)def compute_position_pnl(self,positions: Dict[str, PositionInfo],current_prices: pd.Series,total_capital: float) - pd.DataFrame:★ 核心方法批量计算所有持仓的盈亏参数:positions: {code: PositionInfo}current_prices: 当前价格 Seriescode → pricetotal_capital: 总资金用于计算占比返回:DataFrame: 每只持仓的完整盈亏信息if not positions:return pd.DataFrame()records []for code, pos in positions.items():current_price current_prices.get(code, pos.open_price)# ★ 核心计算market_value current_price * pos.qty # 当前市值cost pos.open_price * pos.qty # 开仓成本pnl_amount market_value - cost # 浮动盈亏元pnl_pct (current_price - pos.open_price) / pos.open_price # 个股收益率capital_impact pnl_amount / total_capital # ★ 对总资金的冲击records.append({code: code,open_price: pos.open_price,current_price: current_price,qty: pos.qty,cost: cost,market_value: market_value,pnl_amount: pnl_amount,pnl_pct: pnl_pct,capital_impact: capital_impact, # ★ 核心指标capital_impact_pct: capital_impact * 100,triggered: capital_impact -self.max_loss, # 是否触发风控warning: capital_impact -self.warn_pct # 是否达预警})df pd.DataFrame(records)if len(df) 0:df df.sort_values(capital_impact) # 亏损最多的排前面return dfdef check_risk_events(self,pnl_df: pd.DataFrame) - Dict:★ 风控事件检测返回:{triggered: [...], # 触发强制卖出的warning: [...], # 预警的normal: [...] # 正常的}if len(pnl_df) 0:return {triggered: [], warning: [], normal: []}triggered pnl_df[pnl_df[triggered] True]warning pnl_df[(pnl_df[warning] True) (pnl_df[triggered] False)]normal pnl_df[pnl_df[warning] False]return {triggered: triggered.to_dict(records),warning: warning.to_dict(records),normal: normal.to_dict(records)}def compute_portfolio_risk(self,positions: Dict[str, PositionInfo],current_prices: pd.Series,total_capital: float) - Dict:计算组合层面的风险指标返回:{total_pnl: 总浮动盈亏,total_pnl_pct: 占总资金比例,max_single_loss: 单只最大亏损,n_triggered: 触发风控数量,n_warning: 预警数量,portfolio_heat: 组合热度0~1}pnl_df self.compute_position_pnl(positions, current_prices, total_capital)if len(pnl_df) 0:return {total_pnl: 0, total_pnl_pct: 0,max_single_loss: 0, n_triggered: 0,n_warning: 0, portfolio_heat: 0}total_pnl pnl_df[pnl_amount].sum()max_loss pnl_df[capital_impact].min()n_triggered (pnl_df[triggered] True).sum()n_warning (pnl_df[warning] True).sum()# 组合热度所有持仓盈亏占比的绝对值之和归一化abs_impacts pnl_df[capital_impact].abs()portfolio_heat min(abs_impacts.sum() / (len(pnl_df) * self.max_loss), 1.0)return {total_pnl: total_pnl,total_pnl_pct: total_pnl / total_capital * 100,max_single_loss: max_loss * 100,n_triggered: int(n_triggered),n_warning: int(n_warning),portfolio_heat: round(portfolio_heat, 4)}def print_pnl_report(self, pnl_df: pd.DataFrame, total_capital: float):打印盈亏报告if len(pnl_df) 0:print( 无持仓)returnprint(f\n{*80})print(f{持仓盈亏报告:^80})print(f{*80})print(f 总资金: ¥{total_capital:,.2f})print(f 持仓: {len(pnl_df)} 只)print(f\n {代码:8} {开仓价:10} {当前价:10} {持仓量:8} f{盈亏(元):15} {盈亏%:10} {占总资金%:10} {状态:8})print(f {─*80})for _, r in pnl_df.iterrows():status ( 触发 if r[triggered] else 预警 if r[warning] else 正常)pnl_sign if r[pnl_amount] 0 else pct_sign if r[pnl_pct] 0 else print(f {r[code]:8} {r[open_price]:10.2f} {r[current_price]:10.2f} f{r[qty]:8} f{pnl_sign}{r[pnl_amount]:12,.2f} f{pct_sign}{r[pnl_pct]*100:7.2f}% f{r[capital_impact_pct]:8.2f}% {status})# 汇总total_pnl pnl_df[pnl_amount].sum()print(f\n {─*80})print(f 合计浮动盈亏: ¥{total_pnl:,.2f} f占总资金 {total_pnl/total_capital*100:.2f}%)print(f{*80}\n)src/risk_monitor.py★ 实时监控risk_monitor.py★ 实时监控 分级预警核心功能1. 盘中每分钟重新计算盈亏2. 三级预警正常 / 预警 / 强制3. 紧急熔断单日暴跌import pandas as pdimport numpy as npfrom src.loss_calculator import LossCalculator, PositionInfofrom typing import Dict, List, Optionalfrom enum import Enumimport logginglogger logging.getLogger(__name__)class RiskLevel(Enum):风险等级NORMAL normal # 正常WARNING warning # 预警TRIGGERED triggered # 强制卖出EMERGENCY emergency # 紧急熔断class RiskMonitor:★ 风控监控器分级预警机制- 正常亏损占比 -1.5%- 预警-2.0% 亏损占比 ≤ -1.5%- 强制亏损占比 ≤ -2.0%- 紧急单日跌幅 ≥ 7%跳过阈值直接卖def __init__(self,loss_calculator: LossCalculator,emergency_drop_pct: float 0.07):参数:loss_calculator: 亏损计算引擎emergency_drop_pct: 单日暴跌熔断阈值如 0.07 7%self.calc loss_calculatorself.emergency emergency_drop_pctself.event_log: List[Dict] []logger.info(f风控监控器初始化: 熔断线{emergency_drop_pct*100:.0f}%)def check_positions(self,date: pd.Timestamp,positions: Dict[str, PositionInfo],current_prices: pd.Series,total_capital: float,prev_prices: Optional[pd.Series] None) - Dict:★ 核心方法检查所有持仓的风险状态参数:date: 当前日期positions: 持仓字典current_prices: 当前价格total_capital: 总资金prev_prices: 上一期价格用于计算单日跌幅返回:{risk_level: 组合最高风险等级,triggered_codes: [强制卖出的标的],warning_codes: [预警标的],emergency_codes: [紧急熔断标的],pnl_df: 完整盈亏 DataFrame,portfolio_risk: 组合风险指标}# 计算盈亏 pnl_df self.calc.compute_position_pnl(positions, current_prices, total_capital)if len(pnl_df) 0:return {risk_level: RiskLevel.NORMAL.value,triggered_codes: [],warning_codes: [],emergency_codes: [],pnl_df: pd.DataFrame(),portfolio_risk: {}}triggered []warning []emergency []# 检查 1常规风控线 for _, row in pnl_df.iterrows():code row[code]if row[triggered]:triggered.append(code)self._log_event(date, code, RiskLevel.TRIGGERED, row[capital_impact_pct])elif row[warning]:warning.append(code)self._log_event(date, code, RiskLevel.WARNING, row[capital_impact_pct])# 检查 2紧急熔断单日暴跌if prev_prices is not None:for code in positions:if code not in current_prices or code not in prev_prices:continueprev prev_prices[code]curr current_prices[code]if prev 0:continuedaily_drop (curr - prev) / previf daily_drop -self.emergency:if code not in triggered: # 避免重复emergency.append(code)self._log_event(date, code, RiskLevel.EMERGENCY, daily_drop * 100)# 确定组合最高风险等级 if emergency:risk_level RiskLevel.EMERGENCYelif triggered:risk_level RiskLevel.TRIGGEREDelif warning:risk_level RiskLevel.WARNINGelse:risk_level RiskLevel.NORMAL# 组合风险指标 portfolio_risk self.calc.compute_portfolio_risk(positions, current_prices, total_capital)# 日志输出 if risk_level ! RiskLevel.NORMAL:self._print_risk_alert(date, risk_level, triggered, warning, emergency, pnl_df)return {risk_level: risk_level.value,triggered_codes: triggered,warning_codes: warning,emergency_codes: emergency,pnl_df: pnl_df,portfolio_risk: portfolio_risk}def _log_event(self, date: pd.Timestamp, code: str, level: RiskLevel, value: float):记录风控事件self.event_log.append({date: date,code: code,level: level.value,value: value,timestamp: pd.Timestamp.now()})def _print_risk_alert(self,date: pd.Timestamp,level: RiskLevel,triggered: List[str],warning: List[str],emergency: List[str],pnl_df: pd.DataFrame):打印风控告警level_emoji {RiskLevel.WARNING: ,RiskLevel.TRIGGERED: ,RiskLevel.EMERGENCY: }print(f\n {level_emoji.get(level, )} [{date.strftime(%Y-%m-%d)}] f风控告警: {level.value.upper()})if emergency:print(f 紧急熔断: {, .join(emergency)}单日暴跌 ≥ {self.emergency*100:.0f}%)if triggered:print(f 强制调出: {, .join(triggered)}亏损占比 ≤ -{self.calc.max_loss*100:.1f}%)for code in triggered:row pnl_df[pnl_df[code] code]if len(row) 0:r row.iloc[0]print(f {code}: 开仓 ¥{r[open_price]:.2f} → 当前 ¥{r[current_price]:.2f}, f亏损 ¥{r[pnl_amount]:,.2f}占总资金 {r[capital_impact_pct]:.2f}%)if warning:print(f 预警: {, .join(warning)}亏损占比 ≤ -{self.calc.warn_pct*100:.1f}%)print()def get_event_summary(self) - Dict:返回风控事件统计if not self.event_log:return {total_events: 0}df pd.DataFrame(self.event_log)return {total_events: len(df),triggered: len(df[df[level] triggered]),warning: len(df[df[level] warning]),emergency: len(df[df[level] emergency]),unique_codes: df[code].nunique(),first_event: df[date].min(),last_event: df[date].max()}def print_event_summary(self):打印风控事件汇总summary self.get_event_summary()print(f\n{*60})print(f 风控事件汇总)print(f{*60})print(f 总事件数: {summary[total_events]})print(f 强制调出: {summary[triggered]})print(f 预警: {summary[warning]})print(f 紧急熔断: {summary[emergency]})print(f 涉及标的: {summary[unique_codes]} 只)if summary[total_events] 0:print(f 首次事件: {summary[first_event].strftime(%Y-%m-%d)})print(f 最近事件: {summary[last_event].strftime(%Y-%m-%d)})print(f{*60}\n)src/force_exit.py★ 强制调出force_exit.py★ 强制调出执行器触发后1. 立即卖出触发标的2. 释放资金进入待分配池3. 记录调出原因和价格import pandas as pdimport numpy as npfrom src.loss_calculator import PositionInfofrom typing import Dict, List, Optionalimport logginglogger logging.getLogger(__name__)class ForceExitExecutor:★ 强制调出执行器核心逻辑- 触发风控 → 全部卖出- 释放资金 → 记录到 available_cash- 调出记录 → 写入日志用于复盘def __init__(self,sell_all_on_trigger: bool True,keep_pct_if_single: float 0.0,commission_rate: float 0.0003,stamp_tax_rate: float 0.001):参数:sell_all_on_trigger: 触发后是否全部卖出keep_pct_if_single: 若只剩 1 只持仓保留百分比0 全卖commission_rate: 佣金率stamp_tax_rate: 印花税率self.sell_all sell_all_on_triggerself.keep_pct keep_pct_if_singleself.comm commission_rateself.tax stamp_tax_rateself.exit_log: List[Dict] []self.available_cash: float 0.0 # 调出后释放的现金logger.info(f强制调出执行器初始化: f全部卖出{sell_all_on_trigger}, f单只保留{keep_pct_if_single*100:.0f}%)def execute(self,triggered_codes: List[str],positions: Dict[str, PositionInfo],current_prices: pd.Series,date: pd.Timestamp) - Dict:★ 核心方法执行强制调出参数:triggered_codes: 触发风控的标的列表positions: 当前持仓current_prices: 当前价格date: 调出日期返回:{removed: [code, ...], # 被调出的kept: [code, ...], # 被保留的如有cash_released: float, # 释放的现金exit_details: [...] # 调出明细}removed []kept []cash_released 0.0details []total_positions len(positions)for code in triggered_codes:if code not in positions:continuepos positions[code]exit_price current_prices.get(code, pos.open_price)# ★ 决定是否保留部分if total_positions 1 and self.keep_pct 0:# 只剩一只保留部分keep_qty int(pos.qty * self.keep_pct / 100) * 100sell_qty pos.qty - keep_qtyelse:sell_qty pos.qtykeep_qty 0if sell_qty 0:revenue sell_qty * exit_price * (1 - self.comm - self.tax)cash_released revenuepnl (exit_price - po本文代码仅供学习与技术交流不构成任何投资建议股市有风险入市需谨慎利用AI解决实际问题如果你觉得这个工具好用欢迎关注长安牧笛