autodetect qr contents
This commit is contained in:
parent
074065b190
commit
99b8481f4d
@ -44,8 +44,8 @@ class BBQrHeader:
|
||||
assert 0 <= self.which < self.num_parts
|
||||
|
||||
def __repr__(self):
|
||||
return '<BBQr: %d of %d parts, enc=%s ft=%s>' % (self.which, self.num_parts,
|
||||
self.encoding, self.file_type)
|
||||
return '<BBQr: %dof%d parts, enc=%s ft=%s>' % (self.which+1, self.num_parts,
|
||||
self.encoding, self.file_type)
|
||||
|
||||
def is_compat(self, other):
|
||||
# Does this header match previous ones seen?
|
||||
@ -99,7 +99,7 @@ class BBQrState:
|
||||
|
||||
hdr = BBQrHeader(scan)
|
||||
|
||||
print("Got " + repr(hdr))
|
||||
print("Got %r have %r" % (hdr, self.parts))
|
||||
|
||||
if not self.hdr or not self.hdr.is_compat(hdr):
|
||||
# New or incompatible header, they might have changed their
|
||||
@ -111,7 +111,13 @@ class BBQrState:
|
||||
# we've NOT YET seen this one
|
||||
|
||||
# convert back to binary
|
||||
raw = hdr.decode_body(scan)
|
||||
try:
|
||||
raw = hdr.decode_body(scan)
|
||||
except:
|
||||
# can happen if QR got corrupted between scanner and us (overlap)
|
||||
# or back BBQr implementation
|
||||
dis.draw_bbqr_progress(hdr, self.parts, bool(self.runt), corrupt=True)
|
||||
return True
|
||||
|
||||
if hdr.which and (hdr.which == hdr.num_parts-1) and not self.parts:
|
||||
# Problem: this is a runt and we saw it first, we have no idea
|
||||
@ -134,8 +140,8 @@ class BBQrState:
|
||||
self.parts.add(wh)
|
||||
self.runt = None
|
||||
|
||||
# provide UX
|
||||
dis.draw_bbqr_progress(hdr.which, list(self.parts), hdr.num_parts, hdr.file_label())
|
||||
# provide UX -- even if we didn't use it
|
||||
dis.draw_bbqr_progress(hdr, self.parts, bool(self.runt))
|
||||
|
||||
# do we need more still?
|
||||
return (len(self.parts) < hdr.num_parts)
|
||||
@ -172,7 +178,10 @@ class BBQrState:
|
||||
|
||||
if self.hdr.encoding == 'Z':
|
||||
# do in-place Zlib decompression (TODO)
|
||||
raw = uzlib.decompress(raw, -10)
|
||||
try:
|
||||
raw = uzlib.decompress(raw, -10)
|
||||
except:
|
||||
raise RuntimeError("Zlib fail")
|
||||
final_size = len(raw)
|
||||
|
||||
return self.hdr.file_type, final_size, raw
|
||||
|
||||
190
shared/decoders.py
Normal file
190
shared/decoders.py
Normal file
@ -0,0 +1,190 @@
|
||||
# (c) Copyright 2023 by Coinkite Inc. This file is covered by license found in COPYING-CC.
|
||||
#
|
||||
# decoders.py - Convert QR (or text) values into useful bitcoin-related objects.
|
||||
#
|
||||
import uasyncio as asyncio
|
||||
import ngu, bip39
|
||||
from ubinascii import unhexlify as a2b_hex
|
||||
from exceptions import QRDecodeExplained
|
||||
|
||||
def txn_decoding_taster(txt):
|
||||
# look at first 4 bytes, and assume it's txn version number (LE32 0x1 or 0x2), then decode
|
||||
# - working in normal RAM, won't handle full sized txn
|
||||
# - will not be binary
|
||||
# - not a very conclusive test, maybe should decode it more here?
|
||||
from ubinascii import a2b_base64
|
||||
|
||||
if txt[0:8] in { '01000000', '02000000'}:
|
||||
# transaction in hex format
|
||||
return a2b_hex(txt)
|
||||
elif txt[0:4] in { 'AQAA', 'AgAA' }:
|
||||
# Base64 encoded
|
||||
return a2b_base64(txt)
|
||||
else:
|
||||
raise ValueError("not txn")
|
||||
|
||||
|
||||
def decode_secret(got):
|
||||
# Decode a few different ways to store a master secret (in a QR), or raise
|
||||
# - xprv / tprv
|
||||
# - words (either full or prefixes, case insensitive)
|
||||
# - SeedQR (github.com/SeedSigner/seedsigner/blob/dev/docs/seed_qr/README.md)
|
||||
|
||||
# remove bitcoin: if present (unlikely)
|
||||
if ':' in got:
|
||||
_, got = got.split(':', 1)
|
||||
|
||||
if got[1:4] == 'prv':
|
||||
# xprv or tprv: private key import for sure
|
||||
# - verify checksum is right
|
||||
try:
|
||||
raw = ngu.codecs.b58_decode(got)
|
||||
except:
|
||||
raise ValueError('corrupt xprv?')
|
||||
|
||||
return 'xprv', got
|
||||
|
||||
taste = got.strip().lower()
|
||||
|
||||
if taste.isdigit():
|
||||
# SeedQR: 4 digit groups of index into word list
|
||||
parts = [taste[pos:pos+4] for pos in range(0, len(taste), 4)]
|
||||
try:
|
||||
assert len(parts) in (12, 18, 24)
|
||||
words = [bip39.wordlist_en[int(n)] for n in parts]
|
||||
except:
|
||||
raise ValueError('corrupt SeedQR?')
|
||||
return 'words', words
|
||||
|
||||
words = taste.strip().split(' ')
|
||||
if len(words) in [ 12, 18, 24]:
|
||||
# looks like bip-39 words, decode and re-expand
|
||||
idx = [bip39.get_word_index(w) for w in words]
|
||||
return 'words', [bip39.wordlist_en[n] for n in idx]
|
||||
|
||||
raise ValueError('no idea')
|
||||
|
||||
def decode_qr_result(got, expect_secret=False):
|
||||
# Could be BBQr or text
|
||||
|
||||
if hasattr(got, 'finalize'):
|
||||
# BBQr object
|
||||
try:
|
||||
ty, final_size, got = got.finalize()
|
||||
except BaseException as exc:
|
||||
raise QRDecodeExplained("BBQr decode failed: " + str(exc))
|
||||
|
||||
if expect_secret and ty in 'PT':
|
||||
raise QRDecodeExplained('Expected secrets not PSBT/TXN')
|
||||
|
||||
if ty == 'P':
|
||||
|
||||
return 'psbt', (None, final_size, got)
|
||||
|
||||
elif ty == 'T':
|
||||
|
||||
return 'txn', (None, final_size, got)
|
||||
|
||||
elif ty == 'C':
|
||||
|
||||
raise QRDecodeExplained("Sorry, CBOR not useful.")
|
||||
|
||||
elif ty == 'J':
|
||||
|
||||
# TODO: maybe multisig, hsm config setup??
|
||||
raise QRDecodeExplained("Sorry, JSON not useful.")
|
||||
|
||||
elif ty == 'U':
|
||||
|
||||
# continue thru code below for TEXT
|
||||
pass
|
||||
|
||||
else:
|
||||
raise QRDecodeExplained("Sorry, unknown file type: " + ty)
|
||||
|
||||
# First can we decode a master secret of some type?
|
||||
|
||||
try:
|
||||
mode, value = decode_secret(got)
|
||||
return mode, (value,)
|
||||
except QRDecodeExplained:
|
||||
raise
|
||||
except BaseException as exc:
|
||||
#import sys; sys.print_exception(exc)
|
||||
if expect_secret:
|
||||
raise QRDecodeExplained("Unable to decode as secret")
|
||||
|
||||
if expect_secret:
|
||||
raise QRDecodeExplained("Not a secret?")
|
||||
|
||||
return decode_qr_text(got)
|
||||
|
||||
def decode_qr_text(got):
|
||||
# Study text received over QR, for useful things.
|
||||
# - case may be "wrong" but some values are case-sensitive (base58)
|
||||
# - not binary, but might be some other encoding than BBQr
|
||||
# - if bad checksum on bitcoin addr, we treat as text... since might be
|
||||
# return: what-it-is, (tuple)
|
||||
orig_got = got
|
||||
|
||||
# might be a PSBT?
|
||||
if len(got) > 100:
|
||||
from auth import psbt_encoding_taster
|
||||
try:
|
||||
decoder, _, psbt_len = psbt_encoding_taster(got[0:10].encode(), len(got))
|
||||
return 'psbt', (decoder, psbt_len, got)
|
||||
except QRDecodeExplained:
|
||||
raise
|
||||
except:
|
||||
pass
|
||||
|
||||
# might be txn, as hex or base64
|
||||
try:
|
||||
return 'txn', (txn_decoding_taster(got), )
|
||||
except:
|
||||
# was something else.
|
||||
pass
|
||||
|
||||
# Might be an address or pubkey?
|
||||
|
||||
# remove URL protocol: if present
|
||||
proto, args, addr = None, None, None
|
||||
if ':' in got:
|
||||
proto, got = got.split(':', 1)
|
||||
|
||||
# looks like BIP-21 payment URL
|
||||
if '?' in got:
|
||||
addr, args = got.split('?', 1)
|
||||
|
||||
# weak URL decode here.
|
||||
args = args.split('&')
|
||||
args = dict(a.split('=', 1) for a in args)
|
||||
|
||||
# assume it's an bare address for now
|
||||
if not addr:
|
||||
addr = got
|
||||
|
||||
# old school
|
||||
try:
|
||||
raw = ngu.codecs.b58_decode(addr)
|
||||
|
||||
# it's valid base58
|
||||
# an address, P2PKH or xpub (xprv checked above)
|
||||
if addr[1:4] == 'pub':
|
||||
return 'xpub', (addr,)
|
||||
|
||||
return 'addr', (proto, addr, args)
|
||||
except:
|
||||
pass
|
||||
|
||||
# new school: bech32 or bech32m
|
||||
try:
|
||||
hrp, version, data = ngu.codecs.segwit_decode(addr)
|
||||
return 'addr', (proto, addr, args)
|
||||
except:
|
||||
pass
|
||||
|
||||
# catch-all ... was text. Can still show on-screen perhaps useful for other applications
|
||||
return 'text', (orig_got,)
|
||||
|
||||
# EOF
|
||||
@ -43,5 +43,8 @@ class IncorrectUTXOAmount(FatalPSBTIssue):
|
||||
class AbortInteraction(BaseException):
|
||||
pass
|
||||
|
||||
# Useful text to show user when we can't handle a QR
|
||||
class QRDecodeExplained(ValueError):
|
||||
pass
|
||||
|
||||
# EOF
|
||||
|
||||
@ -597,13 +597,22 @@ class Display:
|
||||
# TODO: pass a "max_brightness" param here, which would be cleared after next show
|
||||
self.show()
|
||||
|
||||
def draw_bbqr_progress(self, new, gotem, num_parts, label):
|
||||
def draw_bbqr_progress(self, hdr, got_parts, has_runt=False, corrupt=False):
|
||||
# we've seen at least one BBQr QR, so update display w/ progress bar
|
||||
# - lots of data so we can show nice animation
|
||||
count = len(gotem) or 1
|
||||
percent = int(count * 100.0 / num_parts)
|
||||
self.text(None, -2, 'Keep scanning more...')
|
||||
self.text(None, -1, '%s: %d of %d = %d%% ' % (label, count, num_parts, percent), dark=True)
|
||||
# - hdr:BBQrHeader instance
|
||||
count = len(got_parts) + (1 if has_runt else 0)
|
||||
if hdr.num_parts < (CHARS_W // 4):
|
||||
pat = [('.' if i not in got_parts else str(i)) for i in range(hdr.num_parts)]
|
||||
pat = (' ' if hdr.num_parts < (CHARS_W//2) else ' ').join(pat)
|
||||
self.text(None, -3, pat)
|
||||
|
||||
self.text(None, -2, 'Keep scanning more...' if count < hdr.num_parts else 'Got all parts!')
|
||||
self.text(None, -1, '%s: %d of %d parts' % (hdr.file_label(), count, hdr.num_parts),
|
||||
dark=True)
|
||||
percent = count / hdr.num_parts
|
||||
self.progress_bar(percent)
|
||||
self.show()
|
||||
|
||||
def draw_box(self, x, y, w, h):
|
||||
# using line-drawing chars, draw a box
|
||||
|
||||
@ -6,6 +6,7 @@ freeze_as_mpy('', [
|
||||
'keyboard.py',
|
||||
'scanner.py',
|
||||
'bbqr.py',
|
||||
'decoders.py',
|
||||
'lcd_display.py',
|
||||
'st7788.py',
|
||||
'gpu.py',
|
||||
|
||||
@ -51,7 +51,10 @@ def unwrap(packed):
|
||||
OKAY = b'Z\x01\x00\x02\x90\x00\x93\xa5'
|
||||
RAW_OKAY = b'\x90\x00'
|
||||
LEN_OKAY = const(8)
|
||||
|
||||
|
||||
# possible baud rates; unfortunately 115,200 doesn't appear to work
|
||||
SLOW_BAUD = const(9600)
|
||||
FAST_BAUD = const(57600)
|
||||
|
||||
# TODO: constructor should leave it in reset for simple lower-power usage; then after
|
||||
# login we can do full setup (2+ seconds) and then sleep again until needed.
|
||||
@ -79,7 +82,7 @@ class QRScanner:
|
||||
def hardware_setup(self):
|
||||
# setup hardware, reset scanner and return time to delay until ready
|
||||
from machine import UART, Pin
|
||||
self.serial = UART(2, 9600)
|
||||
self.serial = UART(2, SLOW_BAUD)
|
||||
self.reset = Pin('QR_RESET', Pin.OUT_OD, value=0)
|
||||
self.trigger = Pin('QR_TRIG', Pin.OUT_OD, value=1) # wasn't needed
|
||||
|
||||
@ -91,6 +94,10 @@ class QRScanner:
|
||||
# needs full 2 seconds of recovery time
|
||||
return 2
|
||||
|
||||
def set_baud(self, br):
|
||||
# change serial port baud rate
|
||||
self.serial.init(br)
|
||||
|
||||
async def setup_task(self, start_delay):
|
||||
# Task to setup device, and then die.
|
||||
await asyncio.sleep(start_delay)
|
||||
@ -99,18 +106,30 @@ class QRScanner:
|
||||
|
||||
# get b'V2.3.0.7\r\n' or similar
|
||||
# might need to repeat a few time to get into right state
|
||||
for retry in range(3):
|
||||
try:
|
||||
rx = await self.txrx('T_OUT_CVER')
|
||||
await self.txrx('S_CMD_FFFF') # factory reset of settings
|
||||
self.version = rx.decode().strip()
|
||||
break
|
||||
except:
|
||||
pass
|
||||
for baud in [FAST_BAUD, SLOW_BAUD]:
|
||||
self.set_baud(baud)
|
||||
|
||||
for retry in range(3):
|
||||
try:
|
||||
rx = await self.txrx('T_OUT_CVER', timeout=50)
|
||||
await self.txrx('S_CMD_FFFF') # factory reset of settings
|
||||
self.version = rx.decode().strip()
|
||||
break
|
||||
except Exception as exc:
|
||||
#print('fail @ %d: %s' % (baud, exc))
|
||||
pass
|
||||
else:
|
||||
continue
|
||||
break
|
||||
else:
|
||||
print("QR Scanner: missing")
|
||||
return
|
||||
|
||||
# go to high speed!
|
||||
if baud != FAST_BAUD:
|
||||
await self.txrx('S_CMD_H3BR%d' % FAST_BAUD)
|
||||
self.set_baud(FAST_BAUD)
|
||||
|
||||
# configure it like we want it
|
||||
await self.txrx('S_CMD_MTRS5000') # 5s to read before fail (unused)
|
||||
await self.txrx('S_CMD_MT11') # trigger is edge-based (not level)
|
||||
@ -121,14 +140,15 @@ class QRScanner:
|
||||
await self.txrx('S_CMD_03L0') # light off all the time by default
|
||||
|
||||
# ??
|
||||
await self.txrx('S_CMD_MSRI0000') # Modify the same code reading delay: 0ms
|
||||
|
||||
# settings under continuous scan mode
|
||||
await self.txrx('S_CMD_MARS0000') # "Modify the duration of single code reading" (ms)
|
||||
await self.txrx('S_CMD_MARR000') # "Modify the time of the reading interval 0ms"
|
||||
await self.txrx('S_CMD_MA30') # "Same code reading without delay"
|
||||
await self.txrx('S_CMD_MARI0000') # "Modify the same code reading delay 0ms"
|
||||
await self.txrx('S_CMD_MS30') # "Duplicate detection-off"
|
||||
|
||||
await self.txrx('S_CMD_MS31') # "Same code reading delay"
|
||||
await self.txrx('S_CMD_MSRI0100') # Modify the same code reading delay: 100ms
|
||||
|
||||
# these aren't useful (yet?) and just make things harder to decode.
|
||||
#await self.txrx('S_CMD_05F1') # add all information on
|
||||
@ -139,9 +159,10 @@ class QRScanner:
|
||||
#await self.txrx('S_CMD_0506') # suffix
|
||||
#await self.txrx('S_CMD_05D0') # tx total data
|
||||
|
||||
# prevent scanning magic QR to affect settings
|
||||
await self.txrx('S_CMD_0000') # close setting codes
|
||||
|
||||
self.setup_done = True
|
||||
print("QR scanner setup done.")
|
||||
|
||||
await self.goto_sleep()
|
||||
|
||||
@ -194,6 +215,7 @@ class QRScanner:
|
||||
|
||||
return rv
|
||||
|
||||
|
||||
async def _readline(self):
|
||||
# overridden in simulator
|
||||
# - blocks for QR to be seen
|
||||
@ -239,9 +261,8 @@ class QRScanner:
|
||||
# - by sending these without binary wrapper, we get back a shorter reply
|
||||
# - just RAW_OKAY
|
||||
# - which we can easily filter out of any QR data we get back at the same time
|
||||
#self.stream.write(msg)
|
||||
#await self.stream.drain()
|
||||
print('tx >> ' + msg)
|
||||
# - do not use async self.stream because other tasks may be using it
|
||||
#print('tx >> ' + msg)
|
||||
self.serial.write(msg)
|
||||
|
||||
async def txrx(self, msg, timeout=250):
|
||||
@ -256,7 +277,7 @@ class QRScanner:
|
||||
await self.flush_junk()
|
||||
|
||||
# Send the command
|
||||
print('txrx >> ' + msg)
|
||||
#print('txrx >> ' + msg)
|
||||
self.stream.write(wrap(msg))
|
||||
await self.stream.drain()
|
||||
|
||||
@ -271,7 +292,7 @@ class QRScanner:
|
||||
continue
|
||||
raise RuntimeError("no rx after %s" % msg)
|
||||
|
||||
print('txrx << ' + B2A(rx))
|
||||
#print('txrx << ' + B2A(rx))
|
||||
|
||||
if rx == OKAY:
|
||||
# good path
|
||||
@ -287,15 +308,15 @@ class QRScanner:
|
||||
pos = rx.rindex(b'\n')
|
||||
rx = rx[pos+1:]
|
||||
|
||||
if RAW_OKAY in rx:
|
||||
while rx.startswith(RAW_OKAY):
|
||||
# earlier bare commands' ACK's, remove them
|
||||
rx = rx.replace(RAW_OKAY, b'')
|
||||
rx = rx[2:]
|
||||
|
||||
mlen = unwrap_hdr(rx)
|
||||
|
||||
if mlen < 0:
|
||||
# framing issue, must be part way thru a QR
|
||||
print('Framing prob (cmd=%s): %s=%s' % (msg, rx, B2A(rx)))
|
||||
#print('Framing prob (cmd=%s): %s=%s' % (msg, rx, B2A(rx)))
|
||||
rx = b''
|
||||
expect = LEN_OKAY
|
||||
continue
|
||||
@ -311,9 +332,10 @@ class QRScanner:
|
||||
raise RuntimeError("extra at end")
|
||||
return body
|
||||
except Exception as exc:
|
||||
print("Bad Rx: %s=%r" % (B2A(rx), rx))
|
||||
print(" exc: %s" % exc)
|
||||
raise
|
||||
#print("Bad Rx: %s=%r" % (B2A(rx), rx))
|
||||
#print(" exc: %s" % exc)
|
||||
# this generally does not happen with all above complexity
|
||||
raise RuntimeError("bad frame after %s" % msg)
|
||||
|
||||
def torch_control_sync(self, on):
|
||||
# sync wrapper
|
||||
@ -324,7 +346,6 @@ class QRScanner:
|
||||
# - S_CMD_03L1 => always light
|
||||
# - S_CMD_03L2 => when needed
|
||||
# - S_CMD_03L0 => no
|
||||
print("torch=%d" % on)
|
||||
if not self.version:
|
||||
return
|
||||
|
||||
|
||||
250
shared/ux_q1.py
250
shared/ux_q1.py
@ -2,14 +2,15 @@
|
||||
#
|
||||
# ux_q1.py - UX/UI interactions that are Q1 specific and use big screen, keyboard.
|
||||
#
|
||||
import utime, gc, ngu, sys
|
||||
import uasyncio as asyncio
|
||||
from uasyncio import sleep_ms
|
||||
import utime, gc, ngu
|
||||
from charcodes import *
|
||||
from lcd_display import CHARS_W, CHARS_H, CursorSpec, CURSOR_SOLID, CURSOR_OUTLINE, CURSOR_DW_SOLID
|
||||
from exceptions import AbortInteraction
|
||||
from exceptions import AbortInteraction, QRDecodeExplained
|
||||
from queues import QueueEmpty
|
||||
import bip39
|
||||
from decoders import decode_qr_result
|
||||
|
||||
class PressRelease:
|
||||
def __init__(self, need_release=KEY_SELECT+KEY_CANCEL):
|
||||
@ -623,7 +624,7 @@ class QRScannerInteraction:
|
||||
print("Scanned: %r" % data)
|
||||
break
|
||||
|
||||
# wait for key or 250ms delay
|
||||
# wait for key or 250ms animation delay
|
||||
try:
|
||||
ch = await asyncio.wait_for_ms(ux_wait_keyup(), 250)
|
||||
except asyncio.TimeoutError:
|
||||
@ -641,56 +642,6 @@ class QRScannerInteraction:
|
||||
|
||||
return data
|
||||
|
||||
@staticmethod
|
||||
def decode_secret(got):
|
||||
# Decode a few different ways to store a master secret, or raise
|
||||
# - xprv / tprv
|
||||
# - words (either full or prefixes, case insensitive)
|
||||
# - SeedQR (github.com/SeedSigner/seedsigner/blob/dev/docs/seed_qr/README.md)
|
||||
|
||||
taste = got.strip().lower()
|
||||
|
||||
# remove bitcoin: if present
|
||||
if ':' in taste:
|
||||
_, taste = taste.split(':', 1)
|
||||
|
||||
if taste[1:4] == 'prv':
|
||||
# xprv or tprv: private key import for sure
|
||||
# - verify checksum is right
|
||||
try:
|
||||
ngu.codecs.b58_decode(taste)
|
||||
except:
|
||||
raise ValueError('corrupt xprv?')
|
||||
|
||||
return 'xprv', taste
|
||||
|
||||
if taste.isdigit():
|
||||
# SeedQR: 4 digit groups of index into word list
|
||||
parts = [taste[pos:pos+4] for pos in range(0, len(taste), 4)]
|
||||
try:
|
||||
assert len(parts) in (12, 18, 24)
|
||||
words = [bip39.wordlist_en[int(n)] for n in parts]
|
||||
except:
|
||||
raise ValueError('corrupt SeedQR?')
|
||||
return 'words', words
|
||||
|
||||
words = taste.decode().split(' ')
|
||||
if len(words) in [ 12, 18, 24]:
|
||||
# looks like bip-39 words, decode and re-expand
|
||||
idx = [bip39.get_word_index(w) for w in words]
|
||||
return 'words', [bip39.wordlist_en[n] for n in idx]
|
||||
|
||||
raise ValueError('no idea')
|
||||
|
||||
async def secret_import(self, mode, value):
|
||||
# Import a private key: xprv or seed words
|
||||
# - plausible value has been received
|
||||
# - import as tmp or master
|
||||
print("Secret: %s %r" % (mode, value))
|
||||
if mode == 'xprv':
|
||||
pass
|
||||
elif mode == 'words':
|
||||
pass
|
||||
|
||||
async def scan_anything(self, expect_secret=False):
|
||||
# start a QR scan, and act on what we find, whatever it may be.
|
||||
@ -703,85 +654,57 @@ class QRScannerInteraction:
|
||||
if got is None:
|
||||
return
|
||||
|
||||
if hasattr(got, 'finalize'):
|
||||
# BBQr object
|
||||
ty, final_size, got = got.finalize()
|
||||
if ty == 'P':
|
||||
print("Got a PSBT file: %d bytes" % final_size)
|
||||
await qr_psbt_sign(None, final_size, got)
|
||||
return
|
||||
elif ty == 'T':
|
||||
problem = "No good use for transaction."
|
||||
continue
|
||||
elif ty == 'C':
|
||||
problem = "Sorry, Q has no use CBOR"
|
||||
continue
|
||||
elif ty == 'J':
|
||||
problem = "Sorry, Q has no use JSON yet"
|
||||
continue
|
||||
elif ty == 'U':
|
||||
# continue text thru code below
|
||||
pass
|
||||
else:
|
||||
problem = "Sorry, dont know filetype: " + ty
|
||||
continue
|
||||
|
||||
# Figure out what we got.
|
||||
try:
|
||||
# can we decode a master secret of some type?
|
||||
mode, value = self.decode_secret(got)
|
||||
|
||||
return await self.secret_import(mode, value)
|
||||
except BaseException as exc:
|
||||
if expect_secret:
|
||||
problem = str(exc)
|
||||
continue
|
||||
|
||||
# might be a PSBT?
|
||||
if len(got) > 100:
|
||||
from auth import psbt_encoding_taster
|
||||
try:
|
||||
decoder, _, psbt_len = psbt_encoding_taster(got[0:10].encode(), len(got))
|
||||
print("Got a PSBT file")
|
||||
await qr_psbt_sign(decoder, psbt_len, got)
|
||||
return
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Might be an address or pubkey or psbt? But not binary
|
||||
|
||||
# remove URL protocol: if present
|
||||
scheme = None
|
||||
if ':' in got:
|
||||
scheme, got = got.split(':', 1)
|
||||
|
||||
# old school
|
||||
try:
|
||||
raw = ngu.codecs.b58_decode(got)
|
||||
print("Valid base58")
|
||||
|
||||
# it's valid base58
|
||||
if got[1:4] == 'pub':
|
||||
# xpub ... why tho?
|
||||
return got
|
||||
else:
|
||||
# an address, P2PKH
|
||||
return got
|
||||
what, vals = decode_qr_result(got, expect_secret=expect_secret)
|
||||
except QRDecodeExplained as exc:
|
||||
problem = str(exc)
|
||||
continue
|
||||
except:
|
||||
pass
|
||||
problem = "Unable to decode QR"
|
||||
continue
|
||||
|
||||
# new school: bech32 or bech32m
|
||||
try:
|
||||
hrp, version, data = ngu.codecs.segwit_decode(got)
|
||||
print("Valid bech32")
|
||||
return got
|
||||
except:
|
||||
pass
|
||||
# Limitation: assuming ephemeral import here
|
||||
if what == 'xprv':
|
||||
from actions import import_extended_key_as_secret
|
||||
text_xprv, = vals
|
||||
await import_extended_key_as_secret(text_xprv, True)
|
||||
return
|
||||
|
||||
problem = 'Sorry, can not make use of that!'
|
||||
if what == 'words':
|
||||
from seed import ephemeral_seed_import_done_cb # dirty API
|
||||
words, = vals
|
||||
await ephemeral_seed_import_done_cb(words)
|
||||
return
|
||||
|
||||
if what == 'psbt':
|
||||
decoder, psbt_len, got = vals
|
||||
await qr_psbt_sign(decoder, psbt_len, got)
|
||||
return
|
||||
|
||||
if what == 'txn':
|
||||
bin_txn, = vals
|
||||
await ux_visualize_txn(bin_txn)
|
||||
return
|
||||
|
||||
if what == 'addr':
|
||||
proto, addr, args = vals
|
||||
await ux_visualize_bip21(proto, addr, args)
|
||||
return
|
||||
|
||||
if what == 'text' or what == 'xpub':
|
||||
# we couldn't really decode it.
|
||||
txt, = vals
|
||||
await ux_visualize_textqr(txt)
|
||||
return
|
||||
|
||||
|
||||
# not reached?
|
||||
problem = 'Unhandled: ' + what
|
||||
|
||||
|
||||
async def qr_psbt_sign(decoder, psbt_len, raw):
|
||||
# Got a PSBT coming in from QR scanner. Assume single QR for now.
|
||||
# Got a PSBT coming in from QR scanner. Sign it.
|
||||
# - similar to auth.sign_psbt_file()
|
||||
from auth import UserAuthorizedAction, ApproveTransaction
|
||||
from utils import CapsHexWriter
|
||||
@ -830,7 +753,7 @@ async def qr_psbt_sign(decoder, psbt_len, raw):
|
||||
|
||||
if data_len >= MAX_V40_SIZE:
|
||||
# too big for single version 40 QR
|
||||
await ux_show_story("Resulting txn is too big for QR code")
|
||||
await ux_show_story("Resulting txn is too big for single QR code.")
|
||||
return
|
||||
|
||||
await show_qr_code(here, is_alnum=True, msg=(txid or 'Partly Signed PSBT'))
|
||||
@ -839,4 +762,79 @@ async def qr_psbt_sign(decoder, psbt_len, raw):
|
||||
UserAuthorizedAction.active_request = ApproveTransaction(psbt_len, approved_cb=done)
|
||||
the_ux.push(UserAuthorizedAction.active_request)
|
||||
|
||||
async def ux_visualize_txn(bin_txn):
|
||||
# Show the user a signed transaction on-screen.
|
||||
# - longer-term we may offer more data about address ownership, etc
|
||||
# - be careful not to claim things we cannot prove w/o UTXO from confirmed blocks
|
||||
# - .. like the fee, which would be useful
|
||||
from ux import ux_show_story
|
||||
from io import BytesIO
|
||||
from psbt import calc_txid
|
||||
from ubinascii import hexlify as b2a_hex
|
||||
from serializations import CTransaction
|
||||
|
||||
txn = CTransaction()
|
||||
|
||||
try:
|
||||
txn.deserialize(BytesIO(bin_txn))
|
||||
|
||||
if (n := len(txn.vin)) == 1:
|
||||
msg = '1 input, '
|
||||
else:
|
||||
msg = '%d inputs, ' % n
|
||||
|
||||
if (n := len(txn.vout)) == 1:
|
||||
msg += '1 output'
|
||||
else:
|
||||
msg += '%d outputs' % n
|
||||
|
||||
# add txid
|
||||
txid = calc_txid(BytesIO(bin_txn), (0, len(bin_txn)))
|
||||
msg += '\n\nTxid:\n' + b2a_hex(txid).decode()
|
||||
|
||||
except Exception as exc:
|
||||
sys.print_exception(exc)
|
||||
msg = "Unable to deserialize"
|
||||
|
||||
await ux_show_story(msg, title="Signed Transaction")
|
||||
|
||||
|
||||
async def ux_visualize_bip21(proto, addr, args):
|
||||
# Show details of BIP-21 URL
|
||||
# - imho, a bare address is a valid BIP-21 URL so we come here too
|
||||
# - TODO: validate address ownership
|
||||
from ux import ux_show_story
|
||||
|
||||
msg = addr + '\n\n'
|
||||
args = args or {}
|
||||
|
||||
if 'amount' in args:
|
||||
msg += 'Amount: '
|
||||
try:
|
||||
amt = args.pop('amount')
|
||||
whole, frac = amt.split('.', 1)
|
||||
frac = int(frac) if frac else 0
|
||||
whole = int(whole) if whole else 0
|
||||
msg += '%d.%08d BTC\n' % (whole, frac)
|
||||
except:
|
||||
msg += '(corrupt)\n'
|
||||
|
||||
for fn in ['label', 'message', 'lightning']:
|
||||
if fn in args:
|
||||
val = args.pop(fn) # XXX needs url-decoding
|
||||
msg += '%s%s: %s\n' % (fn[0].upper(), fn[1:], val)
|
||||
|
||||
if args:
|
||||
msg += 'And values for: ' + ', '.join(args)
|
||||
|
||||
await ux_show_story(msg, title="Payment Address")
|
||||
|
||||
async def ux_visualize_textqr(txt):
|
||||
from ux import ux_show_story
|
||||
if len(txt) > 100:
|
||||
txt = txt[0:100] + '...'
|
||||
|
||||
await ux_show_story("%s\n\nAbove is text that was scanned. "
|
||||
"We can't do any more with it." % txt, title="Simple Text")
|
||||
|
||||
# EOF
|
||||
|
||||
7
testing/data/bip21.txt
Normal file
7
testing/data/bip21.txt
Normal file
@ -0,0 +1,7 @@
|
||||
bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R
|
||||
bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?label=Luke-Jr
|
||||
bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?amount=20.3&label=Luke-Jr
|
||||
bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?amount=50&label=Luke-Jr&message=Donation%20for%20project%20xyz
|
||||
bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?req-somethingyoudontunderstand=50&req-somethingelseyoudontget=999
|
||||
bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?somethingyoudontunderstand=50&somethingelseyoudontget=999
|
||||
|
||||
1
testing/data/devils-txn.txn
Normal file
1
testing/data/devils-txn.txn
Normal file
@ -0,0 +1 @@
|
||||
02000000000101dce550be0d74cfcbb042d6285ad78c85ce7d368ba5c93eb55cac1f56335d89010000000000fdffffff02ccb4eb0b00000000160014bed59ae63ed9a6a5630c2cb26cc967a23df9b87700a3e11100000000160014950fc51c23131864563589694dd913b9e310ab69040047304402200cb46756eeda1ba57fb8e260cc711ae5a0f31ff68c5abf824b551c22f2c77475022056b83be7cfd7bab9ccdb832b416e1e63d39e552bfef97f1e07e4b97e191dc67101473044022024dd75bf639765c6d530716da4167c22effc9811742a34a4ce710153327a24e10220513e4f61875d5559673090ece3f7fa2c9cda62986d84c88bc1ab087b7bd02c850147522102f9c33362e7c4d9d21e9145e1478a36f341f2f0cfe7055abe92380bb806d9ce7821035d92f72e755b3866b034f16b637cb6f3c24d8d4924664df8cc028a0798e75a3252ae00000000
|
||||
4
testing/data/overlapped-qr.txt
Normal file
4
testing/data/overlapped-qr.txt
Normal file
@ -0,0 +1,4 @@
|
||||
B$ZP0201DXW74HWMBTNKHH3H5X2Y66JCVKFAJT4HDPG5NJWYLH7GPHOXH7MSNKSNVLGBQVYW3DFZH6XTS5P4MVJBCP7R367KBYJO3U6FZ2P56NTNJE2D65EJJULOTE36V7PLA377Y5OPCRFFGZATV3TZF72R7UKVEQ5NXPDNLDVR2LNDWQZSEL7ATFAZ2S67XN6OM3JHTZ6MLRRWH7WZMZWN53CFRLAEQ3WP3OHIQG3GLIJO2LWUPFNLF2L2ZNJA3BK7ME7T4T2VO7VAMPMYVMYZPD4FT425EGLNAJSQ42X6HIGLDEVUL4SW7TYCZ6LRQVODO7RX3QQI3FB2IWYOGZ4JB77YWFV636KOV4TZJD7RAYPZZEEHNCH5C7SCVK2CJ3TJNLV4KQRMWUSUM7NV5P5RRT65TUNT7QDWENOW7KWXX5G2FJBAAZIIB5EC2EP7AV4AQAA
|
||||
B$ZP0201DXW74HWMBTNKHH3H5X2Y66JCVKFAJT4HDPG5NJWYLH7GPHOXH7MSNKSNVG3VHHEP3KQKUOE76JY25P3733AYM6ZWPYW3DFZH6XTS5P4MVJBCP7R367KBYJO3U6FZ2P56NTNJEB$ZP0201DXW74HWMBTNKHH3H5X2Y66JCVKFAJT4HDPG5NJWYLH7GPHOXH7MSNKSNVLGBQVMF76OXGB76OU7QWCP7R367KBYJO3U6FZ2P56NTNJE2D65EJJULOTE36V7PLA377Y5OPCRFFGZATV3TZF72R7UKVEQ5NXPDNLDVR2LNDWQZSEL7ATFAZ2S77XN6OM3JHTZ6MLRRWH7WZMZWN53CFRLAEQ3WP3OHIQG3GLIJO2LWUPFNLF2L2ZNJA3BK7ME7T4T2VO7VAMPMYVMYZPD4FT425EGLNAJSQ42IGLDEVUL4SW7TYCZ6LRQVODO7RX3QQI3FB2IWYOGZ4JB77YWFV636KOV4TZJD7RAYPZZEEHNCH5C7SCVK2CJ3TJNLV4KQRMWUM2E7NV5P5RRT65TUNT7QDWENOW7KWXX5G2FJBAAZIIB5EC2EP7AV4AQAA
|
||||
B$ZP0200FMUE4KXZZ7EPBV47TGEYDAMBTGLVX5HN6LCKTCIRFGXILJ4N2IR4LTTJ3ZZTXKTVLPML4YMCKZG7KDIGCD4A6BH6IUNQUVONWKVZM74YT6Y6ZA6O5PQORPG5VV7OTMYXW44TKNGVN55ASV2Y722VJMJREFVXFRE46XTCYDBBNKN74LOU7BJ3WLPGTWJ655WGHU54LYIKIWJX5XYEOMSMWVUKLTSMY6VCU7W3F47PV6CSNIP37W4Z5GGG5XUFRA5M4K4P5HFTGLZUEMS72SK6S3WZ2T7U7L37ZUDVIFO63IE4DGQ3ZJ7IJ3N3EMPHN2WJQRLJLMBYKQKSPCRSXDNZUMXLXET2UPFX3FQSPSCYOE4QIVMFV2EWHMOQQZ4EDD27A6XO34UP6IUGNXXEROVOEHUTORI5Z65FTP5F4MI737KTK6K5ON5YUN5KBI44XRP7H3TVMPNJ5567VRH43GBMFQ5M7EGKQKR2YUSDLQWCU3HNR5DSO6O7GK7BY5L7HD2YUD4VN2DNZA3EMF65FBR75DTTDVUYKXP2Q6RVAVKUP5MPW4UBR3C43ERJWNHLEHUZYN7C272NAGNF7ZBNF46LNXHQXNPLRIRIJP6JPS6BQ7J7UUKVMQ3SMHJY6MQO3AMLWTTY4GCOX3ODZKKEPFKBKRQXUST6WXW67O4H63OU36LWUV34ZZ6E3CKHKXQVG7DGS2QPDVCKY2JCQRNWO7IWP53J6BOUHM7F7KH3257M
|
||||
B$ZP0201DXW74HWMBTNKHH3H5X2Y66JCVKFAJT4HDPG5NJWYLH7GPHOXH7MSNKSNVLGBQVMF76OXGB76OU7QW6LY7SGDB4ZMQG3VHHEP3KQKUOE76JY25P3733AYM6ZWPYW3DFZH6XTS5P4MVJBCP7R367KBYJO3U6FZ2P56NTNJE2D65EJJULOTE36V7PLA377Y5OPCRFFGZATV3TZF72R7UKVEQ5NXPDNLDVR2LNDWQZSEL7ATFAZ2S7ZUBP4AUUMRSQUZIDKNU63AF3B7W7IMLCASPCHYB4CIYDAPX6ZMEDHGKY2V67XN6OM3JHTZ6MLRRWH7WZMZWN53CFRLAEQ3WP3OHIQG3GLIJO2LWUPFNLF2L2ZNJA3BK7ME7T4T2VO7VAMPMYVMYZPD4FT425EGLNAJSQ42XJFXFZQ4PP2HXR7EFFP73P7Q7W73IZC3EDGCCTBXSUTFR53NTK5FWWP7QCAW7R3Z6HIGLDEVUL4SW7TYCZ6LRQVODO7RX3QQI3FB2IWYOGZ4JB77YWFV636KOV4TZJD7RAYPZZEEHNCH5C7SCVK2CJ3TJNLV4KQRMWUSUMLPOI7F75MO7XQLNWOLN5YTCX6RQA5YICN3SVAEHGEYQQHW7Y6U7PSPBPXS4DPM2E7NV5P5RRT65TUNT7QDWENOW7KWXX5G2FJBAAZIIB5EC2EP7AV4AQAA
|
||||
131
testing/test_decoders.py
Normal file
131
testing/test_decoders.py
Normal file
@ -0,0 +1,131 @@
|
||||
# (c) Copyright 2023 by Coinkite Inc. This file is covered by license found in COPYING-CC.
|
||||
#
|
||||
# test decoders.py code (unit test)
|
||||
#
|
||||
#
|
||||
import pytest
|
||||
from binascii import a2b_hex, b2a_hex
|
||||
from base64 import b64encode
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
from helpers import prandom
|
||||
|
||||
from mnemonic import Mnemonic
|
||||
wordlist = Mnemonic('english').wordlist
|
||||
|
||||
@pytest.fixture
|
||||
def try_decode(sim_exec):
|
||||
def doit(arg):
|
||||
cmd = "from decoders import decode_qr_result; " + \
|
||||
f"RV.write(repr(decode_qr_result({arg!r})))"
|
||||
|
||||
result = sim_exec(cmd)
|
||||
|
||||
if 'Traceback' in result:
|
||||
raise RuntimeError(result)
|
||||
|
||||
if '<' in result:
|
||||
# objects, like "<HexStreamer..."
|
||||
result = result.replace('<', "'").replace('>', "'")
|
||||
|
||||
return eval(result)
|
||||
return doit
|
||||
|
||||
@pytest.mark.parametrize('fname,expect', [
|
||||
( 'data/p2pkh+p2sh+outs.psbt', 'psbt'),
|
||||
( 'data/snight-example.psbt', 'psbt'),
|
||||
( 'data/devils-txn.txn', 'txn'),
|
||||
])
|
||||
@pytest.mark.parametrize('encoding', ['hex', 'b64'])
|
||||
def test_detector_bin(fname, expect, encoding, try_decode):
|
||||
|
||||
# NOTE: input files must be hex to start
|
||||
arg = a2b_hex(open(fname, 'rt').read().strip())
|
||||
|
||||
if encoding == 'hex':
|
||||
arg = b2a_hex(arg).decode()
|
||||
elif encoding == 'b64':
|
||||
arg = b64encode(arg).decode()
|
||||
else:
|
||||
raise ValueError
|
||||
|
||||
ft, vals = try_decode(arg)
|
||||
assert ft == expect
|
||||
|
||||
|
||||
@pytest.mark.parametrize('url', [
|
||||
'bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R',
|
||||
'bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?label=Luke-Jr',
|
||||
'bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?amount=20.3&label=Luke-Jr',
|
||||
'bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?amount=50&label=Luke-Jr&message=Donation%20for%20project%20xyz',
|
||||
'bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?req-somethingyoudontunderstand=50&req-somethingelseyoudontget=999',
|
||||
'bitcoin:mtHSVByP9EYZmB26jASDdPVm19gvpecb5R?somethingyoudontunderstand=50&somethingelseyoudontget=999',
|
||||
])
|
||||
@pytest.mark.parametrize('bip21', range(2))
|
||||
@pytest.mark.parametrize('addr_fmt', range(2))
|
||||
def test_detector_url(url, bip21, addr_fmt, try_decode):
|
||||
a1, a2 = ('mtHSVByP9EYZmB26jASDdPVm19gvpecb5R',
|
||||
'BCRT1QUPYD58NDSH7LUT0ET0VTRQ432JVU9JTDX8FGYV')
|
||||
|
||||
if not bip21:
|
||||
_, url = url.split(':', 1)
|
||||
if addr_fmt:
|
||||
url = url.replace(a1, a2)
|
||||
expect_addr = a2
|
||||
else:
|
||||
expect_addr = a1
|
||||
|
||||
ft, vals = try_decode(url)
|
||||
assert ft == 'addr'
|
||||
proto, addr, args = vals
|
||||
assert addr == expect_addr
|
||||
assert proto == ('bitcoin' if bip21 else None)
|
||||
|
||||
p = urlparse(url)
|
||||
assert p.path == addr
|
||||
|
||||
xargs = parse_qs(p.query)
|
||||
if args:
|
||||
assert xargs.keys() == args.keys()
|
||||
else:
|
||||
assert not xargs
|
||||
|
||||
|
||||
@pytest.mark.parametrize('num_words', [12, 18, 24])
|
||||
@pytest.mark.parametrize('encoding', ['short', 'long', 'seed_qr'])
|
||||
@pytest.mark.parametrize('case', range(2))
|
||||
def test_detector_secrets(num_words, encoding, case, try_decode):
|
||||
|
||||
n = [(i*179)% 2048 for i in range(num_words)]
|
||||
|
||||
words = [wordlist[i] for i in n]
|
||||
expect = list(words)
|
||||
|
||||
if encoding == 'seed_qr':
|
||||
if case: return
|
||||
qr = ''.join('%04d'%i for i in n)
|
||||
else:
|
||||
|
||||
if encoding == 'short':
|
||||
words = [w[0:4] for w in words]
|
||||
if case:
|
||||
words = [w.upper() for w in words]
|
||||
|
||||
qr = ' '.join(words)
|
||||
|
||||
ft, vals = try_decode(qr)
|
||||
assert ft == 'words'
|
||||
got_words, = vals
|
||||
assert got_words == expect
|
||||
|
||||
@pytest.mark.parametrize('code', [
|
||||
'xprv9s21ZrQH143K2LBWUUQRFXhucrQqBpKdRRxNVq2zBqsx8HVqFk2uYo8kmbaLLHRdqtQpUm98uKfu3vca1LqdGhUtyoFnCNkfmXRyPXLjbKb',
|
||||
'xpub69H7F5d8KSRgmmdJg2KhpAK8SR3DjMwAdkxj3ZuxV27CprR9LgpeyGmXUbC6wb7ERfvrnKZjXoUmmDznezpbZb7ap6r1D3tgFxHmwMkQTPH',
|
||||
])
|
||||
def test_detector_xp(code, try_decode):
|
||||
|
||||
ft, vals = try_decode(code)
|
||||
|
||||
assert ft == code[0:4]
|
||||
assert vals[0] == code
|
||||
|
||||
# EOF
|
||||
@ -76,6 +76,18 @@ class AttachedQRScanner(QRScanner):
|
||||
|
||||
return 0
|
||||
|
||||
def set_baud(self, br=None):
|
||||
# change serial port baud rate
|
||||
import termios
|
||||
attr = termios.tcgetattr(self.serial.fileno())
|
||||
# [4][5] are the baud rate
|
||||
was = int(attr[4])
|
||||
attr[4] = br # assuming termios.B9600 = 9600 etc
|
||||
attr[5] = br
|
||||
if br is not None:
|
||||
termios.tcsetattr(self.serial.fileno(), 0, attr)
|
||||
return was
|
||||
|
||||
async def flush_junk(self):
|
||||
# I am in lack of .any() member on my serial port
|
||||
while 1:
|
||||
|
||||
1
unix/work/.gitignore
vendored
1
unix/work/.gitignore
vendored
@ -1,3 +1,4 @@
|
||||
# this simulator, or test cases produce these files.
|
||||
nfc-dump.ndef
|
||||
readback.psbt
|
||||
qrdata.txt
|
||||
|
||||
Loading…
Reference in New Issue
Block a user