1423 lines
51 KiB
Python
Executable File
1423 lines
51 KiB
Python
Executable File
#!/usr/bin/env python
|
|
#
|
|
# (c) Copyright 2021 by Coinkite Inc. This file is covered by license found in COPYING-CC.
|
|
#
|
|
# To use this, install with:
|
|
#
|
|
# pip install --editable .
|
|
#
|
|
# That will create the command "ckcc" in your path.
|
|
#
|
|
# Background:
|
|
# - see <https://github.com/trezor/cython-hidapi/blob/master/hid.pyx> for HID api
|
|
#
|
|
#
|
|
import hid, click, sys, os, pdb, struct, time, io, re, json, contextlib, tempfile
|
|
from pprint import pformat
|
|
from binascii import b2a_hex, a2b_hex
|
|
from hashlib import sha256
|
|
from functools import wraps
|
|
from base64 import b64decode, b64encode
|
|
|
|
from ecdsa import VerifyingKey, SECP256k1
|
|
|
|
from ckcc.protocol import CCProtocolPacker, CCProtocolUnpacker
|
|
from ckcc.protocol import CCProtoError, CCUserRefused, CCBusyError
|
|
from ckcc.constants import MAX_MSG_LEN, MAX_BLK_LEN, MAX_USERNAME_LEN, MAX_SIGNERS
|
|
from ckcc.constants import USER_AUTH_HMAC, USER_AUTH_TOTP, USER_AUTH_HOTP, USER_AUTH_SHOW_QR
|
|
from ckcc.constants import AF_P2SH, AF_P2WSH, AF_P2WSH_P2SH
|
|
from ckcc.constants import STXN_FINALIZE, STXN_VISUALIZE, STXN_SIGNED, RFC_SIGNATURE_TEMPLATE
|
|
from ckcc.client import ColdcardDevice, COINKITE_VID, CKCC_PID, DEFAULT_SIM_SOCKET
|
|
from ckcc.sigheader import FW_HEADER_SIZE, FW_HEADER_OFFSET, FW_HEADER_MAGIC
|
|
from ckcc.utils import dfu_parse, calc_local_pincode, xfp2str, B2A, decode_xpub
|
|
from ckcc.utils import get_pubkey_string, descriptor_template, addr_fmt_help, txn_to_pushtx_url
|
|
from ckcc.electrum import filepath_append_cc, convert2cc
|
|
|
|
global force_serial
|
|
force_serial = None
|
|
global force_plaintext
|
|
force_plaintext = False
|
|
|
|
# First account, not change, first index for Bitcoin mainnet in BIP44 path
|
|
BIP44_FIRST = "m/44'/0'/0'/0/0"
|
|
|
|
# Cleanup display (supress traceback) for user-feedback exceptions
|
|
_sys_excepthook = sys.excepthook
|
|
def my_hook(ty, val, tb):
|
|
if ty in { CCProtoError, CCUserRefused, CCBusyError }:
|
|
print("\n\n%s" % val, file=sys.stderr)
|
|
else:
|
|
return _sys_excepthook(ty, val, tb)
|
|
sys.excepthook=my_hook
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def get_device(optional=False):
|
|
# Open connection to Coldcard as a context with auto-close
|
|
try:
|
|
device = ColdcardDevice(sn=force_serial, encrypt=not force_plaintext)
|
|
except KeyError:
|
|
if not optional:
|
|
raise
|
|
device = None
|
|
|
|
yield device
|
|
|
|
if device:
|
|
device.close()
|
|
|
|
|
|
# Options we want for all commands
|
|
@click.group()
|
|
@click.option('--serial', '-s', default=None, metavar="HEX",
|
|
help="Operate on specific unit (default: first found)")
|
|
@click.option('--socket', '-c', type=click.Path(exists=True,dir_okay=False),
|
|
metavar="ckcc-simulator-<pid>.sock", required=False, default=None,
|
|
help="Operate on specific simulator")
|
|
@click.option('--simulator', '-x', default=False, is_flag=True,
|
|
help="Connect to the simulator via Unix socket")
|
|
@click.option('--plaintext', '-P', default=False, is_flag=True,
|
|
help="Disable USB link-layer encryption")
|
|
def main(serial, simulator, plaintext, socket):
|
|
global force_serial, force_plaintext
|
|
force_serial = serial
|
|
force_plaintext = plaintext
|
|
|
|
if simulator or socket:
|
|
force_serial = socket or DEFAULT_SIM_SOCKET
|
|
|
|
def display_errors(f):
|
|
# clean-up display of errors from Coldcard
|
|
@wraps(f)
|
|
def wrapper(*args, **kws):
|
|
try:
|
|
return f(*args, **kws)
|
|
except CCProtoError as exc:
|
|
click.echo("\n%s\n" % str(exc.args[0]))
|
|
sys.exit(1)
|
|
return wrapper
|
|
|
|
|
|
@main.command()
|
|
def debug():
|
|
"""Start interactive (local) debug session"""
|
|
import code
|
|
import readline
|
|
import atexit
|
|
import os
|
|
|
|
class HistoryConsole(code.InteractiveConsole):
|
|
def __init__(self, locals=None, filename="<console>",
|
|
histfile=os.path.expanduser("~/.console-history")):
|
|
code.InteractiveConsole.__init__(self, locals, filename)
|
|
self.init_history(histfile)
|
|
|
|
def init_history(self, histfile):
|
|
readline.parse_and_bind("tab: complete")
|
|
if hasattr(readline, "read_history_file"):
|
|
try:
|
|
readline.read_history_file(histfile)
|
|
except IOError:
|
|
pass
|
|
atexit.register(self.save_history, histfile)
|
|
|
|
def save_history(self, histfile):
|
|
readline.write_history_file(histfile)
|
|
|
|
# useful stuff
|
|
import pdb
|
|
from pdb import pm
|
|
CC = ColdcardDevice(sn=force_serial, encrypt=False)
|
|
SR = CC.send_recv
|
|
|
|
cli = HistoryConsole(locals=dict(globals(), **locals()))
|
|
cli.interact(banner="Go for it: 'CC' is the connected device, SR=CC.send_recv", exitmsg='')
|
|
|
|
|
|
@main.command('list')
|
|
def _list():
|
|
"""List all attached Coldcard devices"""
|
|
|
|
count = 0
|
|
for info in hid.enumerate(COINKITE_VID, CKCC_PID):
|
|
click.echo("\nColdcard {serial_number}:\n{nice}".format(
|
|
nice=pformat(info, indent=4)[1:-1], **info))
|
|
count += 1
|
|
|
|
if not count:
|
|
click.echo("(none found)")
|
|
|
|
|
|
@main.command()
|
|
def logout():
|
|
"""Securely logout of device (will require replug to start over)"""
|
|
with get_device() as dev:
|
|
resp = dev.send_recv(CCProtocolPacker.logout())
|
|
print("Device says: %r" % resp if resp else "Okay!")
|
|
|
|
|
|
@main.command()
|
|
def reboot():
|
|
"""Reboot coldcard, force relogin and start over"""
|
|
with get_device() as dev:
|
|
resp = dev.send_recv(CCProtocolPacker.reboot())
|
|
print("Device says: %r" % resp if resp else "Okay!")
|
|
|
|
|
|
@main.command('bag')
|
|
@click.option('--number', '-n', metavar='BAG_NUMBER', default=None)
|
|
def bag_number(number):
|
|
"""Factory: set or read bag number -- single use only!"""
|
|
with get_device() as dev:
|
|
nn = b'' if not number else number.encode('ascii')
|
|
|
|
resp = dev.send_recv(CCProtocolPacker.bag_number(nn))
|
|
|
|
print("Bag number: %r" % resp)
|
|
|
|
|
|
@main.command('test')
|
|
@click.option('--single', '-s', default=None,
|
|
type=click.IntRange(0,255), help='If set, use this value on wire.')
|
|
def usb_test(single):
|
|
"""Test USB connection (debug/dev)"""
|
|
with get_device() as dev:
|
|
rng = []
|
|
rng.extend(range(55, 66)) # buggy lengths are around 64
|
|
rng.extend(range(1013, 1024))
|
|
|
|
# we have 4 bytes of overhead (args) for ping cmd, so this will be max-length
|
|
rng.extend(range(MAX_MSG_LEN-10, MAX_MSG_LEN-4))
|
|
|
|
#print(repr(rng))
|
|
|
|
for i in rng:
|
|
print("Ping with length: %d" % i, end='')
|
|
body = os.urandom(i) if single is None else bytes([single]*i)
|
|
rb = dev.send_recv(CCProtocolPacker.ping(body))
|
|
assert rb == body, "Fail @ len: %d, got back %d bytes\n%r !=\n%r" % (
|
|
i, len(rb), b2a_hex(body), b2a_hex(rb))
|
|
print(" Okay")
|
|
|
|
|
|
def real_file_upload(fd, dev, blksize=MAX_BLK_LEN, do_upgrade=False, do_reboot=True):
|
|
# learn size (portable way)
|
|
offset = 0
|
|
sz = fd.seek(0, 2)
|
|
fd.seek(0)
|
|
|
|
if do_upgrade:
|
|
# Unwrap DFU contents, if needed. Also handles raw binary file.
|
|
try:
|
|
if fd.read(5) == b'DfuSe':
|
|
# expecting a DFU-wrapped file.
|
|
fd.seek(0)
|
|
offset, sz, *_ = dfu_parse(fd)
|
|
else:
|
|
# assume raw binary
|
|
pass
|
|
|
|
assert sz % 256 == 0, "un-aligned size: %s" % sz
|
|
fd.seek(offset+FW_HEADER_OFFSET)
|
|
hdr = fd.read(FW_HEADER_SIZE)
|
|
|
|
magic = struct.unpack_from("<I", hdr)[0]
|
|
#print("hdr @ 0x%x: %s" % (FW_HEADER_OFFSET, b2a_hex(hdr)))
|
|
except Exception:
|
|
magic = None
|
|
|
|
if magic != FW_HEADER_MAGIC:
|
|
click.echo("This does not look like a firmware file! Bad magic value.")
|
|
sys.exit(1)
|
|
|
|
fd.seek(offset)
|
|
|
|
click.echo("%d bytes (start @ %d) to send from %r" % (sz, fd.tell(),
|
|
os.path.basename(fd.name) if hasattr(fd, 'name') else 'memory'), err=1)
|
|
|
|
left = sz
|
|
chk = sha256()
|
|
with click.progressbar(range(0, sz, blksize), label="Uploading") as bar:
|
|
for pos in bar:
|
|
here = fd.read(min(blksize, left))
|
|
if not here: break
|
|
left -= len(here)
|
|
result = dev.send_recv(CCProtocolPacker.upload(pos, sz, here))
|
|
assert result == pos, f"Got back: {result}"
|
|
chk.update(here)
|
|
|
|
# do a verify
|
|
expect = chk.digest()
|
|
result = dev.send_recv(CCProtocolPacker.sha256())
|
|
assert len(result) == 32
|
|
if result != expect:
|
|
click.echo("Wrong checksum:\nexpect: %s\n got: %s"
|
|
% (b2a_hex(expect).decode('ascii'), b2a_hex(result).decode('ascii')), err=1)
|
|
sys.exit(1)
|
|
|
|
if not do_upgrade:
|
|
return sz, expect
|
|
|
|
# AFTER fully uploaded and verified, write a copy of the signature header
|
|
# onto the end of flash. Bootrom uses this to check entire file uploaded.
|
|
result = dev.send_recv(CCProtocolPacker.upload(sz, sz+FW_HEADER_SIZE, hdr))
|
|
assert result==sz, "failed to write trailer"
|
|
|
|
# check also SHA after that!
|
|
chk.update(hdr)
|
|
expect = chk.digest()
|
|
final_chk = dev.send_recv(CCProtocolPacker.sha256())
|
|
assert expect == final_chk, "Checksum mismatch after all that?"
|
|
|
|
if do_reboot:
|
|
click.echo("Upgrade started. Observe Coldcard screen for progress.", err=1)
|
|
dev.send_recv(CCProtocolPacker.reboot())
|
|
|
|
|
|
@main.command('upload')
|
|
@click.argument('filename', type=click.File('rb'))
|
|
@click.option('--blksize', default=MAX_BLK_LEN, type=click.IntRange(256, MAX_BLK_LEN),
|
|
help='Block size to use (testing)')
|
|
@click.option('--multisig', '-m', is_flag=True,
|
|
help='Attempt multisig enroll using file')
|
|
@click.option('--miniscript', is_flag=True,
|
|
help='Attempt miniscript enroll using file')
|
|
@click.option('--backup', is_flag=True,
|
|
help='Upload encrypted backup')
|
|
def file_upload(filename, blksize, multisig, miniscript, backup):
|
|
"""Send file to Coldcard (PSBT transaction or firmware)"""
|
|
|
|
if sum([multisig, miniscript, backup]) > 1:
|
|
# only 1 or None can be True
|
|
click.echo("Failed: Only one can be specified from miniscript/multisig/backup")
|
|
sys.exit(1)
|
|
|
|
# NOTE: mostly for debug/dev usage.
|
|
with get_device() as dev:
|
|
|
|
file_len, sha = real_file_upload(filename, dev, blksize=blksize)
|
|
|
|
if multisig:
|
|
dev.send_recv(CCProtocolPacker.multisig_enroll(file_len, sha))
|
|
elif miniscript:
|
|
dev.send_recv(CCProtocolPacker.miniscript_enroll(file_len, sha), timeout=None)
|
|
elif backup:
|
|
dev.send_recv(CCProtocolPacker.restore_backup(file_len, sha), timeout=None)
|
|
|
|
|
|
|
|
@main.command('upgrade')
|
|
@click.argument('filename', type=click.File('rb'), metavar="FIRMWARE.dfu",
|
|
default='../stm32/firmware-signed.dfu')
|
|
@click.option('--stop-early', '-s', default=False, is_flag=True, help='Stop just before reboot')
|
|
def firmware_upgrade(filename, stop_early):
|
|
"""Send firmware file (.dfu) and trigger upgrade process"""
|
|
with get_device() as dev:
|
|
real_file_upload(filename, dev, do_upgrade=True, do_reboot=(not stop_early))
|
|
|
|
|
|
@main.command('xpub')
|
|
@click.argument('subpath', default='m')
|
|
@click.option('--verbose', '-v', is_flag=True,
|
|
help='Show extended key with master fingerprint and derivation')
|
|
def get_xpub(subpath, verbose):
|
|
"""Get the XPUB for this wallet (master level, or any derivation)"""
|
|
|
|
with get_device() as dev:
|
|
if len(subpath) == 1:
|
|
if subpath[0] == 'bip44':
|
|
subpath = BIP44_FIRST
|
|
|
|
xpub = dev.send_recv(CCProtocolPacker.get_xpub(subpath), timeout=None)
|
|
|
|
if verbose:
|
|
# return key expression with BIP-32 extended key as defined in BIP-380
|
|
sp = subpath.replace("m/", "").replace("'", "h")
|
|
click.echo(f"[{xfp2str(dev.master_fingerprint).lower()}/{sp}]{xpub}")
|
|
else:
|
|
click.echo(xpub)
|
|
|
|
|
|
@main.command('pubkey')
|
|
@click.argument('subpath', default='m')
|
|
def get_pubkey(subpath):
|
|
"""
|
|
Get the public key for a derivation path
|
|
|
|
Dump 33-byte (compressed, SEC encoded) public key value.
|
|
"""
|
|
with get_device() as dev:
|
|
xpub = dev.send_recv(CCProtocolPacker.get_xpub(subpath), timeout=None)
|
|
pubkey, _ = decode_xpub(xpub)
|
|
vk = VerifyingKey.from_string(get_pubkey_string(pubkey), curve=SECP256k1)
|
|
click.echo(b2a_hex(vk.to_string("compressed")))
|
|
|
|
|
|
@main.command('xfp')
|
|
@click.option('--swab', '-s', is_flag=True, help='Reverse endian of result (32-bit)')
|
|
def get_fingerprint(swab):
|
|
"""Get the fingerprint for this wallet (master level)"""
|
|
with get_device() as dev:
|
|
xfp = dev.master_fingerprint
|
|
assert xfp
|
|
|
|
if swab:
|
|
# this is how we used to show XFP values: LE32 hex with 0x in front.
|
|
click.echo('0x%08x' % xfp)
|
|
else:
|
|
# network order = BE32 = top 32-bits of hash160(pubkey) = 4 bytes in bip32 serialization
|
|
click.echo(xfp2str(xfp))
|
|
|
|
|
|
@main.command('version')
|
|
def get_version():
|
|
"""Get the version of the firmware installed"""
|
|
with get_device() as dev:
|
|
|
|
v = dev.send_recv(CCProtocolPacker.version())
|
|
|
|
click.echo(v)
|
|
|
|
|
|
@main.command('chain')
|
|
def get_block_chain():
|
|
"""
|
|
Get which blockchain (Bitcoin/Testnet) is configured.
|
|
BTC=>Bitcoin or XTN=>Bitcoin Testnet
|
|
"""
|
|
with get_device() as dev:
|
|
code = dev.send_recv(CCProtocolPacker.block_chain())
|
|
click.echo(code)
|
|
|
|
|
|
@main.command('eval')
|
|
@click.argument('stmt', nargs=-1)
|
|
def run_eval(stmt):
|
|
"""Simulator only: eval a python statement"""
|
|
with get_device() as dev:
|
|
stmt = ' '.join(stmt)
|
|
v = dev.send_recv(b'EVAL' + stmt.encode('utf-8'))
|
|
click.echo(v)
|
|
|
|
|
|
@main.command('exec')
|
|
@click.argument('stmt', nargs=-1)
|
|
def run_eval(stmt):
|
|
"""Simulator only: exec a python script"""
|
|
with get_device() as dev:
|
|
stmt = ' '.join(stmt)
|
|
v = dev.send_recv(b'EXEC' + stmt.encode('utf-8'))
|
|
click.echo(v)
|
|
|
|
|
|
@main.command('msg')
|
|
@click.argument('message')
|
|
@click.option('--path', '-p', default=None, help=f'Derivation for key to use', metavar="DERIVATION")
|
|
@click.option('--verbose', '-v', is_flag=True, help='Include fancy ascii armour')
|
|
@click.option('--just-sig', '-j', is_flag=True, help='Just the signature itself, nothing more')
|
|
@click.option('--segwit', '-s', is_flag=True, help='Address in segwit native (p2wpkh, bech32)')
|
|
@click.option('--wrap', '-w', is_flag=True, help='Address in segwit wrapped in P2SH (p2sh-p2wpkh)')
|
|
def sign_message(message, path, verbose, just_sig, wrap, segwit):
|
|
"""Sign a short text message"""
|
|
with get_device() as dev:
|
|
|
|
addr_fmt, af_path = addr_fmt_help(dev, wrap, segwit)
|
|
|
|
# NOTE: initial version of firmware not expected to do segwit stuff right, since
|
|
# standard very much still in flux, see: <https://github.com/bitcoin/bitcoin/issues/10542>
|
|
|
|
# not enforcing policy here on msg contents, so we can define that on product
|
|
message = message.encode('ascii') if not isinstance(message, bytes) else message
|
|
|
|
ok = dev.send_recv(CCProtocolPacker.sign_message(message, path or af_path, addr_fmt), timeout=None)
|
|
assert ok == None
|
|
|
|
print("Waiting for OK on the Coldcard...", end='', file=sys.stderr)
|
|
sys.stderr.flush()
|
|
|
|
while 1:
|
|
time.sleep(0.250)
|
|
done = dev.send_recv(CCProtocolPacker.get_signed_msg(), timeout=None)
|
|
if done == None:
|
|
continue
|
|
|
|
break
|
|
|
|
print("\r \r", end='', file=sys.stderr)
|
|
sys.stderr.flush()
|
|
|
|
if len(done) != 2:
|
|
click.echo('Failed: %r' % done)
|
|
sys.exit(1)
|
|
|
|
addr, raw = done
|
|
|
|
sig = str(b64encode(raw), 'ascii').replace('\n', '')
|
|
|
|
if just_sig:
|
|
click.echo(str(sig))
|
|
elif verbose:
|
|
click.echo(RFC_SIGNATURE_TEMPLATE.format(msg=message.decode('ascii'),
|
|
addr=addr, sig=sig))
|
|
else:
|
|
click.echo('%s\n%s\n%s' % (message.decode('ascii'), addr, sig))
|
|
|
|
|
|
@main.command('msg-file')
|
|
@click.argument('json_file', type=click.Path(exists=True, dir_okay=False), metavar="FILE_PATH")
|
|
@click.option('--outfile', '-o', type=click.Path(), default=None,
|
|
help="Output file path for signed result (default: <basename>-signed.<ext>)")
|
|
@click.option('--outdir', '-d', type=click.Path(exists=True, dir_okay=True, file_okay=False),
|
|
default=None, help="Directory to save signed file (default: same as input)")
|
|
def sign_msg_file(json_file, outfile, outdir):
|
|
"""
|
|
Sign a text file containing a message (TXT or JSON format).
|
|
|
|
Reads a JSON or TXT file, extracts the message to sign, sends it to the Coldcard
|
|
for signing, and writes the result as an RFC2440-like signed message file.
|
|
|
|
For JSON files, the file must contain a 'msg' field. The 'subpath' and 'addr_fmt'
|
|
fields are optional:
|
|
|
|
{"msg":"this address belongs to me","subpath":"m/84h/0h/0h/0/120"}
|
|
|
|
For TXT files, the first line is the message. The optional second line specifies
|
|
the subkey derivation path. The optional third line specifies the address format
|
|
(p2pkh, p2sh-p2wpkh, or p2wpkh).
|
|
|
|
The signed output file is saved with '-signed' inserted before the file extension.
|
|
"""
|
|
|
|
basename = os.path.basename(json_file)
|
|
name, ext = os.path.splitext(basename)
|
|
|
|
if '-signed' in name:
|
|
click.echo("Error: filename cannot contain '-signed'")
|
|
sys.exit(1)
|
|
|
|
raw = open(json_file, 'r').read()
|
|
if len(raw.encode('utf-8')) > 500: #CC Spec: "Be less than 500 bytes in size"
|
|
click.echo("Error: file must be less than 500 bytes")
|
|
sys.exit(1)
|
|
|
|
# parse file contents
|
|
message = None
|
|
subpath = None
|
|
addr_fmt_str = None
|
|
|
|
try:
|
|
parsed = json.loads(raw)
|
|
if isinstance(parsed, dict):
|
|
if 'msg' not in parsed:
|
|
click.echo("Error: JSON must contain 'msg' field")
|
|
sys.exit(1)
|
|
message = parsed['msg']
|
|
subpath = parsed.get('subpath', None)
|
|
addr_fmt_str = parsed.get('addr_fmt', None)
|
|
else:
|
|
raise ValueError("not a JSON object")
|
|
except (json.JSONDecodeError, ValueError):
|
|
lines = raw.strip().split('\n')
|
|
if not lines or not lines[0].strip():
|
|
click.echo("Error: file is empty or has no message")
|
|
sys.exit(1)
|
|
message = lines[0].strip()
|
|
if len(lines) >= 2 and lines[1].strip():
|
|
subpath = lines[1].strip()
|
|
if len(lines) >= 3 and lines[2].strip():
|
|
addr_fmt_str = lines[2].strip()
|
|
|
|
wrap = False
|
|
segwit = False
|
|
if addr_fmt_str:
|
|
af = addr_fmt_str.lower().replace('-', '').replace('_', '')
|
|
if af in ('p2shp2wpkh', 'p2sh_p2wpkh', 'p2shp2wpkh'):
|
|
wrap = True
|
|
elif af in ('p2wpkh', 'segwit', 'bech32'):
|
|
segwit = True
|
|
elif af not in ('p2pkh', 'classic', ''):
|
|
click.echo("Warning: unknown addr_fmt '%s', using default (p2pkh)" % addr_fmt_str,
|
|
err=True)
|
|
|
|
with get_device() as dev:
|
|
|
|
addr_fmt, af_path = addr_fmt_help(dev, wrap, segwit)
|
|
signing_path = subpath or af_path
|
|
msg_bytes = message.encode('ascii') if not isinstance(message, bytes) else message
|
|
|
|
ok = dev.send_recv(CCProtocolPacker.sign_message(msg_bytes, signing_path, addr_fmt),
|
|
timeout=None)
|
|
assert ok == None
|
|
|
|
print("Waiting for OK on the Coldcard...", end='', file=sys.stderr)
|
|
sys.stderr.flush()
|
|
|
|
while 1:
|
|
time.sleep(0.250)
|
|
done = dev.send_recv(CCProtocolPacker.get_signed_msg(), timeout=None)
|
|
if done == None:
|
|
continue
|
|
break
|
|
|
|
print("\r \r", end='', file=sys.stderr)
|
|
sys.stderr.flush()
|
|
|
|
if len(done) != 2:
|
|
click.echo('Failed: %r' % done)
|
|
sys.exit(1)
|
|
|
|
addr, raw_sig = done
|
|
|
|
sig = str(b64encode(raw_sig), 'ascii').replace('\n', '')
|
|
|
|
# format as RFC2440-like armoured output
|
|
result = RFC_SIGNATURE_TEMPLATE.format(msg=message, addr=addr, sig=sig)
|
|
|
|
if not outfile:
|
|
out_dir = outdir or os.path.dirname(os.path.abspath(json_file))
|
|
signed_name = '%s-signed%s' % (name, ext)
|
|
outfile = os.path.join(out_dir, signed_name)
|
|
|
|
open(outfile, 'w').write(result)
|
|
click.echo("Wrote signed message to: %s" % outfile)
|
|
|
|
def wait_and_download(dev, req, fn):
|
|
# Wait for user action on the device... by polling w/ indicated request
|
|
# - also download resulting file
|
|
print("Waiting for OK on the Coldcard...", end='', file=sys.stderr)
|
|
sys.stderr.flush()
|
|
|
|
while 1:
|
|
time.sleep(0.250)
|
|
done = dev.send_recv(req, timeout=None)
|
|
if done == None:
|
|
continue
|
|
break
|
|
|
|
print("\r \r", end='', file=sys.stderr)
|
|
sys.stderr.flush()
|
|
|
|
if len(done) != 2:
|
|
click.echo('Failed: %r' % done)
|
|
sys.exit(1)
|
|
|
|
result_len, result_sha = done
|
|
|
|
# download the result.
|
|
|
|
click.echo("Ok! Downloading result (%d bytes)" % result_len, err=1)
|
|
result = dev.download_file(result_len, result_sha, file_number=fn)
|
|
|
|
return result, result_sha
|
|
|
|
|
|
@main.command('sign')
|
|
@click.argument('psbt_in', type=click.File('rb'))
|
|
@click.argument('psbt_out', type=click.File('wb'), required=False)
|
|
@click.option('--finalize', '-f', is_flag=True, help='Show final signed transaction, ready for transmission')
|
|
@click.option('--visualize', '-z', is_flag=True, help='Show text of Coldcard\'s interpretation of the transaction (does not create transaction, no interaction needed)')
|
|
@click.option('--pushtx', '-p', default=None, help='Broadcast transaction via provided PushTx URL. Shortcut options: coldcard, mempool', metavar="URL")
|
|
@click.option('--miniscript', '-m', default=None, help='Miniscript wallet name')
|
|
@click.option('--signed', '-s', is_flag=True, help='Include a signature over visualization text')
|
|
@click.option('--hex', '-x', 'hex_mode', is_flag=True, help="Write out (signed) PSBT in hexidecimal")
|
|
@click.option('--base64', '-6', 'b64_mode', is_flag=True, help="Write out (signed) PSBT encoded in base64")
|
|
@display_errors
|
|
def sign_transaction(psbt_in, psbt_out, pushtx, b64_mode, hex_mode, finalize, visualize, signed, miniscript):
|
|
"""Approve a spending transaction by signing it on Coldcard"""
|
|
with get_device() as dev:
|
|
dev.check_mitm()
|
|
|
|
# Handle non-binary encodings, and incorrect files.
|
|
data = psbt_in.read()
|
|
|
|
if data[:10].lower() == b'70736274ff':
|
|
# Looks hex encoded; make into binary again
|
|
hx = ''.join(re.findall(r'[0-9a-fA-F]*', data.decode('ascii')))
|
|
data = a2b_hex(hx)
|
|
elif data[:6] == b'cHNidP':
|
|
# Base64 encoded input
|
|
data = b64decode(data)
|
|
|
|
if data[0:5] != b'psbt\xff':
|
|
click.echo("File doesn't have PSBT magic number at start.")
|
|
sys.exit(1)
|
|
|
|
# upload the transaction
|
|
txn_len, sha = real_file_upload(io.BytesIO(data), dev)
|
|
|
|
if pushtx:
|
|
finalize = True
|
|
visualize = signed = False
|
|
|
|
flags = 0x0
|
|
if visualize or signed:
|
|
flags |= STXN_VISUALIZE
|
|
if signed:
|
|
flags |= STXN_SIGNED
|
|
elif finalize:
|
|
flags |= STXN_FINALIZE
|
|
|
|
# start the signing process
|
|
ok = dev.send_recv(CCProtocolPacker.sign_transaction(txn_len, sha, flags=flags,
|
|
miniscript_name=miniscript), timeout=None)
|
|
assert ok is None
|
|
|
|
# errors will raise here, no need for error display
|
|
result, sha = wait_and_download(dev, CCProtocolPacker.get_signed_txn(), 1)
|
|
|
|
# If 'finalize' is set, we are outputing a bitcoin transaction,
|
|
# ready for the p2p network. If the CC wasn't able to finalize it,
|
|
# an exception would have occured. Most people will want hex here, but
|
|
# resisting the urge to force it.
|
|
|
|
if pushtx:
|
|
pushtx_url = {
|
|
"coldcard": "https://coldcard.com/pushtx#",
|
|
"mempool": "https://mempool.space/pushtx#"
|
|
}.get(pushtx, pushtx)
|
|
|
|
chain = dev.send_recv(CCProtocolPacker.block_chain())
|
|
|
|
try:
|
|
url = txn_to_pushtx_url(result, pushtx_url, sha=sha, chain=chain)
|
|
click.launch(url)
|
|
except Exception as e:
|
|
click.echo(f"ERROR: {e}", err=True)
|
|
|
|
return # done here
|
|
|
|
if visualize:
|
|
if psbt_out:
|
|
psbt_out.write(result)
|
|
else:
|
|
click.echo(result, nl=False)
|
|
else:
|
|
# save it
|
|
if hex_mode:
|
|
result = b2a_hex(result)
|
|
elif b64_mode or (not psbt_out):
|
|
result = b64encode(result)
|
|
|
|
if psbt_out:
|
|
psbt_out.write(result)
|
|
elif not pushtx:
|
|
click.echo(result)
|
|
|
|
|
|
@main.command('backup')
|
|
@click.option('--outdir', '-d',
|
|
type=click.Path(exists=True,dir_okay=True, file_okay=False, writable=True),
|
|
help="Save into indicated directory (auto filename)", default='.')
|
|
@click.option('--outfile', '-o', metavar="filename.7z",
|
|
help="Name for backup file", default=None,
|
|
type=click.File('wb'))
|
|
@display_errors
|
|
def start_backup(outdir, outfile):
|
|
"""
|
|
Creates 7z encrypted backup file after prompting user to remember a massive passphrase.
|
|
Downloads the AES-encrypted data backup and by default, saves into current directory using
|
|
a filename based on today's date.
|
|
"""
|
|
with get_device() as dev:
|
|
dev.check_mitm()
|
|
|
|
ok = dev.send_recv(CCProtocolPacker.start_backup())
|
|
assert ok == None
|
|
|
|
result, chk = wait_and_download(dev, CCProtocolPacker.get_backup_file(), 0)
|
|
|
|
if outfile:
|
|
outfile.write(result)
|
|
outfile.close()
|
|
fn = outfile.name
|
|
else:
|
|
assert outdir
|
|
|
|
# pick a useful filename, if they gave a dirname
|
|
fn = os.path.join(outdir, time.strftime('backup-%Y%m%d-%H%M.7z'))
|
|
|
|
open(fn, 'wb').write(result)
|
|
|
|
click.echo("Wrote %d bytes into: %s\nSHA256: %s" % (len(result), fn, str(b2a_hex(chk), 'ascii')))
|
|
|
|
|
|
@main.command('restore')
|
|
@click.argument('filename', type=click.File('rb'), metavar="backup.7z")
|
|
@click.option('-c', '--plaintext', is_flag=True,
|
|
help="Force plaintext restore. No need to use if file has proper '.txt' suffix")
|
|
@click.option('-p', '--password', is_flag=True,
|
|
help="This backup has custom password. Not words.")
|
|
@click.option('-t', '--tmp', is_flag=True,
|
|
help="Force restoring backup as temporary seed. Only works for seedless Coldcard.")
|
|
@display_errors
|
|
def restore_backup(filename, plaintext, password, tmp):
|
|
"""
|
|
Uploads 7z encrypted backup file & starts backup restore process. User needs to specify
|
|
what kind of backup is being uploaded. Default is 7z encrypted file with word-based password.
|
|
Use -p/--password flag if your backup has custom not word-based password.
|
|
User is prompted to enter backup password on the device.
|
|
"""
|
|
|
|
if not plaintext and filename.name.lower().endswith(".txt"):
|
|
plaintext = True
|
|
|
|
if plaintext and password:
|
|
# only 1 or None can be True
|
|
click.echo("Failed: Plaintext backup cannot have custom password.")
|
|
sys.exit(1)
|
|
|
|
with get_device() as dev:
|
|
file_len, sha = real_file_upload(filename, dev)
|
|
dev.send_recv(
|
|
CCProtocolPacker.restore_backup(file_len, sha, password, plaintext, tmp),
|
|
timeout=None)
|
|
|
|
|
|
@main.command('addr')
|
|
@click.argument('path', default=None, metavar='[m/1/2/3]', required=False)
|
|
@click.option('--segwit', '-s', is_flag=True, help='Show in segwit native (p2wpkh, bech32)')
|
|
@click.option('--taproot', '-t', is_flag=True, help='Show in taproot (p2tr, bech32m)')
|
|
@click.option('--wrap', '-w', is_flag=True, help='Show in segwit wrapped in P2SH (p2sh-p2wpkh)')
|
|
@click.option('--quiet', '-q', is_flag=True, help='Show less details; just the address')
|
|
def show_address(path, quiet, segwit, wrap, taproot):
|
|
"""Show the human version of an address"""
|
|
with get_device() as dev:
|
|
addr_fmt, af_path = addr_fmt_help(dev, wrap, segwit, taproot)
|
|
addr = dev.send_recv(CCProtocolPacker.show_address(path or af_path, addr_fmt), timeout=None)
|
|
|
|
if quiet:
|
|
click.echo(addr)
|
|
else:
|
|
click.echo('Displaying address:\n\n%s\n' % addr)
|
|
|
|
|
|
def str_to_int_path(xfp, path):
|
|
# convert text m/34'/33/44 into BIP174 binary compat format
|
|
# - include hex for fingerprint (m) as first arg
|
|
|
|
rv = [struct.unpack('<I', a2b_hex(xfp))[0]]
|
|
for i in path.split('/'):
|
|
if i == 'm':
|
|
continue
|
|
if not i:
|
|
continue # trailing or duplicated slashes
|
|
|
|
if i[-1] in "'phHP":
|
|
assert len(i) >= 2, i
|
|
here = int(i[:-1]) | 0x80000000
|
|
else:
|
|
here = int(i)
|
|
assert 0 <= here < 0x80000000, here
|
|
|
|
rv.append(here)
|
|
|
|
return rv
|
|
|
|
|
|
@main.command('p2sh')
|
|
@click.argument('script', type=str, nargs=1, required=True)
|
|
@click.argument('fingerprints', type=str, nargs=-1, required=True)
|
|
@click.option('--segwit', '-s', is_flag=True, help='Show in segwit native (p2wsh, bech32)')
|
|
@click.option('--wrap', '-w', is_flag=True, help='Show as segwit wrapped in P2SH (p2sh-p2wsh)')
|
|
@click.option('--quiet', '-q', is_flag=True, help='Show less details; just the address')
|
|
def show_address(script, fingerprints, quiet, segwit, wrap):
|
|
"""
|
|
Show a multisig payment address on-screen.
|
|
|
|
Needs a redeem script and list of fingerprint/path (4369050F/1/0/0 for example).
|
|
|
|
This is provided as a demo or debug feature. You'll need need some way to
|
|
generate the full redeem script (hex), and the fingerprints and paths used to
|
|
generate each public key inside that. The order of fingerprint/paths must
|
|
match order of pubkeys in the script.
|
|
"""
|
|
with get_device() as dev:
|
|
|
|
addr_fmt = AF_P2SH
|
|
if segwit:
|
|
addr_fmt = AF_P2WSH
|
|
if wrap:
|
|
addr_fmt = AF_P2WSH_P2SH
|
|
|
|
script = a2b_hex(script)
|
|
N = len(fingerprints)
|
|
|
|
assert 1 <= N <= MAX_SIGNERS, "bad N"
|
|
|
|
min_signers = script[0] - 80
|
|
assert 1 <= min_signers <= N, "bad M"
|
|
|
|
assert script[-1] == 0xAE, "expect script to end with OP_CHECKMULTISIG"
|
|
assert script[-2] == 80+N, "second last byte should encode N"
|
|
|
|
xfp_paths = []
|
|
for idx, xfp in enumerate(fingerprints):
|
|
assert '/' in xfp, 'Needs a XFP/path: ' + xfp
|
|
xfp, p = xfp.split('/', 1)
|
|
|
|
xfp_paths.append(str_to_int_path(xfp, p))
|
|
|
|
addr = dev.send_recv(CCProtocolPacker.show_p2sh_address(
|
|
min_signers, xfp_paths, script, addr_fmt=addr_fmt), timeout=None)
|
|
|
|
if quiet:
|
|
click.echo(addr)
|
|
else:
|
|
click.echo('Displaying address:\n\n%s\n' % addr)
|
|
|
|
|
|
@main.command('pass')
|
|
@click.argument('passphrase', required=False)
|
|
@click.option('--passphrase', prompt=True, hide_input=True,
|
|
confirmation_prompt=False)
|
|
@click.option('--verbose', '-v', is_flag=True, help='Show new root xpub')
|
|
def bip39_passphrase(passphrase, verbose):
|
|
"""Provide a BIP39 passphrase"""
|
|
|
|
with get_device() as dev:
|
|
dev.check_mitm()
|
|
|
|
ok = dev.send_recv(CCProtocolPacker.bip39_passphrase(passphrase), timeout=None)
|
|
assert ok == None
|
|
|
|
print("Waiting for OK on the Coldcard...", end='', file=sys.stderr)
|
|
sys.stderr.flush()
|
|
|
|
while 1:
|
|
time.sleep(0.250)
|
|
done = dev.send_recv(CCProtocolPacker.get_passphrase_done(), timeout=None)
|
|
if done == None:
|
|
continue
|
|
break
|
|
|
|
print("\r \r", end='', file=sys.stderr)
|
|
sys.stderr.flush()
|
|
|
|
if verbose:
|
|
xpub = done
|
|
click.echo(xpub)
|
|
else:
|
|
click.echo('Done.')
|
|
|
|
|
|
@main.command('multisig')
|
|
@click.option('--min-signers', '-m', type=int, default=0,
|
|
help='Minimum M signers of N required to approve (default: all)')
|
|
@click.option('--signers', '-n', 'num_signers', type=int, default=3,
|
|
help='N signers in wallet')
|
|
@click.option('--name', '-l', type=str, default='Unnamed',
|
|
help='Wallet name on Coldcard')
|
|
@click.option('--output-file', '-f', type=click.File('wt', lazy=True),
|
|
help='Save configuration to file')
|
|
@click.option('--verbose', '-v', is_flag=True,
|
|
help='Show file uploaded')
|
|
@click.option('--path', '-p', default="m/45'",
|
|
help="Derivation for key (default: BIP45 = m/45')")
|
|
@click.option('--add', '-a', 'just_add', is_flag=True,
|
|
help='Just show line required to add this Coldcard')
|
|
@click.option('--desc', '-d', 'descriptor', is_flag=True,
|
|
help='Use BIP380 descriptor template')
|
|
@click.option('--format', type=click.Choice(["p2sh", "p2sh-p2wsh", "p2wsh"]),
|
|
help='Address format', default="p2wsh")
|
|
def enroll_xpub(name, min_signers, path, num_signers, output_file, verbose,
|
|
just_add, descriptor, format):
|
|
"""
|
|
Create a skeleton file which defines a multisig wallet.
|
|
When completed, use with: "ckcc upload -m wallet.txt" or put on SD card.
|
|
"""
|
|
with get_device() as dev:
|
|
dev.check_mitm()
|
|
|
|
xfp = dev.master_fingerprint
|
|
my_xpub = dev.send_recv(CCProtocolPacker.get_xpub(path), timeout=None)
|
|
xfp_str = xfp2str(xfp)
|
|
new_line = "%s: %s" % (xfp_str, my_xpub)
|
|
|
|
if just_add:
|
|
click.echo(new_line)
|
|
sys.exit(0)
|
|
|
|
N = num_signers
|
|
|
|
if N < min_signers:
|
|
N = min_signers
|
|
|
|
if not (1 <= N < 15):
|
|
click.echo("N must be 1..15")
|
|
sys.exit(1)
|
|
|
|
if min_signers == 0:
|
|
min_signers = N
|
|
|
|
if not (1 <= min_signers <= N):
|
|
click.echo(f"Minimum number of signers (M) must be between 1 and N={N}")
|
|
sys.exit(1)
|
|
|
|
if not (1 <= len(name) <= 20) or name != str(name.encode('utf8'), 'ascii', 'ignore'):
|
|
click.echo("Name must be between 1 and 20 characters of ASCII.")
|
|
sys.exit(1)
|
|
|
|
if descriptor:
|
|
if format == "p2sh":
|
|
fmt = AF_P2SH
|
|
elif format == "p2sh-p2wsh":
|
|
fmt = AF_P2WSH_P2SH
|
|
else:
|
|
fmt = AF_P2WSH
|
|
config = descriptor_template(xfp=xfp_str, xpub=my_xpub, path=path, fmt=fmt, m=min_signers)
|
|
|
|
if name != 'Unnamed':
|
|
config = json.dumps({"name": name, "desc": config})
|
|
|
|
else:
|
|
# render into a template
|
|
config = f'name: {name}\npolicy: {min_signers} of {N}\nformat: {format.upper()}\n\n#path: {path}\n{new_line}\n'
|
|
if num_signers != 1:
|
|
config += '\n'.join(f'#{i+2}# FINGERPRINT: xpub123123123123123' for i in range(num_signers-1))
|
|
config += '\n'
|
|
|
|
if verbose or not output_file:
|
|
click.echo(config[:-1] if config[-1] == "\n" else config)
|
|
|
|
if output_file:
|
|
output_file.write(config)
|
|
output_file.close()
|
|
click.echo(f"Wrote to: {output_file.name}")
|
|
|
|
|
|
@main.command('hsm-start')
|
|
@click.argument('policy', type=click.Path(exists=True,dir_okay=False), metavar="policy.json", required=False)
|
|
@click.option('--dry-run', '-n', is_flag=True, help="Just validate file, don't upload")
|
|
def hsm_setup(policy, dry_run):
|
|
"""
|
|
Enable Hardware Security Module (HSM) mode.
|
|
|
|
Upload policy file (or use existing policy) and start HSM mode on device. User must approve startup.
|
|
All PSBT's will be signed automatically based on that policy.
|
|
"""
|
|
with get_device() as dev:
|
|
dev.check_mitm()
|
|
if policy:
|
|
if dry_run:
|
|
# check it looks reasonable, but jsut a JSON check
|
|
raw = open(policy, 'rt').read()
|
|
j = json.loads(raw)
|
|
|
|
click.echo("Policy ok")
|
|
sys.exit(0)
|
|
|
|
file_len, sha = real_file_upload(open(policy, 'rb'), dev)
|
|
|
|
dev.send_recv(CCProtocolPacker.hsm_start(file_len, sha))
|
|
else:
|
|
if dry_run:
|
|
raise click.UsageError("Dry run not useful without a policy file to check.")
|
|
|
|
dev.send_recv(CCProtocolPacker.hsm_start())
|
|
|
|
click.echo("Approve HSM policy on Coldcard screen.")
|
|
|
|
|
|
@main.command('hsm')
|
|
def hsm_status():
|
|
"""
|
|
Get current status of HSM feature.
|
|
Is it running, what is the policy (summary only).
|
|
"""
|
|
with get_device() as dev:
|
|
dev.check_mitm()
|
|
|
|
resp = dev.send_recv(CCProtocolPacker.hsm_status())
|
|
|
|
o = json.loads(resp)
|
|
|
|
click.echo(pformat(o))
|
|
|
|
@click.group()
|
|
def miniscript():
|
|
"""Miniscript related commands"""
|
|
pass
|
|
|
|
|
|
@miniscript.command('enroll')
|
|
@click.argument('desc', type=str, metavar="DESCRIPTOR",
|
|
required=True)
|
|
@click.option('--blksize', default=MAX_BLK_LEN,
|
|
type=click.IntRange(256, MAX_BLK_LEN),
|
|
help='Block size to use (testing)')
|
|
def miniscript_enroll(desc, blksize):
|
|
"""
|
|
Enroll miniscript wallet from string.
|
|
Descriptor can be JSON wrapped.
|
|
"""
|
|
with tempfile.NamedTemporaryFile("wb+") as tmp:
|
|
tmp.write(desc.encode())
|
|
tmp.seek(0)
|
|
with get_device() as dev:
|
|
file_len, sha = real_file_upload(tmp, dev, blksize=blksize)
|
|
dev.send_recv(CCProtocolPacker.miniscript_enroll(file_len, sha), timeout=None)
|
|
|
|
|
|
@miniscript.command('ls')
|
|
def miniscript_ls():
|
|
"""
|
|
List registered miniscript wallet names.
|
|
"""
|
|
with get_device() as dev:
|
|
dev.check_mitm()
|
|
|
|
resp = dev.send_recv(CCProtocolPacker.miniscript_ls())
|
|
o = json.loads(resp)
|
|
|
|
click.echo(pformat(o))
|
|
|
|
|
|
@miniscript.command('del')
|
|
@click.argument('name', type=str,
|
|
metavar="MINISCRIPT_WALLET_NAME",
|
|
required=True)
|
|
def miniscript_del(name):
|
|
"""
|
|
Delete registered miniscript wallet by name with
|
|
on device confirmation.
|
|
"""
|
|
with get_device() as dev:
|
|
dev.check_mitm()
|
|
|
|
dev.send_recv(CCProtocolPacker.miniscript_delete(name))
|
|
|
|
|
|
@miniscript.command('get')
|
|
@click.argument('name', type=str,
|
|
metavar="MINISCRIPT_WALLET_NAME",
|
|
required=True)
|
|
def miniscript_get(name):
|
|
"""
|
|
Get registered miniscript wallet by name.
|
|
"""
|
|
with get_device() as dev:
|
|
dev.check_mitm()
|
|
|
|
resp = dev.send_recv(CCProtocolPacker.miniscript_get(name),
|
|
timeout=None)
|
|
o = json.loads(resp)
|
|
|
|
click.echo(pformat(o))
|
|
|
|
|
|
@miniscript.command('policy')
|
|
@click.argument('name', type=str,
|
|
metavar="MINISCRIPT_WALLET_NAME",
|
|
required=True)
|
|
def miniscript_bip388_policy(name):
|
|
"""
|
|
Get registered miniscript wallet policy (BIP-388) by name.
|
|
"""
|
|
with get_device() as dev:
|
|
dev.check_mitm()
|
|
|
|
resp = dev.send_recv(CCProtocolPacker.miniscript_policy(name),
|
|
timeout=None)
|
|
o = json.loads(resp)
|
|
|
|
click.echo(pformat(o))
|
|
|
|
|
|
@miniscript.command('addr')
|
|
@click.argument('name', type=str,
|
|
metavar="MINISCRIPT_WALLET_NAME",
|
|
required=True)
|
|
@click.argument('index',
|
|
type=click.IntRange(min=0, max=(2**31)-1),
|
|
metavar="ADDR_IDX", required=True)
|
|
@click.option('--change', is_flag=True, default=False,
|
|
help='Use internal chain.')
|
|
def miniscript_address(name, change, index):
|
|
"""
|
|
Get miniscript internal/external chain address by index
|
|
with on device verification.
|
|
"""
|
|
with get_device() as dev:
|
|
dev.check_mitm()
|
|
|
|
resp = dev.send_recv(
|
|
CCProtocolPacker.miniscript_address(name, change, index),
|
|
timeout=None
|
|
)
|
|
click.echo(resp)
|
|
|
|
|
|
@main.command('user')
|
|
@click.argument('username', type=str, metavar="USERNAME", required=True)
|
|
@click.option('--totp', '-t', 'totp_create', is_flag=True, help='Do TOTP and let Coldcard pick secret (default)')
|
|
@click.option('--pass', 'pick_pass', is_flag=True, help='Use a password picked by Coldcard')
|
|
@click.option('--ask-pass', '-a', is_flag=True, help='Define password here (interactive)')
|
|
@click.option('--totp-secret', '-s', help='BASE32 encoded secret for TOTP 2FA method (not great)')
|
|
@click.option('--text-secret', '-p', help='Provide password on command line (not great)')
|
|
@click.option('--delete', '-d', 'do_delete', is_flag=True, help='Remove a user by name')
|
|
@click.option('--show-qr', '-q', is_flag=True, help='Show enroll QR contents (locally)')
|
|
@click.option('--hotp', is_flag=True, help='Use HOTP instead of TOTP (dev only)')
|
|
def new_user(username, totp_create=False, totp_secret=None, text_secret=None, ask_pass=False,
|
|
do_delete=False, debug=False, show_qr=False, hotp=False, pick_pass=False):
|
|
"""
|
|
Create a new user on the Coldcard for HSM policy (also delete).
|
|
|
|
You can input a password (interactively), or one can be picked
|
|
by the Coldcard. When possible the QR to enrol your 2FA app will
|
|
be shown on the Coldcard screen.
|
|
"""
|
|
from base64 import b32encode, b32decode
|
|
|
|
username = username.encode('ascii')
|
|
assert 1 <= len(username) <= MAX_USERNAME_LEN, "Username length wrong"
|
|
|
|
with get_device() as dev:
|
|
dev.check_mitm()
|
|
|
|
if do_delete:
|
|
dev.send_recv(CCProtocolPacker.delete_user(username))
|
|
click.echo('Deleted, if it was there')
|
|
return
|
|
|
|
if ask_pass:
|
|
assert not text_secret, "dont give and ask for password"
|
|
text_secret = click.prompt('Password (hidden)', hide_input=True, confirmation_prompt=True)
|
|
mode = USER_AUTH_HMAC
|
|
|
|
if totp_secret:
|
|
secret = b32decode(totp_secret, casefold=True)
|
|
assert len(secret) in {10, 20}
|
|
mode = USER_AUTH_TOTP
|
|
elif hotp:
|
|
mode = USER_AUTH_HOTP
|
|
secret = b''
|
|
elif pick_pass or text_secret:
|
|
mode = USER_AUTH_HMAC
|
|
else:
|
|
# default is TOTP
|
|
secret = b''
|
|
mode = USER_AUTH_TOTP
|
|
|
|
if mode == USER_AUTH_HMAC:
|
|
# default is text passwords
|
|
secret = dev.hash_password(text_secret.encode('utf8')) if text_secret else b''
|
|
assert not show_qr, 'QR not appropriate for text passwords'
|
|
|
|
if not secret and not show_qr:
|
|
# ask the Coldcard to show the QR (for password or TOTP shared secret)
|
|
mode |= USER_AUTH_SHOW_QR
|
|
|
|
new_secret = dev.send_recv(CCProtocolPacker.create_user(username, mode, secret))
|
|
|
|
if show_qr and new_secret:
|
|
# format the URL thing ... needs a spec
|
|
username = username.decode('ascii')
|
|
secret = new_secret or b32encode(secret).decode('ascii')
|
|
mode = 'hotp' if mode == USER_AUTH_HOTP else 'totp'
|
|
click.echo(f'otpauth://{mode}/{username}?secret={secret}&issuer=Coldcard%20{dev.serial}')
|
|
elif not text_secret and new_secret:
|
|
click.echo(f'New password is: {new_secret}')
|
|
else:
|
|
click.echo('Done')
|
|
|
|
|
|
@main.command('local-conf')
|
|
@click.argument('psbt-file', type=click.File('rb'), required=True, metavar="Binary PSBT")
|
|
@click.option('--next', '-n', 'next_code', type=str, help='next_local_code from Coldcard (default: ask it)')
|
|
def user_auth(psbt_file, next_code):
|
|
"""
|
|
Generate the 6-digit code needed for a specific PSBT file to authorize
|
|
it's signing on the Coldcard in HSM mode.
|
|
"""
|
|
if not next_code:
|
|
with get_device() as dev:
|
|
dev.check_mitm()
|
|
resp = dev.send_recv(CCProtocolPacker.hsm_status())
|
|
o = json.loads(resp)
|
|
|
|
assert o['active'], "Coldcard not in HSM mode"
|
|
|
|
next_code = o['next_local_code']
|
|
|
|
psbt_hash = sha256(psbt_file.read()).digest()
|
|
|
|
rv = calc_local_pincode(psbt_hash, next_code)
|
|
|
|
print("Local authorization code is:\n\n\t%s\n" % rv)
|
|
|
|
|
|
@main.command('auth')
|
|
@click.argument('username', type=str, metavar="USERNAME", required=True)
|
|
@click.argument('token', type=str, metavar="[TOTP]", required=False)
|
|
@click.option('--psbt-file', '-f', type=click.File('rb'), required=False)
|
|
@click.option('--password', '-p', is_flag=True, help="Prompt for password")
|
|
@click.option('--debug', '-d', is_flag=True, help='Show values used')
|
|
@click.option('--version3', '-3', is_flag=True, help='Support obsolete 3.x.x firmware')
|
|
def user_auth(username, token, password, psbt_file, debug, version3):
|
|
"""
|
|
Indicate specific user is present (for HSM).
|
|
|
|
Username and 2FA (TOTP, 6-digits) value or password are required. To use
|
|
password, the PSBT file in question must be provided.
|
|
"""
|
|
import time
|
|
from hmac import HMAC
|
|
from hashlib import pbkdf2_hmac, sha256
|
|
|
|
dryrun = True
|
|
with get_device() as dev:
|
|
dev.check_mitm()
|
|
|
|
if psbt_file or password:
|
|
if psbt_file:
|
|
psbt_hash = sha256(psbt_file.read()).digest()
|
|
dryrun = False
|
|
else:
|
|
psbt_hash = bytes(32)
|
|
|
|
pw = token or click.prompt('Password (hidden)', hide_input=True)
|
|
secret = dev.hash_password(pw.encode('utf8'), v3=version3)
|
|
|
|
token = HMAC(secret, msg=psbt_hash, digestmod=sha256).digest()
|
|
|
|
if debug:
|
|
click.echo(" secret = %s" % B2A(secret))
|
|
click.echo(" salt = %s" % B2A(salt))
|
|
|
|
totp_time = 0
|
|
else:
|
|
if not token:
|
|
token = click.prompt('2FA Token (6 digits)', hide_input=False)
|
|
|
|
if len(token) != 6 or not token.isdigit():
|
|
raise click.UsageError("2FA Token must be 6 decimal digits")
|
|
|
|
token = token.encode('ascii')
|
|
|
|
now = int(time.time())
|
|
if now % 30 < 5:
|
|
click.echo("NOTE: TOTP was on edge of expiry limit! Might not work.")
|
|
totp_time = now // 30
|
|
|
|
#raise click.UsageError("Need PSBT file as part of HMAC for password")
|
|
|
|
assert token and len(token) in {6, 32}
|
|
username = username.encode('ascii')
|
|
|
|
if debug:
|
|
click.echo(" username = %s" % username.decode('ascii'))
|
|
click.echo(" token = %s" % (B2A(token) if len(token) > 6 else token.decode('ascii')))
|
|
click.echo("totp_time = %d" % totp_time)
|
|
|
|
resp = dev.send_recv(CCProtocolPacker.user_auth(username, token, totp_time))
|
|
|
|
if not resp:
|
|
click.echo("Correct or queued")
|
|
else:
|
|
click.echo(f'Problem: {resp}')
|
|
|
|
|
|
@main.command('get-locker')
|
|
def get_storage_locker():
|
|
"""Get the value held in the Storage Locker (not Bitcoin related, reserved for HSM use)"""
|
|
with get_device() as dev:
|
|
ls = dev.send_recv(CCProtocolPacker.get_storage_locker(), timeout=None)
|
|
click.echo(ls)
|
|
|
|
|
|
keystore_keys = ["derivation", "hw_type", "label", "root_fingerprint", "soft_device_id", "xpub"]
|
|
|
|
@main.command('convert2cc')
|
|
@click.argument('file', type=click.Path(exists=True), required=True)
|
|
@click.option('--outfile', '-o', type=click.Path(),
|
|
help="output file path where adjusted wallet file is written. "
|
|
"If this is not specified output is written to <original_file>_cc")
|
|
@click.option('--dry-run', '-n', default=False, is_flag=True,
|
|
help="do not write files instead pretty print to console")
|
|
@click.option('--key', '-k', type=click.Choice(keystore_keys),
|
|
help="Multisig wallet keystore dict key based on which to match correct keystore "
|
|
"(for example hw_type or root_fingerprint). Option required for "
|
|
"multisig wallets if coldcard not connected")
|
|
@click.option('--val', '-v', type=str,
|
|
help="Multisig wallet value to match for specified key "
|
|
"(for example ledger[hw_type] or fffffff0[root_fingerprint])"
|
|
"Option required for multisig wallets if coldcard not connected")
|
|
def electrum_convert2cc(file, outfile, dry_run, key, val):
|
|
"""
|
|
Convert existing Electrum wallet file into COLDCARD wallet file.
|
|
|
|
Your Coldcard does not need to be connected, but it is better
|
|
to have it connected because it can be checked whether Coldcard
|
|
and wallet file describe the same wallet.
|
|
|
|
Under the hood this command changes values in keystore dict in
|
|
electrum wallet file. 'hw_type' is changed to coldcard.
|
|
'soft_device_id' is set to null. 'ckcc_xpub' is added (if
|
|
coldcard is connected). And 'label' is built using 'Coldcard'
|
|
+ master fingerprint.
|
|
|
|
If no '--outfile/-o' is specified, new wallet file will be
|
|
created in same location with '_cc' suffix.
|
|
|
|
To convert2cc multisig wallet file, specify --key/--val that
|
|
defines the correct keystore (ie. specific co-signer). For
|
|
example if one has 2of3 multisig (Ledger, Trezor, Colcdard) and
|
|
wants to convert the Trezor signer:
|
|
|
|
ckcc convert2cc /path/to/multisig_wallet --key hw_type --val trezor
|
|
|
|
You do not need to define --key/--val if your coldcard is
|
|
connected (loaded with correct seed and derivation path of
|
|
course). We wil match correct keystore based on root fingerprint
|
|
(XFP) obtained from connected Coldcard.
|
|
|
|
|
|
Rationale:
|
|
|
|
Users may want to switch their hardware wallet vendor. Yet
|
|
they want to keep all of their Electrum data (UTXO, labels,
|
|
contacts, payment requests...). Another possibility is when
|
|
someone's hww gets lost or broken, and they still have a seed,
|
|
this seed can be loaded to new Coldcard (check
|
|
https://coldcard.com/docs/import) and their previous Electrum
|
|
wallet can be converted.
|
|
|
|
* Does not work with encrypted wallet files - please decrypt first
|
|
via Electrum interface.
|
|
"""
|
|
if file == outfile:
|
|
click.echo("'FILE' and '--outfile' cannot be the same")
|
|
sys.exit(1)
|
|
|
|
with get_device(optional=True) as dev:
|
|
|
|
try:
|
|
# open file only for reading and close it immediately after it is loaded into memory
|
|
with open(file, "r") as f:
|
|
wallet_str = f.read()
|
|
new_wallet_str = convert2cc(wallet_str=wallet_str, dev=dev, key=key, val=val)
|
|
except json.JSONDecodeError as e:
|
|
click.echo("Failed to load wallet file {}".format(e))
|
|
sys.exit(1)
|
|
except Exception as e:
|
|
click.echo("convert2cc failed: {}".format(e))
|
|
sys.exit(1)
|
|
|
|
if dry_run:
|
|
click.echo(new_wallet_str)
|
|
else:
|
|
if outfile is None:
|
|
outfile = filepath_append_cc(file)
|
|
try:
|
|
with open(outfile, "w") as f:
|
|
f.write(new_wallet_str)
|
|
click.echo("New wallet file created: {}".format(outfile))
|
|
except Exception as e:
|
|
click.echo("Failed to dump wallet file: {}".format(e))
|
|
sys.exit(1)
|
|
|
|
|
|
main.add_command(miniscript)
|
|
|
|
# EOF
|