1. 什么是 Apache ArrowApache Arrow 是一种内存列式数据格式。在 PySpark 里它的核心作用是提升 JVM 与 Python 之间的数据传输效率因此对经常使用 Pandas、NumPy 的 Python 用户尤其有价值。不过 Arrow 并不会自动在所有场景下生效通常需要额外的配置或特定 API 才能启用。2. 使用 Arrow 的前提要在 PySpark 中使用 Arrow首先要确保安装了推荐版本的 PyArrow。官方说明中提到如果你通过pip安装 PySpark可以使用下面的方式安装 SQL 相关依赖pipinstallpyspark[sql]如果不是这种安装方式就需要手动保证 PyArrow 在所有集群节点上都可用。官方还说明在pyspark.sql场景下最低支持版本是 Pandas 2.2.0 和 PyArrow 11.0.0。3. Spark 与 PyArrow Table 互转从 Spark 4.0 开始Spark DataFrame 与 PyArrow Table 可以直接互转SparkSession.createDataFrame(pyarrow_table)DataFrame.toArrow()importpyarrowaspaimportnumpyasnp tablepa.table([pa.array(np.random.rand(100))for_inrange(3)],names[a,b,c])dfspark.createDataFrame(table)result_tabledf.select(*).toArrow()print(result_table.schema)需要注意的是DataFrame.toArrow()会把 DataFrame 全部收集到 Driver 端因此只适合小数据集。另外并不是所有 Spark 和 Arrow 类型都完全支持遇到不支持的类型会直接报错。4. 启用 Arrow 优化 Pandas 转换Arrow 最常见的使用场景是优化 Spark DataFrame 与 Pandas DataFrame 的互转SparkSession.createDataFrame(pandas_df)DataFrame.toPandas()要启用这类优化需要先打开配置importnumpyasnpimportpandasaspd spark.conf.set(spark.sql.execution.arrow.pyspark.enabled,true)pdfpd.DataFrame(np.random.rand(100,3))dfspark.createDataFrame(pdf)result_pdfdf.select(*).toPandas()print(result_pdf.describe())默认情况下这个配置是关闭的。官方还说明如果在真正执行前发生错误Spark 可以自动回退到非 Arrow 实现这由spark.sql.execution.arrow.pyspark.fallback.enabled控制。同样要记住即使开启 ArrowtoPandas()仍然会把所有数据收集到 Driver因此它依然只适合小数据集。5. Pandas UDFArrow 最常见的高性能入口Pandas UDF 本质上就是借助 Arrow 传输数据再用 Pandas 执行向量化计算因此它通常比普通 Python UDF 更适合数值计算和批处理逻辑。官方文档说明Pandas UDF 可以通过pandas_udf()装饰器定义不需要额外配置。5.1 Series to Series最常见的一种形式是输入pandas.Series输出pandas.Seriesimportpandasaspdfrompyspark.sql.functionsimportcol,pandas_udffrompyspark.sql.typesimportLongTypedefmultiply_func(a:pd.Series,b:pd.Series)-pd.Series:returna*b multiplypandas_udf(multiply_func,returnTypeLongType())dfspark.createDataFrame(pd.DataFrame([1,2,3],columns[x]))df.select(multiply(col(x),col(x))).show()这种形式要求输出长度和输入长度一致。5.2 Iterator of Series to Iterator of Series如果你的函数需要一次初始化状态然后对多个批次复用可以使用迭代器形式fromtypingimportIteratorimportpandasaspdfrompyspark.sql.functionsimportpandas_udfpandas_udf(long)defplus_one(iterator:Iterator[pd.Series])-Iterator[pd.Series]:forxiniterator:yieldx1这种方式适合“初始化一次多批次重复使用”的场景。5.3 多列输入如果需要多个输入列可以写成“多 Series 迭代器”形式fromtypingimportIterator,Tupleimportpandasaspdfrompyspark.sql.functionsimportpandas_udfpandas_udf(long)defmultiply_two_cols(iterator:Iterator[Tuple[pd.Series,pd.Series]])-Iterator[pd.Series]:fora,biniterator:yielda*b这种方式适合多列联动计算。5.4 Series to ScalarPandas UDF 还可以做聚合输入 Series输出一个标量importpandasaspdfrompyspark.sql.functionsimportpandas_udffrompyspark.sqlimportWindow dfspark.createDataFrame([(1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)],(id,v))pandas_udf(double)defmean_udf(v:pd.Series)-float:returnv.mean()df.groupby(id).agg(mean_udf(df[v])).show()官方特别提醒这类 UDF 不支持部分聚合分组或窗口中的所有数据都会被加载到内存里因此大组数据要特别小心内存压力。6. Pandas Function APIs除了 Pandas UDFPySpark 还提供了 Pandas Function APIs。它们内部同样依赖 Arrow 进行数据传输但对外表现为 DataFrame 级别 API而不是列级别 API。官方重点提到三类applyInPandas()、mapInPandas()和cogroup().applyInPandas()。6.1applyInPandas()按组处理groupBy().applyInPandas()适合对每个分组执行 pandas.DataFrame 级别的逻辑importpandasaspd dfspark.createDataFrame([(1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)],(id,v))defsubtract_mean(pdf:pd.DataFrame)-pd.DataFrame:returnpdf.assign(vpdf.v-pdf.v.mean())df.groupby(id).applyInPandas(subtract_mean,schemaid long, v double).show()官方明确说明每个组的所有数据都会先加载到内存再交给函数处理因此如果分组倾斜严重很容易出现 OOM。6.2mapInPandas()批次映射mapInPandas()可以把一个pandas.DataFrame迭代器映射成另一个迭代器输出长度可以任意变化fromtypingimportIterableimportpandasaspd dfspark.createDataFrame([(1,21),(2,30)],(id,age))deffilter_func(iterator:Iterable[pd.DataFrame])-Iterable[pd.DataFrame]:forpdfiniterator:yieldpdf[pdf.id1]df.mapInPandas(filter_func,schemadf.schema).show()它很适合需要自定义过滤、拆分、重组批次数据的场景。6.3cogroup().applyInPandas()双表按组处理如果需要两个 DataFrame 按同一键分组后再联合处理可以使用 cogroup 版本importpandasaspd df1spark.createDataFrame([(20000101,1,1.0),(20000101,2,2.0),(20000102,1,3.0),(20000102,2,4.0)],(time,id,v1))df2spark.createDataFrame([(20000101,1,x),(20000101,2,y)],(time,id,v2))defmerge_ordered(left:pd.DataFrame,right:pd.DataFrame)-pd.DataFrame:returnpd.merge_ordered(left,right)df1.groupby(id).cogroup(df2.groupby(id)).applyInPandas(merge_ordered,schematime int, id int, v1 double, v2 string).show()同样官方提醒 cogroup 的所有数据也会先加载到内存中因此要小心大组问题。7. Arrow Python UDF除了 Pandas UDF官方还引入了 Arrow Python UDF。它依然是逐行执行但使用 Arrow 进行高效批量传输和序列化。定义时需要在udf()里设置useArrowTrue或者全局打开spark.sql.execution.pythonUDF.arrow.enabled。frompyspark.sql.functionsimportudfudf(returnTypeint)defslen(s):returnlen(s)udf(returnTypeint,useArrowTrue)defarrow_slen(s):returnlen(s)dfspark.createDataFrame([(1,John Doe,21)],(id,name,age))df.select(slen(name),arrow_slen(name)).show()官方指出相比默认的 pickled Python UDFArrow Python UDF 在类型转换机制上更一致也更能减少类型不匹配带来的歧义和数据损失。8. 使用 Arrow 时要注意的几个点8.1 并非所有类型都支持官方说明目前 Arrow 转换支持大多数 Spark SQL 类型但ArrayType(TimestampType)仍然不支持而MapType和嵌套StructType的ArrayType需要 PyArrow 2.0.0 及以上版本。8.2 批大小会影响内存Spark 会把数据分区转换为 Arrow record batch。默认情况下每个 batch 最大是 10000 行由spark.sql.execution.arrow.maxRecordsPerBatch控制。如果列很多应该适当降低这个值以避免 JVM 内存压力过大。8.3 时间戳语义要特别小心Spark 内部以 UTC 存储时间戳而 Pandas 使用的是datetime64[ns]。官方说明Spark → Pandas会转换到 Spark session 时区并显示为本地时间Spark → PyArrow Table保持 UTC 和微秒精度Pandas / PyArrow → Spark会转成 UTC 微秒纳秒会被截断所以时间戳列在 Arrow 场景下虽然会自动转换但跨系统和跨时区时一定要明确 session 时区配置。8.4toPandas()和toArrow()仍然是 Driver 收集操作即使开启 Arrow这两个 API 本质上仍是“全量收集到 Driver”不是分布式持久化接口因此不能把它们当成大数据量导出方案。8.5self_destruct可以省内存从 Spark 3.2 开始可以开启spark.sql.execution.arrow.pyspark.selfDestruct.enabled来减少toPandas()或toArrow()转换时的内存占用但这是实验特性可能带来只读数组问题甚至让部分 Pandas 操作报错。官方还提醒这种模式通常会更慢因为它是单线程的。9. 什么时候优先考虑 Arrow如果你的场景符合下面几类Arrow 往往值得优先考虑Spark DataFrame 与 Pandas DataFrame 频繁互转需要使用 Pandas UDF 做向量化处理需要applyInPandas()、mapInPandas()这类 Pandas Function APIsPython UDF 性能瓶颈明显且可以尝试 Arrow Python UDF如果只是普通 Spark SQL 查询或者完全不涉及 Pandas / NumPy / Python UDF那么 Arrow 带来的收益通常没那么明显。这个结论与官方对 Arrow 适用场景的整体定位是一致的。10. 总结Arrow 在 PySpark 里的价值核心不是“多了一个配置项”而是它打通了 Spark、Pandas、PyArrow 之间更高效的数据交换路径。你可以把它理解成 PySpark 与 Python 数据生态之间的高速通道DataFrame 转 Pandas 更快Pandas UDF 更高效applyInPandas()和mapInPandas()这类 API 也能更自然地发挥作用。当然它并不是万能开关内存、类型支持、时间戳语义和 Driver 收集风险仍然要重点关注。只要把这些边界想清楚Arrow 基本就是 PySpark Python 侧性能优化里绕不开的一环。
Apache Arrow 在 PySpark 中的使用提速 Pandas 转换与 UDF 的关键武器
发布时间:2026/5/26 5:26:56
1. 什么是 Apache ArrowApache Arrow 是一种内存列式数据格式。在 PySpark 里它的核心作用是提升 JVM 与 Python 之间的数据传输效率因此对经常使用 Pandas、NumPy 的 Python 用户尤其有价值。不过 Arrow 并不会自动在所有场景下生效通常需要额外的配置或特定 API 才能启用。2. 使用 Arrow 的前提要在 PySpark 中使用 Arrow首先要确保安装了推荐版本的 PyArrow。官方说明中提到如果你通过pip安装 PySpark可以使用下面的方式安装 SQL 相关依赖pipinstallpyspark[sql]如果不是这种安装方式就需要手动保证 PyArrow 在所有集群节点上都可用。官方还说明在pyspark.sql场景下最低支持版本是 Pandas 2.2.0 和 PyArrow 11.0.0。3. Spark 与 PyArrow Table 互转从 Spark 4.0 开始Spark DataFrame 与 PyArrow Table 可以直接互转SparkSession.createDataFrame(pyarrow_table)DataFrame.toArrow()importpyarrowaspaimportnumpyasnp tablepa.table([pa.array(np.random.rand(100))for_inrange(3)],names[a,b,c])dfspark.createDataFrame(table)result_tabledf.select(*).toArrow()print(result_table.schema)需要注意的是DataFrame.toArrow()会把 DataFrame 全部收集到 Driver 端因此只适合小数据集。另外并不是所有 Spark 和 Arrow 类型都完全支持遇到不支持的类型会直接报错。4. 启用 Arrow 优化 Pandas 转换Arrow 最常见的使用场景是优化 Spark DataFrame 与 Pandas DataFrame 的互转SparkSession.createDataFrame(pandas_df)DataFrame.toPandas()要启用这类优化需要先打开配置importnumpyasnpimportpandasaspd spark.conf.set(spark.sql.execution.arrow.pyspark.enabled,true)pdfpd.DataFrame(np.random.rand(100,3))dfspark.createDataFrame(pdf)result_pdfdf.select(*).toPandas()print(result_pdf.describe())默认情况下这个配置是关闭的。官方还说明如果在真正执行前发生错误Spark 可以自动回退到非 Arrow 实现这由spark.sql.execution.arrow.pyspark.fallback.enabled控制。同样要记住即使开启 ArrowtoPandas()仍然会把所有数据收集到 Driver因此它依然只适合小数据集。5. Pandas UDFArrow 最常见的高性能入口Pandas UDF 本质上就是借助 Arrow 传输数据再用 Pandas 执行向量化计算因此它通常比普通 Python UDF 更适合数值计算和批处理逻辑。官方文档说明Pandas UDF 可以通过pandas_udf()装饰器定义不需要额外配置。5.1 Series to Series最常见的一种形式是输入pandas.Series输出pandas.Seriesimportpandasaspdfrompyspark.sql.functionsimportcol,pandas_udffrompyspark.sql.typesimportLongTypedefmultiply_func(a:pd.Series,b:pd.Series)-pd.Series:returna*b multiplypandas_udf(multiply_func,returnTypeLongType())dfspark.createDataFrame(pd.DataFrame([1,2,3],columns[x]))df.select(multiply(col(x),col(x))).show()这种形式要求输出长度和输入长度一致。5.2 Iterator of Series to Iterator of Series如果你的函数需要一次初始化状态然后对多个批次复用可以使用迭代器形式fromtypingimportIteratorimportpandasaspdfrompyspark.sql.functionsimportpandas_udfpandas_udf(long)defplus_one(iterator:Iterator[pd.Series])-Iterator[pd.Series]:forxiniterator:yieldx1这种方式适合“初始化一次多批次重复使用”的场景。5.3 多列输入如果需要多个输入列可以写成“多 Series 迭代器”形式fromtypingimportIterator,Tupleimportpandasaspdfrompyspark.sql.functionsimportpandas_udfpandas_udf(long)defmultiply_two_cols(iterator:Iterator[Tuple[pd.Series,pd.Series]])-Iterator[pd.Series]:fora,biniterator:yielda*b这种方式适合多列联动计算。5.4 Series to ScalarPandas UDF 还可以做聚合输入 Series输出一个标量importpandasaspdfrompyspark.sql.functionsimportpandas_udffrompyspark.sqlimportWindow dfspark.createDataFrame([(1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)],(id,v))pandas_udf(double)defmean_udf(v:pd.Series)-float:returnv.mean()df.groupby(id).agg(mean_udf(df[v])).show()官方特别提醒这类 UDF 不支持部分聚合分组或窗口中的所有数据都会被加载到内存里因此大组数据要特别小心内存压力。6. Pandas Function APIs除了 Pandas UDFPySpark 还提供了 Pandas Function APIs。它们内部同样依赖 Arrow 进行数据传输但对外表现为 DataFrame 级别 API而不是列级别 API。官方重点提到三类applyInPandas()、mapInPandas()和cogroup().applyInPandas()。6.1applyInPandas()按组处理groupBy().applyInPandas()适合对每个分组执行 pandas.DataFrame 级别的逻辑importpandasaspd dfspark.createDataFrame([(1,1.0),(1,2.0),(2,3.0),(2,5.0),(2,10.0)],(id,v))defsubtract_mean(pdf:pd.DataFrame)-pd.DataFrame:returnpdf.assign(vpdf.v-pdf.v.mean())df.groupby(id).applyInPandas(subtract_mean,schemaid long, v double).show()官方明确说明每个组的所有数据都会先加载到内存再交给函数处理因此如果分组倾斜严重很容易出现 OOM。6.2mapInPandas()批次映射mapInPandas()可以把一个pandas.DataFrame迭代器映射成另一个迭代器输出长度可以任意变化fromtypingimportIterableimportpandasaspd dfspark.createDataFrame([(1,21),(2,30)],(id,age))deffilter_func(iterator:Iterable[pd.DataFrame])-Iterable[pd.DataFrame]:forpdfiniterator:yieldpdf[pdf.id1]df.mapInPandas(filter_func,schemadf.schema).show()它很适合需要自定义过滤、拆分、重组批次数据的场景。6.3cogroup().applyInPandas()双表按组处理如果需要两个 DataFrame 按同一键分组后再联合处理可以使用 cogroup 版本importpandasaspd df1spark.createDataFrame([(20000101,1,1.0),(20000101,2,2.0),(20000102,1,3.0),(20000102,2,4.0)],(time,id,v1))df2spark.createDataFrame([(20000101,1,x),(20000101,2,y)],(time,id,v2))defmerge_ordered(left:pd.DataFrame,right:pd.DataFrame)-pd.DataFrame:returnpd.merge_ordered(left,right)df1.groupby(id).cogroup(df2.groupby(id)).applyInPandas(merge_ordered,schematime int, id int, v1 double, v2 string).show()同样官方提醒 cogroup 的所有数据也会先加载到内存中因此要小心大组问题。7. Arrow Python UDF除了 Pandas UDF官方还引入了 Arrow Python UDF。它依然是逐行执行但使用 Arrow 进行高效批量传输和序列化。定义时需要在udf()里设置useArrowTrue或者全局打开spark.sql.execution.pythonUDF.arrow.enabled。frompyspark.sql.functionsimportudfudf(returnTypeint)defslen(s):returnlen(s)udf(returnTypeint,useArrowTrue)defarrow_slen(s):returnlen(s)dfspark.createDataFrame([(1,John Doe,21)],(id,name,age))df.select(slen(name),arrow_slen(name)).show()官方指出相比默认的 pickled Python UDFArrow Python UDF 在类型转换机制上更一致也更能减少类型不匹配带来的歧义和数据损失。8. 使用 Arrow 时要注意的几个点8.1 并非所有类型都支持官方说明目前 Arrow 转换支持大多数 Spark SQL 类型但ArrayType(TimestampType)仍然不支持而MapType和嵌套StructType的ArrayType需要 PyArrow 2.0.0 及以上版本。8.2 批大小会影响内存Spark 会把数据分区转换为 Arrow record batch。默认情况下每个 batch 最大是 10000 行由spark.sql.execution.arrow.maxRecordsPerBatch控制。如果列很多应该适当降低这个值以避免 JVM 内存压力过大。8.3 时间戳语义要特别小心Spark 内部以 UTC 存储时间戳而 Pandas 使用的是datetime64[ns]。官方说明Spark → Pandas会转换到 Spark session 时区并显示为本地时间Spark → PyArrow Table保持 UTC 和微秒精度Pandas / PyArrow → Spark会转成 UTC 微秒纳秒会被截断所以时间戳列在 Arrow 场景下虽然会自动转换但跨系统和跨时区时一定要明确 session 时区配置。8.4toPandas()和toArrow()仍然是 Driver 收集操作即使开启 Arrow这两个 API 本质上仍是“全量收集到 Driver”不是分布式持久化接口因此不能把它们当成大数据量导出方案。8.5self_destruct可以省内存从 Spark 3.2 开始可以开启spark.sql.execution.arrow.pyspark.selfDestruct.enabled来减少toPandas()或toArrow()转换时的内存占用但这是实验特性可能带来只读数组问题甚至让部分 Pandas 操作报错。官方还提醒这种模式通常会更慢因为它是单线程的。9. 什么时候优先考虑 Arrow如果你的场景符合下面几类Arrow 往往值得优先考虑Spark DataFrame 与 Pandas DataFrame 频繁互转需要使用 Pandas UDF 做向量化处理需要applyInPandas()、mapInPandas()这类 Pandas Function APIsPython UDF 性能瓶颈明显且可以尝试 Arrow Python UDF如果只是普通 Spark SQL 查询或者完全不涉及 Pandas / NumPy / Python UDF那么 Arrow 带来的收益通常没那么明显。这个结论与官方对 Arrow 适用场景的整体定位是一致的。10. 总结Arrow 在 PySpark 里的价值核心不是“多了一个配置项”而是它打通了 Spark、Pandas、PyArrow 之间更高效的数据交换路径。你可以把它理解成 PySpark 与 Python 数据生态之间的高速通道DataFrame 转 Pandas 更快Pandas UDF 更高效applyInPandas()和mapInPandas()这类 API 也能更自然地发挥作用。当然它并不是万能开关内存、类型支持、时间戳语义和 Driver 收集风险仍然要重点关注。只要把这些边界想清楚Arrow 基本就是 PySpark Python 侧性能优化里绕不开的一环。