Framework

This commit is contained in:
Peter D. Gray 2020-02-10 11:46:59 -05:00
commit 7f5852ec17
No known key found for this signature in database
GPG Key ID: F0E6CC6AFC16CF7B
7 changed files with 680 additions and 0 deletions

110
.gitignore vendored Normal file
View File

@ -0,0 +1,110 @@
public.txt
output.psbt
foo*.psbt
foo*.txt
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2020 Coinkite Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

23
README.md Normal file
View File

@ -0,0 +1,23 @@
# PSBT Faker
A simple program to create test PSBT files, that are plausible and self-consistent so
the PSBT-signing tools will sign them. Does not involve any blockchains... completely
made up inputs.
## Usage
```
# python3 -m pip install --editable .
# rehash
# pbst_faker test.psbt 1destaddr
```
## Requirements
- `python3.6+`
- `pycoin` version 0.80
- `click`
(See `requirements.txt`)

177
main.py Executable file
View File

@ -0,0 +1,177 @@
#!/usr/bin/env python3
#
# To use this, install with:
#
# pip install --editable .
#
# That will create the command "psbt_faker" in your path... or just use "./main.py ..." here
#
#
import click, sys, os, pdb, struct, io, json, re, time
from psbt import BasicPSBT, BasicPSBTInput, BasicPSBTOutput
from pprint import pformat, pprint
from binascii import b2a_hex as _b2a_hex
from binascii import a2b_hex
from io import BytesIO
from collections import namedtuple
from base64 import b64encode, b64decode
from pycoin.tx.Tx import Tx
from pycoin.tx.TxOut import TxOut
from pycoin.tx.TxIn import TxIn
from pycoin.ui import standard_tx_out_script
from pycoin.encoding import b2a_hashed_base58, hash160
from pycoin.serialize import b2h_rev, b2h, h2b, h2b_rev
from pycoin.contrib.segwit_addr import encode as bech32_encode
from pycoin.key.BIP32Node import BIP32Node
from pycoin.convention import tx_fee
import urllib.request
b2a_hex = lambda a: str(_b2a_hex(a), 'ascii')
#xfp2hex = lambda a: b2a_hex(a[::-1]).upper()
TESTNET = False
def str2ipath(s):
# convert text to numeric path for BIP174
for i in s.split('/'):
if i == 'm': continue
if not i: continue # trailing or duplicated slashes
if i[-1] in "'ph":
assert len(i) >= 2, i
here = int(i[:-1]) | 0x80000000
else:
here = int(i)
assert 0 <= here < 0x80000000, here
yield here
def xfp2str(xfp):
# Standardized way to show an xpub's fingerprint... it's a 4-byte string
# and not really an integer. Used to show as '0x%08x' but that's wrong endian.
return b2a_hex(struct.pack('>I', xfp)).upper()
def str2path(xfp, s):
# output binary needed for BIP-174
p = list(str2ipath(s))
return struct.pack('<%dI' % (1 + len(p)), xfp, *p)
def calc_pubkey(xpubs, path):
# given a map of paths to xpubs, and a single path, calculate the pubkey
assert path[0:2] == 'm/'
hard_prefix = '/'.join(s for s in path.split('/') if s[-1] == "'")
hard_depth = hard_prefix.count('/') + 1
want = ('m/'+hard_prefix) if hard_prefix else 'm'
assert want in xpubs, f"Need: {want} to build pubkey of {path}"
node = BIP32Node.from_hwif(xpubs[want])
parts = [s for s in path.split('/') if s != 'm'][hard_depth:]
# node = node.subkey_for_path(path[2:])
if not parts:
assert want == path
else:
for sk in parts:
node = node.subkey_for_path(sk)
return node.sec()
@click.command()
@click.argument('out_psbt', type=click.File('wb'))
@click.argument('payout_addresses', type=str, nargs='*')
@click.option('--testnet', '-t', help="Assume testnet3 addresses", is_flag=True, default=False)
@click.option('--xpub', help="Provide XPUB value", default=None)
@click.option('--num-change', '-c', help="Number of change outputs", default=1)
@click.option('--xfp', '--fingerprint', help="Provide XFP value, otherwise discovered from xpub", default=None)
def faker(num_change, payout_addresses, out_psbt, testnet, xfp=None, xpub=None):
global TESTNET
TESTNET = testnet
''' Match lines like:
m/0'/0'/0' => n3ieqYKgVR8oB2zsHVX1Pr7Zc31pP3C7ZJ
m/0/2 => mh7finD8ctq159hbRzAeevSuFBJ1NQjoH2
and also
m => tpubD6NzVbkrYhZ4XzL5Dhayo67Gorv1YMS7j8pRUvVMd5odC2LBPLAygka9p7748JtSq82FNGPppFEz5xxZUdasBRCqJqXvUHq6xpnsMcYJzeh
'''
psbt = BasicPSBT()
for path, addr in addrs:
print(f"addr: {addr} ... ", end='')
rr = explora('address', addr, 'utxo')
if not rr:
print('nada')
continue
here = 0
for u in rr:
here += u['value']
tt = TxIn(h2b_rev(u['txid']), u['vout'])
spending.append(tt)
#print(rr)
pin = BasicPSBTInput(idx=len(psbt.inputs))
psbt.inputs.append(pin)
pubkey = calc_pubkey(xpubs, path)
pin.bip32_paths[pubkey] = str2path(xfp, path)
# fetch the UTXO for witness signging
td = explora('tx', u['txid'], 'hex', is_json=False)
outpt = Tx.from_hex(td.decode('ascii')).txs_out[u['vout']]
with BytesIO() as b:
outpt.stream(b)
pin.witness_utxo = b.getvalue()
print('%.8f BTC' % (here / 1E8))
total += here
if len(spending) > 15:
print("Reached practical limit on # of inputs. "
"You'll need to repeat this process again later.")
break
assert total
print("Found total: %.8f BTC" % (total / 1E8))
print("Planning to send to: %s" % payout_address)
dest_scr = standard_tx_out_script(payout_address)
txn = Tx(2,spending,[TxOut(total, dest_scr)])
fee = tx_fee.recommended_fee_for_tx(txn)
# placeholder, single output that isn't change
pout = BasicPSBTOutput(idx=0)
psbt.outputs.append(pout)
print("Guestimate fee: %.8f BTC" % (fee / 1E8))
txn.txs_out[0].coin_value -= fee
# write txn into PSBT
with BytesIO() as b:
txn.stream(b)
psbt.txn = b.getvalue()
out_psbt.write(psbt.as_bytes())
print("PSBT to be signed:\n\n\t" + out_psbt.name, end='\n\n')
if __name__ == '__main__':
recovery()
# EOF

323
psbt.py Normal file
View File

@ -0,0 +1,323 @@
# (c) Copyright 2018 by Coinkite Inc. This file is part of Coldcard <coldcardwallet.com>
# and is covered by GPLv3 license found in COPYING.
#
# psbt.py - yet another PSBT parser/serializer but used only for test cases.
#
import io, struct
from binascii import b2a_hex as _b2a_hex
from binascii import a2b_hex as _a2b_hex
from collections import namedtuple
from base64 import b64encode
from pycoin.tx.Tx import Tx
#from pycoin.tx.TxOut import TxOut
#from pycoin.encoding import b2a_hashed_base58, a2b_hashed_base58
from pycoin.tx.script.check_signature import parse_signature_blob
from binascii import b2a_hex, a2b_hex
from base64 import b64decode
b2a_hex = lambda a: str(_b2a_hex(a), 'ascii')
# BIP-174 aka PSBT defined values
#
PSBT_GLOBAL_UNSIGNED_TX = (0)
PSBT_GLOBAL_XPUB = (1)
PSBT_IN_NON_WITNESS_UTXO = (0)
PSBT_IN_WITNESS_UTXO = (1)
PSBT_IN_PARTIAL_SIG = (2)
PSBT_IN_SIGHASH_TYPE = (3)
PSBT_IN_REDEEM_SCRIPT = (4)
PSBT_IN_WITNESS_SCRIPT = (5)
PSBT_IN_BIP32_DERIVATION = (6)
PSBT_IN_FINAL_SCRIPTSIG = (7)
PSBT_IN_FINAL_SCRIPTWITNESS = (8)
PSBT_OUT_REDEEM_SCRIPT = (0)
PSBT_OUT_WITNESS_SCRIPT = (1)
PSBT_OUT_BIP32_DERIVATION = (2)
# Serialization/deserialization tools
def ser_compact_size(l):
r = b""
if l < 253:
r = struct.pack("B", l)
elif l < 0x10000:
r = struct.pack("<BH", 253, l)
elif l < 0x100000000:
r = struct.pack("<BI", 254, l)
else:
r = struct.pack("<BQ", 255, l)
return r
def deser_compact_size(f):
try:
nit = f.read(1)[0]
except IndexError:
return None # end of file
if nit == 253:
nit = struct.unpack("<H", f.read(2))[0]
elif nit == 254:
nit = struct.unpack("<I", f.read(4))[0]
elif nit == 255:
nit = struct.unpack("<Q", f.read(8))[0]
return nit
class PSBTSection:
def __init__(self, fd=None, idx=None):
self.defaults()
self.my_index = idx
if not fd: return
while 1:
ks = deser_compact_size(fd)
if ks is None: break
if ks == 0: break
key = fd.read(ks)
vs = deser_compact_size(fd)
val = fd.read(vs)
kt = key[0]
self.parse_kv(kt, key[1:], val)
def serialize(self, fd, my_idx):
def wr(ktype, val, key=b''):
fd.write(ser_compact_size(1 + len(key)))
fd.write(bytes([ktype]) + key)
fd.write(ser_compact_size(len(val)))
fd.write(val)
self.serialize_kvs(wr)
fd.write(b'\0')
class BasicPSBTInput(PSBTSection):
def defaults(self):
self.utxo = None
self.witness_utxo = None
self.part_sigs = {}
self.sighash = None
self.bip32_paths = {}
self.redeem_script = None
self.witness_script = None
self.others = {}
def __eq__(a, b):
if a.sighash != b.sighash:
if a.sighash is not None and b.sighash is not None:
return False
rv = a.utxo == b.utxo and \
a.witness_utxo == b.witness_utxo and \
a.redeem_script == b.redeem_script and \
a.witness_script == b.witness_script and \
a.my_index == b.my_index and \
a.bip32_paths == b.bip32_paths and \
sorted(a.part_sigs.keys()) == sorted(b.part_sigs.keys())
if rv:
# NOTE: equality test on signatures requires parsing DER stupidness
# and some maybe understanding of R/S values on curve that I don't have.
assert all(parse_signature_blob(a.part_sigs[k])
== parse_signature_blob(b.part_sigs[k]) for k in a.part_sigs)
return rv
def parse_kv(self, kt, key, val):
if kt == PSBT_IN_NON_WITNESS_UTXO:
self.utxo = val
assert not key
elif kt == PSBT_IN_WITNESS_UTXO:
self.witness_utxo = val
assert not key
elif kt == PSBT_IN_PARTIAL_SIG:
self.part_sigs[key] = val
elif kt == PSBT_IN_SIGHASH_TYPE:
assert len(val) == 4
self.sighash = struct.unpack("<I", val)[0]
assert not key
elif kt == PSBT_IN_BIP32_DERIVATION:
self.bip32_paths[key] = val
elif kt == PSBT_IN_REDEEM_SCRIPT:
self.redeem_script = val
assert not key
elif kt == PSBT_IN_WITNESS_SCRIPT:
self.witness_script = val
assert not key
elif kt in ( PSBT_IN_REDEEM_SCRIPT,
PSBT_IN_WITNESS_SCRIPT,
PSBT_IN_FINAL_SCRIPTSIG,
PSBT_IN_FINAL_SCRIPTWITNESS):
assert not key
self.others[kt] = val
else:
raise KeyError(kt)
def serialize_kvs(self, wr):
if self.utxo:
wr(PSBT_IN_NON_WITNESS_UTXO, self.utxo)
if self.witness_utxo:
wr(PSBT_IN_WITNESS_UTXO, self.witness_utxo)
if self.redeem_script:
wr(PSBT_IN_REDEEM_SCRIPT, self.redeem_script)
if self.witness_script:
wr(PSBT_IN_WITNESS_SCRIPT, self.witness_script)
for pk, val in sorted(self.part_sigs.items()):
wr(PSBT_IN_PARTIAL_SIG, val, pk)
if self.sighash is not None:
wr(PSBT_IN_SIGHASH_TYPE, struct.pack('<I', self.sighash))
for k in self.bip32_paths:
wr(PSBT_IN_BIP32_DERIVATION, self.bip32_paths[k], k)
for k in self.others:
wr(k, self.others[k])
class BasicPSBTOutput(PSBTSection):
def defaults(self):
self.redeem_script = None
self.witness_script = None
self.bip32_paths = {}
def __eq__(a, b):
return a.redeem_script == b.redeem_script and \
a.witness_script == b.witness_script and \
a.my_index == b.my_index and \
a.bip32_paths == b.bip32_paths
def parse_kv(self, kt, key, val):
if kt == PSBT_OUT_REDEEM_SCRIPT:
self.redeem_script = val
assert not key
elif kt == PSBT_OUT_WITNESS_SCRIPT:
self.witness_script = val
assert not key
elif kt == PSBT_OUT_BIP32_DERIVATION:
self.bip32_paths[key] = val
else:
raise ValueError(kt)
def serialize_kvs(self, wr):
if self.redeem_script:
wr(PSBT_OUT_REDEEM_SCRIPT, self.redeem_script)
if self.witness_script:
wr(PSBT_OUT_WITNESS_SCRIPT, self.witness_script)
for k in self.bip32_paths:
wr(PSBT_OUT_BIP32_DERIVATION, self.bip32_paths[k], k)
class BasicPSBT:
"Just? parse and store"
def __init__(self):
self.txn = None
self.xpubs = {}
self.inputs = []
self.outputs = []
def __eq__(a, b):
return a.txn == b.txn and \
len(a.inputs) == len(b.inputs) and \
len(a.outputs) == len(b.outputs) and \
all(a.inputs[i] == b.inputs[i] for i in range(len(a.inputs))) and \
all(a.outputs[i] == b.outputs[i] for i in range(len(a.outputs))) and \
sorted(a.xpubs.items()) == sorted(b.xpubs.items())
def parse(self, raw):
# auto-detect and decode Base64 and Hex.
if raw[0:10].lower() == b'70736274ff':
raw = a2b_hex(raw.strip())
if raw[0:6] == b'cHNidP':
raw = b64decode(raw)
assert raw[0:5] == b'psbt\xff', "bad magic"
with io.BytesIO(raw[5:]) as fd:
# globals
while 1:
ks = deser_compact_size(fd)
if ks is None: break
if ks == 0: break
key = fd.read(ks)
vs = deser_compact_size(fd)
val = fd.read(vs)
kt = key[0]
if kt == PSBT_GLOBAL_UNSIGNED_TX:
self.txn = val
t = Tx.parse(io.BytesIO(val))
num_ins = len(t.txs_in)
num_outs = len(t.txs_out)
elif kt == PSBT_GLOBAL_XPUB:
self.xpubs[key[1:]] = val
else:
raise ValueError('unknown global key type: 0x%02x' % kt)
assert self.txn, 'missing reqd section'
self.inputs = [BasicPSBTInput(fd, idx) for idx in range(num_ins)]
self.outputs = [BasicPSBTOutput(fd, idx) for idx in range(num_outs)]
sep = fd.read(1)
assert sep == b''
return self
def serialize(self, fd):
def wr(ktype, val, key=b''):
fd.write(ser_compact_size(1 + len(key)))
fd.write(bytes([ktype]) + key)
fd.write(ser_compact_size(len(val)))
fd.write(val)
fd.write(b'psbt\xff')
wr(PSBT_GLOBAL_UNSIGNED_TX, self.txn)
for k in self.xpubs:
wr(PSBT_GLOBAL_XPUB, self.xpubs[k], key=k)
# sep
fd.write(b'\0')
for idx, inp in enumerate(self.inputs):
inp.serialize(fd, idx)
for idx, outp in enumerate(self.outputs):
outp.serialize(fd, idx)
def as_bytes(self):
with io.BytesIO() as fd:
self.serialize(fd)
return fd.getvalue()
def test_my_psbt():
import glob, io
for fn in glob.glob('data/*.psbt'):
if 'missing_txn.psbt' in fn: continue
if 'unknowns-ins.psbt' in fn: continue
raw = open(fn, 'rb').read()
print("\n\nFILE: %s" % fn)
p = BasicPSBT().parse(raw)
fd = io.BytesIO()
p.serialize(fd)
assert p.txn in fd.getvalue()
chk = BasicPSBT().parse(fd.getvalue())
assert chk == p
# EOF

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
click>=6.7
pycoin==0.80

22
setup.py Normal file
View File

@ -0,0 +1,22 @@
# based on <http://click.pocoo.org/5/setuptools/#setuptools-integration>
#
# To use this, install with:
#
# pip install --editable .
from setuptools import setup
setup(
name='psbt_faker',
version='1.0',
py_modules=[],
python_requires='>3.6.0',
install_requires=[
'Click',
],
entry_points='''
[console_scripts]
psbt_faker=main:faker
''',
)