From 99b8481f4df833c638eb34356bdbaf52164dfc2e Mon Sep 17 00:00:00 2001 From: "Peter D. Gray" Date: Fri, 1 Dec 2023 12:13:28 -0500 Subject: [PATCH] autodetect qr contents --- shared/bbqr.py | 23 ++- shared/decoders.py | 190 +++++++++++++++++++++++++ shared/exceptions.py | 3 + shared/lcd_display.py | 19 ++- shared/manifest_q1.py | 1 + shared/scanner.py | 71 ++++++---- shared/ux_q1.py | 250 ++++++++++++++++----------------- testing/data/bip21.txt | 7 + testing/data/devils-txn.txn | 1 + testing/data/overlapped-qr.txt | 4 + testing/test_decoders.py | 131 +++++++++++++++++ unix/variant/sim_scanner.py | 12 ++ unix/work/.gitignore | 1 + 13 files changed, 550 insertions(+), 163 deletions(-) create mode 100644 shared/decoders.py create mode 100644 testing/data/bip21.txt create mode 100644 testing/data/devils-txn.txn create mode 100644 testing/data/overlapped-qr.txt create mode 100644 testing/test_decoders.py diff --git a/shared/bbqr.py b/shared/bbqr.py index f1678775..6f4128b5 100644 --- a/shared/bbqr.py +++ b/shared/bbqr.py @@ -44,8 +44,8 @@ class BBQrHeader: assert 0 <= self.which < self.num_parts def __repr__(self): - return '' % (self.which, self.num_parts, - self.encoding, self.file_type) + return '' % (self.which+1, self.num_parts, + self.encoding, self.file_type) def is_compat(self, other): # Does this header match previous ones seen? @@ -99,7 +99,7 @@ class BBQrState: hdr = BBQrHeader(scan) - print("Got " + repr(hdr)) + print("Got %r have %r" % (hdr, self.parts)) if not self.hdr or not self.hdr.is_compat(hdr): # New or incompatible header, they might have changed their @@ -111,7 +111,13 @@ class BBQrState: # we've NOT YET seen this one # convert back to binary - raw = hdr.decode_body(scan) + try: + raw = hdr.decode_body(scan) + except: + # can happen if QR got corrupted between scanner and us (overlap) + # or back BBQr implementation + dis.draw_bbqr_progress(hdr, self.parts, bool(self.runt), corrupt=True) + return True if hdr.which and (hdr.which == hdr.num_parts-1) and not self.parts: # Problem: this is a runt and we saw it first, we have no idea @@ -134,8 +140,8 @@ class BBQrState: self.parts.add(wh) self.runt = None - # provide UX - dis.draw_bbqr_progress(hdr.which, list(self.parts), hdr.num_parts, hdr.file_label()) + # provide UX -- even if we didn't use it + dis.draw_bbqr_progress(hdr, self.parts, bool(self.runt)) # do we need more still? return (len(self.parts) < hdr.num_parts) @@ -172,7 +178,10 @@ class BBQrState: if self.hdr.encoding == 'Z': # do in-place Zlib decompression (TODO) - raw = uzlib.decompress(raw, -10) + try: + raw = uzlib.decompress(raw, -10) + except: + raise RuntimeError("Zlib fail") final_size = len(raw) return self.hdr.file_type, final_size, raw diff --git a/shared/decoders.py b/shared/decoders.py new file mode 100644 index 00000000..617449a1 --- /dev/null +++ b/shared/decoders.py @@ -0,0 +1,190 @@ +# (c) Copyright 2023 by Coinkite Inc. This file is covered by license found in COPYING-CC. +# +# decoders.py - Convert QR (or text) values into useful bitcoin-related objects. +# +import uasyncio as asyncio +import ngu, bip39 +from ubinascii import unhexlify as a2b_hex +from exceptions import QRDecodeExplained + +def txn_decoding_taster(txt): + # look at first 4 bytes, and assume it's txn version number (LE32 0x1 or 0x2), then decode + # - working in normal RAM, won't handle full sized txn + # - will not be binary + # - not a very conclusive test, maybe should decode it more here? + from ubinascii import a2b_base64 + + if txt[0:8] in { '01000000', '02000000'}: + # transaction in hex format + return a2b_hex(txt) + elif txt[0:4] in { 'AQAA', 'AgAA' }: + # Base64 encoded + return a2b_base64(txt) + else: + raise ValueError("not txn") + + +def decode_secret(got): + # Decode a few different ways to store a master secret (in a QR), or raise + # - xprv / tprv + # - words (either full or prefixes, case insensitive) + # - SeedQR (github.com/SeedSigner/seedsigner/blob/dev/docs/seed_qr/README.md) + + # remove bitcoin: if present (unlikely) + if ':' in got: + _, got = got.split(':', 1) + + if got[1:4] == 'prv': + # xprv or tprv: private key import for sure + # - verify checksum is right + try: + raw = ngu.codecs.b58_decode(got) + except: + raise ValueError('corrupt xprv?') + + return 'xprv', got + + taste = got.strip().lower() + + if taste.isdigit(): + # SeedQR: 4 digit groups of index into word list + parts = [taste[pos:pos+4] for pos in range(0, len(taste), 4)] + try: + assert len(parts) in (12, 18, 24) + words = [bip39.wordlist_en[int(n)] for n in parts] + except: + raise ValueError('corrupt SeedQR?') + return 'words', words + + words = taste.strip().split(' ') + if len(words) in [ 12, 18, 24]: + # looks like bip-39 words, decode and re-expand + idx = [bip39.get_word_index(w) for w in words] + return 'words', [bip39.wordlist_en[n] for n in idx] + + raise ValueError('no idea') + +def decode_qr_result(got, expect_secret=False): + # Could be BBQr or text + + if hasattr(got, 'finalize'): + # BBQr object + try: + ty, final_size, got = got.finalize() + except BaseException as exc: + raise QRDecodeExplained("BBQr decode failed: " + str(exc)) + + if expect_secret and ty in 'PT': + raise QRDecodeExplained('Expected secrets not PSBT/TXN') + + if ty == 'P': + + return 'psbt', (None, final_size, got) + + elif ty == 'T': + + return 'txn', (None, final_size, got) + + elif ty == 'C': + + raise QRDecodeExplained("Sorry, CBOR not useful.") + + elif ty == 'J': + + # TODO: maybe multisig, hsm config setup?? + raise QRDecodeExplained("Sorry, JSON not useful.") + + elif ty == 'U': + + # continue thru code below for TEXT + pass + + else: + raise QRDecodeExplained("Sorry, unknown file type: " + ty) + + # First can we decode a master secret of some type? + + try: + mode, value = decode_secret(got) + return mode, (value,) + except QRDecodeExplained: + raise + except BaseException as exc: + #import sys; sys.print_exception(exc) + if expect_secret: + raise QRDecodeExplained("Unable to decode as secret") + + if expect_secret: + raise QRDecodeExplained("Not a secret?") + + return decode_qr_text(got) + +def decode_qr_text(got): + # Study text received over QR, for useful things. + # - case may be "wrong" but some values are case-sensitive (base58) + # - not binary, but might be some other encoding than BBQr + # - if bad checksum on bitcoin addr, we treat as text... since might be + # return: what-it-is, (tuple) + orig_got = got + + # might be a PSBT? + if len(got) > 100: + from auth import psbt_encoding_taster + try: + decoder, _, psbt_len = psbt_encoding_taster(got[0:10].encode(), len(got)) + return 'psbt', (decoder, psbt_len, got) + except QRDecodeExplained: + raise + except: + pass + + # might be txn, as hex or base64 + try: + return 'txn', (txn_decoding_taster(got), ) + except: + # was something else. + pass + + # Might be an address or pubkey? + + # remove URL protocol: if present + proto, args, addr = None, None, None + if ':' in got: + proto, got = got.split(':', 1) + + # looks like BIP-21 payment URL + if '?' in got: + addr, args = got.split('?', 1) + + # weak URL decode here. + args = args.split('&') + args = dict(a.split('=', 1) for a in args) + + # assume it's an bare address for now + if not addr: + addr = got + + # old school + try: + raw = ngu.codecs.b58_decode(addr) + + # it's valid base58 + # an address, P2PKH or xpub (xprv checked above) + if addr[1:4] == 'pub': + return 'xpub', (addr,) + + return 'addr', (proto, addr, args) + except: + pass + + # new school: bech32 or bech32m + try: + hrp, version, data = ngu.codecs.segwit_decode(addr) + return 'addr', (proto, addr, args) + except: + pass + + # catch-all ... was text. Can still show on-screen perhaps useful for other applications + return 'text', (orig_got,) + +# EOF diff --git a/shared/exceptions.py b/shared/exceptions.py index 72c90742..7be7c517 100644 --- a/shared/exceptions.py +++ b/shared/exceptions.py @@ -43,5 +43,8 @@ class IncorrectUTXOAmount(FatalPSBTIssue): class AbortInteraction(BaseException): pass +# Useful text to show user when we can't handle a QR +class QRDecodeExplained(ValueError): + pass # EOF diff --git a/shared/lcd_display.py b/shared/lcd_display.py index 3198353c..d7750dd5 100644 --- a/shared/lcd_display.py +++ b/shared/lcd_display.py @@ -597,13 +597,22 @@ class Display: # TODO: pass a "max_brightness" param here, which would be cleared after next show self.show() - def draw_bbqr_progress(self, new, gotem, num_parts, label): + def draw_bbqr_progress(self, hdr, got_parts, has_runt=False, corrupt=False): # we've seen at least one BBQr QR, so update display w/ progress bar # - lots of data so we can show nice animation - count = len(gotem) or 1 - percent = int(count * 100.0 / num_parts) - self.text(None, -2, 'Keep scanning more...') - self.text(None, -1, '%s: %d of %d = %d%% ' % (label, count, num_parts, percent), dark=True) + # - hdr:BBQrHeader instance + count = len(got_parts) + (1 if has_runt else 0) + if hdr.num_parts < (CHARS_W // 4): + pat = [('.' if i not in got_parts else str(i)) for i in range(hdr.num_parts)] + pat = (' ' if hdr.num_parts < (CHARS_W//2) else ' ').join(pat) + self.text(None, -3, pat) + + self.text(None, -2, 'Keep scanning more...' if count < hdr.num_parts else 'Got all parts!') + self.text(None, -1, '%s: %d of %d parts' % (hdr.file_label(), count, hdr.num_parts), + dark=True) + percent = count / hdr.num_parts + self.progress_bar(percent) + self.show() def draw_box(self, x, y, w, h): # using line-drawing chars, draw a box diff --git a/shared/manifest_q1.py b/shared/manifest_q1.py index 838cb91b..42a54ed3 100644 --- a/shared/manifest_q1.py +++ b/shared/manifest_q1.py @@ -6,6 +6,7 @@ freeze_as_mpy('', [ 'keyboard.py', 'scanner.py', 'bbqr.py', + 'decoders.py', 'lcd_display.py', 'st7788.py', 'gpu.py', diff --git a/shared/scanner.py b/shared/scanner.py index 4fc4dfd9..701a9683 100644 --- a/shared/scanner.py +++ b/shared/scanner.py @@ -51,7 +51,10 @@ def unwrap(packed): OKAY = b'Z\x01\x00\x02\x90\x00\x93\xa5' RAW_OKAY = b'\x90\x00' LEN_OKAY = const(8) - + +# possible baud rates; unfortunately 115,200 doesn't appear to work +SLOW_BAUD = const(9600) +FAST_BAUD = const(57600) # TODO: constructor should leave it in reset for simple lower-power usage; then after # login we can do full setup (2+ seconds) and then sleep again until needed. @@ -79,7 +82,7 @@ class QRScanner: def hardware_setup(self): # setup hardware, reset scanner and return time to delay until ready from machine import UART, Pin - self.serial = UART(2, 9600) + self.serial = UART(2, SLOW_BAUD) self.reset = Pin('QR_RESET', Pin.OUT_OD, value=0) self.trigger = Pin('QR_TRIG', Pin.OUT_OD, value=1) # wasn't needed @@ -91,6 +94,10 @@ class QRScanner: # needs full 2 seconds of recovery time return 2 + def set_baud(self, br): + # change serial port baud rate + self.serial.init(br) + async def setup_task(self, start_delay): # Task to setup device, and then die. await asyncio.sleep(start_delay) @@ -99,18 +106,30 @@ class QRScanner: # get b'V2.3.0.7\r\n' or similar # might need to repeat a few time to get into right state - for retry in range(3): - try: - rx = await self.txrx('T_OUT_CVER') - await self.txrx('S_CMD_FFFF') # factory reset of settings - self.version = rx.decode().strip() - break - except: - pass + for baud in [FAST_BAUD, SLOW_BAUD]: + self.set_baud(baud) + + for retry in range(3): + try: + rx = await self.txrx('T_OUT_CVER', timeout=50) + await self.txrx('S_CMD_FFFF') # factory reset of settings + self.version = rx.decode().strip() + break + except Exception as exc: + #print('fail @ %d: %s' % (baud, exc)) + pass + else: + continue + break else: print("QR Scanner: missing") return + # go to high speed! + if baud != FAST_BAUD: + await self.txrx('S_CMD_H3BR%d' % FAST_BAUD) + self.set_baud(FAST_BAUD) + # configure it like we want it await self.txrx('S_CMD_MTRS5000') # 5s to read before fail (unused) await self.txrx('S_CMD_MT11') # trigger is edge-based (not level) @@ -121,14 +140,15 @@ class QRScanner: await self.txrx('S_CMD_03L0') # light off all the time by default # ?? - await self.txrx('S_CMD_MSRI0000') # Modify the same code reading delay: 0ms # settings under continuous scan mode await self.txrx('S_CMD_MARS0000') # "Modify the duration of single code reading" (ms) await self.txrx('S_CMD_MARR000') # "Modify the time of the reading interval 0ms" await self.txrx('S_CMD_MA30') # "Same code reading without delay" await self.txrx('S_CMD_MARI0000') # "Modify the same code reading delay 0ms" - await self.txrx('S_CMD_MS30') # "Duplicate detection-off" + + await self.txrx('S_CMD_MS31') # "Same code reading delay" + await self.txrx('S_CMD_MSRI0100') # Modify the same code reading delay: 100ms # these aren't useful (yet?) and just make things harder to decode. #await self.txrx('S_CMD_05F1') # add all information on @@ -139,9 +159,10 @@ class QRScanner: #await self.txrx('S_CMD_0506') # suffix #await self.txrx('S_CMD_05D0') # tx total data + # prevent scanning magic QR to affect settings + await self.txrx('S_CMD_0000') # close setting codes self.setup_done = True - print("QR scanner setup done.") await self.goto_sleep() @@ -194,6 +215,7 @@ class QRScanner: return rv + async def _readline(self): # overridden in simulator # - blocks for QR to be seen @@ -239,9 +261,8 @@ class QRScanner: # - by sending these without binary wrapper, we get back a shorter reply # - just RAW_OKAY # - which we can easily filter out of any QR data we get back at the same time - #self.stream.write(msg) - #await self.stream.drain() - print('tx >> ' + msg) + # - do not use async self.stream because other tasks may be using it + #print('tx >> ' + msg) self.serial.write(msg) async def txrx(self, msg, timeout=250): @@ -256,7 +277,7 @@ class QRScanner: await self.flush_junk() # Send the command - print('txrx >> ' + msg) + #print('txrx >> ' + msg) self.stream.write(wrap(msg)) await self.stream.drain() @@ -271,7 +292,7 @@ class QRScanner: continue raise RuntimeError("no rx after %s" % msg) - print('txrx << ' + B2A(rx)) + #print('txrx << ' + B2A(rx)) if rx == OKAY: # good path @@ -287,15 +308,15 @@ class QRScanner: pos = rx.rindex(b'\n') rx = rx[pos+1:] - if RAW_OKAY in rx: + while rx.startswith(RAW_OKAY): # earlier bare commands' ACK's, remove them - rx = rx.replace(RAW_OKAY, b'') + rx = rx[2:] mlen = unwrap_hdr(rx) if mlen < 0: # framing issue, must be part way thru a QR - print('Framing prob (cmd=%s): %s=%s' % (msg, rx, B2A(rx))) + #print('Framing prob (cmd=%s): %s=%s' % (msg, rx, B2A(rx))) rx = b'' expect = LEN_OKAY continue @@ -311,9 +332,10 @@ class QRScanner: raise RuntimeError("extra at end") return body except Exception as exc: - print("Bad Rx: %s=%r" % (B2A(rx), rx)) - print(" exc: %s" % exc) - raise + #print("Bad Rx: %s=%r" % (B2A(rx), rx)) + #print(" exc: %s" % exc) + # this generally does not happen with all above complexity + raise RuntimeError("bad frame after %s" % msg) def torch_control_sync(self, on): # sync wrapper @@ -324,7 +346,6 @@ class QRScanner: # - S_CMD_03L1 => always light # - S_CMD_03L2 => when needed # - S_CMD_03L0 => no - print("torch=%d" % on) if not self.version: return diff --git a/shared/ux_q1.py b/shared/ux_q1.py index 00752e7d..8e49aeed 100644 --- a/shared/ux_q1.py +++ b/shared/ux_q1.py @@ -2,14 +2,15 @@ # # ux_q1.py - UX/UI interactions that are Q1 specific and use big screen, keyboard. # +import utime, gc, ngu, sys import uasyncio as asyncio from uasyncio import sleep_ms -import utime, gc, ngu from charcodes import * from lcd_display import CHARS_W, CHARS_H, CursorSpec, CURSOR_SOLID, CURSOR_OUTLINE, CURSOR_DW_SOLID -from exceptions import AbortInteraction +from exceptions import AbortInteraction, QRDecodeExplained from queues import QueueEmpty import bip39 +from decoders import decode_qr_result class PressRelease: def __init__(self, need_release=KEY_SELECT+KEY_CANCEL): @@ -623,7 +624,7 @@ class QRScannerInteraction: print("Scanned: %r" % data) break - # wait for key or 250ms delay + # wait for key or 250ms animation delay try: ch = await asyncio.wait_for_ms(ux_wait_keyup(), 250) except asyncio.TimeoutError: @@ -641,56 +642,6 @@ class QRScannerInteraction: return data - @staticmethod - def decode_secret(got): - # Decode a few different ways to store a master secret, or raise - # - xprv / tprv - # - words (either full or prefixes, case insensitive) - # - SeedQR (github.com/SeedSigner/seedsigner/blob/dev/docs/seed_qr/README.md) - - taste = got.strip().lower() - - # remove bitcoin: if present - if ':' in taste: - _, taste = taste.split(':', 1) - - if taste[1:4] == 'prv': - # xprv or tprv: private key import for sure - # - verify checksum is right - try: - ngu.codecs.b58_decode(taste) - except: - raise ValueError('corrupt xprv?') - - return 'xprv', taste - - if taste.isdigit(): - # SeedQR: 4 digit groups of index into word list - parts = [taste[pos:pos+4] for pos in range(0, len(taste), 4)] - try: - assert len(parts) in (12, 18, 24) - words = [bip39.wordlist_en[int(n)] for n in parts] - except: - raise ValueError('corrupt SeedQR?') - return 'words', words - - words = taste.decode().split(' ') - if len(words) in [ 12, 18, 24]: - # looks like bip-39 words, decode and re-expand - idx = [bip39.get_word_index(w) for w in words] - return 'words', [bip39.wordlist_en[n] for n in idx] - - raise ValueError('no idea') - - async def secret_import(self, mode, value): - # Import a private key: xprv or seed words - # - plausible value has been received - # - import as tmp or master - print("Secret: %s %r" % (mode, value)) - if mode == 'xprv': - pass - elif mode == 'words': - pass async def scan_anything(self, expect_secret=False): # start a QR scan, and act on what we find, whatever it may be. @@ -703,85 +654,57 @@ class QRScannerInteraction: if got is None: return - if hasattr(got, 'finalize'): - # BBQr object - ty, final_size, got = got.finalize() - if ty == 'P': - print("Got a PSBT file: %d bytes" % final_size) - await qr_psbt_sign(None, final_size, got) - return - elif ty == 'T': - problem = "No good use for transaction." - continue - elif ty == 'C': - problem = "Sorry, Q has no use CBOR" - continue - elif ty == 'J': - problem = "Sorry, Q has no use JSON yet" - continue - elif ty == 'U': - # continue text thru code below - pass - else: - problem = "Sorry, dont know filetype: " + ty - continue - + # Figure out what we got. try: - # can we decode a master secret of some type? - mode, value = self.decode_secret(got) - - return await self.secret_import(mode, value) - except BaseException as exc: - if expect_secret: - problem = str(exc) - continue - - # might be a PSBT? - if len(got) > 100: - from auth import psbt_encoding_taster - try: - decoder, _, psbt_len = psbt_encoding_taster(got[0:10].encode(), len(got)) - print("Got a PSBT file") - await qr_psbt_sign(decoder, psbt_len, got) - return - except ValueError: - pass - - # Might be an address or pubkey or psbt? But not binary - - # remove URL protocol: if present - scheme = None - if ':' in got: - scheme, got = got.split(':', 1) - - # old school - try: - raw = ngu.codecs.b58_decode(got) - print("Valid base58") - - # it's valid base58 - if got[1:4] == 'pub': - # xpub ... why tho? - return got - else: - # an address, P2PKH - return got + what, vals = decode_qr_result(got, expect_secret=expect_secret) + except QRDecodeExplained as exc: + problem = str(exc) + continue except: - pass + problem = "Unable to decode QR" + continue - # new school: bech32 or bech32m - try: - hrp, version, data = ngu.codecs.segwit_decode(got) - print("Valid bech32") - return got - except: - pass + # Limitation: assuming ephemeral import here + if what == 'xprv': + from actions import import_extended_key_as_secret + text_xprv, = vals + await import_extended_key_as_secret(text_xprv, True) + return - problem = 'Sorry, can not make use of that!' + if what == 'words': + from seed import ephemeral_seed_import_done_cb # dirty API + words, = vals + await ephemeral_seed_import_done_cb(words) + return + + if what == 'psbt': + decoder, psbt_len, got = vals + await qr_psbt_sign(decoder, psbt_len, got) + return + + if what == 'txn': + bin_txn, = vals + await ux_visualize_txn(bin_txn) + return + + if what == 'addr': + proto, addr, args = vals + await ux_visualize_bip21(proto, addr, args) + return + + if what == 'text' or what == 'xpub': + # we couldn't really decode it. + txt, = vals + await ux_visualize_textqr(txt) + return + + + # not reached? + problem = 'Unhandled: ' + what async def qr_psbt_sign(decoder, psbt_len, raw): - # Got a PSBT coming in from QR scanner. Assume single QR for now. + # Got a PSBT coming in from QR scanner. Sign it. # - similar to auth.sign_psbt_file() from auth import UserAuthorizedAction, ApproveTransaction from utils import CapsHexWriter @@ -830,7 +753,7 @@ async def qr_psbt_sign(decoder, psbt_len, raw): if data_len >= MAX_V40_SIZE: # too big for single version 40 QR - await ux_show_story("Resulting txn is too big for QR code") + await ux_show_story("Resulting txn is too big for single QR code.") return await show_qr_code(here, is_alnum=True, msg=(txid or 'Partly Signed PSBT')) @@ -839,4 +762,79 @@ async def qr_psbt_sign(decoder, psbt_len, raw): UserAuthorizedAction.active_request = ApproveTransaction(psbt_len, approved_cb=done) the_ux.push(UserAuthorizedAction.active_request) +async def ux_visualize_txn(bin_txn): + # Show the user a signed transaction on-screen. + # - longer-term we may offer more data about address ownership, etc + # - be careful not to claim things we cannot prove w/o UTXO from confirmed blocks + # - .. like the fee, which would be useful + from ux import ux_show_story + from io import BytesIO + from psbt import calc_txid + from ubinascii import hexlify as b2a_hex + from serializations import CTransaction + + txn = CTransaction() + + try: + txn.deserialize(BytesIO(bin_txn)) + + if (n := len(txn.vin)) == 1: + msg = '1 input, ' + else: + msg = '%d inputs, ' % n + + if (n := len(txn.vout)) == 1: + msg += '1 output' + else: + msg += '%d outputs' % n + + # add txid + txid = calc_txid(BytesIO(bin_txn), (0, len(bin_txn))) + msg += '\n\nTxid:\n' + b2a_hex(txid).decode() + + except Exception as exc: + sys.print_exception(exc) + msg = "Unable to deserialize" + + await ux_show_story(msg, title="Signed Transaction") + + +async def ux_visualize_bip21(proto, addr, args): + # Show details of BIP-21 URL + # - imho, a bare address is a valid BIP-21 URL so we come here too + # - TODO: validate address ownership + from ux import ux_show_story + + msg = addr + '\n\n' + args = args or {} + + if 'amount' in args: + msg += 'Amount: ' + try: + amt = args.pop('amount') + whole, frac = amt.split('.', 1) + frac = int(frac) if frac else 0 + whole = int(whole) if whole else 0 + msg += '%d.%08d BTC\n' % (whole, frac) + except: + msg += '(corrupt)\n' + + for fn in ['label', 'message', 'lightning']: + if fn in args: + val = args.pop(fn) # XXX needs url-decoding + msg += '%s%s: %s\n' % (fn[0].upper(), fn[1:], val) + + if args: + msg += 'And values for: ' + ', '.join(args) + + await ux_show_story(msg, title="Payment Address") + +async def ux_visualize_textqr(txt): + from ux import ux_show_story + if len(txt) > 100: + txt = txt[0:100] + '...' + + await ux_show_story("%s\n\nAbove is text that was scanned. " + "We can't do any more with it." % txt, title="Simple Text") + # EOF diff --git a/testing/data/bip21.txt b/testing/data/bip21.txt new file mode 100644 index 00000000..c5777ba0 --- /dev/null +++ b/testing/data/bip21.txt @@ -0,0 +1,7 @@ +bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R +bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?label=Luke-Jr +bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?amount=20.3&label=Luke-Jr +bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?amount=50&label=Luke-Jr&message=Donation%20for%20project%20xyz +bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?req-somethingyoudontunderstand=50&req-somethingelseyoudontget=999 +bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?somethingyoudontunderstand=50&somethingelseyoudontget=999 + diff --git a/testing/data/devils-txn.txn b/testing/data/devils-txn.txn new file mode 100644 index 00000000..372ee4af --- /dev/null +++ b/testing/data/devils-txn.txn @@ -0,0 +1 @@ +02000000000101dce550be0d74cfcbb042d6285ad78c85ce7d368ba5c93eb55cac1f56335d89010000000000fdffffff02ccb4eb0b00000000160014bed59ae63ed9a6a5630c2cb26cc967a23df9b87700a3e11100000000160014950fc51c23131864563589694dd913b9e310ab69040047304402200cb46756eeda1ba57fb8e260cc711ae5a0f31ff68c5abf824b551c22f2c77475022056b83be7cfd7bab9ccdb832b416e1e63d39e552bfef97f1e07e4b97e191dc67101473044022024dd75bf639765c6d530716da4167c22effc9811742a34a4ce710153327a24e10220513e4f61875d5559673090ece3f7fa2c9cda62986d84c88bc1ab087b7bd02c850147522102f9c33362e7c4d9d21e9145e1478a36f341f2f0cfe7055abe92380bb806d9ce7821035d92f72e755b3866b034f16b637cb6f3c24d8d4924664df8cc028a0798e75a3252ae00000000 \ No newline at end of file diff --git a/testing/data/overlapped-qr.txt b/testing/data/overlapped-qr.txt new file mode 100644 index 00000000..12f216ad --- /dev/null +++ b/testing/data/overlapped-qr.txt @@ -0,0 +1,4 @@ +B$ZP0201DXW74HWMBTNKHH3H5X2Y66JCVKFAJT4HDPG5NJWYLH7GPHOXH7MSNKSNVLGBQVYW3DFZH6XTS5P4MVJBCP7R367KBYJO3U6FZ2P56NTNJE2D65EJJULOTE36V7PLA377Y5OPCRFFGZATV3TZF72R7UKVEQ5NXPDNLDVR2LNDWQZSEL7ATFAZ2S67XN6OM3JHTZ6MLRRWH7WZMZWN53CFRLAEQ3WP3OHIQG3GLIJO2LWUPFNLF2L2ZNJA3BK7ME7T4T2VO7VAMPMYVMYZPD4FT425EGLNAJSQ42X6HIGLDEVUL4SW7TYCZ6LRQVODO7RX3QQI3FB2IWYOGZ4JB77YWFV636KOV4TZJD7RAYPZZEEHNCH5C7SCVK2CJ3TJNLV4KQRMWUSUM7NV5P5RRT65TUNT7QDWENOW7KWXX5G2FJBAAZIIB5EC2EP7AV4AQAA +B$ZP0201DXW74HWMBTNKHH3H5X2Y66JCVKFAJT4HDPG5NJWYLH7GPHOXH7MSNKSNVG3VHHEP3KQKUOE76JY25P3733AYM6ZWPYW3DFZH6XTS5P4MVJBCP7R367KBYJO3U6FZ2P56NTNJEB$ZP0201DXW74HWMBTNKHH3H5X2Y66JCVKFAJT4HDPG5NJWYLH7GPHOXH7MSNKSNVLGBQVMF76OXGB76OU7QWCP7R367KBYJO3U6FZ2P56NTNJE2D65EJJULOTE36V7PLA377Y5OPCRFFGZATV3TZF72R7UKVEQ5NXPDNLDVR2LNDWQZSEL7ATFAZ2S77XN6OM3JHTZ6MLRRWH7WZMZWN53CFRLAEQ3WP3OHIQG3GLIJO2LWUPFNLF2L2ZNJA3BK7ME7T4T2VO7VAMPMYVMYZPD4FT425EGLNAJSQ42IGLDEVUL4SW7TYCZ6LRQVODO7RX3QQI3FB2IWYOGZ4JB77YWFV636KOV4TZJD7RAYPZZEEHNCH5C7SCVK2CJ3TJNLV4KQRMWUM2E7NV5P5RRT65TUNT7QDWENOW7KWXX5G2FJBAAZIIB5EC2EP7AV4AQAA +B$ZP0200FMUE4KXZZ7EPBV47TGEYDAMBTGLVX5HN6LCKTCIRFGXILJ4N2IR4LTTJ3ZZTXKTVLPML4YMCKZG7KDIGCD4A6BH6IUNQUVONWKVZM74YT6Y6ZA6O5PQORPG5VV7OTMYXW44TKNGVN55ASV2Y722VJMJREFVXFRE46XTCYDBBNKN74LOU7BJ3WLPGTWJ655WGHU54LYIKIWJX5XYEOMSMWVUKLTSMY6VCU7W3F47PV6CSNIP37W4Z5GGG5XUFRA5M4K4P5HFTGLZUEMS72SK6S3WZ2T7U7L37ZUDVIFO63IE4DGQ3ZJ7IJ3N3EMPHN2WJQRLJLMBYKQKSPCRSXDNZUMXLXET2UPFX3FQSPSCYOE4QIVMFV2EWHMOQQZ4EDD27A6XO34UP6IUGNXXEROVOEHUTORI5Z65FTP5F4MI737KTK6K5ON5YUN5KBI44XRP7H3TVMPNJ5567VRH43GBMFQ5M7EGKQKR2YUSDLQWCU3HNR5DSO6O7GK7BY5L7HD2YUD4VN2DNZA3EMF65FBR75DTTDVUYKXP2Q6RVAVKUP5MPW4UBR3C43ERJWNHLEHUZYN7C272NAGNF7ZBNF46LNXHQXNPLRIRIJP6JPS6BQ7J7UUKVMQ3SMHJY6MQO3AMLWTTY4GCOX3ODZKKEPFKBKRQXUST6WXW67O4H63OU36LWUV34ZZ6E3CKHKXQVG7DGS2QPDVCKY2JCQRNWO7IWP53J6BOUHM7F7KH3257M +B$ZP0201DXW74HWMBTNKHH3H5X2Y66JCVKFAJT4HDPG5NJWYLH7GPHOXH7MSNKSNVLGBQVMF76OXGB76OU7QW6LY7SGDB4ZMQG3VHHEP3KQKUOE76JY25P3733AYM6ZWPYW3DFZH6XTS5P4MVJBCP7R367KBYJO3U6FZ2P56NTNJE2D65EJJULOTE36V7PLA377Y5OPCRFFGZATV3TZF72R7UKVEQ5NXPDNLDVR2LNDWQZSEL7ATFAZ2S7ZUBP4AUUMRSQUZIDKNU63AF3B7W7IMLCASPCHYB4CIYDAPX6ZMEDHGKY2V67XN6OM3JHTZ6MLRRWH7WZMZWN53CFRLAEQ3WP3OHIQG3GLIJO2LWUPFNLF2L2ZNJA3BK7ME7T4T2VO7VAMPMYVMYZPD4FT425EGLNAJSQ42XJFXFZQ4PP2HXR7EFFP73P7Q7W73IZC3EDGCCTBXSUTFR53NTK5FWWP7QCAW7R3Z6HIGLDEVUL4SW7TYCZ6LRQVODO7RX3QQI3FB2IWYOGZ4JB77YWFV636KOV4TZJD7RAYPZZEEHNCH5C7SCVK2CJ3TJNLV4KQRMWUSUMLPOI7F75MO7XQLNWOLN5YTCX6RQA5YICN3SVAEHGEYQQHW7Y6U7PSPBPXS4DPM2E7NV5P5RRT65TUNT7QDWENOW7KWXX5G2FJBAAZIIB5EC2EP7AV4AQAA diff --git a/testing/test_decoders.py b/testing/test_decoders.py new file mode 100644 index 00000000..a7e1d73d --- /dev/null +++ b/testing/test_decoders.py @@ -0,0 +1,131 @@ +# (c) Copyright 2023 by Coinkite Inc. This file is covered by license found in COPYING-CC. +# +# test decoders.py code (unit test) +# +# +import pytest +from binascii import a2b_hex, b2a_hex +from base64 import b64encode +from urllib.parse import urlparse, parse_qs +from helpers import prandom + +from mnemonic import Mnemonic +wordlist = Mnemonic('english').wordlist + +@pytest.fixture +def try_decode(sim_exec): + def doit(arg): + cmd = "from decoders import decode_qr_result; " + \ + f"RV.write(repr(decode_qr_result({arg!r})))" + + result = sim_exec(cmd) + + if 'Traceback' in result: + raise RuntimeError(result) + + if '<' in result: + # objects, like "', "'") + + return eval(result) + return doit + +@pytest.mark.parametrize('fname,expect', [ + ( 'data/p2pkh+p2sh+outs.psbt', 'psbt'), + ( 'data/snight-example.psbt', 'psbt'), + ( 'data/devils-txn.txn', 'txn'), +]) +@pytest.mark.parametrize('encoding', ['hex', 'b64']) +def test_detector_bin(fname, expect, encoding, try_decode): + + # NOTE: input files must be hex to start + arg = a2b_hex(open(fname, 'rt').read().strip()) + + if encoding == 'hex': + arg = b2a_hex(arg).decode() + elif encoding == 'b64': + arg = b64encode(arg).decode() + else: + raise ValueError + + ft, vals = try_decode(arg) + assert ft == expect + + +@pytest.mark.parametrize('url', [ +'bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R', +'bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?label=Luke-Jr', +'bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?amount=20.3&label=Luke-Jr', +'bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?amount=50&label=Luke-Jr&message=Donation%20for%20project%20xyz', +'bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?req-somethingyoudontunderstand=50&req-somethingelseyoudontget=999', +'bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?somethingyoudontunderstand=50&somethingelseyoudontget=999', +]) +@pytest.mark.parametrize('bip21', range(2)) +@pytest.mark.parametrize('addr_fmt', range(2)) +def test_detector_url(url, bip21, addr_fmt, try_decode): + a1, a2 = ('mtHSVByP9EYZmB26jASDdPVm19gvpecb5R', + 'BCRT1QUPYD58NDSH7LUT0ET0VTRQ432JVU9JTDX8FGYV') + + if not bip21: + _, url = url.split(':', 1) + if addr_fmt: + url = url.replace(a1, a2) + expect_addr = a2 + else: + expect_addr = a1 + + ft, vals = try_decode(url) + assert ft == 'addr' + proto, addr, args = vals + assert addr == expect_addr + assert proto == ('bitcoin' if bip21 else None) + + p = urlparse(url) + assert p.path == addr + + xargs = parse_qs(p.query) + if args: + assert xargs.keys() == args.keys() + else: + assert not xargs + + +@pytest.mark.parametrize('num_words', [12, 18, 24]) +@pytest.mark.parametrize('encoding', ['short', 'long', 'seed_qr']) +@pytest.mark.parametrize('case', range(2)) +def test_detector_secrets(num_words, encoding, case, try_decode): + + n = [(i*179)% 2048 for i in range(num_words)] + + words = [wordlist[i] for i in n] + expect = list(words) + + if encoding == 'seed_qr': + if case: return + qr = ''.join('%04d'%i for i in n) + else: + + if encoding == 'short': + words = [w[0:4] for w in words] + if case: + words = [w.upper() for w in words] + + qr = ' '.join(words) + + ft, vals = try_decode(qr) + assert ft == 'words' + got_words, = vals + assert got_words == expect + +@pytest.mark.parametrize('code', [ + 'xprv9s21ZrQH143K2LBWUUQRFXhucrQqBpKdRRxNVq2zBqsx8HVqFk2uYo8kmbaLLHRdqtQpUm98uKfu3vca1LqdGhUtyoFnCNkfmXRyPXLjbKb', + 'xpub69H7F5d8KSRgmmdJg2KhpAK8SR3DjMwAdkxj3ZuxV27CprR9LgpeyGmXUbC6wb7ERfvrnKZjXoUmmDznezpbZb7ap6r1D3tgFxHmwMkQTPH', +]) +def test_detector_xp(code, try_decode): + + ft, vals = try_decode(code) + + assert ft == code[0:4] + assert vals[0] == code + +# EOF diff --git a/unix/variant/sim_scanner.py b/unix/variant/sim_scanner.py index 9f4b32f6..24456618 100644 --- a/unix/variant/sim_scanner.py +++ b/unix/variant/sim_scanner.py @@ -76,6 +76,18 @@ class AttachedQRScanner(QRScanner): return 0 + def set_baud(self, br=None): + # change serial port baud rate + import termios + attr = termios.tcgetattr(self.serial.fileno()) + # [4][5] are the baud rate + was = int(attr[4]) + attr[4] = br # assuming termios.B9600 = 9600 etc + attr[5] = br + if br is not None: + termios.tcsetattr(self.serial.fileno(), 0, attr) + return was + async def flush_junk(self): # I am in lack of .any() member on my serial port while 1: diff --git a/unix/work/.gitignore b/unix/work/.gitignore index d05a9bf9..54d67f3c 100644 --- a/unix/work/.gitignore +++ b/unix/work/.gitignore @@ -1,3 +1,4 @@ # this simulator, or test cases produce these files. nfc-dump.ndef readback.psbt +qrdata.txt