firmware/shared/scanner.py
2026-06-24 11:12:35 -04:00

478 lines
17 KiB
Python

# (c) Copyright 2023 by Coinkite Inc. This file is covered by license found in COPYING-CC.
#
# scanner.py - QR scanner submodule. Low level hardware stuff only.
#
import utime
import uasyncio as asyncio
from struct import pack, unpack
from utils import B2A
from imptask import IMPT
from queues import Queue
from bbqr import BBQrState, BBQrPsramStorage
def calc_bcc(msg):
bcc = 0
for c in msg:
bcc ^= c
return bytes([bcc])
def wrap(body, fid=0):
# wrap w/ their weird serial framing
# - serial port doesn't always need this! just send the string, but
# then response is unwrapped as well, so no checksums.
body = body if isinstance(body, bytes) else body.encode('ascii')
rv = pack('>bH', fid, len(body)) + body
return b'\x5A' + rv + calc_bcc(rv) + b'\xA5' # STX ... ETX
def unwrap_hdr(packed):
# just get out the length, no exceptions
stx, fid, mlen = unpack('>bbH', packed[0:4])
if stx != 0x5A or fid not in (1, 2):
return -1
return mlen + 6
def unwrap(packed):
# read back values
stx, fid, mlen = unpack('>bbH', packed[0:4])
assert stx == 0x5A, 'framing: STX'
assert fid == 1, 'not resp'
body = packed[4:4+mlen]
got_bcc, etx = packed[4+mlen:4+mlen+2]
assert etx == 0xA5, 'framing: ETX'
expect = calc_bcc(packed[1:4+mlen])
assert got_bcc == expect[0], 'bad BCC'
# return decoded body, and any extra bytes following it
return body, packed[4+mlen+2:]
# this is wrap(b'\x90\x00', fid=1) ... 9000 is ACK. silence is NACK
OKAY = b'Z\x01\x00\x02\x90\x00\x93\xa5'
RAW_OKAY = b'\x90\x00'
LEN_OKAY = const(8)
# possible baud rates; unfortunately 115,200 doesn't appear to work
SLOW_BAUD = const(9600)
FAST_BAUD = const(57600)
RX_BUF_SIZE = const(4350) # big enough for full v40 decoded
# TODO: constructor should avoid full setup until after login; after setup,
# command sleep is the known low-power state.
class QRScanner:
def __init__(self):
self.busy_scanning = False
self.scan_light = False # is light on during scanning?
self.version = None
self.setup_done = False
self.needs_reinit = False
self.sleep_seq = 0
# hodl this lock when communicating w/ QR scanner
self.lock = asyncio.Lock()
start_delay = self.hardware_setup()
# from https://github.com/peterhinch/micropython-async/blob/master/v3/as_demos/auart_hd.py
self.stream = asyncio.StreamReader(self.serial, {})
# needs 2+ seconds of recovery time after reset, so watch that
asyncio.create_task(self.setup_task(start_delay))
def hardware_setup(self):
# setup hardware, reset scanner and return time to delay until ready
from machine import UART, Pin
self.serial = UART(2, SLOW_BAUD, rxbuf=RX_BUF_SIZE)
self.reset = Pin('QR_RESET', Pin.OUT_OD, value=1)
self.trigger = Pin('QR_TRIG', Pin.OUT_OD, value=1) # wasn't needed
self.pulse_reset()
# needs full 2 seconds of recovery time after reset
return 2
def pulse_reset(self):
# RESET is active low (open drain). Keep it as a pulse; module docs
# describe low on this pin as wake-up, so don't use it as parking state.
self.reset(0)
utime.sleep_ms(10)
self.reset(1)
self.needs_reinit = False
def set_baud(self, br):
# change serial port baud rate
self.serial.init(br, rxbuf=RX_BUF_SIZE)
async def probe_baud(self):
# see what baud it's running at, and first contact
for baud in [FAST_BAUD, SLOW_BAUD]:
self.set_baud(baud)
try:
# get b'V2.3.0.7\r\n' or similar
rx = await self.txrx('T_OUT_CVER', timeout=250)
self.version = rx.decode().strip()
return baud
except Exception as exc:
#import sys; sys.print_exception(exc)
#print('fail @ %d: %s' % (baud, exc))
pass
return None
async def setup_task(self, start_delay):
# Task to setup device, and then die.
async with self.lock:
for attempt in range(3):
await asyncio.sleep(start_delay)
try:
await self._configure()
except Exception:
# a step failed or timed out (would have left scanner dead
# until next boot); reset module and start over
await self.blind_shutdown()
if attempt == 2:
break
start_delay = self.reset_stream()
continue
self.setup_done = True
await self.goto_sleep()
return
self.mark_needs_reinit()
def reset_stream(self):
self.sleep_seq += 1
start_delay = self.hardware_setup()
self.stream = asyncio.StreamReader(self.serial, {})
return start_delay
def mark_needs_reinit(self):
self.setup_done = False
self.version = None
self.needs_reinit = True
if hasattr(self, 'reset'):
self.reset(1)
async def blind_shutdown(self):
for baud in (SLOW_BAUD, FAST_BAUD):
self.set_baud(baud)
await self.tx('S_CMD_020D') # return to "Command mode"
await asyncio.sleep_ms(20)
await self.tx('S_CMD_03L0') # turn off bright light
await asyncio.sleep_ms(20)
await self.tx('SRDF0050') # sleep scanner
await asyncio.sleep_ms(150)
await self.tx('SRDF0050')
await asyncio.sleep_ms(20)
async def _configure(self):
# full config sequence; any step may raise on timeout/framing error
# might need to repeat a few time to get into right state
for retry in range(5):
baud = await self.probe_baud()
if baud: break
else:
#print("QR Scanner: missing")
raise RuntimeError('no contact')
try:
await self.txrx('S_CMD_FFFF') # factory reset of settings
except RuntimeError:
await asyncio.sleep_ms(1000)
for retry in range(5):
baud = await self.probe_baud()
if baud: break
else:
raise RuntimeError('no contact after S_CMD_FFFF')
# go to high speed!
if baud != FAST_BAUD:
await self.txrx('S_CMD_H3BR%d' % FAST_BAUD)
self.set_baud(FAST_BAUD)
# configure it like we want it
await self.txrx('S_CMD_MTRS5000') # 5s to read before fail (unused)
await self.txrx('S_CMD_MT11') # trigger is edge-based (not level)
await self.txrx('S_CMD_MT30') # Same code reading without delay
await self.txrx('S_CMD_MT20') # Enable automatic sleep when idle
await self.txrx('S_CMD_MTRF500') # Idle time: 500ms
await self.txrx('S_CMD_059A') # add CR LF after QR data (important)
await self.txrx('S_CMD_03L0') # light off all the time by default
await self.txrx('S_CMD_0407') # turn on signal for our yellow led
# settings under continuous scan mode
await self.txrx('S_CMD_MARS0000') # "Modify the duration of single code reading" (ms)
await self.txrx('S_CMD_MARR000') # "Modify the time of the reading interval 0ms"
await self.txrx('S_CMD_MA31') # Enable "Same code reading delay"
await self.txrx('S_CMD_MARI0050') # "Modify the same code reading delay 50ms"
# these aren't useful (yet?) and just make things harder to decode.
#await self.txrx('S_CMD_05F1') # add all information on
#await self.txrx('S_CMD_05L1') # output decoding length info on
#await self.txrx('S_CMD_05S1') # STX start char
#await self.txrx('S_CMD_05C1') # CodeID+prefix
#await self.txrx('S_CMD_0501') # prefix on
#await self.txrx('S_CMD_0506') # suffix
#await self.txrx('S_CMD_05D0') # tx total data
# prevent scanning magic QR to affect settings
await self.txrx('S_CMD_0000') # close setting codes
async def scan_once(self):
# Blocks until something is scanned. Returns it as string
# - will keep scanning if BBRq detected
# - updates UX via BBQrState.collect() while BBQr is being received
# - returns a BBQr object at that point
self.scan_light = False
if self.needs_reinit:
try:
await self.setup_task(self.reset_stream())
if self.setup_done:
await asyncio.sleep_ms(200)
except asyncio.CancelledError:
await self.blind_shutdown()
self.mark_needs_reinit()
return None
# wait for reset process to complete (can be an issue right after boot)
# - few seconds of boot time needed
for retry in range(10):
if self.setup_done: break
await asyncio.sleep(.25)
else:
return 'Scanner missing!'
storage = BBQrPsramStorage()
bbqr = BBQrState(storage)
async with self.lock:
self.busy_scanning = True
await self.wakeup()
# begin scan, in continuous mode
await self.tx('S_CMD_020E') # Continuous scanning mode start
try:
while 1:
rv = await self._readline()
if not rv: continue
if rv[0:2] == 'B$' and bbqr.collect(rv):
# BBQr protocol detected, accepted need to collect more data
continue
break
except asyncio.CancelledError:
#print('scan cancel')
return None
finally:
# Problem: another valid scan can come in just as we are trying
# to get out of scanner mode
for retry in range(3):
try:
await self.txrx('S_CMD_020D', timeout=1000) # return to "Command mode"
await self.txrx('S_CMD_03L0', timeout=1000) # turn off bright light
#print('rest after %d retries' % retry)
break
except Exception:
pass
await asyncio.sleep_ms(50)
else:
#print('reset failed')
await self.blind_shutdown()
self.mark_needs_reinit()
if self.setup_done:
await self.goto_sleep()
self.busy_scanning = False
# return BBQr object or string if simple QR
return bbqr if bbqr.is_complete() else rv
async def _readline(self):
# overridden in simulator
# - blocks for QR to be seen
# - must trim newline(s)
# - must convert to str
rv = await self.stream.readline()
# must remove OKAY response we see, these happen
# in response to light on/off cmds during scanning
# - because binary, won't happen in body of QR
rv = rv.replace(RAW_OKAY, b'')
try:
return rv.rstrip().decode()
except UnicodeError:
# probably binary QR, but we arent prepared for that and
# so framing is uncertain, we may have damaged 0x9000 sequences, etc
#print("Bin?: " + repr(rv))
return '(unsupported binary QR)'
async def wakeup(self):
# send specific command until it responds
# - it will wake on any command, but not instant
# - first one seems to fail 100%
self.sleep_seq += 1
await self.tx('SRDF0051') # blindly at first
for retry in range(5):
try:
await self.txrx('SRDF0051', timeout=50) # 50 ok, 20 too short
return
except Exception:
# first try usually fails, that's okay... its asleep and groggy
pass
async def goto_sleep(self):
# Had to decode hex to get this command! Does work tho, current consumption
# is near zero, and wakeup is near instant
# - need blind retries here
# - might be two layers of sleep, and we need this second command after the first
# - helps to turn off the yellow LED, and save power as well
self.sleep_seq += 1
sleep_seq = self.sleep_seq
await self.tx('SRDF0050')
async def later():
await asyncio.sleep_ms(150)
if sleep_seq != self.sleep_seq or self.busy_scanning:
return
await self.tx('SRDF0050')
asyncio.create_task(later())
async def flush_junk(self):
while n := self.stream.s.any():
junk = await self.stream.readexactly(n)
#print('Scan << (junk) ' + B2A(junk))
async def tx(self, msg):
# Send a command, don't wait for response
# - by sending these without binary wrapper, we get back a shorter reply,
# just: RAW_OKAY
# - which we can easily filter out of any QR data we get back at the same time
# - do not use async self.stream because other tasks may be using it
#print('tx >> ' + msg)
self.serial.write(msg)
async def readexactly_timeout(self, num, timeout, msg=None):
# Avoid asyncio.wait_for_ms here: it can leave the scanner setup task
# stuck after a CancelledError. Convert scanner silence into a normal
# retryable command failure instead.
if timeout is None:
return await self.stream.readexactly(num)
start = utime.ticks_ms()
while self.stream.s.any() < num:
if utime.ticks_diff(utime.ticks_ms(), start) >= timeout:
#print("no rx after %s" % msg)
raise RuntimeError
await asyncio.sleep_ms(5)
return await self.stream.readexactly(num)
async def txrx(self, msg, timeout=250):
# Send a command, get the corresponding response.
# - has a long timeout, collects rx based on framing
# - but optimized for normal case, which is just "ok" back
# - outgoing messages are text, and we wrap that w/ binary framing
# - doing the binary wrap will cause the longer response w/ framing
# - ignore QR data (text+\r\n) and RAW_OKAY packets ... they are not us
# flush pending (but QR could still happen)
await self.flush_junk()
# Send the command
#print('txrx >> ' + msg)
self.stream.write(wrap(msg))
await self.stream.drain()
# Read until the first response is consumed
expect = LEN_OKAY
rx = b''
while 1:
rx += await self.readexactly_timeout(expect, timeout, msg)
#print('txrx << ' + B2A(rx))
if rx == OKAY:
# good path
return
# attempt to unframe
mlen = unwrap_hdr(rx)
if mlen < 0:
# framing issue, but we can fix maybe.
if b'\r\n' in rx:
# trim QR code(s) that might be at beginning of buffer
pos = rx.rindex(b'\n')
rx = rx[pos+1:]
while rx.startswith(RAW_OKAY):
# earlier bare commands' ACK's, remove them
rx = rx[2:]
mlen = unwrap_hdr(rx)
if mlen < 0:
# framing issue, must be part way thru a QR
#print('Framing prob (cmd=%s): %s=%s' % (msg, rx, B2A(rx)))
rx = b''
expect = LEN_OKAY
continue
more = mlen - len(rx)
if more > 0:
expect = more
continue
try:
body, extra = unwrap(rx)
if extra:
raise RuntimeError("extra at end")
return body
except Exception as exc:
# this generally does not happen with above complexity in place
#print("bad frame after %s" % msg)
raise RuntimeError
def torch_control_sync(self, on):
# sync wrapper
asyncio.create_task(self.torch_control(on))
async def torch_control(self, on):
# be an expensive flashlight
# - S_CMD_03L1 => always light
# - S_CMD_03L2 => when needed
# - S_CMD_03L0 => no
if not self.version:
return
if self.busy_scanning:
# during scanning, toggle light state, so they don't need to hold it down
if on:
self.scan_light = not self.scan_light
await self.tx('S_CMD_03L%d' % (2 if self.scan_light else 0))
return
async with self.lock:
await self.wakeup()
await self.txrx('S_CMD_03L%d' % (1 if on else 0))
if not on:
# sleep module too
await self.goto_sleep()
# EOF