QR scanner support

This commit is contained in:
Peter D. Gray 2023-02-14 16:06:39 -05:00 committed by scgbckbone
parent a2452767cb
commit f8a4166c6c
5 changed files with 128 additions and 26 deletions

View File

@ -2123,8 +2123,13 @@ Secure Elements:
{se}
'''
await ux_show_story(msg.format(rel=rel, built=built, bl=bl, chk=chk, se=se,
ser=serial, hw=hw))
msg = msg.format(rel=rel, built=built, bl=bl, chk=chk,
se=se, ser=serial, hw=hw)
if version.has_qr:
from glob import SCAN
msg += '\nQR Scanner:\n %s\n' % (SCAN.version or 'missing')
await ux_show_story(msg)
async def ship_wo_bag(*a):
# Factory command: for dev and test units that have no bag number, and never will.
@ -2250,7 +2255,7 @@ async def change_seed_vault(is_enabled):
if (not is_enabled) and settings.master_get('seeds'):
# problem: they still have some seeds... also this path blocks
# disable from within a tmp seed
# disable from within a tmp seed
settings.set('seedvault', 1) # restore it
await ux_show_story("Please remove all seeds from the vault before disabling.")

View File

@ -26,5 +26,7 @@ VD = None
# NFC interface (Mk4, and can be disabled)
NFC = None
# QR scanner (Q1 only)
SCAN = None
# EOF

View File

@ -51,6 +51,7 @@ class FullKeyboard(NumpadBase):
self.lcd_tear = Pin('LCD_TEAR', Pin.IN)
self.lcd_tear.irq(self._measure_irq, trigger=Pin.IRQ_RISING, hard=False)
self.torch_on = False
# ready to start
def power_press(self, pin):
@ -131,6 +132,14 @@ class FullKeyboard(NumpadBase):
# - not trying to support multiple presses, just one
for kn in range(NUM_ROWS * NUM_COLS):
if self.is_pressed[kn]:
if kn == KEYNUM_LAMP:
if not self.torch_on:
# handle light button right here and now
from glob import SCAN
self.torch_on = True
call_later_ms(0, SCAN.torch, 1)
continue
# indicated key was found to be down and then back up
key = DECODER[kn]
if key != self.key_pressed:
@ -139,6 +148,11 @@ class FullKeyboard(NumpadBase):
self.lp_time = utime.ticks_ms()
if self.torch_on and not self.is_pressed[KEYNUM_LAMP]:
from glob import SCAN
self.torch_on = False
call_later_ms(0, SCAN.torch, 0)
none_active = (sum(self.is_pressed) == 0)
if none_active:
if self.key_pressed:

View File

@ -16,4 +16,7 @@ def init0():
mk4_init0()
from scanner import QRScanner
glob.SCAN = QRScanner()
# EOF

View File

@ -1,20 +1,45 @@
# (c) Copyright 2023 by Coinkite Inc. This file is covered by license found in COPYING-CC.
#
# scanner.py - QR scanner submodule on Q1 (only)
# scanner.py - QR scanner submodule on Q1
#
import utime
from struct import pack
import uasyncio as asyncio
from struct import pack, unpack
from utils import B2A
from imptask import IMPT
def calc_bcc(msg):
bcc = 0
for c in msg:
bcc ^= c
return bytes([bcc])
def wrap(body, fid=0):
# wrap w/ their weird framing
# LATER: USB? serial port doesn't need this! just send the string!
# wrap w/ their weird serial framing
# - serial port doesn't always need this! just send the string, but
# then response is unwrapped as well, so no checksums.
body = body if isinstance(body, bytes) else body.encode('ascii')
rv = pack(b'>bH', fid, len(body)) + body
bcc = 0
for c in rv:
bcc ^= c
return b'\x5A' + rv + bytes([bcc]) + b'\xA5' # STX ... ETX
rv = pack('>bH', fid, len(body)) + body
return b'\x5A' + rv + calc_bcc(rv) + b'\xA5' # STX ... ETX
def unwrap(packed):
# read back values
stx, fid, mlen = unpack('>bbH', packed[0:4])
assert stx == 0x5A, 'framing: STX'
assert fid == 1, 'not resp'
body = packed[4:4+mlen]
got_bcc, etx = packed[4+mlen:4+mlen+2]
assert etx == 0xA5, 'framing: ETX'
expect = calc_bcc(packed[1:4+mlen])
assert got_bcc == expect[0], 'bad BCC'
# return decoded body, and any extra bytes following it
return body, packed[4+mlen+2:]
# this is wrap(b'\x90\x00', fid=1) ... 9000 is ACK. silence is NACK
OKAY = b'Z\x01\x00\x02\x90\x00\x93\xa5'
class QRScanner:
@ -31,28 +56,81 @@ class QRScanner:
utime.sleep_ms(10)
self.reset(1)
self.sr = asyncio.StreamReader(self.serial)
# needs 2+ seconds of recovery time after reset
self.version = None
asyncio.create_task(self.setup_task())
async def setup_task(self):
# setup device, and then stop
await asyncio.sleep(2)
while self.serial.read():
pass # ignore old data
try:
# get b'V2.3.0.7\r\n' or similar
rx = await self.tx('T_OUT_CVER')
self.version = rx.decode().strip()
print("QR Scanner: " + self.version)
except:
raise
print("QR Scanner: missing")
# configure it like we want it
#self.tx('T_CMD...'
await self.tx('S_CMD_MTRS5000') # 5s to read before fail
await self.tx('S_CMD_MT10') # trigger is level-based (not edge)
await self.tx('S_CMD_MT30') # Same code reading withou delay
await self.tx('S_CMD_MT20') # Enable automatic sleep when idle
def scan(self):
if q.trigger() == 0:
if self.trigger() == 0:
# need to release/re-press
q.trigger(1)
self.trigger(1)
utime.sleep_ms(100)
q.trigger(0);
self.trigger(0);
def rx(self):
# untested
return self.serial.read()
def tx(self, m):
# not working
x = wrap(m)
print('Sending: ' + B2A(x))
self.serial.write(x)
async def tx(self, msg):
# send a command, get response
# - has a long timeout, collects rx based on framing
self.sr.write(wrap(msg))
await self.sr.drain()
def test(self):
m = bytes([90, 0, 0, 10, 95, 67, 77, 68, 95, 48, 48, 48, 49, 18, 165])
self.serial.write(m)
return self.serial.read()
q = QRScanner()
# read until ETX=0xA5 is seen
rx = bytearray()
while 1:
try:
h = await asyncio.wait_for_ms(self.sr.read(-1), 500)
except asyncio.TimeoutError:
raise RuntimeError("rx timeout")
if h:
rx.extend(h)
if h[-1] == 0xA5:
break
if rx == OKAY:
return
try:
body, extra = unwrap(rx)
if extra:
raise RuntimeError("extra at end")
return body
except:
print("Bad Rx: " + B2A(rx))
async def torch(self, on):
# be an expensive flashlight
# - S_CMD_03L1 => always light
# - S_CMD_03L2 => when needed
if not self.version: return
await self.tx('S_CMD_03L%d' % (1 if on else 2))
# EOF