scanning psbt

This commit is contained in:
Peter D. Gray 2023-11-23 14:41:43 -05:00 committed by scgbckbone
parent 359c343854
commit f63db63e76
15 changed files with 593 additions and 123 deletions

View File

@ -21,7 +21,7 @@ from glob import settings
from pincodes import pa
from menu import start_chooser
from version import MAX_TXN_LEN
from charcodes import KEY_NFC, KEY_QR
from charcodes import KEY_NFC, KEY_QR, KEY_CANCEL
CLEAR_PIN = '999999-999999'
@ -690,7 +690,8 @@ async def view_seed_words(*a):
raw or sv.raw,
sv.node)
msg += '\n\nPress (1) to view as QR Code.'
if not version.has_qwerty:
msg += '\n\nPress (1) to view as QR Code.'
while 1:
ch = await ux_show_story(msg, sensitive=True, escape='1'+KEY_QR)
@ -1343,8 +1344,11 @@ async def import_xprv(_1, _2, item):
# error already displayed in nfc.py
return
elif choice == KEY_QR:
# TODO: scan something
pass
from ux_q1 import QRScannerInteraction
extended_key = await QRScannerInteraction.scan('Scan XPRV from a QR code')
if not extended_key:
# press pressed CANCEL
return
else:
# only get here if NFC was not chosen
# pick a likely-looking file.
@ -1491,7 +1495,7 @@ async def import_tapsigner_backup_file(_1, _2, item):
meta = "from "
label = "TAPSIGNER encrypted backup file"
choice = await import_export_prompt(label, is_import=False)
choice = await import_export_prompt(label, is_import=True)
if choice == KEY_CANCEL:
return
@ -1502,8 +1506,28 @@ async def import_tapsigner_backup_file(_1, _2, item):
# error already displayed in nfc.py
return
elif choice == KEY_QR:
# TODO: how is binary encoded? who made this QR??!
pass
# how is binary encoded? who made this QR??!
from ux_q1 import QRScannerInteraction
from ubinascii import a2b_base64
from ubinascii import unhexlify as a2b_hex
prob = None
while 1:
data = await QRScannerInteraction.scan(
'Scan TAPSIGNER backup data', prob)
if not data: return # pressed cancel
# guess at serialization between Base64 and Hex
try:
# pure hex, the smarter encoding (when in caps)
data = a2b_hex(data)
except ValueError:
try:
data = a2b_base64(data)
except ValueError:
prob = 'Expected HEX digits or Base64 encoded binary'
continue
break
else:
fn = await file_picker('Pick ' + label, suffix="aes", min_size=100, max_size=160, **choice)
if not fn: return

View File

@ -967,7 +967,7 @@ def psbt_encoding_taster(taste, psbt_len):
if taste[0:5] == b'psbt\xff':
decoder = None
output_encoder = lambda x: x
elif taste[0:10] == b'70736274ff' or taste[0:10] == b'70736274FF':
elif taste[0:10].lower() == b'70736274ff':
decoder = HexStreamer()
output_encoder = HexWriter
psbt_len //= 2

169
shared/bbqr.py Normal file
View File

@ -0,0 +1,169 @@
# (c) Copyright 2023 by Coinkite Inc. This file is covered by license found in COPYING-CC.
#
# bbqr.py - Implement BBQr protocol for multiple QR support (also compression and filetype info)
#
import utime, uzlib
import uasyncio as asyncio
from struct import pack, unpack
from utils import B2A
from imptask import IMPT
from queues import Queue
# For BBQr support
import ngu
b32encode = ngu.codecs.b32_encode
b32decode = ngu.codecs.b32_decode
from ubinascii import unhexlify as a2b_hex
TYPE_LABELS = dict(P='PSBT File', T='Transaction', J='JSON', C='CBOR', U='Unicode Text')
class BBQrHeader:
def __init__(self, taste):
# parse header based on standard
# expects a string
assert len(taste) >= 8
assert taste[0:2] == 'B$'
self.encoding, self.file_type = taste[2:4]
self.num_parts = int(taste[4:6], 16)
self.which = int(taste[6:8], 16)
assert 1 <= self.num_parts <= 255
assert 0 <= self.which < self.num_parts
def __repr__(self):
return '<BBQr: %d of %d parts, enc=%s ft=%s>' % (self.which, self.num_parts,
self.encoding, self.file_type)
def is_compat(self, other):
# Does this header match previous ones seen?
return (self.encoding == other.encoding and
self.file_type == other.file_type and
self.num_parts == other.num_parts)
def decode_body(self, scan):
# perform the decoding implied by header (but not decompression)
body = bytes(memoryview(scan)[8:])
if self.encoding == 'H':
rv = a2b_hex(body)
else:
rv = b32decode(body)
return rv
def file_label(self):
# provide a string as hint to user of what they are getting
if self.file_type in TYPE_LABELS:
return TYPE_LABELS[self.file_type]
else:
return 'Unknown: %s' % self.file_type
class BBQrState:
def __init__(self):
self.reset()
def reset(self):
self._psb = None # hack, to be removed
self.hdr = None
self.parts = set()
self.runt = None
self.runt_size = None
self.blksize = None
def upper_bound(self):
# max size we are expecting
return self.blksize * self.hdr.num_parts
def is_valid(self):
return bool(self.hdr) and len(self.parts) == self.hdr.num_parts
def collect(self, scan):
# Another BBQr has come in; track it.
# - return T while more parts are still needed
# - updates UX to show the progress
from glob import dis
hdr = BBQrHeader(scan)
print("Got " + repr(hdr))
if not self.hdr or not self.hdr.is_compat(hdr):
# New or incompatible header, they might have changed their
# minds and are now trying to scan something else; recover
self.reset()
self.hdr = hdr
if hdr.which not in self.parts:
# we've NOT YET seen this one
# convert back to binary
raw = hdr.decode_body(scan)
if hdr.which and (hdr.which == hdr.num_parts-1) and not self.parts:
# Problem: this is a runt and we saw it first, we have no idea
# where to put it; store as tmp for now.
self.runt = (hdr.which, raw)
else:
# based on (required) assumption that all parts are equal, we know
# where to put this data, so do that.
self.parts.add(hdr.which)
if self.blksize is None:
self.blksize = len(raw)
self.save_packet(hdr.which, raw)
# seeing any other packet is enough to decide where to put the runt
if self.runt:
wh, raw = self.runt
self.save_packet(wh, raw)
self.parts.add(wh)
self.runt = None
# provide UX
dis.draw_bbqr_progress(hdr.which, list(self.parts), hdr.num_parts, hdr.file_label())
# do we need more still?
return (len(self.parts) < hdr.num_parts)
def save_packet(self, which, data):
# override this on other projects... which don't have stupid PSRAM like this
# - can only write 4-aligned data to PSRAM, and typically the parts will not be
# 4-aligned because base32 yields 5 byte quantities
# - TODO: keep up num_parts of 3-byte runts, etc. {offset:bytes} and flush aligned
# parts each time we get more data
assert self.blksize is not None
if which == None:
# we are supposed to be done, return w/ complete length
final_size = (self.blksize * (self.hdr.num_parts-1)) + self.runt_size
return final_size, self._psb[0:final_size]
if which == self.hdr.num_parts-1:
self.runt_size = len(data)
offset = which * self.blksize
if not self._psb:
self._psb = bytearray(self.upper_bound())
self._psb[offset:offset+len(data)] = data
def finalize(self):
# got all the parts, so maybe decompress
# - return number of bytes waiting at start of PSRAM, and the filetype code
assert len(self.parts) == self.hdr.num_parts, "still missing parts"
# flush out data we have
final_size, raw = self.save_packet(None, None)
if self.hdr.encoding == 'Z':
# do in-place Zlib decompression (TODO)
raw = uzlib.decompress(raw, -10)
final_size = len(raw)
return self.hdr.file_type, final_size, raw
# EOF

View File

@ -540,7 +540,8 @@ class Display:
# - we need one more (white) pixel on all sides
from utils import word_wrap
assert not sidebar
# maybe show something other than QR contents under it
msg = sidebar or msg
if msg:
if len(msg) <= CHARS_W:
@ -596,6 +597,14 @@ class Display:
# TODO: pass a "max_brightness" param here, which would be cleared after next show
self.show()
def draw_bbqr_progress(self, new, gotem, num_parts, label):
# we've seen at least one BBQr QR, so update display w/ progress bar
# - lots of data so we can show nice animation
count = len(gotem) or 1
percent = int(count * 100.0 / num_parts)
self.text(None, -2, 'Keep scanning more...')
self.text(None, -1, '%s: %d of %d = %d%% ' % (label, count, num_parts, percent), dark=True)
def draw_box(self, x, y, w, h):
# using line-drawing chars, draw a box
# returns X pos of first inside char

View File

@ -10,6 +10,10 @@ from charcodes import (KEY_LEFT, KEY_RIGHT, KEY_UP, KEY_DOWN, KEY_HOME,
from version import has_qwerty
# TODO: This class has a terrible API!
MAX_V40_SIZE = 4296
class QRDisplaySingle(UserInteraction):
# Show a single QR code for (typically) a list of addresses, or a single value.
@ -30,6 +34,7 @@ class QRDisplaySingle(UserInteraction):
# - version=4..11 => single pixel per module
# - not really providing enough space around these, shrug
# - inverted QR (black/white swap) still readable by scanners, altho wrong
# - on Q: ver 23 => 109x109 is largest that can be pixel-doubled, can do v40 tho at 1:1
if self.is_alnum:
# targeting 'alpha numeric' mode, nice and dense; caps only tho
enc = uqr.Mode_ALPHANUMERIC

View File

@ -8,6 +8,7 @@ from struct import pack, unpack
from utils import B2A
from imptask import IMPT
from queues import Queue
from bbqr import BBQrState
def calc_bcc(msg):
bcc = 0
@ -50,6 +51,7 @@ def unwrap(packed):
OKAY = b'Z\x01\x00\x02\x90\x00\x93\xa5'
RAW_OKAY = b'\x90\x00'
LEN_OKAY = const(8)
# TODO: constructor should leave it in reset for simple lower-power usage; then after
# login we can do full setup (2+ seconds) and then sleep again until needed.
@ -57,10 +59,13 @@ LEN_OKAY = const(8)
class QRScanner:
def __init__(self):
self.lock = asyncio.Lock()
# set when we are..
self.busy_scanning = False
# hodl this lock when communicating w/ QR scanner
self.lock = asyncio.Lock()
from machine import UART, Pin
self.serial = UART(2, 9600)
self.reset = Pin('QR_RESET', Pin.OUT_OD, value=0)
@ -84,29 +89,30 @@ class QRScanner:
# setup device, and then stop
await asyncio.sleep(2)
while self.serial.read():
pass # ignore old data
async with self.lock:
while self.serial.read():
pass # ignore old data
try:
# get b'V2.3.0.7\r\n' or similar
rx = await self.tx('T_OUT_CVER')
self.version = rx.decode().strip()
except:
raise
#print("QR Scanner: missing")
try:
# get b'V2.3.0.7\r\n' or similar
rx = await self.tx('T_OUT_CVER')
self.version = rx.decode().strip()
except:
raise
#print("QR Scanner: missing")
# configure it like we want it
await self.tx('S_CMD_FFFF') # factory reset of settings
await self.tx('S_CMD_MTRS5000') # 5s to read before fail
await self.tx('S_CMD_MT11') # trigger is edge-based (not level)
await self.tx('S_CMD_MT30') # Same code reading without delay
await self.tx('S_CMD_MT20') # Enable automatic sleep when idle
await self.tx('S_CMD_MTRF500') # Idle time: 500ms
await self.tx('S_CMD_059A') # add CR LF after QR data
# configure it like we want it
await self.tx('S_CMD_FFFF') # factory reset of settings
await self.tx('S_CMD_MTRS5000') # 5s to read before fail
await self.tx('S_CMD_MT11') # trigger is edge-based (not level)
await self.tx('S_CMD_MT30') # Same code reading without delay
await self.tx('S_CMD_MT20') # Enable automatic sleep when idle
await self.tx('S_CMD_MTRF500') # Idle time: 500ms
await self.tx('S_CMD_059A') # add CR LF after QR data
self.setup_done = True
self.setup_done = True
await self.goto_sleep()
await self.goto_sleep()
async def scan_once(self):
# blocks until something is scanned. returns it
@ -115,6 +121,8 @@ class QRScanner:
while not self.setup_done:
await asyncio.sleep(.25)
bbqr = BBQrState()
async with self.lock:
self.busy_scanning = True
@ -134,17 +142,39 @@ class QRScanner:
await self.tx('SR030301')
try:
rv = await self.stream.readline()
while 1:
rv = await self._readline()
if not rv: continue
if rv[0:2] == 'B$' and bbqr.collect(rv):
# BBQr protocol detected; collect more data
continue
break
except asyncio.CancelledError:
rv = None
return None
finally:
await self.tx('SR030300')
self.busy_scanning = False
try:
await self.tx('SR030300')
except:
await self.tx('SR030300') # second try
await self.goto_sleep()
self.busy_scanning = False
if bbqr.is_valid():
# will return a tuple
return bbqr
return rv
async def _readline(self):
# overridden in simulator
# - blocks for QR to be seen
# - must trim newline(s)
# - must convert to str
rv = await self.stream.readline()
return rv.rstrip().decode()
async def wakeup(self):
# send specific command until it responds
# - it will wake on any command, but not instant
@ -231,7 +261,7 @@ class QRScanner:
return
if self.busy_scanning:
# do nothing if scanning already
# during scanning, invert meaning... turn off light
return
async with self.lock:

View File

@ -114,7 +114,6 @@ class HexWriter:
def write(self, b):
self.checksum.update(b)
self.pos += len(b)
self.fd.write(b2a_hex(b))
def seek(self, offset, whence=0):
@ -135,6 +134,17 @@ class HexWriter:
buf[0:len(b)] = b
return len(b)
class CapsHexWriter(HexWriter):
# omit newlines at end, and do CAPS ... better for QR usage
def write(self, b):
self.checksum.update(b)
self.pos += len(b)
self.fd.write(b2a_hex(b).upper())
def __exit__(self, *a, **k):
self.fd.seek(0, 2) # go to end
return self.fd.__exit__(*a, **k)
class Base64Writer:
# Emulate a file/stream but convert binary to Base64 as they write
def __init__(self, fd):

View File

@ -89,9 +89,10 @@ def ux_clear_keys(no_aborts=False):
async def ux_wait_keyup(expected=None):
# Wait for single keypress in 'expected' set, return it
# no visual feedback, no escape
# - can be canceled anytime, using wait_for to create a timeout
from glob import numpad
armed = None
armed = numpad.key_pressed or False
while 1:
ch = await numpad.get()
@ -126,6 +127,9 @@ def ux_poll_key():
return ch
def q1_reword(msg):
return msg.replace('\nX ', 'CANCEL ').replace(' X ', ' CANCEL ').replace('OK', 'ENTER')
async def ux_show_story(msg, title=None, escape=None, sensitive=False, strict_escape=False):
# show a big long string, and wait for XY to continue
# - returns character used to get out (X or Y)
@ -139,6 +143,10 @@ async def ux_show_story(msg, title=None, escape=None, sensitive=False, strict_es
# LATER: rarely used
lines.append('\x01' + title)
if version.has_qwerty:
# big screen always needs blank after title
lines.append('')
if hasattr(msg, 'readline'):
# coming from in-memory file for larger messages
msg.seek(0)
@ -146,6 +154,9 @@ async def ux_show_story(msg, title=None, escape=None, sensitive=False, strict_es
if ln[-1] == '\n':
ln = ln[:-1]
if version.has_qwerty:
ln = q1_reword(ln)
if len(ln) > CH_PER_W:
lines.extend(word_wrap(ln, CH_PER_W))
else:
@ -159,7 +170,7 @@ async def ux_show_story(msg, title=None, escape=None, sensitive=False, strict_es
else:
# simple string being shown
if version.has_qwerty:
msg = msg.replace('\nX ', 'CANCEL ').replace(' X ', ' CANCEL ').replace('OK', 'SELECT')
msg = q1_reword(msg)
for ln in msg.split('\n'):
if len(ln) > CH_PER_W:
@ -293,9 +304,9 @@ async def show_qr_codes(addrs, is_alnum, start_n):
o = QRDisplaySingle(addrs, is_alnum, start_n, sidebar=None)
await o.interact_bare()
async def show_qr_code(data, is_alnum=False):
async def show_qr_code(data, is_alnum=False, msg=None):
from qrs import QRDisplaySingle
o = QRDisplaySingle([data], is_alnum)
o = QRDisplaySingle([data], is_alnum, sidebar=msg)
await o.interact_bare()
async def ux_enter_bip32_index(prompt, can_cancel=False, unlimited=False):
@ -384,14 +395,13 @@ async def import_export_prompt(title, is_import=False, no_qr=False):
else:
prompt, escape = _export_prompt_builder(title, no_qr)
force_vdisk = False
slot_b = None # ie. don't care
force_vdisk = False
slot_b = None # ie. don't care / either
if not prompt:
# they don't have NFC nor VD enabled, and no second slots... so will be file.
pass
else:
print('escape: ' + repr(escape))
ch = await ux_show_story(prompt, escape=escape)
if ch in "3"+KEY_NFC:

View File

@ -68,7 +68,7 @@ async def ux_confirm(msg):
# confirmation screen, with stock title and Y=of course.
from ux import ux_show_story
resp = await ux_show_story('\n' + msg, title="Are you SURE ?!?")
resp = await ux_show_story(msg, title="Are you SURE ?!?")
return resp == 'y'
@ -594,9 +594,11 @@ class QRScannerInteraction:
def __init__(self):
pass
async def animation(self, prompt, line2=None):
@staticmethod
async def scan(prompt, line2=None):
# draw animation, while waiting for them to scan something
# - CANCEL to abort
# - returns a proper string or None. newlines stripped. no binary support
from glob import dis, SCAN
from ux import ux_wait_keyup
frames = [ 1, 2, 3, 4, 5, 4, 3, 2 ]
@ -621,8 +623,7 @@ class QRScannerInteraction:
print("Scanned: %r" % data)
break
# wait for key or 250ms
# XXX bug, key doesn't usually get noticed?
# wait for key or 250ms delay
try:
ch = await asyncio.wait_for_ms(ux_wait_keyup(), 250)
except asyncio.TimeoutError:
@ -632,10 +633,11 @@ class QRScannerInteraction:
data = None
break
#await SCAN.scan_stop()
task.cancel()
# clear screen right away so user knows we got it
dis.clear()
dis.show()
return data
@ -697,10 +699,33 @@ class QRScannerInteraction:
prompt = 'Scan any QR code, or CANCEL' if not expect_secret else \
'Scan XPRV or Seed Words, or CANCEL'
got = await self.animation(prompt, line2=problem)
got = await self.scan(prompt, line2=problem)
if got is None:
return
if hasattr(got, 'finalize'):
# BBQr object
ty, final_size, got = got.finalize()
if ty == 'P':
print("Got a PSBT file: %d bytes" % final_size)
await qr_psbt_sign(None, final_size, got)
return
elif ty == 'T':
problem = "No good use for transaction."
continue
elif ty == 'C':
problem = "Sorry, Q has no use CBOR"
continue
elif ty == 'J':
problem = "Sorry, Q has no use JSON yet"
continue
elif ty == 'U':
# continue text thru code below
pass
else:
problem = "Sorry, dont know filetype: " + ty
continue
try:
# can we decode a master secret of some type?
mode, value = self.decode_secret(got)
@ -711,10 +736,20 @@ class QRScannerInteraction:
problem = str(exc)
continue
# Might be an address or pubkey? But not binary
got = got.decode().strip()
# might be a PSBT?
if len(got) > 100:
from auth import psbt_encoding_taster
try:
decoder, _, psbt_len = psbt_encoding_taster(got[0:10].encode(), len(got))
print("Got a PSBT file")
await qr_psbt_sign(decoder, psbt_len, got)
return
except ValueError:
pass
# remove URL scheme: if present
# Might be an address or pubkey or psbt? But not binary
# remove URL protocol: if present
scheme = None
if ':' in got:
scheme, got = got.split(':', 1)
@ -745,4 +780,63 @@ class QRScannerInteraction:
problem = 'Sorry, can not make use of that!'
async def qr_psbt_sign(decoder, psbt_len, raw):
# Got a PSBT coming in from QR scanner. Assume single QR for now.
# - similar to auth.sign_psbt_file()
from auth import UserAuthorizedAction, ApproveTransaction
from utils import CapsHexWriter
from glob import dis, PSRAM
from ux import show_qr_code, the_ux
from sffile import SFFile
from auth import MAX_TXN_LEN, TXN_INPUT_OFFSET, TXN_OUTPUT_OFFSET
from qrs import MAX_V40_SIZE
if isinstance(raw, str):
raw = raw.encode()
# copy to PSRAM, and convert encoding at same time
total = 0
with SFFile(TXN_INPUT_OFFSET, max_size=psbt_len) as out:
if not decoder:
total += out.write(raw)
else:
for here in decoder.more(raw):
out.write(here)
total += len(here)
# might have been whitespace inflating initial estimate of PSBT size
assert total <= psbt_len
psbt_len = total
async def done(psbt):
dis.fullscreen("Wait...")
txid = None
with SFFile(TXN_OUTPUT_OFFSET, max_size=MAX_TXN_LEN, message="Saving...") as psram:
# save transaction, as hex into PSRAM
with CapsHexWriter(psram) as fd:
if psbt.is_complete():
txid = psbt.finalize(fd)
else:
psbt.serialize(fd)
data_len = psram.tell()
UserAuthorizedAction.cleanup()
# SOON will be a loop here, that animates multiple QR's ... for now, one.
here = PSRAM.read_at(TXN_OUTPUT_OFFSET, data_len)
if data_len >= MAX_V40_SIZE:
# too big for single version 40 QR
await ux_show_story("Resulting txn is too big for QR code")
return
await show_qr_code(here, is_alnum=True, msg=(txid or 'Partly Signed PSBT'))
UserAuthorizedAction.cleanup()
UserAuthorizedAction.active_request = ApproveTransaction(psbt_len, approved_cb=done)
the_ux.push(UserAuthorizedAction.active_request)
# EOF

View File

@ -94,7 +94,7 @@ static void write_data(const spi_t *spi, int len, const uint8_t *data)
static void write_data_repeated(const spi_t *spi, int count, int len, const uint8_t *data)
{
// Send a bunch of pixel data, and repeat it N tim.es
// Send a bunch of pixel data, and repeat it N times.
mp_hal_pin_write(PIN_LCD_CS, 1);
mp_hal_pin_write(PIN_LCD_DATA_CMD, 1);
mp_hal_pin_write(PIN_LCD_CS, 0);

View File

@ -69,9 +69,11 @@
// SD card detect switch
// - open when card inserted, grounded when no card
/* Q has dual slot, so multiple detect pins, this can't work
#define MICROPY_HW_SDCARD_DETECT_PIN (pin_C13)
#define MICROPY_HW_SDCARD_DETECT_PULL (GPIO_PULLUP)
#define MICROPY_HW_SDCARD_DETECT_PRESENT (GPIO_PIN_SET)
*/
// We have our own version of this code.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 402 KiB

After

Width:  |  Height:  |  Size: 439 KiB

View File

@ -544,67 +544,162 @@ def special_q1_keys(ch):
return None
def q1_click_to_keynum(x, y):
# convert on-screen position to a keynumber, or None if they missing
# handle screen click as "paste"
if (90 <= x <= 430) and (90 <= y <= 345):
# click on screen
return 'SCREEN'
# keypad area
left = 29
right = 490
top = 398
bottom = 790
if (y > bottom) or (y < top):
return None
# put onto a grid; better would have dead zones between them
pitch_x = (right-left) / 10
pitch_y = (bottom-top) / 7
gx = int((x - left) / pitch_x)
gy = int((y - top) / pitch_y)
#print(f'{x=} {y=} => {gx=} {gy=}')
# main qwerty area, nice grid
if 2 <= gy <= 5:
return ((gy-1) * 10) + gx
# top area; two rows really
if (0 <= gy <= 1):
if 2 <= gx <= 3:
return 0x03 # KEY_LEFT
if 6 <= gx <= 7:
return 0x06 # KEY_RIGHT
if gy == 0:
if gx == 0:
# power key?
raise SystemExit
if gx == 1:
return 0x02 # KEY_QR
if 4 <= gx <= 5:
return 0x04 # KEY_UP
if gx >= 8:
return 0x07 # KEY_CANCEL
if gy == 1:
if gx == 0:
return 0x00 # KEY_NFC
if gx == 1:
return 0x01 # KEY_TAB
if 4 <= gx <= 5:
return 0x05 # KEY_DOWN
if gx >= 8:
return 0x08 # KEY_ENTER
if gy == 6:
# bottom row
if gx == 0: # too narrow, but meh
return q1_charmap.KEYNUM_LAMP
if 1 <= gx <= 3:
return q1_charmap.KEYNUM_SHIFT
if 4 <= gx <= 6:
return 52 # space
if 7 <= gx <= 8:
return q1_charmap.KEYNUM_SYMBOL
if gx == 9:
return 54 # delete
return None
q1_pressed = set()
def handle_q1_key_events(event, numpad_tx):
def handle_q1_key_events(event, numpad_tx, data_tx):
# Map SDL2 (unix, desktop) keyscan code into keynumber on Q1
# - allow Q1 to do shift logic
# - support up to 5 keys down at once
global q1_pressed
assert event.type in { sdl2.SDL_KEYUP, sdl2.SDL_KEYDOWN}
if event.type in (sdl2.SDL_MOUSEBUTTONDOWN, sdl2.SDL_MOUSEBUTTONUP):
is_press = (event.type == sdl2.SDL_MOUSEBUTTONDOWN)
kn = q1_click_to_keynum(event.button.x, event.button.y)
is_press = (event.type == sdl2.SDL_KEYDOWN)
if kn == 'SCREEN':
# click on screen to paste clipboard into QR scanner or NFC tag
if is_press:
txt = sdl2.SDL_GetClipboardText()
if txt:
print(f"Doing paste: {txt.decode()}")
data_tx.write(txt + b'\n')
return None
# first, see if we can convert to ascii char
scancode = event.key.keysym.sym & 0xffff
try:
ch = chr(event.key.keysym.sym)
except:
ch = scancode_remap(scancode)
if not kn: return
#print(f'scan 0x{scancode:04x} mod=0x{event.key.keysym.mod:04x}=> char={ch}=0x{ord(ch) if ch else 0:02x}')
shift_down = bool(event.key.keysym.mod & 0x3) # left or right shift
symbol_down = bool(event.key.keysym.mod & 0x200) # right ALT
special_down = bool(event.key.keysym.mod & 0xc00) # left or right META
#print(f"modifier = 0x{event.key.keysym.mod:04x} => shift={shift_down} symb={symbol_down} spec={special_down}")
if special_down:
ch = special_q1_keys(ch)
if not ch:
return
# reverse char to a keynum, and perhaps the meta key too
kn = None
if ch:
if ch in q1_charmap.DECODER:
kn = q1_charmap.DECODER.find(ch)
elif ch in q1_charmap.DECODER_SHIFT:
kn = q1_charmap.DECODER_SHIFT.find(ch)
shift_down = is_press
elif ch in q1_charmap.DECODER_SYMBOL:
kn = q1_charmap.DECODER_SYMBOL.find(ch)
symbol_down = is_press
#print(f"{ch=} => keynum={kn} => shift={shift_down} sym={symbol_down}")
if kn is not None:
# do right click for shift+key, middle click for symb+key ... good luck
#shift_down = (event.button.button == sdl2.SDL_BUTTON_RIGHT)
#symbol_down = (event.button.button == sdl2.SDL_BUTTON_MIDDLE)
if is_press:
q1_pressed.add(kn)
else:
q1_pressed.discard(kn)
else:
assert event.type in { sdl2.SDL_KEYUP, sdl2.SDL_KEYDOWN}
is_press = (event.type == sdl2.SDL_KEYDOWN)
q1_pressed.discard(q1_charmap.KEYNUM_SHIFT)
q1_pressed.discard(q1_charmap.KEYNUM_SYMBOL)
# first, see if we can convert to ascii char
scancode = event.key.keysym.sym & 0xffff
try:
ch = chr(event.key.keysym.sym)
except:
ch = scancode_remap(scancode)
if shift_down:
q1_pressed.add(q1_charmap.KEYNUM_SHIFT)
if symbol_down:
q1_pressed.add(q1_charmap.KEYNUM_SYMBOL)
#print(f'scan 0x{scancode:04x} mod=0x{event.key.keysym.mod:04x}=> char={ch}=0x{ord(ch) if ch else 0:02x}')
#print(f" .. => pressed: {q1_pressed}")
shift_down = bool(event.key.keysym.mod & 0x3) # left or right shift
symbol_down = bool(event.key.keysym.mod & 0x200) # right ALT
special_down = bool(event.key.keysym.mod & 0xc00) # left or right META
#print(f"modifier = 0x{event.key.keysym.mod:04x} => shift={shift_down} symb={symbol_down} spec={special_down}")
if special_down:
ch = special_q1_keys(ch)
if not ch:
return
# reverse char to a keynum, and perhaps the meta key too
kn = None
if ch:
if ch in q1_charmap.DECODER:
kn = q1_charmap.DECODER.find(ch)
elif ch in q1_charmap.DECODER_SHIFT:
kn = q1_charmap.DECODER_SHIFT.find(ch)
shift_down = is_press
elif ch in q1_charmap.DECODER_SYMBOL:
kn = q1_charmap.DECODER_SYMBOL.find(ch)
symbol_down = is_press
#print(f"{ch=} => keynum={kn} => shift={shift_down} sym={symbol_down}")
if kn is not None:
if is_press:
q1_pressed.add(kn)
else:
q1_pressed.discard(kn)
q1_pressed.discard(q1_charmap.KEYNUM_SHIFT)
q1_pressed.discard(q1_charmap.KEYNUM_SYMBOL)
if shift_down:
q1_pressed.add(q1_charmap.KEYNUM_SHIFT)
if symbol_down:
q1_pressed.add(q1_charmap.KEYNUM_SYMBOL)
#print(f" .. => pressed: {q1_pressed}")
# see variant/touch.py where this is decoded.
if len(q1_pressed) > 5:
@ -629,6 +724,7 @@ Q1 specials:
Meta-L - Lamp button
Meta-N - NFC button
Meta-R - QR button (not Meta-Q, because that's quit!)
Click Screen - Send clipboard contents to QR/NFC
''')
sdl2.ext.init()
sdl2.SDL_EnableScreenSaver()
@ -662,6 +758,7 @@ Q1 specials:
display_r, display_w = os.pipe() # fancy OLED display
led_r, led_w = os.pipe() # genuine LED
numpad_r, numpad_w = os.pipe() # keys
data_r, data_w = os.pipe() # data dumps
# manage unix socket cleanup for client
def sock_cleanup():
@ -677,7 +774,7 @@ Q1 specials:
# - open the serial device
# - get buffering/non-blocking right
# - pass in open fd numbers
pass_fds = [display_w, numpad_r, led_w]
pass_fds = [display_w, numpad_r, led_w, data_r]
if '--metal' in sys.argv:
# bare-metal access: use a real Coldcard's bootrom+SE.
@ -696,8 +793,7 @@ Q1 specials:
os.chdir('./work')
cc_cmd = ['../coldcard-mpy',
'-X', 'heapsize=9m',
'-i', '../sim_boot.py',
str(display_w), str(numpad_r), str(led_w)] \
'-i', '../sim_boot.py'] + [str(i) for i in pass_fds] \
+ metal_args + sys.argv[1:]
xterm = subprocess.Popen(['xterm', '-title', 'Coldcard Simulator REPL',
'-geom', '132x40+650+40', '-e'] + cc_cmd,
@ -710,6 +806,7 @@ Q1 specials:
display_rx = open(display_r, 'rb', closefd=0, buffering=0)
led_rx = open(led_r, 'rb', closefd=0, buffering=0)
numpad_tx = open(numpad_w, 'wb', closefd=0, buffering=0)
data_tx = open(data_w, 'wb', closefd=0, buffering=0)
# setup no blocking
for r in [display_rx, led_rx]:
@ -750,7 +847,7 @@ Q1 specials:
pass
else:
# all other key events for Q1 get handled here
handle_q1_key_events(event, numpad_tx)
handle_q1_key_events(event, numpad_tx, data_tx)
continue
if event.type == sdl2.SDL_KEYUP or event.type == sdl2.SDL_KEYDOWN:
@ -821,7 +918,7 @@ Q1 specials:
send_event(ch, event.type == sdl2.SDL_KEYDOWN)
if is_q1 and event.type in (sdl2.SDL_MOUSEBUTTONDOWN, sdl2.SDL_MOUSEBUTTONUP):
print('NOTE: Click on sim keyboard not supported for Q1')
handle_q1_key_events(event, numpad_tx, data_tx)
else:
if event.type == sdl2.SDL_MOUSEBUTTONDOWN:
#print('xy = %d, %d' % (event.button.x, event.button.y))
@ -833,6 +930,19 @@ Q1 specials:
for ch in list(pressed):
send_event(ch, False)
if event.type == sdl2.SDL_DROPFILE:
# failed to get sdl2.SDL_DROPTEXT to work, but also not convenient to use
print(f"Sending file: {event.drop.file.decode()}")
try:
data = open(event.drop.file, 'rb').read(4096) # size limit < pipe depth
if data[-1] != b'\n':
data += b'\n' # must end w/ NL, probably needs to be text too
data_tx.write(data)
print(f".. sent {len(data)} bytes")
except Exception as exc:
print(repr(exc))
rs, ws, es = select(readables, [], [], 0)
for r in rs:

View File

@ -4,7 +4,7 @@
#
# REMINDER: you must recompile coldcard-mpy if you change this file!
#
import ustruct, sys
import ustruct, sys, uasyncio
from ubinascii import hexlify as b2a_hex
#from ubinascii import unhexlify as a2b_hex
#import utime as time
@ -16,10 +16,12 @@ rng_fd = open('/dev/urandom', 'rb')
# Emulate the red/green LED
global genuine_led
led_pipe = open(int(sys.argv[3]), 'wb')
led_pipe.write(b'\xff\x01') # all off, except SE1 green
genuine_led = True
# Provide a way to dump few hundred/4k bytes of data from QR or NFC simulated read
data_pipe = uasyncio.StreamReader(open(int(sys.argv[4]), 'rb'))
# HACK: reduce size of heap in Unix simulator to be more similar to
# actual hardware, so we can enjoy those out-of-memory errors too!

View File

@ -11,9 +11,13 @@ DATA_FILE = 'qrdata.txt'
class SimulatedQRScanner(QRScanner):
def __init__(self):
self.setup_done = True
self.version = '4.20'
self.lock = asyncio.Lock()
# returns a Q we append to as results come in
self._q = Queue()
async def _read_results(self, q):
async def _read_results(self):
# be a task that reads incoming QR codes from scanner (already in operation)
# - will be canceled when done/stopping
try:
@ -21,8 +25,15 @@ class SimulatedQRScanner(QRScanner):
except OSError:
orig_mtime = None
from ckcc import data_pipe
while 1:
await asyncio.sleep_ms(250)
try:
got = await asyncio.wait_for(data_pipe.readline(), 250)
print("Got pasted QR data.")
self._q.put_nowait(got)
except asyncio.TimeoutError:
pass
try:
_, mtime, _ = os.stat(DATA_FILE)[-3:]
@ -32,31 +43,25 @@ class SimulatedQRScanner(QRScanner):
if mtime == orig_mtime:
continue
print("Got new QR scan data.")
got = open(DATA_FILE, 'rt').read(8196).strip()
q.put_nowait(got)
print("Got new QR scan data from file.")
got = open(DATA_FILE, 'rb').read(8196)
self._q.put_nowait(got)
orig_mtime = mtime
async def scan_once(self):
# returns a Q we append to as results come in
q = Queue()
print("Put QR data into file: work/%s" % DATA_FILE)
task = asyncio.create_task(self._read_results(q))
rv = await q.get()
task.cancel()
return rv
async def _readline(self):
rv = await self._q.get()
return rv.rstrip().decode()
async def wakeup(self):
print("Click screen to paste QR data from clipboard,\nor write data into file: work/%s" % DATA_FILE)
self._task = asyncio.create_task(self._read_results())
async def tx(self, msg, timeout=250):
return
async def goto_sleep(self):
return
self._task.cancel()
async def torch_control(self, on):
print("Torch is: " + ('ON' if on else 'off'))