# (c) Copyright 2018 by Coinkite Inc. This file is covered by license found in COPYING-CC. # # psbt.py - understand PSBT file format: verify and generate them # import stash, gc, history, sys, ngu, ckcc, chains from ustruct import unpack_from, unpack, pack from ubinascii import hexlify as b2a_hex from utils import xfp2str, B2A, keypath_to_str from utils import seconds2human_readable, datetime_from_timestamp, datetime_to_str, node_from_privkey from chains import NLOCK_IS_TIME from uhashlib import sha256 from uio import BytesIO from sffile import SizerFile from multisig import MultisigWallet, disassemble_multisig_mn from exceptions import FatalPSBTIssue, FraudulentChangeOutput from serializations import ser_compact_size, deser_compact_size, hash160 from serializations import CTransaction, CTxIn, CTxInWitness, CTxOut, ser_string, COutPoint from serializations import ser_sig_der, uint256_from_str, ser_push_data from serializations import SIGHASH_ALL, SIGHASH_SINGLE, SIGHASH_NONE, SIGHASH_ANYONECANPAY from serializations import ALL_SIGHASH_FLAGS from opcodes import OP_CHECKMULTISIG, OP_RETURN from glob import settings from wif import WIFStore from public_constants import ( PSBT_GLOBAL_UNSIGNED_TX, PSBT_GLOBAL_XPUB, PSBT_IN_NON_WITNESS_UTXO, PSBT_IN_WITNESS_UTXO, PSBT_IN_PARTIAL_SIG, PSBT_IN_SIGHASH_TYPE, PSBT_IN_REDEEM_SCRIPT, PSBT_IN_WITNESS_SCRIPT, PSBT_IN_BIP32_DERIVATION, PSBT_IN_FINAL_SCRIPTSIG, PSBT_IN_FINAL_SCRIPTWITNESS, PSBT_OUT_REDEEM_SCRIPT, PSBT_OUT_WITNESS_SCRIPT, PSBT_OUT_BIP32_DERIVATION, PSBT_OUT_SCRIPT, PSBT_OUT_AMOUNT, PSBT_GLOBAL_VERSION, PSBT_GLOBAL_TX_MODIFIABLE, PSBT_GLOBAL_OUTPUT_COUNT, PSBT_GLOBAL_INPUT_COUNT, PSBT_GLOBAL_FALLBACK_LOCKTIME, PSBT_GLOBAL_TX_VERSION, PSBT_IN_PREVIOUS_TXID, PSBT_IN_OUTPUT_INDEX, PSBT_IN_SEQUENCE, PSBT_IN_REQUIRED_TIME_LOCKTIME, PSBT_IN_REQUIRED_HEIGHT_LOCKTIME, PSBT_GLOBAL_GENERIC_SIGNED_MESSAGE, MAX_PATH_DEPTH, MAX_SIGNERS, AF_P2WSH_P2SH, AF_P2TR, AF_P2WSH, AF_P2SH, AF_CLASSIC, AF_P2WPKH_P2SH, AF_P2WPKH, AF_BARE_PK ) # transaction version error TX_VER_ERR = "bad txn version" NO_KEY_ERR = "None of the keys involved in this transaction belong to this Coldcard" # single sha256 of b'BIP0322-signed-message' BIP322_TAG_HASH = b'te\x84\xa1\x87/\xa1\x00AUN\xff\xa08\xd6\x12IB\xddy\xb4\xe5\x8aL\xda\x18N\x13\xdb\xe6,I' def build_bip322_to_spend(msg_hash, message_challenge): to_spend = CTransaction() to_spend.nVersion = 0 to_spend.vin = [CTxIn(COutPoint(hash=0, n=0xffffffff), scriptSig=b'\x00\x20' + msg_hash)] to_spend.vout = [CTxOut(0, message_challenge)] return to_spend # PSBT proprietary keytype PSBT_PROPRIETARY = const(0xFC) # PSBT proprietary identifier for Coinkite applications PSBT_PROP_CK_ID = b"COINKITE" # PSBT proprietary subtype for attestation entries PSBT_ATTESTATION_SUBTYPE = const(0) # Max miner's fee, as percentage of output value, that we will allow to be signed. # Amounts over 5% are warned regardless. DEFAULT_MAX_FEE_PERCENTAGE = const(10) # print some things, sometimes DEBUG = ckcc.is_simulator() class HashNDump: def __init__(self, d=None): self.rv = sha256() print('Hashing: ', end='') if d: self.update(d) def update(self, d): print(b2a_hex(d), end=' ') self.rv.update(d) def digest(self): print(' END') return self.rv.digest() def seq_to_str(seq): # take a set or list of numbers and show a tidy list in order. return ', '.join(str(i) for i in sorted(seq)) def _skip_n_objs(fd, n, cls): # skip N sized objects in the stream, for example a vectors of CTxIns # - returns starting position if cls == 'CTxIn': # output point(hash, n) + script sig + locktime pat = [32+4, None, 4] elif cls == 'CTxOut': # nValue + Script pat = [8, None] else: raise ValueError(cls) rv = fd.tell() for i in range(n): for p in pat: if p is None: # variable-length part sz = deser_compact_size(fd) fd.seek(sz, 1) else: fd.seek(p, 1) return rv def calc_txid(fd, poslen, body_poslen=None): # Given the (pos,len) of a transaction in a file, return the txid for that txn. # - doesn't validate data # - does detect witness txn vs. old style # - simple double-sha256() if old style txn, otherwise witness data must be carefully skipped # see if witness encoding in effect fd.seek(poslen[0]) txn_version, marker, flags = unpack(" ll: here = ll rv.update(memoryview(tmp)[0:here]) ll -= here if hasher: return return ngu.hash.sha256s(rv.digest()) def decode_prop_key(key): # decodes a proprietary (0xFC) key and breaks it down into: # - identifier # - subtype # - keydata with BytesIO(key) as fd: identifier_len = deser_compact_size(fd) identifier = fd.read(identifier_len) subtype = deser_compact_size(fd) keydata = fd.read() return identifier, subtype, keydata def encode_prop_key(identifier, subtype, keydata = b''): # encodes a proprietary (0xFC) key into bytes key = b'' key += ser_compact_size(len(identifier)) key += identifier key += ser_compact_size(subtype) key += keydata return key class psbtProxy: # store offsets to values, but track the keys in-memory. short_values = () no_keys = () # these fields will return None but are not stored unless a value is set blank_flds = ('unknown', ) def __init__(self): self.fd = None def __getattr__(self, nm): if nm in self.blank_flds: return None raise AttributeError(nm) def parse(self, fd): self.fd = fd while 1: ks = deser_compact_size(fd) if ks is None: break if ks == 0: break key = fd.read(ks) vs = deser_compact_size(fd) assert vs is not None, 'eof' kt = key[0] if kt in self.no_keys: assert len(key) == 1 # not expecting key # storing offset and length only! Mostly. if kt in self.short_values: actual = fd.read(vs) self.store(kt, bytes(key), actual) else: # skip actual data for now # TODO: could this be stored more compactly? proxy = (fd.tell(), vs) fd.seek(vs, 1) self.store(kt, bytes(key), proxy) def write(self, out_fd, ktype, val, key=b''): # serialize helper: write w/ size and key byte out_fd.write(ser_compact_size(1 + len(key))) out_fd.write(bytes([ktype]) + key) if isinstance(val, tuple): (pos, ll) = val out_fd.write(ser_compact_size(ll)) self.fd.seek(pos) while ll: t = self.fd.read(min(64, ll)) out_fd.write(t) ll -= len(t) elif isinstance(val, list): # for subpaths lists (LE32 ints) assert ktype in (PSBT_IN_BIP32_DERIVATION, PSBT_OUT_BIP32_DERIVATION) out_fd.write(ser_compact_size(len(val) * 4)) for i in val: out_fd.write(pack(' [xfp, *path] # - will be single entry for non-p2sh ins and outs if not self.subpaths: return 0 if self.num_our_keys is not None: # already been here once return self.num_our_keys num_ours = 0 for pk in self.subpaths: assert len(pk) in {33, 65}, "hdpath pubkey len" if len(pk) == 33: assert pk[0] in {0x02, 0x03}, "uncompressed pubkey" vl = self.subpaths[pk][1] assert (vl % 4) == 0, 'corrupt key path' assert (vl//4) <= MAX_PATH_DEPTH, 'too deep' # promote to a list of ints v = self.get(self.subpaths[pk]) here = list(unpack_from('<%dI' % (vl//4), v)) # Tricky & Useful: if xfp of zero is observed in file, assume that's a # placeholder for my XFP value. Replace on the fly. Great when master # XFP is unknown because PSBT built from derived XPUB only. Also privacy. if here[0] == 0: here[0] = my_xfp if not any(True for k,_ in parent.warnings if 'XFP' in k): parent.warnings.append(('Zero XFP', 'Assuming XFP of zero should be replaced by correct XFP')) # update in place self.subpaths[pk] = here if here[0] == my_xfp: num_ours += 1 elif pk in parent.wif_store: num_ours += 1 else: # Address that isn't based on my seed; might be another leg in a p2sh, # or an input we're not supposed to be able to sign... and that's okay. pass self.num_our_keys = num_ours return num_ours # Track details of each output of PSBT # class psbtOutputProxy(psbtProxy): no_keys = { PSBT_OUT_REDEEM_SCRIPT, PSBT_OUT_WITNESS_SCRIPT } blank_flds = ('unknown', 'subpaths', 'redeem_script', 'witness_script', 'is_change', 'num_our_keys', 'amount', 'script', 'attestation') def __init__(self, fd, idx): super().__init__() # things we track #self.subpaths = None # a dictionary if non-empty #self.redeem_script = None #self.witness_script = None #self.script = None #self.amount = None # this flag is set when we are assuming output will be change (same wallet) #self.is_change = False self.parse(fd) def store(self, kt, key, val): # do not forget that key[0] includes kt (type) if kt == PSBT_OUT_BIP32_DERIVATION: if not self.subpaths: self.subpaths = {} self.subpaths[key[1:]] = val elif kt == PSBT_OUT_REDEEM_SCRIPT: self.redeem_script = val elif kt == PSBT_OUT_WITNESS_SCRIPT: self.witness_script = val elif kt == PSBT_OUT_SCRIPT: self.script = val elif kt == PSBT_OUT_AMOUNT: self.amount = val elif kt == PSBT_PROPRIETARY: prefix, subtype, keydata = decode_prop_key(key[1:]) # examine only Coinkite proprietary keys if prefix == PSBT_PROP_CK_ID: if subtype == PSBT_ATTESTATION_SUBTYPE: # prop key for attestation does not have keydata because the # value is a recoverable signature (already contains pubkey) self.attestation = self.get(val) else: self.unknown = self.unknown or {} if key in self.unknown: raise FatalPSBTIssue("Duplicate key. Key for unknown value already provided in output.") self.unknown[key] = val def serialize(self, out_fd, is_v2): wr = lambda *a: self.write(out_fd, *a) if self.subpaths: for k in self.subpaths: wr(PSBT_OUT_BIP32_DERIVATION, self.subpaths[k], k) if self.redeem_script: wr(PSBT_OUT_REDEEM_SCRIPT, self.redeem_script) if self.witness_script: wr(PSBT_OUT_WITNESS_SCRIPT, self.witness_script) if is_v2: wr(PSBT_OUT_SCRIPT, self.script) wr(PSBT_OUT_AMOUNT, self.amount) if self.attestation: wr(PSBT_PROPRIETARY, self.attestation, encode_prop_key(PSBT_PROP_CK_ID, PSBT_ATTESTATION_SUBTYPE)) if self.unknown: for k, v in self.unknown.items(): wr(k[0], v, k[1:]) def validate(self, out_idx, txo, my_xfp, active_multisig, parent): # Do things make sense for this output? # NOTE: We might think it's a change output just because the PSBT # creator has given us a key path. However, we must be **very** # careful and fully validate all the details. # - no output info is needed, in general, so # any output info provided better be right, or fail as "fraud" # - full key derivation and validation is done during signing, and critical. # - we raise fraud alarms, since these are not innocent errors # num_ours = self.parse_subpaths(my_xfp, parent) # - must match expected address for this output, coming from unsigned txn af, addr_or_pubkey, is_segwit = txo.get_address() if (num_ours == 0) or (af in [AF_P2TR, OP_RETURN, None]): # num_ours == 0 # - not considered fraud because other signers looking at PSBT may have them # - user will see them as normal outputs, which they are from our PoV. # OP_RETURN # - nothing we can do with anchor outputs # UNKNOWN # - scripts that we do not understand # P2TR # - unsupported, will be properly rendered as address (no change check) return af if len(self.subpaths) == 1: # p2pk, p2pkh, p2wpkh cases expect_pubkey, = self.subpaths.keys() else: # p2wsh/p2sh cases need full set of pubkeys, and therefore redeem script expect_pubkey = None if af == AF_BARE_PK: # output is public key (not a hash, much less common) assert len(addr_or_pubkey) in (33, 65) # compressed or uncompressed if addr_or_pubkey != expect_pubkey: raise FraudulentChangeOutput(out_idx, "P2PK change output is fraudulent") self.is_change = True return af # Figure out what the hashed addr should be pkh = addr_or_pubkey if af in [AF_P2SH, AF_P2WSH]: # P2SH or Multisig output # Can be both, or either one depending on address type redeem_script = self.get(self.redeem_script) if self.redeem_script else None witness_script = self.get(self.witness_script) if self.witness_script else None if expect_pubkey: # num_ours == 1 and len(subpaths) == 1, single sig, we only allow p2sh-p2wpkh if not redeem_script: # Perhaps an omission, so let's not call fraud on it # But definitely required, else we don't know what script we're sending to. raise FatalPSBTIssue("Missing redeem script for output #%d" % out_idx) target_spk, _ = chains.current_chain().script_pubkey(AF_P2WPKH_P2SH, pubkey=expect_pubkey) if not is_segwit and len(redeem_script) == 22 and \ redeem_script[0] == 0 and redeem_script[1] == 20 and \ txo.scriptPubKey == target_spk: # it's actually segwit p2wpkh inside p2sh pkh = redeem_script[2:22] expect_pkh = hash160(expect_pubkey) else: # unknown or wrong script # p2sh-p2pkh also fall into this category expect_pkh = None else: # Multisig change output, for wallet we're supposed to be a part of. # - our key must be part of it # - must look like input side redeem script (same fingerprints) # - assert M/N structure of output to match any inputs we have signed in PSBT! # - assert all provided pubkeys are in redeem script, not just ours # - we get all of that by re-constructing the script from our wallet details if not redeem_script and not witness_script: # Perhaps an omission, so let's not call fraud on it # But definitely required, else we don't know what script we're sending to. raise FatalPSBTIssue( "Missing redeem/witness script for multisig output #%d" % out_idx ) # it cannot be change if it doesn't precisely match our multisig setup if not active_multisig: # - might be a p2sh output for another wallet that isn't us # - not fraud, just an output with more details than we need. self.is_change = False return af if MultisigWallet.disable_checks: # Without validation, we have to assume all outputs # will be taken from us, and are not really change. self.is_change = False return af if (af == AF_P2SH) and (redeem_script and witness_script) and \ (len(redeem_script) == 34) and \ (redeem_script[0]) == 0 and (redeem_script[1] == 32): # can also check if redeem script hashes to hash160 and compare with scriptPubKey af = AF_P2WSH_P2SH # no need to proceed to script verification if address format does not match if af != active_multisig.addr_fmt: self.is_change = False return af # redeem script must be exactly what we expect # - pubkeys will be reconstructed from derived paths here # - BIP-45, BIP-67 rules applied (BIP-67 optional from now - depending on imported descriptor) # - p2sh-p2wsh needs witness script here, not redeem script value # - if details provided in output section, must our match multisig wallet try: active_multisig.validate_script(witness_script or redeem_script, subpaths=self.subpaths) except BaseException as exc: raise FraudulentChangeOutput(out_idx, "P2WSH or P2SH change output script: %s" % exc) if is_segwit: # p2wsh case # - need witness script and check it's hash against proposed p2wsh value assert len(addr_or_pubkey) == 32 expect_wsh = ngu.hash.sha256s(witness_script) if expect_wsh != addr_or_pubkey: raise FraudulentChangeOutput(out_idx, "P2WSH witness script has wrong hash") self.is_change = True return af if witness_script: # p2sh-p2wsh case (because it had witness script) expect_rs = b'\x00\x20' + ngu.hash.sha256s(witness_script) if redeem_script and expect_rs != redeem_script: # iff they provide a redeeem script, then it needs to match # what we expect it to be raise FraudulentChangeOutput(out_idx, "P2SH-P2WSH redeem script provided, and doesn't match") expect_pkh = hash160(expect_rs) else: # old BIP-16 style; looks like payment addr expect_pkh = hash160(redeem_script) elif af in [AF_CLASSIC, AF_P2WPKH]: # input is hash160 of a single public key assert len(addr_or_pubkey) == 20 expect_pkh = hash160(expect_pubkey) else: # we don't know how to "solve" this type of input return af if pkh != expect_pkh: raise FraudulentChangeOutput(out_idx, "Change output is fraudulent") # We will check pubkey value at the last second, during signing. self.is_change = True return af # Track details of each input of PSBT # class psbtInputProxy(psbtProxy): # just need to store a simple number for these short_values = { PSBT_IN_SIGHASH_TYPE } # only part-sigs have a key to be stored. no_keys = { PSBT_IN_NON_WITNESS_UTXO, PSBT_IN_WITNESS_UTXO, PSBT_IN_SIGHASH_TYPE, PSBT_IN_REDEEM_SCRIPT, PSBT_IN_WITNESS_SCRIPT, PSBT_IN_FINAL_SCRIPTSIG, PSBT_IN_FINAL_SCRIPTWITNESS } blank_flds = ( 'unknown', 'utxo', 'witness_utxo', 'sighash', 'redeem_script', 'witness_script', 'fully_signed', 'is_segwit', 'is_multisig', 'is_p2sh', 'num_our_keys', 'required_key', 'scriptSig', 'amount', 'scriptCode', 'previous_txid', 'prevout_idx', 'sequence', 'req_time_locktime', 'req_height_locktime', 'addr_fmt', 'wif_redeem_script', ) def __init__(self, fd, idx): super().__init__() #self.utxo = None #self.witness_utxo = None self.part_sigs = {} self.added_sigs = {} # signature that CC added (clearly separated from what can be already in part_sigs) #self.sighash = None self.subpaths = {} # will typically be non-empty for all inputs #self.redeem_script = None #self.witness_script = None # Non-zero if one or more of our signing keys involved in input #self.num_our_keys = None # things we've learned #self.fully_signed = False # we can't really learn this until we take apart the UTXO's scriptPubKey #self.is_segwit = None #self.is_multisig = None #self.is_p2sh = False #self.required_key = None # which of our keys will be used to sign input #self.scriptSig = None #self.amount = None #self.scriptCode = None # only expected for segwit inputs # after signing, we'll have a signature to add to output PSBT #self.previous_txid = None #self.prevout_idx = None #self.sequence = None #self.req_time_locktime = None #self.req_height_locktime = None #self.addr_fmt = None address format as decided by determine_my signing key self.parse(fd) def has_relative_timelock(self, txin): # https://github.com/bitcoin/bips/blob/master/bip-0068.mediawiki SEQUENCE_LOCKTIME_DISABLE_FLAG = (1 << 31) SEQUENCE_LOCKTIME_TYPE_FLAG = (1 << 22) SEQUENCE_LOCKTIME_MASK = 0x0000ffff SEQUENCE_LOCKTIME_GRANULARITY = 9 is_timebased = False if txin.nSequence & SEQUENCE_LOCKTIME_DISABLE_FLAG: # RTL disabled return if txin.nSequence & SEQUENCE_LOCKTIME_TYPE_FLAG: # Time-based relative lock-time is_timebased = True res = (txin.nSequence & SEQUENCE_LOCKTIME_MASK) << SEQUENCE_LOCKTIME_GRANULARITY else: # Block height relative lock-time res = txin.nSequence & SEQUENCE_LOCKTIME_MASK if res == 0: # any locktime that is zero, regardless of MPT or blocks # is always immediately spendable return return is_timebased, res def validate(self, idx, txin, my_xfp, parent): # Validate this txn input: given deserialized CTxIn and maybe witness # TODO: tighten these if self.witness_script: assert self.witness_script[1] >= 30 if self.redeem_script: assert self.redeem_script[1] >= 22 # require path for each addr, check some are ours # rework the pubkey => subpath mapping self.parse_subpaths(my_xfp, parent) if self.part_sigs: # How complete is the set of signatures so far? # - assuming PSBT creator doesn't give us extra data not required # - seems harmless if they fool us into thinking already signed; we do nothing # - could also look at pubkey needed vs. sig provided # - structure of MofN is considered in determine_my_signing_key where fully_signed is updated self.fully_signed = (len(self.part_sigs) >= len(self.subpaths)) else: # No signatures at all yet for this input (typical non multisig) self.fully_signed = False if self.utxo: # Important: they might be trying to trick us with an un-related # funding transaction (UTXO) that does not match the input signature we're making # (but if it's segwit, the ploy wouldn't work, Segwit FtW) # - challenge: it's a straight dsha256() for old serializations, but not for newer # segwit txn's... plus I don't want to deserialize it here. try: observed = uint256_from_str(calc_txid(self.fd, self.utxo)) except: raise AssertionError("Trouble parsing UTXO given for input #%d" % idx) assert txin.prevout.hash == observed, "utxo hash mismatch for input #%d" % idx def handle_none_sighash(self): if self.sighash is None: self.sighash = SIGHASH_ALL def has_utxo(self): # do we have a copy of the corresponding UTXO? return bool(self.utxo) or bool(self.witness_utxo) def guess_multisig_addr_fmt(self): # based on provided input scripts (witness/redeem) if self.witness_script and not self.redeem_script: return AF_P2WSH elif self.witness_script and self.redeem_script: return AF_P2WSH_P2SH else: return AF_P2SH def get_utxo(self, idx): # Load up the TxOut for specific output of the input txn associated with this in PSBT # Aka. the "spendable" for this input #. # - preserve the file pointer # - nValue needed for total_value_in, but all fields needed for signing # fd = self.fd old_pos = fd.tell() if self.utxo: # skip over all the parts of the txn we don't care about, without # fully parsing it... pull out a single TXO fd.seek(self.utxo[0]) _, marker, flags = unpack("= M: self.fully_signed = True return xfp_paths = list(subpaths.values()) xfp_paths.sort() # only search wallets with correct script type (aka address format) if not psbt.active_multisig: # search for multisig wallet wal = MultisigWallet.find_match(M, N, xfp_paths, [self.addr_fmt]) if not wal: raise FatalPSBTIssue('Unknown multisig wallet') psbt.active_multisig = wal else: # check consistent w/ already selected wallet psbt.active_multisig.assert_matching(M, N, xfp_paths, self.addr_fmt) # validate redeem script, by disassembling it and checking all pubkeys try: psbt.active_multisig.validate_script(redeem_script, subpaths=subpaths) target_spk, _ = chains.current_chain().script_pubkey(self.addr_fmt, script=redeem_script) assert target_spk == utxo.scriptPubKey, "spk mismatch" except BaseException as exc: # sys.print_exception(exc) raise FatalPSBTIssue('Input #%d: %s' % (my_idx, exc)) if not which_key and DEBUG: print("no key: input #%d: type=%s segwit=%d a_or_pk=%s scriptPubKey=%s" % ( my_idx, chains.addr_fmt_str(self.addr_fmt), self.is_segwit or 0, b2a_hex(addr_or_pubkey), b2a_hex(utxo.scriptPubKey))) self.required_key = which_key if self.is_segwit: if self.addr_fmt in [AF_P2WPKH, AF_P2WPKH_P2SH]: # This comment from : # # Please note that for a P2SH-P2WPKH, the scriptCode is always 26 # bytes including the leading size byte, as 0x1976a914{20-byte keyhash}88ac, # NOT the redeemScript nor scriptPubKey # # Also need this scriptCode for native segwit p2pkh # assert not self.is_multisig self.scriptCode = b'\x19\x76\xa9\x14' + addr + b'\x88\xac' elif not self.scriptCode: # Segwit P2SH. We need the witness script to be provided. if not self.witness_script: raise FatalPSBTIssue('Need witness script for input #%d' % my_idx) # "scriptCode is witnessScript preceeded by a # compactSize integer for the size of witnessScript" self.scriptCode = ser_string(self.get(self.witness_script)) # Could probably free self.subpaths and self.redeem_script now, but only if we didn't # need to re-serialize as a PSBT. def store(self, kt, key, val): # Capture what we are interested in. if kt == PSBT_IN_NON_WITNESS_UTXO: self.utxo = val elif kt == PSBT_IN_WITNESS_UTXO: self.witness_utxo = val elif kt == PSBT_IN_PARTIAL_SIG: self.part_sigs[key[1:]] = val elif kt == PSBT_IN_BIP32_DERIVATION: self.subpaths[key[1:]] = val elif kt == PSBT_IN_REDEEM_SCRIPT: self.redeem_script = val elif kt == PSBT_IN_WITNESS_SCRIPT: self.witness_script = val elif kt == PSBT_IN_SIGHASH_TYPE: self.sighash = unpack(' 0, "no ins?" self.num_inputs = num_in # all the ins are in sequence starting at this position self.vin_start = _skip_n_objs(fd, num_in, 'CTxIn') # next is outputs self.num_outputs = deser_compact_size(fd) self.vout_start = _skip_n_objs(fd, self.num_outputs, 'CTxOut') end_pos = sum(self.txn) # remainder is the witness data, and then the lock time if self.had_witness: # we'll need to come back to this pos if we # want to read the witness data later. self.wit_start = _skip_n_objs(fd, num_in, 'CTxInWitness') # we are at end of outputs, and no witness data, so locktime is here self._lock_time = unpack("= 1 xfp_paths.append(h) if h[0] == self.my_xfp: has_mine += 1 if not has_mine: raise FatalPSBTIssue('My XFP not involved') candidates = MultisigWallet.find_candidates(xfp_paths) if len(candidates) == 1: # exact match (by xfp+deriv set) .. normal case self.active_multisig = candidates[0] else: # don't want to guess M if not needed, but we need it af, M, N = self.guess_M_of_N() if not N: # not multisig, but we can still verify: # - XFP should be one of ours (checked above). # - too slow to re-derive it here, so nothing more to validate at this point return assert N == len(xfp_paths) for c in candidates: if c.M == M and c.N == N: self.active_multisig = c break # if not active_multisig set in this loop # appropriate candidate was not found # --> continue to import from psbt prompt del candidates if not self.active_multisig: # Maybe create wallet, for today, forever, or fail, etc. proposed, need_approval = MultisigWallet.import_from_psbt(af, M, N, self.xpubs) if need_approval: # do a complex UX sequence, which lets them save new wallet from glob import hsm_active if hsm_active: raise FatalPSBTIssue("MS enroll not allowed in HSM mode") ch = await proposed.confirm_import() if ch != 'y': raise FatalPSBTIssue("Refused to import new wallet") self.active_multisig = proposed else: # Validate good match here. The xpubs must be exactly right, but # we're going to use our own values from setup time anyway and not trusting # new values without user interaction. # Check: # - chain codes match what we have stored already # - pubkey vs. path will be checked later # - xfp+path already checked above when selecting wallet # Any issue here is a fraud attempt in some way, not innocent. self.active_multisig.validate_psbt_xpubs(self.xpubs) if not self.active_multisig: # not clear if an error... might be part-way to importing, and # the data is optional anyway, etc. If they refuse to import, # we should not reach this point (ie. raise something to abort signing) return def ux_relative_timelocks(self, tb, bb): # visualize 10 largest timelock to user # when signing a tx MAX_SHOW = 10 num_tb = len(tb) num_bb = len(bb) if (num_tb + num_bb) > MAX_SHOW: # 10 from each is enough for us to have in memory tb = sorted(tb, key=lambda item: item[1], reverse=True)[:10] bb = sorted(bb, key=lambda item: item[1], reverse=True)[:10] if (num_tb >= 5) and (num_bb >= 5): # 5 biggest from each tb = tb[:5] bb = bb[:5] else: if num_tb < num_bb: tb = tb[:num_tb] bb = bb[:(MAX_SHOW - num_tb)] else: bb = bb[:num_bb] tb = tb[:(MAX_SHOW - num_bb)] if num_bb: # Block height relative lock-time if num_bb == 1: idx, val = bb[0] msg = "Input %d. has relative block height timelock of %d blocks\n" % ( idx, val ) elif all(bb[0][1] == i[1] for i in bb): msg = "%d inputs have relative block height timelock of %d blocks\n" % ( num_bb, bb[0][1] ) else: msg = "%d inputs have relative block height timelock." % num_bb if num_bb > len(bb): msg += " Showing only %d with highest values." % len(bb) msg += "\n\n" for idx, num_blocks in bb: msg += " %d. %d blocks\n" % (idx, num_blocks) self.ux_notes.append(("Block height RTL", msg)) if num_tb: # Block height relative lock-time if num_tb == 1: idx, val = tb[0] val = seconds2human_readable(val) msg = "Input %d. has relative time-based timelock of:\n %s\n" % ( idx, val ) elif all(tb[0][1] == i[1] for i in tb): msg = "%d inputs have relative time-based timelock of:\n %s\n" % ( num_tb, seconds2human_readable(tb[0][1]) ) else: msg = "%d inputs have relative time-based timelock." % num_tb if num_tb > len(tb): msg += " Showing only %d with highest values." % len(tb) msg += "\n\n" for idx, seconds in tb: hr = seconds2human_readable(seconds) msg += " %d. %s\n" % (idx, hr) self.ux_notes.append(("Time-based RTL", msg)) async def validate(self): # Do a first pass over the txn. Raise assertions, be terse tho because # these messages are rarely seen. These are syntax/fatal errors. # if self.version is not None: # verision is provided in PSBT - take it as given assert self.version in (0,2) else: # PSBT version is not defined # global unsigned tx is only allowed in v0 self.version = 2 if self.txn is None else 0 self.is_v2 = self.version is not None and self.version >= 2 if self.is_v2: assert self.has_gic, "v2 requires global input count" assert self.has_goc, "v2 requires global output count" assert self.has_gtv, "v2 requires global txn version" assert self.txn is None, "v2 requires exclusion of global unsigned tx" else: assert not self.has_gic, "v0 requires exclusion of global input count" assert not self.has_goc, "v0 requires exclusion of global output count" assert not self.has_gtv, "v0 requires exclusion of global txn version" assert self.txn, "v0 requires inclusion of global unsigned tx" # smallest possible Proof of Reserves transaction has 61 bytes assert self.txn[1] > 60, 'txn too short' assert self.fallback_locktime is None, "v0 requires exclusion of global fallback locktime" assert self.txn_modifiable is None, "v0 requires exclusion of global txn modifiable" num_outs = 0 null_data_op_return = False for idx, txo in self.output_iter(): num_outs += 1 out = self.outputs[idx] if self.is_v2: # v2 requires inclusion assert out.amount is not None assert out.script else: # v0 requires exclusion assert out.amount is None assert out.script is None if txo.nValue == 0 and txo.scriptPubKey == b'\x6a': null_data_op_return = True if null_data_op_return and (num_outs == 1): assert self.por322_msg, "msg" self.por322 = bool(self.por322_msg) if self.por322: if len(self.por322_msg) != len(self.por322_msg.encode()): self.warnings.append(( "Message", "Message contains non-ASCII characters that may not be readable on this screen." )) if self.txn_version == 0: # only allow txn version 0 for Proof of Reserves txn (BIP-322) assert self.por322, TX_VER_ERR if self.por322: assert self.txn_version in {0, 2}, TX_VER_ERR # time based relative locks tb_rel_locks = [] # block height based relative locks bb_rel_locks = [] smallest_nsequence = 0xffffffff # this parses the input TXN in-place for idx, txin in self.input_iter(): inp = self.inputs[idx] if self.is_v2: # v2 requires inclusion assert inp.prevout_idx is not None assert inp.previous_txid if inp.req_time_locktime is not None: assert inp.req_time_locktime >= NLOCK_IS_TIME if inp.req_height_locktime is not None: assert 0 < inp.req_height_locktime < NLOCK_IS_TIME else: # v0 requires exclusion assert inp.prevout_idx is None assert inp.previous_txid is None assert inp.sequence is None assert inp.req_time_locktime is None assert inp.req_height_locktime is None self.inputs[idx].validate(idx, txin, self.my_xfp, self) if self.txn_version >= 2: has_rtl = self.inputs[idx].has_relative_timelock(txin) if has_rtl: if has_rtl[0]: tb_rel_locks.append((idx, has_rtl[1])) else: bb_rel_locks.append((idx, has_rtl[1])) if txin.nSequence < smallest_nsequence: smallest_nsequence = txin.nSequence if isinstance(self.lock_time, int) and self.lock_time > 0: if smallest_nsequence == 0xffffffff: self.warnings.append(( "Bad Locktime", "Locktime has no effect! None of the nSequences decremented." )) else: msg = "This tx can only be spent after " if self.lock_time < NLOCK_IS_TIME: msg += "block height of %d" % self.lock_time else: try: dt = datetime_from_timestamp(self.lock_time) msg += datetime_to_str(dt) except: msg += "%d (unix timestamp)" % self.lock_time msg += " (MTP)" # median time past msg += "\n" self.ux_notes.append(("Abs Locktime", msg)) # create UX for users about tx level relative timelocks (nSequence) self.ux_relative_timelocks(tb_rel_locks, bb_rel_locks) assert len(self.inputs) == self.num_inputs, 'ni mismatch' # if multisig xpub details provided, they better be right and/or offer import if self.xpubs: await self.handle_xpubs() assert self.num_outputs >= 1, 'need outputs' if DEBUG: our_keys = sum(1 for i in self.inputs if i.num_our_keys) print("PSBT: %d inputs, %d output, %d fully-signed, %d ours" % ( self.num_inputs, self.num_outputs, sum(1 for i in self.inputs if i and i.fully_signed), our_keys)) def consider_outputs(self): # scan ouputs: # - is it a change address, defined by redeem script (p2sh) or key we know is ours # - mark change outputs, so perhaps we don't show them to users total_out = 0 total_change = 0 num_op_return = 0 num_op_return_size = 0 num_unknown_scripts = 0 zero_val_outs = 0 # only those that are not OP_RETURN are considered self.num_change_outputs = 0 for idx, txo in self.output_iter(): output = self.outputs[idx] # perform output validation af = output.validate(idx, txo, self.my_xfp, self.active_multisig, self) assert txo.nValue >= 0, "negative output value: o%d" % idx total_out += txo.nValue if (txo.nValue == 0) and (af != OP_RETURN): # OP_RETURN outputs have nValue=0 standard zero_val_outs += 1 if output.is_change: self.num_change_outputs += 1 total_change += txo.nValue if af == OP_RETURN: num_op_return += 1 if len(txo.scriptPubKey) > 83: num_op_return_size += 1 elif af is None: num_unknown_scripts += 1 if self.total_value_out is None: self.total_value_out = total_out else: assert self.total_value_out == total_out, \ '%s != %s' % (self.total_value_out, total_out) if self.total_change_value is None: self.total_change_value = total_change else: assert self.total_change_value == total_change, \ '%s != %s' % (self.total_change_value, total_change) # check fee is reasonable the_fee = self.calculate_fee() if self.por322: # Proof of Reserves - nothing more to check - txn is invalid anyways return if the_fee is None: return if the_fee < 0: raise FatalPSBTIssue("Outputs worth more than inputs!") if self.total_value_out: per_fee = the_fee * 100 / self.total_value_out else: per_fee = 100 fee_limit = settings.get('fee_limit', DEFAULT_MAX_FEE_PERCENTAGE) if fee_limit != -1 and per_fee >= fee_limit: raise FatalPSBTIssue("Network fee bigger than %d%% of total amount (it is %.0f%%)." % (fee_limit, per_fee)) if per_fee >= 5: self.warnings.append(('Big Fee', 'Network fee is more than ' '5%% of total value (%.1f%%).' % per_fee)) if (num_op_return > 1) or num_op_return_size: mm = "" if num_op_return > 1: mm += "\nMultiple OP_RETURN outputs: %d" % num_op_return if num_op_return_size: mm += "\nOP_RETURN > 80 bytes" self.warnings.append( ("OP_RETURN", "TX may not be relayed by some nodes.%s" % mm)) if num_unknown_scripts: self.warnings.append( ('Output?', 'Sending to %d not well understood script(s).' % num_unknown_scripts) ) if zero_val_outs: self.warnings.append( ('Zero Value', 'Non-standard zero value output(s).') ) self.consolidation_tx = (self.num_change_outputs == self.num_outputs) # Enforce policy related to change outputs self.consider_dangerous_change(self.my_xfp) def consider_dangerous_sighash(self): # Check sighash flags are legal, useful, and safe. Warn about # some risks if user has enabled special sighash values. sh_unusual = False none_sh = False for input in self.inputs: # only if it is our input - one that will be eventually sign if input.num_our_keys: if input.sighash is not None: # All inputs MUST have SIGHASH that we are able to sign. if input.sighash not in ALL_SIGHASH_FLAGS: raise FatalPSBTIssue("Unsupported sighash flag 0x%x" % input.sighash) if self.por322 and input.sighash != SIGHASH_ALL: raise FatalPSBTIssue("POR not SIGHASH_ALL") if input.sighash != SIGHASH_ALL: sh_unusual = True if input.sighash in (SIGHASH_NONE, SIGHASH_NONE|SIGHASH_ANYONECANPAY): none_sh = True if sh_unusual and not settings.get("sighshchk"): if self.consolidation_tx: # policy: all inputs must be sighash ALL in purely consolidation txn raise FatalPSBTIssue("Only sighash ALL is allowed for pure consolidation transactions.") if none_sh: # sighash NONE or NONE|ANYONECANPAY is proposed: block raise FatalPSBTIssue("Sighash NONE is not allowed as funds could be going anywhere.") if none_sh: self.warnings.append( ("Danger", "Destination address can be changed after signing (sighash NONE).") ) elif sh_unusual: self.warnings.append( ("Caution", "Some inputs have unusual SIGHASH values not used in typical cases.") ) def consider_dangerous_change(self, my_xfp): # Enforce some policy on change outputs: # - need to "look like" they are going to same wallet as inputs came from # - range limit last two path components (numerically) # - same pattern of hard/not hardened components # - MAX_PATH_DEPTH already enforced before this point # in_paths = [] for inp in self.inputs: if inp.fully_signed: continue if not inp.required_key: continue if not inp.subpaths: continue # not expected if we're signing it for path in inp.subpaths.values(): if path[0] == my_xfp: in_paths.append(path[1:]) if not in_paths: # We aren't adding any signatures? Can happen but we're going to be # showing a warning about that elsewhere. return shortest = min(len(i) for i in in_paths) longest = max(len(i) for i in in_paths) if shortest != longest or shortest <= 2: # We aren't seeing shared input path lengths. # They are probbably doing weird stuff, so leave them alone. return # Assumption: hard/not hardened depths will match for all address in wallet def hard_bits(p): return [bool(i & 0x80000000) for i in p] # Assumption: common wallets modulate the last two components only # of the path. Typically m/.../change/index where change is {0, 1} # and index changes slowly over lifetime of wallet (increasing) path_len = shortest path_prefix = in_paths[0][0:-2] idx_max = max(i[-1]&0x7fffffff for i in in_paths) + 200 hard_pattern = hard_bits(in_paths[0]) probs = [] for nout, out in enumerate(self.outputs): if not out.is_change: continue # it's a change output, okay if a p2sh change; we're looking at paths for path in out.subpaths.values(): if path[0] != my_xfp: continue # possible in p2sh case path = path[1:] if len(path) != path_len: iss = "has wrong path length (%d not %d)" % (len(path), path_len) elif hard_bits(path) != hard_pattern: iss = "has different hardening pattern" elif path[0:len(path_prefix)] != path_prefix: iss = "goes to diff path prefix" elif (path[-2]&0x7fffffff) not in {0, 1}: iss = "2nd last component not 0 or 1" elif (path[-1]&0x7fffffff) > idx_max: iss = "last component beyond reasonable gap" else: # looks ok continue probs.append("Output#%d: %s: %s not %s/{0~1}%s/{0~%d}%s expected" % (nout, iss, keypath_to_str(path, skip=0), keypath_to_str(path_prefix, skip=0), "'" if hard_pattern[-2] else "", idx_max, "'" if hard_pattern[-1] else "", )) break for p in probs: self.warnings.append(('Troublesome Change Outs', p)) def consider_inputs(self, cosign_xfp=None): # Look at the UTXO's that we are spending. Do we have them? Do the # hashes match, and what values are we getting? # Important: parse incoming UTXO to build total input value foreign = [] unverified_witness_utxo = [] total_in = 0 from_wif_store = [] prevouts = set() for i, txi in self.input_iter(): # check for duplicate inputs k = (txi.prevout.hash, txi.prevout.n) if k in prevouts: raise FatalPSBTIssue("Duplicate inputs") if len(prevouts) < 100: prevouts.add(k) inp = self.inputs[i] if not inp.has_utxo(): if inp.num_our_keys and not inp.fully_signed: # we cannot proceed if the input is ours and there is no UTXO raise FatalPSBTIssue('Missing own UTXO(s). Cannot determine value being signed') else: # input clearly not ours foreign.append(i) continue # pull out just the CTXOut object (expensive) utxo = inp.get_utxo(txi.prevout.n) assert utxo.nValue >= 0, "negative input value: i%d" % i total_in += utxo.nValue if not inp.utxo and not inp.witness_utxo_is_provably_segwit(utxo): unverified_witness_utxo.append(i) # Look at what kind of input this will be, and therefore what # type of signing will be required, and which key we need. # - also validates redeem_script when present # - also finds appropriate multisig wallet to be used inp.determine_my_signing_key(i, utxo, self.my_xfp, self, cosign_xfp) if inp.required_key and not inp.is_segwit and not inp.utxo: raise FatalPSBTIssue('Legacy input #%d requires non-witness UTXO' % i) # determine_my_signing_key is updating fully_signed for multisig inputs # based on redeem/witness script if inp.fully_signed: self.presigned_inputs.add(i) if inp.required_key and self.wif_store: is_in = False for pk in inp.required_key if isinstance(inp.required_key, set) else [inp.required_key]: if pk in self.wif_store: is_in = True if is_in: from_wif_store.append(i) # iff to UTXO is segwit, then check it's value, and also # capture that value, since it's supposed to be immutable # Proof of Reserves PSBT must not modify history if inp.is_segwit and not self.por322: history.verify_amount(txi.prevout, inp.amount, i) if self.por322 and (i == 0): # Proof of Reserves 'to_spend' validation try: assert inp.required_key, "not our key" self.validate_bip322_input0(inp, txi, utxo) except Exception as e: raise FatalPSBTIssue("i0: invalid BIP-322 'to_spend': %s" % e) del utxo # XXX scan witness data provided, and consider those ins signed if not multisig? if not foreign and not unverified_witness_utxo: # no foreign inputs, we can calculate the total input value self.total_value_in = total_in assert total_in > 0 or self.por322, "zero value txn" else: # 1+ inputs don't belong to us, we can't calculate the total input value # OK for multi-party transactions (coinjoin etc.) assert not self.por322 # cannot have foreign inputs in POR txn self.total_value_in = None if foreign: self.warnings.append( ("Unable to calculate fee", "Some input(s) haven't provided UTXO(s): " + seq_to_str(foreign)) ) if unverified_witness_utxo: self.warnings.append( ("Unable to calculate fee", "Some input(s) provided unverified witness UTXO(s): " + seq_to_str(unverified_witness_utxo)) ) if len(self.presigned_inputs) == self.num_inputs: # Maybe wrong for multisig cases? Maybe they want to add their # own signature, even tho N of M is satisfied?! raise FatalPSBTIssue('Transaction looks completely signed already?') # We should know pubkey required for each input now. # - but we may not be the signer for those inputs, which is fine. # - TODO: but what if not SIGHASH_ALL no_keys = set( n for n,inp in enumerate(self.inputs) if (inp.required_key is None) and (not inp.fully_signed) ) if len(no_keys) >= self.num_inputs: raise FatalPSBTIssue(NO_KEY_ERR) if no_keys: # This is seen when you re-sign same signed file by accident (multisig) # - case of len(no_keys)==num_inputs is handled by consider_keys self.warnings.append(('Limited Signing', 'We are not signing these inputs, because we do not know the key: ' + seq_to_str(no_keys))) if self.presigned_inputs: # this isn't really even an issue for some complex usage cases self.warnings.append(('Partly Signed Already', 'Some input(s) provided were already completely signed by other parties: ' + seq_to_str(self.presigned_inputs))) if from_wif_store: self.warnings.append(("WIF Store", "Some input(s) use key from the WIF store: " + seq_to_str(from_wif_store))) if MultisigWallet.disable_checks: self.warnings.append(('Danger', 'Some multisig checks are disabled.')) def calculate_fee(self): # what miner's reward is included in txn? if self.total_value_in is None: return None return self.total_value_in - self.total_value_out def consider_keys(self): # check we posess the right keys for the inputs cnt = sum(1 for i in self.inputs if i.num_our_keys) if cnt: return # collect a list of XFP's given in file that aren't ours others = set() for inp in self.inputs: if not inp.subpaths: continue for path in inp.subpaths.values(): others.add(path[0]) if not others: # Can happen w/ Electrum in watch-mode on XPUB. It doesn't know XFP and # so doesn't insert that into PSBT. raise FatalPSBTIssue('PSBT does not contain any key path information.') others.discard(self.my_xfp) msg = ', '.join(xfp2str(i) for i in others) raise FatalPSBTIssue(NO_KEY_ERR + " (need %s, found %s)" % (xfp2str(self.my_xfp), msg)) @classmethod def read_psbt(cls, fd): # read in a PSBT file. Captures fd and keeps it open. hdr = fd.read(5) if hdr != b'psbt\xff': raise ValueError("bad hdr") rv = cls() # read main body (globals) rv.parse(fd) if rv.txn: # learn about the bitcoin transaction we are signing. rv.parse_txn() assert rv.num_inputs is not None assert rv.num_outputs is not None rv.inputs = [psbtInputProxy(fd, idx) for idx in range(rv.num_inputs)] rv.outputs = [psbtOutputProxy(fd, idx) for idx in range(rv.num_outputs)] return rv def serialize(self, out_fd, upgrade_txn=False): # Ouput into a file. wr = lambda *a: self.write(out_fd, *a) out_fd.write(b'psbt\xff') if upgrade_txn and self.is_complete(): # write out the ready-to-transmit txn # - means we are also a PSBT combiner in this case # - hard tho, due to variable length data. # - probably a bad idea, so disabled for now out_fd.write(b'\x01\x00') # keylength=1, key=b'', PSBT_GLOBAL_UNSIGNED_TX with SizerFile() as fd: self.finalize(fd) txn_len = fd.tell() out_fd.write(ser_compact_size(txn_len)) self.finalize(out_fd) else: if not self.is_v2: # can be 0 or None # provide original txn (unchanged) wr(PSBT_GLOBAL_UNSIGNED_TX, self.txn) if self.is_v2: wr(PSBT_GLOBAL_TX_VERSION, pack(' single key which_key = inp.required_key assert not inp.added_sigs, "already done??" if which_key in self.wif_store: node = node_from_privkey(self.wif_store[which_key]) else: assert which_key in inp.subpaths, 'unk key' # get node required node = self.check_pubkey_at_path(sv, inp.subpaths[which_key], which_key) assert node, "Path (%s) led to wrong pubkey for input#%d" % ( keypath_to_str(inp.subpaths[which_key]), in_idx) # track wallet usage OWNERSHIP.note_subpath_used(inp.subpaths[which_key]) if not inp.is_segwit: # Hash by serializing/blanking various subparts of the transaction digest = self.make_txn_sighash(in_idx, txi, inp.sighash) else: # Hash the inputs and such in totally new ways, based on BIP-143 digest = self.make_txn_segwit_sighash(in_idx, txi, inp.amount, inp.scriptCode, inp.sighash) if sv.deltamode: # Current user is actually a thug with a slightly wrong PIN, so we # do have access to the private keys and could sign txn, but we # are going to silently corrupt our signatures. digest = ngu.hash.sha256d(digest) # The precious private key we need pk = node.privkey() #print("privkey %s" % b2a_hex(pk).decode('ascii')) #print(" pubkey %s" % b2a_hex(which_key).decode('ascii')) #print(" digest %s" % b2a_hex(digest).decode('ascii')) der_sig = self.ecdsa_grind_sign(pk, digest, inp.sighash) # private key no longer required stash.blank_object(pk) stash.blank_object(node) del pk, node inp.added_sigs[which_key] = der_sig # Could remove sighash from input object - it is not required, takes space, # and is already in signature or is implicit by not being part of the # signature (taproot SIGHASH_DEFAULT) ## inp.sighash = None if self.is_v2: self.set_modifiable_flag(inp) # drop sighash if default (SIGHASH_ALL) if inp.sighash == SIGHASH_ALL: inp.sighash = None gc.collect() # done. dis.progress_bar_show(1) def set_modifiable_flag(self, inp): # only for PSBTv2 # sighash needs to be properly set on psbtInputProxy object before this runs # TODO possible to also cross-check with sighash from signature: # 1. witnes/scriptSig in serialized tx in PSBT # 2. psbt meta fields partial_sigs, taproot_key_sig and taproot_script_sigs if self.txn_modifiable is None: # set to inputs/outputs modifiable # has SINGLE to false self.txn_modifiable = 3 if not (inp.sighash & SIGHASH_ANYONECANPAY): # Bit 0 is the Inputs Modifiable flag - set to 0 if self.txn_modifiable & 1: self.txn_modifiable &= ~1 out_type = inp.sighash & 0x7f # regardless of ANYONECANPAY if out_type != SIGHASH_NONE: # Bit 1 is the Outputs Modifiable flag - set to 0 if self.txn_modifiable & 2: self.txn_modifiable &= ~2 if out_type == SIGHASH_SINGLE: # Bit 2 is the Has SIGHASH_SINGLE flag - set it to 1 self.txn_modifiable |= 4 def make_txn_sighash(self, replace_idx, replacement, sighash_type): # calculate the hash value for one input of current transaction # - blank all script inputs # - except one single tx in, which is provided # - serialize that without witness data # - sha256 over that fd = self.fd old_pos = fd.tell() # sighash regardless of ANYONECANPAY input part out_sighash_type = sighash_type & 0x7f rv = sha256() # version number rv.update(pack(' # fd = self.fd old_pos = fd.tell() # sighash regardless of ANYONECANPAY input part out_sighash_type = sighash_type & 0x7f if self.hashPrevouts and sighash_type == SIGHASH_ALL: hashPrevouts = self.hashPrevouts hashSequence = self.hashSequence hashOutputs = self.hashOutputs else: # input side hashPrevouts = sha256() hashSequence = sha256() if not (sighash_type & SIGHASH_ANYONECANPAY): for in_idx, txi in self.input_iter(): hashPrevouts.update(txi.prevout.serialize()) if out_sighash_type == SIGHASH_ALL: hashSequence.update(pack("= self.active_multisig.M: return True def is_complete(self): # Are all the inputs (now) signed? # some might have been given as signed signed = len(self.presigned_inputs) # plus we added some signatures for inp in self.inputs: if inp.is_multisig and self.active_multisig: if self.multi_input_complete(inp): signed += 1 elif inp.added_sigs: signed += 1 return signed == self.num_inputs def multisig_signatures(self, inp): assert self.active_multisig # collect all signatures into one place # both we added & those already in part_sigs all_sigs = {} all_sigs.update(inp.added_sigs) for pk, get_data in inp.part_sigs.items(): all_sigs[pk] = self.get(get_data) if self.active_multisig.bip67: # BIP-67 easy just sort by public keys sigs = [sig for pk, sig in sorted(all_sigs.items())] else: # need to respect the order of keys in actual descriptor sigs = [] for xfp, _, _ in self.active_multisig.xpubs: for pk, pth in inp.subpaths.items(): # if xfp matches but pk not in all_sigs -> signer haven't signed # it is ok in threshold multisig - just skip if (xfp == pth[0]) and (pk in all_sigs): sigs.append(all_sigs[pk]) break # save space and only provide necessary amount of signatures (smaller tx, less fees) sigs = sigs[:self.active_multisig.M] return sigs def singlesig_signature(self, inp): # return signature that we added # or one signature from partial sigs if input is fully sign # (i.e. len(part_sigs)>=len(subpaths)) ssig = None if inp.added_sigs: # we have added signature to this single sig input assert len(inp.added_sigs) == 1 ssig = list(inp.added_sigs.items())[0] elif inp.part_sigs and inp.fully_signed: assert len(inp.part_sigs) == 1 rv = list(inp.part_sigs.items())[0] ssig = rv[0], self.get(rv[1]) return ssig def multisig_xfps_needed(self): # provide the set of xfp's that still need to sign PSBT # - used to find which multisig-signer needs to go next rv = set() for inp in self.inputs: for pk, pth in inp.subpaths.items(): if pk in inp.part_sigs: continue if pk in inp.added_sigs: continue rv.add(pth[0]) return rv def finalize(self, fd): # Stream out the finalized transaction, with signatures applied # - raise if not complete already # - returns the TXID of resulting transaction # - but in segwit case, needs to re-read to calculate it # - fd must be read/write and seekable to support txid calc fd.write(pack(' txi.scriptSig = ser_push_data(der_sig) else: # P2PKH: scriptSig is txi.scriptSig = ser_push_data(der_sig) + ser_push_data(pubkey) fd.write(txi.serialize()) # outputs fd.write(ser_compact_size(self.num_outputs)) for out_idx, txo in self.output_iter(): fd.write(txo.serialize()) # capture change output amounts (if segwit) if self.outputs[out_idx].is_change and self.outputs[out_idx].witness_script: history.add_segwit_utxos(out_idx, txo.nValue) body_end = fd.tell() if needs_witness: # witness values # - preserve any given ones, add ours for in_idx, wit in self.input_witness_iter(): inp = self.inputs[in_idx] if inp.is_segwit: # put in new sig: wit is a CTxInWitness assert not wit.scriptWitness.stack, 'replacing non-empty?' if inp.is_multisig: sigs = self.multisig_signatures(inp) wit.scriptWitness.stack = [b""] + sigs + [self.get(inp.witness_script)] else: pubkey, der_sig = self.singlesig_signature(inp) assert pubkey[0] in {0x02, 0x03} and len(pubkey) == 33, "bad v0 pubkey" wit.scriptWitness.stack = [der_sig, pubkey] fd.write(wit.serialize()) # locktime fd.write(pack('