497 lines
16 KiB
Python
Executable File
497 lines
16 KiB
Python
Executable File
#!/usr/bin/env python
|
|
#
|
|
# To use this, install with:
|
|
#
|
|
# pip install --editable .
|
|
#
|
|
# That will create the command "ckcc" in your path.
|
|
#
|
|
# Background:
|
|
# - see <https://github.com/trezor/cython-hidapi/blob/master/hid.pyx> for HID api
|
|
#
|
|
#
|
|
import hid, click, sys, os, pdb, struct, time, io
|
|
from pprint import pformat
|
|
from binascii import b2a_hex, a2b_hex
|
|
from hashlib import sha256
|
|
from base64 import b64encode
|
|
from functools import wraps
|
|
|
|
from ckcc.protocol import CCProtocolPacker, CCProtocolUnpacker, CCProtoError
|
|
from ckcc.constants import MAX_MSG_LEN, MAX_BLK_LEN
|
|
from ckcc.constants import AF_P2WPKH, AF_CLASSIC, AF_P2WPKH_P2SH
|
|
from ckcc.client import ColdcardDevice, COINKITE_VID, CKCC_PID
|
|
from ckcc.sigheader import FW_HEADER_SIZE, FW_HEADER_OFFSET, FW_HEADER_MAGIC
|
|
from ckcc.utils import dfu_parse
|
|
|
|
# Device selector shared by all sub-commands: a serial number (hex string),
# the simulator's socket path, or None meaning "first unit found".
# Assigned by main() from the command-line options.
force_serial = None
|
|
|
|
# Options we want for all commands
@click.group()
@click.option('--serial', '-s', default=None, metavar="HEX",
              help="Operate on specific unit (default: first found)")
@click.option('--simulator', '-x', default=False, is_flag=True,
              help="Connect to the simulator via Unix socket")
def main(serial, simulator):
    # Group callback: stores the chosen unit in the module-level global,
    # which each sub-command reads when it opens the device.
    # (Deliberately a comment, not a docstring: click would display a
    # docstring as the group's help text.)
    global force_serial

    # The simulator's socket path takes precedence over any --serial value.
    force_serial = '/tmp/ckcc-simulator.sock' if simulator else serial
|
|
|
|
def display_errors(f):
    """Decorator that cleans up the display of errors from Coldcard.

    If the wrapped function raises CCProtoError, the first argument of the
    exception is echoed (surrounded by blank lines) and the process exits
    with status 1. Any other outcome passes through untouched.
    """
    @wraps(f)
    def wrapper(*args, **kws):
        try:
            outcome = f(*args, **kws)
        except CCProtoError as exc:
            click.echo("\n%s\n" % str(exc.args[0]))
            sys.exit(1)
        else:
            return outcome

    return wrapper
|
|
|
|
@main.command()
def debug():
    "Start interactive (local) debug session"
    # NOTE: every local name bound before the console is built (modules
    # included) is exported into the interactive namespace through locals(),
    # so the names below are part of what the session sees.
    import atexit
    import code
    import os
    import pdb
    import readline
    from pdb import pm

    class HistoryConsole(code.InteractiveConsole):
        # Interactive console with tab completion and a persistent history file.

        def __init__(self, locals=None, filename="<console>",
                     histfile=os.path.expanduser("~/.console-history")):
            super().__init__(locals, filename)
            self.init_history(histfile)

        def init_history(self, histfile):
            # Enable completion, load earlier history, and save it again at exit.
            readline.parse_and_bind("tab: complete")
            if hasattr(readline, "read_history_file"):
                try:
                    readline.read_history_file(histfile)
                except IOError:
                    # history could not be read (e.g. no file yet); start empty
                    pass
                atexit.register(self.save_history, histfile)

        def save_history(self, histfile):
            readline.write_history_file(histfile)

    # useful stuff
    CC = ColdcardDevice(sn=force_serial, encrypt=False)
    SR = CC.send_recv

    # Module globals first, then the locals above layered on top.
    HistoryConsole(locals={**globals(), **locals()}).interact(
        banner="Go for it: 'CC' is the connected device, SR=CC.send_recv",
        exitmsg='')
|
|
|
|
@main.command('list')
def _list():
    "List all attached Coldcard devices"
    # One entry is printed per HID device matching the Coinkite vendor and
    # product IDs; a placeholder line is printed when nothing matched.
    found_any = False

    for info in hid.enumerate(COINKITE_VID, CKCC_PID):
        # pformat() yields "{...}"; the slice drops the enclosing braces
        details = pformat(info, indent=4)[1:-1]
        click.echo("\nColdcard {serial_number}:\n{nice}".format(nice=details, **info))
        found_any = True

    if not found_any:
        click.echo("(none found)")
|
|
|
|
@main.command()
def logout():
    "Securely logout of device (will require replug to start over)"
    # Send the logout request and report whatever the device answers.
    reply = ColdcardDevice(sn=force_serial).send_recv(CCProtocolPacker.logout())

    if reply:
        print("Device says: %r" % reply)
    else:
        print("Okay!")
|
|
|
|
@main.command()
def reboot():
    "Reboot coldcard, force relogin and start over"
    # Send the reboot request and report whatever the device answers.
    reply = ColdcardDevice(sn=force_serial).send_recv(CCProtocolPacker.reboot())

    if reply:
        print("Device says: %r" % reply)
    else:
        print("Okay!")
|
|
|
|
@main.command('bag')
@click.option('--number', '-n', metavar='BAG_NUMBER', default=None)
def bag_number(number):
    "Factory: set or read bag number -- single use only!"
    dev = ColdcardDevice(sn=force_serial)

    # An empty payload is sent when no number was given on the command line.
    payload = number.encode('ascii') if number else b''

    answer = dev.send_recv(CCProtocolPacker.bag_number(payload))

    print("Bag number: %r" % answer)
|
|
|
|
@main.command('test')
@click.option('--single', '-s', default=None,
              type=click.IntRange(0,255), help='If set, use this value on wire.')
def usb_test(single):
    "Test USB connection (debug/dev)"
    dev = ColdcardDevice(sn=force_serial)

    # Payload lengths to try, in order:
    lengths = [
        *range(55, 66),             # buggy lengths are around 64
        *range(1013, 1024),
        # we have 4 bytes of overhead (args) for ping cmd, so this will be max-length
        *range(MAX_MSG_LEN-10, MAX_MSG_LEN-4),
    ]

    for size in lengths:
        print("Ping with length: %d" % size, end='')

        # random bytes, unless a fixed byte value was requested
        if single is None:
            body = os.urandom(size)
        else:
            body = bytes([single]) * size

        # the device is expected to echo the ping body back unchanged
        rb = dev.send_recv(CCProtocolPacker.ping(body))
        assert rb == body, "Fail @ len: %d, got back %d bytes\n%r !=\n%r" % (
            size, len(rb), b2a_hex(body), b2a_hex(rb))

        print(" Okay")
|
|
|
|
|
|
def real_file_upload(fd, blksize=MAX_BLK_LEN, do_upgrade=False, do_reboot=True, dev=None):
    """Upload the contents of an open binary file to the Coldcard and verify it.

    fd          seekable binary file object (a real file or in-memory buffer)
    blksize     number of bytes sent per upload request
    do_upgrade  treat the file as firmware: unwrap a DFU container if present,
                check the header magic, and after the upload write a copy of
                the signature header past the end of the image
    do_reboot   with do_upgrade only: reboot the device once everything checks out
    dev         already-open ColdcardDevice; a new one is opened when omitted

    Returns (size, sha256 digest) when do_upgrade is false; otherwise None.
    Exits the process with status 1 if the file does not look like firmware
    (do_upgrade only) or the device reports a different checksum.
    """
    dev = dev or ColdcardDevice(sn=force_serial)

    # learn size (portable way)
    offset = 0
    sz = fd.seek(0, os.SEEK_END)
    fd.seek(0)

    if do_upgrade:
        # Unwrap DFU contents, if needed. Also handles raw binary file.
        try:
            if fd.read(5) == b'DfuSe':
                # expecting a DFU-wrapped file.
                fd.seek(0)
                offset, sz, *_ = dfu_parse(fd)
            # else: assume raw binary

            assert sz % 256 == 0, "un-aligned size: %s" % sz
            fd.seek(offset+FW_HEADER_OFFSET)
            hdr = fd.read(FW_HEADER_SIZE)

            magic = struct.unpack_from("<I", hdr)[0]
            #print("hdr @ 0x%x: %s" % (FW_HEADER_OFFSET, b2a_hex(hdr)))
        except Exception:
            # Any failure while probing the file is reported below as
            # "not a firmware file". Only Exception is caught, so Ctrl-C and
            # sys.exit() are not swallowed the way a bare "except:" would.
            magic = None

        if magic != FW_HEADER_MAGIC:
            click.echo("This does not look like a firmware file! Bad magic value.")
            sys.exit(1)

        fd.seek(offset)

    click.echo("%d bytes (start @ %d) to send from %r" % (sz, fd.tell(),
            os.path.basename(fd.name) if hasattr(fd, 'name') else 'memory'))

    left = sz
    chk = sha256()          # running hash of everything sent so far
    with click.progressbar(range(0, sz, blksize), label="Uploading") as bar:
        for pos in bar:
            here = fd.read(min(blksize, left))
            if not here: break
            left -= len(here)
            # device is expected to answer with the position it just stored
            result = dev.send_recv(CCProtocolPacker.upload(pos, sz, here))
            assert result == pos, "Got back: %r" % result
            chk.update(here)

    # do a verify
    expect = chk.digest()
    result = dev.send_recv(CCProtocolPacker.sha256())
    assert len(result) == 32
    if result != expect:
        click.echo("Wrong checksum:\nexpect: %s\n got: %s" % (b2a_hex(expect).decode('ascii'),
                                                            b2a_hex(result).decode('ascii')))
        sys.exit(1)

    if not do_upgrade:
        return sz, expect

    # AFTER fully uploaded and verified, write a copy of the signature header
    # onto the end of flash. Bootrom uses this to check entire file uploaded.
    result = dev.send_recv(CCProtocolPacker.upload(sz, sz+FW_HEADER_SIZE, hdr))
    assert result==sz, "failed to write trailer"

    # check also SHA after that!
    chk.update(hdr)
    expect = chk.digest()
    final_chk = dev.send_recv(CCProtocolPacker.sha256())
    assert expect == final_chk, "Checksum mismatch after all that?"

    if do_reboot:
        click.echo("Upgrade started. Observe Coldcard screen for progress.")
        dev.send_recv(CCProtocolPacker.reboot())
|
|
|
|
@main.command('upload')
@click.argument('filename', type=click.File('rb'))
@click.option('--blksize', default=MAX_BLK_LEN,
              type=click.IntRange(256, MAX_BLK_LEN), help='Block size to use (testing)')
def file_upload(filename, blksize):
    "Send file to Coldcard (PSBT transaction or firmware)"
    # Plain upload: no firmware checks and no reboot are requested here.
    # 'filename' is the already-open binary file object supplied by click.
    real_file_upload(filename, blksize=blksize)
|
|
|
|
@main.command('upgrade')
@click.argument('filename', type=click.File('rb'), metavar="FIRMWARE.dfu",
                default='../stm32/firmware-signed.dfu')
@click.option('--stop-early', '-s', default=False, is_flag=True, help='Stop just before reboot')
def firmware_upgrade(filename, stop_early):
    "Send firmware file (.dfu) and trigger upgrade process"
    # --stop-early uploads and verifies everything but skips the final reboot.
    reboot_after = not stop_early

    real_file_upload(filename, do_upgrade=True, do_reboot=reboot_after)
|
|
|
|
# BIP44 path for Bitcoin mainnet: first account, external (not change) chain.
# NOTE: the path stops at the change level; no address index is included.
|
|
BIP44_FIRST = "m/44'/0'/0'/0"
|
|
|
|
@main.command('xpub')
@click.argument('subpath', default='m')
def get_xpub(subpath):
    "Get the XPUB for this wallet (master level, or any derivation)"
    # subpath: derivation path as one string (default 'm'); the literal
    # word 'bip44' is accepted as shorthand for BIP44_FIRST.

    dev = ColdcardDevice(sn=force_serial)

    # The argument is a single string, so it has to be compared whole.
    # Testing its length and first character could never match 'bip44'.
    if subpath == 'bip44':
        subpath = BIP44_FIRST

    xpub = dev.send_recv(CCProtocolPacker.get_xpub(subpath), timeout=None)

    click.echo(xpub)
|
|
|
|
@main.command('version')
def get_version():
    "Get the version of the firmware installed"
    # Ask the device for its version and print the reply as received.
    request = CCProtocolPacker.version()
    reply = ColdcardDevice(sn=force_serial).send_recv(request)

    click.echo(reply)
|
|
|
|
@main.command('eval')
@click.argument('stmt', nargs=-1)
def run_eval(stmt):
    "Simulator only: eval a python statement"
    dev = ColdcardDevice(sn=force_serial)

    # All command-line words are joined with single spaces into one
    # statement, which is sent after a raw 'EVAL' prefix.
    request = b'EVAL' + ' '.join(stmt).encode('utf-8')

    click.echo(dev.send_recv(request))
|
|
|
|
@main.command('exec')
@click.argument('stmt', nargs=-1)
def run_eval(stmt):
    "Simulator only: exec a python script"
    # NOTE(review): this function shares its Python name with the one behind
    # the 'eval' command. Click registers commands by their command name, so
    # both still work, but the module attribute ends up bound to this one.
    dev = ColdcardDevice(sn=force_serial)

    # All command-line words are joined with single spaces into one
    # script, which is sent after a raw 'EXEC' prefix.
    request = b'EXEC' + ' '.join(stmt).encode('utf-8')

    click.echo(dev.send_recv(request))
|
|
|
|
@main.command('msg')
@click.argument('message')
@click.option('--path', '-p', default=BIP44_FIRST, help='Derivation for key to use')
@click.option('--verbose', '-v', is_flag=True, help='Include fancy ascii armour')
@click.option('--just-sig', '-j', is_flag=True, help='Just the signature itself, nothing more')
@click.option('--segwit', '-s', is_flag=True, help='Address in segwit native (p2wpkh, bech32)')
@click.option('--wrap', '-w', is_flag=True, help='Address in segwit wrapped in P2SH (p2wpkh)')
def sign_message(message, path, verbose=True, just_sig=False, wrap=False, segwit=False):
    "Sign a short text message"
    # Flow: send the message for signing, poll until the device has an
    # answer, then print address + base64 signature in the requested layout.
    # Exits with status 1 if the answer is not an (address, signature) pair.

    dev = ColdcardDevice(sn=force_serial)

    # --wrap takes precedence over --segwit when both are given
    if wrap:
        addr_fmt = AF_P2WPKH_P2SH
    elif segwit:
        addr_fmt = AF_P2WPKH
    else:
        addr_fmt = AF_CLASSIC

    # NOTE: initial version of firmware not expected to do segwit stuff right, since
    # standard very much still in flux, see: <https://github.com/bitcoin/bitcoin/issues/10542>

    # not enforcing policy here on msg contents, so we can define that on product
    message = message.encode('ascii')

    ok = dev.send_recv(CCProtocolPacker.sign_message(message, path, addr_fmt), timeout=None)
    assert ok is None

    prompt = "Waiting for OK on the Coldcard..."
    print(prompt, end='', file=sys.stderr)
    sys.stderr.flush()

    # Poll until the device returns something other than None.
    while True:
        time.sleep(0.250)
        done = dev.send_recv(CCProtocolPacker.get_signed_msg(), timeout=None)
        if done is not None:
            break

    # Erase the prompt: overwrite its full width with blanks, then return
    # the cursor to the start of the line.
    print("\r%s\r" % (' ' * len(prompt)), end='', file=sys.stderr)
    sys.stderr.flush()

    if len(done) != 2:
        click.echo('Failed: %r' % done)
        sys.exit(1)

    addr, raw = done

    # b64encode() never inserts line breaks, so no clean-up is needed
    sig = b64encode(raw).decode('ascii')

    if just_sig:
        click.echo(sig)
    elif verbose:
        click.echo('-----BEGIN SIGNED MESSAGE-----\n{msg}\n-----BEGIN '
                   'SIGNATURE-----\n{addr}\n{sig}\n-----END SIGNED MESSAGE-----'.format(
                        msg=message.decode('ascii'), addr=addr, sig=sig))
    else:
        click.echo('%s\n%s\n%s' % (message.decode('ascii'), addr, sig))
|
|
|
|
def wait_and_download(dev, req, fn):
    """Wait for user action on the device, then download the resulting file.

    dev     open device object (provides send_recv and download_file)
    req     request that is re-sent every 250ms until the answer is not None
    fn      file number passed through to dev.download_file()

    Returns (file contents, sha256 as reported by the device). Exits the
    process with status 1 if the device's answer is not a (length, sha) pair.
    """
    prompt = "Waiting for OK on the Coldcard..."
    print(prompt, end='', file=sys.stderr)
    sys.stderr.flush()

    # Poll until the device returns something other than None.
    while True:
        time.sleep(0.250)
        done = dev.send_recv(req, timeout=None)
        if done is not None:
            break

    # Erase the prompt: overwrite its full width with blanks, then return
    # the cursor to the start of the line.
    print("\r%s\r" % (' ' * len(prompt)), end='', file=sys.stderr)
    sys.stderr.flush()

    if len(done) != 2:
        click.echo('Failed: %r' % done)
        sys.exit(1)

    result_len, result_sha = done

    # download the result.
    click.echo("Ok! Downloading result (%d bytes)" % result_len)
    result = dev.download_file(result_len, result_sha, file_number=fn)

    return result, result_sha
|
|
|
|
@main.command('sign')
@click.argument('psbt_in', type=click.File('rb'))
@click.argument('psbt_out', type=click.File('wb'))
@click.option('--verbose', '-v', is_flag=True, help='Show more details')
@click.option('--finalize', '-f', is_flag=True, help='Show final signed transaction, ready for transmission')
#@click.option('--just-txn', '-t', is_flag=True, help='Just the final transaction itself, nothing more')
@display_errors
def sign_transaction(psbt_in, psbt_out, verbose=False, hex_mode=False, finalize=True):
    "Approve a spending transaction (by signing it on Coldcard)"
    # Flow: read the PSBT (binary or hex-encoded), upload it, start signing,
    # wait for the result and write it out in the same encoding as the
    # input. Exits with status 1 if the input lacks the PSBT magic number.

    dev = ColdcardDevice(sn=force_serial)

    dev.check_mitm()

    # not enforcing policy here on msg contents, so we can define that on product
    taste = psbt_in.read(10)
    psbt_in.seek(0)
    if taste.lower() == b'70736274ff':
        # hex encoded (either letter case); make binary. Surrounding
        # whitespace such as a trailing newline is dropped first, because
        # a2b_hex() rejects it.
        psbt_in = io.BytesIO(a2b_hex(psbt_in.read().strip()))
        hex_mode = True
    elif taste[0:5] != b'psbt\xff':
        click.echo("File doesn't have PSBT magic number at start.")
        sys.exit(1)

    # upload the transaction
    txn_len, sha = real_file_upload(psbt_in, dev=dev)

    # start the signing process
    ok = dev.send_recv(CCProtocolPacker.sign_transaction(txn_len, sha), timeout=None)
    assert ok is None

    result, _ = wait_and_download(dev, CCProtocolPacker.get_signed_txn(), 1)

    # TODO: --finalize currently has no effect. Intended behaviour:
    # assume(?) transaction is completely signed, and output the
    # bitcoin transaction to be sent.
    # XXX maybe do this on embedded side, when txn is final?
    # XXX otherwise, need to parse PSBT and also handle combining properly

    # save it
    psbt_out.write(b2a_hex(result) if hex_mode else result)
|
|
|
|
@main.command('backup')
@click.option('--outdir', '-d',
              type=click.Path(exists=True,dir_okay=True, file_okay=False, writable=True),
              help="Save into indicated directory (auto filename)", default='.')
@click.option('--outfile', '-o', metavar="filename.7z",
              help="Name for backup file", default=None,
              type=click.File('wb'))
#@click.option('--verbose', '-v', is_flag=True, help='Show more details')
@display_errors
def start_backup(outdir, outfile, verbose=False):
    '''Prompts user to remember a massive pass phrase and then \
downloads AES-encrypted data backup. By default, saves into current directory using \
a filename based on the date.'''
    # outfile, when given, is an already-open binary file and wins over outdir.

    dev = ColdcardDevice(sn=force_serial)

    dev.check_mitm()

    ok = dev.send_recv(CCProtocolPacker.start_backup())
    assert ok is None

    result, chk = wait_and_download(dev, CCProtocolPacker.get_backup_file(), 0)

    if outfile:
        outfile.write(result)
        outfile.close()
        fn = outfile.name
    else:
        assert outdir

        # pick a useful filename, if they gave a dirname
        fn = os.path.join(outdir, time.strftime('backup-%Y%m%d-%H%M.7z'))

        # context manager makes sure the backup is flushed and closed
        with open(fn, 'wb') as fd:
            fd.write(result)

    click.echo("Wrote %d bytes into: %s\nSHA256: %s" % (len(result), fn, str(b2a_hex(chk), 'ascii')))
|
|
|
|
@main.command('addr')
@click.option('--path', '-p', default=BIP44_FIRST, help='Derivation for key to show')
@click.option('--segwit', '-s', is_flag=True, help='Show in segwit native (p2wpkh, bech32)')
@click.option('--wrap', '-w', is_flag=True, help='Show in segwit wrapped in P2SH (p2wpkh)')
@click.option('--quiet', '-q', is_flag=True, help='Show less details; just the address')
def show_address(path, quiet=False, segwit=False, wrap=False):
    "Show the human version of an address"
    dev = ColdcardDevice(sn=force_serial)

    # Address format: --wrap takes precedence over --segwit; classic otherwise.
    addr_fmt = AF_P2WPKH_P2SH if wrap else (AF_P2WPKH if segwit else AF_CLASSIC)

    addr = dev.send_recv(CCProtocolPacker.show_address(path, addr_fmt), timeout=None)

    # --quiet prints the bare address; otherwise it gets a short heading
    text = addr if quiet else 'Displaying address:\n\n%s\n' % addr
    click.echo(text)
|
|
|
|
# EOF
|