diff --git a/docs/limitations.md b/docs/limitations.md index 7ecdf33c..b3a2c790 100644 --- a/docs/limitations.md +++ b/docs/limitations.md @@ -181,3 +181,9 @@ We will summarize transaction outputs as "change" back into same wallet, however - Note text is unlimited, but storing very large notes may affect performance system-wide. - Q Backup files restored onto Mk4 will lose their notes, since notes feature is not supported. +# Address Ownership (derivation from payment address) + +- only the first 1000 addresses, non-change from each "wallet" are considered +- if you have used an sub account, without ever exploring it in the Address Explorer, nor + signing a PSBT with those account numbers, we do not search it. + diff --git a/shared/address_explorer.py b/shared/address_explorer.py index 530aae7c..68d2a9f2 100644 --- a/shared/address_explorer.py +++ b/shared/address_explorer.py @@ -376,6 +376,8 @@ Press (3) if you really understand and accept these risks. def generate_address_csv(path, addr_fmt, ms_wallet, account_num, n, start=0, change=0): # Produce CSV file contents as a generator + # - maybe cache internally + from ownership import OWNERSHIP if ms_wallet: # For multisig, include redeem script and derivation for each signer @@ -383,23 +385,45 @@ def generate_address_csv(path, addr_fmt, ms_wallet, account_num, n, start=0, cha 'Redeem Script (%d of %d)' % (ms_wallet.M, ms_wallet.N)] + (['Derivation'] * ms_wallet.N)) + '"\n' + if n > 100 and change == 0: + saver = OWNERSHIP.saver(ms_wallet, start) + else: + saver = None + for (idx, addr, derivs, script) in ms_wallet.yield_addresses(start, n, change_idx=change): + if saver: + saver(addr) + ln = '%d,"%s","%s","' % (idx, addr, b2a_hex(script).decode()) ln += '","'.join(derivs) ln += '"\n' yield ln + if saver: + saver(None) # close + return # build the "master" wallet based on indicated preferences from wallet import MasterSingleSigWallet main = MasterSingleSigWallet(addr_fmt, path, account_num) + if n > 100 and change == 0: + saver = OWNERSHIP.saver(main, start) + else: + saver = None + yield '"Index","Payment Address","Derivation"\n' for (idx, addr, deriv) in main.yield_addresses(start, n, change_idx=change): + if saver: + saver(addr) + yield '%d,"%s","%s"\n' % (idx, addr, deriv) + if saver: + saver(None) # close + async def make_address_summary_file(path, addr_fmt, ms_wallet, account_num, count=250, change=0, **save_opts): diff --git a/shared/chains.py b/shared/chains.py index 62d7fc38..7f98d7e5 100644 --- a/shared/chains.py +++ b/shared/chains.py @@ -5,7 +5,8 @@ import ngu from uhashlib import sha256 from ubinascii import hexlify as b2a_hex -from public_constants import AF_CLASSIC, AF_P2SH, AF_P2WPKH, AF_P2WSH, AF_P2WPKH_P2SH, AF_P2WSH_P2SH +from public_constants import AF_CLASSIC, AF_P2WPKH, AF_P2TR +from public_constants import AF_P2SH, AF_P2WSH, AF_P2WPKH_P2SH, AF_P2WSH_P2SH from public_constants import AFC_PUBKEY, AFC_SEGWIT, AFC_BECH32, AFC_SCRIPT from serializations import hash160, ser_compact_size, disassemble from ucollections import namedtuple @@ -256,6 +257,31 @@ class ChainsBase: return data_hex, data_ascii return None + @classmethod + def possible_address_fmt(cls, addr): + # Given a text (serialized) address, return what + # address format applies to the address, but + # for AF_P2SH case, could be AF_P2WPKH_P2SH, AF_P2WSH_P2SH. + if addr.startswith(cls.bech32_hrp): + if addr.startswith(cls.bech32_hrp+'1p'): + return AF_P2TR + else: + return AF_P2WPKH if len(addr) < 55 else AF_P2WSH + + try: + raw = ngu.codecs.b58_decode(addr) + except ValueError: + # not base58, not an error + return 0 + + if raw[0] == cls.b58_addr[0]: + return AF_CLASSIC + if raw[0] == cls.b58_script[0]: + return AF_P2SH + + return 0 + + class BitcoinMain(ChainsBase): # see ctype = 'BTC' diff --git a/shared/display.py b/shared/display.py index b7bfb914..6870f0d3 100644 --- a/shared/display.py +++ b/shared/display.py @@ -147,7 +147,7 @@ class Display: self.text(-2, 28, 'E', font=FontTiny, invert=1) self.text(-2, 35, 'V', font=FontTiny, invert=1) - def fullscreen(self, msg, percent=None): + def fullscreen(self, msg, percent=None, line2=None): # show a simple message "fullscreen". self.clear() y = 14 diff --git a/shared/exceptions.py b/shared/exceptions.py index 7be7c517..57abb83a 100644 --- a/shared/exceptions.py +++ b/shared/exceptions.py @@ -47,4 +47,7 @@ class AbortInteraction(BaseException): class QRDecodeExplained(ValueError): pass +class UnknownAddressExplained(ValueError): + pass + # EOF diff --git a/shared/lcd_display.py b/shared/lcd_display.py index 4712dfce..6a3b291a 100644 --- a/shared/lcd_display.py +++ b/shared/lcd_display.py @@ -479,10 +479,13 @@ class Display: self.dis.fill_rect(WIDTH-bw, TOP_MARGIN, bw, ACTIVE_H, COL_SCROLL_DARK) self.dis.fill_rect(WIDTH-bw, TOP_MARGIN+pos, bw, bh, COL_TEXT) - def fullscreen(self, msg, percent=None): + def fullscreen(self, msg, percent=None, line2=None): # show a simple message "fullscreen". self.clear() - self.text(None, CHARS_H // 3, msg) + y = CHARS_H // 3 + self.text(None, y, msg) + if line2: + self.text(None, y+2, line2) if percent is not None: self.progress_bar(percent) self.show() diff --git a/shared/multisig.py b/shared/multisig.py index 2b592c9c..8842e5fc 100644 --- a/shared/multisig.py +++ b/shared/multisig.py @@ -151,6 +151,15 @@ class MultisigWallet(WalletABC): return v.upper() return '?' + def render_path(self, change_idx, idx): + # assuming shared derivations for all cosigners. Wrongish. + derivs, _ = self.get_deriv_paths() + if len(derivs) > 1: + deriv = '(various)' + else: + deriv = derivs[0] + return deriv + '/%d/%d' % (change_idx, idx) + @property def chain(self): return chains.get_chain(self.chain_type) diff --git a/shared/ownership.py b/shared/ownership.py index 00efb9dc..2a102f45 100644 --- a/shared/ownership.py +++ b/shared/ownership.py @@ -2,14 +2,14 @@ # # ownership.py - store a cache of hashes related to addresses we might control. # -import gc, chains, stash, ngu +import sys, chains, stash, ngu, struct from uhashlib import sha256 -from ustruct import pack, unpack from ubinascii import b2a_base64, a2b_base64 from glob import settings from ucollections import namedtuple from wallet import WalletABC from ubinascii import hexlify as b2a_hex +from exceptions import UnknownAddressExplained # Track many addresses, but in compressed form # - map from random Bech32/Base58 payment address to (wallet)/keypath @@ -28,32 +28,261 @@ from ubinascii import hexlify as b2a_hex # - data building/saves happens when are searching, but might grab some during addr expl export? # -REL_GAP_LIMIT = const(1000) +MAX_ADDRS_STORED = const(500) +BONUS_GAP_LIMIT = const(20) -OwnershipFileHdr = namedtuple('OwnershipFileHdr', 'file_magic offset') -OWNERSHIP_FILE_HDR = 'II' -FILE_HDR_LEN = const(8) -OWNERSHIP_MAGIC = 0xA010 # "Address Ownership" v1.0 +OwnershipFileHdr = namedtuple('OwnershipFileHdr', 'file_magic flags offset') +OWNERSHIP_FILE_HDR = 'III' # length of hashed & truncated address record -HASH_ENC_LEN = const(8) +HASH_ENC_LEN = const(2) + +# We may store only the 0/0 ..0/n paths, or alternating 0/0, 1/0, 0/1, +FLAG_DUAL = 0x01 +OWNERSHIP_MAGIC = 0xA010 # "Address Ownership" v1.0 + +def encode_addr(addr, salt): + # Convert text address to something we can store while preserving privacy. + return ngu.hash.sha256s(salt + addr)[0:HASH_ENC_LEN] + +class AddressCacheFile: + + def __init__(self, wallet): + self.wallet = wallet + self.desc = wallet.to_descriptor().serialize() + h = b2a_hex(ngu.hash.sha256d(self.desc)) + self.fname = h[0:32] + '.own' + self.salt = h[32:] + self.count = 0 + self.hdr = None + + self.peek() + + def exists(self): + return bool(self.count) + + def peek(self): + # see what we have on-disk; just reads header. + hdr_len = struct.calcsize(OWNERSHIP_FILE_HDR) + + try: + with open(self.fname, 'rb') as fd: + hdr = fd.read(hdr_len) + assert len(hdr) == hdr_len + flen = fd.seek(0, 2) + self.hdr = OwnershipFileHdr(*struct.unpack(OWNERSHIP_FILE_HDR, hdr)) + assert self.hdr.file_magic == OWNERSHIP_MAGIC + except OSError: + return + except Exception as exc: + sys.print_exception(exc) + self.count = 0 + self.hdr = None + return + + each = (2 if (self.hdr.flags & FLAG_DUAL) else 1) * HASH_ENC_LEN + self.count = (flen - hdr_len) // each + + def setup(self, start_idx, incl_change=False): + flags = (0x0 if not incl_change else FLAG_DUAL) + if self.count or self.hdr: + assert start_idx == self.hdr.offset + self.count, 'not an append' + assert self.hdr.flags == flags, 'mode wrong' + + # Open for append, header should be right already + self.fd = open(self.fname, 'ab') + else: + # Start new file + self.fd = open(self.fname, 'wb') + self.hdr = OwnershipFileHdr(OWNERSHIP_MAGIC, flags, start_idx) + hdr = struct.pack(OWNERSHIP_FILE_HDR, *self.hdr) + self.fd.write(hdr) + + def append(self, addr, change_addr=None): + if addr is None: + # close file, done + self.fd.close() + del self.fd + return + + self.fd.write(encode_addr(addr, self.salt)) + if change_addr: + assert self.hdr.flags & FLAG_DUAL + self.fd.write(encode_addr(change_addr)) + + def fast_search(self, addr): + # Do the easy part of the searching, using the existing file's contents. + # - generates candidate path subcomponents; might be false positive + from glob import dis + + if not self.hdr or not self.count: + return + + chk = encode_addr(addr, self.salt) + is_dual = (self.hdr.flags & FLAG_DUAL) + + idx = self.hdr.offset + with open(self.fname, 'rb') as fd: + fd.seek(struct.calcsize(OWNERSHIP_FILE_HDR)) + buf = bytearray(HASH_ENC_LEN) + while 1: + if fd.readinto(buf) != HASH_ENC_LEN: + break + if chk == buf: + yield (0, idx) + + if is_dual: + if fd.readinto(buf) != HASH_ENC_LEN: + break + if chk == buf: + yield (1, idx) + + idx += 1 + dis.progress_sofar(idx-self.hdr.offset, self.count) + + def check_match(self, want_addr, subpath): + # need to double-check matches, to get rid of false positives. + chg, idx = subpath + got = self.wallet.render_address(*subpath) + return want_addr == got + + def rebuild(self, addr): + # build more addresses + # - maybe wipe incomplete stuff from csv export hack + # - return subpath for a hit or None + from glob import dis + + bonus = 0 + match = None + + start_idx = self.count + (self.hdr.offset if self.hdr else 0) + count = MAX_ADDRS_STORED - start_idx + + self.setup(start_idx, incl_change=False) + + for idx,here,*_ in self.wallet.yield_addresses( + start_idx, count, change_idx=0, censored=False): + + if here == addr: + # Found it! But keep going a little for next time. + match = (0, idx) + + self.append(addr) + self.count += 1 + if match: + bonus += 1 + + if match and bonus > BONUS_GAP_LIMIT: + self.append(None) + return match + + dis.progress_sofar(idx-start_idx, count) + + self.append(None) + + return None class OwnershipCache: - def wallet_to_fname(self, wallet: WalletABC): - # hash up something about the wallet to form a filename - desc = wallet.to_descriptor().serialize() - h = ngu.hash.sha256d(desc) - return b2a_hex(h)[0:32] + '.own' + @classmethod + def saver(cls, wallet, start_idx): + # when we are generating many addresses for export, capture them + # as we go with this function + # - not change -- only main addrs + file = AddressCacheFile(wallet) - def register(self, wallet:WalletABC): - # notes the details of a new wallet - # - won't build anything, so still fast - fn = self.wallet_to_fname(wallet) + if file.exists(): + # don't save to existing file, has some + return None - def note_subkey(self, xfp, path, pubkey): - # whenever we see an in or out that is ours, note it here - pass + try: + file.setup(start_idx) + except Exception as exc: + # in some cases we don't want to save anything, not an error + sys.print_exception(exc) + return None + + return file.append + + @classmethod + def search(cls, addr): + # find it! + # - ignoring P2WPKH_P2SH ... for single-sig must be P2WPKH or CLASSIC + # - anything script-like needs to match an existing multisig wallet + # - if you start w/ testnet, we'll follow that + from chains import current_chain + from multisig import MultisigWallet + from public_constants import AFC_SCRIPT + from glob import dis + + ch = current_chain() + + addr_fmt = ch.possible_address_fmt(addr) + if not addr_fmt: + # might be valid address on testnet vs mainnet + nm = ch.name if ch.ctype != 'BTC' else 'Bitcoin Mainnet' + raise UnknownAddressExplained('That address is not valid on ' + nm) + + possibles = [] + + if addr_fmt & AFC_SCRIPT: + # multisig or script at least.. must exist already + for w in MultisigWallet.iter_wallets(addr_fmt=addr_fmt): + possibles.append(w) + + # TODO: add tapscript and such fancy stuff here + + if not possibles: + raise UnknownAddressExplained( + "No suitable multisig wallets are currently defined.") + else: + # construct possible single-signer wallets, always at least account=0 case + from wallet import MasterSingleSigWallet + w = MasterSingleSigWallet(addr_fmt, account_idx=0) + possibles.append(w) + + # TODO: add all account idx they have ever looked at + + prompt = 'Searching wallet(s)...' + + # "quick" check first + + # TODO: search all in parallel, rather than serially because + # more likely to find a match with low index + + count = 0 + phase2 = [] + files = [AddressCacheFile(w) for w in possibles] + for f in files: + dis.fullscreen(prompt, line2=f.wallet.name) + + for maybe in f.fast_search(addr): + ok = f.check_match(addr, maybe) + if not ok: continue + + # winner. + return f.wallet, maybe + + if f.count < MAX_ADDRS_STORED: + phase2.append(f) + count += f.count + + # maybe we haven't rendered all the addresses yet, so do that + # - very slow, but only needed once + # - might stop when match found, or maybe go a bit beyond that? + for f in phase2: + b4 = f.count + dis.fullscreen("Generating addresses...", line2=f.wallet.name) + + result = f.rebuild(addr) + if result: + # found it, so report it and stop + return f.wallet, result + + count += f.count - b4 + + raise UnknownAddressExplained('Searched %d candidates without finding a match.' % count) + # singleton OWNERSHIP = OwnershipCache() diff --git a/shared/stash.py b/shared/stash.py index 71fa0372..12cf56a4 100644 --- a/shared/stash.py +++ b/shared/stash.py @@ -98,7 +98,7 @@ class SecretStash: hd.from_chaincode_privkey(ch, pk) return 'xprv', ch+pk, hd - if marker & 0x80: + elif marker & 0x80: # seed phrase ll = ((marker & 0x3) + 2) * 8 diff --git a/shared/ux_q1.py b/shared/ux_q1.py index 4afa2b4e..8463af9b 100644 --- a/shared/ux_q1.py +++ b/shared/ux_q1.py @@ -1030,7 +1030,7 @@ async def ux_visualize_txn(bin_txn): async def ux_visualize_bip21(proto, addr, args): # Show details of BIP-21 URL # - imho, a bare address is a valid BIP-21 URL so we come here too - # - TODO: validate address ownership + # - validate address ownership on request from ux import ux_show_story msg = addr + '\n\n' @@ -1054,8 +1054,27 @@ async def ux_visualize_bip21(proto, addr, args): if args: msg += 'And values for: ' + ', '.join(args) + + msg += 'Press (1) to verify ownership.' - await ux_show_story(msg, title="Payment Address") + ch = await ux_show_story(msg, title="Payment Address", escape='1') + + if ch != '1': return + + from ownership import OWNERSHIP + from exceptions import UnknownAddressExplained + + try: + wallet, subpath = OWNERSHIP.search(addr) + + msg = addr + msg += '\n\nFound in wallet:\n ' + wallet.name + msg += '\nDerivation path:\n ' + wallet.render_path(*subpath) + await ux_show_story(msg, title="Your Own Address") + + except UnknownAddressExplained as exc: + await ux_show_story(addr + '\n\n' + str(exc), title="Unknown Address") + async def ux_visualize_wif(wif_str, kp, compressed, testnet): from ux import ux_show_story diff --git a/shared/wallet.py b/shared/wallet.py index b966881a..6dba09fc 100644 --- a/shared/wallet.py +++ b/shared/wallet.py @@ -16,9 +16,18 @@ class WalletABC: # chain def yield_addresses(self, start_idx, count, change_idx=0, censored=True): - # TODO: expected tuples? + # TODO: returns various expected tuples? pass + def render_address(self, change_idx, idx): + # make one single address + tmp = list(self.yield_addresses(idx, 1, change_idx=change_idx, censored=False)) + + assert len(tmp) == 1 + assert tmp[0][0] == idx + + return tmp[0][1] + def to_descriptor(self): pass @@ -93,6 +102,16 @@ class MasterSingleSigWallet(WalletABC): yield idx, address, path+str(idx) + def render_address(self, change_idx, idx): + # Optimized for single address. + path = self._path + '/%d/%d' % (change_idx, idx) + with SensitiveValues() as sv: + node = sv.derive_path(path) + return self.chain.address(node, self.addr_fmt) + + def render_path(self, change_idx, idx): + return self._path + '/%d/%d' % (change_idx, idx) + def to_descriptor(self): from glob import settings xfp = settings.get('xfp') diff --git a/stm32/COLDCARD_Q1/file_time.c b/stm32/COLDCARD_Q1/file_time.c index 1d14d337..de8f561f 100644 --- a/stm32/COLDCARD_Q1/file_time.c +++ b/stm32/COLDCARD_Q1/file_time.c @@ -2,12 +2,12 @@ // // AUTO-generated. // -// built: 2024-03-13 -// version: 1.0.1Q +// built: 2024-03-21 +// version: 1.0.2Q // #include // this overrides ports/stm32/fatfs_port.c uint32_t get_fattime(void) { - return 0x586d0800UL; + return 0x58750800UL; } diff --git a/testing/constants.py b/testing/constants.py index 3f0b5400..ca319d02 100644 --- a/testing/constants.py +++ b/testing/constants.py @@ -17,7 +17,8 @@ simulator_fixed_xfp = 0x4369050f simulator_serial_number = 'F1F1F1F1F1F1' -from ckcc_protocol.constants import AF_P2WSH, AF_P2SH, AF_P2WSH_P2SH, AF_CLASSIC, AF_P2WPKH, AF_P2WPKH_P2SH +from ckcc_protocol.constants import AF_P2WSH, AF_P2SH, AF_P2WSH_P2SH, AF_CLASSIC, AF_P2WPKH, AF_P2WPKH_P2SH, AF_P2TR +from ckcc_protocol.constants import AFC_WRAPPED, AFC_PUBKEY, AFC_SEGWIT, AFC_BECH32M, AFC_SCRIPT unmap_addr_fmt = { 'p2sh': AF_P2SH, diff --git a/testing/test_unit.py b/testing/test_unit.py index 8dacfdb4..e3db8385 100644 --- a/testing/test_unit.py +++ b/testing/test_unit.py @@ -295,4 +295,57 @@ def test_word_wrap(txt, x_line2, sim_exec, only_q1, width=34): assert want_words == got_words + +from constants import AF_P2WSH, AF_P2SH, AF_P2WSH_P2SH, AF_CLASSIC, AF_P2WPKH, AF_P2WPKH_P2SH + +@pytest.mark.parametrize('addr,net,fmt', [ + ( 'bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kv8f3t4', 'BTC', AF_P2WPKH ), +]) +def test_addr_detect(addr, net, fmt, sim_exec): + cmd = f'from chains import AllChains; RV.write(repr([(ch.ctype, ch.possible_address_fmt({addr!r})) for ch in AllChains]))' + print(cmd) + lst = sim_exec(cmd) + assert 'Error' not in lst + + for got_net, match in eval(lst): + if match: + assert net == got_net + assert match == fmt + else: + assert net != got_net + assert match == 0 + +''' + >>> [AF_P2WSH, AF_P2SH, AF_P2WSH_P2SH, AF_CLASSIC, AF_P2WPKH, AF_P2WPKH_P2SH] + [14, 8, 26, 1, 7, 19] +''' +@pytest.mark.parametrize('addr_fmt', [ + AF_P2WSH, AF_P2SH, AF_P2WSH_P2SH, AF_CLASSIC, AF_P2WPKH, AF_P2WPKH_P2SH +]) +@pytest.mark.parametrize('testnet', [ False, True] ) +def test_addr_fake_detect(addr_fmt, testnet, sim_exec): + from txn import fake_address + + addr = fake_address(addr_fmt, testnet) + + cmd = f'from chains import AllChains; RV.write(repr([(ch.ctype, ch.possible_address_fmt({addr!r})) for ch in AllChains]))' + lst = sim_exec(cmd) + assert 'Error' not in lst + #print(lst) + + expect_net = ('BTC' if not testnet else 'XTN') + + expect_addr_fmt = addr_fmt if addr_fmt not in { AF_P2WSH_P2SH, AF_P2WPKH_P2SH } else AF_P2SH + + for got_net, match in eval(lst): + if match: + if got_net == 'XRT': + assert expect_net == 'XTN' + else: + assert got_net == expect_net + assert match == expect_addr_fmt + else: + assert got_net != expect_net + assert match == 0 + # EOF diff --git a/testing/txn.py b/testing/txn.py index 0037b0b1..278de3fd 100644 --- a/testing/txn.py +++ b/testing/txn.py @@ -267,4 +267,32 @@ def render_address(script, testnet=True): raise ValueError('Unknown payment script', repr(script)) +def fake_address(addr_fmt, testnet=False): + # make fake addresses of any time. contents are noise + # TODO add regtest option + from constants import AFC_WRAPPED, AFC_PUBKEY, AFC_SEGWIT, AFC_BECH32M, AFC_SCRIPT + from helpers import prandom + from pycoin.encoding import b2a_hashed_base58 + from bech32 import encode as bech32_encode + + is_script = bool(addr_fmt & (AFC_SCRIPT | AFC_WRAPPED)) + body = prandom(32 if is_script else 20) + + if not testnet: + bech32_hrp = 'bc' + b58_addr = bytes([0]) + b58_script = bytes([5]) + else: + bech32_hrp = 'tb' + b58_addr = bytes([111]) + b58_script = bytes([196]) + + if (addr_fmt & AFC_SEGWIT) and not (addr_fmt & AFC_WRAPPED): + # bech32 + vers = 1 if (addr_fmt & AFC_BECH32M) == AFC_BECH32M else 0 + return bech32_encode(bech32_hrp, vers, body) + else: + # base58 + return b2a_hashed_base58((b58_script if is_script else b58_addr) + body[0:20]) + # EOF