# (c) Copyright 2018 by Coinkite Inc. This file is covered by license found in COPYING-CC. # # Operations that require user authorization, like our core features: signing messages # and signing bitcoin transactions. # import stash, ure, chains, sys, gc, uio, version, ngu, ujson from ubinascii import b2a_base64, a2b_base64 from ubinascii import hexlify as b2a_hex from ubinascii import unhexlify as a2b_hex from uhashlib import sha256 from public_constants import AFC_SCRIPT, AF_CLASSIC, AFC_BECH32, SUPPORTED_ADDR_FORMATS from public_constants import STXN_FINALIZE, STXN_VISUALIZE, STXN_SIGNED, AF_P2SH, AF_P2WPKH_P2SH from sffile import SFFile from menu import MenuSystem, MenuItem from serializations import ser_uint256, SIGHASH_ALL from ux import ux_show_story, abort_and_goto, ux_dramatic_pause, ux_clear_keys, ux_confirm, the_ux from ux import show_qr_code, OK, X, abort_and_push, AbortInteraction, ux_input_text, ux_enter_number from usb import CCBusyError from utils import (HexWriter, xfp2str, problem_file_line, cleanup_deriv_path, B2A, node_from_privkey, show_single_address, keypath_to_str, seconds2human_readable) from psbt import psbtObject, FatalPSBTIssue, FraudulentChangeOutput from files import CardSlot, CardMissingError from exceptions import HSMDenied, QRTooBigError from version import MAX_TXN_LEN from charcodes import KEY_QR, KEY_NFC, KEY_ENTER, KEY_CANCEL, KEY_LEFT, KEY_RIGHT from msgsign import sign_message_digest # Where in SPI flash/PSRAM the two PSBT files are (in and out) TXN_INPUT_OFFSET = 0 TXN_OUTPUT_OFFSET = MAX_TXN_LEN class UserAuthorizedAction: active_request = None def __init__(self): self.refused = False self.failed = None self.result = None self.ux_done = False def done(self, redraw=True): # drop them back into menu system, but at top. self.ux_done = True from actions import goto_top_menu m = goto_top_menu() if redraw: m.show() def pop_menu(self): # drop them back into menu system, but try not to affect # menu position. from actions import goto_top_menu from ux import the_ux, restore_menu self.ux_done = True if the_ux.top_of_stack() == self: empty = the_ux.pop() if empty: goto_top_menu() restore_menu() @classmethod def cleanup(cls): # user has collected the results/errors and no need for objs cls.active_request = None gc.collect() @classmethod def check_busy(cls, allowed_cls=None): # see if we're busy. don't interrupt that... unless it's of allowed_cls # - also handle cleanup of stale actions if not cls.active_request: return if allowed_cls and isinstance(cls.active_request, allowed_cls): return # check if UX actually was cleared, and we're not really doing that anymore; recover # - happens if USB caller never comes back for their final results from ux import the_ux top_ux = the_ux.top_of_stack() if not isinstance(top_ux, cls) and cls.active_request.ux_done: # do cleaup cls.cleanup() return raise CCBusyError() async def failure(self, msg, exc=None, title='Failure'): self.failed = msg self.done() # show line number and/or simple text about error if exc: #print("%s:" % msg) #sys.print_exception(exc) msg += '\n\n' em = str(exc) if em: msg += em msg += '\n\n' msg += problem_file_line(exc) from glob import hsm_active, dis # do nothing more for HSM case: msg will be available over USB if hsm_active: dis.progress_bar_show(1) # finish the Validating... or whatever was up return # may be a user-abort waiting, but we want to see error msg; so clear it ux_clear_keys(True) return await ux_show_story(msg, title) # Confirmation text for user when signing text messages. # MSG_SIG_TEMPLATE = '''\ Ok to sign this? --=-- {msg} --=-- Using the key associated with address: {subpath} => {addr} Press %s to continue, otherwise %s to cancel.''' % (OK, X) class ApproveMessageSign(UserAuthorizedAction): def __init__(self, text, subpath, addr_fmt, approved_cb=None, msg_sign_request=None, allow_tab_nl=False, privkey=None): super().__init__() is_json = False from msgsign import validate_text_for_signing, parse_msg_sign_request if msg_sign_request: text, subpath, addr_fmt, is_json = parse_msg_sign_request(msg_sign_request) self.text = validate_text_for_signing( text, allow_tab_nl=is_json and allow_tab_nl ) self.subpath = cleanup_deriv_path(subpath) self.addr_fmt = chains.parse_addr_fmt_str(addr_fmt) self.approved_cb = approved_cb self.privkey = privkey from glob import dis dis.fullscreen('Wait...') if self.privkey: node = node_from_privkey(self.privkey) self.address = chains.current_chain().address(node, self.addr_fmt) else: with stash.SensitiveValues() as sv: node = sv.derive_path(self.subpath) self.address = sv.chain.address(node, self.addr_fmt) dis.progress_bar_show(1) async def interact(self): # Prompt user w/ details and get approval from glob import hsm_active if hsm_active: ch = await hsm_active.approve_msg_sign(self.text, self.address, self.subpath) else: story = MSG_SIG_TEMPLATE.format(msg=self.text, addr=show_single_address(self.address), subpath=self.subpath) ch = await ux_show_story(story) if ch != 'y': # they don't want to! self.refused = True else: # perform signing (progress bar shown) digest = chains.current_chain().hash_message(self.text.encode()) self.result, _ = sign_message_digest(digest, self.subpath, "Signing...", self.addr_fmt, pk=self.privkey) if self.approved_cb: # for micro sd case await self.approved_cb(self.result, self.address, self.text) if self.approved_cb: # don't kill menu depth for file case UserAuthorizedAction.cleanup() self.pop_menu() else: self.done() def sign_msg(text, subpath, addr_fmt): # Start the approval process for message signing. UserAuthorizedAction.check_busy() UserAuthorizedAction.active_request = ApproveMessageSign(text, subpath, addr_fmt) # kill any menu stack, and put our thing at the top abort_and_goto(UserAuthorizedAction.active_request) async def approve_msg_sign(text, subpath, addr_fmt, approved_cb=None, msg_sign_request=None, kill_menu=False, allow_tab_nl=False, privkey=None): # Ask user if they want to sign some short text message. UserAuthorizedAction.cleanup() UserAuthorizedAction.check_busy(ApproveMessageSign) try: UserAuthorizedAction.active_request = ApproveMessageSign( text, subpath, addr_fmt, approved_cb=approved_cb, msg_sign_request=msg_sign_request, allow_tab_nl=allow_tab_nl, privkey=privkey ) if kill_menu: abort_and_goto(UserAuthorizedAction.active_request) else: # do not kill the menu stack! just push from ux import the_ux the_ux.push(UserAuthorizedAction.active_request) except (AssertionError, ValueError) as exc: await ux_show_story("Problem: %s\n\nMessage to be signed must be a single line of ASCII text." % exc) async def sign_txt_file(filename): # sign a one-line text file found on a MicroSD card # - not yet clear how to do address types other than 'classic' from ux import the_ux from msgsign import sd_sign_msg_done async def done(signature, address, text): # complete. write out result orig_path, basename = filename.rsplit('/', 1) orig_path += '/' base = basename.rsplit('.', 1)[0] await sd_sign_msg_done(signature, address, text, base, orig_path) UserAuthorizedAction.cleanup() UserAuthorizedAction.check_busy() # copy message into memory with CardSlot() as card: with card.open(filename, 'rt') as fd: res = fd.read() await approve_msg_sign(None, None, None, approved_cb=done, msg_sign_request=res) async def try_push_tx(data, txid, txn_sha=None): # if NFC PushTx is enabled, do that w/o questions. from glob import settings, PSRAM, NFC url = settings.get('ptxurl', False) if NFC and url: try: if isinstance(data, int): data = PSRAM.read_at(TXN_OUTPUT_OFFSET, data) if txn_sha is None: txn_sha = ngu.hash.sha256s(data)[-8:] await NFC.share_push_tx(url, txid, data, txn_sha) return True except: pass # continue normally if it fails, perhaps too big? return False class ApproveTransaction(UserAuthorizedAction): def __init__(self, psbt_len, flags=None, psbt_sha=None, input_method=None, output_encoder=None, filename=None, offset=TXN_INPUT_OFFSET): super().__init__() self.offset = offset self.psbt_len = psbt_len # do finalize is None if not USB, None = decide based on is_complete if flags is None: self.do_finalize = self.do_visualize = None else: self.do_finalize = bool(flags & STXN_FINALIZE) self.do_visualize = bool(flags & STXN_VISUALIZE) self.stxn_flags = flags self.psbt = None self.psbt_sha = psbt_sha self.input_method = input_method self.output_encoder = output_encoder self.filename = filename self.result = None # will be (len, sha256) of the resulting PSBT self.chain = chains.current_chain() def render_output(self, o): # Pretty-print a transactions output. # - expects CTxOut object # - gives user-visible string # returns: tuple(ux_output_rendition, address_or_script_str_for_qr_display) # val = ' '.join(self.chain.render_value(o.nValue)) try: dest = self.chain.render_address(o.scriptPubKey) # known script types are short enough that we can display QR on both hw versions return '%s\n - to address -\n%s\n' % (val, show_single_address(dest)), dest except ValueError: pass # Handle future things better: allow them to happen at least. # sending to some unknown script, possibly very long # but full-show required for verification # OP_RETURN dest contains also OP_RETURN itself (for PSBT qr explorer) dest = B2A(o.scriptPubKey) # check for OP_RETURN data = self.chain.op_return(o.scriptPubKey) # In UX story only data are shown as OP_RETURN is part of base msg if data is None: rv = '%s\n - to script -\n%s\n' % (val, dest) else: base = '%s\n - OP_RETURN -\n%s' if not data: dest = "" rv = base % (val, "null-data\n") else: data_ascii = None if len(data) > 160: # completely arbitrary limit, prevents huge stories # anchor data are not relevant for verification - can be hidden ss = b2a_hex(data[:80]).decode() + "\n ⋯\n" + b2a_hex(data[-80:]).decode() # but we show empty QR in txn explorer for these big, modified data else: ss = b2a_hex(data).decode() if (min(data) >= 32) and (max(data) < 127): # printable & not huge try: data_ascii = data.decode("ascii") except: pass rv = base % (val, ss) if data_ascii: rv += " (ascii: %s)" % data_ascii rv += "\n" return rv, dest async def interact(self): # Prompt user w/ details and get approval from glob import dis, hsm_active from ccc import CCCFeature, SSSPFeature # step 1: parse PSBT from PSRAM into in-memory objects. try: with SFFile(self.offset, length=self.psbt_len, message='Reading...') as fd: # NOTE: psbtObject captures the file descriptor and uses it later self.psbt = psbtObject.read_psbt(fd) except BaseException as exc: if isinstance(exc, MemoryError): msg = "Transaction is too complex" exc = None else: msg = "PSBT parse failed" return await self.failure(msg, exc) dis.fullscreen("Validating...") # Do some analysis/ validation try: await self.psbt.validate() # might do UX: accept multisig import dis.progress_sofar(10, 100) if not self.psbt.wif_store: self.psbt.consider_keys() dis.progress_sofar(20, 100) ccc_c_xfp = CCCFeature.get_xfp() # can be None self.psbt.consider_inputs(cosign_xfp=ccc_c_xfp) if self.psbt.wif_store: self.psbt.consider_keys() dis.progress_sofar(50, 100) self.psbt.consider_outputs() dis.progress_sofar(75, 100) self.psbt.consider_dangerous_sighash() dis.progress_sofar(90, 100) except FraudulentChangeOutput as exc: #print('FraudulentChangeOutput: ' + exc.args[0]) return await self.failure(exc.args[0], title='Change Fraud') except FatalPSBTIssue as exc: #print('FatalPSBTIssue: ' + exc.args[0]) return await self.failure(exc.args[0]) except BaseException as exc: # sys.print_exception(exc) del self.psbt gc.collect() if isinstance(exc, MemoryError): msg = "Transaction is too complex" exc = None else: msg = "Invalid PSBT" return await self.failure(msg, exc) # early test for spending policy; not an error if violates policy # - might add warnings could_ccc_sign, ccc_needs_2fa = CCCFeature.could_cosign(self.psbt) # test for allowing any signature when in single-signer mode # - but CCC will override it. should_block, ss_needs_2fa = SSSPFeature.can_allow(self.psbt) if should_block and not could_ccc_sign: return await self.failure('Spending Policy violation.') # step 2: figure out what we are approving, so we can get sign-off # - outputs, amounts # - fee # # notes: # - try to handle lots of outputs # - cannot calc fee as sat/byte, only as percent # - somethings are 'warnings': # - fee too big # - inputs we can't sign (no key) # try: msg = uio.StringIO() is_por = self.psbt.por322 and (self.psbt.num_inputs > 1) # mention warning at top wl= len(self.psbt.warnings) if wl == 1: msg.write('(1 warning below)\n\n') elif wl >= 2: msg.write('(%d warnings below)\n\n' % wl) if self.psbt.por322: msg.write("%s\n\n" % ("Proof of Reserves" if is_por else "BIP-322 Message")) msg.write("Message:\n%s\n\n" % self.psbt.por322_msg) if is_por: msg.write("Amount %s %s\n\n" % self.chain.render_value(self.psbt.total_value_in)) try: addr = self.chain.render_address(self.psbt.por322_msg_challenge) msg.write("Challenge Address:\n%s\n\n" % show_single_address(addr)) except ValueError: msg.write("Message Challenge:\n%s\n\n" % b2a_hex(self.psbt.por322_msg_challenge).decode()) else: if self.psbt.consolidation_tx: # consolidating txn that doesn't change balance of account. msg.write("Consolidating %s %s\nwithin wallet.\n\n" % self.chain.render_value(self.psbt.total_value_out)) else: msg.write("Sending %s %s\n" % self.chain.render_value( self.psbt.total_value_out - self.psbt.total_change_value)) fee = self.psbt.calculate_fee() if fee is not None: msg.write("Network fee %s %s\n\n" % self.chain.render_value(fee)) if not self.psbt.por322 or is_por: msg.write(" %d %s\n %d %s\n\n" % ( self.psbt.num_inputs, "input" if self.psbt.num_inputs == 1 else "inputs", self.psbt.num_outputs, "output" if self.psbt.num_outputs == 1 else "outputs", )) if not self.psbt.por322: # outputs + change story created here self.output_summary_text(msg) gc.collect() if self.psbt.ux_notes: # currently we only have locktimes in ux_notes msg.write('TX LOCKTIMES\n\n') for label, m in self.psbt.ux_notes: msg.write('- %s: %s\n' % (label, m)) if self.psbt.warnings: msg.write('---WARNING---\n\n') for label, m in self.psbt.warnings: msg.write('- %s: %s\n\n' % (label, m)) if self.do_visualize: # stop here and just return the text of approval message itself self.result = await self.save_visualization(msg, (self.stxn_flags & STXN_SIGNED)) del self.psbt self.done() return ux_clear_keys(True) dis.progress_bar_show(1) # finish the Validating... if not hsm_active: esc = "2" noun = "transaction" if self.psbt.por322: noun = "proof of reserves" if is_por else "message" msg.write("Press %s to approve and sign %s." " Press (2) to explore transaction." % (OK, noun)) if (self.input_method == "sd") and CardSlot.both_inserted(): esc += "b" msg.write(" (B) to write to lower SD slot.") msg.write(" %s to abort." % X) title = "OK TO %s?" % ("SIGN" if self.psbt.por322 else "SEND") while True: ch = await ux_show_story(msg, title=title, escape=esc) if ch == "2": await TXExplorer.start(self) continue else: msg.close() del msg break else: # get approval (maybe) from the HSM ch = await hsm_active.approve_transaction(self.psbt, self.psbt_sha, msg.getvalue()) except MemoryError: # recovery? maybe. try: del self.psbt del msg except: pass # might be NameError since we don't know how far we got gc.collect() msg = "Transaction is too complex" return await self.failure(msg) if ch not in 'yb': # they don't want to sign! self.refused = True await ux_dramatic_pause("Refused.", 1) del self.psbt self.done() return if ccc_needs_2fa and could_ccc_sign: # They still need to pass web2fa challenge (but it meets other specs ok) try: await CCCFeature.web2fa_challenge() except: could_ccc_sign = False ch2 = await ux_show_story("Will not add CCC signature. Proceed anyway?") if ch2 != 'y': return await self.failure("2FA Failed") elif ss_needs_2fa: # Need 2FA for single-sig case .. refuse to sign if it fails. try: await SSSPFeature.web2fa_challenge() except: return await self.failure("2FA Failed") # do the actual signing. try: dis.fullscreen('Wait...') gc.collect() # visible delay caused by this but also sign_it() below self.psbt.sign_it() if could_ccc_sign: # this is where the CCC co-signing happens. dis.fullscreen('Co-Signing...') gc.collect() CCCFeature.sign_psbt(self.psbt) if SSSPFeature.is_enabled(): # update SSSP block_h even if SSSP blocks and overridden by CCC SSSPFeature.update_last_signed(self.psbt) except FraudulentChangeOutput as exc: return await self.failure(exc.args[0], title='Change Fraud') except MemoryError: msg = "Transaction is too complex" return await self.failure(msg) except BaseException as exc: return await self.failure("Signing failed late", exc) try: await done_signing(self.psbt, self, self.input_method, self.filename, self.output_encoder, slot_b=(ch == "b"), finalize=self.do_finalize) self.done() except AbortInteraction: # user might have sent new sign cmd, while we still at export prompt pass except BaseException as exc: # sys.print_exception(exc) return await self.failure("PSBT output failed", exc) async def save_visualization(self, msg, sign_text=False): # write story text out, maybe signing it as we go # - return length and checksum from charcodes import OUT_CTRL_ADDRESS txt_len = msg.seek(0, 2) msg.seek(0) chk = self.chain.hash_message(msg_len=txt_len) if sign_text else None with SFFile(TXN_OUTPUT_OFFSET, max_size=txt_len+300, message="Visualizing...") as fd: while 1: # replace with empty space, to keep correct txt_len - already hashed blk = msg.read(256).replace(OUT_CTRL_ADDRESS, ' ').encode('ascii') if not blk: break if chk: chk.update(blk) fd.write(blk) if chk: # append the signature digest = ngu.hash.sha256s(chk.digest()) sig, _ = sign_message_digest(digest, 'm', None, AF_CLASSIC) fd.write(b2a_base64(sig).decode('ascii').strip()) fd.write('\n') return fd.tell(), fd.checksum.digest() def output_summary_text(self, msg): # Produce text report of where their cash is going. This is what # they use to decide if correct transaction is being signed. # Produce text report of where all outputs, both normal and "change" are going. # - we do expect all users to verify these outputs completely; do not hide details # - show larger outputs first, total-up the not-shown values if any # - change shown as such, only because we've done all the check/validations already # - when too much to show now, offer to page user through all the ouputs (txn explorer) MAX_VISIBLE_OUTPUTS = const(10) MAX_VISIBLE_CHANGE = const(20) largest_outs = [] largest_change = [] total_change = 0 has_change = False for idx, tx_out in self.psbt.output_iter(): outp = self.psbt.outputs[idx] if outp.is_change: has_change = True total_change += tx_out.nValue if len(largest_change) < MAX_VISIBLE_CHANGE: _, addr = self.render_output(tx_out) largest_change.append((tx_out.nValue, addr)) if len(largest_change) == MAX_VISIBLE_CHANGE: largest_change = sorted(largest_change, key=lambda x: x[0], reverse=True) continue else: if len(largest_outs) < MAX_VISIBLE_OUTPUTS: rendered, _ = self.render_output(tx_out) largest_outs.append((tx_out.nValue, rendered)) if len(largest_outs) == MAX_VISIBLE_OUTPUTS: # descending sort from the biggest value to lowest (sort on out.nValue) largest_outs = sorted(largest_outs, key=lambda x: x[0], reverse=True) continue # insertion sort here = tx_out.nValue largest = largest_change if outp.is_change else largest_outs for li, (nv, txt) in enumerate(largest): if here > nv: keep = li break else: continue # too small largest.pop(-1) rendered, dest = self.render_output(tx_out) largest.insert(keep, (here, dest if outp.is_change else rendered)) # foreign outputs (soon to be other people's coins) visible_out_sum = 0 for val, txt in largest_outs: visible_out_sum += val msg.write(txt) # txt is result of render_output msg.write('\n') left = self.psbt.num_outputs - len(largest_outs) - self.psbt.num_change_outputs if left > 0: msg.write('.. plus %d smaller output(s), not shown here, which total: ' % left) # calculate left over value msg.write('%s %s\n' % self.chain.render_value( self.psbt.total_value_out - total_change - visible_out_sum)) msg.write("\n") # change outputs - verified to be coming back to our wallet if has_change: msg.write("Change back:\n%s %s\n" % self.chain.render_value(total_change)) visible_change_sum = 0 if len(largest_change) == 1: visible_change_sum += largest_change[0][0] msg.write(' - to address -\n%s\n\n' % show_single_address(largest_change[0][1])) else: msg.write(' - to addresses -\n') for val, addr in largest_change: visible_change_sum += val msg.write(show_single_address(addr)) msg.write('\n\n') left_c = self.psbt.num_change_outputs - len(largest_change) if left_c: msg.write('.. plus %d smaller change output(s), not shown here, which total: ' % left_c) msg.write('%s %s\n\n' % self.chain.render_value(total_change - visible_change_sum)) def sign_transaction(psbt_len, flags=0x0, psbt_sha=None, input_method="usb", offset=TXN_INPUT_OFFSET): # transaction (binary) loaded into PSRAM already, checksum checked UserAuthorizedAction.check_busy(ApproveTransaction) UserAuthorizedAction.active_request = ApproveTransaction( psbt_len, flags, psbt_sha=psbt_sha, input_method=input_method, offset=offset ) # kill any menu stack, and put our thing at the top abort_and_goto(UserAuthorizedAction.active_request) def psbt_encoding_taster(taste, psbt_len): # look at first 10 bytes, and detect file encoding (binary, hex, base64) # - return len is upper bound on size because of unknown whitespace from utils import HexStreamer, Base64Streamer, HexWriter, Base64Writer taste = bytes(taste) if taste[0:5] == b'psbt\xff': decoder = None output_encoder = lambda x: x elif taste[0:10].lower() == b'70736274ff': decoder = HexStreamer() output_encoder = HexWriter psbt_len //= 2 elif taste[0:6] == b'cHNidP': decoder = Base64Streamer() output_encoder = Base64Writer psbt_len = (psbt_len * 3 // 4) + 10 else: raise ValueError("not psbt") return decoder, output_encoder, psbt_len async def done_signing(psbt, tx_req, input_method=None, filename=None, output_encoder=None, slot_b=False, finalize=None): # User authorized PSBT for signing, and we added signatures. # - allow PushTX if enabled (first thing) # - can save final TXN out to SD card/VirtDisk, share by NFC, QR. from glob import PSRAM, hsm_active from sffile import SFFile from ux import show_qr_code, import_export_prompt first_time = True msg = None title = None is_complete = psbt.is_complete() if finalize is not None: # USB case - user can choose whether to attempt finalization is_complete = finalize if psbt.por322: # network txn strips PSBT BIP-32 with paths with pubkey required for verification # overrides --finalize from USB # disable pushTX for BIP-322 is_complete = False with SFFile(TXN_OUTPUT_OFFSET, max_size=MAX_TXN_LEN, message="Saving...") as psram: if is_complete: txid = psbt.finalize(psram) noun = "Finalized TX ready for broadcast" else: psbt.serialize(psram) noun = "Signed BIP-322 PSBT" if psbt.por322 else "Partly Signed PSBT" txid = None data_len = psram.tell() data_sha2 = psram.checksum.digest() if input_method == "usb": # return result over USB before going to all options tx_req.result = data_len, data_sha2 if hsm_active: # it is enough to just return back via USB, other options # are pointless return first_time = False msg = noun + " shared via USB." title = "PSBT Signed" elif input_method == "kt": first_time = False title = "PSBT Signed" if txid and await try_push_tx(data_len, txid, data_sha2): # go directly to reexport menu after pushTX first_time = False title = "TX Pushed" # for specific cases, key teleport is an option offer_kt = False if not is_complete and psbt.active_multisig and version.has_qwerty: offer_kt = 'use Key Teleport to send PSBT to other co-signers' while True: ch = None if first_time: # first time, assume they want to send out same way it came in -- don't prompt if input_method == "qr": ch = KEY_QR elif input_method == "nfc": ch = KEY_NFC else: # SD/VDisk ch = {"force_vdisk": input_method == "vdisk", "slot_b": slot_b} if not ch: # show all possible export options (based on hardware enabled, features) intro = [] key6 = None if msg: intro.append(msg) if txid: key6 = "for QR Code of TXID" intro.append('TXID:\n' + txid) # "force_prompt" is needed after first iteration as we can be Mk4, with NFC,Vdisk off, # no QR support & not finalizing (no option to show txid provided). # In that case this would just return dict and keep producing signed # files on SD infinitely (would never actually prompt). ch = await import_export_prompt(noun, intro="\n\n".join(intro), offer_kt=offer_kt, key6=key6, title=title, force_prompt=not first_time, no_qr=not version.has_qwerty) if ch == KEY_CANCEL: UserAuthorizedAction.cleanup() break elif txid and (ch == '6'): await show_qr_code(txid, is_alnum=True, force_msg=True) continue elif ch == KEY_QR: here = PSRAM.read_at(TXN_OUTPUT_OFFSET, data_len) msg = txid or noun try: if len(here) > 920: # too big for simple QR - use BBQr instead raise QRTooBigError hex_here = b2a_hex(here).upper().decode() await show_qr_code(hex_here, is_alnum=True, msg=msg) except QRTooBigError: from ux_q1 import show_bbqr_codes await show_bbqr_codes('T' if txid else 'P', here, msg) msg = noun + " shared via QR." del here elif ch == KEY_NFC: from glob import NFC if is_complete: await NFC.share_signed_txn(txid, TXN_OUTPUT_OFFSET, data_len, data_sha2) else: await NFC.share_psbt(TXN_OUTPUT_OFFSET, data_len, data_sha2) msg = noun + " shared via NFC." elif (ch == 't') and not is_complete: # they might want to teleport it, but only if we have PSBT # there is no need to teleport PSBT if txn is already complete & ready to be broadcast # updated PSBT is at TXN_OUTPUT_OFFSET (at TXN_INPUT_OFFSET is PSBT that is NOT updated) from teleport import kt_send_psbt ok = await kt_send_psbt(psbt, data_len, psbt_offset=TXN_OUTPUT_OFFSET) if ok is None: title = "Failed to Teleport" else: title = "Sent by Teleport" _, num_sigs_needed = ok if num_sigs_needed > 0: s, aux = ("", "is") if num_sigs_needed == 1 else ("s", "are") msg = "%d more signature%s %s still required." % (num_sigs_needed, s, aux) continue else: # typical case: save to SD card, show filenames we used assert isinstance(ch, dict) msg = await _save_to_disk(psbt, txid, ch, is_complete, data_len, output_encoder, filename) input_method = None first_time = False title = "PSBT Signed" async def _save_to_disk(psbt, txid, save_options, is_complete, data_len, output_encoder, filename=None): # Saving a PSBT from PSRAM to something disk-like. # - handle save-to-SD/VirtDisk cases. With re-attempt when no card, etc. assert isinstance(save_options, dict) # from import_export_prompt from glob import dis, settings, PSRAM import os dis.fullscreen("Wait...") if filename: _, basename = filename.rsplit('/', 1) base = basename.rsplit('.', 1)[0] else: base = 'recent-txn' # default encoding is binary output_encoder = output_encoder or (lambda x:x) out2_fn = None out_fn = None del_after = settings.get('del', 0) def _chunk_write(file_d, ofs, chunk=4096): written = 0 while written < data_len: if (written + chunk) > data_len: chunk = data_len - written file_d.write(PSRAM.read_at(ofs, chunk)) written += chunk ofs += chunk while 1: # try to put back into same spot, but also do top-of-card if not is_complete: # keep the filename under control during multiple passes target_fname = base.replace('-part', '') + '-part.psbt' else: # add -signed to end. We won't offer to sign again. target_fname = base + '-signed.psbt' # attempt write-out try: with CardSlot(**save_options) as card: out_full, out_fn = card.pick_filename(target_fname) out_path = out_full.rsplit("/", 1)[0] + "/" if is_complete and del_after: # don't write signed PSBT if we'd just delete it anyway out_fn = None else: with output_encoder(card.open(out_full, 'wb')) as fd: # save as updated PSBT if not is_complete: _chunk_write(fd, TXN_OUTPUT_OFFSET) else: psbt.serialize(fd) if is_complete: # write out as hex too, if it's final out2_full, out2_fn = card.pick_filename( base + '-final.txn' if not del_after else 'tmp.txn', out_path) if out2_full: with HexWriter(card.open(out2_full, 'w+t')) as fd: # save transaction, in hex if is_complete: _chunk_write(fd, TXN_OUTPUT_OFFSET) else: txid = psbt.finalize(fd) if del_after: # rename it now that we know the txid after_full, out2_fn = card.pick_filename( txid + '.txn', out_path, overwrite=True) os.rename(out2_full, after_full) if del_after and filename: # this can do nothing if they swapped SDCard between steps, which is ok, # but if the original file is still there, this blows it away. # - if not yet final, the foo-part.psbt file stays try: card.securely_blank_file(filename) except: pass # success and done! break except CardMissingError: prob = 'Need a card!\n\n' except OSError as exc: prob = 'Failed to write!\n\n%s\n\n' % exc # sys.print_exception(exc) # fall through to try again # If this point reached, some problem, we could not write. if save_options.get('force_vdisk'): await ux_show_story(prob, title='Error') # they can't fix here, so give up return # prompt them to input another card? ch = await ux_show_story( prob + "Please insert a card to receive signed transaction, " "and press OK.", title="Need Card") if ch == 'x': return # Done, show the filenames we used. if out_fn: msg = "Updated PSBT is:\n\n%s" % out_fn if out2_fn: msg += '\n\n' else: # del_after is probably set msg = '' if out2_fn: msg += 'Finalized transaction (ready for broadcast):\n\n%s' % out2_fn return msg async def sign_psbt_file(filename, force_vdisk=False, slot_b=None, just_read=False, ux_abort=False): # sign a PSBT file found on a MicroSD card # - or from VirtualDisk (mk4) # - to re-use reading/decoding logic, pass just_read from glob import dis from ux import the_ux tmp_buf = bytearray(4096) # copy file into PSRAM # - can't work in-place on the card because we want to support writing out to different card # - accepts hex or base64 encoding, but binary preferred with CardSlot(force_vdisk, readonly=True, slot_b=slot_b) as card: with card.open(filename, 'rb') as fd: dis.fullscreen('Reading...', 0) # see how long it is psbt_len = fd.seek(0, 2) fd.seek(0) # determine encoding used, altho we prefer binary taste = fd.read(10) fd.seek(0) decoder, output_encoder, psbt_len = psbt_encoding_taster(taste, psbt_len) total = 0 with SFFile(TXN_INPUT_OFFSET, max_size=psbt_len) as out: while 1: n = fd.readinto(tmp_buf) if not n: break if n == len(tmp_buf): abuf = tmp_buf else: abuf = memoryview(tmp_buf)[0:n] if not decoder: out.write(abuf) total += n else: for here in decoder.more(abuf): out.write(here) total += len(here) dis.progress_sofar(total, psbt_len) # might have been whitespace inflating initial estimate of PSBT size assert total <= psbt_len psbt_len = total if just_read: return psbt_len UserAuthorizedAction.cleanup() UserAuthorizedAction.active_request = ApproveTransaction( psbt_len, input_method="vdisk" if force_vdisk else "sd", filename=filename, output_encoder=output_encoder, ) if ux_abort: # needed for auto vdisk mode abort_and_push(UserAuthorizedAction.active_request) else: the_ux.push(UserAuthorizedAction.active_request) class RemoteBackup(UserAuthorizedAction): def __init__(self): super().__init__() # self.result ... will be (len, sha256) of the resulting file at zero async def interact(self): try: # Lead the user thru a complex UX. from backups import make_complete_backup r = await make_complete_backup(write_sflash=True) if r: # expect (length, sha) self.result = r else: self.refused = True except BaseException as exc: self.failed = "Error during backup process." #print("Backup failure: ") #sys.print_exception(exc) finally: self.done() class RemoteRestoreBackup(UserAuthorizedAction): def __init__(self, file_len, bitflag): super().__init__() self.file_len = file_len self.custom_pwd = bitflag & 1 self.plaintext = bitflag & 2 self.force_tmp = bitflag & 4 def to_words(self): # conversion to "words" argument of "restore_complete" function if self.plaintext: return None elif self.custom_pwd: return False return True def to_tmp(self): # conversion to "temporary" argument of "restore_complete" function from pincodes import pa if pa.is_secret_blank() and not self.force_tmp: # no master secret & not forcing tmp # will load backup as master seed return False, "master" # has master secret --> load backup as tmp # secret is blank but user forcing tmp return True, "temporary" async def interact(self): try: # requires confirm from user tmp, noun = self.to_tmp() if await ux_confirm("Restore uploaded backup as a %s seed?" % noun): from backups import restore_complete await restore_complete(self.file_len, tmp, self.to_words(), usb=True) else: self.refused = True except BaseException as exc: self.failed = "Error during backup restore." # sys.print_exception(exc) finally: self.done() def start_remote_backup(): # tell the local user the secret words, and then save to SPI flash # USB caller has to come back and download encrypted contents. UserAuthorizedAction.cleanup() UserAuthorizedAction.active_request = RemoteBackup() # kill any menu stack, and put our thing at the top abort_and_goto(UserAuthorizedAction.active_request) def start_remote_restore_backup(file_len, bitflag): UserAuthorizedAction.cleanup() UserAuthorizedAction.active_request = RemoteRestoreBackup(file_len, bitflag) # kill any menu stack, and put our thing at the top abort_and_goto(UserAuthorizedAction.active_request) class NewPassphrase(UserAuthorizedAction): def __init__(self, pw): super().__init__() self._pw = pw # self.result ... will be (len, sha256) of the resulting file at zero async def interact(self): # prompt them from glob import settings from pincodes import pa title = "Passphrase" escape = "yx2" + KEY_CANCEL + KEY_ENTER while 1: msg = ('BIP-39 passphrase (%d chars long) has been provided over ' 'USB connection. Should we switch to that wallet now?\n\n' 'Press %s to add passphrase ' % (len(self._pw), OK)) if pa.tmp_value: msg += "to current active temporary seed. " else: msg += "to master seed. " msg += ('Press (2) to view the provided passphrase. %s to cancel.' % X) ch = await ux_show_story(msg=msg, title=title, escape=escape, strict_escape=True) if ch == '2': await ux_show_story('Provided:\n\n%s\n\n' % self._pw, title=title) continue else: break try: if ch not in ('y'+ KEY_ENTER): # they don't want to! self.refused = True await ux_dramatic_pause("Refused.", 1) else: from seed import set_bip39_passphrase # full screen message shown: "Working..." await set_bip39_passphrase(self._pw, summarize_ux=False) self.result = settings.get('xpub') except BaseException as exc: self.failed = "Exception" # sys.print_exception(exc) finally: self.done() if self.result: new_xfp = settings.get('xfp') await ux_show_story('Above is the master key fingerprint ' 'of the current wallet.', title="[%s]" % xfp2str(new_xfp)) def start_bip39_passphrase(pw): # new passphrase has come in via USB. offer to switch to it. UserAuthorizedAction.cleanup() UserAuthorizedAction.active_request = NewPassphrase(pw) # kill any menu stack, and put our thing at the top abort_and_goto(UserAuthorizedAction.active_request) class ShowAddressBase(UserAuthorizedAction): title = 'Address:' def __init__(self, *args, **kwargs): self.restore_menu = kwargs.get("restore_menu", False) super().__init__() from glob import dis dis.fullscreen('Wait...') # this must set self.address and do other slow setup self.setup(*args) async def interact(self): # Just show the address... no real confirmation needed. from glob import hsm_active, dis, NFC if not hsm_active: msg = self.get_msg() msg += '\n\nCompare this payment address to the one shown on your other, less-trusted, software.' esc = "4" if not version.has_qwerty: if NFC: msg += ' Press (3) to share via NFC.' esc += "3" msg += ' Press (4) to view QR Code.' while 1: ch = await ux_show_story(msg, title=self.title, escape=esc, hint_icons=KEY_QR+(KEY_NFC if NFC else '')) if ch in '4'+KEY_QR: await show_qr_code(self.address, (self.addr_fmt & AFC_BECH32), is_addrs=True) continue if NFC and (ch in '3'+KEY_NFC): await NFC.share_text(self.address) continue break else: # finish the Wait... dis.progress_bar_show(1) if self.restore_menu: self.pop_menu() else: self.done() UserAuthorizedAction.cleanup() # because no results to store class ShowPKHAddress(ShowAddressBase): def setup(self, addr_fmt, subpath): self.subpath = subpath self.addr_fmt = addr_fmt with stash.SensitiveValues() as sv: node = sv.derive_path(subpath) self.address = sv.chain.address(node, addr_fmt) def get_msg(self): return '''{addr}\n\n= {sp}''' .format(addr=show_single_address(self.address), sp=self.subpath) class ShowP2SHAddress(ShowAddressBase): def setup(self, ms, addr_fmt, xfp_paths, witdeem_script): self.witdeem_script = witdeem_script self.addr_fmt = addr_fmt self.ms = ms # calculate all the pubkeys involved. self.subpath_help = ms.validate_script(witdeem_script, xfp_paths=xfp_paths) self.address = ms.chain.p2sh_address(addr_fmt, witdeem_script) def get_msg(self): return '''\ {addr} Wallet: {name} {M} of {N} Paths: {sp}'''.format(addr=show_single_address(self.address), name=self.ms.name, M=self.ms.M, N=self.ms.N, sp='\n\n'.join(self.subpath_help)) def start_show_p2sh_address(M, N, addr_format, xfp_paths, witdeem_script): # Show P2SH address to user, also returns it. # - first need to find appropriate multisig wallet associated # - they must provide full redeem script, and we will re-verify it and check pubkeys inside it from multisig import MultisigWallet try: assert addr_format in SUPPORTED_ADDR_FORMATS assert addr_format & AFC_SCRIPT except: raise AssertionError('Unknown/unsupported addr format') # Search for matching multisig wallet that we must already know about xs = list(xfp_paths) xs.sort() ms = MultisigWallet.find_match(M, N, xs) assert ms, 'Multisig wallet with those fingerprints not found' assert ms.M == M assert ms.N == N UserAuthorizedAction.check_busy(ShowAddressBase) UserAuthorizedAction.active_request = ShowP2SHAddress(ms, addr_format, xfp_paths, witdeem_script) # kill any menu stack, and put our thing at the top abort_and_goto(UserAuthorizedAction.active_request) # provide the value back to attached desktop return UserAuthorizedAction.active_request.address def show_address(addr_format, subpath, restore_menu=False): try: assert addr_format in SUPPORTED_ADDR_FORMATS assert not (addr_format & AFC_SCRIPT) except: raise AssertionError('Unknown/unsupported addr format') # require a path to a key subpath = cleanup_deriv_path(subpath) from glob import hsm_active if hsm_active and not hsm_active.approve_address_share(subpath): raise HSMDenied UserAuthorizedAction.cleanup() UserAuthorizedAction.check_busy(ShowAddressBase) UserAuthorizedAction.active_request = ShowPKHAddress(addr_format, subpath, restore_menu=restore_menu) return UserAuthorizedAction.active_request def usb_show_address(addr_format, subpath): active_request = show_address(addr_format, subpath) # kill any menu stack, and put our thing at the top abort_and_goto(active_request) # provide the value back to attached desktop return active_request.address class NewEnrollRequest(UserAuthorizedAction): def __init__(self, ms): super().__init__() self.wallet = ms # self.result ... will be re-serialized xpub async def interact(self): from multisig import MultisigOutOfSpace ms = self.wallet try: ch = await ms.confirm_import() if ch != 'y': # they don't want to! self.refused = True await ux_dramatic_pause("Refused.", 2) except MultisigOutOfSpace: return await self.failure('No space left') except BaseException as exc: self.failed = "Exception" # sys.print_exception(exc) finally: UserAuthorizedAction.cleanup() # because no results to store self.pop_menu() def maybe_enroll_xpub(sf_len=None, config=None, name=None, ux_reset=False): # Offer to import (enroll) a new multisig wallet. Allow reject by user. from glob import dis from multisig import MultisigWallet UserAuthorizedAction.cleanup() dis.fullscreen('Wait...') # needed dis.busy_bar(True) try: if sf_len: with SFFile(TXN_INPUT_OFFSET, length=sf_len) as fd: config = fd.read(sf_len).decode() try: j_conf = ujson.loads(config) assert "desc" in j_conf, "'desc' key required" config = j_conf["desc"] assert config, "'desc' empty" if "name" in j_conf: # name from json has preference over filenames and desc checksum name = j_conf["name"] assert 2 <= len(name) <= 40, "'name' length" except ValueError: pass # this call will raise on parsing errors, so let them rise up # and be shown on screen/over usb ms = MultisigWallet.from_file(config, name=name) UserAuthorizedAction.active_request = NewEnrollRequest(ms) if ux_reset: # for USB case, and import from PSBT # kill any menu stack, and put our thing at the top abort_and_goto(UserAuthorizedAction.active_request) else: # menu item case: add to stack from ux import the_ux the_ux.push(UserAuthorizedAction.active_request) finally: # always finish busy bar dis.busy_bar(False) class FirmwareUpgradeRequest(UserAuthorizedAction): def __init__(self, hdr, length, hdr_check=False, psram_offset=None): super().__init__() self.hdr = hdr self.length = length self.hdr_check = hdr_check self.psram_offset = psram_offset async def interact(self): from version import decode_firmware_header from utils import check_firmware_hdr # check header values if self.hdr_check: # when coming in via USB, this part already done # so the error can be sent back over USB port failed = check_firmware_hdr(self.hdr, self.length) if failed: await ux_show_story(failed, 'Sorry!') UserAuthorizedAction.cleanup() self.pop_menu() return # Get informed consent to upgrade. date, version, _ = decode_firmware_header(self.hdr) msg = '''\ Install this new firmware? {version} {built} Binary checksum and signature will be further verified before any changes are made. '''.format(version=version, built=date) try: ch = await ux_show_story(msg) if ch == 'y': # Accepted: # - write final file header, so bootloader will see it # - reboot to start process from glob import dis dis.fullscreen('Upgrading...', percent=1) dis.bootrom_takeover() # Mk4 copies from PSRAM to flash inside bootrom, we have # nothing to do here except start that process. from pincodes import pa pa.firmware_upgrade(self.psram_offset, self.length) # not reached, unless issue? raise RuntimeError("bootrom fail") else: # they don't want to! self.refused = True await ux_dramatic_pause("Refused.", 2) except BaseException as exc: self.failed = "Exception" # sys.print_exception(exc) finally: UserAuthorizedAction.cleanup() # because no results to store self.pop_menu() def authorize_upgrade(hdr, length, **kws): # final USB write has come in, get buy-in # Do some verification before we even show to the local user UserAuthorizedAction.check_busy() UserAuthorizedAction.active_request = FirmwareUpgradeRequest(hdr, length, **kws) # kill any menu stack, and put our thing at the top abort_and_goto(UserAuthorizedAction.active_request) class TXExplorer: def __init__(self, n, user_auth_action, max_items): self.n = n self.user_auth_action = user_auth_action self.max_items = max_items self.chain = chains.current_chain() self.qr_msgs = [] self.title = None def can_goto_idx(self): return self.max_items > 1 @classmethod async def start(cls, user_auth_action): rv = [ MenuItem("Inputs", f=TXInpExplorer(user_auth_action).explore), MenuItem("Outputs", f=TXOutExplorer(user_auth_action).explore), ] the_ux.push(MenuSystem(rv)) await the_ux.interact() def make_ux_msg(self, offset, count): from glob import dis dis.fullscreen('Wait...') esc = "4"+KEY_QR rv = "" qrs = [] change = [] end = min(offset + count, self.max_items) for idx, item in self.yield_item(offset, end, qrs, change): rv += item dis.progress_sofar(idx-offset+1, count) hints = [] if end < self.max_items: hints.append('RIGHT to see next group') esc += KEY_RIGHT + "9" if offset: hints.append('LEFT to go back') esc += KEY_LEFT + "7" if self.can_goto_idx(): hints.append("(2) to go to index") esc += "2" if not version.has_qwerty: # Q has hint key hints.append("(4) to show QR code") if hints: rv += 'Press ' + ', '.join(hints) rv += ('. %s to quit.' % X) else: rv += 'Press %s to quit.' % X return rv, qrs, change, end, esc async def explore(self, *a): # Page through unlimited-sized transaction details # - shows all outputs (including change): their address and amounts. # - shows all inputs: utxo amount and address, txid & tx index. start = 0 msg, addrs, change, end, esc = self.make_ux_msg(start, self.n) while True: ch = await ux_show_story(msg, title=self.title, hint_icons=KEY_QR, escape=esc) if ch == 'x': del msg return elif (ch in "4"+KEY_QR) and addrs: from ux import show_qr_codes # showing addresses from PSBT, no idea what is in there # handle QR code failures gracefully await show_qr_codes(addrs, False, start, is_addrs=True, change_idxs=change, can_raise=False, qr_msgs=self.qr_msgs, no_index=bool(self.qr_msgs)) continue elif ch in (KEY_LEFT+"7"): if not start: continue # 0 start = max(start - self.n, 0) elif ch in (KEY_RIGHT+"9"): if (start + self.n) >= self.max_items: continue else: # go forwards start += self.n elif (ch == "2") and (self.max_items > 1): max_v = self.max_items - 1 res = await ux_enter_number("Start Idx (0-%d):" % max_v, max_value=max_v) if res is None: continue start = res else: # nothing changed - do not recalc msg continue msg, addrs, change, end, esc = self.make_ux_msg(start, self.n) class TXOutExplorer(TXExplorer): def __init__(self, user_auth_action): super().__init__(10, user_auth_action, user_auth_action.psbt.num_outputs) def yield_item(self, offset, end, qr_items, change_idxs): # showing 10 outputs per UX page (just address/script + whether change) self.title = "%d-%d" % (offset, end - 1) for i, (idx, out) in enumerate(self.user_auth_action.psbt.output_iter(offset, end)): outp = self.user_auth_action.psbt.outputs[idx] item = "Output %d%s:\n\n" % (idx, " (change)" if outp.is_change else "") msg, addr_or_script = self.user_auth_action.render_output(out) item += msg qr_items.append(addr_or_script) if outp.is_change: change_idxs.append(i) item += "\n" yield idx, item class TXInpExplorer(TXExplorer): def __init__(self, user_auth_action): super().__init__(1, user_auth_action, user_auth_action.psbt.num_inputs) self.qr_msgs = ["TXID", "UTXO ADDR"] def yield_item(self, offset, end, qr_items, change_idxs): # showing just one input per UX page i, (idx, txin) = next(enumerate(self.user_auth_action.psbt.input_iter(offset, offset+1))) self.title = "Input %d" % idx inp = self.user_auth_action.psbt.inputs[idx] txid = b2a_hex(ser_uint256(txin.prevout.hash)).decode() qr_items.append(txid) item = "%s:%d\n\n" % (txid, txin.prevout.n) has_utxo = inp.has_utxo() if has_utxo: utxo = inp.get_utxo(txin.prevout.n) spk = b2a_hex(utxo.scriptPubKey).decode() try: addr = self.chain.render_address(utxo.scriptPubKey) except: # some script we do not understand addr = None val, unit = self.chain.render_value(utxo.nValue) item += "=== UTXO ===\n\n%s %s\n\n%s\n\n" % (val, unit, spk) if addr: item += show_single_address(addr) + "\n\n" qr_items.append(addr) if inp.addr_fmt is not None: item += "Address Format: %s\n\n" % chains.addr_fmt_str(inp.addr_fmt) if self.user_auth_action.psbt.txn_version >= 2: has_rtl = inp.has_relative_timelock(txin) if has_rtl: if has_rtl[0]: val = seconds2human_readable(has_rtl[1]) msg = "time-based timelock of:\n %s" % val else: msg = "block height timelock of %d blocks" % (has_rtl[1]) item += "Input has relative %s\n\n" % msg psbt_item = "" if inp.required_key: ws = self.user_auth_action.psbt.wif_store our = [inp.required_key] if isinstance(inp.required_key, bytes) else inp.required_key psbt_item += "Our key%s:\n\n" % ("s" if len(our) > 1 else "") wif_note = "(WIF Store)" for k in our: pubkey = b2a_hex(k).decode() pth = inp.subpaths.get(k) note = "" if pth: label = keypath_to_str(pth, prefix="%s/" % xfp2str(pth[0])) if ws and k in ws: note = "\n" + wif_note psbt_item += "%s:\n%s%s\n\n" % (label, pubkey, note) else: psbt_item += "%s\n%s\n\n" % (pubkey, wif_note) if inp.is_multisig: ks_coord = inp.witness_script or inp.redeem_script if ks_coord: ks = self.user_auth_action.psbt.get(ks_coord) from multisig import disassemble_multisig_mn try: M, N = disassemble_multisig_mn(ks) psbt_item += "Multisig: %dof%d\n\n" % (M, N) except: pass if inp.part_sigs: # do not show XFPs in case input is fully signed --> elif # only part_sig should be available, as we haven't signed yet so added_sigs empty done = [] for pk, pth in inp.subpaths.items(): if pk in inp.part_sigs: done.append(xfp2str(pth[0])) if inp.fully_signed: psbt_item += "Input fully signed.\n\n" else: psbt_item += "Already signed:\n" for xfp in done: psbt_item += " %s\n" % xfp psbt_item += "\n" if inp.sighash and (inp.sighash != SIGHASH_ALL): # only show sighash value to the user if it is non-standard psbt_item += "sighash: %s\n\n" % { 1: "ALL", 2: "NONE", 3: "SINGLE", 1 | 0x80: "ALL|ANYONECANPAY", 2 | 0x80: "NONE|ANYONECANPAY", 3 | 0x80: "SINGLE|ANYONECANPAY", }.get(inp.sighash, "0x%02x (non-standard)" % inp.sighash) if psbt_item: psbt_item = "=== PSBT ===\n\n" + psbt_item item += psbt_item yield idx, item # EOF