addr ownership

This commit is contained in:
Peter D. Gray 2024-03-21 16:33:54 -04:00
parent df8901ca3a
commit 6174b97b99
No known key found for this signature in database
GPG Key ID: A2DCD558C2BE5D7C
2 changed files with 62 additions and 70 deletions

View File

@ -28,18 +28,20 @@ from exceptions import UnknownAddressExplained
# - data building/saves happens when are searching, but might grab some during addr expl export?
#
MAX_ADDRS_STORED = const(500)
BONUS_GAP_LIMIT = const(20)
OwnershipFileHdr = namedtuple('OwnershipFileHdr', 'file_magic flags offset')
OWNERSHIP_FILE_HDR = 'III'
# length of hashed & truncated address record
HASH_ENC_LEN = const(2)
# We may store only the 0/0 ..0/n paths, or alternating 0/0, 1/0, 0/1,
FLAG_DUAL = 0x01
OWNERSHIP_MAGIC = 0xA010 # "Address Ownership" v1.0
# File header
OwnershipFileHdr = namedtuple('OwnershipFileHdr', 'file_magic future flags')
OWNERSHIP_FILE_HDR = 'HHI'
OWNERSHIP_FILE_HDR_LEN = 8
OWNERSHIP_MAGIC = 0x10A0 # "Address Ownership" v1.0
# flags: none yet, but 32 bits reserved
# target 3 flash blocks, max file size => 764 addresses
MAX_ADDRS_STORED = ((3*512) - OWNERSHIP_FILE_HDR_LEN) // HASH_ENC_LEN
BONUS_GAP_LIMIT = const(20)
def encode_addr(addr, salt):
# Convert text address to something we can store while preserving privacy.
@ -63,12 +65,10 @@ class AddressCacheFile:
def peek(self):
# see what we have on-disk; just reads header.
hdr_len = struct.calcsize(OWNERSHIP_FILE_HDR)
try:
with open(self.fname, 'rb') as fd:
hdr = fd.read(hdr_len)
assert len(hdr) == hdr_len
hdr = fd.read(OWNERSHIP_FILE_HDR_LEN)
assert len(hdr) == OWNERSHIP_FILE_HDR_LEN
flen = fd.seek(0, 2)
self.hdr = OwnershipFileHdr(*struct.unpack(OWNERSHIP_FILE_HDR, hdr))
assert self.hdr.file_magic == OWNERSHIP_MAGIC
@ -80,25 +80,22 @@ class AddressCacheFile:
self.hdr = None
return
each = (2 if (self.hdr.flags & FLAG_DUAL) else 1) * HASH_ENC_LEN
self.count = (flen - hdr_len) // each
self.count = (flen - OWNERSHIP_FILE_HDR_LEN) // HASH_ENC_LEN
def setup(self, start_idx, incl_change=False):
flags = (0x0 if not incl_change else FLAG_DUAL)
def setup(self, start_idx):
if self.count or self.hdr:
assert start_idx == self.hdr.offset + self.count, 'not an append'
assert self.hdr.flags == flags, 'mode wrong'
assert start_idx == self.count, 'not an append'
# Open for append, header should be right already
self.fd = open(self.fname, 'ab')
else:
# Start new file
self.fd = open(self.fname, 'wb')
self.hdr = OwnershipFileHdr(OWNERSHIP_MAGIC, flags, start_idx)
self.hdr = OwnershipFileHdr(OWNERSHIP_MAGIC, 0x0, 0x0)
hdr = struct.pack(OWNERSHIP_FILE_HDR, *self.hdr)
self.fd.write(hdr)
def append(self, addr, change_addr=None):
def append(self, addr):
if addr is None:
# close file, done
self.fd.close()
@ -106,39 +103,28 @@ class AddressCacheFile:
return
self.fd.write(encode_addr(addr, self.salt))
if change_addr:
assert self.hdr.flags & FLAG_DUAL
self.fd.write(encode_addr(change_addr))
def fast_search(self, addr):
# Do the easy part of the searching, using the existing file's contents.
# - generates candidate path subcomponents; might be false positive
# - working in-memory, since complete file isn't very large, and speed
from glob import dis
if not self.hdr or not self.count:
return
chk = encode_addr(addr, self.salt)
is_dual = (self.hdr.flags & FLAG_DUAL)
idx = self.hdr.offset
with open(self.fname, 'rb') as fd:
fd.seek(struct.calcsize(OWNERSHIP_FILE_HDR))
buf = bytearray(HASH_ENC_LEN)
while 1:
if fd.readinto(buf) != HASH_ENC_LEN:
break
if chk == buf:
yield (0, idx)
fd.seek(OWNERSHIP_FILE_HDR_LEN)
buf = fd.read(self.count * HASH_ENC_LEN)
if is_dual:
if fd.readinto(buf) != HASH_ENC_LEN:
break
if chk == buf:
yield (1, idx)
assert len(buf) == (self.count * HASH_ENC_LEN)
idx += 1
dis.progress_sofar(idx-self.hdr.offset, self.count)
chk = encode_addr(addr, self.salt)
for idx in range(self.count):
if buf[idx*HASH_ENC_LEN : (idx*HASH_ENC_LEN)+HASH_ENC_LEN] == chk:
yield (0, idx)
dis.progress_sofar(idx, self.count)
def check_match(self, want_addr, subpath):
# need to double-check matches, to get rid of false positives.
@ -155,10 +141,13 @@ class AddressCacheFile:
bonus = 0
match = None
start_idx = self.count + (self.hdr.offset if self.hdr else 0)
start_idx = self.count
count = MAX_ADDRS_STORED - start_idx
self.setup(start_idx, incl_change=False)
if count <= 0:
return None
self.setup(start_idx)
for idx,here,*_ in self.wallet.yield_addresses(
start_idx, count, change_idx=0, censored=False):
@ -167,12 +156,12 @@ class AddressCacheFile:
# Found it! But keep going a little for next time.
match = (0, idx)
self.append(addr)
self.append(here)
self.count += 1
if match:
bonus += 1
if match and bonus > BONUS_GAP_LIMIT:
if match and bonus >= BONUS_GAP_LIMIT:
self.append(None)
return match
@ -192,27 +181,25 @@ class OwnershipCache:
file = AddressCacheFile(wallet)
if file.exists():
# don't save to existing file, has some
# don't save to existing file, has some already
return None
try:
file.setup(start_idx)
except Exception as exc:
# in some cases we don't want to save anything, not an error
sys.print_exception(exc)
return None
return file.append
@classmethod
def search(cls, addr):
# find it!
# - ignoring P2WPKH_P2SH ... for single-sig must be P2WPKH or CLASSIC
# - anything script-like needs to match an existing multisig wallet
# Find it!
# - returns wallet object, and tuple2 of final 2 subpath components
# - if you start w/ testnet, we'll follow that
from chains import current_chain
from multisig import MultisigWallet
from public_constants import AFC_SCRIPT
from public_constants import AFC_SCRIPT, AF_P2WPKH_P2SH, AF_P2SH
from glob import dis
ch = current_chain()
@ -225,7 +212,13 @@ class OwnershipCache:
possibles = []
if addr_fmt & AFC_SCRIPT:
if addr_fmt == AF_P2SH and not MultisigWallet.exists():
# Might be single-sig p2wpkh wrapped in p2sh ... but that was a transition
# thing that hopefully is going away, so if they have any multisig wallets,
# defined, assume that that's the only p2sh address source.
addr_fmt = AF_P2WPKH_P2SH
if addr_fmt & AFC_SCRIPT and MultisigWallet.exists():
# multisig or script at least.. must exist already
for w in MultisigWallet.iter_wallets(addr_fmt=addr_fmt):
possibles.append(w)
@ -236,31 +229,26 @@ class OwnershipCache:
raise UnknownAddressExplained(
"No suitable multisig wallets are currently defined.")
else:
# construct possible single-signer wallets, always at least account=0 case
# Construct possible single-signer wallets, always at least account=0 case
from wallet import MasterSingleSigWallet
w = MasterSingleSigWallet(addr_fmt, account_idx=0)
possibles.append(w)
# TODO: add all account idx they have ever looked at
prompt = 'Searching wallet(s)...'
# "quick" check first
# TODO: search all in parallel, rather than serially because
# more likely to find a match with low index
# "quick" check first, before doing any generations
count = 0
phase2 = []
files = [AddressCacheFile(w) for w in possibles]
for f in files:
dis.fullscreen(prompt, line2=f.wallet.name)
dis.fullscreen('Searching wallet(s)...', line2=f.wallet.name)
for maybe in f.fast_search(addr):
ok = f.check_match(addr, maybe)
if not ok: continue
# winner.
# found winner.
return f.wallet, maybe
if f.count < MAX_ADDRS_STORED:
@ -270,6 +258,9 @@ class OwnershipCache:
# maybe we haven't rendered all the addresses yet, so do that
# - very slow, but only needed once
# - might stop when match found, or maybe go a bit beyond that?
# - MAYBE NOT: search all in parallel, rather than serially because
# more likely to find a match with low index
for f in phase2:
b4 = f.count
dis.fullscreen("Generating addresses...", line2=f.wallet.name)
@ -282,10 +273,8 @@ class OwnershipCache:
count += f.count - b4
raise UnknownAddressExplained('Searched %d candidates without finding a match.' % count)
# singleton
# singleton, but also only created as needed; holds no state.
OWNERSHIP = OwnershipCache()
# EOF

View File

@ -39,10 +39,10 @@ class MasterSingleSigWallet(WalletABC):
# - path can be overriden when we come here via address explorer
if addr_fmt == AF_P2WPKH:
n = 'Segwit'
n = 'Segwit P2WPKH'
prefix = path or 'm/84h/{coin_type}h/{account}h'
elif addr_fmt == AF_CLASSIC:
n = 'Classic'
n = 'Classic P2PKH'
prefix = path or 'm/44h/{coin_type}h/{account}h'
elif addr_fmt == AF_P2WPKH_P2SH:
n = 'P2WPKH-in-P2SH'
@ -55,8 +55,10 @@ class MasterSingleSigWallet(WalletABC):
else:
self.chain = chains.current_chain()
if self.chain.ctype != 'BTC':
n += ' ' + self.chain.menu_name
if self.chain.ctype == 'XTN':
n += ' (TestNet)'
if self.chain.ctype == 'XRT':
n += ' (RegTest)'
self.name = n
self.addr_fmt = addr_fmt
@ -103,13 +105,14 @@ class MasterSingleSigWallet(WalletABC):
yield idx, address, path+str(idx)
def render_address(self, change_idx, idx):
# Optimized for single address.
# Optimized for a single address.
path = self._path + '/%d/%d' % (change_idx, idx)
with SensitiveValues() as sv:
node = sv.derive_path(path)
return self.chain.address(node, self.addr_fmt)
def render_path(self, change_idx, idx):
# show the derivation path for an address
return self._path + '/%d/%d' % (change_idx, idx)
def to_descriptor(self):