ckcc-protocol/ckcc/cli.py
2022-02-28 09:09:08 -05:00

1105 lines
39 KiB
Python
Executable File

#!/usr/bin/env python
#
# (c) Copyright 2021 by Coinkite Inc. This file is covered by license found in COPYING-CC.
#
# To use this, install with:
#
# pip install --editable .
#
# That will create the command "ckcc" in your path.
#
# Background:
# - see <https://github.com/trezor/cython-hidapi/blob/master/hid.pyx> for HID api
#
#
import hid, click, sys, os, pdb, struct, time, io, re, json, contextlib
from pprint import pformat
from binascii import b2a_hex, a2b_hex
from hashlib import sha256
from base64 import b64encode
from functools import wraps
from base64 import b64decode, b64encode
from ckcc.protocol import CCProtocolPacker, CCProtocolUnpacker
from ckcc.protocol import CCProtoError, CCUserRefused, CCBusyError
from ckcc.constants import MAX_MSG_LEN, MAX_BLK_LEN, MAX_USERNAME_LEN
from ckcc.constants import USER_AUTH_HMAC, USER_AUTH_TOTP, USER_AUTH_HOTP, USER_AUTH_SHOW_QR
from ckcc.constants import AF_CLASSIC, AF_P2SH, AF_P2WPKH, AF_P2WSH, AF_P2WPKH_P2SH, AF_P2WSH_P2SH
from ckcc.constants import STXN_FINALIZE, STXN_VISUALIZE, STXN_SIGNED
from ckcc.client import ColdcardDevice, COINKITE_VID, CKCC_PID
from ckcc.sigheader import FW_HEADER_SIZE, FW_HEADER_OFFSET, FW_HEADER_MAGIC
from ckcc.utils import dfu_parse, calc_local_pincode, xfp2str, B2A
from ckcc.electrum import filepath_append_cc, convert2cc
global force_serial
force_serial = None
global force_plaintext
force_plaintext = False
# First account, not change, first index for Bitcoin mainnet in BIP44 path
BIP44_FIRST = "m/44'/0'/0'/0/0"
# Cleanup display (supress traceback) for user-feedback exceptions
_sys_excepthook = sys.excepthook
def my_hook(ty, val, tb):
if ty in { CCProtoError, CCUserRefused, CCBusyError }:
print("\n\n%s" % val, file=sys.stderr)
else:
return _sys_excepthook(ty, val, tb)
sys.excepthook=my_hook
@contextlib.contextmanager
def get_device(optional=False):
# Open connection to Coldcard as a context with auto-close
try:
device = ColdcardDevice(sn=force_serial, encrypt=not force_plaintext)
except KeyError:
if not optional:
raise
device = None
yield device
if device:
device.close()
# Options we want for all commands
@click.group()
@click.option('--serial', '-s', default=None, metavar="HEX",
help="Operate on specific unit (default: first found)")
@click.option('--simulator', '-x', default=False, is_flag=True,
help="Connect to the simulator via Unix socket")
@click.option('--plaintext', '-P', default=False, is_flag=True,
help="Disable USB link-layer encryption")
def main(serial, simulator, plaintext):
global force_serial, force_plaintext
force_serial = serial
force_plaintext = plaintext
if simulator:
force_serial = '/tmp/ckcc-simulator.sock'
def display_errors(f):
# clean-up display of errors from Coldcard
@wraps(f)
def wrapper(*args, **kws):
try:
return f(*args, **kws)
except CCProtoError as exc:
click.echo("\n%s\n" % str(exc.args[0]))
sys.exit(1)
return wrapper
@main.command()
def debug():
"""Start interactive (local) debug session"""
import code
import readline
import atexit
import os
class HistoryConsole(code.InteractiveConsole):
def __init__(self, locals=None, filename="<console>",
histfile=os.path.expanduser("~/.console-history")):
code.InteractiveConsole.__init__(self, locals, filename)
self.init_history(histfile)
def init_history(self, histfile):
readline.parse_and_bind("tab: complete")
if hasattr(readline, "read_history_file"):
try:
readline.read_history_file(histfile)
except IOError:
pass
atexit.register(self.save_history, histfile)
def save_history(self, histfile):
readline.write_history_file(histfile)
# useful stuff
import pdb
from pdb import pm
CC = ColdcardDevice(sn=force_serial, encrypt=False)
SR = CC.send_recv
cli = HistoryConsole(locals=dict(globals(), **locals()))
cli.interact(banner="Go for it: 'CC' is the connected device, SR=CC.send_recv", exitmsg='')
@main.command('list')
def _list():
"""List all attached Coldcard devices"""
count = 0
for info in hid.enumerate(COINKITE_VID, CKCC_PID):
click.echo("\nColdcard {serial_number}:\n{nice}".format(
nice=pformat(info, indent=4)[1:-1], **info))
count += 1
if not count:
click.echo("(none found)")
@main.command()
def logout():
"""Securely logout of device (will require replug to start over)"""
with get_device() as dev:
resp = dev.send_recv(CCProtocolPacker.logout())
print("Device says: %r" % resp if resp else "Okay!")
@main.command()
def reboot():
"""Reboot coldcard, force relogin and start over"""
with get_device() as dev:
resp = dev.send_recv(CCProtocolPacker.reboot())
print("Device says: %r" % resp if resp else "Okay!")
@main.command('bag')
@click.option('--number', '-n', metavar='BAG_NUMBER', default=None)
def bag_number(number):
"""Factory: set or read bag number -- single use only!"""
with get_device() as dev:
nn = b'' if not number else number.encode('ascii')
resp = dev.send_recv(CCProtocolPacker.bag_number(nn))
print("Bag number: %r" % resp)
@main.command('test')
@click.option('--single', '-s', default=None,
type=click.IntRange(0,255), help='If set, use this value on wire.')
def usb_test(single):
"""Test USB connection (debug/dev)"""
with get_device() as dev:
rng = []
rng.extend(range(55, 66)) # buggy lengths are around 64
rng.extend(range(1013, 1024))
# we have 4 bytes of overhead (args) for ping cmd, so this will be max-length
rng.extend(range(MAX_MSG_LEN-10, MAX_MSG_LEN-4))
#print(repr(rng))
for i in rng:
print("Ping with length: %d" % i, end='')
body = os.urandom(i) if single is None else bytes([single]*i)
rb = dev.send_recv(CCProtocolPacker.ping(body))
assert rb == body, "Fail @ len: %d, got back %d bytes\n%r !=\n%r" % (
i, len(rb), b2a_hex(body), b2a_hex(rb))
print(" Okay")
def real_file_upload(fd, dev, blksize=MAX_BLK_LEN, do_upgrade=False, do_reboot=True):
# learn size (portable way)
offset = 0
sz = fd.seek(0, 2)
fd.seek(0)
if do_upgrade:
# Unwrap DFU contents, if needed. Also handles raw binary file.
try:
if fd.read(5) == b'DfuSe':
# expecting a DFU-wrapped file.
fd.seek(0)
offset, sz, *_ = dfu_parse(fd)
else:
# assume raw binary
pass
assert sz % 256 == 0, "un-aligned size: %s" % sz
fd.seek(offset+FW_HEADER_OFFSET)
hdr = fd.read(FW_HEADER_SIZE)
magic = struct.unpack_from("<I", hdr)[0]
#print("hdr @ 0x%x: %s" % (FW_HEADER_OFFSET, b2a_hex(hdr)))
except Exception:
magic = None
if magic != FW_HEADER_MAGIC:
click.echo("This does not look like a firmware file! Bad magic value.")
sys.exit(1)
fd.seek(offset)
click.echo("%d bytes (start @ %d) to send from %r" % (sz, fd.tell(),
os.path.basename(fd.name) if hasattr(fd, 'name') else 'memory'), err=1)
left = sz
chk = sha256()
with click.progressbar(range(0, sz, blksize), label="Uploading") as bar:
for pos in bar:
here = fd.read(min(blksize, left))
if not here: break
left -= len(here)
result = dev.send_recv(CCProtocolPacker.upload(pos, sz, here))
assert result == pos, "Got back: %r" % result
chk.update(here)
# do a verify
expect = chk.digest()
result = dev.send_recv(CCProtocolPacker.sha256())
assert len(result) == 32
if result != expect:
click.echo("Wrong checksum:\nexpect: %s\n got: %s"
% (b2a_hex(expect).decode('ascii'), b2a_hex(result).decode('ascii')), err=1)
sys.exit(1)
if not do_upgrade:
return sz, expect
# AFTER fully uploaded and verified, write a copy of the signature header
# onto the end of flash. Bootrom uses this to check entire file uploaded.
result = dev.send_recv(CCProtocolPacker.upload(sz, sz+FW_HEADER_SIZE, hdr))
assert result==sz, "failed to write trailer"
# check also SHA after that!
chk.update(hdr)
expect = chk.digest()
final_chk = dev.send_recv(CCProtocolPacker.sha256())
assert expect == final_chk, "Checksum mismatch after all that?"
if do_reboot:
click.echo("Upgrade started. Observe Coldcard screen for progress.", err=1)
dev.send_recv(CCProtocolPacker.reboot())
@main.command('upload')
@click.argument('filename', type=click.File('rb'))
@click.option('--blksize', default=MAX_BLK_LEN,
type=click.IntRange(256, MAX_BLK_LEN), help='Block size to use (testing)')
@click.option('--multisig', '-m', default=False, is_flag=True,
help='Attempt multisig enroll using file')
def file_upload(filename, blksize, multisig=False):
"""Send file to Coldcard (PSBT transaction or firmware)"""
# NOTE: mostly for debug/dev usage.
with get_device() as dev:
file_len, sha = real_file_upload(filename, dev, blksize=blksize)
if multisig:
dev.send_recv(CCProtocolPacker.multisig_enroll(file_len, sha))
@main.command('upgrade')
@click.argument('filename', type=click.File('rb'), metavar="FIRMWARE.dfu",
default='../stm32/firmware-signed.dfu')
@click.option('--stop-early', '-s', default=False, is_flag=True, help='Stop just before reboot')
def firmware_upgrade(filename, stop_early):
"""Send firmware file (.dfu) and trigger upgrade process"""
with get_device() as dev:
real_file_upload(filename, dev, do_upgrade=True, do_reboot=(not stop_early))
@main.command('xpub')
@click.argument('subpath', default='m')
def get_xpub(subpath):
"""Get the XPUB for this wallet (master level, or any derivation)"""
with get_device() as dev:
if len(subpath) == 1:
if subpath[0] == 'bip44':
subpath = BIP44_FIRST
xpub = dev.send_recv(CCProtocolPacker.get_xpub(subpath), timeout=None)
click.echo(xpub)
@main.command('pubkey')
@click.argument('subpath', default='m')
def get_pubkey(subpath):
"""
Get the public key for a derivation path
Dump 33-byte (compressed, SEC encoded) public key value.
"""
try:
from pycoin.key.BIP32Node import BIP32Node
except Exception:
raise click.Abort("pycoin must be installed, not found.")
with get_device() as dev:
xpub = dev.send_recv(CCProtocolPacker.get_xpub(subpath), timeout=None)
node = BIP32Node.from_hwif(xpub)
click.echo(b2a_hex(node.sec()))
@main.command('xfp')
@click.option('--swab', '-s', is_flag=True, help='Reverse endian of result (32-bit)')
def get_fingerprint(swab):
"""Get the fingerprint for this wallet (master level)"""
with get_device() as dev:
xfp = dev.master_fingerprint
assert xfp
if swab:
# this is how we used to show XFP values: LE32 hex with 0x in front.
click.echo('0x%08x' % xfp)
else:
# network order = BE32 = top 32-bits of hash160(pubkey) = 4 bytes in bip32 serialization
click.echo(xfp2str(xfp))
@main.command('version')
def get_version():
"""Get the version of the firmware installed"""
with get_device() as dev:
v = dev.send_recv(CCProtocolPacker.version())
click.echo(v)
@main.command('chain')
def get_block_chain():
"""
Get which blockchain (Bitcoin/Testnet) is configured.
BTC=>Bitcoin or XTN=>Bitcoin Testnet
"""
with get_device() as dev:
code = dev.send_recv(CCProtocolPacker.block_chain())
click.echo(code)
@main.command('eval')
@click.argument('stmt', nargs=-1)
def run_eval(stmt):
"""Simulator only: eval a python statement"""
with get_device() as dev:
stmt = ' '.join(stmt)
v = dev.send_recv(b'EVAL' + stmt.encode('utf-8'))
click.echo(v)
@main.command('exec')
@click.argument('stmt', nargs=-1)
def run_eval(stmt):
"""Simulator only: exec a python script"""
with get_device() as dev:
stmt = ' '.join(stmt)
v = dev.send_recv(b'EXEC' + stmt.encode('utf-8'))
click.echo(v)
@main.command('msg')
@click.argument('message')
@click.option('--path', '-p', default=BIP44_FIRST,
help=f'Derivation for key to use [default: {BIP44_FIRST}]', metavar="DERIVATION")
@click.option('--verbose', '-v', is_flag=True, help='Include fancy ascii armour')
@click.option('--just-sig', '-j', is_flag=True, help='Just the signature itself, nothing more')
@click.option('--segwit', '-s', is_flag=True, help='Address in segwit native (p2wpkh, bech32)')
@click.option('--wrap', '-w', is_flag=True, help='Address in segwit wrapped in P2SH (p2wpkh)')
def sign_message(message, path, verbose=True, just_sig=False, wrap=False, segwit=False):
"""Sign a short text message"""
with get_device() as dev:
if wrap:
addr_fmt = AF_P2WPKH_P2SH
elif segwit:
addr_fmt = AF_P2WPKH
else:
addr_fmt = AF_CLASSIC
# NOTE: initial version of firmware not expected to do segwit stuff right, since
# standard very much still in flux, see: <https://github.com/bitcoin/bitcoin/issues/10542>
# not enforcing policy here on msg contents, so we can define that on product
message = message.encode('ascii') if not isinstance(message, bytes) else message
ok = dev.send_recv(CCProtocolPacker.sign_message(message, path, addr_fmt), timeout=None)
assert ok == None
print("Waiting for OK on the Coldcard...", end='', file=sys.stderr)
sys.stderr.flush()
while 1:
time.sleep(0.250)
done = dev.send_recv(CCProtocolPacker.get_signed_msg(), timeout=None)
if done == None:
continue
break
print("\r \r", end='', file=sys.stderr)
sys.stderr.flush()
if len(done) != 2:
click.echo('Failed: %r' % done)
sys.exit(1)
addr, raw = done
sig = str(b64encode(raw), 'ascii').replace('\n', '')
if just_sig:
click.echo(str(sig))
elif verbose:
click.echo('-----BEGIN SIGNED MESSAGE-----\n{msg}\n-----BEGIN '
'SIGNATURE-----\n{addr}\n{sig}\n-----END SIGNED MESSAGE-----'.format(
msg=message.decode('ascii'), addr=addr, sig=sig))
else:
click.echo('%s\n%s\n%s' % (message.decode('ascii'), addr, sig))
def wait_and_download(dev, req, fn):
# Wait for user action on the device... by polling w/ indicated request
# - also download resulting file
print("Waiting for OK on the Coldcard...", end='', file=sys.stderr)
sys.stderr.flush()
while 1:
time.sleep(0.250)
done = dev.send_recv(req, timeout=None)
if done == None:
continue
break
print("\r \r", end='', file=sys.stderr)
sys.stderr.flush()
if len(done) != 2:
click.echo('Failed: %r' % done)
sys.exit(1)
result_len, result_sha = done
# download the result.
click.echo("Ok! Downloading result (%d bytes)" % result_len, err=1)
result = dev.download_file(result_len, result_sha, file_number=fn)
return result, result_sha
@main.command('sign')
@click.argument('psbt_in', type=click.File('rb'))
@click.argument('psbt_out', type=click.File('wb'), required=False)
@click.option('--verbose', '-v', is_flag=True, help='Show more details')
@click.option('--finalize', '-f', is_flag=True, help='Show final signed transaction, ready for transmission')
@click.option('--visualize', '-z', is_flag=True, help='Show text of Coldcard\'s interpretation of the transaction (does not create transaction, no interaction needed)')
@click.option('--signed', '-s', is_flag=True, help='Include a signature over visualization text')
@click.option('--hex', '-x', 'hex_mode', is_flag=True, help="Write out (signed) PSBT in hexidecimal")
@click.option('--base64', '-6', 'b64_mode', is_flag=True, help="Write out (signed) PSBT encoded in base64")
@display_errors
def sign_transaction(psbt_in, psbt_out=None, verbose=False, b64_mode=False, hex_mode=False, finalize=False, visualize=False, signed=False):
"""Approve a spending transaction by signing it on Coldcard"""
with get_device() as dev:
dev.check_mitm()
# Handle non-binary encodings, and incorrect files.
taste = psbt_in.read(10)
psbt_in.seek(0)
if taste == b'70736274ff' or taste == b'70736274FF':
# Looks hex encoded; make into binary again
hx = ''.join(re.findall(r'[0-9a-fA-F]*', psbt_in.read().decode('ascii')))
psbt_in = io.BytesIO(a2b_hex(hx))
elif taste[0:6] == b'cHNidP':
# Base64 encoded input
psbt_in = io.BytesIO(b64decode(psbt_in.read()))
elif taste[0:5] != b'psbt\xff':
click.echo("File doesn't have PSBT magic number at start.")
sys.exit(1)
# upload the transaction
txn_len, sha = real_file_upload(psbt_in, dev)
flags = 0x0
if visualize or signed:
flags |= STXN_VISUALIZE
if signed:
flags |= STXN_SIGNED
elif finalize:
flags |= STXN_FINALIZE
# start the signing process
ok = dev.send_recv(CCProtocolPacker.sign_transaction(txn_len, sha, flags=flags), timeout=None)
assert ok is None
# errors will raise here, no need for error display
result, _ = wait_and_download(dev, CCProtocolPacker.get_signed_txn(), 1)
# If 'finalize' is set, we are outputing a bitcoin transaction,
# ready for the p2p network. If the CC wasn't able to finalize it,
# an exception would have occured. Most people will want hex here, but
# resisting the urge to force it.
if visualize:
if psbt_out:
psbt_out.write(result)
else:
click.echo(result, nl=False)
else:
# save it
if hex_mode:
result = b2a_hex(result)
elif b64_mode or (not psbt_out and os.isatty(0)):
result = b64encode(result)
if psbt_out:
psbt_out.write(result)
else:
click.echo(result)
@main.command('backup')
@click.option('--outdir', '-d',
type=click.Path(exists=True,dir_okay=True, file_okay=False, writable=True),
help="Save into indicated directory (auto filename)", default='.')
@click.option('--outfile', '-o', metavar="filename.7z",
help="Name for backup file", default=None,
type=click.File('wb'))
#@click.option('--verbose', '-v', is_flag=True, help='Show more details')
@display_errors
def start_backup(outdir, outfile, verbose=False):
"""
Creates 7z encrypted backup file after prompting user to remember a massive passphrase.
Downloads the AES-encrypted data backup and by default, saves into current directory using
a filename based on today's date.
"""
with get_device() as dev:
dev.check_mitm()
ok = dev.send_recv(CCProtocolPacker.start_backup())
assert ok == None
result, chk = wait_and_download(dev, CCProtocolPacker.get_backup_file(), 0)
if outfile:
outfile.write(result)
outfile.close()
fn = outfile.name
else:
assert outdir
# pick a useful filename, if they gave a dirname
fn = os.path.join(outdir, time.strftime('backup-%Y%m%d-%H%M.7z'))
open(fn, 'wb').write(result)
click.echo("Wrote %d bytes into: %s\nSHA256: %s" % (len(result), fn, str(b2a_hex(chk), 'ascii')))
@main.command('addr')
@click.argument('path', default=BIP44_FIRST, metavar='[m/1/2/3]', required=False)
@click.option('--segwit', '-s', is_flag=True, help='Show in segwit native (p2wpkh, bech32)')
@click.option('--wrap', '-w', is_flag=True, help='Show in segwit wrapped in P2SH (p2wpkh)')
@click.option('--quiet', '-q', is_flag=True, help='Show less details; just the address')
@click.option('--path', '-p', default=BIP44_FIRST, help='Derivation for key to show (or first arg)')
def show_address(path, quiet=False, segwit=False, wrap=False):
"""Show the human version of an address"""
with get_device() as dev:
if wrap:
addr_fmt = AF_P2WPKH_P2SH
elif segwit:
addr_fmt = AF_P2WPKH
else:
addr_fmt = AF_CLASSIC
addr = dev.send_recv(CCProtocolPacker.show_address(path, addr_fmt), timeout=None)
if quiet:
click.echo(addr)
else:
click.echo('Displaying address:\n\n%s\n' % addr)
def str_to_int_path(xfp, path):
# convert text m/34'/33/44 into BIP174 binary compat format
# - include hex for fingerprint (m) as first arg
rv = [struct.unpack('<I', a2b_hex(xfp))[0]]
for i in path.split('/'):
if i == 'm':
continue
if not i:
continue # trailing or duplicated slashes
if i[-1] in "'phHP":
assert len(i) >= 2, i
here = int(i[:-1]) | 0x80000000
else:
here = int(i)
assert 0 <= here < 0x80000000, here
rv.append(here)
return rv
@main.command('p2sh')
@click.argument('script', type=str, nargs=1, required=True)
@click.argument('fingerprints', type=str, nargs=-1, required=True)
@click.option('--segwit', '-s', is_flag=True, help='Show in segwit native (p2wpkh, bech32)')
@click.option('--wrap', '-w', is_flag=True, help='Show as segwit wrapped in P2SH (p2wpkh)')
@click.option('--quiet', '-q', is_flag=True, help='Show less details; just the address')
def show_address(script, fingerprints, quiet=False, segwit=False, wrap=False):
"""
Show a multisig payment address on-screen.
Needs a redeem script and list of fingerprint/path (4369050F/1/0/0 for example).
This is provided as a demo or debug feature. You'll need need some way to
generate the full redeem script (hex), and the fingerprints and paths used to
generate each public key inside that. The order of fingerprint/paths must
match order of pubkeys in the script.
"""
with get_device() as dev:
addr_fmt = AF_P2SH
if segwit:
addr_fmt = AF_P2WSH
if wrap:
addr_fmt = AF_P2WSH_P2SH
script = a2b_hex(script)
N = len(fingerprints)
assert 1 <= N <= 15, "bad N"
min_signers = script[0] - 80
assert 1 <= min_signers <= N, "bad M"
assert script[-1] == 0xAE, "expect script to end with OP_CHECKMULTISIG"
assert script[-2] == 80+N, "second last byte should encode N"
xfp_paths = []
for idx, xfp in enumerate(fingerprints):
assert '/' in xfp, 'Needs a XFP/path: ' + xfp
xfp, p = xfp.split('/', 1)
xfp_paths.append(str_to_int_path(xfp, p))
addr = dev.send_recv(CCProtocolPacker.show_p2sh_address(
min_signers, xfp_paths, script, addr_fmt=addr_fmt), timeout=None)
if quiet:
click.echo(addr)
else:
click.echo('Displaying address:\n\n%s\n' % addr)
@main.command('pass')
@click.argument('passphrase', required=False)
@click.option('--passphrase', prompt=True, hide_input=True,
confirmation_prompt=False)
@click.option('--verbose', '-v', is_flag=True, help='Show new root xpub')
def bip39_passphrase(passphrase, verbose=False):
"""Provide a BIP39 passphrase"""
with get_device() as dev:
dev.check_mitm()
ok = dev.send_recv(CCProtocolPacker.bip39_passphrase(passphrase), timeout=None)
assert ok == None
print("Waiting for OK on the Coldcard...", end='', file=sys.stderr)
sys.stderr.flush()
while 1:
time.sleep(0.250)
done = dev.send_recv(CCProtocolPacker.get_passphrase_done(), timeout=None)
if done == None:
continue
break
print("\r \r", end='', file=sys.stderr)
sys.stderr.flush()
if verbose:
xpub = done
click.echo(xpub)
else:
click.echo('Done.')
@main.command('multisig')
@click.option('--min-signers', '-m', type=int, help='Minimum M signers of N required to approve (default: all)', default=0)
@click.option('--signers', '-n', 'num_signers', type=int, help='N signers in wallet', default=3)
@click.option('--name', '-l', type=str, help='Wallet name on Coldcard', default='Unnamed')
@click.option('--output-file', '-f', type=click.File('wt', lazy=True),
help='Save configuration to file')
@click.option('--verbose', '-v', is_flag=True, help='Show file uploaded')
@click.option('--path', '-p', default="m/45'", help="Derivation for key (default: BIP45 = m/45')")
@click.option('--add', '-a', 'just_add', is_flag=True, help='Just show line required to add this Coldcard')
def enroll_xpub(name, min_signers, path, num_signers, output_file=None, verbose=False, just_add=False):
"""
Create a skeleton file which defines a multisig wallet.
When completed, use with: "ckcc upload -m wallet.txt" or put on SD card.
"""
with get_device() as dev:
dev.check_mitm()
xfp = dev.master_fingerprint
my_xpub = dev.send_recv(CCProtocolPacker.get_xpub(path), timeout=None)
new_line = "%s: %s" % (xfp2str(xfp), my_xpub)
if just_add:
click.echo(new_line)
sys.exit(0)
N = num_signers
if N < min_signers:
N = min_signers
if not (1 <= N < 15):
click.echo("N must be 1..15")
sys.exit(1)
if min_signers == 0:
min_signers = N
if not (1 <= min_signers <= N):
click.echo(f"Minimum number of signers (M) must be between 1 and N={N}")
sys.exit(1)
if not (1 <= len(name) <= 20) or name != str(name.encode('utf8'), 'ascii', 'ignore'):
click.echo("Name must be between 1 and 20 characters of ASCII.")
sys.exit(1)
# render into a template
config = f'name: {name}\npolicy: {min_signers} of {N}\n\n#path: {path}\n{new_line}\n'
if num_signers != 1:
config += '\n'.join(f'#{i+2}# FINGERPRINT: xpub123123123123123' for i in range(num_signers-1))
config += '\n'
if verbose or not output_file:
click.echo(config[:-1])
if output_file:
output_file.write(config)
output_file.close()
click.echo(f"Wrote to: {output_file.name}")
@main.command('hsm-start')
@click.argument('policy', type=click.Path(exists=True,dir_okay=False), metavar="policy.json", required=False)
@click.option('--dry-run', '-n', is_flag=True, help="Just validate file, don't upload")
def hsm_setup(policy=None, dry_run=False):
"""
Enable Hardware Security Module (HSM) mode.
Upload policy file (or use existing policy) and start HSM mode on device. User must approve startup.
All PSBT's will be signed automatically based on that policy.
"""
with get_device() as dev:
dev.check_mitm()
if policy:
if dry_run:
# check it looks reasonable, but jsut a JSON check
raw = open(policy, 'rt').read()
j = json.loads(raw)
click.echo("Policy ok")
sys.exit(0)
file_len, sha = real_file_upload(open(policy, 'rb'), dev)
dev.send_recv(CCProtocolPacker.hsm_start(file_len, sha))
else:
if dry_run:
raise click.UsageError("Dry run not useful without a policy file to check.")
dev.send_recv(CCProtocolPacker.hsm_start())
click.echo("Approve HSM policy on Coldcard screen.")
@main.command('hsm')
def hsm_status():
"""
Get current status of HSM feature.
Is it running, what is the policy (summary only).
"""
with get_device() as dev:
dev.check_mitm()
resp = dev.send_recv(CCProtocolPacker.hsm_status())
o = json.loads(resp)
click.echo(pformat(o))
@main.command('user')
@click.argument('username', type=str, metavar="USERNAME", required=True)
@click.option('--totp', '-t', 'totp_create', is_flag=True, help='Do TOTP and let Coldcard pick secret (default)')
@click.option('--pass', 'pick_pass', is_flag=True, help='Use a password picked by Coldcard')
@click.option('--ask-pass', '-a', is_flag=True, help='Define password here (interactive)')
@click.option('--totp-secret', '-s', help='BASE32 encoded secret for TOTP 2FA method (not great)')
@click.option('--text-secret', '-p', help='Provide password on command line (not great)')
@click.option('--delete', '-d', 'do_delete', is_flag=True, help='Remove a user by name')
@click.option('--show-qr', '-q', is_flag=True, help='Show enroll QR contents (locally)')
@click.option('--hotp', is_flag=True, help='Use HOTP instead of TOTP (dev only)')
def new_user(username, totp_create=False, totp_secret=None, text_secret=None, ask_pass=False,
do_delete=False, debug=False, show_qr=False, hotp=False, pick_pass=False):
"""
Create a new user on the Coldcard for HSM policy (also delete).
You can input a password (interactively), or one can be picked
by the Coldcard. When possible the QR to enrol your 2FA app will
be shown on the Coldcard screen.
"""
from base64 import b32encode, b32decode
username = username.encode('ascii')
assert 1 <= len(username) <= MAX_USERNAME_LEN, "Username length wrong"
with get_device() as dev:
dev.check_mitm()
if do_delete:
dev.send_recv(CCProtocolPacker.delete_user(username))
click.echo('Deleted, if it was there')
return
if ask_pass:
assert not text_secret, "dont give and ask for password"
text_secret = click.prompt('Password (hidden)', hide_input=True, confirmation_prompt=True)
mode = USER_AUTH_HMAC
if totp_secret:
secret = b32decode(totp_secret, casefold=True)
assert len(secret) in {10, 20}
mode = USER_AUTH_TOTP
elif hotp:
mode = USER_AUTH_HOTP
secret = b''
elif pick_pass or text_secret:
mode = USER_AUTH_HMAC
else:
# default is TOTP
secret = b''
mode = USER_AUTH_TOTP
if mode == USER_AUTH_HMAC:
# default is text passwords
secret = dev.hash_password(text_secret.encode('utf8')) if text_secret else b''
assert not show_qr, 'QR not appropriate for text passwords'
if not secret and not show_qr:
# ask the Coldcard to show the QR (for password or TOTP shared secret)
mode |= USER_AUTH_SHOW_QR
new_secret = dev.send_recv(CCProtocolPacker.create_user(username, mode, secret))
if show_qr and new_secret:
# format the URL thing ... needs a spec
username = username.decode('ascii')
secret = new_secret or b32encode(secret).decode('ascii')
mode = 'hotp' if mode == USER_AUTH_HOTP else 'totp'
click.echo(f'otpauth://{mode}/{username}?secret={secret}&issuer=Coldcard%20{dev.serial}')
elif not text_secret and new_secret:
click.echo(f'New password is: {new_secret}')
else:
click.echo('Done')
@main.command('local-conf')
@click.argument('psbt-file', type=click.File('rb'), required=True, metavar="Binary PSBT")
@click.option('--next', '-n', 'next_code', type=str, help='next_local_code from Coldcard (default: ask it)')
def user_auth(psbt_file, next_code=None):
"""
Generate the 6-digit code needed for a specific PSBT file to authorize
it's signing on the Coldcard in HSM mode.
"""
if not next_code:
with get_device() as dev:
dev.check_mitm()
resp = dev.send_recv(CCProtocolPacker.hsm_status())
o = json.loads(resp)
assert o['active'], "Coldcard not in HSM mode"
next_code = o['next_local_code']
psbt_hash = sha256(psbt_file.read()).digest()
rv = calc_local_pincode(psbt_hash, next_code)
print("Local authorization code is:\n\n\t%s\n" % rv)
@main.command('auth')
@click.argument('username', type=str, metavar="USERNAME", required=True)
@click.argument('token', type=str, metavar="[TOTP]", required=False)
@click.option('--psbt-file', '-f', type=click.File('rb'), required=False)
@click.option('--password', '-p', is_flag=True, help="Prompt for password")
@click.option('--debug', '-d', is_flag=True, help='Show values used')
@click.option('--version3', '-3', is_flag=True, help='Support obsolete 3.x.x firmware')
def user_auth(username, token=None, password=None, prompt=None, totp=None, psbt_file=None, debug=False, version3=False):
"""
Indicate specific user is present (for HSM).
Username and 2FA (TOTP, 6-digits) value or password are required. To use
password, the PSBT file in question must be provided.
"""
import time
from hmac import HMAC
from hashlib import pbkdf2_hmac, sha256
dryrun = True
with get_device() as dev:
dev.check_mitm()
if psbt_file or password:
if psbt_file:
psbt_hash = sha256(psbt_file.read()).digest()
dryrun = False
else:
psbt_hash = bytes(32)
pw = token or click.prompt('Password (hidden)', hide_input=True)
secret = dev.hash_password(pw.encode('utf8'), v3=version3)
token = HMAC(secret, msg=psbt_hash, digestmod=sha256).digest()
if debug:
click.echo(" secret = %s" % B2A(secret))
click.echo(" salt = %s" % B2A(salt))
totp_time = 0
else:
if not token:
token = click.prompt('2FA Token (6 digits)', hide_input=False)
if len(token) != 6 or not token.isdigit():
raise click.UsageError("2FA Token must be 6 decimal digits")
token = token.encode('ascii')
now = int(time.time())
if now % 30 < 5:
click.echo("NOTE: TOTP was on edge of expiry limit! Might not work.")
totp_time = now // 30
#raise click.UsageError("Need PSBT file as part of HMAC for password")
assert token and len(token) in {6, 32}
username = username.encode('ascii')
if debug:
click.echo(" username = %s" % username.decode('ascii'))
click.echo(" token = %s" % (B2A(token) if len(token) > 6 else token.decode('ascii')))
click.echo("totp_time = %d" % totp_time)
resp = dev.send_recv(CCProtocolPacker.user_auth(username, token, totp_time))
if not resp:
click.echo("Correct or queued")
else:
click.echo(f'Problem: {resp}')
@main.command('get-locker')
def get_storage_locker():
"""Get the value held in the Storage Locker (not Bitcoin related, reserved for HSM use)"""
with get_device() as dev:
ls = dev.send_recv(CCProtocolPacker.get_storage_locker(), timeout=None)
click.echo(ls)
keystore_keys = ["derivation", "hw_type", "label", "root_fingerprint", "soft_device_id", "xpub"]
@main.command('convert2cc')
@click.argument('file', type=click.Path(exists=True), required=True)
@click.option('--outfile', '-o', type=click.Path(),
help="output file path where adjusted wallet file is written. "
"If this is not specified output is written to <original_file>_cc")
@click.option('--dry-run', '-n', default=False, is_flag=True,
help="do not write files instead pretty print to console")
@click.option('--key', '-k', type=click.Choice(keystore_keys),
help="Multisig wallet keystore dict key based on which to match correct keystore "
"(for example hw_type or root_fingerprint). Option required for "
"multisig wallets if coldcard not connected")
@click.option('--val', '-v', type=str,
help="Multisig wallet value to match for specified key "
"(for example ledger[hw_type] or fffffff0[root_fingerprint])"
"Option required for multisig wallets if coldcard not connected")
def electrum_convert2cc(file, outfile, dry_run, key, val):
"""
Convert existing Electrum wallet file into COLDCARD wallet file.
Your Coldcard does not need to be connected, but it is better
to have it connected because it can be checked whether Coldcard
and wallet file describe the same wallet.
Under the hood this command changes values in keystore dict in
electrum wallet file. 'hw_type' is changed to coldcard.
'soft_device_id' is set to null. 'ckcc_xpub' is added (if
coldcard is connected). And 'label' is built using 'Coldcard'
+ master fingerprint.
If no '--outfile/-o' is specified, new wallet file will be
created in same location with '_cc' suffix.
To convert2cc multisig wallet file, specify --key/--val that
defines the correct keystore (ie. specific co-signer). For
example if one has 2of3 multisig (Ledger, Trezor, Colcdard) and
wants to convert the Trezor signer:
ckcc convert2cc /path/to/multisig_wallet --key hw_type --val trezor
You do not need to define --key/--val if your coldcard is
connected (loaded with correct seed and derivation path of
course). We wil match correct keystore based on root fingerprint
(XFP) obtained from connected Coldcard.
Rationale:
Users may want to switch their hardware wallet vendor. Yet
they want to keep all of their Electrum data (UTXO, labels,
contacts, payment requests...). Another possibility is when
someone's hww gets lost or broken, and they still have a seed,
this seed can be loaded to new Coldcard (check
https://coldcard.com/docs/import) and their previous Electrum
wallet can be converted.
* Does not work with encrypted wallet files - please decrypt first
via Electrum interface.
"""
if file == outfile:
click.echo("'FILE' and '--outfile' cannot be the same")
sys.exit(1)
with get_device(optional=True) as dev:
try:
# open file only for reading and close it immediately after it is loaded into memory
with open(file, "r") as f:
wallet_str = f.read()
new_wallet_str = convert2cc(wallet_str=wallet_str, dev=dev, key=key, val=val)
except json.JSONDecodeError as e:
click.echo("Failed to load wallet file {}".format(e))
sys.exit(1)
except Exception as e:
click.echo("convert2cc failed: {}".format(e))
sys.exit(1)
if dry_run:
click.echo(new_wallet_str)
else:
if outfile is None:
outfile = filepath_append_cc(file)
try:
with open(outfile, "w") as f:
f.write(new_wallet_str)
click.echo("New wallet file created: {}".format(outfile))
except Exception as e:
click.echo("Failed to dump wallet file: {}".format(e))
sys.exit(1)
# EOF