diff --git a/hwilib/devices/ckcc/README.md b/hwilib/devices/ckcc/README.md new file mode 100644 index 0000000..bfab35b --- /dev/null +++ b/hwilib/devices/ckcc/README.md @@ -0,0 +1,10 @@ +# Coldcard Library + +This is a stripped down and modified version of the official [ckcc-protocol](https://github.com/Coldcard/ckcc-protocol) library. + +This stripped down version was made at commit [49fa0265df4c9d0d0d915ccd4dc41b06104d6738](https://github.com/Coldcard/ckcc-protocol/tree/49fa0265df4c9d0d0d915ccd4dc41b06104d6738). + +## Changes + +- Removed CLI +- Removed pycoin dependency diff --git a/hwilib/devices/ckcc/__init__.py b/hwilib/devices/ckcc/__init__.py new file mode 100644 index 0000000..c016e76 --- /dev/null +++ b/hwilib/devices/ckcc/__init__.py @@ -0,0 +1,6 @@ + +__version__ = '0.7.2' + +__all__ = [ "client", "protocol", "constants" ] + + diff --git a/hwilib/devices/ckcc/client.py b/hwilib/devices/ckcc/client.py new file mode 100644 index 0000000..f550e51 --- /dev/null +++ b/hwilib/devices/ckcc/client.py @@ -0,0 +1,383 @@ +# +# client.py +# +# Implement the desktop side of our Coldcard USB protocol. +# +# If you would like to use a different EC/AES library, you may subclass +# and override these member functions: +# +# - ec_mult, ec_setup, aes_setup, mitm_verify +# +import hid, sys, os +from binascii import b2a_hex, a2b_hex +from hashlib import sha256 +from .protocol import CCProtocolPacker, CCProtocolUnpacker, CCProtoError, MAX_MSG_LEN, MAX_BLK_LEN +from .utils import decode_xpub, get_pubkey_string + +# unofficial, unpermissioned... USB numbers +COINKITE_VID = 0xd13e +CKCC_PID = 0xcc10 + +# Unix domain socket used by the simulator +CKCC_SIMULATOR_PATH = '/tmp/ckcc-simulator.sock' + +class ColdcardDevice: + def __init__(self, sn=None, dev=None, encrypt=True): + # Establish connection via USB (HID) or Unix Pipe + self.is_simulator = False + + if not dev and sn and '/' in sn: + dev = UnixSimulatorPipe(sn) + found = 'simulator' + self.is_simulator = True + + if not dev: + + for info in hid.enumerate(COINKITE_VID, CKCC_PID): + found = info['serial_number'] + + if sn and sn != found: + continue + + # only one interface per device, so only one 'path' + dev = hid.device(serial=found) + assert dev, "failed to open: "+found + dev.open_path(info['path']) + + break + + if not dev: + raise KeyError("Could not find Coldcard!" + if not sn else ('Cannot find CC with serial: '+sn)) + else: + found = dev.get_serial_number_string() + + self.dev = dev + self.serial = found + + # they will be defined after we've established a shared secret w/ device + self.session_key = None + self.encrypt_request = None + self.decrypt_response = None + self.master_xpub = None + self.master_fingerprint = None + + self.resync() + + if encrypt: + self.start_encryption() + + def close(self): + # close underlying HID device + if self.dev: + self.dev.close() + self.dev = None + + def resync(self): + # flush anything already waiting on the EP + while 1: + junk = self.dev.read(64, timeout_ms=1) + if not junk: break + + # write a special packet, that encodes zero-length data, and last packet in sequence + # prefix with 0x00 for "report number" + self.dev.write(b'\x00\x80' + (b'\xff'*63)) + + # flush any response (perhaps error) waiting on the EP + while 1: + junk = self.dev.read(64, timeout_ms=1) + if not junk: break + + # check the above all worked + err = self.dev.error() + if err != '': + raise RuntimeError('hidapi: '+err) + + assert self.dev.get_serial_number_string() == self.serial + + def send_recv(self, msg, expect_errors=False, verbose=0, timeout=1000, encrypt=True): + # first byte of each 64-byte packet encodes length or packet-offset + assert 4 <= len(msg) <= MAX_MSG_LEN, "msg length: %d" % len(msg) + + if not self.encrypt_request: + # disable encryption if not already enabled for this connection + encrypt = False + + if encrypt: + msg = self.encrypt_request(msg) + + left = len(msg) + offset = 0 + while left > 0: + # Note: first byte always zero (HID report number), + # [1] is framing header (length+flags) + # [2:65] payload (63 bytes, perhaps including padding) + here = min(63, left) + buf = bytearray(65) + buf[2:2+here] = msg[offset:offset+here] + if here == left: + # final one in sequence + buf[1] = here | 0x80 | (0x40 if encrypt else 0x00) + else: + # more will be coming + buf[1] = here + + assert len(buf) == 65 + + if verbose: + print("Tx [%2d]: %s (0x%x)" % (here, b2a_hex(buf[1:]), buf[1])) + + rv = self.dev.write(buf) + assert rv == len(buf) == 65, repr(rv) + + offset += here + left -= here + + # collect response, framed in the same manner + resp = b'' + while 1: + buf = self.dev.read(64, timeout_ms=(timeout or 0)) + + assert buf, "timeout reading USB EP" + + # (trusting more than usual here) + flag = buf[0] + resp += bytes(buf[1:1+(flag & 0x3f)]) + if flag & 0x80: + break + + if flag & 0x40: + if verbose: + print('Enc response: %s' % b2a_hex(resp)) + + resp = self.decrypt_response(resp) + + try: + if verbose: + print("Rx [%2d]: %r" % (len(resp), b2a_hex(bytes(resp)))) + + return CCProtocolUnpacker.decode(resp) + except CCProtoError as e: + if expect_errors: raise + raise + except: + #print("Corrupt response: %r" % resp) + raise + + def ec_setup(self): + # Provides the ECSDA primatives in portable way. + # Needed to do D-H session key aggreement and then AES. + # - should be replaced in subclasses if you have other EC libraries + # - curve is always secp256k1 + # - values are binary strings + # - write whatever you want onto self. + + # - setup: return 65 of public key, and 16 bytes of AES IV + # - second call: give the pubkey of far side, calculate the shared pt on curve + from ecdsa.curves import SECP256k1 + from ecdsa import SigningKey + + self.my_key = SigningKey.generate(curve=SECP256k1, hashfunc=sha256) + pubkey = self.my_key.get_verifying_key().to_string() + assert len(pubkey) == 64 + + #print("my pubkey = %s" % b2a_hex(pubkey)) + + return pubkey + + def ec_mult(self, his_pubkey): + # - second call: given the pubkey of far side, calculate the shared pt on curve + # - creates session key based on that + from ecdsa.curves import SECP256k1 + from ecdsa import VerifyingKey + from ecdsa.util import number_to_string + + # Validate his pubkey a little: this call will check it's on the curve. + assert len(his_pubkey) == 64 + his_pubkey = VerifyingKey.from_string(his_pubkey, curve=SECP256k1, hashfunc=sha256) + + #print("his pubkey = %s" % b2a_hex(his_pubkey.to_string())) + + # do the D-H thing + pt = self.my_key.privkey.secret_multiplier * his_pubkey.pubkey.point + + # final key is sha256 of that point, serialized (64 bytes). + order = SECP256k1.order + kk = number_to_string(pt.x(), order) + number_to_string(pt.y(), order) + + del self.my_key + + return sha256(kk).digest() + + def aes_setup(self, session_key): + # Load keys and define encrypt/decrypt functions + # - for CTR mode, we have different counters in each direction, so need two instances + # - count must start at zero, and increment in LSB for each block. + import pyaes + + self.encrypt_request = pyaes.AESModeOfOperationCTR(session_key, pyaes.Counter(0)).decrypt + self.decrypt_response = pyaes.AESModeOfOperationCTR(session_key, pyaes.Counter(0)).encrypt + + def start_encryption(self): + # setup encryption on the link + # - pick our own key pair, IV for AES + # - send IV and pubkey to device + # - it replies with own pubkey + # - determine what the session key was/is + + pubkey = self.ec_setup() + + msg = CCProtocolPacker.encrypt_start(pubkey) + + his_pubkey, fingerprint, xpub = self.send_recv(msg, encrypt=False) + + self.session_key = self.ec_mult(his_pubkey) + + # capture some public details of remote side's master key + # - these can be empty/0x0 when no secrets on device yet + self.master_xpub = str(xpub, 'ascii') + self.master_fingerprint = fingerprint + + #print('sess key = %s' % b2a_hex(self.session_key)) + self.aes_setup(self.session_key) + + def mitm_verify(self, sig, expected_xpub): + # First try with Pycoin + try: + from pycoin.key.BIP32Node import BIP32Node + from pycoin.contrib.msg_signing import verify_message + from pycoin.encoding import from_bytes_32 + from base64 import b64encode + + mk = BIP32Node.from_wallet_key(expected_xpub) + return verify_message(mk, b64encode(sig), msg_hash=from_bytes_32(self.session_key)) + except ImportError: + pass + + # If Pycoin is not available, do it using ecdsa + from ecdsa import BadSignatureError, SECP256k1, VerifyingKey + pubkey, chaincode = decode_xpub(expected_xpub) + vk = VerifyingKey.from_string(get_pubkey_string(pubkey), curve=SECP256k1) + try: + ok = vk.verify_digest(sig[1:], self.session_key) + except BadSignatureError: + ok = False + + return ok + + def check_mitm(self, expected_xpub=None, sig=None): + # Optional? verification against MiTM attack: + # Using the master xpub, check a signature over the session public key, to + # verify we talking directly to the real Coldcard (no active MitM between us). + # - message is just the session key itself; no digests or prefixes + # - no need for this unless concerned about *active* mitm on USB bus + # - passive attackers (snoopers) will get nothing anyway, thanks to diffie-helman sauce + # - unfortunately might be too slow to do everytime? + + xp = expected_xpub or self.master_xpub + assert xp, "device doesn't have any secrets yet" + assert self.session_key, "connection not yet in encrypted mode" + + # this request is delibrately slow on the device side + if not sig: + sig = self.send_recv(CCProtocolPacker.check_mitm(), timeout=5000) + + assert len(sig) == 65 + + ok = self.mitm_verify(sig, xp) + + if ok != True: + raise RuntimeError("Possible active MiTM attack in progress! Incorrect signature.") + + def upload_file(self, data, verify=True, blksize=1024): + # upload a single file, up to 1MB? in size. Can check arrives ok. + chk = sha256(data).digest() + + for i in range(0, len(data), blksize): + here = data[i:i+blksize] + pos = self.send_recv(CCProtocolPacker.upload(i, len(data), here)) + assert pos == i + + if verify: + rb = self.send_recv(CCProtocolPacker.sha256()) + if rb != chk: + raise RuntimeError('Checksum wrong during file upload') + + return len(data), chk + + def download_file(self, length, checksum, blksize=1024, file_number=1): + # Download a single file, when you already know it's checksum. Will check arrives ok. + data = b'' + chk = sha256() + + pos = 0 + while pos < length: + here = self.send_recv(CCProtocolPacker.download(pos, min(blksize, length-pos), file_number)) + data += here + chk.update(here) + pos += len(here) + assert len(here) > 0 + + if chk.digest() != checksum: + raise RuntimeError('Checksum wrong during file download') + + return data + + +class UnixSimulatorPipe: + # Use a UNIX pipe to the simulator instead of a real USB connection. + # - emulates the API of hidapi device object. + + def __init__(self, path): + import socket, atexit + self.pipe = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + try: + self.pipe.connect(path) + except FileNotFoundError: + raise RuntimeError("Cannot connect to simulator. Is it running?") + + instance = 0 + while instance < 10: + pn = '/tmp/ckcc-client-%d-%d.sock' % (os.getpid(), instance) + try: + self.pipe.bind(pn) # just needs any name + break + except OSError: + instance += 1 + continue + + self.pipe_name = pn + atexit.register(self.close) + + def read(self, max_count, timeout_ms=None): + import socket + if not timeout_ms: + self.pipe.settimeout(None) + else: + self.pipe.settimeout(timeout_ms / 1000.0) + + try: + return self.pipe.recv(max_count) + except socket.timeout: + return None + + def write(self, buf): + assert len(buf) == 65 + self.pipe.settimeout(10) + rv = self.pipe.send(buf[1:]) + return 65 if rv == 64 else rv + + def error(self): + return '' + + def close(self): + self.pipe.close() + try: + os.unlink(self.pipe_name) + except: pass + + def get_serial_number_string(self): + return 'simulator' + + +# EOF diff --git a/hwilib/devices/ckcc/constants.py b/hwilib/devices/ckcc/constants.py new file mode 100644 index 0000000..9499838 --- /dev/null +++ b/hwilib/devices/ckcc/constants.py @@ -0,0 +1,69 @@ +# +# Constants and various "limits" shared between embedded and desktop USB protocol +# +try: + from micropython import const +except ImportError: + const = int + +# For upload/download this is the max size of the data block. +MAX_BLK_LEN = const(2048) + +# Max total message length, excluding framing overhead (1 byte per 64). +# - includes args for upload command +MAX_MSG_LEN = const(4+4+4+MAX_BLK_LEN) + +# Max PSBT txn we support (384k bytes as PSBT) +# - the max on the wire for mainnet is 100k +# - but a PSBT might contain a full txn for each input +MAX_TXN_LEN = const(384*1024) + +# Max size of any upload (firmware.dfu files in particular) +MAX_UPLOAD_LEN = const(2*MAX_TXN_LEN) + +# Max length of text messages for signing +MSG_SIGNING_MAX_LENGTH = const(240) + +# Bit values for address types +AFC_PUBKEY = const(0x01) # pay to hash of pubkey +AFC_SEGWIT = const(0x02) # requires a witness to spend +AFC_BECH32 = const(0x04) # just how we're encoding it? +AFC_SCRIPT = const(0x08) # paying into a script +AFC_WRAPPED = const(0x10) # for transition/compat types for segwit vs. old + +# Numeric codes for specific address types +AF_CLASSIC = AFC_PUBKEY # 1addr +AF_P2SH = AFC_SCRIPT # classic multisig / simple P2SH / 3hash +AF_P2WPKH = AFC_PUBKEY | AFC_SEGWIT | AFC_BECH32 # bc1qsdklfj +AF_P2WSH = AFC_SCRIPT | AFC_SEGWIT | AFC_BECH32 # segwit multisig +AF_P2WPKH_P2SH = AFC_WRAPPED | AFC_PUBKEY | AFC_SEGWIT # looks classic P2SH, but p2wpkh inside +AF_P2WSH_P2SH = AFC_WRAPPED | AFC_SCRIPT | AFC_SEGWIT # looks classic P2SH, segwit multisig + +SUPPORTED_ADDR_FORMATS = frozenset([ + AF_CLASSIC, + AF_P2SH, + AF_P2WPKH, + AF_P2WSH, + AF_P2WPKH_P2SH, + AF_P2WSH_P2SH, +]) + +# BIP-174 aka PSBT defined values +# +PSBT_GLOBAL_UNSIGNED_TX = const(0) + +PSBT_IN_NON_WITNESS_UTXO = const(0) +PSBT_IN_WITNESS_UTXO = const(1) +PSBT_IN_PARTIAL_SIG = const(2) +PSBT_IN_SIGHASH_TYPE = const(3) +PSBT_IN_REDEEM_SCRIPT = const(4) +PSBT_IN_WITNESS_SCRIPT = const(5) +PSBT_IN_BIP32_DERIVATION = const(6) +PSBT_IN_FINAL_SCRIPTSIG = const(7) +PSBT_IN_FINAL_SCRIPTWITNESS = const(8) + +PSBT_OUT_REDEEM_SCRIPT = const(0) +PSBT_OUT_WITNESS_SCRIPT = const(1) +PSBT_OUT_BIP32_DERIVATION = const(2) + +# EOF diff --git a/hwilib/devices/ckcc/protocol.py b/hwilib/devices/ckcc/protocol.py new file mode 100644 index 0000000..8467cba --- /dev/null +++ b/hwilib/devices/ckcc/protocol.py @@ -0,0 +1,202 @@ +# +# Details of our USB level protocol. Shared file between desktop and embedded. +# +# - first 4 bytes of all messages is the command code or response code +# - use H +# +from struct import pack, unpack_from +from .constants import * + +class CCProtoError(RuntimeError): + def __str__(self): + return self.args[0] + +class CCUserRefused(RuntimeError): + def __str__(self): + return 'You refused permission to do the operation' + +class CCBusyError(RuntimeError): + def __str__(self): + return 'Coldcard is handling another request right now' + +class CCProtocolPacker: + # returns a lamba that will take correct args + # and then give you a binary string to encode the + # request + + @staticmethod + def logout(): + return pack('4s', b'logo') + + @staticmethod + def reboot(): + return pack('4s', b'rebo') + + @staticmethod + def version(): + # returns a string, with newline separators + return pack('4s', b'vers') + + @staticmethod + def ping(msg): + # returns whatever binary you give it + return b'ping' + bytes(msg) + + @staticmethod + def check_mitm(): + return b'mitm' + + @staticmethod + def start_backup(): + # prompts user with password for encrytped backup + return b'back' + + @staticmethod + def encrypt_start(device_pubkey, version=0x1): + assert len(device_pubkey) == 64, "want uncompressed 64-byte pubkey, no prefix byte" + return pack('<4sI64s', b'ncry', version, device_pubkey) + + @staticmethod + def upload(offset, total_size, data): + # note: see MAX_MSG_LEN above + assert len(data) <= MAX_MSG_LEN, 'badlen' + return pack('<4sII', b'upld', offset, total_size) + data + + @staticmethod + def download(offset, length, file_number=0): + assert 0 <= file_number < 2 + return pack('<4sIII', b'dwld', offset, length, file_number) + + @staticmethod + def sha256(): + return b'sha2' + + @staticmethod + def sign_transaction(length, file_sha, finalize=False): + # must have already uploaded binary, and give expected sha256 + assert len(file_sha) == 32 + return pack('<4sII32s', b'stxn', length, int(finalize), file_sha) + + @staticmethod + def sign_message(raw_msg, subpath='m', addr_fmt=AF_CLASSIC): + # only begins user interaction + return pack('<4sIII', b'smsg', addr_fmt, len(subpath), len(raw_msg)) \ + + subpath.encode('ascii') + raw_msg + + @staticmethod + def get_signed_msg(): + # poll completion/results of message signing + return b'smok' + + @staticmethod + def get_backup_file(): + # poll completion/results of backup + return b'bkok' + + @staticmethod + def get_signed_txn(): + # poll completion/results of transaction signing + return b'stok' + + @staticmethod + def get_xpub(subpath='m'): + # takes a string, like: m/44'/0'/23/23 + return b'xpub' + subpath.encode('ascii') + + @staticmethod + def show_address(subpath, addr_fmt=AF_CLASSIC): + # takes a string, like: m/44'/0'/23/23 + # shows on screen, no feedback from user expected + return pack('<4sI', b'show', addr_fmt) + subpath.encode('ascii') + + @staticmethod + def sim_keypress(key): + # Simulator ONLY: pretend a key is pressed + return b'XKEY' + key + + @staticmethod + def bag_number(new_number=b''): + # one time only: put into bag, or readback bag + return b'bagi' + bytes(new_number) + + +class CCProtocolUnpacker: + # Take a binary response, and turn it into a python object + # - we support a number of signatures, and expand as needed + # - some will be general-purpose, but others can be very specific to one command + # - given full rx message to work from + # - this is done after un-framing + + @classmethod + def decode(cls, msg): + assert len(msg) >= 4 + sign = str(msg[0:4], 'utf8', 'ignore') + + d = getattr(cls, sign, cls) + if d is cls: + raise CCProtoError('Unknown response signature: ' + repr(sign)) + + return d(msg) + + + # struct info for each response + + def okay(msg): + # trivial response, w/ no content + assert len(msg) == 4 + return None + + # low-level errors + def fram(msg): + raise CCProtoError("Framing Error", str(msg[4:], 'utf8')) + def err_(msg): + raise CCProtoError("Remote Error: " + str(msg[4:], 'utf8', 'ignore'), msg[4:]) + + def refu(msg): + # user didn't want to approve something + raise CCUserRefused() + + def busy(msg): + # user didn't want to approve something + raise CCBusyError() + + def biny(msg): + # binary string: length implied by msg framing + return msg[4:] + + def int1(msg): + return unpack_from('= 0x8008000, "Bad address?" + + yield fd.tell() + yield elem.size + +# Adapted from https://github.com/petertodd/python-bitcoinlib/blob/master/bitcoin/base58.py +def decode_xpub(s): + assert s[1:].startswith('pub') + b58_digits = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' + + # Convert the string to an integer + n = 0 + for c in s: + n *= 58 + if c not in b58_digits: + raise ValueError('Character %r is not a valid base58 character' % c) + digit = b58_digits.index(c) + n += digit + + # Convert the integer to bytes + h = '%x' % n + if len(h) % 2: + h = '0' + h + res = binascii.unhexlify(h.encode('utf8')) + + # Add padding back. + pad = 0 + for c in s[:-1]: + if c == b58_digits[0]: pad += 1 + else: break + decoded = b'\x00' * pad + res + + # Get the pubkey and chaincode + return decoded[-37:-4], decoded[-69:-37] + +def get_pubkey_string(b): + p = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F + + x = int.from_bytes(b[1:], byteorder="big") + y = pow((x*x*x + 7) % p, (p + 1) // 4, p) + if (y & 1 != b[0] & 1): + y = p - y + return x.to_bytes(32, byteorder="big") + y.to_bytes(32, byteorder="big") + +# EOF diff --git a/hwilib/devices/coldcard.py b/hwilib/devices/coldcard.py index 9022637..c989e5e 100644 --- a/hwilib/devices/coldcard.py +++ b/hwilib/devices/coldcard.py @@ -2,9 +2,9 @@ from ..hwwclient import HardwareWalletClient from ..errors import ActionCanceledError, BadArgumentError, DeviceBusyError, UnavailableActionError, DeviceFailureError -from ckcc.client import ColdcardDevice, COINKITE_VID, CKCC_PID -from ckcc.protocol import CCProtocolPacker, CCBusyError, CCProtoError, CCUserRefused -from ckcc.constants import MAX_BLK_LEN, AF_P2WPKH, AF_CLASSIC, AF_P2WPKH_P2SH +from .ckcc.client import ColdcardDevice, COINKITE_VID, CKCC_PID +from .ckcc.protocol import CCProtocolPacker, CCBusyError, CCProtoError, CCUserRefused +from .ckcc.constants import MAX_BLK_LEN, AF_P2WPKH, AF_CLASSIC, AF_P2WPKH_P2SH from ..base58 import xpub_main_2_test, get_xpub_fingerprint_hex from hashlib import sha256 diff --git a/setup.py b/setup.py index 0018f21..8e29a91 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,6 @@ setuptools.setup( install_requires=[ 'hidapi', # HID API needed in general 'btchip-python', # Ledger Nano S - 'ckcc-protocol[cli]', # Coldcard 'pyaes', 'ecdsa', # Needed for Ledger but their library does not install it 'typing_extensions>=3.7', diff --git a/test/test_coldcard.py b/test/test_coldcard.py index e76b57b..19c52d2 100755 --- a/test/test_coldcard.py +++ b/test/test_coldcard.py @@ -8,8 +8,8 @@ import time import unittest from hwilib.cli import process_commands -from ckcc.protocol import CCProtocolPacker -from ckcc.client import ColdcardDevice +from hwilib.devices.ckcc.protocol import CCProtocolPacker +from hwilib.devices.ckcc.client import ColdcardDevice from test_device import DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignMessage, TestSignTx def coldcard_test_suite(simulator, rpc, userpass):