# Additions Copyright 2018-2021 by Coinkite Inc. # Copyright (c) 2010 ArtForz -- public domain half-a-node # Copyright (c) 2012 Jeff Garzik # Copyright (c) 2010-2016 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Bitcoin Object Python Serializations Modified from the test/test_framework/mininode.py file from the Bitcoin repository CTransaction,CTxIn, CTxOut, etc....: data structures that should map to corresponding structures in bitcoin/primitives for transactions only ser_*, deser_*: functions that handle serialization/deserialization """ from ubinascii import hexlify as b2a_hex import ustruct as struct import ngu from opcodes import * from public_constants import AF_CLASSIC, AF_P2WPKH, AF_P2SH, AF_P2WSH, AF_P2TR, AF_BARE_PK # single-shot hash functions sha256 = ngu.hash.sha256s ripemd160 = ngu.hash.ripemd160 hash256 = ngu.hash.sha256d hash160 = ngu.hash.hash160 #def bytes_to_hex_str(s): # return str(b2a_hex(s), 'ascii') SIGHASH_ALL = const(1) SIGHASH_NONE = const(2) SIGHASH_SINGLE = const(3) SIGHASH_ANYONECANPAY = const(0x80) # list containing all flags that we support signing for ALL_SIGHASH_FLAGS = [ SIGHASH_ALL, SIGHASH_NONE, SIGHASH_SINGLE, SIGHASH_ALL|SIGHASH_ANYONECANPAY, SIGHASH_NONE|SIGHASH_ANYONECANPAY, SIGHASH_SINGLE|SIGHASH_ANYONECANPAY, ] # Serialization/deserialization tools def ser_compact_size(l): if l < 253: return struct.pack("B", l) elif l < 0x10000: return struct.pack("= 253 elif nit == 254: nit = struct.unpack("= 0x1_0000 elif nit == 255: nit = struct.unpack("= 0x1_0000_0000 return nit def deser_string(f): nit = deser_compact_size(f) return f.read(nit) def ser_string(s): return ser_compact_size(len(s)) + s def deser_uint256(f): r = 0 for i in range(8): t = struct.unpack(">= 32 return rs def uint256_from_str(s): r = 0 t = struct.unpack("> 24) & 0xFF v = (c & 0xFFFFFF) << (8 * (nbytes - 3)) return v def deser_vector(f, c): nit = deser_compact_size(f) r = [] for i in range(nit): t = c() t.deserialize(f) r.append(t) return r # ser_function_name: Allow for an alternate serialization function on the # entries in the vector (we use this for serializing the vector of transactions # for a witness block). def ser_vector(l, ser_function_name=None): r = ser_compact_size(len(l)) for i in l: if ser_function_name: r += getattr(i, ser_function_name)() else: r += i.serialize() return r def deser_uint256_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = deser_uint256(f) r.append(t) return r def ser_uint256_vector(l): r = ser_compact_size(len(l)) for i in l: r += ser_uint256(i) return r def deser_string_vector(f): nit = deser_compact_size(f) return [deser_string(f) for _ in range(nit)] def ser_string_vector(l): r = ser_compact_size(len(l)) for sv in l: r += ser_string(sv) return r def deser_int_vector(f): nit = deser_compact_size(f) r = [] for i in range(nit): t = struct.unpack(" OP_PUSHDATA1 + size + data elif ll <= 0xffff: return bytes([0x4d]) + struct.pack(b' OP_PUSHDATA2 else: assert False def ser_push_int(n): # push a small integer onto the stack from opcodes import OP_0, OP_1 if n == 0: return bytes([OP_0]) elif 1 <= n <= 16: return bytes([OP_1 + n - 1]) elif n <= 255: return bytes([1, n]) raise ValueError(n) def disassemble(script): # Very limited script disassembly # yeilds (int / bytes, opcode) for each part of the script # see try: offset = 0 slen = len(script) while 1: if offset >= slen: #print('dis %d done' % offset) return c = script[offset] offset += 1 if 1 <= c <= 75: cnt = c elif OP_1 <= c <= OP_16: # OP_1 thru OP_16 yield (c - OP_1 + 1, None) continue elif c == OP_PUSHDATA1: cnt = script[offset] offset += 1 elif c == OP_PUSHDATA2: # up to 65535 bytes cnt, = struct.unpack_from("H", script, offset) offset += 2 elif c == OP_PUSHDATA4: # no where to put so much data raise NotImplementedError elif c == OP_1NEGATE: yield (-1, None) continue else: # OP_0 included here yield (None, c) continue # a data push of `cnt` bytes - reject if it runs off the end if offset + cnt > slen: raise ValueError yield (script[offset:offset+cnt], None) offset += cnt except Exception as e: # import sys;sys.print_exception(e) raise ValueError("bad script") def ser_sig_der(r, s, sighash_type=1): # Take R and S values from a signature and encode into DER format. sig = b"\x30" # Make r and s as short as possible ri = 0 for b in r: if b == 0: ri += 1 else: break r = r[ri:] si = 0 for b in s: if b == 0: si += 1 else: break s = s[si:] # Make positive of neg if r[0] & (1 << 7) != 0: r = b"\x00" + r if s[0] & (1 << 7) != 0: s = b"\x00" + s # Write total length total_len = len(r) + len(s) + 4 sig += struct.pack("B", total_len) # write r sig += b"\x02" sig += struct.pack("B", len(r)) sig += r # write s sig += b"\x02" sig += struct.pack("B", len(s)) sig += s sig += struct.pack("B", sighash_type) return sig def ser_sig_compact(r, s, recid): rec = struct.unpack("B", recid)[0] prefix = struct.pack("B", 27 + 4 +rec) sig = prefix sig += r + s return sig # Objects that map to bitcoind objects, which can be serialized/deserialized MSG_WITNESS_FLAG = 1<<30 class COutPoint(object): def __init__(self, hash=0, n=0): self.hash = hash self.n = n def deserialize(self, f): self.hash = deser_uint256(f) self.n = struct.unpack(" OP_CHECKSIG # push_op is 0x21 (33) for compressed, 0x41 (65) for uncompressed pk_len = self.scriptPubKey[0] return AF_BARE_PK, self.scriptPubKey[1:1+pk_len], False if self.is_op_return(): return OP_RETURN, self.scriptPubKey, False return None, self.scriptPubKey, None def is_p2tr(self): return len(self.scriptPubKey) == 34 and \ (OP_1 <= self.scriptPubKey[0] <= OP_16) and self.scriptPubKey[1] == 0x20 def is_p2wpkh(self): return len(self.scriptPubKey) == 22 and \ self.scriptPubKey[0] == 0 and self.scriptPubKey[1] == 0x14 def is_p2wsh(self): return len(self.scriptPubKey) == 34 and \ self.scriptPubKey[0] == 0 and self.scriptPubKey[1] == 0x20 def is_p2sh(self): return len(self.scriptPubKey) == 23 and self.scriptPubKey[0] == 0xa9 \ and self.scriptPubKey[1] == 0x14 and self.scriptPubKey[22] == 0x87 def is_p2pkh(self): return len(self.scriptPubKey) == 25 and self.scriptPubKey[0] == 0x76 \ and self.scriptPubKey[1] == 0xa9 and self.scriptPubKey[2] == 0x14 \ and self.scriptPubKey[23] == 0x88 and self.scriptPubKey[24] == 0xac def is_p2pk(self): return (len(self.scriptPubKey) == 35 or len(self.scriptPubKey) == 67) \ and self.scriptPubKey[0] == len(self.scriptPubKey) - 2 \ and self.scriptPubKey[-1] == OP_CHECKSIG def is_op_return(self): return self.scriptPubKey and (self.scriptPubKey[0] == OP_RETURN) #def __repr__(self): # return "CTxOut(nValue=%d scriptPubKey=%s)" \ # % (self.nValue, b2a_hex(self.scriptPubKey)) class CScriptWitness(object): def __init__(self): # stack is a vector of strings self.stack = [] #def __repr__(self): # return "CScriptWitness(%s)" % \ # (",".join([bytes_to_hex_str(x) for x in self.stack])) def is_null(self): if self.stack: return False return True class CTxInWitness(object): def __init__(self): self.scriptWitness = CScriptWitness() def deserialize(self, f): self.scriptWitness.stack = deser_string_vector(f) def serialize(self): return ser_string_vector(self.scriptWitness.stack) #def __repr__(self): # return repr(self.scriptWitness) def is_null(self): return self.scriptWitness.is_null() class CTxWitness(object): def __init__(self): self.vtxinwit = [] def deserialize(self, f): for i in range(len(self.vtxinwit)): self.vtxinwit[i].deserialize(f) def serialize(self): r = b"" # This is different than the usual vector serialization -- # we omit the length of the vector, which is required to be # the same length as the transaction's vin vector. for x in self.vtxinwit: r += x.serialize() return r #def __repr__(self): # return "CTxWitness(%s)" % \ # (';'.join([repr(x) for x in self.vtxinwit])) def is_null(self): for x in self.vtxinwit: if not x.is_null(): return False return True class CTransaction(object): def __init__(self, tx=None): if tx is None: self.nVersion = 1 self.vin = [] self.vout = [] self.wit = CTxWitness() self.nLockTime = 0 self.sha256 = None self.hash = None else: import copy # not supported self.nVersion = tx.nVersion self.vin = copy.deepcopy(tx.vin) self.vout = copy.deepcopy(tx.vout) self.nLockTime = tx.nLockTime self.sha256 = tx.sha256 self.hash = tx.hash self.wit = copy.deepcopy(tx.wit) def deserialize(self, f): self.nVersion = struct.unpack(" 21000000 * COIN: return False return True #def __repr__(self): # return "CTransaction(nVersion=%i vin=%s vout=%s wit=%s nLockTime=%i)" \ # % (self.nVersion, repr(self.vin), repr(self.vout), repr(self.wit), self.nLockTime) # EOF