micropython/tests/multi_extmod/machine_can_02_rx_callback.py
Angus Gratton 6cac2d275d stm32: Add machine.CAN implementation.
Implemented according to API docs in a parent comment.

Adds new multi_extmod/machine_can_* tests which pass when testing between
NUCLEO_G474RE, NUCLEO_H723ZG and PYBDV11.

This work was mostly funded through GitHub Sponsors.

Signed-off-by: Angus Gratton <angus@redyak.com.au>
2026-03-19 17:36:50 +11:00

123 lines
3.7 KiB
Python

from machine import CAN
import time
# Test the CAN.IRQ_RX irq handler, including overflow
rx_overflow = False
rx_full = False
received = []
# CAN IDs
ID_SPAM = 0x345 # messages spammed into the receive FIFO
ID_ACK_OFLOW = 0x055 # message the receiver sends after it's seen an overflow
ID_AFTER = 0x100 # message the sender sends after the ACK
can = CAN(1, 500_000)
# A very basic "soft" receiver handler that stores received messages into a global list
def receiver_irq_recv(can):
global rx_overflow, rx_full
assert can.irq().flags() & can.IRQ_RX # the only enabled IRQ
can_id, data, _flags, errors = can.recv()
received.append((can_id, None))
# The FIFO is expected not to overflow by itself, wait until 40 messages
# have been received and then block the receive handler to induce an overflow
if len(received) == 40:
assert not rx_overflow # shouldn't have already happened, either
time.sleep_ms(500)
if not rx_overflow and (errors & CAN.RECV_ERR_OVERRUN):
# expected this should happen on the very next message after
# the one where we slept for 500ms
print("irq_recv overrun", len(received))
received.clear() # check we still get some messages, see rx_spam print line below
rx_overflow = True
# also expect the FIFO to be FULL again immediately after overrunning and rx_overflow event
if rx_overflow and (errors & CAN.RECV_ERR_OVERRUN | CAN.RECV_ERR_FULL) == CAN.RECV_ERR_FULL:
rx_full = True
# Receiver
def instance0():
can.irq(receiver_irq_recv, trigger=can.IRQ_RX, hard=False)
can.set_filters(None) # receive all
multitest.next()
while not rx_overflow:
pass # Resume ASAP after FIFO0 overflows
can.send(ID_ACK_OFLOW, b"overflow")
# at least one ID_SPAM message should have been received
# *after* we overflowed and 'received' was clear in the irq handler
print("rx_spam", any(r[0] == ID_SPAM for r in received))
# wait until the "after" message is received
for n in range(100):
if any(r[0] == ID_AFTER for r in received):
break
time.sleep_ms(10)
can.irq(None) # disable the IRQ
received.clear()
# at some point while waiting for ID_AFTER the FIFO should have gotten
# full again
print("rx_full", rx_full)
# now IRQ is disabled, no new messages should be received
time.sleep_ms(250)
print("len", len(received))
received_ack = False
# reusing the result buffer so sender_irq_recv can be 'hard'
sender_irq_result = [None, memoryview(bytearray(64)), None, None]
def sender_irq_recv(can):
global received_ack
assert can.irq().flags() & can.IRQ_RX # the only enabled IRQ
can_id, data, _flags, _errors = can.recv(sender_irq_result)
print("sender_irq_recv", can_id, len(data)) # should be ID_ACK_OFLOW and "overflow" payload
received_ack = True
# Sender
def instance1():
can.irq(sender_irq_recv, CAN.IRQ_RX, hard=True)
can.set_filters(None)
multitest.next()
# Spam out messages until the receiver tells us its RX FIFO is full.
#
# The RX FIFO on the receiver can vary from 3 deep (BXCAN) to 25 deep (STM32H7),
# so we keep sending to it until we see a CAN message on ID_ACK_OFLOW indicating
# the receiver's FIFO has overflowed
while not received_ack:
for i in range(255):
while can.send(ID_SPAM, bytes([i] * 8)) is None and not received_ack:
# Don't overflow the TX FIFO
time.sleep_ms(1)
if received_ack:
break
# give the receiver some time to make space in the FIFO
time.sleep_ms(200)
# send the final message, the receiver should get this one
can.send(ID_AFTER, b"aaaaa")