multisig faker

This commit is contained in:
scgbckbone 2025-04-17 13:47:16 +02:00 committed by doc-hex
parent fc66596db0
commit d22631028a
12 changed files with 1889 additions and 99 deletions

View File

@ -7,56 +7,24 @@
# That will create the command "psbt_faker" in your path... or just use "./main.py ..." here
#
#
import click, sys, os, pdb, struct, io, json, re, time
from pprint import pformat, pprint
import click
from binascii import b2a_hex as _b2a_hex
from binascii import a2b_hex
from io import BytesIO
from collections import namedtuple
from base64 import b64encode, b64decode
from pycoin.tx.Tx import Tx
from pycoin.tx.TxOut import TxOut
from pycoin.tx.TxIn import TxIn
from pycoin.ui import standard_tx_out_script
from pycoin.encoding import b2a_hashed_base58, hash160
from pycoin.serialize import b2h_rev, b2h, h2b, h2b_rev
from pycoin.key.BIP32Node import BIP32Node
from pycoin.convention import tx_fee
import urllib.request
from base64 import b64encode
from .txn import *
from .ripemd import ripemd160
from .multisig import from_simple_text
b2a_hex = lambda a: str(_b2a_hex(a), 'ascii')
#xfp2hex = lambda a: b2a_hex(a[::-1]).upper()
SIM_XPUB = 'tpubD6NzVbkrYhZ4XzL5Dhayo67Gorv1YMS7j8pRUvVMd5odC2LBPLAygka9p7748JtSq82FNGPppFEz5xxZUdasBRCqJqXvUHq6xpnsMcYJzeh'
def str2ipath(s):
# convert text to numeric path for BIP174
for i in s.split('/'):
if i == 'm': continue
if not i: continue # trailing or duplicated slashes
if i[-1] in "'ph":
assert len(i) >= 2, i
here = int(i[:-1]) | 0x80000000
else:
here = int(i)
assert 0 <= here < 0x80000000, here
@click.group()
def main():
pass
yield here
def xfp2str(xfp):
# Standardized way to show an xpub's fingerprint... it's a 4-byte string
# and not really an integer. Used to show as '0x%08x' but that's wrong endian.
return b2a_hex(struct.pack('>I', xfp)).upper()
def str2path(xfp, s):
# output binary needed for BIP-174
p = list(str2ipath(s))
return struct.pack('<%dI' % (1 + len(p)), xfp, *p)
@click.command()
@main.command()
@click.argument('out_psbt', type=click.File('wb'), metavar="OUTPUT.PSBT")
@click.argument('xpub', type=str, default=SIM_XPUB)
@click.option('--num-outs', '-n', help="Number of outputs (default 1)", default=1)
@ -98,7 +66,41 @@ def faker(num_change, num_outs, out_psbt, value, testnet, xpub, segwit, fee, sty
#print("\nPSBT to be signed: " + out_psbt.name, end='\n\n')
@main.command()
@click.argument('ms_conf', type=click.File('r'), metavar="CC ms export")
@click.argument('out_psbt', type=click.File('wb'), metavar="OUTPUT.PSBT")
@click.option('--input-amount', '-n', help="Size of each input in sats (default 100k sats each input)", default=100000)
@click.option('--num-ins', help="Number of inputs (default 1)", default=1)
@click.option('--num-outs', help="Number of outputs (default 1)", default=1)
@click.option('--num-change', '-c', help="Number of change outputs (default 1)", default=1)
@click.option('--fee', '-f', help="Miner's fee in Satoshis", default=1000)
@click.option('--locktime', '-l', help="nLocktime value (default current block height)", default=None)
@click.option('--legacy', help="Make inputs be legacy p2sh style", is_flag=True, default=False)
@click.option('--styles', '-a', help="Output address style (multiple ok)", multiple=True, default=None, type=click.Choice(ADDR_STYLES))
@click.option('--base64', '-6', help="Output base64 (default binary)", is_flag=True, default=False)
def ms_faker(ms_conf, out_psbt, num_ins, num_change, num_outs, legacy, fee, styles, base64,
locktime, input_amount):
'''Construct a valid multisig PSBT which spends non-existant BTC to random addresses!'''
if locktime is None:
try:
import urllib.request
u = urllib.request.urlopen("https://mempool.space/api/blocks/tip/height")
locktime = int(u.read().decode())
except:
locktime = 0
ms_config = ms_conf.read()
name, af, keys, M, N = from_simple_text(ms_config.split("\n"))
psbt = fake_ms_txn(num_ins, num_outs, M, keys, fee=fee, locktime=locktime,
change_outputs=list(range(num_change)), outstyles=styles,
segwit_in=not legacy, input_amount=input_amount)
out_psbt.write(psbt if not base64 else b64encode(psbt))
if __name__ == '__main__':
faker()
main()
# EOF

98
psbt_faker/base58.py Normal file
View File

@ -0,0 +1,98 @@
# This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International License
import hashlib
BASE58_ALPHABET = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
def hash256(s: bytes) -> bytes:
"""
two rounds of sha256
:param s: data
:return: hashed data
"""
return hashlib.sha256(hashlib.sha256(s).digest()).digest()
def encode_base58(data: bytes) -> str:
"""
Encode base58.
:param data: data to encode
:return: base58 encoded string
"""
count = 0
for c in data:
if c == 0:
count += 1
else:
break
num = int.from_bytes(data, 'big')
prefix = '1' * count
result = ''
while num > 0:
num, mod = divmod(num, 58)
result = BASE58_ALPHABET[mod] + result
return prefix + result
def encode_base58_checksum(data: bytes) -> str:
"""
Encode base58 checksum.
:param data: data to encode
:return: base58 encoded string with checksum
"""
return encode_base58(data + hash256(data)[:4])
def decode_base58(s: str) -> bytes:
"""
Decode base58.
:param s: base58 encoded string
:return: decoded data
"""
num = 0
for c in s:
if c not in BASE58_ALPHABET:
raise ValueError(
"character {} is not valid base58 character".format(c)
)
num *= 58
num += BASE58_ALPHABET.index(c)
h = hex(num)[2:]
h = '0' + h if len(h) % 2 else h
res = bytes.fromhex(h)
# Add padding back.
pad = 0
for c in s[:-1]:
if c == BASE58_ALPHABET[0]:
pad += 1
else:
break
return b'\x00' * pad + res
def decode_base58_checksum(s: str) -> bytes:
"""
Decode base58 checksum.
:param s: base58 encoded string with checksum
:return: decoded data (without checksum)
"""
num_bytes = decode_base58(s=s)
checksum = num_bytes[-4:]
if hash256(num_bytes[:-4])[:4] != checksum:
raise ValueError(
'bad checksum: {} {}'.format(
checksum,
hash256(num_bytes[:-4])[:4]
)
)
return num_bytes[:-4]

756
psbt_faker/bip32.py Normal file
View File

@ -0,0 +1,756 @@
# (c) Copyright 2024 by Coinkite Inc. This file is covered by license found in COPYING-CC.
#
import hashlib, hmac
from typing import Union
from .segwit_addr import encode as bech32_encode
from io import BytesIO
try:
from pysecp256k1 import (
ec_seckey_verify, ec_pubkey_create, ec_pubkey_serialize, ec_pubkey_parse,
ec_seckey_tweak_add, ec_pubkey_tweak_add,
)
except ImportError:
import ecdsa
SECP256k1 = ecdsa.curves.SECP256k1
CURVE_GEN = ecdsa.ecdsa.generator_secp256k1
CURVE_ORDER = CURVE_GEN.order()
FIELD_ORDER = SECP256k1.curve.p()
INFINITY = ecdsa.ellipticcurve.INFINITY
from .base58 import encode_base58_checksum, decode_base58_checksum
from .ripemd import hash160
from .helpers import str2ipath
HARDENED = 2 ** 31
Prv_or_PubKeyNode = Union["PrvKeyNode", "PubKeyNode"]
class InvalidKeyError(Exception):
"""Raised when derived key is invalid"""
def big_endian_to_int(b: bytes) -> int:
"""
Big endian representation to integer.
:param b: big endian representation
:return: integer
"""
return int.from_bytes(b, "big")
def int_to_big_endian(n: int, length: int) -> bytes:
"""
Represents integer in big endian byteorder.
:param n: integer
:param length: byte length
:return: big endian
"""
return n.to_bytes(length, "big")
def hmac_sha512(key: bytes, msg: bytes) -> bytes:
"""
Hash-based message authentication code with sha512
:param key: secret key
:param msg: message
:return: digest bytes
"""
return hmac.new(key=key, msg=msg, digestmod=hashlib.sha512).digest()
class PrivateKey(object):
__slots__ = (
"k",
"K"
)
def __init__(self, sec_exp: Union[bytes, int]):
"""
Initializes private key.
:param sec_exp: secret
"""
if isinstance(sec_exp, int):
sec_exp = int_to_big_endian(sec_exp, 32)
try:
ec_seckey_verify(sec_exp)
self.k = sec_exp
self.K = PublicKey(ec_pubkey_create(self.k))
except NameError:
k = ecdsa.SigningKey.from_string(sec_exp, curve=SECP256k1)
self.K = PublicKey(pub_key=k.get_verifying_key())
self.k = k.to_string()
else:
ec_seckey_verify(self.k)
self.K = PublicKey(ec_pubkey_create(self.k))
def __bytes__(self) -> bytes:
"""
Encodes private key into corresponding byte sequence.
:return: byte representation of PrivateKey object
"""
return self.k
def __eq__(self, other: "PrivateKey") -> bool:
"""
Checks whether two private keys are equal.
:param other: other private key
"""
return self.k == other.k
def wif(self, compressed: bool = True, testnet: bool = False) -> str:
"""
Encodes private key into wallet import/export format.
:param compressed: whether public key is compressed (default=True)
:param testnet: whether to encode as a testnet key (default=False)
:return: WIF encoded private key
"""
prefix = b"\xef" if testnet else b"\x80"
suffix = b"\x01" if compressed else b""
return encode_base58_checksum(prefix + bytes(self) + suffix)
def tweak_add(self, tweak32: bytes) -> "PrivateKey":
tweaked = ec_seckey_tweak_add(self.k, tweak32)
return PrivateKey(sec_exp=tweaked)
@classmethod
def from_wif(cls, wif_str: str) -> "PrivateKey":
"""
Initializes private key from wallet import format encoding.
:param wif_str: wallet import format private key
:return: private key
"""
decoded = decode_base58_checksum(s=wif_str)
if wif_str[0] in ("K", "L", "c"):
# compressed key --> so remove last byte that has to be 01
assert decoded[-1] == 1
decoded = decoded[:-1]
return cls(sec_exp=decoded[1:])
@classmethod
def parse(cls, key_bytes: bytes) -> "PrivateKey":
"""
Initializes private key from byte sequence.
:param key_bytes: byte representation of private key
:return: private key
"""
return cls(sec_exp=key_bytes)
@classmethod
def from_int(cls, sec_exp: int) -> "Privatekey":
return cls(sec_exp=int_to_big_endian(sec_exp, 32))
class PublicKey(object):
__slots__ = (
"K"
)
def __init__(self, pub_key):
"""
Initializes PublicKey object.
:param key: secp256k1 pubkey or ecdsa.VerifyingKey
"""
self.K = pub_key
def __eq__(self, other: "PublicKey") -> bool:
"""
Checks whether two public keys are equal.
:param other: other public key
"""
return self.sec() == other.sec()
@property
def point(self): # -> ecdsa.ellipticcurve.Point:
"""
Point on curve (x and y coordinates).
:return: point on curve
"""
return self.K.pubkey.point
def sec(self, compressed: bool = True) -> bytes:
"""
Encodes public key to SEC format.
:param compressed: whether to use compressed format (default=True)
:return: SEC encoded public key
"""
try:
return ec_pubkey_serialize(self.K, compressed=compressed)
except NameError:
return self.K.to_string(encoding="compressed" if compressed else "uncompressed")
def tweak_add(self, tweak32: bytes) -> "PublicKey":
return PublicKey(pub_key=ec_pubkey_tweak_add(self.K, tweak32))
@classmethod
def parse(cls, key_bytes: bytes) -> "PublicKey":
"""
Initializes public key from byte sequence.
:param key_bytes: byte representation of public key
:return: public key
"""
try:
return cls(pub_key=ec_pubkey_parse(key_bytes))
except NameError:
return cls(ecdsa.VerifyingKey.from_string(key_bytes, curve=SECP256k1))
@classmethod
def from_point(cls, point) -> "PublicKey":
"""
Initializes public key from point on elliptic curve.
:param point: point on elliptic curve
:return: public key
"""
return cls(ecdsa.VerifyingKey.from_public_point(point, curve=SECP256k1))
def h160(self, compressed: bool = True) -> bytes:
"""
SHA256 followed by RIPEMD160 of public key.
:param compressed: whether to use compressed format (default=True)
:return: SHA256(RIPEMD160(public key))
"""
return hash160(self.sec(compressed=compressed))
def address(self, compressed: bool = True, testnet: bool = False,
addr_fmt: str = "p2wpkh") -> str:
"""
Generates bitcoin address from public key.
:param compressed: whether to use compressed format (default=True)
:param testnet: whether to encode as a testnet address (default=False)
:param addr_type: which address type to generate:
1. p2pkh
2. p2sh-p2wpkh
3. p2wpkh (default)
:return: bitcoin address
"""
h160 = self.h160(compressed=compressed)
if addr_fmt == "p2pkh":
prefix = b"\x6f" if testnet else b"\x00"
return encode_base58_checksum(prefix + h160)
elif addr_fmt == "p2wpkh":
hrp = "tb" if testnet else "bc"
return bech32_encode(hrp=hrp, witver=0, witprog=h160)
elif addr_fmt == "p2sh-p2wpkh":
scr = b"\x00\x14" + h160 # witversion 0 + pubkey hash
h160 = hash160(scr)
prefix = b"\xc4" if testnet else b"\x05"
return encode_base58_checksum(prefix + h160)
raise ValueError("Unsupported address type.")
class PubKeyNode(object):
mark: str = "M"
testnet_version: int = 0x043587CF
mainnet_version: int = 0x0488B21E
__slots__ = (
"parent",
"key",
"chain_code",
"depth",
"index",
"parsed_parent_fingerprint",
"parsed_version",
"testnet",
"children"
)
def __init__(self, key: bytes, chain_code: bytes, index: int = 0,
depth: int = 0, testnet: bool = False,
parent: Union["PubKeyNode", "PrvKeyNode"] = None,
parent_fingerprint: bytes = None):
"""
Initializes Pub/PrvKeyNode.
:param key: public or private key
:param chain_code: chain code
:param index: current node derivation index (default=0)
:param depth: current node depth (default=0)
:param testnet: whether this node is testnet node (default=False)
:param parent: parent node of the current node (default=None)
:param parent_fingerprint: fingerprint of parent node (default=None)
"""
self.parent = parent
self.key = key
self.chain_code = chain_code
self.depth = depth
self.index = index
self.parsed_parent_fingerprint = parent_fingerprint
self.parsed_version = None
self.testnet = testnet
def __eq__(self, other) -> bool:
"""
Checks whether two private/public key nodes are equal.
:param other: other private/public key node
"""
if type(self) != type(other):
return False
self_key = big_endian_to_int(self.key)
other_key = big_endian_to_int(other.key)
return self_key == other_key and \
self.chain_code == other.chain_code and \
self.depth == other.depth and \
self.index == other.index and \
self.testnet == other.testnet and \
self.parent_fingerprint == other.parent_fingerprint
@property
def public_key(self) -> PublicKey:
"""
Public key node's public key.
:return: public key of public key node
"""
return PublicKey.parse(key_bytes=self.key)
@property
def parent_fingerprint(self) -> bytes:
"""
Gets parent fingerprint.
If node is parsed from extended key, only parsed parent fingerprint
is available. If node is derived, parent fingerprint is calculated
from parent node.
:return: parent fingerprint
"""
if self.parent:
fingerprint = self.parent.fingerprint()
else:
fingerprint = self.parsed_parent_fingerprint
# in case there is still None here - it is master
return fingerprint or b"\x00\x00\x00\x00"
@property
def pub_version(self) -> int:
"""
Decides which extended public key version integer to use
based on testnet parameter.
:return: extended public key version
"""
if self.testnet:
return PubKeyNode.testnet_version
return PubKeyNode.mainnet_version
def __repr__(self) -> str:
if self.is_master() or self.is_root():
return self.mark
if self.is_hardened():
index = str(self.index - 2**31) + "'"
else:
index = str(self.index)
parent = str(self.parent) if self.parent else self.mark
return parent + "/" + index
def is_hardened(self) -> bool:
"""Check whether current key node is hardened."""
return self.index >= 2**31
def is_master(self) -> bool:
"""Check whether current key node is master node."""
return self.depth == 0 and self.index == 0 and self.parent is None
def is_root(self) -> bool:
"""Check whether current key node is root (has no parent)."""
return self.parent is None
def fingerprint(self) -> bytes:
"""
Gets current node fingerprint.
:return: first four bytes of SHA256(RIPEMD160(public key))
"""
return hash160(self.public_key.sec())[:4]
@classmethod
def parse(cls, s: Union[str, bytes, BytesIO],
testnet: bool = False) -> Prv_or_PubKeyNode:
"""
Initializes private/public key node from serialized node or
extended key.
:param s: serialized node or extended key
:param testnet: whether this node is testnet node
:return: public/private key node
"""
if isinstance(s, str):
s = BytesIO(decode_base58_checksum(s=s))
elif isinstance(s, bytes):
s = BytesIO(s)
elif isinstance(s, BytesIO):
pass
else:
raise ValueError("has to be bytes, str or BytesIO")
return cls._parse(s, testnet=testnet)
@classmethod
def _parse(cls, s: BytesIO, testnet: bool = False) -> Prv_or_PubKeyNode:
"""
Initializes private/public key node from serialized node buffer.
:param s: serialized node buffer
:param testnet: whether this node is testnet node (default=False)
:return: public/private key node
"""
version = big_endian_to_int(s.read(4))
depth = big_endian_to_int(s.read(1))
parent_fingerprint = s.read(4)
index = big_endian_to_int(s.read(4))
chain_code = s.read(32)
key_bytes = s.read(33)
key = cls(
key=key_bytes,
chain_code=chain_code,
index=index,
depth=depth,
testnet=testnet,
parent_fingerprint=parent_fingerprint,
)
key.parsed_version = version
return key
def _serialize(self, key: bytes, version: int = None) -> bytes:
"""
Serializes public/private key node to extended key format.
:param version: extended public/private key version (default=None)
:return: serialized extended public/private key node
"""
# 4 byte: version bytes
result = int_to_big_endian(version, 4)
# 1 byte: depth: 0x00 for master nodes, 0x01 for level-1 derived keys
result += int_to_big_endian(self.depth, 1)
# 4 bytes: the fingerprint of the parent key (0x00000000 if master key)
if self.is_master():
result += int_to_big_endian(0x00000000, 4)
else:
result += self.parent_fingerprint
# 4 bytes: child number. This is ser32(i) for i in xi = xpar/i,
# with xi the key being serialized. (0x00000000 if master key)
result += int_to_big_endian(self.index, 4)
# 32 bytes: the chain code
result += self.chain_code
# 33 bytes: the public key or private key data
# (serP(K) for public keys, 0x00 || ser256(k) for private keys)
result += key
return result
def serialize_public(self, version: int = None) -> bytes:
"""
Serializes public key node to extended key format.
:param version: extended public key version (default=None)
:return: serialized extended public key node
"""
return self._serialize(
version=self.pub_version if version is None else version,
key=self.public_key.sec()
)
def extended_public_key(self, version: int = None) -> str:
"""
Base58 encodes serialized public key node. If version is not
provided (default) it is determined by result of self.pub_version.
:param version: extended public key version (default=None)
:return: extended public key
"""
return encode_base58_checksum(self.serialize_public(version=version))
def ckd(self, index: int) -> "PubKeyNode":
"""
The function CKDpub((Kpar, cpar), i) (Ki, ci) computes a child
extended public key from the parent extended public key.
It is only defined for non-hardened child keys.
* Check whether i 231 (whether the child is a hardened key).
* If so (hardened child):
return failure
* If not (normal child):
let I = HMAC-SHA512(Key=cpar, Data=serP(Kpar) || ser32(i)).
* Split I into two 32-byte sequences, IL and IR.
* The returned child key Ki is point(parse256(IL)) + Kpar.
* The returned chain code ci is IR.
* In case parse256(IL) n or Ki is the point at infinity,
the resulting key is invalid, and one should proceed with the next
value for i.
:param index: derivation index
:return: derived child
"""
if index >= HARDENED:
raise RuntimeError("failure: hardened child for public ckd")
I = hmac_sha512(
key=self.chain_code,
msg=self.key + int_to_big_endian(index, 4)
)
IL, IR = I[:32], I[32:]
# TODO this does not check whether IL is not zero (secp256k1 also does not check)
try:
Ki = self.public_key.tweak_add(IL)
except NameError:
if big_endian_to_int(IL) >= CURVE_ORDER:
InvalidKeyError(
"public key {} is greater/equal to curve order".format(
big_endian_to_int(IL)
)
)
point = PrivateKey.parse(IL).K.point + self.public_key.point
if point == INFINITY:
raise InvalidKeyError("public key is a point at infinity")
Ki = PublicKey.from_point(point=point)
child = self.__class__(
key=Ki.sec(),
chain_code=IR,
index=index,
depth=self.depth + 1,
testnet=self.testnet,
parent=self
)
return child
class PrvKeyNode(PubKeyNode):
mark: str = "m"
testnet_version: int = 0x04358394
mainnet_version: int = 0x0488ADE4
@property
def private_key(self) -> PrivateKey:
"""
Private key node's private key.
:return: public key of private key node
"""
if len(self.key) == 33 and self.key[0] == 0:
return PrivateKey(self.key[1:])
return PrivateKey(self.key)
@property
def public_key(self) -> PublicKey:
"""
Private key node's public key.
:return: public key of public key node
"""
return self.private_key.K
@property
def prv_version(self) -> int:
"""
Decides which extended private key version integer to use
based on testnet parameter.
:return: extended private key version
"""
if self.testnet:
return PrvKeyNode.testnet_version
return PrvKeyNode.mainnet_version
@classmethod
def master_key(cls, bip39_seed: bytes, testnet=False) -> "PrvKeyNode":
"""
Generates master private key node from bip39 seed.
* Generate a seed byte sequence S (bip39_seed arg) of a chosen length
(between 128 and 512 bits; 256 bits is advised) from a (P)RNG.
* Calculate I = HMAC-SHA512(Key = "Bitcoin seed", Data = S)
* Split I into two 32-byte sequences, IL and IR.
* Use parse256(IL) as master secret key, and IR as master chain code.
:param bip39_seed: bip39_seed
:param testnet: whether this node is testnet node (default=False)
:return: master private key node
"""
I = hmac_sha512(key=b"Bitcoin seed", msg=bip39_seed)
# private key
IL = I[:32]
# In case IL is 0 or ≥ n, the master key is invalid
int_left_key = big_endian_to_int(IL)
if int_left_key == 0:
raise InvalidKeyError("master key is zero")
try:
ec_seckey_verify(IL)
except NameError:
if int_left_key >= CURVE_ORDER:
raise InvalidKeyError(
"master key {} is greater/equal to curve order".format(
int_left_key
)
)
# chain code
IR = I[32:]
return cls(
key=IL,
chain_code=IR,
testnet=testnet
)
def serialize_private(self, version: int = None) -> bytes:
"""
Serializes private key node to extended key format.
:param version: extended private key version (default=None)
:return: serialized extended private key node
"""
return self._serialize(
version=self.prv_version if version is None else version,
key=b"\x00" + bytes(self.private_key)
)
def extended_private_key(self, version: int = None) -> str:
"""
Base58 encodes serialized private key node. If version is not
provided (default) it is determined by result of self.prv_version.
:param version: extended private key version (default=None)
:return: extended private key
"""
return encode_base58_checksum(self.serialize_private(version=version))
def ckd(self, index: int) -> "PrvKeyNode":
"""
The function CKDpriv((kpar, cpar), i) (ki, ci) computes
a child extended private key from the parent extended private key:
* Check whether i 2**31 (whether the child is a hardened key).
* If so (hardened child):
let I = HMAC-SHA512(Key=cpar, Data=0x00 || ser256(kpar) || ser32(i))
(Note: The 0x00 pads the private key to make it 33 bytes long.)
* If not (normal child):
let I = HMAC-SHA512(Key=cpar, Data=serP(point(kpar)) || ser32(i))
* Split I into two 32-byte sequences, IL and IR.
* The returned child key ki is parse256(IL) + kpar (mod n).
* The returned chain code ci is IR.
* In case parse256(IL) n or ki = 0, the resulting key is invalid,
and one should proceed with the next value for i.
(Note: this has probability lower than 1 in 2**127.)
:param index: derivation index
:return: derived child
"""
if index >= HARDENED:
# hardened
data = b"\x00"+bytes(self.private_key) + int_to_big_endian(index, 4)
else:
data = self.public_key.sec() + int_to_big_endian(index, 4)
I = hmac_sha512(key=self.chain_code, msg=data)
IL, IR = I[:32], I[32:]
try:
ki = self.private_key.tweak_add(IL)
# if ki == PrivateKey.from_int(0):
# InvalidKeyError("private key is zero")
except NameError:
if big_endian_to_int(IL) >= CURVE_ORDER:
InvalidKeyError(
"private key {} is greater/equal to curve order".format(
big_endian_to_int(IL)
)
)
ki = (int.from_bytes(IL, "big") +
big_endian_to_int(bytes(self.private_key))) % CURVE_ORDER
if ki == 0:
InvalidKeyError("private key is zero")
ki = int_to_big_endian(ki, 32)
child = self.__class__(
key=bytes(ki),
chain_code=IR,
index=index,
depth=self.depth + 1,
testnet=self.testnet,
parent=self
)
return child
class BIP32Node:
def __init__(self, node, netcode="XTN"):
self.node = node
self._netcode = netcode
@classmethod
def from_master_secret(cls, bip39_seed: bytes, netcode="XTN"):
return cls(PrvKeyNode.master_key(bip39_seed, False if netcode == "BTC" else True),
netcode=netcode)
@classmethod
def from_hwif(cls, extended_key):
assert extended_key[0] in "xt"
testnet = extended_key[0] == "t"
if extended_key[1:4] == "prv":
ek = PrvKeyNode.parse(extended_key, testnet)
else:
ek = PubKeyNode.parse(extended_key, testnet)
return cls(ek, netcode="XTN" if testnet else "BTC")
def subkey_for_path(self, path):
path_list = list(str2ipath(path))
node = self.node
for idx in path_list:
node = node.ckd(idx)
return BIP32Node(node)
def hwif(self, as_private=False):
is_pub = type(self.node) is PubKeyNode
if is_pub and as_private:
raise ValueError("no private key")
if as_private:
return self.node.extended_private_key()
return self.node.extended_public_key()
@classmethod
def from_wallet_key(cls, extended_key):
return cls.from_hwif(extended_key)
def hash160(self, compressed=True):
return self.node.public_key.h160(compressed)
def address(self, compressed=True, netcode="XTN", addr_fmt="p2pkh"):
return self.node.public_key.address(compressed, addr_fmt=addr_fmt,
testnet=False if netcode == "BTC" else True)
def sec(self, compressed=True):
return self.node.public_key.sec(compressed)
def fingerprint(self):
return self.node.fingerprint()
def netcode(self):
return self._netcode
def chain_code(self):
return self.node.chain_code
def privkey(self):
assert isinstance(self.node, PrvKeyNode)
return bytes(self.node.private_key)
def parent_fingerprint(self):
return self.node.parent_fingerprint

278
psbt_faker/ctransaction.py Normal file
View File

@ -0,0 +1,278 @@
#!/usr/bin/env python3
# Copyright (c) 2010 ArtForz -- public domain half-a-node
# Copyright (c) 2012 Jeff Garzik
# Copyright (c) 2010-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Bitcoin Object Python Serializations
Modified from the test/test_framework/mininode.py file from the
Bitcoin repository
CTransaction,CTxIn, CTxOut, etc....:
data structures that should map to corresponding structures in
bitcoin/primitives for transactions only
"""
import copy, struct, hashlib
from .serialize import (
deser_uint256,
deser_string,
deser_string_vector,
deser_vector,
ser_uint256,
ser_string,
ser_string_vector,
ser_vector,
uint256_from_str,
)
from typing import (
List,
Optional,
)
# Objects that map to bitcoind objects, which can be serialized/deserialized
MSG_WITNESS_FLAG = 1 << 30
def hash256(data):
return hashlib.sha256(hashlib.sha256(data).digest()).digest()
class COutPoint(object):
def __init__(self, hash: int = 0, n: int = 0xffffffff):
self.hash = hash
self.n = n
def deserialize(self, f) -> None:
self.hash = deser_uint256(f)
self.n = struct.unpack("<I", f.read(4))[0]
def serialize(self) -> bytes:
r = b""
r += ser_uint256(self.hash)
r += struct.pack("<I", self.n)
return r
def __repr__(self) -> str:
return "COutPoint(hash=%064x n=%i)" % (self.hash, self.n)
class CTxIn(object):
def __init__(
self,
outpoint: Optional[COutPoint] = None,
scriptSig: bytes = b"",
nSequence: int = 0,
):
if outpoint is None:
self.prevout = COutPoint()
else:
self.prevout = outpoint
self.scriptSig = scriptSig
self.nSequence = nSequence
def deserialize(self, f) -> None:
self.prevout = COutPoint()
self.prevout.deserialize(f)
self.scriptSig = deser_string(f)
self.nSequence = struct.unpack("<I", f.read(4))[0]
def serialize(self) -> bytes:
r = b""
r += self.prevout.serialize()
r += ser_string(self.scriptSig)
r += struct.pack("<I", self.nSequence)
return r
def __repr__(self) -> str:
return "CTxIn(prevout=%s scriptSig=%s nSequence=%i)" \
% (repr(self.prevout), self.scriptSig.hex(),
self.nSequence)
class CTxOut(object):
def __init__(self, nValue: int = 0, scriptPubKey: bytes = b""):
self.nValue = nValue
self.scriptPubKey = scriptPubKey
def deserialize(self, f) -> None:
self.nValue = struct.unpack("<q", f.read(8))[0]
self.scriptPubKey = deser_string(f)
def serialize(self) -> bytes:
r = b""
r += struct.pack("<q", self.nValue)
r += ser_string(self.scriptPubKey)
return r
def __repr__(self) -> str:
return "CTxOut(nValue=%i.%08i scriptPubKey=%s)" \
% (self.nValue // 100_000_000, self.nValue % 100_000_000, self.scriptPubKey.hex())
class CScriptWitness(object):
def __init__(self) -> None:
# stack is a vector of strings
self.stack: List[bytes] = []
def __repr__(self) -> str:
return "CScriptWitness(%s)" % \
(",".join([x.hex() for x in self.stack]))
def is_null(self) -> bool:
if self.stack:
return False
return True
class CTxInWitness(object):
def __init__(self) -> None:
self.scriptWitness = CScriptWitness()
def deserialize(self, f) -> None:
self.scriptWitness.stack = deser_string_vector(f)
def serialize(self) -> bytes:
return ser_string_vector(self.scriptWitness.stack)
def __repr__(self) -> str:
return repr(self.scriptWitness)
def is_null(self) -> bool:
return self.scriptWitness.is_null()
class CTxWitness(object):
def __init__(self) -> None:
self.vtxinwit: List[CTxInWitness] = []
def deserialize(self, f) -> None:
for i in range(len(self.vtxinwit)):
self.vtxinwit[i].deserialize(f)
def serialize(self) -> bytes:
r = b""
# This is different than the usual vector serialization --
# we omit the length of the vector, which is required to be
# the same length as the transaction's vin vector.
for x in self.vtxinwit:
r += x.serialize()
return r
def __repr__(self) -> str:
return "CTxWitness(%s)" % \
(';'.join([repr(x) for x in self.vtxinwit]))
def is_null(self) -> bool:
for x in self.vtxinwit:
if not x.is_null():
return False
return True
class CTransaction(object):
def __init__(self, tx: Optional['CTransaction'] = None) -> None:
if tx is None:
self.nVersion = 1
self.vin: List[CTxIn] = []
self.vout: List[CTxOut] = []
self.wit = CTxWitness()
self.nLockTime = 0
self.sha256: Optional[int] = None
self.hash: Optional[bytes] = None
else:
self.nVersion = tx.nVersion
self.vin = copy.deepcopy(tx.vin)
self.vout = copy.deepcopy(tx.vout)
self.nLockTime = tx.nLockTime
self.sha256 = tx.sha256
self.hash = tx.hash
self.wit = copy.deepcopy(tx.wit)
def deserialize(self, f) -> None:
self.nVersion = struct.unpack("<i", f.read(4))[0]
self.vin = deser_vector(f, CTxIn)
flags = 0
if len(self.vin) == 0:
flags = struct.unpack("<B", f.read(1))[0]
# Not sure why flags can't be zero, but this
# matches the implementation in bitcoind
if (flags != 0):
self.vin = deser_vector(f, CTxIn)
self.vout = deser_vector(f, CTxOut)
else:
self.vout = deser_vector(f, CTxOut)
if flags != 0:
self.wit.vtxinwit = [CTxInWitness() for i in range(len(self.vin))]
self.wit.deserialize(f)
self.nLockTime = struct.unpack("<I", f.read(4))[0]
self.sha256 = None
self.hash = None
def serialize_without_witness(self) -> bytes:
r = b""
r += struct.pack("<i", self.nVersion)
r += ser_vector(self.vin)
r += ser_vector(self.vout)
r += struct.pack("<I", self.nLockTime)
return r
# Only serialize with witness when explicitly called for
def serialize_with_witness(self) -> bytes:
flags = 0
if not self.wit.is_null():
flags |= 1
r = b""
r += struct.pack("<i", self.nVersion)
if flags:
r += ser_vector([])
r += struct.pack("<B", flags)
r += ser_vector(self.vin)
r += ser_vector(self.vout)
if flags & 1:
if (len(self.wit.vtxinwit) != len(self.vin)):
# vtxinwit must have the same length as vin
self.wit.vtxinwit = self.wit.vtxinwit[:len(self.vin)]
for _ in range(len(self.wit.vtxinwit), len(self.vin)):
self.wit.vtxinwit.append(CTxInWitness())
r += self.wit.serialize()
r += struct.pack("<I", self.nLockTime)
return r
# Regular serialization is without witness -- must explicitly
# call serialize_with_witness to include witness data.
def serialize(self) -> bytes:
return self.serialize_without_witness()
# Recalculate the txid (transaction hash without witness)
def rehash(self) -> None:
self.sha256 = None
self.calc_sha256()
# We will only cache the serialization without witness in
# self.sha256 and self.hash -- those are expected to be the txid.
def calc_sha256(self, with_witness: bool = False) -> Optional[int]:
if with_witness:
# Don't cache the result, just return it
return uint256_from_str(hash256(self.serialize_with_witness()))
if self.sha256 is None:
self.sha256 = uint256_from_str(hash256(self.serialize_without_witness()))
self.hash = hash256(self.serialize())
return None
def is_null(self) -> bool:
return len(self.vin) == 0 and len(self.vout) == 0
def txid(self):
# convenience
if self.sha256 is None:
self.calc_sha256()
return self.sha256.to_bytes(32, "big")
def __repr__(self) -> str:
return "CTransaction(nVersion=%i vin=%s vout=%s wit=%s nLockTime=%i)" \
% (self.nVersion, repr(self.vin), repr(self.vout), repr(self.wit), self.nLockTime)

26
psbt_faker/helpers.py Normal file
View File

@ -0,0 +1,26 @@
import struct
def str2ipath(s):
# convert text to numeric path for BIP174
for i in s.split('/'):
if i == 'm': continue
if not i: continue # trailing or duplicated slashes
if i[-1] in "'ph":
assert len(i) >= 2, i
here = int(i[:-1]) | 0x80000000
else:
here = int(i)
assert 0 <= here < 0x80000000, here
yield here
def xfp2str(xfp):
# Standardized way to show an xpub's fingerprint... it's a 4-byte string
# and not really an integer. Used to show as '0x%08x' but that's wrong endian.
return struct.pack('>I', xfp).hex().upper()
def str2path(xfp, s):
# output binary needed for BIP-174
p = list(str2ipath(s))
return bytes.fromhex(xfp) + struct.pack('<%dI' % (len(p)), *p)

67
psbt_faker/multisig.py Normal file
View File

@ -0,0 +1,67 @@
import re
from .bip32 import BIP32Node
def from_simple_text(lines):
# standard multisig file format - more than one line
M, N = -1, -1
deriv = None
name = None
xpubs = []
addr_fmt = "p2sh"
for ln in lines:
# remove comments
comm = ln.find('#')
if comm == 0:
continue
if comm != -1:
if not ln[comm + 1:comm + 2].isdigit():
ln = ln[0:comm]
ln = ln.strip()
if ':' not in ln:
if 'pub' in ln:
# pointless optimization: allow bare xpub if we can calc xfp
label = '00000000'
value = ln
else:
# complain?
# if ln: print("no colon: " + ln)
continue
else:
label, value = ln.split(':', 1)
label = label.lower()
value = value.strip()
if label == 'name':
name = value
elif label == 'policy':
try:
# accepts: 2 of 3 2/3 2,3 2 3 etc
mat = re.search(r'(\d+)\D*(\d+)', value)
assert mat
M = int(mat.group(1))
N = int(mat.group(2))
assert 1 <= M <= N <= 15
except:
raise AssertionError('bad policy line')
elif label == 'derivation':
# reveal the path derivation for following key(s)
try:
assert value, 'blank'
deriv = value
except BaseException as exc:
raise AssertionError('bad derivation line: ' + str(exc))
elif label == 'format':
# pick segwit vs. classic vs. wrapped version
value = value.lower()
assert value in ['p2wsh', 'p2sh', 'p2sh-p2wsh', 'p2wsh-p2sh']
addr_fmt = value
elif len(label) == 8:
xpubs.append((label, deriv, BIP32Node.from_hwif(value)))
return name, addr_fmt, xpubs, M, N

View File

@ -5,14 +5,8 @@
#
import io, struct
from binascii import b2a_hex as _b2a_hex
from binascii import a2b_hex as _a2b_hex
from collections import namedtuple
from base64 import b64encode
from pycoin.tx.Tx import Tx
#from pycoin.tx.TxOut import TxOut
#from pycoin.encoding import b2a_hashed_base58, a2b_hashed_base58
from pycoin.tx.script.check_signature import parse_signature_blob
from binascii import b2a_hex, a2b_hex
from .ctransaction import CTransaction
from binascii import a2b_hex
from base64 import b64decode
b2a_hex = lambda a: str(_b2a_hex(a), 'ascii')
@ -123,8 +117,7 @@ class BasicPSBTInput(PSBTSection):
if rv:
# NOTE: equality test on signatures requires parsing DER stupidness
# and some maybe understanding of R/S values on curve that I don't have.
assert all(parse_signature_blob(a.part_sigs[k])
== parse_signature_blob(b.part_sigs[k]) for k in a.part_sigs)
assert all(a.part_sigs[k] == b.part_sigs[k] for k in a.part_sigs)
return rv
def parse_kv(self, kt, key, val):
@ -235,7 +228,7 @@ class BasicPSBT:
raw = b64decode(raw)
assert raw[0:5] == b'psbt\xff', "bad magic"
with io.BytesIO(raw[5:]) as fd:
with (io.BytesIO(raw[5:]) as fd):
# globals
while 1:
@ -252,9 +245,10 @@ class BasicPSBT:
if kt == PSBT_GLOBAL_UNSIGNED_TX:
self.txn = val
t = Tx.parse(io.BytesIO(val))
num_ins = len(t.txs_in)
num_outs = len(t.txs_out)
t = CTransaction()
t.deserialize(io.BytesIO(val))
num_ins = len(t.vin)
num_outs = len(t.vout)
elif kt == PSBT_GLOBAL_XPUB:
self.xpubs[key[1:]] = val
else:

134
psbt_faker/ripemd.py Normal file
View File

@ -0,0 +1,134 @@
# Copyright (c) 2021 Pieter Wuille
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test-only pure Python RIPEMD160 implementation."""
import unittest, hashlib
# Message schedule indexes for the left path.
ML = [
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
7, 4, 13, 1, 10, 6, 15, 3, 12, 0, 9, 5, 2, 14, 11, 8,
3, 10, 14, 4, 9, 15, 8, 1, 2, 7, 0, 6, 13, 11, 5, 12,
1, 9, 11, 10, 0, 8, 12, 4, 13, 3, 7, 15, 14, 5, 6, 2,
4, 0, 5, 9, 7, 12, 2, 10, 14, 1, 3, 8, 11, 6, 15, 13
]
# Message schedule indexes for the right path.
MR = [
5, 14, 7, 0, 9, 2, 11, 4, 13, 6, 15, 8, 1, 10, 3, 12,
6, 11, 3, 7, 0, 13, 5, 10, 14, 15, 8, 12, 4, 9, 1, 2,
15, 5, 1, 3, 7, 14, 6, 9, 11, 8, 12, 2, 10, 0, 4, 13,
8, 6, 4, 1, 3, 11, 15, 0, 5, 12, 2, 13, 9, 7, 10, 14,
12, 15, 10, 4, 1, 5, 8, 7, 6, 2, 13, 14, 0, 3, 9, 11
]
# Rotation counts for the left path.
RL = [
11, 14, 15, 12, 5, 8, 7, 9, 11, 13, 14, 15, 6, 7, 9, 8,
7, 6, 8, 13, 11, 9, 7, 15, 7, 12, 15, 9, 11, 7, 13, 12,
11, 13, 6, 7, 14, 9, 13, 15, 14, 8, 13, 6, 5, 12, 7, 5,
11, 12, 14, 15, 14, 15, 9, 8, 9, 14, 5, 6, 8, 6, 5, 12,
9, 15, 5, 11, 6, 8, 13, 12, 5, 12, 13, 14, 11, 8, 5, 6
]
# Rotation counts for the right path.
RR = [
8, 9, 9, 11, 13, 15, 15, 5, 7, 7, 8, 11, 14, 14, 12, 6,
9, 13, 15, 7, 12, 8, 9, 11, 7, 7, 12, 7, 6, 15, 13, 11,
9, 7, 15, 11, 8, 6, 6, 14, 12, 13, 5, 14, 13, 13, 7, 5,
15, 5, 8, 11, 14, 14, 6, 14, 6, 9, 12, 9, 12, 5, 15, 8,
8, 5, 12, 9, 12, 5, 14, 6, 8, 13, 6, 5, 15, 13, 11, 11
]
# K constants for the left path.
KL = [0, 0x5a827999, 0x6ed9eba1, 0x8f1bbcdc, 0xa953fd4e]
# K constants for the right path.
KR = [0x50a28be6, 0x5c4dd124, 0x6d703ef3, 0x7a6d76e9, 0]
def fi(x, y, z, i):
"""The f1, f2, f3, f4, and f5 functions from the specification."""
if i == 0:
return x ^ y ^ z
elif i == 1:
return (x & y) | (~x & z)
elif i == 2:
return (x | ~y) ^ z
elif i == 3:
return (x & z) | (y & ~z)
elif i == 4:
return x ^ (y | ~z)
else:
assert False
def rol(x, i):
"""Rotate the bottom 32 bits of x left by i bits."""
return ((x << i) | ((x & 0xffffffff) >> (32 - i))) & 0xffffffff
def compress(h0, h1, h2, h3, h4, block):
"""Compress state (h0, h1, h2, h3, h4) with block."""
# Left path variables.
al, bl, cl, dl, el = h0, h1, h2, h3, h4
# Right path variables.
ar, br, cr, dr, er = h0, h1, h2, h3, h4
# Message variables.
x = [int.from_bytes(block[4*i:4*(i+1)], 'little') for i in range(16)]
# Iterate over the 80 rounds of the compression.
for j in range(80):
rnd = j >> 4
# Perform left side of the transformation.
al = rol(al + fi(bl, cl, dl, rnd) + x[ML[j]] + KL[rnd], RL[j]) + el
al, bl, cl, dl, el = el, al, bl, rol(cl, 10), dl
# Perform right side of the transformation.
ar = rol(ar + fi(br, cr, dr, 4 - rnd) + x[MR[j]] + KR[rnd], RR[j]) + er
ar, br, cr, dr, er = er, ar, br, rol(cr, 10), dr
# Compose old state, left transform, and right transform into new state.
return h1 + cl + dr, h2 + dl + er, h3 + el + ar, h4 + al + br, h0 + bl + cr
def ripemd160(data):
"""Compute the RIPEMD-160 hash of data."""
# Initialize state.
state = (0x67452301, 0xefcdab89, 0x98badcfe, 0x10325476, 0xc3d2e1f0)
# Process full 64-byte blocks in the input.
for b in range(len(data) >> 6):
state = compress(*state, data[64*b:64*(b+1)])
# Construct final blocks (with padding and size).
pad = b"\x80" + b"\x00" * ((119 - len(data)) & 63)
fin = data[len(data) & ~63:] + pad + (8 * len(data)).to_bytes(8, 'little')
# Process final blocks.
for b in range(len(fin) >> 6):
state = compress(*state, fin[64*b:64*(b+1)])
# Produce output.
return b"".join((h & 0xffffffff).to_bytes(4, 'little') for h in state)
def hash160(data):
return ripemd160(hashlib.sha256(data).digest())
class TestFrameworkKey(unittest.TestCase):
def test_ripemd160(self):
"""RIPEMD-160 test vectors."""
# See https://homes.esat.kuleuven.be/~bosselae/ripemd160.html
for msg, hexout in [
(b"", "9c1185a5c5e9fc54612808977ee8f548b2258d31"),
(b"a", "0bdc9d2d256b3ee9daae347be6f4dc835a467ffe"),
(b"abc", "8eb208f7e05d987a9b044a8e98c6b087f15a0bfc"),
(b"message digest", "5d0689ef49d2fae572b881b123a85ffa21595f36"),
(b"abcdefghijklmnopqrstuvwxyz",
"f71c27109c692c1b56bbdceb5b9d2865b3708dbc"),
(b"abcdbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq",
"12a053384a9c0c88e405a06c27dcf49ada62eb2b"),
(b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789",
"b0e20b6e3116640286ed3a87a5713079b21f5189"),
(b"1234567890" * 8, "9b752e45573d4b39f4dbd3323cab82bf63326bfb"),
(b"a" * 1000000, "52783243c1697bdbe16d37f97f68f08325dc1528")
]:
self.assertEqual(ripemd160(msg).hex(), hexout)

243
psbt_faker/serialize.py Normal file
View File

@ -0,0 +1,243 @@
#!/usr/bin/env python3
# Copyright (c) 2010 ArtForz -- public domain half-a-node
# Copyright (c) 2012 Jeff Garzik
# Copyright (c) 2010-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""
Bitcoin Object Python Serializations
************************************
Modified from the test/test_framework/mininode.py file from the
Bitcoin repository
"""
import struct
from struct import error as struct_err
from typing import List
# Serialization/deserialization tools
def ser_compact_size(size: int) -> bytes:
"""
Serialize an integer using Bitcoin's compact size unsigned integer serialization.
:param size: The int to serialize
:returns: The int serialized as a compact size unsigned integer
"""
r = b""
if size < 253:
r = struct.pack("B", size)
elif size < 0x10000:
r = struct.pack("<BH", 253, size)
elif size < 0x100000000:
r = struct.pack("<BI", 254, size)
else:
r = struct.pack("<BQ", 255, size)
return r
def deser_compact_size(f):
"""
Deserialize a compact size unsigned integer from the beginning of the byte stream.
:param f: The byte stream
:returns: The integer that was serialized
"""
try:
nit: int = struct.unpack("<B", f.read(1))[0]
except struct_err:
return None
if nit == 253:
nit = struct.unpack("<H", f.read(2))[0]
elif nit == 254:
nit = struct.unpack("<I", f.read(4))[0]
elif nit == 255:
nit = struct.unpack("<Q", f.read(8))[0]
return nit
def deser_string(f) -> bytes:
"""
Deserialize a variable length byte string serialized with Bitcoin's variable length string serialization from a byte stream.
:param f: The byte stream
:returns: The byte string that was serialized
"""
nit = deser_compact_size(f)
return f.read(nit)
def ser_string(s: bytes) -> bytes:
"""
Serialize a byte string with Bitcoin's variable length string serialization.
:param s: The byte string to be serialized
:returns: The serialized byte string
"""
return ser_compact_size(len(s)) + s
def deser_uint256(f) -> int:
"""
Deserialize a 256 bit integer serialized with Bitcoin's 256 bit integer serialization from a byte stream.
:param f: The byte stream.
:returns: The integer that was serialized
"""
r = 0
for i in range(8):
t = struct.unpack("<I", f.read(4))[0]
r += t << (i * 32)
return r
def ser_uint256(u: int) -> bytes:
"""
Serialize a 256 bit integer with Bitcoin's 256 bit integer serialization.
:param u: The integer to serialize
:returns: The serialized 256 bit integer
"""
rs = b""
for _ in range(8):
rs += struct.pack("<I", u & 0xFFFFFFFF)
u >>= 32
return rs
def uint256_from_str(s: bytes) -> int:
"""
Deserialize a 256 bit integer serialized with Bitcoin's 256 bit integer serialization from a byte string.
:param s: The byte string
:returns: The integer that was serialized
"""
r = 0
t = struct.unpack("<IIIIIIII", s[:32])
for i in range(8):
r += t[i] << (i * 32)
return r
def deser_vector(f, c) -> List:
"""
Deserialize a vector of objects with Bitcoin's object vector serialization from a byte stream.
:param f: The byte stream
:param c: The class of object to deserialize for each object in the vector
:returns: A list of objects that were serialized
"""
nit = deser_compact_size(f)
r = []
for _ in range(nit):
t = c()
t.deserialize(f)
r.append(t)
return r
def ser_vector(v) -> bytes:
"""
Serialize a vector of objects with Bitcoin's object vector serialzation.
:param v: The list of objects to serialize
:returns: The serialized objects
"""
r = ser_compact_size(len(v))
for i in v:
r += i.serialize()
return r
def deser_string_vector(f) -> List[bytes]:
"""
Deserialize a vector of byte strings from a byte stream.
:param f: The byte stream
:returns: The list of byte strings that were serialized
"""
nit = deser_compact_size(f)
r = []
for _ in range(nit):
t = deser_string(f)
r.append(t)
return r
def ser_string_vector(v: List[bytes]) -> bytes:
"""
Serialize a list of byte strings as a vector of byte strings.
:param v: The list of byte strings to serialize
:returns: The serialized list of byte strings
"""
r = ser_compact_size(len(v))
for sv in v:
r += ser_string(sv)
return r
def ser_sig_der(r: bytes, s: bytes) -> bytes:
"""
Serialize the ``r`` and ``s`` values of an ECDSA signature using DER.
:param r: The ``r`` value bytes
:param s: The ``s`` value bytes
:returns: The DER encoded signature
"""
sig = b"\x30"
# Make r and s as short as possible
ri = 0
for b in r:
if b == 0:
ri += 1
else:
break
r = r[ri:]
si = 0
for b in s:
if b == 0:
si += 1
else:
break
s = s[si:]
# Make positive of neg
first = r[0]
if first & (1 << 7) != 0:
r = b"\x00" + r
first = s[0]
if first & (1 << 7) != 0:
s = b"\x00" + s
# Write total length
total_len = len(r) + len(s) + 4
sig += struct.pack("B", total_len)
# write r
sig += b"\x02"
sig += struct.pack("B", len(r))
sig += r
# write s
sig += b"\x02"
sig += struct.pack("B", len(s))
sig += s
sig += b"\x01"
return sig
def ser_sig_compact(r: bytes, s: bytes, recid: bytes) -> bytes:
"""
Serialize the ``r`` and ``s`` values of an ECDSA signature using the compact signature serialization scheme.
:param r: The ``r`` value bytes
:param s: The ``s`` value bytes
:returns: The compact signature
"""
rec = struct.unpack("B", recid)[0]
prefix = struct.pack("B", 27 + 4 + rec)
sig = b""
sig += prefix
sig += r + s
return sig

View File

@ -1,19 +1,25 @@
#
# Creating fake transactions. Not simple... but only for testing purposes, so ....
#
import time, os, random
from binascii import b2a_hex, a2b_hex
import struct, random, hashlib
from .segwit_addr import encode as bech32_encode
from io import BytesIO
from pprint import pprint, pformat
from decimal import Decimal
from pycoin.key.BIP32Node import BIP32Node
from .psbt import BasicPSBT, BasicPSBTInput, BasicPSBTOutput, PSBT_IN_REDEEM_SCRIPT
from .psbt import BasicPSBT, BasicPSBTInput, BasicPSBTOutput
from .base58 import encode_base58_checksum
from .ripemd import hash160
from .helpers import str2path
from .serialize import uint256_from_str
from .bip32 import BIP32Node
from .ctransaction import CTransaction, CTxIn, CTxOut, COutPoint
# all possible addr types, including multisig/scripts
ADDR_STYLES = ['p2wpkh', 'p2wsh', 'p2sh', 'p2pkh', 'p2wsh-p2sh', 'p2wpkh-p2sh', 'p2tr']
# single-signer
ADDR_STYLES_SINGLE = ['p2wpkh', 'p2pkh', 'p2wpkh-p2sh']
# multi-signer
ADDR_STYLES_MULTI = ['p2wsh', 'p2sh', 'p2sh-p2wsh', 'p2wsh-p2sh']
def prandom(count):
# make some bytes, randomly, but not: deterministic
@ -46,8 +52,6 @@ def fake_dest_addr(style='p2pkh'):
def make_change_addr(wallet, style):
# provide script, pubkey and xpath for a legit-looking change output
import struct, random
from pycoin.encoding import hash160
redeem_scr, actual_scr = None, None
deriv = [12, 34, random.randint(0, 1000)]
@ -74,22 +78,19 @@ def make_change_addr(wallet, style):
return redeem_scr, actual_scr, is_segwit, dest.sec(), struct.pack('4I', xfp, *deriv)
def fake_txn(num_ins, num_outs, master_xpub=None, subpath="0/%d", fee=10000,
outvals=None, segwit_in=False, outstyles=['p2pkh'], is_testnet=False,
change_style='p2pkh', partial=False,
change_outputs=[]):
outvals=None, segwit_in=False, outstyles=['p2pkh'], is_testnet=False,
change_style='p2pkh', partial=False,
change_outputs=[]):
# make various size txn's ... completely fake and pointless values
# - but has UTXO's to match needs
# - input total = num_inputs * 1BTC
from pycoin.tx.Tx import Tx
from pycoin.tx.TxIn import TxIn
from pycoin.tx.TxOut import TxOut
from pycoin.serialize import h2b_rev
from struct import pack
psbt = BasicPSBT()
txn = Tx(2,[],[])
txn = CTransaction()
txn.nVersion = 2
# we have a key; use it to provide "plausible" value inputs
if master_xpub:
mk = BIP32Node.from_wallet_key(master_xpub)
@ -119,24 +120,34 @@ def fake_txn(num_ins, num_outs, master_xpub=None, subpath="0/%d", fee=10000,
else:
psbt.inputs[i].bip32_paths[sec] = xfp + pack('<II', 0, i)
# UTXO that provides the funding for to-be-signed txn
supply = Tx(2,[TxIn(pack('4Q', 0xdead, 0xbeef, 0, 0), 73)],[])
supply = CTransaction()
supply.nVersion = 2
out_point = COutPoint(
uint256_from_str(struct.pack('4Q', 0xdead, 0xbeef, 0, 0)),
73
)
supply.vin = [CTxIn(out_point, nSequence=0xffffffff)]
scr = bytes([0x76, 0xa9, 0x14]) + subkey.hash160() + bytes([0x88, 0xac])
if segwit_in:
# p2wpkh
scr = bytes([0x00, 0x14]) + subkey.hash160()
else:
# p2pkh
scr = bytes([0x76, 0xa9, 0x14]) + subkey.hash160() + bytes([0x88, 0xac])
supply.txs_out.append(TxOut(1E8, scr))
supply.vout.append(CTxOut(int(1E8), scr))
with BytesIO() as fd:
if not segwit_in:
supply.stream(fd)
psbt.inputs[i].utxo = fd.getvalue()
else:
supply.txs_out[-1].stream(fd)
psbt.inputs[i].witness_utxo = fd.getvalue()
if segwit_in:
# just utxo for segwit
psbt.inputs[i].witness_utxo = supply.vout[-1].serialize()
else:
# whole tx for pre-segwit
psbt.inputs[i].utxo = supply.serialize_with_witness()
spendable = TxIn(supply.hash(), 0)
txn.txs_in.append(spendable)
supply.calc_sha256()
spendable = CTxIn(COutPoint(supply.sha256, 0), nSequence=0xffffffff)
txn.vin.append(spendable)
for i in range(num_outs):
is_change = False
@ -164,28 +175,208 @@ def fake_txn(num_ins, num_outs, master_xpub=None, subpath="0/%d", fee=10000,
psbt.outputs[i].redeem_script = scr
if not outvals:
h = TxOut(round(((1E8*num_ins)-fee) / num_outs, 4), act_scr)
h = CTxOut(int(round(((1E8*num_ins)-fee) / num_outs, 4)), act_scr)
else:
h = TxOut(outvals[i], act_scr)
h = CTxOut(int(outvals[i]), act_scr)
outputs.append( (Decimal(h.coin_value)/Decimal(1E8), act_scr, is_change) )
outputs.append((Decimal(h.nValue)/Decimal(1E8), act_scr, is_change) )
txn.txs_out.append(h)
txn.vout.append(h)
with BytesIO() as b:
txn.stream(b)
psbt.txn = b.getvalue()
psbt.txn = txn.serialize()
rv = BytesIO()
psbt.serialize(rv)
return rv.getvalue(), [(n, render_address(s, is_testnet), ic) for n,s,ic in outputs]
def make_redeem(M, keys, idx, is_change, bip67=True):
# Construct a redeem script, and ordered list of xfp+path to match.
N = len(keys)
# see BIP 67: <https://github.com/bitcoin/bips/blob/master/bip-0067.mediawiki>
data = []
for cosigner_idx, (xfp, str_path, node) in enumerate(keys):
sp = f"{int(is_change)}/{idx}"
n = node.subkey_for_path(sp)
pk = n.sec()
data.append((pk, str2path(xfp, str_path + "/" + sp)))
if bip67:
data.sort(key=lambda i: i[0])
mm = [80 + M] if M <= 16 else [1, M]
nn = [80 + N] if N <= 16 else [1, N]
rv = bytes(mm)
for pk, _ in data:
rv += bytes([len(pk)]) + pk
rv += bytes(nn + [0xAE])
return rv, data
def make_ms_address(M, keys, idx, is_change, addr_fmt="p2wsh", testnet=1, bip67=True):
# Construct addr and script need to represent a p2sh address
script, bip32paths = make_redeem(M, keys, idx, is_change, bip67=bip67)
if addr_fmt == "p2wsh":
# testnet=2 --> regtest
hrp = ['bc', 'tb', 'bcrt'][testnet]
data = hashlib.sha256(script).digest()
addr = bech32_encode(hrp, 0, data)
scriptPubKey = bytes([0x0, 0x20]) + data
else:
if addr_fmt == "p2sh":
digest = hash160(script)
elif addr_fmt in ("p2sh-p2wsh", "p2wsh-p2sh"):
digest = hash160(b'\x00\x20' + hashlib.sha256(script).digest())
else:
raise ValueError(addr_fmt)
prefix = bytes([196]) if testnet else bytes([5])
addr = encode_base58_checksum(prefix + digest)
scriptPubKey = bytes([0xa9, 0x14]) + digest + bytes([0x87])
return addr, scriptPubKey, script, bip32paths
def fake_ms_txn(num_ins, num_outs, M, keys, fee=10000, outvals=None, segwit_in=True,
outstyles=['p2wsh'], change_outputs=[], incl_xpubs=False,
input_amount=1E8, bip67=True, locktime=0, testnet=False):
# make various size MULTISIG txn's ... completely fake and pointless values
# - but has UTXO's to match needs
psbt = BasicPSBT()
# if psbt_v2 is None:
# # anything passed directly to this function overrides
# # pytest flag --psbt2 - only care about pytest flag
# # if psbt_v2 is not specified (None)
# psbt_v2 = pytestconfig.getoption('psbt2')
#
# if psbt_v2:
# psbt.version = 2
# psbt.txn_version = 2
# psbt.input_count = num_ins
# psbt.output_count = num_outs
txn = CTransaction()
txn.nVersion = 2
txn.nLockTime = locktime
if incl_xpubs:
# add global header with XPUB's
for idx, (xfp, str_path, node) in enumerate(keys):
kk = str2path(xfp, str_path)
psbt.xpubs.append((node.node.serialize_public(), kk))
psbt.inputs = [BasicPSBTInput(idx=i) for i in range(num_ins)]
psbt.outputs = [BasicPSBTOutput(idx=i) for i in range(num_outs)]
for i in range(num_ins):
# make a fake txn to supply each of the inputs
# - each input is 1BTC
# addr where the fake money will be stored.
addr, scriptPubKey, script, details = make_ms_address(M, keys, i, True,
addr_fmt="p2wsh" if segwit_in else "p2sh",
bip67=bip67)
# lots of supporting details needed for p2sh inputs
if segwit_in:
psbt.inputs[i].witness_script = script
else:
psbt.inputs[i].redeem_script = script
for pubkey, xfp_path in details:
psbt.inputs[i].bip32_paths[pubkey] = xfp_path
# UTXO that provides the funding for to-be-signed txn
supply = CTransaction()
supply.nVersion = 2
out_point = COutPoint(
uint256_from_str(struct.pack('4Q', 0xdead, 0xbeef, 0, 0)),
73
)
supply.vin = [CTxIn(out_point, nSequence=0xffffffff)]
supply.vout.append(CTxOut(int(input_amount), scriptPubKey))
if not segwit_in:
psbt.inputs[i].utxo = supply.serialize_with_witness()
else:
psbt.inputs[i].witness_utxo = supply.vout[-1].serialize()
supply.calc_sha256()
# if psbt_v2:
# psbt.inputs[i].previous_txid = supply.hash
# psbt.inputs[i].prevout_idx = 0
# # TODO sequence
# # TODO height timelock
# # TODO time timelock
seq = 0xffffffff
if (i == 0) and locktime:
# need to decrement at least one for locktime to work
seq -= 1
spendable = CTxIn(COutPoint(supply.sha256, 0), nSequence=seq)
txn.vin.append(spendable)
for i in range(num_outs):
if not outstyles:
style = ADDR_STYLES_MULTI[i % len(ADDR_STYLES_MULTI)]
elif len(outstyles) == num_outs:
style = outstyles[i]
else:
style = outstyles[i % len(outstyles)]
if i in change_outputs:
addr, scriptPubKey, scr, details = make_ms_address(M, keys, num_ins+i, False,
addr_fmt=style, bip67=bip67)
for pubkey, xfp_path in details:
psbt.outputs[i].bip32_paths[pubkey] = xfp_path
if 'w' in style:
psbt.outputs[i].witness_script = scr
if style.endswith('p2sh'):
psbt.outputs[i].redeem_script = b'\0\x20' + hashlib.sha256(scr).digest()
elif style.endswith('sh'):
psbt.outputs[i].redeem_script = scr
else:
scriptPubKey = fake_dest_addr(style)
assert scriptPubKey
# if psbt_v2:
# psbt.outputs[i].script = scriptPubKey
# if outvals:
# psbt.outputs[i].amount = outvals[i]
# else:
# psbt.outputs[i].amount = int(round(((input_amount * num_ins) - fee) / num_outs, 4))
if not outvals:
h = CTxOut(int(round(((input_amount*num_ins)-fee) / num_outs, 4)), scriptPubKey)
else:
h = CTxOut(int(outvals[i]), scriptPubKey)
txn.vout.append(h)
psbt.txn = txn.serialize_with_witness()
rv = BytesIO()
psbt.serialize(rv)
return rv.getvalue()
def render_address(script, testnet=True):
# take a scriptPubKey (part of the TxOut) and convert into conventional human-readable
# string... aka: the "payment address"
from pycoin.encoding import b2a_hashed_base58
from .segwit_addr import encode as bech32_encode
ll = len(script)
@ -203,11 +394,11 @@ def render_address(script, testnet=True):
# P2PKH
if ll == 25 and script[0:3] == b'\x76\xA9\x14' and script[23:26] == b'\x88\xAC':
return b2a_hashed_base58(b58_addr + script[3:3+20])
return encode_base58_checksum(b58_addr + script[3:3+20])
# P2SH
if ll == 23 and script[0:2] == b'\xA9\x14' and script[22] == 0x87:
return b2a_hashed_base58(b58_script + script[2:2+20])
return encode_base58_checksum(b58_script + script[2:2+20])
# P2WPKH
if ll == 22 and script[0:2] == b'\x00\x14':

View File

@ -1,2 +1,3 @@
click>=6.7
pycoin==0.80
ecdsa
# or can use python-secp256k1

View File

@ -6,7 +6,7 @@
from setuptools import setup, find_packages
VERSION = '1.3'
VERSION = '1.4'
with open('README.md', 'rt') as fd:
desc = fd.read()
@ -26,11 +26,11 @@ if __name__ == '__main__':
python_requires='>3.6.0',
install_requires=[
'Click',
'pycoin==0.80',
'ecdsa',
],
entry_points='''
[console_scripts]
psbt_faker=psbt_faker:faker
psbt_faker=psbt_faker:main
''',
classifiers=[
"Programming Language :: Python :: 3",