diff --git a/shared/ccc.py b/shared/ccc.py index 5afb6525..20db8652 100644 --- a/shared/ccc.py +++ b/shared/ccc.py @@ -149,11 +149,11 @@ class CCCFeature: block_h = pol.get("block_h", chains.current_chain().ccc_min_block) if psbt.lock_time <= block_h: - raise CCCPolicyViolationError("rewound") + raise CCCPolicyViolationError("rewound (%d)" % psbt.lock_time) # we won't sign txn unless old height + velocity >= new height if psbt.lock_time < (block_h + velocity): - raise CCCPolicyViolationError("velocity") + raise CCCPolicyViolationError("velocity (%d)" % psbt.lock_time) # Whitelist of outputs addresses wl = pol.get("addrs", None) @@ -285,7 +285,13 @@ class CCCConfigMenu(MenuSystem): async def debug_last_fail(self, *a): # debug for customers: why did we reject that last txn? - msg = 'The most recent policy check failed because of:\n\n"%s"\n\nPress (4) to clear.' \ + pol = CCCFeature.get_policy() + bh = pol.get('block_h', None) + msg = '' + if bh: + msg += "CCC height:\n\n%s\n\n" % bh + + msg += 'The most recent policy check failed because of:\n\n%s\n\nPress (4) to clear.' \ % CCCFeature.last_fail_reason ch = await ux_show_story(msg, escape='4') @@ -406,11 +412,8 @@ class CCCAddrWhitelist(MenuSystem): addrs = CCCFeature.get_policy().get('addrs', []) maxxed = (len(addrs) >= MAX_WHITELIST) - items = [MenuItem(truncate_address(a), f=self.edit_addr, arg=a) for a in addrs] - - if not items: - items.append(MenuItem("(none yet)")) - + items = [] + # better to show usability options at the top, as we can have up to 25 addresses in the menu if version.has_qr: items.append(MenuItem('Scan QR', f=(self.maxed_out if maxxed else self.scan_qr), shortcut=KEY_QR)) @@ -418,6 +421,16 @@ class CCCAddrWhitelist(MenuSystem): items.append(MenuItem('Import from File', f=(self.maxed_out if maxxed else self.import_file))) + # show most recent added addresses at the top of the menu list + a_items = [MenuItem(truncate_address(a), f=self.edit_addr, arg=a) for a in addrs[::-1]] + + if a_items: + items += a_items + if len(a_items) > 1: + items.append(MenuItem("Clear Whitelist", f=self.clear_all)) + else: + items.append(MenuItem("(none yet)")) + return items async def edit_addr(self, menu, idx, item): @@ -436,6 +449,12 @@ class CCCAddrWhitelist(MenuSystem): CCCFeature.update_policy_key(addrs=addrs) self.update_contents() + async def clear_all(self, *a): + if await ux_confirm("Irreversibly remove all addresses from the whitelist?", + confirm_key='4'): + CCCFeature.update_policy_key(addrs=[]) + self.update_contents() + async def import_file(self, *a): # Import from a file, or NFC. # - simulator: --seq tcENTERENTERsENTERwENTERiENTER1 @@ -476,6 +495,9 @@ class CCCAddrWhitelist(MenuSystem): with CardSlot(readonly=True, **choice) as card: with open(fn, 'rt') as fd: for ln in fd.readlines(): + if len(results) >= MAX_WHITELIST: + # no need to clog memory and parse more, we're done + break for here in pat.split(ln): if len(here) >= 4: try: @@ -501,8 +523,10 @@ class CCCAddrWhitelist(MenuSystem): while 1: here = await q.scan_for_addresses("Bitcoin Address(es) to Whitelist", line2=ln) if not here: break - got.extend(here) - ln = 'Got %d so far. ENTER to apply.' % len(got) + for addr in here: + if addr not in got: + got.append(addr) + ln = 'Got %d so far. ENTER to apply.' % len(got) if got: # import them diff --git a/shared/chains.py b/shared/chains.py index 42e5e9f0..c2f5d7aa 100644 --- a/shared/chains.py +++ b/shared/chains.py @@ -299,7 +299,7 @@ class BitcoinMain(ChainsBase): # see ctype = 'BTC' name = 'Bitcoin Mainnet' - ccc_min_block = 865572 + ccc_min_block = 890584 slip132 = { AF_CLASSIC: Slip132Version(0x0488B21E, 0x0488ADE4, 'x'), diff --git a/shared/multisig.py b/shared/multisig.py index e1396095..d0e15b33 100644 --- a/shared/multisig.py +++ b/shared/multisig.py @@ -4,7 +4,7 @@ # import stash, chains, ustruct, ure, uio, sys, ngu, uos, ujson, version from utils import xfp2str, str2xfp, swab32, cleanup_deriv_path, keypath_to_str, to_ascii_printable -from utils import str_to_keypath, problem_file_line, parse_extended_key, get_filesize, B2A +from utils import str_to_keypath, problem_file_line, parse_extended_key, get_filesize, extract_cosigner from ux import ux_show_story, ux_confirm, ux_dramatic_pause, ux_clear_keys from ux import import_export_prompt, ux_enter_bip32_index, ux_enter_number, OK, X from files import CardSlot, CardMissingError, needs_microsd @@ -1211,7 +1211,7 @@ Press (1) to see extended public keys, '''.format(M=M, N=N, name=self.name, exp= def kt_my_keypair(self, ri): # Calc my keypair for sending PSBT files. - # + # my_xfp = settings.get('xfp') @@ -1248,7 +1248,7 @@ Press (1) to see extended public keys, '''.format(M=M, N=N, name=self.name, exp= kp = None for ms in cls.iter_wallets(): if my_xfp not in ms.xfp_paths: - # we aren't a party to this MS wallet? not supposed to happen, but + # we aren't a party to this MS wallet? not supposed to happen, but # easy to handle continue @@ -1273,7 +1273,7 @@ Press (1) to see extended public keys, '''.format(M=M, N=N, name=self.name, exp= # if implied session key decodes the checksum, it is right ses_key, body = decode_step1(kp, his_pubkey, payload[4:]) - if ses_key: + if ses_key: return ses_key, body, xfp return None, None, None @@ -1625,7 +1625,30 @@ async def validate_xpub_for_ms(obj, af_str, chain, my_xfp, xpubs): async def ms_coordinator_qr(af_str, my_xfp, chain): # Scan a number of JSON files from BBQr w/ derive, xfp and xpub details. # - from ux_q1 import QRScannerInteraction + from ux_q1 import QRScannerInteraction, decode_qr_result, QRDecodeExplained + + def convertor(got): + file_type, _, data = decode_qr_result(got, expect_bbqr=True) + if isinstance(data, bytes): + # we expect BBQr, but simple QR also possible here + data = data.decode() + + if file_type == 'U': + data = data.strip() + if data[0] == '{' and data[-1] == '}': + file_type = 'J' + if file_type == 'J': + try: + import json + return json.loads(data) + except: + raise QRDecodeExplained('Unable to decode JSON data') + else: + for line in data.split("\n"): + if len(line) > 112: + l_data = extract_cosigner(line, af_str) + if l_data: + return l_data num_mine = 0 num_files = 0 @@ -1633,10 +1656,9 @@ async def ms_coordinator_qr(af_str, my_xfp, chain): msg = 'Scan Exported XPUB from Coldcard' while True: - vals = await QRScannerInteraction().scan_json(msg) + vals = await QRScannerInteraction().scan_general(msg, convertor) if vals is None: break - try: is_mine = await validate_xpub_for_ms(vals, af_str, chain, my_xfp, xpubs) except KeyError as e: @@ -1669,7 +1691,8 @@ async def ms_coordinator_file(af_str, my_xfp, chain, slot_b=None): # ignore subdirs continue - if not fn.startswith('ccxp-') or not fn.endswith('.json'): + if fn.endswith('.bsms'): pass # allows files with [xfp/p/a/t/h]xpub + elif not fn.startswith('ccxp-') or not fn.endswith('.json'): # wrong prefix/suffix: ignore continue @@ -1685,7 +1708,16 @@ async def ms_coordinator_file(af_str, my_xfp, chain, slot_b=None): try: with open(full_fname, 'rt') as fp: - vals = ujson.load(fp) + try: + # CC multisig XPUBs JSON expected + vals = ujson.load(fp) + except: + # try looking for BIP-380 key expression + fp.seek(0) + for line in fp.readlines(): + vals = extract_cosigner(line, af_str) + if vals: + break is_mine = await validate_xpub_for_ms(vals, af_str, chain, my_xfp, xpubs) @@ -1775,8 +1807,8 @@ async def ondevice_multisig_create(mode='p2wsh', addr_fmt=AF_P2WSH, is_qr=False, await ux_show_story("Need at least one other co-signer (key B).") return - xpubs.append(a) - xpubs.append(c) + # master seed is always key0, key C is key1, k2..kn backup keys + xpubs = [a, c] + xpubs num_mine += 2 elif not num_mine: diff --git a/shared/utils.py b/shared/utils.py index 3ba60b38..ab9dabf7 100644 --- a/shared/utils.py +++ b/shared/utils.py @@ -8,6 +8,7 @@ from ubinascii import hexlify as b2a_hex from ubinascii import a2b_base64, b2a_base64 from charcodes import OUT_CTRL_ADDRESS from uhashlib import sha256 +from public_constants import MAX_PATH_DEPTH, AF_CLASSIC B2A = lambda x: str(b2a_hex(x), 'ascii') @@ -250,7 +251,6 @@ def cleanup_deriv_path(bin_path, allow_star=False): # - assume 'm' prefix, so '34' becomes 'm/34', etc # - do not assume /// is m/0/0/0 # - if allow_star, then final position can be * or *h (wildcard) - from public_constants import MAX_PATH_DEPTH s = to_ascii_printable(bin_path, strip=True).lower() @@ -759,4 +759,30 @@ def xor(*args): return rv +def extract_cosigner(data, af_str): + # decodes any text, looking for key expression [xfp/p/a/t/h]xpub123 + # BIP-380 https://github.com/bitcoin/bips/blob/master/bip-0380.mediawiki#key-expressions + # only first key expression will be parsed from the data + # key origin info is required + # failure to find "proper" key expression results in None being returned + pub = "%spub" % chains.current_chain().slip132[AF_CLASSIC].hint + if pub not in data: + return + + o_start = data.find("[") + o_end = data.find("]") + if 0 <= o_start < o_end: + key_orig_info = data[o_start+1:o_end] + ss = key_orig_info.split("/") + xfp = ss[0] + if (len(xfp) == 8) and (data[o_end+1:o_end+1+len(pub)] == pub): + deriv = "m" + der_nums = "/".join(ss[1:]) + if der_nums: + deriv += ("/" + der_nums) + ek = data[o_end+1:o_end+1+112] + key_deriv = "%s_deriv" % af_str + # emulate coldcard export xpubs + return {"xfp": xfp, af_str: ek, key_deriv: deriv} + # EOF diff --git a/testing/test_ccc.py b/testing/test_ccc.py index 57ba1f32..aafef4f2 100644 --- a/testing/test_ccc.py +++ b/testing/test_ccc.py @@ -339,7 +339,7 @@ def setup_ccc(goto_home, pick_menu_item, cap_story, press_select, pass_word_quiz time.sleep(.1) m = cap_menu() mi_addrs = [a for a in m if '⋯' in a] - for mia, addr in zip(mi_addrs, whitelist): + for mia, addr in zip(mi_addrs, reversed(whitelist)): _start, _end = mia.split('⋯') assert addr.startswith(_start) assert addr.endswith(_end) @@ -384,11 +384,13 @@ def enter_enabled_ccc(goto_home, pick_menu_item, cap_story, press_select, is_q1, @pytest.fixture def ccc_ms_setup(microsd_path, virtdisk_path, scan_a_qr, is_q1, cap_menu, pick_menu_item, - cap_story, press_select, need_keypress, enter_number): - def doit(N=3, b_words=12, way="sd", addr_fmt=AF_P2WSH): + cap_story, press_select, need_keypress, enter_number, press_cancel): + def doit(N=3, b_words=12, way="sd", addr_fmt=AF_P2WSH, ftype="cc", bbqr=True): N2 = N - 2 # how many more signers we need (B keys) + label = "p2wsh" if addr_fmt == AF_P2WSH else "p2sh_p2wsh" + res = [] for i in range(N2): if isinstance(b_words, int): @@ -401,7 +403,6 @@ def ccc_ms_setup(microsd_path, virtdisk_path, scan_a_qr, is_q1, cap_menu, pick_m master = BIP32Node.from_master_secret(b39_seed) xfp = master.fingerprint().hex().upper() - label = "p2wsh" if addr_fmt == AF_P2WSH else "p2sh_p2wsh" derive = f"m/48h/1h/0h/{'2' if addr_fmt == AF_P2WSH else '1'}h" derived = master.subkey_for_path(derive) @@ -418,21 +419,24 @@ def ccc_ms_setup(microsd_path, virtdisk_path, scan_a_qr, is_q1, cap_menu, pick_m for fn in glob.glob(path_f('ccxp-*.json')): os.remove(fn) # cleanup as we want to control N + for fn in glob.glob(path_f('*.bsms')): + os.remove(fn) # cleanup as we want to control N + for d, dd in res: - fname = f"ccxp-{dd['xfp']}.json" + if ftype == "cc": + fname = f"ccxp-{dd['xfp']}.json" + conts = json.dumps(dd) + else: + assert ftype == "bsms" + xfp = dd['xfp'] + deriv = dd[f"{label}_deriv"].replace("m/", "") + fname = f"{xfp}.bsms" + conts = f"[{xfp}/{deriv}]{dd[label]}" + with open(path_f(fname), "w") as f: - f.write(json.dumps(dd)) + f.write(conts) - m = cap_menu() - target_mi = None - for mi in m: - if "Build 2-of-N" in mi: - target_mi = mi - break - else: - assert False, "not in CCC menu" - - pick_menu_item(target_mi) + pick_menu_item("↳ Build 2-of-N") time.sleep(.1) title, story = cap_story() assert "one other device, as key B" in story @@ -447,17 +451,44 @@ def ccc_ms_setup(microsd_path, virtdisk_path, scan_a_qr, is_q1, cap_menu, pick_m press_select() else: need_keypress(KEY_QR) + time.sleep(.1) + title, story = cap_story() + assert title == "Address Format" + assert "Press ENTER for default address format (P2WSH" in story + assert "press (1) for P2SH-P2WSH" in story + if addr_fmt == AF_P2WSH: + press_select() + else: + need_keypress("1") + for d, dd in res: - _, parts = split_qrs(json.dumps(dd), 'J', max_version=20) - for p in parts: - scan_a_qr(p) + if ftype == "cc": + conts = json.dumps(dd) + tc = "J" + else: + deriv = dd[f"{label}_deriv"].replace("m/", "") + conts = f"[{dd['xfp']}/{deriv}]{dd[label]}" + tc = "U" + + if bbqr: + _, parts = split_qrs(conts, tc, max_version=20) + for p in parts: + scan_a_qr(p) + time.sleep(.1) + else: + scan_a_qr(conts) time.sleep(.1) + time.sleep(.5) + + press_cancel() # after we're done scanning keys, exit QR animation to proceed + # casual on-device multisig create - if addr_fmt == AF_P2WSH: - press_select() - else: - need_keypress("1") + if way != "qr": + if addr_fmt == AF_P2WSH: + press_select() + else: + need_keypress("1") # CCC C key account number enter_number("0") @@ -532,7 +563,7 @@ def policy_sign(start_sign, end_sign, cap_story, get_last_violation): if violation: assert ("(%d warning%s below)"% (num_warn, "s" if num_warn > 1 else "")) in story assert "CCC: Violates spending policy. Won't sign." in story - assert get_last_violation() == violation + assert get_last_violation().startswith(violation) if warn_list: for w in warn_list: assert w in story @@ -1160,4 +1191,31 @@ def test_c_key_from_seed_vault(has_candidates, setup_ccc, build_test_seed_vault, assert "MUST delete" in story press_select() + +@pytest.mark.parametrize("way", ["sd", "qr"]) +@pytest.mark.parametrize("ftype", ["cc", "bsms"]) +@pytest.mark.parametrize("is_bbqr", [True, False]) +@pytest.mark.parametrize("N", [3, 15]) +def test_ms_setup_cosigner_import(way, ftype, is_bbqr, N, goto_home, settings_set, setup_ccc, + ccc_ms_setup, pick_menu_item, cap_story): + if way == "sd" and is_bbqr: + pytest.skip("useless") + + goto_home() + settings_set("ccc", None) + settings_set("multisig", []) + + setup_ccc() + keys, target_mi = ccc_ms_setup(N=N, way=way, ftype=ftype, bbqr=is_bbqr) + + pick_menu_item(target_mi) + pick_menu_item("Descriptors") + pick_menu_item("View Descriptor") + time.sleep(.1) + _, story = cap_story() + desc = story.split("\n\n")[-1] + + for _, obj in keys: + assert f"[{obj['xfp'].lower()}/{obj['p2wsh_deriv'].replace('m/', '')}]{obj['p2wsh']}" in desc + # EOF \ No newline at end of file diff --git a/testing/test_multisig.py b/testing/test_multisig.py index ddd47b98..03b8dbc5 100644 --- a/testing/test_multisig.py +++ b/testing/test_multisig.py @@ -1623,12 +1623,6 @@ def test_make_airgapped(addr_fmt, acct_num, M_N, goto_home, cap_story, pick_menu assert 0, addr_fmt if way == "qr": - # first non-json garbage - scan_a_qr("aaaaaaaaaaaaaaaaaaaa") - time.sleep(1) - scr = cap_screen() - assert f"Expected JSON data" in scr - # JSON but wrong _, parts = split_qrs('{"json": "but wrong","missing": "important data"}', 'J', max_version=20)