Use just a descriptor to find funds and create PSBT to move them

This commit is contained in:
Peter D. Gray 2020-09-11 17:33:41 -04:00
parent 5c73833773
commit 918e8b9534
No known key found for this signature in database
GPG Key ID: F0E6CC6AFC16CF7B
4 changed files with 215 additions and 100 deletions

View File

@ -8,10 +8,9 @@ from binascii import b2a_hex as _b2a_hex
from binascii import a2b_hex as _a2b_hex
from collections import namedtuple
from base64 import b64encode
from pycoin.tx.Tx import Tx
#from pycoin.tx.TxOut import TxOut
from pycoin.coins.bitcoin.Tx import Tx
from pycoin.satoshi.checksigops import parse_signature_blob
#from pycoin.encoding import b2a_hashed_base58, a2b_hashed_base58
from pycoin.tx.script.check_signature import parse_signature_blob
from binascii import b2a_hex, a2b_hex
from base64 import b64decode

View File

@ -66,6 +66,13 @@ def xfp2str(xfp):
# and not really an integer. Used to show as '0x%08x' but that's wrong endian.
return b2a_hex(struct.pack('>I', xfp)).upper()
def str2xfp(xfp):
# Standardized way to show an xpub's fingerprint... it's a 4-byte string
# and not really an integer. Used to show as '0x%08x' but that's wrong endian.
return struct.unpack('>I', a2b_hex(xfp))[0]
assert str2xfp(xfp2str(0x1234)) == 0x1234
def str2path(xfp, s):
# output binary needed for BIP-174
p = list(str2ipath(s))
@ -92,20 +99,190 @@ def calc_pubkey(xpubs, path):
node = node.subkey_for_path(sk)
return node.sec()
@click.command()
@click.argument('public_txt', type=click.File('rt'))
@click.argument('payout_address', type=str)
@click.argument('out_psbt', type=click.File('wb'))
@click.option('--testnet', '-t', help="Assume testnet3 addresses", is_flag=True, default=False)
@click.option('--xfp', '--fingerprint', help="Provide XFP value, otherwise discovered from file", default=None)
@click.option('--gap', help="Widen search by searching /[0/1]/0...gap", default=None, type=int)
@click.option('--xpub', 'single_xpub', help="Limit work to single xpub", default=None)
def recovery(public_txt, payout_address, out_psbt, testnet, xfp=None, gap=None, single_xpub=None):
def build_psbt(ctx, xfp, addrs, pubkey=None, xpubs=None):
locals().update(ctx.obj)
payout_address = ctx.obj['payout_address']
out_psbt = ctx.obj['output_psbt']
if pubkey:
assert len(addrs) == 1 # can only be single addr in that case
assert len(pubkey) == 33
spending = []
total = 0
psbt = BasicPSBT()
for path, addr in addrs:
print(f"addr: {path} => {addr} ... ", end='')
rr = explora('address', addr, 'utxo')
if not rr:
print('nada')
continue
here = 0
for u in rr:
here += u['value']
tt = TxIn(h2b_rev(u['txid']), u['vout'])
spending.append(tt)
#print(rr)
pin = BasicPSBTInput(idx=len(psbt.inputs))
psbt.inputs.append(pin)
pubkey = pubkey or calc_pubkey(xpubs, path)
pin.bip32_paths[pubkey] = str2path(xfp, path)
# fetch the UTXO for witness signging
td = explora('tx', u['txid'], 'hex', is_json=False)
outpt = Tx.from_hex(td.decode('ascii')).txs_out[u['vout']]
with BytesIO() as b:
outpt.stream(b)
pin.witness_utxo = b.getvalue()
print('%.8f BTC' % (here / 1E8))
total += here
if len(spending) > 15:
print("Reached practical limit on # of inputs. "
"You'll need to repeat this process again later.")
break
assert total, "Sorry! Didn't find any UTXO"
print("Found total: %.8f BTC" % (total / 1E8))
if payout_address:
print("Planning to send to: %s" % payout_address)
dest_scr = BTC.contract.for_address(payout_address)
txn = Tx(2,spending,[TxOut(total, dest_scr)])
else:
print("Output section of PSBT will be empty. Change downstream")
txn = Tx(2,spending,[])
fee = tx_fee.recommended_fee_for_tx(txn)
# placeholder, single output that isn't change
pout = BasicPSBTOutput(idx=0)
psbt.outputs.append(pout)
print("Guestimate fee: %.8f BTC" % (fee / 1E8))
if txn.txs_out:
txn.txs_out[0].coin_value -= fee
# write txn into PSBT
with BytesIO() as b:
txn.stream(b)
psbt.txn = b.getvalue()
out_psbt.write(psbt.as_bytes())
print("PSBT to be signed:\n\n\t" + out_psbt.name, end='\n\n')
@click.group()
@click.option('-p', '--payout_address', type=str, default=None, metavar="1bitcoinaddr")
@click.option('-o', '--output_psbt', type=click.File('wb'), default="out.psbt")
@click.option('-t', '--testnet', help="Assume testnet3 addresses", is_flag=True, default=False)
@click.pass_context
def cli(ctx, payout_address, output_psbt, testnet):
ctx.ensure_object(dict)
ctx.obj['payout_address'] = payout_address
ctx.obj['output_psbt'] = output_psbt
global TESTNET
TESTNET = testnet
@cli.command('desc')
@click.argument('descriptor', type=str, metavar='FULL-DESCRIPTOR')
@click.argument('address', type=str, metavar='Address')
@click.option('--xfp', '--fingerprint', help="Provide XFP value, otherwise some checks will be skipped", default=None)
@click.option('--xpub', help="Optional XPUB at hardened depth", default=None)
@click.option('--depth', help="Depth of xpub given", type=int, default=None)
@click.pass_context
def descriptor(ctx, descriptor, address, xfp, xpub, depth):
locals().update(ctx.obj)
if xpub and not depth:
print("need depth if xpub given")
sys.exit(1)
# XXX could not find quick python lib to read miniscript
# - not checking checksum TODO
m = re.match(r"(.*)\(\[([a-f0-9/']*)\]([a-f0-9]{66})", descriptor)
if not m:
print("descriptor fail")
return
# ex = "sh(wpkh([e0000002/84'/0'/0'/0/9]022c...43434))#v90hljj9"
mode = m.group(1) # sh(wpkh
mode = mode.replace('(', '/').replace(')', '').upper()
deriv = m.group(2) # e0000002/84'/0'/0'/0/9
expect_pubkey = m.group(3) # 022c...34
parts = deriv.split('/')
if xfp:
assert parts[0].lower() == xfp.lower(), f'wrong xfp? got={parts[0]} expected={xfp}'
else:
# expect 8 hex digits
xfp = parts[0]
assert len(xfp) == 8
xfp = str2xfp(xfp)
path = '/'.join(parts[1:])
addr_fmt = None
if xpub:
wallet = BTC.parse.bip32(xpub)
sub = '/'.join(parts[1+depth:])
ph = '/'.join(["_'"] * depth)
print(f"Assuming: m/{ph}/{sub} is path")
node = wallet.subkey_for_path(sub)
pubkey = node.sec()
assert b2a_hex(pubkey) == expect_pubkey
fails = []
for pc_name, guess_addr, *_ in BTC.output_for_public_pair(node.public_pair()):
if guess_addr == address:
addr_fmt = pc_name
print(f"Address Format: {addr_fmt} vs {mode} must be right")
break
fails.append(guess_addr)
else:
print("Can't confirm address based on xpub + path")
print("tried: " + ' '.join(fails))
print(f"none match: {address}")
sys.exit(1)
else:
pubkey = a2b_hex(expect_pubkey)
addrs = [ (path, address) ]
build_psbt(ctx, xfp, addrs, pubkey=pubkey)
@cli.command('public')
@click.argument('public_txt', type=click.File('rt'))
@click.option('--xfp', '--fingerprint', help="Provide XFP value, otherwise discovered from file", default=None)
@click.option('--gap', help="Widen search by searching /[0/1]/0...gap", default=None, type=int)
@click.option('--xpub', 'single_xpub', help="Limit work to single xpub", default=None)
@click.option('--dump_addrs', help="Dump addrs and paths we will check (and stop)", default=None)
@click.pass_context
def recovery(ctx, public_txt, xfp=None, gap=None, single_xpub=None, dump_addrs=None):
locals().update(ctx.obj)
''' Match lines like:
m/0'/0'/0' => n3ieqYKgVR8oB2zsHVX1Pr7Zc31pP3C7ZJ
@ -168,10 +345,6 @@ def recovery(public_txt, payout_address, out_psbt, testnet, xfp=None, gap=None,
if xfp:
print("Fingerprint is: " + xfp2str(xfp))
if not addrs:
print("No addresses found!")
sys.exit(1)
pubkeys = {}
if single_xpub:
assert single_xpub in xpubs.values(), "Specific xpub not found: " + repr(xpubs)
@ -181,6 +354,8 @@ def recovery(public_txt, payout_address, out_psbt, testnet, xfp=None, gap=None,
if gap:
print(f"Will use deriv path: {the_path}")
wallet = BTC.parse.bip32(single_xpub)
expect_addr = addrs[0][1] # for .../0/0
addrs = []
addr_fmt = None
for ch in range(2):
for idx in range(gap):
@ -190,9 +365,8 @@ def recovery(public_txt, payout_address, out_psbt, testnet, xfp=None, gap=None,
garbage = dict((a,b) for a,b,*c in BTC.output_for_public_pair(node.public_pair()))
if not addr_fmt:
assert idx==0 and ch==0
expect = addrs[0][1]
for k,v in garbage.items():
if v == expect:
if v == expect_addr:
addr_fmt = k
print(f"Address format will be: {addr_fmt}")
break
@ -200,16 +374,27 @@ def recovery(public_txt, payout_address, out_psbt, testnet, xfp=None, gap=None,
assert not expect, "Could not find 0/0 addr in public?!"
addr = garbage[addr_fmt]
print(addr)
pubkeys[p] = node.sec()
if idx < 5 and ch == 0:
assert (p, addr) in addrs, f'Got {addr} for {p} but wanted/expected: {addrs[0][0]} => {addrs[0][1]}'
if idx == 0 and ch == 0:
assert addr == expect_addr
addrs.append( (p, addr) )
else:
print("Found %d xpubs: %s" % (len(xpubs), ' '.join(xpubs)))
print("Found %d addresses. Checking for balances.\n" % len(addrs))
if not addrs:
print("No addresses found!")
sys.exit(1)
if dump_addrs:
with open(dump_addrs, 'wt') as fd:
for p,a in addrs:
fd.write(f'{p} => {a}\n')
print(f'Wrote: {dump_addrs}')
sys.exit(0)
print(f"Found {len(addrs)} addresses: from {addrs[0][0]} to {addrs[-1][0]}")
print("Checking for balances.\n")
if 0:
# verify we have enough data
@ -223,83 +408,11 @@ def recovery(public_txt, payout_address, out_psbt, testnet, xfp=None, gap=None,
if trouble:
sys.exit(1)
spending = []
total = 0
psbt = BasicPSBT()
build_psbt(ctx, xfp, addrs)
for path, addr in addrs:
print(f"addr: {path}=>{addr} ... ", end='')
rr = explora('address', addr, 'utxo')
if not rr:
print('nada')
continue
here = 0
for u in rr:
here += u['value']
tt = TxIn(h2b_rev(u['txid']), u['vout'])
spending.append(tt)
#print(rr)
pin = BasicPSBTInput(idx=len(psbt.inputs))
psbt.inputs.append(pin)
pubkey = calc_pubkey(xpubs, path)
pin.bip32_paths[pubkey] = str2path(xfp, path)
# fetch the UTXO for witness signging
td = explora('tx', u['txid'], 'hex', is_json=False)
outpt = Tx.from_hex(td.decode('ascii')).txs_out[u['vout']]
with BytesIO() as b:
outpt.stream(b)
pin.witness_utxo = b.getvalue()
print('%.8f BTC' % (here / 1E8))
total += here
if len(spending) > 15:
print("Reached practical limit on # of inputs. "
"You'll need to repeat this process again later.")
break
assert total, "sorry didnt find any UTXO"
print("Found total: %.8f BTC" % (total / 1E8))
print("Planning to send to: %s" % payout_address)
#dest_scr = standard_tx_out_script(payout_address)
dest_scr = BTC.contract.for_address(payout_address)
txn = Tx(2,spending,[TxOut(total, dest_scr)])
fee = tx_fee.recommended_fee_for_tx(txn)
# placeholder, single output that isn't change
pout = BasicPSBTOutput(idx=0)
psbt.outputs.append(pout)
print("Guestimate fee: %.8f BTC" % (fee / 1E8))
txn.txs_out[0].coin_value -= fee
# write txn into PSBT
with BytesIO() as b:
txn.stream(b)
psbt.txn = b.getvalue()
out_psbt.write(psbt.as_bytes())
print("PSBT to be signed:\n\n\t" + out_psbt.name, end='\n\n')
if __name__ == '__main__':
recovery()
cli()
# EOF

View File

@ -1,4 +1,7 @@
click>=6.7
pycoin==0.80
#pycoin==0.80
# extensive porting, now:
pycoin==0.90.20200809

View File

@ -8,7 +8,7 @@ from setuptools import setup
setup(
name='psbt_recover',
version='1.0',
version='2.0',
py_modules=[],
python_requires='>3.6.0',
install_requires=[
@ -16,7 +16,7 @@ setup(
],
entry_points='''
[console_scripts]
psbt_recovery=recovery:recovery
psbt_recovery=recovery:cli
''',
)