## 一、大数据处理概述

### 1.1 大数据的特征

大数据具有以下特征（5V）：

- Volume：海量数据规模
- Velocity：数据产生速度快
- Variety：数据类型多样
- Veracity：数据质量参差不齐
- Value：需要从数据中提取价值

### 1.2 大数据处理架构

```text
┌─────────────────────────────────────────────────────────────┐
│                       大数据处理架构                        │
├─────────────────────────────────────────────────────────────┤
│  数据采集层                                                 │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                   │
│  │ 日志采集 │  │  数据库  │  │  传感器  │                   │
│  │ Fluentd  │  │   CDC    │  │   MQTT   │                   │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘                   │
│       │             │             │                         │
├───────┼─────────────┼─────────────┼─────────────────────────┤
│  数据存储层                                                 │
│  ┌─────────────────────────────────────────────────┐        │
│  │            HDFS / S3 / Cloud Storage            │        │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐       │        │
│  │  │ Parquet  │  │   ORC    │  │   Avro   │       │        │
│  │  └──────────┘  └──────────┘  └──────────┘       │        │
│  └─────────────────────────────────────────────────┘        │
├─────────────────────────────────────────────────────────────┤
│  数据处理层                                                 │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                   │
│  │  Spark   │  │  Flink   │  │   Hive   │                   │
│  │  批处理  │  │  流处理  │  │ SQL查询  │                   │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘                   │
│       │             │             │                         │
├───────┼─────────────┼─────────────┼─────────────────────────┤
│  数据分析层                                                 │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                   │
│  │ 机器学习 │  │  可视化  │  │ 报表工具 │                   │
│  │TensorFlow│  │ Superset │  │ Tableau  │                   │
│  └──────────┘  └──────────┘  └──────────┘                   │
└─────────────────────────────────────────────────────────────┘
```

## 二、数据采集与传输

### 2.1 Fluentd日志采集

```python
# Fluentd配置示例
class FluentdConfigGenerator:
    def __init__(self):
        self.config = {
            'source': [],
            'filter': [],
            'match': []
        }

    def add_tail_source(self, name, path, tag):
        self.config['source'].append({
            'type': 'tail',
            'name': name,
            'path': path,
            'tag': tag,
            'pos_file': f'/var/log/fluentd/{name}.pos',
            'read_from_head': True
        })

    def add_kafka_output(self, tag, brokers, topic):
        self.config['match'].append({
            'type': 'kafka',
            'tag': tag,
            'brokers': brokers,
            'default_topic': topic,
            'format': 'json'
        })

    def generate_config(self):
        return self._format_config()

    def _format_config(self):
        lines = []
        for source in self.config['source']:
            lines.append(f'<source>')
            for key, value in source.items():
                lines.append(f'  {key} {value}')
            lines.append(f'</source>')
        return '\n'.join(lines)
```

### 2.2 Kafka数据传输

```python
# Kafka生产者配置
from kafka import KafkaProducer
import json

class DataProducer:
    def __init__(self, bootstrap_servers):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            compression_type='gzip'
        )

    def send_message(self, topic, message):
        future = self.producer.send(topic, value=message)
        return future.get(timeout=10)

    def close(self):
        self.producer.close()
```

## 三、数据存储

### 3.1 HDFS操作

```python
# HDFS操作封装
class HDFSManager:
    def __init__(self, hdfs_url='hdfs://localhost:9000'):
        self.hdfs_url = hdfs_url

    def list_files(self, path):
        from hdfs import InsecureClient
        client = InsecureClient(self.hdfs_url)
        return client.list(path)

    def read_file(self, path):
        from hdfs import InsecureClient
        client = InsecureClient(self.hdfs_url)
        with client.read(path) as f:
            return f.read()

    def write_file(self, path, content):
        from hdfs import InsecureClient
        client = InsecureClient(self.hdfs_url)
        with client.write(path) as f:
            f.write(content)
```

### 3.2 列式存储格式

```python
# Parquet文件读写
import pandas as pd

class ParquetManager:
    def __init__(self):
        pass

    def write_parquet(self, df, path, compression='snappy'):
        df.to_parquet(path, compression=compression)

    def read_parquet(self, path):
        return pd.read_parquet(path)

    def write_partitioned(self, df, base_path, partition_cols):
        df.to_parquet(
            base_path,
            partition_cols=partition_cols,
            compression='snappy'
        )
```

## 四、批处理计算

### 4.1 Spark批处理

```python
# Spark批处理示例
from pyspark.sql import SparkSession

class SparkBatchProcessor:
    def __init__(self, app_name='BatchProcessor'):
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .getOrCreate()

    def read_csv(self, path):
        return self.spark.read.csv(path, header=True, inferSchema=True)

    def read_parquet(self, path):
        return self.spark.read.parquet(path)

    def process_data(self, df):
        # 数据清洗
        cleaned = df.dropna()

        # 数据转换
        transformed = cleaned.withColumn(
            'total_amount',
            cleaned['price'] * cleaned['quantity']
        )

        # 聚合计算
        result = transformed.groupBy('category') \
            .sum('total_amount') \
            .withColumnRenamed('sum(total_amount)', 'category_total')

        return result

    def write_result(self, df, path):
        df.write.parquet(path, mode='overwrite')

    def stop(self):
        self.spark.stop()
```

### 4.2 SQL查询

```python
# Spark SQL示例
class SparkSQLProcessor:
    def __init__(self, spark):
        self.spark = spark

    def register_table(self, df, table_name):
        df.createOrReplaceTempView(table_name)

    def execute_query(self, query):
        return self.spark.sql(query)

    def complex_query(self):
        query = """
            SELECT
                category,
                COUNT(*) as order_count,
                AVG(total_amount) as avg_order_value,
                SUM(total_amount) as total_revenue
            FROM orders
            WHERE order_date >= '2024-01-01'
            GROUP BY category
            HAVING COUNT(*) > 100
            ORDER BY total_revenue DESC
        """
        return self.execute_query(query)
```

## 五、流处理计算

### 5.1 Flink流处理

```python
# Flink流处理示例
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

class FlinkStreamProcessor:
    def __init__(self):
        self.env = StreamExecutionEnvironment.get_execution_environment()
        self.t_env = StreamTableEnvironment.create(self.env)

    def read_kafka_stream(self, topic, brokers):
        source_ddl = f"""
            CREATE TABLE kafka_source (
                user_id STRING,
                event_type STRING,
                timestamp BIGINT
            ) WITH (
                'connector' = 'kafka',
                'topic' = '{topic}',
                'properties.bootstrap.servers' = '{brokers}',
                'format' = 'json'
            )
        """
        self.t_env.execute_sql(source_ddl)
        return self.t_env.from_path('kafka_source')

    def process_stream(self, table):
        result = table \
            .group_by('user_id') \
            .select('user_id, COUNT(event_type) as event_count')
        return result

    def write_sink(self, table, output_topic):
        sink_ddl = f"""
            CREATE TABLE kafka_sink (
                user_id STRING,
                event_count BIGINT
            ) WITH (
                'connector' = 'kafka',
                'topic' = '{output_topic}',
                'properties.bootstrap.servers' = 'localhost:9092',
                'format' = 'json'
            )
        """
        self.t_env.execute_sql(sink_ddl)
        table.execute_insert('kafka_sink').wait()

    def execute(self):
        self.env.execute('Stream Processing Job')
```

### 5.2 窗口计算

```python
# 窗口计算示例
class WindowProcessor:
    def __init__(self, env):
        self.env = env

    def tumbling_window(self, stream):
        return stream \
            .key_by(lambda x: x[0]) \
            .window(TumblingEventTimeWindows.of(Time.seconds(5))) \
            .sum(1)

    def sliding_window(self, stream):
        return stream \
            .key_by(lambda x: x[0]) \
            .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) \
            .reduce(lambda a, b: (a[0], a[1] + b[1]))
```

## 六、数据分析与可视化

### 6.1 Pandas数据分析

```python
# Pandas数据分析示例
import pandas as pd
import numpy as np

class DataAnalyzer:
    def __init__(self):
        pass

    def load_data(self, path):
        return pd.read_parquet(path)

    def descriptive_stats(self, df):
        return df.describe()

    def correlation_analysis(self, df):
        return df.corr()

    def time_series_analysis(self, df, date_col='date'):
        df[date_col] = pd.to_datetime(df[date_col])
        df.set_index(date_col, inplace=True)

        # 按周聚合
        weekly = df.resample('W').sum()

        # 计算移动平均
        df['moving_avg_7d'] = df['revenue'].rolling(window=7).mean()

        return weekly, df

    def cohort_analysis(self, df):
        # 计算用户留存
        df['cohort_month'] = df['signup_date'].dt.to_period('M')
        df['user_age'] = (df['activity_date'] - df['signup_date']).dt.days

        cohort = df.groupby(['cohort_month', 'user_age'])['user_id'].nunique().unstack()
        cohort_size = cohort.iloc[:, 0]
        retention = cohort.divide(cohort_size, axis=0)

        return retention
```

### 6.2 可视化展示

```python
# 数据可视化
import matplotlib.pyplot as plt
import seaborn as sns

class DataVisualizer:
    def __init__(self):
        sns.set_style('whitegrid')

    def plot_time_series(self, df, x_col, y_col, title):
        plt.figure(figsize=(12, 6))
        sns.lineplot(data=df, x=x_col, y=y_col)
        plt.title(title)
        plt.show()

    def plot_bar_chart(self, df, x_col, y_col, title):
        plt.figure(figsize=(10, 6))
        sns.barplot(data=df, x=x_col, y=y_col)
        plt.title(title)
        plt.xticks(rotation=45)
        plt.show()

    def plot_heatmap(self, data, title):
        plt.figure(figsize=(10, 8))
        sns.heatmap(data, annot=True, cmap='coolwarm')
        plt.title(title)
        plt.show()
```

## 七、实战案例：电商数据分析

### 7.1 数据处理流程

```python
class ECommerceDataPipeline:
    def __init__(self):
        self.spark_processor = SparkBatchProcessor()
        self.analyzer = DataAnalyzer()
        self.visualizer = DataVisualizer()

    def run_pipeline(self):
        # 1. 读取数据
        orders_df = self.spark_processor.read_parquet('hdfs:///data/orders')
        users_df = self.spark_processor.read_parquet('hdfs:///data/users')

        # 2. 数据清洗和转换
        joined_df = orders_df.join(users_df, on='user_id')

        # 3. 聚合分析
        result = joined_df.groupBy('user_country', 'product_category') \
            .sum('order_amount') \
            .withColumnRenamed('sum(order_amount)', 'total_revenue')

        # 4. 保存结果
        self.spark_processor.write_result(result, 'hdfs:///results/revenue_by_country')

        # 5. 下载结果进行可视化
        result_pd = result.toPandas()
        self.visualizer.plot_bar_chart(
            result_pd,
            x_col='user_country',
            y_col='total_revenue',
            title='Revenue by Country'
        )

        self.spark_processor.stop()
```

### 7.2 实时监控仪表盘

```python
# 实时监控仪表盘
class RealTimeDashboard:
    def __init__(self):
        self.flink_processor = FlinkStreamProcessor()

    def start_monitoring(self):
        # 读取实时数据流
        stream = self.flink_processor.read_kafka_stream(
            'clickstream',
            'localhost:9092'
        )

        # 实时计算
        result = stream \
            .group_by('page') \
            .select('page, COUNT(*) as clicks')

        # 输出到仪表盘
        result.execute_insert('dashboard_sink').wait()

        self.flink_processor.execute()
```

## 八、总结与最佳实践

### 8.1 关键要点

- 选择合适的工具：根据场景选择Spark/Flink/Hive
- 数据格式优化：使用列式存储格式提高查询效率
- 资源管理：合理配置集群资源
- 监控告警：建立完善的监控体系

### 8.2 常见误区

- 过度使用Spark：简单查询可以使用Hive
- 忽视数据分区：合理分区能大幅提升查询性能
- 资源配置不合理：导致集群资源浪费或任务失败
- 忽视数据质量：脏数据会影响分析结果

### 8.3 未来趋势

- 湖仓一体：数据湖与数据仓库融合
- 实时数据仓库：支持实时分析
- AI增强分析：利用AI自动发现数据模式

## 参考资料

- Apache Spark官方文档
- Apache Flink官方文档
- Apache Hadoop官方文档
- Pandas官方文档
【大数据】大数据处理技术栈:从采集到分析的完整链路
发布时间:2026/5/27 12:04:28
## 一、大数据处理概述

### 1.1 大数据的特征

大数据具有以下特征（5V）：

- Volume：海量数据规模
- Velocity：数据产生速度快
- Variety：数据类型多样
- Veracity：数据质量参差不齐
- Value：需要从数据中提取价值

### 1.2 大数据处理架构

```text
┌─────────────────────────────────────────────────────────────┐
│                       大数据处理架构                        │
├─────────────────────────────────────────────────────────────┤
│  数据采集层                                                 │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                   │
│  │ 日志采集 │  │  数据库  │  │  传感器  │                   │
│  │ Fluentd  │  │   CDC    │  │   MQTT   │                   │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘                   │
│       │             │             │                         │
├───────┼─────────────┼─────────────┼─────────────────────────┤
│  数据存储层                                                 │
│  ┌─────────────────────────────────────────────────┐        │
│  │            HDFS / S3 / Cloud Storage            │        │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐       │        │
│  │  │ Parquet  │  │   ORC    │  │   Avro   │       │        │
│  │  └──────────┘  └──────────┘  └──────────┘       │        │
│  └─────────────────────────────────────────────────┘        │
├─────────────────────────────────────────────────────────────┤
│  数据处理层                                                 │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                   │
│  │  Spark   │  │  Flink   │  │   Hive   │                   │
│  │  批处理  │  │  流处理  │  │ SQL查询  │                   │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘                   │
│       │             │             │                         │
├───────┼─────────────┼─────────────┼─────────────────────────┤
│  数据分析层                                                 │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                   │
│  │ 机器学习 │  │  可视化  │  │ 报表工具 │                   │
│  │TensorFlow│  │ Superset │  │ Tableau  │                   │
│  └──────────┘  └──────────┘  └──────────┘                   │
└─────────────────────────────────────────────────────────────┘
```

## 二、数据采集与传输

### 2.1 Fluentd日志采集

```python
# Fluentd配置示例
class FluentdConfigGenerator:
    def __init__(self):
        self.config = {
            'source': [],
            'filter': [],
            'match': []
        }

    def add_tail_source(self, name, path, tag):
        self.config['source'].append({
            'type': 'tail',
            'name': name,
            'path': path,
            'tag': tag,
            'pos_file': f'/var/log/fluentd/{name}.pos',
            'read_from_head': True
        })

    def add_kafka_output(self, tag, brokers, topic):
        self.config['match'].append({
            'type': 'kafka',
            'tag': tag,
            'brokers': brokers,
            'default_topic': topic,
            'format': 'json'
        })

    def generate_config(self):
        return self._format_config()

    def _format_config(self):
        lines = []
        for source in self.config['source']:
            lines.append(f'<source>')
            for key, value in source.items():
                lines.append(f'  {key} {value}')
            lines.append(f'</source>')
        return '\n'.join(lines)
```

### 2.2 Kafka数据传输

```python
# Kafka生产者配置
from kafka import KafkaProducer
import json

class DataProducer:
    def __init__(self, bootstrap_servers):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            compression_type='gzip'
        )

    def send_message(self, topic, message):
        future = self.producer.send(topic, value=message)
        return future.get(timeout=10)

    def close(self):
        self.producer.close()
```

## 三、数据存储

### 3.1 HDFS操作

```python
# HDFS操作封装
class HDFSManager:
    def __init__(self, hdfs_url='hdfs://localhost:9000'):
        self.hdfs_url = hdfs_url

    def list_files(self, path):
        from hdfs import InsecureClient
        client = InsecureClient(self.hdfs_url)
        return client.list(path)

    def read_file(self, path):
        from hdfs import InsecureClient
        client = InsecureClient(self.hdfs_url)
        with client.read(path) as f:
            return f.read()

    def write_file(self, path, content):
        from hdfs import InsecureClient
        client = InsecureClient(self.hdfs_url)
        with client.write(path) as f:
            f.write(content)
```

### 3.2 列式存储格式

```python
# Parquet文件读写
import pandas as pd

class ParquetManager:
    def __init__(self):
        pass

    def write_parquet(self, df, path, compression='snappy'):
        df.to_parquet(path, compression=compression)

    def read_parquet(self, path):
        return pd.read_parquet(path)

    def write_partitioned(self, df, base_path, partition_cols):
        df.to_parquet(
            base_path,
            partition_cols=partition_cols,
            compression='snappy'
        )
```

## 四、批处理计算

### 4.1 Spark批处理

```python
# Spark批处理示例
from pyspark.sql import SparkSession

class SparkBatchProcessor:
    def __init__(self, app_name='BatchProcessor'):
        self.spark = SparkSession.builder \
            .appName(app_name) \
            .getOrCreate()

    def read_csv(self, path):
        return self.spark.read.csv(path, header=True, inferSchema=True)

    def read_parquet(self, path):
        return self.spark.read.parquet(path)

    def process_data(self, df):
        # 数据清洗
        cleaned = df.dropna()

        # 数据转换
        transformed = cleaned.withColumn(
            'total_amount',
            cleaned['price'] * cleaned['quantity']
        )

        # 聚合计算
        result = transformed.groupBy('category') \
            .sum('total_amount') \
            .withColumnRenamed('sum(total_amount)', 'category_total')

        return result

    def write_result(self, df, path):
        df.write.parquet(path, mode='overwrite')

    def stop(self):
        self.spark.stop()
```

### 4.2 SQL查询

```python
# Spark SQL示例
class SparkSQLProcessor:
    def __init__(self, spark):
        self.spark = spark

    def register_table(self, df, table_name):
        df.createOrReplaceTempView(table_name)

    def execute_query(self, query):
        return self.spark.sql(query)

    def complex_query(self):
        query = """
            SELECT
                category,
                COUNT(*) as order_count,
                AVG(total_amount) as avg_order_value,
                SUM(total_amount) as total_revenue
            FROM orders
            WHERE order_date >= '2024-01-01'
            GROUP BY category
            HAVING COUNT(*) > 100
            ORDER BY total_revenue DESC
        """
        return self.execute_query(query)
```

## 五、流处理计算

### 5.1 Flink流处理

```python
# Flink流处理示例
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

class FlinkStreamProcessor:
    def __init__(self):
        self.env = StreamExecutionEnvironment.get_execution_environment()
        self.t_env = StreamTableEnvironment.create(self.env)

    def read_kafka_stream(self, topic, brokers):
        source_ddl = f"""
            CREATE TABLE kafka_source (
                user_id STRING,
                event_type STRING,
                timestamp BIGINT
            ) WITH (
                'connector' = 'kafka',
                'topic' = '{topic}',
                'properties.bootstrap.servers' = '{brokers}',
                'format' = 'json'
            )
        """
        self.t_env.execute_sql(source_ddl)
        return self.t_env.from_path('kafka_source')

    def process_stream(self, table):
        result = table \
            .group_by('user_id') \
            .select('user_id, COUNT(event_type) as event_count')
        return result

    def write_sink(self, table, output_topic):
        sink_ddl = f"""
            CREATE TABLE kafka_sink (
                user_id STRING,
                event_count BIGINT
            ) WITH (
                'connector' = 'kafka',
                'topic' = '{output_topic}',
                'properties.bootstrap.servers' = 'localhost:9092',
                'format' = 'json'
            )
        """
        self.t_env.execute_sql(sink_ddl)
        table.execute_insert('kafka_sink').wait()

    def execute(self):
        self.env.execute('Stream Processing Job')
```

### 5.2 窗口计算

```python
# 窗口计算示例
class WindowProcessor:
    def __init__(self, env):
        self.env = env

    def tumbling_window(self, stream):
        return stream \
            .key_by(lambda x: x[0]) \
            .window(TumblingEventTimeWindows.of(Time.seconds(5))) \
            .sum(1)

    def sliding_window(self, stream):
        return stream \
            .key_by(lambda x: x[0]) \
            .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) \
            .reduce(lambda a, b: (a[0], a[1] + b[1]))
```

## 六、数据分析与可视化

### 6.1 Pandas数据分析

```python
# Pandas数据分析示例
import pandas as pd
import numpy as np

class DataAnalyzer:
    def __init__(self):
        pass

    def load_data(self, path):
        return pd.read_parquet(path)

    def descriptive_stats(self, df):
        return df.describe()

    def correlation_analysis(self, df):
        return df.corr()

    def time_series_analysis(self, df, date_col='date'):
        df[date_col] = pd.to_datetime(df[date_col])
        df.set_index(date_col, inplace=True)

        # 按周聚合
        weekly = df.resample('W').sum()

        # 计算移动平均
        df['moving_avg_7d'] = df['revenue'].rolling(window=7).mean()

        return weekly, df

    def cohort_analysis(self, df):
        # 计算用户留存
        df['cohort_month'] = df['signup_date'].dt.to_period('M')
        df['user_age'] = (df['activity_date'] - df['signup_date']).dt.days

        cohort = df.groupby(['cohort_month', 'user_age'])['user_id'].nunique().unstack()
        cohort_size = cohort.iloc[:, 0]
        retention = cohort.divide(cohort_size, axis=0)

        return retention
```

### 6.2 可视化展示

```python
# 数据可视化
import matplotlib.pyplot as plt
import seaborn as sns

class DataVisualizer:
    def __init__(self):
        sns.set_style('whitegrid')

    def plot_time_series(self, df, x_col, y_col, title):
        plt.figure(figsize=(12, 6))
        sns.lineplot(data=df, x=x_col, y=y_col)
        plt.title(title)
        plt.show()

    def plot_bar_chart(self, df, x_col, y_col, title):
        plt.figure(figsize=(10, 6))
        sns.barplot(data=df, x=x_col, y=y_col)
        plt.title(title)
        plt.xticks(rotation=45)
        plt.show()

    def plot_heatmap(self, data, title):
        plt.figure(figsize=(10, 8))
        sns.heatmap(data, annot=True, cmap='coolwarm')
        plt.title(title)
        plt.show()
```

## 七、实战案例：电商数据分析

### 7.1 数据处理流程

```python
class ECommerceDataPipeline:
    def __init__(self):
        self.spark_processor = SparkBatchProcessor()
        self.analyzer = DataAnalyzer()
        self.visualizer = DataVisualizer()

    def run_pipeline(self):
        # 1. 读取数据
        orders_df = self.spark_processor.read_parquet('hdfs:///data/orders')
        users_df = self.spark_processor.read_parquet('hdfs:///data/users')

        # 2. 数据清洗和转换
        joined_df = orders_df.join(users_df, on='user_id')

        # 3. 聚合分析
        result = joined_df.groupBy('user_country', 'product_category') \
            .sum('order_amount') \
            .withColumnRenamed('sum(order_amount)', 'total_revenue')

        # 4. 保存结果
        self.spark_processor.write_result(result, 'hdfs:///results/revenue_by_country')

        # 5. 下载结果进行可视化
        result_pd = result.toPandas()
        self.visualizer.plot_bar_chart(
            result_pd,
            x_col='user_country',
            y_col='total_revenue',
            title='Revenue by Country'
        )

        self.spark_processor.stop()
```

### 7.2 实时监控仪表盘

```python
# 实时监控仪表盘
class RealTimeDashboard:
    def __init__(self):
        self.flink_processor = FlinkStreamProcessor()

    def start_monitoring(self):
        # 读取实时数据流
        stream = self.flink_processor.read_kafka_stream(
            'clickstream',
            'localhost:9092'
        )

        # 实时计算
        result = stream \
            .group_by('page') \
            .select('page, COUNT(*) as clicks')

        # 输出到仪表盘
        result.execute_insert('dashboard_sink').wait()

        self.flink_processor.execute()
```

## 八、总结与最佳实践

### 8.1 关键要点

- 选择合适的工具：根据场景选择Spark/Flink/Hive
- 数据格式优化：使用列式存储格式提高查询效率
- 资源管理：合理配置集群资源
- 监控告警：建立完善的监控体系

### 8.2 常见误区

- 过度使用Spark：简单查询可以使用Hive
- 忽视数据分区：合理分区能大幅提升查询性能
- 资源配置不合理：导致集群资源浪费或任务失败
- 忽视数据质量：脏数据会影响分析结果

### 8.3 未来趋势

- 湖仓一体：数据湖与数据仓库融合
- 实时数据仓库：支持实时分析
- AI增强分析：利用AI自动发现数据模式

## 参考资料

- Apache Spark官方文档
- Apache Flink官方文档
- Apache Hadoop官方文档
- Pandas官方文档