From f8a4166c6c3d7a22e8a7c9af565d770a033c62c6 Mon Sep 17 00:00:00 2001 From: "Peter D. Gray" Date: Tue, 14 Feb 2023 16:06:39 -0500 Subject: [PATCH] QR scanner support --- shared/actions.py | 11 ++-- shared/glob.py | 2 + shared/keyboard.py | 14 +++++ shared/q1.py | 3 ++ shared/scanner.py | 124 ++++++++++++++++++++++++++++++++++++--------- 5 files changed, 128 insertions(+), 26 deletions(-) diff --git a/shared/actions.py b/shared/actions.py index 4503b616..bcf18460 100644 --- a/shared/actions.py +++ b/shared/actions.py @@ -2123,8 +2123,13 @@ Secure Elements: {se} ''' - await ux_show_story(msg.format(rel=rel, built=built, bl=bl, chk=chk, se=se, - ser=serial, hw=hw)) + msg = msg.format(rel=rel, built=built, bl=bl, chk=chk, + se=se, ser=serial, hw=hw) + if version.has_qr: + from glob import SCAN + msg += '\nQR Scanner:\n %s\n' % (SCAN.version or 'missing') + + await ux_show_story(msg) async def ship_wo_bag(*a): # Factory command: for dev and test units that have no bag number, and never will. @@ -2250,7 +2255,7 @@ async def change_seed_vault(is_enabled): if (not is_enabled) and settings.master_get('seeds'): # problem: they still have some seeds... also this path blocks - # disable from within a tmp seed + # disable from within a tmp seed settings.set('seedvault', 1) # restore it await ux_show_story("Please remove all seeds from the vault before disabling.") diff --git a/shared/glob.py b/shared/glob.py index e332fe99..5af629cc 100644 --- a/shared/glob.py +++ b/shared/glob.py @@ -26,5 +26,7 @@ VD = None # NFC interface (Mk4, and can be disabled) NFC = None +# QR scanner (Q1 only) +SCAN = None # EOF diff --git a/shared/keyboard.py b/shared/keyboard.py index 8d908c08..03f9b7e0 100644 --- a/shared/keyboard.py +++ b/shared/keyboard.py @@ -51,6 +51,7 @@ class FullKeyboard(NumpadBase): self.lcd_tear = Pin('LCD_TEAR', Pin.IN) self.lcd_tear.irq(self._measure_irq, trigger=Pin.IRQ_RISING, hard=False) + self.torch_on = False # ready to start def power_press(self, pin): @@ -131,6 +132,14 @@ class FullKeyboard(NumpadBase): # - not trying to support multiple presses, just one for kn in range(NUM_ROWS * NUM_COLS): if self.is_pressed[kn]: + if kn == KEYNUM_LAMP: + if not self.torch_on: + # handle light button right here and now + from glob import SCAN + self.torch_on = True + call_later_ms(0, SCAN.torch, 1) + continue + # indicated key was found to be down and then back up key = DECODER[kn] if key != self.key_pressed: @@ -139,6 +148,11 @@ class FullKeyboard(NumpadBase): self.lp_time = utime.ticks_ms() + if self.torch_on and not self.is_pressed[KEYNUM_LAMP]: + from glob import SCAN + self.torch_on = False + call_later_ms(0, SCAN.torch, 0) + none_active = (sum(self.is_pressed) == 0) if none_active: if self.key_pressed: diff --git a/shared/q1.py b/shared/q1.py index df5eaae5..26d2ea61 100644 --- a/shared/q1.py +++ b/shared/q1.py @@ -16,4 +16,7 @@ def init0(): mk4_init0() + from scanner import QRScanner + glob.SCAN = QRScanner() + # EOF diff --git a/shared/scanner.py b/shared/scanner.py index b32e92fb..d887536b 100644 --- a/shared/scanner.py +++ b/shared/scanner.py @@ -1,20 +1,45 @@ # (c) Copyright 2023 by Coinkite Inc. This file is covered by license found in COPYING-CC. # -# scanner.py - QR scanner submodule on Q1 (only) +# scanner.py - QR scanner submodule on Q1 # import utime -from struct import pack +import uasyncio as asyncio +from struct import pack, unpack from utils import B2A +from imptask import IMPT + +def calc_bcc(msg): + bcc = 0 + for c in msg: + bcc ^= c + return bytes([bcc]) def wrap(body, fid=0): - # wrap w/ their weird framing - # LATER: USB? serial port doesn't need this! just send the string! + # wrap w/ their weird serial framing + # - serial port doesn't always need this! just send the string, but + # then response is unwrapped as well, so no checksums. body = body if isinstance(body, bytes) else body.encode('ascii') - rv = pack(b'>bH', fid, len(body)) + body - bcc = 0 - for c in rv: - bcc ^= c - return b'\x5A' + rv + bytes([bcc]) + b'\xA5' # STX ... ETX + rv = pack('>bH', fid, len(body)) + body + return b'\x5A' + rv + calc_bcc(rv) + b'\xA5' # STX ... ETX + +def unwrap(packed): + # read back values + stx, fid, mlen = unpack('>bbH', packed[0:4]) + assert stx == 0x5A, 'framing: STX' + assert fid == 1, 'not resp' + + body = packed[4:4+mlen] + got_bcc, etx = packed[4+mlen:4+mlen+2] + + assert etx == 0xA5, 'framing: ETX' + expect = calc_bcc(packed[1:4+mlen]) + assert got_bcc == expect[0], 'bad BCC' + + # return decoded body, and any extra bytes following it + return body, packed[4+mlen+2:] + +# this is wrap(b'\x90\x00', fid=1) ... 9000 is ACK. silence is NACK +OKAY = b'Z\x01\x00\x02\x90\x00\x93\xa5' class QRScanner: @@ -31,28 +56,81 @@ class QRScanner: utime.sleep_ms(10) self.reset(1) + self.sr = asyncio.StreamReader(self.serial) + + # needs 2+ seconds of recovery time after reset + self.version = None + asyncio.create_task(self.setup_task()) + + async def setup_task(self): + # setup device, and then stop + await asyncio.sleep(2) + + while self.serial.read(): + pass # ignore old data + + try: + # get b'V2.3.0.7\r\n' or similar + rx = await self.tx('T_OUT_CVER') + self.version = rx.decode().strip() + print("QR Scanner: " + self.version) + except: + raise + print("QR Scanner: missing") + + # configure it like we want it + #self.tx('T_CMD...' + await self.tx('S_CMD_MTRS5000') # 5s to read before fail + await self.tx('S_CMD_MT10') # trigger is level-based (not edge) + await self.tx('S_CMD_MT30') # Same code reading withou delay + await self.tx('S_CMD_MT20') # Enable automatic sleep when idle + def scan(self): - if q.trigger() == 0: + if self.trigger() == 0: # need to release/re-press - q.trigger(1) + self.trigger(1) utime.sleep_ms(100) - q.trigger(0); + self.trigger(0); def rx(self): # untested return self.serial.read() - def tx(self, m): - # not working - x = wrap(m) - print('Sending: ' + B2A(x)) - self.serial.write(x) + async def tx(self, msg): + # send a command, get response + # - has a long timeout, collects rx based on framing + self.sr.write(wrap(msg)) + await self.sr.drain() - def test(self): - m = bytes([90, 0, 0, 10, 95, 67, 77, 68, 95, 48, 48, 48, 49, 18, 165]) - self.serial.write(m) - return self.serial.read() - -q = QRScanner() + # read until ETX=0xA5 is seen + rx = bytearray() + while 1: + try: + h = await asyncio.wait_for_ms(self.sr.read(-1), 500) + except asyncio.TimeoutError: + raise RuntimeError("rx timeout") + + if h: + rx.extend(h) + if h[-1] == 0xA5: + break + + if rx == OKAY: + return + + try: + body, extra = unwrap(rx) + if extra: + raise RuntimeError("extra at end") + return body + except: + print("Bad Rx: " + B2A(rx)) + + async def torch(self, on): + # be an expensive flashlight + # - S_CMD_03L1 => always light + # - S_CMD_03L2 => when needed + if not self.version: return + await self.tx('S_CMD_03L%d' % (1 if on else 2)) # EOF