test fixes, multiproc for test_miniscript.py
This commit is contained in:
parent
9d31349298
commit
ab91463ec6
@ -640,11 +640,7 @@ def verify_qr_address(cap_screen_qr, cap_screen, is_q1):
|
||||
for c, line in zip("XXXXXXBACK", full_split):
|
||||
assert not line.endswith(c)
|
||||
|
||||
txt = ''.join(l for l in full_split if len(l)>4).replace('~', '')
|
||||
if txt:
|
||||
# just index remained
|
||||
int(txt)
|
||||
txt = None
|
||||
txt = None # most of the time there is no address
|
||||
else:
|
||||
if is_change:
|
||||
assert "CHANGE BACK" in full
|
||||
|
||||
@ -319,7 +319,7 @@ def main():
|
||||
help="run simulator instance in headless mode")
|
||||
parser.add_argument("--multiproc", action="store_true", default=False,
|
||||
help="Run tests & simulators in parallel")
|
||||
parser.add_argument("--num-proc", type=int, default=14,
|
||||
parser.add_argument("--num-proc", type=int, default=16,
|
||||
help="How many executors/simulators to run in parallel in --multiproc mode")
|
||||
parser.add_argument("--turbo", action="store_true", default=False,
|
||||
help="Both Mk4 and Q at the same time")
|
||||
@ -389,7 +389,14 @@ def main():
|
||||
if args.multiproc:
|
||||
start_time = time.time()
|
||||
def add_to_queue(module_name, simulator_args, queue):
|
||||
if module_name == "test_multisig.py":
|
||||
if module_name == "test_miniscript.py":
|
||||
queue.append((2, [module_name, simulator_args, "not liana_miniscripts_simple and not test_tapscript and not test_bitcoind_tapscript_address and not test_minitapscript", ""]))
|
||||
queue.append((0, [module_name, simulator_args, "liana_miniscripts_simple", "-sep1"]))
|
||||
queue.append((2, [module_name, simulator_args, "test_tapscript", "-sep2"]))
|
||||
queue.append((0, [module_name, simulator_args, "test_bitcoind_tapscript_address", "-sep3"]))
|
||||
queue.append((0, [module_name, simulator_args, "test_minitapscript", "-sep4"]))
|
||||
|
||||
elif module_name == "test_multisig.py":
|
||||
# split takes too much time
|
||||
queue.append((0, [module_name, simulator_args, "not tutorial and not airgapped and not ms_address and not descriptor_export", ""]))
|
||||
queue.append((0, [module_name, simulator_args, "airgapped", "-sep1"]))
|
||||
@ -403,7 +410,7 @@ def main():
|
||||
queue.append((0, [module_name, simulator_args, "not test_import_xor", ""]))
|
||||
|
||||
elif module_name in ["test_export.py", "test_ephemeral.py", "test_sign.py", "test_msg.py",
|
||||
"test_backup.py"]:
|
||||
"test_backup.py", "test_bsms.py"]:
|
||||
# higher priority
|
||||
queue.append((1, [module_name, simulator_args, None, ""]))
|
||||
|
||||
|
||||
@ -442,6 +442,159 @@ def address_explorer_check(goto_home, pick_menu_item, need_keypress, cap_menu,
|
||||
return doit
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bitcoind_miniscript(bitcoind, need_keypress, cap_story, load_export,
|
||||
pick_menu_item, goto_home, cap_menu, microsd_path,
|
||||
use_regtest, get_cc_key, import_miniscript,
|
||||
bitcoin_core_signer, import_duplicate, press_select,
|
||||
virtdisk_path, garbage_collector):
|
||||
def doit(M, N, script_type, internal_key=None, cc_account=0, funded=True,
|
||||
tapscript_threshold=False, add_own_pk=False, same_account=False, way="sd"):
|
||||
|
||||
use_regtest()
|
||||
bitcoind_signers = []
|
||||
bitcoind_signers_xpubs = []
|
||||
for i in range(N - 1):
|
||||
s, core_key = bitcoin_core_signer(f"bitcoind--signer{i}")
|
||||
s.keypoolrefill(10)
|
||||
bitcoind_signers.append(s)
|
||||
bitcoind_signers_xpubs.append(core_key)
|
||||
|
||||
# watch only wallet where multisig descriptor will be imported
|
||||
ms = bitcoind.create_wallet(
|
||||
wallet_name=f"watch_only_{script_type}_{M}of{N}", disable_private_keys=True,
|
||||
blank=True, passphrase=None, avoid_reuse=False, descriptors=True
|
||||
)
|
||||
me_pth = f"m/48h/1h/{cc_account}h/3h"
|
||||
me = get_cc_key(me_pth)
|
||||
ik = internal_key or ranged_unspendable_internal_key()
|
||||
|
||||
if tapscript_threshold:
|
||||
signers_xp = [me] + bitcoind_signers_xpubs
|
||||
assert len(signers_xp) == N
|
||||
desc = f"tr({ik},%s)"
|
||||
|
||||
scripts = []
|
||||
for c in itertools.combinations(signers_xp, M):
|
||||
tmplt = f"sortedmulti_a({M},{','.join(c)})"
|
||||
scripts.append(tmplt)
|
||||
|
||||
if len(scripts) > 8:
|
||||
while True:
|
||||
# just some of them but at least one has to have my key
|
||||
x = random.sample(scripts, 8)
|
||||
if any(me in s for s in x):
|
||||
scripts = x
|
||||
break
|
||||
|
||||
if add_own_pk:
|
||||
if len(scripts) < 8:
|
||||
if same_account:
|
||||
cc_key = get_cc_key(me_pth, subderiv="/<2;3>/*")
|
||||
else:
|
||||
cc_key = get_cc_key("m/86h/1h/1000h")
|
||||
cc_pk_leaf = f"pk({cc_key})"
|
||||
scripts.append(cc_pk_leaf)
|
||||
else:
|
||||
pytest.skip("Scripts full")
|
||||
|
||||
temp = TREE[len(scripts)]
|
||||
temp = temp % tuple(scripts)
|
||||
|
||||
desc = desc % temp
|
||||
|
||||
else:
|
||||
if add_own_pk:
|
||||
if same_account:
|
||||
ss = [get_cc_key(me_pth, subderiv="/<4;5>/*")] + bitcoind_signers_xpubs
|
||||
cc_key = get_cc_key(me_pth, subderiv="/<6;7>/*")
|
||||
else:
|
||||
ss = [get_cc_key("m/86h/1h/0h")] + bitcoind_signers_xpubs
|
||||
cc_key = get_cc_key("m/86h/1h/1000h")
|
||||
|
||||
tmplt = f"sortedmulti_a({M},{','.join(ss)})"
|
||||
cc_pk_leaf = f"pk({cc_key})"
|
||||
desc = f"tr({ik},{{{tmplt},{cc_pk_leaf}}})"
|
||||
else:
|
||||
desc = f"tr({ik},sortedmulti_a({M},{me},{','.join(bitcoind_signers_xpubs)}))"
|
||||
|
||||
name = "minisc"
|
||||
fname = None
|
||||
if way in ["sd", "vdisk"]:
|
||||
data = None
|
||||
fname = f"{name}.txt"
|
||||
path_f = microsd_path if way == 'sd' else virtdisk_path
|
||||
fpath = path_f(fname)
|
||||
with open(fpath, "w") as f:
|
||||
f.write(desc + "\n")
|
||||
garbage_collector.append(fpath)
|
||||
else:
|
||||
data = dict(name=name, desc=desc)
|
||||
|
||||
_, story = import_miniscript(fname, way=way, data=data)
|
||||
assert "Create new miniscript wallet?" in story
|
||||
assert name in story
|
||||
if script_type == "p2tr":
|
||||
assert "Taproot internal key" in story
|
||||
assert "Tapscript" in story
|
||||
assert "Press (1) to see extended public keys" in story
|
||||
if script_type == "p2wsh":
|
||||
assert "P2WSH" in story
|
||||
elif script_type == "p2sh":
|
||||
assert "P2SH" in story
|
||||
elif script_type == "p2tr":
|
||||
assert "P2TR" in story
|
||||
else:
|
||||
assert "P2SH-P2WSH" in story
|
||||
# assert "Derivation:\n Varies (2)" in story
|
||||
press_select() # approve multisig import
|
||||
import_duplicate(fname, way=way, data=data)
|
||||
goto_home()
|
||||
pick_menu_item('Settings')
|
||||
pick_menu_item('Miniscript')
|
||||
menu = cap_menu()
|
||||
pick_menu_item(menu[0]) # pick imported descriptor multisig wallet
|
||||
pick_menu_item("Descriptors")
|
||||
pick_menu_item("Bitcoin Core")
|
||||
text = load_export(way, label="Bitcoin Core miniscript", is_json=False, sig_check=False)
|
||||
text = text.replace("importdescriptors ", "").strip()
|
||||
# remove junk
|
||||
r1 = text.find("[")
|
||||
r2 = text.find("]", -1, 0)
|
||||
text = text[r1: r2]
|
||||
core_desc_object = json.loads(text)
|
||||
# import descriptors to watch only wallet
|
||||
res = ms.importdescriptors(core_desc_object)
|
||||
assert res[0]["success"]
|
||||
assert res[1]["success"]
|
||||
|
||||
if funded:
|
||||
if script_type == "p2wsh":
|
||||
addr_type = "bech32"
|
||||
elif script_type == "p2tr":
|
||||
addr_type = "bech32m"
|
||||
elif script_type == "p2sh":
|
||||
addr_type = "legacy"
|
||||
else:
|
||||
addr_type = "p2sh-segwit"
|
||||
|
||||
addr = ms.getnewaddress("", addr_type)
|
||||
if script_type == "p2wsh":
|
||||
sw = "bcrt1q"
|
||||
elif script_type == "p2tr":
|
||||
sw = "bcrt1p"
|
||||
else:
|
||||
sw = "2"
|
||||
assert addr.startswith(sw)
|
||||
# get some coins and fund above multisig address
|
||||
bitcoind.supply_wallet.sendtoaddress(addr, 49)
|
||||
bitcoind.supply_wallet.generatetoaddress(1, bitcoind.supply_wallet.getnewaddress()) # mine above
|
||||
|
||||
return ms, bitcoind_signers
|
||||
|
||||
return doit
|
||||
|
||||
|
||||
@pytest.mark.bitcoind
|
||||
@pytest.mark.parametrize("addr_fmt", ["bech32", "p2sh-segwit"])
|
||||
@pytest.mark.parametrize("lt_type", ["older", "after"]) # this is actually not generated by liana (liana is relative only)
|
||||
@ -786,159 +939,6 @@ def test_liana_miniscripts_complex(addr_fmt, minsc, bitcoind, use_regtest, clear
|
||||
address_explorer_check(way, addr_fmt, wo, name)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bitcoind_miniscript(bitcoind, need_keypress, cap_story, load_export,
|
||||
pick_menu_item, goto_home, cap_menu, microsd_path,
|
||||
use_regtest, get_cc_key, import_miniscript,
|
||||
bitcoin_core_signer, import_duplicate, press_select,
|
||||
virtdisk_path, garbage_collector):
|
||||
def doit(M, N, script_type, internal_key=None, cc_account=0, funded=True,
|
||||
tapscript_threshold=False, add_own_pk=False, same_account=False, way="sd"):
|
||||
|
||||
use_regtest()
|
||||
bitcoind_signers = []
|
||||
bitcoind_signers_xpubs = []
|
||||
for i in range(N - 1):
|
||||
s, core_key = bitcoin_core_signer(f"bitcoind--signer{i}")
|
||||
s.keypoolrefill(10)
|
||||
bitcoind_signers.append(s)
|
||||
bitcoind_signers_xpubs.append(core_key)
|
||||
|
||||
# watch only wallet where multisig descriptor will be imported
|
||||
ms = bitcoind.create_wallet(
|
||||
wallet_name=f"watch_only_{script_type}_{M}of{N}", disable_private_keys=True,
|
||||
blank=True, passphrase=None, avoid_reuse=False, descriptors=True
|
||||
)
|
||||
me_pth = f"m/48h/1h/{cc_account}h/3h"
|
||||
me = get_cc_key(me_pth)
|
||||
ik = internal_key or ranged_unspendable_internal_key()
|
||||
|
||||
if tapscript_threshold:
|
||||
signers_xp = [me] + bitcoind_signers_xpubs
|
||||
assert len(signers_xp) == N
|
||||
desc = f"tr({ik},%s)"
|
||||
|
||||
scripts = []
|
||||
for c in itertools.combinations(signers_xp, M):
|
||||
tmplt = f"sortedmulti_a({M},{','.join(c)})"
|
||||
scripts.append(tmplt)
|
||||
|
||||
if len(scripts) > 8:
|
||||
while True:
|
||||
# just some of them but at least one has to have my key
|
||||
x = random.sample(scripts, 8)
|
||||
if any(me in s for s in x):
|
||||
scripts = x
|
||||
break
|
||||
|
||||
if add_own_pk:
|
||||
if len(scripts) < 8:
|
||||
if same_account:
|
||||
cc_key = get_cc_key(me_pth, subderiv="/<2;3>/*")
|
||||
else:
|
||||
cc_key = get_cc_key("m/86h/1h/1000h")
|
||||
cc_pk_leaf = f"pk({cc_key})"
|
||||
scripts.append(cc_pk_leaf)
|
||||
else:
|
||||
pytest.skip("Scripts full")
|
||||
|
||||
temp = TREE[len(scripts)]
|
||||
temp = temp % tuple(scripts)
|
||||
|
||||
desc = desc % temp
|
||||
|
||||
else:
|
||||
if add_own_pk:
|
||||
if same_account:
|
||||
ss = [get_cc_key(me_pth, subderiv="/<4;5>/*")] + bitcoind_signers_xpubs
|
||||
cc_key = get_cc_key(me_pth, subderiv="/<6;7>/*")
|
||||
else:
|
||||
ss = [get_cc_key("m/86h/1h/0h")] + bitcoind_signers_xpubs
|
||||
cc_key = get_cc_key("m/86h/1h/1000h")
|
||||
|
||||
tmplt = f"sortedmulti_a({M},{','.join(ss)})"
|
||||
cc_pk_leaf = f"pk({cc_key})"
|
||||
desc = f"tr({ik},{{{tmplt},{cc_pk_leaf}}})"
|
||||
else:
|
||||
desc = f"tr({ik},sortedmulti_a({M},{me},{','.join(bitcoind_signers_xpubs)}))"
|
||||
|
||||
name = "minisc"
|
||||
fname = None
|
||||
if way in ["sd", "vdisk"]:
|
||||
data = None
|
||||
fname = f"{name}.txt"
|
||||
path_f = microsd_path if way == 'sd' else virtdisk_path
|
||||
fpath = path_f(fname)
|
||||
with open(fpath, "w") as f:
|
||||
f.write(desc + "\n")
|
||||
garbage_collector.append(fpath)
|
||||
else:
|
||||
data = dict(name=name, desc=desc)
|
||||
|
||||
_, story = import_miniscript(fname, way=way, data=data)
|
||||
assert "Create new miniscript wallet?" in story
|
||||
assert name in story
|
||||
if script_type == "p2tr":
|
||||
assert "Taproot internal key" in story
|
||||
assert "Tapscript" in story
|
||||
assert "Press (1) to see extended public keys" in story
|
||||
if script_type == "p2wsh":
|
||||
assert "P2WSH" in story
|
||||
elif script_type == "p2sh":
|
||||
assert "P2SH" in story
|
||||
elif script_type == "p2tr":
|
||||
assert "P2TR" in story
|
||||
else:
|
||||
assert "P2SH-P2WSH" in story
|
||||
# assert "Derivation:\n Varies (2)" in story
|
||||
press_select() # approve multisig import
|
||||
import_duplicate(fname, way=way, data=data)
|
||||
goto_home()
|
||||
pick_menu_item('Settings')
|
||||
pick_menu_item('Miniscript')
|
||||
menu = cap_menu()
|
||||
pick_menu_item(menu[0]) # pick imported descriptor multisig wallet
|
||||
pick_menu_item("Descriptors")
|
||||
pick_menu_item("Bitcoin Core")
|
||||
text = load_export(way, label="Bitcoin Core miniscript", is_json=False, sig_check=False)
|
||||
text = text.replace("importdescriptors ", "").strip()
|
||||
# remove junk
|
||||
r1 = text.find("[")
|
||||
r2 = text.find("]", -1, 0)
|
||||
text = text[r1: r2]
|
||||
core_desc_object = json.loads(text)
|
||||
# import descriptors to watch only wallet
|
||||
res = ms.importdescriptors(core_desc_object)
|
||||
assert res[0]["success"]
|
||||
assert res[1]["success"]
|
||||
|
||||
if funded:
|
||||
if script_type == "p2wsh":
|
||||
addr_type = "bech32"
|
||||
elif script_type == "p2tr":
|
||||
addr_type = "bech32m"
|
||||
elif script_type == "p2sh":
|
||||
addr_type = "legacy"
|
||||
else:
|
||||
addr_type = "p2sh-segwit"
|
||||
|
||||
addr = ms.getnewaddress("", addr_type)
|
||||
if script_type == "p2wsh":
|
||||
sw = "bcrt1q"
|
||||
elif script_type == "p2tr":
|
||||
sw = "bcrt1p"
|
||||
else:
|
||||
sw = "2"
|
||||
assert addr.startswith(sw)
|
||||
# get some coins and fund above multisig address
|
||||
bitcoind.supply_wallet.sendtoaddress(addr, 49)
|
||||
bitcoind.supply_wallet.generatetoaddress(1, bitcoind.supply_wallet.getnewaddress()) # mine above
|
||||
|
||||
return ms, bitcoind_signers
|
||||
|
||||
return doit
|
||||
|
||||
|
||||
@pytest.mark.bitcoind
|
||||
@pytest.mark.parametrize("cc_first", [True, False])
|
||||
@pytest.mark.parametrize("add_pk", [True, False])
|
||||
@ -1648,26 +1648,6 @@ def test_same_key_change_based_minisc(goto_home, pick_menu_item, cap_story,
|
||||
address_explorer_check("sd", af, wo, "mini-change")
|
||||
|
||||
|
||||
def test_same_key_account_based_multisig(goto_home, pick_menu_item, cap_story,
|
||||
clear_miniscript, microsd_path, load_export, bitcoind,
|
||||
import_miniscript, garbage_collector):
|
||||
clear_miniscript()
|
||||
desc = ("wsh(sortedmulti(2,"
|
||||
"[0f056943/84'/1'/0']tpubDC7jGaaSE66Pn4dgtbAAstde4bCyhSUs4r3P8WhMVvPByvcRrzrwqSvpF9Ghx83Z1LfVugGRrSBko5UEKELCz9HoMv5qKmGq3fqnnbS5E9r/<0;1>/*,"
|
||||
"[0f056943/84'/1'/9']tpubDC7jGaaSE66QBAcX8TUD3JKWari1zmGH4gNyKZcrfq6NwCofKujNF2kyeVXgKshotxw5Yib8UxLrmmCmWd8NVPVTAL8rGfMdc7TsAKqsy6y/<0;1>/*"
|
||||
"))")
|
||||
name = "multi-accounts"
|
||||
fname = f"{name}.txt"
|
||||
fpath = microsd_path(fname)
|
||||
with open(fpath, "w") as f:
|
||||
f.write(desc)
|
||||
garbage_collector.append(fpath)
|
||||
|
||||
_, story = import_miniscript(fname)
|
||||
assert "Failed to import" in story
|
||||
assert "Use Settings -> Multisig Wallets" in story
|
||||
|
||||
|
||||
@pytest.mark.parametrize("desc", [
|
||||
"wsh(or_d(pk(@A),and_v(v:pkh(@A),older(5))))",
|
||||
"tr(@ik,multi_a(2,@A,@A))",
|
||||
@ -1711,7 +1691,7 @@ def test_tapscript_depth(get_cc_key, pick_menu_item, cap_story,
|
||||
assert "num_leafs > 8" in story
|
||||
|
||||
@pytest.mark.bitcoind
|
||||
@pytest.mark.parametrize("lt_type", ["older", "after"])
|
||||
# @pytest.mark.parametrize("lt_type", ["older", "after"])
|
||||
@pytest.mark.parametrize("same_acct", [True, False])
|
||||
@pytest.mark.parametrize("recovery", [True, False])
|
||||
@pytest.mark.parametrize("leaf2_mine", [True, False])
|
||||
@ -1730,7 +1710,7 @@ def test_minitapscript(leaf2_mine, recovery, minisc, clear_miniscript, goto_home
|
||||
use_regtest, bitcoind, microsd_wipe, load_export, dev,
|
||||
address_explorer_check, get_cc_key, import_miniscript,
|
||||
bitcoin_core_signer, same_acct, import_duplicate, press_select,
|
||||
garbage_collector, lt_type):
|
||||
garbage_collector, start_sign, end_sign):
|
||||
lt_type = "older"
|
||||
# needs bitcoind 26.0
|
||||
normal_cosign_core = False
|
||||
@ -1757,6 +1737,7 @@ def test_minitapscript(leaf2_mine, recovery, minisc, clear_miniscript, goto_home
|
||||
for i in range(3):
|
||||
# core signers
|
||||
signer, core_key = bitcoin_core_signer(f"co-signer{i}")
|
||||
signer.keypoolrefill(25)
|
||||
core_keys.append(core_key)
|
||||
signers.append(signer)
|
||||
|
||||
@ -1801,7 +1782,7 @@ def test_minitapscript(leaf2_mine, recovery, minisc, clear_miniscript, goto_home
|
||||
garbage_collector.append(fpath)
|
||||
|
||||
wo = bitcoind.create_wallet(wallet_name=name, disable_private_keys=True, blank=True,
|
||||
passphrase=None, avoid_reuse=False, descriptors=True)
|
||||
passphrase=None, avoid_reuse=False, descriptors=True)
|
||||
|
||||
_, story = import_miniscript(fname)
|
||||
assert "Create new miniscript wallet?" in story
|
||||
@ -1824,25 +1805,32 @@ def test_minitapscript(leaf2_mine, recovery, minisc, clear_miniscript, goto_home
|
||||
for obj in res:
|
||||
assert obj["success"]
|
||||
addr = wo.getnewaddress("", "bech32m")
|
||||
addr_dest = wo.getnewaddress("", "bech32m") # self-spend
|
||||
assert bitcoind.supply_wallet.sendtoaddress(addr, 49)
|
||||
bitcoind.supply_wallet.generatetoaddress(1, bitcoind.supply_wallet.getnewaddress())
|
||||
all_of_it = wo.getbalance()
|
||||
unspent = wo.listunspent()
|
||||
assert len(unspent) == 1
|
||||
inp = {"txid": unspent[0]["txid"], "vout": unspent[0]["vout"]}
|
||||
|
||||
if recovery and sequence and not leaf2_mine:
|
||||
inp["sequence"] = sequence
|
||||
|
||||
# split to
|
||||
num_outs = 20
|
||||
nVal = all_of_it / num_outs
|
||||
conso_addrs = [{wo.getnewaddress("", "bech32m"): nVal} for _ in range(num_outs)] # self-spend
|
||||
psbt_resp = wo.walletcreatefundedpsbt(
|
||||
[inp],
|
||||
[{addr_dest: all_of_it - 1}],
|
||||
conso_addrs,
|
||||
locktime if (recovery and not leaf2_mine) else 0,
|
||||
{"fee_rate": 20, "change_type": "bech32m", "subtractFeeFromOutputs": [0]},
|
||||
{"fee_rate": 2, "change_type": "bech32m", "subtractFeeFromOutputs": [0]},
|
||||
)
|
||||
psbt = psbt_resp.get("psbt")
|
||||
|
||||
if (normal_cosign_core or recovery_cosign_core) and not leaf2_mine:
|
||||
psbt = signers[1].walletprocesspsbt(psbt, True, "ALL")["psbt"]
|
||||
psbt_res = signers[1].walletprocesspsbt(psbt, True, "DEFAULT")
|
||||
assert psbt_res["psbt"] != psbt
|
||||
psbt = psbt_res.get("psbt")
|
||||
|
||||
name = f"{name}.psbt"
|
||||
fpath = microsd_path(name)
|
||||
@ -1859,6 +1847,8 @@ def test_minitapscript(leaf2_mine, recovery, minisc, clear_miniscript, goto_home
|
||||
time.sleep(0.1)
|
||||
title, story = cap_story()
|
||||
assert title == "OK TO SEND?"
|
||||
assert "1 input" in story
|
||||
assert "20 outputs" in story
|
||||
assert "Consolidating" in story
|
||||
press_select() # confirm signing
|
||||
time.sleep(0.5)
|
||||
@ -1891,6 +1881,62 @@ def test_minitapscript(leaf2_mine, recovery, minisc, clear_miniscript, goto_home
|
||||
res = wo.sendrawtransaction(tx_hex)
|
||||
assert len(res) == 64 # tx id
|
||||
|
||||
bitcoind.supply_wallet.generatetoaddress(1, bitcoind.supply_wallet.getnewaddress())
|
||||
unspent = wo.listunspent()
|
||||
assert len(unspent) == 20
|
||||
ins = [{"txid": u["txid"], "vout": u["vout"]} for u in unspent]
|
||||
|
||||
if recovery and sequence and not leaf2_mine:
|
||||
for i in ins:
|
||||
i["sequence"] = sequence
|
||||
|
||||
# consolidate multiple inputs to one for us
|
||||
# BUT also send 1 corn back to supply (so not a consolidation)
|
||||
outs = [
|
||||
{wo.getnewaddress("", "bech32m"): wo.getbalance() - 1},
|
||||
{bitcoind.supply_wallet.getnewaddress("", "bech32"): 1},
|
||||
]
|
||||
psbt_resp = wo.walletcreatefundedpsbt(
|
||||
ins,
|
||||
outs,
|
||||
locktime if (recovery and not leaf2_mine) else 0,
|
||||
{"fee_rate": 2, "change_type": "bech32m", "subtractFeeFromOutputs": [0]},
|
||||
)
|
||||
psbt = psbt_resp.get("psbt")
|
||||
|
||||
# now CC first
|
||||
start_sign(base64.b64decode(psbt))
|
||||
time.sleep(.1)
|
||||
title, story = cap_story()
|
||||
assert title == "OK TO SEND?"
|
||||
assert "Consolidating" not in story
|
||||
assert "20 inputs" in story
|
||||
assert "2 outputs" in story
|
||||
final_psbt = end_sign(True)
|
||||
psbt = base64.b64encode(final_psbt).decode()
|
||||
|
||||
if (normal_cosign_core or recovery_cosign_core) and not leaf2_mine:
|
||||
# core co-signer second after CC (if needed)
|
||||
psbt_res = signers[1].walletprocesspsbt(psbt, True, "DEFAULT")
|
||||
assert psbt_res["psbt"] != psbt
|
||||
psbt = psbt_res.get("psbt")
|
||||
|
||||
res = wo.finalizepsbt(psbt)
|
||||
assert res["complete"]
|
||||
tx_hex = res["hex"]
|
||||
res = wo.testmempoolaccept([tx_hex])
|
||||
if recovery and not leaf2_mine:
|
||||
assert not res[0]["allowed"]
|
||||
assert res[0]["reject-reason"] == 'non-BIP68-final' if sequence else "non-final"
|
||||
bitcoind.supply_wallet.generatetoaddress(6, bitcoind.supply_wallet.getnewaddress())
|
||||
res = wo.testmempoolaccept([tx_hex])
|
||||
assert res[0]["allowed"]
|
||||
else:
|
||||
assert res[0]["allowed"]
|
||||
|
||||
res = wo.sendrawtransaction(tx_hex)
|
||||
assert len(res) == 64 # tx id
|
||||
|
||||
# check addresses
|
||||
address_explorer_check("sd", "bech32m", wo, "minitapscript")
|
||||
|
||||
|
||||
@ -3088,7 +3088,8 @@ def test_txout_explorer(chain, data, fake_txn, start_sign, settings_set, txout_e
|
||||
[(0, b""), (10, b"")],
|
||||
])
|
||||
def test_txout_explorer_op_return(finalize, data, fake_txn, start_sign, cap_story, is_q1,
|
||||
need_keypress, press_cancel, press_select, end_sign):
|
||||
need_keypress, press_cancel, press_select, end_sign,
|
||||
cap_screen_qr):
|
||||
outputs = [["p2tr", 50000, not i] for i in range(20)]
|
||||
outputs += [["op_return", am, None, d] for am, d in data]
|
||||
out_val = sum(o[1] for o in outputs)
|
||||
@ -3216,97 +3217,6 @@ def test_smallest_txn(fake_txn, start_sign, end_sign, reset_seed_words, settings
|
||||
assert "null-data" in story
|
||||
assert "OP_RETURN" in story
|
||||
|
||||
def test_smallest_txn(fake_txn, start_sign, end_sign, reset_seed_words, settings_set):
|
||||
# serialized txn has just 62 bytes and is the smallest that we support
|
||||
# 1 input (iregardless of script type) and 1 zero value null OP_RETURN
|
||||
reset_seed_words()
|
||||
settings_set('fee_limit', -1)
|
||||
psbt = fake_txn(1, 0, op_return=[(10, b"")], input_amount=10)
|
||||
start_sign(psbt, False, stxn_flags=STXN_VISUALIZE)
|
||||
story = end_sign(accept=None, expect_txn=False).decode()
|
||||
assert "null-data" in story
|
||||
assert "OP_RETURN" in story
|
||||
|
||||
|
||||
@pytest.mark.parametrize("num_outs", [1, 12])
|
||||
@pytest.mark.parametrize("change", [True, False])
|
||||
def test_zero_value_outputs(num_outs, change, fake_txn, start_sign, end_sign, reset_seed_words,
|
||||
settings_set):
|
||||
reset_seed_words()
|
||||
# user needs to disable fee limit checks to be able to do this
|
||||
settings_set("fee_limit", -1)
|
||||
change_outs = list(range(num_outs)) if change else []
|
||||
psbt = fake_txn(1, num_outs, outvals=num_outs*[0], change_outputs=change_outs, input_amount=1)
|
||||
start_sign(psbt, False, stxn_flags=STXN_VISUALIZE)
|
||||
story = end_sign(accept=None, expect_txn=False).decode()
|
||||
assert f"Zero Value: Non-standard zero value output(s)" in story
|
||||
assert "1 input" in story
|
||||
assert f"{num_outs} output{'' if num_outs == 1 else 's'}" in story
|
||||
assert 'Network fee 0.00000001 XTN' in story
|
||||
|
||||
if change:
|
||||
assert "0.00000000 XTN" in story.split("\n\n")[4] # change back is zero
|
||||
assert "Consolidating 0.00000000 XTN" in story
|
||||
assert "Change back" in story
|
||||
if num_outs > 1:
|
||||
assert "to addresses" in story
|
||||
else:
|
||||
assert "to address" in story
|
||||
else:
|
||||
# even
|
||||
if num_outs == 12:
|
||||
# even tho we do not see 2 outputs, fee is also 0 and 2 smaller not shown here have also value o 0
|
||||
assert story.count('0.00000000 XTN') == 12
|
||||
else:
|
||||
assert story.count('0.00000000 XTN') == 2
|
||||
assert "Change back" not in story
|
||||
|
||||
|
||||
@pytest.mark.parametrize("change", [True, False])
|
||||
@pytest.mark.parametrize("num_ins", [True, False])
|
||||
def test_zero_value_input(change, num_ins, fake_txn, start_sign, end_sign, reset_seed_words,
|
||||
cap_story):
|
||||
# 0 value inputs - not allowed
|
||||
reset_seed_words()
|
||||
psbt = fake_txn(1, 1, fee=0, input_amount=0)
|
||||
start_sign(psbt, False, stxn_flags=STXN_VISUALIZE)
|
||||
with pytest.raises(Exception):
|
||||
end_sign(accept=None, expect_txn=False).decode()
|
||||
|
||||
_, story = cap_story()
|
||||
assert "zero value txn" in story
|
||||
|
||||
|
||||
@pytest.mark.parametrize("change", [True, False])
|
||||
def test_zero_value_inputs(change, fake_txn, start_sign, end_sign, reset_seed_words):
|
||||
# one input is-non zero
|
||||
# others are zero --> allowed
|
||||
reset_seed_words()
|
||||
invals = [0 for i in range(4)] + [100]
|
||||
psbt = fake_txn(5, 1, invals=invals, outvals=[99], change_outputs=[0] if change else [], fee=20)
|
||||
start_sign(psbt, False, stxn_flags=STXN_VISUALIZE)
|
||||
end_sign(accept=None, expect_txn=False).decode()
|
||||
|
||||
|
||||
def test_negative_amount_inputs(reset_seed_words, fake_txn, start_sign, end_sign, cap_story):
|
||||
reset_seed_words()
|
||||
psbt = fake_txn(1, 1, fee=0, input_amount=-1000)
|
||||
start_sign(psbt, False, stxn_flags=STXN_VISUALIZE)
|
||||
with pytest.raises(Exception):
|
||||
end_sign(accept=None, expect_txn=False).decode()
|
||||
|
||||
_, story = cap_story()
|
||||
assert "negative input value: i0" in story
|
||||
|
||||
def test_negative_amount_outputs(reset_seed_words, fake_txn, start_sign, end_sign, cap_story):
|
||||
reset_seed_words()
|
||||
psbt = fake_txn(1, 1, outvals=[-1000], fee=0)
|
||||
start_sign(psbt, False, stxn_flags=STXN_VISUALIZE)
|
||||
with pytest.raises(Exception):
|
||||
end_sign(accept=None, expect_txn=False).decode()
|
||||
|
||||
_, story = cap_story()
|
||||
assert "negative output value: o0" in story
|
||||
|
||||
@pytest.mark.parametrize("num_outs", [1, 12])
|
||||
@pytest.mark.parametrize("change", [True, False])
|
||||
@ -3318,7 +3228,7 @@ def test_zero_value_outputs(num_outs, change, fake_txn, start_sign, end_sign, re
|
||||
psbt = fake_txn(1, num_outs * [[random.choice(ADDR_STYLES_SINGLE), 0, change]], input_amount=1)
|
||||
start_sign(psbt, False, stxn_flags=STXN_VISUALIZE)
|
||||
story = end_sign(accept=None, expect_txn=False).decode()
|
||||
assert f"Zero Value: Non-standard zero value outputs: {num_outs}" in story
|
||||
assert "Zero Value: Non-standard zero value output(s)." in story
|
||||
assert "1 input" in story
|
||||
assert f"{num_outs} output{'' if num_outs == 1 else 's'}" in story
|
||||
assert 'Network fee 0.00000001 XTN' in story
|
||||
@ -3710,7 +3620,7 @@ def test_invalid_input_taproot_psbt(start_sign, fn_err_msg, cap_story):
|
||||
# assert err_msg in story
|
||||
|
||||
|
||||
def test_invalid_output_tapproot_psbt(fake_txn, start_sign, cap_story, dev):
|
||||
def test_invalid_output_taproot_psbt(fake_txn, start_sign, cap_story, dev):
|
||||
psbt = fake_txn(3, [[],["p2tr", None, True]], master_xpub=dev.master_xpub, addr_fmt="p2tr")
|
||||
# invalid internal key length
|
||||
psbt_obj = BasicPSBT().parse(psbt)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user