Additions for HSM and user mgmt

This commit is contained in:
Peter D. Gray 2020-01-10 12:39:57 -05:00
parent 8792fa27a9
commit 424850d7f3
14 changed files with 134 additions and 117 deletions

@ -1 +1 @@
Subproject commit d3bbcf347a29abaa0d9ded122cf1bc2c4017faa5
Subproject commit 111215a3fdee33fc436e35c85ec20ea45f73c14e

View File

@ -1,12 +0,0 @@
xx
xx
xx
xx
xx
xx
xx
xx
xx
xx
xx
xx

View File

@ -1,13 +0,0 @@
xx
xx
xx
xx
xx
xx
xx
xx
xx
xx
xx
xx

View File

@ -1,12 +0,0 @@
xx
xx
xx
xx
xx
xx
xx
xx
xx
xx
xx
xx

View File

@ -1,13 +0,0 @@
xx
xx
xx
xx
xx
xx
xx
xx
xx
xx
xx
xx

View File

@ -13,14 +13,13 @@ from usb import CCBusyError
from utils import HexWriter, xfp2str, problem_file_line, cleanup_deriv_path
from psbt import psbtObject, FatalPSBTIssue, FraudulentChangeOutput
global active_request
active_request = None
# Where in SPI flash the two transactions are (in and out)
TXN_INPUT_OFFSET = 0
TXN_OUTPUT_OFFSET = MAX_TXN_LEN
class UserAuthorizedAction:
active_request = None
def __init__(self):
self.refused = False
self.failed = None
@ -51,26 +50,23 @@ class UserAuthorizedAction:
@classmethod
def cleanup(cls):
# user has collected the results/errors and no need for objs
global active_request
active_request = None
cls.active_request = None
gc.collect()
@classmethod
def check_busy(cls, allowed_cls=None):
# see if we're busy. don't interrupt that... unless it's of allowed_cls
# - also handle cleanup of stale actions
global active_request
if not active_request:
if not cls.active_request:
return
if allowed_cls and isinstance(active_request, allowed_cls):
if allowed_cls and isinstance(cls.active_request, allowed_cls):
return
# check if UX actally was cleared, and we're not really doing that anymore; recover
# - happens if USB caller never comes back for their final results
from ux import the_ux
top_ux = the_ux.top_of_stack()
if not isinstance(top_ux, cls) and active_request.ux_done:
if not isinstance(top_ux, cls) and cls.active_request.ux_done:
# do cleaup
print('recovery cleanup')
cls.cleanup()
@ -136,9 +132,13 @@ class ApproveMessageSign(UserAuthorizedAction):
async def interact(self):
# Prompt user w/ details and get approval
from main import dis
from hsm import hsm_active
ch = await ux_show_story(MSG_SIG_TEMPLATE.format(msg=self.text,
addr=self.address, subpath=self.subpath))
story = MSG_SIG_TEMPLATE.format(msg=self.text, addr=self.address, subpath=self.subpath)
if hsm_active:
ch = await hsm_active.approve_msg_sign(story, self.text, self.subpath)
else:
ch = await ux_show_story(story)
if ch != 'y':
# they don't want to!
@ -218,12 +218,11 @@ def sign_msg(text, subpath, addr_fmt):
# Do some verification before we even show to the local user
ApproveMessageSign.validate(text)
global active_request
UserAuthorizedAction.check_busy()
active_request = ApproveMessageSign(text, subpath, addr_fmt)
UserAuthorizedAction.active_request = ApproveMessageSign(text, subpath, addr_fmt)
# kill any menu stack, and put our thing at the top
abort_and_goto(active_request)
abort_and_goto(UserAuthorizedAction.active_request)
def sign_txt_file(filename):
# sign a one-line text file found on a MicroSD card
@ -231,8 +230,6 @@ def sign_txt_file(filename):
from files import CardSlot, CardMissingError
from sram2 import tmp_buf
global active_request
UserAuthorizedAction.cleanup()
# copy message into memory
@ -320,13 +317,12 @@ def sign_txt_file(filename):
msg = "Created new file:\n\n%s" % out_fn
await ux_show_story(msg, title='File Signed')
global active_request
UserAuthorizedAction.check_busy()
active_request = ApproveMessageSign(text, subpath, AF_CLASSIC, approved_cb=done)
UserAuthorizedAction.active_request = ApproveMessageSign(text, subpath, AF_CLASSIC, approved_cb=done)
# do not kill the menu stack!
from ux import the_ux
the_ux.push(active_request)
the_ux.push(UserAuthorizedAction.active_request)
class ApproveTransaction(UserAuthorizedAction):
@ -594,12 +590,11 @@ class ApproveTransaction(UserAuthorizedAction):
def sign_transaction(psbt_len, do_finalize=False):
# transaction (binary) loaded into sflash already, checksum checked
global active_request
UserAuthorizedAction.check_busy(ApproveTransaction)
active_request = ApproveTransaction(psbt_len, do_finalize)
UserAuthorizedAction.active_request = ApproveTransaction(psbt_len, do_finalize)
# kill any menu stack, and put our thing at the top
abort_and_goto(active_request)
abort_and_goto(UserAuthorizedAction.active_request)
def sign_psbt_file(filename):
@ -608,7 +603,6 @@ def sign_psbt_file(filename):
from main import dis
from sram2 import tmp_buf
from utils import HexStreamer, Base64Streamer, HexWriter, Base64Writer
global active_request
UserAuthorizedAction.cleanup()
@ -740,10 +734,10 @@ def sign_psbt_file(filename):
UserAuthorizedAction.cleanup()
active_request = ApproveTransaction(psbt_len, approved_cb=done)
UserAuthorizedAction.active_request = ApproveTransaction(psbt_len, approved_cb=done)
# kill any menu stack, and put our thing at the top
abort_and_goto(active_request)
abort_and_goto(UserAuthorizedAction.active_request)
class RemoteBackup(UserAuthorizedAction):
def __init__(self):
@ -774,14 +768,12 @@ class RemoteBackup(UserAuthorizedAction):
def start_remote_backup():
# tell the local user the secret words, and then save to SPI flash
# USB caller has to come back and download encrypted contents.
global active_request
UserAuthorizedAction.cleanup()
active_request = RemoteBackup()
UserAuthorizedAction.active_request = RemoteBackup()
# kill any menu stack, and put our thing at the top
abort_and_goto(active_request)
abort_and_goto(UserAuthorizedAction.active_request)
class NewPassphrase(UserAuthorizedAction):
@ -842,14 +834,12 @@ Press 2 to view the provided passphrase.\n\nOK to continue, X to cancel.''' % le
def start_bip39_passphrase(pw):
# tell the local user the secret words, and then save to SPI flash
# USB caller has to come back and download encrypted contents.
global active_request
UserAuthorizedAction.cleanup()
active_request = NewPassphrase(pw)
UserAuthorizedAction.active_request = NewPassphrase(pw)
# kill any menu stack, and put our thing at the top
abort_and_goto(active_request)
abort_and_goto(UserAuthorizedAction.active_request)
class ShowAddressBase(UserAuthorizedAction):
@ -940,16 +930,14 @@ def start_show_p2sh_address(M, N, addr_format, xfp_paths, witdeem_script):
assert ms.M == M
assert ms.N == N
global active_request
UserAuthorizedAction.check_busy(ShowAddressBase)
active_request = ShowP2SHAddress(ms, addr_format, xfp_paths, witdeem_script)
UserAuthorizedAction.active_request = ShowP2SHAddress(ms, addr_format, xfp_paths, witdeem_script)
# kill any menu stack, and put our thing at the top
abort_and_goto(active_request)
abort_and_goto(UserAuthorizedAction.active_request)
# provide the value back to attached desktop
return active_request.address
return UserAuthorizedAction.active_request.address
def start_show_address(addr_format, subpath):
try:
@ -961,15 +949,14 @@ def start_show_address(addr_format, subpath):
# require a path to a key
subpath = cleanup_deriv_path(subpath)
global active_request
UserAuthorizedAction.check_busy(ShowAddressBase)
active_request = ShowPKHAddress(addr_format, subpath)
UserAuthorizedAction.active_request = ShowPKHAddress(addr_format, subpath)
# kill any menu stack, and put our thing at the top
abort_and_goto(active_request)
abort_and_goto(UserAuthorizedAction.active_request)
# provide the value back to attached desktop
return active_request.address
return UserAuthorizedAction.active_request.address
class NewEnrollRequest(UserAuthorizedAction):
@ -1010,7 +997,6 @@ class NewEnrollRequest(UserAuthorizedAction):
def maybe_enroll_xpub(sf_len=None, config=None, name=None, ux_reset=False):
# Offer to import (enroll) a new multisig wallet. Allow reject by user.
global active_request
from multisig import MultisigWallet
UserAuthorizedAction.cleanup()
@ -1023,15 +1009,15 @@ def maybe_enroll_xpub(sf_len=None, config=None, name=None, ux_reset=False):
# and be shown on screen/over usb
ms = MultisigWallet.from_file(config, name=name)
active_request = NewEnrollRequest(ms)
UserAuthorizedAction.active_request = NewEnrollRequest(ms)
if ux_reset:
# for USB case, and import from PSBT
# kill any menu stack, and put our thing at the top
abort_and_goto(active_request)
abort_and_goto(UserAuthorizedAction.active_request)
else:
# menu item case: add to stack
from ux import the_ux
the_ux.push(active_request)
the_ux.push(UserAuthorizedAction.active_request)
# EOF

View File

@ -12,6 +12,7 @@ from choosers import *
from multisig import make_multisig_menu
from paper import make_paper_wallet
from address_explorer import address_explore
from hsm import hsm_policy_available
#
# NOTE: "Always In Title Case"
@ -190,6 +191,7 @@ NormalSystem = [
# xxxxxxxxxxxxxxxx
MenuItem('Ready To Sign', f=ready2sign),
MenuItem('Passphrase', f=start_b39_pw, predicate=lambda: settings.get('words', True)),
MenuItem('Start HSM Mode', f=start_hsm_menu_item, predicate=hsm_policy_available),
MenuItem('Secure Logout', f=logout_now),
MenuItem('Advanced', menu=AdvancedNormalMenu),
MenuItem('Settings', menu=SettingsMenu),

View File

@ -1,8 +1,9 @@
# (c) Copyright 2020 by Coinkite Inc. This file is part of Coldcard <coldcardwallet.com>
# and is covered by GPLv3 license found in COPYING.
#
# Operations that require user authorization, like our core features: signing messages
# and signing bitcoin transactions.
# hsm.py
#
# Unattended signing of transactions and messages, subject to a set of rules.
#
import stash, ure, tcc, ux, chains, sys, gc, uio, ujson, uos, utime
from sffile import SFFile
@ -11,12 +12,13 @@ from utils import problem_file_line, cleanup_deriv_path
from psbt import psbtObject, FatalPSBTIssue, FraudulentChangeOutput
from auth import UserAuthorizedAction
from utils import pretty_short_delay, pretty_delay
from uasyncio.queues import QueueEmpty
# this is None or points to HSMPolicy object
# this is None or points to the active HSMPolicy object
global hsm_active
hsm_active = None
# where we save policy
# where we save policy/config
POLICY_FNAME = '/flash/hsm-policy.json'
def get_list(j, fld_name, cleanup_fcn=None):
@ -29,7 +31,8 @@ def get_list(j, fld_name, cleanup_fcn=None):
return [cleanup_fcn(i) for i in v]
return v
def int_range(v, fld_name, mn=0, mx=1000):
def get_int(j, fld_name, mn=0, mx=1000):
v = j.pop(fld_name, None) or None
if v is None: return v
assert int(v) == v, "%s: must be integer" % fld_name
v = int(v)
@ -53,7 +56,7 @@ class HSMPolicy:
self.notes = j.pop('notes', None)
# time period, in minutes
self.period = int_range(j.pop('period', None), 'period', 1, 72*60)
self.period = get_int(j, 'period', 1, 3*24*60)
# error checking
extra = set(j.keys())
@ -317,11 +320,14 @@ class hsmUxInteraction:
update_contents = show
async def interact(self):
from main import numpad
import main
from actions import login_now
from uasyncio.queues import QueueEmpty
from uasyncio import sleep_ms
# Prevent any other component from reading numpad
real_numpad = main.numpad
main.numpad = NeuterPad
# Kill time, waiting for user input
while 1:
self.show()
@ -329,7 +335,7 @@ class hsmUxInteraction:
try:
# Poll for an event, no block
ch = numpad.get_nowait()
ch = real_numpad.get_nowait()
if ch == 'x':
await login_now() # immediate reboots
@ -344,7 +350,6 @@ class hsmUxInteraction:
req = UserAuthorizedAction.active_request
if req and not req.ux_done:
try:
print('do more')
await req.interact()
except AbortInteraction:
pass
@ -353,4 +358,32 @@ class hsmUxInteraction:
# singleton
hsm_ux_obj = hsmUxInteraction()
# Mock version of NumpadBase from numpad.py
class NeuterPad:
disabled = True
@classmethod
async def get(cls):
return
@classmethod
def get_nowait(cls):
raise QueueEmpty
@classmethod
def empty(cls):
return True
@classmethod
def stop(cls):
return
@classmethod
def abort_ux(cls):
return
@classmethod
def inject(cls, key):
return
# EOF

View File

@ -245,5 +245,4 @@ class MenuSystem:
# abort/nothing selected/back out?
return None
#demo()
# EOF

View File

@ -39,6 +39,7 @@ from sffile import SFFile
# axi = index of last selected address in explorer
# nick = optional nickname for this coldcard (personalization)
# lgto = (minutes) how long to wait for Login Countdown feature
# usr = (dict) map from username to their secret, as base32
# where in SPI Flash we work (last 128k)

View File

@ -244,7 +244,7 @@ def word_wrap(ln, w):
yield left
async def ux_show_story(msg, title=None, escape=None, sensitive=False):
async def ux_show_story(msg, title=None, escape=None, sensitive=False, strict_escape=False):
# show a big long string, and wait for XY to continue
# - returns character used to get out (X or Y)
# - can accept other chars to 'escape' as well.
@ -324,7 +324,8 @@ async def ux_show_story(msg, title=None, escape=None, sensitive=False):
# allow another way out for some usages
return ch
elif ch in 'xy':
return ch
if not strict_escape:
return ch
elif ch == '0':
top = 0
elif ch == '7': # page up
@ -340,8 +341,9 @@ async def ux_show_story(msg, title=None, escape=None, sensitive=False):
async def idle_logout():
from main import numpad, settings
from hsm import hsm_active
while 1:
while not hsm_active:
await sleep_ms(250)
# they may have changed setting recently
@ -356,11 +358,13 @@ async def idle_logout():
if now > numpad.last_event_time + timeout:
# do a logout now.
print("Idle timeout now!")
print("Idle!")
from actions import logout_now
await logout_now()
return # not reached
print("Idle TO undone")
async def ux_confirm(msg):
# confirmation screen, with stock title and Y=of course.

View File

@ -83,7 +83,7 @@ def serial_number():
# Our USB serial number, both in DFU mode (system boot ROM), and later thanks to code in
# USBD_StrDescriptor()
#
# - this is **completely** public info, since we can be booted into DFU mode by any anybody
# - this is **probably** public info, since shared freely over USB during enumeration
#
import machine
i = machine.unique_id()

View File

@ -11,6 +11,8 @@ simulator_fixed_words = "wife shiver author away frog air rough vanish fantasy f
simulator_fixed_xfp = 0x4369050f
simulator_serial_number = 'F1F1F1F1F1F1'
from ckcc_protocol.constants import AF_P2WSH, AFC_SCRIPT, AF_P2SH, AF_P2WSH_P2SH
unmap_addr_fmt = {

View File

@ -4,6 +4,7 @@
# Run tests on the simulator itself, not here... these are basically "unit tests"
#
import pytest, glob
from helpers import B2A
def test_remote_exec(sim_exec):
assert sim_exec("RV.write('testing123')") == 'testing123'
@ -130,7 +131,6 @@ def test_decoding(unit_test):
@pytest.mark.parametrize('msg', [b'123', b'b'*78])
@pytest.mark.parametrize('key', [b'3245', b'b'*78])
def test_hmac(sim_exec, msg, key, hasher):
from helpers import B2A
import hashlib, hmac
cmd = "import hmac, tcc; from h import b2a_hex; " + \
@ -140,6 +140,46 @@ def test_hmac(sim_exec, msg, key, hasher):
expect = hmac.new(key, msg, hasher).hexdigest()
assert got == expect
print(expect)
#print(expect)
@pytest.mark.parametrize('secret,counter,expect', [
( b'abcdefghij', 1, '765705'),
( b'abcdefghij', 2, '816065'),
( b'12345678901234567890', 0, '755224'), # test vectors from RFC4226
( b'12345678901234567890', 1, '287082'),
( b'12345678901234567890', 2, '359152'),
( b'12345678901234567890', 3, '969429'),
( b'12345678901234567890', 4, '338314'),
( b'12345678901234567890', 5, '254676'),
( b'12345678901234567890', 6, '287922'),
( b'12345678901234567890', 7, '162583'),
( b'12345678901234567890', 8, '399871'),
( b'12345678901234567890', 9, '520489'),
])
def test_hotp(sim_exec, secret, counter, expect):
cmd = "from users import calc_hotp; " + \
f"RV.write(calc_hotp({secret}, {counter}))"
got = sim_exec(cmd)
assert got == expect
def test_hmac_key(sim_exec, count=50):
from hashlib import pbkdf2_hmac, sha256
from constants import simulator_serial_number
from ckcc_protocol.constants import PBKDF2_ITER_COUNT
salt = sha256(b'pepper'+simulator_serial_number.encode('ascii')).digest()
for i in range(count):
pw = ('test%09d' % i).encode('ascii')
pw = pw[1:i] if i > 2 else pw
cmd = "from users import calc_hmac_key; from h import b2a_hex; " + \
f"RV.write(b2a_hex(calc_hmac_key({pw})))"
got = sim_exec(cmd)
expect = B2A(pbkdf2_hmac('sha256', pw, salt, PBKDF2_ITER_COUNT))
assert got == expect
print(got)
# EOF