279 lines
8.5 KiB
Python
279 lines
8.5 KiB
Python
#!/usr/bin/env python3
|
|
# Copyright (c) 2010 ArtForz -- public domain half-a-node
|
|
# Copyright (c) 2012 Jeff Garzik
|
|
# Copyright (c) 2010-2016 The Bitcoin Core developers
|
|
# Distributed under the MIT software license, see the accompanying
|
|
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|
"""Bitcoin Object Python Serializations
|
|
|
|
Modified from the test/test_framework/mininode.py file from the
|
|
Bitcoin repository
|
|
|
|
CTransaction,CTxIn, CTxOut, etc....:
|
|
data structures that should map to corresponding structures in
|
|
bitcoin/primitives for transactions only
|
|
"""
|
|
|
|
import copy, struct, hashlib
|
|
|
|
from serialize import (
|
|
deser_uint256,
|
|
deser_string,
|
|
deser_string_vector,
|
|
deser_vector,
|
|
ser_uint256,
|
|
ser_string,
|
|
ser_string_vector,
|
|
ser_vector,
|
|
uint256_from_str,
|
|
)
|
|
|
|
from typing import (
|
|
List,
|
|
Optional,
|
|
)
|
|
|
|
# Objects that map to bitcoind objects, which can be serialized/deserialized
|
|
|
|
MSG_WITNESS_FLAG = 1 << 30
|
|
|
|
|
|
def hash256(data):
|
|
return hashlib.sha256(hashlib.sha256(data).digest()).digest()
|
|
|
|
class COutPoint(object):
|
|
def __init__(self, hash: int = 0, n: int = 0xffffffff):
|
|
self.hash = hash
|
|
self.n = n
|
|
|
|
def deserialize(self, f) -> None:
|
|
self.hash = deser_uint256(f)
|
|
self.n = struct.unpack("<I", f.read(4))[0]
|
|
|
|
def serialize(self) -> bytes:
|
|
r = b""
|
|
r += ser_uint256(self.hash)
|
|
r += struct.pack("<I", self.n)
|
|
return r
|
|
|
|
def __repr__(self) -> str:
|
|
return "COutPoint(hash=%064x n=%i)" % (self.hash, self.n)
|
|
|
|
|
|
class CTxIn(object):
|
|
def __init__(
|
|
self,
|
|
outpoint: Optional[COutPoint] = None,
|
|
scriptSig: bytes = b"",
|
|
nSequence: int = 0,
|
|
):
|
|
if outpoint is None:
|
|
self.prevout = COutPoint()
|
|
else:
|
|
self.prevout = outpoint
|
|
self.scriptSig = scriptSig
|
|
self.nSequence = nSequence
|
|
|
|
def deserialize(self, f) -> None:
|
|
self.prevout = COutPoint()
|
|
self.prevout.deserialize(f)
|
|
self.scriptSig = deser_string(f)
|
|
self.nSequence = struct.unpack("<I", f.read(4))[0]
|
|
|
|
def serialize(self) -> bytes:
|
|
r = b""
|
|
r += self.prevout.serialize()
|
|
r += ser_string(self.scriptSig)
|
|
r += struct.pack("<I", self.nSequence)
|
|
return r
|
|
|
|
def __repr__(self) -> str:
|
|
return "CTxIn(prevout=%s scriptSig=%s nSequence=%i)" \
|
|
% (repr(self.prevout), self.scriptSig.hex(),
|
|
self.nSequence)
|
|
|
|
|
|
class CTxOut(object):
|
|
def __init__(self, nValue: int = 0, scriptPubKey: bytes = b""):
|
|
self.nValue = nValue
|
|
self.scriptPubKey = scriptPubKey
|
|
|
|
def deserialize(self, f) -> None:
|
|
self.nValue = struct.unpack("<q", f.read(8))[0]
|
|
self.scriptPubKey = deser_string(f)
|
|
|
|
def serialize(self) -> bytes:
|
|
r = b""
|
|
r += struct.pack("<q", self.nValue)
|
|
r += ser_string(self.scriptPubKey)
|
|
return r
|
|
|
|
def __repr__(self) -> str:
|
|
return "CTxOut(nValue=%i.%08i scriptPubKey=%s)" \
|
|
% (self.nValue // 100_000_000, self.nValue % 100_000_000, self.scriptPubKey.hex())
|
|
|
|
|
|
class CScriptWitness(object):
|
|
def __init__(self) -> None:
|
|
# stack is a vector of strings
|
|
self.stack: List[bytes] = []
|
|
|
|
def __repr__(self) -> str:
|
|
return "CScriptWitness(%s)" % \
|
|
(",".join([x.hex() for x in self.stack]))
|
|
|
|
def is_null(self) -> bool:
|
|
if self.stack:
|
|
return False
|
|
return True
|
|
|
|
|
|
class CTxInWitness(object):
|
|
def __init__(self) -> None:
|
|
self.scriptWitness = CScriptWitness()
|
|
|
|
def deserialize(self, f) -> None:
|
|
self.scriptWitness.stack = deser_string_vector(f)
|
|
|
|
def serialize(self) -> bytes:
|
|
return ser_string_vector(self.scriptWitness.stack)
|
|
|
|
def __repr__(self) -> str:
|
|
return repr(self.scriptWitness)
|
|
|
|
def is_null(self) -> bool:
|
|
return self.scriptWitness.is_null()
|
|
|
|
|
|
class CTxWitness(object):
|
|
def __init__(self) -> None:
|
|
self.vtxinwit: List[CTxInWitness] = []
|
|
|
|
def deserialize(self, f) -> None:
|
|
for i in range(len(self.vtxinwit)):
|
|
self.vtxinwit[i].deserialize(f)
|
|
|
|
def serialize(self) -> bytes:
|
|
r = b""
|
|
# This is different than the usual vector serialization --
|
|
# we omit the length of the vector, which is required to be
|
|
# the same length as the transaction's vin vector.
|
|
for x in self.vtxinwit:
|
|
r += x.serialize()
|
|
return r
|
|
|
|
def __repr__(self) -> str:
|
|
return "CTxWitness(%s)" % \
|
|
(';'.join([repr(x) for x in self.vtxinwit]))
|
|
|
|
def is_null(self) -> bool:
|
|
for x in self.vtxinwit:
|
|
if not x.is_null():
|
|
return False
|
|
return True
|
|
|
|
|
|
class CTransaction(object):
|
|
def __init__(self, tx: Optional['CTransaction'] = None) -> None:
|
|
if tx is None:
|
|
self.nVersion = 1
|
|
self.vin: List[CTxIn] = []
|
|
self.vout: List[CTxOut] = []
|
|
self.wit = CTxWitness()
|
|
self.nLockTime = 0
|
|
self.sha256: Optional[int] = None
|
|
self.hash: Optional[bytes] = None
|
|
else:
|
|
self.nVersion = tx.nVersion
|
|
self.vin = copy.deepcopy(tx.vin)
|
|
self.vout = copy.deepcopy(tx.vout)
|
|
self.nLockTime = tx.nLockTime
|
|
self.sha256 = tx.sha256
|
|
self.hash = tx.hash
|
|
self.wit = copy.deepcopy(tx.wit)
|
|
|
|
def deserialize(self, f) -> None:
|
|
self.nVersion = struct.unpack("<i", f.read(4))[0]
|
|
self.vin = deser_vector(f, CTxIn)
|
|
flags = 0
|
|
if len(self.vin) == 0:
|
|
flags = struct.unpack("<B", f.read(1))[0]
|
|
# Not sure why flags can't be zero, but this
|
|
# matches the implementation in bitcoind
|
|
if (flags != 0):
|
|
self.vin = deser_vector(f, CTxIn)
|
|
self.vout = deser_vector(f, CTxOut)
|
|
else:
|
|
self.vout = deser_vector(f, CTxOut)
|
|
if flags != 0:
|
|
self.wit.vtxinwit = [CTxInWitness() for i in range(len(self.vin))]
|
|
self.wit.deserialize(f)
|
|
self.nLockTime = struct.unpack("<I", f.read(4))[0]
|
|
self.sha256 = None
|
|
self.hash = None
|
|
|
|
def serialize_without_witness(self) -> bytes:
|
|
r = b""
|
|
r += struct.pack("<i", self.nVersion)
|
|
r += ser_vector(self.vin)
|
|
r += ser_vector(self.vout)
|
|
r += struct.pack("<I", self.nLockTime)
|
|
return r
|
|
|
|
# Only serialize with witness when explicitly called for
|
|
def serialize_with_witness(self) -> bytes:
|
|
flags = 0
|
|
if not self.wit.is_null():
|
|
flags |= 1
|
|
r = b""
|
|
r += struct.pack("<i", self.nVersion)
|
|
if flags:
|
|
r += ser_vector([])
|
|
r += struct.pack("<B", flags)
|
|
r += ser_vector(self.vin)
|
|
r += ser_vector(self.vout)
|
|
if flags & 1:
|
|
if (len(self.wit.vtxinwit) != len(self.vin)):
|
|
# vtxinwit must have the same length as vin
|
|
self.wit.vtxinwit = self.wit.vtxinwit[:len(self.vin)]
|
|
for _ in range(len(self.wit.vtxinwit), len(self.vin)):
|
|
self.wit.vtxinwit.append(CTxInWitness())
|
|
r += self.wit.serialize()
|
|
r += struct.pack("<I", self.nLockTime)
|
|
return r
|
|
|
|
# Regular serialization is without witness -- must explicitly
|
|
# call serialize_with_witness to include witness data.
|
|
def serialize(self) -> bytes:
|
|
return self.serialize_without_witness()
|
|
|
|
# Recalculate the txid (transaction hash without witness)
|
|
def rehash(self) -> None:
|
|
self.sha256 = None
|
|
self.calc_sha256()
|
|
|
|
# We will only cache the serialization without witness in
|
|
# self.sha256 and self.hash -- those are expected to be the txid.
|
|
def calc_sha256(self, with_witness: bool = False) -> Optional[int]:
|
|
if with_witness:
|
|
# Don't cache the result, just return it
|
|
return uint256_from_str(hash256(self.serialize_with_witness()))
|
|
|
|
if self.sha256 is None:
|
|
self.sha256 = uint256_from_str(hash256(self.serialize_without_witness()))
|
|
self.hash = hash256(self.serialize())
|
|
return None
|
|
|
|
def is_null(self) -> bool:
|
|
return len(self.vin) == 0 and len(self.vout) == 0
|
|
|
|
def txid(self):
|
|
# convenience
|
|
if self.sha256 is None:
|
|
self.calc_sha256()
|
|
return self.sha256.to_bytes(32, "big")
|
|
|
|
def __repr__(self) -> str:
|
|
return "CTransaction(nVersion=%i vin=%s vout=%s wit=%s nLockTime=%i)" \
|
|
% (self.nVersion, repr(self.vin), repr(self.vout), repr(self.wit), self.nLockTime)
|