1463 lines
51 KiB
Python
1463 lines
51 KiB
Python
# (c) Copyright 2024 by Coinkite Inc. This file is covered by license found in COPYING-CC.
|
|
#
|
|
# wallet.py - A place you find UTXO, addresses and descriptors.
|
|
#
|
|
|
|
import ngu, ujson, uio, chains, ure, version, stash
|
|
from binascii import hexlify as b2a_hex
|
|
from serializations import ser_string
|
|
from desc_utils import (bip388_wallet_policy_to_descriptor, append_checksum, bip388_validate_policy,
|
|
ExtendedKey, MusigKey)
|
|
from public_constants import AF_P2TR, AF_P2WSH, AF_CLASSIC, AF_P2SH, AF_P2WSH_P2SH
|
|
from menu import MenuSystem, MenuItem, start_chooser
|
|
from ux import ux_show_story, ux_confirm, ux_dramatic_pause, OK, X, ux_enter_bip32_index
|
|
from files import CardSlot, CardMissingError, needs_microsd
|
|
from utils import problem_file_line, xfp2str, to_ascii_printable, swab32, show_single_address
|
|
from charcodes import KEY_QR, KEY_CANCEL, KEY_NFC, KEY_ENTER
|
|
from glob import settings, DESC_CACHE
|
|
|
|
# Arbitrary value, not 0 or 1, used to derive a pubkey from preshared xpub in Key Teleport
# - used as the first of two extra derivation steps (the second is a per-message nonce)
KT_RXPUBKEY_DERIV = const(20250317)

# PSBT Xpub trust policies
# - value is kept under the 'pms' setting, see MiniScriptWallet.get_trust_policy()
TRUST_VERIFY = const(0)
TRUST_OFFER = const(1)
TRUST_PSBT = const(2)

# address generators in this file stop once an index exceeds this value
MAX_BIP32_IDX = (2 ** 31) - 1
MAX_NAME_LEN = 30       # use (almost) full potential of Q screen
|
|
|
|
class WalletOutOfSpace(RuntimeError):
    """Raised when a wallet record could not be written into settings."""
|
|
|
|
|
|
class WalletABC:
    # Meant to be an "interface" more than a real base class, so that it
    # costs next to nothing in memory/code space.
    #
    # Implementations are expected to offer these attributes:
    #   name
    #   addr_fmt
    #   chain

    def yield_addresses(self, start_idx, count, change_idx=0):
        # Implementations yield tuples which begin with (idx, address, ...)
        pass

    def render_address(self, change_idx, idx):
        # Produce exactly one address, as text, by asking yield_addresses()
        # for a run of length one starting at idx.
        rows = [row for row in self.yield_addresses(idx, 1, change_idx)]

        assert len(rows) == 1
        only = rows[0]
        assert only[0] == idx

        return only[1]

    def to_descriptor(self):
        # Implementations return a descriptor object for the wallet.
        pass
|
|
|
|
class MasterSingleSigWallet(WalletABC):
    # Refers to current seed phrase, whichever is loaded master or temporary

    def __init__(self, addr_fmt, path=None, account_idx=0, chain_name=None):
        # Construct a wallet based on current master secret, and chain.
        # - path is optional, and then we use standard path for addr_fmt
        # - path can be overridden when we come here via address explorer
        # - chain_name selects a specific chain; otherwise the current chain is used
        n = chains.addr_fmt_label(addr_fmt)
        if not version.has_qwerty:
            # Mk4 tiny display
            # Classic P2PKH     -> P2PKH
            # Segwit P2WPKH     -> P2WPKH
            # P2SH-Segwit       -> no change (should not be used that much)
            n = n.split(" ")[-1]

        purpose = chains.af_to_bip44_purpose(addr_fmt)
        prefix = path or 'm/%dh/{coin_type}h/{account}h' % purpose

        if chain_name:
            self.chain = chains.get_chain(chain_name)
        else:
            self.chain = chains.current_chain()

        if account_idx != 0:
            rv = " Account#%d" if version.has_qwerty else " Acct#%d"
            n += rv % account_idx

        if self.chain.ctype == 'XTN':
            n += ' (Testnet)' if version.has_qwerty else " XTN"
        if self.chain.ctype == 'XRT':
            n += ' (Regtest)' if version.has_qwerty else " XRT"

        self.name = n
        self.addr_fmt = addr_fmt

        # Figure out the derivation path
        # - we want to store path w/o change and index part
        p = prefix.format(account=account_idx, coin_type=self.chain.b44_cointype,
                          change='C', idx='I')
        if p.endswith('/C/I'):
            p = p[:-4]
        if p.endswith('/I'):
            # custom path in addr explorer can get this
            p = p[:-2]

        self._path = p

    def yield_addresses(self, start_idx, count, change_idx=None):
        # Render a range of addresses. Slow to start, since accesses SE in general
        # - yields (idx, address, full_path) tuples
        # - if count is None, don't derive any subkey: a single address for
        #   the path itself is produced (start_idx is ignored in that case)
        # - change_idx, when given, must be 0 or 1 and is appended to the path
        path = self._path
        if change_idx is not None:
            assert 0 <= change_idx <= 1
            path += '/%d' % change_idx

        with stash.SensitiveValues() as sv:
            node = sv.derive_path(path)

            if count is None:   # special case - showing single, ignoring start_idx
                address = self.chain.address(node, self.addr_fmt)
                yield 0, address, path
                return

            path += '/'
            for idx in range(start_idx, start_idx+count):
                if idx > MAX_BIP32_IDX:
                    break

                # copy before entering the try: if the copy itself fails there
                # is nothing to blank, and the cleanup below must not hide
                # that error behind a NameError on an unbound 'here'
                here = node.copy()
                try:
                    here.derive(idx, False)            # works in-place
                    address = self.chain.address(here, self.addr_fmt)
                finally:
                    here.blank()
                    del here

                yield idx, address, path+str(idx)

    def render_address(self, change_idx, idx):
        # Optimized for a single address.
        path = self._path + '/%d/%d' % (change_idx, idx)
        with stash.SensitiveValues() as sv:
            node = sv.derive_path(path)
            return self.chain.address(node, self.addr_fmt)

    def render_path(self, change_idx, idx):
        # show the derivation path for an address
        return self._path + '/%d/%d' % (change_idx, idx)

    def to_descriptor(self):
        # Build a descriptor from the stored 'xfp' and 'xpub' settings, the
        # account-level path of this wallet and its address format.
        from descriptor import Descriptor, ExtendedKey
        xfp = settings.get('xfp')
        xpub = settings.get('xpub')
        d = Descriptor(key=ExtendedKey.from_cc_data(xfp, self._path, xpub), addr_fmt=self.addr_fmt)
        return d
|
|
|
|
|
|
class MiniScriptWallet(WalletABC):
|
|
skey = "miniscript"
|
|
# optional: user can short-circuit many checks (system wide, one power-cycle only)
|
|
disable_checks = False
|
|
|
|
def __init__(self, name, desc_tmplt, keys_info, af, ik_u=None,
             desc=None, m_n=None, bip67=None, chain_type=None):
    # Plain container for one wallet record.
    # - name: user-visible label, 1..MAX_NAME_LEN characters
    # - desc_tmplt, keys_info: descriptor template and list of key strings
    # - af: address format
    # - desc: loaded descriptor object, if already available
    # - chain_type: defaults to the type of the current chain
    name_len = len(name)
    assert 1 <= name_len <= MAX_NAME_LEN, "name len"

    # -1 means: not (yet) saved into settings
    self.storage_idx = -1

    self.name = name
    self.addr_fmt = af
    self.desc_tmplt = desc_tmplt
    self.keys_info = keys_info
    self.desc = desc

    # internal key unspendable (taproot only)
    self.ik_u = ik_u

    # basic multisig meta
    # - m_n is only set (not None) when dealing with basic multisig
    self.m_n = m_n
    self.bip67 = bip67

    # at this point all the keys are already validated
    if not chain_type:
        chain_type = chains.current_chain().ctype
    self.chain_type = chain_type
|
|
|
|
def serialize(self):
    # Storage form of this wallet: (name, desc_tmplt, keys_info, opts)
    # - opts always has "af"; other entries are only added when they
    #   carry information (see the individual checks)
    extra = {"af": self.addr_fmt}

    unspendable = self.ik_u
    if unspendable is not None:
        extra['ik_u'] = unspendable

    ctype = self.chain_type
    if ctype != "BTC":
        extra['ct'] = ctype

    if self.m_n:
        # basic multisig: remember M/N and the BIP-67 flag together
        extra['m_n'] = self.m_n
        extra['b67'] = self.bip67

    return (self.name, self.desc_tmplt, self.keys_info, extra)
|
|
|
|
@classmethod
def deserialize(cls, c, idx=-1):
    # Rebuild a wallet from its stored record.
    # - returns (wallet, needs_migration)
    # - the descriptor object is NOT loaded here; not everything needs it
    # - records which are not 4 items long go thru miniscript_640_migrate()
    needs_migration = (len(c) != 4)
    if needs_migration:
        c = miniscript_640_migrate(c)

    name, desc_tmplt, keys_info, opts = c

    rv = cls(name, desc_tmplt, keys_info,
             opts.get("af"),
             opts.get("ik_u", False),
             m_n=opts.get("m_n", None),
             bip67=opts.get("b67", None),
             chain_type=opts.get("ct", "BTC"))
    rv.storage_idx = idx

    return rv, needs_migration
|
|
|
|
@property
def chain(self):
    # chain object matching this wallet's stored chain type
    ctype = self.chain_type
    return chains.get_chain(ctype)
|
|
|
|
@property
def key_chain(self):
    # like chain, except that an "XRT" wallet maps onto the "XTN" chain
    if self.chain_type == "XRT":
        return chains.get_chain("XTN")
    return chains.get_chain(self.chain_type)
|
|
|
|
@classmethod
def exists(cls):
    # True when at least one wallet record is stored (any chain)
    stored = settings.get(cls.skey, [])
    return True if stored else False
|
|
|
|
@classmethod
def iter_wallets(cls, name=None, addr_fmts=None):
    # Yield stored wallets which belong to the current key chain.
    # - optional filters: exact name, and/or a collection of address formats
    # - old-format records are migrated, and saved, as they are met
    # - this is only place we should be searching this list, please!!
    records = settings.get(cls.skey, [])

    for pos in range(len(records)):
        wallet, was_old = cls.deserialize(records[pos], pos)

        if was_old:
            if pos == 0:
                from glob import dis
                dis.fullscreen("Migrating...")

            # write back in the new format right away
            records[pos] = wallet.serialize()
            settings.set(cls.skey, records)
            settings.save()

        skip = (
            (wallet.key_chain.ctype != chains.current_key_chain().ctype)
            or (name and name != wallet.name)
            or (addr_fmts and wallet.addr_fmt not in addr_fmts)
        )
        if skip:
            continue

        yield wallet
|
|
|
|
def commit(self):
    # Store this wallet into settings: appended as a new record, or
    # replacing the record at self.storage_idx.
    # - important that this fails immediately when nvram overflows
    # - raises WalletOutOfSpace if the save did not work
    record = self.serialize()

    stored = settings.get(self.skey, [])
    backup = stored.copy()

    if stored and self.storage_idx != -1:
        # update in place
        stored[self.storage_idx] = record
    else:
        # create
        self.storage_idx = len(stored)
        stored.append(record)

    settings.set(self.skey, stored)

    # save now, rather than in background, so we can recover
    # from out-of-space situation
    try:
        settings.save()
    except:
        # back out change; no longer sure of NVRAM state
        try:
            settings.set(self.skey, backup)
            settings.save()
        except:
            # give up on recovery
            pass

        raise WalletOutOfSpace
|
|
|
|
def delete(self):
    # Remove the saved entry for this wallet from settings.
    # - important: not expecting more than one instance of this class in memory
    # - a stale storage_idx (IndexError) is tolerated silently
    # - afterwards this object counts as "not stored" (storage_idx == -1)
    assert self.storage_idx >= 0
    lst = settings.get(self.skey, [])
    try:
        del lst[self.storage_idx]
        if lst:
            settings.set(self.skey, lst)
        else:
            settings.remove_key(self.skey)

        settings.save()     # actual write
    except IndexError:
        pass

    # The descriptor cache is keyed by wallet name: drop our entry, so that
    # a different wallet which later takes over this name cannot be handed
    # the descriptor of the wallet just deleted.
    if self.name in DESC_CACHE:
        del DESC_CACHE[self.name]

    self.storage_idx = -1
|
|
|
|
@classmethod
def get_trust_policy(cls):
    # PSBT xpub trust policy: the 'pms' setting when present, else a
    # default which depends on whether any wallet is stored at all.
    policy = settings.get('pms', None)
    if policy is not None:
        return policy

    if cls.exists():
        return TRUST_VERIFY
    return TRUST_OFFER
|
|
|
|
@classmethod
def find_match(cls, xfp_paths, addr_fmt=None, M=None, N=None):
    # First stored wallet whose keys cover all of xfp_paths, or None.
    # - addr_fmt, when given, must match as well
    # - when both M and N are given, only basic multisig wallets with
    #   exactly that M-of-N are considered
    want_m_n = M and N

    for wal in cls.iter_wallets():
        if (addr_fmt is not None) and (wal.addr_fmt != addr_fmt):
            continue

        if want_m_n:
            if not wal.m_n:
                continue

            m, n = wal.m_n
            if not (m == M and n == N):
                continue

        if wal.matching_subpaths(xfp_paths):
            return wal

    return None
|
|
|
|
def xfp_paths(self, skip_unspend_ik=False):
    # Origin derivations of all keys of the wallet.
    # - uses the loaded descriptor when there is one
    # - otherwise parsed straight from keys_info, where the first key can
    #   be left out if it is the unspendable internal key
    if self.desc:
        return self.desc.xfp_paths(skip_unspend_ik=skip_unspend_ik)

    rv = []
    for pos, key_str in enumerate(self.keys_info):
        if pos == 0 and self.ik_u and skip_unspend_ik:
            continue
        parsed = ExtendedKey.from_string(key_str)
        rv.append(parsed.origin.psbt_derivation())

    return rv
|
|
|
|
def matching_subpaths(self, xfp_paths):
    # True when xfp_paths has as many entries as this wallet has keys, and
    # each of our key origins is a prefix of at least one given path.
    mine = self.to_descriptor().xfp_paths()
    if len(mine) != len(xfp_paths):
        return False

    for own in mine:
        n = len(own)
        if not any(own == cand[:n] for cand in xfp_paths):
            return False

    return True
|
|
|
|
def subderivation_indexes(self, xfp_paths):
    # Work out which (branch, index) the given paths lead to, relative to
    # the key origins of this wallet.
    # - we already know that they do match (see matching_subpaths)
    # - returns (branch, idx): branch is a single number when all paths
    #   agree on it, otherwise a list of the distinct branch numbers
    # - fails with AssertionError("derivation indexes") if nothing can be
    #   derived, if a sub-path is not exactly <branch>/<index>, or if the
    #   paths disagree on the index
    err = "derivation indexes"

    my_xfp_paths = self.to_descriptor().xfp_paths()
    res = set()
    for x in my_xfp_paths:
        prefix_len = len(x)
        for y in xfp_paths:
            if x == y[:prefix_len]:
                to_derive = tuple(y[prefix_len:])
                if to_derive:
                    # Paths come from outside (PSBT): anything other than two
                    # steps must be rejected here, else it either blows up
                    # below with an unrelated exception or gets silently
                    # cut down to its first two steps.
                    assert len(to_derive) == 2, err
                    res.add(to_derive)

    assert res, err
    if len(res) == 1:
        branch, idx = list(res)[0]
    else:
        branch = [i[0] for i in res]
        indexes = set(i[1] for i in res)
        assert len(indexes) == 1, err
        idx = list(indexes)[0]

    return branch, idx
|
|
|
|
def get_my_deriv(self):
    # Derivation path ("m/...") of the first key in keys_info whose origin
    # fingerprint is ours.
    # - used for signed exports only
    mine = xfp2str(settings.get('xfp'))

    for key_str in self.keys_info:
        close = key_str.find("]")
        if close == -1:
            # key without origin
            continue

        # text between the brackets: fingerprint, then optional "/path"
        origin = key_str[1:close]
        cut = origin.find("/")
        if cut == -1:
            cut = len(origin)

        if origin[:cut].upper() == mine:
            return "m" + origin[cut:]

    # didn't find any origin info
    # BUT we know that our key is included (verified on import)
    # therefore our key is the root key
    return "m"
|
|
|
|
def derive_desc(self, xfp_paths):
    # Loaded descriptor, derived down to the branch and index implied by
    # the given paths. Needs self.desc to be loaded already.
    branch, idx = self.subderivation_indexes(xfp_paths)
    on_branch = self.desc.derive(branch)
    return on_branch.derive(idx)
|
|
|
|
def validate_script_pubkey(self, script_pubkey, xfp_paths, merkle_root=None):
    # Check that the descriptor derived for xfp_paths really produces the
    # claimed script_pubkey (and tapscript merkle root, when one is given).
    # - AssertionError carries both values, in hex, on mismatch
    # - returns the derived descriptor
    derived = self.derive_desc(xfp_paths)

    calc_spk = derived.script_pubkey()
    assert calc_spk == script_pubkey, "spk mismatch\n\ncalc:\n%s\n\npsbt:\n%s" % (
        b2a_hex(calc_spk).decode(), b2a_hex(script_pubkey).decode()
    )

    if merkle_root:
        calc_root = derived.tapscript.merkle_root
        assert calc_root == merkle_root, "merkle root mismatch\n\ncalc:\n%s\n\npsbt:\n%s" % (
            b2a_hex(calc_root).decode(), b2a_hex(merkle_root).decode()
        )

    return derived
|
|
|
|
def detail(self):
    # Text summary of the wallet: name, M-of-N explanation (basic multisig
    # only), address format label and the descriptor template.
    parts = ["Wallet Name:\n %s\n\n" % self.name]

    if self.m_n:
        # basic multisig
        M, N = self.m_n
        parts.append("Policy: %d of %d\n\n" % (M, N))

        if M == N:
            if N == 1:
                parts.append('The one signer must approve spends.')
            else:
                parts.append('All %d co-signers must approve spends.' % N)
        elif M == 1:
            parts.append('Any signature from %d co-signers will approve spends.' % N)
        else:
            parts.append('%d signatures, from %d possible co-signers, will be required to approve spends.' % (M, N))

        parts.append("\n\n")

    parts.append(chains.addr_fmt_label(self.addr_fmt))
    parts.append("\n\n" + self.desc_tmplt)

    return "".join(parts)
|
|
|
|
async def show_detail(self, story="", offer_import=False):
    # Show wallet details (after the given story text) until the user leaves.
    # - (1) shows the extended public keys, then comes back here
    # - returns True only when offer_import is set and the user approved
    # - returns False when the user cancelled
    text = story + self.detail() + "\n\nPress (1) to see extended public keys"
    if offer_import:
        text += ", OK to approve, X to cancel."

    while True:
        ch = await ux_show_story(text, escape="1")

        if ch == "1":
            await self.show_keys()
            continue

        if offer_import and (ch == "y"):
            return True

        if ch == "x":
            return False
|
|
|
|
async def show_keys(self):
    # Show all entries of keys_info, numbered like in the template (@0, @1, ...)
    # - the first key gets a heading when it is the unspendable taproot
    #   internal key; the others are set apart by a separator line
    unspendable_first = (self.addr_fmt == AF_P2TR and self.ik_u)

    chunks = []
    for pos, key_str in enumerate(self.keys_info):
        if pos:
            chunks.append('\n---===---\n\n')
        elif unspendable_first:
            chunks.append("Provably unspendable internal key:\n\n")

        chunks.append('@%s:\n %s\n\n' % (pos, key_str))

    await ux_show_story("".join(chunks))
|
|
|
|
def to_descriptor(self):
    # Descriptor object for this wallet, loaded on first use.
    # - order of preference: already loaded, cached under our name,
    #   rebuilt from the stored policy (template + keys)
    if self.desc is not None:
        return self.desc

    if self.name in DESC_CACHE:
        # loaded descriptor from cache
        self.desc = DESC_CACHE[self.name]
        return self.desc

    # fill policy - aka storage format - to actual descriptor
    # - no need to validate already saved descriptor - was validated upon enroll
    self.desc = self._from_bip388_wallet_policy(self.desc_tmplt, self.keys_info,
                                                validate=False)

    # cache len always 1
    DESC_CACHE.clear()
    DESC_CACHE[self.name] = self.desc

    return self.desc
|
|
|
|
@staticmethod
def _from_bip388_wallet_policy(desc_template, keys_info, validate=True):
    # Fill the template with its keys, parse the resulting descriptor
    # string and (unless told otherwise) validate the descriptor.
    filled = bip388_wallet_policy_to_descriptor(desc_template, keys_info)

    from descriptor import Descriptor
    rv = Descriptor.from_string(filled)

    if validate:
        rv.validate(MiniScriptWallet.disable_checks)

    return rv
|
|
|
|
@classmethod
def from_bip388_wallet_policy(cls, name, desc_template, keys_info):
    # New wallet object from a wallet policy (template + list of keys).
    # - policy is validated first
    # - "/<0;1>/*" is stored in its short form "/**"
    bip388_validate_policy(desc_template, keys_info)

    short_tmplt = desc_template.replace("/<0;1>/*", "/**")
    loaded = cls._from_bip388_wallet_policy(short_tmplt, keys_info)

    return cls.from_descriptor_obj(name, loaded, short_tmplt, keys_info)
|
|
|
|
@classmethod
def from_descriptor_obj(cls, name, desc_obj, desc_tmplt=None, keys_info=None):
    # New wallet object around an already loaded descriptor.
    # - the policy (template + keys) is generated from the descriptor,
    #   and validated, unless the caller hands over both parts
    have_policy = desc_tmplt and keys_info
    if not have_policy:
        # BIP388 wasn't generated yet - generating from descriptor upon import/enroll
        desc_tmplt, keys_info = desc_obj.bip388_wallet_policy()
        # self-validation
        bip388_validate_policy(desc_tmplt, keys_info)

    ik_u = desc_obj.key and desc_obj.key.is_provably_unspendable
    af = desc_obj.addr_fmt

    # M/N and BIP-67 flag are only kept for basic multisig
    if desc_obj.is_basic_multisig:
        m_n = desc_obj.miniscript.m_n()
        bip67 = desc_obj.is_sortedmulti
    else:
        m_n = None
        bip67 = None

    return cls(name, desc_tmplt, keys_info, af, ik_u, desc_obj, m_n, bip67)
|
|
|
|
@classmethod
def from_file(cls, config, name=None, bip388=False):
    # New wallet object from imported data.
    # - bip388: config is a JSON wallet policy (already decoded), with
    #   "name", "desc_template" and "keys_info"
    # - otherwise config is descriptor text; without a name, the value
    #   which Descriptor.from_string(checksum=True) gives back next to the
    #   descriptor is used as the name
    from descriptor import Descriptor

    if bip388:
        # config is JSON wallet policy
        return cls.from_bip388_wallet_policy(config["name"], config["desc_template"],
                                             config["keys_info"])

    if name is None:
        desc_obj, name = Descriptor.from_string(config.strip(), checksum=True)
    else:
        name = to_ascii_printable(name)
        desc_obj = Descriptor.from_string(config.strip())

    desc_obj.validate(cls.disable_checks)

    return cls.from_descriptor_obj(name, desc_obj)
|
|
|
|
@classmethod
def import_from_psbt(cls, addr_fmt, M, N, xpubs_list):
    # Build an in-memory multisig wallet from the raw xpub data of a PSBT
    # global header.
    # - already know not in our records.
    # - xpubs_list holds pairs which are handed, in the order given, to
    #   ExtendedKey.from_psbt_xpub()
    #   NOTE(review): the older comment here described the pairs as
    #   (xfp+path, binary BIP-32 xpub) - please confirm the actual order
    # - exactly one of the keys has to validate as ours
    from descriptor import Descriptor
    from miniscript import Sortedmulti, Number

    assert N == len(xpubs_list)
    assert 1 <= M <= N <= 20, 'M/N range'

    my_xfp = settings.get('xfp')

    keys = []
    mine_count = 0
    for ek, xfp_pth in xpubs_list:
        key = ExtendedKey.from_psbt_xpub(ek, xfp_pth)
        mine_count += key.validate(my_xfp, cls.disable_checks)
        keys.append(key)

    assert mine_count == 1         # 'my key not included'

    # this will always create sortedmulti multisig (BIP-67)
    # because BIP-174 came years after wide-spread acceptance of BIP-67 policy
    policy = Sortedmulti(Number(M), *keys)
    desc_obj = Descriptor(miniscript=policy, addr_fmt=addr_fmt)

    return cls.from_descriptor_obj('PSBT-%d-of-%d' % (M, N), desc_obj)
|
|
|
|
def validate_psbt_xpubs(self, psbt_xpubs):
    # Every xpub of the PSBT is validated; then, unless checks are
    # disabled, the whole set has to equal our keys_info.
    # - compared as a set of key strings, since __hash__ of the key
    #   object ignores origin derivation
    seen = set()
    for ek, xfp_pth in psbt_xpubs:
        k = ExtendedKey.from_psbt_xpub(ek, xfp_pth)
        k.validate(settings.get('xfp', 0), self.disable_checks)
        seen.add(k.to_string(external=False, internal=False))

    if self.disable_checks:
        return

    assert set(self.keys_info) == seen, "PSBT xpubs mismatch"
|
|
|
|
def ux_unique_name_msg(self, name=None):
    # Message for a name clash; talks about our own name unless told otherwise.
    kind = "Multisig" if self.m_n else "Miniscript"
    shown = name or self.name
    return ("%s wallet with name '%s' already exists. All wallets MUST"
            " have unique names.\n\n" % (kind, shown))
|
|
|
|
def find_duplicates(self):
    # Raise AssertionError, with text for the user, if a stored wallet has
    # our name or is in effect the same wallet.
    # - basic multisig: same address format and same keys in any order
    #   (a differing BIP-67 flag is pointed out)
    # - miniscript: same template and same keys, in the same order
    for other in self.iter_wallets():
        assert self.name != other.name, self.ux_unique_name_msg()

        # optimization miniscript vs. multisig & different M/N multisigs
        if self.m_n != other.m_n:
            continue

        err = "Duplicate wallet. Wallet '%s' is the same." % other.name

        if not self.m_n:
            same = (self.desc_tmplt == other.desc_tmplt
                    and self.keys_info == other.keys_info)
            if same:
                assert False, err + "\n\n"
            continue

        # enrolling basic multisig wallet
        if self.addr_fmt != other.addr_fmt:
            continue
        if sorted(self.keys_info) != sorted(other.keys_info):
            continue

        if self.bip67 != other.bip67:
            err += " BIP-67 clash."
        err += "\n\n"
        assert False, err
|
|
|
|
async def confirm_import(self):
    # Return T if the user approves of this new wallet
    # - a duplicate (by name or by content) is still shown to the user,
    #   with the reason, but then it cannot be approved
    # - on approval the wallet is saved and its descriptor is cached
    allow_import = True
    try:
        self.find_duplicates()

        kind = 'multisig' if self.m_n else 'miniscript'
        story = "Create new %s wallet?\n\n" % kind
        if self.m_n and not self.bip67:
            story += ("WARNING: BIP-67 disabled! Unsorted multisig - "
                      "order of keys in descriptor/backup is crucial\n\n")
    except AssertionError as exc:
        allow_import = False
        story = str(exc)

    approved = await self.show_detail(story, offer_import=allow_import)
    if not approved:
        # user didn't like it, stop
        return False

    # save new record
    assert self.storage_idx == -1
    self.commit()

    # new wallet was imported, so cache its descriptor
    assert self.desc
    DESC_CACHE.clear()
    DESC_CACHE[self.name] = self.desc

    await ux_dramatic_pause("Saved.", 2)

    return True
|
|
|
|
def yield_addresses(self, start_idx, count, change_idx=0, scripts=False):
    # Yield (idx, address, derivations, script) for count consecutive
    # indexes, beginning at start_idx.
    # - change_idx work as boolean here - you cannot specify random
    #   change_idx as it is defined by descriptor
    # - derivations and script are None unless scripts is set
    # - stops early once idx would go beyond MAX_BIP32_IDX
    chain = chains.current_chain()
    branch_desc = self.to_descriptor().derive(None, change=bool(change_idx))

    idx = start_idx
    while count:
        if idx > MAX_BIP32_IDX:
            break

        # make the redeem script, convert into address
        d = branch_desc.derive(idx)
        compiled = d.miniscript.compile() if d.miniscript else None
        addr = chain.render_address(d.script_pubkey(compiled_scr=compiled))

        ders = None
        script = None
        if scripts:
            ders = "".join("[%s]; " % str(k.origin) for k in d.keys)

            if d.tapscript:
                # DFS ordered list of scripts
                script = "".join(
                    b2a_hex(chains.tapscript_serialize(leaf_scr, leaf_ver)).decode() + "; "
                    for leaf_ver, leaf_scr, _ in d.tapscript._processed_tree
                )
            else:
                script = b2a_hex(ser_string(compiled)).decode()

        yield idx, addr, ders, script

        idx += 1
        count -= 1
|
|
|
|
def make_addresses_msg(self, msg, start, n, change=0):
    # Append n addresses, each under its own ".../idx =>" line, to msg.
    # - returns (msg, list of the addresses)
    # - progress bar is moved along on the way
    from glob import dis

    addrs = []
    for row in self.yield_addresses(start, n, change):
        idx, addr = row[0], row[1]

        # just idx, if derivations or scripts needed - export csv
        msg += '.../%d =>\n' % idx
        addrs.append(addr)
        msg += show_single_address(addr) + '\n\n'

        dis.progress_sofar(idx - start + 1, n)

    return msg, addrs
|
|
|
|
def generate_address_csv(self, start, n, change, saver=None):
    # Yield the lines of an address CSV: header first, then one line for
    # each address.
    # - 'aemscsv' setting adds the script and derivations columns
    # - saver, if given, is called with (address, idx) for every row
    with_scripts = settings.get("aemscsv", False)

    columns = ['Index', 'Payment Address']
    if with_scripts:
        columns += ['Script', 'Derivations']
    yield '"' + '","'.join(columns) + '"\n'

    rows = self.yield_addresses(start, n, change, scripts=with_scripts)
    for idx, addr, ders, script in rows:
        if saver:
            saver(addr, idx)

        if with_scripts:
            yield '%d,"%s","%s","%s"\n' % (idx, addr, script, ders)
        else:
            yield '%d,"%s"\n' % (idx, addr)
|
|
|
|
def to_string(self, checksum=True):
    # Descriptor of this wallet, as text.
    # - policy filling - not possible to specify internal/external,
    #   always multipath export
    # - only supported from bitcoin-core 29.0
    # - without a stored policy, the loaded descriptor is asked instead
    #   (checksum argument has no effect on that path)
    if not (self.desc_tmplt and self.keys_info):
        return self.desc.to_string()

    filled = bip388_wallet_policy_to_descriptor(self.desc_tmplt, self.keys_info)
    return append_checksum(filled) if checksum else filled
|
|
|
|
def bitcoin_core_serialize(self):
    # Argument for Bitcoin Core's "importdescriptors": a list with one
    # entry, built around the policy-filled descriptor.
    entry = {
        "desc": self.to_string(),       # policy fill
        "active": True,
        "timestamp": "now",
        "range": [0, 100],
    }
    return [entry]
|
|
|
|
def make_fname(self, prefix, suffix='txt'):
    # File name "<prefix>-<wallet name>.<suffix>", where spaces and slashes
    # of the wallet name are swapped for '_' and '-'.
    safe_name = self.name.replace(' ', '_').replace('/', '-')
    return '%s-%s.%s' % (prefix, safe_name, suffix)
|
|
|
|
async def export_wallet_file(self, core=False, bip388=False, sign=True):
    # Export this wallet over NFC, as QR, or as a file on the card.
    # - core: export as a Bitcoin Core "importdescriptors" command
    # - bip388: export as a JSON wallet policy (name, template, keys)
    # - neither: plain descriptor text
    # - sign: for file exports, also write a signature file
    #
    # do not load descriptor - just fill policy
    # only with multipath format <0;1>
    from glob import NFC, dis
    from ux import import_export_prompt

    dis.fullscreen('Wait...')

    t = "Multisig" if self.m_n else "Miniscript"

    # Each branch prepares: name (shown to the user), fname_pattern (file
    # name), msg (handed to the QR display) and res (the exported text).
    if core:
        name = "Bitcoin Core %s" % t
        fname_pattern = self.make_fname('bitcoin-core')
        msg = "importdescriptors cmd"
        core_obj = self.bitcoin_core_serialize()
        core_str = ujson.dumps(core_obj)
        res = "importdescriptors '%s'\n" % core_str
    elif bip388:
        # policy as JSON
        msg = self.name
        name = "BIP-388 Wallet Policy"
        fname_pattern = self.make_fname("b388", "json")
        res = ujson.dumps({"name": self.name,
                           "desc_template": self.desc_tmplt,
                           "keys_info": self.keys_info})
    else:
        name = t
        fname_pattern = self.make_fname("multi" if self.m_n else "minsc")
        msg = self.name
        res = self.to_string()

    ch = await import_export_prompt("%s file" % name)
    if isinstance(ch, str):
        # A string answer is a key press: NFC or QR are served here, and
        # any string at all ends the export without a file being written.
        if ch in "3"+KEY_NFC:
            if bip388:
                await NFC.share_json(res)
            else:
                await NFC.share_text(res)
        elif ch == KEY_QR:
            try:
                from ux import show_qr_code
                await show_qr_code(res, msg=msg)
            except:
                # Plain QR did not work out: fall back to BBQr where
                # version.has_qwerty is set; elsewhere the failure is
                # dropped silently.
                if version.has_qwerty:
                    from ux_q1 import show_bbqr_codes
                    await show_bbqr_codes('U', res, msg)
        return

    # Non-string answer: used as keyword arguments for CardSlot.
    try:
        with CardSlot(**ch) as card:
            fname, nice = card.pick_filename(fname_pattern)

            # do actual write
            with open(fname, 'w+') as fp:
                fp.write(res)

            if sign:
                # sign with my key at the same path as first address of export
                derive = self.get_my_deriv() + "/0/0"
                from msgsign import write_sig_file
                h = ngu.hash.sha256s(res.encode())
                sig_nice = write_sig_file([(h, fname)], derive, AF_CLASSIC)

        msg = '%s file written:\n\n%s' % (name, nice)
        if sign:
            # sig_nice is only bound when sign was set above
            msg += '\n\n%s signature file written:\n\n%s' % (name, sig_nice)
        await ux_show_story(msg)

    except CardMissingError:
        await needs_microsd()
        return
    except Exception as e:
        await ux_show_story('Failed to write!\n\n%s\n%s' % (e, problem_file_line(e)))
        return
|
|
|
|
def xpubs_from_xfp(self, xfp):
    # All keys of the descriptor (members of musig keys included) which
    # belong to the given master fingerprint.
    # - match is by origin fingerprint; otherwise by the fingerprint of
    #   the key's own node
    # - AssertionError if there is no such key
    # - sorted by key.serialize(), lowest first
    matched = []
    for key in self.to_descriptor().keys:
        members = key.keys if isinstance(key, MusigKey) else [key]
        for k in members:
            if (k.origin and k.origin.cc_fp == xfp) or (swab32(k.node.my_fp()) == xfp):
                matched.append(k)

    assert matched, "missing xfp %s" % xfp2str(xfp)

    # returned is list of keys with corresponding master xfp
    # key in list are lexicographically sorted based on their public keys
    # lowest public key first
    matched.sort(key=lambda o: o.serialize())
    return matched
|
|
|
|
def kt_make_rxkey(self, xfp):
    # Derive the receiver's pubkey from preshared xpub and a special derivation
    # - also provide the keypair we're using from our side of connection
    # - returns 4 byte nonce which is sent un-encrypted, his_pubkey and my_keypair
    ri = ngu.random.uniform(1<<28)

    # sorted lexicographically, always use the lowest pubkey from the list at index 0
    lowest = self.xpubs_from_xfp(xfp)[0]
    his_key = lowest.derive(KT_RXPUBKEY_DERIV).derive(ri)
    his_pubkey = his_key.node.pubkey()

    my_keypair = self.kt_my_keypair(ri)

    return ri.to_bytes(4, 'big'), his_pubkey, my_keypair
|
|
|
|
def kt_my_keypair(self, ri):
    # Calc my keypair for sending PSBT files.
    # - path: origin derivation of my key, then KT_RXPUBKEY_DERIV, then ri
    #
    # sorted lexicographically, always use the lowest pubkey from the list at index 0
    mine = self.xpubs_from_xfp(settings.get('xfp'))[0]

    path = "%s/%d/%d" % (mine.origin.str_derivation(), KT_RXPUBKEY_DERIV, ri)

    with stash.SensitiveValues() as sv:
        node = sv.derive_path(path)
        return ngu.secp256k1.keypair(node.privkey())
|
|
|
|
@classmethod
def kt_search_rxkey(cls, payload):
    # Find the keypair which decrypts the payload
    # - has to try the pubkey of every co-signer (our own fingerprint and
    #   keys without origin are passed over) in all wallets
    # - checks checksum of ECDH unwrapped data to see if it's the right one
    # - returns session key, decrypted first layer, and XFP of sender;
    #   three None's if nothing fits
    from teleport import decode_step1

    # this nonce is part of the derivation path so each txn gets new keys
    ri = int.from_bytes(payload[0:4], 'big')

    my_xfp = settings.get('xfp')

    for msc in cls.iter_wallets():
        my_keypair = msc.kt_my_keypair(ri)

        for key in msc.to_descriptor().keys:
            members = key.keys if isinstance(key, MusigKey) else [key]
            for k in members:
                if (not k.origin) or (k.origin.cc_fp == my_xfp):
                    continue

                candidate = k.derive(KT_RXPUBKEY_DERIV).derive(ri)
                his_pubkey = candidate.node.pubkey()

                # if implied session key decodes the checksum, it is right
                ses_key, body = decode_step1(my_keypair, his_pubkey, payload[4:])
                if ses_key:
                    return ses_key, body, candidate.origin.cc_fp

    return None, None, None
|
|
|
|
async def export_electrum(self):
    # Generate and save an Electrum JSON file.
    # - basic multisig only (AssertionError "not multisig" otherwise)
    from export import export_contents

    assert self.m_n, "not multisig"
    M, N = self.m_n

    def build():
        # gives: JSON text, derivation to sign the export with, address format
        rv = dict(seed_version=17, use_encryption=False,
                  wallet_type='%dof%d' % (M, N))

        ch = self.chain

        # the important stuff.
        for pos, key in enumerate(self.to_descriptor().keys):
            # CHALLENGE: we must do slip-132 format [yz]pubs here when not p2sh mode.
            xp = ch.serialize_public(key.node, self.addr_fmt)
            fp = key.origin.cc_fp

            rv['x%d/' % (pos + 1)] = {
                "hw_type": "coldcard",
                "type": "hardware",
                "ckcc_xfp": fp,
                "xpub": xp,
                "label": "Coldcard %s" % xfp2str(fp),
                "derivation": key.origin.str_derivation(),
            }

        # sign export with first p2pkh key
        return ujson.dumps(rv), self.get_my_deriv() + "/0/0", AF_CLASSIC

    await export_contents('Electrum multisig wallet', build,
                          self.make_fname("el", "json"), is_json=True)
|
|
|
|
async def miniscript_delete(msc):
    # Ask for confirmation, then delete the wallet; the outcome is shown
    # to the user for a moment either way.
    question = "Delete miniscript wallet '%s'?\n\nFunds may be impacted." % msc.name

    if await ux_confirm(question):
        msc.delete()
        await ux_dramatic_pause('Deleted.', 3)
    else:
        await ux_dramatic_pause('Aborted.', 3)
|
|
|
|
async def miniscript_wallet_delete(menu, label, item):
    # Menu handler: delete the wallet held in item.arg (after confirmation)
    # and, once it is really gone, leave its submenu and refresh the menu
    # underneath.
    msc = item.arg

    await miniscript_delete(msc)

    # delete() marks the object with storage_idx == -1. Anything else means
    # the user aborted and the wallet still exists: stay in its menu rather
    # than popping it as if the wallet were gone.
    if getattr(msc, 'storage_idx', -1) != -1:
        return

    from ux import the_ux
    # pop stack
    the_ux.pop()

    m = the_ux.top_of_stack()
    m.update_contents()
|
|
|
|
async def miniscript_wallet_rename(menu, label, item):
    # Menu handler: let the user type a new name for a stored wallet.
    # - item.arg is (index into the stored list, wallet object)
    # - nothing happens on empty input; a name that is already in use by
    #   any stored wallet is refused with a message
    from glob import dis
    from ux import ux_input_text, the_ux

    idx, msc = item.arg
    old_name = msc.name
    new_name = await ux_input_text(old_name, confirm_exit=False,
                                   min_len=1, max_len=MAX_NAME_LEN)

    if not new_name:
        return

    wallets = settings.get("miniscript", [])
    names = [i[0] for i in wallets]
    if new_name in names:
        await ux_show_story(msc.ux_unique_name_msg(new_name), title="FAILED")
        return

    dis.fullscreen("Saving...")

    # save it
    wal = list(wallets[idx])
    wal[0] = new_name
    # it will become list after JSON encode/decode anyways
    wallets[idx] = wal
    msc.name = new_name
    settings.set("miniscript", wallets)

    # Descriptor cache is keyed by wallet name. Move our descriptor over to
    # the new name and leave nothing behind under any other name: a stale
    # entry would be served to whichever wallet carries that name later.
    cached = msc.desc
    if cached is None and old_name in DESC_CACHE:
        cached = DESC_CACHE[old_name]
    DESC_CACHE.clear()
    if cached is not None:
        DESC_CACHE[new_name] = cached

    # update label in sub-menu
    # - quoted, same as make_miniscript_wallet_menu() builds it
    menu.items[0].label = '"%s"' % new_name
    # and name in parent menu too
    parent = the_ux.parent_of(menu)
    if parent:
        parent.update_contents()
|
|
|
|
async def miniscript_wallet_detail(menu, label, item):
    # Menu handler: show details of the one wallet held in item.arg
    wallet = item.arg
    result = await wallet.show_detail()
    return result
|
|
|
|
async def import_miniscript(*a):
    # Import a multisig/miniscript setup: from QR or NFC if the user picks
    # those, else from a text file picked on the card.
    from actions import file_picker
    from ux import import_export_prompt

    ch = await import_export_prompt("miniscript wallet file", is_import=True)
    if isinstance(ch, str):
        # key press rather than a card slot choice
        if ch == KEY_QR:
            await import_miniscript_qr()
        elif ch == KEY_NFC:
            await import_miniscript_nfc()
        return

    def possible(filename):
        # taster for the file picker: does any line look like a descriptor?
        with open(filename, 'rt') as fd:
            for ln in fd:
                if "sh(" in ln or "wsh(" in ln or "tr(" in ln:
                    # descriptor import
                    return True

    fn = await file_picker(suffix=['.txt', '.json'], min_size=100,
                           taster=possible, **ch)
    if not fn:
        return

    try:
        with CardSlot(**ch) as card:
            with open(fn, 'rt') as fp:
                data = fp.read()
    except CardMissingError:
        await needs_microsd()
        return

    from auth import maybe_enroll_xpub
    try:
        # suggested wallet name: file name, up to its first dot
        if fn:
            possible_name = (fn.split('/')[-1].split('.'))[0]
        else:
            possible_name = None
        maybe_enroll_xpub(config=data, name=possible_name)
    except BaseException as e:
        # import sys;sys.print_exception(e)
        await ux_show_story('Failed to import miniscript.\n\n%s\n%s' % (e, problem_file_line(e)))
|
|
|
|
async def import_miniscript_nfc(*a):
    # Import over NFC; any failure is reported to the user as a story.
    from glob import NFC

    try:
        rv = await NFC.import_miniscript_nfc()
        return rv
    except Exception as exc:
        await ux_show_story('Failed to import miniscript.\n\n%s\n%s' % (exc, problem_file_line(exc)))
|
|
|
|
async def import_miniscript_qr(*a):
    # Scan a QR code and try to enroll whatever text it holds; any failure
    # is reported to the user as a story.
    from auth import maybe_enroll_xpub
    from ux_q1 import QRScannerInteraction

    scanner = QRScannerInteraction()
    data = await scanner.scan_text('Scan Multisig/Miniscript from a QR code')
    if not data:
        # user pressed CANCEL
        return

    try:
        maybe_enroll_xpub(config=data)
    except Exception as exc:
        await ux_show_story('Failed to import miniscript.\n\n%s\n%s' % (exc, problem_file_line(exc)))
|
|
|
|
async def miniscript_wallet_export(menu, label, item):
    # Menu callback: export one wallet as a file.
    # - item.arg is a pair: (wallet, dict of keyword args for export_wallet_file)
    wallet = item.arg[0]
    options = item.arg[1]
    await wallet.export_wallet_file(**options)
|
|
|
|
async def miniscript_wallet_descriptors(menu, label, item):
    # Build the "Descriptors" sub-menu of one wallet.
    # - item.arg is the wallet; without one, no menu is made (None)
    # - every entry calls miniscript_wallet_export with (wallet, export options)
    wallet = item.arg
    if wallet:
        variants = (
            ('Export', {"core": False}),
            ('Bitcoin Core', {"core": True}),
            ('BIP-388 Policy', {"bip388": True}),
        )
        return [MenuItem(title, f=miniscript_wallet_export, arg=(wallet, opts))
                for title, opts in variants]
|
|
|
|
async def miniscript_sign_psbt(a, b, item):
    # Menu callback: start PSBT signing with this wallet (item.arg) preselected.
    from actions import _ready2sign

    wallet = item.arg
    await _ready2sign(probe=False, miniscript_wallet=wallet)
|
|
|
|
async def make_miniscript_wallet_menu(menu, label, item):
    # Build the sub-menu for a single wallet: details and actions.
    # - item.arg is (index of wallet, wallet object)
    # - the index is only passed along to the rename action
    idx, msc = item.arg
    show = miniscript_wallet_detail

    entries = [
        MenuItem('"%s"' % msc.name, f=show, arg=msc),
        MenuItem('View Details', f=show, arg=msc),
        MenuItem('Descriptors', menu=miniscript_wallet_descriptors, arg=msc),
        MenuItem('Sign PSBT', f=miniscript_sign_psbt, arg=msc),
        MenuItem('Rename', f=miniscript_wallet_rename, arg=(idx, msc)),
        MenuItem('Delete', f=miniscript_wallet_delete, arg=msc),
    ]

    # Electrum export is offered only for basic multisig using sortedmulti
    if msc.m_n and msc.bip67:
        entries.append(MenuItem('Electrum Wallet', f=multisig_electrum_export, arg=msc))

    return entries
|
|
|
|
|
|
class MiniscriptMenu(MenuSystem):
    # Dynamic menu: one entry per stored miniscript wallet, followed by
    # the actions and settings shared by all of them.

    @classmethod
    def construct(cls):
        # Make the list of menu items; also used to rebuild the menu later.
        import version
        from menu import ShortcutItem
        from bsms import make_ms_wallet_bsms_menu
        from multisig import create_ms_step1

        items = [
            MenuItem('%s' % wal.name, menu=make_miniscript_wallet_menu, arg=(pos, wal))
            for pos, wal in enumerate(MiniScriptWallet.iter_wallets())
        ]
        if not items:
            # placeholder when there are no wallets
            items = [MenuItem("(none setup yet)")]

        from glob import NFC
        items.extend([
            MenuItem('Import', f=import_miniscript),
            MenuItem('Export XPUB', f=export_miniscript_xpubs),
            MenuItem('BSMS (BIP-129)', menu=make_ms_wallet_bsms_menu),
            MenuItem('Create Airgapped', f=create_ms_step1),
            MenuItem('Trust PSBT?', f=trust_psbt_menu),
            MenuItem('Skip Checks?', f=disable_checks_menu),
            # key shortcuts, each guarded by a predicate
            ShortcutItem(KEY_NFC, predicate=lambda: NFC is not None,
                         f=import_miniscript_nfc),
            ShortcutItem(KEY_QR, predicate=lambda: version.has_qwerty,
                         f=import_miniscript_qr),
        ])
        return items

    def update_contents(self):
        # Wallets were added or changed while this same menu is being shown
        # again: rebuild the item list and swap it in.
        self.replace_items(self.construct())
|
|
|
|
async def make_miniscript_menu(*a):
    # Entry point of the miniscript menu: list of all wallets plus
    # high-level settings/actions. Refused (with a story) if no secret is set.
    from pincodes import pa

    if pa.is_secret_blank():
        await ux_show_story("You must have wallet seed before creating miniscript wallets.")
        return

    # The 6.4.0 multisig migration is normally done in login_sequence; it is
    # repeated on entry to this menu for users that have multisig wallets
    # stored in tmp seed settings.
    await do_640_multisig_migration()

    return MiniscriptMenu(MiniscriptMenu.construct())
|
|
|
|
|
|
def disable_checks_chooser():
    # Chooser for the "skip checks" mode.
    # Returns (current choice index, list of choices, setter callback);
    # the value is kept on the MiniScriptWallet class itself.
    def xset(idx, text):
        MiniScriptWallet.disable_checks = bool(idx)

    choices = ['Normal', 'Skip Checks']
    current = int(MiniScriptWallet.disable_checks)

    return current, choices, xset
|
|
|
|
async def disable_checks_menu(*a):
    # Menu callback for "Skip Checks?".
    # - if checks are not disabled yet, show a warning first; user must
    #   press (4) to get to the chooser
    # - if they are already disabled, go directly to the chooser
    checks_active = not MiniScriptWallet.disable_checks

    if checks_active:
        warning = '''\
With many different wallet vendors and implementors involved, it can \
be hard to create a PSBT consistent with the many keys involved. \
With this setting, you can \
disable the more stringent verification checks your Coldcard normally provides.

USE AT YOUR OWN RISK. These checks exist for good reason! Signed txn may \
not be accepted by network.

This settings lasts only until power down.

Press (4) to confirm entering this DANGEROUS mode.
'''
        ch = await ux_show_story(warning, escape='4')

        if ch != '4':
            return

    start_chooser(disable_checks_chooser)
|
|
|
|
|
|
def psbt_xpubs_policy_chooser():
    # Chooser for the PSBT xpub trust policy.
    # Returns (current policy, list of choices, setter callback); the chosen
    # index is saved under the 'pms' setting.
    def xset(idx, text):
        settings.set('pms', idx)

    choices = ['Verify Only', 'Offer Import', 'Trust PSBT']
    current = MiniScriptWallet.get_trust_policy()

    return current, choices, xset
|
|
|
|
async def trust_psbt_menu(*a):
    # Menu callback for "Trust PSBT?": explain the three policies in a
    # story, then open the chooser unless the story was left with 'x'.
    explanation = '''\
This setting controls what the Coldcard does \
with the co-signer public keys (XPUB) that may \
be provided inside a PSBT file. Three choices:

- Verify Only. Do not import the xpubs found, but do \
verify the correct wallet already exists on the Coldcard.

- Offer Import. If it's a new multisig wallet, offer to import \
the details and store them as a new wallet in the Coldcard.

- Trust PSBT. Use the wallet data in the PSBT as a temporary,
multisig wallet, and do not import it. This permits some \
deniability and additional privacy.

When the XPUB data is not provided in the PSBT, regardless of the above, \
we require the appropriate multisig wallet to already exist \
on the Coldcard. Default is to 'Offer' unless a multisig wallet already \
exists, otherwise 'Verify'.'''

    ch = await ux_show_story(explanation)

    if ch != 'x':
        start_chooser(psbt_xpubs_policy_chooser)
|
|
|
|
|
|
async def multisig_electrum_export(menu, label, item):
    # Menu callback: create a JSON file that Electrum can use, for the
    # wallet given in item.arg. Challenges:
    # - file contains derivation paths for each co-signer to use
    # - electrum is using BIP-43 with purpose=48 (purpose48_derivation) to make paths like:
    #       m/48h/1h/0h/2h
    # - above is now called BIP-48
    # - other signers might not be coldcards (we don't know)
    # solution:
    # - when building air-gap, pick address type at that point, and matching path to suit
    # - could check path prefix and addr_fmt make sense together, but meh.
    wallet = item.arg
    await wallet.export_electrum()
|
|
|
|
|
|
async def export_miniscript_xpubs(*a, xfp=None, alt_secret=None, skip_prompt=False):
    # Export JSON with one xpub per useful address type and semi-standard
    # derivation path.
    # WAS: Create a single text file with lots of docs, and all possible useful xpub values.
    # THEN: Just create the one-liner xpub export value they need/want to support BIP-45
    #
    # - consumer for this file is supposed to be ourselves, when we build on-device multisig.
    # - however some 3rd parties are making use of it as well.
    # - used for CCC feature now as well, but result looks just like normal export
    #
    # Args:
    # - xfp: fingerprint to report; the 'xfp' setting is used when not given
    # - alt_secret: passed to stash.SensitiveValues as the secret to derive from
    # - skip_prompt: do not show the explanation story first
    chain = chains.current_chain()
    xfp = xfp2str(xfp or settings.get('xfp', 0))

    label = "Multisig XPUB"
    fname_pattern = 'ccxp-%s.json' % xfp

    if not skip_prompt:
        # {{acct}} stays as literal text "{acct}" after format()
        msg = '''\
This feature creates a small file containing \
the extended public keys (XPUB) you would need to join \
a multisig wallet.

Public keys for BIP-48 conformant paths are used:

P2SH-P2WSH:
m/48h/{coin}h/{{acct}}h/1h
P2WSH:
m/48h/{coin}h/{{acct}}h/2h
P2TR:
m/48h/{coin}h/{{acct}}h/3h

{ok} to continue. {x} to abort.'''.format(coin=chain.b44_cointype, ok=OK, x=X)

        ch = await ux_show_story(msg)
        if ch != "y":
            return

    # no entry means account zero
    acct = await ux_enter_bip32_index('Account Number:') or 0

    def render(acct_num):
        # Make the JSON text. Returns (text, derivation path for signing
        # the export, address format for signing the export).
        sign_der = None
        with uio.StringIO() as out:
            out.write('{\n')
            with stash.SensitiveValues(secret=alt_secret) as sv:
                for name, deriv, fmt in chains.MS_STD_DERIVATIONS:
                    if acct_num and fmt == AF_P2SH:
                        # the AF_P2SH entry is only written for account zero
                        continue

                    path = deriv.format(coin=chain.b44_cointype, acct_num=acct_num)
                    if fmt == AF_P2WSH:
                        # signing key is taken below the P2WSH path
                        sign_der = path + "/0/0"

                    node = sv.derive_path(path)
                    xp = chain.serialize_public(node, fmt)
                    out.write(' "%s_deriv": "%s",\n' % (name, path))
                    out.write(' "%s": "%s",\n' % (name, xp))

                    # same key again, as key expression with origin info
                    xpub = chain.serialize_public(node)
                    key_exp = "[%s/%s]%s" % (xfp, path.replace("m/", ""), xpub)
                    out.write(' "%s_key_exp": "%s",\n' % (name, key_exp))

            out.write(' "account": "%d",\n' % acct_num)
            out.write(' "xfp": "%s"\n}\n' % xfp)
            return out.getvalue(), sign_der, AF_CLASSIC

    from export import export_contents
    await export_contents(label, lambda: render(acct), fname_pattern,
                          force_bbqr=True, is_json=True)
|
|
|
|
## MIGRATION ===
|
|
|
|
def renumber_placeholders(policy, mapping):
    # Rewrite the "@N" key placeholders of a policy in one left-to-right pass.
    # - mapping: dict of old index -> new index; indexes not in it are kept
    # - whole numbers are matched, so "@1" never touches "@10", and an
    #   already rewritten placeholder is never rewritten a second time
    # Returns (new policy text, set of old indexes that were found).
    parts = []
    seen = set()
    size = len(policy)
    pos = 0
    while True:
        at = policy.find("@", pos)
        if at == -1:
            parts.append(policy[pos:])
            break

        # digits directly after the '@'
        stop = at + 1
        while stop < size and "0" <= policy[stop] <= "9":
            stop += 1

        parts.append(policy[pos:at])
        if stop == at + 1:
            # '@' without a number: not a placeholder, keep as is
            parts.append("@")
        else:
            old = int(policy[at + 1:stop])
            seen.add(old)
            parts.append("@%d" % mapping.get(old, old))
        pos = stop

    return "".join(parts), seen


def miniscript_640_migrate(old_serialization):
    # Convert one miniscript wallet from the old 10-element serialization to
    # the new one. Nothing is persisted here.
    #
    # Returns (name, descriptor template, keys_info, options):
    # - name is cut to MAX_NAME_LEN (previous version had unbounded names)
    # - descriptor template gets the script type wrapper that old policy lacked
    # - keys_info are origin keys (no subderivation), indexed by @N in template
    # - options hold "af", plus "ik_u" for taproot and "ct" if chain is not BTC
    #
    # Raises AssertionError if a key is not referenced by the policy, or a
    # taproot wallet has no internal key.
    from ubinascii import unhexlify as a2b_hex
    from ucollections import OrderedDict
    from desc_utils import PROVABLY_UNSPENDABLE

    def remove_subderivation(str_key):
        # split key into (origin + extended key, subderivation)
        # find the end of origin derivation
        orig_der_end = str_key.find(']')
        if orig_der_end != -1:
            orig_der = str_key[:orig_der_end + 1]
            rest = str_key[orig_der_end + 1:]
        else:
            orig_der = ""
            rest = str_key

        rest_split = rest.split("/")
        subder = "/%s" % "/".join(rest_split[1:])
        return orig_der + rest_split[0], subder

    # last 4 members are irrelevant
    name, ct, af, key, keys, policy, _, _, _, _ = old_serialization

    # standardize policy according to BIP-388
    policy = policy.replace("/<0;1>/*", "/**")

    # P2TR problem - policy here does not contain internal key (key)
    # therefore numbering is wrong - needs to make place for internal key
    # problem - keys can be duplicates with just subderivation different

    # deduplicate keys to become origin keys
    keys = list(OrderedDict([(remove_subderivation(k)[0], None) for k in keys]).keys())

    if key:
        # taproot internal key
        # will always be @0
        if "unspend(" in key:
            # this is no longer supported - need to convert to xpub
            end = key.find(")")
            chain_code_str = key[8:end]     # 8 == len("unspend(")
            ik_u = True
            ik_subder = key[end+1:]
            n = ngu.hdnode.HDNode()
            n.from_chaincode_pubkey(a2b_hex(chain_code_str), PROVABLY_UNSPENDABLE)
            ik_key = chains.current_chain().serialize_public(n)
        else:
            ik_key, ik_subder = remove_subderivation(key)
            ik_u = ExtendedKey.from_string(ik_key).is_provably_unspendable

        if ik_subder == "/<0;1>/*":
            ik_subder = "/**"

    if af == AF_P2TR:
        assert key, "taproot needs internal key"

        # Internal key is always first (@0). It can be used in script tree
        # as well, at any position: such use maps to @0, and all other keys
        # follow in their old order. Previously this was done with repeated
        # str.replace(), which also changed "@1" inside "@10" and placeholders
        # that had just been renumbered.
        keys_info = [ik_key]
        new_idx = {}
        for i, k in enumerate(keys):
            if k == ik_key:
                new_idx[i] = 0
            else:
                new_idx[i] = len(keys_info)
                keys_info.append(k)
    else:
        # numbering stays as it is
        keys_info = list(keys)
        new_idx = {}

    policy, used = renumber_placeholders(policy, new_idx)
    for i in range(len(keys)):
        # each key must be referenced by the policy
        assert i in used

    new_opts = {"af": af}

    # policy in old version lacks script type
    if af == AF_P2TR:
        desc_tmplt = "tr(@0%s,%s)" % (ik_subder, policy)
        new_opts["ik_u"] = ik_u
    elif af == AF_P2WSH:
        desc_tmplt = "wsh(" + policy + ")"
    elif af == AF_P2WSH_P2SH:
        desc_tmplt = "sh(wsh(" + policy + "))"
    else:
        desc_tmplt = "sh(" + policy + ")"

    if ct != "BTC":
        new_opts['ct'] = ct

    # previous version had unbounded names, cut it
    return name[:MAX_NAME_LEN], desc_tmplt, keys_info, new_opts
|
|
|
|
|
|
async def multisig_640_migration(multisig_wallets):
    # All MultisigWallet needs to be converted to MiniscriptWallet.
    # This function just returns new list of migrated multisig wallets without
    # changing any persisted settings data.
    #
    # - multisig_wallets: old serializations: (name, (M, N), xpubs, opts),
    #   optionally with 5th element: bip67 flag
    # - returns list of (name, descriptor template, keys_info, options)
    # - names are made unique against stored miniscript wallets AND against
    #   wallets migrated earlier in the same run
    from glob import dis
    dis.fullscreen("Migrating...")
    total = len(multisig_wallets)

    migrated_multi = []
    # first element is always name, whether migrated or not
    # shorten to MAX_NAME_LEN that will be done to miniscript names upon migration
    taken_names = [tup[0][:MAX_NAME_LEN] for tup in settings.get("miniscript", [])]
    for i, ms in enumerate(multisig_wallets):
        bip67 = 1  # default enabled, requires 5-element serialization to disable
        if len(ms) == 5:
            bip67 = ms[-1]
            ms = ms[:-1]

        name, m_of_n, xpubs, opts = ms
        ct = opts.get('ch', 'BTC')
        af = opts.get('ft', AF_P2SH)

        if len(xpubs[0]) == 2:
            # oldest format: (xfp, xpub) pairs and one common path prefix
            common_prefix = opts.get('pp', None)
            if not common_prefix:
                common_prefix = 'm'
            common_prefix = common_prefix.replace("'", "h")
            xpubs = [(a, common_prefix, b) for a, b in xpubs]
        else:
            # new format decompression
            if 'd' in opts:
                derivs = [p.replace("'", "h") for p in opts['d']]
                xpubs = [(a, derivs[b], c) for a, b, c in xpubs]

        keys_info = []
        for mfp, der, ek in xpubs:
            xfp = xfp2str(mfp).lower()
            if der == "m":
                keys_info.append("[%s]%s" % (xfp, ek))
            else:
                keys_info.append("[%s/%s]%s" % (xfp, der.replace("m/", ""), ek))

        ms_type = "sortedmulti" if bip67 else "multi"
        if af == AF_P2WSH:
            desc_tmplt = "wsh(" + ms_type + "(%s))"
        elif af == AF_P2WSH_P2SH:
            desc_tmplt = "sh(wsh(" + ms_type + "(%s)))"
        else:
            desc_tmplt = "sh(" + ms_type + "(%s))"

        M, N = m_of_n
        inner = "%d,%s" % (M, ",".join(["@%d/**" % i for i in range(N)]))
        desc_tmplt = desc_tmplt % inner

        new_opts = {
            "af": af,
            "m_n": (M, N),
            "b67": bip67
        }
        if ct != "BTC":
            new_opts['ct'] = ct

        # this should not happen as multisg names were limited to 20 chars max
        name = name[:MAX_NAME_LEN]
        while name in taken_names:
            # name collision: add random digits until unique, cutting the
            # name if needed to stay within MAX_NAME_LEN
            suffix = str(ngu.random.uniform(100))
            if (len(name) + len(suffix)) > MAX_NAME_LEN:
                name = name[:MAX_NAME_LEN-len(suffix)]

            name = name + suffix

        # remember it, so wallets migrated later cannot get the same name
        taken_names.append(name)

        migrated_multi.append((name, desc_tmplt, keys_info, new_opts))
        dis.progress_sofar(i+1, total)

    return migrated_multi
|
|
|
|
async def do_640_multisig_migration():
    # One-time migration of stored multisig wallets into miniscript wallets.
    # In version 6.4.0 EDGE MultisigWallet was removed & multisigs are now
    # part of miniscript.
    # - does nothing once the "multi_mig" setting is set, or if there are
    #   no wallets under the "multisig" setting
    # - migrated wallets are appended to the "miniscript" setting
    # - the "multisig" setting itself is left in place
    if settings.get("multi_mig", 0):
        return

    old_wallets = settings.get("multisig")
    if not old_wallets:
        return

    migrated = await multisig_640_migration(old_wallets)
    existing = settings.get("miniscript", [])
    settings.set("miniscript", existing + migrated)
    # settings.remove_key("multisig")
    settings.set("multi_mig", 1)
    settings.save()
|
|
|
|
# EOF
|