用PythonPyModbus构建Modbus RTU从站从协议解析到实战调试在工业自动化领域Modbus RTU协议因其简单可靠的特点已成为连接PLC、传感器和上位机的通用语言。但对于开发者而言仅理解协议规范远远不够——当需要模拟设备行为、测试主站程序或排查通信故障时一个可编程的虚拟从站能极大提升工作效率。本文将带您用Python的PyModbus库从零构建支持8种核心功能码的Modbus RTU从站并通过Wireshark抓包分析真实数据流掌握协议实现的每个技术细节。1. 环境搭建与基础配置1.1 PyModbus库安装与虚拟串口设置首先通过pip安装最新版PyModbus3.5.0及其依赖pip install pymodbus3.5.0 pyserial3.5为模拟真实硬件环境建议使用虚拟串口工具创建端口对。在Windows上可以使用com0com# 验证串口可用性 import serial.tools.list_ports print([port.device for port in serial.tools.list_ports.comports()])1.2 从站基础框架搭建PyModbus提供了同步和异步两种API我们选择更易上手的同步版本。以下代码创建了一个支持RTU模式的从站实例from pymodbus.server import StartSerialServer from pymodbus.datastore import ModbusSequentialDataBlock from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext # 初始化数据存储区 coils ModbusSequentialDataBlock(0, [False]*100) # 线圈状态 discrete_inputs ModbusSequentialDataBlock(0, [True]*100) # 离散输入 holding_registers ModbusSequentialDataBlock(0, [0]*100) # 保持寄存器 input_registers ModbusSequentialDataBlock(0, [0]*100) # 输入寄存器 # 创建从站上下文 slave_context ModbusSlaveContext( didiscrete_inputs, cocoils, hrholding_registers, irinput_registers ) context ModbusServerContext(slavesslave_context, singleTrue) # 启动RTU从站 StartSerialServer( contextcontext, portCOM2, # 虚拟串口 framerModbusRtuFramer, baudrate19200, timeout0.005 )2. 核心功能码实现解析2.1 读操作功能码0x01-0x04四种读操作虽然对象不同但处理逻辑高度相似。以0x03读保持寄存器为例观察实际数据交互主站请求帧十六进制表示01 03 00 00 00 03 05 CB01从站地址03功能码00 00起始地址00 03寄存器数量05 CBCRC校验从站响应帧01 03 06 00 21 00 00 00 00 9D 7206返回字节数3寄存器×2字节00 21等寄存器值大端格式在代码中可以通过自定义数据块实现动态响应class DynamicDataBlock(ModbusSequentialDataBlock): def getValues(self, address, count1): # 模拟温度传感器地址0返回随机温度值 if address 0: return [random.randint(200, 250)] return super().getValues(address, count)2.2 写操作功能码0x05-0x10单线圈写入0x05的请求/响应帧完全一致这是协议设计的特殊之处01 05 00 00 FF 00 8C 3AFF 00表示置位00 00表示复位对于多寄存器写入0x10需要注意字节序处理。以下代码演示如何添加写入回调def on_write(context, slave, address, values): print(f寄存器 {address} 被修改为: {values}) slave_context ModbusSlaveContext( # ...其他参数... write_callbackon_write )3. 高级调试技巧3.1 Wireshark抓包分析配置使用Wireshark分析Modbus RTU需要特殊配置安装USBPcap捕获USB转串口流量设置显示过滤器modbus || crc16关键字段解析技巧帧间隔3.5字符时间是RTU模式的重要特征CRC校验错误通常表现为[Malformed Packet]3.2 异常响应处理PyModbus默认会处理异常情况但有时需要自定义异常码from pymodbus.exceptions import ModbusException from pymodbus.pdu import ExceptionResponse def handle_request(request, client): try: return request.execute(context) except ModbusException as exc: return ExceptionResponse(request.function_code, exc.exception_code)常见异常码代码含义触发场景0x01非法功能码接收到未实现的功能码0x02非法数据地址访问不存在的寄存器地址0x03非法数据值写入值超出允许范围4. 实战案例模拟温度控制器综合应用各功能码实现一个带报警功能的虚拟温度控制器class TemperatureController: def __init__(self): self.temp_registers [0]*10 self.alarm_coil False def update(self): # 寄存器0当前温度模拟值 self.temp_registers[0] random.randint(180, 300) # 寄存器1高温阈值可写 if self.temp_registers[0] self.temp_registers[1]: self.alarm_coil True # 寄存器2低温阈值可写 if self.temp_registers[0] self.temp_registers[2]: self.alarm_coil True # 集成到从站 controller TemperatureController() def custom_read(context, slave, address, count): if address 0: # 温度寄存器区 controller.update() return controller.temp_registers[address:addresscount] return context[slave].getValues(address, count)通过Modbus Poll等测试工具可以观察到保持寄存器1/2可读写设置阈值线圈0反映报警状态输入寄存器0实时变化温度值
用Python+PyModbus模拟一个Modbus RTU从站:从功能码到数据帧的完整实战
发布时间:2026/6/7 3:46:03
用PythonPyModbus构建Modbus RTU从站从协议解析到实战调试在工业自动化领域Modbus RTU协议因其简单可靠的特点已成为连接PLC、传感器和上位机的通用语言。但对于开发者而言仅理解协议规范远远不够——当需要模拟设备行为、测试主站程序或排查通信故障时一个可编程的虚拟从站能极大提升工作效率。本文将带您用Python的PyModbus库从零构建支持8种核心功能码的Modbus RTU从站并通过Wireshark抓包分析真实数据流掌握协议实现的每个技术细节。1. 环境搭建与基础配置1.1 PyModbus库安装与虚拟串口设置首先通过pip安装最新版PyModbus3.5.0及其依赖pip install pymodbus3.5.0 pyserial3.5为模拟真实硬件环境建议使用虚拟串口工具创建端口对。在Windows上可以使用com0com# 验证串口可用性 import serial.tools.list_ports print([port.device for port in serial.tools.list_ports.comports()])1.2 从站基础框架搭建PyModbus提供了同步和异步两种API我们选择更易上手的同步版本。以下代码创建了一个支持RTU模式的从站实例from pymodbus.server import StartSerialServer from pymodbus.datastore import ModbusSequentialDataBlock from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext # 初始化数据存储区 coils ModbusSequentialDataBlock(0, [False]*100) # 线圈状态 discrete_inputs ModbusSequentialDataBlock(0, [True]*100) # 离散输入 holding_registers ModbusSequentialDataBlock(0, [0]*100) # 保持寄存器 input_registers ModbusSequentialDataBlock(0, [0]*100) # 输入寄存器 # 创建从站上下文 slave_context ModbusSlaveContext( didiscrete_inputs, cocoils, hrholding_registers, irinput_registers ) context ModbusServerContext(slavesslave_context, singleTrue) # 启动RTU从站 StartSerialServer( contextcontext, portCOM2, # 虚拟串口 framerModbusRtuFramer, baudrate19200, timeout0.005 )2. 核心功能码实现解析2.1 读操作功能码0x01-0x04四种读操作虽然对象不同但处理逻辑高度相似。以0x03读保持寄存器为例观察实际数据交互主站请求帧十六进制表示01 03 00 00 00 03 05 CB01从站地址03功能码00 00起始地址00 03寄存器数量05 CBCRC校验从站响应帧01 03 06 00 21 00 00 00 00 9D 7206返回字节数3寄存器×2字节00 21等寄存器值大端格式在代码中可以通过自定义数据块实现动态响应class DynamicDataBlock(ModbusSequentialDataBlock): def getValues(self, address, count1): # 模拟温度传感器地址0返回随机温度值 if address 0: return [random.randint(200, 250)] return super().getValues(address, count)2.2 写操作功能码0x05-0x10单线圈写入0x05的请求/响应帧完全一致这是协议设计的特殊之处01 05 00 00 FF 00 8C 3AFF 00表示置位00 00表示复位对于多寄存器写入0x10需要注意字节序处理。以下代码演示如何添加写入回调def on_write(context, slave, address, values): print(f寄存器 {address} 被修改为: {values}) slave_context ModbusSlaveContext( # ...其他参数... write_callbackon_write )3. 高级调试技巧3.1 Wireshark抓包分析配置使用Wireshark分析Modbus RTU需要特殊配置安装USBPcap捕获USB转串口流量设置显示过滤器modbus || crc16关键字段解析技巧帧间隔3.5字符时间是RTU模式的重要特征CRC校验错误通常表现为[Malformed Packet]3.2 异常响应处理PyModbus默认会处理异常情况但有时需要自定义异常码from pymodbus.exceptions import ModbusException from pymodbus.pdu import ExceptionResponse def handle_request(request, client): try: return request.execute(context) except ModbusException as exc: return ExceptionResponse(request.function_code, exc.exception_code)常见异常码代码含义触发场景0x01非法功能码接收到未实现的功能码0x02非法数据地址访问不存在的寄存器地址0x03非法数据值写入值超出允许范围4. 实战案例模拟温度控制器综合应用各功能码实现一个带报警功能的虚拟温度控制器class TemperatureController: def __init__(self): self.temp_registers [0]*10 self.alarm_coil False def update(self): # 寄存器0当前温度模拟值 self.temp_registers[0] random.randint(180, 300) # 寄存器1高温阈值可写 if self.temp_registers[0] self.temp_registers[1]: self.alarm_coil True # 寄存器2低温阈值可写 if self.temp_registers[0] self.temp_registers[2]: self.alarm_coil True # 集成到从站 controller TemperatureController() def custom_read(context, slave, address, count): if address 0: # 温度寄存器区 controller.update() return controller.temp_registers[address:addresscount] return context[slave].getValues(address, count)通过Modbus Poll等测试工具可以观察到保持寄存器1/2可读写设置阈值线圈0反映报警状态输入寄存器0实时变化温度值