Replace ckcc dependency with our own version of the library itself
This commit is contained in:
parent
f0c5aa7d3c
commit
f7d04ff2eb
10
hwilib/devices/ckcc/README.md
Normal file
10
hwilib/devices/ckcc/README.md
Normal file
@ -0,0 +1,10 @@
|
||||
# Coldcard Library
|
||||
|
||||
This is a stripped down and modified version of the official [ckcc-protocol](https://github.com/Coldcard/ckcc-protocol) library.
|
||||
|
||||
This stripped down version was made at commit [49fa0265df4c9d0d0d915ccd4dc41b06104d6738](https://github.com/Coldcard/ckcc-protocol/tree/49fa0265df4c9d0d0d915ccd4dc41b06104d6738).
|
||||
|
||||
## Changes
|
||||
|
||||
- Removed CLI
|
||||
- Removed pycoin dependency
|
||||
6
hwilib/devices/ckcc/__init__.py
Normal file
6
hwilib/devices/ckcc/__init__.py
Normal file
@ -0,0 +1,6 @@
|
||||
|
||||
__version__ = '0.7.2'
|
||||
|
||||
__all__ = [ "client", "protocol", "constants" ]
|
||||
|
||||
|
||||
383
hwilib/devices/ckcc/client.py
Normal file
383
hwilib/devices/ckcc/client.py
Normal file
@ -0,0 +1,383 @@
|
||||
#
|
||||
# client.py
|
||||
#
|
||||
# Implement the desktop side of our Coldcard USB protocol.
|
||||
#
|
||||
# If you would like to use a different EC/AES library, you may subclass
|
||||
# and override these member functions:
|
||||
#
|
||||
# - ec_mult, ec_setup, aes_setup, mitm_verify
|
||||
#
|
||||
import hid, sys, os
|
||||
from binascii import b2a_hex, a2b_hex
|
||||
from hashlib import sha256
|
||||
from .protocol import CCProtocolPacker, CCProtocolUnpacker, CCProtoError, MAX_MSG_LEN, MAX_BLK_LEN
|
||||
from .utils import decode_xpub, get_pubkey_string
|
||||
|
||||
# unofficial, unpermissioned... USB numbers
|
||||
COINKITE_VID = 0xd13e
|
||||
CKCC_PID = 0xcc10
|
||||
|
||||
# Unix domain socket used by the simulator
|
||||
CKCC_SIMULATOR_PATH = '/tmp/ckcc-simulator.sock'
|
||||
|
||||
class ColdcardDevice:
|
||||
def __init__(self, sn=None, dev=None, encrypt=True):
|
||||
# Establish connection via USB (HID) or Unix Pipe
|
||||
self.is_simulator = False
|
||||
|
||||
if not dev and sn and '/' in sn:
|
||||
dev = UnixSimulatorPipe(sn)
|
||||
found = 'simulator'
|
||||
self.is_simulator = True
|
||||
|
||||
if not dev:
|
||||
|
||||
for info in hid.enumerate(COINKITE_VID, CKCC_PID):
|
||||
found = info['serial_number']
|
||||
|
||||
if sn and sn != found:
|
||||
continue
|
||||
|
||||
# only one interface per device, so only one 'path'
|
||||
dev = hid.device(serial=found)
|
||||
assert dev, "failed to open: "+found
|
||||
dev.open_path(info['path'])
|
||||
|
||||
break
|
||||
|
||||
if not dev:
|
||||
raise KeyError("Could not find Coldcard!"
|
||||
if not sn else ('Cannot find CC with serial: '+sn))
|
||||
else:
|
||||
found = dev.get_serial_number_string()
|
||||
|
||||
self.dev = dev
|
||||
self.serial = found
|
||||
|
||||
# they will be defined after we've established a shared secret w/ device
|
||||
self.session_key = None
|
||||
self.encrypt_request = None
|
||||
self.decrypt_response = None
|
||||
self.master_xpub = None
|
||||
self.master_fingerprint = None
|
||||
|
||||
self.resync()
|
||||
|
||||
if encrypt:
|
||||
self.start_encryption()
|
||||
|
||||
def close(self):
|
||||
# close underlying HID device
|
||||
if self.dev:
|
||||
self.dev.close()
|
||||
self.dev = None
|
||||
|
||||
def resync(self):
|
||||
# flush anything already waiting on the EP
|
||||
while 1:
|
||||
junk = self.dev.read(64, timeout_ms=1)
|
||||
if not junk: break
|
||||
|
||||
# write a special packet, that encodes zero-length data, and last packet in sequence
|
||||
# prefix with 0x00 for "report number"
|
||||
self.dev.write(b'\x00\x80' + (b'\xff'*63))
|
||||
|
||||
# flush any response (perhaps error) waiting on the EP
|
||||
while 1:
|
||||
junk = self.dev.read(64, timeout_ms=1)
|
||||
if not junk: break
|
||||
|
||||
# check the above all worked
|
||||
err = self.dev.error()
|
||||
if err != '':
|
||||
raise RuntimeError('hidapi: '+err)
|
||||
|
||||
assert self.dev.get_serial_number_string() == self.serial
|
||||
|
||||
def send_recv(self, msg, expect_errors=False, verbose=0, timeout=1000, encrypt=True):
|
||||
# first byte of each 64-byte packet encodes length or packet-offset
|
||||
assert 4 <= len(msg) <= MAX_MSG_LEN, "msg length: %d" % len(msg)
|
||||
|
||||
if not self.encrypt_request:
|
||||
# disable encryption if not already enabled for this connection
|
||||
encrypt = False
|
||||
|
||||
if encrypt:
|
||||
msg = self.encrypt_request(msg)
|
||||
|
||||
left = len(msg)
|
||||
offset = 0
|
||||
while left > 0:
|
||||
# Note: first byte always zero (HID report number),
|
||||
# [1] is framing header (length+flags)
|
||||
# [2:65] payload (63 bytes, perhaps including padding)
|
||||
here = min(63, left)
|
||||
buf = bytearray(65)
|
||||
buf[2:2+here] = msg[offset:offset+here]
|
||||
if here == left:
|
||||
# final one in sequence
|
||||
buf[1] = here | 0x80 | (0x40 if encrypt else 0x00)
|
||||
else:
|
||||
# more will be coming
|
||||
buf[1] = here
|
||||
|
||||
assert len(buf) == 65
|
||||
|
||||
if verbose:
|
||||
print("Tx [%2d]: %s (0x%x)" % (here, b2a_hex(buf[1:]), buf[1]))
|
||||
|
||||
rv = self.dev.write(buf)
|
||||
assert rv == len(buf) == 65, repr(rv)
|
||||
|
||||
offset += here
|
||||
left -= here
|
||||
|
||||
# collect response, framed in the same manner
|
||||
resp = b''
|
||||
while 1:
|
||||
buf = self.dev.read(64, timeout_ms=(timeout or 0))
|
||||
|
||||
assert buf, "timeout reading USB EP"
|
||||
|
||||
# (trusting more than usual here)
|
||||
flag = buf[0]
|
||||
resp += bytes(buf[1:1+(flag & 0x3f)])
|
||||
if flag & 0x80:
|
||||
break
|
||||
|
||||
if flag & 0x40:
|
||||
if verbose:
|
||||
print('Enc response: %s' % b2a_hex(resp))
|
||||
|
||||
resp = self.decrypt_response(resp)
|
||||
|
||||
try:
|
||||
if verbose:
|
||||
print("Rx [%2d]: %r" % (len(resp), b2a_hex(bytes(resp))))
|
||||
|
||||
return CCProtocolUnpacker.decode(resp)
|
||||
except CCProtoError as e:
|
||||
if expect_errors: raise
|
||||
raise
|
||||
except:
|
||||
#print("Corrupt response: %r" % resp)
|
||||
raise
|
||||
|
||||
def ec_setup(self):
|
||||
# Provides the ECSDA primatives in portable way.
|
||||
# Needed to do D-H session key aggreement and then AES.
|
||||
# - should be replaced in subclasses if you have other EC libraries
|
||||
# - curve is always secp256k1
|
||||
# - values are binary strings
|
||||
# - write whatever you want onto self.
|
||||
|
||||
# - setup: return 65 of public key, and 16 bytes of AES IV
|
||||
# - second call: give the pubkey of far side, calculate the shared pt on curve
|
||||
from ecdsa.curves import SECP256k1
|
||||
from ecdsa import SigningKey
|
||||
|
||||
self.my_key = SigningKey.generate(curve=SECP256k1, hashfunc=sha256)
|
||||
pubkey = self.my_key.get_verifying_key().to_string()
|
||||
assert len(pubkey) == 64
|
||||
|
||||
#print("my pubkey = %s" % b2a_hex(pubkey))
|
||||
|
||||
return pubkey
|
||||
|
||||
def ec_mult(self, his_pubkey):
|
||||
# - second call: given the pubkey of far side, calculate the shared pt on curve
|
||||
# - creates session key based on that
|
||||
from ecdsa.curves import SECP256k1
|
||||
from ecdsa import VerifyingKey
|
||||
from ecdsa.util import number_to_string
|
||||
|
||||
# Validate his pubkey a little: this call will check it's on the curve.
|
||||
assert len(his_pubkey) == 64
|
||||
his_pubkey = VerifyingKey.from_string(his_pubkey, curve=SECP256k1, hashfunc=sha256)
|
||||
|
||||
#print("his pubkey = %s" % b2a_hex(his_pubkey.to_string()))
|
||||
|
||||
# do the D-H thing
|
||||
pt = self.my_key.privkey.secret_multiplier * his_pubkey.pubkey.point
|
||||
|
||||
# final key is sha256 of that point, serialized (64 bytes).
|
||||
order = SECP256k1.order
|
||||
kk = number_to_string(pt.x(), order) + number_to_string(pt.y(), order)
|
||||
|
||||
del self.my_key
|
||||
|
||||
return sha256(kk).digest()
|
||||
|
||||
def aes_setup(self, session_key):
|
||||
# Load keys and define encrypt/decrypt functions
|
||||
# - for CTR mode, we have different counters in each direction, so need two instances
|
||||
# - count must start at zero, and increment in LSB for each block.
|
||||
import pyaes
|
||||
|
||||
self.encrypt_request = pyaes.AESModeOfOperationCTR(session_key, pyaes.Counter(0)).decrypt
|
||||
self.decrypt_response = pyaes.AESModeOfOperationCTR(session_key, pyaes.Counter(0)).encrypt
|
||||
|
||||
def start_encryption(self):
|
||||
# setup encryption on the link
|
||||
# - pick our own key pair, IV for AES
|
||||
# - send IV and pubkey to device
|
||||
# - it replies with own pubkey
|
||||
# - determine what the session key was/is
|
||||
|
||||
pubkey = self.ec_setup()
|
||||
|
||||
msg = CCProtocolPacker.encrypt_start(pubkey)
|
||||
|
||||
his_pubkey, fingerprint, xpub = self.send_recv(msg, encrypt=False)
|
||||
|
||||
self.session_key = self.ec_mult(his_pubkey)
|
||||
|
||||
# capture some public details of remote side's master key
|
||||
# - these can be empty/0x0 when no secrets on device yet
|
||||
self.master_xpub = str(xpub, 'ascii')
|
||||
self.master_fingerprint = fingerprint
|
||||
|
||||
#print('sess key = %s' % b2a_hex(self.session_key))
|
||||
self.aes_setup(self.session_key)
|
||||
|
||||
def mitm_verify(self, sig, expected_xpub):
|
||||
# First try with Pycoin
|
||||
try:
|
||||
from pycoin.key.BIP32Node import BIP32Node
|
||||
from pycoin.contrib.msg_signing import verify_message
|
||||
from pycoin.encoding import from_bytes_32
|
||||
from base64 import b64encode
|
||||
|
||||
mk = BIP32Node.from_wallet_key(expected_xpub)
|
||||
return verify_message(mk, b64encode(sig), msg_hash=from_bytes_32(self.session_key))
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# If Pycoin is not available, do it using ecdsa
|
||||
from ecdsa import BadSignatureError, SECP256k1, VerifyingKey
|
||||
pubkey, chaincode = decode_xpub(expected_xpub)
|
||||
vk = VerifyingKey.from_string(get_pubkey_string(pubkey), curve=SECP256k1)
|
||||
try:
|
||||
ok = vk.verify_digest(sig[1:], self.session_key)
|
||||
except BadSignatureError:
|
||||
ok = False
|
||||
|
||||
return ok
|
||||
|
||||
def check_mitm(self, expected_xpub=None, sig=None):
|
||||
# Optional? verification against MiTM attack:
|
||||
# Using the master xpub, check a signature over the session public key, to
|
||||
# verify we talking directly to the real Coldcard (no active MitM between us).
|
||||
# - message is just the session key itself; no digests or prefixes
|
||||
# - no need for this unless concerned about *active* mitm on USB bus
|
||||
# - passive attackers (snoopers) will get nothing anyway, thanks to diffie-helman sauce
|
||||
# - unfortunately might be too slow to do everytime?
|
||||
|
||||
xp = expected_xpub or self.master_xpub
|
||||
assert xp, "device doesn't have any secrets yet"
|
||||
assert self.session_key, "connection not yet in encrypted mode"
|
||||
|
||||
# this request is delibrately slow on the device side
|
||||
if not sig:
|
||||
sig = self.send_recv(CCProtocolPacker.check_mitm(), timeout=5000)
|
||||
|
||||
assert len(sig) == 65
|
||||
|
||||
ok = self.mitm_verify(sig, xp)
|
||||
|
||||
if ok != True:
|
||||
raise RuntimeError("Possible active MiTM attack in progress! Incorrect signature.")
|
||||
|
||||
def upload_file(self, data, verify=True, blksize=1024):
|
||||
# upload a single file, up to 1MB? in size. Can check arrives ok.
|
||||
chk = sha256(data).digest()
|
||||
|
||||
for i in range(0, len(data), blksize):
|
||||
here = data[i:i+blksize]
|
||||
pos = self.send_recv(CCProtocolPacker.upload(i, len(data), here))
|
||||
assert pos == i
|
||||
|
||||
if verify:
|
||||
rb = self.send_recv(CCProtocolPacker.sha256())
|
||||
if rb != chk:
|
||||
raise RuntimeError('Checksum wrong during file upload')
|
||||
|
||||
return len(data), chk
|
||||
|
||||
def download_file(self, length, checksum, blksize=1024, file_number=1):
|
||||
# Download a single file, when you already know it's checksum. Will check arrives ok.
|
||||
data = b''
|
||||
chk = sha256()
|
||||
|
||||
pos = 0
|
||||
while pos < length:
|
||||
here = self.send_recv(CCProtocolPacker.download(pos, min(blksize, length-pos), file_number))
|
||||
data += here
|
||||
chk.update(here)
|
||||
pos += len(here)
|
||||
assert len(here) > 0
|
||||
|
||||
if chk.digest() != checksum:
|
||||
raise RuntimeError('Checksum wrong during file download')
|
||||
|
||||
return data
|
||||
|
||||
|
||||
class UnixSimulatorPipe:
|
||||
# Use a UNIX pipe to the simulator instead of a real USB connection.
|
||||
# - emulates the API of hidapi device object.
|
||||
|
||||
def __init__(self, path):
|
||||
import socket, atexit
|
||||
self.pipe = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
||||
try:
|
||||
self.pipe.connect(path)
|
||||
except FileNotFoundError:
|
||||
raise RuntimeError("Cannot connect to simulator. Is it running?")
|
||||
|
||||
instance = 0
|
||||
while instance < 10:
|
||||
pn = '/tmp/ckcc-client-%d-%d.sock' % (os.getpid(), instance)
|
||||
try:
|
||||
self.pipe.bind(pn) # just needs any name
|
||||
break
|
||||
except OSError:
|
||||
instance += 1
|
||||
continue
|
||||
|
||||
self.pipe_name = pn
|
||||
atexit.register(self.close)
|
||||
|
||||
def read(self, max_count, timeout_ms=None):
|
||||
import socket
|
||||
if not timeout_ms:
|
||||
self.pipe.settimeout(None)
|
||||
else:
|
||||
self.pipe.settimeout(timeout_ms / 1000.0)
|
||||
|
||||
try:
|
||||
return self.pipe.recv(max_count)
|
||||
except socket.timeout:
|
||||
return None
|
||||
|
||||
def write(self, buf):
|
||||
assert len(buf) == 65
|
||||
self.pipe.settimeout(10)
|
||||
rv = self.pipe.send(buf[1:])
|
||||
return 65 if rv == 64 else rv
|
||||
|
||||
def error(self):
|
||||
return ''
|
||||
|
||||
def close(self):
|
||||
self.pipe.close()
|
||||
try:
|
||||
os.unlink(self.pipe_name)
|
||||
except: pass
|
||||
|
||||
def get_serial_number_string(self):
|
||||
return 'simulator'
|
||||
|
||||
|
||||
# EOF
|
||||
69
hwilib/devices/ckcc/constants.py
Normal file
69
hwilib/devices/ckcc/constants.py
Normal file
@ -0,0 +1,69 @@
|
||||
#
|
||||
# Constants and various "limits" shared between embedded and desktop USB protocol
|
||||
#
|
||||
try:
|
||||
from micropython import const
|
||||
except ImportError:
|
||||
const = int
|
||||
|
||||
# For upload/download this is the max size of the data block.
|
||||
MAX_BLK_LEN = const(2048)
|
||||
|
||||
# Max total message length, excluding framing overhead (1 byte per 64).
|
||||
# - includes args for upload command
|
||||
MAX_MSG_LEN = const(4+4+4+MAX_BLK_LEN)
|
||||
|
||||
# Max PSBT txn we support (384k bytes as PSBT)
|
||||
# - the max on the wire for mainnet is 100k
|
||||
# - but a PSBT might contain a full txn for each input
|
||||
MAX_TXN_LEN = const(384*1024)
|
||||
|
||||
# Max size of any upload (firmware.dfu files in particular)
|
||||
MAX_UPLOAD_LEN = const(2*MAX_TXN_LEN)
|
||||
|
||||
# Max length of text messages for signing
|
||||
MSG_SIGNING_MAX_LENGTH = const(240)
|
||||
|
||||
# Bit values for address types
|
||||
AFC_PUBKEY = const(0x01) # pay to hash of pubkey
|
||||
AFC_SEGWIT = const(0x02) # requires a witness to spend
|
||||
AFC_BECH32 = const(0x04) # just how we're encoding it?
|
||||
AFC_SCRIPT = const(0x08) # paying into a script
|
||||
AFC_WRAPPED = const(0x10) # for transition/compat types for segwit vs. old
|
||||
|
||||
# Numeric codes for specific address types
|
||||
AF_CLASSIC = AFC_PUBKEY # 1addr
|
||||
AF_P2SH = AFC_SCRIPT # classic multisig / simple P2SH / 3hash
|
||||
AF_P2WPKH = AFC_PUBKEY | AFC_SEGWIT | AFC_BECH32 # bc1qsdklfj
|
||||
AF_P2WSH = AFC_SCRIPT | AFC_SEGWIT | AFC_BECH32 # segwit multisig
|
||||
AF_P2WPKH_P2SH = AFC_WRAPPED | AFC_PUBKEY | AFC_SEGWIT # looks classic P2SH, but p2wpkh inside
|
||||
AF_P2WSH_P2SH = AFC_WRAPPED | AFC_SCRIPT | AFC_SEGWIT # looks classic P2SH, segwit multisig
|
||||
|
||||
SUPPORTED_ADDR_FORMATS = frozenset([
|
||||
AF_CLASSIC,
|
||||
AF_P2SH,
|
||||
AF_P2WPKH,
|
||||
AF_P2WSH,
|
||||
AF_P2WPKH_P2SH,
|
||||
AF_P2WSH_P2SH,
|
||||
])
|
||||
|
||||
# BIP-174 aka PSBT defined values
|
||||
#
|
||||
PSBT_GLOBAL_UNSIGNED_TX = const(0)
|
||||
|
||||
PSBT_IN_NON_WITNESS_UTXO = const(0)
|
||||
PSBT_IN_WITNESS_UTXO = const(1)
|
||||
PSBT_IN_PARTIAL_SIG = const(2)
|
||||
PSBT_IN_SIGHASH_TYPE = const(3)
|
||||
PSBT_IN_REDEEM_SCRIPT = const(4)
|
||||
PSBT_IN_WITNESS_SCRIPT = const(5)
|
||||
PSBT_IN_BIP32_DERIVATION = const(6)
|
||||
PSBT_IN_FINAL_SCRIPTSIG = const(7)
|
||||
PSBT_IN_FINAL_SCRIPTWITNESS = const(8)
|
||||
|
||||
PSBT_OUT_REDEEM_SCRIPT = const(0)
|
||||
PSBT_OUT_WITNESS_SCRIPT = const(1)
|
||||
PSBT_OUT_BIP32_DERIVATION = const(2)
|
||||
|
||||
# EOF
|
||||
202
hwilib/devices/ckcc/protocol.py
Normal file
202
hwilib/devices/ckcc/protocol.py
Normal file
@ -0,0 +1,202 @@
|
||||
#
|
||||
# Details of our USB level protocol. Shared file between desktop and embedded.
|
||||
#
|
||||
# - first 4 bytes of all messages is the command code or response code
|
||||
# - use <I and <H, never >H
|
||||
#
|
||||
from struct import pack, unpack_from
|
||||
from .constants import *
|
||||
|
||||
class CCProtoError(RuntimeError):
|
||||
def __str__(self):
|
||||
return self.args[0]
|
||||
|
||||
class CCUserRefused(RuntimeError):
|
||||
def __str__(self):
|
||||
return 'You refused permission to do the operation'
|
||||
|
||||
class CCBusyError(RuntimeError):
|
||||
def __str__(self):
|
||||
return 'Coldcard is handling another request right now'
|
||||
|
||||
class CCProtocolPacker:
|
||||
# returns a lamba that will take correct args
|
||||
# and then give you a binary string to encode the
|
||||
# request
|
||||
|
||||
@staticmethod
|
||||
def logout():
|
||||
return pack('4s', b'logo')
|
||||
|
||||
@staticmethod
|
||||
def reboot():
|
||||
return pack('4s', b'rebo')
|
||||
|
||||
@staticmethod
|
||||
def version():
|
||||
# returns a string, with newline separators
|
||||
return pack('4s', b'vers')
|
||||
|
||||
@staticmethod
|
||||
def ping(msg):
|
||||
# returns whatever binary you give it
|
||||
return b'ping' + bytes(msg)
|
||||
|
||||
@staticmethod
|
||||
def check_mitm():
|
||||
return b'mitm'
|
||||
|
||||
@staticmethod
|
||||
def start_backup():
|
||||
# prompts user with password for encrytped backup
|
||||
return b'back'
|
||||
|
||||
@staticmethod
|
||||
def encrypt_start(device_pubkey, version=0x1):
|
||||
assert len(device_pubkey) == 64, "want uncompressed 64-byte pubkey, no prefix byte"
|
||||
return pack('<4sI64s', b'ncry', version, device_pubkey)
|
||||
|
||||
@staticmethod
|
||||
def upload(offset, total_size, data):
|
||||
# note: see MAX_MSG_LEN above
|
||||
assert len(data) <= MAX_MSG_LEN, 'badlen'
|
||||
return pack('<4sII', b'upld', offset, total_size) + data
|
||||
|
||||
@staticmethod
|
||||
def download(offset, length, file_number=0):
|
||||
assert 0 <= file_number < 2
|
||||
return pack('<4sIII', b'dwld', offset, length, file_number)
|
||||
|
||||
@staticmethod
|
||||
def sha256():
|
||||
return b'sha2'
|
||||
|
||||
@staticmethod
|
||||
def sign_transaction(length, file_sha, finalize=False):
|
||||
# must have already uploaded binary, and give expected sha256
|
||||
assert len(file_sha) == 32
|
||||
return pack('<4sII32s', b'stxn', length, int(finalize), file_sha)
|
||||
|
||||
@staticmethod
|
||||
def sign_message(raw_msg, subpath='m', addr_fmt=AF_CLASSIC):
|
||||
# only begins user interaction
|
||||
return pack('<4sIII', b'smsg', addr_fmt, len(subpath), len(raw_msg)) \
|
||||
+ subpath.encode('ascii') + raw_msg
|
||||
|
||||
@staticmethod
|
||||
def get_signed_msg():
|
||||
# poll completion/results of message signing
|
||||
return b'smok'
|
||||
|
||||
@staticmethod
|
||||
def get_backup_file():
|
||||
# poll completion/results of backup
|
||||
return b'bkok'
|
||||
|
||||
@staticmethod
|
||||
def get_signed_txn():
|
||||
# poll completion/results of transaction signing
|
||||
return b'stok'
|
||||
|
||||
@staticmethod
|
||||
def get_xpub(subpath='m'):
|
||||
# takes a string, like: m/44'/0'/23/23
|
||||
return b'xpub' + subpath.encode('ascii')
|
||||
|
||||
@staticmethod
|
||||
def show_address(subpath, addr_fmt=AF_CLASSIC):
|
||||
# takes a string, like: m/44'/0'/23/23
|
||||
# shows on screen, no feedback from user expected
|
||||
return pack('<4sI', b'show', addr_fmt) + subpath.encode('ascii')
|
||||
|
||||
@staticmethod
|
||||
def sim_keypress(key):
|
||||
# Simulator ONLY: pretend a key is pressed
|
||||
return b'XKEY' + key
|
||||
|
||||
@staticmethod
|
||||
def bag_number(new_number=b''):
|
||||
# one time only: put into bag, or readback bag
|
||||
return b'bagi' + bytes(new_number)
|
||||
|
||||
|
||||
class CCProtocolUnpacker:
|
||||
# Take a binary response, and turn it into a python object
|
||||
# - we support a number of signatures, and expand as needed
|
||||
# - some will be general-purpose, but others can be very specific to one command
|
||||
# - given full rx message to work from
|
||||
# - this is done after un-framing
|
||||
|
||||
@classmethod
|
||||
def decode(cls, msg):
|
||||
assert len(msg) >= 4
|
||||
sign = str(msg[0:4], 'utf8', 'ignore')
|
||||
|
||||
d = getattr(cls, sign, cls)
|
||||
if d is cls:
|
||||
raise CCProtoError('Unknown response signature: ' + repr(sign))
|
||||
|
||||
return d(msg)
|
||||
|
||||
|
||||
# struct info for each response
|
||||
|
||||
def okay(msg):
|
||||
# trivial response, w/ no content
|
||||
assert len(msg) == 4
|
||||
return None
|
||||
|
||||
# low-level errors
|
||||
def fram(msg):
|
||||
raise CCProtoError("Framing Error", str(msg[4:], 'utf8'))
|
||||
def err_(msg):
|
||||
raise CCProtoError("Remote Error: " + str(msg[4:], 'utf8', 'ignore'), msg[4:])
|
||||
|
||||
def refu(msg):
|
||||
# user didn't want to approve something
|
||||
raise CCUserRefused()
|
||||
|
||||
def busy(msg):
|
||||
# user didn't want to approve something
|
||||
raise CCBusyError()
|
||||
|
||||
def biny(msg):
|
||||
# binary string: length implied by msg framing
|
||||
return msg[4:]
|
||||
|
||||
def int1(msg):
|
||||
return unpack_from('<I', msg, 4)[0]
|
||||
|
||||
def int2(msg):
|
||||
return unpack_from('<2I', msg, 4)
|
||||
|
||||
def int3(msg):
|
||||
return unpack_from('<3I', msg, 4)
|
||||
|
||||
def mypb(msg):
|
||||
# response to "ncry" command:
|
||||
# - the (uncompressed) pubkey of the Coldcard
|
||||
# - info about master key: xpub, fingerprint of that
|
||||
# - anti-MitM: remote xpub
|
||||
# session key is SHA256(point on sec256pk1 in binary) via D-H
|
||||
dev_pubkey, fingerprint, xpub_len = unpack_from('64sII', msg, 4)
|
||||
xpub = msg[-xpub_len:] if xpub_len else b''
|
||||
return dev_pubkey, fingerprint, xpub
|
||||
|
||||
def asci(msg):
|
||||
# hex/base58 string or other for-computers string, which isn't international
|
||||
return msg[4:].decode('ascii')
|
||||
|
||||
def smrx(msg):
|
||||
# message signing result. application specific!
|
||||
# returns actual address used (text), and raw binary signature (65 bytes)
|
||||
aln = unpack_from('<I', msg, 4)[0]
|
||||
return msg[8:aln+8].decode('ascii'), msg[8+aln:]
|
||||
|
||||
def strx(msg):
|
||||
# txn signing result, or other file operation. application specific!
|
||||
# returns length of resulting PSBT and it's sha256
|
||||
ln, sha = unpack_from('<I32s', msg, 4)
|
||||
return ln, sha
|
||||
|
||||
# EOF
|
||||
45
hwilib/devices/ckcc/sigheader.py
Normal file
45
hwilib/devices/ckcc/sigheader.py
Normal file
@ -0,0 +1,45 @@
|
||||
# Autogen'ed file, don't edit. See bootloader/mk-sigheader.h for original
|
||||
|
||||
|
||||
# Our simple firmware header.
|
||||
# Although called a header, this data is placed into the middle of the binary.
|
||||
# It is located at start of firmware + 16k - sizeof(heaer). This is a gap unused in normal
|
||||
# micropython layout. Exactly the last 64 bytes (signature) should be left out of
|
||||
# the checksum. We do checksum areas beyond the end of the last byte of firmware (up to length)
|
||||
# and expect those regions to be unprogrammed flash (ones).
|
||||
# - timestamp must increase with each upgrade (downgrade protection)
|
||||
# - version_string is for humans only
|
||||
# - pubkey_num indicates which pubkey was used for signature
|
||||
# - firmware_length, must be:
|
||||
# - bigger than minimum length, less than max
|
||||
# - 512-byte aligned
|
||||
# - bootloader assumes the flash filesystem (FAT FS) follows the firmware.
|
||||
# - this C header file is somewhat parsed and used by python signature-adding code
|
||||
# - timestamp is YYMMDDHHMMSS0000 in BCD
|
||||
|
||||
|
||||
FW_HEADER_SIZE = 128
|
||||
FW_HEADER_OFFSET = (0x4000-FW_HEADER_SIZE)
|
||||
|
||||
FW_HEADER_MAGIC = 0xCC001234
|
||||
|
||||
# arbitrary min size
|
||||
FW_MIN_LENGTH = (256*1024)
|
||||
# absolute max: 1MB flash - 32k for bootloader
|
||||
# practical limit for our-protocol USB upgrades: 786432 (or else settings damaged)
|
||||
FW_MAX_LENGTH = (0x100000 - 0x8000)
|
||||
|
||||
# Arguments to be used w/ python's struct module.
|
||||
FWH_PY_FORMAT = "<I8s8sII36s64s"
|
||||
FWH_PY_VALUES = "magic_value timestamp version_string pubkey_num firmware_length future signature"
|
||||
FWH_NUM_FUTURE = 9
|
||||
FWH_PK_NUM_OFFSET = 20
|
||||
|
||||
# There is a copy of the header at this location in RAM, copied by bootloader
|
||||
# **after** it has been verified. Cannot write to this area, or you will be reset!
|
||||
RAM_HEADER_BASE = 0x10007c20
|
||||
|
||||
# Original copy of header, as recorded in flash/firmware file.
|
||||
FLASH_HEADER_BASE = 0x0800bf80
|
||||
|
||||
# EOF
|
||||
88
hwilib/devices/ckcc/utils.py
Normal file
88
hwilib/devices/ckcc/utils.py
Normal file
@ -0,0 +1,88 @@
|
||||
import struct
|
||||
import binascii
|
||||
from collections import namedtuple
|
||||
|
||||
def dfu_parse(fd):
|
||||
# do just a little parsing of DFU headers, to find start/length of main binary
|
||||
# - not trying to support anything but what ../stm32/Makefile will generate
|
||||
# - see external/micropython/tools/pydfu.py for details
|
||||
# - works sequentially only
|
||||
fd.seek(0)
|
||||
|
||||
def consume(xfd, tname, fmt, names):
|
||||
# Parses the struct defined by `fmt` from `data`, stores the parsed fields
|
||||
# into a named tuple using `names`. Returns the named tuple.
|
||||
size = struct.calcsize(fmt)
|
||||
here = xfd.read(size)
|
||||
ty = namedtuple(tname, names.split())
|
||||
values = struct.unpack(fmt, here)
|
||||
return ty(*values)
|
||||
|
||||
dfu_prefix = consume(fd, 'DFU', '<5sBIB', 'signature version size targets')
|
||||
|
||||
#print('dfu: ' + repr(dfu_prefix))
|
||||
|
||||
assert dfu_prefix.signature == b'DfuSe', "Not a DFU file (bad magic)"
|
||||
|
||||
for idx in range(dfu_prefix.targets):
|
||||
|
||||
prefix = consume(fd, 'Target', '<6sBI255s2I',
|
||||
'signature altsetting named name size elements')
|
||||
|
||||
#print("target%d: %r" % (idx, prefix))
|
||||
|
||||
for ei in range(prefix.elements):
|
||||
# Decode target prefix
|
||||
# < little endian
|
||||
# I uint32_t element address
|
||||
# I uint32_t element size
|
||||
elem = consume(fd, 'Element', '<2I', 'addr size')
|
||||
|
||||
#print("target%d: %r" % (ei, elem))
|
||||
|
||||
# assume bootloader at least 32k, and targeting flash.
|
||||
assert elem.addr >= 0x8008000, "Bad address?"
|
||||
|
||||
yield fd.tell()
|
||||
yield elem.size
|
||||
|
||||
# Adapted from https://github.com/petertodd/python-bitcoinlib/blob/master/bitcoin/base58.py
|
||||
def decode_xpub(s):
|
||||
assert s[1:].startswith('pub')
|
||||
b58_digits = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
|
||||
|
||||
# Convert the string to an integer
|
||||
n = 0
|
||||
for c in s:
|
||||
n *= 58
|
||||
if c not in b58_digits:
|
||||
raise ValueError('Character %r is not a valid base58 character' % c)
|
||||
digit = b58_digits.index(c)
|
||||
n += digit
|
||||
|
||||
# Convert the integer to bytes
|
||||
h = '%x' % n
|
||||
if len(h) % 2:
|
||||
h = '0' + h
|
||||
res = binascii.unhexlify(h.encode('utf8'))
|
||||
|
||||
# Add padding back.
|
||||
pad = 0
|
||||
for c in s[:-1]:
|
||||
if c == b58_digits[0]: pad += 1
|
||||
else: break
|
||||
decoded = b'\x00' * pad + res
|
||||
|
||||
# Get the pubkey and chaincode
|
||||
return decoded[-37:-4], decoded[-69:-37]
|
||||
|
||||
def get_pubkey_string(b):
|
||||
p = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F
|
||||
|
||||
x = int.from_bytes(b[1:], byteorder="big")
|
||||
y = pow((x*x*x + 7) % p, (p + 1) // 4, p)
|
||||
if (y & 1 != b[0] & 1):
|
||||
y = p - y
|
||||
return x.to_bytes(32, byteorder="big") + y.to_bytes(32, byteorder="big")
|
||||
|
||||
# EOF
|
||||
@ -2,9 +2,9 @@
|
||||
|
||||
from ..hwwclient import HardwareWalletClient
|
||||
from ..errors import ActionCanceledError, BadArgumentError, DeviceBusyError, UnavailableActionError, DeviceFailureError
|
||||
from ckcc.client import ColdcardDevice, COINKITE_VID, CKCC_PID
|
||||
from ckcc.protocol import CCProtocolPacker, CCBusyError, CCProtoError, CCUserRefused
|
||||
from ckcc.constants import MAX_BLK_LEN, AF_P2WPKH, AF_CLASSIC, AF_P2WPKH_P2SH
|
||||
from .ckcc.client import ColdcardDevice, COINKITE_VID, CKCC_PID
|
||||
from .ckcc.protocol import CCProtocolPacker, CCBusyError, CCProtoError, CCUserRefused
|
||||
from .ckcc.constants import MAX_BLK_LEN, AF_P2WPKH, AF_CLASSIC, AF_P2WPKH_P2SH
|
||||
from ..base58 import xpub_main_2_test, get_xpub_fingerprint_hex
|
||||
from hashlib import sha256
|
||||
|
||||
|
||||
1
setup.py
1
setup.py
@ -16,7 +16,6 @@ setuptools.setup(
|
||||
install_requires=[
|
||||
'hidapi', # HID API needed in general
|
||||
'btchip-python', # Ledger Nano S
|
||||
'ckcc-protocol[cli]', # Coldcard
|
||||
'pyaes',
|
||||
'ecdsa', # Needed for Ledger but their library does not install it
|
||||
'typing_extensions>=3.7',
|
||||
|
||||
@ -8,8 +8,8 @@ import time
|
||||
import unittest
|
||||
|
||||
from hwilib.cli import process_commands
|
||||
from ckcc.protocol import CCProtocolPacker
|
||||
from ckcc.client import ColdcardDevice
|
||||
from hwilib.devices.ckcc.protocol import CCProtocolPacker
|
||||
from hwilib.devices.ckcc.client import ColdcardDevice
|
||||
from test_device import DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignMessage, TestSignTx
|
||||
|
||||
def coldcard_test_suite(simulator, rpc, userpass):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user