tx output explorer; fix fake_ms_txn and add psbtV2; add ability to add multiple op_returns to fake_txn

This commit is contained in:
scgbckbone 2024-05-14 17:03:26 +02:00 committed by doc-hex
parent 34328a0d30
commit 03ff4f842e
10 changed files with 352 additions and 64 deletions

View File

@ -4,6 +4,7 @@ This lists the new changes that have not yet been published in a normal release.
# Shared Improvements - Both Mk4 and Q
- New Feature: Transaction output explorer
- Enhancement: Stricter p2sh-p2wpkh validation
- Enhancement: mention the need to remove old duress wallets before locking down temporary seed
- Bugfix: Fix PSBTv2 `PSBT_GLOBAL_TX_MODIFIABLE` parsing

View File

@ -21,7 +21,7 @@ from psbt import psbtObject, FatalPSBTIssue, FraudulentChangeOutput
from files import CardSlot
from exceptions import HSMDenied
from version import MAX_TXN_LEN
from charcodes import KEY_QR, KEY_NFC, KEY_ENTER, KEY_CANCEL
from charcodes import KEY_QR, KEY_NFC, KEY_ENTER, KEY_CANCEL, KEY_LEFT, KEY_RIGHT
# Where in SPI flash/PSRAM the two PSBT files are (in and out)
TXN_INPUT_OFFSET = 0
@ -728,17 +728,27 @@ class ApproveTransaction(UserAuthorizedAction):
self.result = await self.save_visualization(msg, (self.stxn_flags & STXN_SIGNED))
del self.psbt
self.done()
return
ux_clear_keys(True)
dis.progress_bar_show(1) # finish the Validating...
if not hsm_active:
msg.write("Press OK to approve and sign transaction.")
explore = self.psbt.num_outputs > 10
msg.write("\nPress OK to approve and sign transaction.")
if explore:
msg.write(" Press (2) to explore txn.")
if self.is_sd and CardSlot.both_inserted():
msg.write(" (B) to write to lower SD slot.")
msg.write(" X to abort.")
ch = await ux_show_story(msg, title="OK TO SEND?", escape="b")
while True:
ch = await ux_show_story(msg, title="OK TO SEND?", escape="2b")
if ch == "2" and explore:
await self.txn_explorer()
continue
else:
msg.close()
del msg
break
else:
ch = await hsm_active.approve_transaction(self.psbt, self.psbt_sha, msg.getvalue())
dis.progress_bar_show(1) # finish the Validating...
@ -832,6 +842,51 @@ class ApproveTransaction(UserAuthorizedAction):
continue
break
async def txn_explorer(self):
from glob import dis
start = 0
n = 10
def make_msg(start, n):
dis.fullscreen('Wait...')
rv = ""
end = min(start + n, self.psbt.num_outputs)
for idx, out in self.psbt.output_iter(start, end):
outp = self.psbt.outputs[idx]
item = "Output %d%s:\n\n" % (idx, " (change)" if outp.is_change else "")
item += self.render_output(out)
item += "\n"
rv += item
if self.psbt.num_outputs > n:
rv += "Press RIGHT to see next group, LEFT to go back. X to quit."
return rv
msg = make_msg(start, n)
while True:
ch = await ux_show_story(msg, escape='79'+KEY_RIGHT+KEY_LEFT)
if ch == 'x':
del msg
return
elif (ch in KEY_LEFT+"7"):
# go backwards in explorer
if (start - n) < 0:
continue
else:
start -= n
elif (ch in KEY_RIGHT+"9"):
# go forwards
if (start + n) >= self.psbt.num_outputs:
continue
else:
start += n
else:
# nothing changed - do not recalc msg
continue
msg = make_msg(start, n)
async def save_visualization(self, msg, sign_text=False):
# write text into spi flash, maybe signing it as we go
# - return length and checksum

View File

@ -1050,11 +1050,14 @@ class psbtObject(psbtProxy):
raise FatalPSBTIssue("Duplicate key. Key for unknown value already provided in global namespace.")
self.unknown[key] = val
def output_iter(self):
def output_iter(self, start=0, stop=None):
# yield the txn's outputs: index, (CTxOut object) for each
if stop is None:
stop = self.num_outputs
total_out = 0
if self.is_v2:
for idx in range(self.num_outputs):
for idx in range(start, stop):
out = self.outputs[idx]
amount = unpack("<q", self.get(out.amount))[0]
spk = self.get(out.script)
@ -1067,8 +1070,11 @@ class psbtObject(psbtProxy):
fd = self.fd
fd.seek(self.vout_start)
if start != 0:
_skip_n_objs(fd, start, 'CTxOut')
tx_out = CTxOut()
for idx in range(self.num_outputs):
for idx in range(start, stop):
tx_out.deserialize(fd)
@ -1079,11 +1085,12 @@ class psbtObject(psbtProxy):
fd.seek(cont)
if self.total_value_out is None:
self.total_value_out = total_out
else:
assert self.total_value_out == total_out, \
'%s != %s' % (self.total_value_out, total_out)
if start == 0 and stop == self.num_outputs:
if self.total_value_out is None:
self.total_value_out = total_out
else:
assert self.total_value_out == total_out, \
'%s != %s' % (self.total_value_out, total_out)
def parse_txn(self):
# Need to semi-parse in unsigned transaction.

View File

@ -196,11 +196,6 @@ async def ux_show_story(msg, title=None, escape=None, sensitive=False,
ln = q1_reword(ln)
lines.extend(word_wrap(ln, CH_PER_W))
# no longer needed & rude to our caller, but let's save the memory
msg.close()
del msg
gc.collect()
else:
# simple string being shown
msg = q1_reword(msg)

View File

@ -1,6 +1,6 @@
# (c) Copyright 2020 by Coinkite Inc. This file is covered by license found in COPYING-CC.
#
import pytest, time, sys, random, re, ndef, os, glob, hashlib, json, functools, io, pdb
import pytest, time, sys, random, re, ndef, os, glob, hashlib, json, functools, io, math, pdb
from subprocess import check_output
from ckcc.protocol import CCProtocolPacker
from helpers import B2A, U2SAT, hash160
@ -2073,6 +2073,84 @@ def goto_address_explorer(goto_home, pick_menu_item, need_keypress,
return doit
@pytest.fixture
def txout_explorer(cap_story, press_cancel, need_keypress, is_q1):
def doit(data, chain="XTN"):
time.sleep(.1)
title, story = cap_story()
assert title == 'OK TO SEND?'
assert "Press (2) to explore txn" in story
need_keypress("2")
time.sleep(.1)
n = 10
for i in range(0, len(data), n):
d = data[i:i + n]
time.sleep(.1)
_, story = cap_story()
ss = story.split("\n\n")
assert len(ss) == (len(d) * 2) + 1
assert "Press RIGHT to see next group, LEFT to go back." in ss[-1]
for i, (sa, sb, (af, amount, change)) in enumerate(zip(ss[:-1:2], ss[1::2], d), start=i):
if change:
assert f"Output {i} (change):" == sa
else:
assert f"Output {i}:" == sa
txt_amount, _, addr = sb.split("\n")
assert txt_amount == f'{amount / 100000000:.8f} {chain}'
if af == "p2pkh":
if chain == "BTC":
assert addr.startswith("1")
else:
assert addr[0] in "mn"
elif af in ("p2wpkh", "p2wsh"):
target = "bc1q" if chain == "BTC" else "tb1q"
assert addr.startswith(target)
elif af in ("p2sh", "p2wpkh-p2sh", "p2wsh-p2sh"):
target = "3" if chain == "BTC" else "2"
assert addr.startswith(target)
else:
raise ValueError(f"'{af}' not implemented")
need_keypress(KEY_RIGHT if is_q1 else "9")
# 10 outputs per story
# currently sitting at the last story in explorer
# try to go further (must not work and story is unchanged)
for _ in range(2):
need_keypress(KEY_RIGHT if is_q1 else "9")
time.sleep(.1)
_, xstory = cap_story()
assert story == xstory
# go back to first explorer story
story_nums = math.ceil(len(data) / 10)
for _ in range(story_nums):
need_keypress(KEY_LEFT if is_q1 else "7")
time.sleep(.1)
_, story = cap_story()
assert "Output 0" in story.split("\n\n")[0]
# currently sitting at the first story in explorer
# try to go further (must not work and story is unchanged)
for _ in range(2):
need_keypress(KEY_LEFT if is_q1 else "7")
time.sleep(.1)
_, xstory = cap_story()
assert story == xstory
# leave explorer - will return back to sign story
press_cancel()
time.sleep(.1)
title, _ = cap_story()
assert title == 'OK TO SEND?'
press_cancel()
return doit
# useful fixtures
from test_backup import backup_system

View File

@ -253,16 +253,25 @@ def test_bbqr_psbt(size, encoding, max_ver, partial, segwit, scan_a_qr, readback
if file_type == 'T':
assert not partial
decoded = decode_with_bitcoind(rb)
elif file_type == 'P':
ic, oc = len(decoded['vin']), len(decoded['vout'])
else:
assert file_type == 'P'
assert partial
assert rb[0:4] == b'psbt'
decoded = decode_psbt_with_bitcoind(rb)
assert not decoded['unknown']
decoded = decoded['tx']
if 'tx' in decoded:
# psbt v0
decoded = decoded['tx']
ic, oc = len(decoded['vin']), len(decoded['vout'])
else:
# expect psbt v2
ic = decoded["input_count"]
oc = decoded["output_count"]
# just smoke test; syntax not content
assert len(decoded['vin']) == num_in
assert len(decoded['vout']) == num_out
assert ic == num_in
assert oc == num_out
press_cancel() # back to menu

View File

@ -1518,7 +1518,7 @@ def test_priv_over_ux(quick_start_hsm, hsm_status, load_hsm_users):
@pytest.mark.parametrize("allow_op_return", [False, True])
def test_op_return_output_local(op_return_data, start_hsm, attempt_psbt, fake_txn, allow_op_return):
dests = []
psbt = fake_txn(2, 2, op_return=(0, op_return_data), capture_scripts=dests)
psbt = fake_txn(2, 2, op_return=[(0, op_return_data)], capture_scripts=dests)
if allow_op_return:
policy = DICT(rules=[dict(whitelist=[render_address(d) for d in dests[0:2]],
whitelist_opts=dict(allow_zeroval_outs=True))])

View File

@ -1201,16 +1201,29 @@ def make_myself_wallet(dev, set_bip39_pw, offer_ms_import, press_select, clear_m
reset_seed_words()
@pytest.fixture()
def fake_ms_txn():
@pytest.fixture
def fake_ms_txn(pytestconfig):
# make various size MULTISIG txn's ... completely fake and pointless values
# - but has UTXO's to match needs
from struct import pack
def doit(num_ins, num_outs, M, keys, fee=10000,
outvals=None, segwit_in=False, outstyles=['p2pkh'], change_outputs=[],
incl_xpubs=False, hack_change_out=False, hack_psbt=None):
def doit(num_ins, num_outs, M, keys, fee=10000, outvals=None, segwit_in=False,
outstyles=['p2pkh'], change_outputs=[], incl_xpubs=False, hack_psbt=None,
hack_change_out=False, input_amount=1E8, psbt_v2=None):
psbt = BasicPSBT()
if psbt_v2 is None:
# anything passed directly to this function overrides
# pytest flag --psbt2 - only care about pytest flag
# if psbt_v2 is not specified (None)
psbt_v2 = pytestconfig.getoption('psbt2')
if psbt_v2:
psbt.version = 2
psbt.txn_version = 2
psbt.input_count = num_ins
psbt.output_count = num_outs
txn = CTransaction()
txn.nVersion = 2
@ -1252,7 +1265,7 @@ def fake_ms_txn():
)
supply.vin = [CTxIn(out_point, nSequence=0xffffffff)]
supply.vout.append(CTxOut(int(1E8), scriptPubKey))
supply.vout.append(CTxOut(int(input_amount), scriptPubKey))
if not segwit_in:
psbt.inputs[i].utxo = supply.serialize_with_witness()
@ -1260,13 +1273,21 @@ def fake_ms_txn():
psbt.inputs[i].witness_utxo = supply.vout[-1].serialize()
supply.calc_sha256()
if psbt_v2:
psbt.inputs[i].previous_txid = supply.hash
psbt.inputs[i].prevout_idx = 0
# TODO sequence
# TODO height timelock
# TODO time timelock
spendable = CTxIn(COutPoint(supply.sha256, 0), nSequence=0xffffffff)
txn.vin.append(spendable)
for i in range(num_outs):
# random P2PKH
if not outstyles:
style = ADDR_STYLES[i % len(ADDR_STYLES)]
elif len(outstyles) == num_outs:
style = outstyles[i]
else:
style = outstyles[i % len(outstyles)]
@ -1293,8 +1314,16 @@ def fake_ms_txn():
assert scriptPubKey
if psbt_v2:
psbt.outputs[i].script = scriptPubKey
if outvals:
psbt.outputs[i].amount = outvals[i]
else:
psbt.outputs[i].amount = int(round(((input_amount * num_ins) - fee) / num_outs, 4))
if not outvals:
h = CTxOut(int(round(((1E8*num_ins)-fee) / num_outs, 4)), scriptPubKey)
h = CTxOut(int(round(((input_amount*num_ins)-fee) / num_outs, 4)), scriptPubKey)
else:
h = CTxOut(int(outvals[i]), scriptPubKey)
@ -2933,4 +2962,36 @@ def test_bare_cc_ms_qr_import(N, make_multisig, scan_a_qr, clear_ms, goto_home,
assert f"{N}-of-{N}" in story
press_cancel()
@pytest.mark.parametrize("psbtv2", [True, False])
@pytest.mark.parametrize("data", [
# (out_style, amount, is_change)
[("p2wsh", 1000000, 0)] * 99,
[("p2sh", 1000000, 1)] * 11,
[("p2wsh-p2sh", 1000000, 1)] * 18 + [("p2wsh", 50000000, 0)] * 12,
[("p2sh", 1000000, 1), ("p2wsh-p2sh", 50000000, 0), ("p2wsh", 800000, 1)] * 14,
])
def test_txout_explorer(psbtv2, data, clear_ms, import_ms_wallet, fake_ms_txn,
start_sign, txout_explorer):
clear_ms()
M, N = 2, 3
keys = import_ms_wallet(2, 3, name='ms-test', accept=1)
outstyles = []
outvals = []
change_outputs = []
for i in range(len(data)):
os, ov, is_change = data[i]
outstyles.append(os)
outvals.append(ov)
if is_change:
change_outputs.append(i)
inp_amount = sum(outvals) + 100000 # 100k sat fee
psbt = fake_ms_txn(1, len(data), M, keys, outstyles=outstyles,
outvals=outvals, change_outputs=change_outputs,
input_amount=inp_amount, psbt_v2=psbtv2)
start_sign(psbt)
txout_explorer(data)
# EOF

View File

@ -3,7 +3,7 @@
# Transaction Signing. Important.
#
import time, pytest, os, random, pdb, struct, base64, binascii, itertools, datetime
import time, pytest, os, random, pdb, struct, base64, binascii, itertools, datetime, math
from ckcc_protocol.protocol import CCProtocolPacker, CCProtoError, MAX_TXN_LEN, CCUserRefused
from binascii import b2a_hex, a2b_hex
from psbt import BasicPSBT, BasicPSBTInput, BasicPSBTOutput, PSBT_IN_REDEEM_SCRIPT
@ -20,7 +20,7 @@ from constants import ADDR_STYLES, ADDR_STYLES_SINGLE, SIGHASH_MAP
from txn import *
from ctransaction import CTransaction, CTxOut, CTxIn, COutPoint
from ckcc_protocol.constants import STXN_FINALIZE, STXN_VISUALIZE, STXN_SIGNED
from charcodes import KEY_QR
from charcodes import KEY_QR, KEY_RIGHT
SEQUENCE_LOCKTIME_TYPE_FLAG = (1 << 22)
@ -2089,7 +2089,9 @@ def test_no_outputs_tx(fake_txn, microsd_path, goto_home, press_select, pick_men
pick_menu_item('Ready To Sign')
time.sleep(0.1)
pick_menu_item(fname)
try:
pick_menu_item(fname)
except KeyError: pass
time.sleep(0.1)
title, story = cap_story()
@ -2820,10 +2822,10 @@ def test_timelocks_visualize(start_sign, end_sign, dev, bitcoind, use_regtest,
@pytest.mark.parametrize('in_out', [(4,1),(2,2),(2,1)])
@pytest.mark.parametrize('partial', [False, True])
@pytest.mark.parametrize('segwit', [True, False])
def test_bas64_psbt_qr(in_out, partial, segwit, scan_a_qr, readback_bbqr,
goto_home, use_regtest, cap_story, fake_txn, dev,
decode_psbt_with_bitcoind, decode_with_bitcoind,
press_cancel, press_select, need_keypress):
def test_base64_psbt_qr(in_out, partial, segwit, scan_a_qr, readback_bbqr,
goto_home, use_regtest, cap_story, fake_txn, dev,
decode_psbt_with_bitcoind, decode_with_bitcoind,
press_cancel, press_select, need_keypress):
def hack(psbt):
if partial:
# change first input to not be ours
@ -2867,16 +2869,25 @@ def test_bas64_psbt_qr(in_out, partial, segwit, scan_a_qr, readback_bbqr,
if file_type == 'T':
assert not partial
decoded = decode_with_bitcoind(rb)
elif file_type == 'P':
ic, oc = len(decoded['vin']), len(decoded['vout'])
else:
assert file_type == 'P'
assert partial
assert rb[0:4] == b'psbt'
decoded = decode_psbt_with_bitcoind(rb)
assert not decoded['unknown']
decoded = decoded['tx']
if 'tx' in decoded:
# psbt v0
decoded = decoded['tx']
ic, oc = len(decoded['vin']), len(decoded['vout'])
else:
# expect psbt v2
ic = decoded["input_count"]
oc = decoded["output_count"]
# just smoke test; syntax not content
assert len(decoded['vin']) == num_in
assert len(decoded['vout']) == num_out
assert ic == num_in
assert oc == num_out
press_cancel() # back to menu
@ -2912,4 +2923,72 @@ def test_sorting_outputs_by_size(fake_txn, start_sign, cap_story, use_testnet,
assert rest_story in story
press_cancel()
@pytest.mark.parametrize("psbtv2", [True, False])
@pytest.mark.parametrize("chain", ["BTC", "XTN"])
@pytest.mark.parametrize("data", [
# (out_style, amount, is_change)
[("p2pkh", 1000000, 0)] * 99,
[("p2wpkh", 1000000, 1)] * 11,
[("p2pkh", 1000000, 1)] * 11 + [("p2wpkh", 50000000, 0)] * 16,
[("p2pkh", 1000000, 1), ("p2wpkh", 50000000, 0), ("p2wpkh-p2sh", 800000, 1)] * 11,
])
def test_txout_explorer(psbtv2, chain, data, fake_txn, start_sign,
settings_set, txout_explorer):
settings_set("chain", chain)
outstyles = []
outvals = []
change_outputs = []
for i in range(len(data)):
os, ov, is_change = data[i]
outstyles.append(os)
outvals.append(ov)
if is_change:
change_outputs.append(i)
inp_amount = sum(outvals) + 100000 # 100k sat fee
psbt = fake_txn(1, len(data), segwit_in=True, outstyles=outstyles,
outvals=outvals, change_outputs=change_outputs,
psbt_v2=psbtv2, input_amount=inp_amount)
start_sign(psbt)
txout_explorer(data, chain)
def test_txout_explorer_op_return(fake_txn, start_sign, cap_story, is_q1,
need_keypress, press_cancel):
d = [
(1, b"Coinkite"),
(0, b"Mk1 Mk2 Mk3 Mk4 Q"),
(100, b"binarywatch.org"),
(100, b"a" * 75),
]
psbt = fake_txn(1, 20, segwit_in=False, op_return=d)
start_sign(psbt)
time.sleep(.1)
title, story = cap_story()
assert title == 'OK TO SEND?'
assert "Press (2) to explore txn" in story
need_keypress("2")
time.sleep(.1)
# OP_RETURN is put at the end of output list (fake_txn)
# 20 normal outputs, all OP_RETURN on last page
for _ in range(2):
need_keypress(KEY_RIGHT if is_q1 else "9")
time.sleep(.1)
_, story = cap_story()
ss = story.split("\n\n")
for i, (sa, sb, (amount, data)) in enumerate(zip(ss[:-1:2], ss[1::2], d), start=20):
assert f"Output {i}:" == sa
val, name, dd = sb.split("\n")
assert "OP_RETURN" in name
assert f'{amount / 100000000:.8f} XTN' == val
hex_str, ascii_str = dd.split(" ", 1)
assert f"(ascii: {data.decode()})" == ascii_str
assert data.hex() == hex_str
press_cancel()
press_cancel()
# EOF

View File

@ -24,7 +24,7 @@ def fake_txn(dev, pytestconfig):
invals=None, outvals=None, segwit_in=False, wrapped=False,
outstyles=['p2pkh'], psbt_hacker=None, change_outputs=[],
capture_scripts=None, add_xpub=None, op_return=None,
psbt_v2=None):
psbt_v2=None, input_amount=1E8):
psbt = BasicPSBT()
@ -83,7 +83,7 @@ def fake_txn(dev, pytestconfig):
# p2pkh
scr = bytes([0x76, 0xa9, 0x14]) + subkey.hash160() + bytes([0x88, 0xac])
supply.vout.append(CTxOut(int(1E8 if not invals else invals[i]), scr))
supply.vout.append(CTxOut(int(input_amount if not invals else invals[i]), scr))
if segwit_in:
# just utxo for segwit
@ -101,13 +101,15 @@ def fake_txn(dev, pytestconfig):
# TODO height timelock
# TODO time timelock
spendable = CTxIn(COutPoint(uint256_from_str(supply.hash), 0), nSequence=0xffffffff)
spendable = CTxIn(COutPoint(supply.sha256, 0), nSequence=0xffffffff)
txn.vin.append(spendable)
for i in range(num_outs):
# random P2PKH
if not outstyles:
style = ADDR_STYLES[i % len(ADDR_STYLES)]
elif len(outstyles) == num_outs:
style = outstyles[i]
else:
style = outstyles[i % len(outstyles)]
@ -133,10 +135,10 @@ def fake_txn(dev, pytestconfig):
if psbt_v2:
psbt.outputs[i].script = act_scr
psbt.outputs[i].amount = int(outvals[i] if outvals else round(((1E8*num_ins)-fee) / num_outs, 4))
psbt.outputs[i].amount = int(outvals[i] if outvals else round(((input_amount*num_ins)-fee) / num_outs, 4))
if not outvals:
h = CTxOut(int(round(((1E8*num_ins)-fee) / num_outs, 4)), act_scr)
h = CTxOut(int(round(((input_amount*num_ins)-fee) / num_outs, 4)), act_scr)
else:
h = CTxOut(int(outvals[i]), act_scr)
@ -147,26 +149,27 @@ def fake_txn(dev, pytestconfig):
# op_return is a tuple of (amount, data)
if op_return:
amount, data = op_return
op_return_size = len(data)
if op_return_size < 76:
script = bytes([106, op_return_size]) + data
else:
script = bytes([106, 76, op_return_size]) + data
for op_ret in op_return:
amount, data = op_ret
op_return_size = len(data)
if op_return_size < 76:
script = bytes([106, op_return_size]) + data
else:
script = bytes([106, 76, op_return_size]) + data
op_ret_o = BasicPSBTOutput(idx=len(psbt.outputs))
if psbt_v2:
op_ret_o.script = script
op_ret_o.amount = amount
psbt.output_count += 1
else:
op_return_out = CTxOut(amount, script)
txn.vout.append(op_return_out)
op_ret_o = BasicPSBTOutput(idx=len(psbt.outputs))
if psbt_v2:
op_ret_o.script = script
op_ret_o.amount = amount
psbt.output_count += 1
else:
op_return_out = CTxOut(amount, script)
txn.vout.append(op_return_out)
psbt.outputs.append(op_ret_o)
psbt.outputs.append(op_ret_o)
if capture_scripts is not None:
capture_scripts.append(script)
if capture_scripts is not None:
capture_scripts.append(script)
if not psbt_v2:
psbt.txn = txn.serialize_with_witness()