optimize PSBT class more
This commit is contained in:
parent
4439f3d7fe
commit
30457c98cc
@ -358,8 +358,12 @@ class ApproveTransaction(UserAuthorizedAction):
|
||||
await self.psbt.validate() # might do UX: accept multisig import
|
||||
|
||||
ccc_c_xfp = CCCFeature.get_xfp() # can be None
|
||||
self.psbt.consider_inputs(cosign_xfp=ccc_c_xfp)
|
||||
self.psbt.consider_outputs(cosign_xfp=ccc_c_xfp)
|
||||
args = self.psbt.consider_inputs(cosign_xfp=ccc_c_xfp)
|
||||
self.psbt.consider_outputs(*args, cosign_xfp=ccc_c_xfp)
|
||||
del args # not needed anymore
|
||||
# we can properly assess sighash only after we know
|
||||
# which outputs are change
|
||||
self.psbt.consider_dangerous_sighash()
|
||||
|
||||
except FraudulentChangeOutput as exc:
|
||||
# sys.print_exception(exc)
|
||||
@ -921,7 +925,7 @@ async def _save_to_disk(psbt, txid, save_options, is_complete, data_len, output_
|
||||
|
||||
del_after = settings.get('del', 0)
|
||||
|
||||
def _chunk_write(file_d, ofs, chunk=1024):
|
||||
def _chunk_write(file_d, ofs, chunk=2048):
|
||||
written = 0
|
||||
while written < data_len:
|
||||
if (written + chunk) > data_len:
|
||||
|
||||
485
shared/psbt.py
485
shared/psbt.py
@ -295,6 +295,7 @@ class psbtProxy:
|
||||
return self.fd.read(ll)
|
||||
|
||||
def parse_xfp_path(self, coords):
|
||||
# coords are expected to be value from subpaths or taproot subpaths
|
||||
return list(unpack_from('<%dI' % (coords[1] // 4), self.get(coords)))
|
||||
|
||||
def handle_zero_xfp(self, xfp_path, my_xfp, warnings=None):
|
||||
@ -413,23 +414,22 @@ class psbtOutputProxy(psbtProxy):
|
||||
|
||||
self.parse(fd)
|
||||
|
||||
def parse_taproot_tree(self):
|
||||
if not self.taproot_tree:
|
||||
return
|
||||
length = self.taproot_tree[1]
|
||||
|
||||
res = []
|
||||
while length:
|
||||
tree = BytesIO(self.get(self.taproot_tree))
|
||||
depth = tree.read(1)
|
||||
leaf_version = tree.read(1)[0]
|
||||
assert (leaf_version & ~TAPROOT_LEAF_MASK) == 0
|
||||
script_len, nb = deser_compact_size(tree, ret_num_bytes=True)
|
||||
script = tree.read(script_len)
|
||||
res.append((depth, leaf_version, script))
|
||||
length -= (2 + nb + script_len)
|
||||
|
||||
return res
|
||||
# not needed
|
||||
# def parse_taproot_tree(self):
|
||||
# length = self.taproot_tree[1]
|
||||
#
|
||||
# res = []
|
||||
# while length:
|
||||
# tree = BytesIO(self.get(self.taproot_tree))
|
||||
# depth = tree.read(1)
|
||||
# leaf_version = tree.read(1)[0]
|
||||
# assert (leaf_version & ~TAPROOT_LEAF_MASK) == 0
|
||||
# script_len, nb = deser_compact_size(tree, ret_num_bytes=True)
|
||||
# script = tree.read(script_len)
|
||||
# res.append((depth, leaf_version, script))
|
||||
# length -= (2 + nb + script_len)
|
||||
#
|
||||
# return res
|
||||
|
||||
def store(self, kt, key, val):
|
||||
# do not forget that key[0] includes kt (type)
|
||||
@ -496,7 +496,7 @@ class psbtOutputProxy(psbtProxy):
|
||||
for k, v in self.unknown:
|
||||
wr(None, v, k)
|
||||
|
||||
def validate(self, out_idx, txo, my_xfp, parent, cosign_xfp=None):
|
||||
def determine_my_change(self, out_idx, txo, parsed_subpaths, parent):
|
||||
# Do things make sense for this output?
|
||||
|
||||
# NOTE: We might think it's a change output just because the PSBT
|
||||
@ -506,11 +506,6 @@ class psbtOutputProxy(psbtProxy):
|
||||
# any output info provided better be right, or fail as "fraud"
|
||||
# - full key derivation and validation is done during signing, and critical.
|
||||
# - we raise fraud alarms, since these are not innocent errors
|
||||
#
|
||||
if self.taproot_internal_key:
|
||||
assert self.taproot_internal_key[1] == 32 # "PSBT_OUT_TAP_INTERNAL_KEY length != 32"
|
||||
|
||||
parsed_subpaths = self.parse_subpaths(my_xfp, parent.warnings, cosign_xfp)
|
||||
|
||||
# - must match expected address for this output, coming from unsigned txn
|
||||
af, addr_or_pubkey = txo.get_address()
|
||||
@ -734,58 +729,6 @@ class psbtInputProxy(psbtProxy):
|
||||
|
||||
return is_timebased, res
|
||||
|
||||
def validate(self, idx, txin):
|
||||
# Validate this txn input: given deserialized CTxIn and maybe witness
|
||||
|
||||
# TODO: tighten these
|
||||
if self.witness_script:
|
||||
assert self.witness_script[1] >= 30
|
||||
if self.redeem_script:
|
||||
assert self.redeem_script[1] >= 22
|
||||
|
||||
if self.taproot_internal_key:
|
||||
assert self.taproot_internal_key[1] == 32 # "PSBT_IN_TAP_INTERNAL_KEY length != 32"
|
||||
|
||||
if self.taproot_script_sigs:
|
||||
for k, v in self.taproot_script_sigs:
|
||||
# PSBT_IN_TAP_SCRIPT_SIG + 32 bytes xonly pubkey + leafhash 32 bytes
|
||||
assert k[1] == 64
|
||||
# The 64 or 65 byte Schnorr signature for this pubkey and leaf combination
|
||||
assert v[1] in (64,65)
|
||||
|
||||
if self.taproot_scripts:
|
||||
for k, v in self.taproot_scripts:
|
||||
assert k[1] > 32 # "PSBT_IN_TAP_LEAF_SCRIPT control block is too short"
|
||||
assert (k[1] - 1) % 32 == 0 # "PSBT_IN_TAP_LEAF_SCRIPT control block is not valid"
|
||||
assert v[1] != 0 # "PSBT_IN_TAP_LEAF_SCRIPT cannot be empty"
|
||||
|
||||
if self.part_sigs:
|
||||
# How complete is the set of signatures so far?
|
||||
# - assuming PSBT creator doesn't give us extra data not required
|
||||
# - seems harmless if they fool us into thinking already signed; we do nothing
|
||||
# - could also look at pubkey needed vs. sig provided
|
||||
# - could consider structure of MofN in p2sh cases
|
||||
if len(self.part_sigs) >= len(self.subpaths):
|
||||
self.fully_signed = True
|
||||
|
||||
if self.taproot_key_sig:
|
||||
# "PSBT_IN_TAP_KEY_SIG length != 64 or 65"
|
||||
assert self.taproot_key_sig[1] in (64, 65)
|
||||
self.fully_signed = True
|
||||
|
||||
if self.utxo:
|
||||
# Important: they might be trying to trick us with an un-related
|
||||
# funding transaction (UTXO) that does not match the input signature we're making
|
||||
# (but if it's segwit, the ploy wouldn't work, Segwit FtW)
|
||||
# - challenge: it's a straight dsha256() for old serializations, but not for newer
|
||||
# segwit txn's... plus I don't want to deserialize it here.
|
||||
try:
|
||||
observed = uint256_from_str(calc_txid(self.fd, self.utxo))
|
||||
except:
|
||||
raise AssertionError("Trouble parsing UTXO given for input #%d" % idx)
|
||||
|
||||
assert txin.prevout.hash == observed, "utxo hash mismatch for input #%d" % idx
|
||||
|
||||
def handle_none_sighash(self):
|
||||
if self.sighash is None:
|
||||
self.sighash = SIGHASH_DEFAULT if self.taproot_subpaths else SIGHASH_ALL
|
||||
@ -1624,20 +1567,95 @@ class psbtObject(psbtProxy):
|
||||
|
||||
assert len(self.inputs) == self.num_inputs, 'ni mismatch'
|
||||
|
||||
# if multisig xpub details provided, they better be right and/or offer import
|
||||
if self.xpubs:
|
||||
await self.handle_xpubs()
|
||||
|
||||
assert self.num_outputs >= 1, 'need outputs'
|
||||
|
||||
self.validate_unkonwn(self, "global namespace")
|
||||
|
||||
if DEBUG:
|
||||
print("PSBT: %d inputs, %d output, %d fully-signed" % (
|
||||
self.num_inputs, self.num_outputs,
|
||||
sum(1 for i in self.inputs if i and i.fully_signed)))
|
||||
inp_have_subpath = False
|
||||
for i in self.inputs:
|
||||
if i.subpaths or i.taproot_subpaths:
|
||||
inp_have_subpath = True
|
||||
|
||||
def consider_outputs(self, cosign_xfp=None):
|
||||
if self.is_v2:
|
||||
# v2 requires inclusion
|
||||
assert i.prevout_idx is not None
|
||||
assert i.previous_txid
|
||||
if i.req_time_locktime is not None:
|
||||
assert i.req_time_locktime >= NLOCK_IS_TIME
|
||||
if i.req_height_locktime is not None:
|
||||
assert 0 < i.req_height_locktime < NLOCK_IS_TIME
|
||||
else:
|
||||
# v0 requires exclusion
|
||||
assert i.prevout_idx is None
|
||||
assert i.previous_txid is None
|
||||
assert i.sequence is None
|
||||
assert i.req_time_locktime is None
|
||||
assert i.req_height_locktime is None
|
||||
|
||||
if i.witness_script:
|
||||
assert i.witness_script[1] >= 30
|
||||
if i.redeem_script:
|
||||
assert i.redeem_script[1] >= 22
|
||||
|
||||
if i.taproot_internal_key:
|
||||
assert i.taproot_internal_key[1] == 32 # "PSBT_IN_TAP_INTERNAL_KEY length != 32"
|
||||
|
||||
if i.taproot_key_sig:
|
||||
# "PSBT_IN_TAP_KEY_SIG length != 64 or 65"
|
||||
assert i.taproot_key_sig[1] in (64, 65)
|
||||
|
||||
if i.part_sigs:
|
||||
for k, v in i.part_sigs:
|
||||
assert k[1] == 33
|
||||
assert v[1] in (71,72,73) # 73 -> high-s & high-r (maybe should disallow)
|
||||
|
||||
if i.taproot_script_sigs:
|
||||
for k, v in i.taproot_script_sigs:
|
||||
# PSBT_IN_TAP_SCRIPT_SIG + 32 bytes xonly pubkey + leafhash 32 bytes
|
||||
assert k[1] == 64
|
||||
# The 64 or 65 byte Schnorr signature for this pubkey and leaf combination
|
||||
assert v[1] in (64, 65)
|
||||
|
||||
if i.taproot_scripts:
|
||||
for k, v in i.taproot_scripts:
|
||||
assert k[1] > 32 # "PSBT_IN_TAP_LEAF_SCRIPT control block is too short"
|
||||
assert (k[1] - 1) % 32 == 0 # "PSBT_IN_TAP_LEAF_SCRIPT control block is not valid"
|
||||
assert v[1] != 0 # "PSBT_IN_TAP_LEAF_SCRIPT cannot be empty"
|
||||
|
||||
if i.sighash and (i.sighash not in ALL_SIGHASH_FLAGS):
|
||||
raise FatalPSBTIssue("Unsupported sighash flag 0x%x" % i.sighash)
|
||||
|
||||
self.validate_unkonwn(i, "input")
|
||||
|
||||
for o in self.outputs:
|
||||
if self.is_v2:
|
||||
# v2 requires inclusion
|
||||
assert o.amount
|
||||
assert o.script
|
||||
else:
|
||||
# v0 requires exclusion
|
||||
assert o.amount is None
|
||||
assert o.script is None
|
||||
|
||||
if o.taproot_internal_key:
|
||||
assert o.taproot_internal_key[1] == 32 # "PSBT_OUT_TAP_INTERNAL_KEY length != 32"
|
||||
|
||||
self.validate_unkonwn(o, "output")
|
||||
|
||||
if not inp_have_subpath:
|
||||
# Can happen w/ Electrum in watch-mode on XPUB. It doesn't know XFP and
|
||||
# so doesn't insert that into PSBT.
|
||||
# or PSBT provider forgot to include subpaths
|
||||
raise FatalPSBTIssue('PSBT inputs do not contain any key path information.')
|
||||
|
||||
# if multisig xpub details provided, they better be right and/or offer import
|
||||
if self.xpubs:
|
||||
await self.handle_xpubs()
|
||||
|
||||
if DEBUG:
|
||||
print("PSBT: %d inputs, %d output" % (self.num_inputs, self.num_outputs))
|
||||
|
||||
def consider_outputs(self, len_pths, hard_p, prefix_pths, idx_max, cosign_xfp=None):
|
||||
from glob import dis
|
||||
# scan ouputs:
|
||||
# - is it a change address, defined by redeem script (p2sh) or key we know is ours
|
||||
@ -1650,23 +1668,30 @@ class psbtObject(psbtProxy):
|
||||
zero_val_outs = 0 # only those that are not OP_RETURN are considered
|
||||
self.num_change_outputs = 0
|
||||
|
||||
validate_inp_pths = False
|
||||
path_len = None
|
||||
max_gap = idx_max + 200
|
||||
|
||||
# We aren't seeing shared input path lengths.
|
||||
# They are probably doing weird stuff, so leave them alone
|
||||
# and do not validate against inputs paths
|
||||
if len(len_pths) == 1:
|
||||
path_len = 0
|
||||
for pl in len_pths:
|
||||
path_len = pl
|
||||
break
|
||||
if path_len > 2:
|
||||
validate_inp_pths = True
|
||||
|
||||
dis.fullscreen("Validating Outputs..." if version.has_qwerty else "Outputs...")
|
||||
for idx, txo in self.output_iter():
|
||||
dis.progress_sofar(idx, self.num_outputs)
|
||||
output = self.outputs[idx]
|
||||
if self.is_v2:
|
||||
# v2 requires inclusion
|
||||
assert output.amount
|
||||
assert output.script
|
||||
else:
|
||||
# v0 requires exclusion
|
||||
assert output.amount is None
|
||||
assert output.script is None
|
||||
|
||||
self.validate_unkonwn(output, "output")
|
||||
parsed_subpaths = output.parse_subpaths(self.my_xfp, self.warnings, cosign_xfp)
|
||||
|
||||
# perform output validation
|
||||
af = output.validate(idx, txo, self.my_xfp, self, cosign_xfp)
|
||||
af = output.determine_my_change(idx, txo, parsed_subpaths, self)
|
||||
assert txo.nValue >= 0, "negative output value: o%d" % idx
|
||||
total_out += txo.nValue
|
||||
|
||||
@ -1678,6 +1703,48 @@ class psbtObject(psbtProxy):
|
||||
self.num_change_outputs += 1
|
||||
total_change += txo.nValue
|
||||
|
||||
if validate_inp_pths:
|
||||
# Enforce some policy on change outputs:
|
||||
# - need to "look like" they are going to same wallet as inputs came from
|
||||
# - range limit last two path components (numerically)
|
||||
# - same pattern of hard/not hardened components
|
||||
# - MAX_PATH_DEPTH already enforced before this point
|
||||
# - (single-sig only) check ther is only 0,1 at change index
|
||||
is_cmplx = (len(parsed_subpaths) > 1)
|
||||
for i, xpath in enumerate(parsed_subpaths.values()):
|
||||
if i not in output.sp_idxs: continue
|
||||
p = xpath[2:] if output.taproot_subpaths else xpath[1:]
|
||||
|
||||
iss = None
|
||||
if len(p) != path_len:
|
||||
iss = "has wrong path length (%d not %d)" % (len(p), path_len)
|
||||
elif tuple(bool(i & 0x80000000) for i in p) not in hard_p:
|
||||
iss = "has different hardening pattern"
|
||||
elif tuple(p[:-2]) not in prefix_pths:
|
||||
iss = "goes to diff path prefix"
|
||||
elif not is_cmplx and ((p[-2] & 0x7fffffff) not in {0,1}):
|
||||
iss = "2nd last component not 0 or 1"
|
||||
elif (p[-1] & 0x7fffffff) > max_gap:
|
||||
iss = "last component beyond reasonable gap"
|
||||
|
||||
if iss:
|
||||
msg = "Output#%d: %s: %s" % (idx, iss, keypath_to_str(p, skip=0))
|
||||
if len(hard_p) == 1 and len(prefix_pths) == 1:
|
||||
# message can be more verbose
|
||||
# fastest way to get first element from the set
|
||||
# without modifying the set is for-loop
|
||||
for hp in hard_p:
|
||||
break
|
||||
for pp in prefix_pths:
|
||||
break
|
||||
msg += " not %s/{0~1}%s/{0~%d}%s expected" % (
|
||||
keypath_to_str(pp, skip=0),
|
||||
"'" if hp[-2] else "",
|
||||
max_gap,
|
||||
"'" if hp[-1] else ""
|
||||
)
|
||||
self.warnings.append(('Troublesome Change Outs', msg))
|
||||
|
||||
if af == "op_return":
|
||||
num_op_return += 1
|
||||
if len(txo.scriptPubKey) > 83:
|
||||
@ -1743,125 +1810,18 @@ class psbtObject(psbtProxy):
|
||||
|
||||
self.consolidation_tx = (self.num_change_outputs == self.num_outputs)
|
||||
|
||||
# Enforce policy related to change outputs
|
||||
self.consider_dangerous_change()
|
||||
|
||||
if DEBUG:
|
||||
print("PSBT outputs: %d/%d (change count, output count)" % (
|
||||
print("PSBT change outputs: %d out of %d" % (
|
||||
self.num_change_outputs, len(self.outputs)
|
||||
))
|
||||
|
||||
def consider_dangerous_change(self):
|
||||
# Enforce some policy on change outputs:
|
||||
# - need to "look like" they are going to same wallet as inputs came from
|
||||
# - range limit last two path components (numerically)
|
||||
# - same pattern of hard/not hardened components
|
||||
# - MAX_PATH_DEPTH already enforced before this point
|
||||
#
|
||||
in_paths = []
|
||||
for inp in self.inputs:
|
||||
if inp.fully_signed: continue
|
||||
if not inp.sp_idxs: continue
|
||||
for i in inp.sp_idxs:
|
||||
if inp.taproot_subpaths:
|
||||
pos, length = inp.taproot_subpaths[i][1][2]
|
||||
else:
|
||||
pos, length = inp.subpaths[i][1]
|
||||
|
||||
# ommit first element (xfp)
|
||||
in_paths.append(self.parse_xfp_path((pos, length))[1:])
|
||||
|
||||
if not in_paths:
|
||||
# We aren't adding any signatures? Can happen but we're going to be
|
||||
# showing a warning about that elsewhere.
|
||||
return
|
||||
|
||||
shortest = min(len(i) for i in in_paths)
|
||||
longest = max(len(i) for i in in_paths)
|
||||
if shortest != longest or shortest <= 2:
|
||||
# We aren't seeing shared input path lengths.
|
||||
# They are probably doing weird stuff, so leave them alone.
|
||||
return
|
||||
|
||||
# Assumption: hard/not hardened depths will match for all address in wallet
|
||||
def hard_bits(p):
|
||||
return [bool(i & 0x80000000) for i in p]
|
||||
|
||||
# Assumption: common wallets modulate the last two components only
|
||||
# of the path. m/.../change/index where change is typically {0, 1}
|
||||
# and index changes slowly over lifetime of wallet (increasing)
|
||||
path_len = shortest
|
||||
path_prefixes = [p[0:-2] for p in in_paths]
|
||||
path_prefix = path_prefixes[0]
|
||||
|
||||
idx_max = max(i[-1]&0x7fffffff for i in in_paths) + 200
|
||||
hard_pattern = hard_bits(in_paths[0])
|
||||
|
||||
def check_output_path(path):
|
||||
if len(path) != path_len:
|
||||
iss = "has wrong path length (%d not %d)" % (len(path), path_len)
|
||||
elif hard_bits(path) != hard_pattern:
|
||||
iss = "has different hardening pattern"
|
||||
elif path[0:len(path_prefix)] not in path_prefixes:
|
||||
iss = "goes to diff path prefix"
|
||||
# elif (path[-2] & 0x7fffffff) not in {0, 1}:
|
||||
# iss = "2nd last component not 0 or 1"
|
||||
elif (path[-1] & 0x7fffffff) > idx_max:
|
||||
iss = "last component beyond reasonable gap"
|
||||
else:
|
||||
# looks OK
|
||||
iss = None
|
||||
return iss
|
||||
|
||||
def problem_fmt_str(nout, iss, path):
|
||||
return "Output#%d: %s: %s not %s/{0~1}%s/{0~%d}%s expected" % (
|
||||
nout,
|
||||
iss,
|
||||
keypath_to_str(path, skip=0),
|
||||
keypath_to_str(path_prefix, skip=0),
|
||||
"'" if hard_pattern[-2] else "",
|
||||
idx_max,
|
||||
"'" if hard_pattern[-1] else "",
|
||||
)
|
||||
|
||||
probs = []
|
||||
for nout, out in enumerate(self.outputs):
|
||||
if not out.is_change:
|
||||
continue
|
||||
for i in out.sp_idxs:
|
||||
if out.taproot_subpaths:
|
||||
pos, length = out.taproot_subpaths[i][1][2]
|
||||
else:
|
||||
pos, length = out.subpaths[i][1]
|
||||
|
||||
# omit first element (xfp)
|
||||
path = self.parse_xfp_path((pos, length))[1:]
|
||||
iss = check_output_path(path)
|
||||
if iss is None:
|
||||
continue
|
||||
probs.append(problem_fmt_str(nout, iss, path))
|
||||
break
|
||||
|
||||
for p in probs:
|
||||
self.warnings.append(('Troublesome Change Outs', p))
|
||||
|
||||
def consider_inputs(self, cosign_xfp=None):
|
||||
# Look at the UTXO's that we are spending. Do we have them? Do the
|
||||
# hashes match, and what values are we getting?
|
||||
# Important: parse incoming UTXO to build total input value
|
||||
# Check sighash flags are legal, useful, and safe. Warn about
|
||||
# some risks if user has enabled special sighash values.
|
||||
# check nSequences
|
||||
# check nSequences & nLockTime and warn about TX level locktimes
|
||||
from glob import dis
|
||||
|
||||
if not any(i.subpaths or i.taproot_subpaths for i in self.inputs):
|
||||
# Can happen w/ Electrum in watch-mode on XPUB. It doesn't know XFP and
|
||||
# so doesn't insert that into PSBT.
|
||||
# or PSBT provider forgot to include subpaths
|
||||
raise FatalPSBTIssue('PSBT does not contain any key path information.')
|
||||
|
||||
sh_unusual = False
|
||||
none_sh = False
|
||||
foreign = []
|
||||
total_in = 0
|
||||
presigned_inputs = set()
|
||||
@ -1871,30 +1831,43 @@ class psbtObject(psbtProxy):
|
||||
bb_rel_locks = []
|
||||
smallest_nsequence = 0xffffffff
|
||||
|
||||
# collect some input path data from subapths
|
||||
# later used for change outputs path validation
|
||||
length_p = set()
|
||||
hard_pattern = set()
|
||||
prefix_p = set()
|
||||
idx_max = 0
|
||||
|
||||
my_cnt = 0
|
||||
dis.fullscreen("Validating Inputs..."if version.has_qwerty else "Inputs...")
|
||||
for i, txi in self.input_iter():
|
||||
dis.progress_sofar(i, self.num_inputs)
|
||||
inp = self.inputs[i]
|
||||
|
||||
if self.is_v2:
|
||||
# v2 requires inclusion
|
||||
assert inp.prevout_idx is not None
|
||||
assert inp.previous_txid
|
||||
if inp.req_time_locktime is not None:
|
||||
assert inp.req_time_locktime >= NLOCK_IS_TIME
|
||||
if inp.req_height_locktime is not None:
|
||||
assert 0 < inp.req_height_locktime < NLOCK_IS_TIME
|
||||
else:
|
||||
# v0 requires exclusion
|
||||
assert inp.prevout_idx is None
|
||||
assert inp.previous_txid is None
|
||||
assert inp.sequence is None
|
||||
assert inp.req_time_locktime is None
|
||||
assert inp.req_height_locktime is None
|
||||
if inp.part_sigs:
|
||||
# How complete is the set of signatures so far?
|
||||
# - assuming PSBT creator doesn't give us extra data not required
|
||||
# - seems harmless if they fool us into thinking already signed; we do nothing
|
||||
# - could also look at pubkey needed vs. sig provided
|
||||
# - could consider structure of MofN in p2sh cases
|
||||
if len(inp.part_sigs) >= len(inp.subpaths):
|
||||
self.fully_signed = True
|
||||
|
||||
inp.validate(i, txi)
|
||||
self.validate_unkonwn(inp, "input")
|
||||
if inp.taproot_key_sig:
|
||||
self.fully_signed = True
|
||||
|
||||
if inp.utxo:
|
||||
# Important: they might be trying to trick us with an un-related
|
||||
# funding transaction (UTXO) that does not match the input signature we're making
|
||||
# (but if it's segwit, the ploy wouldn't work, Segwit FtW)
|
||||
# - challenge: it's a straight dsha256() for old serializations, but not for newer
|
||||
# segwit txn's... plus I don't want to deserialize it here.
|
||||
try:
|
||||
observed = uint256_from_str(calc_txid(self.fd, inp.utxo))
|
||||
except:
|
||||
raise AssertionError("Trouble parsing UTXO given for input #%d" % i)
|
||||
|
||||
assert txi.prevout.hash == observed, "utxo hash mismatch for input #%d" % i
|
||||
|
||||
if self.txn_version >= 2:
|
||||
has_rtl = inp.has_relative_timelock(txi)
|
||||
@ -1918,7 +1891,10 @@ class psbtObject(psbtProxy):
|
||||
foreign.append(i)
|
||||
continue
|
||||
|
||||
# pull out just the CTXOut object (expensive)
|
||||
# pull out just the CTXOut object
|
||||
# very expensive for non-witness utxo (whole tx)
|
||||
# less expensive for witness UTXO (just necessary TxOut)
|
||||
#
|
||||
utxo = inp.get_utxo(txi.prevout.n)
|
||||
inp.amount = utxo.nValue
|
||||
assert inp.amount >= 0, "negative input value: i%d" % i
|
||||
@ -1928,7 +1904,7 @@ class psbtObject(psbtProxy):
|
||||
# save scriptPubKey of utxo for later use
|
||||
# needed for P2WPKH scriptCode calculation
|
||||
# needed for P2PK & P2PKH scriptSig (when finalizing)
|
||||
# needed for each input if we sign at least one P2TR
|
||||
# needed for each input if we sign at least one P2TR input
|
||||
inp.utxo_spk = utxo.scriptPubKey
|
||||
|
||||
del utxo # not needed anymore
|
||||
@ -1945,6 +1921,23 @@ class psbtObject(psbtProxy):
|
||||
inp.determine_my_signing_key(i, addr_or_pubkey, self.my_xfp, self,
|
||||
parsed_subpaths)
|
||||
|
||||
# determine_my_signing_key may have removed sp_idxs
|
||||
# meaning we're not going to sign this input - other wallet in use
|
||||
if not inp.sp_idxs:
|
||||
continue
|
||||
|
||||
# parsed subpaths are OrderedDict - matches sp_idxs
|
||||
for ii, xpath in enumerate(parsed_subpaths.values()):
|
||||
if ii not in inp.sp_idxs: continue
|
||||
p = xpath[2:] if inp.taproot_subpaths else xpath[1:]
|
||||
length_p.add(len(p)) # ignore xfp
|
||||
hard_pattern.add(tuple(bool(i & 0x80000000) for i in p))
|
||||
prefix_p.add(tuple(p[:-2]))
|
||||
|
||||
index = p[-1] & 0x7fffffff
|
||||
if index > idx_max:
|
||||
idx_max = index
|
||||
|
||||
# iff to UTXO is segwit, then check it's value, and also
|
||||
# capture that value, since it's supposed to be immutable
|
||||
if inp.af and inp.is_segwit:
|
||||
@ -1955,18 +1948,6 @@ class psbtObject(psbtProxy):
|
||||
# attribute after creating sighash
|
||||
self.my_tr_in = True
|
||||
|
||||
if inp.sighash is not None:
|
||||
# All inputs MUST have SIGHASH that we are able to sign.
|
||||
if inp.sighash not in ALL_SIGHASH_FLAGS:
|
||||
raise FatalPSBTIssue("Unsupported sighash flag 0x%x" % inp.sighash)
|
||||
|
||||
if inp.sighash not in (SIGHASH_ALL, SIGHASH_DEFAULT):
|
||||
sh_unusual = True
|
||||
|
||||
if inp.sighash in (SIGHASH_NONE, SIGHASH_NONE | SIGHASH_ANYONECANPAY):
|
||||
none_sh = True
|
||||
|
||||
|
||||
if not my_cnt:
|
||||
raise FatalPSBTIssue('None of the keys involved in this transaction '
|
||||
'belong to this Coldcard (need %s).' % xfp2str(self.my_xfp))
|
||||
@ -2033,6 +2014,32 @@ class psbtObject(psbtProxy):
|
||||
# create UX for users about tx level relative timelocks (nSequence)
|
||||
self.ux_relative_timelocks(tb_rel_locks, bb_rel_locks)
|
||||
|
||||
if MiniScriptWallet.disable_checks:
|
||||
self.warnings.append(('Danger', 'Some miniscript checks are disabled.'))
|
||||
|
||||
if DEBUG:
|
||||
print("PSBT inputs: %d inputs contain our key, %d fully-signed" % (
|
||||
my_cnt, len(presigned_inputs)))
|
||||
|
||||
# useful info from all our parsed paths - will be validated against change outputs
|
||||
return length_p, hard_pattern, prefix_p, idx_max
|
||||
|
||||
def consider_dangerous_sighash(self):
|
||||
# Check sighash flags are legal, useful, and safe. Warn about
|
||||
# some risks if user has enabled special sighash values.
|
||||
# can only be run after consider_outputs is done
|
||||
sh_unusual = False
|
||||
none_sh = False
|
||||
for inp in self.inputs:
|
||||
if inp.sp_idxs and not inp.fully_signed:
|
||||
if inp.sighash:
|
||||
if inp.sighash is not None:
|
||||
if inp.sighash not in (SIGHASH_ALL, SIGHASH_DEFAULT):
|
||||
sh_unusual = True
|
||||
|
||||
if inp.sighash in (SIGHASH_NONE, SIGHASH_NONE | SIGHASH_ANYONECANPAY):
|
||||
none_sh = True
|
||||
|
||||
if sh_unusual and not settings.get("sighshchk"):
|
||||
if self.consolidation_tx:
|
||||
# policy: all inputs must be sighash ALL in purely consolidation txn
|
||||
@ -2051,12 +2058,6 @@ class psbtObject(psbtProxy):
|
||||
("Caution", "Some inputs have unusual SIGHASH values not used in typical cases.")
|
||||
)
|
||||
|
||||
if MiniScriptWallet.disable_checks:
|
||||
self.warnings.append(('Danger', 'Some miniscript checks are disabled.'))
|
||||
|
||||
if DEBUG:
|
||||
print("PSBT inputs: %d inputs contain our key" % my_cnt)
|
||||
|
||||
def calculate_fee(self):
|
||||
# what miner's reward is included in txn?
|
||||
if self.total_value_in is None:
|
||||
@ -2385,7 +2386,9 @@ class psbtObject(psbtProxy):
|
||||
# - none of the inputs that we're signing is P2TR
|
||||
# - this input is not P2PK or P2PKH, otherwise we need utxo_spk for scriptSig
|
||||
if not self.my_tr_in and (inp.af not in ("p2pk", AF_CLASSIC)):
|
||||
del inp.utxo_spk
|
||||
try:
|
||||
del inp.utxo_spk
|
||||
except AttributeError: pass # may not have UTXO
|
||||
|
||||
# The precious private key we need
|
||||
for i, (node, pk_coord) in enumerate(to_sign):
|
||||
|
||||
@ -1027,7 +1027,7 @@ def test_multiple_multisig_wallets(settings_set, setup_ccc, enter_enabled_ccc, c
|
||||
bitcoind_create_watch_only_wallet, cap_story, bitcoind,
|
||||
policy_sign, settings_get, cap_menu, pick_menu_item,
|
||||
press_select, load_export, offer_minsc_import, goto_home,
|
||||
need_keypress, is_q1, enter_text):
|
||||
need_keypress, is_q1, enter_text, enter_complex):
|
||||
# - 'build 2-of-N' path
|
||||
goto_home()
|
||||
settings_set("ccc", None)
|
||||
@ -1093,7 +1093,7 @@ def test_multiple_multisig_wallets(settings_set, setup_ccc, enter_enabled_ccc, c
|
||||
# export one of the wallets
|
||||
w_mn, w_name = ami.split(":", 1)
|
||||
w_name = w_name.strip()
|
||||
new_name = "new"
|
||||
new_name = "AAAA"
|
||||
pick_menu_item(ami) # just another ms wallet
|
||||
pick_menu_item("Descriptors")
|
||||
pick_menu_item("Export")
|
||||
@ -1109,10 +1109,20 @@ def test_multiple_multisig_wallets(settings_set, setup_ccc, enter_enabled_ccc, c
|
||||
pick_menu_item("Miniscript")
|
||||
pick_menu_item(w_name)
|
||||
pick_menu_item("Rename")
|
||||
for i in range(len(w_name)):
|
||||
for i in range(len(w_name) if is_q1 else len(w_name)-1):
|
||||
need_keypress(KEY_DELETE if is_q1 else "x")
|
||||
|
||||
enter_text(new_name)
|
||||
if not is_q1:
|
||||
# below should yield AAAA
|
||||
need_keypress("1")
|
||||
for _ in range(3):
|
||||
need_keypress("9") # next char
|
||||
need_keypress("1") # letters
|
||||
|
||||
press_select()
|
||||
else:
|
||||
enter_text(new_name)
|
||||
|
||||
time.sleep(.1)
|
||||
enter_enabled_ccc(words)
|
||||
m = cap_menu()
|
||||
|
||||
@ -761,7 +761,7 @@ def test_import_dup_safe(N, clear_miniscript, make_multisig, offer_minsc_import,
|
||||
press_select()
|
||||
has_name(orig_name)
|
||||
|
||||
new_name = "xxx-new"
|
||||
new_name = "AAAA"
|
||||
title, story = offer_minsc_import(make_named(new_name))
|
||||
assert 'Duplicate wallet' in story
|
||||
assert f"'{orig_name}' is the same"
|
||||
@ -774,12 +774,20 @@ def test_import_dup_safe(N, clear_miniscript, make_multisig, offer_minsc_import,
|
||||
|
||||
# just simple rename
|
||||
pick_menu_item(orig_name)
|
||||
pick_menu_item('Rename')
|
||||
for i in range(len(orig_name)):
|
||||
pick_menu_item("Rename")
|
||||
for i in range(len(orig_name) if is_q1 else len(orig_name) - 1):
|
||||
need_keypress(KEY_DELETE if is_q1 else "x")
|
||||
|
||||
if not is_q1:
|
||||
# below should yield AAAA
|
||||
need_keypress("1")
|
||||
for _ in range(3):
|
||||
need_keypress("9") # next char
|
||||
need_keypress("1") # letters
|
||||
|
||||
enter_text(new_name)
|
||||
press_select()
|
||||
else:
|
||||
enter_text(new_name)
|
||||
|
||||
press_select()
|
||||
has_name(new_name)
|
||||
|
||||
@ -40,7 +40,7 @@ def test_sign1(dev, finalize):
|
||||
|
||||
#assert 'None of the keys' in str(ee)
|
||||
#assert 'require subpaths' in str(ee)
|
||||
assert 'PSBT does not contain any key path information' in str(ee)
|
||||
assert 'PSBT inputs do not contain any key path information' in str(ee)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('fn', [
|
||||
@ -87,7 +87,7 @@ def test_psbt_parse_good(try_sign, fn, accept):
|
||||
assert ('Missing UTXO' in msg) \
|
||||
or ('None of the keys' in msg) \
|
||||
or ('completely signed already' in msg) \
|
||||
or ('PSBT does not contain any key path information' in msg) \
|
||||
or ('PSBT inputs do not contain any key path information' in msg) \
|
||||
or ('require subpaths' in msg), msg
|
||||
|
||||
|
||||
@ -1445,7 +1445,7 @@ def test_fully_unsigned(fake_txn, try_sign, addr_fmt):
|
||||
with pytest.raises(CCProtoError) as ee:
|
||||
orig, result = try_sign(psbt, accept=True)
|
||||
|
||||
assert 'does not contain any key path information' in str(ee)
|
||||
assert 'PSBT inputs do not contain any key path information' in str(ee)
|
||||
|
||||
@pytest.mark.parametrize('addr_fmt', ["p2wpkh", "p2tr"])
|
||||
def test_wrong_xfp(fake_txn, try_sign, addr_fmt):
|
||||
@ -1656,7 +1656,7 @@ def test_missing_keypaths(dev, try_sign, fake_txn):
|
||||
orig, result = try_sign(mod_psbt, accept=False)
|
||||
|
||||
msg = ee.value.args[0]
|
||||
assert ('does not contain any key path information' in msg)
|
||||
assert ('PSBT inputs do not contain any key path information' in msg)
|
||||
|
||||
def test_wrong_pubkey(dev, try_sign, fake_txn):
|
||||
# psbt input gives a pubkey+subkey path, but that pubkey doesn't map to utxo pubkey
|
||||
|
||||
Loading…
Reference in New Issue
Block a user