diff --git a/external/ckcc-protocol b/external/ckcc-protocol index d3bbcf34..111215a3 160000 --- a/external/ckcc-protocol +++ b/external/ckcc-protocol @@ -1 +1 @@ -Subproject commit d3bbcf347a29abaa0d9ded122cf1bc2c4017faa5 +Subproject commit 111215a3fdee33fc436e35c85ec20ea45f73c14e diff --git a/graphics/hsm_0.txt b/graphics/hsm_0.txt deleted file mode 100644 index 8ca4b2c7..00000000 --- a/graphics/hsm_0.txt +++ /dev/null @@ -1,12 +0,0 @@ -xx -xx -xx - xx - xx - xx - xx - xx - xx -xx -xx -xx diff --git a/graphics/hsm_1.txt b/graphics/hsm_1.txt deleted file mode 100644 index e2de7a55..00000000 --- a/graphics/hsm_1.txt +++ /dev/null @@ -1,13 +0,0 @@ - xx - xx - xx - xx - xx - xx - xx - xx - xx - xx - xx - xx - diff --git a/graphics/hsm_2.txt b/graphics/hsm_2.txt deleted file mode 100644 index 2b6a09cd..00000000 --- a/graphics/hsm_2.txt +++ /dev/null @@ -1,12 +0,0 @@ - xx - xx - xx - xx -xx -xx -xx -xx - xx - xx - xx - xx diff --git a/graphics/hsm_3.txt b/graphics/hsm_3.txt deleted file mode 100644 index e2de7a55..00000000 --- a/graphics/hsm_3.txt +++ /dev/null @@ -1,13 +0,0 @@ - xx - xx - xx - xx - xx - xx - xx - xx - xx - xx - xx - xx - diff --git a/shared/auth.py b/shared/auth.py index 57ae03bd..6fa07ec8 100644 --- a/shared/auth.py +++ b/shared/auth.py @@ -13,14 +13,13 @@ from usb import CCBusyError from utils import HexWriter, xfp2str, problem_file_line, cleanup_deriv_path from psbt import psbtObject, FatalPSBTIssue, FraudulentChangeOutput -global active_request -active_request = None - # Where in SPI flash the two transactions are (in and out) TXN_INPUT_OFFSET = 0 TXN_OUTPUT_OFFSET = MAX_TXN_LEN class UserAuthorizedAction: + active_request = None + def __init__(self): self.refused = False self.failed = None @@ -51,26 +50,23 @@ class UserAuthorizedAction: @classmethod def cleanup(cls): # user has collected the results/errors and no need for objs - global active_request - active_request = None + cls.active_request = None gc.collect() @classmethod def check_busy(cls, allowed_cls=None): # see if we're busy. don't interrupt that... unless it's of allowed_cls # - also handle cleanup of stale actions - global active_request - - if not active_request: + if not cls.active_request: return - if allowed_cls and isinstance(active_request, allowed_cls): + if allowed_cls and isinstance(cls.active_request, allowed_cls): return # check if UX actally was cleared, and we're not really doing that anymore; recover # - happens if USB caller never comes back for their final results from ux import the_ux top_ux = the_ux.top_of_stack() - if not isinstance(top_ux, cls) and active_request.ux_done: + if not isinstance(top_ux, cls) and cls.active_request.ux_done: # do cleaup print('recovery cleanup') cls.cleanup() @@ -136,9 +132,13 @@ class ApproveMessageSign(UserAuthorizedAction): async def interact(self): # Prompt user w/ details and get approval from main import dis + from hsm import hsm_active - ch = await ux_show_story(MSG_SIG_TEMPLATE.format(msg=self.text, - addr=self.address, subpath=self.subpath)) + story = MSG_SIG_TEMPLATE.format(msg=self.text, addr=self.address, subpath=self.subpath) + if hsm_active: + ch = await hsm_active.approve_msg_sign(story, self.text, self.subpath) + else: + ch = await ux_show_story(story) if ch != 'y': # they don't want to! @@ -218,12 +218,11 @@ def sign_msg(text, subpath, addr_fmt): # Do some verification before we even show to the local user ApproveMessageSign.validate(text) - global active_request UserAuthorizedAction.check_busy() - active_request = ApproveMessageSign(text, subpath, addr_fmt) + UserAuthorizedAction.active_request = ApproveMessageSign(text, subpath, addr_fmt) # kill any menu stack, and put our thing at the top - abort_and_goto(active_request) + abort_and_goto(UserAuthorizedAction.active_request) def sign_txt_file(filename): # sign a one-line text file found on a MicroSD card @@ -231,8 +230,6 @@ def sign_txt_file(filename): from files import CardSlot, CardMissingError from sram2 import tmp_buf - global active_request - UserAuthorizedAction.cleanup() # copy message into memory @@ -320,13 +317,12 @@ def sign_txt_file(filename): msg = "Created new file:\n\n%s" % out_fn await ux_show_story(msg, title='File Signed') - global active_request UserAuthorizedAction.check_busy() - active_request = ApproveMessageSign(text, subpath, AF_CLASSIC, approved_cb=done) + UserAuthorizedAction.active_request = ApproveMessageSign(text, subpath, AF_CLASSIC, approved_cb=done) # do not kill the menu stack! from ux import the_ux - the_ux.push(active_request) + the_ux.push(UserAuthorizedAction.active_request) class ApproveTransaction(UserAuthorizedAction): @@ -594,12 +590,11 @@ class ApproveTransaction(UserAuthorizedAction): def sign_transaction(psbt_len, do_finalize=False): # transaction (binary) loaded into sflash already, checksum checked - global active_request UserAuthorizedAction.check_busy(ApproveTransaction) - active_request = ApproveTransaction(psbt_len, do_finalize) + UserAuthorizedAction.active_request = ApproveTransaction(psbt_len, do_finalize) # kill any menu stack, and put our thing at the top - abort_and_goto(active_request) + abort_and_goto(UserAuthorizedAction.active_request) def sign_psbt_file(filename): @@ -608,7 +603,6 @@ def sign_psbt_file(filename): from main import dis from sram2 import tmp_buf from utils import HexStreamer, Base64Streamer, HexWriter, Base64Writer - global active_request UserAuthorizedAction.cleanup() @@ -740,10 +734,10 @@ def sign_psbt_file(filename): UserAuthorizedAction.cleanup() - active_request = ApproveTransaction(psbt_len, approved_cb=done) + UserAuthorizedAction.active_request = ApproveTransaction(psbt_len, approved_cb=done) # kill any menu stack, and put our thing at the top - abort_and_goto(active_request) + abort_and_goto(UserAuthorizedAction.active_request) class RemoteBackup(UserAuthorizedAction): def __init__(self): @@ -774,14 +768,12 @@ class RemoteBackup(UserAuthorizedAction): def start_remote_backup(): # tell the local user the secret words, and then save to SPI flash # USB caller has to come back and download encrypted contents. - global active_request UserAuthorizedAction.cleanup() - - active_request = RemoteBackup() + UserAuthorizedAction.active_request = RemoteBackup() # kill any menu stack, and put our thing at the top - abort_and_goto(active_request) + abort_and_goto(UserAuthorizedAction.active_request) class NewPassphrase(UserAuthorizedAction): @@ -842,14 +834,12 @@ Press 2 to view the provided passphrase.\n\nOK to continue, X to cancel.''' % le def start_bip39_passphrase(pw): # tell the local user the secret words, and then save to SPI flash # USB caller has to come back and download encrypted contents. - global active_request UserAuthorizedAction.cleanup() - - active_request = NewPassphrase(pw) + UserAuthorizedAction.active_request = NewPassphrase(pw) # kill any menu stack, and put our thing at the top - abort_and_goto(active_request) + abort_and_goto(UserAuthorizedAction.active_request) class ShowAddressBase(UserAuthorizedAction): @@ -940,16 +930,14 @@ def start_show_p2sh_address(M, N, addr_format, xfp_paths, witdeem_script): assert ms.M == M assert ms.N == N - global active_request UserAuthorizedAction.check_busy(ShowAddressBase) - - active_request = ShowP2SHAddress(ms, addr_format, xfp_paths, witdeem_script) + UserAuthorizedAction.active_request = ShowP2SHAddress(ms, addr_format, xfp_paths, witdeem_script) # kill any menu stack, and put our thing at the top - abort_and_goto(active_request) + abort_and_goto(UserAuthorizedAction.active_request) # provide the value back to attached desktop - return active_request.address + return UserAuthorizedAction.active_request.address def start_show_address(addr_format, subpath): try: @@ -961,15 +949,14 @@ def start_show_address(addr_format, subpath): # require a path to a key subpath = cleanup_deriv_path(subpath) - global active_request UserAuthorizedAction.check_busy(ShowAddressBase) - active_request = ShowPKHAddress(addr_format, subpath) + UserAuthorizedAction.active_request = ShowPKHAddress(addr_format, subpath) # kill any menu stack, and put our thing at the top - abort_and_goto(active_request) + abort_and_goto(UserAuthorizedAction.active_request) # provide the value back to attached desktop - return active_request.address + return UserAuthorizedAction.active_request.address class NewEnrollRequest(UserAuthorizedAction): @@ -1010,7 +997,6 @@ class NewEnrollRequest(UserAuthorizedAction): def maybe_enroll_xpub(sf_len=None, config=None, name=None, ux_reset=False): # Offer to import (enroll) a new multisig wallet. Allow reject by user. - global active_request from multisig import MultisigWallet UserAuthorizedAction.cleanup() @@ -1023,15 +1009,15 @@ def maybe_enroll_xpub(sf_len=None, config=None, name=None, ux_reset=False): # and be shown on screen/over usb ms = MultisigWallet.from_file(config, name=name) - active_request = NewEnrollRequest(ms) + UserAuthorizedAction.active_request = NewEnrollRequest(ms) if ux_reset: # for USB case, and import from PSBT # kill any menu stack, and put our thing at the top - abort_and_goto(active_request) + abort_and_goto(UserAuthorizedAction.active_request) else: # menu item case: add to stack from ux import the_ux - the_ux.push(active_request) + the_ux.push(UserAuthorizedAction.active_request) # EOF diff --git a/shared/flow.py b/shared/flow.py index 728e804e..cd233d1d 100644 --- a/shared/flow.py +++ b/shared/flow.py @@ -12,6 +12,7 @@ from choosers import * from multisig import make_multisig_menu from paper import make_paper_wallet from address_explorer import address_explore +from hsm import hsm_policy_available # # NOTE: "Always In Title Case" @@ -190,6 +191,7 @@ NormalSystem = [ # xxxxxxxxxxxxxxxx MenuItem('Ready To Sign', f=ready2sign), MenuItem('Passphrase', f=start_b39_pw, predicate=lambda: settings.get('words', True)), + MenuItem('Start HSM Mode', f=start_hsm_menu_item, predicate=hsm_policy_available), MenuItem('Secure Logout', f=logout_now), MenuItem('Advanced', menu=AdvancedNormalMenu), MenuItem('Settings', menu=SettingsMenu), diff --git a/shared/hsm.py b/shared/hsm.py index db9da739..3e6ab4c9 100644 --- a/shared/hsm.py +++ b/shared/hsm.py @@ -1,8 +1,9 @@ # (c) Copyright 2020 by Coinkite Inc. This file is part of Coldcard # and is covered by GPLv3 license found in COPYING. # -# Operations that require user authorization, like our core features: signing messages -# and signing bitcoin transactions. +# hsm.py +# +# Unattended signing of transactions and messages, subject to a set of rules. # import stash, ure, tcc, ux, chains, sys, gc, uio, ujson, uos, utime from sffile import SFFile @@ -11,12 +12,13 @@ from utils import problem_file_line, cleanup_deriv_path from psbt import psbtObject, FatalPSBTIssue, FraudulentChangeOutput from auth import UserAuthorizedAction from utils import pretty_short_delay, pretty_delay +from uasyncio.queues import QueueEmpty -# this is None or points to HSMPolicy object +# this is None or points to the active HSMPolicy object global hsm_active hsm_active = None -# where we save policy +# where we save policy/config POLICY_FNAME = '/flash/hsm-policy.json' def get_list(j, fld_name, cleanup_fcn=None): @@ -29,7 +31,8 @@ def get_list(j, fld_name, cleanup_fcn=None): return [cleanup_fcn(i) for i in v] return v -def int_range(v, fld_name, mn=0, mx=1000): +def get_int(j, fld_name, mn=0, mx=1000): + v = j.pop(fld_name, None) or None if v is None: return v assert int(v) == v, "%s: must be integer" % fld_name v = int(v) @@ -53,7 +56,7 @@ class HSMPolicy: self.notes = j.pop('notes', None) # time period, in minutes - self.period = int_range(j.pop('period', None), 'period', 1, 72*60) + self.period = get_int(j, 'period', 1, 3*24*60) # error checking extra = set(j.keys()) @@ -317,11 +320,14 @@ class hsmUxInteraction: update_contents = show async def interact(self): - from main import numpad + import main from actions import login_now - from uasyncio.queues import QueueEmpty from uasyncio import sleep_ms + # Prevent any other component from reading numpad + real_numpad = main.numpad + main.numpad = NeuterPad + # Kill time, waiting for user input while 1: self.show() @@ -329,7 +335,7 @@ class hsmUxInteraction: try: # Poll for an event, no block - ch = numpad.get_nowait() + ch = real_numpad.get_nowait() if ch == 'x': await login_now() # immediate reboots @@ -344,7 +350,6 @@ class hsmUxInteraction: req = UserAuthorizedAction.active_request if req and not req.ux_done: try: - print('do more') await req.interact() except AbortInteraction: pass @@ -353,4 +358,32 @@ class hsmUxInteraction: # singleton hsm_ux_obj = hsmUxInteraction() +# Mock version of NumpadBase from numpad.py +class NeuterPad: + disabled = True + + @classmethod + async def get(cls): + return + + @classmethod + def get_nowait(cls): + raise QueueEmpty + + @classmethod + def empty(cls): + return True + + @classmethod + def stop(cls): + return + + @classmethod + def abort_ux(cls): + return + + @classmethod + def inject(cls, key): + return + # EOF diff --git a/shared/menu.py b/shared/menu.py index 01b1989b..274ed82b 100644 --- a/shared/menu.py +++ b/shared/menu.py @@ -245,5 +245,4 @@ class MenuSystem: # abort/nothing selected/back out? return None - -#demo() +# EOF diff --git a/shared/nvstore.py b/shared/nvstore.py index 0cb37f88..184bafdf 100644 --- a/shared/nvstore.py +++ b/shared/nvstore.py @@ -39,6 +39,7 @@ from sffile import SFFile # axi = index of last selected address in explorer # nick = optional nickname for this coldcard (personalization) # lgto = (minutes) how long to wait for Login Countdown feature +# usr = (dict) map from username to their secret, as base32 # where in SPI Flash we work (last 128k) diff --git a/shared/ux.py b/shared/ux.py index 14ccf0b9..d43d5311 100644 --- a/shared/ux.py +++ b/shared/ux.py @@ -244,7 +244,7 @@ def word_wrap(ln, w): yield left -async def ux_show_story(msg, title=None, escape=None, sensitive=False): +async def ux_show_story(msg, title=None, escape=None, sensitive=False, strict_escape=False): # show a big long string, and wait for XY to continue # - returns character used to get out (X or Y) # - can accept other chars to 'escape' as well. @@ -324,7 +324,8 @@ async def ux_show_story(msg, title=None, escape=None, sensitive=False): # allow another way out for some usages return ch elif ch in 'xy': - return ch + if not strict_escape: + return ch elif ch == '0': top = 0 elif ch == '7': # page up @@ -340,8 +341,9 @@ async def ux_show_story(msg, title=None, escape=None, sensitive=False): async def idle_logout(): from main import numpad, settings + from hsm import hsm_active - while 1: + while not hsm_active: await sleep_ms(250) # they may have changed setting recently @@ -356,11 +358,13 @@ async def idle_logout(): if now > numpad.last_event_time + timeout: # do a logout now. - print("Idle timeout now!") + print("Idle!") from actions import logout_now await logout_now() return # not reached + + print("Idle TO undone") async def ux_confirm(msg): # confirmation screen, with stock title and Y=of course. diff --git a/shared/version.py b/shared/version.py index 269d1cf7..cdce7ac0 100644 --- a/shared/version.py +++ b/shared/version.py @@ -83,7 +83,7 @@ def serial_number(): # Our USB serial number, both in DFU mode (system boot ROM), and later thanks to code in # USBD_StrDescriptor() # - # - this is **completely** public info, since we can be booted into DFU mode by any anybody + # - this is **probably** public info, since shared freely over USB during enumeration # import machine i = machine.unique_id() diff --git a/testing/constants.py b/testing/constants.py index 25a592ee..67c6836c 100644 --- a/testing/constants.py +++ b/testing/constants.py @@ -11,6 +11,8 @@ simulator_fixed_words = "wife shiver author away frog air rough vanish fantasy f simulator_fixed_xfp = 0x4369050f +simulator_serial_number = 'F1F1F1F1F1F1' + from ckcc_protocol.constants import AF_P2WSH, AFC_SCRIPT, AF_P2SH, AF_P2WSH_P2SH unmap_addr_fmt = { diff --git a/testing/test_unit.py b/testing/test_unit.py index 1b2ffc39..a6490b6a 100644 --- a/testing/test_unit.py +++ b/testing/test_unit.py @@ -4,6 +4,7 @@ # Run tests on the simulator itself, not here... these are basically "unit tests" # import pytest, glob +from helpers import B2A def test_remote_exec(sim_exec): assert sim_exec("RV.write('testing123')") == 'testing123' @@ -130,7 +131,6 @@ def test_decoding(unit_test): @pytest.mark.parametrize('msg', [b'123', b'b'*78]) @pytest.mark.parametrize('key', [b'3245', b'b'*78]) def test_hmac(sim_exec, msg, key, hasher): - from helpers import B2A import hashlib, hmac cmd = "import hmac, tcc; from h import b2a_hex; " + \ @@ -140,6 +140,46 @@ def test_hmac(sim_exec, msg, key, hasher): expect = hmac.new(key, msg, hasher).hexdigest() assert got == expect - print(expect) + #print(expect) + +@pytest.mark.parametrize('secret,counter,expect', [ + ( b'abcdefghij', 1, '765705'), + ( b'abcdefghij', 2, '816065'), + ( b'12345678901234567890', 0, '755224'), # test vectors from RFC4226 + ( b'12345678901234567890', 1, '287082'), + ( b'12345678901234567890', 2, '359152'), + ( b'12345678901234567890', 3, '969429'), + ( b'12345678901234567890', 4, '338314'), + ( b'12345678901234567890', 5, '254676'), + ( b'12345678901234567890', 6, '287922'), + ( b'12345678901234567890', 7, '162583'), + ( b'12345678901234567890', 8, '399871'), + ( b'12345678901234567890', 9, '520489'), +]) +def test_hotp(sim_exec, secret, counter, expect): + cmd = "from users import calc_hotp; " + \ + f"RV.write(calc_hotp({secret}, {counter}))" + got = sim_exec(cmd) + assert got == expect + +def test_hmac_key(sim_exec, count=50): + from hashlib import pbkdf2_hmac, sha256 + from constants import simulator_serial_number + from ckcc_protocol.constants import PBKDF2_ITER_COUNT + + salt = sha256(b'pepper'+simulator_serial_number.encode('ascii')).digest() + + for i in range(count): + pw = ('test%09d' % i).encode('ascii') + pw = pw[1:i] if i > 2 else pw + cmd = "from users import calc_hmac_key; from h import b2a_hex; " + \ + f"RV.write(b2a_hex(calc_hmac_key({pw})))" + + got = sim_exec(cmd) + + expect = B2A(pbkdf2_hmac('sha256', pw, salt, PBKDF2_ITER_COUNT)) + + assert got == expect + print(got) # EOF