摘要Spark SQL 是 Apache Spark 中用于结构化数据处理的模块。本文将深入剖析 Spark SQL 的核心概念、架构演进、DataFrame 与 DataSet 的底层原理以及实战编程技巧帮助你全面掌握这一大数据利器。一、Spark SQL 概述1.1 什么是 Spark SQLSpark SQL 是 Spark 用于**结构化数据structured data**处理的模块。它提供了 DataFrame 和 DataSet 两种编程抽象让开发者可以用更简洁、更高效的方式处理大规模数据。与传统的 Spark Core基于 RDD相比Spark SQL 的最大优势在于内置优化引擎通过 Catalyst 优化器自动生成最优执行计划统一数据访问用相同的方式连接 Hive、JSON、Parquet、JDBC 等多种数据源兼容 Hive可以直接在已有的 Hive 数据仓库上运行 SQL 或 HiveQL标准连接支持 JDBC/ODBC 连接方便 BI 工具对接1.2 Hive 与 Spark SQL 的演进关系1.2.1 时代背景Hive 是早期唯一运行在 Hadoop 上的 SQL-on-Hadoop 工具。但 MapReduce 计算过程中大量的中间磁盘落地消耗了大量 I/O严重降低了运行效率。为了提高 SQL-on-Hadoop 的效率多种工具应运而生工具特点Drill支持多种数据源的分布式 SQL 引擎ImpalaCloudera 开发的 MPP 查询引擎Shark基于 Hive 开发运行在 Spark 引擎上1.2.2 Shark 的诞生与局限Shark 是伯克利实验室 Spark 生态环境的组件之一它修改了 Hive 的内存管理、物理计划、执行三个模块使之能运行在 Spark 引擎上。Shark 的出现使得 SQL-on-Hadoop 的性能比 Hive 有了10-100 倍的提升。然而Shark 对 Hive 的过度依赖如采用 Hive 的语法解析器、查询优化器等制约了 Spark “One Stack Rule Them All” 的既定方针。2014 年 6 月 1 日Shark 项目停止开发团队将所有资源转向Spark SQL项目。1.2.3 Spark SQL 的两大分支Shark 停止后发展出两个支线Spark SQL作为 Spark 生态的一员继续发展不再受限于 Hive只是兼容 HiveHive on SparkHive 的发展计划将 Spark 作为 Hive 的底层引擎之一1.3 Spark SQL 的核心特点1.3.1 易整合无缝整合 SQL 查询和 Spark 编程支持 Java、Scala、Python 和 R 四种语言。1.3.2 统一的数据访问DataFrame 和 SQL 提供了访问多种数据源的通用方式包括 Hive、Avro、Parquet、ORC、JSON、JDBC 等甚至可以跨数据源 Join。1.3.3 兼容 HiveSpark SQL 复用 Hive 的前端和元数据存储Metastore与现有 Hive 数据、查询、UDF 完全兼容。1.3.4 标准数据连接提供 JDBC 和 ODBC 连接支持 BI 工具直接查询大数据。二、核心抽象DataFrame 与 DataSet2.1 DataFrame 是什么DataFrame 是一种以 RDD 为基础的分布式数据集类似于传统数据库中的二维表格。2.1.1 DataFrame 与 RDD 的本质区别特性RDDDataFrame类型信息无运行时才知道有 Schema编译期确定优化能力Stage 层面简单优化Catalyst 优化器深度优化API 风格函数式低门槛关系型更友好嵌套数据不支持支持 struct、array、map左侧 RDD[Person]Spark 框架本身不了解 Person 类的内部结构只是将其视为普通对象。右侧 DataFrame提供了详细的结构信息Spark SQL 清楚知道数据集包含哪些列每列的名称和类型是什么。2.1.2 Catalyst 优化器查询计划优化DataFrame 性能高于 RDD 的核心原因优化的执行计划。// 示例代码Join 后 Filterusers.join(events,users(id)events(uid)).filter(events(date)2015-01-01)优化过程逻辑计划先 Join 再 Filter优化计划将 Filter 下推到 Join 下方先对 DataFrame 过滤再 Join 较小的结果集智能数据源优化将 Filter 直接推入数据源如SELECT * FROM events WHERE date ...这种**谓词下推Predicate Pushdown**是 Spark SQL 性能提升的关键。2.1.3 性能对比从图中可以明显看出RDD groupByKey红色性能最差执行时间最高RDD reduceByKey蓝色性能中等DataFrame黄色性能最优执行时间最低2.2 DataSet 是什么DataSet 是 Spark 1.6 中添加的新抽象是DataFrame 的扩展。2.2.1 DataSet 的核心特性强类型如DataSet[Car]、DataSet[Person]RDD 的优势强类型检查、强大的 Lambda 函数能力Spark SQL 的优势优化执行引擎样例类映射用样例类定义数据结构属性名称直接映射到字段名2.2.2 DataFrame 与 DataSet 的关系typeDataFrameDataset[Row]DataFrame 是 DataSet 的特例每一行类型为Row。可以通过as方法将 DataFrame 转换为 DataSetcaseclassUser(name:String,age:Int)valdf:DataFrame...valds:Dataset[User]df.as[User]2.3 RDD、DataFrame、DataSet 三者关系2.3.1 版本演进Spark 1.0 RDDSpark 1.3 DataFrameSpark 1.6 DataSet2.3.2 三者的共性都是 Spark 平台下的分布式弹性数据集都有惰性机制Transformation 不会立即执行都有共同的函数filter、排序等都需要import spark.implicits._都会根据内存情况自动缓存运算都有partition概念2.3.3 三者的区别维度RDDDataFrameDataSet类型安全是否Row 类型是语法错误运行时编译时编译时序列化Java 序列化Tungsten 二进制Tungsten 二进制GC 性能差创建大量对象好好与 Spark MLlib同时使用一般不同时一般不同时Spark SQL 操作不支持支持支持2.3.4 互相转换// RDD - DataFramevaldfrdd.toDF(name,age)// RDD - DataSetcaseclassUser(name:String,age:Int)valdsrdd.map(tUser(t._1,t._2)).toDS// DataFrame - RDDvalrdddf.rdd// RDD[Row]// DataFrame - DataSetvaldsdf.as[User]// DataSet - RDDvalrddds.rdd// RDD[User]// DataSet - DataFramevaldfds.toDF三、Spark SQL 核心编程3.1 新的起点SparkSessionSpark Core 中需要构建SparkContext而 Spark SQL 提供了更高级的入口SparkSession。SparkSession 实质上是SQLContext 和 HiveContext 的组合内部封装了 SparkContext。importorg.apache.spark.sql.SparkSessionvalspark:SparkSessionSparkSession.builder().appName(SparkSQLDemo).master(local[*]).getOrCreate()// Spark 2.x 中spark-shell 会自动创建名为 spark 的 SparkSession启动 spark-shell 后你会看到Spark session available as spark.3.2 创建 DataFrame有三种方式3.2.1 从 Spark 数据源创建// 查看支持的数据源格式spark.read.// csv format jdbc json load option options orc parquet schema table text textFile// 读取 JSON 文件valdfspark.read.json(data/user.json)df.show()注意从文件中读取的数字不能确定是什么类型所以用bigint接收。3.2.2 从 RDD 转换// 方式1通过样例类转换caseclassUser(name:String,age:Int)valrddsc.makeRDD(List((zhangsan,30),(lisi,40)))valdfrdd.map(tUser(t._1,t._2)).toDF// 方式2指定 Schema 转换importorg.apache.spark.sql.types._valschemaStructType(Array(StructField(name,StringType),StructField(age,IntegerType)))valrowRDDrdd.map(tRow(t._1,t._2))valdfspark.createDataFrame(rowRDD,schema)3.2.3 从 Hive Table 查询// 需要配置 Hive 支持valsparkSparkSession.builder().appName(HiveDemo).config(spark.sql.warehouse.dir,/user/hive/warehouse).enableHiveSupport().getOrCreate()spark.sql(SELECT * FROM default.user_table).show()3.3 SQL 语法风格// 1. 创建 DataFramevaldfspark.read.json(data/user.json)// 2. 创建临时视图Session 级别df.createOrReplaceTempView(people)// 3. 使用 SQL 查询valsqlDFspark.sql(SELECT * FROM people WHERE age 20)sqlDF.show()// 4. 创建全局临时视图应用级别df.createGlobalTempView(people_global)spark.sql(SELECT * FROM global_temp.people_global).show()3.4 DSL 语法风格DataFrame 提供了一套领域特定语言DSL无需创建临时视图。valdfspark.read.json(data/user.json)// 查看 Schemadf.printSchema()// 选择列df.select(username).show()// 选择列并计算df.select($username,$age1asnewage).show()// 过滤df.filter($age30).show()// 分组聚合df.groupBy(age).count().show()3.5 DataFrame 与 RDD 互转// DataFrame - RDDvaldfsc.makeRDD(List((zhangsan,30),(lisi,40))).map(tUser(t._1,t._2)).toDFvalrdddf.rdd// RDD[Row]valarrayrdd.collect// array: Array[Row] Array([zhangsan,30], [lisi,40])// 获取字段值array(0).getAs[String](name)// zhangsanarray(0)(0)// zhangsan3.6 DataSet 编程3.6.1 创建 DataSet// 方式1使用样例类序列caseclassPerson(name:String,age:Long)valcaseClassDSSeq(Person(zhangsan,2)).toDS()// 方式2使用基本类型序列valdsSeq(1,2,3,4,5).toDS// 方式3从 RDD 转换valdssc.makeRDD(List((zhangsan,30),(lisi,49))).map(tUser(t._1,t._2)).toDS3.6.2 DataSet - RDDvalrddds.rdd// RDD[User]rdd.collect// Array[User] Array(User(zhangsan,30), User(lisi,49))四、用户自定义函数UDF/UDAF4.1 UDF用户自定义函数// 1. 创建 DataFramevaldfspark.read.json(data/user.json)// 2. 注册 UDFspark.udf.register(addName,(x:String)Name: x)// 3. 创建临时表df.createOrReplaceTempView(people)// 4. 应用 UDFspark.sql(SELECT addName(name), age FROM people).show()4.2 UDAF用户自定义聚合函数Spark 3.0 后推荐使用Aggregator强类型方式// 输入数据类型caseclassUser01(username:String,age:Long)// 缓存类型caseclassAgeBuffer(varsum:Long,varcount:Long)// 定义聚合函数classMyAverageUDAFextendsAggregator[User01,AgeBuffer,Double]{overridedefzero:AgeBufferAgeBuffer(0L,0L)overridedefreduce(b:AgeBuffer,a:User01):AgeBuffer{b.suma.age b.count1b}overridedefmerge(b1:AgeBuffer,b2:AgeBuffer):AgeBuffer{b1.sumb2.sum b1.countb2.count b1}overridedeffinish(reduction:AgeBuffer):Double{reduction.sum.toDouble/reduction.count}overridedefbufferEncoder:Encoder[AgeBuffer]Encoders.productoverridedefoutputEncoder:Encoder[Double]Encoders.scalaDouble}// 使用valdsspark.read.json(data/user.json).as[User01]valmyAvgnewMyAverageUDAFvalcolmyAvg.toColumn ds.select(col).show()五、数据加载与保存5.1 通用方式// 加载数据valdfspark.read.format(json).load(data/user.json)// 保存数据df.write.format(parquet).save(output/)// SaveMode 选项df.write.mode(SaveMode.Append).save(output/)// ErrorIfExists(default) / Append / Overwrite / Ignore5.2 常用数据源格式读取保存说明Parquetspark.read.load()df.write.save()默认格式列式存储JSONspark.read.json()df.write.json()每行一个 JSON 对象CSVspark.read.format(csv).option(header, true)df.write.csv()需指定分隔符、表头JDBCspark.read.jdbc(url, table, props)df.write.jdbc(url, table, props)需 JDBC 驱动5.3 MySQL 读写示例importjava.util.Properties// 读取valpropsnewProperties()props.setProperty(user,root)props.setProperty(password,123123)valdfspark.read.jdbc(jdbc:mysql://localhost:3306/spark-sql,user,props)// 写入valrddspark.sparkContext.makeRDD(List(User2(lisi,20),User2(zs,30)))valdsrdd.toDS ds.write.mode(SaveMode.Append).jdbc(jdbc:mysql://localhost:3306/spark-sql,user,props)5.4 Hive 集成// 1. 添加依赖// spark-hive_2.12, hive-exec, mysql-connector-java// 2. 拷贝 hive-site.xml 到 resources// 3. 代码实现valsparkSparkSession.builder().appName(HiveDemo).config(spark.sql.warehouse.dir,hdfs://localhost:8020/user/hive/warehouse).enableHiveSupport().getOrCreate()// 执行 Hive SQLspark.sql(SHOW TABLES).show()spark.sql(CREATE TABLE aa (id INT))spark.sql(LOAD DATA LOCAL INPATH input/ids.txt INTO TABLE aa)spark.sql(SELECT * FROM aa).show()六、实战项目各区域热门商品 Top36.1 数据准备-- 用户行为表CREATETABLEuser_visit_action(dateSTRING,user_idBIGINT,session_id STRING,page_idBIGINT,action_time STRING,search_keyword STRING,click_category_idBIGINT,click_product_idBIGINT,order_category_ids STRING,order_product_ids STRING,pay_category_ids STRING,pay_product_ids STRING,city_idBIGINT)ROWFORMAT DELIMITEDFIELDSTERMINATEDBY\t;-- 产品表CREATETABLEproduct_info(product_idBIGINT,product_name STRING,extend_info STRING)ROWFORMAT DELIMITEDFIELDSTERMINATEDBY\t;-- 城市表CREATETABLEcity_info(city_idBIGINT,city_name STRING,area STRING)ROWFORMAT DELIMITEDFIELDSTERMINATEDBY\t;6.2 需求分析计算各个区域前三大热门商品按点击量并备注每个商品在主要城市中的分布比例。输出示例地区商品名称点击次数城市备注华北商品A100000北京21.2%天津13.2%其他65.6%华北商品P80200北京63.0%太原10%其他27.0%6.3 实现思路连接三张表获取完整数据仅点击记录按地区和商品名称分组统计点击次数每个地区内按点击次数降序排列取 Top3自定义 UDAF 函数实现城市备注// 核心 SQL 逻辑spark.sql(SELECT area, product_name, click_count, city_remark(city_name) as city_remark FROM (SELECT area, product_name, count(*) as click_count, collect_list(city_name) as city_list, ROW_NUMBER() OVER (PARTITION BY area ORDER BY count(*) DESC) as rk FROM user_visit_action t1 JOIN city_info t2 ON t1.city_id t2.city_id JOIN product_info t3 ON t1.click_product_id t3.product_id WHERE click_product_id IS NOT NULL GROUP BY area, product_name) t WHERE rk 3).show()七、总结特性说明核心优势Catalyst 优化器、Tungsten 执行引擎、统一数据访问编程抽象DataFrame弱类型、DataSet强类型入口对象SparkSession封装了 SparkContext查询方式SQL 语法、DSL 语法扩展能力UDF、UDAFAggregator数据源JSON、Parquet、CSV、JDBC、Hive 等Spark SQL 极大地简化了大数据处理的开发复杂度同时通过底层优化保证了执行效率。在实际工作中DataSet/DataFrame 已基本取代 RDD 成为首选 API掌握 Spark SQL 是每一位大数据工程师的必修课。参考文档Apache Spark 官方文档、尚硅谷大数据技术之 SparkSQL V3.0Spark 版本3.0推荐环境Scala 2.12 / Java 8
大数据技术之SparkSQL
发布时间:2026/5/23 21:01:17
摘要Spark SQL 是 Apache Spark 中用于结构化数据处理的模块。本文将深入剖析 Spark SQL 的核心概念、架构演进、DataFrame 与 DataSet 的底层原理以及实战编程技巧帮助你全面掌握这一大数据利器。一、Spark SQL 概述1.1 什么是 Spark SQLSpark SQL 是 Spark 用于**结构化数据structured data**处理的模块。它提供了 DataFrame 和 DataSet 两种编程抽象让开发者可以用更简洁、更高效的方式处理大规模数据。与传统的 Spark Core基于 RDD相比Spark SQL 的最大优势在于内置优化引擎通过 Catalyst 优化器自动生成最优执行计划统一数据访问用相同的方式连接 Hive、JSON、Parquet、JDBC 等多种数据源兼容 Hive可以直接在已有的 Hive 数据仓库上运行 SQL 或 HiveQL标准连接支持 JDBC/ODBC 连接方便 BI 工具对接1.2 Hive 与 Spark SQL 的演进关系1.2.1 时代背景Hive 是早期唯一运行在 Hadoop 上的 SQL-on-Hadoop 工具。但 MapReduce 计算过程中大量的中间磁盘落地消耗了大量 I/O严重降低了运行效率。为了提高 SQL-on-Hadoop 的效率多种工具应运而生工具特点Drill支持多种数据源的分布式 SQL 引擎ImpalaCloudera 开发的 MPP 查询引擎Shark基于 Hive 开发运行在 Spark 引擎上1.2.2 Shark 的诞生与局限Shark 是伯克利实验室 Spark 生态环境的组件之一它修改了 Hive 的内存管理、物理计划、执行三个模块使之能运行在 Spark 引擎上。Shark 的出现使得 SQL-on-Hadoop 的性能比 Hive 有了10-100 倍的提升。然而Shark 对 Hive 的过度依赖如采用 Hive 的语法解析器、查询优化器等制约了 Spark “One Stack Rule Them All” 的既定方针。2014 年 6 月 1 日Shark 项目停止开发团队将所有资源转向Spark SQL项目。1.2.3 Spark SQL 的两大分支Shark 停止后发展出两个支线Spark SQL作为 Spark 生态的一员继续发展不再受限于 Hive只是兼容 HiveHive on SparkHive 的发展计划将 Spark 作为 Hive 的底层引擎之一1.3 Spark SQL 的核心特点1.3.1 易整合无缝整合 SQL 查询和 Spark 编程支持 Java、Scala、Python 和 R 四种语言。1.3.2 统一的数据访问DataFrame 和 SQL 提供了访问多种数据源的通用方式包括 Hive、Avro、Parquet、ORC、JSON、JDBC 等甚至可以跨数据源 Join。1.3.3 兼容 HiveSpark SQL 复用 Hive 的前端和元数据存储Metastore与现有 Hive 数据、查询、UDF 完全兼容。1.3.4 标准数据连接提供 JDBC 和 ODBC 连接支持 BI 工具直接查询大数据。二、核心抽象DataFrame 与 DataSet2.1 DataFrame 是什么DataFrame 是一种以 RDD 为基础的分布式数据集类似于传统数据库中的二维表格。2.1.1 DataFrame 与 RDD 的本质区别特性RDDDataFrame类型信息无运行时才知道有 Schema编译期确定优化能力Stage 层面简单优化Catalyst 优化器深度优化API 风格函数式低门槛关系型更友好嵌套数据不支持支持 struct、array、map左侧 RDD[Person]Spark 框架本身不了解 Person 类的内部结构只是将其视为普通对象。右侧 DataFrame提供了详细的结构信息Spark SQL 清楚知道数据集包含哪些列每列的名称和类型是什么。2.1.2 Catalyst 优化器查询计划优化DataFrame 性能高于 RDD 的核心原因优化的执行计划。// 示例代码Join 后 Filterusers.join(events,users(id)events(uid)).filter(events(date)2015-01-01)优化过程逻辑计划先 Join 再 Filter优化计划将 Filter 下推到 Join 下方先对 DataFrame 过滤再 Join 较小的结果集智能数据源优化将 Filter 直接推入数据源如SELECT * FROM events WHERE date ...这种**谓词下推Predicate Pushdown**是 Spark SQL 性能提升的关键。2.1.3 性能对比从图中可以明显看出RDD groupByKey红色性能最差执行时间最高RDD reduceByKey蓝色性能中等DataFrame黄色性能最优执行时间最低2.2 DataSet 是什么DataSet 是 Spark 1.6 中添加的新抽象是DataFrame 的扩展。2.2.1 DataSet 的核心特性强类型如DataSet[Car]、DataSet[Person]RDD 的优势强类型检查、强大的 Lambda 函数能力Spark SQL 的优势优化执行引擎样例类映射用样例类定义数据结构属性名称直接映射到字段名2.2.2 DataFrame 与 DataSet 的关系typeDataFrameDataset[Row]DataFrame 是 DataSet 的特例每一行类型为Row。可以通过as方法将 DataFrame 转换为 DataSetcaseclassUser(name:String,age:Int)valdf:DataFrame...valds:Dataset[User]df.as[User]2.3 RDD、DataFrame、DataSet 三者关系2.3.1 版本演进Spark 1.0 RDDSpark 1.3 DataFrameSpark 1.6 DataSet2.3.2 三者的共性都是 Spark 平台下的分布式弹性数据集都有惰性机制Transformation 不会立即执行都有共同的函数filter、排序等都需要import spark.implicits._都会根据内存情况自动缓存运算都有partition概念2.3.3 三者的区别维度RDDDataFrameDataSet类型安全是否Row 类型是语法错误运行时编译时编译时序列化Java 序列化Tungsten 二进制Tungsten 二进制GC 性能差创建大量对象好好与 Spark MLlib同时使用一般不同时一般不同时Spark SQL 操作不支持支持支持2.3.4 互相转换// RDD - DataFramevaldfrdd.toDF(name,age)// RDD - DataSetcaseclassUser(name:String,age:Int)valdsrdd.map(tUser(t._1,t._2)).toDS// DataFrame - RDDvalrdddf.rdd// RDD[Row]// DataFrame - DataSetvaldsdf.as[User]// DataSet - RDDvalrddds.rdd// RDD[User]// DataSet - DataFramevaldfds.toDF三、Spark SQL 核心编程3.1 新的起点SparkSessionSpark Core 中需要构建SparkContext而 Spark SQL 提供了更高级的入口SparkSession。SparkSession 实质上是SQLContext 和 HiveContext 的组合内部封装了 SparkContext。importorg.apache.spark.sql.SparkSessionvalspark:SparkSessionSparkSession.builder().appName(SparkSQLDemo).master(local[*]).getOrCreate()// Spark 2.x 中spark-shell 会自动创建名为 spark 的 SparkSession启动 spark-shell 后你会看到Spark session available as spark.3.2 创建 DataFrame有三种方式3.2.1 从 Spark 数据源创建// 查看支持的数据源格式spark.read.// csv format jdbc json load option options orc parquet schema table text textFile// 读取 JSON 文件valdfspark.read.json(data/user.json)df.show()注意从文件中读取的数字不能确定是什么类型所以用bigint接收。3.2.2 从 RDD 转换// 方式1通过样例类转换caseclassUser(name:String,age:Int)valrddsc.makeRDD(List((zhangsan,30),(lisi,40)))valdfrdd.map(tUser(t._1,t._2)).toDF// 方式2指定 Schema 转换importorg.apache.spark.sql.types._valschemaStructType(Array(StructField(name,StringType),StructField(age,IntegerType)))valrowRDDrdd.map(tRow(t._1,t._2))valdfspark.createDataFrame(rowRDD,schema)3.2.3 从 Hive Table 查询// 需要配置 Hive 支持valsparkSparkSession.builder().appName(HiveDemo).config(spark.sql.warehouse.dir,/user/hive/warehouse).enableHiveSupport().getOrCreate()spark.sql(SELECT * FROM default.user_table).show()3.3 SQL 语法风格// 1. 创建 DataFramevaldfspark.read.json(data/user.json)// 2. 创建临时视图Session 级别df.createOrReplaceTempView(people)// 3. 使用 SQL 查询valsqlDFspark.sql(SELECT * FROM people WHERE age 20)sqlDF.show()// 4. 创建全局临时视图应用级别df.createGlobalTempView(people_global)spark.sql(SELECT * FROM global_temp.people_global).show()3.4 DSL 语法风格DataFrame 提供了一套领域特定语言DSL无需创建临时视图。valdfspark.read.json(data/user.json)// 查看 Schemadf.printSchema()// 选择列df.select(username).show()// 选择列并计算df.select($username,$age1asnewage).show()// 过滤df.filter($age30).show()// 分组聚合df.groupBy(age).count().show()3.5 DataFrame 与 RDD 互转// DataFrame - RDDvaldfsc.makeRDD(List((zhangsan,30),(lisi,40))).map(tUser(t._1,t._2)).toDFvalrdddf.rdd// RDD[Row]valarrayrdd.collect// array: Array[Row] Array([zhangsan,30], [lisi,40])// 获取字段值array(0).getAs[String](name)// zhangsanarray(0)(0)// zhangsan3.6 DataSet 编程3.6.1 创建 DataSet// 方式1使用样例类序列caseclassPerson(name:String,age:Long)valcaseClassDSSeq(Person(zhangsan,2)).toDS()// 方式2使用基本类型序列valdsSeq(1,2,3,4,5).toDS// 方式3从 RDD 转换valdssc.makeRDD(List((zhangsan,30),(lisi,49))).map(tUser(t._1,t._2)).toDS3.6.2 DataSet - RDDvalrddds.rdd// RDD[User]rdd.collect// Array[User] Array(User(zhangsan,30), User(lisi,49))四、用户自定义函数UDF/UDAF4.1 UDF用户自定义函数// 1. 创建 DataFramevaldfspark.read.json(data/user.json)// 2. 注册 UDFspark.udf.register(addName,(x:String)Name: x)// 3. 创建临时表df.createOrReplaceTempView(people)// 4. 应用 UDFspark.sql(SELECT addName(name), age FROM people).show()4.2 UDAF用户自定义聚合函数Spark 3.0 后推荐使用Aggregator强类型方式// 输入数据类型caseclassUser01(username:String,age:Long)// 缓存类型caseclassAgeBuffer(varsum:Long,varcount:Long)// 定义聚合函数classMyAverageUDAFextendsAggregator[User01,AgeBuffer,Double]{overridedefzero:AgeBufferAgeBuffer(0L,0L)overridedefreduce(b:AgeBuffer,a:User01):AgeBuffer{b.suma.age b.count1b}overridedefmerge(b1:AgeBuffer,b2:AgeBuffer):AgeBuffer{b1.sumb2.sum b1.countb2.count b1}overridedeffinish(reduction:AgeBuffer):Double{reduction.sum.toDouble/reduction.count}overridedefbufferEncoder:Encoder[AgeBuffer]Encoders.productoverridedefoutputEncoder:Encoder[Double]Encoders.scalaDouble}// 使用valdsspark.read.json(data/user.json).as[User01]valmyAvgnewMyAverageUDAFvalcolmyAvg.toColumn ds.select(col).show()五、数据加载与保存5.1 通用方式// 加载数据valdfspark.read.format(json).load(data/user.json)// 保存数据df.write.format(parquet).save(output/)// SaveMode 选项df.write.mode(SaveMode.Append).save(output/)// ErrorIfExists(default) / Append / Overwrite / Ignore5.2 常用数据源格式读取保存说明Parquetspark.read.load()df.write.save()默认格式列式存储JSONspark.read.json()df.write.json()每行一个 JSON 对象CSVspark.read.format(csv).option(header, true)df.write.csv()需指定分隔符、表头JDBCspark.read.jdbc(url, table, props)df.write.jdbc(url, table, props)需 JDBC 驱动5.3 MySQL 读写示例importjava.util.Properties// 读取valpropsnewProperties()props.setProperty(user,root)props.setProperty(password,123123)valdfspark.read.jdbc(jdbc:mysql://localhost:3306/spark-sql,user,props)// 写入valrddspark.sparkContext.makeRDD(List(User2(lisi,20),User2(zs,30)))valdsrdd.toDS ds.write.mode(SaveMode.Append).jdbc(jdbc:mysql://localhost:3306/spark-sql,user,props)5.4 Hive 集成// 1. 添加依赖// spark-hive_2.12, hive-exec, mysql-connector-java// 2. 拷贝 hive-site.xml 到 resources// 3. 代码实现valsparkSparkSession.builder().appName(HiveDemo).config(spark.sql.warehouse.dir,hdfs://localhost:8020/user/hive/warehouse).enableHiveSupport().getOrCreate()// 执行 Hive SQLspark.sql(SHOW TABLES).show()spark.sql(CREATE TABLE aa (id INT))spark.sql(LOAD DATA LOCAL INPATH input/ids.txt INTO TABLE aa)spark.sql(SELECT * FROM aa).show()六、实战项目各区域热门商品 Top36.1 数据准备-- 用户行为表CREATETABLEuser_visit_action(dateSTRING,user_idBIGINT,session_id STRING,page_idBIGINT,action_time STRING,search_keyword STRING,click_category_idBIGINT,click_product_idBIGINT,order_category_ids STRING,order_product_ids STRING,pay_category_ids STRING,pay_product_ids STRING,city_idBIGINT)ROWFORMAT DELIMITEDFIELDSTERMINATEDBY\t;-- 产品表CREATETABLEproduct_info(product_idBIGINT,product_name STRING,extend_info STRING)ROWFORMAT DELIMITEDFIELDSTERMINATEDBY\t;-- 城市表CREATETABLEcity_info(city_idBIGINT,city_name STRING,area STRING)ROWFORMAT DELIMITEDFIELDSTERMINATEDBY\t;6.2 需求分析计算各个区域前三大热门商品按点击量并备注每个商品在主要城市中的分布比例。输出示例地区商品名称点击次数城市备注华北商品A100000北京21.2%天津13.2%其他65.6%华北商品P80200北京63.0%太原10%其他27.0%6.3 实现思路连接三张表获取完整数据仅点击记录按地区和商品名称分组统计点击次数每个地区内按点击次数降序排列取 Top3自定义 UDAF 函数实现城市备注// 核心 SQL 逻辑spark.sql(SELECT area, product_name, click_count, city_remark(city_name) as city_remark FROM (SELECT area, product_name, count(*) as click_count, collect_list(city_name) as city_list, ROW_NUMBER() OVER (PARTITION BY area ORDER BY count(*) DESC) as rk FROM user_visit_action t1 JOIN city_info t2 ON t1.city_id t2.city_id JOIN product_info t3 ON t1.click_product_id t3.product_id WHERE click_product_id IS NOT NULL GROUP BY area, product_name) t WHERE rk 3).show()七、总结特性说明核心优势Catalyst 优化器、Tungsten 执行引擎、统一数据访问编程抽象DataFrame弱类型、DataSet强类型入口对象SparkSession封装了 SparkContext查询方式SQL 语法、DSL 语法扩展能力UDF、UDAFAggregator数据源JSON、Parquet、CSV、JDBC、Hive 等Spark SQL 极大地简化了大数据处理的开发复杂度同时通过底层优化保证了执行效率。在实际工作中DataSet/DataFrame 已基本取代 RDD 成为首选 API掌握 Spark SQL 是每一位大数据工程师的必修课。参考文档Apache Spark 官方文档、尚硅谷大数据技术之 SparkSQL V3.0Spark 版本3.0推荐环境Scala 2.12 / Java 8