From 95cc006d7b8c86a2e9f70bca2b996506cdb8c520 Mon Sep 17 00:00:00 2001 From: scgbckbone Date: Tue, 3 Feb 2026 22:26:47 +0100 Subject: [PATCH] input explorer --- releases/Next-ChangeLog.md | 2 + shared/auth.py | 280 +++++++++++++++++++++++++++---------- shared/display.py | 17 ++- shared/lcd_display.py | 38 ++++- shared/psbt.py | 30 ++-- shared/qrs.py | 20 ++- testing/conftest.py | 252 ++++++++++++++++++++++++++++++++- testing/test_multisig.py | 71 ++++++++++ testing/test_sign.py | 82 +++++++++-- testing/txn.py | 41 ++++-- 10 files changed, 706 insertions(+), 127 deletions(-) diff --git a/releases/Next-ChangeLog.md b/releases/Next-ChangeLog.md index 8ed515c7..294622ee 100644 --- a/releases/Next-ChangeLog.md +++ b/releases/Next-ChangeLog.md @@ -6,6 +6,8 @@ This lists the new changes that have not yet been published in a normal release. - New Feature: Export [BIP-380](https://github.com/bitcoin/bips/blob/master/bip-0380.mediawiki) extended key expression. Navigate to `Advanced/Tools -> Export Wallet -> Key Expression` +- New Feature: Transaction Input Explorer. Shows data about UTXO(s) being spent. Press (2) before approving + transaction to enter Transaction Explorer. - New Feature: Support for v3 transactions - New Feature: Send keystrokes with all derived BIP-85 secrets - New Feature: Nuke Device. Purge device data and make it an e-waste. diff --git a/shared/auth.py b/shared/auth.py index 54aa5cad..1a54598c 100644 --- a/shared/auth.py +++ b/shared/auth.py @@ -9,12 +9,15 @@ from ubinascii import hexlify as b2a_hex from ubinascii import unhexlify as a2b_hex from uhashlib import sha256 from public_constants import AFC_SCRIPT, AF_CLASSIC, AFC_BECH32, SUPPORTED_ADDR_FORMATS, AF_P2TR -from public_constants import STXN_FINALIZE, STXN_VISUALIZE, STXN_SIGNED +from public_constants import STXN_FINALIZE, STXN_VISUALIZE, STXN_SIGNED, AF_P2SH, AF_P2WPKH_P2SH from sffile import SFFile +from menu import MenuSystem, MenuItem +from serializations import ser_uint256, SIGHASH_ALL from ux import ux_show_story, abort_and_goto, ux_dramatic_pause, ux_clear_keys, ux_confirm -from ux import show_qr_code, OK, X, abort_and_push, AbortInteraction +from ux import show_qr_code, OK, X, abort_and_push, AbortInteraction, the_ux from usb import CCBusyError -from utils import HexWriter, xfp2str, problem_file_line, cleanup_deriv_path, B2A, show_single_address +from utils import (HexWriter, xfp2str, problem_file_line, cleanup_deriv_path, B2A, + show_single_address, keypath_to_str, seconds2human_readable) from psbt import psbtObject, FatalPSBTIssue, FraudulentChangeOutput from files import CardSlot, CardMissingError from exceptions import HSMDenied, QRTooBigError @@ -479,7 +482,7 @@ class ApproveTransaction(UserAuthorizedAction): if not hsm_active: esc = "2" msg.write("Press %s to approve and sign transaction." - " Press (2) to explore txn outputs." % OK) + " Press (2) to explore transaction." % OK) if (self.input_method == "sd") and CardSlot.both_inserted(): esc += "b" msg.write(" (B) to write to lower SD slot.") @@ -488,7 +491,7 @@ class ApproveTransaction(UserAuthorizedAction): while True: ch = await ux_show_story(msg, title="OK TO SEND?", escape=esc) if ch == "2": - await self.txn_explorer() + await TXExplorer.start(self) continue else: msg.close() @@ -574,76 +577,6 @@ class ApproveTransaction(UserAuthorizedAction): # sys.print_exception(exc) return await self.failure("PSBT output failed", exc) - - async def txn_explorer(self): - # Page through unlimited-sized transaction details - # - shows all outputs (including change): their address and amounts. - from glob import dis - - def make_msg(offset, count): - dis.fullscreen('Wait...') - rv = "" - end = min(offset + count, self.psbt.num_outputs) - addrs = [] - change = [] - for i, (idx, out) in enumerate(self.psbt.output_iter(offset, end)): - outp = self.psbt.outputs[idx] - item = "Output %d%s:\n\n" % (idx, " (change)" if outp.is_change else "") - msg, addr_or_script = self.render_output(out) - item += msg - addrs.append(addr_or_script) - if outp.is_change: - change.append(i) - item += "\n" - rv += item - dis.progress_sofar(idx-offset+1, count) - - rv += 'Press RIGHT to see next group' - if offset: - rv += ', LEFT to go back' - - if not version.has_qwerty: - # Q has hint key - rv += ", (4) to show QR code" - rv += ('. %s to quit.' % X) - - return rv, addrs, change, end - - start = 0 - n = 10 - msg, addrs, change, end = make_msg(start, n) - while True: - ch = await ux_show_story(msg, title="%d-%d" % (start, end-1), - escape='479'+KEY_RIGHT+KEY_LEFT+KEY_QR, - hint_icons=KEY_QR) - if ch == 'x': - del msg - return - elif ch in "4"+KEY_QR: - from ux import show_qr_codes - # showing addresses from PSBT, no idea what is in there - # handle QR code failures gracefully - await show_qr_codes(addrs, False, start, is_addrs=True, - change_idxs=change, can_raise=False) - continue - elif (ch in KEY_LEFT+"7"): - if (start - n) < 0: - continue - else: - # go backwards in explorer - start -= n - elif (ch in KEY_RIGHT+"9"): - if (start + n) >= self.psbt.num_outputs: - continue - else: - # go forwards - start += n - else: - # nothing changed - do not recalc msg - continue - - msg, addrs, change, end = make_msg(start, n) - async def save_visualization(self, msg, sign_text=False): # write story text out, maybe signing it as we go # - return length and checksum @@ -1619,4 +1552,201 @@ def authorize_upgrade(hdr, length, **kws): abort_and_goto(UserAuthorizedAction.active_request) +class TXExplorer: + def __init__(self, n, user_auth_action, max_items): + self.n = n + self.user_auth_action = user_auth_action + self.max_items = max_items + self.chain = chains.current_chain() + self.qr_msgs = [] + self.title = None + + @classmethod + async def start(cls, user_auth_action): + rv = [ + MenuItem("Inputs", f=TXInpExplorer(user_auth_action).explore), + MenuItem("Outputs", f=TXOutExplorer(user_auth_action).explore), + ] + the_ux.push(MenuSystem(rv)) + await the_ux.interact() + + def make_ux_msg(self, offset, count): + from glob import dis + dis.fullscreen('Wait...') + rv = "" + qrs = [] + change = [] + end = min(offset + count, self.max_items) + for idx, item in self.yield_item(offset, end, qrs, change): + rv += item + dis.progress_sofar(idx-offset+1, count) + + rv += 'Press RIGHT to see next group' + if offset: + rv += ', LEFT to go back' + + if not version.has_qwerty: + # Q has hint key + rv += ", (4) to show QR code" + rv += ('. %s to quit.' % X) + + return rv, qrs, change, end + + + async def explore(self, *a): + # Page through unlimited-sized transaction details + # - shows all outputs (including change): their address and amounts. + # - shows all inputs: utxo amount and address, txid & tx index. + + start = 0 + msg, addrs, change, end = self.make_ux_msg(start, self.n) + + while True: + ch = await ux_show_story(msg, title=self.title, escape='479'+KEY_RIGHT+KEY_LEFT+KEY_QR, + hint_icons=KEY_QR) + if ch == 'x': + del msg + return + elif (ch in "4"+KEY_QR) and addrs: + from ux import show_qr_codes + # showing addresses from PSBT, no idea what is in there + # handle QR code failures gracefully + await show_qr_codes(addrs, False, start, is_addrs=True, + change_idxs=change, can_raise=False, + qr_msgs=self.qr_msgs, no_index=bool(self.qr_msgs)) + continue + elif ch in (KEY_LEFT+"7"): + if (start - self.n) < 0: + continue + else: + # go backwards in explorer + start -= self.n + elif ch in (KEY_RIGHT+"9"): + if (start + self.n) >= self.max_items: + continue + else: + # go forwards + start += self.n + else: + # nothing changed - do not recalc msg + continue + + msg, addrs, change, end = self.make_ux_msg(start, self.n) + + +class TXOutExplorer(TXExplorer): + def __init__(self, user_auth_action): + super().__init__(10, user_auth_action, user_auth_action.psbt.num_outputs) + + def yield_item(self, offset, end, qr_items, change_idxs): + # showing 10 outputs per UX page (just address/script + whether change) + self.title = "%d-%d" % (offset, end - 1) + for i, (idx, out) in enumerate(self.user_auth_action.psbt.output_iter(offset, end)): + outp = self.user_auth_action.psbt.outputs[idx] + item = "Output %d%s:\n\n" % (idx, " (change)" if outp.is_change else "") + msg, addr_or_script = self.user_auth_action.render_output(out) + item += msg + qr_items.append(addr_or_script) + if outp.is_change: + change_idxs.append(i) + item += "\n" + yield idx, item + + +class TXInpExplorer(TXExplorer): + def __init__(self, user_auth_action): + super().__init__(1, user_auth_action, user_auth_action.psbt.num_inputs) + self.qr_msgs = ["TXID", "UTXO ADDR"] + + def yield_item(self, offset, end, qr_items, change_idxs): + # showing just one input per UX page + i, (idx, txin) = next(enumerate(self.user_auth_action.psbt.input_iter(offset, offset+1))) + self.title = "Input %d" % idx + inp = self.user_auth_action.psbt.inputs[idx] + + txid = b2a_hex(ser_uint256(txin.prevout.hash)).decode() + qr_items.append(txid) + item = "%s:%d\n\n" % (txid, txin.prevout.n) + + has_utxo = inp.has_utxo() + if has_utxo: + utxo = inp.get_utxo(txin.prevout.n) + spk = b2a_hex(utxo.scriptPubKey).decode() + try: + addr = self.chain.render_address(utxo.scriptPubKey) + except: + # some script we do not understand + addr = None + + val, unit = self.chain.render_value(utxo.nValue) + item += "=== UTXO ===\n\n%s %s\n\n%s\n\n" % (val, unit, spk) + if addr: + item += show_single_address(addr) + "\n\n" + item += "Address Format: %s\n\n" % chains.addr_fmt_str(inp.addr_fmt) + qr_items.append(addr) + + if self.user_auth_action.psbt.txn_version >= 2: + has_rtl = inp.has_relative_timelock(txin) + if has_rtl: + if has_rtl[0]: + val = seconds2human_readable(has_rtl[1]) + msg = "time-based timelock of:\n %s" % val + else: + msg = "block height timelock of %d blocks" % (has_rtl[1]) + + item += "Input has relative %s\n\n" % msg + + + psbt_item = "" + if inp.required_key: + our = [inp.required_key] if isinstance(inp.required_key, bytes) else inp.required_key + psbt_item += "Our key%s:\n\n" % ("s" if len(our) > 1 else "") + for k in our: + pth = inp.subpaths[k] + psbt_item += "%s:\n%s\n\n" % (keypath_to_str(pth, prefix="%s/" % xfp2str(pth[0])), + b2a_hex(k).decode()) + + M = None + if inp.is_multisig: + ks_coord = inp.witness_script or inp.redeem_script + if ks_coord: + ks = self.user_auth_action.psbt.get(ks_coord) + + from multisig import disassemble_multisig_mn + try: + M, N = disassemble_multisig_mn(ks) + psbt_item += "Multisig: %dof%d\n\n" % (M, N) + except: pass + + if inp.part_sigs: + # do not show XFPs in case input is fully signed --> elif + # only part_sig should be available, as we haven't signed yet so added_sigs empty + done = [] + for pk, pth in inp.subpaths.items(): + if pk in inp.part_sigs: + done.append(xfp2str(pth[0])) + + if inp.fully_signed or (M and (len(done) >= M)): + psbt_item += "Input fully signed.\n\n" + else: + psbt_item += "Already signed:\n" + for xfp in done: + psbt_item += " %s\n" % xfp + psbt_item += "\n" + + if inp.sighash and (inp.sighash != SIGHASH_ALL): + # only show sighash value to the user if it is non-standard + psbt_item += "sighash: %s\n\n" % { + 1: "ALL", 2: "NONE", 3: "SINGLE", + 1 | 0x80: "ALL|ANYONECANPAY", + 2 | 0x80: "NONE|ANYONECANPAY", + 3 | 0x80: "SINGLE|ANYONECANPAY", + }[inp.sighash] + + if psbt_item: + psbt_item = "=== PSBT ===\n\n" + psbt_item + item += psbt_item + + yield idx, item + # EOF diff --git a/shared/display.py b/shared/display.py index 787e117d..22fb38af 100644 --- a/shared/display.py +++ b/shared/display.py @@ -352,7 +352,7 @@ class Display: self._draw_qr_display(bw, lm, msg, False, None, idx_hint, False) def draw_qr_display(self, qr_data, msg, is_alnum, sidebar, idx_hint, invert, - is_addr=False, force_msg=False, is_change=False): + is_addr=False, force_msg=False, side_msg=None): # 'sidebar' is a pre-formated obj to show to right of QR -- oled life # - 'msg' will appear to right if very short, else under in tiny # - ignores "is_addr" because exactly zero space to do anything special @@ -396,19 +396,19 @@ class Display: gly = framebuf.FrameBuffer(bytearray(packed), w, w, framebuf.MONO_HLSB) self.dis.blit(gly, XO, YO, 1) - self._draw_qr_display(bw, lm, msg, is_alnum, sidebar, idx_hint, invert, is_addr, is_change) + self._draw_qr_display(bw, lm, msg, is_alnum, sidebar, idx_hint, invert, is_addr, side_msg) def _draw_qr_display(self, bw, lm, msg, is_alnum, sidebar, idx_hint, invert, - is_addr=False, is_change=False): + is_addr=False, side_msg=None): # does not draw actual QR, but all other things in the screen from utils import word_wrap if not sidebar and not msg: pass - elif not sidebar and ((len(msg) > (5*7)) or is_change): + elif not sidebar and ((len(msg) > (5*7)) or side_msg): # use FontTiny and word wrap (will just split if no spaces) # native segwit addresses and taproot - # if is_change=True also p2pkh and p2sh fall into this category as space is needed for "CHANGE" + # if 'side_msg' also p2pkh and p2sh fall into this category as space is needed for "CHANGE BACK" text x = bw + lm + 4 ww = ((128 - x)//4) - 1 # char width avail y = 1 @@ -424,8 +424,11 @@ class Display: self.text(x, y, line, FontTiny) y += 8 - if is_addr and is_change: - self.text(x+4, y+8, "CHANGE BACK", FontTiny) + if side_msg and (len(side_msg) < 15): + y_pos = y + 8 + # only render if there is space + if (self.HEIGHT - y_pos) >= FontTiny.height: + self.text(x+4, y+8, side_msg, FontTiny) else: # hand-positioned for known cases # - sidebar = (text, #of char per line) diff --git a/shared/lcd_display.py b/shared/lcd_display.py index de601c23..a87f62b8 100644 --- a/shared/lcd_display.py +++ b/shared/lcd_display.py @@ -694,6 +694,34 @@ class Display: else: self.text(-1, 0, str_idx) + def draw_side_msg(self, msg, has_idx): + right_sub = 2 if has_idx else 0 + start_right = right_msg = None + if len(msg) <= CHARS_H: + # we only need left side + start_left = CHARS_H - len(msg) + left_msg = msg + else: + split_msg = msg.split() + if len(split_msg) == 1 or len(split_msg) > 2: + return # not possible + + left_msg, right_msg = split_msg + if len(left_msg) > CHARS_H: + return + if len(right_msg) > (CHARS_H - right_sub): + return + + start_left = CHARS_H - len(left_msg) + start_right = CHARS_H - len(right_msg) + + for i, c in enumerate(left_msg, start=start_left): + self.text(1, i, c) + + if start_right: + for i, c in enumerate(right_msg, start=start_right): + self.text(-1, i, c) + def draw_qr_error(self, idx_hint, msg=None): x = 85 y = 30 @@ -710,7 +738,7 @@ class Display: self.show() def draw_qr_display(self, qr_data, msg, is_alnum, sidebar, idx_hint, invert, partial_bar=None, - is_addr=False, force_msg=False, is_change=False): + is_addr=False, force_msg=False, side_msg=None): # Show a QR code on screen w/ some text under it # - invert not supported on Q1 # - sidebar not supported here (see users.py) @@ -804,12 +832,8 @@ class Display: if idx_hint: self.draw_qr_idx_hint(idx_hint) - if is_addr and is_change: - for i, c in enumerate("CHANGE", start=4): - self.text(1, i, c) - - for i, c in enumerate("BACK", start=6): - self.text(-1, i, c) + if side_msg: + self.draw_side_msg(side_msg, idx_hint) # pass a max brightness flag here, which will be cleared after next show self.show(max_bright=True) diff --git a/shared/psbt.py b/shared/psbt.py index dbebf77f..33309e3e 100644 --- a/shared/psbt.py +++ b/shared/psbt.py @@ -657,7 +657,7 @@ class psbtInputProxy(psbtProxy): #self.fully_signed = False # we can't really learn this until we take apart the UTXO's scriptPubKey - #self.af = None # string representation of address format aka. script type + #self.af = None # address format aka. script type #self.amount = None #self.utxo_spk = None # scriptPubKey for input utxo @@ -740,6 +740,15 @@ class psbtInputProxy(psbtProxy): # do we have a copy of the corresponding UTXO? return bool(self.utxo) or bool(self.witness_utxo) + def guess_multisig_addr_fmt(self): + # based on provided input scripts (witness/redeem) + if self.witness_script and not self.redeem_script: + return AF_P2WSH + elif self.witness_script and self.redeem_script: + return AF_P2WSH_P2SH + else: + return AF_P2SH + def get_utxo(self, idx): # Load up the TxOut for specific output of the input txn associated with this in PSBT # Aka. the "spendable" for this input #. @@ -1318,15 +1327,18 @@ class psbtObject(psbtProxy): fd.seek(old_pos) - def input_iter(self): + def input_iter(self, start=0, stop=None): # Yield each of the txn's inputs, as a tuple: # # (index, CTxIn) # # - we also capture much data about the txn on the first pass thru here # + if stop is None: + stop = self.num_inputs + if self.is_v2: - for idx in range(self.num_inputs): + for idx in range(start, stop): inp = self.inputs[idx] prevout = COutPoint(uint256_from_str(self.get(inp.previous_txid)), unpack(" 1 else None - def is_change(self): + def side_msg(self): if self.idx in self.change_idxs: - return True - return False + return "CHANGE BACK" + + elif self.qr_msgs: + try: + return self.qr_msgs[self.idx] + except IndexError: pass + + return None def redraw(self): # Redraw screen. @@ -106,7 +116,7 @@ class QRDisplaySingle(UserInteraction): dis.draw_qr_display(self.qr_data, msg, self.is_alnum, self.sidebar, idx_hint, self.invert, is_addr=self.is_addrs, force_msg=self.force_msg, - is_change=self.is_change()) + side_msg=self.side_msg()) async def interact_bare(self): from glob import NFC, dis diff --git a/testing/conftest.py b/testing/conftest.py index 199ad3a7..ea60fd83 100644 --- a/testing/conftest.py +++ b/testing/conftest.py @@ -3,7 +3,7 @@ import pytest, time, sys, random, re, ndef, os, glob, hashlib, json, functools, io, math, bech32, pdb, base64 from subprocess import check_output from ckcc.protocol import CCProtocolPacker -from helpers import B2A, U2SAT, hash160, taptweak, addr_from_display_format +from helpers import B2A, U2SAT, hash160, taptweak, addr_from_display_format, seconds2human_readable from base58 import decode_base58_checksum from bip32 import BIP32Node from msg import verify_message @@ -15,6 +15,7 @@ from constants import * from charcodes import * from core_fixtures import _need_keypress, _sim_exec, _cap_story, _cap_menu, _cap_screen, _sim_eval from core_fixtures import _press_select, _pick_menu_item, _enter_complex, _dev_hw_label +from txn import render_address # lock down randomness @@ -600,7 +601,7 @@ def verify_qr_address(cap_screen_qr, cap_screen, is_q1): # plus text version of address, if any, is right. from ckcc_protocol.constants import AFC_BECH32 - def doit(addr_fmt, expect_addr=None, is_change=None): + def doit(addr_fmt, expect_addr=None, is_change=None, txt_check=True): qr = cap_screen_qr().decode('ascii') if isinstance(addr_fmt, str): @@ -649,7 +650,7 @@ def verify_qr_address(cap_screen_qr, cap_screen, is_q1): txt = ''.join(full_split).replace('CHANGE BACK', '') - if txt: + if txt_check and txt: assert txt == qr if is_q1: # addr is not spaced out on Mk4, but check it was on Q @@ -2486,15 +2487,251 @@ def goto_address_explorer(goto_home, pick_menu_item, need_keypress, return doit + @pytest.fixture -def txout_explorer(cap_story, press_cancel, need_keypress, is_q1, verify_qr_address): +def explorer_input_check(cap_story, press_cancel, need_keypress, is_q1, verify_qr_address, cap_menu, + cap_screen_qr, cap_screen): + def doit(idx, title, story, af, in_amt=100000000, num_our_keys=1, chain="XTN", is_multi=False, + sighash=None, sequence=None, fully_signed=False, already_signed=None): + # collect QR codes first + need_keypress(KEY_QR if is_q1 else "4") + + if af is None: + assert "=== UTXO" not in story + + txid_qr = addr_qr = None + qrs = ["TXID"] + if "=== UTXO" in story and af != "unknown": + qrs += ["UTXO ADDR"] + for what in qrs: + if "TXID" == what: + txid_qr = cap_screen_qr().decode('ascii').lower() + else: + addr_qr = verify_qr_address(af, txt_check=False) + + scr = cap_screen() + + scr_txt = scr.replace("~", "").replace("\n", "").replace(" ", "") + target_txt = what.replace(" ", "") + assert target_txt in scr_txt + + need_keypress(KEY_RIGHT if is_q1 else "9") + time.sleep(.5) + + press_cancel() # QR code on screen - exit + + # header and txin info always present + assert title == f"Input {idx}" + ss = story.split("\n\n") + txid_n = ss[0] + txid, n = txid_n.split(":") + assert txid_qr == txid + assert len(txid) == 64 + int(n) + idx = 1 + if "=== UTXO ===" == ss[idx]: + idx += 1 + txt_amount = ss[idx] + assert txt_amount == f'{in_amt / 100000000:.8f} {chain}' + idx += 1 + spk = ss[idx] + if af != "unknown": + calc_addr = render_address(bytes.fromhex(spk), testnet=False if chain == "BTC" else True) + idx += 1 + addr = addr_from_display_format(ss[idx]) + assert addr == calc_addr + assert addr_qr == addr + idx += 1 + addr_fmt = ss[idx].split(" ")[-1] + if af == "p2wpkh-p2sh": + assert addr_fmt == "p2sh-p2wpkh" + else: + assert addr_fmt == af + + idx += 1 + + parsed_sequence = None + if ss[idx].startswith("Input has relative"): # time-lock + parsed_sequence = ss[idx] + idx += 1 + + parsed_multisig = None + parsed_signed_xfps = None + parsed_sighash = None + parsed_our_keys = {} + parsed_fully_signed = False + if "=== PSBT ===" == ss[idx]: + idx += 1 + if ss[idx].startswith("Our key"): + idx += 1 + while ":\n" in ss[idx]: + der, pk = ss[idx].split(":\n") + parsed_our_keys[pk] = der + idx += 1 + + if "Multisig:" in ss[idx]: + parsed_multisig = ss[idx] + idx += 1 + + + if "Input fully signed." in ss[idx]: + parsed_fully_signed = True + idx += 1 + + if "Already signed:" in ss[idx]: + parsed_signed_xfps = ss[idx].split("\n ")[1:] + idx += 1 + + if "sighash" in ss[idx]: + parsed_sighash = ss[idx].split(" ")[-1] + idx += 1 + + # SEQUENCE + if parsed_sequence is None: + # without consensus meaning + # 0 --> no lock + assert (not sequence) or (sequence & (1 << 31)) + else: + is_timebased = False + if sequence & (1 << 22): + # Time-based relative lock-time + is_timebased = True + res = (sequence & 0x0000ffff) << 9 + else: + # Block height relative lock-time + res = sequence & 0x0000ffff + + if is_timebased: + val = seconds2human_readable(res) + msg = "time-based timelock of:\n %s" % val + else: + msg = "block height timelock of %d blocks" % res + + assert msg in parsed_sequence + + # OUR KEYS + if not parsed_our_keys: + assert not num_our_keys + else: + if num_our_keys is not None: + assert len(parsed_our_keys) == num_our_keys + + n = BIP32Node.from_wallet_key(simulator_fixed_xprv if chain == "BTC" else simulator_fixed_tprv) + for pk, der in parsed_our_keys.items(): + assert bytes.fromhex(pk) == n.subkey_for_path(der.split("/", 1)[-1]).sec() + + # MULTISIG + if parsed_multisig is None: + assert not is_multi + else: + assert is_multi + ms_txt = parsed_multisig + msg, m_n = ms_txt.split(" ") + assert msg == "Multisig:" + M, N = m_n.split("of") + assert is_multi[0] == int(M) + assert is_multi[1] == int(N) + if parsed_signed_xfps is None: + assert (not already_signed) or fully_signed + else: + assert set(parsed_signed_xfps) == set(already_signed) + + # SIGHASH + if parsed_sighash is None: + assert sighash in [None, "ALL"] + else: + assert sighash == parsed_sighash + + # IS FULLY SIGNED + assert bool(fully_signed) == parsed_fully_signed + + return doit + + +@pytest.fixture +def txin_explorer(cap_story, press_cancel, need_keypress, is_q1, cap_menu, + pick_menu_item, explorer_input_check): + def doit(num_inputs, inputs): + + time.sleep(.1) + title, story = cap_story() + assert title == 'OK TO SEND?' + assert "Press (2) to explore transaction" in story + need_keypress("2") + time.sleep(.1) + pick_menu_item("Inputs") + + for i in range(num_inputs): + time.sleep(.1) + title, story = cap_story() + ss = story.split("\n\n") + assert "Press RIGHT to see next group" in ss[-1] + if i: + assert " LEFT to go back" in ss[-1] + else: + assert "LEFT" not in ss[-1] + + if not is_q1: + assert "(4) to show QR code" in ss[-1] + + try: + inp = inputs[i] + except IndexError: + inp = inputs[0] + + explorer_input_check(i, title, story, *inp) + + need_keypress(KEY_RIGHT if is_q1 else "9") + + # currently sitting at the last story in explorer + # try to go further (must not work and story is unchanged) + for _ in range(2): + need_keypress(KEY_RIGHT if is_q1 else "9") + time.sleep(.1) + _, xstory = cap_story() + assert story == xstory + + # go back to first explorer story + for _ in range(num_inputs): + need_keypress(KEY_LEFT if is_q1 else "7") + time.sleep(.1) + + title, story = cap_story() + assert "Input 0" == title + + # currently sitting at the first story in explorer + # try to go further (must not work and story is unchanged) + for _ in range(2): + need_keypress(KEY_LEFT if is_q1 else "7") + time.sleep(.1) + _, xstory = cap_story() + assert story == xstory + + # leave explorer - will return back to sign story + press_cancel() + time.sleep(.1) + m = cap_menu() + assert "Outputs" in m + assert "Inputs" in m + press_cancel() + time.sleep(.1) + title, _ = cap_story() + assert title == 'OK TO SEND?' + press_cancel() + + return doit + +@pytest.fixture +def txout_explorer(cap_story, press_cancel, need_keypress, is_q1, verify_qr_address, cap_menu, + pick_menu_item): def doit(data, chain="XTN"): time.sleep(.1) title, story = cap_story() assert title == 'OK TO SEND?' - assert "Press (2) to explore txn" in story + assert "Press (2) to explore transaction" in story need_keypress("2") time.sleep(.1) + pick_menu_item("Outputs") n = 10 for i in range(0, len(data), n): @@ -2584,6 +2821,11 @@ def txout_explorer(cap_story, press_cancel, need_keypress, is_q1, verify_qr_addr # leave explorer - will return back to sign story press_cancel() time.sleep(.1) + m = cap_menu() + assert "Outputs" in m + assert "Inputs" in m + press_cancel() + time.sleep(.1) title, _ = cap_story() assert title == 'OK TO SEND?' press_cancel() diff --git a/testing/test_multisig.py b/testing/test_multisig.py index 8b0f4603..35605700 100644 --- a/testing/test_multisig.py +++ b/testing/test_multisig.py @@ -3565,4 +3565,75 @@ def test_fwd_slash_in_name(import_ms_wallet, clear_miniscript, pick_menu_item, n press_cancel() press_cancel() + +@pytest.mark.parametrize("chain", ["BTC", "XTN"]) +@pytest.mark.parametrize("M_N", [(3, 5)])#, (14, 15)]) +@pytest.mark.parametrize("complete", [True, False, None]) +@pytest.mark.parametrize("addr_fmt", ["p2wsh", "p2sh", "p2sh-p2wsh"]) +def test_txin_explorer(dev, chain, M_N, addr_fmt, fake_ms_txn, start_sign, settings_set, txin_explorer, + cap_story, pytestconfig, import_ms_wallet, complete, clear_ms): + # TODO This test MUST be run with --psbt2 flag on and off + clear_ms() + settings_set("chain", chain) + inp_amount = 100000000 + num_ins = 2 + M, N = M_N + + keys = import_ms_wallet(M, N, name='txin_expl', accept=True, netcode=chain, + descriptor=True, addr_fmt=addr_fmt) + + all_xfps = [xfp2str(k[0]) for k in keys][:-1] # remove myself + if complete: + target_xfps = all_xfps[:M] + elif complete is False: + target_xfps = all_xfps[:M-1] + else: + target_xfps = [] + + + def hack(psbt): + for inp in psbt.inputs: + for i, (pk, pth) in enumerate(inp.bip32_paths.items()): + xfp = pth[:4].hex().upper() + if xfp in target_xfps: + inp.part_sigs[pk] = os.urandom(71) + + + psbt = fake_ms_txn(num_ins, 1, M, keys, inp_af=unmap_addr_fmt[addr_fmt], + input_amount=inp_amount, psbt_v2=pytestconfig.getoption('psbt2'), + hack_psbt=hack) + + start_sign(psbt) + txin_explorer(num_ins, [(addr_fmt, inp_amount, 1, chain, (M,N), None, None, complete, target_xfps)]) + + +def test_txin_explorer_our_sig(dev, fake_ms_txn, start_sign, settings_set, clear_ms, + txin_explorer, cap_story, pytestconfig, import_ms_wallet): + # TODO This test MUST be run with --psbt2 flag on and off + clear_ms() + inp_amount = 100000000 + num_ins = 3 + M, N = 5,7 + af = "p2wsh" + + keys = import_ms_wallet(M, N, name='txin_expl', accept=True, netcode="XTN", + descriptor=True, addr_fmt="p2wsh") + + my_xfp = xfp2str(keys[-1][0]) + + def hack(psbt): + for inp in psbt.inputs: + for i, (pk, pth) in enumerate(inp.bip32_paths.items()): + xfp = pth[:4].hex().upper() + if xfp in my_xfp: + inp.part_sigs[pk] = os.urandom(71) + + + psbt = fake_ms_txn(num_ins, 1, M, keys, inp_af=unmap_addr_fmt[af], + input_amount=inp_amount, psbt_v2=pytestconfig.getoption('psbt2'), + hack_psbt=hack) + + start_sign(psbt) + txin_explorer(num_ins, [(af, inp_amount, 0, "XTN", (M,N), None, None, False, [my_xfp])]) + # EOF diff --git a/testing/test_sign.py b/testing/test_sign.py index 19b948af..9872347f 100644 --- a/testing/test_sign.py +++ b/testing/test_sign.py @@ -135,9 +135,9 @@ def test_psbt_proxy_parsing(fn, sim_execfile, sim_exec, src_root_dir, sim_root_d @pytest.mark.unfinalized @pytest.mark.parametrize("addr_fmt", ["p2tr", "p2wpkh"]) def test_speed_test(dev, addr_fmt, fake_txn, is_mark3, is_mark4, start_sign, end_sign, - press_select, press_cancel, sim_root_dir): + press_select, press_cancel, sim_root_dir, is_q1): # measure time to sign a larger txn - if is_mark4: + if is_mark4 or is_q1: # Mk4: expect # 20/250 => 15.5s (or 10.0 if seed is cached) # 200/500 => 96.3s @@ -230,7 +230,7 @@ def test_io_size(request, use_regtest, decode_with_bitcoind, fake_txn, except: cap_story = None - signed = end_sign(True, finalize=True) + signed = end_sign(accept=True, finalize=True) with open(f'{sim_root_dir}/debug/signed.txn', 'wb') as f: f.write(signed) @@ -1418,7 +1418,7 @@ def test_payjoin_signing(num_ins, num_outs, fake_txn, try_sign, start_sign, end_ with open(f'{sim_root_dir}/debug/payjoin.psbt', 'wb') as f: f.write(psbt) - ip = start_sign(psbt, finalize=False) + start_sign(psbt, finalize=False) time.sleep(.1) _, story = cap_story() @@ -1428,7 +1428,7 @@ def test_payjoin_signing(num_ins, num_outs, fake_txn, try_sign, start_sign, end_ assert "different wallet" in story assert ': %s' % (num_ins-1) in story - txn = end_sign(True, finalize=False) + end_sign(True, finalize=False) @pytest.mark.parametrize('addr_fmt', ["p2wpkh", "p2tr"]) def test_fully_unsigned(fake_txn, try_sign, addr_fmt): @@ -1752,6 +1752,7 @@ def test_foreign_utxo_missing(addr_fmt, num_not_ours, dev, fake_txn, start_sign, start_sign(psbt) time.sleep(.1) _, story = cap_story() + no = ", ".join(str(i) for i in list(range(num_not_ours))) assert "warnings" in story assert f"Limited Signing:" in story @@ -2802,7 +2803,6 @@ def test_nsequence_timebased_relative_locktime_ux(seconds, use_regtest, bitcoind assert txid == story_txid -@pytest.mark.veryslow @pytest.mark.bitcoind @pytest.mark.parametrize("abs_lock", [True, False]) @pytest.mark.parametrize("num_rtl", [(2,3),(4,7),(6,7)]) @@ -3111,6 +3111,38 @@ def test_txout_explorer(chain, data, fake_txn, start_sign, settings_set, txout_e start_sign(psbt) txout_explorer(data, chain) +@pytest.mark.parametrize("chain", ["BTC", "XTN"]) +@pytest.mark.parametrize("addr_fmt", ["p2wpkh", "p2pkh", "p2wpkh-p2sh"]) +def test_txin_explorer(chain, addr_fmt, fake_txn, start_sign, settings_set, txin_explorer, + cap_story, pytestconfig): + # TODO This test MUST be run with --psbt2 flag on and off + settings_set("chain", chain) + inp_amount = 1000000 + num_ins = 3 + + if addr_fmt == "p2wpkh": + segwit = True + wrapped = False + sh = "SINGLE" + seq = 1100 + elif addr_fmt == "p2pkh": + segwit = False + wrapped = False + sh = "ALL|ANYONECANPAY" + seq = SEQUENCE_LOCKTIME_TYPE_FLAG | (512 >> 9) + else: + segwit = True + wrapped = True + sh = "SINGLE|ANYONECANPAY" + seq = 1 + + psbt = fake_txn(num_ins, 1, segwit_in=segwit, wrapped=wrapped, + psbt_v2=pytestconfig.getoption('psbt2'), input_amount=inp_amount, + sequences=[seq], sighashes=[sh]) + + start_sign(psbt) + txin_explorer(num_ins, [(addr_fmt, inp_amount, 1, chain, False, sh, seq)]) + @pytest.mark.parametrize("finalize", [True, False]) @pytest.mark.parametrize("data", [ [(1, b"Coinkite"), (0, b"Mk1 Mk2 Mk3 Mk4 Q"), (100, b"binarywatch.org"), (100, b"a" * 75)], @@ -3121,7 +3153,7 @@ def test_txout_explorer(chain, data, fake_txn, start_sign, settings_set, txout_e ]) def test_txout_explorer_op_return(finalize, data, fake_txn, start_sign, cap_story, is_q1, need_keypress, press_cancel, press_select, end_sign, - cap_screen_qr, cap_screen): + cap_screen_qr, cap_screen, pick_menu_item): outputs = [["p2tr", 50000, not i] for i in range(20)] outputs += [["op_return", am, None, d] for am, d in data] out_val = sum(o[1] for o in outputs) @@ -3141,8 +3173,9 @@ def test_txout_explorer_op_return(finalize, data, fake_txn, start_sign, cap_stor else: assert "OP_RETURN > 80 bytes" not in story - assert "Press (2) to explore txn" in story + assert "Press (2) to explore transaction" in story need_keypress("2") + pick_menu_item("Outputs") time.sleep(.1) # OP_RETURN is put at the end of output list (fake_txn) # 20 normal outputs, all OP_RETURN on last page @@ -3213,6 +3246,7 @@ def test_txout_explorer_op_return(finalize, data, fake_txn, start_sign, cap_stor assert s == dd0 assert e == dd1 + press_cancel() # exit output explorer press_cancel() # exit txn out explorer end_sign(finalize=finalize) @@ -3483,6 +3517,38 @@ def test_txn_v3_eph_anchor(finalize, set_seed_words, start_sign, end_sign, cap_s txo.deserialize(BytesIO(res)) assert txo.nVersion == 3 + +@pytest.mark.parametrize("stype", ["p2tr", "unknown", "no_utxo"]) +def test_unknown_input_script(stype, fake_txn , start_sign, cap_story, use_testnet, + txin_explorer): + use_testnet() + + def hack(psbt): + psbt.inputs[0].bip32_paths = None + psbt.inputs[0].utxo = None + psbt.inputs[0].witness_utxo = None + if stype != "no_utxo": + if stype == "p2tr": + scr = bytes([81, 32]) + os.urandom(32) + else: + scr = bytes([90, 45]) + os.urandom(45) + + psbt.inputs[0].witness_utxo = CTxOut(100000000, scr).serialize() + + # second input is always ours + if stype == "no_utxo": + af = None + else: + af = stype + + ins = [(af, 100000000, 0), ("p2wpkh", 100000000, 1)] + + psbt = fake_txn(2, 2, segwit_in=True, change_outputs=[0], psbt_hacker=hack) + start_sign(psbt) + title, story = cap_story() + assert title == "OK TO SEND?" + txin_explorer(len(ins), ins) + # EOF @pytest.mark.bitcoind diff --git a/testing/txn.py b/testing/txn.py index dad6e4e3..7498f1d5 100644 --- a/testing/txn.py +++ b/testing/txn.py @@ -9,7 +9,7 @@ from io import BytesIO from helpers import fake_dest_addr, make_change_addr, hash160, taptweak, str_to_path from base58 import decode_base58 from bip32 import BIP32Node -from constants import simulator_fixed_tprv +from constants import simulator_fixed_tprv, SIGHASH_MAP from serialize import uint256_from_str from ctransaction import CTransaction, COutPoint, CTxIn, CTxOut @@ -22,7 +22,8 @@ def fake_txn(dev, pytestconfig): def doit(inputs, outputs, master_xpub=None, psbt_hacker=None, add_xpub=None, psbt_v2=None, fee=200, addr_fmt="p2wpkh", input_amount=100_000_000, capture_scripts=None, - force_full_tx_utxo=False, supply_num_ins=1, supply_num_outs=1): # input_amount in sats + force_full_tx_utxo=False, supply_num_ins=1, supply_num_outs=1, lock_time=0, + sequences=None, sighashes=None): # input_amount in sats psbt = BasicPSBT() @@ -52,6 +53,7 @@ def fake_txn(dev, pytestconfig): psbt.output_count = num_outs txn = CTransaction() + txn.nLockTime = lock_time txn.nVersion = 2 master_xpub = master_xpub or dev.master_xpub or simulator_fixed_tprv @@ -146,15 +148,36 @@ def fake_txn(dev, pytestconfig): supply.calc_sha256() + if sighashes: + try: + sh = sighashes[i] + except IndexError: + sh = sighashes[0] + + psbt.inputs[i].sighash = SIGHASH_MAP[sh] + + + if lock_time and not i: + seq = 0xfffffffd + else: + seq = 0xffffffff + + if sequences: + # sequences parameter overrides what locktime sets for 0th input nSequence - if defined + try: + seq = sequences[i] + except IndexError: + seq = sequences[0] + + spendable = CTxIn(COutPoint(supply.sha256, 0), nSequence=seq) + txn.vin.append(spendable) + if psbt_v2: psbt.inputs[i].previous_txid = supply.hash psbt.inputs[i].prevout_idx = 0 - # TODO sequence - # TODO height timelock - # TODO time timelock - else: - spendable = CTxIn(COutPoint(supply.sha256, 0), nSequence=0xffffffff) - txn.vin.append(spendable) + psbt.inputs[i].sequence = seq + # psbt.inputs[i].req_time_locktime = None + # psbt.inputs[i].req_height_locktime = None # calculate fee if num_outs: @@ -238,8 +261,6 @@ def fake_txn(dev, pytestconfig): rv = BytesIO() psbt.serialize(rv) - pos = rv.tell() - # assert pos <= MAX_TXN_LEN, 'too fat %d' % pos return rv.getvalue()