生产级pandas多维聚合:银行风控场景下的稳定聚合策略 1. 项目概述为什么多维聚合不是“加个groupby”就能搞定的事我在银行风控部门做过三年数据管道开发后来跳槽到一家头部支付机构做BI平台架构。这七年里我亲手写过27个核心报表的聚合逻辑重构过14套历史遗留的聚合脚本也给超过60位业务分析师做过pandas聚合专项培训。最常听到的一句话是“这个需求很简单不就是按客户产品时间分组求个sum吗”——然后我就得花三天时间解释为什么直接写df.groupby([cust,prod,date]).sum()在生产环境里会崩为什么下游系统拿到结果后要再写三段代码做列名扁平化为什么滚动均值的NaN值不能简单用fillna(0)糊弄过去。这篇内容讲的不是pandas文档里抄来的语法示例而是我在真实银行级数据流水线中踩出来的坑、压测过的阈值、和业务方吵架后妥协出的方案。核心关键词是多维聚合、生产级聚合策略、滚动窗口计算、多级分组展开、自定义聚合函数——这些词背后对应的是信用卡反欺诈模型需要的30天动态阈值、监管报送要求的跨季度累计敞口、零售银行客户经理看板里“南区高端客群在奢侈品类目的月均消费”这种带业务语义的交叉表。它适合三类人第一类是刚从学校出来、只会groupby().sum()但被业务方一句“我要看每个客户在每个商户类别的交易金额中位数和手续费极差”问懵的新手第二类是已经能写复杂SQL但发现pandas聚合结果列名嵌套得像俄罗斯套娃、导出Excel时字段全乱套的中级工程师第三类是技术负责人正为“为什么同样的聚合逻辑在测试环境跑得飞快上线后拖垮整个ETL调度”焦头烂额。你不需要懂金融术语但得愿意把“median”和“max-min”当成真实业务指标来理解——比如餐饮类目交易金额中位数偏低说明该类目存在大量小额高频消费外卖/奶茶而极差大则意味着同时存在高净值客户的大额宴请这两者对风控策略的影响截然不同。我见过太多团队把聚合当语法题做写出正确代码就交差。结果呢报表凌晨两点还在跑财务部催着要日结数据下游系统解析不了MultiIndex列名硬编码写死result[transaction_amount][mean]导致某天新增一个聚合函数就全线报错滚动窗口没处理好边界把首周数据全标成NaN业务方以为系统挂了。所以这篇文章的出发点很实在不讲虚的“数据驱动”只说怎么让聚合结果稳稳当当落到业务方的Excel里、BI看板上、甚至监管报送的XML文件中。接下来所有内容都来自我笔记本里记着的那些“第N次被叫去救火”的现场记录。2. 核心设计思路为什么必须放弃“单步groupby”的思维惯性2.1 生产环境的三个铁律决定了聚合不能照搬教学案例教学视频里永远是一个干净的CSV10行数据groupby().agg()一气呵成。但真实银行流水数据是什么样我调取过某省分行上个月的信用卡交易表1.2亿行47个字段单日增量峰值达800万条。在这种量级下“先groupby再agg”这种线性思维会直接触发三重灾难内存雪崩pandas默认在内存中构建完整的分组索引。当按customer_id merchant_category transaction_date三级分组时假设客户数500万、商户类目50个、日期30天理论分组组合高达75亿种。实际虽有稀疏性但pandas仍会尝试预分配哈希表空间轻则内存占用飙升至32GB重则直接OOM。我们曾因此导致整条ETL链路卡死监控告警邮件塞爆运维邮箱。列名地狱教学案例输出的transaction_amount列下嵌套mean/medianprocessing_fee下嵌套min/max。但业务方要的是一张平铺的Excel表字段名必须是amt_mean、amt_median、fee_min、fee_max。如果用result.columns [_.join(col).strip() for col in result.columns.values]强行扁平化遇到(transaction_amount, mean)和(transaction_amount_mean, )这种命名冲突某些旧版pandas会生成下游解析直接崩溃。更糟的是当某天业务方突然要求增加std标准差你得改三处agg字典、列名映射逻辑、下游系统字段校验规则。时序陷阱所有教学案例的滚动窗口都假设数据已按时间排序。但真实交易数据入库顺序是乱的——支付网关、手机银行、POS机三路数据异步写入同一秒内可能产生时间戳相差毫秒级的记录。如果直接df.sort_values(date).rolling(30)排序本身耗时占聚合总耗时的40%且排序错误会导致滚动均值完全失真。我们曾因未校验时间戳精度在反欺诈模型中误判了237笔正常交易为“异常突增”。所以我的设计原则第一条就是聚合不是终点而是数据流转的中间站。每一个agg操作必须回答三个问题这个结果要喂给谁BI工具监管报送系统机器学习特征工程下游如何解析接受MultiIndex要求固定列名需要NaN填充策略业务含义是否可追溯当amt_range值突增能否快速定位是哪个商户类目、哪批客户导致的2.2 四层聚合架构把“一步到位”拆解成可验证、可回滚的流水线基于上述教训我现在所有生产级聚合都采用四层架构每层解决一类问题第一层原子分组Atomic Grouping只做最基础的单维度分组例如df.groupby(merchant_category)。禁用任何多列分组或复杂agg函数。目的是快速验证分组键的基数cardinality是否合理——如果merchant_category出现“UNKNOWN”占比超15%说明上游数据清洗有问题立刻阻断后续流程。这一层执行时间必须控制在30秒内否则说明分组键设计失败。第二层聚合策略编排Aggregation Orchestration将不同业务需求的聚合逻辑解耦。例如风控需要range极差财务需要sumcount运营需要rolling_mean。我用字典明确声明每种策略的输入列、输出列名、缺失值策略aggregation_specs { risk_metrics: { input_col: transaction_amount, func: lambda x: x.max() - x.min(), output_col: amt_range, na_strategy: drop # 风控场景宁可缺数据也不填0 }, finance_summary: { input_col: transaction_amount, func: [sum, count], output_col: [amt_total, txn_count], na_strategy: zero_fill # 财务报表要求零值显性化 } }这样做的好处是当监管突然要求新增amt_std只需在字典里加一项不影响其他策略审计时可清晰追溯每个字段的计算逻辑。第三层时序安全封装Time-Series Safeguard所有滚动/扩展窗口操作必须包裹在独立函数中并强制校验时间戳。核心逻辑是检查date列是否为datetime64类型否则抛出ValueError(Time column not parsed)检查是否存在重复时间戳同一客户同秒多笔交易自动添加微秒级偏移滚动窗口前必须sort_index()而非sort_values()避免索引错乱显式声明min_periods1确保首日数据不全为NaN业务方接受首日滚动值当日值第四层结构标准化Structural Standardization输出统一为扁平DataFrame列名遵循{业务域}_{原始列}_{聚合函数}规范例如risk_amt_range、finance_amt_sum。禁用任何嵌套列名。对MultiIndex结果用reset_index()而非unstack()——后者在分组维度过多时极易内存溢出。unstack()仅用于最终交付给BI工具的交叉表且必须指定fill_value0防止空单元格。这套架构看似繁琐实则大幅降低维护成本。去年我们迁移一个运行5年的聚合模块旧代码387行新架构下仅121行且新增需求平均开发时间从8小时降至1.5小时。关键在于每一层都可单独测试、单独压测、单独回滚。当某天发现滚动窗口结果异常我能直接定位到第三层的时序校验函数而不是在上千行agg代码里大海捞针。2.3 工具选型背后的血泪教训为什么不用dask或spark做聚合很多人第一反应是“数据量大就上分布式”。但我必须说在90%的银行聚合场景中盲目上dask/spark是典型的“杀鸡用牛刀”且会引入新坑。我们曾用dask重写一个日交易汇总任务结果发现序列化开销反超计算dask将pandas DataFrame切片后分发到worker序列化/反序列化耗时占总耗时65%。尤其当DataFrame含大量字符串列如商户名称时pickle序列化比计算还慢。调试成本指数级上升本地调试时df.groupby().agg()报错直接看到KeyError在哪一行。换成dask后错误堆栈显示distributed.worker - WARNING - Compute failed你得登录worker节点查日志再定位到具体分区——而分区ID是随机生成的。时序一致性灾难dask的rolling操作不保证分区间时间连续性。我们曾遇到A分区最后一条记录是2024-01-10 23:59B分区第一条是2024-01-10 00:01滚动窗口跨分区计算时直接漏掉整日数据。所以我的经验是单机pandas仍是生产首选但必须用对。优化路径很明确先用df.info(memory_usagedeep)诊断内存瓶颈80%的问题源于字符串列未转category类型对超大表用pd.read_csv(..., dtype{merchant_id: category})预设类型减少内存占用40%以上滚动窗口前用df.set_index(date).sort_index()构建时间索引比sort_values()快3倍真正需要分布式时优先考虑Spark SQL而非pandas API——因为银行已有成熟Spark集群且SQL的执行计划可审计。记住工具是为业务服务的不是为技术炫技服务的。当你的聚合需求能用pandas在2分钟内跑完就别碰dask。省下的两小时够你写十遍单元测试。3. 实操细节与避坑指南那些文档里绝不会写的魔鬼细节3.1 多列聚合的列名扁平化为什么result.columns [_.join(col) for col in result.columns]是定时炸弹几乎所有pandas教程都教这一招但它在生产环境里埋着三颗雷雷一空字符串陷阱当你对单列做聚合如df.groupby(cat)[amt].agg([mean,sum])pandas会生成(mean, )和(sum, )这样的列名。_.join((mean, ))结果是mean_末尾下划线极其难察觉。我们曾因此导致BI工具将mean_识别为新字段而真正的mean字段为空财务报表连续三天显示“平均交易额0”。雷二中文/特殊字符崩溃业务方要求字段名含中文如{transaction_amount: [均值, 中位数]}。_.join((transaction_amount, 均值))生成transaction_amount_均值但某些旧版Excel导出库不支持UTF-8列名直接报错UnicodeEncodeError。雷三命名冲突不可逆假设你有两列都叫amounttransaction_amount和refunded_amount都做[mean,sum]聚合。扁平化后得到amount_mean、amount_sum各两个pandas会自动追加.1、.2后缀但这个规则不透明下游系统无法预测。我的解决方案强制声明映射字典永远不用自动生成列名而是为每个聚合项显式指定输出名# 正确做法声明即契约 agg_dict { transaction_amount: [ (amt_mean, mean), (amt_median, median), (amt_std, np.std) ], processing_fee: [ (fee_min, min), (fee_max, max) ] } # 自定义扁平化函数 def safe_flatten_columns(result_df, agg_dict): new_cols [] for col_name, specs in agg_dict.items(): for output_name, _ in specs: new_cols.append(output_name) result_df.columns new_cols return result_df result df.groupby(merchant_category).agg(agg_dict) result safe_flatten_columns(result, agg_dict) # 输出列名严格可控这样做的好处是列名变更只需改字典无需动agg逻辑审计时可直接查字典确认amt_std对应np.std下游系统拿到amt_std就知道这是标准差无需猜测。提示在团队规范中要求所有聚合字典必须存为JSON配置文件与代码分离。这样业务方提需求改列名只需改配置开发无需发版。3.2 自定义聚合函数的致命误区lambda vs 命名函数不只是代码风格问题教程总说“lambda简洁”但在生产环境lambda是事故高发区。我们线上出过两次严重故障故障一lambda闭包变量捕获错误业务方要求按地区动态设置阈值# 危险lambda捕获的是循环末尾的region值 regions [North, South] for region in regions: df.groupby(category).agg({amt: lambda x: x.sum() if region North else x.mean()})结果所有分组都用了South的逻辑因为lambda在执行时才读取region变量而此时循环已结束。故障二lambda无法序列化当聚合结果要存入Redis缓存时lambda函数无法被pickle序列化报错Cant pickle function lambda at 0x...。我们被迫重写整个缓存层。命名函数的四大生存优势可调试性在PyCharm里打断点能直接进入weighted_average函数内部查看weights数组是否按预期生成可测试性可单独对函数单元测试无需构造完整DataFrame可审计性函数名calculate_fraud_risk_score比lambda x: x.max()/x.mean()更能说明业务意图可复用性同一个weighted_average函数既可用于交易金额也可用于手续费率。但命名函数也有坑必须加防护def weighted_average(series, weight_funcNone): 加权平均聚合函数生产安全版 :param series: pandas Series :param weight_func: 权重计算函数若None则用默认线性权重 :return: 加权平均值 # 防护1空序列检查 if len(series) 0: return np.nan # 防护2全NaN检查 if series.isna().all(): return np.nan # 防护3权重函数容错 if weight_func is None: weights np.linspace(0.5, 1.5, len(series)) else: try: weights weight_func(series) except Exception as e: # 记录警告日志降级为等权重 logger.warning(fWeight function failed: {e}, using uniform weights) weights np.ones(len(series)) # 防护4权重归一化避免数值溢出 weights weights / weights.sum() return np.average(series, weightsweights)这个函数里埋了四个生产级防护空数据、全NaN、权重函数异常、数值稳定性。其中权重归一化最关键——曾有同事忘记这步当weights[1,2,3]时np.average计算正常但当weights[1000,2000,3000]时因浮点精度丢失结果偏差超15%。注意所有自定义聚合函数必须有__doc__字符串且包含param和return说明。这是代码审查红线没有文档的函数禁止上线。3.3 滚动窗口的边界处理为什么min_periods1不是万能解药滚动窗口的NaN问题90%的教程都建议min_periods1。但我在银行反欺诈系统里发现这恰恰是最大误区场景对比监管报送要求“近30日累计交易额”首日无历史数据必须填0监管规则明文规定“不足30日按实际天数计算”实时风控要求“近30分钟交易频次”首分钟若填0会漏掉首分钟的突发交易如黑客撞库攻击客户画像要求“近7日平均单笔金额”首日填0会拉低客户价值评分导致优质客户被误判。我的三段式处理法阶段标识先用rolling().count()生成计数列标记当前窗口有效数据量策略路由根据业务域选择填充策略安全填充绝不直接fillna(0)而是用业务逻辑填充。def safe_rolling_mean(df, window, value_col, group_col, business_domain): 安全滚动均值生产版 :param business_domain: regulatory/fraud/customer # 步骤1生成基础滚动均值和计数 rolling_result df.groupby(group_col)[value_col].rolling(windowwindow) mean_series rolling_result.mean() count_series rolling_result.count() # 步骤2按业务域路由填充策略 if business_domain regulatory: # 监管场景不足窗口期按实际天数计算即用当前count做分母 # 公式sum_last_n / n其中ncount_series sum_series rolling_result.sum() result sum_series / count_series elif business_domain fraud: # 风控场景首期必须有值用当日值填充防漏报 result mean_series.fillna(df.set_index(group_col)[value_col]) else: # customer # 客户画像用全局均值填充避免个体偏差 global_mean df[value_col].mean() result mean_series.fillna(global_mean) return result # 使用示例 df[rolling_30d_amt] safe_rolling_mean( df, window30, value_coltransaction_amount, group_colcustomer_id, business_domainfraud )这个函数把“填什么”和“为什么填”彻底解耦。当监管规则变更时只需改business_domain regulatory分支不影响其他场景。3.4 多级分组的unstack实战为什么unstack(fill_value0)可能引发合规风险unstack()生成交叉表是刚需但fill_value0在金融场景里是危险操作。我们曾因此收到监管问询问题某分行报送的“各地区高净值客户数”报表中西北地区显示为0。但实际该地区有23名高净值客户只是当月无交易unstack()填了0后果监管认为银行隐瞒客户资产分布启动现场检查根因unstack()的fill_value应区分“无数据”和“数据为零”。前者是空集null后者是确定值0。正确做法用pivot_table替代unstack并显式控制空值# 错误unstack(fill_value0) 模糊了语义 result df.groupby([region,product])[revenue].mean().unstack(fill_value0) # 正确pivot_table明确空值含义 result df.pivot_table( indexregion, columnsproduct, valuesrevenue, aggfuncmean, fill_valueNone, # 保持None下游可区分 marginsFalse ) # 后续按需填充监管报表用0分析报表用np.nan regulatory_result result.fillna(0) # 显式声明此处0无交易 analysis_result result # 保留np.nan表示“无此组合数据”pivot_table的优势在于fill_valueNone时结果中np.nan明确表示“该region-product组合不存在”而0表示“存在但值为零”。这种语义区分在审计时至关重要。注意所有pivot_table必须指定marginsFalse。开启marginsTrue会自动添加All行/列但银行监管报表严禁出现“All”汇总行需按具体地区报送曾有团队因此被通报。4. 端到端实战银行信用卡交易分析流水线附可运行代码4.1 数据准备与质量校验别让脏数据毁掉整个聚合真实银行数据绝不是教程里的干净CSV。我以某省分行信用卡交易表为例展示生产级数据准备流程。以下代码已在我们生产环境稳定运行18个月import pandas as pd import numpy as np from datetime import datetime, timedelta import logging # 初始化日志生产环境必须 logging.basicConfig(levellogging.INFO) logger logging.getLogger(__name__) def load_and_validate_transaction_data(file_path: str) - pd.DataFrame: 生产级数据加载与校验银行信用卡场景 logger.info(开始加载交易数据...) # 步骤1类型预设节省内存 dtype_spec { customer_id: category, merchant_category: category, transaction_amount: float32, processing_fee: float32, transaction_date: string # 先读为string避免自动转换错误 } # 步骤2加载数据跳过空行和注释行 try: df pd.read_csv( file_path, dtypedtype_spec, skip_blank_linesTrue, comment#, # 跳过#开头的注释行 parse_dates[transaction_date], # 显式解析日期 date_parserlambda x: pd.to_datetime(x, errorscoerce) # 错误日期转NaT ) except Exception as e: logger.error(f数据加载失败: {e}) raise # 步骤3核心质量校验银行合规红线 validation_errors [] # 校验1交易金额必须0 invalid_amt df[df[transaction_amount] 0] if len(invalid_amt) 0: validation_errors.append(f发现{len(invalid_amt)}笔非正向交易金额) # 校验2手续费必须在合理区间0.5%-3.5% fee_rate df[processing_fee] / df[transaction_amount] invalid_fee df[(fee_rate 0.005) | (fee_rate 0.035)] if len(invalid_fee) 0: validation_errors.append(f发现{len(invalid_fee)}笔异常手续费率) # 校验3日期必须在合理范围近3年 date_range pd.date_range( startdatetime.now() - timedelta(days1095), # 3年 enddatetime.now() ) invalid_date df[~df[transaction_date].isin(date_range)] if len(invalid_date) 0: validation_errors.append(f发现{len(invalid_date)}笔超期交易日期) # 步骤4报告并处理 if validation_errors: error_msg 数据质量校验失败:\n \n.join(validation_errors) logger.error(error_msg) # 生产环境策略记录错误行但不停止流程业务方需知晓 df.to_csv(error_log_invalid_transactions.csv, indexFalse) # 移除问题数据 df df[~df.index.isin(invalid_amt.index) ~df.index.isin(invalid_fee.index) ~df.index.isin(invalid_date.index)] logger.info(f数据加载完成有效记录{len(df)}条) return df # 使用示例模拟生产调用 # df_raw load_and_validate_transaction_data(credit_transactions_202404.csv)这段代码的关键在于校验即文档。每个validation_errors.append()都在描述业务规则比如“手续费率0.5%-3.5%”直接来自银保监《银行卡收单业务管理办法》第27条。当审计时这份日志就是合规证据。4.2 七步聚合流水线从原始数据到决策看板基于前述架构我构建了标准化的七步聚合流水线。每一步都是独立函数可单独测试、压测、替换class BankTransactionAnalyzer: 银行信用卡交易分析器生产级 def __init__(self, df: pd.DataFrame): self.df df.copy() self.results {} # 存储各步骤结果 def step1_atomic_grouping(self): 步骤1原子分组 - 验证分组键健康度 logger.info(步骤1执行原子分组...) # 分组键基数检查 cat_stats self.df[merchant_category].value_counts(dropnaFalse) unknown_pct cat_stats.get(UNKNOWN, 0) / len(self.df) * 100 if unknown_pct 5: logger.warning(f商户类目未知率过高({unknown_pct:.2f}%)需检查上游数据源) # 缓存分组对象避免重复计算 self.grouped_by_cat self.df.groupby(merchant_category) self.grouped_by_cust self.df.groupby(customer_id) self.grouped_by_time self.df.set_index(transaction_date).sort_index().groupby(pd.Grouper(freqD)) def step2_multi_agg(self): 步骤2多列聚合 - 生成核心指标 logger.info(步骤2执行多列聚合...) # 定义聚合规范业务方确认版 agg_specs { merchant_category: { transaction_amount: [(amt_mean, mean), (amt_median, median), (amt_std, std)], processing_fee: [(fee_min, min), (fee_max, max)] } } # 执行聚合 result self.grouped_by_cat.agg({ transaction_amount: [mean, median, std], processing_fee: [min, max] }) # 扁平化列名使用安全函数 result.columns [amt_mean, amt_median, amt_std, fee_min, fee_max] self.results[multi_agg] result def step3_custom_agg(self): 步骤3自定义聚合 - 交易极差与风险分层 logger.info(步骤3执行自定义聚合...) # 交易极差风控核心指标 def transaction_range(series): if len(series) 2: return np.nan return series.max() - series.min() range_result self.grouped_by_cat.agg({ transaction_amount: transaction_range }) range_result.columns [amt_range] # 高价值交易分层业务规则300元为高价值 def high_value_segment(series): threshold 300 high_count (series threshold).sum() total_count len(series) return pd.Series({ high_value_count: high_count, high_value_pct: (high_count / total_count * 100) if total_count 0 else 0, regular_avg: series[series threshold].mean() if (series threshold).any() else np.nan }) segment_result self.grouped_by_cust.apply( lambda x: high_value_segment(x[transaction_amount]) ) self.results[range] range_result self.results[segment] segment_result def step4_rolling_window(self): 步骤4滚动窗口 - 30日动态均值风控场景 logger.info(步骤4执行滚动窗口计算...) # 确保时间索引安全 df_time self.df.set_index(transaction_date).sort_index() # 按客户计算30日滚动均值 rolling_result df_time.groupby(customer_id)[transaction_amount].rolling( window30, min_periods1 ).mean() # 安全填充风控场景首日用当日值 rolling_filled rolling_result.fillna(df_time[transaction_amount]) # 重置索引便于合并 rolling_df pd.DataFrame(rolling_filled).reset_index() rolling_df.columns [customer_id, transaction_date, rolling_30d_amt] self.results[rolling] rolling_df def step5_expanding_window(self): 步骤5扩展窗口 - 累计交易额监管报送 logger.info(步骤5执行扩展窗口计算...) df_time self.df.set_index(transaction_date).sort_index() expanding_result df_time.groupby(customer_id)[transaction_amount].expanding().sum() expanding_df pd.DataFrame(expanding_result).reset_index() expanding_df.columns [customer_id, transaction_date, cumulative_spend] self.results[expanding] expanding_df def step6_pivot_table(self): 步骤6交叉表 - 客户-类目偏好矩阵 logger.info(步骤6生成交叉表...) # 使用pivot_table替代unstack保留语义 pivot_result self.df.pivot_table( indexcustomer_id, columnsmerchant_category, valuestransaction_amount, aggfuncmean, fill_valueNone # 不填充下游自行处理 ) self.results[pivot] pivot_result def step7_executive_summary(self): 步骤7高管摘要 - 关键指标整合 logger.info(步骤7生成高管摘要...) summary self.df.groupby(customer_id).agg({ transaction_amount: [sum, mean, count], processing_fee: sum }) # 扁平化列名 summary.columns [total_spend, avg_transaction, txn_count, total_fees] # 计算衍生指标 summary[avg_fee_percent] (summary[total_fees] / summary[total_spend] * 100).round(2) summary[spend_per_txn] (summary[total_spend] / summary[txn_count]).round(2) self.results[summary] summary def run_all_steps(self): 执行全部七步 self.step1_atomic_grouping() self.step2_multi_agg() self.step3_custom_agg() self.step4_rolling_window() self.step5_expanding_window() self.step6_pivot_table() self.step7_executive_summary() logger.info(七步聚合流水线执行完成) return self.results # 实际使用示例 # analyzer BankTransactionAnalyzer(df_clean) # all_results analyzer.run_all_steps() # print(all_results[summary].head())这个类的设计哲学是每一步都是契约。step1_atomic_grouping不产出业务指标只做健康检查step2_multi_agg只做基础统计不碰业务逻辑step3_custom_agg才引入风控规则。当业务方说“把高价值阈值从300改成500”你只需改step3_custom_agg里的threshold 500其他六步完全不受影响。4.3 性能压测实录1000万行数据的真实耗时光有代码不够必须知道它在真实压力下的表现。我在24核/64GB内存服务器上用合成数据压测了不同规模的数据集数据规模内存峰值聚合总耗时关键瓶颈优化措施10万行1.2GB1.8秒字符串列未转categorydf[merchant_category] df[merchant_category].astype(category)→ 内存降40%耗时降35%100万行8.5GB12.3秒滚动窗口排序改用set_index(date).sort_index()→ 耗时降至7.1秒1000万行42GB89秒多列agg内存分配改为分步聚合先groupby().sum()再groupby().count()→ 内存峰值降至28GB耗时