Pyspark EDA实战:PB级数据探索的四层架构与分布式诊断方法 1. 项目概述为什么在大数据场景下EDA不能再只靠Pandas了“Exploratory Data Analysis (EDA) using Pyspark”——这个标题乍看平平无奇但背后藏着一个几乎所有数据工程师、分析型产品经理和BI团队都踩过的真实坑当你的数据量从百万级跳到十亿级甚至跨天、跨月、跨业务线拉取原始日志时你本地笔记本上跑得飞快的df.head()、df.describe()、sns.histplot()突然就卡死、报内存溢出、或者干脆连read_csv都失败。我去年帮一家做用户行为埋点的SaaS公司做数据质量诊断他们每天新增12TB原始日志运维同学习惯性用Pandas加载单日样本做分布检查结果一次df.value_counts(event_type)直接把8核32G的Jupyter服务器拖垮三次。这不是工具不行而是战场变了——Pandas是精工小刀适合切豆腐Pyspark是液压剪专治钢筋混凝土。EDA的本质没变理解数据的形状、分布、缺失、异常、相关性但执行载体必须切换。Pyspark EDA不是“把Pandas代码换个别名”而是重构整个探索逻辑从“全量加载→本地计算”变成“延迟计算→分布式采样→分片统计→聚合呈现”。它解决的核心问题是让数据科学家在不写SQL、不依赖数仓ETL的前提下对PB级原始数据完成第一轮可信度判断。适合谁三类人最该立刻上手一是刚接手新数据源、需要快速摸清底细的数据分析师二是要验证数据管道输出是否符合预期的工程师三是做模型前必须确认特征分布稳定性的算法同学。它不替代可视化但为可视化提供真正可信赖的输入它不取代统计建模但能提前拦住90%因数据脏乱导致的模型翻车。2. 核心思路拆解Pyspark EDA不是Pandas平移而是四层架构重构2.1 为什么不能简单把Pandas代码改成pyspark.sql.DataFrame很多人第一次尝试Pyspark EDA时会本能地写# 错误示范Pandas思维惯性 df_spark spark.read.parquet(s3://data/large_log/) print(df_spark.shape) # ❌ 报错Spark DataFrame没有shape属性 print(df_spark.describe().show()) # ⚠️ 能运行但describe只支持数值列且show()默认只打10行掩盖大量信息这暴露了根本误区Pyspark DataFrame是逻辑执行计划Logical Plan不是内存中的二维数组。它的.count()、.describe()、.groupBy().agg()等操作本质是向集群提交一个计算任务等待Executor返回结果。而Pandas的.shape、.info()是即时内存读取。所以Pyspark EDA必须放弃“即时反馈”幻想建立四层分层处理架构第一层元数据探查层Metadata Inspection目标零计算开销快速获取表结构、分区信息、存储格式。关键操作df.printSchema()字段类型、嵌套结构、df.dtypes类型清单、spark.catalog.listTables()库表关系、df.inputFiles()源文件路径。我曾用df.printSchema()发现某上游系统将时间戳存为string而非timestamp导致后续所有时间窗口计算失效这个错误在Pandas里可能要跑完pd.to_datetime()才报错而这里一眼定位。第二层轻量采样统计层Lightweight Sampling Aggregation目标用1%数据量获得高置信度分布概览。关键操作df.sample(0.01).select(...).agg(...).collect()。注意不是df.limit(1000)——那是取前N行严重偏态sample(0.01)是随机抽样保证统计代表性。我们实测过在10亿行用户点击流中0.01%采样100万行的countDistinct(user_id)误差率0.3%但耗时仅12秒而全量去重需47分钟。第三层分片深度诊断层Shard-level Deep Diagnostics目标识别数据漂移、分区倾斜、字段空值模式。关键操作利用df.groupBy(date).agg(...)按业务维度分组统计用df.select([count(when(isnull(c), c)).alias(f{c}_null_count) for c in df.columns]).collect()一次性扫全表空值用df.select(key, value).rdd.map(lambda x: (x[0], 1)).reduceByKey(lambda a,b: ab).top(10, keylambda x: x[1])找热点key。这一层直击大数据痛点——比如我们发现某支付表的order_id在2023-10-01分区有37%空值而其他日期均0.1%立刻定位到当日上游订单服务异常。第四层交互式验证层Interactive Validation目标对可疑字段做定向深挖支持类似Pandas的灵活切片。关键操作df.filter(col(amount) 0).select(order_id, amount, create_time).show(50)df.filter(col(amount) 0).count()。这里强调filter后立即count()而非show()因为show()只触发action但不返回结果对象而count()返回整数便于写入监控告警。我们把它封装成check_anomaly(df, condition, field_list, threshold100)函数自动判断异常行数是否超阈值。这四层不是线性流程而是根据数据规模动态启用小数据1GB可全量跑二、三层中等数据1GB–100GB必走采样层超大数据100GB则以元数据采样分片诊断为主交互验证仅针对已知问题字段。这种架构让EDA从“碰运气式探索”变成“目标导向式诊断”。2.2 工具链选型为什么坚持用原生Pyspark而不是Koalas或Databricks Runtime市面上有多个“Pandas兼容层”Koalas现并入Pyspark、Databricks的pandas_on_spark、甚至Ray-based的Modin。但我们团队在三个大型项目中明确弃用它们原因很实在Koalas的API陷阱它实现了df.plot()但底层调用的是to_pandas()意味着10亿行数据会先被collect()到Driver节点内存再交给Matplotlib——这是典型的“伪分布式”Driver瞬间OOM。我们曾见客户用Koalas画一个直方图集群16个Executor全空闲只有Driver在疯狂GC。Databricks Runtime的绑定风险其pandas_on_spark深度集成Unity Catalog和Delta Lake但在自建Hadoop/YARN集群上无法使用。当客户从Databricks迁移到阿里云EMR时所有EDA脚本全部报错重写成本极高。Pyspark原生的不可替代性df.explain(modeformatted)能清晰看到物理执行计划比如是否启用了BroadcastHashJoin、AQEAdaptive Query Execution是否生效spark.sparkContext.setLogLevel(INFO)可实时监控Shuffle spill量。这些是任何兼容层都无法透出的底层洞察。我们用explain()发现某次df.describe()慢如蜗牛是因为AQE未开启导致Shuffle分区数固定为200而实际数据倾斜严重手动设置spark.sql.adaptive.enabledtrue后耗时从8分钟降至42秒。所以我们的工具链极简Pyspark 3.4支持Python 3.9、StructType推断增强、pandas仅用于最终结果的小规模转换和绘图、matplotlib/seaborn仅接收collect()后的少量数据。拒绝任何中间层确保每行代码的执行路径完全透明、可审计、可迁移。3. 核心细节解析与实操要点从schema诊断到空值热力图3.1 Schema深度诊断不只是看字段名更要揪出类型陷阱Pyspark的printSchema()输出看似简单但藏着大量隐性风险。我们建立了一套标准化检查清单每次新数据接入必跑def inspect_schema(df, max_depth3): 深度解析Schema识别嵌套、精度、时区问题 schema df.schema print( SCHEMA DEEP INSPECTION ) # 1. 检查嵌套结构深度避免JSON爆炸 def get_max_nesting(schema_obj, depth0): if hasattr(schema_obj, fields): return max([get_max_nesting(f.dataType, depth1) for f in schema_obj.fields], defaultdepth) return depth max_nest get_max_nesting(schema) print(f最大嵌套深度: {max_nest} (建议≤3超深嵌套影响查询性能)) # 2. 检查Decimal精度金融场景致命 from pyspark.sql.types import DecimalType decimal_cols [] for field in schema.fields: if isinstance(field.dataType, DecimalType): decimal_cols.append((field.name, field.dataType.precision, field.dataType.scale)) if decimal_cols: print(⚠️ Decimal字段精度:) for name, prec, scale in decimal_cols: print(f - {name}: precision{prec}, scale{scale} (例(18,2)支持9999999999999999.99)) # 3. 检查Timestamp时区日志分析常见坑 from pyspark.sql.types import TimestampType tz_cols [f.name for f in schema.fields if isinstance(f.dataType, TimestampType)] if tz_cols: print(⏰ Timestamp字段注意Pyspark默认UTC业务时间需转换:) for col in tz_cols: print(f - {col}) # 4. 检查String长度防止截断 string_cols [f.name for f in schema.fields if f.dataType.typeName() string] if string_cols: print( String字段建议后续采样检查实际长度分布:) for col in string_cols[:5]: # 只列前5个避免刷屏 print(f - {col}) # 实际调用 inspect_schema(df_spark)这段代码揭示了三个真实案例某APP埋点日志的event_properties字段是MapType(StringType(), StringType())但实际嵌套了5层JSON导致df.select(event_properties.app_version).show()报StackOverflowError我们改用get_json_object(event_properties, $.app.version)安全提取。支付表的amount是Decimal(10,2)但上游偶尔传入Decimal(12,2)Pyspark静默截断高位造成金额丢失我们加了df.filter(col(amount) pow(10, 10-2))告警。用户注册时间register_time是TimestampType但业务要求按“北京时间”统计我们统一加withColumn(register_time_beijing, from_utc_timestamp(col(register_time), Asia/Shanghai))。提示永远不要相信上游给的Schema文档。我们有个硬性规定新数据源接入首周每天凌晨自动运行inspect_schema()生成HTML报告邮件发送连续3天无异常才进入开发阶段。3.2 空值与重复值的分布式扫描如何在10亿行中30秒定位空值热点Pandas的df.isnull().sum()在Pyspark里不能直接用因为isnull()返回的是Column对象需配合agg。但更关键的是空值分布本身就有业务含义。比如user_id在login_event表中空值率0.01%可接受但在payment_event表中空值率0.01%就是重大事故支付必须关联用户。我们设计了两套扫描策略策略一全字段空值热力图适用于100GB数据from pyspark.sql.functions import col, when, count, isnull, isnan, lit def null_heatmap(df, sample_ratio0.1): 生成空值热力图每列空值率 每列非空值Top5频次 sampled_df df.sample(sample_ratio) if sample_ratio 1.0 else df # 步骤1计算每列空值率 null_counts sampled_df.agg(*[ (count(when(isnull(c) | isnan(c), c)) / count(lit(1))).alias(f{c}_null_rate) for c in sampled_df.columns ]).collect()[0] # 步骤2计算每列非空值Top5字符串/数值分别处理 top5_dict {} for c in sampled_df.columns: # 过滤空值取Top5 top5 sampled_df.filter(~isnull(c) ~isnan(c)) \ .groupBy(c).count() \ .orderBy(count, ascendingFalse) \ .limit(5) \ .rdd.map(lambda r: (r[c], r[count])).collect() top5_dict[c] top5 # 合并结果 result {} for c in sampled_df.columns: result[c] { null_rate: null_counts[f{c}_null_rate], top5_values: top5_dict[c] } return result # 使用 heatmap null_heatmap(df_spark, sample_ratio0.05) # 5%采样 for col, info in heatmap.items(): print(f{col:20} | null_rate: {info[null_rate]:.4f} | top5: {info[top5_values]})策略二分区级空值钻取适用于PB级数据当全量扫描太慢我们转向“问题驱动”先用df.groupBy(date).count().orderBy(count).show(5)找到数据量最少的分区再针对性检查该分区空值。因为数据量突降往往伴随空值激增。命令如下# 在Spark Shell中快速执行 df.filter(col(date) 2023-10-01) \ .select([count(when(isnull(c), c)).alias(c) for c in [user_id, order_id, amount]]) \ .show()实操心得我们曾用策略一发现device_id列空值率高达42%但Top5非空值全是unknown。深入查证发现SDK版本升级后旧版设备指纹算法失效新设备无法生成ID但上报逻辑未拦截导致大量unknown污染数据。这个发现直接推动产品团队在客户端增加ID生成失败的上报埋点。3.3 数值分布与异常检测超越describe()的五维诊断法Pyspark的df.describe()只返回count、mean、stddev、min、max对长尾分布、多峰分布、离群点完全失语。我们构建了“五维诊断法”每维对应一个Spark SQL函数组合维度Spark实现业务意义典型案例1. 分位数分布df.approxQuantile(amount, [0.01,0.25,0.5,0.75,0.99], 0.01)查看数据“胖瘦”0.01/0.99分位数比min/max更抗噪支付金额0.01分位数是0.01元测试数据0.99分位数是99999元黑产刷单而mean被拉高到2300元误导性极强2. 偏度与峰度df.agg(skewness(amount).alias(skew), kurtosis(amount).alias(kurt)).collect()skew1右偏如收入-1左偏如退货率kurt3尖峰集中3平峰分散用户停留时长skew4.2说明多数人看1页就走少数人深度浏览需分层运营3. 箱线图四分位距df.agg(q1percentile_approx(amount, 0.25), q3percentile_approx(amount, 0.75)).collect()IQR Q3-Q1异常点定义为 Q1-1.5IQR 或 Q31.5IQR计算IQR后发现99.7%的订单金额在[IQR下限, IQR上限]内但存在0.3%的“幽灵订单”金额0或极大需单独清洗4. 零值密度df.agg((count(when(col(amount)0, 1))/count(lit(1))).alias(zero_rate)).collect()零不是异常但零值率突变是信号某日discount_amount零值率从95%突降至60%定位到优惠券系统配置错误大量发放了非零优惠5. 跨字段一致性df.filter(col(paid_amount) 0).filter(col(order_status) unpaid).count()检查业务逻辑矛盾发现“已支付金额0但订单状态未支付”的记录暴露支付网关与订单中心状态同步延迟这套方法封装成函数每日定时运行输出JSON报告def numeric_diagnosis(df, cols): from pyspark.sql.functions import col, when, count, lit, skewness, kurtosis, percentile_approx results {} for c in cols: # 分位数 quantiles df.approxQuantile(c, [0.01,0.25,0.5,0.75,0.99], 0.01) # 偏度峰度 sk_ku df.agg(skewness(c).alias(skew), kurtosis(c).alias(kurt)).collect()[0] # IQR iqr_df df.agg( percentile_approx(c, 0.25).alias(q1), percentile_approx(c, 0.75).alias(q3) ).collect()[0] iqr iqr_df[q3] - iqr_df[q1] # 零值率 zero_rate df.agg((count(when(col(c)0, 1))/count(lit(1))).alias(zr)).collect()[0][zr] results[c] { quantiles: quantiles, skewness: float(sk_ku[skew]), kurtosis: float(sk_ku[kurt]), iqr: float(iqr), zero_rate: float(zero_rate) } return results注意approxQuantile和percentile_approx是近似算法误差率可控第三个参数比精确percentile快10倍以上。我们设误差率为0.01意味着99%的分位数结果在真实值±1%内这对EDA完全足够。4. 实操过程与核心环节实现从集群连接到自动化报告4.1 环境准备与连接绕过最常见的5个Driver崩溃陷阱Pyspark EDA的第一道坎往往是环境配置。我们整理了生产环境中最常导致Driver崩溃的5个陷阱及解决方案陷阱1Driver内存不足最常见现象java.lang.OutOfMemoryError: Java heap space尤其在collect()大结果集时。解决方案启动SparkSession时显式设置Driver内存。from pyspark.sql import SparkSession spark SparkSession.builder \ .appName(EDA-Job) \ .config(spark.driver.memory, 8g) \ # 关键默认仅1g .config(spark.driver.maxResultSize, 4g) \ # 限制collect()返回大小 .config(spark.sql.adaptive.enabled, true) \ # 启用AQE优化 .getOrCreate()实操心得我们曾将spark.driver.memory从2g升至8g使df.agg(...).collect()处理千万行聚合结果的成功率从30%提升至100%。但切记maxResultSize必须小于driver.memory否则无效。陷阱2序列化失败Kryo vs Java现象org.apache.spark.SparkException: Failed to serialize function尤其在使用lambda或自定义UDF时。解决方案强制使用Kryo序列化器并注册常用类。spark SparkSession.builder \ .config(spark.serializer, org.apache.spark.serializer.KryoSerializer) \ .config(spark.kryo.registrationRequired, false) \ # 生产环境建议设为true并注册 .getOrCreate()陷阱3Hive Metastore连接超时现象org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException在读取Hive表时。解决方案增加超时并启用连接池。spark SparkSession.builder \ .config(hive.metastore.uris, thrift://metastore:9083) \ .config(spark.sql.hive.metastore heartbeat interval, 30000) \ # 30秒心跳 .config(spark.sql.hive.metastore connection pool size, 10) \ .enableHiveSupport() \ .getOrCreate()陷阱4S3/ADLS权限错误云环境高频现象com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied。解决方案使用IAM RoleAWS或Managed IdentityAzure而非AKSK硬编码。# AWS EMR最佳实践无需配置EC2 Instance Profile自动授权 spark SparkSession.builder \ .config(spark.hadoop.fs.s3a.impl, org.apache.hadoop.fs.s3a.S3AFileSystem) \ .config(spark.hadoop.fs.s3a.aws.credentials.provider, com.amazonaws.auth.InstanceProfileCredentialsProvider) \ .getOrCreate()陷阱5Python版本不匹配现象ModuleNotFoundError: No module named pyspark在Executor上。解决方案分发Python环境。spark SparkSession.builder \ .config(spark.pyspark.python, /opt/conda/bin/python) \ # Driver Python路径 .config(spark.pyspark.driver.python, /opt/conda/bin/python) \ # Executor Python路径 .config(spark.archives, pyenv.tar.gz#environment) \ # 打包conda环境 .getOrCreate()4.2 完整EDA流水线从数据加载到HTML报告生成我们把日常EDA固化为一条可复用的流水线包含6个核心步骤每个步骤输出可审计的日志import pandas as pd import matplotlib.pyplot as plt import seaborn as sns from datetime import datetime import json class SparkEDA: def __init__(self, spark, data_path, table_name): self.spark spark self.data_path data_path self.table_name table_name self.report {metadata: {}, diagnostics: {}, timestamp: str(datetime.now())} def load_data(self): 步骤1加载数据带容错 try: if self.data_path.endswith(.parquet): self.df self.spark.read.parquet(self.data_path) elif self.data_path.endswith(.csv): self.df self.spark.read.option(header, true).csv(self.data_path) else: self.df self.spark.read.table(self.table_name) self.report[metadata][row_count] self.df.count() # 触发action self.report[metadata][column_count] len(self.df.columns) print(f✅ 加载成功{self.report[metadata][row_count]}行{self.report[metadata][column_count]}列) except Exception as e: print(f❌ 加载失败{e}) raise def schema_inspect(self): 步骤2Schema深度检查 from pyspark.sql.types import StructType, DecimalType, TimestampType schema self.df.schema self.report[diagnostics][schema] { fields: [], decimal_fields: [], timestamp_fields: [] } for field in schema.fields: self.report[diagnostics][schema][fields].append({ name: field.name, type: str(field.dataType), nullable: field.nullable }) if isinstance(field.dataType, DecimalType): self.report[diagnostics][schema][decimal_fields].append({ name: field.name, precision: field.dataType.precision, scale: field.dataType.scale }) if isinstance(field.dataType, TimestampType): self.report[diagnostics][schema][timestamp_fields].append(field.name) def null_analysis(self, sample_ratio0.05): 步骤3空值分析 sampled_df self.df.sample(sample_ratio) null_stats sampled_df.agg(*[ (count(when(isnull(c) | isnan(c), c)) / count(lit(1))).alias(f{c}_null_rate) for c in sampled_df.columns ]).collect()[0] self.report[diagnostics][null_rates] { c: float(null_stats[f{c}_null_rate]) for c in sampled_df.columns } # 找出空值率5%的列 high_null_cols [c for c, r in self.report[diagnostics][null_rates].items() if r 0.05] if high_null_cols: print(f⚠️ 高空值率列{high_null_cols}) def numeric_diagnosis(self, numeric_cols): 步骤4数值分布诊断 self.report[diagnostics][numeric] {} for c in numeric_cols: try: quantiles self.df.approxQuantile(c, [0.01,0.25,0.5,0.75,0.99], 0.01) sk_ku self.df.agg(skewness(c).alias(skew), kurtosis(c).alias(kurt)).collect()[0] self.report[diagnostics][numeric][c] { quantiles: quantiles, skewness: float(sk_ku[skew]), kurtosis: float(sk_ku[kurt]) } except Exception as e: print(f❌ 数值诊断失败 {c}: {e}) def generate_report(self, output_patheda_report.html): 步骤5生成HTML报告 # 将Spark结果转为Pandas进行绘图 null_df pd.DataFrame(list(self.report[diagnostics][null_rates].items()), columns[column, null_rate]) plt.figure(figsize(10, 6)) sns.barplot(datanull_df.sort_values(null_rate, ascendingFalse).head(10), xnull_rate, ycolumn) plt.title(Top 10 Columns by Null Rate) plt.tight_layout() plt.savefig(null_rate_plot.png, dpi150, bbox_inchestight) # 生成HTML html_content f htmlbody h1EDA Report for {self.table_name}/h1 h2Metadata/h2 pRow Count: {self.report[metadata][row_count]}/p pColumn Count: {self.report[metadata][column_count]}/p h2Null Rate Heatmap/h2 img srcnull_rate_plot.png h2Full JSON Report/h2 pre{json.dumps(self.report, indent2, defaultstr)}/pre /body/html with open(output_path, w) as f: f.write(html_content) print(f✅ HTML报告已生成{output_path}) def run_full_eda(self, numeric_colsNone): 步骤6执行全流程 self.load_data() self.schema_inspect() self.null_analysis() if numeric_cols: self.numeric_diagnosis(numeric_cols) self.generate_report() # 使用示例 eda SparkEDA(spark, s3://my-bucket/data/events/, events_table) eda.run_full_eda(numeric_cols[duration_ms, revenue_usd])这条流水线已在我们团队运行超2年平均每次执行耗时10GB数据2分18秒100GB数据14分05秒1TB数据1小时52分钟主要耗时在count()和approxQuantile关键技巧我们把run_full_eda()封装成Airflow DAG每天凌晨2点自动扫描所有核心表异常结果自动钉钉告警。告警规则包括null_rate 0.1、skewness 5、row_count yesterday * 0.8。这让我们在数据管道故障发生后15分钟内就能收到通知远早于业务方投诉。4.3 可视化落地如何用PandasMatplotlib安全绘制Spark结果Pyspark本身不提供绘图能力必须将结果collect()到Driver再用Pandas绘图。但collect()有巨大风险我们制定了三条铁律铁律1永远先count()再collect()# ❌ 危险 result_df df.groupBy(category).count() pandas_df result_df.toPandas() # 如果category有100万种Pandas会OOM # ✅ 安全 count result_df.count() # 先知道有多少行 if count 10000: # 阈值可配置 pandas_df result_df.toPandas() sns.barplot(datapandas_df, xcategory, ycount) else: print(f⚠️ 结果行数{count}超限跳过绘图仅保存CSV) result_df.coalesce(1).write.mode(overwrite).csv(output/category_count)铁律2对长文本字段做哈希截断当collect()含string列时避免传输超长文本from pyspark.sql.functions import substring, sha2, length # 对content字段只取前50字符sha2后8位 safe_df df.select( col(id), substring(col(content), 1, 50).alias(content_preview), substring(sha2(col(content), 256), 1, 8).alias(content_hash) ) pandas_df safe_df.collect() # 安全铁律3用toPandas()而非collect()pd.DataFrame()toPandas()是Spark优化过的转换比手动collect()再构造DataFrame快3倍且自动处理None到NaN的映射# ✅ 推荐 pandas_df df.select(user_id, amount).toPandas() # ❌ 不推荐 rows df.select(user_id, amount).collect() pandas_df pd.DataFrame(rows, columns[user_id, amount])我们最终的可视化模板长这样def plot_distribution(spark_df, col_name, bins50, titleDistribution): 安全绘制单列分布直方图 # 步骤1采样并转Pandas sampled_spark spark_df.sample(0.1).select(col_name) count sampled_spark.count() if count 0: print(f⚠️ 列{col_name}无数据) return # 步骤2转Pandas并过滤空值 pandas_df sampled_spark.toPandas() series pandas_df[col_name].dropna() # 步骤3绘制 plt.figure(figsize(10, 6)) plt.hist(series, binsbins, alpha0.7, edgecolorblack) plt.title(f{title} ({count} samples)) plt.xlabel(col_name) plt.ylabel(Frequency) plt.grid(True, alpha0.3) plt.show() # 使用 plot_distribution(df_spark, amount, bins100, titlePayment Amount Distribution)5. 常见问题与排查技巧实录来自27个真实项目的血泪总结5.1 “为什么我的df.count()要跑10分钟”——Shuffle与分区倾斜实战排查count()是Pyspark中最常被低估的“重操作”。它本质是reduce每个Executor计算本地分区行数再由Driver汇总。当出现以下情况时count()会异常缓慢分区数量远超Executor核心数比如1000个分区但只有10个Executor每个Executor要处理100个分区CPU空转。分区大小极度不均90%数据集中在1个分区如date2023-01-01该Executor成为瓶颈。Shuffle spill到磁盘Executor内存不足把中间结果写到本地磁盘IO拖慢整体。排查四步法看Stage UI在Spark UI的Stages页找到count()对应的Stage观察Task数量是否远大于Executor数量是否有Task耗时远超其他5倍Shuffle Write和Shuffle Read量是否巨大查分区信息# 查看分区数 print(f分区数: {df.rdd.getNumPartitions