用 PySpark 深度解析 MovieLens 1M 数据集20 个实战分析任务全解当海量电影评分数据遇上分布式计算引擎会碰撞出怎样的火花MovieLens 1M 数据集作为推荐系统领域的经典数据集包含了百万量级的用户评分记录。本文将带你用 PySpark 这个强大的工具从数据加载到复杂分析一步步揭开电影评分数据背后的秘密。1. 环境准备与数据加载虽然本文聚焦实战分析但确保 PySpark 环境正常运行是前提。假设你已经配置好以下环境Java 8/11/17Python 3.8PySpark 3.3.0让我们首先加载数据集。MovieLens 1M 包含三个主要文件使用特殊的::分隔符from pyspark.sql import SparkSession from pyspark.sql.types import * # 初始化Spark会话 spark SparkSession.builder \ .appName(MovieLensAnalysis) \ .getOrCreate() # 定义数据结构 ratings_schema StructType([ StructField(user_id, IntegerType()), StructField(movie_id, IntegerType()), StructField(rating, FloatType()), StructField(timestamp, LongType()) ]) movies_schema StructType([ StructField(movie_id, IntegerType()), StructField(title, StringType()), StructField(genres, StringType()) ]) users_schema StructType([ StructField(user_id, IntegerType()), StructField(gender, StringType()), StructField(age, IntegerType()), StructField(occupation, IntegerType()), StructField(zipcode, StringType()) ]) # 加载数据 ratings spark.read.csv( path/to/ratings.dat, sep::, schemaratings_schema ) movies spark.read.csv( path/to/movies.dat, sep::, schemamovies_schema ) users spark.read.csv( path/to/users.dat, sep::, schemausers_schema )提示实际路径需替换为你的本地文件路径。如果遇到编码问题可尝试指定encodingISO-8859-1参数。2. 基础统计分析2.1 数据集概览首先让我们对数据集有个整体认识print(f评分记录数: {ratings.count():,}) print(f电影数量: {movies.count():,}) print(f用户数量: {users.count():,}) # 计算数据时间跨度 from pyspark.sql.functions import min, max, to_date, from_unixtime ratings.select( to_date(from_unixtime(min(timestamp))).alias(最早评分日期), to_date(from_unixtime(max(timestamp))).alias(最新评分日期) ).show()输出结果示例评分记录数: 1,000,209 电影数量: 3,883 用户数量: 6,040 最早评分日期: 2000-04-26 最新评分日期: 2003-02-282.2 用户画像分析了解用户群体特征对推荐系统至关重要# 性别分布 users.groupBy(gender).count().show() # 年龄分布 age_groups { 1: Under 18, 18: 18-24, 25: 25-34, 35: 35-44, 45: 45-49, 50: 50-55, 56: 56 } age_expr CASE \ .join([fWHEN age {k} THEN {v} for k, v in age_groups.items()]) \ END AS age_group users.selectExpr(gender, age_expr).groupBy(gender, age_group).count().orderBy(gender, age_group).show()2.3 评分分布分析评分是推荐系统的核心了解其分布特点很有必要from pyspark.sql.functions import count, col # 整体评分分布 ratings.groupBy(rating).agg(count(*).alias(count)).orderBy(rating).show() # 按性别分析评分分布 ratings.join(users, user_id).groupBy(gender, rating).agg(count(*).alias(count)).orderBy(gender, rating).show()3. 电影特征分析3.1 电影类型统计MovieLens 数据集中的电影类型是用|分隔的多标签格式我们需要先进行拆分from pyspark.sql.functions import explode, split # 拆分电影类型 movie_genres movies.withColumn(genre, explode(split(col(genres), \\|))) # 统计各类型电影数量 genre_counts movie_genres.groupBy(genre).count().orderBy(count, ascendingFalse) genre_counts.show()3.2 年度电影产量分析从电影标题中提取年份信息分析电影产量随时间变化from pyspark.sql.functions import regexp_extract # 从标题中提取年份 movies_with_year movies.withColumn(year, regexp_extract(col(title), \((\d{4})\), 1).cast(int)) # 按年份统计电影数量 movies_per_year movies_with_year.groupBy(year).count().orderBy(year) movies_per_year.show()4. 高级分析任务4.1 最受欢迎电影排行结合评分次数和平均评分找出最受欢迎的电影from pyspark.sql.functions import avg, desc movie_ratings ratings.groupBy(movie_id).agg( count(*).alias(rating_count), avg(rating).alias(avg_rating) ).filter(col(rating_count) 100) # 只考虑评分次数超过100的电影 top_movies movie_ratings.join(movies, movie_id).orderBy(desc(avg_rating), desc(rating_count)).limit(20) top_movies.show(truncateFalse)4.2 用户评分行为分析分析用户的活跃程度和评分倾向user_activity ratings.groupBy(user_id).agg( count(*).alias(rating_count), avg(rating).alias(avg_rating), min(rating).alias(min_rating), max(rating).alias(max_rating) ) # 找出最活跃的用户 user_activity.orderBy(desc(rating_count)).limit(10).show() # 用户评分分布 user_activity.select(rating_count).summary().show()4.3 基于人口统计的评分差异分析不同性别、年龄组的评分偏好# 按性别和年龄组分析 demographic_ratings ratings.join(users, user_id) \ .selectExpr(*, age_expr) \ .groupBy(gender, age_group) \ .agg( avg(rating).alias(avg_rating), count(*).alias(rating_count) ).orderBy(gender, age_group) demographic_ratings.show()5. 复杂分析任务5.1 不同类型电影随时间的变化趋势分析各类型电影在不同时期的平均评分变化# 创建包含年份和类型的完整数据集 movies_with_details movies_with_year.join(movie_genres, movie_id) # 计算各类型各年份的平均评分 genre_year_ratings ratings.join(movies_with_details, movie_id) \ .groupBy(year, genre) \ .agg( avg(rating).alias(avg_rating), count(*).alias(rating_count) ).filter(col(rating_count) 50) \ .orderBy(year, genre) genre_year_ratings.show()5.2 找出评分差异最大的电影有些电影评价两极分化找出这些争议性电影from pyspark.sql.functions import stddev rating_variance ratings.groupBy(movie_id).agg( avg(rating).alias(avg_rating), stddev(rating).alias(rating_stddev), count(*).alias(rating_count) ).filter(col(rating_count) 100) controversial_movies rating_variance.join(movies, movie_id) \ .orderBy(desc(rating_stddev)) \ .limit(20) controversial_movies.show()5.3 职业与电影类型的关联分析不同职业的人可能偏好不同类型的电影# 加载职业数据 occupations spark.createDataFrame([ (0, other), (1, academic), (2, artist), (3, clerical), (4, college), (5, customer), (6, doctor), (7, executive), (8, farmer), (9, homemaker), (10, K-12), (11, lawyer), (12, programmer), (13, retired), (14, sales), (15, scientist), (16, self-employed), (17, technician), (18, tradesman), (19, unemployed), (20, writer) ], [occupation_id, occupation]) # 分析各职业最喜爱的电影类型 occupation_genre_pref ratings.join(users, user_id) \ .join(movie_genres, movie_id) \ .join(occupations, users.occupation occupations.occupation_id) \ .groupBy(occupation, genre) \ .agg( avg(rating).alias(avg_rating), count(*).alias(rating_count) ).filter(col(rating_count) 50) \ .orderBy(occupation, desc(avg_rating)) # 查看程序员最喜欢的电影类型 occupation_genre_pref.filter(col(occupation) programmer).show()6. 窗口函数应用6.1 计算每部电影在同类型中的排名使用窗口函数可以方便地进行组内比较from pyspark.sql.window import Window from pyspark.sql.functions import dense_rank window_spec Window.partitionBy(genre).orderBy(desc(avg_rating)) genre_rankings movie_genres.join( ratings.groupBy(movie_id).agg( avg(rating).alias(avg_rating), count(*).alias(rating_count) ), movie_id ).filter(col(rating_count) 50) \ .withColumn(rank_in_genre, dense_rank().over(window_spec)) \ .filter(col(rank_in_genre) 5) \ .join(movies, movie_id) \ .orderBy(genre, rank_in_genre) genre_rankings.show()6.2 用户评分时间序列分析分析用户的评分行为随时间的变化from pyspark.sql.functions import lag, datediff from pyspark.sql.window import Window user_window Window.partitionBy(user_id).orderBy(timestamp) user_rating_sequence ratings.withColumn( days_since_last_rating, datediff( from_unixtime(timestamp), from_unixtime(lag(timestamp, 1).over(user_window)) ) ) # 计算用户评分间隔统计 user_rating_intervals user_rating_sequence.groupBy(user_id).agg( avg(days_since_last_rating).alias(avg_days_between_ratings), stddev(days_since_last_rating).alias(stddev_days_between_ratings), count(*).alias(rating_count) ).filter(col(rating_count) 10) user_rating_intervals.orderBy(desc(avg_days_between_ratings)).show()7. 数据可视化准备虽然 PySpark 本身不提供可视化功能但我们可以将聚合结果转换为 Pandas DataFrame 进行可视化# 将电影类型统计转换为Pandas DataFrame genre_counts_pd genre_counts.toPandas() # 将用户评分分布转换为Pandas DataFrame rating_dist_pd ratings.groupBy(rating).count().orderBy(rating).toPandas() # 将年度电影产量转换为Pandas DataFrame movies_per_year_pd movies_per_year.filter(col(year).isNotNull()).orderBy(year).toPandas()这些数据可以方便地使用 Matplotlib 或 Seaborn 进行可视化import matplotlib.pyplot as plt import seaborn as sns plt.figure(figsize(12, 6)) sns.barplot(xgenre, ycount, datagenre_counts_pd) plt.xticks(rotation90) plt.title(Movie Count by Genre) plt.show()8. 性能优化技巧处理大规模数据时性能优化至关重要8.1 缓存常用数据集# 缓存频繁使用的DataFrame ratings.cache() movies.cache() users.cache() # 检查缓存状态 print(ratings.is_cached) # True8.2 合理设置分区# 重新分区以提高并行度 ratings ratings.repartition(100) # 按用户ID分区优化用户相关分析 ratings_by_user ratings.repartition(100, user_id)8.3 广播小数据集# 广播小的维度表 from pyspark.sql.functions import broadcast movies_broadcast broadcast(movies) ratings.join(movies_broadcast, movie_id).explain()9. 数据质量检查在进行深入分析前检查数据质量很重要9.1 缺失值检查from pyspark.sql.functions import isnull, count, when # 检查ratings表的缺失值 ratings.select([count(when(isnull(c), c)).alias(c) for c in ratings.columns]).show() # 检查movies表的缺失值 movies.select([count(when(isnull(c), c)).alias(c) for c in movies.columns]).show()9.2 异常值检测# 检查评分范围是否合理 ratings.select(rating).summary().show() # 检查时间戳是否在合理范围内 ratings.select( from_unixtime(min(timestamp)).alias(min_date), from_unixtime(max(timestamp)).alias(max_date) ).show()10. 完整项目结构建议对于实际项目建议采用以下结构组织代码movielens-analysis/ ├── config/ # 配置文件 │ └── settings.py # 项目配置 ├── data/ # 数据文件 │ ├── raw/ # 原始数据 │ └── processed/ # 处理后的数据 ├── notebooks/ # Jupyter笔记本 ├── src/ # 源代码 │ ├── etl/ # 数据加载和转换 │ ├── analysis/ # 分析任务 │ ├── utils/ # 工具函数 │ └── main.py # 主程序 ├── tests/ # 测试代码 └── requirements.txt # 依赖项这种结构便于维护和扩展特别是当分析任务变得复杂时。11. 常见问题解决方案在实际操作中可能会遇到以下问题11.1 内存不足错误# 调整Spark配置 spark SparkSession.builder \ .appName(MovieLensAnalysis) \ .config(spark.driver.memory, 8g) \ .config(spark.executor.memory, 4g) \ .getOrCreate()11.2 数据倾斜问题# 处理数据倾斜 from pyspark.sql.functions import rand # 添加随机前缀解决join倾斜 ratings_with_prefix ratings.withColumn(prefix, (rand() * 10).cast(int)) movies_with_prefix movies.withColumn(prefix, (rand() * 10).cast(int)) # 使用前缀进行join result ratings_with_prefix.join( movies_with_prefix, (ratings_with_prefix.movie_id movies_with_prefix.movie_id) (ratings_with_prefix.prefix movies_with_prefix.prefix) )12. 扩展分析思路除了上述分析还可以考虑以下方向12.1 基于内容的推荐利用电影类型信息构建内容相似度from pyspark.ml.feature import CountVectorizer from pyspark.ml.linalg import Vectors from pyspark.sql.functions import collect_list # 将电影类型转换为特征向量 genre_lists movie_genres.groupBy(movie_id).agg(collect_list(genre).alias(genres)) vectorizer CountVectorizer(inputColgenres, outputColfeatures) model vectorizer.fit(genre_lists) genre_features model.transform(genre_lists)12.2 时间序列预测预测未来评分趋势from pyspark.sql.functions import month, year # 按月统计评分数量 ratings_by_month ratings.groupBy( year(from_unixtime(timestamp)).alias(year), month(from_unixtime(timestamp)).alias(month) ).count().orderBy(year, month) ratings_by_month.show()13. 项目部署建议完成分析后可以考虑以下部署方式13.1 定期运行脚本使用 cron 或 Airflow 设置定期任务# 示例cron任务每天凌晨运行 0 0 * * * /path/to/spark-submit /path/to/movielens_analysis.py13.2 构建Dash应用使用 Plotly Dash 或 Streamlit 构建交互式仪表盘import streamlit as st import pandas as pd import plotly.express as px # 加载数据 genre_counts pd.read_csv(genre_counts.csv) # 创建交互式图表 st.title(MovieLens 数据分析) fig px.bar(genre_counts, xgenre, ycount) st.plotly_chart(fig)14. 进一步学习资源要深入掌握 PySpark 和数据分析推荐以下资源官方文档PySpark API 文档书籍《Learning Spark, 2nd Edition》在线课程Databricks 提供的 Spark 课程社区Stack Overflow 的 PySpark 标签15. 实际应用建议将分析结果转化为实际业务价值用户分群根据评分行为将用户分组实施差异化推荐策略内容优化根据受欢迎的电影类型指导内容采购异常检测识别异常评分行为防止刷分作弊趋势预测预测未来热门电影类型提前布局通过这20个分析任务我们全面探索了MovieLens数据集从基础统计到复杂分析展示了PySpark强大的数据处理能力。实际项目中可以根据具体需求选择适合的分析方向并进一步优化代码性能和结果展示方式。
用 PySpark 分析 MovieLens 1M 数据集:从数据加载到20个经典分析任务实战(含完整代码)
发布时间:2026/5/25 17:22:35
用 PySpark 深度解析 MovieLens 1M 数据集20 个实战分析任务全解当海量电影评分数据遇上分布式计算引擎会碰撞出怎样的火花MovieLens 1M 数据集作为推荐系统领域的经典数据集包含了百万量级的用户评分记录。本文将带你用 PySpark 这个强大的工具从数据加载到复杂分析一步步揭开电影评分数据背后的秘密。1. 环境准备与数据加载虽然本文聚焦实战分析但确保 PySpark 环境正常运行是前提。假设你已经配置好以下环境Java 8/11/17Python 3.8PySpark 3.3.0让我们首先加载数据集。MovieLens 1M 包含三个主要文件使用特殊的::分隔符from pyspark.sql import SparkSession from pyspark.sql.types import * # 初始化Spark会话 spark SparkSession.builder \ .appName(MovieLensAnalysis) \ .getOrCreate() # 定义数据结构 ratings_schema StructType([ StructField(user_id, IntegerType()), StructField(movie_id, IntegerType()), StructField(rating, FloatType()), StructField(timestamp, LongType()) ]) movies_schema StructType([ StructField(movie_id, IntegerType()), StructField(title, StringType()), StructField(genres, StringType()) ]) users_schema StructType([ StructField(user_id, IntegerType()), StructField(gender, StringType()), StructField(age, IntegerType()), StructField(occupation, IntegerType()), StructField(zipcode, StringType()) ]) # 加载数据 ratings spark.read.csv( path/to/ratings.dat, sep::, schemaratings_schema ) movies spark.read.csv( path/to/movies.dat, sep::, schemamovies_schema ) users spark.read.csv( path/to/users.dat, sep::, schemausers_schema )提示实际路径需替换为你的本地文件路径。如果遇到编码问题可尝试指定encodingISO-8859-1参数。2. 基础统计分析2.1 数据集概览首先让我们对数据集有个整体认识print(f评分记录数: {ratings.count():,}) print(f电影数量: {movies.count():,}) print(f用户数量: {users.count():,}) # 计算数据时间跨度 from pyspark.sql.functions import min, max, to_date, from_unixtime ratings.select( to_date(from_unixtime(min(timestamp))).alias(最早评分日期), to_date(from_unixtime(max(timestamp))).alias(最新评分日期) ).show()输出结果示例评分记录数: 1,000,209 电影数量: 3,883 用户数量: 6,040 最早评分日期: 2000-04-26 最新评分日期: 2003-02-282.2 用户画像分析了解用户群体特征对推荐系统至关重要# 性别分布 users.groupBy(gender).count().show() # 年龄分布 age_groups { 1: Under 18, 18: 18-24, 25: 25-34, 35: 35-44, 45: 45-49, 50: 50-55, 56: 56 } age_expr CASE \ .join([fWHEN age {k} THEN {v} for k, v in age_groups.items()]) \ END AS age_group users.selectExpr(gender, age_expr).groupBy(gender, age_group).count().orderBy(gender, age_group).show()2.3 评分分布分析评分是推荐系统的核心了解其分布特点很有必要from pyspark.sql.functions import count, col # 整体评分分布 ratings.groupBy(rating).agg(count(*).alias(count)).orderBy(rating).show() # 按性别分析评分分布 ratings.join(users, user_id).groupBy(gender, rating).agg(count(*).alias(count)).orderBy(gender, rating).show()3. 电影特征分析3.1 电影类型统计MovieLens 数据集中的电影类型是用|分隔的多标签格式我们需要先进行拆分from pyspark.sql.functions import explode, split # 拆分电影类型 movie_genres movies.withColumn(genre, explode(split(col(genres), \\|))) # 统计各类型电影数量 genre_counts movie_genres.groupBy(genre).count().orderBy(count, ascendingFalse) genre_counts.show()3.2 年度电影产量分析从电影标题中提取年份信息分析电影产量随时间变化from pyspark.sql.functions import regexp_extract # 从标题中提取年份 movies_with_year movies.withColumn(year, regexp_extract(col(title), \((\d{4})\), 1).cast(int)) # 按年份统计电影数量 movies_per_year movies_with_year.groupBy(year).count().orderBy(year) movies_per_year.show()4. 高级分析任务4.1 最受欢迎电影排行结合评分次数和平均评分找出最受欢迎的电影from pyspark.sql.functions import avg, desc movie_ratings ratings.groupBy(movie_id).agg( count(*).alias(rating_count), avg(rating).alias(avg_rating) ).filter(col(rating_count) 100) # 只考虑评分次数超过100的电影 top_movies movie_ratings.join(movies, movie_id).orderBy(desc(avg_rating), desc(rating_count)).limit(20) top_movies.show(truncateFalse)4.2 用户评分行为分析分析用户的活跃程度和评分倾向user_activity ratings.groupBy(user_id).agg( count(*).alias(rating_count), avg(rating).alias(avg_rating), min(rating).alias(min_rating), max(rating).alias(max_rating) ) # 找出最活跃的用户 user_activity.orderBy(desc(rating_count)).limit(10).show() # 用户评分分布 user_activity.select(rating_count).summary().show()4.3 基于人口统计的评分差异分析不同性别、年龄组的评分偏好# 按性别和年龄组分析 demographic_ratings ratings.join(users, user_id) \ .selectExpr(*, age_expr) \ .groupBy(gender, age_group) \ .agg( avg(rating).alias(avg_rating), count(*).alias(rating_count) ).orderBy(gender, age_group) demographic_ratings.show()5. 复杂分析任务5.1 不同类型电影随时间的变化趋势分析各类型电影在不同时期的平均评分变化# 创建包含年份和类型的完整数据集 movies_with_details movies_with_year.join(movie_genres, movie_id) # 计算各类型各年份的平均评分 genre_year_ratings ratings.join(movies_with_details, movie_id) \ .groupBy(year, genre) \ .agg( avg(rating).alias(avg_rating), count(*).alias(rating_count) ).filter(col(rating_count) 50) \ .orderBy(year, genre) genre_year_ratings.show()5.2 找出评分差异最大的电影有些电影评价两极分化找出这些争议性电影from pyspark.sql.functions import stddev rating_variance ratings.groupBy(movie_id).agg( avg(rating).alias(avg_rating), stddev(rating).alias(rating_stddev), count(*).alias(rating_count) ).filter(col(rating_count) 100) controversial_movies rating_variance.join(movies, movie_id) \ .orderBy(desc(rating_stddev)) \ .limit(20) controversial_movies.show()5.3 职业与电影类型的关联分析不同职业的人可能偏好不同类型的电影# 加载职业数据 occupations spark.createDataFrame([ (0, other), (1, academic), (2, artist), (3, clerical), (4, college), (5, customer), (6, doctor), (7, executive), (8, farmer), (9, homemaker), (10, K-12), (11, lawyer), (12, programmer), (13, retired), (14, sales), (15, scientist), (16, self-employed), (17, technician), (18, tradesman), (19, unemployed), (20, writer) ], [occupation_id, occupation]) # 分析各职业最喜爱的电影类型 occupation_genre_pref ratings.join(users, user_id) \ .join(movie_genres, movie_id) \ .join(occupations, users.occupation occupations.occupation_id) \ .groupBy(occupation, genre) \ .agg( avg(rating).alias(avg_rating), count(*).alias(rating_count) ).filter(col(rating_count) 50) \ .orderBy(occupation, desc(avg_rating)) # 查看程序员最喜欢的电影类型 occupation_genre_pref.filter(col(occupation) programmer).show()6. 窗口函数应用6.1 计算每部电影在同类型中的排名使用窗口函数可以方便地进行组内比较from pyspark.sql.window import Window from pyspark.sql.functions import dense_rank window_spec Window.partitionBy(genre).orderBy(desc(avg_rating)) genre_rankings movie_genres.join( ratings.groupBy(movie_id).agg( avg(rating).alias(avg_rating), count(*).alias(rating_count) ), movie_id ).filter(col(rating_count) 50) \ .withColumn(rank_in_genre, dense_rank().over(window_spec)) \ .filter(col(rank_in_genre) 5) \ .join(movies, movie_id) \ .orderBy(genre, rank_in_genre) genre_rankings.show()6.2 用户评分时间序列分析分析用户的评分行为随时间的变化from pyspark.sql.functions import lag, datediff from pyspark.sql.window import Window user_window Window.partitionBy(user_id).orderBy(timestamp) user_rating_sequence ratings.withColumn( days_since_last_rating, datediff( from_unixtime(timestamp), from_unixtime(lag(timestamp, 1).over(user_window)) ) ) # 计算用户评分间隔统计 user_rating_intervals user_rating_sequence.groupBy(user_id).agg( avg(days_since_last_rating).alias(avg_days_between_ratings), stddev(days_since_last_rating).alias(stddev_days_between_ratings), count(*).alias(rating_count) ).filter(col(rating_count) 10) user_rating_intervals.orderBy(desc(avg_days_between_ratings)).show()7. 数据可视化准备虽然 PySpark 本身不提供可视化功能但我们可以将聚合结果转换为 Pandas DataFrame 进行可视化# 将电影类型统计转换为Pandas DataFrame genre_counts_pd genre_counts.toPandas() # 将用户评分分布转换为Pandas DataFrame rating_dist_pd ratings.groupBy(rating).count().orderBy(rating).toPandas() # 将年度电影产量转换为Pandas DataFrame movies_per_year_pd movies_per_year.filter(col(year).isNotNull()).orderBy(year).toPandas()这些数据可以方便地使用 Matplotlib 或 Seaborn 进行可视化import matplotlib.pyplot as plt import seaborn as sns plt.figure(figsize(12, 6)) sns.barplot(xgenre, ycount, datagenre_counts_pd) plt.xticks(rotation90) plt.title(Movie Count by Genre) plt.show()8. 性能优化技巧处理大规模数据时性能优化至关重要8.1 缓存常用数据集# 缓存频繁使用的DataFrame ratings.cache() movies.cache() users.cache() # 检查缓存状态 print(ratings.is_cached) # True8.2 合理设置分区# 重新分区以提高并行度 ratings ratings.repartition(100) # 按用户ID分区优化用户相关分析 ratings_by_user ratings.repartition(100, user_id)8.3 广播小数据集# 广播小的维度表 from pyspark.sql.functions import broadcast movies_broadcast broadcast(movies) ratings.join(movies_broadcast, movie_id).explain()9. 数据质量检查在进行深入分析前检查数据质量很重要9.1 缺失值检查from pyspark.sql.functions import isnull, count, when # 检查ratings表的缺失值 ratings.select([count(when(isnull(c), c)).alias(c) for c in ratings.columns]).show() # 检查movies表的缺失值 movies.select([count(when(isnull(c), c)).alias(c) for c in movies.columns]).show()9.2 异常值检测# 检查评分范围是否合理 ratings.select(rating).summary().show() # 检查时间戳是否在合理范围内 ratings.select( from_unixtime(min(timestamp)).alias(min_date), from_unixtime(max(timestamp)).alias(max_date) ).show()10. 完整项目结构建议对于实际项目建议采用以下结构组织代码movielens-analysis/ ├── config/ # 配置文件 │ └── settings.py # 项目配置 ├── data/ # 数据文件 │ ├── raw/ # 原始数据 │ └── processed/ # 处理后的数据 ├── notebooks/ # Jupyter笔记本 ├── src/ # 源代码 │ ├── etl/ # 数据加载和转换 │ ├── analysis/ # 分析任务 │ ├── utils/ # 工具函数 │ └── main.py # 主程序 ├── tests/ # 测试代码 └── requirements.txt # 依赖项这种结构便于维护和扩展特别是当分析任务变得复杂时。11. 常见问题解决方案在实际操作中可能会遇到以下问题11.1 内存不足错误# 调整Spark配置 spark SparkSession.builder \ .appName(MovieLensAnalysis) \ .config(spark.driver.memory, 8g) \ .config(spark.executor.memory, 4g) \ .getOrCreate()11.2 数据倾斜问题# 处理数据倾斜 from pyspark.sql.functions import rand # 添加随机前缀解决join倾斜 ratings_with_prefix ratings.withColumn(prefix, (rand() * 10).cast(int)) movies_with_prefix movies.withColumn(prefix, (rand() * 10).cast(int)) # 使用前缀进行join result ratings_with_prefix.join( movies_with_prefix, (ratings_with_prefix.movie_id movies_with_prefix.movie_id) (ratings_with_prefix.prefix movies_with_prefix.prefix) )12. 扩展分析思路除了上述分析还可以考虑以下方向12.1 基于内容的推荐利用电影类型信息构建内容相似度from pyspark.ml.feature import CountVectorizer from pyspark.ml.linalg import Vectors from pyspark.sql.functions import collect_list # 将电影类型转换为特征向量 genre_lists movie_genres.groupBy(movie_id).agg(collect_list(genre).alias(genres)) vectorizer CountVectorizer(inputColgenres, outputColfeatures) model vectorizer.fit(genre_lists) genre_features model.transform(genre_lists)12.2 时间序列预测预测未来评分趋势from pyspark.sql.functions import month, year # 按月统计评分数量 ratings_by_month ratings.groupBy( year(from_unixtime(timestamp)).alias(year), month(from_unixtime(timestamp)).alias(month) ).count().orderBy(year, month) ratings_by_month.show()13. 项目部署建议完成分析后可以考虑以下部署方式13.1 定期运行脚本使用 cron 或 Airflow 设置定期任务# 示例cron任务每天凌晨运行 0 0 * * * /path/to/spark-submit /path/to/movielens_analysis.py13.2 构建Dash应用使用 Plotly Dash 或 Streamlit 构建交互式仪表盘import streamlit as st import pandas as pd import plotly.express as px # 加载数据 genre_counts pd.read_csv(genre_counts.csv) # 创建交互式图表 st.title(MovieLens 数据分析) fig px.bar(genre_counts, xgenre, ycount) st.plotly_chart(fig)14. 进一步学习资源要深入掌握 PySpark 和数据分析推荐以下资源官方文档PySpark API 文档书籍《Learning Spark, 2nd Edition》在线课程Databricks 提供的 Spark 课程社区Stack Overflow 的 PySpark 标签15. 实际应用建议将分析结果转化为实际业务价值用户分群根据评分行为将用户分组实施差异化推荐策略内容优化根据受欢迎的电影类型指导内容采购异常检测识别异常评分行为防止刷分作弊趋势预测预测未来热门电影类型提前布局通过这20个分析任务我们全面探索了MovieLens数据集从基础统计到复杂分析展示了PySpark强大的数据处理能力。实际项目中可以根据具体需求选择适合的分析方向并进一步优化代码性能和结果展示方式。