bugfix: ownership check needed re-run for values near max
This commit is contained in:
parent
9e762d29a5
commit
e8ba25fd04
@ -28,6 +28,7 @@ Spending policies for "Single Signers" adds new spending policy options:
|
||||
- Bugfix: Fix filesystem initialization after Wife LFS or Destroy Seed.
|
||||
- Bugfix: Fix MicroSD selftest code.
|
||||
- Bugfix: NFC loop exporting secrets would not work after first value exported.
|
||||
- Bugfix: Ownership check failing to find addresses near max (~760), needed to be re-run to succeed
|
||||
|
||||
# Mk4 Specific Changes
|
||||
|
||||
|
||||
@ -429,14 +429,11 @@ def generate_address_csv(path, addr_fmt, ms_wallet, account_num, n, start=0, cha
|
||||
+ ['Derivation (%d of %d)' % (i+1, ms_wallet.N) for i in range(ms_wallet.N)]
|
||||
) + '"\n'
|
||||
|
||||
if (start == 0) and (n > 100) and change in (0, 1):
|
||||
saver = OWNERSHIP.saver(ms_wallet, change, start)
|
||||
else:
|
||||
saver = None
|
||||
saver = OWNERSHIP.saver(ms_wallet, change, start, n)
|
||||
|
||||
for (idx, addr, derivs, script) in ms_wallet.yield_addresses(start, n, change_idx=change):
|
||||
if saver:
|
||||
saver(addr)
|
||||
saver(addr, idx)
|
||||
|
||||
# policy choice: never provide a complete multisig address to user.
|
||||
addr = censor_address(addr)
|
||||
@ -448,7 +445,7 @@ def generate_address_csv(path, addr_fmt, ms_wallet, account_num, n, start=0, cha
|
||||
yield ln
|
||||
|
||||
if saver:
|
||||
saver(None) # close file
|
||||
saver(None, 0) # close cache file
|
||||
|
||||
return
|
||||
|
||||
@ -456,20 +453,17 @@ def generate_address_csv(path, addr_fmt, ms_wallet, account_num, n, start=0, cha
|
||||
from wallet import MasterSingleSigWallet
|
||||
main = MasterSingleSigWallet(addr_fmt, path, account_num)
|
||||
|
||||
if n and (start == 0) and (n > 100) and change in (0, 1):
|
||||
saver = OWNERSHIP.saver(main, change, start)
|
||||
else:
|
||||
saver = None
|
||||
saver = OWNERSHIP.saver(main, change, start, n)
|
||||
|
||||
yield '"Index","Payment Address","Derivation"\n'
|
||||
for (idx, addr, deriv) in main.yield_addresses(start, n, change_idx=change):
|
||||
if saver:
|
||||
saver(addr)
|
||||
saver(addr, idx)
|
||||
|
||||
yield '%d,"%s","%s"\n' % (idx, addr, deriv)
|
||||
|
||||
if saver:
|
||||
saver(None) # close
|
||||
saver(None, 0) # close cache file
|
||||
|
||||
async def make_address_summary_file(path, addr_fmt, ms_wallet, account_num,
|
||||
start=0, count=250, change=0, **save_opts):
|
||||
@ -516,7 +510,7 @@ async def make_address_summary_file(path, addr_fmt, ms_wallet, account_num,
|
||||
except CardMissingError:
|
||||
await needs_microsd()
|
||||
except Exception as e:
|
||||
await ux_show_story('Failed to write!\n\n\n%s\n%s' % (e, problem_file_line(e)))
|
||||
await ux_show_story('Failed to write!\n\n%s\n%s' % (e, problem_file_line(e)))
|
||||
|
||||
|
||||
async def address_explore(*a):
|
||||
|
||||
@ -150,11 +150,15 @@ class Display:
|
||||
|
||||
def fullscreen(self, msg, percent=None, line2=None):
|
||||
# show a simple message "fullscreen".
|
||||
# - 'line2' not supported on smaller screen sizes, ignore
|
||||
self.clear()
|
||||
y = 14
|
||||
self.text(None, y, msg, font=FontLarge)
|
||||
|
||||
if line2:
|
||||
y += FontLarge.height # add height of above
|
||||
y += FontTiny.height # add space of size FontTiny height
|
||||
self.text(None, y, line2, font=FontSmall)
|
||||
|
||||
if percent is not None:
|
||||
self.progress_bar(percent)
|
||||
self.show()
|
||||
|
||||
@ -241,7 +241,7 @@ async def drv_entro_step2(_1, picked, _2, just_pick=False):
|
||||
await needs_microsd()
|
||||
continue
|
||||
except Exception as e:
|
||||
await ux_show_story('Failed to write!\n\n\n'+str(e))
|
||||
await ux_show_story('Failed to write!\n\n'+str(e))
|
||||
continue
|
||||
|
||||
story = "Filename is:\n\n%s" % out_fn
|
||||
|
||||
@ -91,10 +91,10 @@ async def export_contents(title, contents, fname_pattern, derive=None, addr_fmt=
|
||||
|
||||
await ux_show_story(msg)
|
||||
|
||||
except CardMissingError:
|
||||
await needs_microsd()
|
||||
except Exception as e:
|
||||
await ux_show_story('Failed to write!\n\n\n' + str(e))
|
||||
except CardMissingError:
|
||||
await needs_microsd()
|
||||
except Exception as e:
|
||||
await ux_show_story('Failed to write!\n\n' + str(e))
|
||||
|
||||
# both exceptions & success gets here
|
||||
if no_qr and (NFC is None) and (VD is None) and not force_prompt:
|
||||
|
||||
@ -245,9 +245,10 @@ class MultisigWallet(WalletABC):
|
||||
return rv
|
||||
|
||||
@classmethod
|
||||
def iter_wallets(cls, M=None, N=None, not_idx=None, addr_fmt=None):
|
||||
def iter_wallets(cls, M=None, N=None, not_idx=None, addr_fmts=None):
|
||||
# yield MS wallets we know about, that match at least right M,N if known.
|
||||
# - this is only place we should be searching this list, please!!
|
||||
# addr_fmts: list of address formats we're intersted in
|
||||
lst = settings.get('multisig', [])
|
||||
|
||||
for idx, rec in enumerate(lst):
|
||||
@ -261,11 +262,11 @@ class MultisigWallet(WalletABC):
|
||||
if M is not None and has_m != M: continue
|
||||
if N is not None and has_n != N: continue
|
||||
|
||||
if addr_fmt is not None:
|
||||
if addr_fmts:
|
||||
opts = rec[3]
|
||||
af = opts.get('ft', AF_P2SH)
|
||||
if af != addr_fmt: continue
|
||||
|
||||
if af not in addr_fmts: continue
|
||||
|
||||
yield cls.deserialize(rec, idx)
|
||||
|
||||
def get_xfp_paths(self):
|
||||
@ -273,28 +274,23 @@ class MultisigWallet(WalletABC):
|
||||
return list(self.xfp_paths.values())
|
||||
|
||||
@classmethod
|
||||
def find_match(cls, M, N, xfp_paths, addr_fmt=None):
|
||||
def find_match(cls, M, N, xfp_paths, addr_fmts=None):
|
||||
# Find index of matching wallet
|
||||
# - xfp_paths is list of lists: [xfp, *path] like in psbt files
|
||||
# - M and N must be known
|
||||
# - returns instance, or None if not found
|
||||
for rv in cls.iter_wallets(M, N, addr_fmt=addr_fmt):
|
||||
for rv in cls.iter_wallets(M, N, addr_fmts=addr_fmts):
|
||||
if rv.matching_subpaths(xfp_paths):
|
||||
return rv
|
||||
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def find_candidates(cls, xfp_paths, addr_fmt=None, M=None):
|
||||
def find_candidates(cls, xfp_paths):
|
||||
# Return a list of matching wallets for various M values.
|
||||
# - xpfs_paths should already be sorted
|
||||
# - returns set of matches, of any M value
|
||||
|
||||
# we know N, but not M at this point.
|
||||
N = len(xfp_paths)
|
||||
|
||||
matches = []
|
||||
for rv in cls.iter_wallets(M=M, addr_fmt=addr_fmt):
|
||||
for rv in cls.iter_wallets():
|
||||
if rv.matching_subpaths(xfp_paths):
|
||||
matches.append(rv)
|
||||
|
||||
@ -408,7 +404,7 @@ class MultisigWallet(WalletABC):
|
||||
# - count_similar: same N, same xfp+paths
|
||||
|
||||
lst = self.get_xfp_paths()
|
||||
c = self.find_match(self.M, self.N, lst, addr_fmt=self.addr_fmt)
|
||||
c = self.find_match(self.M, self.N, lst, addr_fmts=[self.addr_fmt])
|
||||
if c:
|
||||
# All details are same: M/N, paths, addr fmt
|
||||
if sorted(self.xpubs) != sorted(c.xpubs):
|
||||
@ -454,7 +450,7 @@ class MultisigWallet(WalletABC):
|
||||
assert self.storage_idx >= 0
|
||||
|
||||
# safety check
|
||||
for existing in self.iter_wallets(M=self.M, N=self.N, addr_fmt=self.addr_fmt):
|
||||
for existing in self.iter_wallets(M=self.M, N=self.N, addr_fmts=[self.addr_fmt]):
|
||||
if existing.storage_idx != self.storage_idx: continue
|
||||
break
|
||||
else:
|
||||
|
||||
@ -620,7 +620,7 @@ async def start_export(notes):
|
||||
await needs_microsd()
|
||||
return
|
||||
except Exception as e:
|
||||
await ux_show_story('Failed to write!\n\n\n'+str(e))
|
||||
await ux_show_story('Failed to write!\n\n'+str(e))
|
||||
return
|
||||
|
||||
msg = 'Export file written:\n\n%s\n\nSignature file written:\n\n%s' % (
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
#
|
||||
# ownership.py - store a cache of hashes related to addresses we might control.
|
||||
#
|
||||
import os, sys, chains, ngu, struct, version
|
||||
import os, chains, ngu, struct, version
|
||||
from glob import settings
|
||||
from ucollections import namedtuple
|
||||
from ubinascii import hexlify as b2a_hex
|
||||
@ -11,8 +11,7 @@ from utils import problem_file_line, show_single_address
|
||||
|
||||
# Track many addresses, but in compressed form
|
||||
# - map from random Bech32/Base58 payment address to (wallet) + keypath
|
||||
# - only normal (external, not change) addresses, and won't consider
|
||||
# any keypath that does not end in 0/*
|
||||
# - won't consider any keypath that does not end in <0;1>/*
|
||||
# - store only hints, since we can re-construct any address and want to fully verify
|
||||
# - try to keep private between different duress wallets, and seed vaults
|
||||
# - storing bulk data into LFS, not settings
|
||||
@ -39,7 +38,6 @@ OWNERSHIP_MAGIC = 0x10A0 # "Address Ownership" v1.0
|
||||
|
||||
# target 3 flash blocks, max file size => 764 addresses
|
||||
MAX_ADDRS_STORED = const(764) # =((3*512) - OWNERSHIP_FILE_HDR_LEN) // HASH_ENC_LEN
|
||||
BONUS_GAP_LIMIT = const(20)
|
||||
|
||||
def encode_addr(addr, salt):
|
||||
# Convert text address to something we can store while preserving privacy.
|
||||
@ -56,6 +54,7 @@ class AddressCacheFile:
|
||||
self.salt = h[32:]
|
||||
self.count = 0
|
||||
self.hdr = None
|
||||
self.fd = None
|
||||
|
||||
self.peek()
|
||||
|
||||
@ -65,9 +64,6 @@ class AddressCacheFile:
|
||||
rv += ' (change)'
|
||||
return rv
|
||||
|
||||
def exists(self):
|
||||
return bool(self.count)
|
||||
|
||||
def peek(self):
|
||||
# see what we have on-disk; just reads header.
|
||||
try:
|
||||
@ -105,15 +101,14 @@ class AddressCacheFile:
|
||||
self.fd.write(hdr)
|
||||
|
||||
def append(self, addr):
|
||||
if addr is None:
|
||||
# close file, done
|
||||
self.fd.close()
|
||||
del self.fd
|
||||
return
|
||||
|
||||
assert '_' not in addr
|
||||
self.fd.write(encode_addr(addr, self.salt))
|
||||
|
||||
def close(self):
|
||||
# close file, done
|
||||
if self.fd is not None:
|
||||
self.fd.close()
|
||||
self.fd = None
|
||||
|
||||
def fast_search(self, addr):
|
||||
# Do the easy part of the searching, using the existing file's contents.
|
||||
# - generates candidate path subcomponents; might be false positive
|
||||
@ -121,6 +116,7 @@ class AddressCacheFile:
|
||||
from glob import dis
|
||||
|
||||
if not self.hdr or not self.count:
|
||||
# cache empty
|
||||
return
|
||||
|
||||
with open(self.fname, 'rb') as fd:
|
||||
@ -132,7 +128,7 @@ class AddressCacheFile:
|
||||
chk = encode_addr(addr, self.salt)
|
||||
for idx in range(self.count):
|
||||
if buf[idx*HASH_ENC_LEN : (idx*HASH_ENC_LEN)+HASH_ENC_LEN] == chk:
|
||||
yield (self.change_idx, idx)
|
||||
yield self.change_idx, idx
|
||||
|
||||
dis.progress_sofar(idx, self.count)
|
||||
|
||||
@ -148,68 +144,78 @@ class AddressCacheFile:
|
||||
# - return subpath for a hit or None
|
||||
from glob import dis
|
||||
|
||||
bonus = 0
|
||||
match = None
|
||||
|
||||
start_idx = self.count
|
||||
count = MAX_ADDRS_STORED - start_idx
|
||||
|
||||
if count <= 0:
|
||||
return None
|
||||
return match
|
||||
|
||||
self.setup(self.change_idx, start_idx)
|
||||
|
||||
bonus = None
|
||||
for idx,here,*_ in self.wallet.yield_addresses(start_idx, count,
|
||||
change_idx=self.change_idx):
|
||||
|
||||
if here == addr:
|
||||
# Found it! But keep going a little for next time.
|
||||
match = (self.change_idx, idx)
|
||||
|
||||
change_idx=self.change_idx):
|
||||
self.append(here)
|
||||
self.count += 1
|
||||
if match:
|
||||
|
||||
if bonus:
|
||||
if bonus >= 20:
|
||||
# do (at most) 20 more - limited by 'start_idx' & 'count'
|
||||
break
|
||||
bonus += 1
|
||||
|
||||
if match and bonus >= BONUS_GAP_LIMIT:
|
||||
self.append(None)
|
||||
return match
|
||||
|
||||
dis.progress_sofar(idx-start_idx, count)
|
||||
if here == addr:
|
||||
# match but keep going
|
||||
match = (self.change_idx, idx)
|
||||
bonus = 1
|
||||
|
||||
self.append(None)
|
||||
dis.progress_sofar(idx - start_idx, count)
|
||||
|
||||
return None
|
||||
self.close()
|
||||
return match
|
||||
|
||||
class OwnershipCache:
|
||||
|
||||
@classmethod
|
||||
def saver(cls, wallet, change_idx, start_idx):
|
||||
# when we are generating many addresses for export, capture them
|
||||
def saver(cls, wallet, change_idx, start_idx, count):
|
||||
# when we are generating many addresses for export, capture them (if suitable)
|
||||
# as we go with this function
|
||||
# - not change -- only main addrs
|
||||
if not count:
|
||||
return
|
||||
if change_idx not in (0, 1):
|
||||
return
|
||||
if start_idx >= MAX_ADDRS_STORED:
|
||||
return
|
||||
|
||||
file = AddressCacheFile(wallet, change_idx)
|
||||
current_pos = file.count
|
||||
|
||||
if file.exists():
|
||||
# don't save to existing file, has some already
|
||||
return None
|
||||
if start_idx > current_pos:
|
||||
# nothing to do here, we are missing some addresses in the middle
|
||||
return
|
||||
if (start_idx + count) <= current_pos:
|
||||
# we already have all these addresses
|
||||
return
|
||||
|
||||
try:
|
||||
file.setup(change_idx, start_idx)
|
||||
except:
|
||||
# in some cases we don't want to save anything, not an error
|
||||
return None
|
||||
file.setup(change_idx, current_pos)
|
||||
|
||||
return file.append
|
||||
def doit(addr, idx):
|
||||
if addr is None:
|
||||
file.close()
|
||||
elif (idx < MAX_ADDRS_STORED) and idx >= current_pos:
|
||||
file.append(addr)
|
||||
|
||||
return doit
|
||||
|
||||
@classmethod
|
||||
def search(cls, addr):
|
||||
# Find it!
|
||||
# - returns wallet object, and tuple2 of final 2 subpath components
|
||||
def filter(cls, addr):
|
||||
# Filter possible candidates!
|
||||
# - if you start w/ testnet, we'll follow that
|
||||
from multisig import MultisigWallet
|
||||
from public_constants import AFC_SCRIPT, AF_P2WPKH_P2SH, AF_P2SH, AF_P2WSH_P2SH
|
||||
from glob import dis
|
||||
|
||||
ch = chains.current_chain()
|
||||
|
||||
@ -219,21 +225,20 @@ class OwnershipCache:
|
||||
raise UnknownAddressExplained('That address is not valid on ' + ch.name)
|
||||
|
||||
possibles = []
|
||||
|
||||
if addr_fmt & AFC_SCRIPT:
|
||||
# multisig or script at least.. must exist already
|
||||
possibles.extend(MultisigWallet.iter_wallets(addr_fmt=addr_fmt))
|
||||
|
||||
# multisig or script at least... must exist already
|
||||
afs = [addr_fmt]
|
||||
if addr_fmt == AF_P2SH:
|
||||
# might look like P2SH but actually be AF_P2WSH_P2SH
|
||||
possibles.extend(MultisigWallet.iter_wallets(addr_fmt=AF_P2WSH_P2SH))
|
||||
# wrapped segwit is more used than legacy
|
||||
afs = [AF_P2WSH_P2SH, AF_P2SH]
|
||||
|
||||
# Might be single-sig p2wpkh wrapped in p2sh ... but that was a transition
|
||||
# thing that hopefully is going away, so if they have any multisig wallets,
|
||||
# defined, assume that that's the only p2sh address source.
|
||||
addr_fmt = AF_P2WPKH_P2SH
|
||||
|
||||
# TODO: add tapscript and such fancy stuff here
|
||||
possibles.extend(MultisigWallet.iter_wallets(addr_fmts=afs))
|
||||
|
||||
try:
|
||||
# Construct possible single-signer wallets, always at least account=0 case
|
||||
@ -247,60 +252,84 @@ class OwnershipCache:
|
||||
if af == addr_fmt and acct_num:
|
||||
w = MasterSingleSigWallet(addr_fmt, account_idx=acct_num)
|
||||
possibles.append(w)
|
||||
except (KeyError, ValueError): pass # if not single sig address format
|
||||
except (KeyError, ValueError):
|
||||
pass # if not single sig address format
|
||||
|
||||
if not possibles:
|
||||
# can only happen w/ scripts; for single-signer we have things to check
|
||||
raise UnknownAddressExplained(
|
||||
"No suitable multisig wallets are currently defined.")
|
||||
"No suitable multisig wallets are currently defined.")
|
||||
|
||||
# ordering here
|
||||
return possibles
|
||||
|
||||
@classmethod
|
||||
def search_wallet_cache(cls, addr, cf):
|
||||
# - returns wallet object, and tuple2 of final 2 subpath components
|
||||
# "quick" check first, before doing any generations
|
||||
# external chain first, then internal (change)
|
||||
for maybe in cf.fast_search(addr):
|
||||
ok = cf.check_match(addr, maybe)
|
||||
if ok:
|
||||
return cf.wallet, maybe
|
||||
return None, None
|
||||
|
||||
count = 0
|
||||
phase2 = []
|
||||
for change_idx in (0, 1):
|
||||
files = [AddressCacheFile(w, change_idx) for w in possibles]
|
||||
for f in files:
|
||||
if dis.has_lcd:
|
||||
dis.fullscreen('Searching wallet(s)...', line2=f.nice_name())
|
||||
else:
|
||||
dis.fullscreen('Searching...')
|
||||
|
||||
for maybe in f.fast_search(addr):
|
||||
ok = f.check_match(addr, maybe)
|
||||
if not ok: continue # false positive - will happen
|
||||
|
||||
# found winner.
|
||||
return f.wallet, maybe
|
||||
|
||||
if f.count < MAX_ADDRS_STORED:
|
||||
phase2.append(f)
|
||||
|
||||
count += f.count
|
||||
|
||||
@classmethod
|
||||
def search_build_wallet(cls, addr, cf):
|
||||
# maybe we haven't calculated all the addresses yet, so do that
|
||||
# - very slow, but only needed once; any negative (failed) search causes this
|
||||
# - could stop when match found, but we go a bit beyond that for next time
|
||||
# - we could search all in parallel, rather than serially because
|
||||
# more likely to find a match with low index... but seen as too much memory
|
||||
|
||||
for f in phase2:
|
||||
b4 = f.count
|
||||
if dis.has_lcd:
|
||||
dis.fullscreen("Generating addresses...", line2=f.nice_name())
|
||||
else:
|
||||
dis.fullscreen("Generating...")
|
||||
|
||||
result = f.build_and_search(addr)
|
||||
if result:
|
||||
# found it, so report it and stop
|
||||
return f.wallet, result
|
||||
|
||||
count += f.count - b4
|
||||
result = cf.build_and_search(addr)
|
||||
if result:
|
||||
# found it, so report it and stop
|
||||
return cf.wallet, result
|
||||
|
||||
# possible phase 3: other seedvault... slow, rare and not implemented
|
||||
return None, None
|
||||
|
||||
raise UnknownAddressExplained('Searched %d candidates without finding a match.' % count)
|
||||
@classmethod
|
||||
def search(cls, addr):
|
||||
from glob import dis
|
||||
|
||||
matches = OWNERSHIP.filter(addr)
|
||||
|
||||
# build cache files for both external & internal chain
|
||||
cachefs = []
|
||||
for w in matches:
|
||||
cachefs.append(AddressCacheFile(w, 0))
|
||||
cachefs.append(AddressCacheFile(w, 1))
|
||||
|
||||
for cf in cachefs:
|
||||
msg, l2 = "Searching...", "(change)" if cf.change_idx else None
|
||||
if dis.has_lcd:
|
||||
msg, l2 = 'Searching wallet(s)...', cf.nice_name()
|
||||
|
||||
dis.fullscreen(msg, line2=l2)
|
||||
wallet, subpath = OWNERSHIP.search_wallet_cache(addr, cf)
|
||||
if wallet:
|
||||
# first arg from_cache=True
|
||||
return True, wallet, subpath
|
||||
|
||||
# nothing found in existing cache files
|
||||
c = 0
|
||||
for cf in cachefs:
|
||||
msg, l2 = "Generating...", "(change)" if cf.change_idx else None
|
||||
if dis.has_lcd:
|
||||
msg, l2 = 'Generating addresses...', cf.nice_name()
|
||||
|
||||
dis.fullscreen(msg, line2=l2)
|
||||
wallet, subpath = OWNERSHIP.search_build_wallet(addr, cf)
|
||||
c += cf.count
|
||||
if wallet:
|
||||
# first arg from_cache=False
|
||||
return False, wallet, subpath
|
||||
|
||||
else:
|
||||
raise UnknownAddressExplained('Searched %d candidate addresses in %d wallet(s)'
|
||||
' without finding a match.' % (c, len(matches)))
|
||||
|
||||
@classmethod
|
||||
async def search_ux(cls, addr):
|
||||
@ -311,7 +340,7 @@ class OwnershipCache:
|
||||
from public_constants import AFC_BECH32, AFC_BECH32M
|
||||
|
||||
try:
|
||||
wallet, subpath = OWNERSHIP.search(addr)
|
||||
_, wallet, subpath = cls.search(addr)
|
||||
is_ms = isinstance(wallet, MultisigWallet)
|
||||
sp = wallet.render_path(*subpath)
|
||||
|
||||
|
||||
@ -179,7 +179,7 @@ class PaperWalletMaker:
|
||||
return
|
||||
except Exception as e:
|
||||
from utils import problem_file_line
|
||||
await ux_show_story('Failed to write!\n\n\n'+problem_file_line(e))
|
||||
await ux_show_story('Failed to write!\n\n'+problem_file_line(e))
|
||||
return
|
||||
|
||||
story = "Done! Created file(s):\n\n%s" % nice_txt
|
||||
|
||||
@ -644,13 +644,10 @@ def decode_bip21_text(got):
|
||||
|
||||
proto, args, addr = None, None, None
|
||||
|
||||
# remove URL protocol: if present
|
||||
if ':' in got[0:16]:
|
||||
proto, got = got.split(':', 1)
|
||||
|
||||
# remove query params first - if any
|
||||
# looks like BIP-21 payment URL
|
||||
if '?' in got:
|
||||
addr, args = got.split('?', 1)
|
||||
got, args = got.split('?', 1)
|
||||
|
||||
# full URL decode here, but assuming no repeated keys
|
||||
parts = args.split('&')
|
||||
@ -659,7 +656,12 @@ def decode_bip21_text(got):
|
||||
k, v = p.split('=', 1)
|
||||
args[k] = url_unquote(v)
|
||||
|
||||
# assume it's an bare address for now
|
||||
# remove URL protocol: if present
|
||||
if ':' in got:
|
||||
proto, got = got.split(':', 1)
|
||||
assert proto.lower() == "bitcoin"
|
||||
|
||||
# assume it's a bare address for now
|
||||
if not addr:
|
||||
addr = got
|
||||
|
||||
|
||||
@ -5,6 +5,7 @@
|
||||
import pytest, time, io, csv
|
||||
from txn import fake_address
|
||||
from base58 import encode_base58_checksum
|
||||
from bech32 import encode as bech32_encode
|
||||
from helpers import hash160, addr_from_display_format
|
||||
from bip32 import BIP32Node
|
||||
from constants import AF_P2WSH, AF_P2SH, AF_P2WSH_P2SH, AF_CLASSIC, AF_P2WPKH, AF_P2WPKH_P2SH
|
||||
@ -50,7 +51,7 @@ def test_negative(addr_fmt, testnet, sim_exec):
|
||||
(AF_P2SH, True),
|
||||
(AF_P2WSH_P2SH,True),
|
||||
])
|
||||
@pytest.mark.parametrize('offset', [ 3, 760] )
|
||||
@pytest.mark.parametrize('offset', [ 3, 760, 763] )
|
||||
@pytest.mark.parametrize('subaccount', [ 0, 34] )
|
||||
@pytest.mark.parametrize('change_idx', [ 0, 1] )
|
||||
@pytest.mark.parametrize('from_empty', [ True, False] )
|
||||
@ -58,7 +59,6 @@ def test_positive(addr_fmt, offset, subaccount, testnet, from_empty, change_idx,
|
||||
sim_exec, wipe_cache, make_myself_wallet, use_testnet, goto_home, pick_menu_item,
|
||||
enter_number, press_cancel, settings_set, import_ms_wallet, clear_ms
|
||||
):
|
||||
from bech32 import encode as bech32_encode
|
||||
|
||||
# API/Unit test, limited UX
|
||||
|
||||
@ -66,11 +66,9 @@ def test_positive(addr_fmt, offset, subaccount, testnet, from_empty, change_idx,
|
||||
# multisig jigs assume testnet
|
||||
raise pytest.skip('testnet only')
|
||||
|
||||
wipe_cache()
|
||||
settings_set('accts', [])
|
||||
use_testnet(testnet)
|
||||
if from_empty:
|
||||
wipe_cache() # very different codepaths
|
||||
settings_set('accts', [])
|
||||
|
||||
coin_type = 1 if testnet else 0
|
||||
|
||||
if addr_fmt in { AF_P2WSH, AF_P2SH, AF_P2WSH_P2SH }:
|
||||
@ -108,7 +106,7 @@ def test_positive(addr_fmt, offset, subaccount, testnet, from_empty, change_idx,
|
||||
|
||||
# see addr_vs_path
|
||||
mk = BIP32Node.from_wallet_key(simulator_fixed_tprv if testnet else simulator_fixed_xprv)
|
||||
sk = mk.subkey_for_path(path[2:].replace('h', "'"))
|
||||
sk = mk.subkey_for_path(path[2:])
|
||||
|
||||
if addr_fmt == AF_CLASSIC:
|
||||
addr = sk.address(netcode="XTN" if testnet else "BTC")
|
||||
@ -130,20 +128,28 @@ def test_positive(addr_fmt, offset, subaccount, testnet, from_empty, change_idx,
|
||||
pick_menu_item(menu_item)
|
||||
press_cancel()
|
||||
|
||||
cmd = f'from ownership import OWNERSHIP; w,path=OWNERSHIP.search({addr!r}); '\
|
||||
'RV.write(repr([w.name, path]))'
|
||||
lst = sim_exec(cmd)
|
||||
if 'candidates without finding a match' in lst:
|
||||
# some kinda timing issue, but don't want big delays, so just retry
|
||||
print("RETRY search!")
|
||||
lst = sim_exec(cmd)
|
||||
|
||||
cmd = (f'from ownership import OWNERSHIP;'
|
||||
f'c,w,path=OWNERSHIP.search({addr!r});'
|
||||
f'RV.write(repr([c, w.name, path]))')
|
||||
|
||||
if not from_empty:
|
||||
# we expect here to find address from cache
|
||||
# so we first need to generate proper cache
|
||||
lst = sim_exec(cmd, timeout=None)
|
||||
assert 'Traceback' not in lst, lst
|
||||
lst = eval(lst)
|
||||
assert len(lst) == 3
|
||||
assert lst[0] is False # not from cache, needed to build it
|
||||
|
||||
|
||||
lst = sim_exec(cmd, timeout=None)
|
||||
assert 'Traceback' not in lst, lst
|
||||
|
||||
lst = eval(lst)
|
||||
assert len(lst) == 2
|
||||
assert len(lst) == 3
|
||||
|
||||
got_name, got_path = lst
|
||||
from_cache, got_name, got_path = lst
|
||||
|
||||
assert from_cache == (not from_empty)
|
||||
assert expect_name in got_name
|
||||
if subaccount and '...' not in path:
|
||||
# not expected for multisig, since we have proper wallet name
|
||||
@ -249,18 +255,20 @@ def test_ux(valid, testnet, method,
|
||||
|
||||
else:
|
||||
assert title == 'Unknown Address'
|
||||
assert 'Searched ' in story
|
||||
assert 'candidates without finding a match' in story
|
||||
assert 'Searched 1528' in story # max
|
||||
assert "1 wallet(s)" in story
|
||||
assert 'without finding a match' in story
|
||||
|
||||
@pytest.mark.parametrize("af", ["P2SH-Segwit", "Segwit P2WPKH", "Classic P2PKH", "ms0"])
|
||||
def test_address_explorer_saver(af, wipe_cache, settings_set, goto_address_explorer,
|
||||
pick_menu_item, need_keypress, sim_exec, clear_ms,
|
||||
import_ms_wallet, press_select, goto_home, nfc_write,
|
||||
load_shared_mod, load_export_and_verify_signature,
|
||||
cap_story, is_q1, src_root_dir, sim_root_dir):
|
||||
cap_story, is_q1, src_root_dir, sim_root_dir, settings_remove):
|
||||
goto_home()
|
||||
wipe_cache()
|
||||
settings_set('accts', [])
|
||||
settings_set('msas', 1)
|
||||
|
||||
if af == "ms0":
|
||||
clear_ms()
|
||||
@ -277,20 +285,22 @@ def test_address_explorer_saver(af, wipe_cache, settings_set, goto_address_explo
|
||||
lst = eval(lst)
|
||||
assert lst
|
||||
|
||||
if af == "ms0":
|
||||
return # multisig addresses are blanked
|
||||
|
||||
title, body = cap_story()
|
||||
contents, sig_addr, _ = load_export_and_verify_signature(body, "sd", label="Address summary")
|
||||
addr_dump = io.StringIO(contents)
|
||||
cc = csv.reader(addr_dump)
|
||||
hdr = next(cc)
|
||||
assert hdr == ['Index', 'Payment Address', 'Derivation']
|
||||
if af == "ms0":
|
||||
for i in ['Index', 'Payment Address', 'Redeem Script']:
|
||||
assert i in hdr
|
||||
else:
|
||||
assert hdr == ['Index', 'Payment Address', 'Derivation']
|
||||
|
||||
addr = None
|
||||
for n, (idx, addr, deriv) in enumerate(cc, start=0):
|
||||
assert int(idx) == n
|
||||
if idx == 200:
|
||||
addr = addr
|
||||
for n, rv in enumerate(cc, start=0):
|
||||
assert int(rv[0]) == n
|
||||
if int(rv[0]) == 200:
|
||||
addr = rv[1]
|
||||
|
||||
cc_ndef = load_shared_mod('cc_ndef', f'{src_root_dir}/shared/ndef.py')
|
||||
n = cc_ndef.ndefMaker()
|
||||
@ -319,6 +329,144 @@ def test_address_explorer_saver(af, wipe_cache, settings_set, goto_address_explo
|
||||
else:
|
||||
assert af in story
|
||||
|
||||
settings_remove("msas")
|
||||
|
||||
|
||||
def test_ae_saver(wipe_cache, settings_set, goto_address_explorer, cap_story,
|
||||
pick_menu_item, need_keypress, sim_exec, clear_ms, is_q1,
|
||||
import_ms_wallet, press_select, goto_home, nfc_write,
|
||||
load_shared_mod, load_export_and_verify_signature,
|
||||
set_addr_exp_start_idx, use_testnet):
|
||||
|
||||
cmd = lambda a: (
|
||||
f'from ownership import OWNERSHIP;'
|
||||
f'c,w,path=OWNERSHIP.search({a!r});'
|
||||
f'RV.write(repr([c, w.name, path]))')
|
||||
|
||||
def cache_check(a, from_cache):
|
||||
l = sim_exec(cmd(a), timeout=None)
|
||||
assert 'Traceback' not in l, l
|
||||
assert eval(l)[0] == from_cache
|
||||
|
||||
use_testnet()
|
||||
goto_home()
|
||||
wipe_cache()
|
||||
settings_set('accts', [])
|
||||
settings_set('aei', True)
|
||||
|
||||
goto_address_explorer()
|
||||
set_addr_exp_start_idx(7) # starting from index 7
|
||||
pick_menu_item("Segwit P2WPKH")
|
||||
need_keypress("1") # save to SD
|
||||
|
||||
time.sleep(.1)
|
||||
title, body = cap_story()
|
||||
contents, sig_addr, _ = load_export_and_verify_signature(body, "sd", label="Address summary")
|
||||
addr_dump = io.StringIO(contents)
|
||||
cc = csv.reader(addr_dump)
|
||||
hdr = next(cc)
|
||||
assert hdr == ['Index', 'Payment Address', 'Derivation']
|
||||
addrs = {}
|
||||
for idx, addr, deriv in cc:
|
||||
addrs[int(idx)] = addr
|
||||
|
||||
# nothing was created from above as start index was 7
|
||||
cache_check(addrs[7], False)
|
||||
# now we have cached addresses up to 27
|
||||
for i in range(8, 28):
|
||||
cache_check(addrs[i], True)
|
||||
|
||||
# cache file position at 27 (aka count)
|
||||
goto_address_explorer()
|
||||
set_addr_exp_start_idx(1) # starting from index 1
|
||||
pick_menu_item("Segwit P2WPKH")
|
||||
need_keypress("1") # save to SD
|
||||
|
||||
time.sleep(.1)
|
||||
title, body = cap_story()
|
||||
load_export_and_verify_signature(body, "sd", label="Address summary")
|
||||
|
||||
# after above we must have first 250 addresses cached
|
||||
cache_check(addrs[249], True)
|
||||
|
||||
# cache file position at 250 (aka count)
|
||||
goto_address_explorer()
|
||||
set_addr_exp_start_idx(250) # starting from index 250
|
||||
pick_menu_item("Segwit P2WPKH")
|
||||
need_keypress("1") # save to SD
|
||||
|
||||
time.sleep(.1)
|
||||
title, body = cap_story()
|
||||
contents, sig_addr, _ = load_export_and_verify_signature(body, "sd", label="Address summary")
|
||||
addr_dump = io.StringIO(contents)
|
||||
cc = csv.reader(addr_dump)
|
||||
hdr = next(cc)
|
||||
assert hdr == ['Index', 'Payment Address', 'Derivation']
|
||||
addrs = {}
|
||||
for idx, addr, deriv in cc:
|
||||
addrs[int(idx)] = addr
|
||||
|
||||
# after above we must have first 500 addresses cached
|
||||
cache_check(addrs[300], True)
|
||||
cache_check(addrs[400], True)
|
||||
cache_check(addrs[499], True)
|
||||
|
||||
# now addresses that we already have, does nothing
|
||||
goto_address_explorer()
|
||||
set_addr_exp_start_idx(100) # starting from index 100
|
||||
pick_menu_item("Segwit P2WPKH")
|
||||
need_keypress("1") # save to SD
|
||||
|
||||
time.sleep(.1)
|
||||
title, body = cap_story()
|
||||
load_export_and_verify_signature(body, "sd", label="Address summary")
|
||||
cache_check(addrs[499], True)
|
||||
|
||||
# now move count up via ownership
|
||||
mk = BIP32Node.from_wallet_key(simulator_fixed_tprv)
|
||||
sk = mk.subkey_for_path("84h/1h/0h/0/580")
|
||||
addr = bech32_encode('tb', 0, sk.hash160())
|
||||
cache_check(addr, False)
|
||||
# now count at 600 (580+20)
|
||||
|
||||
# now over the max but with some we already have
|
||||
goto_address_explorer()
|
||||
set_addr_exp_start_idx(550) # starting from index 550 (would go up to 800)
|
||||
pick_menu_item("Segwit P2WPKH")
|
||||
need_keypress("1") # save to SD
|
||||
|
||||
time.sleep(.1)
|
||||
title, body = cap_story()
|
||||
contents, sig_addr, _ = load_export_and_verify_signature(body, "sd", label="Address summary")
|
||||
addr_dump = io.StringIO(contents)
|
||||
cc = csv.reader(addr_dump)
|
||||
hdr = next(cc)
|
||||
assert hdr == ['Index', 'Payment Address', 'Derivation']
|
||||
addrs = {}
|
||||
for idx, addr, deriv in cc:
|
||||
addrs[int(idx)] = addr
|
||||
|
||||
assert 799 in addrs
|
||||
cache_check(addrs[763], True) # max
|
||||
|
||||
# start idx over max stored addresses
|
||||
goto_address_explorer()
|
||||
set_addr_exp_start_idx(764) # starting from index 764
|
||||
pick_menu_item("Segwit P2WPKH")
|
||||
need_keypress("1") # save to SD
|
||||
|
||||
time.sleep(.1)
|
||||
title, body = cap_story()
|
||||
load_export_and_verify_signature(body, "sd", label="Address summary")
|
||||
# does notthing harmful, nothing added
|
||||
|
||||
cache_check(addrs[763], True) # max
|
||||
l = sim_exec(cmd(addrs[764]), timeout=None)
|
||||
assert 'Traceback' in l
|
||||
assert 'Searched 1528' in l # max
|
||||
assert "1 wallet(s)" in l
|
||||
assert 'without finding a match' in l
|
||||
|
||||
|
||||
def test_regtest_addr_on_mainnet(goto_home, is_q1, pick_menu_item, scan_a_qr, nfc_write, cap_story,
|
||||
need_keypress, load_shared_mod, use_mainnet, src_root_dir, sim_root_dir):
|
||||
@ -360,4 +508,67 @@ def test_regtest_addr_on_mainnet(goto_home, is_q1, pick_menu_item, scan_a_qr, nf
|
||||
assert title == 'Unknown Address'
|
||||
assert "not valid on Bitcoin Mainnet" in story
|
||||
|
||||
|
||||
def test_20_more_build_after_match(sim_exec, import_ms_wallet, clear_ms, wipe_cache, settings_set):
|
||||
from test_multisig import make_ms_address, HARD
|
||||
|
||||
cmd = lambda a: (
|
||||
f'from ownership import OWNERSHIP;'
|
||||
f'c,w,path=OWNERSHIP.search({a!r});'
|
||||
f'RV.write(repr([c, w.name, path]))')
|
||||
|
||||
# create multisig wallet
|
||||
M, N = 2, 3
|
||||
expect_name = 'test20more'
|
||||
clear_ms()
|
||||
keys = import_ms_wallet(M, N, name=expect_name, accept=True, addr_fmt="p2wsh")
|
||||
|
||||
make_a = lambda index: make_ms_address(
|
||||
M, keys,
|
||||
is_change=False, idx=index, addr_fmt=AF_P2WSH, testnet=True,
|
||||
path_mapper=lambda cosigner: [HARD(45), 0, index])
|
||||
|
||||
def cache_check(index, from_cache):
|
||||
a = make_a(index)[0]
|
||||
l = sim_exec(cmd(a), timeout=None)
|
||||
assert 'Traceback' not in l, l
|
||||
assert eval(l)[0] == from_cache
|
||||
|
||||
# clean slate
|
||||
wipe_cache()
|
||||
settings_set('accts', [])
|
||||
|
||||
# generate 10th (idx=9) address (external)
|
||||
# first run, generated first 10 addresses + 20
|
||||
cache_check(9, False)
|
||||
|
||||
# now we can go up to index 29 - all must come from cache
|
||||
for i in range(10, 30):
|
||||
cache_check(i, True)
|
||||
|
||||
# idx 30 - not in cache
|
||||
# but will cache next 20 addrs
|
||||
cache_check(30, False)
|
||||
|
||||
# now we can go up to index 51 - all must come from cache
|
||||
for i in range(31, 51):
|
||||
cache_check(i, True)
|
||||
|
||||
# idx 51 - not in cache
|
||||
# but will cache next 20 addrs
|
||||
cache_check(51, False)
|
||||
|
||||
cache_check(760, False)
|
||||
cache_check(761, True)
|
||||
cache_check(762, True)
|
||||
cache_check(763, True)
|
||||
|
||||
# after max - not gonna find
|
||||
addr = make_a(764)[0]
|
||||
l = sim_exec(cmd(addr), timeout=None)
|
||||
assert 'Traceback' in l
|
||||
assert 'Searched 1528' in l # max
|
||||
assert "1 wallet(s)" in l
|
||||
assert 'without finding a match' in l
|
||||
|
||||
# EOF
|
||||
|
||||
Loading…
Reference in New Issue
Block a user