firmware/testing/test_musig2.py

1865 lines
68 KiB
Python

# (c) Copyright 2026 by Coinkite Inc. This file is covered by license found in COPYING-CC.
#
# MuSig2 tests.
#
import pytest, base64, itertools, time, json, copy, random, os
from txn import BasicPSBT
from constants import SIGHASH_MAP
from bip32 import random_keys, ranged_unspendable_internal_key, BIP32Node
from helpers import generate_binary_tree_template
from mnemonic import Mnemonic
def sighash_check(psbt, sighash):
    """Assert that every input of *psbt* carries the expected sighash.

    *sighash* is a key of ``SIGHASH_MAP``; the special name ``"DEFAULT"`` means
    the parsed input must have no sighash at all (``None``).
    """
    parsed = BasicPSBT().parse(psbt)
    for txin in parsed.inputs:
        if sighash == "DEFAULT":
            assert txin.sighash is None
            continue
        assert txin.sighash == SIGHASH_MAP[sighash]
@pytest.fixture
def build_musig_wallet(bitcoin_core_signer, microsd_path, garbage_collector, press_select, get_cc_key,
                       import_duplicate, create_core_wallet, import_miniscript, offer_minsc_import):
    """Provide a factory that builds a MuSig2 taproot wallet shared by Coldcard and Core.

    The factory imports the descriptor into the Coldcard, creates the matching
    Core wallet through ``create_core_wallet`` and loads the same descriptors,
    with the respective private key swapped in, into every Core co-signer.
    """

    def doit(wal_name, num_signers, cc_key_orig_der="86h/1h/0h", import_way="usb",
             musig_subder=None, tapscript=False, tapscript_musig_threshold=0, wallet_type=0,
             tree_design="balanced", num_utxo_available=1):
        """Build the wallet and return ``(watch_only, core_signers, descriptor)``.

        wal_name                  -- wallet name; also the prefix of co-signer wallet names
        num_signers               -- total participants (Coldcard + ``num_signers - 1`` Core signers)
        cc_key_orig_der           -- derivation handed to ``get_cc_key``
        import_way                -- "usb" or "sd"; anything else raises ValueError
        musig_subder              -- optional derivation suffix appended after each musig(...)
        tapscript                 -- falsy: internal key only; truthy: add leaves per ``wallet_type``;
                                     str: custom descriptor template using $H and $0..$N placeholders
        tapscript_musig_threshold -- size of the key combinations used for the leaves
        wallet_type               -- 0: musig of all keys in the internal key, threshold-sized
                                        musig leaves in tapscript
                                     1: threshold-sized musigs across internal key and leaves,
                                        plus one fallback sortedmulti_a leaf
        tree_design               -- strategy passed to ``generate_binary_tree_template``
        num_utxo_available        -- forwarded as fourth argument to ``create_core_wallet``
        """
        # Coldcard is always the first signer; derivation is not allowed inside
        # musig(), so the range suffix is removed from every key
        cc_key = get_cc_key(cc_key_orig_der).replace("/<0;1>/*", "")

        cosigners, cosigner_pubs, cosigner_privs = [], [], []
        for idx in range(num_signers - 1):
            core, pub, priv = bitcoin_core_signer(f"{wal_name}_cosigner{idx}", privkey=True)
            core.keypoolrefill(25)
            cosigner_pubs.append(pub.replace("/0/*", ""))
            cosigner_privs.append(priv.replace("/0/*", ""))
            cosigners.append(core)

        all_pks = [cc_key] + cosigner_pubs

        if isinstance(tapscript, str):
            # custom descriptor template - substitute the placeholders
            desc = tapscript.replace("$H", ranged_unspendable_internal_key())
            # highest index first, so "$1" cannot eat the beginning of "$10"
            for idx in reversed(range(len(all_pks))):
                desc = desc.replace(f"${idx}", all_pks[idx])
        else:
            random.shuffle(all_pks)
            inner = "musig(%s)" % ",".join(all_pks)
            if musig_subder:
                inner += musig_subder

            if tapscript and wallet_type in (0, 1):
                musigs = [f"musig({','.join(combo)}){musig_subder or ''}"
                          for combo in itertools.combinations(all_pks, tapscript_musig_threshold)]
                if wallet_type == 1:
                    # the internal key is simply the first of the threshold-sized musigs
                    inner = musigs.pop(0)

                leaves = [f"pk({m})" for m in musigs]
                if wallet_type == 1:
                    # fallback classic multisig; it takes the slot of the musig that
                    # moved to the internal key, so the number of leaves is unchanged
                    leaves.append(f"sortedmulti_a({tapscript_musig_threshold},{','.join(all_pks)})")

                tree = generate_binary_tree_template(len(leaves), strategy=tree_design)
                inner += "," + tree % tuple(leaves)

            desc = f"tr({inner})"

        if import_way == "usb":
            _, story = offer_minsc_import(json.dumps({"name": wal_name, "desc": desc}))
        elif import_way == "sd":
            fname = f"{wal_name}.txt"
            fpath = microsd_path(fname)
            with open(fpath, "w") as fd:
                fd.write(desc)
            garbage_collector.append(fpath)
            _, story = import_miniscript(fname)
        else:
            raise ValueError  # not implemented (yet)

        assert "Create new miniscript wallet?" in story
        assert wal_name in story
        # do some checks on policy --> helper function to replace keys with letters
        press_select()

        wo = create_core_wallet(wal_name, "bech32m", "sd", num_utxo_available)

        # descriptors of the watch-only wallet, without the fields importdescriptors rejects
        templates = []
        for entry in wo.listdescriptors()["descriptors"]:
            for field in ("next", "next_index"):
                del entry[field]
            templates.append(entry)

        # import the musig descriptor into the signers,
        # each signer gets its own private key substituted for its public key
        for core, pub, priv in zip(cosigners, cosigner_pubs, cosigner_privs):
            batch = copy.deepcopy(templates)
            for item in batch:
                body = item["desc"].split("#")[0].replace(pub, priv)
                item["desc"] = body
                checksum = wo.getdescriptorinfo(item["desc"])["checksum"]
                item["desc"] = item["desc"] + "#" + checksum

            for outcome in core.importdescriptors(batch):
                assert outcome["success"]

        # watch-only wallet with musig imported,
        # core signers with the musig wallet imported,
        # and the descriptor that went into both CC and the watch-only wallet
        return wo, cosigners, desc

    return doit
@pytest.fixture
def musig_signing(start_sign, end_sign, microsd_path, garbage_collector, cap_story, goto_home,
                  pick_menu_item, press_select, bitcoind, need_keypress, press_cancel):
    """Provide a driver that runs two complete MuSig2 signing sessions against a wallet.

    Session one splits the single UTXO of the watch-only wallet into ``split_to``
    self-spend outputs; session two spends all of them to the bitcoind supply
    wallet (leaving some change). In both sessions the Coldcard first handles the
    PSBT over USB (nonce round) and later from the SD card (signature round).
    """

    def _core_process(signers, b64_psbt):
        # Let every given Core signer process the PSBT once and return the result.
        for signer in signers:
            resp = signer.walletprocesspsbt(b64_psbt, True, "DEFAULT", True, False)
            b64_psbt = resp.get("psbt")
        return b64_psbt

    def _check_story(story, consolidating):
        # Check the transaction summary for a self-spend vs. an outgoing payment.
        if consolidating:
            assert "Consolidating" in story
        else:
            assert "Consolidating" not in story
            assert "Change back:" in story  # has one change address

    def _cc_first_round(b64_psbt, wallet_name, consolidating, expect_signed, inspect_inputs):
        # First Coldcard pass over USB; returns the resulting PSBT as base64.
        start_sign(base64.b64decode(b64_psbt))
        title, story = cap_story()
        _check_story(story, consolidating)
        assert f"Wallet: {wallet_name}" in story

        if inspect_inputs:
            # open the input details and make sure MuSig2 is mentioned
            need_keypress("2")
            pick_menu_item("Inputs")
            title, story = cap_story()
            assert "MuSig2" in story
            press_cancel()
            press_cancel()

        res_psbt = end_sign(exit_export_loop=False)
        time.sleep(.1)
        title, story = cap_story()
        if expect_signed:
            assert "PSBT Signed" == title
        else:
            assert "PSBT Updated" == title
        press_cancel()  # exit export loop
        return base64.b64encode(res_psbt).decode()

    def _cc_second_round_sd(b64_psbt, wallet_name, consolidating, finalized):
        # Second Coldcard pass via SD card, as we want to see both PSBT and finalized tx.
        # Returns (signed PSBT base64, finalized tx hex or None, txid shown by CC or None).
        fname = f"{wallet_name}.psbt"
        fpath = microsd_path(fname)
        with open(fpath, "w") as f:
            f.write(b64_psbt)
        garbage_collector.append(fpath)

        goto_home()
        pick_menu_item("Ready To Sign")
        time.sleep(.1)
        title, story = cap_story()
        if "OK TO SEND?" not in title:
            # a file picker was shown instead of the approval screen - choose our file
            time.sleep(0.1)
            pick_menu_item(fname)
            time.sleep(0.1)
            title, story = cap_story()

        assert title == "OK TO SEND?"
        _check_story(story, consolidating)
        press_select()  # confirm signing
        time.sleep(0.5)
        title, story = cap_story()
        assert "PSBT Signed" == title
        assert "Updated PSBT is:" in story
        press_select()

        # file names are read from fixed paragraph positions of the result story
        split_story = story.split("\n\n")
        fpath_psbt = microsd_path(split_story[1])
        with open(fpath_psbt, "r") as f:
            signed_psbt = f.read().strip()
        garbage_collector.append(fpath_psbt)

        final_txn = None
        cc_txid = None
        if finalized:
            fname_txn = split_story[3]
            cc_txid = split_story[4].split("\n")[-1]
            fpath_txn = microsd_path(fname_txn)
            with open(fpath_txn, "r") as f:
                final_txn = f.read().strip()
            garbage_collector.append(fpath_txn)

        return signed_psbt, final_txn, cc_txid

    def doit(wallet_name, watch_only, core_signers, coldcard_first, signers_start, signers_end,
             finalized, split_to=10, sequence=None, locktime=0, cc_first_no_sigs_added=True):
        """Run the split transaction and then the consolidating spend.

        wallet_name            -- name expected in the Coldcard story; also the SD file stem
        watch_only             -- Core wallet used to fund, finalize and broadcast
        core_signers           -- Core co-signer wallets; all of them add nonces
        coldcard_first         -- True: Coldcard adds its nonces before the co-signers
        signers_start/_end     -- slice of ``core_signers`` that adds signatures
        finalized              -- True if the Coldcard is expected to export a finalized tx
        split_to               -- number of outputs created by the first transaction
        sequence               -- optional nSequence set on inputs (relative timelock)
        locktime               -- nLockTime passed to walletcreatefundedpsbt
        cc_first_no_sigs_added -- True: first Coldcard pass must end with "PSBT Updated",
                                  False: it must end with "PSBT Signed"
        """
        expect_signed = not cc_first_no_sigs_added
        signing_cosigners = core_signers[signers_start: signers_end]

        # ---- session one: split the single UTXO into `split_to` outputs (self-spend)
        all_of_it = watch_only.getbalance()
        unspent = watch_only.listunspent()
        assert len(unspent) == 1
        if sequence:
            inp = [{"txid": unspent[0]["txid"], "vout": unspent[0]["vout"], "sequence": sequence}]
        else:
            inp = []  # auto-selection

        nVal = all_of_it / split_to
        conso_addrs = [{watch_only.getnewaddress("", "bech32m"): nVal} for _ in range(split_to)]
        psbt_resp = watch_only.walletcreatefundedpsbt(
            inp,
            conso_addrs,
            locktime,
            {"fee_rate": 2, "change_type": "bech32m", "subtractFeeFromOutputs": [0]},
        )
        psbt = psbt_resp.get("psbt")

        if not coldcard_first:
            # cosigners adding nonces
            psbt = _core_process(core_signers, psbt)

        # CC adds nonces
        # even if all nonces from co-signers are already present we do not add signatures
        # 1st & 2nd round are strictly separated
        # if CC adds nonce, no signatures are added and vice-versa
        psbt = _cc_first_round(psbt, wallet_name, True, expect_signed, True)

        if coldcard_first:
            # CC was first to add pubnonce - now core cosigners add theirs
            psbt = _core_process(core_signers, psbt)

        # cosigners adding signatures - core also strictly separates 1st & 2nd round
        # (cannot add both nonce and sigs in one sitting)
        psbt = _core_process(signing_cosigners, psbt)

        # now CC adds signatures
        psbt, final_txn, cc_txid = _cc_second_round_sd(psbt, wallet_name, True, finalized)

        res = watch_only.finalizepsbt(psbt)
        assert res["complete"]
        tx_hex = res["hex"]
        if finalized:
            assert tx_hex == final_txn

        if (sequence or locktime) and not finalized:
            # we are signing for timelocked tapscript
            res = watch_only.testmempoolaccept([tx_hex])
            assert res[0]["allowed"] is False
            # pick the expected reason first: a conditional expression binds looser
            # than ==, so writing it inline would not check the locktime case at all
            expected_reason = 'non-BIP68-final' if sequence else "non-final"
            assert res[0]['reject-reason'] == expected_reason
            if sequence:
                bitcoind.supply_wallet.generatetoaddress(sequence, bitcoind.supply_wallet.getnewaddress())
            else:
                block_h = watch_only.getblockchaininfo()["blocks"]
                bitcoind.supply_wallet.generatetoaddress(locktime - block_h, bitcoind.supply_wallet.getnewaddress())

        res = watch_only.testmempoolaccept([tx_hex])
        assert res[0]["allowed"]
        res = watch_only.sendrawtransaction(tx_hex)
        assert len(res) == 64  # tx id
        if finalized:
            assert res == cc_txid

        # ---- session two: consolidate multiple inputs to send out
        bitcoind.supply_wallet.generatetoaddress(1, bitcoind.supply_wallet.getnewaddress())  # mine above
        all_of_it = watch_only.getbalance()
        unspent = watch_only.listunspent()
        assert len(unspent) == split_to

        ins = [{"txid": u["txid"], "vout": u["vout"]} for u in unspent]
        if sequence:
            for i in ins:
                i["sequence"] = sequence

        psbt_resp = watch_only.walletcreatefundedpsbt(
            ins,
            [{bitcoind.supply_wallet.getnewaddress(): all_of_it - 2}],  # slightly less so we still have some change
            locktime,
            {"fee_rate": 2, "change_type": "bech32m"},
        )
        psbt1 = psbt_resp.get("psbt")

        if not coldcard_first:
            # cosigners adding nonces
            psbt1 = _core_process(core_signers, psbt1)

        # CC adds nonces
        psbt1 = _cc_first_round(psbt1, wallet_name, False, expect_signed, False)

        if coldcard_first:
            # CC was first to add pubnonce - now core cosigners add nonces
            psbt1 = _core_process(core_signers, psbt1)

        # cosigners adding signatures
        psbt1 = _core_process(signing_cosigners, psbt1)

        # CC adds signatures
        psbt1, final_txn1, cc_txid1 = _cc_second_round_sd(psbt1, wallet_name, False, finalized)

        res1 = watch_only.finalizepsbt(psbt1)
        assert res1["complete"]
        tx_hex1 = res1["hex"]
        if coldcard_first and finalized:
            assert tx_hex1 == final_txn1

        if sequence and not finalized:
            # we are signing for timelocked tapscript
            res = watch_only.testmempoolaccept([tx_hex1])
            assert res[0]["allowed"] is False
            assert res[0]['reject-reason'] == 'non-BIP68-final'
            bitcoind.supply_wallet.generatetoaddress(sequence, bitcoind.supply_wallet.getnewaddress())

        res = watch_only.testmempoolaccept([tx_hex1])
        assert res[0]["allowed"]
        res = watch_only.sendrawtransaction(tx_hex1)
        assert len(res) == 64  # tx id
        if coldcard_first and finalized:
            assert res == cc_txid1

    return doit
@pytest.mark.bitcoind
@pytest.mark.parametrize("tapscript", [1, False, 2, 3])
@pytest.mark.parametrize("ts_level", [0, 1])
@pytest.mark.parametrize("cc_first", [True, False])
@pytest.mark.parametrize("originless", [True, False]) # co-signer keys do not include origin derivation
def test_musig(tapscript, ts_level, cc_first, clear_miniscript, microsd_path, use_regtest,
               address_explorer_check, get_cc_key, import_miniscript, bitcoin_core_signer,
               import_duplicate, press_select, create_core_wallet, garbage_collector,
               musig_signing, originless):
    """3-party MuSig2 (Coldcard + two Core signers) in the taproot internal key.

    ``tapscript`` optionally adds three 2-key leaves: 1 = musig, 2 = sortedmulti_a,
    3 = musig with ``older(10)``; ``ts_level`` moves leaf s0 (Coldcard + second
    Core signer) between the outer and the nested position of the script tree.
    """
    use_regtest()

    # derivation not allowed inside musig, so the range suffix is removed from all keys
    cosigners, cosigner_pubs, cosigner_privs = [], [], []
    for idx in range(2):
        core, pub, priv = bitcoin_core_signer(f"musig-co-signer{idx}", privkey=True)
        core.keypoolrefill(25)
        pub = pub.replace("/0/*", "")
        if originless:
            # keep only what follows the "[fingerprint/path]" origin block
            pub = pub.split("]")[-1]
        cosigner_pubs.append(pub)
        cosigner_privs.append(priv.replace("/0/*", ""))
        cosigners.append(core)

    cc_key = get_cc_key("86h/1h/0h").replace("/<0;1>/*", "")
    inner = "musig(%s)/<2;3>/*" % ",".join([cc_key] + cosigner_pubs)

    sequence = None
    cc_first_no_sigs_added = True
    if tapscript:
        # key pairs of leaves s0, s1, s2 - in this order
        pairs = [
            (cc_key, cosigner_pubs[1]),
            (cc_key, cosigner_pubs[0]),
            (cosigner_pubs[0], cosigner_pubs[1]),
        ]
        if tapscript == 1:
            # musig in tapscript
            leaves = [f"pk(musig({a},{b})/<2;3>/*)" for a, b in pairs]
        elif tapscript == 2:
            # classic multisig in tapscript
            leaves = [f"sortedmulti_a(2,{a}/<2;3>/*,{b}/<2;3>/*)" for a, b in pairs]
            # signatures get added for classic multisig leaves right away,
            # so the title will be PSBT Signed (not PSBT Updated)
            cc_first_no_sigs_added = False
        elif tapscript == 3:
            # time-locked musig in tapscript
            sequence = 10
            leaves = [f"and_v(v:pk(musig({a},{b})/<2;3>/*),older(10))" for a, b in pairs]
        else:
            raise NotImplementedError

        s0, s1, s2 = leaves
        inner += ","
        # in tapscript we always sign with signers[1] only: a single core signer
        # cannot satisfy the internal key musig, so the leaf really gets exercised
        if ts_level:
            # signable leaf (s0) in the nested pair
            inner += "{%s,{%s,%s}}" % (s2, s1, s0)
        else:
            # signable leaf (s0) directly under the root
            inner += "{%s,{%s,%s}}" % (s0, s1, s2)

    desc = f"tr({inner})"

    clear_miniscript()
    name = "musig"
    fname = f"{name}.txt"
    fpath = microsd_path(fname)
    with open(fpath, "w") as fd:
        fd.write(desc)
    garbage_collector.append(fpath)

    _, story = import_miniscript(fname)
    assert "Create new miniscript wallet?" in story
    # do some checks on policy --> helper function to replace keys with letters
    press_select()
    import_duplicate(fname)

    wo = create_core_wallet(name, "bech32m", "sd", True)

    templates = []
    for entry in wo.listdescriptors()["descriptors"]:
        for field in ("next", "next_index"):
            del entry[field]
        templates.append(entry)

    # import musig descriptor to signers, each with its own private key loaded
    for core, pub, priv in zip(cosigners, cosigner_pubs, cosigner_privs):
        batch = copy.deepcopy(templates)
        for item in batch:
            item["desc"] = item["desc"].split("#")[0].replace(pub, priv)
            checksum = wo.getdescriptorinfo(item["desc"])["checksum"]
            item["desc"] = item["desc"] + "#" + checksum

        for outcome in core.importdescriptors(batch):
            assert outcome["success"]

    # tapscript: one core signer + CC is enough (leaves are 2of2, only the internal key is 3of3)
    # internal key only: all signatures are needed
    _from, _to = (1, 2) if tapscript else (0, 2)

    musig_signing(name, wo, cosigners, cc_first, _from, _to, finalized=not tapscript, split_to=10,
                  sequence=sequence, cc_first_no_sigs_added=cc_first_no_sigs_added)

    # check addresses are correct
    address_explorer_check("sd", "bech32m", wo, name)
@pytest.mark.bitcoind
@pytest.mark.parametrize("N_K", [(5,3),(6,4), (10,9)])
@pytest.mark.parametrize("tapscript", [True, False])
@pytest.mark.parametrize("cc_first", [True, False])
def test_musig_big(N_K, cc_first, tapscript, clear_miniscript, use_regtest, address_explorer_check,
                   build_musig_wallet, musig_signing):
    """MuSig2 with many participants, optionally with K-sized musig leaves in an unbalanced tree."""
    num_signers, threshold = N_K
    use_regtest()
    clear_miniscript()

    # which slice of the core signers adds signatures:
    # with tapscript leaves a K-sized subset is enough,
    # with internal key musig only, everybody has to sign
    signer_slice = (1, threshold) if tapscript else (0, num_signers)
    _from, _to = signer_slice

    name = "big_musig"
    tree_shape = random.choice(["left_heavy", "right_heavy"])  # not balanced tree
    wo, signers, desc = build_musig_wallet(name, num_signers, tapscript=tapscript,
                                           tree_design=tree_shape,
                                           tapscript_musig_threshold=threshold)

    musig_signing(name, wo, signers, cc_first, _from, _to, finalized=not tapscript, split_to=20)

    # check addresses are correct
    address_explorer_check("sd", "bech32m", wo, name)
@pytest.mark.bitcoind
@pytest.mark.parametrize("N_K", [(3,2),(4,3)])
@pytest.mark.parametrize("cc_first", [True, False])
def test_musig_alt(N_K, cc_first, clear_miniscript, use_regtest, address_explorer_check,
                   build_musig_wallet, musig_signing):
    """Wallet type 1: K-sized musigs in internal key and leaves, plus a sortedmulti_a fallback leaf."""
    use_regtest()
    clear_miniscript()

    num_signers, threshold = N_K
    name = "alt_musig"
    wallet_kwargs = dict(tapscript=True, wallet_type=1, tapscript_musig_threshold=threshold)
    wo, signers, desc = build_musig_wallet(name, num_signers, **wallet_kwargs)

    # the Coldcard may be able to finalize, but only by chance: key order is shuffled in
    # build_musig_wallet, so we cannot know whether our key ends up in the internal key
    musig_signing(name, wo, signers, cc_first, 1, threshold, finalized=False, split_to=5,
                  cc_first_no_sigs_added=False)

    # check addresses are correct
    address_explorer_check("sd", "bech32m", wo, name)
@pytest.mark.parametrize("tapscript", [
    "tr($H,and_v(v:pk(musig($0,$1,$2)/<0;1>/*),after(120)))",
    "tr($H,and_v(vc:pk_k(musig($0,$1,$2)/<0;1>/*),after(120)))",
    "tr($H,and_v(v:pkh(musig($0,$1,$2)/<0;1>/*),after(120)))",
    "tr($H,and_v(vc:pk_h(musig($0,$1,$2)/<0;1>/*),after(120)))",
    "tr($H,{and_v(v:pk(musig($0,$2)/0/*),after(120)),and_v(v:pk(musig($1,$2)/0/*),after(120))})",
])
def test_miniscript_musig_variations(tapscript, clear_miniscript, use_regtest, address_explorer_check,
                                     build_musig_wallet, musig_signing):
    """musig() inside different miniscript key fragments, each guarded by ``after(120)``.

    The descriptor templates use an unspendable internal key ($H), so spending
    always goes through the time-locked tapscript leaf.
    """
    use_regtest()
    clear_miniscript()

    name = "mini_tap"
    participants = 3
    wo, signers, desc = build_musig_wallet(name, participants, tapscript=tapscript)

    # co-signers go first, every core signer signs, locktime matches after(120)
    musig_signing(name, wo, signers, False, 0, participants, finalized=False,
                  split_to=4, locktime=120)

    # check addresses are correct
    address_explorer_check("sd", "bech32m", wo, name)
def test_resign_musig_psbt_nonce(use_regtest, clear_miniscript, build_musig_wallet, start_sign,
                                 cap_story, end_sign, press_cancel):
    """Offering a PSBT to the Coldcard again during the nonce round must fail."""
    use_regtest()
    clear_miniscript()

    name = "musig_resign_nonce"
    wo, signers, desc = build_musig_wallet(name, 3, tapscript=True,
                                           tapscript_musig_threshold=2)

    def offer(b64_psbt):
        # Start signing and check that the expected self-spend summary is shown.
        start_sign(base64.b64decode(b64_psbt))
        _, shown = cap_story()
        assert "Consolidating" in shown
        assert f"Wallet: {name}" in shown

    def expect_late_failure(needle):
        # Signing has to abort late, and the story must explain why.
        with pytest.raises(Exception) as err:
            end_sign()
        assert err.value.args[0] == "Coldcard Error: Signing failed late"
        time.sleep(.1)
        _, shown = cap_story()
        assert needle in shown

    funded = wo.walletcreatefundedpsbt([], [{wo.getnewaddress("", "bech32m"): 5}], 0,
                                       {"fee_rate": 2, "change_type": "bech32m"})
    # nothing added yet
    empty_psbt = funded.get("psbt")

    # first pass: CC adds its nonces
    offer(empty_psbt)
    res_psbt = end_sign()

    # offer the empty PSBT again - even though CC already has session rand
    # stored for this TX
    # FAIL
    offer(empty_psbt)
    expect_late_failure("resign")

    po = BasicPSBT().parse(res_psbt)
    # we added a nonce for all the leaves we're part of
    assert len(po.inputs[0].musig_pubnonces) == 3
    # no signature was added
    assert len(po.inputs[0].musig_part_sigs) == 0

    # provide the PSBT that already contains our pubnonces
    # this causes session rand to be dropped from cache, so even if this worked,
    # a later attempt to provide signatures would fail
    my_nonce_psbt = po.as_b64_str()
    offer(my_nonce_psbt)
    expect_late_failure("musig needs restart")
    press_cancel()
def test_resign_musig_psbt_sig(use_regtest, clear_miniscript, build_musig_wallet, start_sign,
                               cap_story, end_sign, press_cancel):
    """Re-offering an already signed PSBT to the Coldcard leaves nonces and signatures untouched."""
    use_regtest()
    clear_miniscript()

    name = "musig_resign_sig"
    wo, signers, desc = build_musig_wallet(name, 3, tapscript=True,
                                           tapscript_musig_threshold=2)

    def cc_pass(b64_psbt):
        # One Coldcard signing pass over USB; returns the parsed result.
        start_sign(base64.b64decode(b64_psbt))
        _, shown = cap_story()
        assert "Consolidating" in shown
        assert f"Wallet: {name}" in shown
        return BasicPSBT().parse(end_sign())

    def core_pass(b64_psbt):
        # Every core signer processes the PSBT once (no finalization).
        for core in signers:
            b64_psbt = core.walletprocesspsbt(b64_psbt, True, "DEFAULT", True, False).get("psbt")
        return b64_psbt

    funded = wo.walletcreatefundedpsbt([], [{wo.getnewaddress("", "bech32m"): 3}], 0,
                                       {"fee_rate": 3, "change_type": "bech32m"})
    # nothing added yet
    empty_psbt = funded.get("psbt")

    # add our nonces
    po = cc_pass(empty_psbt)
    # only our 3 nonces are present after this first pass
    assert len(po.inputs[0].musig_pubnonces) == 3
    # and no signature yet
    assert len(po.inputs[0].musig_part_sigs) == 0

    # cosigners adding nonces
    full_nonce_psbt = core_pass(po.as_b64_str())

    po = cc_pass(full_nonce_psbt)
    # all nonces added at this point
    assert len(po.inputs[0].musig_pubnonces) == 9
    # coldcard also added partial signatures - as all pubnonces were already available
    assert len(po.inputs[0].musig_part_sigs) == 3

    # offer the PSBT that we have already signed once more
    po = cc_pass(po.as_b64_str())
    # nothing changed
    assert len(po.inputs[0].musig_pubnonces) == 9
    assert len(po.inputs[0].musig_part_sigs) == 3

    # cosigners add their signatures
    final_psbt = core_pass(po.as_b64_str())

    res = wo.finalizepsbt(final_psbt)
    assert res["complete"]
    tx_hex = res["hex"]
    accepted = wo.testmempoolaccept([tx_hex])
    assert accepted[0]["allowed"]
    txid = wo.sendrawtransaction(tx_hex)
    assert len(txid) == 64  # tx id
def test_identical_musig_fragments(use_regtest, bitcoin_core_signer, get_cc_key, clear_miniscript,
                                   offer_minsc_import, press_select, create_core_wallet, start_sign,
                                   end_sign, cap_story, bitcoind):
    """Same musig used three times: in the internal key and in two tapscript leaves.

    CC provides a signature for the internal key and for ONLY one tapleaf, as the
    tapleaves are completely the same (even the sighash is the same).
    """
    use_regtest()

    cosigners, cosigner_pubs, cosigner_privs = [], [], []
    for idx in range(2):
        core, pub, priv = bitcoin_core_signer(f"musig-co-signer{idx}", privkey=True)
        core.keypoolrefill(25)
        cosigner_pubs.append(pub.replace("/0/*", ""))
        cosigner_privs.append(priv.replace("/0/*", ""))
        cosigners.append(core)

    cc_key = get_cc_key("86h/1h/0h").replace("/<0;1>/*", "")
    msig = "musig(%s)" % ",".join([cc_key] + cosigner_pubs)
    desc = f"tr({msig}/<0;1>/*,{{pk({msig}/<0;1>/*),pk({msig}/<0;1>/*)}})"

    clear_miniscript()
    name = "ident_musig"
    _, story = offer_minsc_import(json.dumps({"name": name, "desc": desc}))
    assert "Create new miniscript wallet?" in story
    # do some checks on policy --> helper function to replace keys with letters
    press_select()

    wo = create_core_wallet(name, "bech32m", funded=True)

    templates = []
    for entry in wo.listdescriptors()["descriptors"]:
        for field in ("next", "next_index"):
            del entry[field]
        templates.append(entry)

    # import musig descriptor to signers, each with its own private key loaded
    for core, pub, priv in zip(cosigners, cosigner_pubs, cosigner_privs):
        batch = copy.deepcopy(templates)
        for item in batch:
            item["desc"] = item["desc"].split("#")[0].replace(pub, priv)
            checksum = wo.getdescriptorinfo(item["desc"])["checksum"]
            item["desc"] = item["desc"] + "#" + checksum

        for outcome in core.importdescriptors(batch):
            assert outcome["success"]

    def cc_pass(raw_psbt):
        # One Coldcard pass over USB on a binary PSBT; returns the parsed result.
        start_sign(raw_psbt)
        _, shown = cap_story()
        assert "Consolidating" in shown
        assert f"Wallet: {name}" in shown
        return BasicPSBT().parse(end_sign())

    def core_pass(b64_psbt):
        # Every core signer processes the PSBT once; returns the parsed result.
        for core in cosigners:
            b64_psbt = core.walletprocesspsbt(b64_psbt, True, "DEFAULT", True, False).get("psbt")
        return BasicPSBT().parse(base64.b64decode(b64_psbt))

    def expect(parsed, nonces, part_sigs):
        # Check the amount of musig data on the first input.
        assert len(parsed.inputs[0].musig_pubnonces) == nonces
        assert len(parsed.inputs[0].musig_part_sigs) == part_sigs

    funded = wo.walletcreatefundedpsbt([], [{wo.getnewaddress("", "bech32m"): 5}], 0,
                                       {"fee_rate": 2, "change_type": "bech32m"})
    # nothing added yet
    psbt = funded.get("psbt")

    # CC nonces
    po = cc_pass(base64.b64decode(psbt))
    expect(po, 2, 0)

    # cosigner nonces
    po = core_pass(po.as_b64_str())
    expect(po, 6, 0)

    # CC signatures
    po = cc_pass(po.as_bytes())
    expect(po, 6, 2)

    # cosigner signatures
    po = core_pass(po.as_b64_str())
    expect(po, 6, 6)

    res = wo.finalizepsbt(po.as_b64_str())
    assert res["complete"]
    tx_hex = res["hex"]
    accepted = wo.testmempoolaccept([tx_hex])
    assert accepted[0]["allowed"]
    txid = wo.sendrawtransaction(tx_hex)
    assert len(txid) == 64  # tx id
def test_identical_musig_subder(use_regtest, bitcoin_core_signer, get_cc_key, clear_miniscript,
                                offer_minsc_import, press_select, create_core_wallet,
                                musig_signing, address_explorer_check):
    """Identical musig twice in one tapleaf, differing only in the musig subderivation.

    Currently skipped unconditionally; everything after the skip is kept so the
    test can be enabled once bitcoind accepts such descriptors.
    """
    # TODO bitcoin-core bitching that this descriptor is not sane because it contains duplicate public keys
    # needs https://github.com/bitcoin/bitcoin/pull/34697 (or something less buggy)
    # identical musig in one tapleaf, but musig subderivation differs, i.e. different key
    # pytest.skip() raises by itself, so no `raise` in front of it
    pytest.skip("needs updated bitcoind")

    use_regtest()
    core_pubkeys = []
    core_privkeys = []
    signers = []
    for i in range(2):
        # core signers
        signer, core_pk, core_sk = bitcoin_core_signer(f"musig-co-signer{i}", privkey=True)
        signer.keypoolrefill(25)
        core_pubkeys.append(core_pk.replace("/0/*", ""))
        core_privkeys.append(core_sk.replace("/0/*", ""))
        signers.append(signer)

    cc_key = get_cc_key("86h/1h/0h").replace("/<0;1>/*", "")
    msig = "musig(%s)" % ",".join([cc_key] + core_pubkeys)
    desc = f"tr({ranged_unspendable_internal_key()},and_v(v:pk({msig}/<0;1>/*),pk({msig}/<2;3>/*)))"

    clear_miniscript()
    name = "ident_musig_subder"
    _, story = offer_minsc_import(json.dumps(dict(name=name, desc=desc)))
    assert "Create new miniscript wallet?" in story
    # do some checks on policy --> helper function to replace keys with letters
    press_select()

    wo = create_core_wallet(name, "bech32m", funded=True)

    desc_lst = []
    for obj in wo.listdescriptors()["descriptors"]:
        del obj["next"]
        del obj["next_index"]
        desc_lst.append(obj)

    # import musig descriptor to signers
    # each signer has its own privkey loaded
    for s, spk, ssk in zip(signers, core_pubkeys, core_privkeys):
        to_import = copy.deepcopy(desc_lst)
        for dobj in to_import:
            dobj["desc"] = dobj["desc"].split("#")[0].replace(spk, ssk)
            csum = wo.getdescriptorinfo(dobj["desc"])["checksum"]
            dobj["desc"] = dobj["desc"] + "#" + csum

        res = s.importdescriptors(to_import)
        for o in res:
            assert o["success"]

    musig_signing(name, wo, signers, True, 0, 3, finalized=False,
                  split_to=2)

    # check addresses are correct
    address_explorer_check("sd", "bech32m", wo, name)
def test_multiple_musig_sessions_simple(use_regtest, clear_miniscript, build_musig_wallet,
                                        start_sign, end_sign, cap_story, garbage_collector,
                                        goto_home, microsd_path, pick_menu_item, press_select):
    """Three MuSig2 sessions are opened on the Coldcard and later completed in reverse order."""
    use_regtest()
    clear_miniscript()

    # the wallets below have identical structure, but the keys differ
    # testing our session cache logic
    built = [
        build_musig_wallet(wal_name, 4, tapscript=True, tapscript_musig_threshold=3)
        for wal_name in ("wal0", "wal1", "wal2")
    ]
    watch_wallets = [entry[0] for entry in built]
    signer_groups = [entry[1] for entry in built]
    wo0 = watch_wallets[0]

    psbts = []  # ordered 0,1,2
    for wal in watch_wallets:
        funded = wal.walletcreatefundedpsbt([], [{wal.getnewaddress("", "bech32m"): 5}], 0,
                                            {"fee_rate": 2, "change_type": "bech32m"})
        psbts.append(funded.get("psbt"))

    # initialize musig sessions for all three PSBTs
    for idx in range(3):
        start_sign(base64.b64decode(psbts[idx]))
        _, story = cap_story()
        assert "Consolidating" in story
        assert f"Wallet: wal{idx}" in story
        po = BasicPSBT().parse(end_sign())
        assert len(po.inputs[0].musig_pubnonces) == 4  # internal key + 3 leafs out of 4
        assert len(po.inputs[0].musig_part_sigs) == 0
        psbts[idx] = po.as_b64_str()  # replace with updated PSBT

    def cosigner_pass():
        # Each wallet's core signers process that wallet's PSBT once (no finalization).
        for idx, group in enumerate(signer_groups):
            current = psbts[idx]
            for core in group:
                current = core.walletprocesspsbt(current, True, "DEFAULT", True, False).get("psbt")
            psbts[idx] = current  # update

    # add pubnonce from co-signers
    cosigner_pass()

    for b64_psbt in psbts:
        po = BasicPSBT().parse(base64.b64decode(b64_psbt))
        # each cosigner added 4 nonces (internal key + 3 leafs)
        assert len(po.inputs[0].musig_pubnonces) == (4*4)
        assert len(po.inputs[0].musig_part_sigs) == 0

    # add signatures from co-signers
    cosigner_pass()

    for idx in reversed(range(3)):  # reverse order
        fname = f"{idx}.psbt"
        fpath = microsd_path(fname)
        with open(fpath, "w") as fd:
            fd.write(psbts[idx])
        garbage_collector.append(fpath)

        goto_home()
        pick_menu_item("Ready To Sign")
        time.sleep(.1)
        title, story = cap_story()
        if "OK TO SEND?" not in title:
            # a file picker was shown instead of the approval screen - choose our file
            time.sleep(0.1)
            pick_menu_item(fname)
            time.sleep(0.1)
            title, story = cap_story()

        assert title == "OK TO SEND?"
        assert "Consolidating" in story
        assert "Change back:" in story
        press_select()  # confirm signing
        time.sleep(0.5)
        title, story = cap_story()
        assert "PSBT Signed" == title
        assert "Updated PSBT is:" in story
        press_select()

        # file names are read from fixed paragraph positions of the result story
        paragraphs = story.split("\n\n")
        fpath_psbt = microsd_path(paragraphs[1])
        with open(fpath_psbt, "r") as fd:
            res_psbt = fd.read().strip()
        garbage_collector.append(fpath_psbt)

        # finalized txn will be provided as internal key signing is done
        fname_txn = paragraphs[3]
        cc_txid = paragraphs[4].split("\n")[-1]
        fpath_txn = microsd_path(fname_txn)
        with open(fpath_txn, "r") as fd:
            final_txn = fd.read().strip()
        garbage_collector.append(fpath_txn)

        # does not matter which wallet is used for finalization
        res = wo0.finalizepsbt(res_psbt)
        assert res["complete"]
        tx_hex = res["hex"]
        assert tx_hex == final_txn
        accepted = wo0.testmempoolaccept([tx_hex])
        assert accepted[0]["allowed"]
        txid = wo0.sendrawtransaction(tx_hex)
        assert len(txid) == 64  # tx id
        assert txid == cc_txid
def test_multiple_musig_sessions_identical_leave(use_regtest, clear_miniscript, build_musig_wallet,
                                                 start_sign, end_sign, cap_story, garbage_collector,
                                                 create_core_wallet, press_select, offer_minsc_import):
    """Run two MuSig2 signing sessions, one per wallet, interleaved on the device.

    The second wallet is built from the first wallet's descriptor by swapping
    the musig internal key for an unspendable key; the remainder of the
    descriptor is reused verbatim. Each wallet funds one self-spend PSBT and
    both PSBTs are walked through every signing step before being broadcast.
    """
    use_regtest()
    clear_miniscript()

    wo, signers, desc = build_musig_wallet("ww", 3, tapscript=True,
                                           tapscript_musig_threshold=2,
                                           musig_subder="/<0;1>/*")

    # drop everything up to (and including) the first "*" and put an
    # unspendable internal key in its place
    unspendable = ranged_unspendable_internal_key()
    desc = f"tr({unspendable}" + desc[desc.find("*") + 1:]

    wal_name = "qqq"
    _, story = offer_minsc_import(json.dumps({"name": wal_name, "desc": desc}))
    assert "Create new miniscript wallet?" in story
    assert wal_name in story
    # do some checks on policy --> helper function to replace keys with letters
    press_select()
    wo0 = create_core_wallet(wal_name, "bech32m", "sd", True)

    def core_cosigners_pass(b64_psbt):
        # hand the PSBT to each Core co-signer in turn; never finalize
        for cosigner in signers:
            reply = cosigner.walletprocesspsbt(b64_psbt, True, "DEFAULT", True, False)
            b64_psbt = reply.get("psbt")
        return b64_psbt

    # one self-spend per wallet
    psbts = []
    for wallet in (wo, wo0):
        destination = [{wallet.getnewaddress("", "bech32m"): 1.256}]
        funded = wallet.walletcreatefundedpsbt(destination=None or [], outputs=None) \
            if False else wallet.walletcreatefundedpsbt(
                [], destination, 0, {"fee_rate": 2, "change_type": "bech32m"})
        psbts.append(funded.get("psbt"))

    # device handles both PSBTs first; keep its updated output
    for idx, b64_psbt in enumerate(psbts):
        start_sign(base64.b64decode(b64_psbt))
        _, story = cap_story()
        assert "Consolidating" in story
        psbts[idx] = BasicPSBT().parse(end_sign()).as_b64_str()

    # add pubnonces from co-signers
    psbts = [core_cosigners_pass(b64_psbt) for b64_psbt in psbts]

    # add signatures from co-signers
    psbts = [core_cosigners_pass(b64_psbt) for b64_psbt in psbts]

    # finalize on CC
    for b64_psbt in psbts:
        start_sign(base64.b64decode(b64_psbt))
        _, story = cap_story()
        assert "Consolidating" in story
        signed = end_sign()

        finalized = wo0.finalizepsbt(base64.b64encode(signed).decode())
        assert finalized["complete"]
        tx_hex = finalized["hex"]

        accepted = wo0.testmempoolaccept([tx_hex])
        assert accepted[0]["allowed"]

        txid = wo0.sendrawtransaction(tx_hex)
        assert len(txid) == 64  # tx id
@pytest.mark.parametrize("tapscript", [
    False,
    "tr($H,{and_v(v:pk(musig($0,$2)/0/*),after(120)),and_v(v:pk(musig($1,$2)/0/*),after(120))})",
])
@pytest.mark.parametrize("cc_first", [True, False])
@pytest.mark.parametrize("sighash", ["NONE", "SINGLE", "ALL|ANYONECANPAY", "NONE|ANYONECANPAY", "SINGLE|ANYONECANPAY"])
def test_exotic_sighash_musig(tapscript, clear_miniscript, use_regtest, address_explorer_check,
                              build_musig_wallet, start_sign, end_sign, cc_first, sighash,
                              cap_story, bitcoind, settings_set):
    """Sign MuSig2 spends using non-default sighash flags.

    Phases:
      1. Self-spend the single UTXO into four outputs using sighash ALL.
      2. With sighash checks disabled, spend to external addresses using the
         parametrized `sighash`; the device must show a sighash warning.
      3. With sighash checks enabled again, offer a self-spend of the whole
         balance using `sighash`; the device must refuse it.

    `cc_first` selects who contributes pubnonces first (device or Core
    co-signers). When `tapscript` is a descriptor template, spending goes
    through a leaf carrying after(120), hence the nLockTime and block mining.
    """
    num_signers = 3
    locktime = 120  # must agree with after(120) in the parametrized descriptor
    unusual_msg = "Some inputs have unusual SIGHASH values"

    use_regtest()
    clear_miniscript()
    settings_set("sighshchk", 1)  # disable checks
    name = "sighash_musig"
    wo, signers, desc = build_musig_wallet(name, num_signers, tapscript=tapscript)
    if tapscript:
        # only the second Core signer adds a partial signature in the custom tree
        _from, _to = 1, 2
    else:
        _from, _to = 0, num_signers

    nlocktime = locktime if tapscript else 0
    supply = bitcoind.supply_wallet

    def core_process(b64_psbt, flag, cosigners):
        # run the PSBT through the given Core wallets with `flag`, no finalization
        for s in cosigners:
            psbt_resp = s.walletprocesspsbt(b64_psbt, True, flag, True, False)
            b64_psbt = psbt_resp.get("psbt")
        return b64_psbt

    def force_sighash(b64_psbt, flag):
        # set `flag` on every input; returns the parsed PSBT object
        po = BasicPSBT().parse(base64.b64decode(b64_psbt))
        for inp in po.inputs:
            inp.sighash = SIGHASH_MAP[flag]
        return po

    def cc_sign_sighash_all(b64_psbt):
        # device signing step of the consolidation: no warning expected
        start_sign(base64.b64decode(b64_psbt))
        title, story = cap_story()
        assert "warning" not in story
        assert "Consolidating" in story
        assert f"Wallet: {name}" in story
        res_psbt = end_sign()
        sighash_check(res_psbt, "ALL")
        return base64.b64encode(res_psbt).decode()

    def assert_sighash_warning(story):
        assert "warning" in story
        assert "sighash" in story
        if sighash.startswith("NONE"):
            # NOTE(review): assumes NONE|ANYONECANPAY is reported with the same
            # "sighash NONE" warning as plain NONE - confirm against firmware
            assert "NONE" in story
        else:
            # previously a bare string literal was asserted (always true)
            assert unusual_msg in story
        assert f"Wallet: {name}" in story

    # ---- phase 1: split single UTXO into four, sighash ALL
    all_of_it = wo.getbalance()
    unspent = wo.listunspent()
    assert len(unspent) == 1

    nVal = all_of_it / 4
    conso_addrs = [{wo.getnewaddress("", "bech32m"): nVal} for _ in range(4)]  # self-spend
    psbt_resp = wo.walletcreatefundedpsbt(
        [],
        conso_addrs,
        nlocktime,
        {"fee_rate": 2, "change_type": "bech32m", "subtractFeeFromOutputs": [0]},
    )
    psbt = psbt_resp.get("psbt")

    if not cc_first:
        # cosigners adding nonces
        psbt = core_process(psbt, "ALL", signers)
    else:
        # CC is going first, tweak sighash to ALL (only one working besides DEFAULT for conso tx)
        psbt = force_sighash(psbt, "ALL").as_b64_str()

    # CC add nonce
    b64_res_psbt = cc_sign_sighash_all(psbt)

    if cc_first:
        # if cc was first to add pubnonce - now core cosigners will add
        b64_res_psbt = core_process(b64_res_psbt, "ALL", signers)

    # cosigners adding signatures - seems core is unable to add both nonce and signature in one iteration
    b64_res_psbt = core_process(b64_res_psbt, "ALL", signers[_from: _to])

    # CC add sig
    b64_res_psbt = cc_sign_sighash_all(b64_res_psbt)

    res = wo.finalizepsbt(b64_res_psbt)
    assert res["complete"]
    tx_hex = res["hex"]

    if tapscript:
        # we are signing for timelocked tapscript
        res = wo.testmempoolaccept([tx_hex])
        assert res[0]["allowed"] is False
        assert res[0]['reject-reason'] == "non-final"
        block_h = wo.getblockchaininfo()["blocks"]
        supply.generatetoaddress(locktime - block_h, supply.getnewaddress())

    res = wo.testmempoolaccept([tx_hex])
    assert res[0]["allowed"]
    res = wo.sendrawtransaction(tx_hex)
    assert len(res) == 64  # tx id

    # ---- phase 2: consolidate multiple inputs to send out
    # we can check all sighash flags here as this is not pure consolidation
    supply.generatetoaddress(1, supply.getnewaddress())  # mine above
    unspent = wo.listunspent()
    assert len(unspent) == 4

    psbt_resp = wo.walletcreatefundedpsbt(
        [],
        [{supply.getnewaddress(): 2} for _ in range(4)],
        nlocktime,
        {"fee_rate": 2, "change_type": "bech32m"},
    )
    psbt1 = psbt_resp.get("psbt")

    if not cc_first:
        # cosigners adding nonces
        psbt1 = core_process(psbt1, sighash, signers)
    else:
        # CC is going first, tweak sighash
        psbt1 = force_sighash(psbt1, sighash).as_b64_str()

    # CC adds nonce only
    start_sign(base64.b64decode(psbt1))
    title, story = cap_story()
    assert "Consolidating" not in story
    assert "Change back:" in story  # has one change address
    assert_sighash_warning(story)
    res_psbt1 = end_sign()
    sighash_check(res_psbt1, sighash)
    b64_res_psbt1 = base64.b64encode(res_psbt1).decode()

    if cc_first:
        # if cc was first to add pubnonce - now core cosigners will add
        b64_res_psbt1 = core_process(b64_res_psbt1, sighash, signers)

    # cosigners adding signatures
    b64_res_psbt1 = core_process(b64_res_psbt1, sighash, signers[_from: _to])

    # CC adds sig
    start_sign(base64.b64decode(b64_res_psbt1))
    title, story = cap_story()
    assert_sighash_warning(story)
    res_psbt = end_sign()
    sighash_check(res_psbt, sighash)
    b64_res_psbt1 = base64.b64encode(res_psbt).decode()

    res1 = wo.finalizepsbt(b64_res_psbt1)
    assert res1["complete"]
    tx_hex1 = res1["hex"]
    res = wo.testmempoolaccept([tx_hex1])
    assert res[0]["allowed"]
    res = wo.sendrawtransaction(tx_hex1)
    assert len(res) == 64  # tx id

    # ---- phase 3: forbidden consolidation tx with exotic sighash - must fail
    settings_set("sighshchk", 0)  # enable checks
    supply.generatetoaddress(1, supply.getnewaddress())  # mine above
    balance = wo.getbalance()
    psbt_resp = wo.walletcreatefundedpsbt(
        [],
        [{wo.getnewaddress("", "bech32m"): balance}],
        nlocktime,
        {"fee_rate": 2, "change_type": "bech32m", "subtractFeeFromOutputs": [0]},
    )
    po = force_sighash(psbt_resp.get("psbt"), sighash)
    start_sign(po.as_bytes())
    title, story = cap_story()
    assert "Failure" == title
    assert "Only sighash ALL/DEFAULT is allowed for pure consolidation transactions." in story
def test_duplicate_musig_in_tapleaf(get_cc_key, offer_minsc_import):
    """Import must fail when one tapscript leaf uses the same musig() expression twice."""
    der_path = "99h/1h/0h"
    own_key = get_cc_key(der_path).replace("/<0;1>/*", "")
    foreign = random_keys(2, path=der_path)

    # the very same aggregate key in both branches of and_v
    aggregate = "musig(%s)" % ",".join([own_key, foreign[0], foreign[1]])
    internal = ranged_unspendable_internal_key()
    desc = f"tr({internal},and_v(v:pk({aggregate}),pk({aggregate})))"

    with pytest.raises(Exception) as excinfo:
        offer_minsc_import(desc)

    message = excinfo.value.args[0]
    assert message == "Coldcard Error: Insane"
def test_unspendable_key_in_musig(get_cc_key, offer_minsc_import):
    """Import must fail when an unspendable key is one of the musig() participants.

    Checked for musig as taproot internal key and for musig inside a tapscript leaf.
    """
    der_path = "199h/1h/3h"
    own_key = get_cc_key(der_path).replace("/<0;1>/*", "")
    foreign = random_keys(2, path=der_path)
    # unspendable key without sub-derivation, used as a musig participant
    dead_key = ranged_unspendable_internal_key(subderiv="")

    aggregate = f"musig({own_key},{foreign[0]},{dead_key})"
    candidates = (
        f"tr({aggregate})",
        f"tr({foreign[1]},pk({aggregate}))",
    )
    for candidate in candidates:
        with pytest.raises(Exception) as excinfo:
            offer_minsc_import(candidate)
        message = excinfo.value.args[0]
        assert message == "Coldcard Error: unspendable key inside musig"
def test_musig_outside_taproot_context(get_cc_key, offer_minsc_import):
    """musig() key expressions are rejected in wsh(), sh(wsh()) and sh() descriptors.

    musig is only allowed in taproot - whether as internal key or in tapscript.
    """
    path = "1001h/1h/99h"
    cc_key = get_cc_key(path).replace("/<0;1>/*", "")
    keys = random_keys(2, path=path)

    # closing parenthesis of musig(...) was missing, which made every
    # descriptor below unbalanced on top of being in the wrong context
    inner = f"musig({cc_key},{keys[0]},{keys[1]})"
    data = [
        f"wsh(pk({inner}))",
        f"sh(wsh(pk({inner})))",
        f"sh(pk({inner}))",
    ]
    for desc in data:
        with pytest.raises(Exception) as e:
            offer_minsc_import(desc)
        assert e.value.args[0] == "Coldcard Error: musig in non-taproot context"
def test_nested_musig(get_cc_key, offer_minsc_import):
    """Import must fail when a musig() expression is a participant of another musig()."""
    der_path = "99h/1h/0h"
    own_key = get_cc_key(der_path).replace("/<0;1>/*", "")
    foreign = random_keys(2, path=der_path)

    # outer musig whose last participant is itself a musig
    nested = f"musig({own_key},{foreign[0]},musig({own_key},{foreign[0]},{foreign[1]}))"
    candidates = (
        f"tr({nested})",                  # as internal key
        f"tr({own_key},pk({nested}))",    # inside tapscript
    )
    for candidate in candidates:
        with pytest.raises(Exception) as excinfo:
            offer_minsc_import(candidate)
        message = excinfo.value.args[0]
        assert message == "Coldcard Error: nested musig not allowed"
def test_key_derivation_not_allowed_inside_musig(get_cc_key, offer_minsc_import):
    """Import must fail when individual musig() participants carry their own derivation.

    Only the musig key expression as a whole may have a derivation suffix.
    """
    der_path = "86h/1h/3h"
    multipath = "/<0;1>/*"
    own_key = get_cc_key(der_path).replace(multipath, "")
    foreign = random_keys(2, path=der_path)

    candidates = (
        # internal key, derivation on one participant
        f"tr(musig({own_key}{multipath},{foreign[0]},{foreign[1]}))",
        # tapscript leaf, derivation on every participant
        f"tr({own_key},pk(musig({own_key}{multipath},{foreign[0]}{multipath},{foreign[1]}{multipath})))",
    )
    for candidate in candidates:
        with pytest.raises(Exception) as excinfo:
            offer_minsc_import(candidate)
        message = excinfo.value.args[0]
        assert message == "Coldcard Error: key derivation not allowed inside musig"
def test_hardened_musig_derivation(get_cc_key, offer_minsc_import):
    """Import must fail when the derivation applied to musig() has a hardened step."""
    path = "88h/0h/0h"
    cc_key = get_cc_key(path).replace("/<0;1>/*", "")
    keys = random_keys(2, path=path)
    data = [
        # musig as internal key
        f"tr(musig({cc_key},{keys[0]},{keys[1]})/1h/*)",
        # musig in tapscript; participants are distinct so that the hardened
        # step is the only thing wrong (keys[0] used to be listed twice here)
        f"tr({cc_key},pk(musig({cc_key},{keys[0]},{keys[1]})/3h/*))",
    ]
    for desc in data:
        with pytest.raises(Exception) as e:
            offer_minsc_import(desc)
        assert e.value.args[0] == "Coldcard Error: Cannot use hardened sub derivation path"
def test_only_unique_keys_in_musig(get_cc_key, offer_minsc_import):
    """Import must fail when a musig() expression lists the same key more than once."""
    der_path = "88h/0h/0h"
    own_key = get_cc_key(der_path).replace("/<0;1>/*", "")
    foreign = random_keys(1, path=der_path)[0]

    foreign_twice = f"musig({own_key},{foreign},{foreign})"
    own_twice = f"musig({own_key},{own_key},{foreign})"

    candidates = (
        f"tr({foreign_twice})",                 # internal key non-unique foreign keys
        f"tr({own_twice})",                     # internal key non-unique own keys
        f"tr({own_key},pk({foreign_twice}))",   # tapscript key non-unique foreign keys
        f"tr({own_key},pk({own_twice}))",       # tapscript key non-unique own keys
    )
    for candidate in candidates:
        with pytest.raises(Exception) as excinfo:
            offer_minsc_import(candidate)
        message = excinfo.value.args[0]
        assert message == "Coldcard Error: musig keys not unique"
@pytest.mark.bitcoind
def test_tmp_seed_cosign(bitcoind, settings_set, end_sign, start_sign, restore_main_seed, use_regtest,
                         cap_story, goto_eph_seed_menu, pick_menu_item, word_menu_entry,
                         confirm_tmp_seed, usb_miniscript_get, offer_minsc_import, press_select,
                         clear_miniscript, get_cc_key, create_core_wallet):
    """One device co-signs the same MuSig2 transaction under two different seeds.

    Proves that we can hold secnonce for multiple seed types on one device
    (even for same txn where respective keys are co-signers).

    Participants: the device main seed (CC), a 12-word temporary seed (B) and a
    random key (C) that never signs. Flow: CC adds nonces, B (as temporary
    seed) adds nonces and then its partial signature, main seed is restored and
    CC adds its partial signature; Core finalizes and broadcasts.
    """
    b_words = "sight will strike aspect nerve saddle young special dragon fence chest tattoo"
    use_regtest()
    clear_miniscript()
    name = "tmp_musig_cosign"
    der_pth = "86h/1h/0h"

    # key B - derived here from the mnemonic that is later loaded as temporary seed
    b39_seed = Mnemonic.to_seed(b_words)
    b_node = BIP32Node.from_master_secret(b39_seed)
    b_xfp = b_node.fingerprint().hex()
    b_key = b_node.subkey_for_path(der_pth).hwif()
    b_key_exp = f"[{b_xfp}/{der_pth}]{b_key}"

    # C is just random key we won't use
    c_node = BIP32Node.from_master_secret(os.urandom(32))
    c_xfp = c_node.fingerprint().hex()
    c_key = c_node.subkey_for_path(der_pth).hwif()
    c_key_exp = f"[{c_xfp}/{der_pth}]{c_key}"

    cc_key = get_cc_key("86h/1h/0h").replace("/<0;1>/*", "")

    # internal key: musig of all three; leaves: every pair of participants
    inner = "musig(%s)" % ",".join([cc_key, b_key_exp, c_key_exp])
    s0 = f"pk(musig({cc_key},{b_key_exp}))"
    s1 = f"pk(musig({cc_key},{c_key_exp}))"
    s2 = f"pk(musig({c_key_exp},{b_key_exp}))"
    inner += ","
    inner += "{%s,{%s,%s}}" % (s0, s1, s2)
    desc = f"tr({inner})"

    title, story = offer_minsc_import(json.dumps(dict(name=name, desc=desc)))
    assert "Create new miniscript wallet?" in story
    # do some checks on policy --> helper function to replace keys with letters
    press_select()

    bitcoind_wo = create_core_wallet(name, "bech32m", "sd", True)
    psbt = bitcoind_wo.walletcreatefundedpsbt(
        [], [{bitcoind.supply_wallet.getnewaddress(): 0.2},
             {bitcoind.supply_wallet.getnewaddress(): 0.255}],
        0, {"fee_rate": 20, "change_type": "bech32m"}
    )['psbt']

    # main seed: first pass only contributes pubnonces
    start_sign(base64.b64decode(psbt))
    time.sleep(.1)
    title, story = cap_story()
    assert 'OK TO SEND?' == title
    signed = end_sign(accept=True)
    po = BasicPSBT().parse(signed)
    assert not po.inputs[0].musig_part_sigs
    assert po.inputs[0].musig_pubnonces

    # switch the device to temporary seed B
    goto_eph_seed_menu()
    pick_menu_item("Import Words")
    pick_menu_item("12 Words")
    time.sleep(0.1)
    word_menu_entry(b_words.split())
    confirm_tmp_seed(seedvault=False)

    # the wallet has to be imported again under the temporary seed
    title, story = offer_minsc_import(desc)
    assert "Create new miniscript wallet?" in story
    press_select()

    # temporary seed, 1st round - pubnonces
    start_sign(signed)
    time.sleep(.1)
    title, story = cap_story()
    assert 'OK TO SEND?' == title
    assert "warning" not in story
    signed = end_sign(accept=True)
    po = BasicPSBT().parse(signed)
    # now we should have all pubnonces that we need
    assert len(po.inputs[0].musig_part_sigs) == 0
    assert po.inputs[0].musig_pubnonces

    # temporary seed, 2nd round - get signature
    start_sign(signed)
    time.sleep(.1)
    title, story = cap_story()
    assert 'OK TO SEND?' == title
    assert "warning" not in story
    signed = end_sign(accept=True)
    po = BasicPSBT().parse(signed)
    # now we should have exactly one partial signature
    assert len(po.inputs[0].musig_part_sigs) == 1

    try:
        # this is run with --eff so number of settings may be incorrect - no prob
        restore_main_seed()
    except Exception:
        # best-effort: failures inside the fixture are tolerated, but
        # KeyboardInterrupt/SystemExit are no longer swallowed
        pass

    # main seed again - adds the second partial signature
    start_sign(signed)
    time.sleep(.1)
    title, story = cap_story()
    assert 'OK TO SEND?' == title
    signed = end_sign(accept=True)
    po = BasicPSBT().parse(signed)
    assert len(po.inputs[0].musig_part_sigs) == 2
    assert po.inputs[0].musig_pubnonces
    # 1 aggregate sig for aggregated musig leaf
    assert len(po.inputs[0].taproot_script_sigs) == 1

    res = bitcoind_wo.finalizepsbt(base64.b64encode(signed).decode())
    assert res["complete"]
    tx_hex = res["hex"]
    res = bitcoind_wo.testmempoolaccept([tx_hex])
    assert res[0]["allowed"]
    res = bitcoind_wo.sendrawtransaction(tx_hex)
    assert len(res) == 64  # tx id
@pytest.mark.bitcoind
@pytest.mark.parametrize("cc_in_musig", [True, False])
def test_musig_in_thresh(cc_in_musig, clear_miniscript, offer_minsc_import, use_regtest, bitcoind,
                         address_explorer_check, get_cc_key, bitcoin_core_signer, import_duplicate,
                         press_select, end_sign, create_core_wallet, start_sign, cap_story,
                         press_cancel):
    """Spend from a tapscript leaf where a musig() key is one member of thresh(3, ...).

    The leaf is thresh(3, pk, pk, pk(musig), older(25)). With `cc_in_musig`
    the device key is a musig participant next to two Core keys; otherwise the
    device holds a plain pk() and the musig consists of Core keys only.
    """
    use_regtest()
    sequence = 25

    # Core wallets providing bare keys (derivation stripped) for musig()
    musig_signers, core_musig_pubkeys, core_musig_privkeys = [], [], []
    for idx in range(2):
        wallet, pub, prv = bitcoin_core_signer(f"musig-co-signer{idx}", privkey=True)
        wallet.keypoolrefill(25)
        core_musig_pubkeys.append(pub.replace("/0/*", ""))
        core_musig_privkeys.append(prv.replace("/0/*", ""))
        musig_signers.append(wallet)

    # Core wallets providing ordinary multipath pk() keys
    signers, core_pubkeys, core_privkeys = [], [], []
    for idx in range(2):
        wallet, pub, prv = bitcoin_core_signer(f"co-signer{idx}", privkey=True)
        wallet.keypoolrefill(25)
        core_pubkeys.append(pub.replace("/0/*", "/<0;1>/*"))
        core_privkeys.append(prv)
        signers.append(wallet)

    cc_key = get_cc_key("86h/1h/0h")
    if cc_in_musig:
        cc_key = cc_key.replace("/<0;1>/*", "")
        participants = [cc_key] + core_musig_pubkeys
        second_pk = core_pubkeys[1]
    else:
        participants = core_musig_pubkeys
        second_pk = cc_key
    musig = "musig(%s)/<0;1>/*" % ",".join(participants)
    tapscript = f"thresh(3,pk({core_pubkeys[0]}),s:pk({second_pk}),s:pk({musig}),sln:older({sequence}))"

    desc = f"tr({ranged_unspendable_internal_key()},{tapscript})"

    clear_miniscript()
    name = "musig_var"
    _, story = offer_minsc_import(json.dumps({"name": name, "desc": desc}))
    assert "Create new miniscript wallet?" in story
    press_select()

    wo = create_core_wallet(name, "bech32m", "sd", 1)

    # watch-only descriptors without the bookkeeping fields
    desc_lst = []
    for entry in wo.listdescriptors()["descriptors"]:
        for field in ("next", "next_index"):
            del entry[field]
        desc_lst.append(entry)

    if cc_in_musig:
        # import musig descriptor to signers
        # each signer has its own privkey loaded
        for wallet, pub, prv in zip(musig_signers, core_musig_pubkeys, core_musig_privkeys):
            to_import = copy.deepcopy(desc_lst)
            for entry in to_import:
                with_priv = entry["desc"].split("#")[0].replace(pub, prv)
                csum = wo.getdescriptorinfo(with_priv)["checksum"]
                entry["desc"] = with_priv + "#" + csum
            for outcome in wallet.importdescriptors(to_import):
                assert outcome["success"]

    def musig_core_pass(b64_psbt):
        # hand the PSBT to each musig Core co-signer; never finalize
        for wallet in musig_signers:
            b64_psbt = wallet.walletprocesspsbt(b64_psbt, True, "DEFAULT", True, False).get("psbt")
        return b64_psbt

    def cc_pass(psbt_bytes, expected_title):
        # one signing pass on the device, leaving the export loop manually
        start_sign(psbt_bytes)
        _, story = cap_story()
        assert "Consolidating" in story
        assert f"Wallet: {name}" in story
        out = end_sign(exit_export_loop=False)
        time.sleep(.1)
        title, _ = cap_story()
        assert expected_title == title
        press_cancel()  # exit export loop
        return out

    def assert_musig_counts(parsed, nonces, part_sigs):
        for txin in parsed.inputs:
            assert len(txin.musig_pubnonces) == nonces
            assert len(txin.musig_part_sigs) == part_sigs

    # self-spend of everything, with nSequence satisfying older()
    unspent = wo.listunspent()
    spend = [{"txid": u["txid"], "vout": u["vout"], "sequence": sequence} for u in unspent]
    conso_addr = [{wo.getnewaddress("", "bech32m"): wo.getbalance()}]  # self-spend
    psbt = wo.walletcreatefundedpsbt(spend, conso_addr, 0, {"fee_rate": 2,
                                                            "change_type": "bech32m",
                                                            "subtractFeeFromOutputs": [0]}).get("psbt")
    res_psbt = base64.b64decode(psbt)

    if cc_in_musig:
        # musig co-signers adding nonces
        psbt = musig_core_pass(psbt)
        # CC add nonce
        res_psbt = cc_pass(base64.b64decode(psbt), "PSBT Updated")
        # all co-signers added nonces, no signature was added
        assert_musig_counts(BasicPSBT().parse(res_psbt), 3, 0)

    # CC add signature
    res_psbt = cc_pass(res_psbt, "PSBT Signed")
    b64_res_psbt = base64.b64encode(res_psbt).decode()

    if cc_in_musig:
        # musig co-signers add signatures
        b64_res_psbt = musig_core_pass(b64_res_psbt)
        # nonces from 1st round, all co-signers added signatures
        assert_musig_counts(BasicPSBT().parse(base64.b64decode(b64_res_psbt)), 3, 3)

    # now sign with one of the normal core signers that are not part of the musig
    b64_res_psbt = signers[0].walletprocesspsbt(b64_res_psbt, True, "DEFAULT", True, False)["psbt"]

    finalized = wo.finalizepsbt(b64_res_psbt)
    assert finalized["complete"]
    tx_hex = finalized["hex"]

    # we are signing for timelocked tapscript
    verdict = wo.testmempoolaccept([tx_hex])
    assert verdict[0]["allowed"] is False
    assert verdict[0]['reject-reason'] == 'non-BIP68-final'

    bitcoind.supply_wallet.generatetoaddress(sequence, bitcoind.supply_wallet.getnewaddress())
    verdict = wo.testmempoolaccept([tx_hex])
    assert verdict[0]["allowed"]

    txid = wo.sendrawtransaction(tx_hex)
    assert len(txid) == 64  # tx id

    # check addresses are correct
    address_explorer_check("sd", "bech32m", wo, name)
@pytest.mark.parametrize("tapscript", [True, False])
def test_inputs_in_different_musig_rounds(tapscript, use_regtest, clear_miniscript, bitcoind,
                                          build_musig_wallet, start_sign, end_sign, cap_story,
                                          press_cancel):
    """Device must refuse a PSBT whose inputs sit in different MuSig2 rounds.

    After a regular nonce round and the co-signers' signature round, nonces and
    partial signatures are wiped from input 0 only, so input 0 looks like it is
    in the 1st round while input 1 is in the 2nd.
    """
    use_regtest()
    clear_miniscript()
    name = "diff_round_musig"
    wo, signers, desc = build_musig_wallet(name, 3, tapscript=tapscript,
                                           tree_design="left_heavy",  # not balanced tree
                                           tapscript_musig_threshold=2, num_utxo_available=2)
    assert len(wo.listunspent()) == 2

    expected_nonces = 9 if tapscript else 3
    expected_sigs = 6 if tapscript else 2

    def core_pass(b64_psbt):
        # hand the PSBT to each Core co-signer; never finalize
        for cosigner in signers:
            b64_psbt = cosigner.walletprocesspsbt(b64_psbt, True, "DEFAULT", True, False).get("psbt")
        return b64_psbt

    funded = wo.walletcreatefundedpsbt([], [{bitcoind.supply_wallet.getnewaddress(): 43.4389}],
                                       0, {"fee_rate": 2, "change_type": "bech32m"})

    # cosigners adding nonces
    psbt = core_pass(funded.get("psbt"))

    # CC add nonce
    start_sign(base64.b64decode(psbt))
    _, story = cap_story()
    assert "Consolidating" not in story
    assert f"Wallet: {name}" in story
    res_psbt = end_sign(exit_export_loop=False)
    time.sleep(.1)
    title, _ = cap_story()
    assert "PSBT Updated" == title
    press_cancel()  # exit export loop

    po = BasicPSBT().parse(res_psbt)
    for txin in po.inputs:
        assert len(txin.musig_pubnonces) == expected_nonces
        assert len(txin.musig_part_sigs) == 0

    # cosigners adding partial signatures
    b64_res_psbt = core_pass(po.as_b64_str())

    po = BasicPSBT().parse(base64.b64decode(b64_res_psbt))
    for txin in po.inputs:
        assert len(txin.musig_pubnonces) == expected_nonces
        assert len(txin.musig_part_sigs) == expected_sigs

    # remove nonces and signatures from 0th input, keep them with 1st input
    # causing this PSBT to have inputs in both 1st & 2nd round (not allowed)
    first_input = po.inputs[0]
    first_input.musig_pubnonces = {}
    first_input.musig_part_sigs = {}

    # offer the inconsistent PSBT to the device
    start_sign(po.as_bytes())
    time.sleep(.1)
    _, story = cap_story()
    assert "Consolidating" not in story
    assert f"Wallet: {name}" in story

    with pytest.raises(Exception):
        end_sign(exit_export_loop=False)

    time.sleep(.1)
    title, story = cap_story()
    assert title == "Failure"
    assert "resign" in story
@pytest.mark.parametrize("tapscript", [True, False])
def test_modify_PSBT_during_musig_signing(tapscript, use_regtest, clear_miniscript, bitcoind,
                                          build_musig_wallet, start_sign, end_sign, cap_story):
    """Device must refuse to continue when the PSBT changed after its first signing pass.

    A PSBT spending the first UTXO is processed by the device once. It is then
    merged (as PSBT v2) with a fresh PSBT spending the second UTXO, and the
    merged PSBT is offered for signing.
    """
    use_regtest()
    clear_miniscript()
    name = "mod_musig"
    wo, signers, desc = build_musig_wallet(name, 3, tapscript=tapscript,
                                           tree_design="right_heavy",  # not balanced tree
                                           tapscript_musig_threshold=2, num_utxo_available=2)
    unspent = wo.listunspent()
    assert len(unspent) == 2
    utxo_a, utxo_b = unspent[0], unspent[1]
    funding_opts_a = {"fee_rate": 2, "change_type": "bech32m"}
    funding_opts_b = {"fee_rate": 3, "change_type": "bech32m"}

    # spend first UTXO - and let the device process it
    funded = wo.walletcreatefundedpsbt([{"txid": utxo_a["txid"], "vout": utxo_a["vout"]}],
                                       [{bitcoind.supply_wallet.getnewaddress(): 5.12345}],
                                       0, funding_opts_a)
    start_sign(base64.b64decode(funded.get("psbt")))
    _, story = cap_story()
    assert "Consolidating" not in story
    assert f"Wallet: {name}" in story
    po = BasicPSBT().parse(end_sign())

    # spend second UTXO - never shown to the device on its own
    funded = wo.walletcreatefundedpsbt([{"txid": utxo_b["txid"], "vout": utxo_b["vout"]}],
                                       [{bitcoind.supply_wallet.getnewaddress(): 3.54321}],
                                       0, funding_opts_b)
    po1 = BasicPSBT().parse(base64.b64decode(funded.get("psbt")))

    # change to PSBT v2 to not need handle txn
    first_v2 = BasicPSBT().parse(po.to_v2())
    second_v2 = BasicPSBT().parse(po1.to_v2())

    merged = BasicPSBT()
    merged.version = 2
    merged.txn_version = 2
    merged.input_count = first_v2.input_count + second_v2.input_count
    merged.output_count = first_v2.output_count + second_v2.output_count
    merged.fallback_locktime = 0
    merged.inputs = first_v2.inputs + second_v2.inputs
    merged.outputs = first_v2.outputs + second_v2.outputs

    start_sign(merged.as_bytes())
    time.sleep(.1)
    _, story = cap_story()
    assert "Consolidating" not in story
    assert f"Wallet: {name}" in story

    with pytest.raises(Exception):
        end_sign()

    time.sleep(.1)
    _, story = cap_story()
    assert "musig needs restart" in story
@pytest.mark.parametrize("der", ["/**", "/<0;1>/*"])
def test_import_musig_from_b388_policy(der, microsd_path, garbage_collector, clear_miniscript,
                                       pick_menu_item, need_keypress, cap_story, use_mainnet,
                                       press_select, usb_miniscript_policy, goto_home,
                                       usb_miniscript_get):
    """Import a MuSig2 wallet from a BIP-388 policy file and export it back.

    The policy is written with either "/**" or "/<0;1>/*" as derivation suffix
    (`der`). In both cases the import story and the exported policy must use
    "/**", while the exported descriptor must use "/<0;1>/*".
    """
    policy = """{
"name":"MuSig2",
"desc_template":"tr(musig(@0,@1,@2)%s,{{pk(musig(@0,@1)%s),pk(musig(@1,@2)%s)},pk(musig(@0,@2)%s)})",
"keys_info":[
"[a0d3c79c/48h/0h/0h/3h]xpub6DyCgL3TMMS3trskRyxP6M6iCqSUYPgC4QECKYkubC1L27aYUnPZJwvw57vK56G1t42KQHhX8MrU5zkt9nispkAMXYDLfiiSyrQ9HT1aQoy",
"[9a1c8a27/48h/0h/0h/3h]xpub6DmrCyYSAz65fVFRAPEsvLZ5jHZYeF6DvprfYp7754XD2srXQr8kAcayGKhakmWiNe7iVEpjmC9JuqicHAQZk1rpWijrvh2AEwL7WaietUN",
"[0f056943/48h/0h/0h/3h]xpub6FQgdFZAHcAeDMVe9KxWoLMxziCjscCExzuKJhRSjM71CA9dUDZEGNgPe4S2SsRumCBXeaTBZ5nKz2cMDiK4UEbGkFXNipHLkm46inpjE9D"]
}
"""
    # same derivation suffix for the internal key and all three leaves
    policy = policy % (der, der, der, der)

    fname = "musig_b388.json"
    fpath = microsd_path(fname)
    with open(fpath, "w") as f:
        f.write(policy)
    # have the file cleaned up afterwards (it used to be left behind)
    garbage_collector.append(fpath)

    clear_miniscript()
    use_mainnet()
    goto_home()
    pick_menu_item("Settings")
    pick_menu_item("Multisig/Miniscript")
    pick_menu_item("Import")
    need_keypress("1")
    try:
        # NOTE(review): presumably the file picker is not shown in some
        # situations, hence best-effort - confirm
        pick_menu_item(fname)
    except Exception:
        pass

    time.sleep(.1)
    title, story = cap_story()
    assert "Create new miniscript wallet" in story
    assert "/**" in story
    assert "/<0;1>/*" not in story
    press_select()

    pick_menu_item("MuSig2")
    pick_menu_item("Descriptors")
    pick_menu_item("BIP-388 Policy")
    contents = usb_miniscript_policy("MuSig2")
    assert "/**" in contents["desc_template"]
    assert "/<0;1>/*" not in contents["desc_template"]

    if der == "/<0;1>/*":
        # exported policy always uses the short form
        policy = policy.replace(der, "/**")

    assert json.loads(policy) == contents

    contents = usb_miniscript_get("MuSig2")
    assert "/**" not in contents["desc"]
    assert "/<0;1>/*" in contents["desc"]
# EOF