micropython/tests/serial_test.py
Angus Gratton 2762fe680a tests/serial_test.py: Allow up to 2 seconds between bytes.
Only a problem when UART TX is also enabled and goes first (i.e. esp32
port) as sending 16384 bytes in one go triggers the timeout.

This work was funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
2025-11-06 12:26:43 +11:00

269 lines
7.3 KiB
Python
Executable File

#!/usr/bin/env python
#
# Performance and reliability test for serial port communication.
#
# Basic usage:
# serial_test.py [-t serial-device]
#
# The `serial-device` will default to /dev/ttyACM0.
import argparse
import serial
import sys
import time
run_tests_module = __import__("run-tests")
read_test_script = """
bin = True
try:
wr=__import__("pyb").USB_VCP(0).send
except:
import sys
if hasattr(sys.stdout,'buffer'):
wr=sys.stdout.buffer.write
else:
wr=sys.stdout.write
bin = False
b=bytearray(%u)
if bin:
wr('BIN')
for i in range(len(b)):
b[i] = i & 0xff
else:
wr('TXT')
for i in range(len(b)):
b[i] = 0x20 + (i & 0x3f)
for _ in range(%d):
wr(b)
"""
write_test_script_verified = """
import sys
try:
rd=__import__("pyb").USB_VCP(0).recv
except:
rd=sys.stdin.readinto
b=bytearray(%u)
for _ in range(%u):
n = rd(b)
fail = 0
for i in range(n):
if b[i] != 32 + (i & 0x3f):
fail += 1
if fail:
sys.stdout.write(b'ER%%05u' %% fail)
else:
sys.stdout.write(b'OK%%05u' %% n)
"""
write_test_script_unverified = """
import sys
try:
rd=__import__("pyb").USB_VCP(0).recv
except:
rd=sys.stdin.readinto
b=bytearray(%u)
for _ in range(%u):
n = rd(b)
if n != len(b):
sys.stdout.write(b'ER%%05u' %% n)
else:
sys.stdout.write(b'OK%%05u' %% n)
"""
class TestError(Exception):
pass
def drain_input(ser):
time.sleep(0.1)
while ser.inWaiting() > 0:
data = ser.read(ser.inWaiting())
time.sleep(0.1)
def send_script(ser, script):
chunk_size = 32
for i in range(0, len(script), chunk_size):
ser.write(script[i : i + chunk_size])
time.sleep(0.01)
ser.write(b"\x04") # eof
ser.flush()
response = ser.read(2)
if response != b"OK":
response += ser.read(ser.inWaiting())
raise TestError("could not send script", response)
def read_test(ser_repl, ser_data, bufsize, nbuf):
global test_passed
assert bufsize % 256 == 0 # for verify to work
# how long to wait for data from device
# (if UART TX is also enabled then it can take 1.4s to send
# out a 16KB butter at 115200bps)
READ_TIMEOUT_S = 2
# Load and run the read_test_script.
ser_repl.write(b"\x03\x01\x04") # break, raw-repl, soft-reboot
drain_input(ser_repl)
script = bytes(read_test_script % (bufsize, nbuf), "ascii")
send_script(ser_repl, script)
# Read from the device the type of data that it will send (BIN or TXT).
data_type = ser_data.read(3)
# Read data from the device, check it is correct, and measure throughput.
n = 0
last_byte = None
t_start = time.time()
remain = nbuf * bufsize
total_data = bytearray(remain)
while remain:
t0 = time.monotonic_ns()
while ser_data.inWaiting() == 0:
if time.monotonic_ns() - t0 > READ_TIMEOUT_S * 1e9:
# timeout waiting for data from device
break
time.sleep(0.0001)
if not ser_data.inWaiting():
test_passed = False
print("ERROR: timeout waiting for data")
print(total_data[:n])
return 0
to_read = min(ser_data.inWaiting(), remain)
data = ser_data.read(to_read)
remain -= len(data)
print(f"{n} / {nbuf * bufsize}", end="\r")
total_data[n : n + len(data)] = data
n += len(data)
t_end = time.time()
for i in range(0, len(total_data)):
if data_type == b"BIN":
wanted = i & 0xFF
else:
wanted = 0x20 + (i & 0x3F)
if total_data[i] != wanted:
test_passed = False
print("ERROR: data mismatch:", i, wanted, total_data[i])
ser_repl.write(b"\x03") # break
t = t_end - t_start
# Print results.
print(
"DATA IN: bufsize=%u, read %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec"
% (bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t)
)
return n / t
def write_test(ser_repl, ser_data, bufsize, nbuf, verified):
global test_passed
# Load and run the write_test_script.
ser_repl.write(b"\x03\x01\x04") # break, raw-repl, soft-reboot
drain_input(ser_repl)
if verified:
script = write_test_script_verified
else:
script = write_test_script_unverified
script = bytes(script % (bufsize, nbuf), "ascii")
send_script(ser_repl, script)
drain_input(ser_repl)
# Write data to the device, check it is correct, and measure throughput.
n = 0
t_start = time.time()
buf = bytearray(bufsize)
for i in range(len(buf)):
buf[i] = 32 + (i & 0x3F) # don't want to send ctrl chars!
for i in range(nbuf):
ser_data.write(buf)
n += len(buf)
print(f"{n} / {nbuf * bufsize}", end="\r")
response = ser_repl.read(7)
if response != b"OK%05u" % bufsize:
test_passed = False
print("ERROR: bad response, expecting OK%05u, got %r" % (bufsize, response))
t_end = time.time()
ser_repl.write(b"\x03") # break
t = t_end - t_start
# Print results.
print(
"DATA OUT: verify=%d, bufsize=%u, wrote %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec"
% (verified, bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t)
)
return n / t
def do_test(dev_repl, dev_data=None, time_per_subtest=1):
if dev_data is None:
print("REPL and data on", dev_repl)
ser_repl = serial.Serial(dev_repl, baudrate=115200, timeout=1)
ser_data = ser_repl
else:
print("REPL on", dev_repl)
print("data on", dev_data)
ser_repl = serial.Serial(dev_repl, baudrate=115200, timeout=1)
ser_data = serial.Serial(dev_data, baudrate=115200, timeout=1)
for test_func, test_args, bufsize in (
(read_test, (), 256),
(write_test, (True,), 128),
(write_test, (False,), 128),
):
nbuf = 128
while bufsize <= 16384:
rate = test_func(ser_repl, ser_data, bufsize, nbuf, *test_args)
bufsize *= 2
if rate:
# Adjust the amount of data based on the rate, to keep each subtest
# at around time_per_subtest seconds long.
nbuf = max(min(128, int(rate * time_per_subtest / bufsize)), 1)
ser_repl.close()
ser_data.close()
def main():
global test_passed
cmd_parser = argparse.ArgumentParser(
description="Test performance and reliability of serial port communication.",
epilog=run_tests_module.test_instance_epilog,
formatter_class=argparse.RawTextHelpFormatter,
)
cmd_parser.add_argument(
"-t",
"--test-instance",
default="a0",
help="MicroPython instance to test",
)
cmd_parser.add_argument(
"--time-per-subtest", default="1", help="approximate time to take per subtest (in seconds)"
)
args = cmd_parser.parse_args()
dev_repl = run_tests_module.convert_device_shortcut_to_real_device(args.test_instance)
test_passed = True
try:
do_test(dev_repl, None, float(args.time_per_subtest))
except TestError as er:
test_passed = False
print("ERROR:", er)
if not test_passed:
sys.exit(1)
if __name__ == "__main__":
main()