addr ownership, pass first

This commit is contained in:
Peter D. Gray 2024-03-21 14:36:23 -04:00
parent 3e7a3a879b
commit 2728aa6db0
No known key found for this signature in database
GPG Key ID: A2DCD558C2BE5D7C
15 changed files with 452 additions and 32 deletions

View File

@ -181,3 +181,9 @@ We will summarize transaction outputs as "change" back into same wallet, however
- Note text is unlimited, but storing very large notes may affect performance system-wide.
- Q Backup files restored onto Mk4 will lose their notes, since notes feature is not supported.
# Address Ownership (derivation from payment address)
- only the first 1000 addresses, non-change from each "wallet" are considered
- if you have used an sub account, without ever exploring it in the Address Explorer, nor
signing a PSBT with those account numbers, we do not search it.

View File

@ -376,6 +376,8 @@ Press (3) if you really understand and accept these risks.
def generate_address_csv(path, addr_fmt, ms_wallet, account_num, n, start=0, change=0):
# Produce CSV file contents as a generator
# - maybe cache internally
from ownership import OWNERSHIP
if ms_wallet:
# For multisig, include redeem script and derivation for each signer
@ -383,23 +385,45 @@ def generate_address_csv(path, addr_fmt, ms_wallet, account_num, n, start=0, cha
'Redeem Script (%d of %d)' % (ms_wallet.M, ms_wallet.N)]
+ (['Derivation'] * ms_wallet.N)) + '"\n'
if n > 100 and change == 0:
saver = OWNERSHIP.saver(ms_wallet, start)
else:
saver = None
for (idx, addr, derivs, script) in ms_wallet.yield_addresses(start, n, change_idx=change):
if saver:
saver(addr)
ln = '%d,"%s","%s","' % (idx, addr, b2a_hex(script).decode())
ln += '","'.join(derivs)
ln += '"\n'
yield ln
if saver:
saver(None) # close
return
# build the "master" wallet based on indicated preferences
from wallet import MasterSingleSigWallet
main = MasterSingleSigWallet(addr_fmt, path, account_num)
if n > 100 and change == 0:
saver = OWNERSHIP.saver(main, start)
else:
saver = None
yield '"Index","Payment Address","Derivation"\n'
for (idx, addr, deriv) in main.yield_addresses(start, n, change_idx=change):
if saver:
saver(addr)
yield '%d,"%s","%s"\n' % (idx, addr, deriv)
if saver:
saver(None) # close
async def make_address_summary_file(path, addr_fmt, ms_wallet, account_num,
count=250, change=0, **save_opts):

View File

@ -5,7 +5,8 @@
import ngu
from uhashlib import sha256
from ubinascii import hexlify as b2a_hex
from public_constants import AF_CLASSIC, AF_P2SH, AF_P2WPKH, AF_P2WSH, AF_P2WPKH_P2SH, AF_P2WSH_P2SH
from public_constants import AF_CLASSIC, AF_P2WPKH, AF_P2TR
from public_constants import AF_P2SH, AF_P2WSH, AF_P2WPKH_P2SH, AF_P2WSH_P2SH
from public_constants import AFC_PUBKEY, AFC_SEGWIT, AFC_BECH32, AFC_SCRIPT
from serializations import hash160, ser_compact_size, disassemble
from ucollections import namedtuple
@ -256,6 +257,31 @@ class ChainsBase:
return data_hex, data_ascii
return None
@classmethod
def possible_address_fmt(cls, addr):
# Given a text (serialized) address, return what
# address format applies to the address, but
# for AF_P2SH case, could be AF_P2WPKH_P2SH, AF_P2WSH_P2SH.
if addr.startswith(cls.bech32_hrp):
if addr.startswith(cls.bech32_hrp+'1p'):
return AF_P2TR
else:
return AF_P2WPKH if len(addr) < 55 else AF_P2WSH
try:
raw = ngu.codecs.b58_decode(addr)
except ValueError:
# not base58, not an error
return 0
if raw[0] == cls.b58_addr[0]:
return AF_CLASSIC
if raw[0] == cls.b58_script[0]:
return AF_P2SH
return 0
class BitcoinMain(ChainsBase):
# see <https://github.com/bitcoin/bitcoin/blob/master/src/chainparams.cpp#L140>
ctype = 'BTC'

View File

@ -147,7 +147,7 @@ class Display:
self.text(-2, 28, 'E', font=FontTiny, invert=1)
self.text(-2, 35, 'V', font=FontTiny, invert=1)
def fullscreen(self, msg, percent=None):
def fullscreen(self, msg, percent=None, line2=None):
# show a simple message "fullscreen".
self.clear()
y = 14

View File

@ -47,4 +47,7 @@ class AbortInteraction(BaseException):
class QRDecodeExplained(ValueError):
pass
class UnknownAddressExplained(ValueError):
pass
# EOF

View File

@ -479,10 +479,13 @@ class Display:
self.dis.fill_rect(WIDTH-bw, TOP_MARGIN, bw, ACTIVE_H, COL_SCROLL_DARK)
self.dis.fill_rect(WIDTH-bw, TOP_MARGIN+pos, bw, bh, COL_TEXT)
def fullscreen(self, msg, percent=None):
def fullscreen(self, msg, percent=None, line2=None):
# show a simple message "fullscreen".
self.clear()
self.text(None, CHARS_H // 3, msg)
y = CHARS_H // 3
self.text(None, y, msg)
if line2:
self.text(None, y+2, line2)
if percent is not None:
self.progress_bar(percent)
self.show()

View File

@ -151,6 +151,15 @@ class MultisigWallet(WalletABC):
return v.upper()
return '?'
def render_path(self, change_idx, idx):
# assuming shared derivations for all cosigners. Wrongish.
derivs, _ = self.get_deriv_paths()
if len(derivs) > 1:
deriv = '(various)'
else:
deriv = derivs[0]
return deriv + '/%d/%d' % (change_idx, idx)
@property
def chain(self):
return chains.get_chain(self.chain_type)

View File

@ -2,14 +2,14 @@
#
# ownership.py - store a cache of hashes related to addresses we might control.
#
import gc, chains, stash, ngu
import sys, chains, stash, ngu, struct
from uhashlib import sha256
from ustruct import pack, unpack
from ubinascii import b2a_base64, a2b_base64
from glob import settings
from ucollections import namedtuple
from wallet import WalletABC
from ubinascii import hexlify as b2a_hex
from exceptions import UnknownAddressExplained
# Track many addresses, but in compressed form
# - map from random Bech32/Base58 payment address to (wallet)/keypath
@ -28,32 +28,261 @@ from ubinascii import hexlify as b2a_hex
# - data building/saves happens when are searching, but might grab some during addr expl export?
#
REL_GAP_LIMIT = const(1000)
MAX_ADDRS_STORED = const(500)
BONUS_GAP_LIMIT = const(20)
OwnershipFileHdr = namedtuple('OwnershipFileHdr', 'file_magic offset')
OWNERSHIP_FILE_HDR = 'II'
FILE_HDR_LEN = const(8)
OWNERSHIP_MAGIC = 0xA010 # "Address Ownership" v1.0
OwnershipFileHdr = namedtuple('OwnershipFileHdr', 'file_magic flags offset')
OWNERSHIP_FILE_HDR = 'III'
# length of hashed & truncated address record
HASH_ENC_LEN = const(8)
HASH_ENC_LEN = const(2)
# We may store only the 0/0 ..0/n paths, or alternating 0/0, 1/0, 0/1,
FLAG_DUAL = 0x01
OWNERSHIP_MAGIC = 0xA010 # "Address Ownership" v1.0
def encode_addr(addr, salt):
# Convert text address to something we can store while preserving privacy.
return ngu.hash.sha256s(salt + addr)[0:HASH_ENC_LEN]
class AddressCacheFile:
def __init__(self, wallet):
self.wallet = wallet
self.desc = wallet.to_descriptor().serialize()
h = b2a_hex(ngu.hash.sha256d(self.desc))
self.fname = h[0:32] + '.own'
self.salt = h[32:]
self.count = 0
self.hdr = None
self.peek()
def exists(self):
return bool(self.count)
def peek(self):
# see what we have on-disk; just reads header.
hdr_len = struct.calcsize(OWNERSHIP_FILE_HDR)
try:
with open(self.fname, 'rb') as fd:
hdr = fd.read(hdr_len)
assert len(hdr) == hdr_len
flen = fd.seek(0, 2)
self.hdr = OwnershipFileHdr(*struct.unpack(OWNERSHIP_FILE_HDR, hdr))
assert self.hdr.file_magic == OWNERSHIP_MAGIC
except OSError:
return
except Exception as exc:
sys.print_exception(exc)
self.count = 0
self.hdr = None
return
each = (2 if (self.hdr.flags & FLAG_DUAL) else 1) * HASH_ENC_LEN
self.count = (flen - hdr_len) // each
def setup(self, start_idx, incl_change=False):
flags = (0x0 if not incl_change else FLAG_DUAL)
if self.count or self.hdr:
assert start_idx == self.hdr.offset + self.count, 'not an append'
assert self.hdr.flags == flags, 'mode wrong'
# Open for append, header should be right already
self.fd = open(self.fname, 'ab')
else:
# Start new file
self.fd = open(self.fname, 'wb')
self.hdr = OwnershipFileHdr(OWNERSHIP_MAGIC, flags, start_idx)
hdr = struct.pack(OWNERSHIP_FILE_HDR, *self.hdr)
self.fd.write(hdr)
def append(self, addr, change_addr=None):
if addr is None:
# close file, done
self.fd.close()
del self.fd
return
self.fd.write(encode_addr(addr, self.salt))
if change_addr:
assert self.hdr.flags & FLAG_DUAL
self.fd.write(encode_addr(change_addr))
def fast_search(self, addr):
# Do the easy part of the searching, using the existing file's contents.
# - generates candidate path subcomponents; might be false positive
from glob import dis
if not self.hdr or not self.count:
return
chk = encode_addr(addr, self.salt)
is_dual = (self.hdr.flags & FLAG_DUAL)
idx = self.hdr.offset
with open(self.fname, 'rb') as fd:
fd.seek(struct.calcsize(OWNERSHIP_FILE_HDR))
buf = bytearray(HASH_ENC_LEN)
while 1:
if fd.readinto(buf) != HASH_ENC_LEN:
break
if chk == buf:
yield (0, idx)
if is_dual:
if fd.readinto(buf) != HASH_ENC_LEN:
break
if chk == buf:
yield (1, idx)
idx += 1
dis.progress_sofar(idx-self.hdr.offset, self.count)
def check_match(self, want_addr, subpath):
# need to double-check matches, to get rid of false positives.
chg, idx = subpath
got = self.wallet.render_address(*subpath)
return want_addr == got
def rebuild(self, addr):
# build more addresses
# - maybe wipe incomplete stuff from csv export hack
# - return subpath for a hit or None
from glob import dis
bonus = 0
match = None
start_idx = self.count + (self.hdr.offset if self.hdr else 0)
count = MAX_ADDRS_STORED - start_idx
self.setup(start_idx, incl_change=False)
for idx,here,*_ in self.wallet.yield_addresses(
start_idx, count, change_idx=0, censored=False):
if here == addr:
# Found it! But keep going a little for next time.
match = (0, idx)
self.append(addr)
self.count += 1
if match:
bonus += 1
if match and bonus > BONUS_GAP_LIMIT:
self.append(None)
return match
dis.progress_sofar(idx-start_idx, count)
self.append(None)
return None
class OwnershipCache:
def wallet_to_fname(self, wallet: WalletABC):
# hash up something about the wallet to form a filename
desc = wallet.to_descriptor().serialize()
h = ngu.hash.sha256d(desc)
return b2a_hex(h)[0:32] + '.own'
@classmethod
def saver(cls, wallet, start_idx):
# when we are generating many addresses for export, capture them
# as we go with this function
# - not change -- only main addrs
file = AddressCacheFile(wallet)
def register(self, wallet:WalletABC):
# notes the details of a new wallet
# - won't build anything, so still fast
fn = self.wallet_to_fname(wallet)
if file.exists():
# don't save to existing file, has some
return None
def note_subkey(self, xfp, path, pubkey):
# whenever we see an in or out that is ours, note it here
pass
try:
file.setup(start_idx)
except Exception as exc:
# in some cases we don't want to save anything, not an error
sys.print_exception(exc)
return None
return file.append
@classmethod
def search(cls, addr):
# find it!
# - ignoring P2WPKH_P2SH ... for single-sig must be P2WPKH or CLASSIC
# - anything script-like needs to match an existing multisig wallet
# - if you start w/ testnet, we'll follow that
from chains import current_chain
from multisig import MultisigWallet
from public_constants import AFC_SCRIPT
from glob import dis
ch = current_chain()
addr_fmt = ch.possible_address_fmt(addr)
if not addr_fmt:
# might be valid address on testnet vs mainnet
nm = ch.name if ch.ctype != 'BTC' else 'Bitcoin Mainnet'
raise UnknownAddressExplained('That address is not valid on ' + nm)
possibles = []
if addr_fmt & AFC_SCRIPT:
# multisig or script at least.. must exist already
for w in MultisigWallet.iter_wallets(addr_fmt=addr_fmt):
possibles.append(w)
# TODO: add tapscript and such fancy stuff here
if not possibles:
raise UnknownAddressExplained(
"No suitable multisig wallets are currently defined.")
else:
# construct possible single-signer wallets, always at least account=0 case
from wallet import MasterSingleSigWallet
w = MasterSingleSigWallet(addr_fmt, account_idx=0)
possibles.append(w)
# TODO: add all account idx they have ever looked at
prompt = 'Searching wallet(s)...'
# "quick" check first
# TODO: search all in parallel, rather than serially because
# more likely to find a match with low index
count = 0
phase2 = []
files = [AddressCacheFile(w) for w in possibles]
for f in files:
dis.fullscreen(prompt, line2=f.wallet.name)
for maybe in f.fast_search(addr):
ok = f.check_match(addr, maybe)
if not ok: continue
# winner.
return f.wallet, maybe
if f.count < MAX_ADDRS_STORED:
phase2.append(f)
count += f.count
# maybe we haven't rendered all the addresses yet, so do that
# - very slow, but only needed once
# - might stop when match found, or maybe go a bit beyond that?
for f in phase2:
b4 = f.count
dis.fullscreen("Generating addresses...", line2=f.wallet.name)
result = f.rebuild(addr)
if result:
# found it, so report it and stop
return f.wallet, result
count += f.count - b4
raise UnknownAddressExplained('Searched %d candidates without finding a match.' % count)
# singleton
OWNERSHIP = OwnershipCache()

View File

@ -98,7 +98,7 @@ class SecretStash:
hd.from_chaincode_privkey(ch, pk)
return 'xprv', ch+pk, hd
if marker & 0x80:
elif marker & 0x80:
# seed phrase
ll = ((marker & 0x3) + 2) * 8

View File

@ -1030,7 +1030,7 @@ async def ux_visualize_txn(bin_txn):
async def ux_visualize_bip21(proto, addr, args):
# Show details of BIP-21 URL
# - imho, a bare address is a valid BIP-21 URL so we come here too
# - TODO: validate address ownership
# - validate address ownership on request
from ux import ux_show_story
msg = addr + '\n\n'
@ -1054,8 +1054,27 @@ async def ux_visualize_bip21(proto, addr, args):
if args:
msg += 'And values for: ' + ', '.join(args)
msg += 'Press (1) to verify ownership.'
await ux_show_story(msg, title="Payment Address")
ch = await ux_show_story(msg, title="Payment Address", escape='1')
if ch != '1': return
from ownership import OWNERSHIP
from exceptions import UnknownAddressExplained
try:
wallet, subpath = OWNERSHIP.search(addr)
msg = addr
msg += '\n\nFound in wallet:\n ' + wallet.name
msg += '\nDerivation path:\n ' + wallet.render_path(*subpath)
await ux_show_story(msg, title="Your Own Address")
except UnknownAddressExplained as exc:
await ux_show_story(addr + '\n\n' + str(exc), title="Unknown Address")
async def ux_visualize_wif(wif_str, kp, compressed, testnet):
from ux import ux_show_story

View File

@ -16,9 +16,18 @@ class WalletABC:
# chain
def yield_addresses(self, start_idx, count, change_idx=0, censored=True):
# TODO: expected tuples?
# TODO: returns various expected tuples?
pass
def render_address(self, change_idx, idx):
# make one single address
tmp = list(self.yield_addresses(idx, 1, change_idx=change_idx, censored=False))
assert len(tmp) == 1
assert tmp[0][0] == idx
return tmp[0][1]
def to_descriptor(self):
pass
@ -93,6 +102,16 @@ class MasterSingleSigWallet(WalletABC):
yield idx, address, path+str(idx)
def render_address(self, change_idx, idx):
# Optimized for single address.
path = self._path + '/%d/%d' % (change_idx, idx)
with SensitiveValues() as sv:
node = sv.derive_path(path)
return self.chain.address(node, self.addr_fmt)
def render_path(self, change_idx, idx):
return self._path + '/%d/%d' % (change_idx, idx)
def to_descriptor(self):
from glob import settings
xfp = settings.get('xfp')

View File

@ -2,12 +2,12 @@
//
// AUTO-generated.
//
// built: 2024-03-13
// version: 1.0.1Q
// built: 2024-03-21
// version: 1.0.2Q
//
#include <stdint.h>
// this overrides ports/stm32/fatfs_port.c
uint32_t get_fattime(void) {
return 0x586d0800UL;
return 0x58750800UL;
}

View File

@ -17,7 +17,8 @@ simulator_fixed_xfp = 0x4369050f
simulator_serial_number = 'F1F1F1F1F1F1'
from ckcc_protocol.constants import AF_P2WSH, AF_P2SH, AF_P2WSH_P2SH, AF_CLASSIC, AF_P2WPKH, AF_P2WPKH_P2SH
from ckcc_protocol.constants import AF_P2WSH, AF_P2SH, AF_P2WSH_P2SH, AF_CLASSIC, AF_P2WPKH, AF_P2WPKH_P2SH, AF_P2TR
from ckcc_protocol.constants import AFC_WRAPPED, AFC_PUBKEY, AFC_SEGWIT, AFC_BECH32M, AFC_SCRIPT
unmap_addr_fmt = {
'p2sh': AF_P2SH,

View File

@ -295,4 +295,57 @@ def test_word_wrap(txt, x_line2, sim_exec, only_q1, width=34):
assert want_words == got_words
from constants import AF_P2WSH, AF_P2SH, AF_P2WSH_P2SH, AF_CLASSIC, AF_P2WPKH, AF_P2WPKH_P2SH
@pytest.mark.parametrize('addr,net,fmt', [
( 'bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kv8f3t4', 'BTC', AF_P2WPKH ),
])
def test_addr_detect(addr, net, fmt, sim_exec):
cmd = f'from chains import AllChains; RV.write(repr([(ch.ctype, ch.possible_address_fmt({addr!r})) for ch in AllChains]))'
print(cmd)
lst = sim_exec(cmd)
assert 'Error' not in lst
for got_net, match in eval(lst):
if match:
assert net == got_net
assert match == fmt
else:
assert net != got_net
assert match == 0
'''
>>> [AF_P2WSH, AF_P2SH, AF_P2WSH_P2SH, AF_CLASSIC, AF_P2WPKH, AF_P2WPKH_P2SH]
[14, 8, 26, 1, 7, 19]
'''
@pytest.mark.parametrize('addr_fmt', [
AF_P2WSH, AF_P2SH, AF_P2WSH_P2SH, AF_CLASSIC, AF_P2WPKH, AF_P2WPKH_P2SH
])
@pytest.mark.parametrize('testnet', [ False, True] )
def test_addr_fake_detect(addr_fmt, testnet, sim_exec):
from txn import fake_address
addr = fake_address(addr_fmt, testnet)
cmd = f'from chains import AllChains; RV.write(repr([(ch.ctype, ch.possible_address_fmt({addr!r})) for ch in AllChains]))'
lst = sim_exec(cmd)
assert 'Error' not in lst
#print(lst)
expect_net = ('BTC' if not testnet else 'XTN')
expect_addr_fmt = addr_fmt if addr_fmt not in { AF_P2WSH_P2SH, AF_P2WPKH_P2SH } else AF_P2SH
for got_net, match in eval(lst):
if match:
if got_net == 'XRT':
assert expect_net == 'XTN'
else:
assert got_net == expect_net
assert match == expect_addr_fmt
else:
assert got_net != expect_net
assert match == 0
# EOF

View File

@ -267,4 +267,32 @@ def render_address(script, testnet=True):
raise ValueError('Unknown payment script', repr(script))
def fake_address(addr_fmt, testnet=False):
# make fake addresses of any time. contents are noise
# TODO add regtest option
from constants import AFC_WRAPPED, AFC_PUBKEY, AFC_SEGWIT, AFC_BECH32M, AFC_SCRIPT
from helpers import prandom
from pycoin.encoding import b2a_hashed_base58
from bech32 import encode as bech32_encode
is_script = bool(addr_fmt & (AFC_SCRIPT | AFC_WRAPPED))
body = prandom(32 if is_script else 20)
if not testnet:
bech32_hrp = 'bc'
b58_addr = bytes([0])
b58_script = bytes([5])
else:
bech32_hrp = 'tb'
b58_addr = bytes([111])
b58_script = bytes([196])
if (addr_fmt & AFC_SEGWIT) and not (addr_fmt & AFC_WRAPPED):
# bech32
vers = 1 if (addr_fmt & AFC_BECH32M) == AFC_BECH32M else 0
return bech32_encode(bech32_hrp, vers, body)
else:
# base58
return b2a_hashed_base58((b58_script if is_script else b58_addr) + body[0:20])
# EOF