restore backup via USB
This commit is contained in:
parent
4e77d38108
commit
5b7bee406a
@ -22,6 +22,7 @@ Spending policies for "Single Signers" adds new spending policy options:
|
||||
- Enhancement: Show QR codes of output addresses in transaction output explorer. Explorer is
|
||||
now offered for transactions of all sizes, not just complex ones.
|
||||
- Enhancement: Added file rename, when listing contents of SD card.
|
||||
- Enhancement: Added ability to restore Coldcard backup via USB (TODO version of updated ckcc)
|
||||
- Bugfix: If all change outputs have `nValue=0`, they were not shown in UX.
|
||||
- Bugfix: Disallow negative input/output amounts in PSBT.
|
||||
- Bugfix: Fix filesystem initialization after Wife LFS or Destroy Seed.
|
||||
|
||||
@ -11,7 +11,7 @@ from uhashlib import sha256
|
||||
from public_constants import AFC_SCRIPT, AF_CLASSIC, AFC_BECH32, SUPPORTED_ADDR_FORMATS, AF_P2TR
|
||||
from public_constants import STXN_FINALIZE, STXN_VISUALIZE, STXN_SIGNED
|
||||
from sffile import SFFile
|
||||
from ux import ux_show_story, abort_and_goto, ux_dramatic_pause, ux_clear_keys
|
||||
from ux import ux_show_story, abort_and_goto, ux_dramatic_pause, ux_clear_keys, ux_confirm
|
||||
from ux import show_qr_code, OK, X, abort_and_push, AbortInteraction
|
||||
from usb import CCBusyError
|
||||
from utils import HexWriter, xfp2str, problem_file_line, cleanup_deriv_path, B2A, show_single_address
|
||||
@ -1142,6 +1142,51 @@ class RemoteBackup(UserAuthorizedAction):
|
||||
self.done()
|
||||
|
||||
|
||||
class RemoteRestoreBackup(UserAuthorizedAction):
|
||||
def __init__(self, file_len, bitflag):
|
||||
super().__init__()
|
||||
self.file_len = file_len
|
||||
self.custom_pwd = bitflag & 1
|
||||
self.plaintext = bitflag & 2
|
||||
self.force_tmp = bitflag & 4
|
||||
|
||||
def to_words(self):
|
||||
# conversion to "words" argument of "restore_complete" function
|
||||
if self.plaintext:
|
||||
return None
|
||||
elif self.custom_pwd:
|
||||
return False
|
||||
return True
|
||||
|
||||
def to_tmp(self):
|
||||
# conversion to "temporary" argument of "restore_complete" function
|
||||
from pincodes import pa
|
||||
if pa.is_secret_blank() and not self.force_tmp:
|
||||
# no master secret & not forcing tmp
|
||||
# will load backup as master seed
|
||||
return False, "master"
|
||||
|
||||
# has master secret --> load backup as tmp
|
||||
# secret is blank but user forcing tmp
|
||||
return True, "temporary"
|
||||
|
||||
async def interact(self):
|
||||
try:
|
||||
# requires confirm from user
|
||||
tmp, noun = self.to_tmp()
|
||||
if await ux_confirm("Restore uploaded backup as a %s seed?" % noun):
|
||||
from backups import restore_complete
|
||||
await restore_complete(self.file_len, tmp, self.to_words())
|
||||
else:
|
||||
self.refused = True
|
||||
|
||||
except BaseException as exc:
|
||||
self.failed = "Error during backup restore."
|
||||
# sys.print_exception(exc)
|
||||
finally:
|
||||
self.done()
|
||||
|
||||
|
||||
def start_remote_backup():
|
||||
# tell the local user the secret words, and then save to SPI flash
|
||||
# USB caller has to come back and download encrypted contents.
|
||||
@ -1152,6 +1197,12 @@ def start_remote_backup():
|
||||
# kill any menu stack, and put our thing at the top
|
||||
abort_and_goto(UserAuthorizedAction.active_request)
|
||||
|
||||
def start_remote_restore_backup(file_len, bitflag):
|
||||
UserAuthorizedAction.cleanup()
|
||||
UserAuthorizedAction.active_request = RemoteRestoreBackup(file_len, bitflag)
|
||||
# kill any menu stack, and put our thing at the top
|
||||
abort_and_goto(UserAuthorizedAction.active_request)
|
||||
|
||||
|
||||
class NewPassphrase(UserAuthorizedAction):
|
||||
def __init__(self, pw):
|
||||
|
||||
@ -6,9 +6,10 @@ import compat7z, stash, ckcc, chains, gc, sys, bip39, uos, ngu
|
||||
from ubinascii import hexlify as b2a_hex
|
||||
from ubinascii import unhexlify as a2b_hex
|
||||
from utils import deserialize_secret
|
||||
from sffile import SFFile
|
||||
from ux import ux_show_story, ux_confirm, ux_dramatic_pause, OK, X, ux_input_text
|
||||
import version, ujson
|
||||
from uio import StringIO
|
||||
from uio import StringIO, BytesIO
|
||||
import seed
|
||||
from glob import settings
|
||||
from pincodes import pa
|
||||
@ -457,8 +458,6 @@ async def write_complete_backup(pwd, fname_pattern, write_sflash=False,
|
||||
|
||||
if write_sflash:
|
||||
# for use over USB and unit testing: commit file into PSRAM
|
||||
from sffile import SFFile
|
||||
|
||||
with SFFile(0, max_size=MAX_BACKUP_FILE_SIZE, message='Saving...') as fd:
|
||||
if zz:
|
||||
fd.write(hdr)
|
||||
@ -588,62 +587,86 @@ async def restore_complete(fname_or_fd, temporary=False, words=True):
|
||||
|
||||
await done(pwd)
|
||||
|
||||
|
||||
def check_and_decrypt(fd, password):
|
||||
try:
|
||||
compat7z.check_file_headers(fd)
|
||||
except Exception as e:
|
||||
raise RuntimeError('Unable to read backup file.'
|
||||
' Has it been touched?\n\nError: '+str(e))
|
||||
|
||||
from glob import dis
|
||||
dis.fullscreen("Decrypting...")
|
||||
try:
|
||||
zz = compat7z.Builder()
|
||||
fname, contents = zz.read_file(fd, password, MAX_BACKUP_FILE_SIZE,
|
||||
progress_fcn=dis.progress_bar_show)
|
||||
|
||||
# simple quick sanity checks
|
||||
assert fname.endswith('.txt') # was == 'ckcc-backup.txt'
|
||||
assert contents[0:1] == b'#' and contents[-1:] == b'\n'
|
||||
return contents
|
||||
|
||||
except Exception as e:
|
||||
# assume everything here is "password wrong" errors
|
||||
raise RuntimeError('Unable to decrypt backup file. Incorrect password?'
|
||||
'\n\nTried:\n\n' + password)
|
||||
|
||||
|
||||
async def restore_complete_doit(fname_or_fd, words, file_cleanup=None, temporary=False):
|
||||
# Open file, read it, maybe decrypt it; return string if any error
|
||||
# - some errors will be shown, None return in that case
|
||||
# - no return if successful (due to reboot)
|
||||
from glob import dis
|
||||
from files import CardSlot, CardMissingError, needs_microsd
|
||||
|
||||
# build password
|
||||
password = ' '.join(words)
|
||||
prob = None
|
||||
|
||||
try:
|
||||
with CardSlot(readonly=True) as card:
|
||||
# filename already picked, taste it and maybe consider using its data.
|
||||
try:
|
||||
fd = open(fname_or_fd, 'rb') if isinstance(fname_or_fd, str) else fname_or_fd
|
||||
except:
|
||||
return 'Unable to open backup file.\n\n' + str(fname_or_fd)
|
||||
if isinstance(fname_or_fd, int):
|
||||
# USB restore - backup is already in PSRAM, fname of fd is length
|
||||
# TXN_INPUT_OFFSET = 0
|
||||
with SFFile(0, length=fname_or_fd) as fd:
|
||||
if not words:
|
||||
contents = fd.read(fname_or_fd)
|
||||
else:
|
||||
# read full size, then decrypt
|
||||
fd = BytesIO(fd.read(fname_or_fd))
|
||||
try:
|
||||
contents = check_and_decrypt(fd, password)
|
||||
except RuntimeError as e:
|
||||
return str(e)
|
||||
else:
|
||||
try:
|
||||
with CardSlot(readonly=True) as card:
|
||||
# filename already picked, taste it and maybe consider using its data.
|
||||
try:
|
||||
fd = open(fname_or_fd, 'rb')
|
||||
except:
|
||||
return 'Unable to open backup file.\n\n' + str(fname_or_fd)
|
||||
|
||||
try:
|
||||
if not words:
|
||||
contents = fd.read()
|
||||
else:
|
||||
try:
|
||||
compat7z.check_file_headers(fd)
|
||||
except Exception as e:
|
||||
return 'Unable to read backup file. Has it been touched?\n\nError: ' \
|
||||
+ str(e)
|
||||
try:
|
||||
if words:
|
||||
contents = check_and_decrypt(fd, password)
|
||||
else:
|
||||
contents = fd.read()
|
||||
|
||||
dis.fullscreen("Decrypting...")
|
||||
try:
|
||||
zz = compat7z.Builder()
|
||||
fname, contents = zz.read_file(fd, password, MAX_BACKUP_FILE_SIZE,
|
||||
progress_fcn=dis.progress_bar_show)
|
||||
|
||||
# simple quick sanity checks
|
||||
assert fname.endswith('.txt') # was == 'ckcc-backup.txt'
|
||||
assert contents[0:1] == b'#' and contents[-1:] == b'\n'
|
||||
|
||||
except Exception as e:
|
||||
# assume everything here is "password wrong" errors
|
||||
#print("pw wrong? %s" % e)
|
||||
|
||||
return ('Unable to decrypt backup file. Incorrect password?'
|
||||
'\n\nTried:\n\n' + password)
|
||||
finally:
|
||||
fd.close()
|
||||
except RuntimeError as e:
|
||||
return str(e)
|
||||
finally:
|
||||
fd.close()
|
||||
|
||||
if file_cleanup:
|
||||
file_cleanup(fname_or_fd)
|
||||
|
||||
except CardMissingError:
|
||||
await needs_microsd()
|
||||
return
|
||||
except CardMissingError:
|
||||
await needs_microsd()
|
||||
return
|
||||
|
||||
vals = text_bk_parser(contents)
|
||||
try:
|
||||
vals = text_bk_parser(contents)
|
||||
except:
|
||||
return "Invalid backup file."
|
||||
|
||||
# this leads to reboot if it works, else errors shown, etc.
|
||||
if temporary:
|
||||
|
||||
@ -612,7 +612,6 @@ class USBHandler:
|
||||
# STILL waiting on user
|
||||
return None
|
||||
|
||||
|
||||
if cmd == 'pwok':
|
||||
# return new root xpub
|
||||
xpub = req.result
|
||||
@ -647,6 +646,15 @@ class USBHandler:
|
||||
from auth import start_remote_backup
|
||||
return start_remote_backup()
|
||||
|
||||
if cmd == 'rest':
|
||||
# restore backup from what is already uploaded in PSRAM
|
||||
file_len, file_sha, bf = unpack_from('<I32sB', args)
|
||||
if file_sha != self.file_checksum.digest():
|
||||
return b'err_Checksum'
|
||||
|
||||
from auth import start_remote_restore_backup
|
||||
return start_remote_restore_backup(file_len, bf)
|
||||
|
||||
if cmd == 'blkc':
|
||||
# report which blockchain we are configured for
|
||||
from chains import current_chain
|
||||
|
||||
@ -93,10 +93,7 @@ def backup_system(settings_set, settings_remove, goto_home, pick_menu_item,
|
||||
# st -> seed type
|
||||
# ct -> cleartext backup
|
||||
if reuse_pw:
|
||||
if isinstance(reuse_pw, list):
|
||||
assert len(reuse_pw) == 12
|
||||
else:
|
||||
assert reuse_pw is True # default
|
||||
if reuse_pw is True:
|
||||
reuse_pw = ['zoo' for _ in range(12)]
|
||||
|
||||
settings_set('bkpw', ' '.join(reuse_pw))
|
||||
@ -129,15 +126,19 @@ def backup_system(settings_set, settings_remove, goto_home, pick_menu_item,
|
||||
|
||||
need_keypress("6")
|
||||
time.sleep(.1)
|
||||
_, story = cap_story()
|
||||
assert "Are you SURE ?!?" in story
|
||||
title, story = cap_story()
|
||||
assert "Are you SURE ?!?" in (title if is_q1 else story)
|
||||
assert "**NOT** be encrypted" in story
|
||||
press_select()
|
||||
return # nothing more to be done
|
||||
|
||||
if reuse_pw:
|
||||
assert (' 1: %s' % reuse_pw[0]) in body
|
||||
assert ('12: %s' % reuse_pw[-1]) in body
|
||||
if len(reuse_pw) == 1:
|
||||
reuse_pw = reuse_pw[0]
|
||||
assert f"{reuse_pw[0]}...{reuse_pw[-1]}" in body
|
||||
else:
|
||||
assert (' 1: %s' % reuse_pw[0]) in body
|
||||
assert ('12: %s' % reuse_pw[-1]) in body
|
||||
press_select()
|
||||
words = ['zoo'] * 12
|
||||
else:
|
||||
@ -641,4 +642,79 @@ def test_bkpw_override(reset_seed_words, override_bkpw, goto_home, pick_menu_ite
|
||||
for pw, fn in zip(test_cases, fnames):
|
||||
restore_backup_cs(fn, pw, custom_bkpw=True)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('btype', ["classic", "custom_bkpw", "plaintext"])
|
||||
@pytest.mark.parametrize('force_tmp', [True, False])
|
||||
def test_restore_usb_backup(backup_system, set_seed_words, cap_story, verify_ephemeral_secret_ui,
|
||||
settings_slots, reset_seed_words, word_menu_entry, confirm_tmp_seed,
|
||||
dev, microsd_path, press_select, btype, enter_text, force_tmp,
|
||||
unit_test, restore_main_seed, cap_menu):
|
||||
|
||||
from test_ephemeral import SEEDVAULT_TEST_DATA
|
||||
xfp_str, encoded_str, mnemonic = SEEDVAULT_TEST_DATA[2]
|
||||
set_seed_words(mnemonic)
|
||||
bkpw = 34*"Z"
|
||||
plaintext = (btype == "plaintext")
|
||||
password = False
|
||||
|
||||
# ACTUAL BACKUP
|
||||
if plaintext:
|
||||
bk_pw = backup_system(ct=True)
|
||||
elif btype == "custom_bkpw":
|
||||
# encrypted but with custom pwd
|
||||
password = True
|
||||
bk_pw = backup_system(reuse_pw=[bkpw])
|
||||
else:
|
||||
# classic word-based encrypted backup
|
||||
bk_pw = backup_system()
|
||||
|
||||
time.sleep(.1)
|
||||
title, story = cap_story()
|
||||
fname = story.split("\n\n")[1]
|
||||
|
||||
# remove all saved slots, one of them will be the one where we just created backup
|
||||
# slot where backup was created needs to be removed - otherwise we will load back to it
|
||||
# and see multisig wallet there without the need for backup to actually copy it
|
||||
for s in settings_slots():
|
||||
try:
|
||||
os.remove(s)
|
||||
except: pass
|
||||
|
||||
# clear seed
|
||||
unit_test('devtest/clear_seed.py')
|
||||
|
||||
from ckcc_protocol.protocol import CCProtocolPacker
|
||||
with open(microsd_path(fname), "rb") as f:
|
||||
file_len, sha = dev.upload_file(f.read())
|
||||
|
||||
dev.send_recv(CCProtocolPacker.restore_backup(file_len, sha, password, plaintext, force_tmp),
|
||||
timeout=None)
|
||||
time.sleep(.2)
|
||||
_, story = cap_story()
|
||||
assert f"Restore uploaded backup as a {'temporary' if force_tmp else 'master'} seed" in story
|
||||
press_select()
|
||||
|
||||
time.sleep(.1)
|
||||
if btype == "classic":
|
||||
word_menu_entry(bk_pw, has_checksum=False)
|
||||
elif password:
|
||||
enter_text(bkpw)
|
||||
|
||||
time.sleep(.2)
|
||||
mnemonic = mnemonic.split(" ")
|
||||
|
||||
if force_tmp:
|
||||
confirm_tmp_seed(seedvault=False)
|
||||
verify_ephemeral_secret_ui(mnemonic=mnemonic, xpub=None, seed_vault=False)
|
||||
restore_main_seed()
|
||||
time.sleep(.1)
|
||||
assert "New Seed Words" in cap_menu()
|
||||
else:
|
||||
_, story = cap_story()
|
||||
assert "configured for best security practices" in story
|
||||
press_select()
|
||||
time.sleep(.1)
|
||||
_, story = cap_story()
|
||||
assert "now reboot" in story
|
||||
|
||||
# EOF
|
||||
|
||||
@ -1409,6 +1409,66 @@ def test_temporary_from_backup(multisig, backup_system, import_ms_wallet, get_se
|
||||
else:
|
||||
restore_main_seed(False)
|
||||
|
||||
@pytest.mark.parametrize('btype', ["classic", "custom_bkpw", "plaintext"])
|
||||
def test_temporary_from_backup_usb(backup_system, set_seed_words, cap_story, verify_ephemeral_secret_ui,
|
||||
settings_slots, reset_seed_words, word_menu_entry, confirm_tmp_seed,
|
||||
dev, microsd_path, press_select, btype,
|
||||
enter_text):
|
||||
|
||||
xfp_str, encoded_str, mnemonic = SEEDVAULT_TEST_DATA[0]
|
||||
set_seed_words(mnemonic)
|
||||
bkpw = 32*"X"
|
||||
plaintext = (btype == "plaintext")
|
||||
password = False
|
||||
|
||||
# ACTUAL BACKUP
|
||||
if plaintext:
|
||||
bk_pw = backup_system(ct=True)
|
||||
elif btype == "custom_bkpw":
|
||||
# encrypted but with custom pwd
|
||||
password = True
|
||||
bk_pw = backup_system(reuse_pw=[bkpw])
|
||||
else:
|
||||
# classic word-based encrypted backup
|
||||
bk_pw = backup_system()
|
||||
|
||||
time.sleep(.1)
|
||||
title, story = cap_story()
|
||||
fname = story.split("\n\n")[1]
|
||||
|
||||
# remove all saved slots, one of them will be the one where we just created backup
|
||||
# slot where backup was created needs to be removed - otherwise we will load back to it
|
||||
# and see multisig wallet there without the need for backup to actually copy it
|
||||
for s in settings_slots():
|
||||
try:
|
||||
os.remove(s)
|
||||
except: pass
|
||||
|
||||
# restore fixed simulator
|
||||
reset_seed_words()
|
||||
|
||||
from ckcc_protocol.protocol import CCProtocolPacker
|
||||
with open(microsd_path(fname), "rb") as f:
|
||||
file_len, sha = dev.upload_file(f.read())
|
||||
|
||||
dev.send_recv(CCProtocolPacker.restore_backup(file_len, sha, password, plaintext), timeout=None)
|
||||
time.sleep(.2)
|
||||
_, story = cap_story()
|
||||
assert "Restore uploaded backup as a temporary seed" in story
|
||||
press_select()
|
||||
|
||||
time.sleep(.1)
|
||||
if btype == "classic":
|
||||
word_menu_entry(bk_pw, has_checksum=False)
|
||||
elif password:
|
||||
enter_text(bkpw)
|
||||
|
||||
time.sleep(.1)
|
||||
confirm_tmp_seed(seedvault=False)
|
||||
time.sleep(.1)
|
||||
mnemonic = mnemonic.split(" ")
|
||||
verify_ephemeral_secret_ui(mnemonic=mnemonic, xpub=None, seed_vault=False)
|
||||
|
||||
|
||||
def test_tmp_upgrade_disabled(reset_seed_words, pick_menu_item, cap_story,
|
||||
cap_menu, goto_home, unit_test,
|
||||
|
||||
@ -1188,7 +1188,7 @@ def test_storage_locker(package, count, start_hsm, dev):
|
||||
def test_usb_cmds_block(quick_start_hsm, dev):
|
||||
# check these commands return errors (test whitelist)
|
||||
block_list = [
|
||||
'rebo', 'dfu_', 'enrl', 'enok',
|
||||
'rebo', 'dfu_', 'enrl', 'enok', 'rest',
|
||||
'back', 'pass', 'bagi', 'hsms', 'nwur', 'rmur', 'pwok', 'bkok',
|
||||
]
|
||||
|
||||
@ -1196,8 +1196,8 @@ def test_usb_cmds_block(quick_start_hsm, dev):
|
||||
|
||||
for cmd in block_list:
|
||||
with pytest.raises(CCProtoError) as ee:
|
||||
got = dev.send_recv(cmd)
|
||||
assert 'HSM' in str(ee)
|
||||
dev.send_recv(cmd)
|
||||
assert 'Not allowed in HSM mode' in str(ee)
|
||||
|
||||
def test_unit_local_conf(sim_exec, enter_local_code, quick_start_hsm):
|
||||
# just testing our fixture really
|
||||
|
||||
Loading…
Reference in New Issue
Block a user