# (c) Copyright 2018 by Coinkite Inc. This file is covered by license found in COPYING-CC.
#
# usb.py - USB related things
#
import ckcc, pyb, callgate, sys, ux, ngu, stash, aes256ctr
from uasyncio import sleep_ms, core
from uhashlib import sha256
from public_constants import MAX_MSG_LEN, MAX_BLK_LEN, AFC_SCRIPT
from public_constants import STXN_FLAGS_MASK
from ustruct import pack, unpack_from
from ckcc import watchpoint, is_simulator
from utils import problem_file_line, call_later_ms
from version import supports_hsm, is_devmode, MAX_TXN_LEN, MAX_UPLOAD_LEN
from exceptions import FramingError, CCBusyError, HSMDenied, HSMCMDDisabled, SpendPolicyViolation
from pincodes import pa

# USB vendor/product IDs: unofficial and unpermissioned numbers.
COINKITE_VID = 0xd13e
CKCC_PID = 0xcc10

# HID report descriptor, modelled on the U2F descriptor (the reference link
# is missing from this copy of the file).
# We do NOT want to be detected as a U2F device: that protocol is not
# supported here, and web browsers and such should not try to speak
# U2F/HID/USB at us. The descriptor only exists to keep HID class drivers
# happy; detection is based on VID/PID.
hid_descp = (
    b'\x06\xcc\x10'     # USAGE_PAGE (CC10 = Coldcard v1.0)
    b'\x09\x01'         # USAGE (0x01)
    b'\xa1\x01'         # COLLECTION (Application)
    b'\x09\x20'         #   USAGE (Input Report Data)
    b'\x15\x00'         #   LOGICAL_MINIMUM (0)
    b'\x26\xff\x00'     #   LOGICAL_MAXIMUM (255)
    b'\x75\x08'         #   REPORT_SIZE (8)
    b'\x95\x40'         #   REPORT_COUNT (64)
    b'\x81\x02'         #   INPUT (Data,Var,Abs)
    b'\x09\x21'         #   USAGE (Output Report Data)
    b'\x15\x00'         #   LOGICAL_MINIMUM (0)
    b'\x26\xff\x00'     #   LOGICAL_MAXIMUM (255)
    b'\x75\x08'         #   REPORT_SIZE (8)
    b'\x95\x40'         #   REPORT_COUNT (64)
    b'\x91\x02'         #   OUTPUT (Data,Var,Abs)
    b'\xc0'             # END_COLLECTION
)

# Only these whitelisted USB commands are allowed once we enter HSM mode.
# NOTE: 'rebo' here would allow firmware changes during HSM mode!
HSM_WHITELIST = frozenset({
    # harmless/boring
    'logo', 'ping', 'vers',
    # up/download/sign PSBT needed
    'upld', 'sha2', 'dwld', 'stxn',
    # maybe limited by policy tho
    'mitm', 'ncry',
    # limited by policy
    'smsg',
    # report status values
    'blkc', 'hsts',
    # completion check: sign txn or msg
    'stok', 'smok',
    # quick status checks
    'xpub', 'msck',
    # limited by HSM policy
    'p2sh', 'show',
    # auth HSM user, other user cmds not allowed
    'user',
    # read storage locker; hsm mode only, limited usage
    'gslr',
})

# HSM related commands that are refused when the 'hsmcmd' setting is off.
HSM_DISABLE_CMDS = frozenset({
    "user", "rmur", "nwur", "gslr", "hsts", "hsms",
})

# Commands blacklisted while a spending policy is active.
# - 'pass' may be allowed if 'okeys' is enabled
HOBBLED_CMDS = HSM_DISABLE_CMDS | frozenset({
    'enrl',     # no new multisigs during policy enforcement
    'back',     # no backups
    'bagi',     # just in case
    'dfu_',     # just in case
})

# singleton instance of USBHandler()
handler = None


def enable_keyboard_emulation():
    """Bring USB up for keyboard emulation and make sure `handler` exists.

    On the simulator this simply defers to enable_usb(); on a real device
    the port is switched to 'VCP+HID' using the pyb keyboard HID settings.
    """
    global handler

    if is_simulator():
        enable_usb()
    else:
        # real device
        pyb.usb_mode('VCP+HID', hid=pyb.hid_keyboard)

    if not handler:
        handler = USBHandler()


def enable_usb():
    """Enable the normal USB configuration and start the 'USB' receive task.

    The mode cannot be changed on the fly: USB must already be disabled
    before calling this, and only one combination of subclasses can be used
    during a single power-up cycle. If a mode is already active it is left
    alone and a note is printed.
    """
    global handler

    active_mode = pyb.usb_mode()
    if active_mode:
        print("USB already enabled: %s" % active_mode)
    else:
        # subclass, protocol, max packet length, polling interval, report descriptor
        hid_settings = (0x0, 0x0, 64, 5, hid_descp)
        pyb.usb_mode('VCP+MSC+HID', vid=COINKITE_VID, pid=CKCC_PID, hid=hid_settings)

    if not handler:
        handler = USBHandler()

    from imptask import IMPT

    # only one receive task at a time
    if "USB" not in IMPT.tasks:
        IMPT.start_task('USB', handler.usb_hid_recv())


def disable_usb():
    """Cancel the 'USB' task (if registered) and turn the USB port off."""
    from imptask import IMPT

    running = IMPT.tasks.pop("USB", None)
    if running:
        running.cancel()

    # pull the plug
    pyb.usb_mode(None)


def is_vcp_active():
    """Report whether the VCP (Virtual Comm Port) is both configured and enabled.

    Returns the falsy usb_mode() value when USB is off, False when the
    active mode has no 'VCP' component, otherwise whatever
    ckcc.vcp_enabled(None) reported.
    """
    enabled = ckcc.vcp_enabled(None)
    mode = pyb.usb_mode()

    if not mode:
        return mode
    if 'VCP' not in mode:
        return False
    return enabled
class USBHandler:
    # Receives framed 64-byte HID reports, reassembles them into one request
    # in self.msg, dispatches the request through self.handle() and sends the
    # reply back as a sequence of 64-byte reports.

    def __init__(self):
        self.dev = pyb.USB_HID()

        # Running hash over whatever has been uploaded
        # - reset at offset zero, can be read back anytime
        self.file_checksum = sha256()
        self.is_fw_upgrade = False

        # handle simulator: wait on dev.pipe when it exists, else the device itself
        self.blockable = getattr(self.dev, 'pipe', self.dev)

        # single reusable receive buffer for a complete request
        self.msg = bytearray(2048+12)
        assert len(self.msg) == MAX_MSG_LEN

        # whether the request currently being processed arrived encrypted
        self.encrypted_req = False

        # not bound to a specific crypto setup by default
        self.bound = False

        # these will be objects later
        self.encrypt = None
        self.decrypt = None

    def get_packet(self):
        # Read the next 64-byte packet waiting on the wire and unframe it.
        # Returns (payload, is_last, is_encrypted).
        # Raises FramingError on timeout or when the packet is not 64 bytes.
        buf = self.dev.recv(64, timeout=5000)
        ckcc.usb_active()

        if not buf:
            raise FramingError('timeout')

        size = len(buf)
        if size < 64:
            raise FramingError('short')
        if size > 64:
            raise FramingError('long')

        # First byte carries size and status bits:
        #   0x80 = final packet, 0x40 = encrypted, low 6 bits = payload length
        # all illegal combos here may become special messages someday
        header = buf[0]
        payload_len = int(header & 0x3f)
        final = bool(header & 0x80)
        encrypted = bool(header & 0x40)

        return buf[1:1+payload_len], final, encrypted

    async def usb_hid_recv(self):
        # Runs forever: blocks for packets, builds up a full-length command
        # in self.msg and calls self.handle() once a complete msg is on hand.
        # Framing problems are reported with a 'fram' reply; anything else
        # unexpected just resets the accumulated length and keeps going.
        msg_len = 0

        while 1:
            success = False
            yield core._io_queue.queue_read(self.blockable)

            try:
                here, is_last, is_encrypted = self.get_packet()

                #print('Rx[%d]' % len(here))
                if here:
                    # append payload to what we have so far
                    lh = len(here)
                    if msg_len+lh > MAX_MSG_LEN:
                        raise FramingError('xlong')

                    self.msg[msg_len:msg_len + lh] = here
                    msg_len += lh
                else:
                    # treat zero-length packets as a reset request
                    # do not echo anything back on link.. used to resync connection
                    msg_len = 0
                    continue

                if not is_last:
                    # need more content
                    continue

                if not(4 <= msg_len <= MAX_MSG_LEN):
                    raise FramingError('badsz')

                # once bound, cleartext requests are refused
                if not is_encrypted and self.bound:
                    raise FramingError('must encrypt')

                if is_encrypted:
                    if self.decrypt is None:
                        raise FramingError('no key')

                    self.encrypted_req = True
                    self.decrypt_inplace(msg_len)
                else:
                    self.encrypted_req = False

                # process request: first 4 bytes are the command, rest are args
                try:
                    # this saves memory over a simple slice (confirmed)
                    args = memoryview(self.msg)[4:msg_len]
                    resp = await self.handle(self.msg[0:4], args)
                    success = True
                except CCBusyError:
                    # auth UX is doing something else
                    resp = b'busy'
                except SpendPolicyViolation:
                    resp = b'err_Spending policy in effect'
                except HSMDenied:
                    resp = b'err_Not allowed in HSM mode'
                except HSMCMDDisabled:
                    # do NOT change below error msg as other applications depend on it
                    resp = b'err_HSM commands disabled'
                    msg_len = 0
                except (ValueError, AssertionError) as exc:
                    # some limited invalid args feedback
                    #print("USB request caused assert: ", end='')
                    # sys.print_exception(exc)
                    msg = str(exc)
                    if not msg:
                        msg = 'Assertion ' + problem_file_line(exc)
                    resp = b'err_' + msg.encode()[0:80]
                except MemoryError:
                    # prefer to catch at higher layers, but sometimes can't
                    resp = b'err_Out of RAM'
                except FramingError as exc:
                    # hand over to the framing-error path below
                    raise exc
                except Exception as exc:
                    # catch bugs and fuzzing too
                    if is_simulator() or is_devmode:
                        print("USB request caused this: ", end='')
                        sys.print_exception(exc)
                    resp = b'err_Confused ' + problem_file_line(exc)

                if not success:
                    # do not let the progress screen hang on "Receiving..."
                    from ux import restore_menu
                    restore_menu()

                msg_len = 0

                # always send a reply if they get this far
                await self.send_response(resp)

            except FramingError as exc:
                reason = exc.args[0]
                # print("Framing: %s" % reason)
                await self.framing_error(reason)
                msg_len = 0

            except BaseException as exc:
                # recover from general issues/keep going
                #print("USB!")
                #sys.print_exception(exc)
                msg_len = 0

    def decrypt_inplace(self, msg_len):
        # First msg_len bytes of self.msg are encrypted; overwrite them with
        # the output of self.decrypt().
        # - seems dangerous to use memview here, but works
        # - some memory alloc still happens here tho
        self.msg[0:msg_len] = self.decrypt(memoryview(self.msg)[0:msg_len])

    def encrypt_response(self, msg):
        # Pass the outgoing reply through self.encrypt() before it goes to desktop.
        return self.encrypt(msg)

    async def send_response(self, resp):
        # Send a python object as the response.
        # - bytes/bytearray go out as-is (must be at least 4 bytes)
        # - None becomes b'okay'; an int is packed as 'int1' + uint32 LE
        # - anything else raises NotImplementedError
        # - sadly we cannot stream here because we cannot subclass streams
        # - cannot reuse rx buffer either!

        # handle simple types here
        if isinstance(resp, (bytes, bytearray)):
            # preformated
            assert len(resp) >= 4
        elif resp is None:
            resp = b'okay'
        elif isinstance(resp, int):
            resp = pack('<4sI', 'int1', resp)
        else:
            #print("Unknown resp: " + repr(resp))
            raise NotImplementedError()

        assert len(resp) >= 4

        pkt = bytearray(64)

        # reply is encrypted only if the request was, and we have a cipher
        if self.encrypt and self.encrypted_req:
            resp = self.encrypt_response(resp)
            final_flag = 0x80 | 0x40
        else:
            final_flag = 0x80

        offset = 0
        remaining = len(resp)

        while remaining:
            # send up to 63 bytes per packet; byte zero is the length/flags
            chunk = min(remaining, 63)
            pkt[0] = chunk
            pkt[1:1+chunk] = resp[offset:offset+chunk]

            if chunk == remaining:
                # no more to come
                assert 0 <= chunk < 64
                pkt[0] |= final_flag

            remaining -= chunk
            offset += chunk

            ckcc.usb_active()

            for _ in range(100):
                sent = self.dev.send(pkt)
                if sent == 64:
                    break

                # Host may not have read previous value yet, so might need
                # to wait for it. Data loss possible here, but also the
                # host may stop reading the EP forever, so not our fault.
                # Let other stuff run during this delay.
                await sleep_ms(10)
await sleep_ms(10) async def framing_error(self, why): # send error about framing, and recover resp = b'fram' + why.encode() await self.send_response(resp) async def handle(self, cmd, args): # Dispatch incoming message, and provide reply. from glob import hsm_active, settings try: cmd = bytes(cmd).decode() except: raise FramingError('decode') if is_devmode and cmd[0].isupper(): # special hacky commands to support testing w/ the simulator try: from usb_test_commands import do_usb_command return do_usb_command(cmd, args) except: pass if hsm_active: # only a few commands are allowed during HSM mode if cmd not in HSM_WHITELIST: raise HSMDenied if pa.hobbled_mode: # block some commands when we are hobbled. if cmd in HOBBLED_CMDS: raise SpendPolicyViolation if cmd in {'pwok', 'pass'}: from ccc import sssp_spending_policy if not sssp_spending_policy('okeys'): raise SpendPolicyViolation elif not settings.get('hsmcmd', False): # block these HSM-related command if not using feature if cmd in HSM_DISABLE_CMDS: raise HSMCMDDisabled if cmd == 'dfu_': # only useful in factory, undocumented. return self.call_after(callgate.enter_dfu) if cmd == 'rebo': from auth import UserAuthorizedAction, FirmwareUpgradeRequest import machine req = UserAuthorizedAction.active_request if req and isinstance(req, FirmwareUpgradeRequest): # We're waiting on firmware upgrade approval, so don't reboot # (which would not apply the upgrade anyway) and also don't # give an error, because ckcc-protocol and other clients # send the reboot as part of the old upgrade process. 
return return self.call_after(machine.reset) if cmd == 'logo': from utils import clean_shutdown return self.call_after(clean_shutdown) if cmd == 'ping': return b'biny' + args if cmd == 'upld': offset, total_size = unpack_from('path and M values addr_fmt, M, N, script_len = unpack_from('= 4, 'badlen' # regression patch of AFC_BECH32M flag # fixed here https://github.com/Coldcard/ckcc-protocol/commit/a6d901f9fca50755835eca895586ca74d0ca81ed if addr_fmt == 0x17: # old P2TR addr_fmt = 0x23 # new P2TR return b'asci' + usb_show_address(addr_fmt, subpath=args[4:]) if cmd == 'enrl': # Enroll new xpubkey to be involved in multisigs. # - text config file must already be uploaded file_len, file_sha = unpack_from('= total_size and not hsm_active: # probably done dis.progress_bar_show(1.0) ux.restore_menu() return offset def handle_xpub(self, subpath): # Share the xpub for the indicated subpath. Expects # a text string which is the path derivation. # TODO: might not have a privkey yet from chains import current_chain from utils import cleanup_deriv_path subpath = cleanup_deriv_path(subpath) from glob import hsm_active if hsm_active and not hsm_active.approve_xpub_share(subpath): raise HSMDenied chain = current_chain() with stash.SensitiveValues() as sv: node = sv.derive_path(subpath) xpub = chain.serialize_public(node) return b'asci' + xpub.encode() def handle_bag_number(self, bag_num): import version, callgate from glob import dis, settings if bag_num and version.is_factory_mode and not version.has_qr: # check state first assert settings.get('tested', False) assert pa.is_blank() assert 8 <= len(bag_num) < 32 # do the change failed = callgate.set_bag_number(bag_num) assert not failed callgate.set_rdp_level(2) pa.greenlight_firmware() dis.fullscreen(bytes(bag_num).decode()) self.call_after(callgate.show_logout, 1) # always report the existing/new value val = callgate.get_bag_number() or b'' return b'asci' + val class EmulatedKeyboard: # be a context manager, used during kbd 
class EmulatedKeyboard:
    # Context manager used during keyboard emulation: leaving the `with`
    # block tears down the keyboard USB mode again (see __exit__).

    # Character -> HID keyboard usage code ("page 88+" of the usage tables
    # the original comment referred to; the link is missing from this copy).
    char_map = {
        # letters
        "a": 0x04, "b": 0x05, "c": 0x06, "d": 0x07, "e": 0x08, "f": 0x09,
        "g": 0x0A, "h": 0x0B, "i": 0x0C, "j": 0x0D, "k": 0x0E, "l": 0x0F,
        "m": 0x10, "n": 0x11, "o": 0x12, "p": 0x13, "q": 0x14, "r": 0x15,
        "s": 0x16, "t": 0x17, "u": 0x18, "v": 0x19, "w": 0x1A, "x": 0x1B,
        "y": 0x1C,  # qwerty
        "z": 0x1D,

        # numbers (top row - USA centric)
        "1": 0x1e, "2": 0x1f, "3": 0x20, "4": 0x21, "5": 0x22,
        "6": 0x23, "7": 0x24, "8": 0x25, "9": 0x26, "0": 0x27,

        # spacebar
        " ": 0x2C,

        # HACK: Keypad symbols work w/o concern for language, shift state
        "/": 0x54, "*": 0x55, "-": 0x56, "+": 0x57,

        # Keyboard Enter
        "\r": 0x28,
    }

    def __enter__(self):
        return self

    async def connect(self):
        # Switch USB into keyboard mode and wait for the host to accept reports.
        # - can be slow; needs to wait until host has enumerated us
        # - does UX for that
        # - can fail when host doesn't want to enumerate us, shows msg
        # - returns True if there was a problem, False on success
        #   (None on the simulator, where nothing is done)
        from glob import dis

        dis.fullscreen("Switching...")

        if is_simulator():
            return

        # if USB was enabled, reset is needed
        disable_usb()
        enable_keyboard_emulation()

        # wait for enumeration, with timeout
        self.dev = pyb.USB_HID()

        # harmless "all keys are up" keyboard report
        idle_report = bytes(8)

        # macOS at least: need twice to be sure, maybe more!
        timeouts = 0
        for _ in range(3):
            for _attempt in range(100):
                if self.dev.send(idle_report) == 8:
                    break
                await sleep_ms(5)
            else:
                # all 100 attempts went unanswered
                timeouts += 1

        if timeouts >= 3:
            # if we are connected to a COLDPOWER, for example, this will happen.
            from ux import ux_show_story
            await ux_show_story("USB Host computer, if any, is not communicating with us.",
                                title="FAILED")
            return True

        return False

    def __exit__(self, exc_type, exc_val, exc_tb):
        # disable keyboard emulation mode (all USB)
        from glob import settings

        disable_usb()

        # enable usb only if it was previously enabled, otherwise keep disabled
        usb_disabled_by_user = settings.get("du")
        if not usb_disabled_by_user:
            enable_usb()

    @classmethod
    def can_type(cls, s):
        # Do we know how to type all the chars in "s"?
        # .lower() has no effect on symbols, numbers, and escapes
        for ch in s:
            if ch.lower() not in cls.char_map:
                return False
        return True

    async def send_keystrokes(self, keystroke_string):
        # Send keystrokes to enter a password... only expected to support Base64 charset
        # Each char is sent as a key-down report followed by an all-keys-up report.
        from glob import dis

        if is_simulator():
            print("Simulating keystrokes: " + repr(keystroke_string))
            return

        report = bytearray(8)
        total = len(keystroke_string)

        dis.fullscreen('Typing...')

        for count, ch in enumerate(keystroke_string, start=1):
            keycode = self.char_map.get(ch, None)
            shifted = keycode is None

            if shifted:
                # not directly in the table: try it as a capital of a known key
                keycode = self.char_map.get(ch.lower(), None)
                if keycode is None:
                    # problem: we don't know how to type this char
                    keycode = 0x1B      # X

            report[2] = keycode
            if shifted:
                # set LEFT SHIFT for capital letters
                report[0] = 0x02

            # key down; retry until the report is accepted
            while self.dev.send(report) == 0:
                await sleep_ms(5)

            # all keys up
            report[2] = 0x00
            report[0] = 0x00
            while self.dev.send(report) == 0:
                await sleep_ms(5)

            dis.progress_bar_show(count/total)

# EOF