#!/usr/bin/env python3
"""
SerialPingTest - direct serial MAVLink latency test.

Reads and writes /dev/ttyAMA3 directly, bypassing every Copilot link, to
measure the pure serial round-trip time:
    serial write COMMAND_LONG -> FMU -> serial read COMMAND_ACK

Usage (stop Copilot first so the serial port is released):
    python3 SerialPingTest.py
    python3 SerialPingTest.py -p /dev/ttyAMA3 -b 115200 -r 50
    python3 SerialPingTest.py -r 100   # 100Hz
    python3 SerialPingTest.py -d       # debug mode, print every received message
"""

import sys
import os
import time
import struct
import argparse
import math
from collections import OrderedDict

# ============================================================
# MAVLink constants
# ============================================================
MAVLINK_STX_M1 = 0xFE  # MAVLink 1.0
MAVLINK_CORE_HEADER_LEN = 5  # length + seq + sysid + compid + msgid
MAVLINK_MSG_ID_COMMAND_LONG = 76
MAVLINK_MSG_ID_COMMAND_ACK = 77
MAV_CMD_PLANE_CONNECTION_STATE_REPORT = 12010

SYS_ID = 0x02
COMP_ID = 0xFE

# CRC_EXTRA (from ArduPilot ardupilotmega.h MAVLINK_MESSAGE_CRCS)
CRC_EXTRA_TABLE = {
    0: 50,     # HEARTBEAT
    76: 152,   # COMMAND_LONG
    77: 143,   # COMMAND_ACK
}


# ============================================================
# MAVLink CRC - same byte-wise algorithm as ArduPilot checksum.h
# ============================================================
def crc_accumulate_byte(data, crc):
    """Fold one byte into a running CRC (C: crc_accumulate(data, &crcAccum)).

    Args:
        data: Byte value to add; only the low 8 bits are used.
        crc: Current 16-bit CRC accumulator.

    Returns:
        The updated accumulator, masked to 16 bits.
    """
    tmp = (data ^ (crc & 0xFF)) & 0xFF
    tmp ^= (tmp << 4) & 0xFF
    crc = ((crc >> 8) & 0xFFFF) ^ ((tmp << 8) & 0xFFFF) \
        ^ ((tmp << 3) & 0xFFFF) ^ ((tmp >> 4) & 0xFFFF)
    return crc & 0xFFFF


def crc_accumulate_buffer(data, crc):
    """Fold every byte of ``data`` into ``crc`` (C: crc_accumulate_buffer)."""
    for b in data:
        crc = crc_accumulate_byte(b, crc)
    return crc


def crc_calculate(data):
    """Return the CRC of ``data`` starting from the 0xFFFF seed (C: crc_calculate)."""
    return crc_accumulate_buffer(data, 0xFFFF)


# ============================================================
# MAVLink packing - mirrors _mav_finalize_message_chan_send
# ============================================================
def pack_mavlink_v1(msgid, sysid, compid, seq, payload):
    """Build a complete MAVLink 1.0 frame.

    Frame layout: STX, length, seq, sysid, compid, msgid, payload, CRC (LE).
    The CRC covers the five header bytes after STX, then the payload, then
    the message's CRC_EXTRA byte (0 when the id is not in CRC_EXTRA_TABLE).

    Args:
        msgid: Message id (0-255).
        sysid: Sender system id (0-255).
        compid: Sender component id (0-255).
        seq: Sequence number (0-255).
        payload: Already-serialized payload bytes (at most 255 bytes).

    Returns:
        The frame as ``bytes``.

    Raises:
        struct.error: If a header field or the payload length exceeds one byte.
    """
    length = len(payload)
    # buf[0]=STX, buf[1]=length, buf[2]=seq, buf[3]=sysid, buf[4]=compid, buf[5]=msgid
    header = struct.pack("<BBBBBB", MAVLINK_STX_M1, length, seq, sysid, compid, msgid)

    crc_extra = CRC_EXTRA_TABLE.get(msgid, 0)
    # STX itself is excluded from the checksum.
    crc = crc_calculate(header[1:1 + MAVLINK_CORE_HEADER_LEN])
    crc = crc_accumulate_buffer(payload, crc)
    crc = crc_accumulate_byte(crc_extra & 0xFF, crc)

    return header + bytes(payload) + struct.pack("<H", crc)


def pack_command_long(sysid, compid, seq, target_system, target_component, command,
                      confirmation, param1, param2, param3, param4, param5, param6, param7):
    """Build a MAVLink 1.0 COMMAND_LONG frame.

    The payload is packed little-endian as seven floats (param1..param7),
    then command (uint16), target_system, target_component and confirmation
    (one byte each).

    Returns:
        The complete frame as ``bytes``.
    """
    payload = struct.pack(
        "<fffffffHBBB",
        param1, param2, param3, param4, param5, param6, param7,
        command, target_system, target_component, confirmation,
    )
    return pack_mavlink_v1(MAVLINK_MSG_ID_COMMAND_LONG, sysid, compid, seq, payload)


# ============================================================
# MAVLink parsing
# ============================================================
class MAVLinkParser:
    """Incremental, byte-at-a-time MAVLink 1.0 frame parser.

    States: 0 = wait for STX, 1 = length, 2 = seq/sysid/compid/msgid,
    3 = payload, 4 = two CRC bytes. The CRC bytes are consumed but not
    verified.
    """

    def __init__(self, debug=False):
        self.debug = debug
        self.state = 0
        self.length = 0
        self.seq = 0
        self.sysid = 0
        self.compid = 0
        self.msgid = 0
        self.payload = bytearray()
        self.hdr_buf = bytearray()
        self.crc_buf = bytearray()
        self.n_read = 0
        self.msg_count = 0

    def parse(self, data):
        """Feed ``data`` to the parser.

        Returns:
            A list of ``(msgid, sysid, compid, payload_bytes)`` tuples, one per
            frame completed by this chunk. Partial frames stay buffered.
        """
        results = []
        for b in data:
            msg = self._parse_byte(b)
            if msg is not None:
                results.append(msg)
        return results

    def _parse_byte(self, b):
        """Advance the state machine by one byte; return a message tuple or None."""
        b &= 0xFF

        if self.state == 0:
            if b == MAVLINK_STX_M1:
                self.state = 1
                self.n_read = 0
            return None

        if self.state == 1:
            self.length = b
            self.state = 2
            self.n_read = 0
            self.hdr_buf = bytearray()
            return None

        if self.state == 2:
            self.hdr_buf.append(b)
            self.n_read += 1
            if self.n_read >= 4:
                self.seq, self.sysid, self.compid, self.msgid = self.hdr_buf[:4]
                self.payload = bytearray()
                self.crc_buf = bytearray()
                self.n_read = 0
                # A zero-length payload must skip the payload state, otherwise
                # the CRC bytes would be swallowed as payload and the stream
                # would lose frame sync.
                self.state = 3 if self.length > 0 else 4
            return None

        if self.state == 3:
            self.payload.append(b)
            self.n_read += 1
            if self.n_read >= self.length:
                self.state = 4
                self.crc_buf = bytearray()
                self.n_read = 0
            return None

        if self.state == 4:
            self.crc_buf.append(b)
            self.n_read += 1
            if self.n_read >= 2:
                self.state = 0
                self.msg_count += 1
                if self.debug:
                    print(f"[Parser] msg#{self.msg_count} id={self.msgid} "
                          f"sys={self.sysid} comp={self.compid} len={self.length}")
                return (self.msgid, self.sysid, self.compid, bytes(self.payload))
            return None

        # Unknown state: resynchronise on the next STX.
        self.state = 0
        return None


def parse_command_ack(payload):
    """Decode a COMMAND_ACK payload.

    Returns:
        ``{"command": uint16, "result": uint8}``, or ``None`` when the payload
        is shorter than 3 bytes.
    """
    if len(payload) < 3:
        return None
    command = struct.unpack_from("<H", payload, 0)[0]
    result = payload[2]
    return {"command": command, "result": result}


# ============================================================
# SerialPingTest main class
# ============================================================
class SerialPingTest:
    """Send COMMAND_LONG pings over a raw serial fd and time the COMMAND_ACKs."""

    def __init__(self, port, baudrate, rate, debug=False):
        """
        Args:
            port: Serial device path.
            baudrate: Requested baud rate.
            rate: Ping rate in Hz; must be positive.
            debug: When true, print every transmitted and received message.

        Raises:
            ValueError: If ``rate`` is not positive.
        """
        if rate <= 0:
            raise ValueError(f"rate must be > 0, got {rate}")
        self.port = port
        self.baudrate = baudrate
        self.rate = rate
        self.interval_s = 1.0 / rate
        self.timeout_s = 0.5
        self.debug = debug

        self.serial_fd = -1
        self.parser = MAVLinkParser(debug=debug)
        self.seq = 0

        # confirmation byte -> send timestamp (us), oldest first
        self.pending = OrderedDict()
        self.max_pending = 100

        self.count = 0
        self.lost = 0
        self.min_rtt = 999999.0
        self.max_rtt = 0.0
        self.sum_rtt = 0.0
        self.sum_rtt2 = 0.0
        self.stats_interval = max(rate, 1)  # print stats roughly once per second
        self.total_bytes_read = 0

        self.running = False

    def open_serial(self):
        """Open the port non-blocking and configure it raw 8N1, no flow control.

        Returns:
            True on success; False (after printing the reason) on failure.
        """
        try:
            self.serial_fd = os.open(self.port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
        except OSError as e:
            print(f"[SerialPingTest] open {self.port} failed: {e}")
            return False

        # Imported lazily so the module still loads where termios is missing.
        import termios

        try:
            old = termios.tcgetattr(self.serial_fd)
        except termios.error as e:
            print(f"[SerialPingTest] tcgetattr failed: {e}")
            os.close(self.serial_fd)
            self.serial_fd = -1
            return False

        iflag = old[0]
        oflag = old[1]
        cflag = old[2]
        lflag = old[3]
        cc = list(old[6])

        # Only keep the rates this platform's termios actually defines.
        supported = (9600, 19200, 38400, 57600, 115200, 230400,
                     460800, 921600, 1000000, 1500000)
        baud_map = {
            rate: getattr(termios, f"B{rate}")
            for rate in supported
            if hasattr(termios, f"B{rate}")
        }
        baud_const = baud_map.get(self.baudrate)
        if baud_const is None:
            print(f"[SerialPingTest] unsupported baudrate {self.baudrate}, "
                  f"falling back to 115200")
            baud_const = termios.B115200

        cflag &= ~(termios.PARENB | termios.CSTOPB | termios.CSIZE | termios.CRTSCTS)
        cflag |= termios.CS8 | termios.CREAD | termios.CLOCAL
        iflag &= ~(termios.IXON | termios.IXOFF | termios.IXANY |
                   termios.IGNBRK | termios.BRKINT | termios.PARMRK |
                   termios.ISTRIP | termios.INLCR | termios.IGNCR | termios.ICRNL)
        lflag &= ~(termios.ECHO | termios.ECHONL | termios.ICANON |
                   termios.ISIG | termios.IEXTEN)
        oflag &= ~(termios.OPOST | termios.ONLCR)
        cc[termios.VMIN] = 0
        cc[termios.VTIME] = 0

        new = [iflag, oflag, cflag, lflag, baud_const, baud_const, cc]
        try:
            termios.tcsetattr(self.serial_fd, termios.TCSANOW, new)
        except termios.error as e:
            print(f"[SerialPingTest] tcsetattr failed: {e}")
            os.close(self.serial_fd)
            self.serial_fd = -1
            return False

        termios.tcflush(self.serial_fd, termios.TCIOFLUSH)
        print(f"[SerialPingTest] serial {self.port} opened, fd={self.serial_fd}")
        return True

    def close_serial(self):
        """Close the serial fd if it is open."""
        if self.serial_fd >= 0:
            os.close(self.serial_fd)
            self.serial_fd = -1

    def send_ping(self):
        """Write one COMMAND_LONG and record its send time in ``pending``."""
        now_us = self._time_us()
        confirmation = self.seq & 0xFF

        data = pack_command_long(
            sysid=SYS_ID, compid=COMP_ID, seq=self.seq & 0xFF,
            target_system=1, target_component=1,
            command=MAV_CMD_PLANE_CONNECTION_STATE_REPORT,
            confirmation=confirmation,
            param1=1.0, param2=3.0, param3=0, param4=0,
            param5=0, param6=0, param7=0,
        )
        self.seq += 1

        if self.debug:
            print(f"[SerialPingTest] TX {len(data)} bytes, "
                  f"confirm={confirmation}, hex={data.hex()}")

        try:
            os.write(self.serial_fd, data)
        except OSError as e:
            print(f"[SerialPingTest] write failed: {e}")
            return

        self.pending[confirmation] = now_us
        if len(self.pending) > self.max_pending:
            self.pending.popitem(last=False)

    def read_serial(self):
        """Drain available bytes, parse them and dispatch matching ACKs."""
        try:
            data = os.read(self.serial_fd, 4096)
        except BlockingIOError:
            return  # nothing to read yet on the non-blocking fd
        except OSError as e:
            if self.debug:
                print(f"[SerialPingTest] read failed: {e}")
            return

        if not data:
            return

        self.total_bytes_read += len(data)
        for msgid, sysid, compid, payload in self.parser.parse(data):
            if msgid == MAVLINK_MSG_ID_COMMAND_ACK:
                ack = parse_command_ack(payload)
                if ack and ack["command"] == MAV_CMD_PLANE_CONNECTION_STATE_REPORT:
                    self._handle_ack(ack)
                elif self.debug:
                    print(f"[SerialPingTest] ACK cmd={ack['command'] if ack else '?'} "
                          f"(not 12010)")
            elif self.debug:
                print(f"[SerialPingTest] RX msgid={msgid} sys={sysid} "
                      f"comp={compid} len={len(payload)}")

    def _handle_ack(self, ack):
        """Pair ``ack`` with the oldest pending ping and update RTT statistics."""
        now_us = self._time_us()
        if not self.pending:
            return

        confirm, send_time = self.pending.popitem(last=False)
        rtt_ms = (now_us - send_time) / 1000.0

        self.count += 1
        self.min_rtt = min(self.min_rtt, rtt_ms)
        self.max_rtt = max(self.max_rtt, rtt_ms)
        self.sum_rtt += rtt_ms
        self.sum_rtt2 += rtt_ms * rtt_ms

        print(f"[SerialPingTest] ACK result={ack['result']} "
              f"confirm={confirm} RTT={rtt_ms:.3f} ms")

        if self.count % self.stats_interval == 0:
            self.print_stats()

    def check_timeout(self):
        """Drop pending pings older than ``timeout_s`` and count them as lost."""
        now_us = self._time_us()
        timeout_us = self.timeout_s * 1e6

        while self.pending:
            confirm, send_time = next(iter(self.pending.items()))
            if now_us - send_time <= timeout_us:
                break  # entries are time-ordered; the rest are newer
            self.pending.popitem(last=False)
            self.lost += 1
            print(f"[SerialPingTest] confirm={confirm} TIMEOUT")

    def print_stats(self):
        """Print count, loss rate and min/max/avg/stddev of the RTT."""
        if self.count == 0:
            print(f"[SerialPingTest] No ACK received (lost={self.lost}, "
                  f"bytes_read={self.total_bytes_read}, "
                  f"msgs_parsed={self.parser.msg_count})")
            return

        avg = self.sum_rtt / self.count
        variance = (self.sum_rtt2 / self.count) - (avg * avg)
        # Rounding can push the variance slightly below zero.
        stddev = math.sqrt(variance) if variance > 0 else 0.0
        total = self.count + self.lost
        loss_rate = (self.lost / total * 100.0) if total > 0 else 0.0

        print("[SerialPingTest] ========================================")
        print(f"[SerialPingTest] Stats (count={self.count}, lost={self.lost}, "
              f"loss={loss_rate:.1f}%)")
        print(f"[SerialPingTest]   Min: {self.min_rtt:.3f} ms")
        print(f"[SerialPingTest]   Max: {self.max_rtt:.3f} ms")
        print(f"[SerialPingTest]   Avg: {avg:.3f} ms")
        print(f"[SerialPingTest]   Std: {stddev:.3f} ms")
        print("[SerialPingTest] ========================================")

    def run(self):
        """Run the ping loop until interrupted.

        Returns:
            0 after a normal stop, -1 if the serial port could not be opened.
        """
        print("[SerialPingTest] ========================================")
        print("[SerialPingTest] Serial Direct MAVLink Latency Test")
        print(f"[SerialPingTest]   Port:    {self.port} @ {self.baudrate}")
        print("[SerialPingTest]   Send:    serial write COMMAND_LONG(12010)")
        print("[SerialPingTest]   Recv:    serial read COMMAND_ACK")
        print(f"[SerialPingTest]   Rate:    {self.rate}Hz "
              f"(interval {self.interval_s*1000:.0f}ms)")
        print(f"[SerialPingTest]   Timeout: {self.timeout_s*1000:.0f}ms")
        print(f"[SerialPingTest]   Debug:   {self.debug}")
        print("[SerialPingTest]   Ctrl+C to stop")
        print("[SerialPingTest] ========================================")

        if not self.open_serial():
            print("[SerialPingTest] cannot open serial, exit")
            return -1

        self.running = True
        try:
            while self.running:
                self.send_ping()

                # Poll for replies until the next ping is due.
                deadline = self._time_us() + int(self.interval_s * 1e6)
                while self._time_us() < deadline:
                    self.read_serial()
                    time.sleep(0.001)

                self.check_timeout()
        except KeyboardInterrupt:
            print("\n[SerialPingTest] interrupted")
        finally:
            print("\n[SerialPingTest] Final stats:")
            self.print_stats()
            self.close_serial()
            print("[SerialPingTest] stopped")

        return 0

    @staticmethod
    def _time_us():
        """Return a monotonic timestamp in microseconds.

        A monotonic clock is used so wall-clock adjustments cannot distort
        the measured RTT.
        """
        return int(time.monotonic() * 1e6)


def main():
    """Parse the command line and run the test; returns the process exit code."""
    parser = argparse.ArgumentParser(description="Serial Direct MAVLink Latency Test")
    parser.add_argument("-p", "--port", default="/dev/ttyAMA3", help="Serial port")
    parser.add_argument("-b", "--baud", type=int, default=115200, help="Baud rate")
    parser.add_argument("-r", "--rate", type=int, default=50, help="Ping rate in Hz")
    parser.add_argument("-d", "--debug", action="store_true", help="Debug mode")
    args = parser.parse_args()

    if args.rate <= 0:
        parser.error("--rate must be a positive integer")

    test = SerialPingTest(port=args.port, baudrate=args.baud,
                          rate=args.rate, debug=args.debug)
    return test.run()


if __name__ == "__main__":
    sys.exit(main())
mavlink_V1
发布时间:2026/7/1 2:52:01
#!/usr/bin/env python3
"""
SerialPingTest - direct serial MAVLink latency test.

Reads and writes /dev/ttyAMA3 directly, bypassing every Copilot link, to
measure the pure serial round-trip time:
    serial write COMMAND_LONG -> FMU -> serial read COMMAND_ACK

Usage (stop Copilot first so the serial port is released):
    python3 SerialPingTest.py
    python3 SerialPingTest.py -p /dev/ttyAMA3 -b 115200 -r 50
    python3 SerialPingTest.py -r 100   # 100Hz
    python3 SerialPingTest.py -d       # debug mode, print every received message
"""

import sys
import os
import time
import struct
import argparse
import math
from collections import OrderedDict

# ============================================================
# MAVLink constants
# ============================================================
MAVLINK_STX_M1 = 0xFE  # MAVLink 1.0
MAVLINK_CORE_HEADER_LEN = 5  # length + seq + sysid + compid + msgid
MAVLINK_MSG_ID_COMMAND_LONG = 76
MAVLINK_MSG_ID_COMMAND_ACK = 77
MAV_CMD_PLANE_CONNECTION_STATE_REPORT = 12010

SYS_ID = 0x02
COMP_ID = 0xFE

# CRC_EXTRA (from ArduPilot ardupilotmega.h MAVLINK_MESSAGE_CRCS)
CRC_EXTRA_TABLE = {
    0: 50,     # HEARTBEAT
    76: 152,   # COMMAND_LONG
    77: 143,   # COMMAND_ACK
}


# ============================================================
# MAVLink CRC - same byte-wise algorithm as ArduPilot checksum.h
# ============================================================
def crc_accumulate_byte(data, crc):
    """Fold one byte into a running CRC (C: crc_accumulate(data, &crcAccum)).

    Args:
        data: Byte value to add; only the low 8 bits are used.
        crc: Current 16-bit CRC accumulator.

    Returns:
        The updated accumulator, masked to 16 bits.
    """
    tmp = (data ^ (crc & 0xFF)) & 0xFF
    tmp ^= (tmp << 4) & 0xFF
    crc = ((crc >> 8) & 0xFFFF) ^ ((tmp << 8) & 0xFFFF) \
        ^ ((tmp << 3) & 0xFFFF) ^ ((tmp >> 4) & 0xFFFF)
    return crc & 0xFFFF


def crc_accumulate_buffer(data, crc):
    """Fold every byte of ``data`` into ``crc`` (C: crc_accumulate_buffer)."""
    for b in data:
        crc = crc_accumulate_byte(b, crc)
    return crc


def crc_calculate(data):
    """Return the CRC of ``data`` starting from the 0xFFFF seed (C: crc_calculate)."""
    return crc_accumulate_buffer(data, 0xFFFF)


# ============================================================
# MAVLink packing - mirrors _mav_finalize_message_chan_send
# ============================================================
def pack_mavlink_v1(msgid, sysid, compid, seq, payload):
    """Build a complete MAVLink 1.0 frame.

    Frame layout: STX, length, seq, sysid, compid, msgid, payload, CRC (LE).
    The CRC covers the five header bytes after STX, then the payload, then
    the message's CRC_EXTRA byte (0 when the id is not in CRC_EXTRA_TABLE).

    Args:
        msgid: Message id (0-255).
        sysid: Sender system id (0-255).
        compid: Sender component id (0-255).
        seq: Sequence number (0-255).
        payload: Already-serialized payload bytes (at most 255 bytes).

    Returns:
        The frame as ``bytes``.

    Raises:
        struct.error: If a header field or the payload length exceeds one byte.
    """
    length = len(payload)
    # buf[0]=STX, buf[1]=length, buf[2]=seq, buf[3]=sysid, buf[4]=compid, buf[5]=msgid
    header = struct.pack("<BBBBBB", MAVLINK_STX_M1, length, seq, sysid, compid, msgid)

    crc_extra = CRC_EXTRA_TABLE.get(msgid, 0)
    # STX itself is excluded from the checksum.
    crc = crc_calculate(header[1:1 + MAVLINK_CORE_HEADER_LEN])
    crc = crc_accumulate_buffer(payload, crc)
    crc = crc_accumulate_byte(crc_extra & 0xFF, crc)

    return header + bytes(payload) + struct.pack("<H", crc)


def pack_command_long(sysid, compid, seq, target_system, target_component, command,
                      confirmation, param1, param2, param3, param4, param5, param6, param7):
    """Build a MAVLink 1.0 COMMAND_LONG frame.

    The payload is packed little-endian as seven floats (param1..param7),
    then command (uint16), target_system, target_component and confirmation
    (one byte each).

    Returns:
        The complete frame as ``bytes``.
    """
    payload = struct.pack(
        "<fffffffHBBB",
        param1, param2, param3, param4, param5, param6, param7,
        command, target_system, target_component, confirmation,
    )
    return pack_mavlink_v1(MAVLINK_MSG_ID_COMMAND_LONG, sysid, compid, seq, payload)


# ============================================================
# MAVLink parsing
# ============================================================
class MAVLinkParser:
    """Incremental, byte-at-a-time MAVLink 1.0 frame parser.

    States: 0 = wait for STX, 1 = length, 2 = seq/sysid/compid/msgid,
    3 = payload, 4 = two CRC bytes. The CRC bytes are consumed but not
    verified.
    """

    def __init__(self, debug=False):
        self.debug = debug
        self.state = 0
        self.length = 0
        self.seq = 0
        self.sysid = 0
        self.compid = 0
        self.msgid = 0
        self.payload = bytearray()
        self.hdr_buf = bytearray()
        self.crc_buf = bytearray()
        self.n_read = 0
        self.msg_count = 0

    def parse(self, data):
        """Feed ``data`` to the parser.

        Returns:
            A list of ``(msgid, sysid, compid, payload_bytes)`` tuples, one per
            frame completed by this chunk. Partial frames stay buffered.
        """
        results = []
        for b in data:
            msg = self._parse_byte(b)
            if msg is not None:
                results.append(msg)
        return results

    def _parse_byte(self, b):
        """Advance the state machine by one byte; return a message tuple or None."""
        b &= 0xFF

        if self.state == 0:
            if b == MAVLINK_STX_M1:
                self.state = 1
                self.n_read = 0
            return None

        if self.state == 1:
            self.length = b
            self.state = 2
            self.n_read = 0
            self.hdr_buf = bytearray()
            return None

        if self.state == 2:
            self.hdr_buf.append(b)
            self.n_read += 1
            if self.n_read >= 4:
                self.seq, self.sysid, self.compid, self.msgid = self.hdr_buf[:4]
                self.payload = bytearray()
                self.crc_buf = bytearray()
                self.n_read = 0
                # A zero-length payload must skip the payload state, otherwise
                # the CRC bytes would be swallowed as payload and the stream
                # would lose frame sync.
                self.state = 3 if self.length > 0 else 4
            return None

        if self.state == 3:
            self.payload.append(b)
            self.n_read += 1
            if self.n_read >= self.length:
                self.state = 4
                self.crc_buf = bytearray()
                self.n_read = 0
            return None

        if self.state == 4:
            self.crc_buf.append(b)
            self.n_read += 1
            if self.n_read >= 2:
                self.state = 0
                self.msg_count += 1
                if self.debug:
                    print(f"[Parser] msg#{self.msg_count} id={self.msgid} "
                          f"sys={self.sysid} comp={self.compid} len={self.length}")
                return (self.msgid, self.sysid, self.compid, bytes(self.payload))
            return None

        # Unknown state: resynchronise on the next STX.
        self.state = 0
        return None


def parse_command_ack(payload):
    """Decode a COMMAND_ACK payload.

    Returns:
        ``{"command": uint16, "result": uint8}``, or ``None`` when the payload
        is shorter than 3 bytes.
    """
    if len(payload) < 3:
        return None
    command = struct.unpack_from("<H", payload, 0)[0]
    result = payload[2]
    return {"command": command, "result": result}


# ============================================================
# SerialPingTest main class
# ============================================================
class SerialPingTest:
    """Send COMMAND_LONG pings over a raw serial fd and time the COMMAND_ACKs."""

    def __init__(self, port, baudrate, rate, debug=False):
        """
        Args:
            port: Serial device path.
            baudrate: Requested baud rate.
            rate: Ping rate in Hz; must be positive.
            debug: When true, print every transmitted and received message.

        Raises:
            ValueError: If ``rate`` is not positive.
        """
        if rate <= 0:
            raise ValueError(f"rate must be > 0, got {rate}")
        self.port = port
        self.baudrate = baudrate
        self.rate = rate
        self.interval_s = 1.0 / rate
        self.timeout_s = 0.5
        self.debug = debug

        self.serial_fd = -1
        self.parser = MAVLinkParser(debug=debug)
        self.seq = 0

        # confirmation byte -> send timestamp (us), oldest first
        self.pending = OrderedDict()
        self.max_pending = 100

        self.count = 0
        self.lost = 0
        self.min_rtt = 999999.0
        self.max_rtt = 0.0
        self.sum_rtt = 0.0
        self.sum_rtt2 = 0.0
        self.stats_interval = max(rate, 1)  # print stats roughly once per second
        self.total_bytes_read = 0

        self.running = False

    def open_serial(self):
        """Open the port non-blocking and configure it raw 8N1, no flow control.

        Returns:
            True on success; False (after printing the reason) on failure.
        """
        try:
            self.serial_fd = os.open(self.port, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
        except OSError as e:
            print(f"[SerialPingTest] open {self.port} failed: {e}")
            return False

        # Imported lazily so the module still loads where termios is missing.
        import termios

        try:
            old = termios.tcgetattr(self.serial_fd)
        except termios.error as e:
            print(f"[SerialPingTest] tcgetattr failed: {e}")
            os.close(self.serial_fd)
            self.serial_fd = -1
            return False

        iflag = old[0]
        oflag = old[1]
        cflag = old[2]
        lflag = old[3]
        cc = list(old[6])

        # Only keep the rates this platform's termios actually defines.
        supported = (9600, 19200, 38400, 57600, 115200, 230400,
                     460800, 921600, 1000000, 1500000)
        baud_map = {
            rate: getattr(termios, f"B{rate}")
            for rate in supported
            if hasattr(termios, f"B{rate}")
        }
        baud_const = baud_map.get(self.baudrate)
        if baud_const is None:
            print(f"[SerialPingTest] unsupported baudrate {self.baudrate}, "
                  f"falling back to 115200")
            baud_const = termios.B115200

        cflag &= ~(termios.PARENB | termios.CSTOPB | termios.CSIZE | termios.CRTSCTS)
        cflag |= termios.CS8 | termios.CREAD | termios.CLOCAL
        iflag &= ~(termios.IXON | termios.IXOFF | termios.IXANY |
                   termios.IGNBRK | termios.BRKINT | termios.PARMRK |
                   termios.ISTRIP | termios.INLCR | termios.IGNCR | termios.ICRNL)
        lflag &= ~(termios.ECHO | termios.ECHONL | termios.ICANON |
                   termios.ISIG | termios.IEXTEN)
        oflag &= ~(termios.OPOST | termios.ONLCR)
        cc[termios.VMIN] = 0
        cc[termios.VTIME] = 0

        new = [iflag, oflag, cflag, lflag, baud_const, baud_const, cc]
        try:
            termios.tcsetattr(self.serial_fd, termios.TCSANOW, new)
        except termios.error as e:
            print(f"[SerialPingTest] tcsetattr failed: {e}")
            os.close(self.serial_fd)
            self.serial_fd = -1
            return False

        termios.tcflush(self.serial_fd, termios.TCIOFLUSH)
        print(f"[SerialPingTest] serial {self.port} opened, fd={self.serial_fd}")
        return True

    def close_serial(self):
        """Close the serial fd if it is open."""
        if self.serial_fd >= 0:
            os.close(self.serial_fd)
            self.serial_fd = -1

    def send_ping(self):
        """Write one COMMAND_LONG and record its send time in ``pending``."""
        now_us = self._time_us()
        confirmation = self.seq & 0xFF

        data = pack_command_long(
            sysid=SYS_ID, compid=COMP_ID, seq=self.seq & 0xFF,
            target_system=1, target_component=1,
            command=MAV_CMD_PLANE_CONNECTION_STATE_REPORT,
            confirmation=confirmation,
            param1=1.0, param2=3.0, param3=0, param4=0,
            param5=0, param6=0, param7=0,
        )
        self.seq += 1

        if self.debug:
            print(f"[SerialPingTest] TX {len(data)} bytes, "
                  f"confirm={confirmation}, hex={data.hex()}")

        try:
            os.write(self.serial_fd, data)
        except OSError as e:
            print(f"[SerialPingTest] write failed: {e}")
            return

        self.pending[confirmation] = now_us
        if len(self.pending) > self.max_pending:
            self.pending.popitem(last=False)

    def read_serial(self):
        """Drain available bytes, parse them and dispatch matching ACKs."""
        try:
            data = os.read(self.serial_fd, 4096)
        except BlockingIOError:
            return  # nothing to read yet on the non-blocking fd
        except OSError as e:
            if self.debug:
                print(f"[SerialPingTest] read failed: {e}")
            return

        if not data:
            return

        self.total_bytes_read += len(data)
        for msgid, sysid, compid, payload in self.parser.parse(data):
            if msgid == MAVLINK_MSG_ID_COMMAND_ACK:
                ack = parse_command_ack(payload)
                if ack and ack["command"] == MAV_CMD_PLANE_CONNECTION_STATE_REPORT:
                    self._handle_ack(ack)
                elif self.debug:
                    print(f"[SerialPingTest] ACK cmd={ack['command'] if ack else '?'} "
                          f"(not 12010)")
            elif self.debug:
                print(f"[SerialPingTest] RX msgid={msgid} sys={sysid} "
                      f"comp={compid} len={len(payload)}")

    def _handle_ack(self, ack):
        """Pair ``ack`` with the oldest pending ping and update RTT statistics."""
        now_us = self._time_us()
        if not self.pending:
            return

        confirm, send_time = self.pending.popitem(last=False)
        rtt_ms = (now_us - send_time) / 1000.0

        self.count += 1
        self.min_rtt = min(self.min_rtt, rtt_ms)
        self.max_rtt = max(self.max_rtt, rtt_ms)
        self.sum_rtt += rtt_ms
        self.sum_rtt2 += rtt_ms * rtt_ms

        print(f"[SerialPingTest] ACK result={ack['result']} "
              f"confirm={confirm} RTT={rtt_ms:.3f} ms")

        if self.count % self.stats_interval == 0:
            self.print_stats()

    def check_timeout(self):
        """Drop pending pings older than ``timeout_s`` and count them as lost."""
        now_us = self._time_us()
        timeout_us = self.timeout_s * 1e6

        while self.pending:
            confirm, send_time = next(iter(self.pending.items()))
            if now_us - send_time <= timeout_us:
                break  # entries are time-ordered; the rest are newer
            self.pending.popitem(last=False)
            self.lost += 1
            print(f"[SerialPingTest] confirm={confirm} TIMEOUT")

    def print_stats(self):
        """Print count, loss rate and min/max/avg/stddev of the RTT."""
        if self.count == 0:
            print(f"[SerialPingTest] No ACK received (lost={self.lost}, "
                  f"bytes_read={self.total_bytes_read}, "
                  f"msgs_parsed={self.parser.msg_count})")
            return

        avg = self.sum_rtt / self.count
        variance = (self.sum_rtt2 / self.count) - (avg * avg)
        # Rounding can push the variance slightly below zero.
        stddev = math.sqrt(variance) if variance > 0 else 0.0
        total = self.count + self.lost
        loss_rate = (self.lost / total * 100.0) if total > 0 else 0.0

        print("[SerialPingTest] ========================================")
        print(f"[SerialPingTest] Stats (count={self.count}, lost={self.lost}, "
              f"loss={loss_rate:.1f}%)")
        print(f"[SerialPingTest]   Min: {self.min_rtt:.3f} ms")
        print(f"[SerialPingTest]   Max: {self.max_rtt:.3f} ms")
        print(f"[SerialPingTest]   Avg: {avg:.3f} ms")
        print(f"[SerialPingTest]   Std: {stddev:.3f} ms")
        print("[SerialPingTest] ========================================")

    def run(self):
        """Run the ping loop until interrupted.

        Returns:
            0 after a normal stop, -1 if the serial port could not be opened.
        """
        print("[SerialPingTest] ========================================")
        print("[SerialPingTest] Serial Direct MAVLink Latency Test")
        print(f"[SerialPingTest]   Port:    {self.port} @ {self.baudrate}")
        print("[SerialPingTest]   Send:    serial write COMMAND_LONG(12010)")
        print("[SerialPingTest]   Recv:    serial read COMMAND_ACK")
        print(f"[SerialPingTest]   Rate:    {self.rate}Hz "
              f"(interval {self.interval_s*1000:.0f}ms)")
        print(f"[SerialPingTest]   Timeout: {self.timeout_s*1000:.0f}ms")
        print(f"[SerialPingTest]   Debug:   {self.debug}")
        print("[SerialPingTest]   Ctrl+C to stop")
        print("[SerialPingTest] ========================================")

        if not self.open_serial():
            print("[SerialPingTest] cannot open serial, exit")
            return -1

        self.running = True
        try:
            while self.running:
                self.send_ping()

                # Poll for replies until the next ping is due.
                deadline = self._time_us() + int(self.interval_s * 1e6)
                while self._time_us() < deadline:
                    self.read_serial()
                    time.sleep(0.001)

                self.check_timeout()
        except KeyboardInterrupt:
            print("\n[SerialPingTest] interrupted")
        finally:
            print("\n[SerialPingTest] Final stats:")
            self.print_stats()
            self.close_serial()
            print("[SerialPingTest] stopped")

        return 0

    @staticmethod
    def _time_us():
        """Return a monotonic timestamp in microseconds.

        A monotonic clock is used so wall-clock adjustments cannot distort
        the measured RTT.
        """
        return int(time.monotonic() * 1e6)


def main():
    """Parse the command line and run the test; returns the process exit code."""
    parser = argparse.ArgumentParser(description="Serial Direct MAVLink Latency Test")
    parser.add_argument("-p", "--port", default="/dev/ttyAMA3", help="Serial port")
    parser.add_argument("-b", "--baud", type=int, default=115200, help="Baud rate")
    parser.add_argument("-r", "--rate", type=int, default=50, help="Ping rate in Hz")
    parser.add_argument("-d", "--debug", action="store_true", help="Debug mode")
    args = parser.parse_args()

    if args.rate <= 0:
        parser.error("--rate must be a positive integer")

    test = SerialPingTest(port=args.port, baudrate=args.baud,
                          rate=args.rate, debug=args.debug)
    return test.run()


if __name__ == "__main__":
    sys.exit(main())