Python实战用Socket模拟IEC104协议通信全流程电力自动化系统中IEC104协议作为国际通用的通信标准承担着变电站与调度中心之间数据交互的重要使命。对于刚接触这个领域的开发者而言协议文档中复杂的帧结构和专业术语往往让人望而生畏。本文将从工程实践角度带你用Python的socket库逐步构建IEC104通信模拟器通过可运行的代码理解协议本质。1. 环境准备与协议基础1.1 Python环境配置推荐使用Python 3.8版本主要依赖库如下pip install pyiec104 # 可选辅助库1.2 IEC104协议核心概念协议采用Client/Server架构典型交互流程包含三个阶段链路建立TCP三次握手数据传输启动STARTDT激活业务交互总召唤、单点遥信等关键帧类型对比帧类型方向性包含ASDU典型用途I格式双向传输是业务数据传输S格式单向确认否序号确认U格式控制指令否STARTDT/STOPDT/TESTFR2. 通信链路建立与维护2.1 TCP连接实现import socket class IEC104Server: def __init__(self, host0.0.0.0, port2404): self.sock socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sock.bind((host, port)) self.sock.listen(1) print(fServer listening on {host}:{port}) def handle_connection(self): conn, addr self.sock.accept() print(fConnected by {addr}) return conn2.2 启动数据传输流程U帧处理示例代码def process_u_frame(data): 解析U格式控制帧 if len(data) 6: return None control_field data[2:6] if control_field b\x07\x00\x00\x00: return STARTDT激活 elif control_field b\x0b\x00\x00\x00: return STARTDT确认 elif control_field b\x13\x00\x00\x00: return TESTFR激活注意实际项目中需要实现心跳检测机制通常每15秒交换一次TESTFR帧3. 业务数据交互实现3.1 总召唤处理流程总召唤GI是IEC104最重要的功能之一代码实现要点主站发送激活命令TypeID100子站分批次返回所有数据主站发送终止确认def build_gi_command(): 构建总召唤命令ASDU return bytes([ 0x68, 0x0E, # APCI长度 0x00, 0x00, 0x00, 0x00, # 发送序号 0x64, # TypeID: 100总召唤 0x01, # SQ0, 信息对象数量1 0x06, # COT6 激活 0x00, # 公共地址 0x00, 0x00, 0x00, # 信息对象地址 0x14 # QOI20 总召唤 ])3.2 单点遥信解析典型单点遥信报文示例字节位置值说明00x68启动字符10x0BAPDU长度2-5-控制字段60x01TypeID: 单点遥信70x01SQ0, 1个信息对象80x03COT3 突发传输90x01公共地址10-120x01信息对象地址130x01SIQ值最低位为状态4. 异常处理与调试技巧4.1 常见错误代码表错误类型可能原因解决方案连接拒绝端口未开放/防火墙拦截检查网络配置和防火墙规则接收超时心跳报文丢失调整TESTFR发送间隔序号不连续丢包或处理延迟实现序号校验和重传机制ASDU解析失败长度字段与实际不符增加报文完整性校验4.2 调试工具推荐Wireshark插件使用IEC104协议解析器Socket调试助手模拟简单通信对端日志记录关键帧十六进制打印def hex_dump(data): return .join(f{b:02x} for b in data) # 示例输出68 0e 00 00 00 00 64 01 06 00 00 00 00 145. 完整示例代码以下是一个简化的子站模拟器实现import socket import time from threading import Thread class IEC104Simulator: def __init__(self): self.seq_send 0 self.seq_recv 0 def start(self): sock socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind((localhost, 2404)) sock.listen(1) print(Simulator started) while True: conn, addr sock.accept() Thread(targetself.handle_client, args(conn,)).start() def handle_client(self, conn): try: while True: data conn.recv(1024) if not data: break if data.startswith(b\x68): # APCI帧 self.process_apdu(conn, data) finally: conn.close() def process_apdu(self, conn, data): # 简化的帧处理逻辑 if data[2] 0x01 0: # I格式帧 self.seq_recv (data[3] 8) data[2] 1 self.send_s_frame(conn) # 发送确认 if data[6] 0x64: # 总召唤 self.send_gi_response(conn) def send_s_frame(self, conn): ack bytes([0x68, 0x04, (self.seq_recv 1) 0xFF, (self.seq_recv 7) 0xFF, 0x00, 0x00]) conn.send(ack)6. 进阶开发建议性能优化使用asyncio实现异步处理安全增强添加TLS加密传输协议扩展支持文件传输等附加功能状态管理实现链路状态机参考IEC 60870-5-2在真实项目中处理变电站数据时需要特别注意信号量的时序问题。某次调试中发现遥信变位上报延迟最终定位是未正确实现平衡模式下的主动上报机制。
电力自动化新手必看:用Python模拟IEC104协议通信(附完整代码)
发布时间:2026/5/31 10:09:35
Python实战用Socket模拟IEC104协议通信全流程电力自动化系统中IEC104协议作为国际通用的通信标准承担着变电站与调度中心之间数据交互的重要使命。对于刚接触这个领域的开发者而言协议文档中复杂的帧结构和专业术语往往让人望而生畏。本文将从工程实践角度带你用Python的socket库逐步构建IEC104通信模拟器通过可运行的代码理解协议本质。1. 环境准备与协议基础1.1 Python环境配置推荐使用Python 3.8版本主要依赖库如下pip install pyiec104 # 可选辅助库1.2 IEC104协议核心概念协议采用Client/Server架构典型交互流程包含三个阶段链路建立TCP三次握手数据传输启动STARTDT激活业务交互总召唤、单点遥信等关键帧类型对比帧类型方向性包含ASDU典型用途I格式双向传输是业务数据传输S格式单向确认否序号确认U格式控制指令否STARTDT/STOPDT/TESTFR2. 通信链路建立与维护2.1 TCP连接实现import socket class IEC104Server: def __init__(self, host0.0.0.0, port2404): self.sock socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sock.bind((host, port)) self.sock.listen(1) print(fServer listening on {host}:{port}) def handle_connection(self): conn, addr self.sock.accept() print(fConnected by {addr}) return conn2.2 启动数据传输流程U帧处理示例代码def process_u_frame(data): 解析U格式控制帧 if len(data) 6: return None control_field data[2:6] if control_field b\x07\x00\x00\x00: return STARTDT激活 elif control_field b\x0b\x00\x00\x00: return STARTDT确认 elif control_field b\x13\x00\x00\x00: return TESTFR激活注意实际项目中需要实现心跳检测机制通常每15秒交换一次TESTFR帧3. 业务数据交互实现3.1 总召唤处理流程总召唤GI是IEC104最重要的功能之一代码实现要点主站发送激活命令TypeID100子站分批次返回所有数据主站发送终止确认def build_gi_command(): 构建总召唤命令ASDU return bytes([ 0x68, 0x0E, # APCI长度 0x00, 0x00, 0x00, 0x00, # 发送序号 0x64, # TypeID: 100总召唤 0x01, # SQ0, 信息对象数量1 0x06, # COT6 激活 0x00, # 公共地址 0x00, 0x00, 0x00, # 信息对象地址 0x14 # QOI20 总召唤 ])3.2 单点遥信解析典型单点遥信报文示例字节位置值说明00x68启动字符10x0BAPDU长度2-5-控制字段60x01TypeID: 单点遥信70x01SQ0, 1个信息对象80x03COT3 突发传输90x01公共地址10-120x01信息对象地址130x01SIQ值最低位为状态4. 异常处理与调试技巧4.1 常见错误代码表错误类型可能原因解决方案连接拒绝端口未开放/防火墙拦截检查网络配置和防火墙规则接收超时心跳报文丢失调整TESTFR发送间隔序号不连续丢包或处理延迟实现序号校验和重传机制ASDU解析失败长度字段与实际不符增加报文完整性校验4.2 调试工具推荐Wireshark插件使用IEC104协议解析器Socket调试助手模拟简单通信对端日志记录关键帧十六进制打印def hex_dump(data): return .join(f{b:02x} for b in data) # 示例输出68 0e 00 00 00 00 64 01 06 00 00 00 00 145. 完整示例代码以下是一个简化的子站模拟器实现import socket import time from threading import Thread class IEC104Simulator: def __init__(self): self.seq_send 0 self.seq_recv 0 def start(self): sock socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind((localhost, 2404)) sock.listen(1) print(Simulator started) while True: conn, addr sock.accept() Thread(targetself.handle_client, args(conn,)).start() def handle_client(self, conn): try: while True: data conn.recv(1024) if not data: break if data.startswith(b\x68): # APCI帧 self.process_apdu(conn, data) finally: conn.close() def process_apdu(self, conn, data): # 简化的帧处理逻辑 if data[2] 0x01 0: # I格式帧 self.seq_recv (data[3] 8) data[2] 1 self.send_s_frame(conn) # 发送确认 if data[6] 0x64: # 总召唤 self.send_gi_response(conn) def send_s_frame(self, conn): ack bytes([0x68, 0x04, (self.seq_recv 1) 0xFF, (self.seq_recv 7) 0xFF, 0x00, 0x00]) conn.send(ack)6. 进阶开发建议性能优化使用asyncio实现异步处理安全增强添加TLS加密传输协议扩展支持文件传输等附加功能状态管理实现链路状态机参考IEC 60870-5-2在真实项目中处理变电站数据时需要特别注意信号量的时序问题。某次调试中发现遥信变位上报延迟最终定位是未正确实现平衡模式下的主动上报机制。