#!/usr/bin/env python
#
# To use this, install with:
#
#   pip install --editable .
#
# That will create the command "ckcc" in your path.
#
# Background:
# - see for HID api
#   NOTE(review): the link that belongs in the line above is missing from
#   this copy of the file.
#
import sys
import os
import pdb
import struct
import time
import io
from pprint import pformat
from binascii import b2a_hex, a2b_hex
from hashlib import sha256
from base64 import b64encode
from functools import wraps

import hid
import click

from ckcc.protocol import CCProtocolPacker, CCProtocolUnpacker, CCProtoError
from ckcc.constants import MAX_MSG_LEN, MAX_BLK_LEN
from ckcc.constants import AF_P2WPKH, AF_CLASSIC, AF_P2WPKH_P2SH
from ckcc.client import ColdcardDevice, COINKITE_VID, CKCC_PID
from ckcc.sigheader import FW_HEADER_SIZE, FW_HEADER_OFFSET, FW_HEADER_MAGIC
from ckcc.utils import dfu_parse

# Which unit to talk to: a serial number, the simulator socket path, or None
# for "first found". Assigned by main() from the command-line options and
# passed as sn= to ColdcardDevice by every command.
force_serial = None


# Options we want for all commands
# (no docstring on purpose: click would show it as the group's help text)
@click.group()
@click.option('--serial', '-s', default=None, metavar="HEX",
              help="Operate on specific unit (default: first found)")
@click.option('--simulator', '-x', default=False, is_flag=True,
              help="Connect to the simulator via Unix socket")
def main(serial, simulator):
    global force_serial

    force_serial = serial

    if simulator:
        # --simulator wins over --serial
        force_serial = '/tmp/ckcc-simulator.sock'


def display_errors(f):
    """Decorator that cleans up the display of errors from the Coldcard.

    When the wrapped callable raises CCProtoError, the first exception
    argument is echoed (surrounded by blank lines) and the process exits
    with status 1. Any other exception propagates unchanged.
    """
    @wraps(f)
    def wrapper(*args, **kws):
        try:
            return f(*args, **kws)
        except CCProtoError as exc:
            click.echo("\n%s\n" % str(exc.args[0]))
            sys.exit(1)

    return wrapper


@main.command()
def debug():
    "Start interactive (local) debug session"
    import code
    import readline
    import atexit
    import os

    class HistoryConsole(code.InteractiveConsole):
        """Interactive console with tab completion and a persistent history file."""

        # NOTE(review): filename is an empty string here; text may have been
        # lost from this literal in this copy of the file -- confirm.
        def __init__(self, locals=None, filename="",
                     histfile=os.path.expanduser("~/.console-history")):
            code.InteractiveConsole.__init__(self, locals, filename)
            self.init_history(histfile)

        def init_history(self, histfile):
            readline.parse_and_bind("tab: complete")
            if hasattr(readline, "read_history_file"):
                try:
                    readline.read_history_file(histfile)
                except IOError:
                    # no history file yet; it gets written at exit
                    pass
                atexit.register(self.save_history, histfile)

        def save_history(self, histfile):
            readline.write_history_file(histfile)

    # useful stuff
    # (these names look unused, but they are handed to the console through
    # locals() below so they can be used interactively)
    import pdb
    from pdb import pm
    CC = ColdcardDevice(sn=force_serial, encrypt=False)
    SR = CC.send_recv

    cli = HistoryConsole(locals=dict(globals(), **locals()))
    cli.interact(banner="Go for it: 'CC' is the connected device, SR=CC.send_recv", exitmsg='')


@main.command('list')
def _list():
    "List all attached Coldcard devices"

    count = 0
    for info in hid.enumerate(COINKITE_VID, CKCC_PID):
        # pformat() of the info record, minus its outermost bracket characters
        click.echo("\nColdcard {serial_number}:\n{nice}".format(
            nice=pformat(info, indent=4)[1:-1], **info))
        count += 1

    if not count:
        click.echo("(none found)")


@main.command()
def logout():
    "Securely logout of device (will require replug to start over)"
    dev = ColdcardDevice(sn=force_serial)

    resp = dev.send_recv(CCProtocolPacker.logout())
    print(("Device says: %r" % resp) if resp else "Okay!")


@main.command()
def reboot():
    "Reboot coldcard, force relogin and start over"
    dev = ColdcardDevice(sn=force_serial)

    resp = dev.send_recv(CCProtocolPacker.reboot())
    print(("Device says: %r" % resp) if resp else "Okay!")


@main.command('bag')
@click.option('--number', '-n', metavar='BAG_NUMBER', default=None)
def bag_number(number):
    "Factory: set or read bag number -- single use only!"
    dev = ColdcardDevice(sn=force_serial)

    # an empty value is sent when no number was given on the command line
    nn = b'' if not number else number.encode('ascii')

    resp = dev.send_recv(CCProtocolPacker.bag_number(nn))

    print("Bag number: %r" % resp)


@main.command('test')
@click.option('--single', '-s', default=None, type=click.IntRange(0, 255),
              help='If set, use this value on wire.')
def usb_test(single):
    "Test USB connection (debug/dev)"
    dev = ColdcardDevice(sn=force_serial)

    rng = []
    rng.extend(range(55, 66))       # buggy lengths are around 64
    rng.extend(range(1013, 1024))

    # we have 4 bytes of overhead (args) for ping cmd, so this will be max-length
    rng.extend(range(MAX_MSG_LEN-10, MAX_MSG_LEN-4))

    #print(repr(rng))
    for i in rng:
        print("Ping with length: %d" % i, end='')
        # random payload, unless a fixed byte value was requested with --single
        body = os.urandom(i) if single is None else bytes([single]*i)
        rb = dev.send_recv(CCProtocolPacker.ping(body))
        assert rb == body, "Fail @ len: %d, got back %d bytes\n%r !=\n%r" % (
            i, len(rb), b2a_hex(body), b2a_hex(rb))
        print(" Okay")


# ---------------------------------------------------------------------------
# NOTE(review): this copy of the file is damaged at this point.
#
# The text of real_file_upload() stops in the middle of a statement, and
# everything after it, up to the middle of the message-signing command, is
# missing. That includes the message-signing command's decorators and "def"
# line, and presumably the definition of BIP44_FIRST, which the 'addr'
# command further down uses as an option default.
#
# The two surviving fragments are kept below, commented out, with only their
# line breaks and indentation restored. Nothing has been reconstructed.
# Restore the missing text from version control before using this module:
# sign_transaction() below calls real_file_upload().
#
# --- fragment 1: beginning of real_file_upload() ---------------------------
#
# def real_file_upload(fd, blksize=MAX_BLK_LEN, do_upgrade=False, do_reboot=True, dev=None):
#     dev = dev or ColdcardDevice(sn=force_serial)
#
#     # learn size (portable way)
#     offset = 0
#     sz = fd.seek(0, 2)
#     fd.seek(0)
#
#     if do_upgrade:
#         # Unwrap DFU contents, if needed. Also handles raw binary file.
#         try:
#             if fd.read(5) == b'DfuSe':
#                 # expecting a DFU-wrapped file.
#                 fd.seek(0)
#                 offset, sz, *_ = dfu_parse(fd)
#             else:
#                 # assume raw binary
#                 pass
#
#             assert sz % 256 == 0, "un-aligned size: %s" % sz
#             fd.seek(offset+FW_HEADER_OFFSET)
#             hdr = fd.read(FW_HEADER_SIZE)
#
#             magic = struct.unpack_from("          [text is cut off here]
#
# --- fragment 2: end of the message-signing command (header missing) -------
#
#     # not enforcing policy here on msg contents, so we can define that on product
#     message = message.encode('ascii')
#
#     ok = dev.send_recv(CCProtocolPacker.sign_message(message, path, addr_fmt), timeout=None)
#     assert ok == None
#
#     print("Waiting for OK on the Coldcard...", end='', file=sys.stderr)
#     sys.stderr.flush()
#
#     while 1:
#         time.sleep(0.250)
#         done = dev.send_recv(CCProtocolPacker.get_signed_msg(), timeout=None)
#         if done == None:
#             continue
#
#         break
#
#     print("\r \r", end='', file=sys.stderr)
#     sys.stderr.flush()
#
#     if len(done) != 2:
#         click.echo('Failed: %r' % done)
#         sys.exit(1)
#
#     addr, raw = done
#
#     sig = str(b64encode(raw), 'ascii').replace('\n', '')
#
#     if just_sig:
#         click.echo(str(sig))
#     elif verbose:
#         click.echo('-----BEGIN SIGNED MESSAGE-----\n{msg}\n-----BEGIN '
#                    'SIGNATURE-----\n{addr}\n{sig}\n-----END SIGNED MESSAGE-----'.format(
#                        msg=message.decode('ascii'), addr=addr, sig=sig))
#     else:
#         click.echo('%s\n%s\n%s' % (message.decode('ascii'), addr, sig))
#
# ---------------------------------------------------------------------------


def wait_and_download(dev, req, fn):
    """Wait for user action on the device, then download the resulting file.

    The device is polled with `req` every 250ms until it answers with
    something other than None.

    Args:
        dev: connected device; its send_recv() and download_file() are used.
        req: the request to poll with.
        fn: passed to dev.download_file() as file_number.

    Returns:
        (result, result_sha): the downloaded data, and the second element of
        the device's answer (handed to download_file() alongside the length).

    Exits the process with status 1 if the answer is not a pair.
    """
    prompt = "Waiting for OK on the Coldcard..."
    print(prompt, end='', file=sys.stderr)
    sys.stderr.flush()

    while True:
        time.sleep(0.250)
        done = dev.send_recv(req, timeout=None)
        if done is not None:
            break

    # Blank out the whole prompt and leave the cursor at the start of the line.
    print("\r" + " " * len(prompt) + "\r", end='', file=sys.stderr)
    sys.stderr.flush()

    if len(done) != 2:
        click.echo('Failed: %r' % done)
        sys.exit(1)

    result_len, result_sha = done

    # download the result.
    click.echo("Ok! Downloading result (%d bytes)" % result_len)
    result = dev.download_file(result_len, result_sha, file_number=fn)

    return result, result_sha


@main.command('sign')
@click.argument('psbt_in', type=click.File('rb'))
@click.argument('psbt_out', type=click.File('wb'))
@click.option('--verbose', '-v', is_flag=True, help='Show more details')
@click.option('--finalize', '-f', is_flag=True,
              help='Show final signed transaction, ready for transmission')
#@click.option('--just-txn', '-t', is_flag=True, help='Just the final transaction itself, nothing more')
@display_errors
def sign_transaction(psbt_in, psbt_out, verbose=False, hex_mode=False, finalize=True):
    "Approve a spending transaction (by signing it on Coldcard)"

    dev = ColdcardDevice(sn=force_serial)
    dev.check_mitm()

    # not enforcing policy here on msg contents, so we can define that on product
    taste = psbt_in.read(10)
    psbt_in.seek(0)

    # b'70736274ff' is the hex spelling of the binary magic b'psbt\xff'.
    # Compare case-insensitively so upper-case hex files are recognised too.
    if taste.lower() == b'70736274ff':
        # hex encoded; make binary
        # (strip first: a trailing newline would make a2b_hex() raise)
        psbt_in = io.BytesIO(a2b_hex(psbt_in.read().strip()))
        hex_mode = True
    elif taste[0:5] != b'psbt\xff':
        click.echo("File doesn't have PSBT magic number at start.")
        sys.exit(1)

    # upload the transaction
    txn_len, sha = real_file_upload(psbt_in, dev=dev)

    # start the signing process
    ok = dev.send_recv(CCProtocolPacker.sign_transaction(txn_len, sha), timeout=None)
    assert ok is None

    result, _ = wait_and_download(dev, CCProtocolPacker.get_signed_txn(), 1)

    if finalize:
        # assume(?) transaction is completely signed, and output the
        # bitcoin transaction to be sent.
        # XXX maybe do this on embedded side, when txn is final?
        # XXX otherwise, need to parse PSBT and also handle combining properly
        pass

    # save it
    # (written back as hex when the input was hex)
    psbt_out.write(b2a_hex(result) if hex_mode else result)


@main.command('backup')
@click.option('--outdir', '-d',
              type=click.Path(exists=True, dir_okay=True, file_okay=False, writable=True),
              help="Save into indicated directory (auto filename)", default='.')
@click.option('--outfile', '-o', metavar="filename.7z",
              help="Name for backup file", default=None,
              type=click.File('wb'))
#@click.option('--verbose', '-v', is_flag=True, help='Show more details')
@display_errors
def start_backup(outdir, outfile, verbose=False):
    '''Prompts user to remember a massive pass phrase and then \
downloads AES-encrypted data backup. By default, saves into current directory using \
a filename based on the date.'''

    dev = ColdcardDevice(sn=force_serial)
    dev.check_mitm()

    ok = dev.send_recv(CCProtocolPacker.start_backup())
    assert ok is None

    result, chk = wait_and_download(dev, CCProtocolPacker.get_backup_file(), 0)

    if outfile:
        outfile.write(result)
        outfile.close()
        fn = outfile.name
    else:
        assert outdir

        # pick a useful filename, if they gave a dirname
        fn = os.path.join(outdir, time.strftime('backup-%Y%m%d-%H%M.7z'))

        # context manager, so the data is flushed and the handle closed
        # before we report success
        with open(fn, 'wb') as fd:
            fd.write(result)

    click.echo("Wrote %d bytes into: %s\nSHA256: %s" % (len(result), fn, str(b2a_hex(chk), 'ascii')))


# NOTE(review): BIP44_FIRST is not defined anywhere in the visible text of this
# file; presumably it was part of the missing section flagged above -- confirm.
@main.command('addr')
@click.option('--path', '-p', default=BIP44_FIRST, help='Derivation for key to show')
@click.option('--segwit', '-s', is_flag=True, help='Show in segwit native (p2wpkh, bech32)')
@click.option('--wrap', '-w', is_flag=True, help='Show in segwit wrapped in P2SH (p2wpkh)')
@click.option('--quiet', '-q', is_flag=True, help='Show less details; just the address')
def show_address(path, quiet=False, segwit=False, wrap=False):
    "Show the human version of an address"

    dev = ColdcardDevice(sn=force_serial)

    # --wrap takes priority when both --wrap and --segwit are given
    if wrap:
        addr_fmt = AF_P2WPKH_P2SH
    elif segwit:
        addr_fmt = AF_P2WPKH
    else:
        addr_fmt = AF_CLASSIC

    addr = dev.send_recv(CCProtocolPacker.show_address(path, addr_fmt), timeout=None)

    if quiet:
        click.echo(addr)
    else:
        click.echo('Displaying address:\n\n%s\n' % addr)

# EOF