From e8ba25fd04e85ddf127e868198cf000c2f2d5f53 Mon Sep 17 00:00:00 2001 From: scgbckbone Date: Wed, 21 May 2025 13:30:30 +0200 Subject: [PATCH] bugfix: ownership check needed re-run for values near max --- releases/Next-ChangeLog.md | 1 + shared/address_explorer.py | 20 +-- shared/display.py | 6 +- shared/drv_entro.py | 2 +- shared/export.py | 8 +- shared/multisig.py | 26 ++-- shared/notes.py | 2 +- shared/ownership.py | 211 ++++++++++++++++------------- shared/paper.py | 2 +- shared/utils.py | 14 +- testing/test_ownership.py | 269 +++++++++++++++++++++++++++++++++---- 11 files changed, 399 insertions(+), 162 deletions(-) diff --git a/releases/Next-ChangeLog.md b/releases/Next-ChangeLog.md index ca78e603..36bdade3 100644 --- a/releases/Next-ChangeLog.md +++ b/releases/Next-ChangeLog.md @@ -28,6 +28,7 @@ Spending policies for "Single Signers" adds new spending policy options: - Bugfix: Fix filesystem initialization after Wife LFS or Destroy Seed. - Bugfix: Fix MicroSD selftest code. - Bugfix: NFC loop exporting secrets would not work after first value exported. +- Bugfix: Ownership check failing to find addresses near max (~760), needed to be re-run to succeed # Mk4 Specific Changes diff --git a/shared/address_explorer.py b/shared/address_explorer.py index dfa669a7..63026a93 100644 --- a/shared/address_explorer.py +++ b/shared/address_explorer.py @@ -429,14 +429,11 @@ def generate_address_csv(path, addr_fmt, ms_wallet, account_num, n, start=0, cha + ['Derivation (%d of %d)' % (i+1, ms_wallet.N) for i in range(ms_wallet.N)] ) + '"\n' - if (start == 0) and (n > 100) and change in (0, 1): - saver = OWNERSHIP.saver(ms_wallet, change, start) - else: - saver = None + saver = OWNERSHIP.saver(ms_wallet, change, start, n) for (idx, addr, derivs, script) in ms_wallet.yield_addresses(start, n, change_idx=change): if saver: - saver(addr) + saver(addr, idx) # policy choice: never provide a complete multisig address to user. addr = censor_address(addr) @@ -448,7 +445,7 @@ def generate_address_csv(path, addr_fmt, ms_wallet, account_num, n, start=0, cha yield ln if saver: - saver(None) # close file + saver(None, 0) # close cache file return @@ -456,20 +453,17 @@ def generate_address_csv(path, addr_fmt, ms_wallet, account_num, n, start=0, cha from wallet import MasterSingleSigWallet main = MasterSingleSigWallet(addr_fmt, path, account_num) - if n and (start == 0) and (n > 100) and change in (0, 1): - saver = OWNERSHIP.saver(main, change, start) - else: - saver = None + saver = OWNERSHIP.saver(main, change, start, n) yield '"Index","Payment Address","Derivation"\n' for (idx, addr, deriv) in main.yield_addresses(start, n, change_idx=change): if saver: - saver(addr) + saver(addr, idx) yield '%d,"%s","%s"\n' % (idx, addr, deriv) if saver: - saver(None) # close + saver(None, 0) # close cache file async def make_address_summary_file(path, addr_fmt, ms_wallet, account_num, start=0, count=250, change=0, **save_opts): @@ -516,7 +510,7 @@ async def make_address_summary_file(path, addr_fmt, ms_wallet, account_num, except CardMissingError: await needs_microsd() except Exception as e: - await ux_show_story('Failed to write!\n\n\n%s\n%s' % (e, problem_file_line(e))) + await ux_show_story('Failed to write!\n\n%s\n%s' % (e, problem_file_line(e))) async def address_explore(*a): diff --git a/shared/display.py b/shared/display.py index 80352a1d..ab1a2cc1 100644 --- a/shared/display.py +++ b/shared/display.py @@ -150,11 +150,15 @@ class Display: def fullscreen(self, msg, percent=None, line2=None): # show a simple message "fullscreen". - # - 'line2' not supported on smaller screen sizes, ignore self.clear() y = 14 self.text(None, y, msg, font=FontLarge) + if line2: + y += FontLarge.height # add height of above + y += FontTiny.height # add space of size FontTiny height + self.text(None, y, line2, font=FontSmall) + if percent is not None: self.progress_bar(percent) self.show() diff --git a/shared/drv_entro.py b/shared/drv_entro.py index f9c175b4..8f52d94e 100644 --- a/shared/drv_entro.py +++ b/shared/drv_entro.py @@ -241,7 +241,7 @@ async def drv_entro_step2(_1, picked, _2, just_pick=False): await needs_microsd() continue except Exception as e: - await ux_show_story('Failed to write!\n\n\n'+str(e)) + await ux_show_story('Failed to write!\n\n'+str(e)) continue story = "Filename is:\n\n%s" % out_fn diff --git a/shared/export.py b/shared/export.py index 19579059..320b45dc 100644 --- a/shared/export.py +++ b/shared/export.py @@ -91,10 +91,10 @@ async def export_contents(title, contents, fname_pattern, derive=None, addr_fmt= await ux_show_story(msg) - except CardMissingError: - await needs_microsd() - except Exception as e: - await ux_show_story('Failed to write!\n\n\n' + str(e)) + except CardMissingError: + await needs_microsd() + except Exception as e: + await ux_show_story('Failed to write!\n\n' + str(e)) # both exceptions & success gets here if no_qr and (NFC is None) and (VD is None) and not force_prompt: diff --git a/shared/multisig.py b/shared/multisig.py index 446998a2..b17d0442 100644 --- a/shared/multisig.py +++ b/shared/multisig.py @@ -245,9 +245,10 @@ class MultisigWallet(WalletABC): return rv @classmethod - def iter_wallets(cls, M=None, N=None, not_idx=None, addr_fmt=None): + def iter_wallets(cls, M=None, N=None, not_idx=None, addr_fmts=None): # yield MS wallets we know about, that match at least right M,N if known. # - this is only place we should be searching this list, please!! + # addr_fmts: list of address formats we're intersted in lst = settings.get('multisig', []) for idx, rec in enumerate(lst): @@ -261,11 +262,11 @@ class MultisigWallet(WalletABC): if M is not None and has_m != M: continue if N is not None and has_n != N: continue - if addr_fmt is not None: + if addr_fmts: opts = rec[3] af = opts.get('ft', AF_P2SH) - if af != addr_fmt: continue - + if af not in addr_fmts: continue + yield cls.deserialize(rec, idx) def get_xfp_paths(self): @@ -273,28 +274,23 @@ class MultisigWallet(WalletABC): return list(self.xfp_paths.values()) @classmethod - def find_match(cls, M, N, xfp_paths, addr_fmt=None): + def find_match(cls, M, N, xfp_paths, addr_fmts=None): # Find index of matching wallet # - xfp_paths is list of lists: [xfp, *path] like in psbt files # - M and N must be known # - returns instance, or None if not found - for rv in cls.iter_wallets(M, N, addr_fmt=addr_fmt): + for rv in cls.iter_wallets(M, N, addr_fmts=addr_fmts): if rv.matching_subpaths(xfp_paths): return rv return None @classmethod - def find_candidates(cls, xfp_paths, addr_fmt=None, M=None): + def find_candidates(cls, xfp_paths): # Return a list of matching wallets for various M values. # - xpfs_paths should already be sorted - # - returns set of matches, of any M value - - # we know N, but not M at this point. - N = len(xfp_paths) - matches = [] - for rv in cls.iter_wallets(M=M, addr_fmt=addr_fmt): + for rv in cls.iter_wallets(): if rv.matching_subpaths(xfp_paths): matches.append(rv) @@ -408,7 +404,7 @@ class MultisigWallet(WalletABC): # - count_similar: same N, same xfp+paths lst = self.get_xfp_paths() - c = self.find_match(self.M, self.N, lst, addr_fmt=self.addr_fmt) + c = self.find_match(self.M, self.N, lst, addr_fmts=[self.addr_fmt]) if c: # All details are same: M/N, paths, addr fmt if sorted(self.xpubs) != sorted(c.xpubs): @@ -454,7 +450,7 @@ class MultisigWallet(WalletABC): assert self.storage_idx >= 0 # safety check - for existing in self.iter_wallets(M=self.M, N=self.N, addr_fmt=self.addr_fmt): + for existing in self.iter_wallets(M=self.M, N=self.N, addr_fmts=[self.addr_fmt]): if existing.storage_idx != self.storage_idx: continue break else: diff --git a/shared/notes.py b/shared/notes.py index f5ef7f78..d63bae46 100644 --- a/shared/notes.py +++ b/shared/notes.py @@ -620,7 +620,7 @@ async def start_export(notes): await needs_microsd() return except Exception as e: - await ux_show_story('Failed to write!\n\n\n'+str(e)) + await ux_show_story('Failed to write!\n\n'+str(e)) return msg = 'Export file written:\n\n%s\n\nSignature file written:\n\n%s' % ( diff --git a/shared/ownership.py b/shared/ownership.py index 94a2ef9b..ef680a5a 100644 --- a/shared/ownership.py +++ b/shared/ownership.py @@ -2,7 +2,7 @@ # # ownership.py - store a cache of hashes related to addresses we might control. # -import os, sys, chains, ngu, struct, version +import os, chains, ngu, struct, version from glob import settings from ucollections import namedtuple from ubinascii import hexlify as b2a_hex @@ -11,8 +11,7 @@ from utils import problem_file_line, show_single_address # Track many addresses, but in compressed form # - map from random Bech32/Base58 payment address to (wallet) + keypath -# - only normal (external, not change) addresses, and won't consider -# any keypath that does not end in 0/* +# - won't consider any keypath that does not end in <0;1>/* # - store only hints, since we can re-construct any address and want to fully verify # - try to keep private between different duress wallets, and seed vaults # - storing bulk data into LFS, not settings @@ -39,7 +38,6 @@ OWNERSHIP_MAGIC = 0x10A0 # "Address Ownership" v1.0 # target 3 flash blocks, max file size => 764 addresses MAX_ADDRS_STORED = const(764) # =((3*512) - OWNERSHIP_FILE_HDR_LEN) // HASH_ENC_LEN -BONUS_GAP_LIMIT = const(20) def encode_addr(addr, salt): # Convert text address to something we can store while preserving privacy. @@ -56,6 +54,7 @@ class AddressCacheFile: self.salt = h[32:] self.count = 0 self.hdr = None + self.fd = None self.peek() @@ -65,9 +64,6 @@ class AddressCacheFile: rv += ' (change)' return rv - def exists(self): - return bool(self.count) - def peek(self): # see what we have on-disk; just reads header. try: @@ -105,15 +101,14 @@ class AddressCacheFile: self.fd.write(hdr) def append(self, addr): - if addr is None: - # close file, done - self.fd.close() - del self.fd - return - - assert '_' not in addr self.fd.write(encode_addr(addr, self.salt)) + def close(self): + # close file, done + if self.fd is not None: + self.fd.close() + self.fd = None + def fast_search(self, addr): # Do the easy part of the searching, using the existing file's contents. # - generates candidate path subcomponents; might be false positive @@ -121,6 +116,7 @@ class AddressCacheFile: from glob import dis if not self.hdr or not self.count: + # cache empty return with open(self.fname, 'rb') as fd: @@ -132,7 +128,7 @@ class AddressCacheFile: chk = encode_addr(addr, self.salt) for idx in range(self.count): if buf[idx*HASH_ENC_LEN : (idx*HASH_ENC_LEN)+HASH_ENC_LEN] == chk: - yield (self.change_idx, idx) + yield self.change_idx, idx dis.progress_sofar(idx, self.count) @@ -148,68 +144,78 @@ class AddressCacheFile: # - return subpath for a hit or None from glob import dis - bonus = 0 match = None start_idx = self.count count = MAX_ADDRS_STORED - start_idx if count <= 0: - return None + return match self.setup(self.change_idx, start_idx) + bonus = None for idx,here,*_ in self.wallet.yield_addresses(start_idx, count, - change_idx=self.change_idx): - - if here == addr: - # Found it! But keep going a little for next time. - match = (self.change_idx, idx) - + change_idx=self.change_idx): self.append(here) self.count += 1 - if match: + + if bonus: + if bonus >= 20: + # do (at most) 20 more - limited by 'start_idx' & 'count' + break bonus += 1 - if match and bonus >= BONUS_GAP_LIMIT: - self.append(None) - return match - dis.progress_sofar(idx-start_idx, count) + if here == addr: + # match but keep going + match = (self.change_idx, idx) + bonus = 1 - self.append(None) + dis.progress_sofar(idx - start_idx, count) - return None + self.close() + return match class OwnershipCache: @classmethod - def saver(cls, wallet, change_idx, start_idx): - # when we are generating many addresses for export, capture them + def saver(cls, wallet, change_idx, start_idx, count): + # when we are generating many addresses for export, capture them (if suitable) # as we go with this function - # - not change -- only main addrs + if not count: + return + if change_idx not in (0, 1): + return + if start_idx >= MAX_ADDRS_STORED: + return + file = AddressCacheFile(wallet, change_idx) + current_pos = file.count - if file.exists(): - # don't save to existing file, has some already - return None + if start_idx > current_pos: + # nothing to do here, we are missing some addresses in the middle + return + if (start_idx + count) <= current_pos: + # we already have all these addresses + return - try: - file.setup(change_idx, start_idx) - except: - # in some cases we don't want to save anything, not an error - return None + file.setup(change_idx, current_pos) - return file.append + def doit(addr, idx): + if addr is None: + file.close() + elif (idx < MAX_ADDRS_STORED) and idx >= current_pos: + file.append(addr) + + return doit @classmethod - def search(cls, addr): - # Find it! - # - returns wallet object, and tuple2 of final 2 subpath components + def filter(cls, addr): + # Filter possible candidates! # - if you start w/ testnet, we'll follow that from multisig import MultisigWallet from public_constants import AFC_SCRIPT, AF_P2WPKH_P2SH, AF_P2SH, AF_P2WSH_P2SH - from glob import dis ch = chains.current_chain() @@ -219,21 +225,20 @@ class OwnershipCache: raise UnknownAddressExplained('That address is not valid on ' + ch.name) possibles = [] - if addr_fmt & AFC_SCRIPT: - # multisig or script at least.. must exist already - possibles.extend(MultisigWallet.iter_wallets(addr_fmt=addr_fmt)) - + # multisig or script at least... must exist already + afs = [addr_fmt] if addr_fmt == AF_P2SH: # might look like P2SH but actually be AF_P2WSH_P2SH - possibles.extend(MultisigWallet.iter_wallets(addr_fmt=AF_P2WSH_P2SH)) + # wrapped segwit is more used than legacy + afs = [AF_P2WSH_P2SH, AF_P2SH] # Might be single-sig p2wpkh wrapped in p2sh ... but that was a transition # thing that hopefully is going away, so if they have any multisig wallets, # defined, assume that that's the only p2sh address source. addr_fmt = AF_P2WPKH_P2SH - # TODO: add tapscript and such fancy stuff here + possibles.extend(MultisigWallet.iter_wallets(addr_fmts=afs)) try: # Construct possible single-signer wallets, always at least account=0 case @@ -247,60 +252,84 @@ class OwnershipCache: if af == addr_fmt and acct_num: w = MasterSingleSigWallet(addr_fmt, account_idx=acct_num) possibles.append(w) - except (KeyError, ValueError): pass # if not single sig address format + except (KeyError, ValueError): + pass # if not single sig address format if not possibles: # can only happen w/ scripts; for single-signer we have things to check raise UnknownAddressExplained( - "No suitable multisig wallets are currently defined.") + "No suitable multisig wallets are currently defined.") + # ordering here + return possibles + + @classmethod + def search_wallet_cache(cls, addr, cf): + # - returns wallet object, and tuple2 of final 2 subpath components # "quick" check first, before doing any generations + # external chain first, then internal (change) + for maybe in cf.fast_search(addr): + ok = cf.check_match(addr, maybe) + if ok: + return cf.wallet, maybe + return None, None - count = 0 - phase2 = [] - for change_idx in (0, 1): - files = [AddressCacheFile(w, change_idx) for w in possibles] - for f in files: - if dis.has_lcd: - dis.fullscreen('Searching wallet(s)...', line2=f.nice_name()) - else: - dis.fullscreen('Searching...') - - for maybe in f.fast_search(addr): - ok = f.check_match(addr, maybe) - if not ok: continue # false positive - will happen - - # found winner. - return f.wallet, maybe - - if f.count < MAX_ADDRS_STORED: - phase2.append(f) - - count += f.count + @classmethod + def search_build_wallet(cls, addr, cf): # maybe we haven't calculated all the addresses yet, so do that # - very slow, but only needed once; any negative (failed) search causes this # - could stop when match found, but we go a bit beyond that for next time # - we could search all in parallel, rather than serially because # more likely to find a match with low index... but seen as too much memory - - for f in phase2: - b4 = f.count - if dis.has_lcd: - dis.fullscreen("Generating addresses...", line2=f.nice_name()) - else: - dis.fullscreen("Generating...") - - result = f.build_and_search(addr) - if result: - # found it, so report it and stop - return f.wallet, result - - count += f.count - b4 + result = cf.build_and_search(addr) + if result: + # found it, so report it and stop + return cf.wallet, result # possible phase 3: other seedvault... slow, rare and not implemented + return None, None - raise UnknownAddressExplained('Searched %d candidates without finding a match.' % count) + @classmethod + def search(cls, addr): + from glob import dis + + matches = OWNERSHIP.filter(addr) + + # build cache files for both external & internal chain + cachefs = [] + for w in matches: + cachefs.append(AddressCacheFile(w, 0)) + cachefs.append(AddressCacheFile(w, 1)) + + for cf in cachefs: + msg, l2 = "Searching...", "(change)" if cf.change_idx else None + if dis.has_lcd: + msg, l2 = 'Searching wallet(s)...', cf.nice_name() + + dis.fullscreen(msg, line2=l2) + wallet, subpath = OWNERSHIP.search_wallet_cache(addr, cf) + if wallet: + # first arg from_cache=True + return True, wallet, subpath + + # nothing found in existing cache files + c = 0 + for cf in cachefs: + msg, l2 = "Generating...", "(change)" if cf.change_idx else None + if dis.has_lcd: + msg, l2 = 'Generating addresses...', cf.nice_name() + + dis.fullscreen(msg, line2=l2) + wallet, subpath = OWNERSHIP.search_build_wallet(addr, cf) + c += cf.count + if wallet: + # first arg from_cache=False + return False, wallet, subpath + + else: + raise UnknownAddressExplained('Searched %d candidate addresses in %d wallet(s)' + ' without finding a match.' % (c, len(matches))) @classmethod async def search_ux(cls, addr): @@ -311,7 +340,7 @@ class OwnershipCache: from public_constants import AFC_BECH32, AFC_BECH32M try: - wallet, subpath = OWNERSHIP.search(addr) + _, wallet, subpath = cls.search(addr) is_ms = isinstance(wallet, MultisigWallet) sp = wallet.render_path(*subpath) diff --git a/shared/paper.py b/shared/paper.py index fbe6b2ab..8e51849f 100644 --- a/shared/paper.py +++ b/shared/paper.py @@ -179,7 +179,7 @@ class PaperWalletMaker: return except Exception as e: from utils import problem_file_line - await ux_show_story('Failed to write!\n\n\n'+problem_file_line(e)) + await ux_show_story('Failed to write!\n\n'+problem_file_line(e)) return story = "Done! Created file(s):\n\n%s" % nice_txt diff --git a/shared/utils.py b/shared/utils.py index cce7c86c..73d7e3a3 100644 --- a/shared/utils.py +++ b/shared/utils.py @@ -644,13 +644,10 @@ def decode_bip21_text(got): proto, args, addr = None, None, None - # remove URL protocol: if present - if ':' in got[0:16]: - proto, got = got.split(':', 1) - + # remove query params first - if any # looks like BIP-21 payment URL if '?' in got: - addr, args = got.split('?', 1) + got, args = got.split('?', 1) # full URL decode here, but assuming no repeated keys parts = args.split('&') @@ -659,7 +656,12 @@ def decode_bip21_text(got): k, v = p.split('=', 1) args[k] = url_unquote(v) - # assume it's an bare address for now + # remove URL protocol: if present + if ':' in got: + proto, got = got.split(':', 1) + assert proto.lower() == "bitcoin" + + # assume it's a bare address for now if not addr: addr = got diff --git a/testing/test_ownership.py b/testing/test_ownership.py index 0b9553e2..f7150d7f 100644 --- a/testing/test_ownership.py +++ b/testing/test_ownership.py @@ -5,6 +5,7 @@ import pytest, time, io, csv from txn import fake_address from base58 import encode_base58_checksum +from bech32 import encode as bech32_encode from helpers import hash160, addr_from_display_format from bip32 import BIP32Node from constants import AF_P2WSH, AF_P2SH, AF_P2WSH_P2SH, AF_CLASSIC, AF_P2WPKH, AF_P2WPKH_P2SH @@ -50,7 +51,7 @@ def test_negative(addr_fmt, testnet, sim_exec): (AF_P2SH, True), (AF_P2WSH_P2SH,True), ]) -@pytest.mark.parametrize('offset', [ 3, 760] ) +@pytest.mark.parametrize('offset', [ 3, 760, 763] ) @pytest.mark.parametrize('subaccount', [ 0, 34] ) @pytest.mark.parametrize('change_idx', [ 0, 1] ) @pytest.mark.parametrize('from_empty', [ True, False] ) @@ -58,7 +59,6 @@ def test_positive(addr_fmt, offset, subaccount, testnet, from_empty, change_idx, sim_exec, wipe_cache, make_myself_wallet, use_testnet, goto_home, pick_menu_item, enter_number, press_cancel, settings_set, import_ms_wallet, clear_ms ): - from bech32 import encode as bech32_encode # API/Unit test, limited UX @@ -66,11 +66,9 @@ def test_positive(addr_fmt, offset, subaccount, testnet, from_empty, change_idx, # multisig jigs assume testnet raise pytest.skip('testnet only') + wipe_cache() + settings_set('accts', []) use_testnet(testnet) - if from_empty: - wipe_cache() # very different codepaths - settings_set('accts', []) - coin_type = 1 if testnet else 0 if addr_fmt in { AF_P2WSH, AF_P2SH, AF_P2WSH_P2SH }: @@ -108,7 +106,7 @@ def test_positive(addr_fmt, offset, subaccount, testnet, from_empty, change_idx, # see addr_vs_path mk = BIP32Node.from_wallet_key(simulator_fixed_tprv if testnet else simulator_fixed_xprv) - sk = mk.subkey_for_path(path[2:].replace('h', "'")) + sk = mk.subkey_for_path(path[2:]) if addr_fmt == AF_CLASSIC: addr = sk.address(netcode="XTN" if testnet else "BTC") @@ -130,20 +128,28 @@ def test_positive(addr_fmt, offset, subaccount, testnet, from_empty, change_idx, pick_menu_item(menu_item) press_cancel() - cmd = f'from ownership import OWNERSHIP; w,path=OWNERSHIP.search({addr!r}); '\ - 'RV.write(repr([w.name, path]))' - lst = sim_exec(cmd) - if 'candidates without finding a match' in lst: - # some kinda timing issue, but don't want big delays, so just retry - print("RETRY search!") - lst = sim_exec(cmd) - + cmd = (f'from ownership import OWNERSHIP;' + f'c,w,path=OWNERSHIP.search({addr!r});' + f'RV.write(repr([c, w.name, path]))') + + if not from_empty: + # we expect here to find address from cache + # so we first need to generate proper cache + lst = sim_exec(cmd, timeout=None) + assert 'Traceback' not in lst, lst + lst = eval(lst) + assert len(lst) == 3 + assert lst[0] is False # not from cache, needed to build it + + + lst = sim_exec(cmd, timeout=None) assert 'Traceback' not in lst, lst - lst = eval(lst) - assert len(lst) == 2 + assert len(lst) == 3 - got_name, got_path = lst + from_cache, got_name, got_path = lst + + assert from_cache == (not from_empty) assert expect_name in got_name if subaccount and '...' not in path: # not expected for multisig, since we have proper wallet name @@ -249,18 +255,20 @@ def test_ux(valid, testnet, method, else: assert title == 'Unknown Address' - assert 'Searched ' in story - assert 'candidates without finding a match' in story + assert 'Searched 1528' in story # max + assert "1 wallet(s)" in story + assert 'without finding a match' in story @pytest.mark.parametrize("af", ["P2SH-Segwit", "Segwit P2WPKH", "Classic P2PKH", "ms0"]) def test_address_explorer_saver(af, wipe_cache, settings_set, goto_address_explorer, pick_menu_item, need_keypress, sim_exec, clear_ms, import_ms_wallet, press_select, goto_home, nfc_write, load_shared_mod, load_export_and_verify_signature, - cap_story, is_q1, src_root_dir, sim_root_dir): + cap_story, is_q1, src_root_dir, sim_root_dir, settings_remove): goto_home() wipe_cache() settings_set('accts', []) + settings_set('msas', 1) if af == "ms0": clear_ms() @@ -277,20 +285,22 @@ def test_address_explorer_saver(af, wipe_cache, settings_set, goto_address_explo lst = eval(lst) assert lst - if af == "ms0": - return # multisig addresses are blanked - title, body = cap_story() contents, sig_addr, _ = load_export_and_verify_signature(body, "sd", label="Address summary") addr_dump = io.StringIO(contents) cc = csv.reader(addr_dump) hdr = next(cc) - assert hdr == ['Index', 'Payment Address', 'Derivation'] + if af == "ms0": + for i in ['Index', 'Payment Address', 'Redeem Script']: + assert i in hdr + else: + assert hdr == ['Index', 'Payment Address', 'Derivation'] + addr = None - for n, (idx, addr, deriv) in enumerate(cc, start=0): - assert int(idx) == n - if idx == 200: - addr = addr + for n, rv in enumerate(cc, start=0): + assert int(rv[0]) == n + if int(rv[0]) == 200: + addr = rv[1] cc_ndef = load_shared_mod('cc_ndef', f'{src_root_dir}/shared/ndef.py') n = cc_ndef.ndefMaker() @@ -319,6 +329,144 @@ def test_address_explorer_saver(af, wipe_cache, settings_set, goto_address_explo else: assert af in story + settings_remove("msas") + + +def test_ae_saver(wipe_cache, settings_set, goto_address_explorer, cap_story, + pick_menu_item, need_keypress, sim_exec, clear_ms, is_q1, + import_ms_wallet, press_select, goto_home, nfc_write, + load_shared_mod, load_export_and_verify_signature, + set_addr_exp_start_idx, use_testnet): + + cmd = lambda a: ( + f'from ownership import OWNERSHIP;' + f'c,w,path=OWNERSHIP.search({a!r});' + f'RV.write(repr([c, w.name, path]))') + + def cache_check(a, from_cache): + l = sim_exec(cmd(a), timeout=None) + assert 'Traceback' not in l, l + assert eval(l)[0] == from_cache + + use_testnet() + goto_home() + wipe_cache() + settings_set('accts', []) + settings_set('aei', True) + + goto_address_explorer() + set_addr_exp_start_idx(7) # starting from index 7 + pick_menu_item("Segwit P2WPKH") + need_keypress("1") # save to SD + + time.sleep(.1) + title, body = cap_story() + contents, sig_addr, _ = load_export_and_verify_signature(body, "sd", label="Address summary") + addr_dump = io.StringIO(contents) + cc = csv.reader(addr_dump) + hdr = next(cc) + assert hdr == ['Index', 'Payment Address', 'Derivation'] + addrs = {} + for idx, addr, deriv in cc: + addrs[int(idx)] = addr + + # nothing was created from above as start index was 7 + cache_check(addrs[7], False) + # now we have cached addresses up to 27 + for i in range(8, 28): + cache_check(addrs[i], True) + + # cache file position at 27 (aka count) + goto_address_explorer() + set_addr_exp_start_idx(1) # starting from index 1 + pick_menu_item("Segwit P2WPKH") + need_keypress("1") # save to SD + + time.sleep(.1) + title, body = cap_story() + load_export_and_verify_signature(body, "sd", label="Address summary") + + # after above we must have first 250 addresses cached + cache_check(addrs[249], True) + + # cache file position at 250 (aka count) + goto_address_explorer() + set_addr_exp_start_idx(250) # starting from index 250 + pick_menu_item("Segwit P2WPKH") + need_keypress("1") # save to SD + + time.sleep(.1) + title, body = cap_story() + contents, sig_addr, _ = load_export_and_verify_signature(body, "sd", label="Address summary") + addr_dump = io.StringIO(contents) + cc = csv.reader(addr_dump) + hdr = next(cc) + assert hdr == ['Index', 'Payment Address', 'Derivation'] + addrs = {} + for idx, addr, deriv in cc: + addrs[int(idx)] = addr + + # after above we must have first 500 addresses cached + cache_check(addrs[300], True) + cache_check(addrs[400], True) + cache_check(addrs[499], True) + + # now addresses that we already have, does nothing + goto_address_explorer() + set_addr_exp_start_idx(100) # starting from index 100 + pick_menu_item("Segwit P2WPKH") + need_keypress("1") # save to SD + + time.sleep(.1) + title, body = cap_story() + load_export_and_verify_signature(body, "sd", label="Address summary") + cache_check(addrs[499], True) + + # now move count up via ownership + mk = BIP32Node.from_wallet_key(simulator_fixed_tprv) + sk = mk.subkey_for_path("84h/1h/0h/0/580") + addr = bech32_encode('tb', 0, sk.hash160()) + cache_check(addr, False) + # now count at 600 (580+20) + + # now over the max but with some we already have + goto_address_explorer() + set_addr_exp_start_idx(550) # starting from index 550 (would go up to 800) + pick_menu_item("Segwit P2WPKH") + need_keypress("1") # save to SD + + time.sleep(.1) + title, body = cap_story() + contents, sig_addr, _ = load_export_and_verify_signature(body, "sd", label="Address summary") + addr_dump = io.StringIO(contents) + cc = csv.reader(addr_dump) + hdr = next(cc) + assert hdr == ['Index', 'Payment Address', 'Derivation'] + addrs = {} + for idx, addr, deriv in cc: + addrs[int(idx)] = addr + + assert 799 in addrs + cache_check(addrs[763], True) # max + + # start idx over max stored addresses + goto_address_explorer() + set_addr_exp_start_idx(764) # starting from index 764 + pick_menu_item("Segwit P2WPKH") + need_keypress("1") # save to SD + + time.sleep(.1) + title, body = cap_story() + load_export_and_verify_signature(body, "sd", label="Address summary") + # does notthing harmful, nothing added + + cache_check(addrs[763], True) # max + l = sim_exec(cmd(addrs[764]), timeout=None) + assert 'Traceback' in l + assert 'Searched 1528' in l # max + assert "1 wallet(s)" in l + assert 'without finding a match' in l + def test_regtest_addr_on_mainnet(goto_home, is_q1, pick_menu_item, scan_a_qr, nfc_write, cap_story, need_keypress, load_shared_mod, use_mainnet, src_root_dir, sim_root_dir): @@ -360,4 +508,67 @@ def test_regtest_addr_on_mainnet(goto_home, is_q1, pick_menu_item, scan_a_qr, nf assert title == 'Unknown Address' assert "not valid on Bitcoin Mainnet" in story + +def test_20_more_build_after_match(sim_exec, import_ms_wallet, clear_ms, wipe_cache, settings_set): + from test_multisig import make_ms_address, HARD + + cmd = lambda a: ( + f'from ownership import OWNERSHIP;' + f'c,w,path=OWNERSHIP.search({a!r});' + f'RV.write(repr([c, w.name, path]))') + + # create multisig wallet + M, N = 2, 3 + expect_name = 'test20more' + clear_ms() + keys = import_ms_wallet(M, N, name=expect_name, accept=True, addr_fmt="p2wsh") + + make_a = lambda index: make_ms_address( + M, keys, + is_change=False, idx=index, addr_fmt=AF_P2WSH, testnet=True, + path_mapper=lambda cosigner: [HARD(45), 0, index]) + + def cache_check(index, from_cache): + a = make_a(index)[0] + l = sim_exec(cmd(a), timeout=None) + assert 'Traceback' not in l, l + assert eval(l)[0] == from_cache + + # clean slate + wipe_cache() + settings_set('accts', []) + + # generate 10th (idx=9) address (external) + # first run, generated first 10 addresses + 20 + cache_check(9, False) + + # now we can go up to index 29 - all must come from cache + for i in range(10, 30): + cache_check(i, True) + + # idx 30 - not in cache + # but will cache next 20 addrs + cache_check(30, False) + + # now we can go up to index 51 - all must come from cache + for i in range(31, 51): + cache_check(i, True) + + # idx 51 - not in cache + # but will cache next 20 addrs + cache_check(51, False) + + cache_check(760, False) + cache_check(761, True) + cache_check(762, True) + cache_check(763, True) + + # after max - not gonna find + addr = make_a(764)[0] + l = sim_exec(cmd(addr), timeout=None) + assert 'Traceback' in l + assert 'Searched 1528' in l # max + assert "1 wallet(s)" in l + assert 'without finding a match' in l + # EOF