万亿级数据迁移实战从全量导出到增量同步的零停机方案一、数据迁移的走钢丝为什么停机窗口永远不够用万亿级数据迁移是存储领域最考验工程能力的任务之一。传统方案要求业务停机在维护窗口内完成全量导出、传输和导入。但现实是10TB 数据的全量迁移在千兆网络下需要超过 24 小时而业务方能给的停机窗口通常不超过 4 小时。更糟糕的是迁移过程中可能遇到字符集不兼容、索引构建超时、外键约束冲突等意外导致迁移失败后回滚——回滚本身又需要数小时。零停机迁移的核心思路是全量 增量先在业务运行期间完成全量快照导出再通过 Binlog 或 CDC 捕获增量变更持续同步到目标端直到增量追平后切换流量。整个过程业务无感知但工程复杂度显著提升。二、全量增量迁移的架构与数据流flowchart LR subgraph 源端集群 A[MySQL 主库] -- B[Binlog 流] A -- C[全量快照导出] end subgraph 迁移中间层 D[全量导出器] -- E[数据校验器] B -- F[CDC 增量捕获] F -- G[增量转换器] G -- E end subgraph 目标端集群 E -- H[ClickHouse 目标表] H -- I[一致性校验] end C -- D E --|全量数据| H E --|增量数据| H全量快照使用mysqldump --single-transaction或 Percona XtraBackup 获取一致性快照记录快照位的 GTID 或 Binlog 位点。增量捕获从该位点开始订阅 Binlog解析 DML 事件并转换为 ClickHouse 的 INSERT/ALTER 语句。关键挑战在于全量导出期间产生的增量变更如何与全量数据合并以及如何保证两端数据最终一致。三、零停机迁移的工程实现3.1 全量快照导出与位点记录import subprocess import json import time from datetime import datetime class FullSnapshotExporter: 全量快照导出器记录 Binlog 位点用于增量衔接 def __init__(self, mysql_config: dict, output_dir: str): self.mysql mysql_config self.output_dir output_dir def export_with_position(self, database: str, tables: list None): 使用 mysqldump 导出全量快照同时记录 Binlog 位点 --single-transaction: InnoDB 一致性快照不锁表 --master-data2: 记录 Binlog 位点为注释 --flush-logs: 切换 Binlog 文件便于后续增量起点定位 dump_file f{self.output_dir}/{database}_full_{datetime.now().strftime(%Y%m%d%H%M%S)}.sql cmd [ mysqldump, f-h{self.mysql[host]}, f-P{self.mysql[port]}, f-u{self.mysql[user]}, f-p{self.mysql[password]}, --single-transaction, --master-data2, --flush-logs, --routines, --triggers, --set-gtid-purgedON, --max-allowed-packet512M, --net-buffer-length32K, database ] if tables: cmd.extend(tables) # 执行导出记录耗时 start_time time.time() result subprocess.run( cmd, stdoutopen(dump_file, w), stderrsubprocess.PIPE, textTrue ) elapsed time.time() - start_time if result.returncode ! 0: raise RuntimeError(f全量导出失败: {result.stderr}) # 解析 Binlog 位点 position self._parse_binlog_position(dump_file) return { dump_file: dump_file, binlog_file: position[file], binlog_pos: position[pos], gtid: position.get(gtid), elapsed_seconds: elapsed, export_time: datetime.now().isoformat() } def _parse_binlog_position(self, dump_file: str) - dict: 从导出文件头部解析 Binlog 位点 with open(dump_file, r) as f: for line in f: if line.startswith(-- CHANGE MASTER TO): # 格式: CHANGE MASTER TO MASTER_LOG_FILExxx, MASTER_LOG_POS123; parts line.split(,) log_file parts[0].split()[1] log_pos int(parts[1].split()[1].rstrip(;\n)) return {file: log_file, pos: log_pos} if line.startswith(-- GTID state): gtid line.split()[1] return {gtid: gtid, file: , pos: 0} raise ValueError(无法从导出文件中解析 Binlog 位点)3.2 CDC 增量捕获与转换from pymysqlreplication import BinLogStreamReader from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent class IncrementalSyncer: 基于 Python-mysql-replication 的增量同步器 def __init__(self, mysql_config: dict, start_position: dict, clickhouse_client, batch_size5000): self.mysql mysql_config self.start_pos start_position self.ch_client clickhouse_client self.batch_size batch_size self.buffer [] def start_sync(self): 从指定 Binlog 位点开始增量同步 stream_settings { server_id: 9999, # 伪装为从库 host: self.mysql[host], port: self.mysql[port], user: self.mysql[user], passwd: self.mysql[password], blocking: True, # 阻塞等待新事件 resume_stream: True, log_pos: self.start_pos[pos], log_file: self.start_pos[file], } stream BinLogStreamReader(**stream_settings) last_committed_pos self.start_pos[pos] for binlog_event in stream: if isinstance(binlog_event, (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent)): ch_sql self._convert_to_clickhouse_sql(binlog_event) if ch_sql: self.buffer.append(ch_sql) # 批量提交 if len(self.buffer) self.batch_size: self._flush_buffer() last_committed_pos binlog_event.packet.log_pos # 记录位点用于断点续传 if binlog_event.log_pos: last_committed_pos binlog_event.log_pos stream.close() def _convert_to_clickhouse_sql(self, event) - str: 将 MySQL Binlog 事件转换为 ClickHouse SQL table event.table schema event.schema if isinstance(event, WriteRowsEvent): rows event.rows columns list(rows[0][values].keys()) values [ tuple(row[values][col] for col in columns) for row in rows ] return self._build_insert_sql(f{schema}.{table}, columns, values) elif isinstance(event, UpdateRowsEvent): # ClickHouse 不支持原地 UPDATE使用 INSERT 最终合并 rows event.rows columns list(rows[0][after_values].keys()) values [ tuple(row[after_values][col] for col in columns) for row in rows ] return self._build_insert_sql(f{schema}.{table}, columns, values) elif isinstance(event, DeleteRowsEvent): rows event.rows # ClickHouse 使用 ALTER TABLE DELETE conditions [] for row in rows: pk_cond AND .join( f{k} {self._format_value(v)} for k, v in row[values].items() if k in self._get_primary_keys(schema, table) ) conditions.append(pk_cond) return fALTER TABLE {schema}.{table} DELETE WHERE { OR .join(conditions)} return None def _flush_buffer(self): 批量提交到 ClickHouse try: for sql in self.buffer: self.ch_client.execute(sql) self.buffer.clear() except Exception as e: # 写入失败位点文件支持断点续传 self._save_checkpoint() raise RuntimeError(f增量同步写入失败: {e}) def _save_checkpoint(self): 保存当前同步位点用于故障恢复 checkpoint { binlog_file: self._current_file, binlog_pos: self._current_pos, timestamp: datetime.now().isoformat(), buffer_size: len(self.buffer) } with open(f{self.output_dir}/checkpoint.json, w) as f: json.dump(checkpoint, f, indent2)3.3 一致性校验器class ConsistencyValidator: 源端与目标端的数据一致性校验 def validate_table(self, source_conn, target_conn, table: str, sample_ratio0.01): 采样校验对比源端和目标端的数据一致性 sample_ratio: 采样比例1% 的数据进行全字段比对 # 获取总行数 source_count self._get_count(source_conn, table) target_count self._get_count(target_conn, table) count_diff abs(source_count - target_count) count_match_rate 1 - count_diff / max(source_count, 1) # 采样校验随机抽取主键比对全字段 sample_keys self._get_sample_keys( source_conn, table, int(source_count * sample_ratio) ) mismatch_count 0 for pk_values in sample_keys: source_row self._get_row(source_conn, table, pk_values) target_row self._get_row(target_conn, table, pk_values) if source_row ! target_row: mismatch_count 1 sample_match_rate 1 - mismatch_count / max(len(sample_keys), 1) return { table: table, source_count: source_count, target_count: target_count, count_match_rate: count_match_rate, sample_match_rate: sample_match_rate, status: PASS if count_match_rate 0.999 and sample_match_rate 0.999 else FAIL }四、零停机迁移的边界条件与工程权衡增量延迟的累积效应全量导出耗时越长累积的增量数据越多。如果全量导出需要 12 小时而业务写入 QPS 为 5000则累积增量约 2.16 亿条。增量追平可能需要数小时甚至数天期间源端和目标端处于准同步状态。延迟过大时切换流量可能导致数据不一致。Binlog 保留窗口的风险MySQL 的expire_logs_days默认为 7 天。如果全量导出 增量追平的总时间超过 Binlog 保留窗口增量链路断裂需要重新全量导出。生产环境建议将expire_logs_days设置为 14 天以上并监控 Binlog 空间。DDL 变更的同步盲区CDC 只能捕获 DML 事件ALTER TABLE 等 DDL 变更需要单独处理。如果迁移期间源端发生 DDL 变更目标端表结构不一致增量同步会报错。解决方案是在迁移窗口内禁止 DDL或使用 gh-ost 等工具同步 DDL。ClickHouse 的 UPDATE/DELETE 性能陷阱ClickHouse 的 ALTER TABLE DELETE 和 UPDATE 是异步执行的 mutation 操作不保证实时生效。高频 UPDATE 场景下mutation 队列积压可能导致查询结果不一致。对于频繁更新的表建议使用 ReplacingMergeTree 引擎通过 INSERT 新版本行实现逻辑更新。五、总结万亿级数据零停机迁移的核心是全量快照 增量同步 一致性校验三段式方案。全量导出记录 Binlog 位点作为增量起点CDC 持续捕获变更并同步到目标端采样校验保证数据最终一致。关键风险点增量延迟累积、Binlog 保留窗口不足、DDL 同步盲区、ClickHouse 的 mutation 异步特性。落地建议全量导出前将expire_logs_days调至 14 天以上迁移窗口内禁止 DDL 变更高频更新表使用 ReplacingMergeTree 引擎增量追平延迟 5 秒后再执行流量切换切换后保留双写 48 小时作为安全兜底。
万亿级数据迁移实战:从全量导出到增量同步的零停机方案
发布时间:2026/6/14 17:11:11
万亿级数据迁移实战从全量导出到增量同步的零停机方案一、数据迁移的走钢丝为什么停机窗口永远不够用万亿级数据迁移是存储领域最考验工程能力的任务之一。传统方案要求业务停机在维护窗口内完成全量导出、传输和导入。但现实是10TB 数据的全量迁移在千兆网络下需要超过 24 小时而业务方能给的停机窗口通常不超过 4 小时。更糟糕的是迁移过程中可能遇到字符集不兼容、索引构建超时、外键约束冲突等意外导致迁移失败后回滚——回滚本身又需要数小时。零停机迁移的核心思路是全量 增量先在业务运行期间完成全量快照导出再通过 Binlog 或 CDC 捕获增量变更持续同步到目标端直到增量追平后切换流量。整个过程业务无感知但工程复杂度显著提升。二、全量增量迁移的架构与数据流flowchart LR subgraph 源端集群 A[MySQL 主库] -- B[Binlog 流] A -- C[全量快照导出] end subgraph 迁移中间层 D[全量导出器] -- E[数据校验器] B -- F[CDC 增量捕获] F -- G[增量转换器] G -- E end subgraph 目标端集群 E -- H[ClickHouse 目标表] H -- I[一致性校验] end C -- D E --|全量数据| H E --|增量数据| H全量快照使用mysqldump --single-transaction或 Percona XtraBackup 获取一致性快照记录快照位的 GTID 或 Binlog 位点。增量捕获从该位点开始订阅 Binlog解析 DML 事件并转换为 ClickHouse 的 INSERT/ALTER 语句。关键挑战在于全量导出期间产生的增量变更如何与全量数据合并以及如何保证两端数据最终一致。三、零停机迁移的工程实现3.1 全量快照导出与位点记录import subprocess import json import time from datetime import datetime class FullSnapshotExporter: 全量快照导出器记录 Binlog 位点用于增量衔接 def __init__(self, mysql_config: dict, output_dir: str): self.mysql mysql_config self.output_dir output_dir def export_with_position(self, database: str, tables: list None): 使用 mysqldump 导出全量快照同时记录 Binlog 位点 --single-transaction: InnoDB 一致性快照不锁表 --master-data2: 记录 Binlog 位点为注释 --flush-logs: 切换 Binlog 文件便于后续增量起点定位 dump_file f{self.output_dir}/{database}_full_{datetime.now().strftime(%Y%m%d%H%M%S)}.sql cmd [ mysqldump, f-h{self.mysql[host]}, f-P{self.mysql[port]}, f-u{self.mysql[user]}, f-p{self.mysql[password]}, --single-transaction, --master-data2, --flush-logs, --routines, --triggers, --set-gtid-purgedON, --max-allowed-packet512M, --net-buffer-length32K, database ] if tables: cmd.extend(tables) # 执行导出记录耗时 start_time time.time() result subprocess.run( cmd, stdoutopen(dump_file, w), stderrsubprocess.PIPE, textTrue ) elapsed time.time() - start_time if result.returncode ! 0: raise RuntimeError(f全量导出失败: {result.stderr}) # 解析 Binlog 位点 position self._parse_binlog_position(dump_file) return { dump_file: dump_file, binlog_file: position[file], binlog_pos: position[pos], gtid: position.get(gtid), elapsed_seconds: elapsed, export_time: datetime.now().isoformat() } def _parse_binlog_position(self, dump_file: str) - dict: 从导出文件头部解析 Binlog 位点 with open(dump_file, r) as f: for line in f: if line.startswith(-- CHANGE MASTER TO): # 格式: CHANGE MASTER TO MASTER_LOG_FILExxx, MASTER_LOG_POS123; parts line.split(,) log_file parts[0].split()[1] log_pos int(parts[1].split()[1].rstrip(;\n)) return {file: log_file, pos: log_pos} if line.startswith(-- GTID state): gtid line.split()[1] return {gtid: gtid, file: , pos: 0} raise ValueError(无法从导出文件中解析 Binlog 位点)3.2 CDC 增量捕获与转换from pymysqlreplication import BinLogStreamReader from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent class IncrementalSyncer: 基于 Python-mysql-replication 的增量同步器 def __init__(self, mysql_config: dict, start_position: dict, clickhouse_client, batch_size5000): self.mysql mysql_config self.start_pos start_position self.ch_client clickhouse_client self.batch_size batch_size self.buffer [] def start_sync(self): 从指定 Binlog 位点开始增量同步 stream_settings { server_id: 9999, # 伪装为从库 host: self.mysql[host], port: self.mysql[port], user: self.mysql[user], passwd: self.mysql[password], blocking: True, # 阻塞等待新事件 resume_stream: True, log_pos: self.start_pos[pos], log_file: self.start_pos[file], } stream BinLogStreamReader(**stream_settings) last_committed_pos self.start_pos[pos] for binlog_event in stream: if isinstance(binlog_event, (WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent)): ch_sql self._convert_to_clickhouse_sql(binlog_event) if ch_sql: self.buffer.append(ch_sql) # 批量提交 if len(self.buffer) self.batch_size: self._flush_buffer() last_committed_pos binlog_event.packet.log_pos # 记录位点用于断点续传 if binlog_event.log_pos: last_committed_pos binlog_event.log_pos stream.close() def _convert_to_clickhouse_sql(self, event) - str: 将 MySQL Binlog 事件转换为 ClickHouse SQL table event.table schema event.schema if isinstance(event, WriteRowsEvent): rows event.rows columns list(rows[0][values].keys()) values [ tuple(row[values][col] for col in columns) for row in rows ] return self._build_insert_sql(f{schema}.{table}, columns, values) elif isinstance(event, UpdateRowsEvent): # ClickHouse 不支持原地 UPDATE使用 INSERT 最终合并 rows event.rows columns list(rows[0][after_values].keys()) values [ tuple(row[after_values][col] for col in columns) for row in rows ] return self._build_insert_sql(f{schema}.{table}, columns, values) elif isinstance(event, DeleteRowsEvent): rows event.rows # ClickHouse 使用 ALTER TABLE DELETE conditions [] for row in rows: pk_cond AND .join( f{k} {self._format_value(v)} for k, v in row[values].items() if k in self._get_primary_keys(schema, table) ) conditions.append(pk_cond) return fALTER TABLE {schema}.{table} DELETE WHERE { OR .join(conditions)} return None def _flush_buffer(self): 批量提交到 ClickHouse try: for sql in self.buffer: self.ch_client.execute(sql) self.buffer.clear() except Exception as e: # 写入失败位点文件支持断点续传 self._save_checkpoint() raise RuntimeError(f增量同步写入失败: {e}) def _save_checkpoint(self): 保存当前同步位点用于故障恢复 checkpoint { binlog_file: self._current_file, binlog_pos: self._current_pos, timestamp: datetime.now().isoformat(), buffer_size: len(self.buffer) } with open(f{self.output_dir}/checkpoint.json, w) as f: json.dump(checkpoint, f, indent2)3.3 一致性校验器class ConsistencyValidator: 源端与目标端的数据一致性校验 def validate_table(self, source_conn, target_conn, table: str, sample_ratio0.01): 采样校验对比源端和目标端的数据一致性 sample_ratio: 采样比例1% 的数据进行全字段比对 # 获取总行数 source_count self._get_count(source_conn, table) target_count self._get_count(target_conn, table) count_diff abs(source_count - target_count) count_match_rate 1 - count_diff / max(source_count, 1) # 采样校验随机抽取主键比对全字段 sample_keys self._get_sample_keys( source_conn, table, int(source_count * sample_ratio) ) mismatch_count 0 for pk_values in sample_keys: source_row self._get_row(source_conn, table, pk_values) target_row self._get_row(target_conn, table, pk_values) if source_row ! target_row: mismatch_count 1 sample_match_rate 1 - mismatch_count / max(len(sample_keys), 1) return { table: table, source_count: source_count, target_count: target_count, count_match_rate: count_match_rate, sample_match_rate: sample_match_rate, status: PASS if count_match_rate 0.999 and sample_match_rate 0.999 else FAIL }四、零停机迁移的边界条件与工程权衡增量延迟的累积效应全量导出耗时越长累积的增量数据越多。如果全量导出需要 12 小时而业务写入 QPS 为 5000则累积增量约 2.16 亿条。增量追平可能需要数小时甚至数天期间源端和目标端处于准同步状态。延迟过大时切换流量可能导致数据不一致。Binlog 保留窗口的风险MySQL 的expire_logs_days默认为 7 天。如果全量导出 增量追平的总时间超过 Binlog 保留窗口增量链路断裂需要重新全量导出。生产环境建议将expire_logs_days设置为 14 天以上并监控 Binlog 空间。DDL 变更的同步盲区CDC 只能捕获 DML 事件ALTER TABLE 等 DDL 变更需要单独处理。如果迁移期间源端发生 DDL 变更目标端表结构不一致增量同步会报错。解决方案是在迁移窗口内禁止 DDL或使用 gh-ost 等工具同步 DDL。ClickHouse 的 UPDATE/DELETE 性能陷阱ClickHouse 的 ALTER TABLE DELETE 和 UPDATE 是异步执行的 mutation 操作不保证实时生效。高频 UPDATE 场景下mutation 队列积压可能导致查询结果不一致。对于频繁更新的表建议使用 ReplacingMergeTree 引擎通过 INSERT 新版本行实现逻辑更新。五、总结万亿级数据零停机迁移的核心是全量快照 增量同步 一致性校验三段式方案。全量导出记录 Binlog 位点作为增量起点CDC 持续捕获变更并同步到目标端采样校验保证数据最终一致。关键风险点增量延迟累积、Binlog 保留窗口不足、DDL 同步盲区、ClickHouse 的 mutation 异步特性。落地建议全量导出前将expire_logs_days调至 14 天以上迁移窗口内禁止 DDL 变更高频更新表使用 ReplacingMergeTree 引擎增量追平延迟 5 秒后再执行流量切换切换后保留双写 48 小时作为安全兜底。