# (c) Copyright 2021 by Coinkite Inc. This file is covered by license found in COPYING-CC.
#
# client.py
#
# Implement the desktop side of our Coldcard USB protocol.
#
# If you would like to use a different EC/AES library, you may subclass
# and override these member functions:
#
#   - ec_mult, ec_setup, aes_setup, mitm_verify
#
import atexit
import os
import socket
from binascii import b2a_hex
from hashlib import sha256

import hid

from .constants import USB_NCRY_V1, USB_NCRY_V2
from .protocol import CCProtocolPacker, CCProtocolUnpacker, CCProtoError, MAX_MSG_LEN
from .utils import decode_xpub, get_pubkey_string

# unofficial, unpermissioned... USB numbers
COINKITE_VID = 0xd13e
CKCC_PID = 0xcc10

DEFAULT_SIM_SOCKET = "/tmp/ckcc-simulator.sock"


class ColdcardDevice:
    # One connection to a Coldcard, either over USB HID or over the
    # simulator's Unix socket (see UnixSimulatorPipe).

    def __init__(self, sn=None, dev=None, encrypt=True, ncry_ver=USB_NCRY_V1, is_simulator=False):
        # Establish connection via USB (HID) or Unix Pipe
        # - sn: serial number to match; a value containing '/' is treated as
        #   the path of a simulator socket
        # - dev: already-opened device object to use instead of searching
        # - encrypt: if true, start link encryption right after connecting
        # - ncry_ver: encryption protocol version passed to start_encryption
        # - is_simulator: force use of the simulator pipe
        # - raises KeyError when no matching USB device is found
        self.is_simulator = is_simulator

        if not dev and ((sn and ('/' in sn)) or self.is_simulator):
            dev = UnixSimulatorPipe(sn)
            # placeholder only: replaced below by the pipe's own serial string
            found = 'simulator'
            self.is_simulator = True

        if not dev:
            for info in hid.enumerate(COINKITE_VID, CKCC_PID):
                found = info['serial_number']

                if sn and sn != found:
                    continue

                # only one interface per device, so only one 'path'
                dev = hid.device(serial=found)
                assert dev, "failed to open: "+found
                dev.open_path(info['path'])

                break

            if not dev:
                raise KeyError("Could not find Coldcard!"
                               if not sn else ('Cannot find CC with serial: '+sn))
        else:
            found = dev.get_serial_number_string()

        self.dev = dev
        self.serial = found

        # they will be defined after we've established a shared secret w/ device
        self.ncry_ver = ncry_ver
        self.session_key = None
        self.encrypt_request = None
        self.decrypt_response = None
        self.master_xpub = None
        self.master_fingerprint = None

        self.resync()

        if encrypt:
            self.start_encryption(version=self.ncry_ver)

    def close(self):
        # close underlying HID device
        # - safe to call more than once; self.dev is cleared after closing
        if self.dev:
            self.dev.close()
            self.dev = None

    def resync(self):
        # Get the link into a known state: drain pending input, send a
        # framing-reset packet, drain again, then check for HID errors.
        # - raises RuntimeError if the HID layer reports a real error

        # flush anything already waiting on the EP
        while True:
            junk = self.dev.read(64, timeout_ms=1)
            if not junk:
                break

        # write a special packet, that encodes zero-length data, and last packet in sequence
        # prefix with 0x00 for "report number"
        self.dev.write(b'\x00\x80' + (b'\xff'*63))

        # flush any response (perhaps error) waiting on the EP
        while True:
            junk = self.dev.read(64, timeout_ms=1)
            if not junk:
                break

        # check the above all worked
        err = self.dev.error()
        if err and ('not implemented yet' not in err) and (err != 'Success'):
            raise RuntimeError('hidapi: '+err)

        assert self.dev.get_serial_number_string() == self.serial

    def send_recv(self, msg, expect_errors=False, verbose=0, timeout=3000, encrypt=True):
        # Send one request and return the decoded response.
        # - msg: request bytes (4..MAX_MSG_LEN long)
        # - expect_errors: kept for backward compatibility; protocol errors
        #   (CCProtoError) always propagate to the caller either way
        # - verbose: if true, print the packets sent and the response received
        # - timeout: per-read timeout in milliseconds (falsy => passed as 0)
        # - encrypt: encrypt the request if the link has encryption enabled
        #
        # first byte of each 64-byte packet encodes length or packet-offset
        assert 4 <= len(msg) <= MAX_MSG_LEN, "msg length: %d" % len(msg)

        if self.encrypt_request is None:
            # disable encryption if not already enabled for this connection
            encrypt = False

        if self.encrypt_request and self.ncry_ver == USB_NCRY_V2:
            # ncry version 2 - everything needs to be encrypted
            encrypt = True

        if encrypt:
            msg = self.encrypt_request(msg)

        left = len(msg)
        offset = 0
        while left > 0:
            # Note: first byte always zero (HID report number),
            #       [1] is framing header (length+flags)
            #       [2:65] payload (63 bytes, perhaps including padding)
            here = min(63, left)
            buf = bytearray(65)
            buf[2:2+here] = msg[offset:offset+here]
            if here == left:
                # final one in sequence
                buf[1] = here | 0x80 | (0x40 if encrypt else 0x00)
            else:
                # more will be coming
                buf[1] = here

            assert len(buf) == 65

            if verbose:
                print("Tx [%2d]: %s (0x%x)" % (here, b2a_hex(buf[1:]), buf[1]))

            rv = self.dev.write(buf)
            assert rv == len(buf) == 65, repr(rv)

            offset += here
            left -= here

        # collect response, framed in the same manner
        parts = []
        while True:
            buf = self.dev.read(64, timeout_ms=(timeout or 0))

            if not buf and timeout:
                # give it another try
                buf = self.dev.read(64, timeout_ms=timeout)

            assert buf, "timeout reading USB EP"

            # (trusting more than usual here)
            flag = buf[0]
            parts.append(bytes(buf[1:1+(flag & 0x3f)]))
            if flag & 0x80:
                # last packet of the response
                break

        resp = b''.join(parts)

        if flag & 0x40:
            # final packet says the response is encrypted
            if verbose:
                print('Enc response: %s' % b2a_hex(resp))

            resp = self.decrypt_response(resp)

        if verbose:
            print("Rx [%2d]: %r" % (len(resp), b2a_hex(bytes(resp))))

        # any decoding error (including CCProtoError) is left to the caller
        return CCProtocolUnpacker.decode(resp)

    def ec_setup(self):
        # Provides the ECDSA primitives in portable way.
        # Needed to do D-H session key agreement and then AES.
        # - should be replaced in subclasses if you have other EC libraries
        # - curve is always secp256k1
        # - values are binary strings
        # - write whatever you want onto self.

        # - setup: generate our key pair, return the 64-byte public key
        # - second call (ec_mult): give the pubkey of far side, calculate the shared pt on curve
        from ecdsa.curves import SECP256k1
        from ecdsa import SigningKey

        self.my_key = SigningKey.generate(curve=SECP256k1, hashfunc=sha256)
        pubkey = self.my_key.get_verifying_key().to_string()
        assert len(pubkey) == 64

        #print("my pubkey = %s" % b2a_hex(pubkey))

        return pubkey

    def ec_mult(self, his_pubkey):
        # - second call: given the pubkey of far side, calculate the shared pt on curve
        # - creates session key based on that
        # - returns the 32-byte session key; self.my_key is deleted afterwards
        from ecdsa.curves import SECP256k1
        from ecdsa import VerifyingKey
        from ecdsa.util import number_to_string

        # Validate his pubkey a little: this call will check it's on the curve.
        assert len(his_pubkey) == 64
        his_pubkey = VerifyingKey.from_string(his_pubkey, curve=SECP256k1, hashfunc=sha256)

        #print("his pubkey = %s" % b2a_hex(his_pubkey.to_string()))

        # do the D-H thing
        pt = self.my_key.privkey.secret_multiplier * his_pubkey.pubkey.point

        # final key is sha256 of that point, serialized (64 bytes).
        order = SECP256k1.order
        kk = number_to_string(pt.x(), order) + number_to_string(pt.y(), order)

        del self.my_key

        return sha256(kk).digest()

    def aes_setup(self, session_key):
        # Load keys and define encrypt/decrypt functions
        # - for CTR mode, we have different counters in each direction, so need two instances
        # - count must start at zero, and increment in LSB for each block.
        import pyaes

        self.encrypt_request = pyaes.AESModeOfOperationCTR(session_key, pyaes.Counter(0)).encrypt
        self.decrypt_response = pyaes.AESModeOfOperationCTR(session_key, pyaes.Counter(0)).decrypt

    def start_encryption(self, version=USB_NCRY_V1):
        # setup encryption on the link
        # - pick our own key pair
        # - send our pubkey to device (unencrypted)
        # - it replies with own pubkey, plus master fingerprint and xpub
        # - determine what the session key was/is
        pubkey = self.ec_setup()

        msg = CCProtocolPacker.encrypt_start(pubkey, version=version)

        his_pubkey, fingerprint, xpub = self.send_recv(msg, encrypt=False)
        self.ncry_ver = version
        self.session_key = self.ec_mult(his_pubkey)

        # capture some public details of remote side's master key
        # - these can be empty/0x0 when no secrets on device yet
        self.master_xpub = str(xpub, 'ascii')
        self.master_fingerprint = fingerprint

        #print('sess key = %s' % b2a_hex(self.session_key))
        self.aes_setup(self.session_key)

    def mitm_verify(self, sig, expected_xpub):
        # Check the signature over the session key against the key in expected_xpub.
        # - first byte of sig is skipped; the rest is verified as the signature
        # - returns False on a bad signature, otherwise the verifier's result
        from ecdsa import BadSignatureError, SECP256k1, VerifyingKey

        # of the returned (pubkey, chaincode) tuple, chaincode is not used
        pubkey, _ = decode_xpub(expected_xpub)
        vk = VerifyingKey.from_string(get_pubkey_string(pubkey), curve=SECP256k1)
        try:
            ok = vk.verify_digest(sig[1:], self.session_key)
        except BadSignatureError:
            ok = False

        return ok

    def check_mitm(self, expected_xpub=None, sig=None):
        # Optional? verification against MiTM attack:
        # Using the master xpub, check a signature over the session public key, to
        # verify we talking directly to the real Coldcard (no active MitM between us).
        # - message is just the session key itself; no digests or prefixes
        # - no need for this unless concerned about *active* mitm on USB bus
        # - passive attackers (snoopers) will get nothing anyway, thanks to Diffie-Hellman sauce
        # - unfortunately might be too slow to do everytime?
        # - raises RuntimeError when the signature does not verify

        xp = expected_xpub or self.master_xpub
        assert xp, "device doesn't have any secrets yet"
        assert self.session_key, "connection not yet in encrypted mode"

        # this request is deliberately slow on the device side
        if not sig:
            sig = self.send_recv(CCProtocolPacker.check_mitm(), timeout=5000)

        assert len(sig) == 65

        ok = self.mitm_verify(sig, xp)

        # strict comparison on purpose: only a result equal to True is accepted,
        # so an overridden mitm_verify returning some other truthy value fails
        if ok != True:
            raise RuntimeError("Possible active MiTM attack in progress! Incorrect signature.")

    def upload_file(self, data, verify=True, blksize=1024):
        # upload a single file, up to 1MB? in size. Can check arrives ok.
        # - data is sent in blksize pieces; device must echo back each offset
        # - with verify, the device's sha256 is compared against our own
        # - returns (length, sha256 digest) of the data
        # - raises RuntimeError on checksum mismatch
        chk = sha256(data).digest()

        for i in range(0, len(data), blksize):
            here = data[i:i+blksize]
            pos = self.send_recv(CCProtocolPacker.upload(i, len(data), here))
            assert pos == i

        if verify:
            rb = self.send_recv(CCProtocolPacker.sha256())
            if rb != chk:
                raise RuntimeError('Checksum wrong during file upload')

        return len(data), chk

    def download_file(self, length, checksum, blksize=1024, file_number=1):
        # Download a single file, when you already know its checksum. Will check arrives ok.
        # - reads up to blksize bytes per request until length bytes are received
        # - returns the file contents
        # - raises RuntimeError on checksum mismatch
        chunks = []
        chk = sha256()

        pos = 0
        while pos < length:
            here = self.send_recv(CCProtocolPacker.download(pos, min(blksize, length-pos), file_number))
            # an empty reply would never advance pos
            assert len(here) > 0

            # collect pieces and join once at the end (avoids repeated copying)
            chunks.append(here)
            chk.update(here)
            pos += len(here)

        if chk.digest() != checksum:
            raise RuntimeError('Checksum wrong during file download')

        return b''.join(chunks)

    def hash_password(self, text_password, v3=False):
        # Turn text password into a key for use in HSM auth protocol
        # - changed from pbkdf2_hmac_sha256 to pbkdf2_hmac_sha512 in version 4 of CC firmware
        # - salt is derived from this device's serial number
        # - returns first 32 bytes of the derived key
        from hashlib import pbkdf2_hmac
        from .constants import PBKDF2_ITER_COUNT

        salt = sha256(b'pepper' + self.serial.encode('ascii')).digest()

        return pbkdf2_hmac('sha256' if v3 else 'sha512', text_password, salt, PBKDF2_ITER_COUNT)[:32]

    def firmware_version(self):
        # Ask the device for its version info; returned as a list of lines.
        return self.send_recv(CCProtocolPacker.version()).split("\n")

    def is_edge(self):
        # returns True if device is running EDGE firmware version
        if self.is_simulator:
            # simulator: run a snippet on the far side that reports version.is_edge
            cmd = "import version; RV.write(str(int(getattr(version, 'is_edge', 0))))"
            rv = self.send_recv(b'EXEC' + cmd.encode('utf-8'), timeout=60000, encrypt=False)
            return rv == b"1"

        # real device: second line of the version response ends with "X"
        return self.firmware_version()[1][-1] == "X"


class UnixSimulatorPipe:
    # Use a UNIX pipe to the simulator instead of a real USB connection.
    # - emulates the API of hidapi device object.

    def __init__(self, socket_path=None):
        # Connect to the simulator's datagram socket and bind our own end.
        # - socket_path: simulator socket; DEFAULT_SIM_SOCKET when not given
        # - raises RuntimeError if the simulator cannot be reached
        # - raises the last OSError if no client socket name could be bound
        self.socket_path = socket_path or DEFAULT_SIM_SOCKET
        # set early, so close() is safe to call at any point during setup
        self.pipe_name = None
        self.pipe = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        try:
            self.pipe.connect(self.socket_path)
        except Exception as err:
            self.close()
            raise RuntimeError("Cannot connect to simulator. Is it running?") from err

        last_err = None
        for instance in range(5):
            # if simulator has PID in socket path, client will have matching, or empty
            pn = '/tmp/ckcc-client%s-%d-%d.sock' % (self.get_sim_pid(), os.getpid(), instance)
            try:
                self.pipe.bind(pn)     # just needs any name
                break
            except OSError as err:
                last_err = err
                # clear out a stale socket file, then try the next name
                if os.path.exists(pn):
                    os.remove(pn)
                continue
        else:
            # every candidate name failed: don't leak the connected socket
            self.pipe.close()
            raise last_err  # raise whatever was raised last in the loop

        self.pipe_name = pn
        atexit.register(self.close)

    def get_sim_pid(self):
        # return str PID if any in socket_path
        # - "" for the default socket, else "-" plus the last dash-separated
        #   piece of the socket's file name (before the first dot)
        if self.socket_path == DEFAULT_SIM_SOCKET:
            return ""

        # look only at the file name: dots or dashes in directory names must
        # not end up in the client socket name
        name = os.path.basename(self.socket_path)
        return "-" + name.split(".")[0].split("-")[-1]

    def read(self, max_count, timeout_ms=None):
        # Receive one datagram of up to max_count bytes.
        # - timeout_ms falsy (None or 0) => block until data arrives
        # - returns None on timeout
        if not timeout_ms:
            self.pipe.settimeout(None)
        else:
            self.pipe.settimeout(timeout_ms / 1000.0)

        try:
            return self.pipe.recv(max_count)
        except socket.timeout:
            return None

    def write(self, buf):
        # Send one 65-byte HID-style report; leading report-number byte is dropped.
        # - returns 65 when all 64 payload bytes were sent, else the raw count
        assert len(buf) == 65
        self.pipe.settimeout(10)
        rv = self.pipe.send(buf[1:])
        return 65 if rv == 64 else rv

    def error(self):
        # no error reporting on the pipe; always empty
        return ''

    def close(self):
        # Close the socket and remove our bound socket file (best effort).
        # - also registered with atexit, so must tolerate repeated calls
        self.pipe.close()

        pipe_name = getattr(self, 'pipe_name', None)
        if pipe_name is not None:
            try:
                os.unlink(pipe_name)
            except OSError:
                # already removed (e.g. second call): nothing to do
                pass

    def get_serial_number_string(self):
        # fixed fake serial number for the simulator
        return 'F1'*6

# EOF