This commit is contained in:
scgbckbone 2024-10-10 18:39:31 +02:00
parent ac782fdd59
commit 531ae613c7
4 changed files with 73 additions and 81 deletions

View File

@ -791,6 +791,7 @@ class ApproveTransaction(UserAuthorizedAction):
async def interact(self):
# Prompt user w/ details and get approval
from glob import dis, hsm_active
from ccc import CCCFeature, PolicyViolationException
# step 1: parse PSBT from PSRAM into in-memory objects.
@ -813,7 +814,8 @@ class ApproveTransaction(UserAuthorizedAction):
try:
await self.psbt.validate() # might do UX: accept multisig import
dis.progress_bar_show(0.10)
self.psbt.consider_inputs()
ccc_c_xfp = CCCFeature.get_xfp() # can be None
self.psbt.consider_inputs(cosign_xfp=ccc_c_xfp)
dis.progress_bar_show(0.33)
self.psbt.consider_keys()
@ -823,12 +825,24 @@ class ApproveTransaction(UserAuthorizedAction):
self.psbt.consider_dangerous_sighash()
dis.progress_bar_show(0.85)
if CCCFeature.is_enabled():
if self.psbt.active_multisig and (ccc_c_xfp in self.psbt.active_multisig.xfp_paths):
CCCFeature.validate_tx(self.psbt)
self.psbt.sign_it(CCCFeature.get_encoded_secret(), ccc_c_xfp)
except FraudulentChangeOutput as exc:
print('FraudulentChangeOutput: ' + exc.args[0])
return await self.failure(exc.args[0], title='Change Fraud')
except FatalPSBTIssue as exc:
print('FatalPSBTIssue: ' + exc.args[0])
return await self.failure(exc.args[0])
except PolicyViolationException as exc:
ch = await ux_show_story(
exc.args[0]+"\n\n Would you like to sign with A key?",
title='Policy Violation')
if ch != "y":
return
except BaseException as exc:
del self.psbt
gc.collect()

View File

@ -11,6 +11,10 @@ from seed import seed_words_to_encoded_secret
from stash import SecretStash, len_to_numwords
from charcodes import KEY_QR, KEY_CANCEL, KEY_NFC
class PolicyViolationException(Exception):
pass
class CCCFeature:
@classmethod
def is_enabled(cls):
@ -98,6 +102,17 @@ class CCCFeature:
settings.remove_key('ccc')
settings.save()
@classmethod
def validate_tx(cls, psbt):
policy = cls.get_policy()
magnitude = policy.get("mag", None)
outgoing = psbt.total_value_out - psbt.total_change_value
if magnitude < 1000:
# it is a BTC, convert to sats
magnitude = magnitude * 100000000
if outgoing > magnitude:
raise PolicyViolationException("magnitude")
def render_mag_value(mag):
# handle integer bitcoins, and satoshis in same value
if mag < 1000:

View File

@ -547,7 +547,7 @@ class psbtInputProxy(psbtProxy):
blank_flds = (
'unknown', 'utxo', 'witness_utxo', 'sighash', 'redeem_script', 'witness_script',
'fully_signed', 'is_segwit', 'is_multisig', 'is_p2sh', 'num_our_keys',
'required_key', 'scriptSig', 'amount', 'scriptCode', 'added_sig', 'previous_txid',
'required_key', 'scriptSig', 'amount', 'scriptCode', 'previous_txid',
'prevout_idx', 'sequence', 'req_time_locktime', 'req_height_locktime'
)
@ -556,7 +556,8 @@ class psbtInputProxy(psbtProxy):
#self.utxo = None
#self.witness_utxo = None
self.part_sig = {}
self.part_sigs = {}
self.added_sigs = {} # signature that CC added (clearly seprated from what can be already in part_sigs)
#self.sighash = None
self.subpaths = {} # will typically be non-empty for all inputs
#self.redeem_script = None
@ -579,7 +580,6 @@ class psbtInputProxy(psbtProxy):
#self.scriptCode = None # only expected for segwit inputs
# after signing, we'll have a signature to add to output PSBT
#self.added_sig = None
#self.previous_txid = None
#self.prevout_idx = None
@ -629,13 +629,13 @@ class psbtInputProxy(psbtProxy):
# rework the pubkey => subpath mapping
self.parse_subpaths(my_xfp, parent.warnings)
if self.part_sig:
if self.part_sigs:
# How complete is the set of signatures so far?
# - assuming PSBT creator doesn't give us extra data not required
# - seems harmless if they fool us into thinking already signed; we do nothing
# - could also look at pubkey needed vs. sig provided
# - could consider structure of MofN in p2sh cases
self.fully_signed = (len(self.part_sig) >= len(self.subpaths))
self.fully_signed = (len(self.part_sigs) >= len(self.subpaths))
else:
# No signatures at all yet for this input (typical non multisig)
self.fully_signed = False
@ -714,7 +714,7 @@ class psbtInputProxy(psbtProxy):
return utxo
def determine_my_signing_key(self, my_idx, utxo, my_xfp, psbt):
def determine_my_signing_key(self, my_idx, utxo, my_xfp, psbt, cosign_xfp=None):
# See what it takes to sign this particular input
# - type of script
# - which pubkey needed
@ -758,25 +758,18 @@ class psbtInputProxy(psbtProxy):
# Assume we'll be signing with any key we know
# - limitation: we cannot be two legs of a multisig (only if CCC feature used)
# - but if partial sig already in place, ignore that one
from ccc import CCCFeature
ccc_enabled = CCCFeature.is_enabled()
ccc_c_xfp = CCCFeature.get_xfp()
if not which_key:
which_key = set()
for pubkey, path in self.subpaths.items():
if self.part_sig and (pubkey in self.part_sig):
if self.part_sigs and (pubkey in self.part_sigs):
# pubkey has already signed, so ignore
continue
if path[0] == my_xfp:
if path[0] in (my_xfp, cosign_xfp):
# slight chance of dup xfps, so handle
if not which_key:
which_key = set()
which_key.add(pubkey)
elif ccc_enabled and (path[0] == ccc_c_xfp):
# CCC feature enabled and xfp part of BIP32 paths
psbt.ccc = True
if not addr_is_segwit and \
len(redeem_script) == 22 and \
redeem_script[0] == 0 and redeem_script[1] == 20:
@ -888,7 +881,7 @@ class psbtInputProxy(psbtProxy):
elif kt == PSBT_IN_WITNESS_UTXO:
self.witness_utxo = val
elif kt == PSBT_IN_PARTIAL_SIG:
self.part_sig[key[1:]] = val
self.part_sigs[key[1:]] = val
elif kt == PSBT_IN_BIP32_DERIVATION:
self.subpaths[key[1:]] = val
elif kt == PSBT_IN_REDEEM_SCRIPT:
@ -924,13 +917,13 @@ class psbtInputProxy(psbtProxy):
if self.witness_utxo:
wr(PSBT_IN_WITNESS_UTXO, self.witness_utxo)
if self.part_sig:
for pk in self.part_sig:
wr(PSBT_IN_PARTIAL_SIG, self.part_sig[pk], pk)
if self.part_sigs:
for pk, sig in self.part_sigs.items():
wr(PSBT_IN_PARTIAL_SIG, sig, pk)
if self.added_sig:
pubkey, sig = self.added_sig
wr(PSBT_IN_PARTIAL_SIG, sig, pubkey)
if self.added_sigs:
for pk, sig in self.added_sigs.items():
wr(PSBT_IN_PARTIAL_SIG, sig, pk)
if self.sighash is not None:
wr(PSBT_IN_SIGHASH_TYPE, pack('<I', self.sighash))
@ -978,7 +971,6 @@ class psbtObject(psbtProxy):
self.xpubs = [] # tuples(xfp_path, xpub)
self.my_xfp = settings.get('xfp', 0)
self.ccc = False
# details that we discover as we go
self.inputs = None
@ -1530,19 +1522,6 @@ class psbtObject(psbtProxy):
# Enforce policy related to change outputs
self.consider_dangerous_change(self.my_xfp)
if not self.consolidation_tx and self.ccc:
# only check if ccc enabled & we're actually sending something
# pure consolidation will always be co-signed with C key
self.consider_ccc_policy()
def consider_ccc_policy(self):
from ccc import CCCFeature
policy = CCCFeature.get_policy()
magnitude = policy.get("mag", None)
outgoing = self.total_value_out - self.total_change_value
if outgoing > magnitude:
self.ccc = False
def consider_dangerous_sighash(self):
# Check sighash flags are legal, useful, and safe. Warn about
# some risks if user has enabled special sighash values.
@ -1655,7 +1634,7 @@ class psbtObject(psbtProxy):
for p in probs:
self.warnings.append(('Troublesome Change Outs', p))
def consider_inputs(self):
def consider_inputs(self, cosign_xfp=None):
# Look at the UTXO's that we are spending. Do we have them? Do the
# hashes match, and what values are we getting?
# Important: parse incoming UTXO to build total input value
@ -1686,7 +1665,7 @@ class psbtObject(psbtProxy):
# type of signing will be required, and which key we need.
# - also validates redeem_script when present
# - also finds appropriate multisig wallet to be used
inp.determine_my_signing_key(i, utxo, self.my_xfp, self)
inp.determine_my_signing_key(i, utxo, self.my_xfp, self, cosign_xfp)
# iff to UTXO is segwit, then check it's value, and also
# capture that value, since it's supposed to be immutable
@ -1883,7 +1862,7 @@ class psbtObject(psbtProxy):
der_sig = ser_sig_der(r, s, sighash)
return der_sig
def sign_it(self):
def sign_it(self, alternate_secret=None, my_xfp=None):
# txn is approved. sign all inputs we can sign. add signatures
# - hash the txn first
# - sign all inputs we have the key for
@ -1892,11 +1871,11 @@ class psbtObject(psbtProxy):
# - update our state with new partial sigs
from glob import dis
from ownership import OWNERSHIP
from ccc import CCCFeature
c_secret = CCCFeature.get_encoded_secret() if self.ccc else ...
c_xfp = CCCFeature.get_xfp()
with stash.SensitiveValues() as sv, stash.SensitiveValues(secret=c_secret) as c_sv:
if my_xfp is None:
my_xfp = self.my_xfp
with stash.SensitiveValues(secret=alternate_secret) as sv:
# Double-check the change outputs are right. This is slow, but critical because
# it detects bad actors, not bugs or mistakes.
# - equivalent check already done for p2sh outputs when we re-built the redeem script
@ -1914,17 +1893,13 @@ class psbtObject(psbtProxy):
for pubkey, subpath in oup.subpaths.items():
# for multisig, will be N paths, and exactly one will
# be our key. For single-signer, should always be my XFP
if subpath[0] == self.my_xfp:
if subpath[0] == my_xfp:
# derive actual pubkey from private
res = self.check_pubkey_at_path(sv, subpath, pubkey)
if res:
good += 1
# TODO is this needed if output is multisig?
OWNERSHIP.note_subpath_used(subpath)
elif self.ccc and (subpath[0] == c_xfp):
res = self.check_pubkey_at_path(c_sv, subpath, pubkey)
if res:
good += 1
if not good:
raise FraudulentChangeOutput(out_idx,
@ -1936,7 +1911,6 @@ class psbtObject(psbtProxy):
# randomize secp context before each signing session
ngu.secp256k1.ctx_rnd()
# Sign individual inputs
success = set()
for in_idx, txi in self.input_iter():
dis.progress_sofar(in_idx, self.num_inputs)
@ -1959,8 +1933,6 @@ class psbtObject(psbtProxy):
assert txi.scriptSig, "no scriptsig?"
inp.handle_none_sighash()
c_node = None
c_req_key = None
if inp.is_multisig:
# need to consider a set of possible keys, since xfp may not be unique
for which_key in inp.required_key:
@ -1971,22 +1943,15 @@ class psbtObject(psbtProxy):
else:
raise AssertionError("Input #%d needs pubkey I dont have" % in_idx)
if self.ccc:
for pubkey, path in inp.subpaths.items():
if path[0] == c_xfp:
c_node = self.check_pubkey_at_path(c_sv, path, pubkey)
assert c_node
c_req_key = pubkey
else:
# single pubkey <=> single key
which_key = inp.required_key
assert not inp.added_sig, "already done??"
assert not inp.added_sigs, "already done??"
assert which_key in inp.subpaths, 'unk key'
if inp.subpaths[which_key][0] != self.my_xfp:
if inp.subpaths[which_key][0] != my_xfp:
# we don't have the key for this subkey
# (redundant, required_key wouldn't be set)
continue
@ -2031,23 +1996,13 @@ class psbtObject(psbtProxy):
stash.blank_object(node)
del pk, node
inp.added_sig = (which_key, der_sig)
if self.ccc and c_node:
c_sk = c_node.privkey()
c_der_sig = self.ecdsa_grind_sign(c_sk, digest, inp.sighash)
stash.blank_object(c_sk)
stash.blank_object(c_secret)
del c_node
inp.part_sig[c_req_key] = c_der_sig
inp.added_sigs[which_key] = der_sig
# Could remove sighash from input object - it is not required, takes space,
# and is already in signature or is implicit by not being part of the
# signature (taproot SIGHASH_DEFAULT)
## inp.sighash = None
success.add(in_idx)
if self.is_v2:
self.set_modifiable_flag(inp)
@ -2242,7 +2197,7 @@ class psbtObject(psbtProxy):
# but we can't combine/finalize multisig stuff, so will never't be 'final'
return False
if inp.added_sig:
if inp.added_sigs:
signed += 1
return signed == self.num_inputs
@ -2285,10 +2240,10 @@ class psbtObject(psbtProxy):
else:
# insert the new signature(s), assuming fully signed txn.
assert inp.added_sig, 'No signature on input #%d'%in_idx
assert inp.added_sigs, 'No signature on input #%d'%in_idx
assert not inp.is_multisig, 'Multisig PSBT combine not supported'
pubkey, der_sig = inp.added_sig
pubkey, der_sig = inp.added_sigs.items()[0]
s = b''
s += ser_push_data(der_sig)
@ -2315,12 +2270,12 @@ class psbtObject(psbtProxy):
for in_idx, wit in self.input_witness_iter():
inp = self.inputs[in_idx]
if inp.is_segwit and inp.added_sig:
if inp.is_segwit and inp.added_sigs:
# put in new sig: wit is a CTxInWitness
assert not wit.scriptWitness.stack, 'replacing non-empty?'
assert not inp.is_multisig, 'Multisig PSBT combine not supported'
pubkey, der_sig = inp.added_sig
pubkey, der_sig = inp.added_sigs.items()[0]
assert pubkey[0] in {0x02, 0x03} and len(pubkey) == 33, "bad v0 pubkey"
wit.scriptWitness.stack = [der_sig, pubkey]

View File

@ -320,7 +320,7 @@ def ccc_ms_setup(microsd_path, virtdisk_path, scan_a_qr, is_q1, cap_menu, pick_m
@pytest.mark.parametrize("magnitude_ok", [True, False])
def test_ccc_cosign(setup_ccc, enter_enabled_ccc, ccc_ms_setup, fake_ms_txn, start_sign,
cap_menu, pick_menu_item, need_keypress, cap_story, microsd_path,
bitcoind, end_sign, magnitude_ok, settings_set):
bitcoind, end_sign, magnitude_ok, settings_set, press_select):
settings_set("ccc", None)
words = setup_ccc()
@ -372,6 +372,14 @@ def test_ccc_cosign(setup_ccc, enter_enabled_ccc, ccc_ms_setup, fake_ms_txn, sta
psbt = psbt_resp.get("psbt")
start_sign(base64.b64decode(psbt))
if not magnitude_ok:
time.sleep(.1)
title, story = cap_story()
assert "Policy Violation" == title
assert "magnitude" in story
assert "sign with A key" in story
press_select()
signed = end_sign(accept=True)
po = BasicPSBT().parse(signed)