diff --git a/shared/auth.py b/shared/auth.py index 889e9ca8..e5db7da0 100644 --- a/shared/auth.py +++ b/shared/auth.py @@ -1389,10 +1389,11 @@ def usb_show_address(addr_format, subpath): class NewEnrollRequest(UserAuthorizedAction): - def __init__(self, ms, auto_export=False): + def __init__(self, ms, auto_export=False, bsms_index=None): super().__init__() self.wallet = ms self.auto_export = auto_export + self.bsms_index = bsms_index # self.result ... will be re-serialized xpub @@ -1404,6 +1405,10 @@ class NewEnrollRequest(UserAuthorizedAction): ch = await ms.confirm_import() if ch == 'y': + if self.bsms_index is not None: + # remove signer round 2 from settings after multisig import is approved by user + from bsms import BSMSSettings + BSMSSettings.signer_delete(self.bsms_index) if self.auto_export: # save cosigner details now too await ms.export_wallet_file('created on', @@ -1422,9 +1427,22 @@ class NewEnrollRequest(UserAuthorizedAction): sys.print_exception(exc) finally: UserAuthorizedAction.cleanup() # because no results to store - self.pop_menu() + if self.bsms_index is not None: + # bsms special case, get him back to multisig menu + from ux import the_ux, restore_menu + from multisig import MultisigMenu + while 1: + top = the_ux.top_of_stack() + if not top: break + if not isinstance(top, MultisigMenu): + the_ux.pop() + continue + break + restore_menu() + else: + self.pop_menu() -def maybe_enroll_xpub(sf_len=None, config=None, name=None, ux_reset=False): +def maybe_enroll_xpub(sf_len=None, config=None, name=None, ux_reset=False, bsms_index=None): # Offer to import (enroll) a new multisig wallet. Allow reject by user. from multisig import MultisigWallet @@ -1438,7 +1456,7 @@ def maybe_enroll_xpub(sf_len=None, config=None, name=None, ux_reset=False): # and be shown on screen/over usb ms = MultisigWallet.from_file(config, name=name) - UserAuthorizedAction.active_request = NewEnrollRequest(ms) + UserAuthorizedAction.active_request = NewEnrollRequest(ms, bsms_index=bsms_index) if ux_reset: # for USB case, and import from PSBT diff --git a/shared/bsms.py b/shared/bsms.py new file mode 100644 index 00000000..69bf5226 --- /dev/null +++ b/shared/bsms.py @@ -0,0 +1,1095 @@ +# (c) Copyright 2022 by Coinkite Inc. This file is covered by license found in COPYING-CC. +# +# bsms.py - Bitcoin Secure Multisig Setup: BIP-129 +# +# For faster testing... +# ./simulator.py --seq 99y3y4y +# +import ngu, os, stash, chains +from ubinascii import b2a_base64, a2b_base64 +from ubinascii import unhexlify as a2b_hex +from ubinascii import hexlify as b2a_hex + +from public_constants import AF_P2WSH, AF_P2WSH_P2SH, AF_CLASSIC, MAX_SIGNERS +from utils import xfp2str, problem_file_line, import_prompt_builder, export_prompt_builder +from menu import MenuSystem, MenuItem +from files import CardSlot, CardMissingError, needs_microsd +from ux import ux_show_story, ux_enter_number, restore_menu, ux_input_numbers, ux_spinner_edit +from ux import the_ux +from descriptor import MultisigDescriptor, append_checksum + + +BSMS_VERSION = "BSMS 1.0" +ALLOWED_PATH_RESTRICTIONS = "/0/*,/1/*" + +ENCRYPTION_TYPES = { + "1": "STANDARD", + "2": "EXTENDED", + "3": "NO ENCRYPTION" +} + +class RejectAutoCollection(BaseException): + pass + +class BSMSOutOfSpace(RuntimeError): + # should not be a concern on Mk4 and later; just in case, handle well. + pass + +def exceptions_handler(f): + nice_name = " ".join(f.__name__.split("_")).replace("bsms", "BSMS") + async def new_func(*args): + try: + await f(*args) + except BaseException as e: + await ux_show_story(title="FAILURE", msg='%s\n\n%s failed\n%s' % (e, nice_name, problem_file_line(e))) + return new_func + + +def normalize_token(token_hex): + if token_hex[:2] in ["0x", "0X"]: + token_hex = token_hex[2:] # remove 0x prefix + return token_hex + + +def validate_token(token_hex): + if token_hex == "00": + return + try: + int(token_hex, 16) + except: + raise ValueError("Invalid token: %s" % token_hex) + if len(token_hex) not in [16, 32]: + raise ValueError("Invalid token length. Expected 64 or 128 bits (16 or 32 hex characters)") + + +def key_derivation_function(token_hex): + if token_hex == "00": + return + return ngu.hash.pbkdf2_sha512("No SPOF", a2b_hex(token_hex), 2048)[:32] + + +def hmac_key(key): + return ngu.hash.sha256s(key) + + +def msg_auth_code(key, token_hex, data): + msg_str = token_hex + data + msg_bytes = bytes(msg_str, "utf-8") + return ngu.hmac.hmac_sha256(key, msg_bytes) + + +def bsms_decrypt(key, data_bytes): + mac, ciphertext = data_bytes[:32], data_bytes[32:] + iv = mac[:16] + decrypt = ngu.aes.CTR(key, iv) + decrypted = decrypt.cipher(ciphertext) + try: + plaintext = decrypted.decode() + if not plaintext.startswith("BSMS"): + raise ValueError + return plaintext + except: + # failed decryption + return "" + + +def bsms_encrypt(key, token_hex, data_str): + hmac_k = hmac_key(key) + mac = msg_auth_code(hmac_k, token_hex, data_str) + iv = mac[:16] + encrypt = ngu.aes.CTR(key, iv) + ciphertext = encrypt.cipher(data_str) + + return mac + ciphertext + + +def signer_data_round1(token_hex, desc_type_key, key_description, sig_bytes=None): + result = "%s\n" % BSMS_VERSION + result += "%s\n" % token_hex + result += "%s\n" % desc_type_key + result += "%s" % key_description + + if sig_bytes: + sig = b2a_base64(sig_bytes).decode().strip() + result += "\n" + sig + + return result + + +def coordinator_data_round2(desc_template, addr, path_restrictions=ALLOWED_PATH_RESTRICTIONS): + result = "%s\n" % BSMS_VERSION + result += "%s\n" % desc_template + result += "%s\n" % path_restrictions + result += "%s" % addr + + return result + + +def token_summary(tokens): + if len(tokens) == 1: + return tokens[0] + + numbered_tokens = ["%d. %s" % (i, token) for i, token in enumerate(tokens, start=1)] + return "\n\n".join(numbered_tokens) + + +def coordinator_summary(M, N, addr_fmt, et, tokens): + addr_fmt_str = "p2wsh" if addr_fmt == AF_P2WSH else "p2sh-p2wsh" + summary = "%d of %d\n\n" % (M, N) + summary += "Address format:\n%s\n\n" % addr_fmt_str + summary += "Encryption type:\n%s\n\n" % ENCRYPTION_TYPES[et] + + if tokens: + summary += "Tokens:\n" + token_summary(tokens) + "\n\n" + + return summary + + +class BSMSSettings: + # keys in settings object + BSMS_SETTINGS = "bsms" + BSMS_SIGNER_SETTINGS = "s" + BSMS_COORD_SETTINGS = "c" + + @classmethod + def save(cls, updated_settings, orig): + try: + updated_settings.save() + except: + # back out change; no longer sure of NVRAM state + try: + updated_settings.set(cls.BSMS_SETTINGS, orig) + updated_settings.save() + except: + pass # give up on recovery + raise BSMSOutOfSpace + + @classmethod + def add(cls, who, value): + from glob import settings + + settings_bsms = settings.get(cls.BSMS_SETTINGS, {}) + orig = settings_bsms.copy() + if who in settings_bsms: + settings_bsms[who].append(value) + else: + settings_bsms[who] = [value] + + settings.set(cls.BSMS_SETTINGS, settings_bsms) + cls.save(settings, orig) + + @classmethod + def delete(cls, who, index): + from glob import settings + + settings_bsms = settings.get(cls.BSMS_SETTINGS, {}) + orig = settings_bsms.copy() + if who in settings_bsms: + try: + settings_bsms[who].pop(index) + settings.set(cls.BSMS_SETTINGS, settings_bsms) + cls.save(settings, orig) + except IndexError: + pass + + @classmethod + def signer_add(cls, token_hex): + cls.add(cls.BSMS_SIGNER_SETTINGS, token_hex) + + @classmethod + def coordinator_add(cls, config_tuple): + cls.add(cls.BSMS_COORD_SETTINGS, config_tuple) + + @classmethod + def signer_delete(cls, index): + cls.delete(cls.BSMS_SIGNER_SETTINGS, index) + + @classmethod + def coordinator_delete(cls, index): + cls.delete(cls.BSMS_COORD_SETTINGS, index) + + @classmethod + def get(cls): + from glob import settings + return settings.get(cls.BSMS_SETTINGS, {}) + + @classmethod + def get_signers(cls): + bsms = cls.get() + return bsms.get(cls.BSMS_SIGNER_SETTINGS, []) + + @classmethod + def get_coordinators(cls): + bsms = cls.get() + return bsms.get(cls.BSMS_COORD_SETTINGS, []) + + +class BSMSMenu(MenuSystem): + @classmethod + def construct(cls): + raise NotImplementedError + + def update_contents(self): + tmp = self.construct() + self.replace_items(tmp) + + +async def user_delete_signer_settings(menu, label, item): + index = item.arg + BSMSSettings.signer_delete(index) + the_ux.pop() + restore_menu() + +async def bsms_signer_detail(menu, label, item): + token_hex = BSMSSettings.get_signers()[item.arg] + # shoulf not raise here, as token is only saved if properly validated + token_dec = str(int(token_hex, 16)) + await ux_show_story("Token HEX:\n%s\n\nToken decimal:\n%s" % (token_hex, token_dec)) + + +async def bsms_coordinator_detail(menu, label, item): + M, N, addr_fmt, et, tokens = BSMSSettings.get_coordinators()[item.arg] + summary = coordinator_summary(M, N, addr_fmt, et, tokens) + await ux_show_story(title="SUMMARY", msg=summary) + + +async def make_bsms_signer_r2_menu(menu, label, item): + index = item.arg + rv = [ + MenuItem('Round 2', f=bsms_signer_round2, arg=index), + MenuItem('Detail', f=bsms_signer_detail, arg=index), + MenuItem('Delete', f=user_delete_signer_settings, arg=index), + ] + return rv + + +class BSMSSignerMenu(BSMSMenu): + @classmethod + def construct(cls): + # Dynamic + rv = [] + signers = BSMSSettings.get_signers() + if signers: + for i, token_hex in enumerate(signers): + label = "%d %s" % (i+1, token_hex[:4]) + rv.append(MenuItem('%s' % label, menu=make_bsms_signer_r2_menu, arg=i)) + rv.append(MenuItem('Round 1', f=bsms_signer_round1)) + + return rv + + +async def user_delete_coordinator_settings(menu, label, item): + index = item.arg + BSMSSettings.coordinator_delete(index) + the_ux.pop() + restore_menu() + + +async def make_bsms_coord_r2_menu(menu, label, item): + index = item.arg + rv = [ + MenuItem('Round 2', f=bsms_coordinator_round2, arg=index), + MenuItem('Detail', f=bsms_coordinator_detail, arg=index), + MenuItem('Delete', f=user_delete_coordinator_settings, arg=index), + ] + return rv + + +class BSMSCoordinatorMenu(BSMSMenu): + @classmethod + def construct(cls): + # Dynamic + rv = [] + coordinators = BSMSSettings.get_coordinators() + if coordinators: + for i, (M, N, addr_fmt, et, tokens) in enumerate(coordinators): + # only p2wsh and p2sh-p2wsh are allowed + if addr_fmt == AF_P2WSH: + af_str = "native" + else: + af_str = "nested" + label = "%d %dof%d_%s_%s" % (i+1, M, N, af_str, et) + rv.append(MenuItem('%s' % label, menu=make_bsms_coord_r2_menu, arg=i)) + rv.append(MenuItem('Create BSMS', f=bsms_coordinator_start)) + + return rv + + +async def make_ms_wallet_bsms_menu(menu, label, item): + from pincodes import pa + + if pa.is_secret_blank(): + await ux_show_story("You must have wallet seed before creating multisig wallets.") + return + + await ux_show_story( +"Bitcoin Secure Multisig Setup (BIP-129) is a mechanism to securely create multisig wallets. " +"On the next screen you choose your role in this process.\n\n" +"WARNING: BSMS is an EXPERIMENTAL and BETA feature which requires supporting implementations " +"on other signing devices to work properly. Please test the final wallet carefully " +"and report any problems to appropriate vendor. Deposit only small test amounts and verify " +"all co-signers can sign transactions before use.") + rv = [ + MenuItem('Signer', menu=make_bsms_signer_menu), + MenuItem('Coordinator', menu=make_bsms_coordinator_menu), + ] + return rv + + +async def make_bsms_signer_menu(menu, label, item): + rv = BSMSSignerMenu.construct() + return BSMSSignerMenu(rv) + + +async def make_bsms_coordinator_menu(menu, label, item): + rv = BSMSCoordinatorMenu.construct() + return BSMSCoordinatorMenu(rv) + + +async def decrypt_nfc_data(key, data): + try: + data_bytes = a2b_hex(data) + data = bsms_decrypt(key, data_bytes) + return data + except: + # will be offered another chance + return + +@exceptions_handler +async def bsms_coordinator_start(*a): + from glob import NFC, dis, settings + xfp = xfp2str(settings.get('xfp', 0)) + # M/N + N = await ux_enter_number('No. of signers?(N)', 15) + assert 2 <= N <= MAX_SIGNERS, "Number of co-signers must be 2-15" + + M = await ux_enter_number("Threshold? (M)", 15) + assert 1 <= M <= N, "M cannot be bigger than N (N=%d)" % N + + ch = await ux_show_story("Default address format is P2WSH.\n\n" + "Press (2) for P2SH-P2WSH instead.", escape='2') + if ch == 'y': + addr_fmt = AF_P2WSH + elif ch == '2': + addr_fmt = AF_P2WSH_P2SH + else: + return + + while 1: + encryption_type = await ux_show_story( + "Choose encryption type. Press (1) for STANDARD encryption, (2) for EXTENDED," + " and (3) for no encryption", escape="123") + + if encryption_type == 'x': return + if encryption_type in "123": + break + + tokens = [] + if encryption_type == "2": + dis.fullscreen('Generating...') + for i in range(N): # each signer different 16 bytes (128bits) nonce/token + tokens.append(b2a_hex(ngu.random.bytes(16)).decode()) + dis.progress_bar_show(i / N) + elif encryption_type == "1": + tokens.append(b2a_hex(ngu.random.bytes(8)).decode()) # all signers same token + + summary = coordinator_summary(M, N, addr_fmt, encryption_type, tokens) + summary += "Press OK to continue, or X to cancel" + ch = await ux_show_story(title="SUMMARY", msg=summary) + if ch != "y": + return + + token_hex = "00" if not tokens else tokens[0] + ch = await ux_show_story("Press (1) to participate as co-signer in this BSMS " + "with current active key [%s] and token '%s'. " + "Press OK to continue normally." % (xfp, token_hex), escape="1") + export_tokens = tokens[:] + if ch == "1": + b4 = len(BSMSSettings.get_signers()) + await bsms_signer_round1(token_hex) + current = BSMSSettings.get_signers() + if len(current) > b4 and token_hex in current: + if encryption_type == "2": + # remove 0th token from the list as we already used that for self + # we do not need this token for export, but still need to store it in settings + export_tokens = tokens[1:] + + force_vdisk = False + title = "BSMS token file(s)" + prompt, escape = export_prompt_builder(title) + if tokens and prompt: + ch = await ux_show_story(prompt, escape=escape) + if ch == '3' and tokens: + force_vdisk = None + await NFC.share_text(token_summary(export_tokens)) + elif ch == "2": + force_vdisk = True + elif ch == '1': + force_vdisk = False + else: + return + + msg = "Success. Coordinator round 1 saved." + if tokens and force_vdisk is not None: + dis.fullscreen("Saving...") + f_pattern = "bsms" + f_names = [] + try: + with CardSlot(force_vdisk=force_vdisk) as card: + for i, token in enumerate(export_tokens, start=1): + f_name = "%s_%s.token" % (f_pattern, token[:4]) + fname, nice = card.pick_filename(f_name) + with open(fname, 'wt') as fd: + fd.write(token) + f_names.append(nice) + dis.progress_bar_show(i / len(tokens)) + except CardMissingError: + await needs_microsd() + return + except Exception as e: + await ux_show_story('Failed to write!\n\n\n' + str(e)) + return + msg = '''%s written.\n\nFiles:\n\n%s''' % (title, "\n\n".join(f_names)) + + BSMSSettings.coordinator_add((M, N, addr_fmt, encryption_type, tokens)) + await ux_show_story(msg) + restore_menu() + + +async def nfc_import_signer_round1_data(N, tkm, et, get_token_func): + from glob import NFC + + all_data = [] + for i in range(N): + token = get_token_func(i) + for attempt in range(2): + prompt = "Share co-signer #%d round-1 data" % (i + 1) + if et == "2": + prompt += " for token starting with %s" % token[:4] + ch = await ux_show_story(prompt) + if ch != "y": + return + + data = await NFC.read_bsms_data() + if et in "12": + encryption_key = key_derivation_function(token) + data = await decrypt_nfc_data(encryption_key, data) + if not data: + fail_msg = "Decryption failed for co-signer #%d" % (i + 1) + if et == "2": + fail_msg += " with token %s" % token[:4] + ch = await ux_show_story( + title="FAILURE", + msg=fail_msg + ". Try again?" if attempt == 0 else fail_msg) # second chance + if ch == "y" and attempt == 0: + continue + else: + return + tkm[token] = encryption_key + + all_data.append(data) + break # exit "second chance" loop + return all_data + +@exceptions_handler +async def bsms_coordinator_round2(menu, label, item): + from glob import NFC, dis + from actions import file_picker + from multisig import make_redeem_script + + bsms_settings_index = item.arg + chain = chains.current_chain() + + # or xpub or tpub as we use descriptors (no SLIP-132 allowed) + ext_key_prefix = "%spub" % chain.slip132[AF_CLASSIC].hint + force_vdisk = False + + # this can be RAM intensive (max 15 F mapped to keys) + # => ((32 + 16) * 15) roughly (actually more with python overhead) + token_key_map = {} + + # choose correct values based on label (index in coordinator bsms settings) + M, N, addr_fmt, et, tokens = BSMSSettings.get_coordinators()[bsms_settings_index] + + def get_token(index): + if len(tokens) == 1 and et == "1": + token = tokens[0] + elif len(tokens) == N and et == "2": + token = tokens[index] + else: + token = "00" + return token + + is_encrypted = et in "12" and tokens + suffix = ".dat" if is_encrypted else ".txt" + mode = "rb" if is_encrypted else "rt" + prompt, escape = import_prompt_builder("co-signer round 1 files") + if prompt: + ch = await ux_show_story(prompt, escape=escape) + if ch == '3': + force_vdisk = None + r1_data = await nfc_import_signer_round1_data(N, token_key_map, et, get_token) + else: + if ch == "1": + force_vdisk = False + else: + force_vdisk = True + + if force_vdisk is not None: + # auto-collection attempt + r1_data = [] + try: + f_pattern = "bsms_sr1" + auto_msg = "Press OK to pick co-signer round 1 files manually, or press (1) to attempt auto-collection." + auto_msg += " For auto-collection to succeed all filenames have to start with '%s'" % f_pattern + auto_msg += " and end with extension '%s'." % suffix + if et == "2": # EXTENDED + auto_msg += (" In addition for EXTENDED encryption all files must contain first four characters of" + " respective token. For example '%s_af9f%s'." % (f_pattern, suffix)) + elif et == "3": # NO_ENCRYPTION + auto_msg += (" In addition for NO ENCRYPTION cases, number of files with above mentioned" + " pattern and suffix must equal number of signers (N).") + auto_msg += " If above is not respected auto-collection fails and defaults to manual selection of files." + ch = await ux_show_story(auto_msg, escape="1") + if ch == "x": return # exit + if ch == "y": raise RejectAutoCollection + # try autodiscovery first - if failed - default to manual input + dis.fullscreen("Collecting...") + file_names = [] + with CardSlot(force_vdisk=force_vdisk) as card: + f_list = os.listdir(card.mountpt) + f_list_len = len(f_list) + for i, name in enumerate(f_list, start=1): + if not card.is_dir(name) and f_pattern in name and name.endswith(suffix): + file_names.append(name) + dis.progress_bar_show(i / f_list_len) + file_names_len = len(file_names) + dis.fullscreen("Validating...") + if et == "1": + # can have multiple of these files - we will try to decrypt all that + # have above pattern. Those that fail will be ignored and at the end + # we check if we have correct num of files (num==N) + token = get_token(0) # STANDARD encryption has just one token + encryption_key = key_derivation_function(token) + token_key_map[token] = encryption_key + + with CardSlot(force_vdisk=force_vdisk) as card: + for i, fname in enumerate(file_names, start=1): + with open(card.abs_path(fname), mode) as f: + data = f.read() + data = bsms_decrypt(encryption_key, data) + if not data: + continue + + assert data.startswith("BSMS"), "Failure - not BSMS file?" + r1_data.append(data) + dis.progress_bar_show(i / file_names_len) + + elif et == "2": + with CardSlot(force_vdisk=force_vdisk) as card: + for i in range(N): + token = get_token(i) + for fname in file_names: + if token[:4] in fname: + with open(card.abs_path(fname), mode) as f: + data = f.read() + encryption_key = key_derivation_function(token) + data = bsms_decrypt(encryption_key, data) + + assert data, "Failed to decrypt %s with token %s" % (fname, token) + assert data.startswith("BSMS"), "Failure - not BSMS file?" + token_key_map[token] = encryption_key + r1_data.append(data) + + break + else: + assert False, "haven't find file for token %s" % token + + dis.progress_bar_show(i / N) + else: + assert file_names_len == N, "Need same number of files (%d) as co-signers(N=%d)"\ + % (file_names_len, N) + + with CardSlot(force_vdisk=force_vdisk) as card: + for i, fname in enumerate(file_names, start=1): + with open(card.abs_path(fname), mode) as f: + data = f.read() + assert data.startswith("BSMS"), "Failure - not BSMS file?" + r1_data.append(data) + dis.progress_bar_show(i / file_names_len) + + assert len(r1_data) == N, "No. of signer round 1 data auto-collected "\ + "does not equal number of signers (N)" + except BaseException as e: + if isinstance(e, RejectAutoCollection): + # raised when user manually chooses not to use auto-collection + msg_prefix = "" + else: + msg_prefix = "Auto-collection failed. Defaulting to manual selection of files. " + + # iterate over N and prompt user to choose correct files + for i in range(N): + token = get_token(i) + f_pick_msg = msg_prefix + f_pick_msg += 'Select co-signer #%d file containing round 1 data' % (i + 1) + if et == "2": + f_pick_msg += " for token starting with %s" % token[:4] + f_pick_msg += '. File extension has to be "%s"' % suffix + for attempt in range(2): # two chances to succeed + + fn = await file_picker(f_pick_msg, min_size=220, max_size=500, + suffix=suffix, force_vdisk=force_vdisk) + if not fn: return + + dis.fullscreen("Wait...") + with CardSlot(force_vdisk=force_vdisk) as card: + dis.progress_bar_show(0.1) + with open(fn, mode) as fd: + data = fd.read() + dis.progress_bar_show(0.3) + if is_encrypted: + encryption_key = key_derivation_function(token) + dis.progress_bar_show(0.6) + data = bsms_decrypt(encryption_key, data) + if not data: + fail_msg = "Decryption failed for co-signer #%d" % (i + 1) + if et == "2": + fail_msg += " with token %s" % token[:4] + ch = await ux_show_story(title="FAILURE", msg=fail_msg + + (" Try again?" if attempt == 0 else fail_msg)) + + if ch == "y" and attempt == 0: + continue + else: + return + + dis.progress_bar_show(0.9) + token_key_map[token] = encryption_key + + r1_data.append(data) + dis.progress_bar_show(1) + + break # break from "second chance loop" + + keys = [] + nodes = [] + dis.fullscreen("Validating...") + for i, data in enumerate(r1_data): + # divided in the loop with number of in-loop occurences of 'dis.progress_bar_show' (currently 5) + i_div_N = (i+1) / N + token = get_token(i) + assert data.startswith(BSMS_VERSION), "Incompatible BSMS version. Need %s got %s" % ( + BSMS_VERSION, data[:9] + ) + version, tok, key_exp, description, sig = data.strip().split("\n") + assert tok == token, "Token mismatch saved %s, received from signer %s" % (token, tok) + koi, ext_key = MultisigDescriptor.parse_key_orig_info(key_exp) + dis.progress_bar_show(i_div_N / 5) + xfp_str, derivation = koi[:8], "m" + koi[8:] + assert ext_key.startswith(ext_key_prefix), "Expected %s, got %s" % ( + ext_key_prefix, ext_key[:4] + ) + node = ngu.hdnode.HDNode() + node.deserialize(ext_key) + dis.progress_bar_show(i_div_N / 4) + msg = signer_data_round1(token, key_exp, description) + digest = chain.hash_message(msg.encode()) + dis.progress_bar_show(i_div_N / 3) + _, recovered_pk = chains.verify_recover_pubkey(a2b_base64(sig), digest) + assert node.pubkey() == recovered_pk, "Recovered key from signature does not equal key provided. Wrong signature?" + dis.progress_bar_show(i_div_N / 2) + keys.append((xfp_str, derivation, ext_key)) + nodes.append(node) + dis.progress_bar_show(i_div_N / 1) + + dis.fullscreen("Generating...") + desc_obj = MultisigDescriptor(M=M, N=N, keys=keys, addr_fmt=addr_fmt) + desc = desc_obj._serialize(int_ext=True) + desc = desc.replace("<0;1>/*", "**") + if not is_encrypted: + # append checksum for unencrypted BSMS + desc = append_checksum(desc) + for i, node in enumerate(nodes): + node.derive(0, False) # external is always first our coordinating "0/*,1/*" + dis.progress_bar_show(i / N) + + script = make_redeem_script(M, nodes, 0) # first address + addr = chain.p2sh_address(addr_fmt, script) + r2_data = coordinator_data_round2(desc, addr) + dis.progress_bar_show(1) + + force_vdisk = False + title = "BSMS descriptor template file(s)" + prompt, escape = export_prompt_builder(title) + if prompt: + ch = await ux_show_story(prompt, escape=escape) + if ch == '3': + if et == "2": + for i, token in enumerate(tokens, start=1): + ch = await ux_show_story("Exporting data for co-signer #%d with token %s" + % (i+1, token[:4])) + if ch != "y": + return + data = bsms_encrypt(token_key_map[token], token, r2_data) + await NFC.share_text(b2a_hex(data).decode()) + elif et == "1": + token = get_token(0) + data = bsms_encrypt(token_key_map[token], token, r2_data) + await NFC.share_text(b2a_hex(data).decode()) + else: + await NFC.share_text(r2_data) + await ux_show_story("All done.") + return + elif ch == "2": + force_vdisk = True + elif ch == '1': + force_vdisk = False + else: + return + + def to_export_generator(): + # save memory + if et == "3": # NO_ENCRYPTION + yield None, r2_data + elif et == "1": # STANDARD + token = get_token(0) + yield token, bsms_encrypt(token_key_map[token], token, r2_data) + else: + # EXTENDED + for token in tokens: + yield token, bsms_encrypt(token_key_map[token], token, r2_data) + + dis.fullscreen("Saving...") + mode = "wb" if is_encrypted else "wt" + f_pattern = "bsms_cr2" + f_names = [] + try: + with CardSlot(force_vdisk=force_vdisk) as card: + for i, (token, data) in enumerate(to_export_generator(), start=1): + f_name = "%s%s%s" % (f_pattern, "_" + token[:4] if et == "2" else "", suffix) + fname, nice = card.pick_filename(f_name) + with open(fname, mode) as fd: + fd.write(data) + f_names.append(nice) + dis.progress_bar_show(i / (len(token_key_map) or 1)) + except CardMissingError: + await needs_microsd() + return + except Exception as e: + await ux_show_story('Failed to write!\n\n\n' + str(e)) + return + msg = '''%s written. Files:\n\n%s''' % (title, "\n\n".join(f_names)) + await ux_show_story(msg) + + +@exceptions_handler +async def bsms_signer_round1(*a): + from glob import dis, NFC, VD, settings + + shortcut = len(a) == 1 + token_int = None + if not shortcut: + prompt = "Press (1) to import token file from SD Card, (2) to input token manually" + prompt += ", (3) for unencrypted BSMS." + escape = "123" + if NFC is not None: + prompt += ", (4) to import via NFC" + escape += "4" + if VD is not None: + prompt += ", (6) to import from Virtual Disk" + escape += "6" + prompt += "." + + ch = await ux_show_story(prompt, escape=escape) + + if ch == '3': + token_hex = "00" + elif ch == "4": + token_hex = await NFC.read_bsms_token() + elif ch == "2": + prompt = "To input token as hex press (1), as decimal press (2)" + escape = "12" + ch = await ux_show_story(prompt, escape=escape) + if ch == "1": + token_hex = await ux_spinner_edit("", hex_only=True) + elif ch == "2": + token_int = await ux_input_numbers("", lambda: True) + token_hex = hex(int(token_int)) + else: + return + elif ch in "16": + from actions import file_picker + force_vdisk = (ch == '6') + + # pick a likely-looking file. + fn = await file_picker('Select file containing the token to be imported. File extension has to be ".token" ' + 'and file has to contain single line with hex encoded token string.', + min_size=15, max_size=35, suffix=".token", force_vdisk=force_vdisk) + if not fn: return + + with CardSlot(force_vdisk=force_vdisk) as card: + with open(fn, 'rt') as fd: + token_hex = fd.read().strip() + else: + return + else: + token_hex = a[0] + + # will raise, exc catched in decorator, FAILURE msg provided + validate_token(token_hex) + token_hex = normalize_token(token_hex) + is_extended = (len(token_hex) == 32) + entered_msg = "%s\n\nhex:\n%s" % (token_int, token_hex) if token_int else token_hex + + if not shortcut: + ch = await ux_show_story("You have entered token:\n" + entered_msg + "\n\nIs token correct?") + if ch != "y": + return + + xfp = xfp2str(settings.get('xfp', 0)) + chain = chains.current_chain() + ch = await ux_show_story( +"Choose co-signer address format for correct SLIP derivation path. Default is 'unknown' as this " +"information may not be known at this point in BSMS. SLIP agnostic path will be chosen. " +"Press (1) for P2WSH. Press (2) for P2SH-P2WSH. " +"Correct SLIP path is completely unnecessary as descriptors (BIP-0380) are used.", + escape='12') + if ch == 'y': + pth_template = "m/129'/{coin}'/{acct_num}'" + af_str = "" + elif ch == '1': + pth_template = "m/48'/{coin}'/{acct_num}'/2'" + af_str = " P2WSH" + elif ch == '2': + pth_template = "m/48'/{coin}'/{acct_num}'/1'" + af_str = " P2SH-P2WSH" + else: + return + + acct_num = await ux_enter_number('Account Number:', 9999) or 0 + + # textual key description + key_description = "Coldcard signer%s account %d" % (af_str, acct_num) + ch = await ux_show_story( +"Choose key description. To continue with default, generated description: '%s' press OK." +"\n\nPress (1) for custom key description." % key_description, escape="1") + + if ch == "1": + key_description = await ux_spinner_edit("", confirm_exit=False) or "" + + key_description_len = len(key_description) + assert key_description_len <= 80, "Key Description: 80 char max (was %d)" % key_description_len + + dis.fullscreen("Wait...") + + with stash.SensitiveValues() as sv: + dis.progress_bar_show(0.1) + + dd = pth_template.format(coin=chain.b44_cointype, acct_num=acct_num) + node = sv.derive_path(dd) + ext_key = chain.serialize_public(node) + + dis.progress_bar_show(0.25) + + desc_type_key = "[%s%s]%s" % (xfp, dd[1:], ext_key) + msg = signer_data_round1(token_hex, desc_type_key, key_description) + digest = chain.hash_message(msg.encode()) + sk = node.privkey() + sv.register(sk) + + dis.progress_bar_show(0.5) + + sig = ngu.secp256k1.sign(sk, digest, 0).to_bytes() + result_data = signer_data_round1(token_hex, desc_type_key, key_description, sig_bytes=sig) + + dis.progress_bar_show(.75) + + encryption_key = key_derivation_function(token_hex) + if encryption_key: + result_data = bsms_encrypt(encryption_key, token_hex, result_data) + + dis.progress_bar_show(1) + + # export round 1 file + force_vdisk = False + title = "BSMS signer round 1 file" + prompt, escape = export_prompt_builder(title) + if prompt: + ch = await ux_show_story(prompt, escape=escape) + if ch == '3': + force_vdisk = None + if isinstance(result_data, bytes): + result_data = b2a_hex(result_data).decode() + await NFC.share_text(result_data) + elif ch == "2": + force_vdisk = True + elif ch == '1': + force_vdisk = False + else: + return + + msg = "Success. Signer round 1 saved." + if force_vdisk is not None: + basename = "bsms_sr1%s" % "_" + token_hex[:4] if is_extended else "bsms_sr1" + f_pattern = basename + ".txt" if encryption_key is None else basename + ".dat" + # choose a filename + try: + with CardSlot(force_vdisk=force_vdisk) as card: + fname, nice = card.pick_filename(f_pattern) + with open(fname, 'wb') as fd: + if isinstance(result_data, str): + result_data = result_data.encode() + fd.write(result_data) + except CardMissingError: + await needs_microsd() + return + except Exception as e: + await ux_show_story('Failed to write!\n\n\n' + str(e)) + return + msg = '''%s written:\n\n%s''' % (title, nice) + BSMSSettings.signer_add(token_hex) + await ux_show_story(msg) + if not shortcut: + restore_menu() + + +@exceptions_handler +async def bsms_signer_round2(menu, label, item): + from glob import NFC, dis, settings + from actions import file_picker + from auth import maybe_enroll_xpub + from multisig import make_redeem_script + + chain = chains.current_chain() + + # or xpub or tpub as we use descriptors (no SLIP132 allowed) + ext_key_prefix = "%spub" % chain.slip132[AF_CLASSIC].hint + force_vdisk = False + + # choose correct values based on label (index in signer bsms settings) + bsms_settings_index = item.arg + token = BSMSSettings.get_signers()[bsms_settings_index] + + decrypt_fail_msg = "Decryption with token %s failed." % token[:4] + is_encrypted = False if token == "00" else True + suffix = ".dat" if is_encrypted else ".txt" + mode = "rb" if is_encrypted else "rt" + + prompt, escape = import_prompt_builder("descriptor template file") + if prompt: + ch = await ux_show_story(prompt, escape=escape) + + if ch == '3': + force_vdisk = None + desc_template_data = await NFC.read_bsms_data() + + if desc_template_data is None: + return + + if is_encrypted: + data_bytes = a2b_hex(desc_template_data) + encryption_key = key_derivation_function(token) + desc_template_data = bsms_decrypt(encryption_key, data_bytes) + assert desc_template_data, decrypt_fail_msg + else: + if ch == "1": + force_vdisk = False + else: + force_vdisk = True + + if force_vdisk is not None: + fn = await file_picker( + 'Select file containing descriptor template from coordinator round 2. ' + 'File extension has to be "%s"' % suffix, + min_size=200, max_size=10000, suffix=suffix, force_vdisk=force_vdisk) + if not fn: return + + with CardSlot(force_vdisk=force_vdisk) as card: + with open(fn, mode) as fd: + desc_template_data = fd.read() + if is_encrypted: + encryption_key = key_derivation_function(token) + desc_template_data = bsms_decrypt(encryption_key, desc_template_data) + assert desc_template_data, decrypt_fail_msg + + dis.fullscreen("Validating...") + assert desc_template_data.startswith(BSMS_VERSION), \ + "Incompatible BSMS version. Need %s got %s" % (BSMS_VERSION, desc_template_data[:9]) + + dis.progress_bar_show(0.05) + version, desc_template, pth_restrictions, addr = desc_template_data.split("\n") + assert pth_restrictions == ALLOWED_PATH_RESTRICTIONS, \ + "Only '%s' allowed as path restrictions. Got %s" % ( + ALLOWED_PATH_RESTRICTIONS, pth_restrictions) + try: + # if checksum is provided we better verify it + MultisigDescriptor.checksum_check(desc_template) + # remove checksum as we need to replace /** + desc_template = desc_template.split("#")[0] + except ValueError: + # missing descriptor checksum - OK + pass + desc = desc_template.replace("/**", "/0/*") + + dis.progress_bar_show(0.1) + desc = append_checksum(desc) + + ms_name = "bsms_" + desc[-4:] + + # will raise ValueError if not "sortedmulti" descriptor script type + desc_obj = MultisigDescriptor.parse(desc) + + dis.progress_bar_show(0.2) + + my_xfp = settings.get('xfp') + my_keys = [] + nodes = [] + progress_counter = 0.2 # last displayed progress + # (desired value after loop - last displayed progress) / N + progress_chunk = (0.5 - progress_counter) / desc_obj.N + for xfp, deriv_path, ext_key in desc_obj.keys: + assert ext_key.startswith(ext_key_prefix), \ + "Expected %s, got %s" % (ext_key_prefix, ext_key[:4]) + node = ngu.hdnode.HDNode() + node.deserialize(ext_key) + if xfp == my_xfp: + my_keys.append((deriv_path, ext_key)) + nodes.append(node) + progress_counter += progress_chunk + dis.progress_bar_show(progress_counter) + + num_my_keys = len(my_keys) + assert num_my_keys <= 1, "Multiple %s keys in descriptor (%d)" % (xfp2str(my_xfp), num_my_keys) + assert num_my_keys == 1, "My key %s missing in descriptor." % xfp2str(my_xfp) + + deriv_path, desc_ext_key = my_keys[0] + with stash.SensitiveValues() as sv: + node = sv.derive_path(deriv_path) + ext_key = chain.serialize_public(node) + assert ext_key == desc_ext_key, "My key %s missing in descriptor." % ext_key + + dis.progress_bar_show(0.55) + + # check address is correct + progress_counter = 0.55 # last displayed progress + # (desired value after loop - last displayed progress) / N + progress_chunk = (0.9 - progress_counter) / desc_obj.N + for node in nodes: + node.derive(0, False) # external is always first in our allowed path restrictions + progress_counter += progress_chunk + dis.progress_bar_show(progress_counter) + + script = make_redeem_script(desc_obj.M, nodes, 0) # first address + dis.progress_bar_show(0.95) + calc_addr = chain.p2sh_address(desc_obj.addr_fmt, script) + + assert calc_addr == addr, "Address mismatch! Calculated %s, got %s" % (calc_addr, addr) + + dis.progress_bar_show(1) + try: + maybe_enroll_xpub(config=desc, name=ms_name, bsms_index=bsms_settings_index) + # bsms_settings_signer_delete(bsms_settings_index) --> moved to auth.py to only be done if actually approved + except Exception as e: + await ux_show_story('Failed to import.\n\n%s\n%s' % (e, problem_file_line(e))) + +# EOF diff --git a/shared/manifest.py b/shared/manifest.py index c9d4a6ee..b8c67edc 100644 --- a/shared/manifest.py +++ b/shared/manifest.py @@ -6,6 +6,7 @@ freeze_as_mpy('', [ 'address_explorer.py', 'auth.py', 'backups.py', + 'bsms.py', 'callgate.py', 'chains.py', 'choosers.py', diff --git a/shared/multisig.py b/shared/multisig.py index 33d7a088..35765701 100644 --- a/shared/multisig.py +++ b/shared/multisig.py @@ -1238,6 +1238,7 @@ class MultisigMenu(MenuSystem): @classmethod def construct(cls): # Dynamic menu with user-defined names of wallets shown + from bsms import make_ms_wallet_bsms_menu if not MultisigWallet.exists(): rv = [MenuItem('(none setup yet)', f=no_ms_yet)] @@ -1250,6 +1251,7 @@ class MultisigMenu(MenuSystem): rv.append(MenuItem('Import from File', f=import_multisig)) rv.append(MenuItem('Import via NFC', f=import_multisig_nfc, predicate=lambda: NFC is not None)) rv.append(MenuItem('Export XPUB', f=export_multisig_xpubs)) + rv.append(MenuItem('BSMS (BIP-129)', menu=make_ms_wallet_bsms_menu)) rv.append(MenuItem('Create Airgapped', f=create_ms_step1)) rv.append(MenuItem('Trust PSBT?', f=trust_psbt_menu)) rv.append(MenuItem('Skip Checks?', f=disable_checks_menu)) diff --git a/shared/nfc.py b/shared/nfc.py index 774d29d4..0939b42f 100644 --- a/shared/nfc.py +++ b/shared/nfc.py @@ -739,4 +739,54 @@ class NFCHandler: return winner + + async def read_bsms_token(self): + data = await self.start_nfc_rx() + if not data: + await ux_show_story('Unable to find data expected in NDEF') + return + + winner = None + for urn, msg, meta in ndef.record_parser(data): + msg = bytes(msg).decode().strip() # from memory view + try: + int(msg, 16) + winner = msg + break + except: pass + + if not winner: + await ux_show_story('Unable to find BSMS token in NDEF data') + return + + return winner + + async def read_bsms_data(self): + data = await self.start_nfc_rx() + if not data: + await ux_show_story('Unable to find data expected in NDEF') + return + + winner = None + for urn, msg, meta in ndef.record_parser(data): + msg = bytes(msg).decode().strip() # from memory view + try: + if "BSMS" in msg: + # unencrypted case + winner = msg + break + elif int(msg[:6], 16): + # encrypted hex case + winner = msg + break + else: + continue + except: pass + + if not winner: + await ux_show_story('Unable to find BSMS data in NDEF data') + return + + return winner + # EOF diff --git a/testing/test_bsms.py b/testing/test_bsms.py new file mode 100644 index 00000000..a9615fa6 --- /dev/null +++ b/testing/test_bsms.py @@ -0,0 +1,1690 @@ +# (c) Copyright 2020 by Coinkite Inc. This file is covered by license found in COPYING-CC. +# +# BSMS-related tests + + +import sys +sys.path.append("../shared") +import pytest, time, pdb, os, random, hashlib, base64 +from constants import simulator_fixed_xprv +from psbt import ser_compact_size +from bsms import CoordinatorSession, Signer +from bsms.encryption import key_derivation_function, decrypt, encrypt +from bsms.util import bitcoin_msg, str2path +from bsms.bip32 import PrvKeyNode, PubKeyNode +from bsms.ecdsa import ecdsa_verify, ecdsa_recover +from bsms.address import p2wsh_address, p2sh_p2wsh_address +from descriptor import MultisigDescriptor, append_checksum + +from pycoin.key.BIP32Node import BIP32Node +from pycoin.contrib.msg_signing import verify_message, sign_message +from pycoin.encoding import from_bytes_32, double_sha256 +from pycoin.ui import address_for_pay_to_script, address_for_pay_to_script_wit +from pycoin.tx.pay_to.ScriptMultisig import ScriptMultisig + + +BSMS_VERSION = "BSMS 1.0" +ALLOWED_PATH_RESTRICTIONS = "/0/*,/1/*" + + +# keys in settings object +BSMS_SETTINGS = "bsms" +BSMS_SIGNER_SETTINGS = "s" +BSMS_COORD_SETTINGS = "c" + + +et_map = { + "1": "STANDARD", + "2": "EXTENDED", + "3": "NO_ENCRYPTION" +} + +af_map = { + "p2wsh": 14, + "p2sh-p2wsh": 26 +} + + +def coordinator_label(M, N, addr_fmt, et, index=None): + fmt_str = "%dof%d_%s_%s" % (M, N, "native" if addr_fmt == "p2wsh" else "nested", et) + if index: + fmt_str = "%d %s" % (index, fmt_str) + return fmt_str + + +def assert_coord_summary(title, story, M, N, addr_fmt, et): + assert title == "SUMMARY" + assert f"{M} of {N}" in story + assert f"Address format:\n{addr_fmt}" in story + assert f"Encryption type:\n{et_map[et].replace('_', ' ')}" in story + tokens = story.split("\n\n")[3:-1] + if et == "1": + assert len(tokens) == 1 + elif et == "2": + assert len(tokens) == N + else: + assert len(tokens) == 0 + return tokens + +@pytest.fixture +def make_coordinator_round1(settings_remove, settings_get, settings_set, microsd_path, virtdisk_path): + def doit(M, N, addr_fmt, et, way, purge_bsms=True, tokens_only=False): + if purge_bsms: + settings_remove(BSMS_SETTINGS) # clear bsms + bsms = settings_get(BSMS_SETTINGS) or {} + tokens = [] + if et == "1": + tokens = [os.urandom(8).hex()] + elif et == "2": + tokens = [os.urandom(16).hex() for _ in range(N)] + coord_tuple = (M, N, af_map[addr_fmt], et, tokens) + if BSMS_COORD_SETTINGS in bsms: + bsms[BSMS_COORD_SETTINGS].append(coord_tuple) + else: + bsms[BSMS_COORD_SETTINGS] = [coord_tuple] + settings_set(BSMS_SETTINGS, bsms) + if tokens_only: + return tokens + if way == "sd": + path_fn = microsd_path + elif way == "vdisk": + path_fn = virtdisk_path + else: + return tokens + for token_hex in tokens: + basename = "bsms_%s.token" % token_hex[:4] + with open(path_fn(basename), "w") as f: + f.write(token_hex) + return tokens + return doit + + +def bsms_sr1_fname(token, is_extended, suffix, index=None): + fname = "bsms_sr1" + if is_extended: + fname += "_" + token[:4] + else: + if index: # ignores index = 0 + fname += "-" + str(index) + return fname + suffix + + +@pytest.fixture +def make_signer_round1(settings_get, settings_set, settings_remove, microsd_path, virtdisk_path): + def doit(token, way, root_xprv=None, bsms_version=BSMS_VERSION, description=None, purge_bsms=True, + add_to_settings=False, data_only=False, index=None, wrong_sig=False, wrong_encryption=False, slip=False): + is_extended = len(token) == 32 + if purge_bsms: + settings_remove(BSMS_SETTINGS) # clear bsms + if add_to_settings: + bsms = settings_get(BSMS_SETTINGS) or {} + if BSMS_SIGNER_SETTINGS in bsms: + bsms[BSMS_COORD_SETTINGS].append(token) + else: + bsms[BSMS_SIGNER_SETTINGS] = [token] + + if root_xprv: + wk = BIP32Node.from_wallet_key(root_xprv) + else: + wk = BIP32Node.from_master_secret(os.urandom(32), netcode="XTN") + root_xfp = wk.fingerprint().hex() + paths = ["48'/1'/0'/2'", "48'/1'/0'/1'", "0'/1'/0'/0'", "0'", "100'/0'"] + path = random.choice(paths) + sk = wk.subkey_for_path(path) + xpub = sk.hwif(as_private=False) + if slip: + xpub = xpub.replace("tpub", random.choice(["upub", "vpub", "Upub", "Vpub"])) + key_expr = "[%s/%s]%s" % (root_xfp, path, xpub) + data = "%s\n" % bsms_version + data += "%s\n" % token + data += "%s\n" % key_expr + if description is None: + description = "Coldcard Signer %s" % root_xfp + data += "%s" % description + preimage = b'\x18Bitcoin Signed Message:\n' + ser_compact_size(len(data)) + data.encode() + md = double_sha256(preimage) + if wrong_sig: + # hash again to get wrong sig + md = double_sha256(md) + md = from_bytes_32(md) + sig = sign_message(sk, msg_hash=md) + data += "\n%s" % sig + suffix = ".txt" + mode = "wt" + if token != "00": + suffix = ".dat" + mode = "wb" + dkey = key_derivation_function(token) + if wrong_encryption: + wrong = "ffff" + token[4:] + dkey = key_derivation_function(wrong) + data = encrypt(dkey, token, data) + data = bytes.fromhex(data) + if data_only: + return data + if way != "nfc": + if way == "sd": + path_fn = microsd_path + else: + # vdisk + path_fn = virtdisk_path + basename = bsms_sr1_fname(token, is_extended, suffix, index) + # pdb.set_trace() + with open(path_fn(basename), mode) as f: + f.write(data) + return data + + return doit + + +def ms_address_from_descriptor_pycoin(desc_obj: MultisigDescriptor, subpath="0/0", network="XTN"): + nodes = [ + BIP32Node.from_hwif(ek).subkey_for_path(subpath) + for _, _, ek in desc_obj.keys + ] + secs = [node.sec() for node in nodes] + secs.sort() + ms_script = ScriptMultisig(desc_obj.M, secs).script() + if desc_obj.addr_fmt == af_map["p2wsh"]: + address = address_for_pay_to_script_wit(ms_script, network) + else: + # wrapped segwith in p2sh + s256 = hashlib.sha256(ms_script).digest() + ws = b"\x00" + len(s256).to_bytes(1, 'little') + s256 + address = address_for_pay_to_script(ws, network) + return address + + +def ms_address_from_descriptor_bsms(desc_obj: MultisigDescriptor, subpath="0/0", network="XTN"): + testnet = True if network == "XTN" else False + nodes = [ + PubKeyNode.parse(ek).derive_path(str2path(subpath)) + for _, _, ek in desc_obj.keys + ] + secs = [node.sec() for node in nodes] + secs.sort() + if desc_obj.addr_fmt == af_map["p2wsh"]: + address = p2wsh_address(secs, desc_obj.M, sortedmulti=True, testnet=testnet) + else: + address = p2sh_p2wsh_address(secs, desc_obj.M, sortedmulti=True, testnet=testnet) + return address + + +def bsms_cr2_fname(token, is_extended, suffix): + fname = "bsms_cr2" + if is_extended: + fname += "_" + token[:4] + return fname + suffix + + +@pytest.fixture +def make_coordinator_round2(make_coordinator_round1, settings_get, settings_set, microsd_path, virtdisk_path): + def doit(M, N, addr_fmt, et, way, has_ours=True, ours_no=1, path_restrictions=ALLOWED_PATH_RESTRICTIONS, + bsms_version=BSMS_VERSION, sortedmulti=True, wrong_address=False, wrong_encryption=False, + wrong_chain=False, add_checksum=False, wrong_checksum=False): + tokens = make_coordinator_round1(M, N, addr_fmt, et, way=way, purge_bsms=True, tokens_only=True) + range_num = N if has_ours is False else N - ours_no + keys = [] + for _ in range(range_num): + wk = BIP32Node.from_master_secret(os.urandom(32), netcode="XTN") + root_xfp = wk.fingerprint().hex() + paths = ["48'/1'/0'/2'", "48'/1'/0'/1'", "0'/1'/0'/0'", "0'", "100'/0'"] + path = random.choice(paths) + sk = wk.subkey_for_path(path) + xpub = sk.hwif(as_private=False) + keys.append((root_xfp, "m/" + path, xpub)) + if has_ours: + for _ in range(ours_no): + wk = BIP32Node.from_wallet_key(simulator_fixed_xprv) + root_xfp = wk.fingerprint().hex() + paths = ["48'/1'/0'/2'", "48'/1'/0'/1'", "0'/1'/0'/0'", "0'", "100'/0'"] + path = random.choice(paths) + sk = wk.subkey_for_path(path) + xpub = sk.hwif(as_private=False) + keys.append((root_xfp, "m/" + path, xpub)) + + desc_obj = MultisigDescriptor(M=M, N=N, addr_fmt=af_map[addr_fmt], keys=keys) + desc = desc_obj._serialize(int_ext=True) + wcs = append_checksum(desc).split("#")[-1] + desc = desc.replace("/<0;1>/*", "/**") + if add_checksum: + desc = append_checksum(desc) + elif wrong_checksum: + desc = desc + "#" + wcs + if not sortedmulti: + desc = desc.replace("sortedmulti", "multi") + if wrong_chain: + desc = desc.replace("tpub", "xpub", 1) + desc_template = "%s\n" % bsms_version + desc_template += "%s\n" % desc + desc_template += "%s\n" % path_restrictions + if wrong_address: + addr = ms_address_from_descriptor_bsms(desc_obj, subpath="1000/100") + else: + addr = ms_address_from_descriptor_bsms(desc_obj) + desc_template += "%s" % addr + + # create signer artificialy and produce correct descriptor template file + bsms = settings_get(BSMS_SETTINGS) or {} + bsms[BSMS_SIGNER_SETTINGS] = [] # purge + if not tokens: + token = "00" + bsms[BSMS_SIGNER_SETTINGS].append(token) + res = desc_template + else: + token = tokens[0] + # same for STANDARD and EXTENDED --> encrypt + bsms[BSMS_SIGNER_SETTINGS].append(token) + if wrong_encryption: + res = encrypt(key_derivation_function(os.urandom(16).hex()), token, desc_template) + else: + res = encrypt(key_derivation_function(token), token, desc_template) + res = bytes.fromhex(res) + + settings_set(BSMS_SETTINGS, bsms) + if way != "nfc": + if way == "sd": + path_fn = microsd_path + else: + # vdisk + path_fn = virtdisk_path + mode = "wb" if et in ["1", "2"] else "wt" + suffix = ".dat" if et in ["1", "2"] else ".txt" + basename = bsms_cr2_fname(token, et == "2", suffix) + with open(path_fn(basename), mode) as f: + f.write(res) + + return res, token + + return doit + + +@pytest.mark.parametrize("way", ["sd", "nfc", "vdisk"]) +@pytest.mark.parametrize("encryption_type", ["1", "2", "3"]) +@pytest.mark.parametrize("M_N", [(2,2), (3, 5), (15, 15)]) +@pytest.mark.parametrize("addr_fmt", ["p2wsh", "p2sh-p2wsh"]) +def test_coordinator_round1(way, encryption_type, M_N, addr_fmt, clear_ms, goto_home, need_keypress, pick_menu_item, + cap_menu, cap_story, microsd_path, settings_remove, nfc_read_text, virtdisk_path, + settings_get, virtdisk_wipe, microsd_wipe): + M, N = M_N + virtdisk_wipe() + microsd_wipe() + settings_remove(BSMS_SETTINGS) # clear bsms + goto_home() + pick_menu_item('Settings') + pick_menu_item('Multisig Wallets') + pick_menu_item('BSMS (BIP-129)') + title, story = cap_story() + assert "Bitcoin Secure Multisig Setup (BIP-129) is a mechanism to securely create multisig wallets." in story + assert "WARNING: BSMS is an EXPERIMENTAL and BETA feature" in story + need_keypress("y") + pick_menu_item('Coordinator') + menu = cap_menu() + assert len(menu) == 1 # nothing should be in menu at this point but round 1 + pick_menu_item('Create BSMS') + # choose number of signers N + for num in str(N): + need_keypress(num) + need_keypress("y") + # choose threshold M + for num in str(M): + need_keypress(num) + need_keypress("y") + if addr_fmt == "p2wsh": + need_keypress("y") + else: + need_keypress("2") + time.sleep(0.1) + title, story = cap_story() + assert story == "Choose encryption type. Press (1) for STANDARD encryption, (2) for EXTENDED, and (3) for no encryption" + need_keypress(encryption_type) + time.sleep(0.1) + title, story = cap_story() + tokens = assert_coord_summary(title, story, M, N, addr_fmt, encryption_type) + need_keypress("y") # confirm summary + time.sleep(0.1) + title, story = cap_story() + assert "Press (1) to participate as co-signer in this BSMS" in story + need_keypress("y") # continue normally + time.sleep(0.1) + title, story = cap_story() + if encryption_type == "3": + assert story == "Success. Coordinator round 1 saved." + else: + if way == "sd": + if "Press (1) to save BSMS token file(s) to SD Card" in story: + need_keypress("1") + # else no prompt if both NFC and vdisk disabled + elif way == "nfc": + if "press (3) to share via NFC" not in story: + pytest.skip("NFC disabled") + else: + need_keypress("3") + time.sleep(0.2) + bsms_tokens = nfc_read_text() + time.sleep(0.2) + need_keypress("y") # exit NFC UI simulation + time.sleep(0.5) + else: + # virtual disk + if "press (2) to save to Virtual Disk" not in story: + pytest.skip("Vdisk disabled") + else: + need_keypress("2") + + read_tokens = [] + if way == "nfc" and encryption_type != "3": + read_tokens = bsms_tokens.split("\n\n") + else: + time.sleep(0.2) + _, story = cap_story() + assert 'BSMS token file(s) written' in story + fnames = story.split('\n\n')[2:] + # check token files contains first 4 chars of token + try: + token_start = set([tok.split(" ")[1][:4] for tok in tokens]) + except IndexError: + # only one token - special case without numbering + assert len(tokens) == 1 + token_start = set([tokens[0].split("\n")[1][:4]]) + token_fnames_start = set([fn.replace(".token", "").split("_")[-1].split("-")[0] for fn in fnames]) + assert token_start == token_fnames_start + read_tokens = [] + for fname in fnames: + if way == "vdisk": + path = virtdisk_path(fname) + else: + path = microsd_path(fname) + with open(path, 'rt') as f: + token = f.read().strip() + read_tokens.append(token) + + if encryption_type == "1": + assert len(read_tokens) == 1 + elif encryption_type == "2": + assert len(read_tokens) == N + else: + assert len(tokens) == 0 + + need_keypress("y") # confirm success or files written story + time.sleep(0.1) + menu = cap_menu() + assert len(menu) == 2 + current_coord_menu_item = coordinator_label(M, N, addr_fmt, encryption_type, index=1) + assert menu[0] == current_coord_menu_item + assert menu[1] == "Create BSMS" + # check correct summary in detail + pick_menu_item(menu[0]) + time.sleep(0.1) + menu = cap_menu() + assert len(menu) == 3 + assert menu[0] == "Round 2" + assert menu[1] == "Detail" + assert menu[2] == "Delete" + pick_menu_item("Detail") + time.sleep(0.1) + title, story = cap_story() + assert_coord_summary(title, story, M, N, addr_fmt, encryption_type) + need_keypress("y") + # check correct coord tuple saved + bsms_settings = settings_get(BSMS_SETTINGS) + if BSMS_SIGNER_SETTINGS in bsms_settings: + assert bsms_settings[BSMS_SIGNER_SETTINGS] == [] + coord_settings = bsms_settings[BSMS_COORD_SETTINGS] + assert len(coord_settings) == 1 + assert coord_settings[0] == ( + M, N, af_map[addr_fmt], encryption_type, + [tok.split(" ")[-1].replace("Tokens:\n", "") for tok in tokens] if tokens else [] + ) + # delete coordinator settings + pick_menu_item("Delete") + time.sleep(0.1) + menu = cap_menu() + assert len(menu) == 1 + assert menu[0] == "Create BSMS" + bsms_settings = settings_get(BSMS_SETTINGS) + coord_settings = bsms_settings[BSMS_COORD_SETTINGS] + assert coord_settings == [] + + +@pytest.mark.parametrize("way", ["sd", "nfc", "vdisk"]) +@pytest.mark.parametrize("encryption_type", ["1", "2", "3"]) +@pytest.mark.parametrize("M_N", [(2,2), (3, 5), (15, 15)]) +@pytest.mark.parametrize("addr_fmt", ["p2wsh", "p2sh-p2wsh"]) +def test_signer_round1(way, encryption_type, M_N, addr_fmt, clear_ms, goto_home, need_keypress, pick_menu_item, cap_menu, + cap_story, microsd_path, settings_remove, nfc_read_text, virtdisk_path, settings_get, + make_coordinator_round1, nfc_write_text, virtdisk_wipe, microsd_wipe): + M, N = M_N + virtdisk_wipe() + microsd_wipe() + tokens = make_coordinator_round1(M, N, addr_fmt, encryption_type, way) + if encryption_type != "3": + assert tokens + else: + assert tokens == [] + goto_home() + pick_menu_item('Settings') + pick_menu_item('Multisig Wallets') + pick_menu_item('BSMS (BIP-129)') + title, story = cap_story() + assert "Bitcoin Secure Multisig Setup (BIP-129) is a mechanism to securely create multisig wallets." in story + assert "WARNING: BSMS is an EXPERIMENTAL and BETA feature" in story + need_keypress("y") + pick_menu_item('Signer') + menu = cap_menu() + assert len(menu) == 1 # nothing should be in menu at this point but round 1 + pick_menu_item('Round 1') + time.sleep(0.1) + title, story = cap_story() + if encryption_type == "3": + token = "00" + need_keypress("3") # no token (unencrypted BSMS) + else: + token = random.choice(tokens) + if way == "sd": + if "Press (1) to import token file from SD Card" in story: + need_keypress("1") + # else no prompt if both NFC and vdisk disabled + elif way == "nfc": + if "(4) to import via NFC" not in story: + pytest.skip("NFC disabled") + else: + need_keypress("4") + time.sleep(0.1) + nfc_write_text(token) + time.sleep(0.4) + else: + # virtual disk + if "(6) to import from Virtual Disk" not in story: + pytest.skip("Vdisk disabled") + else: + need_keypress("6") + + if way != "nfc": + time.sleep(0.2) + _, story = cap_story() + assert ('Select file containing the token to be imported. File extension has to be ".token" ' + 'and file has to contain single line with hex encoded token string.') in story + need_keypress("y") + fname = "bsms_%s.token" % token[:4] + pick_menu_item(fname) + + time.sleep(0.1) + title, story = cap_story() + assert "You have entered token:\n%s" % token in story + need_keypress("y") + time.sleep(0.1) + _, story = cap_story() + # address format a.k.a. SLIP derivation path - ignore and use SLIP agnostic + assert "Choose co-signer address format for correct SLIP derivation path" in story + need_keypress("y") # default + # account number prompt + need_keypress("y") + time.sleep(0.1) + _, story = cap_story() + # textual key description + assert "Choose key description" in story + need_keypress("y") # default + time.sleep(0.1) + title, story = cap_story() + suffix = ".txt" if encryption_type == "3" else ".dat" + mode = "rt" if encryption_type == "3" else "rb" + if way == "sd": + if "Press (1) to save BSMS signer round 1 file to SD Card" in story: + need_keypress("1") + elif way == "nfc": + if "press (3) to share via NFC" not in story: + pytest.skip("NFC disabled") + else: + need_keypress("3") + time.sleep(0.2) + signer_r1 = nfc_read_text() + time.sleep(0.2) + need_keypress("y") # exit NFC UI simulation + time.sleep(0.5) + else: + # virtual disk + if "press (2) to save to Virtual Disk" not in story: + pytest.skip("Vdisk disabled") + else: + need_keypress("2") + + if way == "nfc": + pass + else: + time.sleep(0.2) + _, story = cap_story() + assert 'BSMS signer round 1 file written' in story + fname = story.split('\n\n')[-1] + assert suffix in fname + if encryption_type == "2": + # check token files contains first 4 chars of token or just 00 + assert token[:4] == fname.split(".")[0][-4:] + if way == "vdisk": + path = virtdisk_path(fname) + else: + path = microsd_path(fname) + with open(path, mode) as f: + signer_r1 = f.read() + + bsms = settings_get(BSMS_SETTINGS) + assert len(bsms[BSMS_SIGNER_SETTINGS]) == 1 + assert bsms[BSMS_SIGNER_SETTINGS][0] == token + + if encryption_type in ["1", "2"]: + # decrypt + if isinstance(signer_r1, bytes): + signer_r1 = signer_r1.hex() + signer_r1 = decrypt(key_derivation_function(token), signer_r1) + + version, tok, key_exp, description, sig = signer_r1.strip().split("\n") + assert version == BSMS_VERSION + assert tok == token + close_index = key_exp.find("]") + assert key_exp[0] == "[" and close_index != -1 + key_orig_info = key_exp[1:close_index] # remove brackets + xpub = key_exp[close_index + 1:] + assert xpub[:4] in ["xpub", "tpub"] + xfp, path = key_orig_info.split("/", 1) + # pycoin xpub check + mk = BIP32Node.from_wallet_key(simulator_fixed_xprv) + sk = mk.subkey_for_path(path) + pycoin_xpub = sk.hwif(as_private=False) + assert xpub == pycoin_xpub + # bsms lib xpub check + mk0 = PrvKeyNode.parse(simulator_fixed_xprv, testnet=True) + sk0 = mk0.derive_path(str2path(path)) + bsms_xpub = sk0.extended_public_key() + assert xpub == bsms_xpub + signed_data = "\n".join([version, tok, key_exp, description]) + # verify msg pycoin + assert verify_message(sk, sig, signed_data) + # verify msg bsms lib (pure python ecdsa) + signed_digest = bitcoin_msg(signed_data) + decoded_sig = base64.b64decode(sig) + recovered_sec = ecdsa_recover(signed_digest, decoded_sig) + assert ecdsa_verify(signed_digest, decoded_sig, recovered_sec), "Signature invalid" + + +@pytest.mark.parametrize("way", ["sd", "nfc", "vdisk"]) +@pytest.mark.parametrize("encryption_type", ["1", "2", "3"]) +@pytest.mark.parametrize("M_N", [(2,2), (3, 5), (15, 15)]) +@pytest.mark.parametrize("addr_fmt", ["p2wsh", "p2sh-p2wsh"]) +@pytest.mark.parametrize("auto_collect", [True, False]) +def test_coordinator_round2(way, encryption_type, M_N, addr_fmt, auto_collect, clear_ms, goto_home, need_keypress, + cap_menu, cap_story, microsd_path, settings_remove, nfc_read_text, virtdisk_path, + settings_get, make_coordinator_round1, make_signer_round1, nfc_write_text, + virtdisk_wipe, microsd_wipe, pick_menu_item): + def get_token(index): + if len(tokens) == 1 and encryption_type == "1": + token = tokens[0] + elif len(tokens) == N and encryption_type == "2": + token = tokens[index] + else: + token = "00" + return token + + M, N = M_N + virtdisk_wipe() + microsd_wipe() + tokens = make_coordinator_round1(M, N, addr_fmt, encryption_type, way=way, tokens_only=True) + all_data = [] + for i in range(N): + token = get_token(i) + index = None + if encryption_type != "2": + index = i + 1 + # pdb.set_trace() + all_data.append(make_signer_round1(token, way, purge_bsms=False, index=index)) + goto_home() + pick_menu_item('Settings') + pick_menu_item('Multisig Wallets') + pick_menu_item('BSMS (BIP-129)') + title, story = cap_story() + assert "Bitcoin Secure Multisig Setup (BIP-129) is a mechanism to securely create multisig wallets." in story + assert "WARNING: BSMS is an EXPERIMENTAL and BETA feature" in story + need_keypress("y") + pick_menu_item('Coordinator') + menu = cap_menu() + assert len(menu) == 2 + coord_menu_item = coordinator_label(M, N, addr_fmt, encryption_type, index=1) + assert coord_menu_item in menu + pick_menu_item(coord_menu_item) + pick_menu_item("Round 2") + time.sleep(0.1) + _, story = cap_story() + if way == "sd": + if "Press (1) to import co-signer round 1 files from SD Card" in story: + need_keypress("1") + # else no prompt if both NFC and vdisk disabled + elif way == "vdisk": + if "(2) to import from Virtual Disk" not in story: + pytest.skip("Vdisk disabled") + else: + need_keypress("2") + else: + # NFC + if "(3) to import via NFC" not in story: + pytest.skip("NFC disabled") + else: + need_keypress("3") + + if way == "nfc": + if auto_collect is True: + pytest.skip("No auto-collection for NFC") + for i, data in enumerate(all_data): + time.sleep(0.1) + title, story = cap_story() + token = get_token(i) + if encryption_type == "2": + expect = "Share co-signer #%d round-1 data for token starting with %s" % (i + 1, token[:4]) + else: + expect = "Share co-signer #%d round-1 data" % (i + 1) + assert expect in story + nfc_write_text(data.hex() if isinstance(data, bytes) else data) + time.sleep(0.1) + # TODO pytest.skip here as length of data is more than 250 + else: + suffix = ".txt" if encryption_type == "3" else ".dat" + time.sleep(0.1) + title, story = cap_story() + assert "Press OK to pick co-signer round 1 files manually, or press (1) to attempt auto-collection." in story + assert "For auto-collection to succeed all filenames have to start with 'bsms_sr1'" in story + suffix_target = "and end with extension '%s'" % suffix + assert suffix_target in story + if encryption_type == "2": + assert "In addition for EXTENDED encryption all files must contain first four characters of respective token." in story + elif encryption_type == "3": + assert ("In addition for NO ENCRYPTION cases, number of files with above mentioned" + " pattern and suffix must equal number of signers (N).") in story + assert "If above is not respected auto-collection fails and defaults to manual selection of files." in story + if auto_collect: + need_keypress("1") + else: + need_keypress("y") # continue with manual selection + for i, _ in enumerate(all_data, start=1): + token = get_token(i - 1) + time.sleep(0.1) + title, story = cap_story() + if encryption_type == "2": + expect = 'Select co-signer #%d file containing round 1 data for token starting with %s' % (i, token[:4]) + else: + expect = 'Select co-signer #%d file containing round 1 data' % i + expect += '. File extension has to be "%s"' % suffix + assert expect in story + need_keypress("y") + menu_item = bsms_sr1_fname(token, encryption_type == "2", suffix, i) + pick_menu_item(menu_item) + + time.sleep(0.1) + _, story = cap_story() + if way == "sd": + if "Press (1) to save BSMS descriptor template file(s) to SD Card" in story: + need_keypress("1") + # else no prompt if both NFC and vdisk disabled + elif way == "nfc": + if "(3) to share via NFC" not in story: + pytest.skip("NFC disabled") + else: + need_keypress("3") + else: + # virtual disk + if "(2) to save to Virtual Disk" not in story: + pytest.skip("Vdisk disabled") + else: + need_keypress("2") + if way == "nfc": + # not implemented because of the fake nfc limit + # pytest skip will be raised before we can get here + pass + else: + if way == "sd": + path_fn = microsd_path + else: + path_fn = virtdisk_path + time.sleep(0.1) + _, story = cap_story() + assert "BSMS descriptor template file(s) written." in story + fnames = story.split("\n\n")[1:] + if encryption_type == "2": + for fname, token in zip(fnames, tokens): + assert token[:4] in fname + descriptor_templates = [] + for fname in fnames: + with open(path_fn(fname), "rt" if encryption_type == "3" else "rb") as f: + desc_temp = f.read() + descriptor_templates.append(desc_temp) + + assert descriptor_templates + if encryption_type == "2": + # each file encrypted with different token/key + templates = set() + for token, desc_template in zip(tokens, descriptor_templates): + plaintext = decrypt(key_derivation_function(token), desc_template.hex()) + assert plaintext + templates.add(plaintext) + assert len(templates) == 1 + # pick last to be the template + the_template = plaintext + elif encryption_type == "1": + # just one template but encrypted + assert len(descriptor_templates) == 1 + plaintext = decrypt(key_derivation_function(get_token(0)), descriptor_templates[0].hex()) + assert plaintext + the_template = plaintext + else: + assert len(descriptor_templates) == 1 + the_template = descriptor_templates[0] + + version, descriptor, pth_restrictions, addr = the_template.split("\n") + assert version == BSMS_VERSION + try: + MultisigDescriptor.checksum_check(descriptor) + descriptor = descriptor.split("#")[0] + except ValueError: + pass + # replace /** so we can parse it + descriptor = descriptor.replace("/**", "/0/*") + descriptor = append_checksum(descriptor) + desc_obj = MultisigDescriptor.parse(descriptor) + assert len(desc_obj.keys) == N + assert pth_restrictions == ALLOWED_PATH_RESTRICTIONS + # pycoin test ms address + address = ms_address_from_descriptor_pycoin(desc_obj) + assert addr == address + # bsms lib test ms address + address = ms_address_from_descriptor_bsms(desc_obj) + assert addr == address + + +@pytest.mark.parametrize("refuse", [True, False]) +@pytest.mark.parametrize("way", ["sd", "nfc", "vdisk"]) +@pytest.mark.parametrize("encryption_type", ["1", "2", "3"]) +@pytest.mark.parametrize("with_checksum", [True, False]) +@pytest.mark.parametrize("M_N", [(2,2), (3, 5), (15, 15)]) +@pytest.mark.parametrize("addr_fmt", ["p2wsh", "p2sh-p2wsh"]) +def test_signer_round2(refuse, way, encryption_type, M_N, addr_fmt, clear_ms, goto_home, need_keypress, pick_menu_item, + cap_menu, cap_story, microsd_path, settings_remove, nfc_read_text, virtdisk_path, settings_get, + make_coordinator_round2, nfc_write_text, virtdisk_wipe, microsd_wipe, with_checksum): + M, N = M_N + clear_ms() + virtdisk_wipe() + microsd_wipe() + desc_template, token = make_coordinator_round2(M, N, addr_fmt, encryption_type, way=way, add_checksum=with_checksum) + goto_home() + pick_menu_item('Settings') + pick_menu_item('Multisig Wallets') + pick_menu_item('BSMS (BIP-129)') + title, story = cap_story() + assert "Bitcoin Secure Multisig Setup (BIP-129) is a mechanism to securely create multisig wallets." in story + assert "WARNING: BSMS is an EXPERIMENTAL and BETA feature" in story + need_keypress("y") + pick_menu_item('Signer') + menu = cap_menu() + assert len(menu) == 2 + assert "Round 1" in menu + menu_item = "1 %s" % token[:4] + assert menu_item in menu + pick_menu_item(menu_item) + menu = cap_menu() + assert len(menu) == 3 + assert "Detail" in menu + assert "Delete" in menu + assert "Round 2" in menu + pick_menu_item("Detail") + time.sleep(0.1) + _, story = cap_story() + assert token in story + assert str(int(token, 16)) in story + need_keypress("y") + pick_menu_item("Round 2") + time.sleep(0.1) + _, story = cap_story() + if way == "sd": + if "Press (1) to import descriptor template file from SD Card" in story: + need_keypress("1") + # else no prompt if both NFC and vdisk disabled + elif way == "vdisk": + if "(2) to import from Virtual Disk" not in story: + pytest.skip("Vdisk disabled") + else: + need_keypress("2") + else: + # NFC + if "(3) to import via NFC" not in story: + pytest.skip("NFC disabled") + else: + need_keypress("3") + + if way == "nfc": + time.sleep(0.1) + nfc_write_text(desc_template.hex() if isinstance(desc_template, bytes) else desc_template) + time.sleep(0.1) + # TODO pytest.skip here as length of data is more than 250 + else: + suffix = ".txt" if encryption_type == "3" else ".dat" + time.sleep(0.1) + title, story = cap_story() + expect = ('Select file containing descriptor template from coordinator round 2. ' + 'File extension has to be "%s"' % suffix) + assert expect in story + need_keypress("y") + menu_item = bsms_cr2_fname(token, encryption_type == "2", suffix) + pick_menu_item(menu_item) + + time.sleep(0.1) + _, story = cap_story() + assert "Create new multisig wallet?" in story + assert "bsms" in story # part of the name + policy = "Policy: %d of %d" % (M, N) + assert policy in story + assert addr_fmt.upper() in story + ms_wal_name = story.split("\n\n")[1].split("\n")[-1].strip() + ms_wal_menu_item = "%d/%d: %s" % (M, N, ms_wal_name) + if refuse: + need_keypress("x") + time.sleep(0.1) + menu = cap_menu() + assert ms_wal_menu_item not in menu + bsms_settings = settings_get(BSMS_SETTINGS) + # signer round 2 NOT removed + assert bsms_settings.get(BSMS_SIGNER_SETTINGS) + else: + need_keypress("y") + time.sleep(0.1) + menu = cap_menu() + assert ms_wal_menu_item in menu + bsms_settings = settings_get(BSMS_SETTINGS) + # signer round 2 removed + assert not bsms_settings.get(BSMS_SIGNER_SETTINGS, None) + + +@pytest.mark.parametrize("token", [ + "f" * 15, + "f" * 17, + "0" * 31, + "0" * 33, +]) +@pytest.mark.parametrize("way", ["sd", "nfc", "vdisk", "manual"]) +def test_invalid_token_signer_round1(token, way, pick_menu_item, cap_story, need_keypress, nfc_write_text, microsd_path, + virtdisk_path, goto_home): + goto_home() + pick_menu_item('Settings') + pick_menu_item('Multisig Wallets') + pick_menu_item('BSMS (BIP-129)') + title, story = cap_story() + assert "Bitcoin Secure Multisig Setup (BIP-129) is a mechanism to securely create multisig wallets." in story + assert "WARNING: BSMS is an EXPERIMENTAL and BETA feature" in story + need_keypress("y") + pick_menu_item('Signer') + pick_menu_item('Round 1') + time.sleep(0.1) + title, story = cap_story() + if way == "manual": + need_keypress("2") # manual + need_keypress("2") # decimal + for num in str(int(token, 16)): + need_keypress(num) + need_keypress("y") + else: + if way != "nfc": + token_fname = "error.token" + path_func = virtdisk_path if way == "vdisk" else microsd_path + with open(path_func(token_fname), "w") as f: + f.write(token) + if way == "sd": + if "Press (1) to import token file from SD Card" in story: + need_keypress("1") + # else no prompt if both NFC and vdisk disabled + elif way == "nfc": + if "(4) to import via NFC" not in story: + pytest.skip("NFC disabled") + else: + need_keypress("4") + time.sleep(0.1) + nfc_write_text(token) + time.sleep(0.4) + else: + # virtual disk + if "(6) to import from Virtual Disk" not in story: + pytest.skip("Vdisk disabled") + else: + need_keypress("6") + + if way != "nfc": + time.sleep(0.2) + _, story = cap_story() + assert ('Select file containing the token to be imported. File extension has to be ".token" ' + 'and file has to contain single line with hex encoded token string.') in story + need_keypress("y") + time.sleep(0.1) + pick_menu_item(token_fname) + + time.sleep(0.1) + title, story = cap_story() + assert title == "FAILURE" + assert "BSMS signer round1 failed" in story + assert "Invalid token length. Expected 64 or 128 bits (16 or 32 hex characters)" in story + + +@pytest.mark.parametrize("failure", ["slip", "wrong_sig", "bsms_version"]) +@pytest.mark.parametrize("encryption_type", ["1", "2", "3"]) +def test_failure_coordinator_round2(encryption_type, make_coordinator_round1, make_signer_round1, microsd_wipe, cap_menu, + virtdisk_wipe, pick_menu_item, need_keypress, goto_home, cap_story, failure): + virtdisk_wipe() + microsd_wipe() + + def get_token(index): + if len(tokens) == 1 and encryption_type == "1": + token = tokens[0] + elif len(tokens) == 2 and encryption_type == "2": + token = tokens[index] + else: + token = "00" + return token + + if failure == "bsms_version": + kws = {failure: "BSMS 1.1"} + else: + kws = {failure: True} + tokens = make_coordinator_round1(2, 2, "p2wsh", encryption_type, way="sd", tokens_only=True) + for i in range(2): + token = get_token(i) + index = None + if encryption_type != "2": + index = i + 1 + make_signer_round1(token, "sd", purge_bsms=False, index=index, **kws) + goto_home() + pick_menu_item('Settings') + pick_menu_item('Multisig Wallets') + pick_menu_item('BSMS (BIP-129)') + title, story = cap_story() + assert "Bitcoin Secure Multisig Setup (BIP-129) is a mechanism to securely create multisig wallets." in story + assert "WARNING: BSMS is an EXPERIMENTAL and BETA feature" in story + need_keypress("y") + pick_menu_item('Coordinator') + menu = cap_menu() + assert len(menu) == 2 + coord_menu_item = coordinator_label(2, 2, "p2wsh", encryption_type, index=1) + assert coord_menu_item in menu + pick_menu_item(coord_menu_item) + pick_menu_item("Round 2") + time.sleep(0.1) + _, story = cap_story() + if "Press (1) to import co-signer round 1 files from SD Card" in story: + need_keypress("1") + need_keypress("y") # continue with manual file selection + suffix = ".txt" if encryption_type == "3" else ".dat" + for i, _ in enumerate(range(2), start=1): + token = get_token(i - 1) + time.sleep(0.1) + title, story = cap_story() + if encryption_type == "2": + expect = 'Select co-signer #%d file containing round 1 data for token starting with %s' % (i, token[:4]) + else: + expect = 'Select co-signer #%d file containing round 1 data' % i + expect += '. File extension has to be "%s"' % suffix + assert expect in story + need_keypress("y") + menu_item = bsms_sr1_fname(token, encryption_type == "2", suffix, i) + pick_menu_item(menu_item) + time.sleep(0.1) + title, story = cap_story() + assert title == "FAILURE" + assert "BSMS coordinator round2 failed" in story + if failure == "slip": + failure_msg = "Expected tpub" + elif failure == "wrong_sig": + failure_msg = "Recovered key from signature does not equal key provided. Wrong signature?" + else: + failure_msg = "Incompatible BSMS version. Need BSMS 1.0 got BSMS 1.1" + assert failure_msg in story + + +# TODO do this for NFC too when length requirements are lifted from 250 +@pytest.mark.parametrize("encryption_type", ["1", "2"]) +def test_wrong_encryption_coordinator_round2(encryption_type, make_coordinator_round1, make_signer_round1, microsd_wipe, + cap_menu, virtdisk_wipe, pick_menu_item, need_keypress, goto_home, cap_story): + def get_token(index): + if len(tokens) == 1 and encryption_type == "1": + token = tokens[0] + elif len(tokens) == 2 and encryption_type == "2": + token = tokens[index] + else: + token = "00" + return token + + virtdisk_wipe() + microsd_wipe() + tokens = make_coordinator_round1(2, 2, "p2wsh", encryption_type, way="sd", tokens_only=True) + for i in range(2): + token = get_token(i) + index = None + if encryption_type == "1": + index = i + 1 + make_signer_round1(token, "sd", purge_bsms=False, index=index, wrong_encryption=True) + goto_home() + pick_menu_item('Settings') + pick_menu_item('Multisig Wallets') + pick_menu_item('BSMS (BIP-129)') + title, story = cap_story() + assert "Bitcoin Secure Multisig Setup (BIP-129) is a mechanism to securely create multisig wallets." in story + assert "WARNING: BSMS is an EXPERIMENTAL and BETA feature" in story + need_keypress("y") + pick_menu_item('Coordinator') + menu = cap_menu() + assert len(menu) == 2 + coord_menu_item = coordinator_label(2, 2, "p2wsh", encryption_type, index=1) + assert coord_menu_item in menu + pick_menu_item(coord_menu_item) + pick_menu_item("Round 2") + time.sleep(0.1) + _, story = cap_story() + if "Press (1) to import co-signer round 1 files from SD Card" in story: + need_keypress("1") + need_keypress("y") # continue with manual file selection + suffix = ".txt" if encryption_type == "3" else ".dat" + for i, _ in enumerate(range(2), start=1): + for attempt in range(2): + token = get_token(i - 1) + time.sleep(0.1) + title, story = cap_story() + if encryption_type == "2": + expect = 'Select co-signer #%d file containing round 1 data for token starting with %s' % (i, token[:4]) + else: + expect = 'Select co-signer #%d file containing round 1 data' % i + expect += '. File extension has to be "%s"' % suffix + assert expect in story + need_keypress("y") + menu_item = bsms_sr1_fname(token, encryption_type == "2", suffix, i) + pick_menu_item(menu_item) + time.sleep(0.1) + _, story = cap_story() + expect_story = "Decryption failed for co-signer #%d" % i + if encryption_type == 2: + expect_story += " with token %s" % token[:4] + assert expect_story in story + if attempt == 0: + assert "Try again?" in story + need_keypress("y") + else: + assert "Try again?" not in story + need_keypress("x") + break + break + + +@pytest.mark.parametrize("failure", [ + "wrong_address", "path_restrictions", "bsms_version", "sortedmulti", "has_ours", "ours_no", + "wrong_encryption", "wrong_chain", "wrong_checksum" +]) +@pytest.mark.parametrize("encryption_type", ["1", "2", "3"]) +def test_failure_signer_round2(encryption_type, goto_home, need_keypress, pick_menu_item, cap_menu, cap_story, + microsd_path, settings_remove, nfc_read_text, virtdisk_path, settings_get, microsd_wipe, + make_coordinator_round2, virtdisk_wipe, failure): + virtdisk_wipe() + microsd_wipe() + if failure == "wrong_address": + kws = {failure: True} + failure_msg = "Address mismatch!" + elif failure == "path_restrictions": + kws = {failure: "5/*,4/*"} + failure_msg = "Only '/0/*,/1/*' allowed as path restrictions." + elif failure == "bsms_version": + kws = {failure: "BSMS 2.0"} + failure_msg = "Incompatible BSMS version. Need BSMS 1.0 got BSMS 2.0" + elif failure == "sortedmulti": + kws = {failure: False} + failure_msg = "Unsupported descriptor. Supported: sh(, sh(wsh(, wsh(. All have to be sortedmulti." + elif failure == "has_ours": + kws = {failure: False} + failure_msg = "My key 0F056943 missing in descriptor." + elif failure == "ours_no": + kws = {failure: 2} + failure_msg = "Multiple 0F056943 keys in descriptor (2)" + elif failure == "wrong_chain": + kws = {failure: True} + failure_msg = "Expected tpub" + elif failure == "wrong_checksum": + kws = {failure: True} + failure_msg = "Wrong checksum" + else: + assert failure == "wrong_encryption" + if encryption_type == "3": + pytest.skip("Cannot test wrong encryption on unencrypted BSMS") + kws = {failure: True} + failure_msg = "Decryption with token {token} failed." + + desc_template, token = make_coordinator_round2(2, 2, "p2wsh", encryption_type, way="sd", **kws) + failure_msg = failure_msg.format(token=token[:4]) + goto_home() + pick_menu_item('Settings') + pick_menu_item('Multisig Wallets') + pick_menu_item('BSMS (BIP-129)') + title, story = cap_story() + assert "Bitcoin Secure Multisig Setup (BIP-129) is a mechanism to securely create multisig wallets." in story + assert "WARNING: BSMS is an EXPERIMENTAL and BETA feature" in story + need_keypress("y") + pick_menu_item('Signer') + menu_item = "1 %s" % token[:4] + pick_menu_item(menu_item) + pick_menu_item("Round 2") + time.sleep(0.1) + _, story = cap_story() + if "Press (1) to import descriptor template file from SD Card" in story: + need_keypress("1") + + suffix = ".txt" if encryption_type == "3" else ".dat" + time.sleep(0.1) + title, story = cap_story() + expect = ('Select file containing descriptor template from coordinator round 2. ' + 'File extension has to be "%s"' % suffix) + assert expect in story + need_keypress("y") + menu_item = bsms_cr2_fname(token, encryption_type == "2", suffix) + pick_menu_item(menu_item) + time.sleep(0.1) + title, story = cap_story() + assert title == "FAILURE" + assert "BSMS signer round2 failed" in story + assert failure_msg in story + + +@pytest.mark.parametrize("encryption_type", ["1", "2", "3"]) +@pytest.mark.parametrize("M_N", [(2,2), (3, 5), (15, 15)]) +@pytest.mark.parametrize("addr_fmt", ["p2wsh", "p2sh-p2wsh"]) +def test_integration_signer(encryption_type, M_N, addr_fmt, clear_ms, microsd_wipe, goto_home, pick_menu_item, cap_story, + need_keypress, settings_remove, microsd_path, settings_get, cap_menu, use_mainnet): + # test CC signer full with bsms lib coordinator (test just SD card no need to retest IO paths again - tested above) + def get_token(index): + if len(tokens) == 1 and encryption_type == "1": + token = tokens[0] + elif len(tokens) == N and encryption_type == "2": + token = tokens[index] + else: + token = "00" + return token + + M, N = M_N + settings_remove(BSMS_SETTINGS) + use_mainnet() + clear_ms() + microsd_wipe() + coordinator = CoordinatorSession(M, N, addr_fmt, et_map[encryption_type]) + session_data = coordinator.generate_token_key_pairs() + tokens = [x[0] for x in session_data] + cc_token = get_token(0) + other_signers = [] + for i in range(1, N): + other_signers.append(Signer(token=get_token(i), key_description="Other signer %d" % i)) + # ROUND 1 + goto_home() + pick_menu_item('Settings') + pick_menu_item('Multisig Wallets') + pick_menu_item('BSMS (BIP-129)') + title, story = cap_story() + assert "Bitcoin Secure Multisig Setup (BIP-129) is a mechanism to securely create multisig wallets." in story + assert "WARNING: BSMS is an EXPERIMENTAL and BETA feature" in story + need_keypress("y") + pick_menu_item('Signer') + pick_menu_item('Round 1') + time.sleep(0.1) + _, story = cap_story() + if encryption_type == "3": + need_keypress("3") # no token (unencrypted BSMS) + else: + fname = "bsms_%s.token" % cc_token[:4] if cc_token != "00" else "1" + with open(microsd_path(fname), "w") as f: + f.write(cc_token) + if "Press (1) to import token file from SD Card" in story: + need_keypress("1") + time.sleep(0.2) + _, story = cap_story() + assert ('Select file containing the token to be imported. File extension has to be ".token" ' + 'and file has to contain single line with hex encoded token string.') in story + need_keypress("y") + fname = "bsms_%s.token" % cc_token[:4] + pick_menu_item(fname) + + time.sleep(0.1) + title, story = cap_story() + assert "You have entered token:\n%s" % cc_token in story + need_keypress("y") + time.sleep(0.1) + _, story = cap_story() + # address format a.k.a. SLIP derivation path - ignore and use SLIP agnostic + assert "Choose co-signer address format for correct SLIP derivation path" in story + need_keypress("y") + # account number prompt + need_keypress("y") + time.sleep(0.1) + _, story = cap_story() + # textual key description + assert "Choose key description" in story + need_keypress("y") # default + time.sleep(0.1) + title, story = cap_story() + suffix = ".txt" if encryption_type == "3" else ".dat" + mode = "rt" if encryption_type == "3" else "rb" + if "Press (1) to save BSMS signer round 1 file to SD Card" in story: + need_keypress("1") + time.sleep(0.2) + _, story = cap_story() + assert 'BSMS signer round 1 file written' in story + fname = story.split('\n\n')[-1] + assert suffix in fname + path = microsd_path(fname) + with open(path, mode) as f: + signer_r1 = f.read() + + bsms = settings_get(BSMS_SETTINGS) + assert len(bsms[BSMS_SIGNER_SETTINGS]) == 1 + assert bsms[BSMS_SIGNER_SETTINGS][0] == cc_token + + # ROUND 2 + all_r1_data = [signer_r1.hex() if encryption_type != "3" else signer_r1] + for s in other_signers: + all_r1_data.append(s.round_1()) + + descriptor_templates = coordinator.round_2(all_r1_data) + if encryption_type == "2": + assert len(descriptor_templates) == N + for signer, tmplt in zip(other_signers, descriptor_templates[1:]): + signer.round_2(tmplt) + else: + assert len(descriptor_templates) == 1 + for signer in other_signers: + signer.round_2(descriptor_templates[0]) + + cc_desc_template = descriptor_templates[0] # zeroeth as our token is zero too + suffix = ".txt" if encryption_type == "3" else ".dat" + mode = "wt" if encryption_type == "3" else "wb" + fname = bsms_cr2_fname(cc_token, encryption_type == "2", suffix) + with open(microsd_path(fname), mode) as f: + f.write(bytes.fromhex(cc_desc_template) if mode == "wb" else cc_desc_template) + time.sleep(0.1) + goto_home() + pick_menu_item('Settings') + pick_menu_item('Multisig Wallets') + pick_menu_item('BSMS (BIP-129)') + title, story = cap_story() + assert "Bitcoin Secure Multisig Setup (BIP-129) is a mechanism to securely create multisig wallets." in story + assert "WARNING: BSMS is an EXPERIMENTAL and BETA feature" in story + need_keypress("y") + pick_menu_item('Signer') + menu_item = "1 %s" % cc_token[:4] + pick_menu_item(menu_item) + pick_menu_item("Round 2") + time.sleep(0.1) + _, story = cap_story() + if "Press (1) to import descriptor template file from SD Card" in story: + need_keypress("1") + time.sleep(0.1) + title, story = cap_story() + expect = ('Select file containing descriptor template from coordinator round 2. ' + 'File extension has to be "%s"' % suffix) + assert expect in story + need_keypress("y") + menu_item = bsms_cr2_fname(cc_token, encryption_type == "2", suffix) + pick_menu_item(menu_item) + time.sleep(0.1) + title, story = cap_story() + assert "Create new multisig wallet?" in story + assert "bsms" in story # part of the name + policy = "Policy: %d of %d" % (M, N) + assert policy in story + assert addr_fmt.upper() in story + ms_wal_name = story.split("\n\n")[1].split("\n")[-1].strip() + ms_wal_menu_item = "%d/%d: %s" % (M, N, ms_wal_name) + need_keypress("y") + time.sleep(0.1) + menu = cap_menu() + assert ms_wal_menu_item in menu + bsms_settings = settings_get(BSMS_SETTINGS) + # signer round 2 removed + assert not bsms_settings.get(BSMS_SIGNER_SETTINGS, None) + + +@pytest.mark.parametrize("encryption_type", ["1", "2", "3"]) +@pytest.mark.parametrize("M_N", [(2,2), (3, 5), (15, 15)]) +@pytest.mark.parametrize("addr_fmt", ["p2wsh", "p2sh-p2wsh"]) +@pytest.mark.parametrize("cr1_shortcut", [True, False]) +def test_integration_coordinator(encryption_type, M_N, addr_fmt, clear_ms, microsd_wipe, goto_home, pick_menu_item, + cap_story, need_keypress, settings_remove, microsd_path, settings_get, cap_menu, + use_mainnet, cr1_shortcut): + M, N = M_N + settings_remove(BSMS_SETTINGS) + use_mainnet() + clear_ms() + microsd_wipe() + goto_home() + pick_menu_item('Settings') + pick_menu_item('Multisig Wallets') + pick_menu_item('BSMS (BIP-129)') + title, story = cap_story() + assert "Bitcoin Secure Multisig Setup (BIP-129) is a mechanism to securely create multisig wallets." in story + assert "WARNING: BSMS is an EXPERIMENTAL and BETA feature" in story + need_keypress("y") + pick_menu_item('Coordinator') + menu = cap_menu() + assert len(menu) == 1 # nothing should be in menu at this point but round 1 + pick_menu_item('Create BSMS') + # choose number of signers N + for num in str(N): + need_keypress(num) + need_keypress("y") + # choose threshold M + for num in str(M): + need_keypress(num) + need_keypress("y") + if addr_fmt == "p2wsh": + need_keypress("y") + else: + need_keypress("2") + time.sleep(0.1) + title, story = cap_story() + assert story == "Choose encryption type. Press (1) for STANDARD encryption, (2) for EXTENDED, and (3) for no encryption" + need_keypress(encryption_type) + time.sleep(0.1) + title, story = cap_story() + assert_coord_summary(title, story, M, N, addr_fmt, encryption_type) + need_keypress("y") # confirm summary + time.sleep(0.1) + title, story = cap_story() + assert "Press (1) to participate as co-signer in this BSMS" in story + if cr1_shortcut: + _start_idx = 1 + need_keypress("1") + need_keypress("y") # slip + need_keypress("y") # acct num 0 + need_keypress("y") # default textual key description + time.sleep(0.1) + _, story = cap_story() + if "Press (1) to save BSMS signer round 1 file to SD Card" in story: + need_keypress("1") + time.sleep(0.2) + _, story = cap_story() + shortcut_fname = story.split("\n\n")[-1] + need_keypress("y") # looking at save sr1 filename + else: + _start_idx = 0 + need_keypress("y") # continue normally + + time.sleep(0.1) + title, story = cap_story() + read_tokens = [] + if encryption_type == "3": + assert story == "Success. Coordinator round 1 saved." + else: + if "Press (1) to save BSMS token file(s) to SD Card" in story: + need_keypress("1") + time.sleep(0.2) + _, story = cap_story() + assert 'BSMS token file(s) written' in story + fnames = story.split('\n\n')[2:] + for fname in fnames: + path = microsd_path(fname) + with open(path, 'rt') as f: + tok = f.read().strip() + read_tokens.append(tok) + + all_signers = [] + if encryption_type == "1": + assert len(read_tokens) == 1 + for i in range(_start_idx, N): + all_signers.append(Signer(read_tokens[0], "key %d" % i)) + elif encryption_type == "2": + assert len(read_tokens) == (N - _start_idx) + for i in range(N - _start_idx): + all_signers.append(Signer(read_tokens[i], "key %d" % i)) + else: + assert len(read_tokens) == 0 + for i in range(N - _start_idx): + all_signers.append(Signer("00", "key %d" % i)) + + need_keypress("y") # confirm success or files written story + time.sleep(0.1) + menu = cap_menu() + assert len(menu) == 2 + current_coord_menu_item = coordinator_label(M, N, addr_fmt, encryption_type, index=1) + assert menu[0] == current_coord_menu_item + # check correct coord tuple saved + bsms_settings = settings_get(BSMS_SETTINGS) + if BSMS_SIGNER_SETTINGS in bsms_settings: + if cr1_shortcut: + assert len(bsms_settings[BSMS_SIGNER_SETTINGS]) == 1 + shortcut_token = bsms_settings[BSMS_SIGNER_SETTINGS][0] + else: + assert bsms_settings[BSMS_SIGNER_SETTINGS] == [] + shortcut_token = None + coord_settings = bsms_settings[BSMS_COORD_SETTINGS] + assert len(coord_settings) == 1 + if read_tokens: + expect_tokens = [tok.split(" ")[-1] for tok in read_tokens] + if cr1_shortcut and encryption_type == "2": + expect_tokens = [shortcut_token] + expect_tokens + else: + expect_tokens = [] + assert coord_settings[0] == (M, N, af_map[addr_fmt], encryption_type, expect_tokens) + + # ROUND 2 + def get_token(index): + if len(read_tokens) == 1 and encryption_type == "1": + token = read_tokens[0] + elif encryption_type == "2": + token = read_tokens[index] + else: + token = "00" + return token + + all_r1_signer_data = [s.round_1() for s in all_signers] + mode = "wt" if encryption_type == "3" else "wb" + suffix = ".txt" if encryption_type == "3" else ".dat" + for i, data in enumerate(all_r1_signer_data, start=1): + token = get_token(i - 1) + fname = bsms_sr1_fname(token, encryption_type == "2", suffix, i) + with open(microsd_path(fname), mode) as f: + f.write(bytes.fromhex(data) if mode == "wb" else data) + + goto_home() + pick_menu_item('Settings') + pick_menu_item('Multisig Wallets') + pick_menu_item('BSMS (BIP-129)') + title, story = cap_story() + assert "Bitcoin Secure Multisig Setup (BIP-129) is a mechanism to securely create multisig wallets." in story + assert "WARNING: BSMS is an EXPERIMENTAL and BETA feature" in story + need_keypress("y") + pick_menu_item('Coordinator') + menu = cap_menu() + assert len(menu) == 2 + coord_menu_item = coordinator_label(M, N, addr_fmt, encryption_type, index=1) + assert coord_menu_item in menu + pick_menu_item(coord_menu_item) + pick_menu_item("Round 2") + time.sleep(0.1) + _, story = cap_story() + if "Press (1) to import co-signer round 1 files from SD Card" in story: + need_keypress("1") + need_keypress("y") # continue with manual file selection + # import pdb;pdb.set_trace() + if cr1_shortcut: + time.sleep(0.1) + title, story = cap_story() + if encryption_type == "2": + expect = 'Select co-signer #1 file containing round 1 data for token starting with %s' % shortcut_token[:4] + else: + expect = 'Select co-signer #1 file containing round 1 data' + assert expect in story + need_keypress("y") + pick_menu_item(shortcut_fname) + for i in range(_start_idx, N): + token = get_token(i - _start_idx) + time.sleep(0.1) + title, story = cap_story() + if encryption_type == "2": + expect = 'Select co-signer #%d file containing round 1 data for token starting with %s' % (i + 1, token[:4]) + else: + expect = 'Select co-signer #%d file containing round 1 data' % (i + 1) + expect += '. File extension has to be "%s"' % suffix + assert expect in story + need_keypress("y") + fname = bsms_sr1_fname(token, encryption_type == "2", suffix, i + 1 - _start_idx) + pick_menu_item(fname) + + time.sleep(0.1) + _, story = cap_story() + if "Press (1) to save BSMS descriptor template file(s) to SD Card" in story: + need_keypress("1") + time.sleep(0.1) + _, story = cap_story() + assert "BSMS descriptor template file(s) written." in story + fnames = story.split("\n\n")[1:] + if encryption_type == "2": + if cr1_shortcut: + read_tokens = [shortcut_token] + read_tokens + for fname, token in zip(fnames, read_tokens): + assert token[:4] in fname + descriptor_templates = [] + for fname in fnames: + with open(microsd_path(fname), "rt" if encryption_type == "3" else "rb") as f: + desc_temp = f.read() + descriptor_templates.append(desc_temp) + if len(descriptor_templates) == 1: + target = descriptor_templates[0] + if isinstance(target, bytes): + target = target.hex() + for signer in all_signers: + signer.round_2(target) + else: + if cr1_shortcut: + _, descriptor_templates = descriptor_templates[0], descriptor_templates[1:] + for signer, desc_tmplt in zip(all_signers, descriptor_templates): + if isinstance(desc_tmplt, bytes): + desc_tmplt = desc_tmplt.hex() + signer.round_2(desc_tmplt) + if cr1_shortcut: + # still need to add our signer + goto_home() + pick_menu_item('Settings') + pick_menu_item('Multisig Wallets') + pick_menu_item('BSMS (BIP-129)') + need_keypress("y") + pick_menu_item('Signer') + menu_item = "1 %s" % shortcut_token[:4] + pick_menu_item(menu_item) + pick_menu_item("Round 2") + time.sleep(0.1) + _, story = cap_story() + if "Press (1) to import descriptor template file from SD Card" in story: + need_keypress("1") + time.sleep(0.1) + title, story = cap_story() + expect = ('Select file containing descriptor template from coordinator round 2. ' + 'File extension has to be "%s"' % suffix) + assert expect in story + need_keypress("y") + pick_menu_item(fnames[0]) + time.sleep(0.1) + title, story = cap_story() + assert "Create new multisig wallet?" in story + assert "bsms" in story # part of the name + policy = "Policy: %d of %d" % (M, N) + assert policy in story + assert addr_fmt.upper() in story + ms_wal_name = story.split("\n\n")[1].split("\n")[-1].strip() + ms_wal_menu_item = "%d/%d: %s" % (M, N, ms_wal_name) + need_keypress("y") + time.sleep(0.1) + menu = cap_menu() + assert ms_wal_menu_item in menu + bsms_settings = settings_get(BSMS_SETTINGS) + # signer round 2 removed + assert not bsms_settings.get(BSMS_SIGNER_SETTINGS, None) + + + +@pytest.mark.parametrize("encryption_type", ["1", "2", "3"]) +@pytest.mark.parametrize("M_N", [(2, 2), (3, 5), (15, 15)]) +def test_auto_collection_coordinator_r2(encryption_type, M_N, goto_home, need_keypress, pick_menu_item, microsd_wipe, + cap_story, microsd_path,make_coordinator_round1, make_signer_round1): + M, N = M_N + microsd_wipe() + + def get_token(index): + if len(tokens) == 1 and encryption_type == "1": + token = tokens[0] + elif len(tokens) == N and encryption_type == "2": + token = tokens[index] + else: + token = "00" + return token + + # add twice as many files with different tokens - should be still able to collect the correct ones + f_pattern = "bsms_sr1" + if encryption_type == "2": + suffix = ".dat" + for i in range(N): + token = os.urandom(16).hex() + s = Signer(token=token, key_description="key%d" % i) + r1 = s.round_1() + fname = "%s_%s%s" % (f_pattern, token[:4], suffix) + with open(microsd_path(fname), "wb") as f: + f.write(bytes.fromhex(r1)) + + elif encryption_type == "1": + suffix = ".dat" + for i in range(N): + token = os.urandom(8).hex() + s = Signer(token=token, key_description="key%d" % i) + r1 = s.round_1() + fname = "%s%s" % (f_pattern, suffix) + with open(microsd_path(fname), "wb") as f: + f.write(bytes.fromhex(r1)) + + else: + suffix = ".txt" + for i in range(N): + s = Signer(token="00", key_description="key%d" % i) + r1 = s.round_1() + fname = "%s%s" % (f_pattern, suffix) + with open(microsd_path(fname), "w") as f: + f.write(r1) + + tokens = make_coordinator_round1(M, N, "p2wsh", encryption_type, way="sd", tokens_only=True) + all_data = [] + for i in range(N): + token = get_token(i) + index = None + if encryption_type == "1": + index = i + 1 + all_data.append(make_signer_round1(token, "sd", purge_bsms=False, index=index)) + goto_home() + pick_menu_item('Settings') + pick_menu_item('Multisig Wallets') + pick_menu_item('BSMS (BIP-129)') + title, story = cap_story() + assert "Bitcoin Secure Multisig Setup (BIP-129) is a mechanism to securely create multisig wallets." in story + assert "WARNING: BSMS is an EXPERIMENTAL and BETA feature" in story + need_keypress("y") + pick_menu_item('Coordinator') + coord_menu_item = coordinator_label(M, N, "p2wsh", encryption_type, index=1) + pick_menu_item(coord_menu_item) + pick_menu_item("Round 2") + time.sleep(0.1) + _, story = cap_story() + if "Press (1) to import co-signer round 1 files from SD Card" in story: + need_keypress("1") + need_keypress("1") # auto-collection + time.sleep(0.1) + title, story = cap_story() + if encryption_type == "3": + # we need exact number of files for unencrypted as we would have no idea which are part of this multisig setup + assert "Auto-collection failed. Defaulting to manual selection of files." in story + else: + if "Press (1) to save BSMS descriptor template file(s) to SD Card" in story: + # if NFC or Vdisk enabled - but means auto-collection was successful and we are prompted where to + # save the resulting descriptor (coordinator round2 data) + assert True + else: + # NFC and Vdisk disabled, automatically written to SD card - success + assert "BSMS descriptor template file(s) written" in story \ No newline at end of file