diff --git a/ckcc/cli.py b/ckcc/cli.py index 8c2b264..d1e1b39 100755 --- a/ckcc/cli.py +++ b/ckcc/cli.py @@ -12,7 +12,7 @@ # - see for HID api # # -import hid, click, sys, os, pdb, struct, time, io, re, json +import hid, click, sys, os, pdb, struct, time, io, re, json, contextlib from pprint import pformat from binascii import b2a_hex, a2b_hex from hashlib import sha256 @@ -49,13 +49,19 @@ sys.excepthook=my_hook B2A = lambda x: b2a_hex(x).decode('ascii') + def xfp2str(xfp): # Standardized way to show an xpub's fingerprint... it's a 4-byte string # and not really an integer. Used to show as '0x%08x' but that's wrong endian. return b2a_hex(struct.pack('Bitcoin or XTN=>Bitcoin Testnet - ''' - - dev = get_device() - - code = dev.send_recv(CCProtocolPacker.block_chain()) - - click.echo(code) + """ + with get_device() as dev: + code = dev.send_recv(CCProtocolPacker.block_chain()) + click.echo(code) @main.command('eval') @click.argument('stmt', nargs=-1) def run_eval(stmt): - "Simulator only: eval a python statement" - - dev = get_device() + """Simulator only: eval a python statement""" + with get_device() as dev: + stmt = ' '.join(stmt) + v = dev.send_recv(b'EVAL' + stmt.encode('utf-8')) + click.echo(v) - stmt = ' '.join(stmt) - - v = dev.send_recv(b'EVAL' + stmt.encode('utf-8')) - - click.echo(v) @main.command('exec') @click.argument('stmt', nargs=-1) def run_eval(stmt): - "Simulator only: exec a python script" - - dev = get_device() + """Simulator only: exec a python script""" + with get_device() as dev: + stmt = ' '.join(stmt) + v = dev.send_recv(b'EXEC' + stmt.encode('utf-8')) + click.echo(v) - stmt = ' '.join(stmt) - v = dev.send_recv(b'EXEC' + stmt.encode('utf-8')) - - click.echo(v) - @main.command('msg') @click.argument('message') @click.option('--path', '-p', default=BIP44_FIRST, @@ -398,61 +399,60 @@ def run_eval(stmt): @click.option('--segwit', '-s', is_flag=True, help='Address in segwit native (p2wpkh, bech32)') @click.option('--wrap', '-w', is_flag=True, help='Address in segwit wrapped in P2SH (p2wpkh)') def sign_message(message, path, verbose=True, just_sig=False, wrap=False, segwit=False): - "Sign a short text message" + """Sign a short text message""" + with get_device() as dev: - dev = get_device() + if wrap: + addr_fmt = AF_P2WPKH_P2SH + elif segwit: + addr_fmt = AF_P2WPKH + else: + addr_fmt = AF_CLASSIC - if wrap: - addr_fmt = AF_P2WPKH_P2SH - elif segwit: - addr_fmt = AF_P2WPKH - else: - addr_fmt = AF_CLASSIC + # NOTE: initial version of firmware not expected to do segwit stuff right, since + # standard very much still in flux, see: - # NOTE: initial version of firmware not expected to do segwit stuff right, since - # standard very much still in flux, see: + # not enforcing policy here on msg contents, so we can define that on product + message = message.encode('ascii') if not isinstance(message, bytes) else message - # not enforcing policy here on msg contents, so we can define that on product - message = message.encode('ascii') if not isinstance(message, bytes) else message + ok = dev.send_recv(CCProtocolPacker.sign_message(message, path, addr_fmt), timeout=None) + assert ok == None - ok = dev.send_recv(CCProtocolPacker.sign_message(message, path, addr_fmt), timeout=None) - assert ok == None + print("Waiting for OK on the Coldcard...", end='', file=sys.stderr) + sys.stderr.flush() - print("Waiting for OK on the Coldcard...", end='', file=sys.stderr) - sys.stderr.flush() + while 1: + time.sleep(0.250) + done = dev.send_recv(CCProtocolPacker.get_signed_msg(), timeout=None) + if done == None: + continue - while 1: - time.sleep(0.250) - done = dev.send_recv(CCProtocolPacker.get_signed_msg(), timeout=None) - if done == None: - continue + break - break + print("\r \r", end='', file=sys.stderr) + sys.stderr.flush() - print("\r \r", end='', file=sys.stderr) - sys.stderr.flush() + if len(done) != 2: + click.echo('Failed: %r' % done) + sys.exit(1) - if len(done) != 2: - click.echo('Failed: %r' % done) - sys.exit(1) + addr, raw = done - addr, raw = done + sig = str(b64encode(raw), 'ascii').replace('\n', '') + + if just_sig: + click.echo(str(sig)) + elif verbose: + click.echo('-----BEGIN SIGNED MESSAGE-----\n{msg}\n-----BEGIN ' + 'SIGNATURE-----\n{addr}\n{sig}\n-----END SIGNED MESSAGE-----'.format( + msg=message.decode('ascii'), addr=addr, sig=sig)) + else: + click.echo('%s\n%s\n%s' % (message.decode('ascii'), addr, sig)) - sig = str(b64encode(raw), 'ascii').replace('\n', '') - if just_sig: - click.echo(str(sig)) - elif verbose: - click.echo('-----BEGIN SIGNED MESSAGE-----\n{msg}\n-----BEGIN ' - 'SIGNATURE-----\n{addr}\n{sig}\n-----END SIGNED MESSAGE-----'.format( - msg=message.decode('ascii'), addr=addr, sig=sig)) - else: - click.echo('%s\n%s\n%s' % (message.decode('ascii'), addr, sig)) - def wait_and_download(dev, req, fn): # Wait for user action on the device... by polling w/ indicated request # - also download resulting file - print("Waiting for OK on the Coldcard...", end='', file=sys.stderr) sys.stderr.flush() @@ -478,7 +478,8 @@ def wait_and_download(dev, req, fn): result = dev.download_file(result_len, result_sha, file_number=fn) return result, result_sha - + + @main.command('sign') @click.argument('psbt_in', type=click.File('rb')) @click.argument('psbt_out', type=click.File('wb'), required=False) @@ -490,64 +491,64 @@ def wait_and_download(dev, req, fn): @click.option('--base64', '-6', 'b64_mode', is_flag=True, help="Write out (signed) PSBT encoded in base64") @display_errors def sign_transaction(psbt_in, psbt_out=None, verbose=False, b64_mode=False, hex_mode=False, finalize=False, visualize=False, signed=False): - "Approve a spending transaction by signing it on Coldcard" + """Approve a spending transaction by signing it on Coldcard""" + with get_device() as dev: + dev.check_mitm() - dev = get_device() - dev.check_mitm() + # Handle non-binary encodings, and incorrect files. + taste = psbt_in.read(10) + psbt_in.seek(0) + if taste == b'70736274ff' or taste == b'70736274FF': + # Looks hex encoded; make into binary again + hx = ''.join(re.findall(r'[0-9a-fA-F]*', psbt_in.read().decode('ascii'))) + psbt_in = io.BytesIO(a2b_hex(hx)) + elif taste[0:6] == b'cHNidP': + # Base64 encoded input + psbt_in = io.BytesIO(b64decode(psbt_in.read())) + elif taste[0:5] != b'psbt\xff': + click.echo("File doesn't have PSBT magic number at start.") + sys.exit(1) - # Handle non-binary encodings, and incorrect files. - taste = psbt_in.read(10) - psbt_in.seek(0) - if taste == b'70736274ff' or taste == b'70736274FF': - # Looks hex encoded; make into binary again - hx = ''.join(re.findall(r'[0-9a-fA-F]*', psbt_in.read().decode('ascii'))) - psbt_in = io.BytesIO(a2b_hex(hx)) - elif taste[0:6] == b'cHNidP': - # Base64 encoded input - psbt_in = io.BytesIO(b64decode(psbt_in.read())) - elif taste[0:5] != b'psbt\xff': - click.echo("File doesn't have PSBT magic number at start.") - sys.exit(1) + # upload the transaction + txn_len, sha = real_file_upload(psbt_in, dev=dev) - # upload the transaction - txn_len, sha = real_file_upload(psbt_in, dev=dev) + flags = 0x0 + if visualize or signed: + flags |= STXN_VISUALIZE + if signed: + flags |= STXN_SIGNED + elif finalize: + flags |= STXN_FINALIZE - flags = 0x0 - if visualize or signed: - flags |= STXN_VISUALIZE - if signed: - flags |= STXN_SIGNED - elif finalize: - flags |= STXN_FINALIZE + # start the signing process + ok = dev.send_recv(CCProtocolPacker.sign_transaction(txn_len, sha, flags=flags), timeout=None) + assert ok is None - # start the signing process - ok = dev.send_recv(CCProtocolPacker.sign_transaction(txn_len, sha, flags=flags), timeout=None) - assert ok == None + # errors will raise here, no need for error display + result, _ = wait_and_download(dev, CCProtocolPacker.get_signed_txn(), 1) - # errors will raise here, no need for error display - result, _ = wait_and_download(dev, CCProtocolPacker.get_signed_txn(), 1) + # If 'finalize' is set, we are outputing a bitcoin transaction, + # ready for the p2p network. If the CC wasn't able to finalize it, + # an exception would have occured. Most people will want hex here, but + # resisting the urge to force it. - # If 'finalize' is set, we are outputing a bitcoin transaction, - # ready for the p2p network. If the CC wasn't able to finalize it, - # an exception would have occured. Most people will want hex here, but - # resisting the urge to force it. - - if visualize: - if psbt_out: - psbt_out.write(result) + if visualize: + if psbt_out: + psbt_out.write(result) + else: + click.echo(result, nl=False) else: - click.echo(result, nl=False) - else: - # save it - if hex_mode: - result = b2a_hex(result) - elif b64_mode or (not psbt_out and os.isatty(0)): - result = b64encode(result) + # save it + if hex_mode: + result = b2a_hex(result) + elif b64_mode or (not psbt_out and os.isatty(0)): + result = b64encode(result) + + if psbt_out: + psbt_out.write(result) + else: + click.echo(result) - if psbt_out: - psbt_out.write(result) - else: - click.echo(result) @main.command('backup') @click.option('--outdir', '-d', @@ -559,33 +560,34 @@ def sign_transaction(psbt_in, psbt_out=None, verbose=False, b64_mode=False, hex_ #@click.option('--verbose', '-v', is_flag=True, help='Show more details') @display_errors def start_backup(outdir, outfile, verbose=False): - '''Creates 7z encrypted backup file after prompting user to remember a massive passphrase. \ -Downloads the AES-encrypted data backup and by default, saves into current directory using \ -a filename based on today's date.''' + """ + Creates 7z encrypted backup file after prompting user to remember a massive passphrase. + Downloads the AES-encrypted data backup and by default, saves into current directory using + a filename based on today's date. + """ + with get_device() as dev: + dev.check_mitm() - dev = get_device() + ok = dev.send_recv(CCProtocolPacker.start_backup()) + assert ok == None - dev.check_mitm() + result, chk = wait_and_download(dev, CCProtocolPacker.get_backup_file(), 0) - ok = dev.send_recv(CCProtocolPacker.start_backup()) - assert ok == None + if outfile: + outfile.write(result) + outfile.close() + fn = outfile.name + else: + assert outdir - result, chk = wait_and_download(dev, CCProtocolPacker.get_backup_file(), 0) + # pick a useful filename, if they gave a dirname + fn = os.path.join(outdir, time.strftime('backup-%Y%m%d-%H%M.7z')) - if outfile: - outfile.write(result) - outfile.close() - fn = outfile.name - else: - assert outdir + open(fn, 'wb').write(result) - # pick a useful filename, if they gave a dirname - fn = os.path.join(outdir, time.strftime('backup-%Y%m%d-%H%M.7z')) + click.echo("Wrote %d bytes into: %s\nSHA256: %s" % (len(result), fn, str(b2a_hex(chk), 'ascii'))) - open(fn, 'wb').write(result) - click.echo("Wrote %d bytes into: %s\nSHA256: %s" % (len(result), fn, str(b2a_hex(chk), 'ascii'))) - @main.command('addr') @click.argument('path', default=BIP44_FIRST, metavar='[m/1/2/3]', required=False) @click.option('--segwit', '-s', is_flag=True, help='Show in segwit native (p2wpkh, bech32)') @@ -593,23 +595,22 @@ a filename based on today's date.''' @click.option('--quiet', '-q', is_flag=True, help='Show less details; just the address') @click.option('--path', '-p', default=BIP44_FIRST, help='Derivation for key to show (or first arg)') def show_address(path, quiet=False, segwit=False, wrap=False): - "Show the human version of an address" + """Show the human version of an address""" + with get_device() as dev: + if wrap: + addr_fmt = AF_P2WPKH_P2SH + elif segwit: + addr_fmt = AF_P2WPKH + else: + addr_fmt = AF_CLASSIC - dev = get_device() + addr = dev.send_recv(CCProtocolPacker.show_address(path, addr_fmt), timeout=None) - if wrap: - addr_fmt = AF_P2WPKH_P2SH - elif segwit: - addr_fmt = AF_P2WPKH - else: - addr_fmt = AF_CLASSIC + if quiet: + click.echo(addr) + else: + click.echo('Displaying address:\n\n%s\n' % addr) - addr = dev.send_recv(CCProtocolPacker.show_address(path, addr_fmt), timeout=None) - - if quiet: - click.echo(addr) - else: - click.echo('Displaying address:\n\n%s\n' % addr) def str_to_int_path(xfp, path): # convert text m/34'/33/44 into BIP174 binary compat format @@ -617,8 +618,10 @@ def str_to_int_path(xfp, path): rv = [struct.unpack('= 2, i @@ -639,7 +642,8 @@ def str_to_int_path(xfp, path): @click.option('--wrap', '-w', is_flag=True, help='Show as segwit wrapped in P2SH (p2wpkh)') @click.option('--quiet', '-q', is_flag=True, help='Show less details; just the address') def show_address(script, fingerprints, quiet=False, segwit=False, wrap=False): - '''Show a multisig payment address on-screen. + """ + Show a multisig payment address on-screen. Needs a redeem script and list of fingerprint/path (4369050F/1/0/0 for example). @@ -647,41 +651,40 @@ def show_address(script, fingerprints, quiet=False, segwit=False, wrap=False): generate the full redeem script (hex), and the fingerprints and paths used to generate each public key inside that. The order of fingerprint/paths must match order of pubkeys in the script. - ''' + """ + with get_device() as dev: - dev = get_device() + addr_fmt = AF_P2SH + if segwit: + addr_fmt = AF_P2WSH + if wrap: + addr_fmt = AF_P2WSH_P2SH - addr_fmt = AF_P2SH - if segwit: - addr_fmt = AF_P2WSH - if wrap: - addr_fmt = AF_P2WSH_P2SH + script = a2b_hex(script) + N = len(fingerprints) - script = a2b_hex(script) - N = len(fingerprints) + assert 1 <= N <= 15, "bad N" - assert 1 <= N <= 15, "bad N" + min_signers = script[0] - 80 + assert 1 <= min_signers <= N, "bad M" - min_signers = script[0] - 80 - assert 1 <= min_signers <= N, "bad M" + assert script[-1] == 0xAE, "expect script to end with OP_CHECKMULTISIG" + assert script[-2] == 80+N, "second last byte should encode N" - assert script[-1] == 0xAE, "expect script to end with OP_CHECKMULTISIG" - assert script[-2] == 80+N, "second last byte should encode N" + xfp_paths = [] + for idx, xfp in enumerate(fingerprints): + assert '/' in xfp, 'Needs a XFP/path: ' + xfp + xfp, p = xfp.split('/', 1) - xfp_paths = [] - for idx, xfp in enumerate(fingerprints): - assert '/' in xfp, 'Needs a XFP/path: ' + xfp - xfp, p = xfp.split('/', 1) + xfp_paths.append(str_to_int_path(xfp, p)) - xfp_paths.append(str_to_int_path(xfp, p)) + addr = dev.send_recv(CCProtocolPacker.show_p2sh_address( + min_signers, xfp_paths, script, addr_fmt=addr_fmt), timeout=None) - addr = dev.send_recv(CCProtocolPacker.show_p2sh_address( - min_signers, xfp_paths, script, addr_fmt=addr_fmt), timeout=None) - - if quiet: - click.echo(addr) - else: - click.echo('Displaying address:\n\n%s\n' % addr) + if quiet: + click.echo(addr) + else: + click.echo('Displaying address:\n\n%s\n' % addr) @main.command('pass') @@ -690,33 +693,32 @@ def show_address(script, fingerprints, quiet=False, segwit=False, wrap=False): confirmation_prompt=False) @click.option('--verbose', '-v', is_flag=True, help='Show new root xpub') def bip39_passphrase(passphrase, verbose=False): - "Provide a BIP39 passphrase" + """Provide a BIP39 passphrase""" - dev = get_device() + with get_device() as dev: + dev.check_mitm() - dev.check_mitm() + ok = dev.send_recv(CCProtocolPacker.bip39_passphrase(passphrase), timeout=None) + assert ok == None - ok = dev.send_recv(CCProtocolPacker.bip39_passphrase(passphrase), timeout=None) - assert ok == None + print("Waiting for OK on the Coldcard...", end='', file=sys.stderr) + sys.stderr.flush() - print("Waiting for OK on the Coldcard...", end='', file=sys.stderr) - sys.stderr.flush() + while 1: + time.sleep(0.250) + done = dev.send_recv(CCProtocolPacker.get_passphrase_done(), timeout=None) + if done == None: + continue + break - while 1: - time.sleep(0.250) - done = dev.send_recv(CCProtocolPacker.get_passphrase_done(), timeout=None) - if done == None: - continue - break + print("\r \r", end='', file=sys.stderr) + sys.stderr.flush() - print("\r \r", end='', file=sys.stderr) - sys.stderr.flush() - - if verbose: - xpub = done - click.echo(xpub) - else: - click.echo('Done.') + if verbose: + xpub = done + click.echo(xpub) + else: + click.echo('Done.') @main.command('multisig') @@ -729,107 +731,104 @@ def bip39_passphrase(passphrase, verbose=False): @click.option('--path', '-p', default="m/45'", help="Derivation for key (default: BIP45 = m/45')") @click.option('--add', '-a', 'just_add', is_flag=True, help='Just show line required to add this Coldcard') def enroll_xpub(name, min_signers, path, num_signers, output_file=None, verbose=False, just_add=False): - ''' -Create a skeleton file which defines a multisig wallet. + """ + Create a skeleton file which defines a multisig wallet. + When completed, use with: "ckcc upload -m wallet.txt" or put on SD card. + """ + with get_device() as dev: + dev.check_mitm() -When completed, use with: "ckcc upload -m wallet.txt" or put on SD card. -''' + xfp = dev.master_fingerprint + my_xpub = dev.send_recv(CCProtocolPacker.get_xpub(path), timeout=None) + new_line = "%s: %s" % (xfp2str(xfp), my_xpub) - dev = get_device() - dev.check_mitm() + if just_add: + click.echo(new_line) + sys.exit(0) - xfp = dev.master_fingerprint - my_xpub = dev.send_recv(CCProtocolPacker.get_xpub(path), timeout=None) - new_line = "%s: %s" % (xfp2str(xfp), my_xpub) + N = num_signers - if just_add: - click.echo(new_line) - sys.exit(0) + if N < min_signers: + N = min_signers - N = num_signers + if not (1 <= N < 15): + click.echo("N must be 1..15") + sys.exit(1) - if N < min_signers: - N = min_signers + if min_signers == 0: + min_signers = N - if not (1 <= N < 15): - click.echo("N must be 1..15") - sys.exit(1) + if not (1 <= min_signers <= N): + click.echo(f"Minimum number of signers (M) must be between 1 and N={N}") + sys.exit(1) - if min_signers == 0: - min_signers = N + if not (1 <= len(name) <= 20) or name != str(name.encode('utf8'), 'ascii', 'ignore'): + click.echo("Name must be between 1 and 20 characters of ASCII.") + sys.exit(1) - if not (1 <= min_signers <= N): - click.echo(f"Minimum number of signers (M) must be between 1 and N={N}") - sys.exit(1) + # render into a template + config = f'name: {name}\npolicy: {min_signers} of {N}\n\n#path: {path}\n{new_line}\n' + if num_signers != 1: + config += '\n'.join(f'#{i+2}# FINGERPRINT: xpub123123123123123' for i in range(num_signers-1)) + config += '\n' - if not (1 <= len(name) <= 20) or name != str(name.encode('utf8'), 'ascii', 'ignore'): - click.echo("Name must be between 1 and 20 characters of ASCII.") - sys.exit(1) + if verbose or not output_file: + click.echo(config[:-1]) - # render into a template - config = f'name: {name}\npolicy: {min_signers} of {N}\n\n#path: {path}\n{new_line}\n' - if num_signers != 1: - config += '\n'.join(f'#{i+2}# FINGERPRINT: xpub123123123123123' for i in range(num_signers-1)) - config += '\n' + if output_file: + output_file.write(config) + output_file.close() + click.echo(f"Wrote to: {output_file.name}") - if verbose or not output_file: - click.echo(config[:-1]) - - if output_file: - output_file.write(config) - output_file.close() - click.echo(f"Wrote to: {output_file.name}") @main.command('hsm-start') @click.argument('policy', type=click.Path(exists=True,dir_okay=False), metavar="policy.json", required=False) @click.option('--dry-run', '-n', is_flag=True, help="Just validate file, don't upload") def hsm_setup(policy=None, dry_run=False): - ''' -Enable Hardware Security Module (HSM) mode. + """ + Enable Hardware Security Module (HSM) mode. -Upload policy file (or use existing policy) and start HSM mode on device. User must approve startup. -All PSBT's will be signed automatically based on that policy. + Upload policy file (or use existing policy) and start HSM mode on device. User must approve startup. + All PSBT's will be signed automatically based on that policy. + """ + with get_device() as dev: + dev.check_mitm() + if policy: + if dry_run: + # check it looks reasonable, but jsut a JSON check + raw = open(policy, 'rt').read() + j = json.loads(raw) -''' - dev = get_device() - dev.check_mitm() + click.echo("Policy ok") + sys.exit(0) - if policy: - if dry_run: - # check it looks reasonable, but jsut a JSON check - raw = open(policy, 'rt').read() - j = json.loads(raw) + file_len, sha = real_file_upload(open(policy, 'rb'), dev=dev) - click.echo("Policy ok") - sys.exit(0) + dev.send_recv(CCProtocolPacker.hsm_start(file_len, sha)) + else: + if dry_run: + raise click.UsageError("Dry run not useful without a policy file to check.") - file_len, sha = real_file_upload(open(policy, 'rb'), dev=dev) + dev.send_recv(CCProtocolPacker.hsm_start()) - dev.send_recv(CCProtocolPacker.hsm_start(file_len, sha)) - else: - if dry_run: - raise click.UsageError("Dry run not useful without a policy file to check.") + click.echo("Approve HSM policy on Coldcard screen.") - dev.send_recv(CCProtocolPacker.hsm_start()) - - click.echo("Approve HSM policy on Coldcard screen.") @main.command('hsm') def hsm_status(): - ''' -Get current status of HSM feature. + """ + Get current status of HSM feature. + Is it running, what is the policy (summary only). + """ + with get_device() as dev: + dev.check_mitm() -Is it running, what is the policy (summary only). -''' - - dev = get_device() - dev.check_mitm() + resp = dev.send_recv(CCProtocolPacker.hsm_status()) - resp = dev.send_recv(CCProtocolPacker.hsm_status()) + o = json.loads(resp) - o = json.loads(resp) + click.echo(pformat(o)) - click.echo(pformat(o)) @main.command('user') @click.argument('username', type=str, metavar="USERNAME", required=True) @@ -843,82 +842,81 @@ Is it running, what is the policy (summary only). @click.option('--hotp', is_flag=True, help='Use HOTP instead of TOTP (dev only)') def new_user(username, totp_create=False, totp_secret=None, text_secret=None, ask_pass=False, do_delete=False, debug=False, show_qr=False, hotp=False, pick_pass=False): - ''' -Create a new user on the Coldcard for HSM policy (also delete). + """ + Create a new user on the Coldcard for HSM policy (also delete). -You can input a password (interactively), or one can be picked -by the Coldcard. When possible the QR to enrol your 2FA app will -be shown on the Coldcard screen. -''' + You can input a password (interactively), or one can be picked + by the Coldcard. When possible the QR to enrol your 2FA app will + be shown on the Coldcard screen. + """ from base64 import b32encode, b32decode username = username.encode('ascii') assert 1 <= len(username) <= MAX_USERNAME_LEN, "Username length wrong" - dev = get_device() - dev.check_mitm() + with get_device() as dev: + dev.check_mitm() - if do_delete: - dev.send_recv(CCProtocolPacker.delete_user(username)) - click.echo('Deleted, if it was there') - return + if do_delete: + dev.send_recv(CCProtocolPacker.delete_user(username)) + click.echo('Deleted, if it was there') + return - if ask_pass: - assert not text_secret, "dont give and ask for password" - text_secret = click.prompt('Password (hidden)', hide_input=True, confirmation_prompt=True) - mode = USER_AUTH_HMAC + if ask_pass: + assert not text_secret, "dont give and ask for password" + text_secret = click.prompt('Password (hidden)', hide_input=True, confirmation_prompt=True) + mode = USER_AUTH_HMAC - if totp_secret: - secret = b32decode(totp_secret, casefold=True) - assert len(secret) in {10, 20} - mode = USER_AUTH_TOTP - elif hotp: - mode = USER_AUTH_HOTP - secret = b'' - elif pick_pass or text_secret: - mode = USER_AUTH_HMAC - else: - # default is TOTP - secret = b'' - mode = USER_AUTH_TOTP - - if mode == USER_AUTH_HMAC: - # default is text passwords - secret = dev.hash_password(text_secret.encode('utf8')) if text_secret else b'' - assert not show_qr, 'QR not appropriate for text passwords' + if totp_secret: + secret = b32decode(totp_secret, casefold=True) + assert len(secret) in {10, 20} + mode = USER_AUTH_TOTP + elif hotp: + mode = USER_AUTH_HOTP + secret = b'' + elif pick_pass or text_secret: + mode = USER_AUTH_HMAC + else: + # default is TOTP + secret = b'' + mode = USER_AUTH_TOTP - if not secret and not show_qr: - # ask the Coldcard to show the QR (for password or TOTP shared secret) - mode |= USER_AUTH_SHOW_QR + if mode == USER_AUTH_HMAC: + # default is text passwords + secret = dev.hash_password(text_secret.encode('utf8')) if text_secret else b'' + assert not show_qr, 'QR not appropriate for text passwords' - new_secret = dev.send_recv(CCProtocolPacker.create_user(username, mode, secret)) + if not secret and not show_qr: + # ask the Coldcard to show the QR (for password or TOTP shared secret) + mode |= USER_AUTH_SHOW_QR + + new_secret = dev.send_recv(CCProtocolPacker.create_user(username, mode, secret)) + + if show_qr and new_secret: + # format the URL thing ... needs a spec + username = username.decode('ascii') + secret = new_secret or b32encode(secret).decode('ascii') + mode = 'hotp' if mode == USER_AUTH_HOTP else 'totp' + click.echo(f'otpauth://{mode}/{username}?secret={secret}&issuer=Coldcard%20{dev.serial}') + elif not text_secret and new_secret: + click.echo(f'New password is: {new_secret}') + else: + click.echo('Done') - if show_qr and new_secret: - # format the URL thing ... needs a spec - username = username.decode('ascii') - secret = new_secret or b32encode(secret).decode('ascii') - mode = 'hotp' if mode == USER_AUTH_HOTP else 'totp' - click.echo(f'otpauth://{mode}/{username}?secret={secret}&issuer=Coldcard%20{dev.serial}') - elif not text_secret and new_secret: - click.echo(f'New password is: {new_secret}') - else: - click.echo('Done') @main.command('local-conf') @click.argument('psbt-file', type=click.File('rb'), required=True, metavar="Binary PSBT") @click.option('--next', '-n', 'next_code', type=str, help='next_local_code from Coldcard (default: ask it)') def user_auth(psbt_file, next_code=None): - ''' -Generate the 6-digit code needed for a specific PSBT file to authorize -it's signing on the Coldcard in HSM mode. -''' - + """ + Generate the 6-digit code needed for a specific PSBT file to authorize + it's signing on the Coldcard in HSM mode. + """ if not next_code: - dev = get_device() - dev.check_mitm() - - resp = dev.send_recv(CCProtocolPacker.hsm_status()) - o = json.loads(resp) + with get_device() as dev: + dev.check_mitm() + resp = dev.send_recv(CCProtocolPacker.hsm_status()) + o = json.loads(resp) assert o['active'], "Coldcard not in HSM mode" @@ -930,6 +928,7 @@ it's signing on the Coldcard in HSM mode. print("Local authorization code is:\n\n\t%s\n" % rv) + @main.command('auth') @click.argument('username', type=str, metavar="USERNAME", required=True) @click.argument('token', type=str, metavar="[TOTP]", required=False) @@ -938,76 +937,74 @@ it's signing on the Coldcard in HSM mode. @click.option('--debug', '-d', is_flag=True, help='Show values used') @click.option('--version3', '-3', is_flag=True, help='Support obsolete 3.x.x firmware') def user_auth(username, token=None, password=None, prompt=None, totp=None, psbt_file=None, debug=False, version3=False): - ''' -Indicate specific user is present (for HSM). + """ + Indicate specific user is present (for HSM). -Username and 2FA (TOTP, 6-digits) value or password are required. To use -password, the PSBT file in question must be provided. -''' + Username and 2FA (TOTP, 6-digits) value or password are required. To use + password, the PSBT file in question must be provided. + """ import time from hmac import HMAC from hashlib import pbkdf2_hmac, sha256 dryrun = True - dev = get_device() - dev.check_mitm() + with get_device() as dev: + dev.check_mitm() - if psbt_file or password: - if psbt_file: - psbt_hash = sha256(psbt_file.read()).digest() - dryrun = False + if psbt_file or password: + if psbt_file: + psbt_hash = sha256(psbt_file.read()).digest() + dryrun = False + else: + psbt_hash = bytes(32) + + pw = token or click.prompt('Password (hidden)', hide_input=True) + secret = dev.hash_password(pw.encode('utf8'), v3=version3) + + token = HMAC(secret, msg=psbt_hash, digestmod=sha256).digest() + + if debug: + click.echo(" secret = %s" % B2A(secret)) + click.echo(" salt = %s" % B2A(salt)) + + totp_time = 0 else: - psbt_hash = bytes(32) + if not token: + token = click.prompt('2FA Token (6 digits)', hide_input=False) - pw = token or click.prompt('Password (hidden)', hide_input=True) - secret = dev.hash_password(pw.encode('utf8'), v3=version3) + if len(token) != 6 or not token.isdigit(): + raise click.UsageError("2FA Token must be 6 decimal digits") - token = HMAC(secret, msg=psbt_hash, digestmod=sha256).digest() + token = token.encode('ascii') + + now = int(time.time()) + if now % 30 < 5: + click.echo("NOTE: TOTP was on edge of expiry limit! Might not work.") + totp_time = now // 30 + + #raise click.UsageError("Need PSBT file as part of HMAC for password") + + assert token and len(token) in {6, 32} + username = username.encode('ascii') if debug: - click.echo(" secret = %s" % B2A(secret)) - click.echo(" salt = %s" % B2A(salt)) + click.echo(" username = %s" % username.decode('ascii')) + click.echo(" token = %s" % (B2A(token) if len(token) > 6 else token.decode('ascii'))) + click.echo("totp_time = %d" % totp_time) - totp_time = 0 - else: - if not token: - token = click.prompt('2FA Token (6 digits)', hide_input=False) + resp = dev.send_recv(CCProtocolPacker.user_auth(username, token, totp_time)) - if len(token) != 6 or not token.isdigit(): - raise click.UsageError("2FA Token must be 6 decimal digits") + if not resp: + click.echo("Correct or queued") + else: + click.echo(f'Problem: {resp}') - token = token.encode('ascii') - - now = int(time.time()) - if now % 30 < 5: - click.echo("NOTE: TOTP was on edge of expiry limit! Might not work.") - totp_time = now // 30 - - #raise click.UsageError("Need PSBT file as part of HMAC for password") - - assert token and len(token) in {6, 32} - username = username.encode('ascii') - - if debug: - click.echo(" username = %s" % username.decode('ascii')) - click.echo(" token = %s" % (B2A(token) if len(token) > 6 else token.decode('ascii'))) - click.echo("totp_time = %d" % totp_time) - - resp = dev.send_recv(CCProtocolPacker.user_auth(username, token, totp_time)) - - if not resp: - click.echo("Correct or queued") - else: - click.echo(f'Problem: {resp}') @main.command('get-locker') def get_storage_locker(): - "Get the value held in the Storage Locker (not Bitcoin related, reserved for HSM use)" - - dev = get_device() - - ls = dev.send_recv(CCProtocolPacker.get_storage_locker(), timeout=None) - - click.echo(ls) + """Get the value held in the Storage Locker (not Bitcoin related, reserved for HSM use)""" + with get_device() as dev: + ls = dev.send_recv(CCProtocolPacker.get_storage_locker(), timeout=None) + click.echo(ls) # EOF