provide info about Tx level locktimes (nLocktime, nSequence) when signing
(cherry picked from commit af753c38be)
This commit is contained in:
parent
f3db5d7822
commit
313fb74dc7
@ -2,6 +2,10 @@
|
||||
|
||||
- New Feature: Temporary Seed from COLDCARD encrypted backup.
|
||||
- New Feature: Export seed as SeedQR
|
||||
- New Feature: Provide user with info about transaction level timelocks
|
||||
([nLockTime](https://en.bitcoin.it/wiki/NLockTime),
|
||||
[nSequence](https://github.com/bitcoin/bips/blob/master/bip-0068.mediawiki))
|
||||
when signing
|
||||
- Enhancement: Add current temporary seed to Seed Vault from within Seed Vault menu.
|
||||
If current seed is temporary and not saved yet, `Add current tmp` menu item is
|
||||
shown in Seed Vault menu.
|
||||
|
||||
@ -712,6 +712,13 @@ class ApproveTransaction(UserAuthorizedAction):
|
||||
self.output_change_text(msg)
|
||||
gc.collect()
|
||||
|
||||
if self.psbt.ux_notes:
|
||||
# currently we only have locktimes in ux_notes
|
||||
msg.write('\nTX LOCKTIMES\n\n')
|
||||
|
||||
for label, m in self.psbt.ux_notes:
|
||||
msg.write('- %s: %s\n\n' % (label, m))
|
||||
|
||||
if self.psbt.warnings:
|
||||
msg.write('\n---WARNING---\n\n')
|
||||
|
||||
|
||||
136
shared/psbt.py
136
shared/psbt.py
@ -5,6 +5,7 @@
|
||||
from ustruct import unpack_from, unpack, pack
|
||||
from ubinascii import hexlify as b2a_hex
|
||||
from utils import xfp2str, B2A, keypath_to_str, problem_file_line
|
||||
from utils import seconds2human_readable, datetime_from_timestamp, datetime_to_str
|
||||
import stash, gc, history, sys, ngu, ckcc, chains
|
||||
from uhashlib import sha256
|
||||
from uio import BytesIO
|
||||
@ -574,6 +575,32 @@ class psbtInputProxy(psbtProxy):
|
||||
|
||||
self.parse(fd)
|
||||
|
||||
def has_relative_timelock(self, txin):
|
||||
# https://github.com/bitcoin/bips/blob/master/bip-0068.mediawiki
|
||||
SEQUENCE_LOCKTIME_DISABLE_FLAG = (1 << 31)
|
||||
SEQUENCE_LOCKTIME_TYPE_FLAG = (1 << 22)
|
||||
SEQUENCE_LOCKTIME_MASK = 0x0000ffff
|
||||
SEQUENCE_LOCKTIME_GRANULARITY = 9
|
||||
is_timebased = False
|
||||
|
||||
if txin.nSequence & SEQUENCE_LOCKTIME_DISABLE_FLAG:
|
||||
# RTL disabled
|
||||
return
|
||||
if txin.nSequence & SEQUENCE_LOCKTIME_TYPE_FLAG:
|
||||
# Time-based relative lock-time
|
||||
is_timebased = True
|
||||
res = (txin.nSequence & SEQUENCE_LOCKTIME_MASK) << SEQUENCE_LOCKTIME_GRANULARITY
|
||||
else:
|
||||
# Block height relative lock-time
|
||||
res = txin.nSequence & SEQUENCE_LOCKTIME_MASK
|
||||
|
||||
if res == 0:
|
||||
# any locktime that is zero, regardless of MPT or blocks
|
||||
# is always immediately spendable
|
||||
return
|
||||
|
||||
return is_timebased, res
|
||||
|
||||
def validate(self, idx, txin, my_xfp, parent):
|
||||
# Validate this txn input: given deserialized CTxIn and maybe witness
|
||||
|
||||
@ -963,6 +990,9 @@ class psbtObject(psbtProxy):
|
||||
self.active_multisig = None
|
||||
|
||||
self.warnings = []
|
||||
# not a warning just more info about tx
|
||||
# presented in UX on confirm tx screen before warnings
|
||||
self.ux_notes = []
|
||||
|
||||
# v1 vs v2 validation
|
||||
self.is_v2 = False
|
||||
@ -1238,6 +1268,74 @@ class psbtObject(psbtProxy):
|
||||
# we should not reach this point (ie. raise something to abort signing)
|
||||
return
|
||||
|
||||
def ux_relative_timelocks(self, tb, bb):
|
||||
# visualize 10 largest timelock to user
|
||||
# when signing a tx
|
||||
MAX_SHOW = 10
|
||||
num_tb = len(tb)
|
||||
num_bb = len(bb)
|
||||
|
||||
if (num_tb + num_bb) > MAX_SHOW:
|
||||
# 10 from each is enough for us to have in memory
|
||||
tb = sorted(tb, key=lambda item: item[1], reverse=True)[:10]
|
||||
bb = sorted(bb, key=lambda item: item[1], reverse=True)[:10]
|
||||
if (num_tb >= 5) and (num_bb >= 5):
|
||||
# 5 biggest from each
|
||||
tb = tb[:5]
|
||||
bb = bb[:5]
|
||||
else:
|
||||
if num_tb < num_bb:
|
||||
tb = tb[:num_tb]
|
||||
bb = bb[:(MAX_SHOW - num_tb)]
|
||||
else:
|
||||
bb = bb[:num_bb]
|
||||
tb = tb[:(MAX_SHOW - num_bb)]
|
||||
|
||||
if num_bb:
|
||||
# Block height relative lock-time
|
||||
if num_bb == 1:
|
||||
idx, val = bb[0]
|
||||
msg = "Input %d. has relative block height timelock of %d blocks" % (
|
||||
idx, val
|
||||
)
|
||||
elif all(bb[0][1] == i[1] for i in bb):
|
||||
msg = "%d inputs have relative block height timelock of %d blocks" % (
|
||||
num_bb, bb[0][1]
|
||||
)
|
||||
else:
|
||||
msg = "%d inputs have relative block height timelock." % num_bb
|
||||
if num_bb > len(bb):
|
||||
msg += " Showing only %d with highest values." % len(bb)
|
||||
msg += "\n\n"
|
||||
for idx, num_blocks in bb:
|
||||
msg += " %d. %d blocks\n" % (idx, num_blocks)
|
||||
msg += "\n"
|
||||
|
||||
self.ux_notes.append(("Block height RTL", msg))
|
||||
|
||||
if num_tb:
|
||||
# Block height relative lock-time
|
||||
if num_tb == 1:
|
||||
idx, val = tb[0]
|
||||
val = seconds2human_readable(val)
|
||||
msg = "Input %d. has relative time-based timelock of:\n %s" % (
|
||||
idx, val
|
||||
)
|
||||
elif all(tb[0][1] == i[1] for i in tb):
|
||||
msg = "%d inputs have relative time-based timelock of:\n %s" % (
|
||||
num_tb, seconds2human_readable(tb[0][1])
|
||||
)
|
||||
else:
|
||||
msg = "%d inputs have relative time-based timelock." % num_tb
|
||||
if num_tb > len(tb):
|
||||
msg += " Showing only %d with highest values." % len(tb)
|
||||
msg += "\n\n"
|
||||
for idx, seconds in tb:
|
||||
hr = seconds2human_readable(seconds)
|
||||
msg += " %d. %s\n" % (idx, hr)
|
||||
msg += "\n"
|
||||
|
||||
self.ux_notes.append(("Time-based RTL", msg))
|
||||
|
||||
async def validate(self):
|
||||
# Do a first pass over the txn. Raise assertions, be terse tho because
|
||||
@ -1278,6 +1376,11 @@ class psbtObject(psbtProxy):
|
||||
assert out.amount is None
|
||||
assert out.script is None
|
||||
|
||||
# time based relative locks
|
||||
tb_rel_locks = []
|
||||
# block height based relative locks
|
||||
bb_rel_locks = []
|
||||
smallest_nsequence = 0xffffffff
|
||||
# this parses the input TXN in-place
|
||||
for idx, txin in self.input_iter():
|
||||
inp = self.inputs[idx]
|
||||
@ -1298,6 +1401,39 @@ class psbtObject(psbtProxy):
|
||||
assert inp.req_height_locktime is None
|
||||
|
||||
self.inputs[idx].validate(idx, txin, self.my_xfp, self)
|
||||
if self.txn_version >= 2:
|
||||
has_rtl = self.inputs[idx].has_relative_timelock(txin)
|
||||
if has_rtl:
|
||||
if has_rtl[0]:
|
||||
tb_rel_locks.append((idx, has_rtl[1]))
|
||||
else:
|
||||
bb_rel_locks.append((idx, has_rtl[1]))
|
||||
|
||||
if txin.nSequence < smallest_nsequence:
|
||||
smallest_nsequence = txin.nSequence
|
||||
|
||||
if isinstance(self.lock_time, int) and self.lock_time > 0:
|
||||
if smallest_nsequence == 0xffffffff:
|
||||
self.warnings.append((
|
||||
"Bad Locktime",
|
||||
"Locktime has no effect! None of the nSequences decremented."
|
||||
))
|
||||
else:
|
||||
msg = "This tx can only be spent after "
|
||||
if self.lock_time < 500000000:
|
||||
msg += "block height of %d" % self.lock_time
|
||||
else:
|
||||
try:
|
||||
dt = datetime_from_timestamp(self.lock_time)
|
||||
msg += datetime_to_str(dt)
|
||||
except:
|
||||
msg += "%d (unix timestamp)" % self.lock_time
|
||||
|
||||
msg += " (MTP)" # median time past
|
||||
self.ux_notes.append(("Abs Locktime", msg))
|
||||
|
||||
# create UX for users about tx level relative timelocks (nSequence)
|
||||
self.ux_relative_timelocks(tb_rel_locks, bb_rel_locks)
|
||||
|
||||
assert len(self.inputs) == self.num_inputs, 'ni mismatch'
|
||||
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
#
|
||||
# utils.py - Misc utils. My favourite kind of source file.
|
||||
#
|
||||
import gc, sys, ustruct, ngu, chains, ure
|
||||
import gc, sys, ustruct, ngu, chains, ure, time
|
||||
from ubinascii import unhexlify as a2b_hex
|
||||
from ubinascii import hexlify as b2a_hex
|
||||
from ubinascii import a2b_base64, b2a_base64
|
||||
@ -538,4 +538,39 @@ def pad_raw_secret(raw_sec_str):
|
||||
raw[0:len(x)] = x
|
||||
return raw
|
||||
|
||||
def seconds2human_readable(s):
|
||||
days = s // (3600 * 24)
|
||||
hours = s % (3600 * 24) // 3600
|
||||
minutes = (s % 3600) // 60
|
||||
seconds = (s % 3600) % 60
|
||||
msg = []
|
||||
if days:
|
||||
msg.append("%dd" % days)
|
||||
if hours:
|
||||
msg.append("%dh" % hours)
|
||||
if minutes:
|
||||
msg.append("%dm" % minutes)
|
||||
if seconds:
|
||||
msg.append("%ds" % seconds)
|
||||
|
||||
return " ".join(msg)
|
||||
|
||||
def datetime_from_timestamp(ts):
|
||||
gm_t = time.gmtime(0)
|
||||
if gm_t[0] == 1970:
|
||||
# unix
|
||||
epoch_sub = 0
|
||||
elif gm_t[0] == 2000:
|
||||
# stm32
|
||||
epoch_sub = 946684800
|
||||
else:
|
||||
assert False
|
||||
|
||||
return time.gmtime(ts - epoch_sub)
|
||||
|
||||
def datetime_to_str(dt, fmt="%d-%02d-%02d %02d:%02d:%02d"):
|
||||
y, mo, d, h, mi, s = dt[:6]
|
||||
dts = fmt % (y, mo, d, h, mi, s)
|
||||
return dts + " UTC"
|
||||
|
||||
# EOF
|
||||
|
||||
@ -10,6 +10,7 @@ from sim_settings import sim_defaults
|
||||
if not pa.is_secret_blank():
|
||||
# clear settings associated with this key, since it will be no more
|
||||
settings.current = dict(sim_defaults)
|
||||
settings.nvram_key = bytes(32)
|
||||
pa.tmp_value = None
|
||||
|
||||
# save a blank secret (all zeros is a special case, detected by bootloader)
|
||||
@ -21,7 +22,10 @@ if not pa.is_secret_blank():
|
||||
pa.login()
|
||||
|
||||
assert pa.is_secret_blank()
|
||||
settings.blank()
|
||||
|
||||
settings.master_sv_data = {}
|
||||
settings.master_nvram_key = None
|
||||
# reset top menu and go there
|
||||
from actions import goto_top_menu
|
||||
goto_top_menu()
|
||||
|
||||
@ -258,4 +258,22 @@ def detruncate_address(s):
|
||||
start, end = s.split('⋯')
|
||||
return start, end
|
||||
|
||||
def seconds2human_readable(s):
|
||||
# duplicate from shared/utils.py - needed for tests
|
||||
days = s // (3600 * 24)
|
||||
hours = s % (3600 * 24) // 3600
|
||||
minutes = (s % 3600) // 60
|
||||
seconds = (s % 3600) % 60
|
||||
msg = ""
|
||||
if days:
|
||||
msg += "%dd" % days
|
||||
if hours:
|
||||
msg += " %dh" % hours
|
||||
if minutes:
|
||||
msg += " %dm" % minutes
|
||||
if seconds:
|
||||
msg += " %ds" % seconds
|
||||
|
||||
return msg
|
||||
|
||||
# EOF
|
||||
|
||||
@ -1292,7 +1292,6 @@ def test_tmp_upgrade_disabled(reset_seed_words, need_keypress, pick_menu_item,
|
||||
m = cap_menu()
|
||||
assert "Upgrade Firmware" in m
|
||||
node = BIP32Node.from_master_secret(os.urandom(32), netcode="XTN")
|
||||
xfp = node.fingerprint().hex().upper()
|
||||
k0 = node.hwif(as_private=True)
|
||||
import_ephemeral_xprv("sd", extended_key=k0, seed_vault=True, from_main=True)
|
||||
goto_home()
|
||||
|
||||
@ -52,7 +52,8 @@ def restore_seed_xor(set_seed_words, goto_home, pick_menu_item, cap_story,
|
||||
choose_by_word_length, need_keypress, get_secrets,
|
||||
word_menu_entry, verify_ephemeral_secret_ui,
|
||||
confirm_tmp_seed, seed_vault_enable):
|
||||
def doit(parts, expect, incl_self=False, save_to_vault=False):
|
||||
def doit(parts, expect, incl_self=False, save_to_vault=False,
|
||||
is_master_tmp_fail=False):
|
||||
if expect is None:
|
||||
parts, expect = prepare_test_pairs(*parts)
|
||||
|
||||
@ -111,7 +112,15 @@ def restore_seed_xor(set_seed_words, goto_home, pick_menu_item, cap_story,
|
||||
assert 'ZERO WARNING' in body
|
||||
|
||||
need_keypress('2')
|
||||
confirm_tmp_seed(seedvault=save_to_vault)
|
||||
try:
|
||||
confirm_tmp_seed(seedvault=save_to_vault)
|
||||
except AssertionError as e:
|
||||
if is_master_tmp_fail:
|
||||
assert "Cannot use master seed as temporary" in str(e)
|
||||
return
|
||||
else:
|
||||
raise
|
||||
|
||||
verify_ephemeral_secret_ui(mnemonic=expect.split(" "),
|
||||
seed_vault=save_to_vault)
|
||||
assert get_secrets()['mnemonic'] == expect
|
||||
@ -136,15 +145,6 @@ def restore_seed_xor(set_seed_words, goto_home, pick_menu_item, cap_story,
|
||||
'save saddle indicate embrace detail weasel spread life staff mushroom bicycle light',
|
||||
'unlock damp injury tape enhance pause sheriff onion valley panic finger moon'],
|
||||
'drama jeans craft mixture filter lamp invest suggest vacant neutral history swim'),
|
||||
([zero32]*2, zero32),
|
||||
([zero24]*2, zero24),
|
||||
([zero16]*2, zero16),
|
||||
([ones32]*7, ones32),
|
||||
([ones24]*7, ones24),
|
||||
([ones16]*7, ones16),
|
||||
([ones32]*4, zero32),
|
||||
([ones24]*4, zero24),
|
||||
([ones16]*4, zero16),
|
||||
# random generated
|
||||
*random_test_cases()
|
||||
])
|
||||
@ -152,6 +152,24 @@ def test_import_xor(seed_vault, incl_self, parts, expect, restore_seed_xor):
|
||||
restore_seed_xor(parts, expect, incl_self, seed_vault)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('incl_self', [False, True])
|
||||
@pytest.mark.parametrize("parts, expect", [
|
||||
([zero32] * 2, zero32),
|
||||
([zero24] * 2, zero24),
|
||||
([zero16] * 2, zero16),
|
||||
([ones32] * 7, ones32),
|
||||
([ones24] * 7, ones24),
|
||||
([ones16] * 7, ones16),
|
||||
([ones32] * 4, zero32),
|
||||
([ones24] * 4, zero24),
|
||||
([ones16] * 4, zero16),
|
||||
])
|
||||
def test_import_xor_zeros_ones(incl_self, parts, expect, restore_seed_xor):
|
||||
restore_seed_xor(parts, expect, incl_self, False,
|
||||
is_master_tmp_fail=True if incl_self else False)
|
||||
|
||||
|
||||
|
||||
@pytest.mark.parametrize('num_words', [12, 18, 24])
|
||||
@pytest.mark.parametrize('qty', [2, 3, 4])
|
||||
@pytest.mark.parametrize('trng', [False, True])
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
# Transaction Signing. Important.
|
||||
#
|
||||
|
||||
import time, pytest, os, random, pdb, struct, base64, binascii, itertools
|
||||
import time, pytest, os, random, pdb, struct, base64, binascii, itertools, datetime
|
||||
from ckcc_protocol.protocol import CCProtocolPacker, CCProtoError, MAX_TXN_LEN, CCUserRefused
|
||||
from binascii import b2a_hex, a2b_hex
|
||||
from psbt import BasicPSBT, BasicPSBTInput, BasicPSBTOutput, PSBT_IN_REDEEM_SCRIPT
|
||||
@ -12,13 +12,16 @@ from pprint import pprint, pformat
|
||||
from decimal import Decimal
|
||||
from base64 import b64encode, b64decode
|
||||
from helpers import B2A, U2SAT, prandom, fake_dest_addr, make_change_addr, parse_change_back
|
||||
from helpers import xfp2str
|
||||
from helpers import xfp2str, seconds2human_readable
|
||||
from pycoin.key.BIP32Node import BIP32Node
|
||||
from constants import ADDR_STYLES, ADDR_STYLES_SINGLE, SIGHASH_MAP
|
||||
from txn import *
|
||||
from ckcc_protocol.constants import STXN_FINALIZE, STXN_VISUALIZE, STXN_SIGNED
|
||||
|
||||
|
||||
SEQUENCE_LOCKTIME_TYPE_FLAG = (1 << 22)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('finalize', [ False, True ])
|
||||
def test_sign1(dev, need_keypress, finalize):
|
||||
in_psbt = a2b_hex(open('data/p2pkh-in-scriptsig.psbt', 'rb').read())
|
||||
@ -628,8 +631,7 @@ def test_change_fraud_path(start_sign, use_regtest, end_sign, case, check_agains
|
||||
_, story = cap_story()
|
||||
assert chg_addr in story
|
||||
assert 'Change back:' not in story
|
||||
|
||||
signed = end_sign(True)
|
||||
end_sign(True)
|
||||
|
||||
@pytest.mark.bitcoind
|
||||
def test_change_fraud_addr(start_sign, end_sign, use_regtest, check_against_bitcoind, cap_story):
|
||||
@ -714,8 +716,7 @@ def test_change_p2sh_p2wpkh(start_sign, end_sign, check_against_bitcoind, use_re
|
||||
#print(story)
|
||||
assert expect_addr in story
|
||||
assert parse_change_back(story) == (Decimal('1.09997082'), [expect_addr])
|
||||
|
||||
signed = end_sign(True)
|
||||
end_sign(True)
|
||||
|
||||
def test_sign_multisig_partial_fail(start_sign, end_sign):
|
||||
|
||||
@ -2217,4 +2218,529 @@ def test_psbt_v2_global_quantities(way, fake_txn, start_sign, end_sign, cap_stor
|
||||
title, story = cap_story()
|
||||
assert "failed" in story or "Invalid PSBT" in story or "Network fee bigger" in story
|
||||
|
||||
|
||||
@pytest.mark.bitcoind
|
||||
@pytest.mark.parametrize("locktime", [
|
||||
0, # zero default
|
||||
False, # current block height
|
||||
800000,
|
||||
1513209600, # 2017-12-14 00:00:00
|
||||
1387324800, # 2013-12-18 00:00:00
|
||||
1294790399, # 2011-11-01 23:59:59
|
||||
1748671747, # 2025-05-31 07:09:07
|
||||
])
|
||||
def test_locktime_ux(use_regtest, bitcoind_d_sim_watch, start_sign, end_sign,
|
||||
microsd_path, cap_story, goto_home, need_keypress,
|
||||
pick_menu_item, bitcoind, locktime):
|
||||
use_regtest()
|
||||
sim = bitcoind_d_sim_watch
|
||||
addr = sim.getnewaddress()
|
||||
bitcoind.supply_wallet.sendtoaddress(addr, 2)
|
||||
bitcoind.supply_wallet.generatetoaddress(1, bitcoind.supply_wallet.getnewaddress())
|
||||
bi = sim.getblockchaininfo()
|
||||
blocks = bi["blocks"]
|
||||
|
||||
success = True
|
||||
if locktime is False:
|
||||
# current height - allowed
|
||||
locktime = blocks
|
||||
|
||||
if locktime < 500000000:
|
||||
# blocks
|
||||
if locktime > blocks:
|
||||
success = False
|
||||
else:
|
||||
# MTP
|
||||
if locktime > datetime.datetime.utcnow().timestamp():
|
||||
success = False
|
||||
|
||||
dest_addr = sim.getnewaddress() # self-spend
|
||||
psbt_resp = sim.walletcreatefundedpsbt([], [{dest_addr: 1.0}], locktime, {"fee_rate": 20})
|
||||
psbt = psbt_resp.get("psbt")
|
||||
psbt_fname = "locktime.psbt"
|
||||
with open(microsd_path(psbt_fname), "w") as f:
|
||||
f.write(psbt)
|
||||
goto_home()
|
||||
pick_menu_item('Ready To Sign')
|
||||
title, story = cap_story()
|
||||
if "Choose PSBT file to be signed" in story:
|
||||
need_keypress("y")
|
||||
pick_menu_item(psbt_fname)
|
||||
time.sleep(0.1)
|
||||
title, story = cap_story()
|
||||
|
||||
assert "WARNING" not in story
|
||||
if locktime != 0:
|
||||
assert "LOCKTIMES" in story
|
||||
assert "Abs Locktime" in story
|
||||
if locktime < 500000000:
|
||||
assert f"This tx can only be spent after block height of {locktime}" in story
|
||||
else:
|
||||
dt = datetime.datetime.utcfromtimestamp(locktime)
|
||||
ux_dt = dt.strftime("%Y-%m-%d %H:%M:%S")
|
||||
assert f"This tx can only be spent after {ux_dt} UTC (MTP)" in story
|
||||
# assert f"This tx can only be spent after {locktime} (unix timestamp)" in story
|
||||
else:
|
||||
assert "LOCKTIMES" not in story
|
||||
|
||||
need_keypress("y") # confirm signing
|
||||
time.sleep(0.1)
|
||||
title, story = cap_story()
|
||||
assert title == 'PSBT Signed'
|
||||
assert "Updated PSBT is:" in story
|
||||
assert "Finalized transaction (ready for broadcast)" in story
|
||||
assert "TXID" in story
|
||||
split_story = story.split("\n\n")
|
||||
story_txid = split_story[-1].split("\n")[-1]
|
||||
signed_psbt_fname = split_story[1]
|
||||
with open(microsd_path(signed_psbt_fname), "r") as f:
|
||||
signed_psbt = f.read().strip()
|
||||
signed_txn_fname = split_story[3]
|
||||
with open(microsd_path(signed_txn_fname), "r") as f:
|
||||
signed_txn = f.read().strip()
|
||||
assert signed_psbt != psbt
|
||||
finalize_res = sim.finalizepsbt(signed_psbt)
|
||||
bitcoind_signed_txn = finalize_res["hex"]
|
||||
assert finalize_res["complete"] is True
|
||||
accept_res = sim.testmempoolaccept([bitcoind_signed_txn])[0]
|
||||
assert accept_res["allowed"] is success
|
||||
assert signed_txn == bitcoind_signed_txn
|
||||
if success:
|
||||
txid = sim.sendrawtransaction(signed_txn)
|
||||
else:
|
||||
with pytest.raises(Exception):
|
||||
sim.sendrawtransaction(signed_txn)
|
||||
txid = accept_res["txid"]
|
||||
assert len(txid) == 64
|
||||
assert txid == story_txid
|
||||
|
||||
|
||||
@pytest.mark.bitcoind
|
||||
@pytest.mark.parametrize("num_ins", [1, 4, 11])
|
||||
@pytest.mark.parametrize("differ", [True, False])
|
||||
@pytest.mark.parametrize("sequence", [0, 1, 50, 65534])
|
||||
def test_nsequence_blockheight_relative_locktime_ux(sequence, use_regtest, bitcoind_d_sim_watch,
|
||||
start_sign, end_sign, microsd_path, cap_story,
|
||||
goto_home, need_keypress, pick_menu_item,
|
||||
bitcoind, num_ins, differ):
|
||||
if differ and (sequence == 0):
|
||||
# this case makes no sense
|
||||
return
|
||||
|
||||
use_regtest()
|
||||
sim = bitcoind_d_sim_watch
|
||||
sim.keypoolrefill(20)
|
||||
for i in range(num_ins):
|
||||
addr = sim.getnewaddress()
|
||||
bitcoind.supply_wallet.sendtoaddress(addr, 1)
|
||||
|
||||
bitcoind.supply_wallet.generatetoaddress(1, bitcoind.supply_wallet.getnewaddress())
|
||||
dest_addr = sim.getnewaddress() # self-spend
|
||||
utxos = sim.listunspent()
|
||||
assert len(utxos) == num_ins
|
||||
|
||||
ins = []
|
||||
num_ins_locked = 0
|
||||
locks = []
|
||||
for i, utxo in enumerate(utxos):
|
||||
confirmations = utxo["confirmations"]
|
||||
lock = (confirmations + sequence) if sequence else 0
|
||||
if i and differ:
|
||||
# not first one (0th) as it should have sequence provided via parametrize
|
||||
# all others decremented by iteration count
|
||||
nSeq = lock - i
|
||||
if nSeq < 0:
|
||||
nSeq = 0
|
||||
else:
|
||||
nSeq = lock
|
||||
|
||||
if nSeq > 0:
|
||||
num_ins_locked += 1
|
||||
locks.append(nSeq)
|
||||
|
||||
# block height based RTL
|
||||
inp = {
|
||||
"txid": utxo["txid"],
|
||||
"vout": utxo["vout"],
|
||||
"sequence": nSeq,
|
||||
}
|
||||
ins.append(inp)
|
||||
|
||||
psbt_resp = sim.walletcreatefundedpsbt(ins, [{dest_addr: (num_ins - 0.1)}], 0, {"fee_rate": 20})
|
||||
psbt = psbt_resp.get("psbt")
|
||||
psbt_fname = "rtl-blockheight.psbt"
|
||||
with open(microsd_path(psbt_fname), "w") as f:
|
||||
f.write(psbt)
|
||||
goto_home()
|
||||
pick_menu_item('Ready To Sign')
|
||||
title, story = cap_story()
|
||||
if "Choose PSBT file to be signed" in story:
|
||||
need_keypress("y")
|
||||
pick_menu_item(psbt_fname)
|
||||
time.sleep(0.1)
|
||||
title, story = cap_story()
|
||||
|
||||
assert "WARNING" not in story
|
||||
if sequence:
|
||||
assert "TX LOCKTIMES" in story
|
||||
assert "Block height RTL" in story
|
||||
if num_ins_locked == 1:
|
||||
assert ("has relative block height timelock of %d" % lock) in story
|
||||
else:
|
||||
if differ:
|
||||
assert ("%d inputs have relative block height timelock." % num_ins_locked) in story
|
||||
for i in range(num_ins_locked):
|
||||
if not (("%d. " % i) in story):
|
||||
assert "only 10 with highest values" in story
|
||||
else:
|
||||
assert ("%d inputs have relative block height timelock of %d" % (num_ins_locked, lock)) in story
|
||||
else:
|
||||
assert "TX LOCKTIMES" not in story
|
||||
|
||||
need_keypress("y") # confirm signing
|
||||
time.sleep(0.1)
|
||||
title, story = cap_story()
|
||||
assert title == 'PSBT Signed'
|
||||
assert "Updated PSBT is:" in story
|
||||
assert "Finalized transaction (ready for broadcast)" in story
|
||||
assert "TXID" in story
|
||||
split_story = story.split("\n\n")
|
||||
story_txid = split_story[-1].split("\n")[-1]
|
||||
signed_psbt_fname = split_story[1]
|
||||
with open(microsd_path(signed_psbt_fname), "r") as f:
|
||||
signed_psbt = f.read().strip()
|
||||
signed_txn_fname = split_story[3]
|
||||
with open(microsd_path(signed_txn_fname), "r") as f:
|
||||
signed_txn = f.read().strip()
|
||||
assert signed_psbt != psbt
|
||||
finalize_res = sim.finalizepsbt(signed_psbt)
|
||||
bitcoind_signed_txn = finalize_res["hex"]
|
||||
assert finalize_res["complete"] is True
|
||||
accept_res = sim.testmempoolaccept([bitcoind_signed_txn])[0]
|
||||
if sequence == 0:
|
||||
assert accept_res["allowed"]
|
||||
return
|
||||
|
||||
assert accept_res["allowed"] is False
|
||||
assert accept_res["reject-reason"] == 'non-BIP68-final'
|
||||
if sequence > 50:
|
||||
# not gonna mine 65k blocks
|
||||
return
|
||||
sim.generatetoaddress(sequence, bitcoind.supply_wallet.getnewaddress()) # mine N blocks
|
||||
accept_res = sim.testmempoolaccept([bitcoind_signed_txn])[0]
|
||||
assert accept_res["allowed"] is True
|
||||
assert signed_txn == bitcoind_signed_txn
|
||||
txid = sim.sendrawtransaction(signed_txn)
|
||||
assert len(txid) == 64
|
||||
assert txid == story_txid
|
||||
|
||||
|
||||
@pytest.mark.bitcoind
|
||||
@pytest.mark.veryslow
|
||||
@pytest.mark.parametrize("num_ins", [1, 4, 11])
|
||||
@pytest.mark.parametrize("differ", [True, False])
|
||||
@pytest.mark.parametrize("seconds", [512, 10000, 1000000, 33554431])
|
||||
def test_nsequence_timebased_relative_locktime_ux(seconds, use_regtest, bitcoind_d_sim_watch, start_sign,
|
||||
microsd_path, cap_story, goto_home, need_keypress,
|
||||
pick_menu_item, bitcoind, end_sign, num_ins, differ):
|
||||
sequence = SEQUENCE_LOCKTIME_TYPE_FLAG | (seconds >> 9)
|
||||
use_regtest()
|
||||
sim = bitcoind_d_sim_watch
|
||||
sim.keypoolrefill(20)
|
||||
for i in range(num_ins):
|
||||
addr = sim.getnewaddress()
|
||||
bitcoind.supply_wallet.sendtoaddress(addr, 1)
|
||||
|
||||
bitcoind.supply_wallet.generatetoaddress(1, bitcoind.supply_wallet.getnewaddress())
|
||||
dest_addr = sim.getnewaddress() # self-spend
|
||||
utxos = sim.listunspent()
|
||||
assert len(utxos) == num_ins
|
||||
|
||||
bi = sim.getblockchaininfo()
|
||||
|
||||
ins = []
|
||||
num_ins_locked = 0
|
||||
for i, utxo in enumerate(utxos):
|
||||
# time-based RTL
|
||||
if i and differ:
|
||||
nSeq = sequence - (sequence * i)
|
||||
if nSeq < 0:
|
||||
nSeq = 0
|
||||
|
||||
else:
|
||||
nSeq = sequence
|
||||
|
||||
if nSeq > 0:
|
||||
num_ins_locked += 1
|
||||
|
||||
inp = {
|
||||
"txid": utxo["txid"],
|
||||
"vout": utxo["vout"],
|
||||
"sequence": nSeq,
|
||||
}
|
||||
ins.append(inp)
|
||||
|
||||
psbt_resp = sim.walletcreatefundedpsbt(ins, [{dest_addr: (num_ins - 0.1)}], 0, {"fee_rate": 20})
|
||||
psbt = psbt_resp.get("psbt")
|
||||
psbt_fname = "rtl-time.psbt"
|
||||
with open(microsd_path(psbt_fname), "w") as f:
|
||||
f.write(psbt)
|
||||
goto_home()
|
||||
pick_menu_item('Ready To Sign')
|
||||
title, story = cap_story()
|
||||
if "Choose PSBT file to be signed" in story:
|
||||
need_keypress("y")
|
||||
pick_menu_item(psbt_fname)
|
||||
time.sleep(0.1)
|
||||
title, story = cap_story()
|
||||
|
||||
assert "WARNING" not in story
|
||||
assert "TX LOCKTIMES" in story
|
||||
assert "Time-based RTL" in story
|
||||
t_from_seq = (sequence & 0x0000ffff) << 9
|
||||
base_msg = "relative time-based timelock of:\n %s" % seconds2human_readable(t_from_seq)
|
||||
if num_ins_locked == 1:
|
||||
assert ("has " + base_msg) in story
|
||||
else:
|
||||
if differ:
|
||||
assert ("%d inputs have relative time-based timelock." % num_ins_locked) in story
|
||||
for i in range(num_ins_locked):
|
||||
assert ("%d. " % i) in story
|
||||
else:
|
||||
msg1 = "%d inputs have " % num_ins_locked
|
||||
assert (msg1 + base_msg) in story
|
||||
|
||||
need_keypress("y") # confirm signing
|
||||
time.sleep(0.1)
|
||||
title, story = cap_story()
|
||||
assert title == 'PSBT Signed'
|
||||
assert "Updated PSBT is:" in story
|
||||
assert "Finalized transaction (ready for broadcast)" in story
|
||||
assert "TXID" in story
|
||||
split_story = story.split("\n\n")
|
||||
story_txid = split_story[-1].split("\n")[-1]
|
||||
signed_psbt_fname = split_story[1]
|
||||
with open(microsd_path(signed_psbt_fname), "r") as f:
|
||||
signed_psbt = f.read().strip()
|
||||
signed_txn_fname = split_story[3]
|
||||
with open(microsd_path(signed_txn_fname), "r") as f:
|
||||
signed_txn = f.read().strip()
|
||||
assert signed_psbt != psbt
|
||||
finalize_res = sim.finalizepsbt(signed_psbt)
|
||||
bitcoind_signed_txn = finalize_res["hex"]
|
||||
assert finalize_res["complete"] is True
|
||||
accept_res = sim.testmempoolaccept([bitcoind_signed_txn])[0]
|
||||
assert accept_res["allowed"] is False
|
||||
assert accept_res["reject-reason"] == 'non-BIP68-final'
|
||||
if seconds > 512:
|
||||
# not gonna wait for it
|
||||
return
|
||||
# mine blocks - mining increases the timestamp but somehow randomly
|
||||
while True:
|
||||
sim.generatetoaddress(5, bitcoind.supply_wallet.getnewaddress())
|
||||
t = sim.getblockchaininfo()["time"]
|
||||
if (t - bi["time"]) > 600:
|
||||
break
|
||||
accept_res = sim.testmempoolaccept([bitcoind_signed_txn])[0]
|
||||
assert accept_res["allowed"] is True
|
||||
assert signed_txn == bitcoind_signed_txn
|
||||
txid = sim.sendrawtransaction(signed_txn)
|
||||
assert len(txid) == 64
|
||||
assert txid == story_txid
|
||||
|
||||
|
||||
@pytest.mark.bitcoind
|
||||
@pytest.mark.veryslow
|
||||
@pytest.mark.parametrize("abs_lock", [True, False])
|
||||
@pytest.mark.parametrize("num_rtl", [(2,3),(4,7),(8,3),(6,7)])
|
||||
def test_mixed_locktimes(num_rtl, use_regtest, bitcoind_d_sim_watch, start_sign,
|
||||
microsd_path, cap_story, goto_home, need_keypress,
|
||||
pick_menu_item, bitcoind, end_sign, abs_lock):
|
||||
tb, bb = num_rtl
|
||||
num_ins = tb + bb
|
||||
sequence = SEQUENCE_LOCKTIME_TYPE_FLAG | (512 >> 9)
|
||||
use_regtest()
|
||||
sim = bitcoind_d_sim_watch
|
||||
sim.keypoolrefill(20)
|
||||
for i in range(num_ins):
|
||||
addr = sim.getnewaddress()
|
||||
bitcoind.supply_wallet.sendtoaddress(addr, 1)
|
||||
|
||||
bitcoind.supply_wallet.generatetoaddress(1, bitcoind.supply_wallet.getnewaddress())
|
||||
dest_addr = sim.getnewaddress() # self-spend
|
||||
utxos = sim.listunspent()
|
||||
assert len(utxos) == num_ins
|
||||
|
||||
bi = sim.getblockchaininfo()
|
||||
blocks = bi["blocks"]
|
||||
if abs_lock:
|
||||
# absolute locktime smaller then relative
|
||||
locktime = blocks + 10
|
||||
else:
|
||||
locktime = 0
|
||||
|
||||
ins = []
|
||||
for i, utxo in enumerate(utxos):
|
||||
# time-based RTL
|
||||
if i < tb:
|
||||
nSeq = sequence
|
||||
else:
|
||||
confirmations = utxo["confirmations"]
|
||||
nSeq = confirmations + 20 # blocks
|
||||
|
||||
inp = {
|
||||
"txid": utxo["txid"],
|
||||
"vout": utxo["vout"],
|
||||
"sequence": nSeq,
|
||||
}
|
||||
ins.append(inp)
|
||||
|
||||
psbt_resp = sim.walletcreatefundedpsbt(ins, [{dest_addr: (num_ins - 0.1)}], locktime, {"fee_rate": 20})
|
||||
psbt = psbt_resp.get("psbt")
|
||||
psbt_fname = "rtl-mixin-time.psbt"
|
||||
with open(microsd_path(psbt_fname), "w") as f:
|
||||
f.write(psbt)
|
||||
goto_home()
|
||||
pick_menu_item('Ready To Sign')
|
||||
title, story = cap_story()
|
||||
if "Choose PSBT file to be signed" in story:
|
||||
need_keypress("y")
|
||||
pick_menu_item(psbt_fname)
|
||||
time.sleep(0.1)
|
||||
title, story = cap_story()
|
||||
|
||||
assert "WARNING" not in story
|
||||
assert "TX LOCKTIMES" in story
|
||||
assert "Time-based RTL" in story
|
||||
t_from_seq = (sequence & 0x0000ffff) << 9
|
||||
base_msg = "relative time-based timelock of:\n %s" % seconds2human_readable(t_from_seq)
|
||||
msg1 = "%d inputs have " % tb
|
||||
assert (msg1 + base_msg) in story
|
||||
assert "Block height RTL" in story
|
||||
assert ("%d inputs have relative block height timelock of %d" % (bb, 21)) in story
|
||||
|
||||
if abs_lock:
|
||||
assert "Abs Locktime" in story
|
||||
assert f"This tx can only be spent after block height of {locktime}" in story
|
||||
else:
|
||||
assert "Abs Locktime" not in story
|
||||
|
||||
need_keypress("y") # confirm signing
|
||||
time.sleep(0.1)
|
||||
title, story = cap_story()
|
||||
assert title == 'PSBT Signed'
|
||||
assert "Updated PSBT is:" in story
|
||||
assert "Finalized transaction (ready for broadcast)" in story
|
||||
assert "TXID" in story
|
||||
split_story = story.split("\n\n")
|
||||
story_txid = split_story[-1].split("\n")[-1]
|
||||
signed_psbt_fname = split_story[1]
|
||||
with open(microsd_path(signed_psbt_fname), "r") as f:
|
||||
signed_psbt = f.read().strip()
|
||||
signed_txn_fname = split_story[3]
|
||||
with open(microsd_path(signed_txn_fname), "r") as f:
|
||||
signed_txn = f.read().strip()
|
||||
assert signed_psbt != psbt
|
||||
finalize_res = sim.finalizepsbt(signed_psbt)
|
||||
bitcoind_signed_txn = finalize_res["hex"]
|
||||
assert finalize_res["complete"] is True
|
||||
accept_res = sim.testmempoolaccept([bitcoind_signed_txn])[0]
|
||||
assert accept_res["allowed"] is False
|
||||
|
||||
if abs_lock:
|
||||
assert accept_res["reject-reason"] == 'non-final'
|
||||
else:
|
||||
assert accept_res["reject-reason"] == 'non-BIP68-final'
|
||||
|
||||
# try to mine 21 blocks - which should unlock height based inpputs
|
||||
# and also absolute timelock which is smaller than relative
|
||||
# but tx must be still unspendable as time based are still locked
|
||||
sim.generatetoaddress(21, bitcoind.supply_wallet.getnewaddress())
|
||||
accept_res = sim.testmempoolaccept([bitcoind_signed_txn])[0]
|
||||
assert accept_res["allowed"] is False
|
||||
assert accept_res["reject-reason"] == 'non-BIP68-final'
|
||||
|
||||
# mine blocks - mining increases the timestamp but somehow randomly
|
||||
while True:
|
||||
sim.generatetoaddress(5, bitcoind.supply_wallet.getnewaddress())
|
||||
t = sim.getblockchaininfo()["time"]
|
||||
if (t - bi["time"]) > 600:
|
||||
break
|
||||
accept_res = sim.testmempoolaccept([bitcoind_signed_txn])[0]
|
||||
assert accept_res["allowed"] is True
|
||||
assert signed_txn == bitcoind_signed_txn
|
||||
txid = sim.sendrawtransaction(signed_txn)
|
||||
assert len(txid) == 64
|
||||
assert txid == story_txid
|
||||
|
||||
def random_nLockTime_test_cases(num=10):
|
||||
res = []
|
||||
now = datetime.datetime.utcnow()
|
||||
for i in range(num):
|
||||
td = datetime.timedelta(days=i, hours=i+i, seconds=7**i)
|
||||
var = now + td
|
||||
var = var.replace(tzinfo=datetime.timezone.utc)
|
||||
res.append((int(var.timestamp()), var.strftime("%Y-%m-%d %H:%M:%S")))
|
||||
return res
|
||||
|
||||
|
||||
@pytest.mark.parametrize("nLockTime", [
|
||||
(1513209600, "2017-12-14 00:00:00"),
|
||||
(1387324800, "2013-12-18 00:00:00"),
|
||||
(1294790399, "2011-01-11 23:59:59"),
|
||||
(1748671747, "2025-05-31 06:09:07"),
|
||||
*random_nLockTime_test_cases()
|
||||
])
|
||||
def test_timelocks_visualize(start_sign, end_sign, dev, bitcoind, use_regtest,
|
||||
bitcoind_d_sim_watch, nLockTime):
|
||||
# - works on simulator and connected USB real-device
|
||||
nLockTime, expect_ux = nLockTime
|
||||
num_ins = 10
|
||||
use_regtest()
|
||||
bitcoind_d_sim_watch.keypoolrefill(20)
|
||||
for i in range(num_ins):
|
||||
addr = bitcoind_d_sim_watch.getnewaddress()
|
||||
bitcoind.supply_wallet.sendtoaddress(addr, 1)
|
||||
|
||||
bitcoind.supply_wallet.generatetoaddress(1, bitcoind.supply_wallet.getnewaddress())
|
||||
dest_addr = bitcoind_d_sim_watch.getnewaddress() # self-spend
|
||||
utxos = bitcoind_d_sim_watch.listunspent()
|
||||
assert len(utxos) == num_ins
|
||||
|
||||
ins = []
|
||||
for i, utxo in enumerate(utxos):
|
||||
if i % 2 == 0:
|
||||
nSeq = (SEQUENCE_LOCKTIME_TYPE_FLAG | i)
|
||||
else:
|
||||
confirmations = utxo["confirmations"]
|
||||
nSeq = confirmations + (20*i)
|
||||
|
||||
inp = {
|
||||
"txid": utxo["txid"],
|
||||
"vout": utxo["vout"],
|
||||
"sequence": nSeq,
|
||||
}
|
||||
ins.append(inp)
|
||||
|
||||
psbt_resp = bitcoind_d_sim_watch.walletcreatefundedpsbt(
|
||||
ins, [{dest_addr: (num_ins - 0.1)}],
|
||||
nLockTime, {"fee_rate": 20}
|
||||
)
|
||||
psbt = base64.b64decode(psbt_resp.get("psbt"))
|
||||
|
||||
open('debug/locktimes.psbt', 'wb').write(psbt)
|
||||
|
||||
# should be able to sign, but get warning
|
||||
|
||||
# use new feature to have Coldcard return the 'visualization' of transaction
|
||||
start_sign(psbt, False, stxn_flags=STXN_VISUALIZE)
|
||||
story = end_sign(accept=None, expect_txn=False)
|
||||
|
||||
story = story.decode('ascii')
|
||||
assert datetime.datetime.utcfromtimestamp(nLockTime).strftime("%Y-%m-%d %H:%M:%S") == expect_ux
|
||||
assert f"Abs Locktime: This tx can only be spent after {expect_ux} UTC (MTP)" in story
|
||||
assert "Block height RTL: 5 inputs have relative block height timelock" in story
|
||||
# when i=0 in loop time based RTL is zero
|
||||
assert "Time-based RTL: 4 inputs have relative time-based timelock" in story
|
||||
|
||||
# EOF
|
||||
|
||||
Loading…
Reference in New Issue
Block a user