CCC usability improvements;ability to remove all addrs from whitelist (with confirmation)
This commit is contained in:
parent
6779345665
commit
0dedaf353d
@ -149,11 +149,11 @@ class CCCFeature:
|
||||
|
||||
block_h = pol.get("block_h", chains.current_chain().ccc_min_block)
|
||||
if psbt.lock_time <= block_h:
|
||||
raise CCCPolicyViolationError("rewound")
|
||||
raise CCCPolicyViolationError("rewound (%d)" % psbt.lock_time)
|
||||
|
||||
# we won't sign txn unless old height + velocity >= new height
|
||||
if psbt.lock_time < (block_h + velocity):
|
||||
raise CCCPolicyViolationError("velocity")
|
||||
raise CCCPolicyViolationError("velocity (%d)" % psbt.lock_time)
|
||||
|
||||
# Whitelist of outputs addresses
|
||||
wl = pol.get("addrs", None)
|
||||
@ -285,7 +285,13 @@ class CCCConfigMenu(MenuSystem):
|
||||
|
||||
async def debug_last_fail(self, *a):
|
||||
# debug for customers: why did we reject that last txn?
|
||||
msg = 'The most recent policy check failed because of:\n\n"%s"\n\nPress (4) to clear.' \
|
||||
pol = CCCFeature.get_policy()
|
||||
bh = pol.get('block_h', None)
|
||||
msg = ''
|
||||
if bh:
|
||||
msg += "CCC height:\n\n%s\n\n" % bh
|
||||
|
||||
msg += 'The most recent policy check failed because of:\n\n%s\n\nPress (4) to clear.' \
|
||||
% CCCFeature.last_fail_reason
|
||||
ch = await ux_show_story(msg, escape='4')
|
||||
|
||||
@ -406,11 +412,8 @@ class CCCAddrWhitelist(MenuSystem):
|
||||
addrs = CCCFeature.get_policy().get('addrs', [])
|
||||
maxxed = (len(addrs) >= MAX_WHITELIST)
|
||||
|
||||
items = [MenuItem(truncate_address(a), f=self.edit_addr, arg=a) for a in addrs]
|
||||
|
||||
if not items:
|
||||
items.append(MenuItem("(none yet)"))
|
||||
|
||||
items = []
|
||||
# better to show usability options at the top, as we can have up to 25 addresses in the menu
|
||||
if version.has_qr:
|
||||
items.append(MenuItem('Scan QR', f=(self.maxed_out if maxxed else self.scan_qr),
|
||||
shortcut=KEY_QR))
|
||||
@ -418,6 +421,16 @@ class CCCAddrWhitelist(MenuSystem):
|
||||
items.append(MenuItem('Import from File',
|
||||
f=(self.maxed_out if maxxed else self.import_file)))
|
||||
|
||||
# show most recent added addresses at the top of the menu list
|
||||
a_items = [MenuItem(truncate_address(a), f=self.edit_addr, arg=a) for a in addrs[::-1]]
|
||||
|
||||
if a_items:
|
||||
items += a_items
|
||||
if len(a_items) > 1:
|
||||
items.append(MenuItem("Clear Whitelist", f=self.clear_all))
|
||||
else:
|
||||
items.append(MenuItem("(none yet)"))
|
||||
|
||||
return items
|
||||
|
||||
async def edit_addr(self, menu, idx, item):
|
||||
@ -436,6 +449,12 @@ class CCCAddrWhitelist(MenuSystem):
|
||||
CCCFeature.update_policy_key(addrs=addrs)
|
||||
self.update_contents()
|
||||
|
||||
async def clear_all(self, *a):
|
||||
if await ux_confirm("Irreversibly remove all addresses from the whitelist?",
|
||||
confirm_key='4'):
|
||||
CCCFeature.update_policy_key(addrs=[])
|
||||
self.update_contents()
|
||||
|
||||
async def import_file(self, *a):
|
||||
# Import from a file, or NFC.
|
||||
# - simulator: --seq tcENTERENTERsENTERwENTERiENTER1
|
||||
@ -476,6 +495,9 @@ class CCCAddrWhitelist(MenuSystem):
|
||||
with CardSlot(readonly=True, **choice) as card:
|
||||
with open(fn, 'rt') as fd:
|
||||
for ln in fd.readlines():
|
||||
if len(results) >= MAX_WHITELIST:
|
||||
# no need to clog memory and parse more, we're done
|
||||
break
|
||||
for here in pat.split(ln):
|
||||
if len(here) >= 4:
|
||||
try:
|
||||
@ -501,8 +523,10 @@ class CCCAddrWhitelist(MenuSystem):
|
||||
while 1:
|
||||
here = await q.scan_for_addresses("Bitcoin Address(es) to Whitelist", line2=ln)
|
||||
if not here: break
|
||||
got.extend(here)
|
||||
ln = 'Got %d so far. ENTER to apply.' % len(got)
|
||||
for addr in here:
|
||||
if addr not in got:
|
||||
got.append(addr)
|
||||
ln = 'Got %d so far. ENTER to apply.' % len(got)
|
||||
|
||||
if got:
|
||||
# import them
|
||||
|
||||
@ -299,7 +299,7 @@ class BitcoinMain(ChainsBase):
|
||||
# see <https://github.com/bitcoin/bitcoin/blob/master/src/chainparams.cpp#L140>
|
||||
ctype = 'BTC'
|
||||
name = 'Bitcoin Mainnet'
|
||||
ccc_min_block = 865572
|
||||
ccc_min_block = 890584
|
||||
|
||||
slip132 = {
|
||||
AF_CLASSIC: Slip132Version(0x0488B21E, 0x0488ADE4, 'x'),
|
||||
|
||||
@ -4,7 +4,7 @@
|
||||
#
|
||||
import stash, chains, ustruct, ure, uio, sys, ngu, uos, ujson, version
|
||||
from utils import xfp2str, str2xfp, swab32, cleanup_deriv_path, keypath_to_str, to_ascii_printable
|
||||
from utils import str_to_keypath, problem_file_line, parse_extended_key, get_filesize, B2A
|
||||
from utils import str_to_keypath, problem_file_line, parse_extended_key, get_filesize, extract_cosigner
|
||||
from ux import ux_show_story, ux_confirm, ux_dramatic_pause, ux_clear_keys
|
||||
from ux import import_export_prompt, ux_enter_bip32_index, ux_enter_number, OK, X
|
||||
from files import CardSlot, CardMissingError, needs_microsd
|
||||
@ -1211,7 +1211,7 @@ Press (1) to see extended public keys, '''.format(M=M, N=N, name=self.name, exp=
|
||||
|
||||
def kt_my_keypair(self, ri):
|
||||
# Calc my keypair for sending PSBT files.
|
||||
#
|
||||
#
|
||||
|
||||
my_xfp = settings.get('xfp')
|
||||
|
||||
@ -1248,7 +1248,7 @@ Press (1) to see extended public keys, '''.format(M=M, N=N, name=self.name, exp=
|
||||
kp = None
|
||||
for ms in cls.iter_wallets():
|
||||
if my_xfp not in ms.xfp_paths:
|
||||
# we aren't a party to this MS wallet? not supposed to happen, but
|
||||
# we aren't a party to this MS wallet? not supposed to happen, but
|
||||
# easy to handle
|
||||
continue
|
||||
|
||||
@ -1273,7 +1273,7 @@ Press (1) to see extended public keys, '''.format(M=M, N=N, name=self.name, exp=
|
||||
# if implied session key decodes the checksum, it is right
|
||||
ses_key, body = decode_step1(kp, his_pubkey, payload[4:])
|
||||
|
||||
if ses_key:
|
||||
if ses_key:
|
||||
return ses_key, body, xfp
|
||||
|
||||
return None, None, None
|
||||
@ -1625,7 +1625,30 @@ async def validate_xpub_for_ms(obj, af_str, chain, my_xfp, xpubs):
|
||||
async def ms_coordinator_qr(af_str, my_xfp, chain):
|
||||
# Scan a number of JSON files from BBQr w/ derive, xfp and xpub details.
|
||||
#
|
||||
from ux_q1 import QRScannerInteraction
|
||||
from ux_q1 import QRScannerInteraction, decode_qr_result, QRDecodeExplained
|
||||
|
||||
def convertor(got):
|
||||
file_type, _, data = decode_qr_result(got, expect_bbqr=True)
|
||||
if isinstance(data, bytes):
|
||||
# we expect BBQr, but simple QR also possible here
|
||||
data = data.decode()
|
||||
|
||||
if file_type == 'U':
|
||||
data = data.strip()
|
||||
if data[0] == '{' and data[-1] == '}':
|
||||
file_type = 'J'
|
||||
if file_type == 'J':
|
||||
try:
|
||||
import json
|
||||
return json.loads(data)
|
||||
except:
|
||||
raise QRDecodeExplained('Unable to decode JSON data')
|
||||
else:
|
||||
for line in data.split("\n"):
|
||||
if len(line) > 112:
|
||||
l_data = extract_cosigner(line, af_str)
|
||||
if l_data:
|
||||
return l_data
|
||||
|
||||
num_mine = 0
|
||||
num_files = 0
|
||||
@ -1633,10 +1656,9 @@ async def ms_coordinator_qr(af_str, my_xfp, chain):
|
||||
|
||||
msg = 'Scan Exported XPUB from Coldcard'
|
||||
while True:
|
||||
vals = await QRScannerInteraction().scan_json(msg)
|
||||
vals = await QRScannerInteraction().scan_general(msg, convertor)
|
||||
if vals is None:
|
||||
break
|
||||
|
||||
try:
|
||||
is_mine = await validate_xpub_for_ms(vals, af_str, chain, my_xfp, xpubs)
|
||||
except KeyError as e:
|
||||
@ -1669,7 +1691,8 @@ async def ms_coordinator_file(af_str, my_xfp, chain, slot_b=None):
|
||||
# ignore subdirs
|
||||
continue
|
||||
|
||||
if not fn.startswith('ccxp-') or not fn.endswith('.json'):
|
||||
if fn.endswith('.bsms'): pass # allows files with [xfp/p/a/t/h]xpub
|
||||
elif not fn.startswith('ccxp-') or not fn.endswith('.json'):
|
||||
# wrong prefix/suffix: ignore
|
||||
continue
|
||||
|
||||
@ -1685,7 +1708,16 @@ async def ms_coordinator_file(af_str, my_xfp, chain, slot_b=None):
|
||||
|
||||
try:
|
||||
with open(full_fname, 'rt') as fp:
|
||||
vals = ujson.load(fp)
|
||||
try:
|
||||
# CC multisig XPUBs JSON expected
|
||||
vals = ujson.load(fp)
|
||||
except:
|
||||
# try looking for BIP-380 key expression
|
||||
fp.seek(0)
|
||||
for line in fp.readlines():
|
||||
vals = extract_cosigner(line, af_str)
|
||||
if vals:
|
||||
break
|
||||
|
||||
is_mine = await validate_xpub_for_ms(vals, af_str, chain,
|
||||
my_xfp, xpubs)
|
||||
@ -1775,8 +1807,8 @@ async def ondevice_multisig_create(mode='p2wsh', addr_fmt=AF_P2WSH, is_qr=False,
|
||||
await ux_show_story("Need at least one other co-signer (key B).")
|
||||
return
|
||||
|
||||
xpubs.append(a)
|
||||
xpubs.append(c)
|
||||
# master seed is always key0, key C is key1, k2..kn backup keys
|
||||
xpubs = [a, c] + xpubs
|
||||
num_mine += 2
|
||||
|
||||
elif not num_mine:
|
||||
|
||||
@ -8,6 +8,7 @@ from ubinascii import hexlify as b2a_hex
|
||||
from ubinascii import a2b_base64, b2a_base64
|
||||
from charcodes import OUT_CTRL_ADDRESS
|
||||
from uhashlib import sha256
|
||||
from public_constants import MAX_PATH_DEPTH, AF_CLASSIC
|
||||
|
||||
B2A = lambda x: str(b2a_hex(x), 'ascii')
|
||||
|
||||
@ -250,7 +251,6 @@ def cleanup_deriv_path(bin_path, allow_star=False):
|
||||
# - assume 'm' prefix, so '34' becomes 'm/34', etc
|
||||
# - do not assume /// is m/0/0/0
|
||||
# - if allow_star, then final position can be * or *h (wildcard)
|
||||
from public_constants import MAX_PATH_DEPTH
|
||||
|
||||
s = to_ascii_printable(bin_path, strip=True).lower()
|
||||
|
||||
@ -759,4 +759,30 @@ def xor(*args):
|
||||
|
||||
return rv
|
||||
|
||||
def extract_cosigner(data, af_str):
|
||||
# decodes any text, looking for key expression [xfp/p/a/t/h]xpub123
|
||||
# BIP-380 https://github.com/bitcoin/bips/blob/master/bip-0380.mediawiki#key-expressions
|
||||
# only first key expression will be parsed from the data
|
||||
# key origin info is required
|
||||
# failure to find "proper" key expression results in None being returned
|
||||
pub = "%spub" % chains.current_chain().slip132[AF_CLASSIC].hint
|
||||
if pub not in data:
|
||||
return
|
||||
|
||||
o_start = data.find("[")
|
||||
o_end = data.find("]")
|
||||
if 0 <= o_start < o_end:
|
||||
key_orig_info = data[o_start+1:o_end]
|
||||
ss = key_orig_info.split("/")
|
||||
xfp = ss[0]
|
||||
if (len(xfp) == 8) and (data[o_end+1:o_end+1+len(pub)] == pub):
|
||||
deriv = "m"
|
||||
der_nums = "/".join(ss[1:])
|
||||
if der_nums:
|
||||
deriv += ("/" + der_nums)
|
||||
ek = data[o_end+1:o_end+1+112]
|
||||
key_deriv = "%s_deriv" % af_str
|
||||
# emulate coldcard export xpubs
|
||||
return {"xfp": xfp, af_str: ek, key_deriv: deriv}
|
||||
|
||||
# EOF
|
||||
|
||||
@ -339,7 +339,7 @@ def setup_ccc(goto_home, pick_menu_item, cap_story, press_select, pass_word_quiz
|
||||
time.sleep(.1)
|
||||
m = cap_menu()
|
||||
mi_addrs = [a for a in m if '⋯' in a]
|
||||
for mia, addr in zip(mi_addrs, whitelist):
|
||||
for mia, addr in zip(mi_addrs, reversed(whitelist)):
|
||||
_start, _end = mia.split('⋯')
|
||||
assert addr.startswith(_start)
|
||||
assert addr.endswith(_end)
|
||||
@ -384,11 +384,13 @@ def enter_enabled_ccc(goto_home, pick_menu_item, cap_story, press_select, is_q1,
|
||||
|
||||
@pytest.fixture
|
||||
def ccc_ms_setup(microsd_path, virtdisk_path, scan_a_qr, is_q1, cap_menu, pick_menu_item,
|
||||
cap_story, press_select, need_keypress, enter_number):
|
||||
def doit(N=3, b_words=12, way="sd", addr_fmt=AF_P2WSH):
|
||||
cap_story, press_select, need_keypress, enter_number, press_cancel):
|
||||
def doit(N=3, b_words=12, way="sd", addr_fmt=AF_P2WSH, ftype="cc", bbqr=True):
|
||||
|
||||
N2 = N - 2 # how many more signers we need (B keys)
|
||||
|
||||
label = "p2wsh" if addr_fmt == AF_P2WSH else "p2sh_p2wsh"
|
||||
|
||||
res = []
|
||||
for i in range(N2):
|
||||
if isinstance(b_words, int):
|
||||
@ -401,7 +403,6 @@ def ccc_ms_setup(microsd_path, virtdisk_path, scan_a_qr, is_q1, cap_menu, pick_m
|
||||
|
||||
master = BIP32Node.from_master_secret(b39_seed)
|
||||
xfp = master.fingerprint().hex().upper()
|
||||
label = "p2wsh" if addr_fmt == AF_P2WSH else "p2sh_p2wsh"
|
||||
derive = f"m/48h/1h/0h/{'2' if addr_fmt == AF_P2WSH else '1'}h"
|
||||
derived = master.subkey_for_path(derive)
|
||||
|
||||
@ -418,21 +419,24 @@ def ccc_ms_setup(microsd_path, virtdisk_path, scan_a_qr, is_q1, cap_menu, pick_m
|
||||
for fn in glob.glob(path_f('ccxp-*.json')):
|
||||
os.remove(fn) # cleanup as we want to control N
|
||||
|
||||
for fn in glob.glob(path_f('*.bsms')):
|
||||
os.remove(fn) # cleanup as we want to control N
|
||||
|
||||
for d, dd in res:
|
||||
fname = f"ccxp-{dd['xfp']}.json"
|
||||
if ftype == "cc":
|
||||
fname = f"ccxp-{dd['xfp']}.json"
|
||||
conts = json.dumps(dd)
|
||||
else:
|
||||
assert ftype == "bsms"
|
||||
xfp = dd['xfp']
|
||||
deriv = dd[f"{label}_deriv"].replace("m/", "")
|
||||
fname = f"{xfp}.bsms"
|
||||
conts = f"[{xfp}/{deriv}]{dd[label]}"
|
||||
|
||||
with open(path_f(fname), "w") as f:
|
||||
f.write(json.dumps(dd))
|
||||
f.write(conts)
|
||||
|
||||
m = cap_menu()
|
||||
target_mi = None
|
||||
for mi in m:
|
||||
if "Build 2-of-N" in mi:
|
||||
target_mi = mi
|
||||
break
|
||||
else:
|
||||
assert False, "not in CCC menu"
|
||||
|
||||
pick_menu_item(target_mi)
|
||||
pick_menu_item("↳ Build 2-of-N")
|
||||
time.sleep(.1)
|
||||
title, story = cap_story()
|
||||
assert "one other device, as key B" in story
|
||||
@ -447,17 +451,44 @@ def ccc_ms_setup(microsd_path, virtdisk_path, scan_a_qr, is_q1, cap_menu, pick_m
|
||||
press_select()
|
||||
else:
|
||||
need_keypress(KEY_QR)
|
||||
time.sleep(.1)
|
||||
title, story = cap_story()
|
||||
assert title == "Address Format"
|
||||
assert "Press ENTER for default address format (P2WSH" in story
|
||||
assert "press (1) for P2SH-P2WSH" in story
|
||||
if addr_fmt == AF_P2WSH:
|
||||
press_select()
|
||||
else:
|
||||
need_keypress("1")
|
||||
|
||||
for d, dd in res:
|
||||
_, parts = split_qrs(json.dumps(dd), 'J', max_version=20)
|
||||
for p in parts:
|
||||
scan_a_qr(p)
|
||||
if ftype == "cc":
|
||||
conts = json.dumps(dd)
|
||||
tc = "J"
|
||||
else:
|
||||
deriv = dd[f"{label}_deriv"].replace("m/", "")
|
||||
conts = f"[{dd['xfp']}/{deriv}]{dd[label]}"
|
||||
tc = "U"
|
||||
|
||||
if bbqr:
|
||||
_, parts = split_qrs(conts, tc, max_version=20)
|
||||
for p in parts:
|
||||
scan_a_qr(p)
|
||||
time.sleep(.1)
|
||||
else:
|
||||
scan_a_qr(conts)
|
||||
time.sleep(.1)
|
||||
|
||||
time.sleep(.5)
|
||||
|
||||
press_cancel() # after we're done scanning keys, exit QR animation to proceed
|
||||
|
||||
# casual on-device multisig create
|
||||
if addr_fmt == AF_P2WSH:
|
||||
press_select()
|
||||
else:
|
||||
need_keypress("1")
|
||||
if way != "qr":
|
||||
if addr_fmt == AF_P2WSH:
|
||||
press_select()
|
||||
else:
|
||||
need_keypress("1")
|
||||
|
||||
# CCC C key account number
|
||||
enter_number("0")
|
||||
@ -532,7 +563,7 @@ def policy_sign(start_sign, end_sign, cap_story, get_last_violation):
|
||||
if violation:
|
||||
assert ("(%d warning%s below)"% (num_warn, "s" if num_warn > 1 else "")) in story
|
||||
assert "CCC: Violates spending policy. Won't sign." in story
|
||||
assert get_last_violation() == violation
|
||||
assert get_last_violation().startswith(violation)
|
||||
if warn_list:
|
||||
for w in warn_list:
|
||||
assert w in story
|
||||
@ -1160,4 +1191,31 @@ def test_c_key_from_seed_vault(has_candidates, setup_ccc, build_test_seed_vault,
|
||||
assert "MUST delete" in story
|
||||
press_select()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("way", ["sd", "qr"])
|
||||
@pytest.mark.parametrize("ftype", ["cc", "bsms"])
|
||||
@pytest.mark.parametrize("is_bbqr", [True, False])
|
||||
@pytest.mark.parametrize("N", [3, 15])
|
||||
def test_ms_setup_cosigner_import(way, ftype, is_bbqr, N, goto_home, settings_set, setup_ccc,
|
||||
ccc_ms_setup, pick_menu_item, cap_story):
|
||||
if way == "sd" and is_bbqr:
|
||||
pytest.skip("useless")
|
||||
|
||||
goto_home()
|
||||
settings_set("ccc", None)
|
||||
settings_set("multisig", [])
|
||||
|
||||
setup_ccc()
|
||||
keys, target_mi = ccc_ms_setup(N=N, way=way, ftype=ftype, bbqr=is_bbqr)
|
||||
|
||||
pick_menu_item(target_mi)
|
||||
pick_menu_item("Descriptors")
|
||||
pick_menu_item("View Descriptor")
|
||||
time.sleep(.1)
|
||||
_, story = cap_story()
|
||||
desc = story.split("\n\n")[-1]
|
||||
|
||||
for _, obj in keys:
|
||||
assert f"[{obj['xfp'].lower()}/{obj['p2wsh_deriv'].replace('m/', '')}]{obj['p2wsh']}" in desc
|
||||
|
||||
# EOF
|
||||
@ -1623,12 +1623,6 @@ def test_make_airgapped(addr_fmt, acct_num, M_N, goto_home, cap_story, pick_menu
|
||||
assert 0, addr_fmt
|
||||
|
||||
if way == "qr":
|
||||
# first non-json garbage
|
||||
scan_a_qr("aaaaaaaaaaaaaaaaaaaa")
|
||||
time.sleep(1)
|
||||
scr = cap_screen()
|
||||
assert f"Expected JSON data" in scr
|
||||
|
||||
# JSON but wrong
|
||||
_, parts = split_qrs('{"json": "but wrong","missing": "important data"}',
|
||||
'J', max_version=20)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user