This commit is contained in:
Peter D. Gray 2023-11-14 14:02:36 -05:00 committed by scgbckbone
parent d3833180e8
commit 73af932419

View File

@ -1,6 +1,6 @@
# (c) Copyright 2023 by Coinkite Inc. This file is covered by license found in COPYING-CC.
#
# scanner.py - QR scanner submodule on Q. Low level hardware stuff only.
# scanner.py - QR scanner submodule. Low level hardware stuff only.
#
import utime
import uasyncio as asyncio
@ -23,6 +23,13 @@ def wrap(body, fid=0):
rv = pack('>bH', fid, len(body)) + body
return b'\x5A' + rv + calc_bcc(rv) + b'\xA5' # STX ... ETX
def unwrap_hdr(packed):
# just get out the length
stx, fid, mlen = unpack('>bbH', packed[0:4])
if stx != 0x5A or fid not in (1, 2):
return -1
return mlen + 6
def unwrap(packed):
# read back values
stx, fid, mlen = unpack('>bbH', packed[0:4])
@ -50,25 +57,27 @@ LEN_OKAY = const(8)
class QRScanner:
def __init__(self):
self.q = None
self._scan_task = None
self.lock = asyncio.Lock()
self.busy_scanning = False
from machine import UART, Pin
self.serial = UART(2, 9600)
self.reset = Pin('QR_RESET', Pin.OUT_OD)
self.trigger = Pin('QR_TRIG', Pin.OUT_OD)
self.reset = Pin('QR_RESET', Pin.OUT_OD, value=0)
self.trigger = Pin('QR_TRIG', Pin.OUT_OD, value=1) # wasn't needed
# trigger/reset are active low (open drain)
self.trigger(1)
# from https://github.com/peterhinch/micropython-async/blob/master/v3/as_demos/auart_hd.py
self.stream = asyncio.StreamReader(self.serial, {})
# NOTE: reset is active low (open drain)
self.reset(0)
utime.sleep_ms(10)
self.reset(1)
self.sr = asyncio.StreamReader(self.serial)
# needs 2+ seconds of recovery time after reset
self.version = None
# needs 2+ seconds of recovery time after reset, so watch that
self.setup_done = False
asyncio.create_task(self.setup_task())
async def setup_task(self):
@ -88,126 +97,121 @@ class QRScanner:
#print("QR Scanner: missing")
# configure it like we want it
#self.tx('T_CMD...'
await self.tx('S_CMD_FFFF') # factory reset of settings
await self.tx('S_CMD_MTRS5000') # 5s to read before fail
await self.tx('S_CMD_MT11') # trigger is edge-based (not level)
await self.tx('S_CMD_MT30') # Same code reading without delay
await self.tx('S_CMD_MT20') # Enable automatic sleep when idle
await self.tx('S_CMD_MTRF500') # Idle time: 500ms
await self.tx('S_CMD_059A') # add CR LF after QR data
await self.sleep()
self.setup_done = True
def hw_scan(self):
if self.trigger() == 0:
# need to release/re-press
self.trigger(1)
utime.sleep_ms(100)
self.trigger(0);
async def _read_results(self):
# be a task that reads incoming QR codes from scanner (already in operation)
# - will be canceled when done/stopping
# - qr data is bare, (not wrapped) with CR at end ... so not gonna work w/ binary data
ln = bytearray()
while 1:
print('rzzz')
ch = await self.sr.read(1)
print('got %r' % ch)
if ch == b'\r':
print('Scan RX: ' + B2A(ln))
await self.q.put(ln)
ln = bytearray()
else:
ln.append(ch[0])
await self.goto_sleep()
async def scan_start(self, test=0):
# returns a Q we append to as results come in
async def scan_once(self):
# blocks until something is scanned. returns it
# wait for reset process to complete (can be an issue right after boot)
while not self.version:
await asyncio.sleep(.25)
print('wait')
async with self.lock:
self.busy_scanning = True
await self.wakeup()
await self.tx('S_CMD_020D')
# wait for reset process to complete (can be an issue right after boot)
while not self.setup_done:
print('wait for setup')
await asyncio.sleep(.25)
self.q = rv = Queue()
self._scan_task = asyncio.create_task(self._read_results())
await self.wakeup()
await self.tx('S_CMD_020D')
# begin scan
await self.tx('SR030301')
#await self.tx('S_CMD_05F1') # add all information on
#await self.tx('S_CMD_05L1') # output decoding length info on
#await self.tx('S_CMD_05S1') # STX start char
#await self.tx('S_CMD_05C1') # CodeID+prefix
#await self.tx('S_CMD_0501') # prefix on
#await self.tx('S_CMD_0506') # suffix
#await self.tx('S_CMD_05D0') # tx total data
if test:
await asyncio.sleep(test)
await self.scan_stop()
# begin scan
await self.tx('SR030301')
try:
rv = await self.stream.readline()
except asyncio.CancelledError:
rv = None
finally:
await self.tx('SR030300')
await self.goto_sleep()
self.busy_scanning = False
return rv
async def scan_stop(self):
# stop scanning
if self._scan_task:
self._scan_task.cancel()
self._scan_task = None
self.q = None
await self.tx('SR030300')
await self.sleep()
def rx(self):
# untested
return self.serial.read()
async def wakeup(self):
# send specific command until it responds
# - it will wake on any command, but not instant
for retry in range(3):
# - first one seems to fail 100%
for retry in range(5):
try:
await self.tx('SRDF0051')
await self.tx('SRDF0051', 50) # 50 ok, 20 too short
return
except: pass
except:
# first try usually fails, that's okay... its asleep and groggy
pass
print("unable to wake QR")
async def sleep(self):
async def goto_sleep(self):
# Had to decode hex to get this command! Does work tho, current consumption
# is near zero, and wakeup is instant
await self.tx('SRDF0050')
async def tx(self, msg):
# send a command, get response
async def tx(self, msg, timeout=250):
# Send a command, get the response.
# - has a long timeout, collects rx based on framing
print('Scan >> ' + msg)
# - but optimized for normal case, which is just "ok" back
# - out going messages are text, and we wrap that w/ binary framing
if msg is not None:
self.sr.write(wrap(msg))
await self.sr.drain()
# fix framing by clearing anything already there before command
while n := self.stream.s.any():
junk = await self.stream.readexactly(n)
print('Scan << (junk) ' + B2A(junk))
print('Scan >> ' + msg)
self.stream.write(wrap(msg))
await self.stream.drain()
# read until ETX=0xA5 is seen
rx = bytearray()
expect = LEN_OKAY
rx = b''
while 1:
try:
h = await asyncio.wait_for_ms(self.sr.read(-1), 500)
rx += await asyncio.wait_for_ms(self.stream.readexactly(expect), timeout)
except asyncio.TimeoutError:
if rx: break
raise RuntimeError("rx timeout")
if timeout is None:
continue
print("read t/o")
raise RuntimeError("no rx")
if h:
rx.extend(h)
if h[-1] == 0xA5:
break
print('Scan << ' + B2A(rx))
print('Scan << ' + B2A(rx))
if rx == OKAY:
# - can get scan data ahead of OK msg sometimes, so ignore any prefix
return
if rx[-LEN_OKAY:] == OKAY:
# - can get scan data ahead of OK msg sometimes, so ignore any prefix
return
if rx == RAW_OKAY:
# - sometimes? get this bare (unwrapped), in response to SRDF0051 (wakeup)
return
if rx == RAW_OKAY:
# I get this bare (unwrapped), in response to SRDF0051 (wakeup)
return
mlen = unwrap_hdr(rx)
if mlen < 0:
# framing issue
print('Framing prob: %s=%s' % (rx, B2A(rx)))
break
more = mlen - len(rx)
if more <= 0: break
expect = more
try:
body, extra = unwrap(rx)
@ -218,19 +222,28 @@ class QRScanner:
print("Bad Rx: " + B2A(rx))
print(" exc: %s" % exc)
async def torch(self, on):
def torch_control_sync(self, on):
# sync wrapper
asyncio.create_task(self.torch_control(on))
async def torch_control(self, on):
# be an expensive flashlight
# - S_CMD_03L1 => always light
# - S_CMD_03L2 => when needed
if not self.version:
return
await self.wakeup()
if self.busy_scanning:
# do nothing if scanning already
return
await self.tx('S_CMD_03L%d' % (1 if on else 2))
async with self.lock:
if not on:
# sleep module too
await self.sleep()
await self.wakeup()
await self.tx('S_CMD_03L%d' % (1 if on else 2))
if not on:
# sleep module too
await self.goto_sleep()
# EOF