diff --git a/shared/actions.py b/shared/actions.py index e5e06c92..9e1cb00a 100644 --- a/shared/actions.py +++ b/shared/actions.py @@ -21,7 +21,7 @@ from glob import settings from pincodes import pa from menu import start_chooser from version import MAX_TXN_LEN -from charcodes import KEY_NFC, KEY_QR +from charcodes import KEY_NFC, KEY_QR, KEY_CANCEL CLEAR_PIN = '999999-999999' @@ -690,7 +690,8 @@ async def view_seed_words(*a): raw or sv.raw, sv.node) - msg += '\n\nPress (1) to view as QR Code.' + if not version.has_qwerty: + msg += '\n\nPress (1) to view as QR Code.' while 1: ch = await ux_show_story(msg, sensitive=True, escape='1'+KEY_QR) @@ -1343,8 +1344,11 @@ async def import_xprv(_1, _2, item): # error already displayed in nfc.py return elif choice == KEY_QR: - # TODO: scan something - pass + from ux_q1 import QRScannerInteraction + extended_key = await QRScannerInteraction.scan('Scan XPRV from a QR code') + if not extended_key: + # press pressed CANCEL + return else: # only get here if NFC was not chosen # pick a likely-looking file. @@ -1491,7 +1495,7 @@ async def import_tapsigner_backup_file(_1, _2, item): meta = "from " label = "TAPSIGNER encrypted backup file" - choice = await import_export_prompt(label, is_import=False) + choice = await import_export_prompt(label, is_import=True) if choice == KEY_CANCEL: return @@ -1502,8 +1506,28 @@ async def import_tapsigner_backup_file(_1, _2, item): # error already displayed in nfc.py return elif choice == KEY_QR: - # TODO: how is binary encoded? who made this QR??! - pass + # how is binary encoded? who made this QR??! + from ux_q1 import QRScannerInteraction + from ubinascii import a2b_base64 + from ubinascii import unhexlify as a2b_hex + + prob = None + while 1: + data = await QRScannerInteraction.scan( + 'Scan TAPSIGNER backup data', prob) + if not data: return # pressed cancel + + # guess at serialization between Base64 and Hex + try: + # pure hex, the smarter encoding (when in caps) + data = a2b_hex(data) + except ValueError: + try: + data = a2b_base64(data) + except ValueError: + prob = 'Expected HEX digits or Base64 encoded binary' + continue + break else: fn = await file_picker('Pick ' + label, suffix="aes", min_size=100, max_size=160, **choice) if not fn: return diff --git a/shared/auth.py b/shared/auth.py index b7fd1454..c328035a 100644 --- a/shared/auth.py +++ b/shared/auth.py @@ -967,7 +967,7 @@ def psbt_encoding_taster(taste, psbt_len): if taste[0:5] == b'psbt\xff': decoder = None output_encoder = lambda x: x - elif taste[0:10] == b'70736274ff' or taste[0:10] == b'70736274FF': + elif taste[0:10].lower() == b'70736274ff': decoder = HexStreamer() output_encoder = HexWriter psbt_len //= 2 diff --git a/shared/bbqr.py b/shared/bbqr.py new file mode 100644 index 00000000..c4259d92 --- /dev/null +++ b/shared/bbqr.py @@ -0,0 +1,169 @@ +# (c) Copyright 2023 by Coinkite Inc. This file is covered by license found in COPYING-CC. +# +# bbqr.py - Implement BBQr protocol for multiple QR support (also compression and filetype info) +# +import utime, uzlib +import uasyncio as asyncio +from struct import pack, unpack +from utils import B2A +from imptask import IMPT +from queues import Queue + +# For BBQr support +import ngu +b32encode = ngu.codecs.b32_encode +b32decode = ngu.codecs.b32_decode +from ubinascii import unhexlify as a2b_hex + +TYPE_LABELS = dict(P='PSBT File', T='Transaction', J='JSON', C='CBOR', U='Unicode Text') + +class BBQrHeader: + def __init__(self, taste): + # parse header based on standard + # expects a string + assert len(taste) >= 8 + assert taste[0:2] == 'B$' + + self.encoding, self.file_type = taste[2:4] + self.num_parts = int(taste[4:6], 16) + self.which = int(taste[6:8], 16) + + assert 1 <= self.num_parts <= 255 + assert 0 <= self.which < self.num_parts + + def __repr__(self): + return '' % (self.which, self.num_parts, + self.encoding, self.file_type) + + def is_compat(self, other): + # Does this header match previous ones seen? + return (self.encoding == other.encoding and + self.file_type == other.file_type and + self.num_parts == other.num_parts) + + def decode_body(self, scan): + # perform the decoding implied by header (but not decompression) + body = bytes(memoryview(scan)[8:]) + + if self.encoding == 'H': + rv = a2b_hex(body) + else: + rv = b32decode(body) + + return rv + + def file_label(self): + # provide a string as hint to user of what they are getting + if self.file_type in TYPE_LABELS: + return TYPE_LABELS[self.file_type] + else: + return 'Unknown: %s' % self.file_type + + +class BBQrState: + def __init__(self): + self.reset() + + def reset(self): + self._psb = None # hack, to be removed + self.hdr = None + self.parts = set() + self.runt = None + self.runt_size = None + self.blksize = None + + def upper_bound(self): + # max size we are expecting + return self.blksize * self.hdr.num_parts + + def is_valid(self): + return bool(self.hdr) and len(self.parts) == self.hdr.num_parts + + def collect(self, scan): + # Another BBQr has come in; track it. + # - return T while more parts are still needed + # - updates UX to show the progress + from glob import dis + + hdr = BBQrHeader(scan) + + print("Got " + repr(hdr)) + + if not self.hdr or not self.hdr.is_compat(hdr): + # New or incompatible header, they might have changed their + # minds and are now trying to scan something else; recover + self.reset() + self.hdr = hdr + + if hdr.which not in self.parts: + # we've NOT YET seen this one + + # convert back to binary + raw = hdr.decode_body(scan) + + if hdr.which and (hdr.which == hdr.num_parts-1) and not self.parts: + # Problem: this is a runt and we saw it first, we have no idea + # where to put it; store as tmp for now. + self.runt = (hdr.which, raw) + else: + # based on (required) assumption that all parts are equal, we know + # where to put this data, so do that. + self.parts.add(hdr.which) + + if self.blksize is None: + self.blksize = len(raw) + + self.save_packet(hdr.which, raw) + + # seeing any other packet is enough to decide where to put the runt + if self.runt: + wh, raw = self.runt + self.save_packet(wh, raw) + self.parts.add(wh) + self.runt = None + + # provide UX + dis.draw_bbqr_progress(hdr.which, list(self.parts), hdr.num_parts, hdr.file_label()) + + # do we need more still? + return (len(self.parts) < hdr.num_parts) + + def save_packet(self, which, data): + # override this on other projects... which don't have stupid PSRAM like this + # - can only write 4-aligned data to PSRAM, and typically the parts will not be + # 4-aligned because base32 yields 5 byte quantities + # - TODO: keep up num_parts of 3-byte runts, etc. {offset:bytes} and flush aligned + # parts each time we get more data + assert self.blksize is not None + + if which == None: + # we are supposed to be done, return w/ complete length + final_size = (self.blksize * (self.hdr.num_parts-1)) + self.runt_size + return final_size, self._psb[0:final_size] + + if which == self.hdr.num_parts-1: + self.runt_size = len(data) + + offset = which * self.blksize + if not self._psb: + self._psb = bytearray(self.upper_bound()) + + self._psb[offset:offset+len(data)] = data + + def finalize(self): + # got all the parts, so maybe decompress + # - return number of bytes waiting at start of PSRAM, and the filetype code + assert len(self.parts) == self.hdr.num_parts, "still missing parts" + + # flush out data we have + final_size, raw = self.save_packet(None, None) + + if self.hdr.encoding == 'Z': + # do in-place Zlib decompression (TODO) + raw = uzlib.decompress(raw, -10) + final_size = len(raw) + + return self.hdr.file_type, final_size, raw + + +# EOF diff --git a/shared/lcd_display.py b/shared/lcd_display.py index b7a1cac4..3198353c 100644 --- a/shared/lcd_display.py +++ b/shared/lcd_display.py @@ -540,7 +540,8 @@ class Display: # - we need one more (white) pixel on all sides from utils import word_wrap - assert not sidebar + # maybe show something other than QR contents under it + msg = sidebar or msg if msg: if len(msg) <= CHARS_W: @@ -596,6 +597,14 @@ class Display: # TODO: pass a "max_brightness" param here, which would be cleared after next show self.show() + def draw_bbqr_progress(self, new, gotem, num_parts, label): + # we've seen at least one BBQr QR, so update display w/ progress bar + # - lots of data so we can show nice animation + count = len(gotem) or 1 + percent = int(count * 100.0 / num_parts) + self.text(None, -2, 'Keep scanning more...') + self.text(None, -1, '%s: %d of %d = %d%% ' % (label, count, num_parts, percent), dark=True) + def draw_box(self, x, y, w, h): # using line-drawing chars, draw a box # returns X pos of first inside char diff --git a/shared/qrs.py b/shared/qrs.py index e6005a24..48f6ae9d 100644 --- a/shared/qrs.py +++ b/shared/qrs.py @@ -10,6 +10,10 @@ from charcodes import (KEY_LEFT, KEY_RIGHT, KEY_UP, KEY_DOWN, KEY_HOME, from version import has_qwerty +# TODO: This class has a terrible API! + +MAX_V40_SIZE = 4296 + class QRDisplaySingle(UserInteraction): # Show a single QR code for (typically) a list of addresses, or a single value. @@ -30,6 +34,7 @@ class QRDisplaySingle(UserInteraction): # - version=4..11 => single pixel per module # - not really providing enough space around these, shrug # - inverted QR (black/white swap) still readable by scanners, altho wrong + # - on Q: ver 23 => 109x109 is largest that can be pixel-doubled, can do v40 tho at 1:1 if self.is_alnum: # targeting 'alpha numeric' mode, nice and dense; caps only tho enc = uqr.Mode_ALPHANUMERIC diff --git a/shared/scanner.py b/shared/scanner.py index ab40f0ff..e0563a27 100644 --- a/shared/scanner.py +++ b/shared/scanner.py @@ -8,6 +8,7 @@ from struct import pack, unpack from utils import B2A from imptask import IMPT from queues import Queue +from bbqr import BBQrState def calc_bcc(msg): bcc = 0 @@ -50,6 +51,7 @@ def unwrap(packed): OKAY = b'Z\x01\x00\x02\x90\x00\x93\xa5' RAW_OKAY = b'\x90\x00' LEN_OKAY = const(8) + # TODO: constructor should leave it in reset for simple lower-power usage; then after # login we can do full setup (2+ seconds) and then sleep again until needed. @@ -57,10 +59,13 @@ LEN_OKAY = const(8) class QRScanner: def __init__(self): - self.lock = asyncio.Lock() + # set when we are.. self.busy_scanning = False + # hodl this lock when communicating w/ QR scanner + self.lock = asyncio.Lock() + from machine import UART, Pin self.serial = UART(2, 9600) self.reset = Pin('QR_RESET', Pin.OUT_OD, value=0) @@ -84,29 +89,30 @@ class QRScanner: # setup device, and then stop await asyncio.sleep(2) - while self.serial.read(): - pass # ignore old data + async with self.lock: + while self.serial.read(): + pass # ignore old data - try: - # get b'V2.3.0.7\r\n' or similar - rx = await self.tx('T_OUT_CVER') - self.version = rx.decode().strip() - except: - raise - #print("QR Scanner: missing") + try: + # get b'V2.3.0.7\r\n' or similar + rx = await self.tx('T_OUT_CVER') + self.version = rx.decode().strip() + except: + raise + #print("QR Scanner: missing") - # configure it like we want it - await self.tx('S_CMD_FFFF') # factory reset of settings - await self.tx('S_CMD_MTRS5000') # 5s to read before fail - await self.tx('S_CMD_MT11') # trigger is edge-based (not level) - await self.tx('S_CMD_MT30') # Same code reading without delay - await self.tx('S_CMD_MT20') # Enable automatic sleep when idle - await self.tx('S_CMD_MTRF500') # Idle time: 500ms - await self.tx('S_CMD_059A') # add CR LF after QR data + # configure it like we want it + await self.tx('S_CMD_FFFF') # factory reset of settings + await self.tx('S_CMD_MTRS5000') # 5s to read before fail + await self.tx('S_CMD_MT11') # trigger is edge-based (not level) + await self.tx('S_CMD_MT30') # Same code reading without delay + await self.tx('S_CMD_MT20') # Enable automatic sleep when idle + await self.tx('S_CMD_MTRF500') # Idle time: 500ms + await self.tx('S_CMD_059A') # add CR LF after QR data - self.setup_done = True + self.setup_done = True - await self.goto_sleep() + await self.goto_sleep() async def scan_once(self): # blocks until something is scanned. returns it @@ -115,6 +121,8 @@ class QRScanner: while not self.setup_done: await asyncio.sleep(.25) + bbqr = BBQrState() + async with self.lock: self.busy_scanning = True @@ -134,17 +142,39 @@ class QRScanner: await self.tx('SR030301') try: - rv = await self.stream.readline() + while 1: + rv = await self._readline() + if not rv: continue + + if rv[0:2] == 'B$' and bbqr.collect(rv): + # BBQr protocol detected; collect more data + continue + + break except asyncio.CancelledError: - rv = None + return None finally: - await self.tx('SR030300') + self.busy_scanning = False + try: + await self.tx('SR030300') + except: + await self.tx('SR030300') # second try await self.goto_sleep() - self.busy_scanning = False + if bbqr.is_valid(): + # will return a tuple + return bbqr return rv + async def _readline(self): + # overridden in simulator + # - blocks for QR to be seen + # - must trim newline(s) + # - must convert to str + rv = await self.stream.readline() + return rv.rstrip().decode() + async def wakeup(self): # send specific command until it responds # - it will wake on any command, but not instant @@ -231,7 +261,7 @@ class QRScanner: return if self.busy_scanning: - # do nothing if scanning already + # during scanning, invert meaning... turn off light return async with self.lock: diff --git a/shared/utils.py b/shared/utils.py index 921bb246..bef73460 100644 --- a/shared/utils.py +++ b/shared/utils.py @@ -114,7 +114,6 @@ class HexWriter: def write(self, b): self.checksum.update(b) self.pos += len(b) - self.fd.write(b2a_hex(b)) def seek(self, offset, whence=0): @@ -135,6 +134,17 @@ class HexWriter: buf[0:len(b)] = b return len(b) +class CapsHexWriter(HexWriter): + # omit newlines at end, and do CAPS ... better for QR usage + def write(self, b): + self.checksum.update(b) + self.pos += len(b) + self.fd.write(b2a_hex(b).upper()) + + def __exit__(self, *a, **k): + self.fd.seek(0, 2) # go to end + return self.fd.__exit__(*a, **k) + class Base64Writer: # Emulate a file/stream but convert binary to Base64 as they write def __init__(self, fd): diff --git a/shared/ux.py b/shared/ux.py index 02494108..52753565 100644 --- a/shared/ux.py +++ b/shared/ux.py @@ -89,9 +89,10 @@ def ux_clear_keys(no_aborts=False): async def ux_wait_keyup(expected=None): # Wait for single keypress in 'expected' set, return it # no visual feedback, no escape + # - can be canceled anytime, using wait_for to create a timeout from glob import numpad - armed = None + armed = numpad.key_pressed or False while 1: ch = await numpad.get() @@ -126,6 +127,9 @@ def ux_poll_key(): return ch +def q1_reword(msg): + return msg.replace('\nX ', 'CANCEL ').replace(' X ', ' CANCEL ').replace('OK', 'ENTER') + async def ux_show_story(msg, title=None, escape=None, sensitive=False, strict_escape=False): # show a big long string, and wait for XY to continue # - returns character used to get out (X or Y) @@ -139,6 +143,10 @@ async def ux_show_story(msg, title=None, escape=None, sensitive=False, strict_es # LATER: rarely used lines.append('\x01' + title) + if version.has_qwerty: + # big screen always needs blank after title + lines.append('') + if hasattr(msg, 'readline'): # coming from in-memory file for larger messages msg.seek(0) @@ -146,6 +154,9 @@ async def ux_show_story(msg, title=None, escape=None, sensitive=False, strict_es if ln[-1] == '\n': ln = ln[:-1] + if version.has_qwerty: + ln = q1_reword(ln) + if len(ln) > CH_PER_W: lines.extend(word_wrap(ln, CH_PER_W)) else: @@ -159,7 +170,7 @@ async def ux_show_story(msg, title=None, escape=None, sensitive=False, strict_es else: # simple string being shown if version.has_qwerty: - msg = msg.replace('\nX ', 'CANCEL ').replace(' X ', ' CANCEL ').replace('OK', 'SELECT') + msg = q1_reword(msg) for ln in msg.split('\n'): if len(ln) > CH_PER_W: @@ -293,9 +304,9 @@ async def show_qr_codes(addrs, is_alnum, start_n): o = QRDisplaySingle(addrs, is_alnum, start_n, sidebar=None) await o.interact_bare() -async def show_qr_code(data, is_alnum=False): +async def show_qr_code(data, is_alnum=False, msg=None): from qrs import QRDisplaySingle - o = QRDisplaySingle([data], is_alnum) + o = QRDisplaySingle([data], is_alnum, sidebar=msg) await o.interact_bare() async def ux_enter_bip32_index(prompt, can_cancel=False, unlimited=False): @@ -384,14 +395,13 @@ async def import_export_prompt(title, is_import=False, no_qr=False): else: prompt, escape = _export_prompt_builder(title, no_qr) - force_vdisk = False - slot_b = None # ie. don't care + force_vdisk = False + slot_b = None # ie. don't care / either if not prompt: # they don't have NFC nor VD enabled, and no second slots... so will be file. pass else: - print('escape: ' + repr(escape)) ch = await ux_show_story(prompt, escape=escape) if ch in "3"+KEY_NFC: diff --git a/shared/ux_q1.py b/shared/ux_q1.py index c2f0e7fe..68ce84a8 100644 --- a/shared/ux_q1.py +++ b/shared/ux_q1.py @@ -68,7 +68,7 @@ async def ux_confirm(msg): # confirmation screen, with stock title and Y=of course. from ux import ux_show_story - resp = await ux_show_story('\n' + msg, title="Are you SURE ?!?") + resp = await ux_show_story(msg, title="Are you SURE ?!?") return resp == 'y' @@ -594,9 +594,11 @@ class QRScannerInteraction: def __init__(self): pass - async def animation(self, prompt, line2=None): + @staticmethod + async def scan(prompt, line2=None): # draw animation, while waiting for them to scan something # - CANCEL to abort + # - returns a proper string or None. newlines stripped. no binary support from glob import dis, SCAN from ux import ux_wait_keyup frames = [ 1, 2, 3, 4, 5, 4, 3, 2 ] @@ -621,8 +623,7 @@ class QRScannerInteraction: print("Scanned: %r" % data) break - # wait for key or 250ms - # XXX bug, key doesn't usually get noticed? + # wait for key or 250ms delay try: ch = await asyncio.wait_for_ms(ux_wait_keyup(), 250) except asyncio.TimeoutError: @@ -632,10 +633,11 @@ class QRScannerInteraction: data = None break - #await SCAN.scan_stop() task.cancel() + # clear screen right away so user knows we got it dis.clear() + dis.show() return data @@ -697,10 +699,33 @@ class QRScannerInteraction: prompt = 'Scan any QR code, or CANCEL' if not expect_secret else \ 'Scan XPRV or Seed Words, or CANCEL' - got = await self.animation(prompt, line2=problem) + got = await self.scan(prompt, line2=problem) if got is None: return + if hasattr(got, 'finalize'): + # BBQr object + ty, final_size, got = got.finalize() + if ty == 'P': + print("Got a PSBT file: %d bytes" % final_size) + await qr_psbt_sign(None, final_size, got) + return + elif ty == 'T': + problem = "No good use for transaction." + continue + elif ty == 'C': + problem = "Sorry, Q has no use CBOR" + continue + elif ty == 'J': + problem = "Sorry, Q has no use JSON yet" + continue + elif ty == 'U': + # continue text thru code below + pass + else: + problem = "Sorry, dont know filetype: " + ty + continue + try: # can we decode a master secret of some type? mode, value = self.decode_secret(got) @@ -711,10 +736,20 @@ class QRScannerInteraction: problem = str(exc) continue - # Might be an address or pubkey? But not binary - got = got.decode().strip() + # might be a PSBT? + if len(got) > 100: + from auth import psbt_encoding_taster + try: + decoder, _, psbt_len = psbt_encoding_taster(got[0:10].encode(), len(got)) + print("Got a PSBT file") + await qr_psbt_sign(decoder, psbt_len, got) + return + except ValueError: + pass - # remove URL scheme: if present + # Might be an address or pubkey or psbt? But not binary + + # remove URL protocol: if present scheme = None if ':' in got: scheme, got = got.split(':', 1) @@ -745,4 +780,63 @@ class QRScannerInteraction: problem = 'Sorry, can not make use of that!' +async def qr_psbt_sign(decoder, psbt_len, raw): + # Got a PSBT coming in from QR scanner. Assume single QR for now. + # - similar to auth.sign_psbt_file() + from auth import UserAuthorizedAction, ApproveTransaction + from utils import CapsHexWriter + from glob import dis, PSRAM + from ux import show_qr_code, the_ux + from sffile import SFFile + from auth import MAX_TXN_LEN, TXN_INPUT_OFFSET, TXN_OUTPUT_OFFSET + from qrs import MAX_V40_SIZE + + if isinstance(raw, str): + raw = raw.encode() + + # copy to PSRAM, and convert encoding at same time + total = 0 + with SFFile(TXN_INPUT_OFFSET, max_size=psbt_len) as out: + if not decoder: + total += out.write(raw) + else: + for here in decoder.more(raw): + out.write(here) + total += len(here) + + # might have been whitespace inflating initial estimate of PSBT size + assert total <= psbt_len + psbt_len = total + + async def done(psbt): + dis.fullscreen("Wait...") + txid = None + + with SFFile(TXN_OUTPUT_OFFSET, max_size=MAX_TXN_LEN, message="Saving...") as psram: + + # save transaction, as hex into PSRAM + with CapsHexWriter(psram) as fd: + if psbt.is_complete(): + txid = psbt.finalize(fd) + else: + psbt.serialize(fd) + + data_len = psram.tell() + + UserAuthorizedAction.cleanup() + + # SOON will be a loop here, that animates multiple QR's ... for now, one. + here = PSRAM.read_at(TXN_OUTPUT_OFFSET, data_len) + + if data_len >= MAX_V40_SIZE: + # too big for single version 40 QR + await ux_show_story("Resulting txn is too big for QR code") + return + + await show_qr_code(here, is_alnum=True, msg=(txid or 'Partly Signed PSBT')) + + UserAuthorizedAction.cleanup() + UserAuthorizedAction.active_request = ApproveTransaction(psbt_len, approved_cb=done) + the_ux.push(UserAuthorizedAction.active_request) + # EOF diff --git a/stm32/COLDCARD_Q1/modlcd.c b/stm32/COLDCARD_Q1/modlcd.c index 9edf9ff5..8afce4aa 100644 --- a/stm32/COLDCARD_Q1/modlcd.c +++ b/stm32/COLDCARD_Q1/modlcd.c @@ -94,7 +94,7 @@ static void write_data(const spi_t *spi, int len, const uint8_t *data) static void write_data_repeated(const spi_t *spi, int count, int len, const uint8_t *data) { - // Send a bunch of pixel data, and repeat it N tim.es + // Send a bunch of pixel data, and repeat it N times. mp_hal_pin_write(PIN_LCD_CS, 1); mp_hal_pin_write(PIN_LCD_DATA_CMD, 1); mp_hal_pin_write(PIN_LCD_CS, 0); diff --git a/stm32/COLDCARD_Q1/mpconfigboard.h b/stm32/COLDCARD_Q1/mpconfigboard.h index 4fd0fb8e..5a52972b 100644 --- a/stm32/COLDCARD_Q1/mpconfigboard.h +++ b/stm32/COLDCARD_Q1/mpconfigboard.h @@ -69,9 +69,11 @@ // SD card detect switch // - open when card inserted, grounded when no card +/* Q has dual slot, so multiple detect pins, this can't work #define MICROPY_HW_SDCARD_DETECT_PIN (pin_C13) #define MICROPY_HW_SDCARD_DETECT_PULL (GPIO_PULLUP) #define MICROPY_HW_SDCARD_DETECT_PRESENT (GPIO_PIN_SET) +*/ // We have our own version of this code. diff --git a/unix/q1-images/background.png b/unix/q1-images/background.png index 681b40a1..5603791c 100644 Binary files a/unix/q1-images/background.png and b/unix/q1-images/background.png differ diff --git a/unix/simulator.py b/unix/simulator.py index b820194d..512f7a33 100755 --- a/unix/simulator.py +++ b/unix/simulator.py @@ -544,67 +544,162 @@ def special_q1_keys(ch): return None +def q1_click_to_keynum(x, y): + # convert on-screen position to a keynumber, or None if they missing + + # handle screen click as "paste" + if (90 <= x <= 430) and (90 <= y <= 345): + # click on screen + return 'SCREEN' + + # keypad area + left = 29 + right = 490 + top = 398 + bottom = 790 + + if (y > bottom) or (y < top): + return None + + # put onto a grid; better would have dead zones between them + pitch_x = (right-left) / 10 + pitch_y = (bottom-top) / 7 + + gx = int((x - left) / pitch_x) + gy = int((y - top) / pitch_y) + + #print(f'{x=} {y=} => {gx=} {gy=}') + + # main qwerty area, nice grid + if 2 <= gy <= 5: + return ((gy-1) * 10) + gx + + # top area; two rows really + if (0 <= gy <= 1): + if 2 <= gx <= 3: + return 0x03 # KEY_LEFT + if 6 <= gx <= 7: + return 0x06 # KEY_RIGHT + + if gy == 0: + if gx == 0: + # power key? + raise SystemExit + if gx == 1: + return 0x02 # KEY_QR + if 4 <= gx <= 5: + return 0x04 # KEY_UP + if gx >= 8: + return 0x07 # KEY_CANCEL + + if gy == 1: + if gx == 0: + return 0x00 # KEY_NFC + if gx == 1: + return 0x01 # KEY_TAB + if 4 <= gx <= 5: + return 0x05 # KEY_DOWN + if gx >= 8: + return 0x08 # KEY_ENTER + + if gy == 6: + # bottom row + if gx == 0: # too narrow, but meh + return q1_charmap.KEYNUM_LAMP + if 1 <= gx <= 3: + return q1_charmap.KEYNUM_SHIFT + if 4 <= gx <= 6: + return 52 # space + if 7 <= gx <= 8: + return q1_charmap.KEYNUM_SYMBOL + if gx == 9: + return 54 # delete + + return None + q1_pressed = set() -def handle_q1_key_events(event, numpad_tx): +def handle_q1_key_events(event, numpad_tx, data_tx): # Map SDL2 (unix, desktop) keyscan code into keynumber on Q1 # - allow Q1 to do shift logic # - support up to 5 keys down at once global q1_pressed - assert event.type in { sdl2.SDL_KEYUP, sdl2.SDL_KEYDOWN} + if event.type in (sdl2.SDL_MOUSEBUTTONDOWN, sdl2.SDL_MOUSEBUTTONUP): + is_press = (event.type == sdl2.SDL_MOUSEBUTTONDOWN) + kn = q1_click_to_keynum(event.button.x, event.button.y) - is_press = (event.type == sdl2.SDL_KEYDOWN) + if kn == 'SCREEN': + # click on screen to paste clipboard into QR scanner or NFC tag + if is_press: + txt = sdl2.SDL_GetClipboardText() + if txt: + print(f"Doing paste: {txt.decode()}") + data_tx.write(txt + b'\n') + return None - # first, see if we can convert to ascii char - scancode = event.key.keysym.sym & 0xffff - try: - ch = chr(event.key.keysym.sym) - except: - ch = scancode_remap(scancode) + if not kn: return - #print(f'scan 0x{scancode:04x} mod=0x{event.key.keysym.mod:04x}=> char={ch}=0x{ord(ch) if ch else 0:02x}') - - shift_down = bool(event.key.keysym.mod & 0x3) # left or right shift - symbol_down = bool(event.key.keysym.mod & 0x200) # right ALT - special_down = bool(event.key.keysym.mod & 0xc00) # left or right META - - #print(f"modifier = 0x{event.key.keysym.mod:04x} => shift={shift_down} symb={symbol_down} spec={special_down}") - - if special_down: - ch = special_q1_keys(ch) - if not ch: - return - - # reverse char to a keynum, and perhaps the meta key too - kn = None - - if ch: - if ch in q1_charmap.DECODER: - kn = q1_charmap.DECODER.find(ch) - elif ch in q1_charmap.DECODER_SHIFT: - kn = q1_charmap.DECODER_SHIFT.find(ch) - shift_down = is_press - elif ch in q1_charmap.DECODER_SYMBOL: - kn = q1_charmap.DECODER_SYMBOL.find(ch) - symbol_down = is_press - - #print(f"{ch=} => keynum={kn} => shift={shift_down} sym={symbol_down}") - - if kn is not None: + # do right click for shift+key, middle click for symb+key ... good luck + #shift_down = (event.button.button == sdl2.SDL_BUTTON_RIGHT) + #symbol_down = (event.button.button == sdl2.SDL_BUTTON_MIDDLE) if is_press: q1_pressed.add(kn) else: q1_pressed.discard(kn) + else: + assert event.type in { sdl2.SDL_KEYUP, sdl2.SDL_KEYDOWN} + is_press = (event.type == sdl2.SDL_KEYDOWN) - q1_pressed.discard(q1_charmap.KEYNUM_SHIFT) - q1_pressed.discard(q1_charmap.KEYNUM_SYMBOL) + # first, see if we can convert to ascii char + scancode = event.key.keysym.sym & 0xffff + try: + ch = chr(event.key.keysym.sym) + except: + ch = scancode_remap(scancode) - if shift_down: - q1_pressed.add(q1_charmap.KEYNUM_SHIFT) - if symbol_down: - q1_pressed.add(q1_charmap.KEYNUM_SYMBOL) + #print(f'scan 0x{scancode:04x} mod=0x{event.key.keysym.mod:04x}=> char={ch}=0x{ord(ch) if ch else 0:02x}') - #print(f" .. => pressed: {q1_pressed}") + shift_down = bool(event.key.keysym.mod & 0x3) # left or right shift + symbol_down = bool(event.key.keysym.mod & 0x200) # right ALT + special_down = bool(event.key.keysym.mod & 0xc00) # left or right META + + #print(f"modifier = 0x{event.key.keysym.mod:04x} => shift={shift_down} symb={symbol_down} spec={special_down}") + + if special_down: + ch = special_q1_keys(ch) + if not ch: + return + + # reverse char to a keynum, and perhaps the meta key too + kn = None + + if ch: + if ch in q1_charmap.DECODER: + kn = q1_charmap.DECODER.find(ch) + elif ch in q1_charmap.DECODER_SHIFT: + kn = q1_charmap.DECODER_SHIFT.find(ch) + shift_down = is_press + elif ch in q1_charmap.DECODER_SYMBOL: + kn = q1_charmap.DECODER_SYMBOL.find(ch) + symbol_down = is_press + + #print(f"{ch=} => keynum={kn} => shift={shift_down} sym={symbol_down}") + + if kn is not None: + if is_press: + q1_pressed.add(kn) + else: + q1_pressed.discard(kn) + + q1_pressed.discard(q1_charmap.KEYNUM_SHIFT) + q1_pressed.discard(q1_charmap.KEYNUM_SYMBOL) + + if shift_down: + q1_pressed.add(q1_charmap.KEYNUM_SHIFT) + if symbol_down: + q1_pressed.add(q1_charmap.KEYNUM_SYMBOL) + + #print(f" .. => pressed: {q1_pressed}") # see variant/touch.py where this is decoded. if len(q1_pressed) > 5: @@ -629,6 +724,7 @@ Q1 specials: Meta-L - Lamp button Meta-N - NFC button Meta-R - QR button (not Meta-Q, because that's quit!) + Click Screen - Send clipboard contents to QR/NFC ''') sdl2.ext.init() sdl2.SDL_EnableScreenSaver() @@ -662,6 +758,7 @@ Q1 specials: display_r, display_w = os.pipe() # fancy OLED display led_r, led_w = os.pipe() # genuine LED numpad_r, numpad_w = os.pipe() # keys + data_r, data_w = os.pipe() # data dumps # manage unix socket cleanup for client def sock_cleanup(): @@ -677,7 +774,7 @@ Q1 specials: # - open the serial device # - get buffering/non-blocking right # - pass in open fd numbers - pass_fds = [display_w, numpad_r, led_w] + pass_fds = [display_w, numpad_r, led_w, data_r] if '--metal' in sys.argv: # bare-metal access: use a real Coldcard's bootrom+SE. @@ -696,8 +793,7 @@ Q1 specials: os.chdir('./work') cc_cmd = ['../coldcard-mpy', '-X', 'heapsize=9m', - '-i', '../sim_boot.py', - str(display_w), str(numpad_r), str(led_w)] \ + '-i', '../sim_boot.py'] + [str(i) for i in pass_fds] \ + metal_args + sys.argv[1:] xterm = subprocess.Popen(['xterm', '-title', 'Coldcard Simulator REPL', '-geom', '132x40+650+40', '-e'] + cc_cmd, @@ -710,6 +806,7 @@ Q1 specials: display_rx = open(display_r, 'rb', closefd=0, buffering=0) led_rx = open(led_r, 'rb', closefd=0, buffering=0) numpad_tx = open(numpad_w, 'wb', closefd=0, buffering=0) + data_tx = open(data_w, 'wb', closefd=0, buffering=0) # setup no blocking for r in [display_rx, led_rx]: @@ -750,7 +847,7 @@ Q1 specials: pass else: # all other key events for Q1 get handled here - handle_q1_key_events(event, numpad_tx) + handle_q1_key_events(event, numpad_tx, data_tx) continue if event.type == sdl2.SDL_KEYUP or event.type == sdl2.SDL_KEYDOWN: @@ -821,7 +918,7 @@ Q1 specials: send_event(ch, event.type == sdl2.SDL_KEYDOWN) if is_q1 and event.type in (sdl2.SDL_MOUSEBUTTONDOWN, sdl2.SDL_MOUSEBUTTONUP): - print('NOTE: Click on sim keyboard not supported for Q1') + handle_q1_key_events(event, numpad_tx, data_tx) else: if event.type == sdl2.SDL_MOUSEBUTTONDOWN: #print('xy = %d, %d' % (event.button.x, event.button.y)) @@ -833,6 +930,19 @@ Q1 specials: for ch in list(pressed): send_event(ch, False) + if event.type == sdl2.SDL_DROPFILE: + # failed to get sdl2.SDL_DROPTEXT to work, but also not convenient to use + print(f"Sending file: {event.drop.file.decode()}") + try: + data = open(event.drop.file, 'rb').read(4096) # size limit < pipe depth + if data[-1] != b'\n': + data += b'\n' # must end w/ NL, probably needs to be text too + data_tx.write(data) + print(f".. sent {len(data)} bytes") + except Exception as exc: + print(repr(exc)) + + rs, ws, es = select(readables, [], [], 0) for r in rs: diff --git a/unix/variant/ckcc.py b/unix/variant/ckcc.py index 83d14e72..2e59eeac 100644 --- a/unix/variant/ckcc.py +++ b/unix/variant/ckcc.py @@ -4,7 +4,7 @@ # # REMINDER: you must recompile coldcard-mpy if you change this file! # -import ustruct, sys +import ustruct, sys, uasyncio from ubinascii import hexlify as b2a_hex #from ubinascii import unhexlify as a2b_hex #import utime as time @@ -16,10 +16,12 @@ rng_fd = open('/dev/urandom', 'rb') # Emulate the red/green LED global genuine_led - led_pipe = open(int(sys.argv[3]), 'wb') led_pipe.write(b'\xff\x01') # all off, except SE1 green genuine_led = True + +# Provide a way to dump few hundred/4k bytes of data from QR or NFC simulated read +data_pipe = uasyncio.StreamReader(open(int(sys.argv[4]), 'rb')) # HACK: reduce size of heap in Unix simulator to be more similar to # actual hardware, so we can enjoy those out-of-memory errors too! diff --git a/unix/variant/sim_scanner.py b/unix/variant/sim_scanner.py index 7f663783..163c2e67 100644 --- a/unix/variant/sim_scanner.py +++ b/unix/variant/sim_scanner.py @@ -11,9 +11,13 @@ DATA_FILE = 'qrdata.txt' class SimulatedQRScanner(QRScanner): def __init__(self): + self.setup_done = True self.version = '4.20' + self.lock = asyncio.Lock() + # returns a Q we append to as results come in + self._q = Queue() - async def _read_results(self, q): + async def _read_results(self): # be a task that reads incoming QR codes from scanner (already in operation) # - will be canceled when done/stopping try: @@ -21,8 +25,15 @@ class SimulatedQRScanner(QRScanner): except OSError: orig_mtime = None + from ckcc import data_pipe + while 1: - await asyncio.sleep_ms(250) + try: + got = await asyncio.wait_for(data_pipe.readline(), 250) + print("Got pasted QR data.") + self._q.put_nowait(got) + except asyncio.TimeoutError: + pass try: _, mtime, _ = os.stat(DATA_FILE)[-3:] @@ -32,31 +43,25 @@ class SimulatedQRScanner(QRScanner): if mtime == orig_mtime: continue - print("Got new QR scan data.") - got = open(DATA_FILE, 'rt').read(8196).strip() - q.put_nowait(got) + print("Got new QR scan data from file.") + got = open(DATA_FILE, 'rb').read(8196) + self._q.put_nowait(got) orig_mtime = mtime - async def scan_once(self): - # returns a Q we append to as results come in - q = Queue() - - print("Put QR data into file: work/%s" % DATA_FILE) - - task = asyncio.create_task(self._read_results(q)) - - rv = await q.get() - - task.cancel() - - return rv + async def _readline(self): + rv = await self._q.get() + return rv.rstrip().decode() async def wakeup(self): + print("Click screen to paste QR data from clipboard,\nor write data into file: work/%s" % DATA_FILE) + self._task = asyncio.create_task(self._read_results()) + + async def tx(self, msg, timeout=250): return async def goto_sleep(self): - return + self._task.cancel() async def torch_control(self, on): print("Torch is: " + ('ON' if on else 'off'))