This commit is contained in:
scgbckbone 2023-05-07 00:57:22 +02:00 committed by doc-hex
parent e5d1782b9d
commit efda6f84dd
13 changed files with 771 additions and 185 deletions

@ -1 +1 @@
Subproject commit 78ce292c4df1643a6309f4e58e30823caa3695d1
Subproject commit 52b5950105af3c40dc2e6ab7c0b3a161667db787

View File

@ -1,5 +1,6 @@
## 5.2.0 - 2023-09-21
- New Feature: PSBTv2 support added
- New Feature: `Lock Down Seed` now works with every ephemeral secret (not just BIP39 passphrase)
- New Feature: BIP-39 Passphrase can now be added to word based Ephemeral Seeds
- New Feature: Add ability to back-up BIP39 Passphrase wallet

View File

@ -12,8 +12,8 @@ from sffile import SizerFile
from multisig import MultisigWallet, disassemble_multisig, disassemble_multisig_mn
from exceptions import FatalPSBTIssue, FraudulentChangeOutput
from serializations import ser_compact_size, deser_compact_size, hash160, hash256
from serializations import CTxIn, CTxInWitness, CTxOut, ser_string, ser_uint256
from serializations import ser_sig_der, uint256_from_str, ser_push_data, uint256_from_str
from serializations import CTxIn, CTxInWitness, CTxOut, ser_string, ser_uint256, COutPoint
from serializations import ser_sig_der, uint256_from_str, ser_push_data
from serializations import SIGHASH_ALL, SIGHASH_SINGLE, SIGHASH_NONE, SIGHASH_ANYONECANPAY
from serializations import ALL_SIGHASH_FLAGS
from glob import settings
@ -23,7 +23,11 @@ from public_constants import (
PSBT_IN_PARTIAL_SIG, PSBT_IN_SIGHASH_TYPE, PSBT_IN_REDEEM_SCRIPT,
PSBT_IN_WITNESS_SCRIPT, PSBT_IN_BIP32_DERIVATION, PSBT_IN_FINAL_SCRIPTSIG,
PSBT_IN_FINAL_SCRIPTWITNESS, PSBT_OUT_REDEEM_SCRIPT, PSBT_OUT_WITNESS_SCRIPT,
PSBT_OUT_BIP32_DERIVATION, MAX_PATH_DEPTH, MAX_SIGNERS
PSBT_OUT_BIP32_DERIVATION, PSBT_OUT_SCRIPT, PSBT_OUT_AMOUNT, PSBT_GLOBAL_VERSION,
PSBT_GLOBAL_TX_MODIFIABLE, PSBT_GLOBAL_OUTPUT_COUNT, PSBT_GLOBAL_INPUT_COUNT,
PSBT_GLOBAL_FALLBACK_LOCKTIME, PSBT_GLOBAL_TX_VERSION, PSBT_IN_PREVIOUS_TXID,
PSBT_IN_OUTPUT_INDEX, PSBT_IN_SEQUENCE, PSBT_IN_REQUIRED_TIME_LOCKTIME,
PSBT_IN_REQUIRED_HEIGHT_LOCKTIME, MAX_PATH_DEPTH, MAX_SIGNERS
)
psbt_tmp256 = bytearray(256)
@ -305,9 +309,9 @@ class psbtProxy:
#
class psbtOutputProxy(psbtProxy):
no_keys = { PSBT_OUT_REDEEM_SCRIPT, PSBT_OUT_WITNESS_SCRIPT }
blank_flds = ('unknown', 'subpaths', 'redeem_script', 'witness_script',
'is_change', 'num_our_keys', 'amount', 'address', 'scriptpubkey',
'attestation')
'is_change', 'num_our_keys', 'amount', 'script', 'attestation')
def __init__(self, fd, idx):
super().__init__()
@ -316,6 +320,8 @@ class psbtOutputProxy(psbtProxy):
#self.subpaths = None # a dictionary if non-empty
#self.redeem_script = None
#self.witness_script = None
#self.script = None
#self.amount = None
# this flag is set when we are assuming output will be change (same wallet)
#self.is_change = False
@ -333,6 +339,10 @@ class psbtOutputProxy(psbtProxy):
self.redeem_script = val
elif kt == PSBT_OUT_WITNESS_SCRIPT:
self.witness_script = val
elif kt == PSBT_OUT_SCRIPT:
self.script = val
elif kt == PSBT_OUT_AMOUNT:
self.amount = val
elif kt == PSBT_PROPRIETARY:
prefix, subtype, keydata = decode_prop_key(key[1:])
# examine only Coinkite proprietary keys
@ -347,7 +357,7 @@ class psbtOutputProxy(psbtProxy):
raise FatalPSBTIssue("Duplicate key. Key for unknown value already provided in output.")
self.unknown[key] = val
def serialize(self, out_fd, my_idx):
def serialize(self, out_fd, is_v2):
wr = lambda *a: self.write(out_fd, *a)
@ -361,6 +371,10 @@ class psbtOutputProxy(psbtProxy):
if self.witness_script:
wr(PSBT_OUT_WITNESS_SCRIPT, self.witness_script)
if is_v2:
wr(PSBT_OUT_SCRIPT, self.script)
wr(PSBT_OUT_AMOUNT, self.amount)
if self.attestation:
wr(PSBT_PROPRIETARY, self.attestation, encode_prop_key(PSBT_PROP_CK_ID, PSBT_ATTESTATION_SUBTYPE))
@ -379,7 +393,6 @@ class psbtOutputProxy(psbtProxy):
# - full key derivation and validation is done during signing, and critical.
# - we raise fraud alarms, since these are not innocent errors
#
num_ours = self.parse_subpaths(my_xfp, parent.warnings)
if num_ours == 0:
@ -516,11 +529,12 @@ class psbtInputProxy(psbtProxy):
PSBT_IN_REDEEM_SCRIPT, PSBT_IN_WITNESS_SCRIPT, PSBT_IN_FINAL_SCRIPTSIG,
PSBT_IN_FINAL_SCRIPTWITNESS }
blank_flds = ('unknown',
'utxo', 'witness_utxo', 'sighash',
'redeem_script', 'witness_script', 'fully_signed',
'is_segwit', 'is_multisig', 'is_p2sh', 'num_our_keys',
'required_key', 'scriptSig', 'amount', 'scriptCode', 'added_sig')
blank_flds = (
'unknown', 'utxo', 'witness_utxo', 'sighash', 'redeem_script', 'witness_script',
'fully_signed', 'is_segwit', 'is_multisig', 'is_p2sh', 'num_our_keys',
'required_key', 'scriptSig', 'amount', 'scriptCode', 'added_sig', 'previous_txid',
'prevout_idx', 'sequence', 'req_time_locktime', 'req_height_locktime'
)
def __init__(self, fd, idx):
super().__init__()
@ -552,6 +566,12 @@ class psbtInputProxy(psbtProxy):
# after signing, we'll have a signature to add to output PSBT
#self.added_sig = None
#self.previous_txid = None
#self.prevout_idx = None
#self.sequence = None
#self.req_time_locktime = None
#self.req_height_locktime = None
self.parse(fd)
def validate(self, idx, txin, my_xfp, parent):
@ -829,6 +849,16 @@ class psbtInputProxy(psbtProxy):
self.witness_script = val
elif kt == PSBT_IN_SIGHASH_TYPE:
self.sighash = unpack('<I', val)[0]
elif kt == PSBT_IN_PREVIOUS_TXID:
self.previous_txid = val
elif kt == PSBT_IN_OUTPUT_INDEX:
self.prevout_idx = val
elif kt == PSBT_IN_SEQUENCE:
self.sequence = unpack("<I", self.get(val))[0]
elif kt == PSBT_IN_REQUIRED_TIME_LOCKTIME:
self.req_time_locktime = unpack("<I", self.get(val))[0]
elif kt == PSBT_IN_REQUIRED_HEIGHT_LOCKTIME:
self.req_height_locktime = unpack("<I", self.get(val))[0]
else:
# including: PSBT_IN_FINAL_SCRIPTSIG, PSBT_IN_FINAL_SCRIPTWITNESS
self.unknown = self.unknown or {}
@ -836,7 +866,7 @@ class psbtInputProxy(psbtProxy):
raise FatalPSBTIssue("Duplicate key. Key for unknown value already provided in input.")
self.unknown[key] = val
def serialize(self, out_fd, my_idx):
def serialize(self, out_fd, is_v2):
# Output this input's values; might include signatures that weren't there before
wr = lambda *a: self.write(out_fd, *a)
@ -866,6 +896,20 @@ class psbtInputProxy(psbtProxy):
if self.witness_script:
wr(PSBT_IN_WITNESS_SCRIPT, self.witness_script)
if is_v2:
wr(PSBT_IN_PREVIOUS_TXID, self.previous_txid)
wr(PSBT_IN_OUTPUT_INDEX, self.prevout_idx)
if self.sequence is not None:
wr(PSBT_IN_SEQUENCE, pack("<I", self.sequence))
if self.req_time_locktime is not None:
wr(PSBT_IN_REQUIRED_TIME_LOCKTIME, pack("<I", self.req_time_locktime))
if self.req_height_locktime is not None:
wr(PSBT_IN_REQUIRED_HEIGHT_LOCKTIME, pack("<I", self.req_height_locktime))
if self.unknown:
for k, v in self.unknown.items():
wr(k[0], v, k[1:])
@ -881,10 +925,10 @@ class psbtObject(psbtProxy):
super().__init__()
# global objects
self.version = None
self.txn = None
self.xpubs = [] # tuples(xfp_path, xpub)
from glob import dis
self.my_xfp = settings.get('xfp', 0)
# details that we discover as we go
@ -893,11 +937,13 @@ class psbtObject(psbtProxy):
self.had_witness = None
self.num_inputs = None
self.num_outputs = None
self.txn_modifiable = None
self.fallback_locktime = None
self.vin_start = None
self.vout_start = None
self.wit_start = None
self.txn_version = None
self.lock_time = None
self._lock_time = None
self.total_value_out = None
self.total_value_in = None
self.presigned_inputs = set()
@ -918,6 +964,16 @@ class psbtObject(psbtProxy):
self.warnings = []
# v1 vs v2 validation
self.is_v2 = False
self.has_gic = False # global input count
self.has_goc = False # global output count
self.has_gtv = False # global txn version
@property
def lock_time(self):
return (self._lock_time or self.fallback_locktime) or 0
def store(self, kt, key, val):
# capture the values we care about
@ -927,6 +983,21 @@ class psbtObject(psbtProxy):
# list of tuples(xfp_path, xpub)
self.xpubs.append( (self.get(val), key[1:]) )
assert len(self.xpubs) <= MAX_SIGNERS
elif kt == PSBT_GLOBAL_VERSION:
self.version = unpack("<I", self.get(val))[0]
elif kt == PSBT_GLOBAL_TX_VERSION:
self.txn_version = unpack("<I", self.get(val))[0]
self.has_gtv = True
elif kt == PSBT_GLOBAL_FALLBACK_LOCKTIME:
self.fallback_locktime = unpack("<I", self.get(val))[0]
elif kt == PSBT_GLOBAL_INPUT_COUNT:
self.num_inputs = deser_compact_size(BytesIO(self.get(val)))
self.has_gic = True
elif kt == PSBT_GLOBAL_OUTPUT_COUNT:
self.num_outputs = deser_compact_size(BytesIO(self.get(val)))
self.has_goc = True
elif kt == PSBT_GLOBAL_TX_MODIFIABLE:
self.txn_modifiable = val[0]
else:
self.unknown = self.unknown or {}
if key in self.unknown:
@ -935,23 +1006,32 @@ class psbtObject(psbtProxy):
def output_iter(self):
# yield the txn's outputs: index, (CTxOut object) for each
assert self.vout_start is not None # must call input_iter/validate first
fd = self.fd
fd.seek(self.vout_start)
total_out = 0
tx_out = CTxOut()
for idx in range(self.num_outputs):
if self.is_v2:
for idx in range(self.num_outputs):
out = self.outputs[idx]
amount = unpack("<q", self.get(out.amount))[0]
spk = self.get(out.script)
tx_out = CTxOut(nValue=amount, scriptPubKey=spk)
total_out += amount
yield idx, tx_out
else:
assert self.vout_start is not None # must call input_iter/validate first
tx_out.deserialize(fd)
fd = self.fd
fd.seek(self.vout_start)
total_out += tx_out.nValue
tx_out = CTxOut()
for idx in range(self.num_outputs):
cont = fd.tell()
yield idx, tx_out
tx_out.deserialize(fd)
fd.seek(cont)
total_out += tx_out.nValue
cont = fd.tell()
yield idx, tx_out
fd.seek(cont)
if self.total_value_out is None:
self.total_value_out = total_out
@ -1004,7 +1084,7 @@ class psbtObject(psbtProxy):
self.wit_start = _skip_n_objs(fd, num_in, 'CTxInWitness')
# we are at end of outputs, and no witness data, so locktime is here
self.lock_time = unpack("<I", fd.read(4))[0]
self._lock_time = unpack("<I", fd.read(4))[0]
assert fd.tell() == end_pos, 'txn read end wrong'
@ -1017,21 +1097,29 @@ class psbtObject(psbtProxy):
#
# - we also capture much data about the txn on the first pass thru here
#
fd = self.fd
if self.is_v2:
for idx in range(self.num_inputs):
inp = self.inputs[idx]
prevout = COutPoint(uint256_from_str(self.get(inp.previous_txid)),
unpack("<I", self.get(inp.prevout_idx))[0])
sequence = inp.sequence if inp.sequence is not None else 0xffffffff
txin = CTxIn(outpoint=prevout, nSequence=sequence)
yield idx, txin
else:
fd = self.fd
assert self.vin_start # call parse_txn() first!
assert self.vin_start
# stream out the inputs
fd.seek(self.vin_start)
# stream out the inputs
fd.seek(self.vin_start)
txin = CTxIn()
for idx in range(self.num_inputs):
txin.deserialize(fd)
txin = CTxIn()
for idx in range(self.num_inputs):
txin.deserialize(fd)
cont = fd.tell()
yield idx, txin
cont = fd.tell()
yield idx, txin
fd.seek(cont)
fd.seek(cont)
def input_witness_iter(self):
# yield all the witness data, in order by input
@ -1155,10 +1243,60 @@ class psbtObject(psbtProxy):
# Do a first pass over the txn. Raise assertions, be terse tho because
# these messages are rarely seen. These are syntax/fatal errors.
#
assert self.txn[1] > 63, 'too short'
if self.version is not None:
# verision is provided in PSBT - take it as given
assert self.version in (0,2)
else:
# PSBT version is not defined
# global unsigned tx is only allowed in v0
self.version = 2 if self.txn is None else 0
self.is_v2 = self.version is not None and self.version >= 2
if self.is_v2:
assert self.has_gic, "v2 requires global input count"
assert self.has_goc, "v2 requires global output count"
assert self.has_gtv, "v2 requires global txn version"
assert self.txn is None, "v2 requires exclusion of global unsigned tx"
else:
assert not self.has_gic, "v0 requires exclusion of global input count"
assert not self.has_goc, "v0 requires exclusion of global output count"
assert not self.has_gtv, "v0 requires exclusion of global txn version"
assert self.txn, "v0 requires inclusion of global unsigned tx"
assert self.txn[1] > 63, 'txn too short'
assert self.fallback_locktime is None, "v0 requires exclusion of global fallback locktime"
assert self.txn_modifiable is None, "v0 requires exclusion of global txn modifiable"
for idx, txo in self.output_iter():
out = self.outputs[idx]
if self.is_v2:
# v2 requires inclusion
assert out.amount
assert out.script
else:
# v0 requires exclusion
assert out.amount is None
assert out.script is None
# this parses the input TXN in-place
for idx, txin in self.input_iter():
inp = self.inputs[idx]
if self.is_v2:
# v2 requires inclusion
assert inp.prevout_idx is not None
assert inp.previous_txid
if inp.req_time_locktime is not None:
assert inp.req_time_locktime >= 500000000
if inp.req_height_locktime is not None:
assert 0 < inp.req_height_locktime < 500000000
else:
# v0 requires exclusion
assert inp.prevout_idx is None
assert inp.previous_txid is None
assert inp.sequence is None
assert inp.req_time_locktime is None
assert inp.req_height_locktime is None
self.inputs[idx].validate(idx, txin, self.my_xfp, self)
assert len(self.inputs) == self.num_inputs, 'ni mismatch'
@ -1194,7 +1332,6 @@ class psbtObject(psbtProxy):
for out_idx, txo in self.output_iter():
pass
# check fee is reasonable
if self.total_value_out == 0:
per_fee = 100
@ -1455,11 +1592,12 @@ class psbtObject(psbtProxy):
# read main body (globals)
rv.parse(fd)
assert rv.txn, 'missing reqd section'
# learn about the bitcoin transaction we are signing.
rv.parse_txn()
if rv.txn:
# learn about the bitcoin transaction we are signing.
rv.parse_txn()
assert rv.num_inputs is not None
assert rv.num_outputs is not None
rv.inputs = [psbtInputProxy(fd, idx) for idx in range(rv.num_inputs)]
rv.outputs = [psbtOutputProxy(fd, idx) for idx in range(rv.num_outputs)]
@ -1486,8 +1624,19 @@ class psbtObject(psbtProxy):
out_fd.write(ser_compact_size(txn_len))
self.finalize(out_fd)
else:
# provide original txn (unchanged)
wr(PSBT_GLOBAL_UNSIGNED_TX, self.txn)
if not self.is_v2: # can be 0 or None
# provide original txn (unchanged)
wr(PSBT_GLOBAL_UNSIGNED_TX, self.txn)
if self.is_v2:
wr(PSBT_GLOBAL_TX_VERSION, pack('<I', self.txn_version))
if self.fallback_locktime is not None:
wr(PSBT_GLOBAL_FALLBACK_LOCKTIME, pack('<I', self.fallback_locktime))
wr(PSBT_GLOBAL_INPUT_COUNT, ser_compact_size(self.num_inputs))
wr(PSBT_GLOBAL_OUTPUT_COUNT, ser_compact_size(self.num_outputs))
if self.txn_modifiable is not None:
wr(PSBT_GLOBAL_TX_MODIFIABLE, bytes([self.txn_modifiable]))
wr(PSBT_GLOBAL_VERSION, pack('<I', self.version))
if self.xpubs:
for v, k in self.xpubs:
@ -1501,11 +1650,11 @@ class psbtObject(psbtProxy):
out_fd.write(b'\0')
for idx, inp in enumerate(self.inputs):
inp.serialize(out_fd, idx)
inp.serialize(out_fd, self.is_v2)
out_fd.write(b'\0')
for idx, outp in enumerate(self.outputs):
outp.serialize(out_fd, idx)
outp.serialize(out_fd, self.is_v2)
out_fd.write(b'\0')
def sign_it(self):
@ -1667,6 +1816,9 @@ class psbtObject(psbtProxy):
success.add(in_idx)
if self.is_v2:
self.set_modifiable_flag(inp)
# memory cleanup
del result, r, s
@ -1675,6 +1827,31 @@ class psbtObject(psbtProxy):
# done.
dis.progress_bar_show(1)
def set_modifiable_flag(self, inp):
# only for PSBTv2
# sighash needs to be properly set on psbtInputProxy object before this runs
# TODO possible to also cross-check with sighash from signature:
# 1. witnes/scriptSig in serialized tx in PSBT
# 2. psbt meta fields partial_sigs, taproot_key_sig and taproot_script_sigs
if self.txn_modifiable is None:
# set to inputs/outputs modifiable
# has SINGLE to false
self.txn_modifiable = 3
if not (inp.sighash & SIGHASH_ANYONECANPAY):
# Bit 0 is the Inputs Modifiable flag - set to 0
if self.txn_modifiable & 1:
self.txn_modifiable &= ~1
out_type = inp.sighash & 0x7f # regardless of ANYONECANPAY
if out_type != SIGHASH_NONE:
# Bit 1 is the Outputs Modifiable flag - set to 0
if self.txn_modifiable & 2:
self.txn_modifiable &= ~2
if out_type == SIGHASH_SINGLE:
# Bit 2 is the Has SIGHASH_SINGLE flag - set it to 1
self.txn_modifiable |= 4
def make_txn_sighash(self, replace_idx, replacement, sighash_type):
# calculate the hash value for one input of current transaction

View File

@ -384,7 +384,7 @@ class CTxOut(object):
# If this is reached, we do not understand the output well
# enough to allow the user to authorize the spend, so fail hard.
raise ValueError('scriptPubKey template fail: ' + b2a_hex(self.scriptPubKey))
raise ValueError('scriptPubKey template fail: ' + b2a_hex(self.scriptPubKey).decode())
def is_p2sh(self):
return len(self.scriptPubKey) == 23 and self.scriptPubKey[0] == 0xa9 \

View File

@ -2,7 +2,7 @@
#
# needs local bitcoind in PATH
import os, time, uuid, socket, shutil, pytest, tempfile, subprocess, signal
import os, time, uuid, socket, shutil, pytest, tempfile, subprocess, signal, base64
from authproxy import AuthServiceProxy, JSONRPCException
@ -163,6 +163,27 @@ def match_key(bitcoind, set_master_key, reset_seed_words):
yield xfp
@pytest.fixture
def finalize_v2_v0_convert(bitcoind):
def doit(psbt_obj):
# compat wrapper - can be removed after below released
# https://github.com/bitcoin/bitcoin/pull/21283 PSBTv2
# convert v2 -> v0 if bitcoind does not support PSBTv2
# to be able to finalize
from authproxy import JSONRPCException
try:
resp = bitcoind.supply_wallet.finalizepsbt(psbt_obj.as_b64_str())
except JSONRPCException as e:
assert "Unsupported version number" in e.error["message"]
# this version of bitcoind does not support PSBTv2
# convert to v0 - needed for finalize
resp = bitcoind.supply_wallet.finalizepsbt(
base64.b64encode(psbt_obj.to_v0()).decode()
)
return resp
return doit
@pytest.fixture
def bitcoind_wallet(bitcoind):
# Use bitcoind to create a temporary wallet file

View File

@ -6,7 +6,8 @@ from ckcc.protocol import CCProtocolPacker
from helpers import B2A, U2SAT
from msg import verify_message
from api import bitcoind, match_key
from api import bitcoind_wallet, bitcoind_d_wallet, bitcoind_d_wallet_w_sk, bitcoind_d_sim_sign, bitcoind_d_sim_watch
from api import bitcoind_wallet, bitcoind_d_wallet, bitcoind_d_wallet_w_sk, bitcoind_d_sim_sign
from api import bitcoind_d_sim_watch, finalize_v2_v0_convert
from binascii import b2a_hex, a2b_hex
from constants import *
@ -34,6 +35,10 @@ def pytest_addoption(parser):
parser.addoption("--ms-danger", action="store_true",
default=False, help="Operate with multisig checks off")
parser.addoption("--psbt2", action="store_true",
default=False, help="fake_txn produces PSBTv2")
# to make bitcoind produce psbt v2 one currently needs https://github.com/achow101/bitcoin/tree/psbt2
# or wait until https://github.com/bitcoin/bitcoin/pull/21283 merged and released
@pytest.fixture(scope='session')
def dev(request):

View File

@ -103,19 +103,18 @@ def make_change_addr(wallet, style):
target = dest.hash160()
assert len(target) == 20
is_segwit = False
is_segwit = True
if style == 'p2pkh':
redeem_scr = bytes([0x76, 0xa9, 0x14]) + target + bytes([0x88, 0xac])
is_segwit = False
elif style == 'p2wpkh':
redeem_scr = bytes([0, 20]) + target
is_segwit = True
elif style == 'p2wpkh-p2sh':
redeem_scr = bytes([0, 20]) + target
actual_scr = bytes([0xa9, 0x14]) + hash160(redeem_scr) + bytes([0x87])
elif style == 'p2tr':
tweaked_xonly = taptweak(dest.sec()[1:])
redeem_scr = bytes([81, 32]) + tweaked_xonly
is_segwit = True
return redeem_scr, actual_scr, is_segwit, dest.sec()[1:], b'\x00' + struct.pack('4I', xfp, *deriv)
else:
raise ValueError('cant make fake change output of type: ' + style)

View File

@ -4,7 +4,7 @@
#
import io, struct
from binascii import b2a_hex as _b2a_hex
from pycoin.tx.Tx import Tx
from pycoin.tx.Tx import Tx, TxIn, TxOut
from pycoin.tx.script.check_signature import parse_signature_blob
from binascii import a2b_hex
from base64 import b64decode, b64encode
@ -13,35 +13,59 @@ b2a_hex = lambda a: str(_b2a_hex(a), 'ascii')
# BIP-174 aka PSBT defined values
#
PSBT_GLOBAL_UNSIGNED_TX = 0
PSBT_GLOBAL_XPUB = 1
# GLOBAL ===
PSBT_GLOBAL_UNSIGNED_TX = 0x00
PSBT_GLOBAL_XPUB = 0x01
PSBT_GLOBAL_VERSION = 0xfb
PSBT_GLOBAL_PROPRIETARY = 0xfc
PSBT_IN_NON_WITNESS_UTXO = 0
PSBT_IN_WITNESS_UTXO = 1
PSBT_IN_PARTIAL_SIG = 2
PSBT_IN_SIGHASH_TYPE = 3
PSBT_IN_REDEEM_SCRIPT = 4
PSBT_IN_WITNESS_SCRIPT = 5
PSBT_IN_BIP32_DERIVATION = 6
PSBT_IN_FINAL_SCRIPTSIG = 7
PSBT_IN_FINAL_SCRIPTWITNESS = 8
# BIP-370
PSBT_GLOBAL_TX_VERSION = 0x02
PSBT_GLOBAL_FALLBACK_LOCKTIME = 0x03
PSBT_GLOBAL_INPUT_COUNT = 0x04
PSBT_GLOBAL_OUTPUT_COUNT = 0x05
PSBT_GLOBAL_TX_MODIFIABLE = 0x06
# INPUTS ===
PSBT_IN_NON_WITNESS_UTXO = 0x00
PSBT_IN_WITNESS_UTXO = 0x01
PSBT_IN_PARTIAL_SIG = 0x02
PSBT_IN_SIGHASH_TYPE = 0x03
PSBT_IN_REDEEM_SCRIPT = 0x04
PSBT_IN_WITNESS_SCRIPT = 0x05
PSBT_IN_BIP32_DERIVATION = 0x06
PSBT_IN_FINAL_SCRIPTSIG = 0x07
PSBT_IN_FINAL_SCRIPTWITNESS = 0x08
PSBT_IN_POR_COMMITMENT = 0x09 # Proof of Reserves
PSBT_IN_RIPEMD160 = 0x0a
PSBT_IN_SHA256 = 0x0b
PSBT_IN_HASH160 = 0x0c
PSBT_IN_HASH256 = 0x0d
# BIP-370
PSBT_IN_PREVIOUS_TXID = 0x0e
PSBT_IN_OUTPUT_INDEX = 0x0f
PSBT_IN_SEQUENCE = 0x10
PSBT_IN_REQUIRED_TIME_LOCKTIME = 0x11
PSBT_IN_REQUIRED_HEIGHT_LOCKTIME = 0x12
# BIP-371
PSBT_IN_TAP_KEY_SIG = 19 # 0x13
PSBT_IN_TAP_SCRIPT_SIG = 20 # 0x14
PSBT_IN_TAP_LEAF_SCRIPT = 21 # 0x15
PSBT_IN_TAP_BIP32_DERIVATION = 22 # 0x16
PSBT_IN_TAP_INTERNAL_KEY = 23 # 0x17
PSBT_IN_TAP_MERKLE_ROOT = 24 # 0x18
PSBT_IN_TAP_KEY_SIG = 0x13
PSBT_IN_TAP_SCRIPT_SIG = 0x14
PSBT_IN_TAP_LEAF_SCRIPT = 0x15
PSBT_IN_TAP_BIP32_DERIVATION = 0x16
PSBT_IN_TAP_INTERNAL_KEY = 0x17
PSBT_IN_TAP_MERKLE_ROOT = 0x18
PSBT_OUT_REDEEM_SCRIPT = 0
PSBT_OUT_WITNESS_SCRIPT = 1
PSBT_OUT_BIP32_DERIVATION = 2
# OUTPUTS ===
PSBT_OUT_REDEEM_SCRIPT = 0x00
PSBT_OUT_WITNESS_SCRIPT = 0x01
PSBT_OUT_BIP32_DERIVATION = 0x02
# BIP-370
PSBT_OUT_AMOUNT = 0x03
PSBT_OUT_SCRIPT = 0x04
# BIP-371
PSBT_OUT_TAP_INTERNAL_KEY = 5
PSBT_OUT_TAP_TREE = 6
PSBT_OUT_TAP_BIP32_DERIVATION = 7
PSBT_PROPRIETARY = 0xFC
PSBT_OUT_TAP_INTERNAL_KEY = 0x05
PSBT_OUT_TAP_TREE = 0x06
PSBT_OUT_TAP_BIP32_DERIVATION = 0x07
PSBT_PROP_CK_ID = b"COINKITE"
@ -105,7 +129,7 @@ class PSBTSection:
kt = key[0]
self.parse_kv(kt, key[1:], val)
def serialize(self, fd, my_idx):
def serialize(self, fd, v2):
def wr(ktype, val, key=b''):
fd.write(ser_compact_size(1 + len(key)))
@ -113,7 +137,7 @@ class PSBTSection:
fd.write(ser_compact_size(len(val)))
fd.write(val)
self.serialize_kvs(wr)
self.serialize_kvs(wr, v2)
fd.write(b'\0')
@ -130,6 +154,11 @@ class BasicPSBTInput(PSBTSection):
self.taproot_key_sig = None
self.redeem_script = None
self.witness_script = None
self.previous_txid = None # v2
self.prevout_idx = None # v2
self.sequence = None # v2
self.req_time_locktime = None # v2
self.req_height_locktime = None # v2
self.others = {}
self.unknown = {}
@ -148,6 +177,11 @@ class BasicPSBTInput(PSBTSection):
a.taproot_bip32_paths == b.taproot_bip32_paths and \
a.taproot_internal_key == b.taproot_internal_key and \
sorted(a.part_sigs.keys()) == sorted(b.part_sigs.keys()) and \
a.previous_txid == b.previous_txid and \
a.prevout_idx == b.prevout_idx and \
a.sequence == b.sequence and \
a.req_time_locktime == b.req_time_locktime and \
a.req_height_locktime == b.req_height_locktime and \
a.unknown == b.unknown
if rv:
# NOTE: equality test on signatures requires parsing DER stupidness
@ -189,10 +223,20 @@ class BasicPSBTInput(PSBTSection):
self.taproot_internal_key = val
elif kt == PSBT_IN_TAP_KEY_SIG:
self.taproot_key_sig = val
elif kt == PSBT_IN_PREVIOUS_TXID:
self.previous_txid = val
elif kt == PSBT_IN_OUTPUT_INDEX:
self.prevout_idx = struct.unpack("<I", val)[0]
elif kt == PSBT_IN_SEQUENCE:
self.sequence = struct.unpack("<I", val)[0]
elif kt == PSBT_IN_REQUIRED_TIME_LOCKTIME:
self.req_time_locktime = struct.unpack("<I", val)[0]
elif kt == PSBT_IN_REQUIRED_HEIGHT_LOCKTIME:
self.req_height_locktime = struct.unpack("<I", val)[0]
else:
self.unknown[bytes([kt]) + key] = val
def serialize_kvs(self, wr):
def serialize_kvs(self, wr, v2):
if self.utxo:
wr(PSBT_IN_NON_WITNESS_UTXO, self.utxo)
if self.witness_utxo:
@ -201,21 +245,39 @@ class BasicPSBTInput(PSBTSection):
wr(PSBT_IN_REDEEM_SCRIPT, self.redeem_script)
if self.witness_script:
wr(PSBT_IN_WITNESS_SCRIPT, self.witness_script)
if self.part_sigs:
for pk, val in sorted(self.part_sigs.items()):
wr(PSBT_IN_PARTIAL_SIG, val, pk)
if self.sighash is not None:
wr(PSBT_IN_SIGHASH_TYPE, struct.pack('<I', self.sighash))
if self.bip32_paths:
for k in self.bip32_paths:
wr(PSBT_IN_BIP32_DERIVATION, self.bip32_paths[k], k)
if self.taproot_bip32_paths:
for k in self.taproot_bip32_paths:
wr(PSBT_IN_TAP_BIP32_DERIVATION, self.taproot_bip32_paths[k], k)
if self.taproot_internal_key:
wr(PSBT_IN_TAP_INTERNAL_KEY, self.taproot_internal_key)
if self.taproot_key_sig:
wr(PSBT_IN_TAP_KEY_SIG, self.taproot_key_sig)
if v2:
if self.previous_txid is not None:
wr(PSBT_IN_PREVIOUS_TXID, self.previous_txid)
if self.prevout_idx is not None:
wr(PSBT_IN_OUTPUT_INDEX, struct.pack("<I", self.prevout_idx))
if self.sequence is not None:
wr(PSBT_IN_SEQUENCE, struct.pack("<I", self.sequence))
if self.req_time_locktime is not None:
wr(PSBT_IN_REQUIRED_TIME_LOCKTIME, struct.pack("<I", self.req_time_locktime))
if self.req_height_locktime is not None:
wr(PSBT_IN_REQUIRED_HEIGHT_LOCKTIME, struct.pack("<I", self.req_height_locktime))
for k in self.others:
wr(k, self.others[k])
if isinstance(self.unknown, list):
@ -235,12 +297,16 @@ class BasicPSBTOutput(PSBTSection):
self.bip32_paths = {}
self.taproot_bip32_paths = {}
self.taproot_internal_key = None
self.script = None # v2
self.amount = None # v2
self.proprietary = {}
self.unknown = {}
def __eq__(a, b):
return a.redeem_script == b.redeem_script and \
a.witness_script == b.witness_script and \
a.script == b.script and \
a.amount == b.amount and \
a.my_index == b.my_index and \
a.bip32_paths == b.bip32_paths and \
a.taproot_bip32_paths == b.taproot_bip32_paths and \
@ -257,16 +323,20 @@ class BasicPSBTOutput(PSBTSection):
assert not key
elif kt == PSBT_OUT_BIP32_DERIVATION:
self.bip32_paths[key] = val
elif kt == PSBT_PROPRIETARY:
self.proprietary[key] = val
elif kt == PSBT_OUT_TAP_BIP32_DERIVATION:
self.taproot_bip32_paths[key] = val
elif kt == PSBT_OUT_TAP_INTERNAL_KEY:
self.taproot_internal_key = val
elif kt == PSBT_OUT_SCRIPT:
self.script = val
elif kt == PSBT_OUT_AMOUNT:
self.amount = struct.unpack("<q", val)[0]
elif kt == PSBT_GLOBAL_PROPRIETARY:
self.proprietary[key] = val
else:
self.unknown[bytes([kt]) + key] = val
def serialize_kvs(self, wr):
def serialize_kvs(self, wr, v2):
if self.redeem_script:
wr(PSBT_OUT_REDEEM_SCRIPT, self.redeem_script)
if self.witness_script:
@ -279,8 +349,14 @@ class BasicPSBTOutput(PSBTSection):
wr(PSBT_OUT_TAP_BIP32_DERIVATION, self.taproot_bip32_paths[k], k)
if self.taproot_internal_key:
wr(PSBT_OUT_TAP_INTERNAL_KEY, self.taproot_internal_key)
if v2 and self.script is not None:
wr(PSBT_OUT_SCRIPT, self.script)
if v2 and self.amount is not None:
wr(PSBT_OUT_AMOUNT, struct.pack("<q", int(self.amount)))
for k in self.proprietary:
wr(PSBT_PROPRIETARY, self.proprietary[k], k)
wr(PSBT_GLOBAL_PROPRIETARY, self.proprietary[k], k)
if isinstance(self.unknown, list):
# just so I can test duplicate unknown values
# list of tuples [(key0, val0), (key1, val1)]
@ -295,17 +371,26 @@ class BasicPSBT:
"Just? parse and store"
def __init__(self):
self.version = None
self.txn = None
self.txn_version = None
self.xpubs = []
self.input_count = None
self.output_count = None
self.inputs = []
self.outputs = []
self.txn_modifiable = None
self.fallback_locktime = None
self.unknown = {}
self.parsed_txn = None
def __eq__(a, b):
return a.txn == b.txn and \
a.input_count == b.input_count and \
a.output_count == b.output_count and \
a.fallback_locktime == b.fallback_locktime and \
a.txn_version == b.txn_version and \
a.version == b.version and \
len(a.inputs) == len(b.inputs) and \
len(a.outputs) == len(b.outputs) and \
all(a.inputs[i] == b.inputs[i] for i in range(len(a.inputs))) and \
@ -313,6 +398,9 @@ class BasicPSBT:
sorted(a.xpubs) == sorted(b.xpubs) and \
a.unknown == b.unknown
def is_v2(self):
return (self.version == 2) or (not self.txn)
def parse(self, raw):
# auto-detect and decode Base64 and Hex.
if raw[0:10].lower() == b'70736274ff':
@ -338,16 +426,43 @@ class BasicPSBT:
self.txn = val
t = Tx.parse(io.BytesIO(val))
self.parsed_txn = t
num_ins = len(t.txs_in)
num_outs = len(t.txs_out)
elif kt == PSBT_GLOBAL_XPUB:
# key=(xpub) => val=(path)
# ignore PSBT_GLOBAL_XPUB on 0th index (should not be part of parsed key)
self.xpubs.append((key[1:], val))
elif kt == PSBT_GLOBAL_VERSION:
self.version = struct.unpack("<I", val)[0]
elif kt == PSBT_GLOBAL_TX_VERSION:
self.txn_version = struct.unpack("<I", val)[0]
elif kt == PSBT_GLOBAL_FALLBACK_LOCKTIME:
self.fallback_locktime = struct.unpack("<I", val)[0]
elif kt == PSBT_GLOBAL_INPUT_COUNT:
self.input_count = deser_compact_size(io.BytesIO(val))
num_ins = self.input_count
elif kt == PSBT_GLOBAL_OUTPUT_COUNT:
self.output_count = deser_compact_size(io.BytesIO(val))
num_outs = self.output_count
elif kt == PSBT_GLOBAL_TX_MODIFIABLE:
self.txn_modifiable = val[0]
else:
self.unknown[key] = val
assert self.txn, 'missing reqd section'
if self.version is None:
# decide version based on PSBT_GLOBAL_UNSIGNED_TX field
# v0 requires inclusion
# v2 requires exclusion
self.version = 0 if self.txn else 2
if self.version == 0:
assert self.txn, 'v0: missing reqd section - PSBT_GLOBAL_UNSIGNED_TX'
elif self.version == 2:
# tx version needs to be at least 2 because locktimes
assert self.txn_version == 2, 'v2: missing reqd section - PSBT_GLOBAL_TX_VERSION'
assert self.input_count is not None, 'v2: missing reqd section - PSBT_GLOBAL_INPUT_COUNT'
assert self.output_count is not None, 'v2: missing reqd section - PSBT_GLOBAL_OUTPUT_COUNT'
self.inputs = [BasicPSBTInput(fd, idx) for idx in range(num_ins)]
self.outputs = [BasicPSBTOutput(fd, idx) for idx in range(num_outs)]
@ -358,7 +473,7 @@ class BasicPSBT:
return self
def serialize(self, fd):
v2 = self.is_v2()
def wr(ktype, val, key=b''):
ktype_plus_key = bytes([ktype]) + key
fd.write(ser_compact_size(len(ktype_plus_key)))
@ -368,11 +483,31 @@ class BasicPSBT:
fd.write(b'psbt\xff')
wr(PSBT_GLOBAL_UNSIGNED_TX, self.txn)
if (not v2) and self.txn:
wr(PSBT_GLOBAL_UNSIGNED_TX, self.txn)
for k, v in self.xpubs:
wr(PSBT_GLOBAL_XPUB, v, key=k)
if v2:
if self.txn_version is not None:
wr(PSBT_GLOBAL_TX_VERSION, struct.pack('<I', self.txn_version))
if self.fallback_locktime is not None:
wr(PSBT_GLOBAL_FALLBACK_LOCKTIME, struct.pack('<I', self.fallback_locktime))
if self.input_count is not None:
wr(PSBT_GLOBAL_INPUT_COUNT, ser_compact_size(self.input_count))
if self.output_count is not None:
wr(PSBT_GLOBAL_OUTPUT_COUNT, ser_compact_size(self.output_count))
if self.txn_modifiable is not None:
wr(PSBT_GLOBAL_TX_MODIFIABLE, bytes([self.txn_modifiable]))
if self.version is not None:
wr(PSBT_GLOBAL_VERSION, struct.pack('<I', self.version))
if isinstance(self.unknown, list):
# just so I can test duplicate unknown values
# list of tuples [(key0, val0), (key1, val1)]
@ -386,10 +521,10 @@ class BasicPSBT:
fd.write(b'\0')
for idx, inp in enumerate(self.inputs):
inp.serialize(fd, idx)
inp.serialize(fd, v2)
for idx, outp in enumerate(self.outputs):
outp.serialize(fd, idx)
outp.serialize(fd, v2)
def as_bytes(self):
with io.BytesIO() as fd:
@ -399,6 +534,56 @@ class BasicPSBT:
def as_b64_str(self):
return b64encode(self.as_bytes()).decode()
def to_v2(self):
if self.version is None or self.version == 0:
self.version = 2
self.txn_version = 2
self.txn = None
self.input_count = len(self.parsed_txn.txs_in)
self.output_count = len(self.parsed_txn.txs_out)
self.fallback_locktime = self.parsed_txn.lock_time
for idx, inp in enumerate(self.parsed_txn.txs_in):
i = self.inputs[idx]
i.previous_txid = inp.previous_hash
i.prevout_idx = inp.previous_index
i.sequence = inp.sequence
for idx, out in enumerate(self.parsed_txn.txs_out):
o = self.outputs[idx]
o.script = out.script
o.amount = out.coin_value
return self.as_bytes()
def to_v0(self):
if self.version == 2:
tx_ins = []
for inp in self.inputs:
tx_ins.append(TxIn(inp.previous_txid, inp.prevout_idx,
sequence=inp.sequence or 0xffffffff))
inp.prevout_idx = None
inp.previous_txid = None
inp.sequence = None
inp.req_time_locktime = None
inp.req_height_locktime = None
tx_outs = []
for out in self.outputs:
tx_outs.append(TxOut(coin_value=out.amount, script=out.script))
out.amount = None
out.script = None
t = Tx(version=self.txn_version, txs_in=tx_ins, txs_out=tx_outs,
lock_time=self.fallback_locktime or 0)
self.txn_version = None
self.input_count = None
self.output_count = None
self.txn_modifiable = None
self.version = None
self.parsed_txn = t
self.txn = self.parsed_txn.as_bin()
return self.as_bytes()
def test_my_psbt():
import glob, io

View File

@ -97,7 +97,7 @@ def is_ok(ec: ExitCode) -> bool:
def _run_tests_with_simulator(test_module: str, simulator_args: List[str], pytest_marks: str,
pytest_k: str, pdb: bool, failed_first: bool) -> ExitCode:
pytest_k: str, pdb: bool, failed_first: bool, psbt2=False) -> ExitCode:
sim = ColdcardSimulator(args=simulator_args)
sim.start()
time.sleep(1)
@ -110,6 +110,9 @@ def _run_tests_with_simulator(test_module: str, simulator_args: List[str], pytes
cmd_list.append("--pdb")
if failed_first:
cmd_list.append("--ff")
if psbt2:
cmd_list.append("--psbt2")
exit_code = pytest.main(cmd_list)
sim.stop()
time.sleep(1)
@ -119,11 +122,11 @@ def _run_tests_with_simulator(test_module: str, simulator_args: List[str], pytes
def run_tests_with_simulator(test_module=None, simulator_args=None, pytest_k=None, pdb=False,
failed_first=False,
failed_first=False, psbt2=False,
pytest_marks="not onetime and not veryslow and not manual"):
failed = []
exit_code = _run_tests_with_simulator(test_module, simulator_args, pytest_marks, pytest_k,
pdb, failed_first)
pdb, failed_first, psbt2=psbt2)
if not is_ok(exit_code):
# no success, no nothing - give failed another try, each alone with its own simulator
last_failed = get_last_failed()
@ -131,7 +134,7 @@ def run_tests_with_simulator(test_module=None, simulator_args=None, pytest_k=Non
exit_codes = []
for failed_test in last_failed:
exit_code_2 = _run_tests_with_simulator(failed_test, simulator_args, pytest_marks,
pytest_k, pdb, failed_first)
pytest_k, pdb, failed_first, psbt2=psbt2)
exit_codes.append(exit_code_2)
if not is_ok(exit_code_2):
failed.append(failed_test)
@ -191,6 +194,7 @@ def main():
help="Choose how much to sleep after simulator is started")
parser.add_argument("-m", "--module", action="append", help="Choose only n modules to run")
parser.add_argument("--pdb", action="store_true", help="Go to debugger on failure")
parser.add_argument("--psbt2", action="store_true", help="`fake_txn` produces PSBTv2")
parser.add_argument("--ff", action="store_true", help="Run the last failures first")
parser.add_argument("--onetime", action="store_true", default=False,
help="run tests marked as 'onetime'")
@ -246,7 +250,7 @@ def main():
test_args = ["--set", "nfc=1"]
ec, failed_tests = run_tests_with_simulator(test_module, simulator_args=test_args,
pytest_k=args.pytest_k, pdb=args.pdb,
failed_first=args.ff)
failed_first=args.ff, psbt2=args.psbt2)
result.append((test_module, ec, failed_tests))
print("Done", test_module)
print(80 * "=")
@ -257,7 +261,7 @@ def main():
ec, failed_tests = run_tests_with_simulator(test_module=None, pytest_marks="veryslow",
pytest_k=args.pytest_k, pdb=args.pdb,
simulator_args=DEFAULT_SIMULATOR_ARGS,
failed_first=args.ff)
failed_first=args.ff, psbt2=args.psbt2)
result.append(("veryslow", ec, failed_tests))
# run onetime is specified (each test against its own simulator)
if args.onetime:
@ -266,7 +270,8 @@ def main():
for onetime_test in onetime_tests:
ec, failed_tests = run_tests_with_simulator(test_module=onetime_test, pdb=args.pdb,
failed_first=args.ff, pytest_marks="onetime",
simulator_args=DEFAULT_SIMULATOR_ARGS,)
simulator_args=DEFAULT_SIMULATOR_ARGS,
psbt2=args.psbt2)
result.append((f"onetime: {onetime_test}", ec, failed_tests))
print("All done")
any_failed = False

View File

@ -9,20 +9,25 @@
# - command line: py.test test_hsm.py --dev -s --ff
# - no microSD card installed
#
from typing import OrderedDict
import pytest, time, struct, os, itertools, base64, re
from binascii import b2a_hex, a2b_hex
from hashlib import sha256
from ckcc_protocol.protocol import MAX_MSG_LEN, CCProtocolPacker, CCProtoError
from ckcc_protocol.protocol import CCUserRefused, CCProtoError
from ckcc_protocol.protocol import USER_AUTH_TOTP, USER_AUTH_HOTP, USER_AUTH_HMAC
from ckcc_protocol.utils import calc_local_pincode
import json
import pytest, time, itertools, base64, re, json, struct
from collections import OrderedDict
from binascii import b2a_hex, a2b_base64
from io import BytesIO
from base64 import b32encode
from hashlib import pbkdf2_hmac, sha256
from hmac import HMAC
from onetimepass import get_hotp
from objstruct import ObjectStruct as DICT
from txn import *
from txn import render_address, fake_txn
from psbt import ser_prop_key
from helpers import sign_msg, prandom
from ckcc_protocol.constants import *
from ckcc_protocol.protocol import CCProtocolPacker
from ckcc_protocol.protocol import CCUserRefused, CCProtoError
from ckcc_protocol.utils import calc_local_pincode
from pycoin.tx.Tx import Tx
from pycoin.tx.TxOut import TxOut
TEST_USERS = {
# time based OTP
@ -325,7 +330,6 @@ def tweak_hsm_method(sim_exec):
@pytest.fixture
def load_hsm_users(dev, settings_set):
def doit(u=None):
from base64 import b32encode
TEST_USERS['pw'][1] = b32encode(calc_hmac_key(dev.serial)).decode('ascii').rstrip('=')
settings_set('usr', u or TEST_USERS)
@ -643,7 +647,6 @@ def test_whitelist_multi(dev, start_hsm, tweak_rule, attempt_psbt, fake_txn, amo
assert nwl in dests
def test_whitelist_invalid_attestation(start_hsm, attempt_psbt, fake_txn):
from psbt import ser_prop_key
ID = b"COINKITE"
SUBTYPE = 0
@ -670,20 +673,20 @@ def test_whitelist_invalid_attestation(start_hsm, attempt_psbt, fake_txn):
attempt_psbt(psbt, 'non-whitelisted attestation key') # this is a valid sig for some message but it will produce the wrong pubkey in our case
def test_whitelist_valid_attestation(start_hsm, attempt_psbt, fake_txn):
from psbt import ser_prop_key
from ckcc_protocol.constants import AF_CLASSIC, AF_P2WPKH_P2SH, AF_P2WPKH
CK_ID = b"COINKITE"
ATTESTATION_SUBTYPE = 0
def attest(psbt, privkeys):
# generate valid sigs for our txouts
from io import BytesIO
from pycoin.tx.Tx import Tx
from pycoin.tx.TxOut import TxOut
from helpers import sign_msg
txn = Tx.from_bin(psbt.txn)
for idx, txout in enumerate(txn.txs_out):
if psbt.txn is None:
assert psbt.version == 2
tx_outs = [TxOut(out.amount, out.script) for out in psbt.outputs]
else:
txn = Tx.from_bin(psbt.txn)
tx_outs = txn.txs_out
for idx, txout in enumerate(tx_outs):
fd = BytesIO()
txout.stream(fd)
fd.seek(0)
@ -693,10 +696,12 @@ def test_whitelist_valid_attestation(start_hsm, attempt_psbt, fake_txn):
psbt.outputs[idx].proprietary[(ser_prop_key(CK_ID, ATTESTATION_SUBTYPE))] = sig
# we are testing signing with the following address types: legacy, wrapped segwit, native segwit
whitelist = ["mxgE6pFVo9ob5dtLhVZTMuZWwgYxWjqWvr", "2MwZkXTNYmBz5tsRLesLVubxf81TJseHMpZ", "tb1qetnxp3hgajcnvdzg5u6u7jg0av9e3gv2848fq7"]
whitelist = ["mxgE6pFVo9ob5dtLhVZTMuZWwgYxWjqWvr",
"2MwZkXTNYmBz5tsRLesLVubxf81TJseHMpZ",
"tb1qetnxp3hgajcnvdzg5u6u7jg0av9e3gv2848fq7"]
attesters = [("cRvMu9BCaC1YX3XsEvURvjGVfSoxTJ1doJMrMbsSedniFYYfcTYC", AF_CLASSIC),
("cVwmTYzFfQSR1XiEHeB3sDWBYyKJFGZSuARXpnxsQW59ucUj6nw4", AF_P2WPKH_P2SH),
("cTLgBv9qechEAted1VwMwKdqHbfL51X5JN2WBS7JMU6v4EdErset", AF_P2WPKH)]
("cVwmTYzFfQSR1XiEHeB3sDWBYyKJFGZSuARXpnxsQW59ucUj6nw4", AF_P2WPKH_P2SH),
("cTLgBv9qechEAted1VwMwKdqHbfL51X5JN2WBS7JMU6v4EdErset", AF_P2WPKH)]
policy = DICT(rules=[dict(whitelist=whitelist, whitelist_opts=dict(mode="ATTEST"))])
start_hsm(policy)
@ -833,9 +838,6 @@ def enter_local_code(need_keypress):
# dev serial number is part of salt, stored PW value, and challenge
# both need to follow that.
def calc_hmac_key(serial, secret='abcd1234'):
from hashlib import pbkdf2_hmac, sha256
from ckcc_protocol.constants import PBKDF2_ITER_COUNT
salt = sha256(b'pepper'+serial.encode('ascii')).digest()
#key = pbkdf2_hmac('sha256', secret.encode('ascii'), salt, PBKDF2_ITER_COUNT)
key = pbkdf2_hmac('sha512', secret.encode('ascii'), salt, PBKDF2_ITER_COUNT)[0:32]
@ -844,9 +846,6 @@ def calc_hmac_key(serial, secret='abcd1234'):
@pytest.fixture
def auth_user(dev):
from onetimepass import get_hotp
class State:
def __init__(self):
# start time only; don't want to wait 30 seconds between steps
@ -857,8 +856,6 @@ def auth_user(dev):
def __call__(self, username, garbage=False, do_replay=False):
# calc right values!
from base64 import b32decode
mode, secret, _ = TEST_USERS[username]
if garbage:
@ -869,8 +866,6 @@ def auth_user(dev):
assert username == 'pw'
cnt = 0
from hmac import HMAC
key = calc_hmac_key(dev.serial)
pw = HMAC(key, self.psbt_hash, sha256).digest()
@ -1246,8 +1241,7 @@ def test_min_users_perms(dev, quick_start_hsm, load_hsm_users, fake_txn,
attempt_psbt(psbt, 'need user(s) confirmation')
def calc_local_pincode(psbt_sha, next_local_code):
from binascii import a2b_base64
import struct, hmac
import hmac
key = a2b_base64(next_local_code)
assert len(key) >= 15
@ -1292,8 +1286,6 @@ def test_local_conf(dev, quick_start_hsm, tweak_rule, load_hsm_users, fake_txn,
def worst_case_policy():
MAX_NUMBER_USERS = 30 # from shared/users.py
from helpers import prandom
from base64 import b32encode
users = {f'user{i:02d}': [1, b32encode(prandom(10)).decode('ascii'), 0]
for i in range(MAX_NUMBER_USERS)}
@ -1384,7 +1376,7 @@ def test_priv_over_ux(quick_start_hsm, hsm_status, load_hsm_users):
@pytest.mark.parametrize("allow_op_return", [False, True])
def test_op_return_output_local(op_return_data, start_hsm, attempt_psbt, fake_txn, allow_op_return):
dests = []
psbt = fake_txn(2, 2, op_return = (0, op_return_data), capture_scripts=dests)
psbt = fake_txn(2, 2, op_return=(0, op_return_data), capture_scripts=dests)
if allow_op_return:
policy = DICT(rules=[dict(whitelist=[render_address(d) for d in dests[0:2]],
whitelist_opts=dict(allow_zeroval_outs=True))])

View File

@ -2212,9 +2212,10 @@ def test_legacy_multisig_witness_utxo_in_psbt(bitcoind, use_regtest, clear_ms, m
@pytest.mark.parametrize("m_n", [(2,2), (3, 5), (15, 15)])
@pytest.mark.parametrize("desc_type", ["p2wsh_desc", "p2sh_p2wsh_desc", "p2sh_desc"])
@pytest.mark.parametrize("sighash", list(SIGHASH_MAP.keys()))
@pytest.mark.parametrize("psbt_v2", [True, False])
def test_bitcoind_MofN_tutorial(m_n, desc_type, clear_ms, goto_home, need_keypress, pick_menu_item,
sighash, cap_menu, cap_story, microsd_path, use_regtest, bitcoind,
microsd_wipe, load_export, settings_set):
microsd_wipe, load_export, settings_set, psbt_v2, finalize_v2_v0_convert):
# 2of2 case here is described in docs with tutorial
M, N = m_n
settings_set("sighshchk", 1) # disable checks
@ -2348,6 +2349,13 @@ def test_bitcoind_MofN_tutorial(m_n, desc_type, clear_ms, goto_home, need_keypre
for signer in bitcoind_signers:
half_signed_psbt = signer.walletprocesspsbt(psbt, True, sighash, True, False) # do not finalize
psbt = half_signed_psbt["psbt"]
if psbt_v2:
# below is noop is psbt is already v2
po = BasicPSBT().parse(base64.b64decode(psbt))
po.to_v2()
psbt = po.as_b64_str()
name = f"hsc_{M}of{N}_{desc_type}.psbt"
with open(microsd_path(name), "w") as f:
f.write(psbt)
@ -2384,10 +2392,14 @@ def test_bitcoind_MofN_tutorial(m_n, desc_type, clear_ms, goto_home, need_keypre
assert "Updated PSBT is:" in story
need_keypress("y")
os.remove(microsd_path(name))
fname = story.split("\n\n")[-1]
with open(microsd_path(fname), "r") as f:
final_psbt = f.read().strip()
res = bitcoind_watch_only.finalizepsbt(final_psbt)
po = BasicPSBT().parse(base64.b64decode(final_psbt))
res = finalize_v2_v0_convert(po)
assert res["complete"]
tx_hex = res["hex"]
res = bitcoind_watch_only.testmempoolaccept([tx_hex])
@ -2446,6 +2458,10 @@ def test_bitcoind_MofN_tutorial(m_n, desc_type, clear_ms, goto_home, need_keypre
fname = story.split("\n\n")[-1]
with open(microsd_path(fname), "r") as f:
cc_signed_psbt = f.read().strip()
po = BasicPSBT().parse(base64.b64decode(cc_signed_psbt))
cc_signed_psbt = finalize_v2_v0_convert(po)["psbt"]
# CC already signed - now all bitcoin signers
for signer in bitcoind_signers:
res1 = signer.walletprocesspsbt(cc_signed_psbt, True, sighash)

View File

@ -359,14 +359,16 @@ def test_vs_bitcoind(match_key, use_regtest, check_against_bitcoind, bitcoind, s
if hex(got_xfp) != hex(wallet_xfp):
raise pytest.xfail("wrong HD master key fingerprint")
# pull out included txn
txn2 = B2A(mine.txn)
if mine.txn:
# pull out included txn
txn2 = B2A(mine.txn)
# verify against how bitcoind reads it
check_against_bitcoind(txn2, fee)
else:
assert mine.version == 2
start_sign(psbt, finalize=we_finalize)
# verify against how bitcoind reads it
check_against_bitcoind(txn2, fee)
signed = end_sign(accept=True, finalize=we_finalize)
open('debug/vs-signed.psbt', 'wb').write(signed)
@ -962,11 +964,15 @@ def test_finalization_vs_bitcoind(match_key, use_regtest, check_against_bitcoind
if hex(got_xfp) != hex(wallet_xfp):
raise pytest.xfail("wrong HD master key fingerprint")
# pull out included txn
txn2 = B2A(mine.txn)
if mine.txn:
# pull out included txn (only available in PSBTv0)
txn2 = B2A(mine.txn)
# verify against how bitcoind reads it
check_against_bitcoind(txn2, fee)
else:
assert mine.version == 2
start_sign(psbt, finalize=True)
# verify against how bitcoind reads it
check_against_bitcoind(txn2, fee)
signed_final = end_sign(accept=True, finalize=True)
assert signed_final[0:4] != b'psbt', "expecting raw bitcoin txn"
@ -1789,8 +1795,9 @@ def test_duplicate_unknow_values_in_psbt(dev, start_sign, end_sign, fake_txn):
@pytest.fixture
def _test_single_sig_sighash(microsd_wipe, microsd_path, goto_home, cap_story, need_keypress,
bitcoind, bitcoind_d_sim_watch, settings_set):
def doit(addr_fmt, sighash, num_inputs=2, num_outputs=2, consolidation=False, sh_checks=False):
bitcoind, bitcoind_d_sim_watch, settings_set, finalize_v2_v0_convert):
def doit(addr_fmt, sighash, num_inputs=2, num_outputs=2, consolidation=False, sh_checks=False,
psbt_v2=False):
from decimal import Decimal, ROUND_DOWN
settings_set("sighshchk", int(not sh_checks))
@ -1836,7 +1843,12 @@ def _test_single_sig_sighash(microsd_wipe, microsd_path, goto_home, cap_story, n
i.sighash = SIGHASH_MAP.get(sighash[0], sighash[0])
else:
i.sighash = SIGHASH_MAP.get(sighash[idx], sighash[idx])
psbt_sh = x.as_b64_str()
if psbt_v2:
# below is noop is psbt is already v2
psbt_sh = base64.b64encode(x.to_v2()).decode()
else:
psbt_sh = x.as_b64_str()
# make useful reference psbt along the way
open(f'debug/sighash-{sighash[0] if len(sighash) == 1 else "MIX"}.psbt'\
@ -1905,13 +1917,39 @@ def _test_single_sig_sighash(microsd_wipe, microsd_path, goto_home, cap_story, n
for _, sig in i.part_sigs.items():
assert sig[-1] == i.sighash
resp = bitcoind_d_sim_watch.finalizepsbt(cc_psbt)
resp = finalize_v2_v0_convert(y)
assert resp["complete"] is True
tx_hex = resp["hex"]
assert tx_hex == cc_tx_hex
res = bitcoind.supply_wallet.testmempoolaccept([tx_hex])
if psbt_v2:
# check txn_modifiable properly set
po = BasicPSBT().parse(base64.b64decode(cc_psbt))
mod = po.txn_modifiable
used_sh = [SIGHASH_MAP[sh] for sh in sighash]
if all(sh > 128 for sh in used_sh):
# all sighash flags are ANYONECANPAY
assert mod & 1 # allow inputs modification
else:
assert mod & 1 == 0 # inputs modification not allowed
if all(sh in (2, 130) for sh in used_sh):
# all sighash flags are NONE
assert mod & 2 # allow outputs modification
else:
assert mod & 2 == 0
if any(sh in (3, 131) for sh in used_sh):
# some sighash flag/s are SINGLE
assert mod & 4 # allow outputs modification
else:
assert mod & 4 == 0
# for PSBTv2 here we check if we correctly finalize
res = bitcoind.supply_wallet.testmempoolaccept([cc_tx_hex])
assert res[0]["allowed"]
txn_id = bitcoind.supply_wallet.sendrawtransaction(tx_hex)
txn_id = bitcoind.supply_wallet.sendrawtransaction(cc_tx_hex)
assert tx_id == txn_id
return doit
@ -1922,43 +1960,48 @@ def _test_single_sig_sighash(microsd_wipe, microsd_path, goto_home, cap_story, n
@pytest.mark.parametrize("sighash", [sh for sh in SIGHASH_MAP if sh != 'ALL'])
@pytest.mark.parametrize("num_outs", [1, 3, 5])
@pytest.mark.parametrize("num_ins", [2, 5])
def test_sighash_same(addr_fmt, sighash, num_ins, num_outs, _test_single_sig_sighash):
@pytest.mark.parametrize("psbt_v2", [True, False])
def test_sighash_same(addr_fmt, sighash, num_ins, num_outs, psbt_v2, _test_single_sig_sighash):
# sighash is the same among all inputs
_test_single_sig_sighash(addr_fmt, [sighash], num_inputs=num_ins, num_outputs=num_outs)
_test_single_sig_sighash(addr_fmt, [sighash], num_inputs=num_ins, num_outputs=num_outs,
psbt_v2=psbt_v2)
@pytest.mark.bitcoind
@pytest.mark.parametrize("addr_fmt", ["legacy", "p2sh-segwit", "bech32"])
@pytest.mark.parametrize("sighash", list(itertools.combinations(SIGHASH_MAP.keys(), 2)))
@pytest.mark.parametrize("num_outs", [2, 3, 5])
def test_sighash_different(addr_fmt, sighash, num_outs, _test_single_sig_sighash):
@pytest.mark.parametrize("psbt_v2", [True, False])
def test_sighash_different(addr_fmt, sighash, num_outs, psbt_v2, _test_single_sig_sighash):
# sighash differ among all inputs
_test_single_sig_sighash(addr_fmt, sighash, num_inputs=2, num_outputs=num_outs)
_test_single_sig_sighash(addr_fmt, sighash, num_inputs=2, num_outputs=num_outs,
psbt_v2=psbt_v2)
@pytest.mark.bitcoind
@pytest.mark.parametrize("addr_fmt", ["legacy", "p2sh-segwit", "bech32"])
@pytest.mark.parametrize("num_outs", [5, 8])
def test_sighash_fullmix(addr_fmt, num_outs, _test_single_sig_sighash):
@pytest.mark.parametrize("psbt_v2", [True, False])
def test_sighash_fullmix(addr_fmt, num_outs, psbt_v2, _test_single_sig_sighash):
# tx with 6 inputs representing all possible sighashes
_test_single_sig_sighash(addr_fmt, tuple(SIGHASH_MAP.keys()), num_inputs=6,
num_outputs=num_outs)
num_outputs=num_outs, psbt_v2=psbt_v2)
@pytest.mark.bitcoind
@pytest.mark.parametrize("sighash", [sh for sh in SIGHASH_MAP if sh != 'ALL'])
def test_sighash_disallowed_consolidation(sighash, _test_single_sig_sighash):
# sighash != ALL blocked for pure consolidations
_test_single_sig_sighash("bech32", [sighash], num_inputs=2,
num_outputs=2, sh_checks=True, consolidation=True)
_test_single_sig_sighash("bech32", [sighash], num_inputs=2, num_outputs=2,
sh_checks=True, consolidation=True)
@pytest.mark.bitcoind
@pytest.mark.parametrize("sighash", ["NONE", "NONE|ANYONECANPAY"])
def test_sighash_disallowed_NONE(sighash, _test_single_sig_sighash):
# sighash is the same among all inputs
_test_single_sig_sighash("bech32", [sighash], num_inputs=2, num_outputs=2, consolidation=False,
sh_checks=True)
_test_single_sig_sighash("bech32", [sighash], num_inputs=2, num_outputs=2,
consolidation=False, sh_checks=True)
@pytest.mark.bitcoind
@ -2077,4 +2120,103 @@ def test_batch_sign(num_tx, ui_path, action, fake_txn, need_keypress,
time.sleep(.5)
title, story = cap_story()
@pytest.mark.parametrize("desc_psbt_hex", [
("PSBTv0 but with PSBT_GLOBAL_VERSION set to 2.", "70736274ff01007102000000010b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc80000000000feffffff020008af2f00000000160014c430f64c4756da310dbd1a085572ef299926272c8bbdeb0b00000000160014a07dac8ab6ca942d379ed795f835ba71c9cc68850000000001fb0402000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e01086b02473044022005275a485734e0ae1f3b971237586f0e72dc85833d278c0e474cd23112c0fa5e02206b048c83cebc3c41d0b93cc7da76185cedbd030d005b08018be2b98bbacbdf7b012103760dcca05f3997dc65b293060f7f29f1514c8c527048e12802b041d4fc340a2700220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a000000002202036efe2c255621986553ba9d65c3ddc64165ca1436e05aa35a4c6eb02451cf796d18f69d873e540000800100008000000080010000006200000000"),
("PSBTv0 but with PSBT_GLOBAL_TX_VERSION.", "70736274ff01007102000000010b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc80000000000feffffff020008af2f00000000160014c430f64c4756da310dbd1a085572ef299926272c8bbdeb0b00000000160014a07dac8ab6ca942d379ed795f835ba71c9cc68850000000001020402000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e01086b02473044022005275a485734e0ae1f3b971237586f0e72dc85833d278c0e474cd23112c0fa5e02206b048c83cebc3c41d0b93cc7da76185cedbd030d005b08018be2b98bbacbdf7b012103760dcca05f3997dc65b293060f7f29f1514c8c527048e12802b041d4fc340a2700220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a000000002202036efe2c255621986553ba9d65c3ddc64165ca1436e05aa35a4c6eb02451cf796d18f69d873e540000800100008000000080010000006200000000"),
("PSBTv0 but with PSBT_GLOBAL_FALLBACK_LOCKTIME.", "70736274ff01007102000000010b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc80000000000feffffff020008af2f00000000160014c430f64c4756da310dbd1a085572ef299926272c8bbdeb0b00000000160014a07dac8ab6ca942d379ed795f835ba71c9cc68850000000001030402000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e01086b02473044022005275a485734e0ae1f3b971237586f0e72dc85833d278c0e474cd23112c0fa5e02206b048c83cebc3c41d0b93cc7da76185cedbd030d005b08018be2b98bbacbdf7b012103760dcca05f3997dc65b293060f7f29f1514c8c527048e12802b041d4fc340a2700220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a000000002202036efe2c255621986553ba9d65c3ddc64165ca1436e05aa35a4c6eb02451cf796d18f69d873e540000800100008000000080010000006200000000"),
("PSBTv0 but with PSBT_GLOBAL_INPUT_COUNT.", "70736274ff01007102000000010b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc80000000000feffffff020008af2f00000000160014c430f64c4756da310dbd1a085572ef299926272c8bbdeb0b00000000160014a07dac8ab6ca942d379ed795f835ba71c9cc68850000000001040102000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e01086b02473044022005275a485734e0ae1f3b971237586f0e72dc85833d278c0e474cd23112c0fa5e02206b048c83cebc3c41d0b93cc7da76185cedbd030d005b08018be2b98bbacbdf7b012103760dcca05f3997dc65b293060f7f29f1514c8c527048e12802b041d4fc340a2700220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a000000002202036efe2c255621986553ba9d65c3ddc64165ca1436e05aa35a4c6eb02451cf796d18f69d873e540000800100008000000080010000006200000000"),
("PSBTv0 but with PSBT_GLOBAL_OUTPUT_COUNT.", "70736274ff01007102000000010b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc80000000000feffffff020008af2f00000000160014c430f64c4756da310dbd1a085572ef299926272c8bbdeb0b00000000160014a07dac8ab6ca942d379ed795f835ba71c9cc68850000000001050102000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e01086b02473044022005275a485734e0ae1f3b971237586f0e72dc85833d278c0e474cd23112c0fa5e02206b048c83cebc3c41d0b93cc7da76185cedbd030d005b08018be2b98bbacbdf7b012103760dcca05f3997dc65b293060f7f29f1514c8c527048e12802b041d4fc340a2700220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a000000002202036efe2c255621986553ba9d65c3ddc64165ca1436e05aa35a4c6eb02451cf796d18f69d873e540000800100008000000080010000006200000000"),
("PSBTv0 but with PSBT_GLOBAL_TX_MODIFIABLE.", "70736274ff01007102000000010b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc80000000000feffffff020008af2f00000000160014c430f64c4756da310dbd1a085572ef299926272c8bbdeb0b00000000160014a07dac8ab6ca942d379ed795f835ba71c9cc68850000000001060100000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e01086b02473044022005275a485734e0ae1f3b971237586f0e72dc85833d278c0e474cd23112c0fa5e02206b048c83cebc3c41d0b93cc7da76185cedbd030d005b08018be2b98bbacbdf7b012103760dcca05f3997dc65b293060f7f29f1514c8c527048e12802b041d4fc340a2700220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a000000002202036efe2c255621986553ba9d65c3ddc64165ca1436e05aa35a4c6eb02451cf796d18f69d873e540000800100008000000080010000006200000000"),
("PSBTv0 but with PSBT_IN_PREVIOUS_TXID.", "70736274ff01007102000000010b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc80000000000feffffff020008af2f00000000160014c430f64c4756da310dbd1a085572ef299926272c8bbdeb0b00000000160014a07dac8ab6ca942d379ed795f835ba71c9cc688500000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e01086b02473044022005275a485734e0ae1f3b971237586f0e72dc85833d278c0e474cd23112c0fa5e02206b048c83cebc3c41d0b93cc7da76185cedbd030d005b08018be2b98bbacbdf7b012103760dcca05f3997dc65b293060f7f29f1514c8c527048e12802b041d4fc340a27010e200b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc800220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a000000002202036efe2c255621986553ba9d65c3ddc64165ca1436e05aa35a4c6eb02451cf796d18f69d873e540000800100008000000080010000006200000000"),
("PSBTv0 but with PSBT_IN_OUTPUT_INDEX.", "70736274ff01007102000000010b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc80000000000feffffff020008af2f00000000160014c430f64c4756da310dbd1a085572ef299926272c8bbdeb0b00000000160014a07dac8ab6ca942d379ed795f835ba71c9cc688500000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e01086b02473044022005275a485734e0ae1f3b971237586f0e72dc85833d278c0e474cd23112c0fa5e02206b048c83cebc3c41d0b93cc7da76185cedbd030d005b08018be2b98bbacbdf7b012103760dcca05f3997dc65b293060f7f29f1514c8c527048e12802b041d4fc340a27010f040000000000220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a000000002202036efe2c255621986553ba9d65c3ddc64165ca1436e05aa35a4c6eb02451cf796d18f69d873e540000800100008000000080010000006200000000"),
("PSBTv0 but with PSBT_IN_SEQUENCE.", "70736274ff01007102000000010b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc80000000000feffffff020008af2f00000000160014c430f64c4756da310dbd1a085572ef299926272c8bbdeb0b00000000160014a07dac8ab6ca942d379ed795f835ba71c9cc688500000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e01086b02473044022005275a485734e0ae1f3b971237586f0e72dc85833d278c0e474cd23112c0fa5e02206b048c83cebc3c41d0b93cc7da76185cedbd030d005b08018be2b98bbacbdf7b012103760dcca05f3997dc65b293060f7f29f1514c8c527048e12802b041d4fc340a27011004ffffffff00220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a000000002202036efe2c255621986553ba9d65c3ddc64165ca1436e05aa35a4c6eb02451cf796d18f69d873e540000800100008000000080010000006200000000"),
("PSBTv0 but with PSBT_IN_REQUIRED_TIME_LOCKTIME.", "70736274ff01007102000000010b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc80000000000feffffff020008af2f00000000160014c430f64c4756da310dbd1a085572ef299926272c8bbdeb0b00000000160014a07dac8ab6ca942d379ed795f835ba71c9cc688500000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e01086b02473044022005275a485734e0ae1f3b971237586f0e72dc85833d278c0e474cd23112c0fa5e02206b048c83cebc3c41d0b93cc7da76185cedbd030d005b08018be2b98bbacbdf7b012103760dcca05f3997dc65b293060f7f29f1514c8c527048e12802b041d4fc340a270111048c8dc46200220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a000000002202036efe2c255621986553ba9d65c3ddc64165ca1436e05aa35a4c6eb02451cf796d18f69d873e540000800100008000000080010000006200000000"),
("PSBTv0 but with PSBT_IN_REQUIRED_HEIGHT_LOCKTIME.", "70736274ff01007102000000010b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc80000000000feffffff020008af2f00000000160014c430f64c4756da310dbd1a085572ef299926272c8bbdeb0b00000000160014a07dac8ab6ca942d379ed795f835ba71c9cc688500000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e01086b02473044022005275a485734e0ae1f3b971237586f0e72dc85833d278c0e474cd23112c0fa5e02206b048c83cebc3c41d0b93cc7da76185cedbd030d005b08018be2b98bbacbdf7b012103760dcca05f3997dc65b293060f7f29f1514c8c527048e12802b041d4fc340a270112041027000000220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a000000002202036efe2c255621986553ba9d65c3ddc64165ca1436e05aa35a4c6eb02451cf796d18f69d873e540000800100008000000080010000006200000000"),
("PSBTv0 but with PSBT_OUT_AMOUNT.", "70736274ff01007102000000010b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc80000000000feffffff020008af2f00000000160014c430f64c4756da310dbd1a085572ef299926272c8bbdeb0b00000000160014a07dac8ab6ca942d379ed795f835ba71c9cc688500000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e01086b02473044022005275a485734e0ae1f3b971237586f0e72dc85833d278c0e474cd23112c0fa5e02206b048c83cebc3c41d0b93cc7da76185cedbd030d005b08018be2b98bbacbdf7b012103760dcca05f3997dc65b293060f7f29f1514c8c527048e12802b041d4fc340a2700220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a0000000103080008af2f00000000002202036efe2c255621986553ba9d65c3ddc64165ca1436e05aa35a4c6eb02451cf796d18f69d873e540000800100008000000080010000006200000000"),
("PSBTv0 but with PSBT_OUT_SCRIPT.", "70736274ff01007102000000010b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc80000000000feffffff020008af2f00000000160014c430f64c4756da310dbd1a085572ef299926272c8bbdeb0b00000000160014a07dac8ab6ca942d379ed795f835ba71c9cc688500000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e01086b02473044022005275a485734e0ae1f3b971237586f0e72dc85833d278c0e474cd23112c0fa5e02206b048c83cebc3c41d0b93cc7da76185cedbd030d005b08018be2b98bbacbdf7b012103760dcca05f3997dc65b293060f7f29f1514c8c527048e12802b041d4fc340a2700220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a0000000104160014a07dac8ab6ca942d379ed795f835ba71c9cc6885002202036efe2c255621986553ba9d65c3ddc64165ca1436e05aa35a4c6eb02451cf796d18f69d873e540000800100008000000080010000006200000000"),
("PSBTv2 missing PSBT_GLOBAL_INPUT_COUNT.", "70736274ff01020402000000010304000000000105010201fb0402000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e010e200b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc8010f0400000000011004feffffff00220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a0000000103080008af2f000000000104160014c430f64c4756da310dbd1a085572ef299926272c00220202e36fbff53dd534070cf8fd396614680f357a9b85db7340bf1cfa745d2ad7b34018f69d873e54000080010000800000008001000000640000000103088bbdeb0b0000000001041600144dd193ac964a56ac1b9e1cca8454fe2f474f851300"),
("PSBTv2 missing PSBT_GLOBAL_OUTPUT_COUNT.", "70736274ff01020402000000010304000000000104010101fb0402000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e010e200b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc8010f0400000000011004feffffff00220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a0000000103080008af2f000000000104160014c430f64c4756da310dbd1a085572ef299926272c00220202e36fbff53dd534070cf8fd396614680f357a9b85db7340bf1cfa745d2ad7b34018f69d873e54000080010000800000008001000000640000000103088bbdeb0b0000000001041600144dd193ac964a56ac1b9e1cca8454fe2f474f851300"),
("PSBTv2 missing PSBT_IN_PREVIOUS_TXID.", "70736274ff0102040200000001030400000000010401010105010201fb0402000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e010f0400000000011004feffffff00220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a0000000103080008af2f000000000104160014c430f64c4756da310dbd1a085572ef299926272c00220202e36fbff53dd534070cf8fd396614680f357a9b85db7340bf1cfa745d2ad7b34018f69d873e54000080010000800000008001000000640000000103088bbdeb0b0000000001041600144dd193ac964a56ac1b9e1cca8454fe2f474f851300"),
("PSBTv2 missing PSBT_IN_OUTPUT_INDEX.", "70736274ff0102040200000001030400000000010401010105010201fb0402000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e010e200b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc8011004feffffff00220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a0000000103080008af2f000000000104160014c430f64c4756da310dbd1a085572ef299926272c00220202e36fbff53dd534070cf8fd396614680f357a9b85db7340bf1cfa745d2ad7b34018f69d873e54000080010000800000008001000000640000000103088bbdeb0b0000000001041600144dd193ac964a56ac1b9e1cca8454fe2f474f851300"),
("PSBTv2 missing PSBT_OUT_AMOUNT.", "70736274ff0102040200000001030400000000010401010105010201fb0402000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e010e200b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc8010f0400000000011004feffffff00220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a0000000104160014c430f64c4756da310dbd1a085572ef299926272c00220202e36fbff53dd534070cf8fd396614680f357a9b85db7340bf1cfa745d2ad7b34018f69d873e54000080010000800000008001000000640000000103088bbdeb0b0000000001041600144dd193ac964a56ac1b9e1cca8454fe2f474f851300"),
("PSBTv2 missing PSBT_OUT_SCRIPT.", "70736274ff0102040200000001030400000000010401010105010201fb0402000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e010e200b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc8010f0400000000011004feffffff00220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a0000000103080008af2f0000000000220202e36fbff53dd534070cf8fd396614680f357a9b85db7340bf1cfa745d2ad7b34018f69d873e54000080010000800000008001000000640000000103088bbdeb0b0000000001041600144dd193ac964a56ac1b9e1cca8454fe2f474f851300"),
("PSBTv2 with PSBT_IN_REQUIRED_TIME_LOCKTIME less than 500000000.", "70736274ff01020402000000010401010105010201fb0402000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e010e200b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc8010f0400000000011104ff64cd1d00220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a0000000103080008af2f000000000104160014c430f64c4756da310dbd1a085572ef299926272c00220202e36fbff53dd534070cf8fd396614680f357a9b85db7340bf1cfa745d2ad7b34018f69d873e54000080010000800000008001000000640000000103088bbdeb0b0000000001041600144dd193ac964a56ac1b9e1cca8454fe2f474f851300"),
("PSBTv2 with PSBT_IN_REQUIRED_HEIGHT_LOCKTIME greater than or equal to 500000000.", "70736274ff01020402000000010401010105010201fb0402000000000100520200000001c1aa256e214b96a1822f93de42bff3b5f3ff8d0519306e3515d7515a5e805b120000000000ffffffff0118c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e0000000001011f18c69a3b00000000160014b0a3af144208412693ca7d166852b52db0aef06e010e200b0ad921419c1c8719735d72dc739f9ea9e0638d1fe4c1eef0f9944084815fc8010f04000000000112040065cd1d00220202d601f84846a6755f776be00e3d9de8fb10acc935fb83c45fb0162d4cad5ab79218f69d873e540000800100008000000080000000002a0000000103080008af2f000000000104160014c430f64c4756da310dbd1a085572ef299926272c00220202e36fbff53dd534070cf8fd396614680f357a9b85db7340bf1cfa745d2ad7b34018f69d873e54000080010000800000008001000000640000000103088bbdeb0b0000000001041600144dd193ac964a56ac1b9e1cca8454fe2f474f851300"),
])
def test_v2_psbt_bip370_invalid(desc_psbt_hex, start_sign, cap_story):
desc, psbt_hex = desc_psbt_hex
psbt = bytes.fromhex(psbt_hex)
print(desc)
start_sign(psbt)
title, story = cap_story()
assert title == "Failure"
assert ".py:" in story # problem file line
@pytest.mark.bitcoind
@pytest.mark.parametrize("outstyle", ADDR_STYLES_SINGLE)
@pytest.mark.parametrize("segwit_in", [True, False])
@pytest.mark.parametrize("wrapped_segwit_in", [True, False])
def test_psbt_v2(outstyle, segwit_in, wrapped_segwit_in, fake_txn , start_sign, end_sign, cap_story,
microsd_path, bitcoind, finalize_v2_v0_convert):
psbt = fake_txn(2, 2, segwit_in=segwit_in, wrapped=wrapped_segwit_in, change_outputs=[0],
outstyles=[outstyle], psbt_v2=True)
start_sign(psbt)
title, story = cap_story()
assert title == "OK TO SEND?"
# we do not understand change in taproot (taproot not supported)
assert "Consolidating" not in story
assert "Change back" in story
# but we should show address
assert "to script" not in story
signed = end_sign(accept=True, finalize=False)
assert signed
po = BasicPSBT().parse(signed)
assert po.version == 2
assert po.txn_version == 2
assert po.input_count is not None
assert po.output_count is not None
for inp in po.inputs:
assert inp.previous_txid
assert inp.prevout_idx is not None
for out in po.outputs:
assert out.amount
assert out.script
resp = finalize_v2_v0_convert(po)
assert resp["complete"] is True
@pytest.mark.bitcoind
@pytest.mark.parametrize("way", ["i+", "i-", "o+", "o-"])
def test_psbt_v2_global_quantities(way, fake_txn, start_sign, end_sign, cap_story,
microsd_path, bitcoind, finalize_v2_v0_convert):
def hacker(psbt, way):
actual_len_i = len(psbt.inputs)
actual_len_o = len(psbt.outputs)
if way == "i-":
psbt.input_count = actual_len_i - 1
elif way == "i+":
psbt.input_count = actual_len_i - 1
elif way == "o-":
psbt.output_count = actual_len_o - 1
elif way == "o+":
psbt.output_count = actual_len_o + 1
psbt = fake_txn(2, 2, segwit_in=True, wrapped=True, change_outputs=[0],
outstyles=["p2pkh", "p2wpkh"], psbt_v2=True,
psbt_hacker=lambda psbt: hacker(psbt, way))
with open(f"/home/scg/PycharmProjects/afirmware/unix/work/MicroSD/{way}.psbt", "wb") as f:
f.write(psbt)
start_sign(psbt)
title, story = cap_story()
assert "failed" in story or "Invalid PSBT" in story or "Network fee bigger" in story
# EOF

View File

@ -52,19 +52,36 @@ def simple_fake_txn():
return doit
@pytest.fixture()
def fake_txn(dev):
def fake_txn(dev, pytestconfig):
# make various size txn's ... completely fake and pointless values
# - but has UTXO's to match needs
# - input total = num_inputs * 1BTC
from pycoin.tx.Tx import Tx
from pycoin.tx.TxIn import TxIn
from pycoin.tx.TxOut import TxOut
from pycoin.encoding import hash160
from struct import pack
def doit(num_ins, num_outs, master_xpub=None, subpath="0/%d", fee=10000,
invals=None, outvals=None, segwit_in=False, outstyles=['p2pkh'], psbt_hacker=None,
change_outputs=[], capture_scripts=None, add_xpub=None, op_return=None):
invals=None, outvals=None, segwit_in=False, wrapped=False,
outstyles=['p2pkh'], psbt_hacker=None, change_outputs=[],
capture_scripts=None, add_xpub=None, op_return=None,
psbt_v2=None):
psbt = BasicPSBT()
if psbt_v2 is None:
# anything passed directly to this function overrides
# pytest flag --psbt2 - only care about pytest flag
# if psbt_v2 is not specified (None)
psbt_v2 = pytestconfig.getoption('psbt2')
if psbt_v2:
psbt.version = 2
psbt.txn_version = 2
psbt.input_count = num_ins
psbt.output_count = num_outs
txn = Tx(2,[],[])
master_xpub = master_xpub or dev.master_xpub or simulator_fixed_xprv
@ -93,6 +110,10 @@ def fake_txn(dev):
if segwit_in:
# p2wpkh
scr = bytes([0x00, 0x14]) + subkey.hash160()
if wrapped:
# p2sh-p2wpkh
psbt.inputs[i].redeem_script = scr
scr = bytes([0xa9, 0x14]) + hash160(scr) + bytes([0x87])
else:
# p2pkh
scr = bytes([0x76, 0xa9, 0x14]) + subkey.hash160() + bytes([0x88, 0xac])
@ -107,10 +128,17 @@ def fake_txn(dev):
supply.txs_out[-1].stream(fd)
psbt.inputs[i].witness_utxo = fd.getvalue()
if psbt_v2:
psbt.inputs[i].previous_txid = supply.hash()
psbt.inputs[i].prevout_idx = 0
# TODO sequence
# TODO height timelock
# TODO time timelock
spendable = TxIn(supply.hash(), 0)
txn.txs_in.append(spendable)
from binascii import hexlify as b2a_hex
for i in range(num_outs):
# random P2PKH
if not outstyles:
@ -120,6 +148,7 @@ def fake_txn(dev):
if i in change_outputs:
scr, act_scr, isw, pubkey, sp = make_change_addr(mk, style)
if len(pubkey) == 32: # xonly
psbt.outputs[i].taproot_bip32_paths[pubkey] = sp
else:
@ -131,10 +160,15 @@ def fake_txn(dev):
assert scr
act_scr = act_scr or scr
if isw:
psbt.outputs[i].witness_script = scr
elif style.endswith('sh'):
# one of these is not needed anymore in v2 as you have scriptPubkey provided by self.script
if "p2sh" in style:# in ('p2sh-p2wpkh', 'p2wpkh-p2sh'):
psbt.outputs[i].redeem_script = scr
elif isw:
psbt.outputs[i].witness_script = scr
if psbt_v2:
psbt.outputs[i].script = act_scr
psbt.outputs[i].amount = outvals[i] if outvals else int(round(((1E8*num_ins)-fee) / num_outs, 4))
if not outvals:
h = TxOut(round(((1E8*num_ins)-fee) / num_outs, 4), act_scr)
@ -154,16 +188,25 @@ def fake_txn(dev):
script = bytes([106, op_return_size]) + data
else:
script = bytes([106, 76, op_return_size]) + data
op_return_out = TxOut(amount, script)
txn.txs_out.append(op_return_out)
psbt.outputs.append(BasicPSBTOutput(idx=len(psbt.outputs)))
op_ret_o = BasicPSBTOutput(idx=len(psbt.outputs))
if psbt_v2:
op_ret_o.script = script
op_ret_o.amount = amount
psbt.output_count += 1
else:
op_return_out = TxOut(amount, script)
txn.txs_out.append(op_return_out)
psbt.outputs.append(op_ret_o)
if capture_scripts is not None:
capture_scripts.append(script)
with BytesIO() as b:
txn.stream(b)
psbt.txn = b.getvalue()
if not psbt_v2:
with BytesIO() as b:
txn.stream(b)
psbt.txn = b.getvalue()
if add_xpub:
# some people want extra xpub data in their PSBTs