462 lines
17 KiB
Python
462 lines
17 KiB
Python
# (c) Copyright 2024 by Coinkite Inc. This file is covered by license found in COPYING-CC.
|
|
#
|
|
# ownership.py - store a cache of hashes related to addresses we might control.
|
|
#
|
|
import os, chains, ngu, struct, version
|
|
from glob import settings
|
|
from ucollections import namedtuple
|
|
from ubinascii import hexlify as b2a_hex
|
|
from ubinascii import unhexlify as a2b_hex
|
|
from exceptions import UnknownAddressExplained
|
|
from utils import problem_file_line, show_single_address, validate_own_address
|
|
from public_constants import AFC_SCRIPT, AF_P2WPKH_P2SH, AF_P2SH, AF_P2WSH_P2SH, AF_P2TR, AF_P2WSH
|
|
|
|
# Track many addresses, but in compressed form
|
|
# - map from random Bech32/Base58 payment address to (wallet) + keypath
|
|
# - won't consider any keypath that does not end in <0;1>/*
|
|
# - store only hints, since we can re-construct any address and want to fully verify
|
|
# - try to keep private between different duress wallets, and seed vaults
|
|
# - storing bulk data into LFS, not settings
|
|
# - okay to wipe, can restore anytime; with CPU cost
|
|
# - doesn't really have a gap limit concept, but limited to first N addresses in a wallet
|
|
# - cannot be used to accelerate address explorer because we don't store full addresses
|
|
# - data stored in binary, fixed-length header, then fixed-length records
|
|
# - multisig and single sig, and someday taproot, miniscript too
|
|
# - searching leaves behind a cache for next time
|
|
# - data building/saves happens when we are searching, but might grab some during addr expl export?
|
|
# - performance: 1m40s for one P2PKH wallet (change, and external addresses: 1528 in all)
|
|
#
|
|
|
|
# length (in bytes) of each hashed & truncated address record
HASH_ENC_LEN = const(2)

# File header: parsed with struct format OWNERSHIP_FILE_HDR into an OwnershipFileHdr
OwnershipFileHdr = namedtuple('OwnershipFileHdr', 'file_magic change_idx flags')
OWNERSHIP_FILE_HDR = 'HHI'
OWNERSHIP_FILE_HDR_LEN = 8

OWNERSHIP_MAGIC = 0x10A0 # "Address Ownership" v1.0
# flags: none yet, but 32 bits reserved

# target 3 flash blocks, max file size => 764 addresses
MAX_ADDRS_STORED = const(764) # =((3*512) - OWNERSHIP_FILE_HDR_LEN) // HASH_ENC_LEN
BONUS_AFTER_MATCH = const(20) # number of addresses to still generate after match found
|
|
|
|
def encode_addr(addr, salt):
    # Reduce a text address to a short, salted fingerprint that is safe to store.
    # - hashes (salt + addr) and keeps only the leading HASH_ENC_LEN bytes
    # - collisions are expected; callers must re-derive the address to confirm a hit
    digest = ngu.hash.sha256s(salt + addr)
    return digest[:HASH_ENC_LEN]
|
|
|
|
class AddressCacheFile:
    # One on-disk cache file of truncated, salted address hashes for a single
    # wallet and a single chain (change_idx).
    # - layout: fixed-length header (OWNERSHIP_FILE_HDR), then HASH_ENC_LEN bytes per address
    # - record N of the file corresponds to address index N on that chain
    # - both the filename and the salt are taken from a hash of chain type + descriptor

    def __init__(self, wallet, change_idx):
        # wallet: object providing to_descriptor(), chain, name, render_address()
        #         and yield_addresses()
        # change_idx: stored in the file header; truthy value is labelled "(change)"
        self.wallet = wallet
        self.change_idx = change_idx
        desc = wallet.to_descriptor().serialize()
        h = b2a_hex(ngu.hash.sha256d(wallet.chain.ctype + desc))
        self.fname = h[0:32] + '-%d.own' % change_idx
        self.salt = h[32:]
        self.count = 0      # number of address records found on disk
        self.hdr = None     # OwnershipFileHdr, once a valid file has been seen/created
        self.fd = None      # open file handle; only set between setup() and close()

        self.peek()

    def nice_name(self):
        # Label for progress screens: wallet name, marked when this is the change chain.
        rv = self.wallet.name
        if self.change_idx:
            rv += ' (change)'
        return rv

    def peek(self):
        # See what we have on-disk; just reads header and measures the file.
        # - file missing/unreadable (OSError): state is left untouched
        # - short header, wrong magic or wrong chain: treated as "no cache"
        # Explicit checks (rather than assert) so a bad file is still rejected
        # when assertions are compiled out.
        try:
            with open(self.fname, 'rb') as fd:
                raw = fd.read(OWNERSHIP_FILE_HDR_LEN)
                if len(raw) != OWNERSHIP_FILE_HDR_LEN:
                    raise ValueError('short header')
                flen = fd.seek(0, 2)        # position after seeking to end = file length

            hdr = OwnershipFileHdr(*struct.unpack(OWNERSHIP_FILE_HDR, raw))
            if hdr.file_magic != OWNERSHIP_MAGIC:
                raise ValueError('bad magic')
            if hdr.change_idx != self.change_idx:
                raise ValueError('wrong chain')
        except OSError:
            return
        except Exception:
            self.count = 0
            self.hdr = None
            return

        self.hdr = hdr
        self.count = (flen - OWNERSHIP_FILE_HDR_LEN) // HASH_ENC_LEN

    def setup(self, change_idx, start_idx):
        # Open the cache file for writing; leaves self.fd ready for append().
        # - existing cache: opened for append, start_idx must equal the stored count
        # - no cache yet: file is (re)created with a fresh header, start_idx must be 0
        assert self.change_idx == change_idx

        if self.count or self.hdr:
            assert start_idx == self.count, 'not an append'

            # Open for append, header should be right already
            self.fd = open(self.fname, 'ab')
        else:
            # Start new file
            assert start_idx == 0
            self.fd = open(self.fname, 'wb')
            self.hdr = OwnershipFileHdr(OWNERSHIP_MAGIC, self.change_idx, 0x0)
            hdr = struct.pack(OWNERSHIP_FILE_HDR, *self.hdr)
            self.fd.write(hdr)

    def append(self, addr):
        # Write one more record; setup() must have been called first.
        self.fd.write(encode_addr(addr, self.salt))

    def close(self):
        # close file, done. Safe to call when nothing is open.
        if self.fd is not None:
            self.fd.close()
            self.fd = None

    def fast_search(self, addr):
        # Do the easy part of the searching, using the existing file's contents.
        # - generator: yields (change_idx, idx) candidates; might be false positive
        # - working in-memory, since complete file isn't very large, and speed
        from glob import dis

        if not self.hdr or not self.count:
            # cache empty
            return

        with open(self.fname, 'rb') as fd:
            fd.seek(OWNERSHIP_FILE_HDR_LEN)
            buf = fd.read(self.count * HASH_ENC_LEN)

        assert len(buf) == (self.count * HASH_ENC_LEN)

        chk = encode_addr(addr, self.salt)
        for idx in range(self.count):
            off = idx * HASH_ENC_LEN
            if buf[off:off + HASH_ENC_LEN] == chk:
                yield self.change_idx, idx

            dis.progress_sofar(idx, self.count)

    def check_match(self, want_addr, subpath):
        # need to double-check matches, to get rid of false positives.
        # - subpath is the (change_idx, idx) pair produced by fast_search()
        got = self.wallet.render_address(*subpath)
        return want_addr == got

    def build_and_search(self, addr):
        # build many more addresses, appending each one to the cache file
        # - return subpath (change_idx, idx) for a hit or None
        # - stops once MAX_ADDRS_STORED records exist for this file
        from glob import dis

        match = None

        start_idx = self.count
        count = MAX_ADDRS_STORED - start_idx

        if count <= 0:
            # cache already full, nothing more can be generated
            return match

        self.setup(self.change_idx, start_idx)

        bonus = None        # None until a match is seen, then counts extra addresses
        try:
            for idx, here, *_ in self.wallet.yield_addresses(start_idx, count,
                                                    change_idx=self.change_idx):
                self.append(here)
                self.count += 1

                if bonus:
                    if bonus >= BONUS_AFTER_MATCH:
                        # do (at most) 20 more - limited by 'start_idx' & 'count'
                        break
                    bonus += 1

                if here == addr:
                    # match but keep going
                    match = (self.change_idx, idx)
                    bonus = 1

                dis.progress_sofar(idx - start_idx, count)
        finally:
            # always release the file handle, even if address generation fails
            self.close()

        return match
|
|
|
|
class OwnershipCache:
    # Address ownership lookups across all candidate wallets.
    # - holds no state; every method is a classmethod
    # - per-wallet cached data lives in AddressCacheFile objects

    @classmethod
    def saver(cls, wallet, change_idx, start_idx, count):
        # when we are generating many addresses for export, capture them (if suitable)
        # as we go with this function
        # - returns None when there is nothing useful to capture
        # - otherwise returns a callback doit(addr, idx); call with addr=None when done
        if not count:
            return
        if change_idx not in (0, 1):
            return
        if start_idx >= MAX_ADDRS_STORED:
            return

        cache = AddressCacheFile(wallet, change_idx)
        current_pos = cache.count

        if start_idx > current_pos:
            # nothing to do here, we are missing some addresses in the middle
            return
        if (start_idx + count) <= current_pos:
            # we already have all these addresses
            return

        cache.setup(change_idx, current_pos)

        def doit(addr, idx):
            if addr is None:
                # end marker: flush and release the file
                cache.close()
            elif (idx < MAX_ADDRS_STORED) and idx >= current_pos:
                # only store indexes that are new to the file, and within its limit
                cache.append(addr)

        return doit

    @classmethod
    def filter(cls, addr_fmt, args):
        # Filter possible candidates!
        # - if you start w/ testnet, we'll follow that
        # - args: optional dict; key "wallet" restricts search to that named multisig wallet
        # - returns a non-empty list of wallet objects, or raises UnknownAddressExplained
        from multisig import MultisigWallet

        args = args or {}

        # user has specified specific (named) wallet
        named_wal = args.get("wallet", None)
        if named_wal:
            # quick search without deserialization
            res = list(MultisigWallet.iter_wallets(name=named_wal))
            if not res:
                raise UnknownAddressExplained("Wallet '%s' not defined." % named_wal)

            # only return desired named wallet, no other wallets are searched
            return res

        possibles = []
        if addr_fmt & AFC_SCRIPT:
            # multisig or script at least... must exist already
            afs = [addr_fmt]
            if addr_fmt == AF_P2SH:
                # might look like P2SH but actually be AF_P2WSH_P2SH
                # wrapped segwit is more used than legacy
                afs = [AF_P2WSH_P2SH, AF_P2SH]

                # Might be single-sig p2wpkh wrapped in p2sh ... but that was a transition
                # thing that hopefully is going away, so if they have any multisig wallets,
                # defined, assume that that's the only p2sh address source.
                addr_fmt = AF_P2WPKH_P2SH

            possibles.extend(MultisigWallet.iter_wallets(addr_fmts=afs))

        try:
            # Construct possible single-signer wallets, always at least account=0 case
            from wallet import MasterSingleSigWallet
            possibles.append(MasterSingleSigWallet(addr_fmt, account_idx=0))

            # add all account idx they have ever looked at, w/ this addr fmt (single sig)
            for af, acct_num in settings.get('accts', []):
                if af == addr_fmt and acct_num:
                    possibles.append(MasterSingleSigWallet(addr_fmt, account_idx=acct_num))
        except (KeyError, ValueError):
            pass        # if not single sig address format

        if not possibles:
            # can only happen w/ scripts; for single-signer we have things to check
            raise UnknownAddressExplained(
                        "No suitable multisig wallets are currently defined.")

        # list order is the search order: multisig wallets first, then single-sig
        return possibles

    @classmethod
    def search_wallet_cache(cls, addr, cf):
        # "quick" check first, before doing any generations
        # - cf is one AddressCacheFile
        # - returns wallet object, and tuple2 of final 2 subpath components
        # - returns (None, None) when the existing cache has no confirmed hit
        for maybe in cf.fast_search(addr):
            # hash hits can be false positives, so re-derive and compare
            if cf.check_match(addr, maybe):
                return cf.wallet, maybe
        return None, None

    @classmethod
    def search_build_wallet(cls, addr, cf):
        # maybe we haven't calculated all the addresses yet, so do that
        # - very slow, but only needed once; any negative (failed) search causes this
        # - could stop when match found, but we go a bit beyond that for next time
        # - we could search all in parallel, rather than serially because
        #   more likely to find a match with low index... but seen as too much memory
        result = cf.build_and_search(addr)
        if result:
            # found it, so report it and stop
            return cf.wallet, result

        # possible phase 3: other seedvault... slow, rare and not implemented
        return None, None

    @classmethod
    def search(cls, addr, args=None):
        # Find which wallet (if any) controls 'addr'.
        # - returns (from_cache, wallet, subpath) for a wallet hit
        # - returns (False, ("wif", addr_fmt), index+1) for a WIF store hit
        # - raises UnknownAddressExplained if invalid or not found
        from glob import dis

        dis.fullscreen("Wait...")

        try:
            addr, addr_fmt = validate_own_address(addr)
        except Exception as e:
            # first exception argument completes the sentence; tolerate exceptions
            # raised with no arguments, or with a non-text argument
            detail = str(e.args[0]) if e.args else ''
            raise UnknownAddressExplained('That address is not valid on ' + detail)

        matches = cls.filter(addr_fmt, args)

        # build cache files for both external & internal chain
        cachefs = []
        for w in matches:
            cachefs.append(AddressCacheFile(w, 0))
            cachefs.append(AddressCacheFile(w, 1))

        for cf in cachefs:
            msg = "Searching wallet(s)..." if dis.has_lcd else "Searching..."
            dis.fullscreen(msg, line2=cf.nice_name())
            wallet, subpath = cls.search_wallet_cache(addr, cf)
            if wallet:
                # first arg from_cache=True
                return True, wallet, subpath

        # nothing found in existing cache files
        c = 0
        for cf in cachefs:
            msg = "Generating addresses..." if dis.has_lcd else "Generating..."
            dis.fullscreen(msg, line2=cf.nice_name())
            wallet, subpath = cls.search_build_wallet(addr, cf)
            c += cf.count
            if wallet:
                # first arg from_cache=False
                return False, wallet, subpath

        # nothing found among singlesig & registered multisig wallets
        # check WIF store (single sig only)
        if addr_fmt not in [AF_P2TR, AF_P2WSH]:
            dis.fullscreen("WIF Store...")
            from wif import iter_wif_store_addresses
            target_af = AF_P2WPKH_P2SH if addr_fmt == AF_P2SH else addr_fmt
            for i, store_addr in iter_wif_store_addresses(target_af):
                if store_addr == addr:
                    return False, ("wif", target_af), i+1

        raise UnknownAddressExplained('Searched %d candidate addresses in %d wallet(s)'
                                      ' without finding a match.' % (c, len(matches)))

    @classmethod
    async def search_ux(cls, addr, args):
        # Provide a simple UX. Called functions do fullscreen, progress bar stuff.
        # - shows where the address was found, offers QR display and (for
        #   non-multisig wallet hits) message signing
        # - all failures are reported to the user as a story; nothing is raised
        from ux import ux_show_story, show_qr_code
        from charcodes import KEY_QR
        from multisig import MultisigWallet
        from public_constants import AFC_BECH32, AFC_BECH32M

        try:
            _, wallet, subpath = cls.search(addr, args)
            is_ms = isinstance(wallet, MultisigWallet)
            msg = show_single_address(addr)
            esc = ""
            sp = None       # derivation path text; stays None for WIF store hits
            if isinstance(wallet, tuple) and (wallet[0] == "wif"):
                msg += '\n\nFound in WIF store at index %d' % subpath
                addr_fmt = wallet[1]
            else:
                sp = wallet.render_path(*subpath)
                msg += '\n\nFound in wallet:\n ' + wallet.name
                msg += '\nDerivation path:\n ' + sp
                addr_fmt = wallet.addr_fmt
                if not is_ms:
                    esc = "0"
                    msg += "\n\nPress (0) to sign message with this key."

            title = "Verified"
            if version.has_qwerty:
                esc += KEY_QR
                title += " Address"
            else:
                msg += ' Press (1) for address QR.'
                esc += '1'
                title += "!"

            while 1:
                ch = await ux_show_story(msg, title=title, escape=esc, hint_icons=KEY_QR)
                if ch in ("1"+KEY_QR):
                    await show_qr_code(addr, msg=addr, is_addrs=True,
                                       is_alnum=(addr_fmt & (AFC_BECH32 | AFC_BECH32M)))
                elif sp is not None and not is_ms and (ch == "0"):  # only singlesig
                    from msgsign import sign_with_own_address
                    await sign_with_own_address(sp, addr_fmt)
                else:
                    break

        except UnknownAddressExplained as exc:
            await ux_show_story(show_single_address(addr) + '\n\n' + str(exc), title="Unknown Address")
        except Exception as e:
            await ux_show_story('Ownership search failed.\n\n%s\n%s' % (e, problem_file_line(e)))

    @classmethod
    def note_subpath_used(cls, subpath):
        # when looking at PSBT, the address format is only implied
        # - but assume BIP-44/48/etc are being respected, and map to addr_fmt
        # - subpath is integers from PSBT contents already
        # - ignore coin_type
        subpath = subpath[1:]       # ignore xfp
        if len(subpath) < 3 or subpath[0] < 0x80000000 or subpath[2] < 0x80000000:
            # weird path w/o expected hardened levels - ignore
            return

        # import only once we know the path is worth looking at
        from public_constants import AF_CLASSIC, AF_P2WPKH, AF_P2WPKH_P2SH

        top = subpath[0] & 0x7fffffff
        acct = subpath[2] & 0x7fffffff
        if top == 44:
            af = AF_CLASSIC
        elif top == 49:
            af = AF_P2WPKH_P2SH
        elif top == 84:
            af = AF_P2WPKH
        else:
            # some other purpose value; not tracked
            return

        cls.note_wallet_used(af, acct)

    @classmethod
    def note_wallet_used(cls, addr_fmt, subaccount):
        # we track single-sig wallets they seem to use
        # - if they explore it (non-zero subaccount)
        # - if they sign those paths
        # - but ignore testnet vs. not
        # Stored under settings key 'accts' as a list of [addr_fmt, subaccount] pairs.
        if subaccount == 0:
            # only interested in non-zero subaccounts
            return

        here = [addr_fmt, subaccount]

        ex = settings.get('accts', [])

        if here in ex:
            # known.
            return

        # work on a copy, then save the extended list
        ex = list(ex)
        ex.append(here)
        settings.set('accts', ex)

    @classmethod
    def wipe_all(cls):
        # clear all cached addresses. will affect other seeds in vault
        # - removes every '*.own' file in the current directory
        for fn in os.listdir():
            if fn.endswith('.own'):
                os.remove(fn)
|
|
|
|
# singleton, but also only created as needed; holds no state.
# - every method of OwnershipCache is a classmethod, so this instance is just
#   a convenient handle for callers
OWNERSHIP = OwnershipCache()
|
|
|
|
# EOF
|