signing artifacts re-export
This commit is contained in:
parent
a4d7f884c0
commit
9a28d36097
@ -19,6 +19,7 @@ This lists the new changes that have not yet been published in a normal release.
|
||||
|
||||
- New Feature: Multisig transaction finalization. Allows to use [PushTX](https://pushtx.org/) with multisig wallets.
|
||||
Read more [here](https://github.com/Coldcard/firmware/blob/master/docs/limitations.md#p2sh--multisig)
|
||||
- New Feature: Signing artifacts re-export. Press (0) at the end of signing to re-export with different medium
|
||||
- Enhancement: NFC export usability upgrade. NFC keeps exporting until CANCEL/X is pressed
|
||||
- Enhancement: Add `Bitcoin Safe` option to `Export Wallet`
|
||||
- Enhancement: 10% performance improvement in USB upload speed for large files
|
||||
|
||||
@ -2327,7 +2327,7 @@ async def pushtx_setup_menu(*a):
|
||||
|
||||
if not await feature_requires_nfc():
|
||||
# they don't want to proceed
|
||||
return
|
||||
return
|
||||
|
||||
async def doit(menu, picked, xx_self):
|
||||
# using stock values, or Disable
|
||||
|
||||
370
shared/auth.py
370
shared/auth.py
@ -3,19 +3,19 @@
|
||||
# Operations that require user authorization, like our core features: signing messages
|
||||
# and signing bitcoin transactions.
|
||||
#
|
||||
import stash, ure, ux, chains, sys, gc, uio, version, ngu, ujson
|
||||
import stash, ure, chains, sys, gc, uio, version, ngu, ujson
|
||||
from ubinascii import b2a_base64, a2b_base64
|
||||
from ubinascii import hexlify as b2a_hex
|
||||
from ubinascii import unhexlify as a2b_hex
|
||||
from uhashlib import sha256
|
||||
from public_constants import MSG_SIGNING_MAX_LENGTH, SUPPORTED_ADDR_FORMATS
|
||||
from public_constants import AFC_SCRIPT, AF_CLASSIC, AFC_BECH32, AF_P2WPKH, AF_P2WPKH_P2SH
|
||||
from public_constants import STXN_FLAGS_MASK, STXN_FINALIZE, STXN_VISUALIZE, STXN_SIGNED
|
||||
from public_constants import STXN_FINALIZE, STXN_VISUALIZE, STXN_SIGNED
|
||||
from sffile import SFFile
|
||||
from ux import ux_aborted, ux_show_story, abort_and_goto, ux_dramatic_pause, ux_clear_keys
|
||||
from ux import show_qr_code, OK, X, ux_input_text, ux_enter_bip32_index
|
||||
from usb import CCBusyError
|
||||
from utils import HexWriter, xfp2str, problem_file_line, cleanup_deriv_path
|
||||
from utils import HexWriter, xfp2str, problem_file_line, cleanup_deriv_path, chunk_checksum
|
||||
from utils import B2A, to_ascii_printable, show_single_address
|
||||
from psbt import psbtObject, FatalPSBTIssue, FraudulentChangeOutput
|
||||
from files import CardSlot, CardMissingError, needs_microsd
|
||||
@ -236,15 +236,10 @@ def verify_signed_file_digest(msg):
|
||||
continue
|
||||
path = card.abs_path(fname)
|
||||
|
||||
md = sha256()
|
||||
with open(path, "rb") as f:
|
||||
while True:
|
||||
chunk = f.read(1024)
|
||||
if not chunk:
|
||||
break
|
||||
md.update(chunk)
|
||||
csum = chunk_checksum(f)
|
||||
|
||||
h = b2a_hex(md.digest()).decode().strip()
|
||||
h = b2a_hex(csum).decode().strip()
|
||||
if h != digest:
|
||||
err.append((fname, h, digest))
|
||||
except:
|
||||
@ -387,7 +382,7 @@ class ApproveMessageSign(UserAuthorizedAction):
|
||||
|
||||
async def interact(self):
|
||||
# Prompt user w/ details and get approval
|
||||
from glob import dis, hsm_active
|
||||
from glob import hsm_active
|
||||
|
||||
if hsm_active:
|
||||
ch = await hsm_active.approve_msg_sign(self.text, self.address, self.subpath)
|
||||
@ -502,8 +497,6 @@ async def sign_with_own_address(subpath, addr_fmt):
|
||||
# used for cases where we already have the key picked, but need the message:
|
||||
# * address_explorer custom path
|
||||
# * positive ownership test
|
||||
from glob import dis
|
||||
|
||||
to_sign = await ux_input_text("", scan_ok=True, prompt="Enter MSG") # max len is 100 only here
|
||||
if not to_sign: return
|
||||
|
||||
@ -745,7 +738,7 @@ async def try_push_tx(data, txid, txn_sha=None):
|
||||
|
||||
|
||||
class ApproveTransaction(UserAuthorizedAction):
|
||||
def __init__(self, psbt_len, flags=0x0, approved_cb=None, psbt_sha=None, is_sd=None):
|
||||
def __init__(self, psbt_len, flags=0x0, approved_cb=None, psbt_sha=None, cb_kws=None):
|
||||
from glob import settings
|
||||
super().__init__()
|
||||
self.psbt_len = psbt_len
|
||||
@ -756,7 +749,9 @@ class ApproveTransaction(UserAuthorizedAction):
|
||||
self.psbt_sha = psbt_sha
|
||||
self.approved_cb = approved_cb
|
||||
self.result = None # will be (len, sha256) of the resulting PSBT
|
||||
self.is_sd = is_sd
|
||||
self.cb_kws = cb_kws or {}
|
||||
self.is_sd = (self.cb_kws.get("input_method", None) is None) \
|
||||
and (not self.cb_kws.get("force_vdisk", False))
|
||||
self.chain = chains.current_chain()
|
||||
|
||||
def render_output(self, o):
|
||||
@ -917,16 +912,19 @@ class ApproveTransaction(UserAuthorizedAction):
|
||||
dis.progress_bar_show(1) # finish the Validating...
|
||||
|
||||
if not hsm_active:
|
||||
esc = ""
|
||||
msg.write("\nPress %s to approve and sign transaction." % OK)
|
||||
if needs_txn_explorer:
|
||||
esc += "2"
|
||||
msg.write(" Press (2) to explore txn.")
|
||||
if self.is_sd and CardSlot.both_inserted():
|
||||
esc += "b"
|
||||
msg.write(" (B) to write to lower SD slot.")
|
||||
msg.write(" %s to abort." % X)
|
||||
|
||||
while True:
|
||||
ch = await ux_show_story(msg, title="OK TO SEND?", escape="2b")
|
||||
if ch == "2" and needs_txn_explorer:
|
||||
ch = await ux_show_story(msg, title="OK TO SEND?", escape=esc)
|
||||
if ch == "2":
|
||||
await self.txn_explorer()
|
||||
continue
|
||||
else:
|
||||
@ -989,10 +987,10 @@ class ApproveTransaction(UserAuthorizedAction):
|
||||
return await self.failure("Signing failed late", exc)
|
||||
|
||||
if self.approved_cb:
|
||||
kws = dict(psbt=self.psbt)
|
||||
if self.is_sd and (ch == "b"):
|
||||
kws["slot_b"] = True
|
||||
await self.approved_cb(**kws)
|
||||
self.cb_kws["slot_b"] = True
|
||||
|
||||
await self.approved_cb(self.psbt, **self.cb_kws)
|
||||
self.done()
|
||||
return
|
||||
|
||||
@ -1221,7 +1219,9 @@ class ApproveTransaction(UserAuthorizedAction):
|
||||
def sign_transaction(psbt_len, flags=0x0, psbt_sha=None):
|
||||
# transaction (binary) loaded into PSRAM already, checksum checked
|
||||
UserAuthorizedAction.check_busy(ApproveTransaction)
|
||||
UserAuthorizedAction.active_request = ApproveTransaction(psbt_len, flags, psbt_sha=psbt_sha)
|
||||
UserAuthorizedAction.active_request = ApproveTransaction(
|
||||
psbt_len, flags, psbt_sha=psbt_sha, cb_kws={"input_method": "usb"}
|
||||
)
|
||||
|
||||
# kill any menu stack, and put our thing at the top
|
||||
abort_and_goto(UserAuthorizedAction.active_request)
|
||||
@ -1246,10 +1246,203 @@ def psbt_encoding_taster(taste, psbt_len):
|
||||
raise ValueError("not psbt")
|
||||
|
||||
return decoder, output_encoder, psbt_len
|
||||
|
||||
|
||||
|
||||
async def done_signing(psbt, input_method=None, filename=None, force_vdisk=False,
|
||||
output_encoder=None, slot_b=False, finalize=None):
|
||||
from glob import dis, PSRAM
|
||||
from files import CardSlot, CardMissingError
|
||||
from sffile import SFFile
|
||||
from ux import show_qr_code, import_export_prompt, ux_show_story
|
||||
|
||||
txid = None
|
||||
|
||||
with SFFile(TXN_OUTPUT_OFFSET, max_size=MAX_TXN_LEN, message="Saving...") as psram:
|
||||
is_complete = psbt.is_complete()
|
||||
if is_complete:
|
||||
txid = psbt.finalize(psram)
|
||||
else:
|
||||
psbt.serialize(psram)
|
||||
txid = None
|
||||
|
||||
data_len = psram.tell()
|
||||
data_sha2 = psram.checksum.digest()
|
||||
|
||||
UserAuthorizedAction.cleanup()
|
||||
|
||||
n = 0
|
||||
ch = None
|
||||
fname_input_method = input_method
|
||||
|
||||
if txid and await try_push_tx(data_len, txid, data_sha2):
|
||||
n = 1 # go directly to reexport menu after pushTX
|
||||
|
||||
while True:
|
||||
base_msg = "Finalized TX ready for broadcast" if is_complete else "Partly Signed PSBT"
|
||||
if n:
|
||||
ch = await import_export_prompt(base_msg, no_qr=False if version.has_qr else True)
|
||||
if ch == KEY_CANCEL:
|
||||
break
|
||||
elif ch == KEY_QR or input_method == "qr":
|
||||
here = PSRAM.read_at(TXN_OUTPUT_OFFSET, data_len)
|
||||
hex_here = b2a_hex(here).upper()
|
||||
msg = txid or 'Partly Signed PSBT'
|
||||
try:
|
||||
await show_qr_code(hex_here.decode(), is_alnum=True, msg=msg)
|
||||
except (ValueError, RuntimeError):
|
||||
from ux_q1 import show_bbqr_codes
|
||||
await show_bbqr_codes('T' if txid else 'P', hex_here, msg, already_hex=True)
|
||||
|
||||
msg = base_msg + " shared via QR."
|
||||
del here
|
||||
elif ch == KEY_NFC or input_method == "nfc":
|
||||
from glob import NFC
|
||||
if is_complete:
|
||||
await NFC.share_signed_txn(txid, TXN_OUTPUT_OFFSET, data_len, data_sha2)
|
||||
else:
|
||||
await NFC.share_psbt(TXN_OUTPUT_OFFSET, data_len, data_sha2)
|
||||
msg = base_msg + " shared via NFC."
|
||||
else:
|
||||
assert isinstance(ch, dict) or input_method is None # SD/VDisk
|
||||
dis.fullscreen("Wait...")
|
||||
if ch is None:
|
||||
ch = {"force_vdisk": force_vdisk, "slot_b": slot_b}
|
||||
|
||||
if filename:
|
||||
_, basename = filename.rsplit('/', 1)
|
||||
base = basename.rsplit('.', 1)[0]
|
||||
else:
|
||||
base = fname_input_method
|
||||
|
||||
out2_fn = None
|
||||
out_fn = None
|
||||
|
||||
from glob import settings
|
||||
import os
|
||||
del_after = settings.get('del', 0)
|
||||
|
||||
while 1:
|
||||
# try to put back into same spot, but also do top-of-card
|
||||
if not is_complete:
|
||||
# keep the filename under control during multiple passes
|
||||
target_fname = base.replace('-part', '') + '-part.psbt'
|
||||
else:
|
||||
# add -signed to end. We won't offer to sign again.
|
||||
target_fname = base + '-signed.psbt'
|
||||
|
||||
try:
|
||||
with CardSlot(readonly=True, **ch) as card:
|
||||
out_full, out_fn = card.pick_filename(target_fname)
|
||||
out_path = out_full.rsplit("/", 1)[0] + "/"
|
||||
|
||||
except CardMissingError:
|
||||
prob = 'Missing card.\n\n'
|
||||
out_fn = None
|
||||
|
||||
if not out_fn:
|
||||
# need them to insert a card
|
||||
prob = ''
|
||||
else:
|
||||
# attempt write-out
|
||||
def _chunk_write(file_d, ofs, chunk=4096):
|
||||
written = 0
|
||||
while written < data_len:
|
||||
if (written + chunk) > data_len:
|
||||
chunk = data_len - written
|
||||
|
||||
file_d.write(PSRAM.read_at(ofs, chunk))
|
||||
written += chunk
|
||||
ofs += chunk
|
||||
|
||||
try:
|
||||
with CardSlot(**ch) as card:
|
||||
if is_complete and del_after:
|
||||
# don't write signed PSBT if we'd just delete it anyway
|
||||
out_fn = None
|
||||
else:
|
||||
with output_encoder(card.open(out_full, 'wb')) as fd:
|
||||
# save as updated PSBT
|
||||
if not is_complete:
|
||||
_chunk_write(fd, TXN_OUTPUT_OFFSET)
|
||||
else:
|
||||
psbt.serialize(fd)
|
||||
|
||||
if is_complete:
|
||||
# write out as hex too, if it's final
|
||||
out2_full, out2_fn = card.pick_filename(
|
||||
base + '-final.txn' if not del_after else 'tmp.txn',
|
||||
out_path)
|
||||
|
||||
if out2_full:
|
||||
with HexWriter(card.open(out2_full, 'w+t')) as fd:
|
||||
# save transaction, in hex
|
||||
if is_complete:
|
||||
_chunk_write(fd, TXN_OUTPUT_OFFSET)
|
||||
else:
|
||||
txid = psbt.finalize(fd)
|
||||
|
||||
if del_after:
|
||||
# rename it now that we know the txid
|
||||
after_full, out2_fn = card.pick_filename(
|
||||
txid + '.txn', out_path, overwrite=True)
|
||||
os.rename(out2_full, after_full)
|
||||
|
||||
if del_after:
|
||||
# this can do nothing if they swapped SDCard between steps, which is ok,
|
||||
# but if the original file is still there, this blows it away.
|
||||
# - if not yet final, the foo-part.psbt file stays
|
||||
try:
|
||||
card.securely_blank_file(filename)
|
||||
except: pass
|
||||
|
||||
# success and done!
|
||||
break
|
||||
|
||||
except OSError as exc:
|
||||
prob = 'Failed to write!\n\n%s\n\n' % exc
|
||||
sys.print_exception(exc)
|
||||
# fall through to try again
|
||||
|
||||
if force_vdisk:
|
||||
await ux_show_story(prob, title='Error')
|
||||
return
|
||||
|
||||
# prompt them to input another card?
|
||||
ch = await ux_show_story(
|
||||
prob + "Please insert an SDCard to receive signed transaction, "
|
||||
"and press OK.", title="Need Card")
|
||||
if ch == 'x':
|
||||
await ux_aborted()
|
||||
return
|
||||
|
||||
# done.
|
||||
if out_fn:
|
||||
msg = "Updated PSBT is:\n\n%s" % out_fn
|
||||
if out2_fn:
|
||||
msg += '\n\n'
|
||||
else:
|
||||
# del_after is probably set
|
||||
msg = ''
|
||||
|
||||
if out2_fn:
|
||||
msg += 'Finalized transaction (ready for broadcast):\n\n%s' % out2_fn
|
||||
if txid and not del_after:
|
||||
msg += '\n\nFinal TXID:\n' + txid
|
||||
|
||||
re_exp = "\n\nPress (0) to re-export."
|
||||
ch = await ux_show_story(msg + re_exp, title='PSBT Signed',
|
||||
escape="0")
|
||||
if ch != "0":
|
||||
break
|
||||
else:
|
||||
input_method = None
|
||||
n += 1
|
||||
|
||||
|
||||
async def sign_psbt_file(filename, force_vdisk=False, slot_b=None):
|
||||
# sign a PSBT file found on a MicroSD card
|
||||
# - or from VirtualDisk (mk4)
|
||||
from files import CardSlot
|
||||
from glob import dis
|
||||
from ux import the_ux
|
||||
|
||||
@ -1257,7 +1450,7 @@ async def sign_psbt_file(filename, force_vdisk=False, slot_b=None):
|
||||
|
||||
# copy file into PSRAM
|
||||
# - can't work in-place on the card because we want to support writing out to different card
|
||||
# - accepts hex or base64 encoding, but binary prefered
|
||||
# - accepts hex or base64 encoding, but binary preferred
|
||||
with CardSlot(force_vdisk, readonly=True, slot_b=slot_b) as card:
|
||||
with card.open(filename, 'rb') as fd:
|
||||
dis.fullscreen('Reading...', 0)
|
||||
@ -1297,131 +1490,14 @@ async def sign_psbt_file(filename, force_vdisk=False, slot_b=None):
|
||||
assert total <= psbt_len
|
||||
psbt_len = total
|
||||
|
||||
async def done(psbt, slot_b=None):
|
||||
dis.fullscreen("Wait...")
|
||||
orig_path, basename = filename.rsplit('/', 1)
|
||||
orig_path += '/'
|
||||
base = basename.rsplit('.', 1)[0]
|
||||
out2_fn = None
|
||||
out_fn = None
|
||||
txid = None
|
||||
|
||||
from glob import settings
|
||||
import os
|
||||
del_after = settings.get('del', 0)
|
||||
|
||||
while 1:
|
||||
# try to put back into same spot, but also do top-of-card
|
||||
is_comp = psbt.is_complete()
|
||||
if not is_comp:
|
||||
# keep the filename under control during multiple passes
|
||||
target_fname = base.replace('-part', '')+'-part.psbt'
|
||||
else:
|
||||
# add -signed to end. We won't offer to sign again.
|
||||
target_fname = base+'-signed.psbt'
|
||||
|
||||
for path in [orig_path, None]:
|
||||
try:
|
||||
with CardSlot(force_vdisk, readonly=True, slot_b=slot_b) as card:
|
||||
out_full, out_fn = card.pick_filename(target_fname, path)
|
||||
out_path = path
|
||||
if out_full: break
|
||||
except CardMissingError:
|
||||
prob = 'Missing card.\n\n'
|
||||
out_fn = None
|
||||
|
||||
if not out_fn:
|
||||
# need them to insert a card
|
||||
prob = ''
|
||||
else:
|
||||
# attempt write-out
|
||||
try:
|
||||
with CardSlot(force_vdisk, slot_b=slot_b) as card:
|
||||
if is_comp and del_after:
|
||||
# don't write signed PSBT if we'd just delete it anyway
|
||||
out_fn = None
|
||||
else:
|
||||
with output_encoder(card.open(out_full, 'wb')) as fd:
|
||||
# save as updated PSBT
|
||||
psbt.serialize(fd)
|
||||
|
||||
if is_comp:
|
||||
# write out as hex too, if it's final
|
||||
out2_full, out2_fn = card.pick_filename(
|
||||
base+'-final.txn' if not del_after else 'tmp.txn', out_path)
|
||||
|
||||
with SFFile(TXN_OUTPUT_OFFSET, max_size=MAX_TXN_LEN, message="Saving...") as fd0:
|
||||
txid = psbt.finalize(fd0)
|
||||
fd0.flush_out() # need to flush here as we are probably not gona call .read( again
|
||||
tx_len, tx_sha = fd0.tell(), fd0.checksum.digest()
|
||||
if txid and await try_push_tx(tx_len, txid, tx_sha):
|
||||
return # success, exit
|
||||
|
||||
if out2_full:
|
||||
fd0.seek(0)
|
||||
|
||||
with HexWriter(card.open(out2_full, 'w+t')) as fd:
|
||||
# save transaction, in hex
|
||||
tmp_buf = bytearray(4096)
|
||||
while True:
|
||||
rv = fd0.readinto(tmp_buf)
|
||||
if not rv: break
|
||||
fd.write(memoryview(tmp_buf)[:rv])
|
||||
|
||||
if del_after:
|
||||
# rename it now that we know the txid
|
||||
after_full, out2_fn = card.pick_filename(
|
||||
txid+'.txn', out_path, overwrite=True)
|
||||
os.rename(out2_full, after_full)
|
||||
|
||||
if del_after:
|
||||
# this can do nothing if they swapped SDCard between steps, which is ok,
|
||||
# but if the original file is still there, this blows it away.
|
||||
# - if not yet final, the foo-part.psbt file stays
|
||||
try:
|
||||
card.securely_blank_file(filename)
|
||||
except: pass
|
||||
|
||||
# success and done!
|
||||
break
|
||||
|
||||
except OSError as exc:
|
||||
prob = 'Failed to write!\n\n%s\n\n' % exc
|
||||
sys.print_exception(exc)
|
||||
# fall thru to try again
|
||||
|
||||
if force_vdisk:
|
||||
await ux_show_story(prob, title='Error')
|
||||
return
|
||||
|
||||
# prompt them to input another card?
|
||||
ch = await ux_show_story(prob+"Please insert an SDCard to receive signed transaction, "
|
||||
"and press %s." % OK, title="Need Card")
|
||||
if ch == 'x':
|
||||
await ux_aborted()
|
||||
return
|
||||
|
||||
# done.
|
||||
if out_fn:
|
||||
msg = "Updated PSBT is:\n\n%s" % out_fn
|
||||
if out2_fn:
|
||||
msg += '\n\n'
|
||||
else:
|
||||
# del_after is probably set
|
||||
msg = ''
|
||||
|
||||
if out2_fn:
|
||||
msg += 'Finalized transaction (ready for broadcast):\n\n%s' % out2_fn
|
||||
if txid and not del_after:
|
||||
msg += '\n\nFinal TXID:\n'+txid
|
||||
|
||||
await ux_show_story(msg, title='PSBT Signed')
|
||||
|
||||
UserAuthorizedAction.cleanup()
|
||||
|
||||
UserAuthorizedAction.cleanup()
|
||||
UserAuthorizedAction.active_request = ApproveTransaction(psbt_len, approved_cb=done,
|
||||
is_sd=not force_vdisk)
|
||||
UserAuthorizedAction.active_request = ApproveTransaction(
|
||||
psbt_len,
|
||||
approved_cb=done_signing,
|
||||
cb_kws={"filename": filename,
|
||||
"force_vdisk": force_vdisk,
|
||||
"output_encoder": output_encoder}
|
||||
)
|
||||
the_ux.push(UserAuthorizedAction.active_request)
|
||||
|
||||
class RemoteBackup(UserAuthorizedAction):
|
||||
|
||||
@ -520,11 +520,11 @@ class NFCHandler:
|
||||
await self.wipe(False)
|
||||
return rv
|
||||
|
||||
|
||||
async def start_psbt_rx(self):
|
||||
from auth import psbt_encoding_taster, TXN_INPUT_OFFSET
|
||||
from auth import UserAuthorizedAction, ApproveTransaction
|
||||
from ux import the_ux
|
||||
from auth import done_signing
|
||||
from sffile import SFFile
|
||||
|
||||
data = await self.start_nfc_rx()
|
||||
@ -546,10 +546,7 @@ class NFCHandler:
|
||||
if urn == 'urn:nfc:ext:bitcoin.org:sha256' and len(msg) == 32:
|
||||
# probably produced by another Coldcard: SHA256 over expected contents
|
||||
psbt_sha = bytes(msg)
|
||||
except Exception as e:
|
||||
# dont crash when given garbage
|
||||
import sys; sys.print_exception(e)
|
||||
pass
|
||||
except Exception: pass # dont crash when given garbage
|
||||
|
||||
if psbt_in is None:
|
||||
await ux_show_story("Could not find PSBT in what was written.", title="Sorry!")
|
||||
@ -572,41 +569,12 @@ class NFCHandler:
|
||||
UserAuthorizedAction.cleanup()
|
||||
UserAuthorizedAction.active_request = ApproveTransaction(
|
||||
psbt_len, 0x0, psbt_sha=psbt_sha,
|
||||
approved_cb=self.signing_done
|
||||
)
|
||||
approved_cb=done_signing,
|
||||
cb_kws={"input_method": "nfc",
|
||||
"output_encoder": output_encoder})
|
||||
# kill any menu stack, and put our thing at the top
|
||||
the_ux.push(UserAuthorizedAction.active_request)
|
||||
|
||||
async def signing_done(self, psbt):
|
||||
# User approved the PSBT, and signing worked... share result over NFC (only)
|
||||
from auth import TXN_OUTPUT_OFFSET, try_push_tx
|
||||
from version import MAX_TXN_LEN
|
||||
from sffile import SFFile
|
||||
|
||||
txid = None
|
||||
|
||||
# re-serialize the PSBT back out (into PSRAM)
|
||||
with SFFile(TXN_OUTPUT_OFFSET, max_size=MAX_TXN_LEN, message="Saving...") as fd:
|
||||
if psbt.is_complete():
|
||||
txid = psbt.finalize(fd)
|
||||
else:
|
||||
psbt.serialize(fd)
|
||||
|
||||
self.result = (fd.tell(), fd.checksum.digest())
|
||||
|
||||
out_len, out_sha = self.result
|
||||
|
||||
if txid and await try_push_tx(out_len, txid, out_sha):
|
||||
return # success, exit
|
||||
|
||||
if txid:
|
||||
await self.share_signed_txn(txid, TXN_OUTPUT_OFFSET, out_len, out_sha)
|
||||
else:
|
||||
await self.share_psbt(TXN_OUTPUT_OFFSET, out_len, out_sha)
|
||||
|
||||
# ? show txid on screen ?
|
||||
# thank them?
|
||||
|
||||
@classmethod
|
||||
async def selftest(cls):
|
||||
# Check for chip present, field present .. and that it works
|
||||
|
||||
@ -604,7 +604,6 @@ async def import_from_other(menu, *a):
|
||||
else:
|
||||
def contains_json(fname):
|
||||
if not fname.endswith('.json'): return False
|
||||
print(fname)
|
||||
try:
|
||||
obj = json.load(open(fname, 'rt'))
|
||||
assert 'coldcard_notes' in obj
|
||||
|
||||
@ -734,4 +734,15 @@ def wipe_if_deltamode():
|
||||
import callgate
|
||||
callgate.fast_wipe()
|
||||
|
||||
def chunk_checksum(fd, chunk=1024):
|
||||
# reads from open file descriptor
|
||||
md = sha256()
|
||||
while True:
|
||||
data = fd.read(chunk)
|
||||
if not data:
|
||||
break
|
||||
md.update(data)
|
||||
|
||||
return md.digest()
|
||||
|
||||
# EOF
|
||||
|
||||
@ -2,16 +2,14 @@
|
||||
#
|
||||
# ux_q1.py - UX/UI interactions that are Q1 specific and use big screen, keyboard.
|
||||
#
|
||||
import utime, gc, ngu, sys, chains
|
||||
import utime, gc, ngu, sys, bip39
|
||||
import uasyncio as asyncio
|
||||
from uasyncio import sleep_ms
|
||||
from charcodes import *
|
||||
from lcd_display import CHARS_W, CHARS_H, CursorSpec, CURSOR_SOLID, CURSOR_OUTLINE
|
||||
from exceptions import AbortInteraction, QRDecodeExplained
|
||||
import bip39
|
||||
from decoders import decode_qr_result
|
||||
from ubinascii import hexlify as b2a_hex
|
||||
from ubinascii import unhexlify as a2b_hex
|
||||
from ubinascii import b2a_base64
|
||||
|
||||
from utils import problem_file_line, show_single_address
|
||||
@ -996,20 +994,17 @@ class QRScannerInteraction:
|
||||
async def qr_psbt_sign(decoder, psbt_len, raw):
|
||||
# Got a PSBT coming in from QR scanner. Sign it.
|
||||
# - similar to auth.sign_psbt_file()
|
||||
from auth import UserAuthorizedAction, ApproveTransaction, try_push_tx
|
||||
from utils import CapsHexWriter
|
||||
from glob import dis, PSRAM
|
||||
from ux import show_qr_code, the_ux, ux_show_story
|
||||
from ux_q1 import show_bbqr_codes
|
||||
from auth import UserAuthorizedAction, ApproveTransaction, done_signing
|
||||
from ux import the_ux
|
||||
from sffile import SFFile
|
||||
from auth import MAX_TXN_LEN, TXN_INPUT_OFFSET, TXN_OUTPUT_OFFSET
|
||||
from auth import TXN_INPUT_OFFSET, psbt_encoding_taster
|
||||
|
||||
if raw != 'PSRAM': # might already be in place
|
||||
|
||||
# copy to PSRAM, and convert encoding at same time
|
||||
if isinstance(raw, str):
|
||||
raw = raw.encode()
|
||||
|
||||
# copy to PSRAM, and convert encoding at same time
|
||||
_, output_encoder, _ = psbt_encoding_taster(raw[:10], psbt_len)
|
||||
total = 0
|
||||
with SFFile(TXN_INPUT_OFFSET, max_size=psbt_len) as out:
|
||||
if not decoder:
|
||||
@ -1023,37 +1018,17 @@ async def qr_psbt_sign(decoder, psbt_len, raw):
|
||||
assert total <= psbt_len
|
||||
psbt_len = total
|
||||
|
||||
async def done(psbt):
|
||||
dis.fullscreen("Wait...")
|
||||
txid = None
|
||||
|
||||
with SFFile(TXN_OUTPUT_OFFSET, max_size=MAX_TXN_LEN, message="Saving...") as psram:
|
||||
|
||||
# save transaction, as hex into PSRAM
|
||||
with CapsHexWriter(psram) as fd:
|
||||
if psbt.is_complete():
|
||||
txid = psbt.finalize(fd)
|
||||
else:
|
||||
psbt.serialize(fd)
|
||||
|
||||
data_len, sha = psram.tell(), fd.checksum.digest()
|
||||
|
||||
UserAuthorizedAction.cleanup()
|
||||
|
||||
# Show the result as a QR, perhaps many BBQr's
|
||||
# - note: already HEX here!
|
||||
here = PSRAM.read_at(TXN_OUTPUT_OFFSET, data_len)
|
||||
if txid and await try_push_tx(a2b_hex(here), txid, sha):
|
||||
return # success, exit
|
||||
|
||||
msg = txid or "Partly Signed PSBT"
|
||||
try:
|
||||
await show_qr_code(here.decode(), is_alnum=True, msg=msg)
|
||||
except (ValueError, RuntimeError):
|
||||
await show_bbqr_codes('T' if txid else 'P', here, msg, already_hex=True)
|
||||
else:
|
||||
with SFFile(TXN_INPUT_OFFSET, max_size=psbt_len) as out:
|
||||
taste = out.read(10)
|
||||
_, output_encoder, _ = psbt_encoding_taster(taste, psbt_len)
|
||||
|
||||
UserAuthorizedAction.cleanup()
|
||||
UserAuthorizedAction.active_request = ApproveTransaction(psbt_len, approved_cb=done)
|
||||
UserAuthorizedAction.active_request = ApproveTransaction(
|
||||
psbt_len, approved_cb=done_signing,
|
||||
cb_kws={"input_method": "qr",
|
||||
"output_encoder": output_encoder}
|
||||
)
|
||||
the_ux.push(UserAuthorizedAction.active_request)
|
||||
|
||||
async def ux_visualize_txn(bin_txn):
|
||||
@ -1192,7 +1167,7 @@ async def show_bbqr_codes(type_code, data, msg, already_hex=False):
|
||||
# - BUT: need zlib compress (not present) .. delayed for now
|
||||
from bbqr import TYPE_LABELS, int2base36, b32encode, num_qr_needed
|
||||
from glob import PSRAM, dis
|
||||
from ux import ux_wait_keyup, ux_wait_keydown
|
||||
from ux import ux_wait_keydown
|
||||
import uqr
|
||||
|
||||
assert not PSRAM.is_at(data, 0) # input data would be overwritten with our work
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
# (c) Copyright 2020 by Coinkite Inc. This file is covered by license found in COPYING-CC.
|
||||
#
|
||||
import pytest, time, sys, random, re, ndef, os, glob, hashlib, json, functools, io, math, pdb
|
||||
import pytest, time, sys, random, re, ndef, os, glob, hashlib, json, functools, io, math, pdb, base64
|
||||
from subprocess import check_output
|
||||
from ckcc.protocol import CCProtocolPacker
|
||||
from helpers import B2A, U2SAT, hash160, addr_from_display_format
|
||||
@ -1291,12 +1291,12 @@ def try_sign_microsd(open_microsd, cap_story, pick_menu_item, goto_home,
|
||||
assert False, 'timed out'
|
||||
|
||||
txid = None
|
||||
lines = story.split('\n')
|
||||
if 'Final TXID:' in lines:
|
||||
txid = lines[-1]
|
||||
result_fname = lines[-4]
|
||||
lines = story.split('\n\n')
|
||||
if 'Final TXID:' in story:
|
||||
txid = lines[-2].split("\n")[-1]
|
||||
result_fname = lines[-3]
|
||||
else:
|
||||
result_fname = lines[-1]
|
||||
result_fname = lines[-2]
|
||||
|
||||
result = open_microsd(result_fname, 'rb').read()
|
||||
|
||||
@ -1702,6 +1702,42 @@ def nfc_read_text(nfc_read):
|
||||
return got.text
|
||||
return doit
|
||||
|
||||
@pytest.fixture()
|
||||
def nfc_read_txn(nfc_read, press_select):
|
||||
def doit(txid=None, contents=None):
|
||||
if contents is None:
|
||||
contents = nfc_read()
|
||||
time.sleep(.5)
|
||||
press_select()
|
||||
|
||||
got_txid = None
|
||||
got_txn = None
|
||||
got_psbt = None
|
||||
got_hash = None
|
||||
for got in ndef.message_decoder(contents):
|
||||
if got.type == 'urn:nfc:wkt:T':
|
||||
assert 'Transaction' in got.text or 'PSBT' in got.text
|
||||
if 'Transaction' in got.text and txid:
|
||||
assert b2a_hex(txid).decode() in got.text
|
||||
elif got.type == 'urn:nfc:ext:bitcoin.org:txid':
|
||||
got_txid = b2a_hex(got.data).decode('ascii')
|
||||
elif got.type == 'urn:nfc:ext:bitcoin.org:txn':
|
||||
got_txn = got.data
|
||||
elif got.type == 'urn:nfc:ext:bitcoin.org:psbt':
|
||||
got_psbt = got.data
|
||||
elif got.type == 'urn:nfc:ext:bitcoin.org:sha256':
|
||||
got_hash = got.data
|
||||
else:
|
||||
raise ValueError(got.type)
|
||||
|
||||
assert got_psbt or got_txn, 'no data?'
|
||||
assert got_hash
|
||||
assert got_hash == hashlib.sha256(got_psbt or got_txn).digest()
|
||||
|
||||
return got_txid, got_psbt, got_txn
|
||||
return doit
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def nfc_block4rf(sim_eval):
|
||||
# wait until RF is enabled and something to read (doesn't read it tho)
|
||||
@ -1814,10 +1850,10 @@ def load_export_and_verify_signature(microsd_path, virtdisk_path, verify_detache
|
||||
@pytest.fixture
|
||||
def load_export(need_keypress, cap_story, microsd_path, virtdisk_path, nfc_read_text, nfc_read_json,
|
||||
load_export_and_verify_signature, is_q1, press_cancel, press_select, readback_bbqr,
|
||||
cap_screen_qr):
|
||||
cap_screen_qr, nfc_read_txn):
|
||||
def doit(way, label, is_json, sig_check=True, addr_fmt=AF_CLASSIC, ret_sig_addr=False,
|
||||
tail_check=None, sd_key=None, vdisk_key=None, nfc_key=None, ret_fname=False,
|
||||
fpattern=None, qr_key=None):
|
||||
fpattern=None, qr_key=None, is_tx=False, encoding="base64"):
|
||||
|
||||
s_label = None
|
||||
if label == "Address summary":
|
||||
@ -1832,7 +1868,8 @@ def load_export(need_keypress, cap_story, microsd_path, virtdisk_path, nfc_read_
|
||||
time.sleep(0.2)
|
||||
title, story = cap_story()
|
||||
if way == "sd":
|
||||
if f"({key_map['sd']}) to save {s_label if s_label else label} file to SD Card" in story:
|
||||
if (f"({key_map['sd']}) to save {s_label if s_label else label} "
|
||||
f"{'' if is_tx else 'file '}to SD Card") in story:
|
||||
need_keypress(key_map['sd'])
|
||||
|
||||
elif way == "nfc":
|
||||
@ -1841,6 +1878,10 @@ def load_export(need_keypress, cap_story, microsd_path, virtdisk_path, nfc_read_
|
||||
else:
|
||||
need_keypress(key_map['nfc'])
|
||||
time.sleep(0.2)
|
||||
if is_tx:
|
||||
nfc_export = nfc_read_txn()
|
||||
return nfc_export[1:]
|
||||
|
||||
if is_json:
|
||||
nfc_export = nfc_read_json()
|
||||
else:
|
||||
@ -1863,6 +1904,8 @@ def load_export(need_keypress, cap_story, microsd_path, virtdisk_path, nfc_read_
|
||||
return json.loads(data)
|
||||
elif file_type == "U":
|
||||
return data.decode('utf-8') if not isinstance(data, str) else data
|
||||
elif file_type in ("P", "T"):
|
||||
return data
|
||||
else:
|
||||
raise NotImplementedError
|
||||
except:
|
||||
@ -1880,11 +1923,37 @@ def load_export(need_keypress, cap_story, microsd_path, virtdisk_path, nfc_read_
|
||||
|
||||
time.sleep(0.2)
|
||||
title, story = cap_story()
|
||||
path_f = microsd_path if way == "sd" else virtdisk_path
|
||||
if sig_check:
|
||||
export, sig_addr = load_export_and_verify_signature(story, way, is_json=is_json,
|
||||
addr_fmt=addr_fmt, label=label,
|
||||
tail_check=tail_check,
|
||||
fpattern=fpattern)
|
||||
elif is_tx:
|
||||
enc = "rb" if encoding == "binary" else "r"
|
||||
_split = story.split("\n\n")
|
||||
export = None
|
||||
if 'Updated PSBT is:' == _split[0]:
|
||||
fname = _split[1]
|
||||
path = path_f(fname)
|
||||
with open(path, enc) as f:
|
||||
export = f.read()
|
||||
|
||||
export_tx = None
|
||||
if "Finalized transaction (ready for broadcast)" in _split[2]:
|
||||
fname_tx = _split[3]
|
||||
path_tx = path_f(fname_tx)
|
||||
with open(path_tx, enc) as f:
|
||||
export_tx = f.read()
|
||||
else:
|
||||
# just finalized tx
|
||||
assert "Finalized transaction (ready for broadcast):" == _split[0]
|
||||
fname_tx = _split[1]
|
||||
path_tx = path_f(fname_tx)
|
||||
with open(path_tx, enc) as f:
|
||||
export_tx = f.read()
|
||||
|
||||
return export, export_tx
|
||||
else:
|
||||
assert f"{label} file written" in story
|
||||
if tail_check:
|
||||
@ -1896,10 +1965,8 @@ def load_export(need_keypress, cap_story, microsd_path, virtdisk_path, nfc_read_
|
||||
assert fpattern in fname
|
||||
if is_json:
|
||||
assert fname.endswith(".json")
|
||||
if way == "sd":
|
||||
path = microsd_path(fname)
|
||||
else:
|
||||
path = virtdisk_path(fname)
|
||||
|
||||
path = path_f(fname)
|
||||
with open(path, "r") as f:
|
||||
export = f.read()
|
||||
if is_json:
|
||||
@ -1916,6 +1983,110 @@ def load_export(need_keypress, cap_story, microsd_path, virtdisk_path, nfc_read_
|
||||
return doit
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def signing_artifacts_reexport(cap_story, need_keypress, load_export, press_cancel, is_q1):
|
||||
def doit(way, tx_final=False, txid=None, encoding=None, del_after=False, is_usb=False):
|
||||
label = "Finalized TX ready for broadcast" if tx_final else "Partly Signed PSBT"
|
||||
def _check_story(the_way):
|
||||
time.sleep(.2)
|
||||
title, story = cap_story()
|
||||
assert title == "PSBT Signed"
|
||||
|
||||
if the_way in ["qr", "nfc"]:
|
||||
what = label + " shared via %s." % the_way.upper()
|
||||
assert what in story
|
||||
else:
|
||||
if not del_after:
|
||||
assert "Updated PSBT is" in story
|
||||
if tx_final:
|
||||
assert "Finalized transaction (ready for broadcast)" in story
|
||||
if txid:
|
||||
assert txid in story
|
||||
|
||||
assert "Press (0) to re-export." in story
|
||||
need_keypress("0")
|
||||
|
||||
to_do = ["sd", "vdisk", "nfc", "qr"]
|
||||
if not is_usb:
|
||||
_check_story(way)
|
||||
to_do.remove(way) # put it as the last item
|
||||
to_do.append(way)
|
||||
|
||||
if not is_q1:
|
||||
to_do.remove("qr")
|
||||
|
||||
res = []
|
||||
res_tx = []
|
||||
for _way in to_do:
|
||||
try:
|
||||
rv = load_export(_way, label, is_json=False, sig_check=False,
|
||||
is_tx=True, encoding=encoding)
|
||||
if isinstance(rv, tuple):
|
||||
_psbt, _tx = rv
|
||||
if _psbt:
|
||||
res.append(_psbt)
|
||||
if _tx:
|
||||
res_tx.append(_tx)
|
||||
else:
|
||||
if tx_final:
|
||||
res_tx.append(rv)
|
||||
else:
|
||||
res.append(rv)
|
||||
if _way in ("qr", "nfc"):
|
||||
# nfc now needs cancel as it keeps reexporting
|
||||
# qr needs to go back from qr view
|
||||
press_cancel()
|
||||
_check_story(_way)
|
||||
except BaseException as e:
|
||||
if _way != "vdisk":
|
||||
raise
|
||||
|
||||
# check we exported the same - even if in different format
|
||||
final_res = []
|
||||
for x in res:
|
||||
if x is not None:
|
||||
x = x.strip()
|
||||
if isinstance(x, bytearray):
|
||||
x = bytes(x)
|
||||
if not isinstance(x, bytes):
|
||||
try:
|
||||
# is just a hex string
|
||||
x = bytes.fromhex(x)
|
||||
except:
|
||||
x = base64.b64decode(x)
|
||||
else:
|
||||
try:
|
||||
x = base64.b64decode(x.decode())
|
||||
except: pass
|
||||
|
||||
final_res.append(x)
|
||||
|
||||
final_res_tx = []
|
||||
for y in res_tx:
|
||||
if y is not None:
|
||||
y = y.strip()
|
||||
try:
|
||||
y = a2b_hex(y)
|
||||
except: pass
|
||||
if isinstance(y, bytearray):
|
||||
# bytearray is unhashable type
|
||||
y = bytes(y)
|
||||
|
||||
final_res_tx.append(y)
|
||||
|
||||
if not del_after and final_res:
|
||||
assert len(set(final_res)) == 1
|
||||
|
||||
fin_tx = None
|
||||
if final_res_tx:
|
||||
assert len(set(final_res_tx)) == 1
|
||||
fin_tx = final_res_tx[0]
|
||||
|
||||
return final_res[0] if final_res else None, fin_tx
|
||||
|
||||
return doit
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tapsigner_encrypted_backup(microsd_path, virtdisk_path):
|
||||
def doit(way, testnet=True):
|
||||
@ -2312,7 +2483,7 @@ from test_multisig import make_ms_address, clear_ms, make_myself_wallet, import_
|
||||
from test_nfc import try_sign_nfc
|
||||
from test_se2 import goto_trick_menu, clear_all_tricks, new_trick_pin, se2_gate, new_pin_confirmed
|
||||
from test_seed_xor import restore_seed_xor
|
||||
from test_ux import pass_word_quiz, word_menu_entry
|
||||
from test_ux import pass_word_quiz, word_menu_entry, enable_hw_ux
|
||||
from txn import fake_txn
|
||||
|
||||
# EOF
|
||||
|
||||
@ -232,6 +232,7 @@ def test_show_bbqr_contents(src, cap_screen_qr, sim_exec, render_bbqr, load_shar
|
||||
assert ft == 'B'
|
||||
|
||||
@pytest.mark.bitcoind
|
||||
@pytest.mark.reexport
|
||||
@pytest.mark.parametrize('size', [ 2, 10 ] )
|
||||
@pytest.mark.parametrize('max_ver', [ 20 ] ) # 20 max due to 4k USB buffer limit
|
||||
@pytest.mark.parametrize('encoding', '2HZ' )
|
||||
@ -242,7 +243,7 @@ def test_bbqr_psbt(size, encoding, max_ver, partial, segwit, scan_a_qr, readback
|
||||
cap_screen_qr, render_bbqr, goto_home, use_regtest, cap_story,
|
||||
decode_psbt_with_bitcoind, decode_with_bitcoind, fake_txn, dev,
|
||||
start_sign, end_sign, press_cancel, press_select, need_keypress,
|
||||
base64str, try_sign_bbqr):
|
||||
base64str, try_sign_bbqr, signing_artifacts_reexport):
|
||||
|
||||
num_in = size
|
||||
num_out = size*10
|
||||
@ -291,6 +292,12 @@ def test_bbqr_psbt(size, encoding, max_ver, partial, segwit, scan_a_qr, readback
|
||||
assert oc == num_out
|
||||
|
||||
press_cancel() # back to menu
|
||||
_psbt, _txn = signing_artifacts_reexport("qr", tx_final=not partial,
|
||||
encoding="binary")
|
||||
if partial:
|
||||
assert _psbt == rb
|
||||
else:
|
||||
assert _txn == rb
|
||||
|
||||
@pytest.mark.parametrize('test_size', [7854, 4592,
|
||||
758, 375, 465, # v15 capacity
|
||||
|
||||
@ -2864,7 +2864,7 @@ def test_bitcoind_MofN_tutorial(m_n, script, clear_ms, goto_home, need_keypress,
|
||||
split_story = story.split("\n\n")
|
||||
fname = split_story[1]
|
||||
fname_tx = split_story[3]
|
||||
cc_tx_id = split_story[-1].split("\n")[-1]
|
||||
cc_tx_id = split_story[-2].split("\n")[-1]
|
||||
with open(microsd_path(fname), "r") as f:
|
||||
final_psbt = f.read().strip()
|
||||
|
||||
@ -2960,7 +2960,7 @@ def test_bitcoind_MofN_tutorial(m_n, script, clear_ms, goto_home, need_keypress,
|
||||
assert "PSBT Signed" == title
|
||||
assert "Updated PSBT is:" in story
|
||||
press_select()
|
||||
fname = story.split("\n\n")[-1]
|
||||
fname = story.split("\n\n")[-2]
|
||||
with open(microsd_path(fname), "r") as f:
|
||||
cc_signed_psbt = f.read().strip()
|
||||
|
||||
|
||||
@ -149,7 +149,7 @@ def test_ndef_ccfile(ccfile, load_shared_mod):
|
||||
@pytest.fixture
|
||||
def try_sign_nfc(cap_story, pick_menu_item, goto_home, need_keypress,
|
||||
sim_exec, nfc_read, nfc_write, nfc_block4rf, press_select,
|
||||
press_cancel, press_nfc):
|
||||
press_cancel, press_nfc, nfc_read_txn):
|
||||
|
||||
# like "try_sign" but use NFC to send/receive PSBT/results
|
||||
|
||||
@ -263,29 +263,7 @@ def try_sign_nfc(cap_story, pick_menu_item, goto_home, need_keypress,
|
||||
press_select()
|
||||
txid = None
|
||||
|
||||
got_txid = None
|
||||
got_txn = None
|
||||
got_psbt = None
|
||||
got_hash = None
|
||||
for got in ndef.message_decoder(contents):
|
||||
if got.type == 'urn:nfc:wkt:T':
|
||||
assert 'Transaction' in got.text or 'PSBT' in got.text
|
||||
if 'Transaction' in got.text and txid:
|
||||
assert b2a_hex(txid).decode() in got.text
|
||||
elif got.type == 'urn:nfc:ext:bitcoin.org:txid':
|
||||
got_txid = b2a_hex(got.data).decode('ascii')
|
||||
elif got.type == 'urn:nfc:ext:bitcoin.org:txn':
|
||||
got_txn = got.data
|
||||
elif got.type == 'urn:nfc:ext:bitcoin.org:psbt':
|
||||
got_psbt = got.data
|
||||
elif got.type == 'urn:nfc:ext:bitcoin.org:sha256':
|
||||
got_hash = got.data
|
||||
else:
|
||||
raise ValueError(got.type)
|
||||
|
||||
assert got_psbt or got_txn, 'no data?'
|
||||
assert got_hash
|
||||
assert got_hash == sha256(got_psbt or got_txn).digest()
|
||||
got_txid, got_psbt, got_txn = nfc_read_txn(txid=txid, contents=contents)
|
||||
|
||||
if got_txid and not txid:
|
||||
# Txid not shown in pure NFC case
|
||||
@ -331,6 +309,7 @@ def try_sign_nfc(cap_story, pick_menu_item, goto_home, need_keypress,
|
||||
assert was.txn == now.txn
|
||||
assert was != now
|
||||
|
||||
press_cancel() # exit re-export animation
|
||||
return ip, (got_psbt or got_txn), txid
|
||||
|
||||
yield doit
|
||||
@ -381,10 +360,15 @@ def test_nfc_after(num_outs, fake_txn, try_sign, nfc_read, need_keypress,
|
||||
raise ValueError(got.type)
|
||||
|
||||
@pytest.mark.unfinalized # iff partial=1
|
||||
@pytest.mark.reexport
|
||||
@pytest.mark.parametrize('encoding', ['binary', 'hex', 'base64'])
|
||||
@pytest.mark.parametrize('num_outs', [1,2])
|
||||
@pytest.mark.parametrize('partial', [1, 0])
|
||||
def test_nfc_signing(encoding, num_outs, partial, try_sign_nfc, fake_txn, dev):
|
||||
def test_nfc_signing(encoding, num_outs, partial, try_sign_nfc, fake_txn, dev,
|
||||
signing_artifacts_reexport, microsd_wipe):
|
||||
# clear any possible files on SD - that are created by signing_artifacts_reexport
|
||||
microsd_wipe()
|
||||
|
||||
xp = dev.master_xpub
|
||||
|
||||
def hack(psbt):
|
||||
@ -396,7 +380,13 @@ def test_nfc_signing(encoding, num_outs, partial, try_sign_nfc, fake_txn, dev):
|
||||
|
||||
psbt = fake_txn(2, num_outs, xp, segwit_in=True, psbt_hacker=hack)
|
||||
|
||||
_, txn, txid = try_sign_nfc(psbt, expect_finalize=not partial, encoding=encoding)
|
||||
got_psbt, txn, txid = try_sign_nfc(psbt, expect_finalize=not partial, encoding=encoding)
|
||||
_psbt, _txn = signing_artifacts_reexport("nfc", tx_final=not partial, txid=txid,
|
||||
encoding=encoding)
|
||||
if partial:
|
||||
assert _psbt == txn
|
||||
else:
|
||||
assert _txn == txn
|
||||
|
||||
def test_rf_uid(rf_interface, cap_story, goto_home, pick_menu_item):
|
||||
# read UID of NFC chip over the air
|
||||
|
||||
@ -1306,12 +1306,13 @@ def test_txid_calc(num_ins, fake_txn, try_sign, dev, segwit, decode_with_bitcoin
|
||||
assert decoded['txid'] == txid
|
||||
|
||||
@pytest.mark.unfinalized # iff partial=1
|
||||
@pytest.mark.reexport
|
||||
@pytest.mark.parametrize('encoding', ['binary', 'hex', 'base64'])
|
||||
#@pytest.mark.parametrize('num_outs', [1,2,3,4,5,6,7,8])
|
||||
@pytest.mark.parametrize('num_outs', [1,15])
|
||||
@pytest.mark.parametrize('del_after', [1, 0])
|
||||
@pytest.mark.parametrize('partial', [1, 0])
|
||||
def test_sdcard_signing(encoding, num_outs, del_after, partial, try_sign_microsd, fake_txn, try_sign, dev, settings_set):
|
||||
def test_sdcard_signing(encoding, num_outs, del_after, partial, try_sign_microsd, fake_txn,
|
||||
dev, settings_set, signing_artifacts_reexport):
|
||||
# exercise the txn encode/decode from sdcard
|
||||
xp = dev.master_xpub
|
||||
|
||||
@ -1327,7 +1328,13 @@ def test_sdcard_signing(encoding, num_outs, del_after, partial, try_sign_microsd
|
||||
psbt = fake_txn(2, num_outs, xp, segwit_in=True, psbt_hacker=hack)
|
||||
|
||||
_, txn, txid = try_sign_microsd(psbt, finalize=not partial,
|
||||
encoding=encoding, del_after=del_after)
|
||||
encoding=encoding, del_after=del_after)
|
||||
_psbt, _txn = signing_artifacts_reexport("sd", tx_final=not partial, txid=txid,
|
||||
encoding=encoding, del_after=del_after)
|
||||
if partial:
|
||||
assert _psbt == txn
|
||||
else:
|
||||
assert _txn == txn
|
||||
|
||||
@pytest.mark.unfinalized
|
||||
@pytest.mark.parametrize('num_ins', [2,3,8])
|
||||
@ -2388,7 +2395,7 @@ def test_locktime_ux(use_regtest, bitcoind_d_sim_watch, start_sign, end_sign,
|
||||
assert "Finalized transaction (ready for broadcast)" in story
|
||||
assert "TXID" in story
|
||||
split_story = story.split("\n\n")
|
||||
story_txid = split_story[-1].split("\n")[-1]
|
||||
story_txid = split_story[-2].split("\n")[-1]
|
||||
signed_psbt_fname = split_story[1]
|
||||
with open(microsd_path(signed_psbt_fname), "r") as f:
|
||||
signed_psbt = f.read().strip()
|
||||
@ -2500,7 +2507,7 @@ def test_nsequence_blockheight_relative_locktime_ux(sequence, use_regtest, bitco
|
||||
assert "Finalized transaction (ready for broadcast)" in story
|
||||
assert "TXID" in story
|
||||
split_story = story.split("\n\n")
|
||||
story_txid = split_story[-1].split("\n")[-1]
|
||||
story_txid = split_story[-2].split("\n")[-1]
|
||||
signed_psbt_fname = split_story[1]
|
||||
with open(microsd_path(signed_psbt_fname), "r") as f:
|
||||
signed_psbt = f.read().strip()
|
||||
@ -2611,7 +2618,7 @@ def test_nsequence_timebased_relative_locktime_ux(seconds, use_regtest, bitcoind
|
||||
assert "Finalized transaction (ready for broadcast)" in story
|
||||
assert "TXID" in story
|
||||
split_story = story.split("\n\n")
|
||||
story_txid = split_story[-1].split("\n")[-1]
|
||||
story_txid = split_story[-2].split("\n")[-1]
|
||||
signed_psbt_fname = split_story[1]
|
||||
with open(microsd_path(signed_psbt_fname), "r") as f:
|
||||
signed_psbt = f.read().strip()
|
||||
@ -2724,7 +2731,7 @@ def test_mixed_locktimes(num_rtl, use_regtest, bitcoind_d_sim_watch, start_sign,
|
||||
assert "Finalized transaction (ready for broadcast)" in story
|
||||
assert "TXID" in story
|
||||
split_story = story.split("\n\n")
|
||||
story_txid = split_story[-1].split("\n")[-1]
|
||||
story_txid = split_story[-2].split("\n")[-1]
|
||||
signed_psbt_fname = split_story[1]
|
||||
with open(microsd_path(signed_psbt_fname), "r") as f:
|
||||
signed_psbt = f.read().strip()
|
||||
|
||||
@ -29,14 +29,16 @@ def test_vd_basics(dev, virtdisk_path, is_simulator):
|
||||
assert os.path.isfile(virtdisk_path(f'ident/ckcc-{sn}.txt'))
|
||||
|
||||
@pytest.fixture
|
||||
def try_sign_virtdisk(press_select, virtdisk_path, cap_story, virtdisk_wipe, press_cancel):
|
||||
def try_sign_virtdisk(press_select, virtdisk_path, cap_story, virtdisk_wipe, press_cancel,
|
||||
pick_menu_item, goto_home):
|
||||
|
||||
# like "try_sign" but use Virtual Disk to send/receive PSBT/results
|
||||
# - on real dev, need user to manually say yes ... alot
|
||||
# - on simulator, start with "--eject" arg so no SDCard emulated
|
||||
|
||||
|
||||
def doit(f_or_data, accept=True, expect_finalize=False, accept_ms_import=False, complete=False, encoding='binary'):
|
||||
def doit(f_or_data, accept=True, expect_finalize=False, accept_ms_import=False,
|
||||
encoding='binary'):
|
||||
|
||||
assert not accept_ms_import, 'no support'
|
||||
assert accept, 'no support'
|
||||
@ -48,7 +50,8 @@ def try_sign_virtdisk(press_select, virtdisk_path, cap_story, virtdisk_wipe, pre
|
||||
filename = 'memory'
|
||||
else:
|
||||
filename = f_or_data
|
||||
ip = open(f_or_data, 'rb').read()
|
||||
with open(f_or_data, 'rb') as f:
|
||||
ip = f.read()
|
||||
if ip[0:10] == b'70736274ff':
|
||||
ip = a2b_hex(ip.strip())
|
||||
assert ip[0:5] == b'psbt\xff'
|
||||
@ -65,12 +68,16 @@ def try_sign_virtdisk(press_select, virtdisk_path, cap_story, virtdisk_wipe, pre
|
||||
virtdisk_wipe()
|
||||
|
||||
xfn = virtdisk_path('testcase.psbt')
|
||||
open(xfn, 'wb').write(ip)
|
||||
with open(xfn, 'wb') as f:
|
||||
f.write(ip)
|
||||
|
||||
press_select() # ready to sign (hopefully)
|
||||
goto_home()
|
||||
pick_menu_item("Ready To Sign")
|
||||
|
||||
# CC scans drive, reads PSBT, verifies...
|
||||
time.sleep(1)
|
||||
title, story = cap_story()
|
||||
assert "OK TO SEND" in title
|
||||
|
||||
# approve siging txn
|
||||
if accept:
|
||||
@ -78,35 +85,40 @@ def try_sign_virtdisk(press_select, virtdisk_path, cap_story, virtdisk_wipe, pre
|
||||
else:
|
||||
press_cancel()
|
||||
|
||||
if accept == False:
|
||||
if accept is False:
|
||||
time.sleep(0.050)
|
||||
|
||||
# look for "Aborting..." ??
|
||||
return ip, None, None
|
||||
|
||||
# wait for it to finish signing
|
||||
time.sleep(.1)
|
||||
title, story = cap_story()
|
||||
if "OK TO SEND" in title or "PSBT Signed" in title:
|
||||
press_select()
|
||||
assert "PSBT Signed" in title
|
||||
|
||||
result_fn = xfn.replace('.psbt', '-*.psbt')
|
||||
result_txn = xfn.replace('.psbt', '.txn')
|
||||
split_story = story.split("\n\n")
|
||||
result_fn = split_story[1]
|
||||
result_txn = None
|
||||
result_txid = None
|
||||
if expect_finalize:
|
||||
result_txn = split_story[3]
|
||||
result_txid = split_story[4].split("\n")[-1]
|
||||
reexport_msg = split_story[5]
|
||||
else:
|
||||
reexport_msg = split_story[2]
|
||||
|
||||
assert "Press (0) to re-export." == reexport_msg
|
||||
got_psbt = None
|
||||
got_txn = None
|
||||
txid, got_txid = None, None
|
||||
for i in range(15):
|
||||
try:
|
||||
got_txn = open(result_txn, 'rb').read()
|
||||
except FileNotFoundError as e:
|
||||
print(e)
|
||||
pass
|
||||
|
||||
lst = glob.glob(result_fn)
|
||||
if lst:
|
||||
assert len(lst) == 1, "multi files: " + ', '.join(lst)
|
||||
result_fn = lst[0]
|
||||
got_psbt = open(result_fn, 'rb').read()
|
||||
for i in range(15):
|
||||
if result_txn:
|
||||
with open(virtdisk_path(result_txn), 'rb') as f:
|
||||
got_txn = f.read()
|
||||
|
||||
with open(virtdisk_path(result_fn), 'rb') as f:
|
||||
got_psbt = f.read()
|
||||
|
||||
# for delete-psbt mode
|
||||
for ff in glob.glob(virtdisk_path('*.txn')):
|
||||
@ -115,7 +127,9 @@ def try_sign_virtdisk(press_select, virtdisk_path, cap_story, virtdisk_wipe, pre
|
||||
got_txid = re.findall(r'[0-9a-f]{64}', ff)[0]
|
||||
except IndexError:
|
||||
got_txid = None
|
||||
got_txn = a2b_hex(open(ff, 'rt').read().strip())
|
||||
|
||||
with open(ff, 'rt') as f:
|
||||
got_txn = a2b_hex(f.read().strip())
|
||||
|
||||
if got_txn or got_psbt:
|
||||
break
|
||||
@ -130,10 +144,11 @@ def try_sign_virtdisk(press_select, virtdisk_path, cap_story, virtdisk_wipe, pre
|
||||
txid = got_txid
|
||||
|
||||
if got_txid:
|
||||
assert got_txn
|
||||
assert got_txid == txid
|
||||
assert expect_finalize
|
||||
open("debug/vd-result.txn", 'wb').write(got_txid)
|
||||
assert got_txn
|
||||
assert got_txid == txid == result_txid
|
||||
with open("debug/vd-result.txn", 'wb') as f:
|
||||
f.write(got_txid)
|
||||
|
||||
|
||||
# check output encoding matches input (for PSBT only)
|
||||
@ -160,7 +175,8 @@ def try_sign_virtdisk(press_select, virtdisk_path, cap_story, virtdisk_wipe, pre
|
||||
|
||||
if got_psbt:
|
||||
assert got_psbt[0:5] == b'psbt\xff'
|
||||
open("debug/vd-result.psbt", 'wb').write(got_psbt)
|
||||
with open("debug/vd-result.psbt", 'wb') as f:
|
||||
f.write(got_psbt)
|
||||
|
||||
from psbt import BasicPSBT
|
||||
was = BasicPSBT().parse(ip)
|
||||
@ -168,16 +184,18 @@ def try_sign_virtdisk(press_select, virtdisk_path, cap_story, virtdisk_wipe, pre
|
||||
assert was.txn == now.txn
|
||||
assert was != now
|
||||
|
||||
return ip, (got_psbt or got_txn), txid
|
||||
return ip, (got_txn or got_psbt), txid
|
||||
|
||||
return doit
|
||||
|
||||
|
||||
@pytest.mark.unfinalized # iff partial=1
|
||||
@pytest.mark.reexport
|
||||
@pytest.mark.parametrize('encoding', ['binary', 'hex', 'base64'])
|
||||
@pytest.mark.parametrize('num_outs', [1,2])
|
||||
@pytest.mark.parametrize('partial', [1, 0])
|
||||
def test_virtdisk_signing(encoding, num_outs, partial, try_sign_virtdisk, fake_txn, dev, sd_cards_eject):
|
||||
def test_virtdisk_signing(encoding, num_outs, partial, try_sign_virtdisk, fake_txn, dev,
|
||||
sd_cards_eject, signing_artifacts_reexport):
|
||||
xp = dev.master_xpub
|
||||
sd_cards_eject()
|
||||
|
||||
@ -192,6 +210,14 @@ def test_virtdisk_signing(encoding, num_outs, partial, try_sign_virtdisk, fake_t
|
||||
|
||||
_, txn, txid = try_sign_virtdisk(psbt, expect_finalize=not partial, encoding=encoding)
|
||||
|
||||
sd_cards_eject(slot_a=0)
|
||||
_psbt, _txn = signing_artifacts_reexport("vdisk", tx_final=not partial, txid=txid,
|
||||
encoding=encoding)
|
||||
if partial:
|
||||
assert _psbt == txn
|
||||
else:
|
||||
assert _txn == txn
|
||||
|
||||
if 0:
|
||||
@pytest.mark.parametrize('num_outs', [ 1, 20, 250])
|
||||
def test_virtdisk_after(num_outs, fake_txn, try_sign, nfc_read, need_keypress, cap_story, only_mk4):
|
||||
@ -260,7 +286,7 @@ def test_macos_detection():
|
||||
def test_import_prv_virtdisk(testnet, pick_menu_item, cap_story, need_keypress,
|
||||
unit_test, cap_menu, get_secrets, multiple_runs,
|
||||
reset_seed_words, virtdisk_path, virtdisk_wipe,
|
||||
settings_set, press_select):
|
||||
settings_set, press_select, enable_hw_ux):
|
||||
# copied from test_ux as we need vdisk enabled and card ejected
|
||||
if testnet:
|
||||
netcode = "XTN"
|
||||
@ -271,6 +297,8 @@ def test_import_prv_virtdisk(testnet, pick_menu_item, cap_story, need_keypress,
|
||||
|
||||
unit_test('devtest/clear_seed.py')
|
||||
|
||||
enable_hw_ux("vdisk")
|
||||
|
||||
fname = 'test-%d.txt' % os.getpid()
|
||||
path = virtdisk_path(fname)
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user