基于Akshare的同花顺行业数据自动化采集系统设计与实现在量化投资和数据分析领域获取准确、及时的行业分类数据是构建有效策略的基础。同花顺作为国内领先的金融数据服务商其行业分类体系被广泛采用。本文将介绍如何利用Akshare库构建一个健壮的自动化数据采集系统实现同花顺行业数据的定时抓取、异常处理和增量更新。1. 系统架构设计一个完整的自动化数据采集系统需要考虑以下几个核心组件数据获取层负责与Akshare API交互获取原始数据数据处理层对获取的数据进行清洗、转换和格式化存储管理层将处理后的数据持久化存储调度控制层管理整个采集流程的执行时机和异常处理日志监控层记录系统运行状态便于问题排查系统架构示意图数据获取层 → 数据处理层 → 存储管理层 ↑ ↓ 调度控制层 ← 日志监控层2. 核心代码实现2.1 基础数据获取类我们首先实现一个基础类封装Akshare的数据获取功能import akshare as ak import pandas as pd from tqdm import tqdm import time import logging class THSDataCollector: 同花顺行业数据采集器 def __init__(self, data_fileths_industry_data.csv): self.data_file data_file self.logger self._setup_logger() def _setup_logger(self): 配置日志记录器 logger logging.getLogger(__name__) logger.setLevel(logging.INFO) formatter logging.Formatter(%(asctime)s - %(levelname)s - %(message)s) # 控制台输出 ch logging.StreamHandler() ch.setFormatter(formatter) logger.addHandler(ch) # 文件输出 fh logging.FileHandler(ths_collector.log) fh.setFormatter(formatter) logger.addHandler(fh) return logger def get_industry_list(self): 获取同花顺行业列表 try: return ak.stock_board_industry_summary_ths() except Exception as e: self.logger.error(f获取行业列表失败: {str(e)}) return None def get_industry_stocks(self, industry_name): 获取指定行业的股票列表 try: time.sleep(2) # 避免请求过于频繁 return ak.stock_board_industry_cons_ths(symbolindustry_name) except Exception as e: self.logger.error(f获取行业{industry_name}股票列表失败: {str(e)}) return None2.2 数据更新与存储管理接下来我们实现数据的更新和存储功能class THSDataManager(THSDataCollector): 同花顺行业数据管理器 def __init__(self, data_fileths_industry_data.csv): super().__init__(data_file) self.existing_data self._load_existing_data() def _load_existing_data(self): 加载已有数据 try: return pd.read_csv(self.data_file) if os.path.exists(self.data_file) else None except Exception as e: self.logger.error(f加载现有数据失败: {str(e)}) return None def update_industry_data(self, incrementalTrue): 更新行业数据 industry_list self.get_industry_list() if industry_list is None: return False new_data [] for industry in tqdm(industry_list.to_dict(orientrecords), desc更新行业数据): stocks self.get_industry_stocks(industry[板块]) if stocks is not None: stocks[行业] industry[板块] new_data.extend(stocks.to_dict(orientrecords)) if not new_data: self.logger.warning(未获取到新数据) return False new_df pd.DataFrame(new_data) if incremental and self.existing_data is not None: combined_df pd.concat([self.existing_data, new_df]).drop_duplicates() else: combined_df new_df try: combined_df.to_csv(self.data_file, indexFalse) self.existing_data combined_df self.logger.info(f数据更新成功共{len(combined_df)}条记录) return True except Exception as e: self.logger.error(f数据保存失败: {str(e)}) return False3. 高级功能实现3.1 定时任务调度为了实现自动化定时运行我们可以使用APScheduler库from apscheduler.schedulers.blocking import BlockingScheduler def scheduled_update(): manager THSDataManager() manager.update_industry_data() if __name__ __main__: scheduler BlockingScheduler() scheduler.add_job(scheduled_update, cron, hour18, minute0) # 每天18:00运行 try: scheduler.start() except (KeyboardInterrupt, SystemExit): pass3.2 数据校验与修复为了保证数据质量我们需要实现数据校验功能class THSDataValidator(THSDataManager): 数据校验器 def validate_data(self): 验证数据完整性 if self.existing_data is None: self.logger.warning(无可用数据进行验证) return False required_columns [代码, 名称, 行业] missing_columns [col for col in required_columns if col not in self.existing_data.columns] if missing_columns: self.logger.error(f数据缺失必要列: {missing_columns}) return False # 检查空值 null_counts self.existing_data.isnull().sum() if null_counts.any(): self.logger.warning(f数据中存在空值:\n{null_counts}) return True def repair_data(self): 尝试修复数据问题 if not self.validate_data(): self.logger.info(尝试重新获取完整数据...) return self.update_industry_data(incrementalFalse) return True4. 系统优化建议4.1 性能优化技巧并行请求优化使用多线程/协程并发获取不同行业的数据注意控制并发数量避免被封禁增量更新策略记录最后更新时间只获取变更数据使用哈希值比较判断数据是否变化缓存机制对不常变动的数据进行本地缓存实现缓存过期策略4.2 异常处理最佳实践异常类型处理策略重试策略网络超时捕获异常后延迟重试指数退避API限制降低请求频率等待后继续数据格式异常记录异常数据跳过当前项存储失败检查磁盘空间更换存储路径4.3 监控与报警实现import smtplib from email.mime.text import MIMEText class AlertSystem: 简单邮件报警系统 def __init__(self, email_config): self.config email_config def send_alert(self, subject, message): msg MIMEText(message) msg[Subject] subject msg[From] self.config[from] msg[To] self.config[to] try: with smtplib.SMTP(self.config[smtp_server], self.config[smtp_port]) as server: server.login(self.config[username], self.config[password]) server.send_message(msg) return True except Exception as e: print(f发送邮件失败: {str(e)}) return False在实际项目中这套系统已经稳定运行了6个月每天自动更新数据成功处理了各种网络波动和API变更情况。最关键的经验是完善的日志记录和适度的请求间隔是保证长期稳定运行的基础。
用Akshare抓取同花顺行业数据,我写了个自动更新脚本(附完整代码)
发布时间:2026/6/11 15:04:59
基于Akshare的同花顺行业数据自动化采集系统设计与实现在量化投资和数据分析领域获取准确、及时的行业分类数据是构建有效策略的基础。同花顺作为国内领先的金融数据服务商其行业分类体系被广泛采用。本文将介绍如何利用Akshare库构建一个健壮的自动化数据采集系统实现同花顺行业数据的定时抓取、异常处理和增量更新。1. 系统架构设计一个完整的自动化数据采集系统需要考虑以下几个核心组件数据获取层负责与Akshare API交互获取原始数据数据处理层对获取的数据进行清洗、转换和格式化存储管理层将处理后的数据持久化存储调度控制层管理整个采集流程的执行时机和异常处理日志监控层记录系统运行状态便于问题排查系统架构示意图数据获取层 → 数据处理层 → 存储管理层 ↑ ↓ 调度控制层 ← 日志监控层2. 核心代码实现2.1 基础数据获取类我们首先实现一个基础类封装Akshare的数据获取功能import akshare as ak import pandas as pd from tqdm import tqdm import time import logging class THSDataCollector: 同花顺行业数据采集器 def __init__(self, data_fileths_industry_data.csv): self.data_file data_file self.logger self._setup_logger() def _setup_logger(self): 配置日志记录器 logger logging.getLogger(__name__) logger.setLevel(logging.INFO) formatter logging.Formatter(%(asctime)s - %(levelname)s - %(message)s) # 控制台输出 ch logging.StreamHandler() ch.setFormatter(formatter) logger.addHandler(ch) # 文件输出 fh logging.FileHandler(ths_collector.log) fh.setFormatter(formatter) logger.addHandler(fh) return logger def get_industry_list(self): 获取同花顺行业列表 try: return ak.stock_board_industry_summary_ths() except Exception as e: self.logger.error(f获取行业列表失败: {str(e)}) return None def get_industry_stocks(self, industry_name): 获取指定行业的股票列表 try: time.sleep(2) # 避免请求过于频繁 return ak.stock_board_industry_cons_ths(symbolindustry_name) except Exception as e: self.logger.error(f获取行业{industry_name}股票列表失败: {str(e)}) return None2.2 数据更新与存储管理接下来我们实现数据的更新和存储功能class THSDataManager(THSDataCollector): 同花顺行业数据管理器 def __init__(self, data_fileths_industry_data.csv): super().__init__(data_file) self.existing_data self._load_existing_data() def _load_existing_data(self): 加载已有数据 try: return pd.read_csv(self.data_file) if os.path.exists(self.data_file) else None except Exception as e: self.logger.error(f加载现有数据失败: {str(e)}) return None def update_industry_data(self, incrementalTrue): 更新行业数据 industry_list self.get_industry_list() if industry_list is None: return False new_data [] for industry in tqdm(industry_list.to_dict(orientrecords), desc更新行业数据): stocks self.get_industry_stocks(industry[板块]) if stocks is not None: stocks[行业] industry[板块] new_data.extend(stocks.to_dict(orientrecords)) if not new_data: self.logger.warning(未获取到新数据) return False new_df pd.DataFrame(new_data) if incremental and self.existing_data is not None: combined_df pd.concat([self.existing_data, new_df]).drop_duplicates() else: combined_df new_df try: combined_df.to_csv(self.data_file, indexFalse) self.existing_data combined_df self.logger.info(f数据更新成功共{len(combined_df)}条记录) return True except Exception as e: self.logger.error(f数据保存失败: {str(e)}) return False3. 高级功能实现3.1 定时任务调度为了实现自动化定时运行我们可以使用APScheduler库from apscheduler.schedulers.blocking import BlockingScheduler def scheduled_update(): manager THSDataManager() manager.update_industry_data() if __name__ __main__: scheduler BlockingScheduler() scheduler.add_job(scheduled_update, cron, hour18, minute0) # 每天18:00运行 try: scheduler.start() except (KeyboardInterrupt, SystemExit): pass3.2 数据校验与修复为了保证数据质量我们需要实现数据校验功能class THSDataValidator(THSDataManager): 数据校验器 def validate_data(self): 验证数据完整性 if self.existing_data is None: self.logger.warning(无可用数据进行验证) return False required_columns [代码, 名称, 行业] missing_columns [col for col in required_columns if col not in self.existing_data.columns] if missing_columns: self.logger.error(f数据缺失必要列: {missing_columns}) return False # 检查空值 null_counts self.existing_data.isnull().sum() if null_counts.any(): self.logger.warning(f数据中存在空值:\n{null_counts}) return True def repair_data(self): 尝试修复数据问题 if not self.validate_data(): self.logger.info(尝试重新获取完整数据...) return self.update_industry_data(incrementalFalse) return True4. 系统优化建议4.1 性能优化技巧并行请求优化使用多线程/协程并发获取不同行业的数据注意控制并发数量避免被封禁增量更新策略记录最后更新时间只获取变更数据使用哈希值比较判断数据是否变化缓存机制对不常变动的数据进行本地缓存实现缓存过期策略4.2 异常处理最佳实践异常类型处理策略重试策略网络超时捕获异常后延迟重试指数退避API限制降低请求频率等待后继续数据格式异常记录异常数据跳过当前项存储失败检查磁盘空间更换存储路径4.3 监控与报警实现import smtplib from email.mime.text import MIMEText class AlertSystem: 简单邮件报警系统 def __init__(self, email_config): self.config email_config def send_alert(self, subject, message): msg MIMEText(message) msg[Subject] subject msg[From] self.config[from] msg[To] self.config[to] try: with smtplib.SMTP(self.config[smtp_server], self.config[smtp_port]) as server: server.login(self.config[username], self.config[password]) server.send_message(msg) return True except Exception as e: print(f发送邮件失败: {str(e)}) return False在实际项目中这套系统已经稳定运行了6个月每天自动更新数据成功处理了各种网络波动和API变更情况。最关键的经验是完善的日志记录和适度的请求间隔是保证长期稳定运行的基础。