Additions for HSM and user mgmt

This commit is contained in:
Peter D. Gray 2020-01-10 12:39:42 -05:00
parent d3bbcf347a
commit 111215a3fd
No known key found for this signature in database
GPG Key ID: F0E6CC6AFC16CF7B
4 changed files with 245 additions and 4 deletions

View File

@ -10,7 +10,7 @@
# - see <https://github.com/trezor/cython-hidapi/blob/master/hid.pyx> for HID api
#
#
import hid, click, sys, os, pdb, struct, time, io, re
import hid, click, sys, os, pdb, struct, time, io, re, json
from pprint import pformat
from binascii import b2a_hex, a2b_hex
from hashlib import sha256
@ -20,7 +20,8 @@ from base64 import b64decode, b64encode
from ckcc.protocol import CCProtocolPacker, CCProtocolUnpacker
from ckcc.protocol import CCProtoError, CCUserRefused, CCBusyError
from ckcc.constants import MAX_MSG_LEN, MAX_BLK_LEN
from ckcc.constants import MAX_MSG_LEN, MAX_BLK_LEN, MAX_USERNAME_LEN
from ckcc.constants import USER_AUTH_HMAC, USER_AUTH_TOTP, USER_AUTH_HOTP
from ckcc.constants import (
AF_CLASSIC, AF_P2SH, AF_P2WPKH, AF_P2WSH, AF_P2WPKH_P2SH, AF_P2WSH_P2SH)
from ckcc.client import ColdcardDevice, COINKITE_VID, CKCC_PID
@ -39,6 +40,8 @@ def my_hook(ty, val, tb):
return _sys_excepthook(ty, val, tb)
sys.excepthook=my_hook
B2A = lambda x: b2a_hex(x).decode('ascii')
def xfp2str(xfp):
# Standardized way to show an xpub's fingerprint... it's a 4-byte string
# and not really an integer. Used to show as '0x%08x' but that's wrong endian.
@ -608,7 +611,7 @@ def str_to_int_path(xfp, path):
@click.option('--wrap', '-w', is_flag=True, help='Show as segwit wrapped in P2SH (p2wpkh)')
@click.option('--quiet', '-q', is_flag=True, help='Show less details; just the address')
def show_address(script, fingerprints, quiet=False, segwit=False, wrap=False):
'''Show a multisig payment address on-screen
'''Show a multisig payment address on-screen.
Needs a redeem script and list of fingerprint/path (4369050F/1/0/0 for example).
@ -697,7 +700,7 @@ def bip39_passphrase(passphrase, verbose=False):
@click.option('--verbose', '-v', is_flag=True, help='Show file uploaded')
@click.option('--path', '-p', default="m/45'", help="Derivation for key (default: BIP45 = m/45')")
@click.option('--add', '-a', 'just_add', is_flag=True, help='Just show line required to add this Coldcard')
def enroll_xpub(name, min_signers, path, num_signers, dry_run=False, output_file=None, verbose=False, just_add=False):
def enroll_xpub(name, min_signers, path, num_signers, output_file=None, verbose=False, just_add=False):
'''
Create a skeleton file which defines a multisig wallet.
@ -749,4 +752,189 @@ When completed, use with: "ckcc upload -m wallet.txt" or put on SD card.
output_file.close()
click.echo(f"Wrote to: {output_file.name}")
@main.command('hsm-start')
@click.option('--policy', '-f', type=click.Path(exists=True,dir_okay=False), metavar="policy.json")
@click.option('--dry-run', '-n', is_flag=True, help="Just validate file, don't upload")
def hsm_setup(policy=None, dry_run=False):
'''
Enable Hardware Security Module (HSM) mode.
Upload policy file (or use existing policy) and start HSM mode on device. User must approve startup.
All PSBT's will be signed automatically based on that policy.
'''
dev = ColdcardDevice(sn=force_serial)
dev.check_mitm()
if policy:
if dry_run:
# check it looks reasonable, but jsut a JSON check
raw = open(policy, 'rt').read()
j = json.loads(raw)
click.echo("Policy ok")
sys.exit(0)
file_len, sha = real_file_upload(open(policy, 'rb'), dev=dev)
dev.send_recv(CCProtocolPacker.hsm_start(file_len, sha))
else:
if dry_run:
raise click.UsageError("Dry run not useful without a policy file to check.")
dev.send_recv(CCProtocolPacker.hsm_start())
click.echo("Approve HSM policy on Coldcard screen.")
@main.command('hsm')
def hsm_status():
'''
Get current status of HSM feature.
Is it running, what is the policy (summary only).
'''
dev = ColdcardDevice(sn=force_serial)
dev.check_mitm()
resp = dev.send_recv(CCProtocolPacker.hsm_status())
o = json.loads(resp)
click.echo(pformat(o))
@main.command('user')
@click.argument('username', type=str, metavar="USERNAME", required=True)
@click.option('--totp-create', '-t', is_flag=True, help='Do TOTP and let Coldcard pick secret')
@click.option('--ask-pass', '-a', is_flag=True, help='Define password here (interactive)')
@click.option('--totp-secret', '-s', help='BASE32 encoded secret for TOTP 2FA method (not great)')
@click.option('--text-secret', '-p', help='Provide password on command line (not great)')
@click.option('--delete', '-d', 'do_delete', is_flag=True, help='Remove a user by name')
@click.option('--hotp', is_flag=True, help='Use HOTP instead of TOTP (dev only)')
@click.option('--show-qr', '-q', is_flag=True, help='Show enroll QR contents')
def new_user(username, totp_create=False, totp_secret=None, text_secret=None, ask_pass=False,
do_delete=False, debug=False, show_qr=False, hotp=False):
'''
Create a new user on the Coldcard for HSM policy (also delete).
You can input a password (interactively), or once can be picked
by the Coldcard.
'''
from base64 import b32encode, b32decode
username = username.encode('ascii')
assert 1 <= len(username) <= MAX_USERNAME_LEN, "Username length wrong"
dev = ColdcardDevice(sn=force_serial)
dev.check_mitm()
if do_delete:
dev.send_recv(CCProtocolPacker.delete_user(username))
click.echo('Deleted, if it was there')
return
if ask_pass:
assert not text_secret, "dont give and ask for password"
text_secret = click.prompt('Password (hidden)', hide_input=True, confirmation_prompt=True)
if totp_secret:
secret = b32decode(totp_secret, casefold=True)
assert len(secret) in {10, 20}
mode = USER_AUTH_TOTP
elif totp_create:
secret = b''
mode = USER_AUTH_TOTP
elif hotp:
mode = USER_AUTH_HOTP
secret = b''
else:
secret = dev.hash_password(text_secret.encode('utf8')) if text_secret else b''
mode = USER_AUTH_HMAC
assert not show_qr, 'QR not appropriate for text passwords'
if hotp:
mode = USER_AUTH_HOTP
new_secret = dev.send_recv(CCProtocolPacker.create_user(username, mode, secret))
if show_qr:
# format the URL thing ... needs a spec
username = username.decode('ascii')
secret = new_secret or b32encode(secret).decode('ascii')
mode = 'hotp' if mode == USER_AUTH_HOTP else 'totp'
click.echo(f'otpauth://{mode}/{username}?secret={secret}&issuer=Coldcard%20{dev.serial}')
elif not text_secret:
click.echo(f'New password is: {new_secret}')
else:
click.echo('Done')
@main.command('auth')
@click.argument('username', type=str, metavar="USERNAME", required=True)
@click.argument('token', type=str, metavar="[TOTP]", required=False)
@click.option('--psbt-file', '-f', type=click.File('rb'), required=False)
@click.option('--password', '-p', is_flag=True, help="Prompt for password")
@click.option('--debug', '-d', is_flag=True, help='Show values used')
def user_auth(username, token=None, password=None, prompt=None, totp=None, psbt_file=None, debug=False):
'''
Indicate specific user is present (for HSM).
Username and 2FA (TOTP, 6-digits) value or password are required. To use
password, the PSBT file in question must be provided.
'''
import time
from hmac import HMAC
from hashlib import pbkdf2_hmac, sha256
dryrun = True
dev = ColdcardDevice(sn=force_serial)
dev.check_mitm()
if psbt_file or password:
if psbt_file:
psbt_hash = sha256(psbt_file.read()).digest()
dryrun = False
else:
psbt_hash = bytes(32)
pw = token or click.prompt('Password (hidden)', hide_input=True)
secret = dev.hash_password(pw.encode('utf8'))
token = HMAC(secret, msg=psbt_hash, digestmod=sha256).digest()
if debug:
click.echo(" secret = %s" % B2A(secret))
click.echo(" salt = %s" % B2A(salt))
totp_time = 0
else:
if not token:
token = click.prompt('2FA Token (6 digits)', hide_input=False)
if len(token) != 6 or not token.isdigit():
raise click.UsageError("2FA Token must be 6 decimal digits")
token = token.encode('ascii')
now = int(time.time())
if now % 30 < 5:
click.echo("NOTE: TOTP was on edge of expiry limit! Might not work.")
totp_time = now // 30
#raise click.UsageError("Need PSBT file as part of HMAC for password")
assert token and len(token) in {6, 32}
username = username.encode('ascii')
if debug:
click.echo(" username = %s" % username.decode('ascii'))
click.echo(" token = %s" % (B2A(token) if len(token) > 6 else token.decode('ascii')))
click.echo("totp_time = %d" % totp_time)
resp = dev.send_recv(CCProtocolPacker.user_auth(username, token, totp_time))
if not resp:
click.echo("Correct or queued")
else:
click.echo(f'Problem: {resp}')
# EOF

View File

@ -327,6 +327,15 @@ class ColdcardDevice:
return data
def hash_password(self, text_password):
# Turn text password into a key for use in HSM auth protocol
from hashlib import pbkdf2_hmac, sha256
from .constants import PBKDF2_ITER_COUNT
salt = sha256(b'pepper' + self.serial.encode('ascii')).digest()
return pbkdf2_hmac('sha256', text_password, salt, PBKDF2_ITER_COUNT)
class UnixSimulatorPipe:
# Use a UNIX pipe to the simulator instead of a real USB connection.

View File

@ -24,6 +24,14 @@ MAX_UPLOAD_LEN = const(2*MAX_TXN_LEN)
# Max length of text messages for signing
MSG_SIGNING_MAX_LENGTH = const(240)
# Types of user auth we support
USER_AUTH_TOTP = const(1) # RFC6238
USER_AUTH_HOTP = const(2) # RFC4226
USER_AUTH_HMAC = const(3) # PBKDF2('hmac-sha256', secret, sha256(psbt), PBKDF2_ITER_COUNT)
MAX_USERNAME_LEN = 16
PBKDF2_ITER_COUNT = 2500
# Max depth for derived keys, in PSBT files, and USB commands
MAX_PATH_DEPTH = const(12)

View File

@ -167,6 +167,42 @@ class CCProtocolPacker:
# one time only: put into bag, or readback bag
return b'bagi' + bytes(new_number)
@staticmethod
def hsm_start(length=0, file_sha=b''):
if length:
# New policy already be uploaded as a JSON file, get approval and start.
assert len(file_sha) == 32
return pack('<4sI32s', b'hsms', length, file_sha)
else:
# Use policy on device already. Confirmation still required by local user.
return b'hsms'
@staticmethod
def hsm_status():
# get current status of HSM mode and/or policy defined already. Returns JSON
return b'hsts'
@staticmethod
def create_user(username, auth_mode, secret=b''):
# create username, with pre-shared secret/password, or we generate.
# auth_model should be one of USER_AUTH_*
assert 1 <= len(username) <= MAX_USERNAME_LEN
assert len(secret) in { 0, 10, 20, 32}
return pack('<4sBBB', b'nwur', auth_mode, len(username), len(secret)) + username + secret
@staticmethod
def delete_user(username):
# remove a username and forget secret; cannot be used in HSM mode (only before)
assert 0 < len(username) <= MAX_USERNAME_LEN
return pack('<4sB', b'rmur', len(username)) + username
@staticmethod
def user_auth(username, token, totp_time=0):
# HSM mode: try an authentication method for a username
assert 0 < len(username) <= 16
assert 6 <= len(token) <= 32
return pack('<4sIBB', b'user', totp_time, len(username), len(token)) + username + token
class CCProtocolUnpacker:
# Take a binary response, and turn it into a python object