diff --git a/ckcc/__init__.py b/ckcc/__init__.py index 831bae4..f4cee2d 100644 --- a/ckcc/__init__.py +++ b/ckcc/__init__.py @@ -1,6 +1,6 @@ # (c) Copyright 2021 by Coinkite Inc. This file is covered by license found in COPYING-CC. -__version__ = '1.3.1' +__version__ = '1.3.2' __all__ = [ "client", "protocol", "constants" ] diff --git a/ckcc/client.py b/ckcc/client.py index e51a15e..61a886b 100644 --- a/ckcc/client.py +++ b/ckcc/client.py @@ -12,6 +12,7 @@ import hid, sys, os from binascii import b2a_hex, a2b_hex from hashlib import sha256 +from .constants import USB_NCRY_V1, USB_NCRY_V2 from .protocol import CCProtocolPacker, CCProtocolUnpacker, CCProtoError, MAX_MSG_LEN, MAX_BLK_LEN from .utils import decode_xpub, get_pubkey_string @@ -23,7 +24,7 @@ CKCC_PID = 0xcc10 CKCC_SIMULATOR_PATH = '/tmp/ckcc-simulator.sock' class ColdcardDevice: - def __init__(self, sn=None, dev=None, encrypt=True): + def __init__(self, sn=None, dev=None, encrypt=True, ncry_ver=USB_NCRY_V1): # Establish connection via USB (HID) or Unix Pipe self.is_simulator = False @@ -57,6 +58,7 @@ class ColdcardDevice: self.serial = found # they will be defined after we've established a shared secret w/ device + self.ncry_ver = ncry_ver self.session_key = None self.encrypt_request = None self.decrypt_response = None @@ -66,7 +68,7 @@ class ColdcardDevice: self.resync() if encrypt: - self.start_encryption() + self.start_encryption(version=self.ncry_ver) def close(self): # close underlying HID device @@ -100,10 +102,14 @@ class ColdcardDevice: # first byte of each 64-byte packet encodes length or packet-offset assert 4 <= len(msg) <= MAX_MSG_LEN, "msg length: %d" % len(msg) - if not self.encrypt_request: + if self.encrypt_request is None: # disable encryption if not already enabled for this connection encrypt = False + if self.encrypt_request and self.ncry_ver == USB_NCRY_V2: + # ncry version 2 - everything needs to be encrypted + encrypt = True + if encrypt: msg = self.encrypt_request(msg) @@ -223,7 +229,7 @@ class ColdcardDevice: self.encrypt_request = pyaes.AESModeOfOperationCTR(session_key, pyaes.Counter(0)).encrypt self.decrypt_response = pyaes.AESModeOfOperationCTR(session_key, pyaes.Counter(0)).decrypt - def start_encryption(self): + def start_encryption(self, version=USB_NCRY_V1): # setup encryption on the link # - pick our own key pair, IV for AES # - send IV and pubkey to device @@ -232,10 +238,12 @@ class ColdcardDevice: pubkey = self.ec_setup() - msg = CCProtocolPacker.encrypt_start(pubkey) + msg = CCProtocolPacker.encrypt_start(pubkey, version=version) his_pubkey, fingerprint, xpub = self.send_recv(msg, encrypt=False) + self.ncry_ver = version + self.session_key = self.ec_mult(his_pubkey) # capture some public details of remote side's master key diff --git a/ckcc/constants.py b/ckcc/constants.py index 3cf2c40..2153694 100644 --- a/ckcc/constants.py +++ b/ckcc/constants.py @@ -7,6 +7,22 @@ try: except ImportError: const = int +# USB encryption versions (default USB_NCRY_V1) +# +# This introduces a new ncry version to close a potential attack vector: +# +# A malicious program may re-initialize the connection encryption by sending the ncry command a second time during USB operation. +# This may prove particularly harmful in HSM mode. +# +# Sending version USB_NCRY_V2 changes the behavior in two ways: +# * All future commands must be encrypted +# * Returns an error if the ncry command is sent again for the duration of the power cycle +# +# USB_NCRY_V2 is most suitable for HSM mode as in case of any communication issue or simply by closing `ColdcardDevice` +# Coldcard will need to reboot to recover USB operation if USB_NCRY_V2. +USB_NCRY_V1 = const(0x01) +USB_NCRY_V2 = const(0x02) + # For upload/download this is the max size of the data block. MAX_BLK_LEN = const(2048) diff --git a/ckcc/protocol.py b/ckcc/protocol.py index 60726c6..66b5335 100644 --- a/ckcc/protocol.py +++ b/ckcc/protocol.py @@ -68,7 +68,11 @@ class CCProtocolPacker: return b'back' @staticmethod - def encrypt_start(device_pubkey, version=0x1): + def encrypt_start(device_pubkey, version=USB_NCRY_V1): + supported_versions = [USB_NCRY_V1, USB_NCRY_V2] + if version not in supported_versions: + raise ValueError("Unsupported USB encryption version. " + "Supported versions: %s" % (supported_versions)) assert len(device_pubkey) == 64, "want uncompressed 64-byte pubkey, no prefix byte" return pack('<4sI64s', b'ncry', version, device_pubkey) diff --git a/tests/test_usb_ncry.py b/tests/test_usb_ncry.py new file mode 100644 index 0000000..072e647 --- /dev/null +++ b/tests/test_usb_ncry.py @@ -0,0 +1,71 @@ +import pytest +from ckcc.constants import AF_P2WPKH, USB_NCRY_V1, USB_NCRY_V2 +from ckcc.client import ColdcardDevice, CCProtocolPacker + + +# v2 tests require you to have firmware supporting usb encryption v2 +# after each v2 test, coldcard needs to be reconnected + + +def test_ncry_v1(): + # USB_NCRY_V1 is the default + dev = ColdcardDevice() + session_key = dev.session_key + assert session_key + # re-establish shared secret + dev.start_encryption() + assert dev.ncry_ver == USB_NCRY_V1 + assert session_key != dev.session_key + session_key = dev.session_key + # we can do this many times over - it will always work + dev.start_encryption() + assert dev.ncry_ver == USB_NCRY_V1 + assert session_key != dev.session_key + + +def test_ncry_v2(): + # after this test, one needs to reconnect coldcard + dev = ColdcardDevice(ncry_ver=USB_NCRY_V2) + assert dev.session_key + assert dev.ncry_ver == USB_NCRY_V2 + # cannot start new session - already bound + with pytest.raises(Exception): + dev.start_encryption() + # cannot start new session even with v2 - already bound + with pytest.raises(Exception): + dev.start_encryption(version=USB_NCRY_V2) + # if above conditions are met - all commands gonna be encrypted + assert dev.ncry_ver == USB_NCRY_V2 + addr = dev.send_recv(CCProtocolPacker.show_address("m/84'/0'/0'/0/0", AF_P2WPKH), timeout=None) + assert addr + + +def test_ncry_v2_via_start_encryption(): + dev = ColdcardDevice() + assert dev.session_key + assert dev.ncry_ver == USB_NCRY_V1 + dev.start_encryption(version=USB_NCRY_V2) + assert dev.ncry_ver == USB_NCRY_V2 + # cannot start new session - already bound + with pytest.raises(Exception): + dev.start_encryption() + # cannot start new session even with v2 - already bound + with pytest.raises(Exception): + dev.start_encryption(version=USB_NCRY_V2) + # test some commands + assert dev.ncry_ver == USB_NCRY_V2 + assert dev.encrypt_request is not None + # if above conditions are met - all commands gonna be encrypted + addr = dev.send_recv(CCProtocolPacker.show_address("m/84'/0'/0'/0/0", AF_P2WPKH), timeout=None) + assert addr + + +def test_unsupported_version(): + dev = ColdcardDevice() + with pytest.raises(ValueError): + dev.start_encryption(version=0x3) + dev.close() + with pytest.raises(ValueError): + ColdcardDevice(ncry_ver=0x3) + +