从一行HEX到水文数据手把手教你用Python解析SL651-2014协议报文1. 理解SL651-2014协议的核心结构水文监测领域的SL651-2014协议定义了遥测终端与中心站之间的通信规范。当我们从串口或网络接收到原始HEX报文时首先要理解其分层封装结构典型报文结构示例 7E7E [起始符] 01 [中心站地址] 0012345678 [遥测站地址] 1234 [密码] 30 [功能码] 002B [数据长度] 02 [数据起始符] 0003 [流水号] 591011154947 [时间戳] ... [数据体] 03 [结束符] 20FA [CRC校验]关键字段解析功能码决定报文类型如0x30为定时报数据长度高字节表示传输方向低字节为实际长度时间戳采用BCD编码的yyMMddHHmmss格式注意协议要求所有多字节字段均采用大端序(Big-Endian)传输2. 搭建Python解析框架2.1 基础工具函数首先创建核心转换工具import struct import binascii from datetime import datetime def hex_to_bcd(hex_str): BCD码转十进制 return int(hex_str, 16) def parse_timestamp(bcd_bytes): 解析6字节BCD时间戳 dt_str f20{bcd_bytes[0]:02d}{bcd_bytes[1]:02d}{bcd_bytes[2]:02d} dt_str f{bcd_bytes[3]:02d}{bcd_bytes[4]:02d}{bcd_bytes[5]:02d} return datetime.strptime(dt_str, %Y%m%d%H%M%S) def calc_crc(data): 计算CRC-16校验值 crc 0xFFFF for byte in data: crc ^ byte for _ in range(8): if crc 0x0001: crc 1 crc ^ 0xA001 else: crc 1 return crc.to_bytes(2, big)2.2 报文拆解类构建面向对象的解析框架class SL651Parser: def __init__(self, hex_packet): self.raw binascii.unhexlify(hex_packet) self.header { start_mark: None, center_addr: None, station_addr: None, password: None, function_code: None, data_length: None, data_start: None, serial_num: None } def validate(self): 校验报文完整性 if len(self.raw) 20: raise ValueError(报文过短) crc_received self.raw[-2:] crc_calculated calc_crc(self.raw[:-2]) return crc_received crc_calculated def parse_header(self): 解析固定头部 fmt 2s B 5s 2s B 2s B 2s fields struct.unpack_from(fmt, self.raw) self.header.update({ start_mark: fields[0], center_addr: fields[1], station_addr: fields[2].decode(ascii), password: fields[3].hex(), function_code: fields[4], data_length: fields[5], data_start: fields[6], serial_num: int.from_bytes(fields[7], big) }) return self.header3. 处理水文要素数据3.1 常见要素解析逻辑不同功能码对应不同的数据体结构。以定时报(0x30)为例def parse_telemetry_data(data): 解析遥测站定时报数据体 elements [] pos 0 while pos len(data): elem_id data[pos:pos2].hex() pos 2 if elem_id 2019: # 当前降水量 length_dec data[pos] 3 decimal_places data[pos] 0x07 pos 1 value int.from_bytes(data[pos:poslength_dec], big) elements.append({ type: precipitation, value: value / (10 ** decimal_places), unit: mm }) pos length_dec # 其他要素类型处理... return elements3.2 特殊数据类型处理协议中几种特殊编码方式数据类型编码方式示例解析方法BCD时间BCD编码0x591011154947每半字节代表1位数字浮点数定标法0x19(3字节1小数位)值原始值×10^-小数位状态量位掩码0x4520000004按位解析各状态4. 完整解析流程实战以测试报30为例的分步解析sample 7E7E010012345678123430002B020003591011154947F1F1001234567848F0F0591011154920190000052619000005392300000127381211150320FA # 步骤1基础校验 parser SL651Parser(sample) if not parser.validate(): raise ValueError(CRC校验失败) # 步骤2解析头部 header parser.parse_header() print(f收到来自站号{header[station_addr]}的定时报) # 步骤3提取数据体 data_body parser.raw[20:-3] # 跳过头部和结束符 elements parse_telemetry_data(data_body) # 步骤4输出结果 for elem in elements: print(f{elem[type]}: {elem[value]}{elem[unit]})输出示例当前降水量: 0.5mm 降水量累计值: 0.5mm 瞬时河道水位: 0.127m 电源电压: 11.15V5. 异常处理与优化建议5.1 常见错误排查CRC校验失败检查报文是否被截断或传输错误时间戳异常确认时区设置和设备时钟同步数据越界严格验证data_length字段与实际数据长度5.2 性能优化技巧# 使用内存视图减少拷贝 def parse_large_packet(packet): view memoryview(packet) crc calc_crc(view[:-2]) if crc ! view[-2:]: return None # 其他处理...实际项目中遇到的坑某些设备会发送非标准功能码多包传输时需要处理报文分片历史数据中存在协议版本差异
从一行HEX到水文数据:手把手教你用Python解析SL651-2014协议报文
发布时间:2026/6/11 1:30:53
从一行HEX到水文数据手把手教你用Python解析SL651-2014协议报文1. 理解SL651-2014协议的核心结构水文监测领域的SL651-2014协议定义了遥测终端与中心站之间的通信规范。当我们从串口或网络接收到原始HEX报文时首先要理解其分层封装结构典型报文结构示例 7E7E [起始符] 01 [中心站地址] 0012345678 [遥测站地址] 1234 [密码] 30 [功能码] 002B [数据长度] 02 [数据起始符] 0003 [流水号] 591011154947 [时间戳] ... [数据体] 03 [结束符] 20FA [CRC校验]关键字段解析功能码决定报文类型如0x30为定时报数据长度高字节表示传输方向低字节为实际长度时间戳采用BCD编码的yyMMddHHmmss格式注意协议要求所有多字节字段均采用大端序(Big-Endian)传输2. 搭建Python解析框架2.1 基础工具函数首先创建核心转换工具import struct import binascii from datetime import datetime def hex_to_bcd(hex_str): BCD码转十进制 return int(hex_str, 16) def parse_timestamp(bcd_bytes): 解析6字节BCD时间戳 dt_str f20{bcd_bytes[0]:02d}{bcd_bytes[1]:02d}{bcd_bytes[2]:02d} dt_str f{bcd_bytes[3]:02d}{bcd_bytes[4]:02d}{bcd_bytes[5]:02d} return datetime.strptime(dt_str, %Y%m%d%H%M%S) def calc_crc(data): 计算CRC-16校验值 crc 0xFFFF for byte in data: crc ^ byte for _ in range(8): if crc 0x0001: crc 1 crc ^ 0xA001 else: crc 1 return crc.to_bytes(2, big)2.2 报文拆解类构建面向对象的解析框架class SL651Parser: def __init__(self, hex_packet): self.raw binascii.unhexlify(hex_packet) self.header { start_mark: None, center_addr: None, station_addr: None, password: None, function_code: None, data_length: None, data_start: None, serial_num: None } def validate(self): 校验报文完整性 if len(self.raw) 20: raise ValueError(报文过短) crc_received self.raw[-2:] crc_calculated calc_crc(self.raw[:-2]) return crc_received crc_calculated def parse_header(self): 解析固定头部 fmt 2s B 5s 2s B 2s B 2s fields struct.unpack_from(fmt, self.raw) self.header.update({ start_mark: fields[0], center_addr: fields[1], station_addr: fields[2].decode(ascii), password: fields[3].hex(), function_code: fields[4], data_length: fields[5], data_start: fields[6], serial_num: int.from_bytes(fields[7], big) }) return self.header3. 处理水文要素数据3.1 常见要素解析逻辑不同功能码对应不同的数据体结构。以定时报(0x30)为例def parse_telemetry_data(data): 解析遥测站定时报数据体 elements [] pos 0 while pos len(data): elem_id data[pos:pos2].hex() pos 2 if elem_id 2019: # 当前降水量 length_dec data[pos] 3 decimal_places data[pos] 0x07 pos 1 value int.from_bytes(data[pos:poslength_dec], big) elements.append({ type: precipitation, value: value / (10 ** decimal_places), unit: mm }) pos length_dec # 其他要素类型处理... return elements3.2 特殊数据类型处理协议中几种特殊编码方式数据类型编码方式示例解析方法BCD时间BCD编码0x591011154947每半字节代表1位数字浮点数定标法0x19(3字节1小数位)值原始值×10^-小数位状态量位掩码0x4520000004按位解析各状态4. 完整解析流程实战以测试报30为例的分步解析sample 7E7E010012345678123430002B020003591011154947F1F1001234567848F0F0591011154920190000052619000005392300000127381211150320FA # 步骤1基础校验 parser SL651Parser(sample) if not parser.validate(): raise ValueError(CRC校验失败) # 步骤2解析头部 header parser.parse_header() print(f收到来自站号{header[station_addr]}的定时报) # 步骤3提取数据体 data_body parser.raw[20:-3] # 跳过头部和结束符 elements parse_telemetry_data(data_body) # 步骤4输出结果 for elem in elements: print(f{elem[type]}: {elem[value]}{elem[unit]})输出示例当前降水量: 0.5mm 降水量累计值: 0.5mm 瞬时河道水位: 0.127m 电源电压: 11.15V5. 异常处理与优化建议5.1 常见错误排查CRC校验失败检查报文是否被截断或传输错误时间戳异常确认时区设置和设备时钟同步数据越界严格验证data_length字段与实际数据长度5.2 性能优化技巧# 使用内存视图减少拷贝 def parse_large_packet(packet): view memoryview(packet) crc calc_crc(view[:-2]) if crc ! view[-2:]: return None # 其他处理...实际项目中遇到的坑某些设备会发送非标准功能码多包传输时需要处理报文分片历史数据中存在协议版本差异