refactor-out msgsign.py from auth.py

This commit is contained in:
Peter D. Gray 2025-03-27 16:57:37 -04:00
parent a0524ebe60
commit 2723f93d7c
No known key found for this signature in database
GPG Key ID: A2DCD558C2BE5D7C
13 changed files with 557 additions and 512 deletions

View File

@ -1648,7 +1648,7 @@ async def list_files(*A):
card.securely_blank_file(fn)
break
else:
from auth import write_sig_file
from msgsign import write_sig_file
sig_nice = write_sig_file([(digest, fn)])
await ux_show_story("Signature file %s written." % sig_nice)
@ -1946,7 +1946,7 @@ async def verify_sig_file(*a):
return
# start the process
from auth import verify_txt_sig_file
from msgsign import verify_txt_sig_file
await verify_txt_sig_file(fn)

View File

@ -14,7 +14,7 @@ from uasyncio import sleep_ms
from uhashlib import sha256
from ubinascii import hexlify as b2a_hex
from glob import settings
from auth import write_sig_file
from msgsign import write_sig_file
from charcodes import KEY_QR, KEY_NFC, KEY_PAGE_UP, KEY_PAGE_DOWN, KEY_HOME, KEY_LEFT, KEY_RIGHT
from charcodes import KEY_CANCEL
from utils import show_single_address, problem_file_line, truncate_address
@ -391,7 +391,7 @@ Press (3) if you really understand and accept these risks.
else:
# only custom path sets allow_change to False
# msg sign
from auth import sign_with_own_address
from msgsign import sign_with_own_address
await sign_with_own_address(path, addr_fmt)
elif n is None:

View File

@ -22,6 +22,7 @@ from files import CardSlot, CardMissingError, needs_microsd
from exceptions import HSMDenied
from version import MAX_TXN_LEN
from charcodes import KEY_QR, KEY_NFC, KEY_ENTER, KEY_CANCEL, KEY_LEFT, KEY_RIGHT
from msgsign import sign_message_digest
# Where in SPI flash/PSRAM the two PSBT files are (in and out)
TXN_INPUT_OFFSET = 0
@ -127,240 +128,14 @@ Using the key associated with address:
Press %s to continue, otherwise %s to cancel.''' % (OK, X)
# RFC2440 <https://www.ietf.org/rfc/rfc2440.txt> style signatures, popular
# since the genesis block, but not really part of any BIP as far as I know.
#
def rfc_signature_template_gen(msg, addr, sig):
template = [
"-----BEGIN BITCOIN SIGNED MESSAGE-----\n",
"%s\n" % msg,
"-----BEGIN BITCOIN SIGNATURE-----\n",
"%s\n" % addr,
"%s\n" % sig,
"-----END BITCOIN SIGNATURE-----\n"
]
for part in template:
yield part
def parse_armored_signature_file(contents):
sep = "-----"
assert contents.count(sep) == 6, "Armor text MUST be surrounded by exactly five (5) dashes."
temp = contents.split(sep)
msg = temp[2].strip()
addr_sig = temp[4].strip()
addr, sig_str = addr_sig.split()
return msg, addr, sig_str
def sign_message_digest(digest, subpath, prompt, addr_fmt=AF_CLASSIC, pk=None):
# do the signature itself!
from glob import dis
ch = chains.current_chain()
if prompt:
dis.fullscreen(prompt, percent=.25)
if pk is None:
with stash.SensitiveValues() as sv:
# if private key is provided, derivation subpath is ignored
# and provided private key is used for signing
node = sv.derive_path(subpath)
dis.progress_sofar(50, 100)
pk = node.privkey()
addr = ch.address(node, addr_fmt)
else:
node = ngu.hdnode.HDNode().from_chaincode_privkey(bytes(32), pk)
dis.progress_sofar(50, 100)
addr = ch.address(node, addr_fmt)
dis.progress_sofar(75, 100)
rv = ngu.secp256k1.sign(pk, digest, 0).to_bytes()
# AF_CLASSIC header byte base 31 is returned by default from ngu - NOOP
if addr_fmt != AF_CLASSIC:
header_byte, rs = rv[0], rv[1:]
# ngu only produces header base for compressed p2pkh, anyways get only rec_id
rec_id = (header_byte - 27) & 0x03
new_header_byte = rec_id + ch.sig_hdr_base(addr_fmt=addr_fmt)
rv = bytes([new_header_byte]) + rs
dis.progress_bar_show(1)
return rv, addr
def make_signature_file_msg(content_list):
# list of tuples consisting of (hash, file_name)
return b"\n".join([
b2a_hex(h) + b" " + fname.encode()
for h, fname in content_list
])
def parse_signature_file_msg(msg):
# only succeed for our format digest + 2 spaces + fname
try:
res = []
lines = msg.split('\n')
for ln in lines:
d, fn = ln.split(' ')
# should not need to strip if our file format, so dont
# is hex? is 32 bytes long?
assert len(a2b_hex(d)) == 32
res.append((d, fn))
return res
except:
return
def sign_export_contents(content_list, deriv, addr_fmt, pk=None):
msg2sign = make_signature_file_msg(content_list)
bitcoin_digest = chains.current_chain().hash_message(msg2sign)
sig_bytes, addr = sign_message_digest(bitcoin_digest, deriv, "Signing...", addr_fmt, pk=pk)
sig = b2a_base64(sig_bytes).decode().strip()
gen = rfc_signature_template_gen(addr=addr, msg=msg2sign.decode(), sig=sig)
return gen
def verify_signed_file_digest(msg):
from files import CardSlot
parsed_msg = parse_signature_file_msg(msg)
if not parsed_msg:
# not our format
return
try:
err, warn = [], []
with CardSlot() as card:
for digest, fname in parsed_msg:
path = card.abs_path(fname)
if not card.exists(path):
warn.append((fname, None))
continue
path = card.abs_path(fname)
with open(path, "rb") as f:
csum = chunk_checksum(f)
h = b2a_hex(csum).decode().strip()
if h != digest:
err.append((fname, h, digest))
except:
# fail silently if issues with reading files or SD issues
# no digest checking
return
return err, warn
def write_sig_file(content_list, derive=None, addr_fmt=AF_CLASSIC, pk=None, sig_name=None):
from glob import dis
if derive is None:
ct = chains.current_chain().b44_cointype
derive = "m/44'/%d'/0'/0/0" % ct
fpath = content_list[0][1]
if len(content_list) > 1:
# we're signing contents of more files - need generic name for sig file
assert sig_name
sig_nice = sig_name + ".sig"
sig_fpath = fpath.rsplit("/", 1)[0] + "/" + sig_nice
else:
sig_fpath = fpath.rsplit(".", 1)[0] + ".sig"
sig_nice = sig_fpath.split("/")[-1]
sig_gen = sign_export_contents([(h, f.split("/")[-1]) for h, f in content_list],
derive, addr_fmt, pk=pk)
with open(sig_fpath, 'wt') as fd:
for i, part in enumerate(sig_gen):
fd.write(part)
# rfc template generator has length of 6
dis.progress_sofar(i, 6)
return sig_nice
def validate_text_for_signing(text, only_printable=True):
# Check for some UX/UI traps in the message itself.
# - messages must be short and ascii only. Our charset is limited
# - too many spaces, leading/trailing can be an issue
# MSG_MAX_SPACES = 4 # impt. compared to -=- positioning
result = to_ascii_printable(text, only_printable=only_printable)
length = len(result)
assert length >= 2, "msg too short (min. 2)"
assert length <= MSG_SIGNING_MAX_LENGTH, "msg too long (max. %d)" % MSG_SIGNING_MAX_LENGTH
assert " " not in result, 'too many spaces together in msg(max. 3)'
# other confusion w/ whitepace
assert result[0] != ' ', 'leading space(s) in msg'
assert result[-1] != ' ', 'trailing space(s) in msg'
# looks ok
return result
def addr_fmt_from_subpath(subpath):
if not subpath:
af = "p2pkh"
elif subpath[:4] == "m/84":
af = "p2wpkh"
elif subpath[:4] == "m/49":
af = "p2sh-p2wpkh"
else:
af = "p2pkh"
return af
def parse_msg_sign_request(data):
subpath = ""
addr_fmt = None
is_json = False
# sparrow compat
if "signmessage" in data:
try:
mark, subpath, *msg_line = data.split(" ", 2)
assert mark == "signmessage"
# subpath will be verified & cleaned later
assert msg_line[0][:6] == "ascii:"
text = msg_line[0][6:]
return text, subpath, addr_fmt_from_subpath(subpath), is_json
except:pass
# ===
try:
data_dict = ujson.loads(data.strip())
text = data_dict.get("msg", None)
if text is None:
raise AssertionError("MSG required")
subpath = data_dict.get("subpath", subpath)
addr_fmt = data_dict.get("addr_fmt", addr_fmt)
is_json = True
except ValueError:
lines = data.split("\n")
assert len(lines) >= 1, "min 1 line"
assert len(lines) <= 3, "max 3 lines"
if len(lines) == 1:
text = lines[0]
elif len(lines) == 2:
text, subpath = lines
else:
text, subpath, addr_fmt = lines
if not addr_fmt:
addr_fmt = addr_fmt_from_subpath(subpath)
if not subpath:
subpath = chains.STD_DERIVATIONS[addr_fmt]
subpath = subpath.format(
coin_type=chains.current_chain().b44_cointype,
account=0, change=0, idx=0
)
return text, subpath, addr_fmt, is_json
class ApproveMessageSign(UserAuthorizedAction):
def __init__(self, text, subpath, addr_fmt, approved_cb=None,
msg_sign_request=None, only_printable=True):
super().__init__()
is_json = False
from msgsign import validate_text_for_signing, parse_msg_sign_request
if msg_sign_request:
text, subpath, addr_fmt, is_json = parse_msg_sign_request(msg_sign_request)
@ -397,7 +172,7 @@ class ApproveMessageSign(UserAuthorizedAction):
else:
# perform signing (progress bar shown)
digest = chains.current_chain().hash_message(self.text.encode())
self.result = sign_message_digest(digest, self.subpath, "Signing...", self.addr_fmt)[0]
self.result, _ = sign_message_digest(digest, self.subpath, "Signing...", self.addr_fmt)
if self.approved_cb:
# for micro sd case
@ -412,48 +187,18 @@ class ApproveMessageSign(UserAuthorizedAction):
def sign_msg(text, subpath, addr_fmt):
# Start the approval process for message signing.
UserAuthorizedAction.check_busy()
UserAuthorizedAction.active_request = ApproveMessageSign(text, subpath, addr_fmt)
# kill any menu stack, and put our thing at the top
abort_and_goto(UserAuthorizedAction.active_request)
async def msg_sign_ux_get_subpath(addr_fmt):
purpose = chains.af_to_bip44_purpose(addr_fmt)
chain_n = chains.current_chain().b44_cointype
acct = await ux_enter_bip32_index('Account Number:') or 0
ch = await ux_show_story(title="Change?",
msg="Press (0) to use internal/change address,"
" %s to use external/receive address." % OK, escape="0")
change = 1 if ch == '0' else 0
idx = await ux_enter_bip32_index('Index Number:') or 0
return "m/%dh/%dh/%dh/%d/%d" % (purpose, chain_n, acct, change, idx)
async def ux_sign_msg(txt, approved_cb=None, kill_menu=True):
from menu import MenuSystem, MenuItem
from ux import the_ux
async def done(_1, _2, item):
from auth import approve_msg_sign, msg_sign_ux_get_subpath
text, af = item.arg
subpath = await msg_sign_ux_get_subpath(af)
await approve_msg_sign(text, subpath, af, approved_cb=approved_cb,
kill_menu=kill_menu, only_printable=False)
# pick address format
rv = [
MenuItem(chains.addr_fmt_label(af), f=done, arg=(txt, af))
for af in chains.SINGLESIG_AF
]
the_ux.push(MenuSystem(rv))
async def approve_msg_sign(text, subpath, addr_fmt, approved_cb=None,
msg_sign_request=None, kill_menu=False,
only_printable=True):
# Ask user if they want to sign some short text message.
UserAuthorizedAction.cleanup()
UserAuthorizedAction.check_busy(ApproveMessageSign)
try:
@ -463,112 +208,22 @@ async def approve_msg_sign(text, subpath, addr_fmt, approved_cb=None,
msg_sign_request=msg_sign_request,
only_printable=only_printable,
)
if kill_menu:
abort_and_goto(UserAuthorizedAction.active_request)
else:
# do not kill the menu stack! just append
# do not kill the menu stack! just push
from ux import the_ux
the_ux.push(UserAuthorizedAction.active_request)
except (AssertionError, ValueError) as exc:
await ux_show_story("Problem: %s\n\nMessage to be signed must be a single line of ASCII text." % exc)
return
async def msg_signing_done(signature, address, text):
from ux import import_export_prompt
ch = await import_export_prompt("Signed Msg", is_import=False,
no_qr=not version.has_qwerty)
if ch == KEY_CANCEL:
return
if isinstance(ch, dict):
await sd_sign_msg_done(signature, address, text, "msg_sign", **ch)
elif version.has_qr and ch == KEY_QR:
from ux_q1 import qr_msg_sign_done
await qr_msg_sign_done(signature, address, text)
elif ch in KEY_NFC+"3":
from glob import NFC
if NFC:
await NFC.msg_sign_done(signature, address, text)
async def sign_with_own_address(subpath, addr_fmt):
# used for cases where we already have the key picked, but need the message:
# * address_explorer custom path
# * positive ownership test
to_sign = await ux_input_text("", scan_ok=True, prompt="Enter MSG") # max len is 100 only here
if not to_sign: return
await approve_msg_sign(to_sign, subpath, addr_fmt, approved_cb=msg_signing_done, kill_menu=True)
async def sd_sign_msg_done(signature, address, text, base=None, orig_path=None,
slot_b=None, force_vdisk=False):
from glob import dis
dis.fullscreen('Generating...')
out_fn = None
sig = b2a_base64(signature).decode('ascii').strip()
while 1:
# try to put back into same spot
# add -signed to end.
target_fname = base + '-signed.txt'
lst = [orig_path]
if orig_path:
lst.append(None)
for path in lst:
try:
with CardSlot(readonly=True, slot_b=slot_b, force_vdisk=force_vdisk) as card:
out_full, out_fn = card.pick_filename(target_fname, path)
out_path = path
if out_full: break
except CardMissingError:
prob = 'Missing card.\n\n'
out_fn = None
if not out_fn:
# need them to insert a card
prob = ''
else:
# attempt write-out
try:
dis.fullscreen("Saving...")
with CardSlot(slot_b=slot_b, force_vdisk=force_vdisk) as card:
with card.open(out_full, 'wt') as fd:
# save in full RFC style
# gen length is 6
gen = rfc_signature_template_gen(addr=address, msg=text, sig=sig)
for i, part in enumerate(gen):
fd.write(part)
dis.progress_sofar(i, 6)
# success and done!
break
except OSError as exc:
prob = 'Failed to write!\n\n%s\n\n' % exc
sys.print_exception(exc)
# fall through to try again
# prompt them to input another card?
ch = await ux_show_story(prob + "Please insert an SDCard to receive signed message, "
"and press %s." % OK, title="Need Card")
if ch == 'x':
await ux_aborted()
return
# done.
msg = "Created new file:\n\n%s" % out_fn
await ux_show_story(msg, title='File Signed')
async def sign_txt_file(filename):
# sign a one-line text file found on a MicroSD card
# - not yet clear how to do address types other than 'classic'
from ux import the_ux
from msgsign import sd_sign_msg_done
async def done(signature, address, text):
# complete. write out result
@ -591,139 +246,10 @@ async def sign_txt_file(filename):
await approve_msg_sign(None, None, None, approved_cb=done,
msg_sign_request=res)
def verify_signature(msg, addr, sig_str):
warnings = ""
script = None
hash160 = None
invalid_addr_fmt_msg = "Invalid address format - must be one of p2pkh, p2sh-p2wpkh, or p2wpkh."
invalid_addr = "Invalid signature for message."
if addr[0] in "1mn":
addr_fmt = AF_CLASSIC
decoded_addr = ngu.codecs.b58_decode(addr)
hash160 = decoded_addr[1:] # remove prefix
elif addr.startswith("bc1q") or addr.startswith("tb1q") or addr.startswith("bcrt1q"):
if len(addr) > 44: # testnet/mainnet max singlesig len 42, regtest 44
# p2wsh
raise ValueError(invalid_addr_fmt_msg)
addr_fmt = AF_P2WPKH
_, _, hash160 = ngu.codecs.segwit_decode(addr)
elif addr[0] in "32":
addr_fmt = AF_P2WPKH_P2SH
decoded_addr = ngu.codecs.b58_decode(addr)
script = decoded_addr[1:] # remove prefix
else:
raise ValueError(invalid_addr_fmt_msg)
try:
sig_bytes = a2b_base64(sig_str)
if not sig_bytes or len(sig_bytes) != 65:
# can return b'' in case of wrong, can also raise
raise ValueError("invalid encoding")
header_byte = sig_bytes[0]
header_base = chains.current_chain().sig_hdr_base(addr_fmt)
if (header_byte - header_base) not in (0, 1, 2, 3):
# wrong header value only - this can still verify OK
warnings += "Specified address format does not match signature header byte format."
# least two significant bits
rec_id = (header_byte - 27) & 0x03
# need to normalize it to 31 base for ngu
new_header_byte = 31 + rec_id
sig = ngu.secp256k1.signature(bytes([new_header_byte]) + sig_bytes[1:])
except ValueError as e:
raise ValueError("Parsing signature failed - %s." % str(e))
digest = chains.current_chain().hash_message(msg.encode('ascii'))
try:
rec_pubkey = sig.verify_recover(digest)
except ValueError as e:
raise ValueError("Invalid signature for msg - %s." % str(e))
rec_pubkey_bytes = rec_pubkey.to_bytes()
rec_hash160 = ngu.hash.hash160(rec_pubkey_bytes)
if script:
target = bytes([0, 20]) + rec_hash160
target = ngu.hash.hash160(target)
if target != script:
raise ValueError(invalid_addr)
else:
if rec_hash160 != hash160:
raise ValueError(invalid_addr)
return warnings
async def verify_armored_signed_msg(contents, digest_check=True):
# digest_check=False for NFC cases, where we do not have filesystem
from glob import dis
dis.fullscreen("Verifying...")
try:
msg, addr, sig_str = parse_armored_signature_file(contents)
except Exception as e:
e_line = problem_file_line(e)
await ux_show_story("Malformed signature file. %s %s" % (str(e), e_line), title="FAILURE")
return
try:
sig_warn = verify_signature(msg, addr, sig_str)
except Exception as e:
await ux_show_story(str(e), title="ERROR")
return
title = "CORRECT"
warn_msg = ""
err_msg = ""
story = "Good signature by address:\n%s" % show_single_address(addr)
if digest_check:
digest_prob = verify_signed_file_digest(msg)
if digest_prob:
err, digest_warn = digest_prob
if digest_warn:
title = "WARNING"
wmsg_base = "not present. Contents verification not possible."
if len(digest_warn) == 1:
fname = digest_warn[0][0]
warn_msg += "'%s' is %s" % (fname, wmsg_base)
else:
warn_msg += "Files:\n" + "\n".join("> %s" % fname for fname, _ in digest_warn)
warn_msg += "\nare %s" % wmsg_base
if err:
title = "ERROR"
for fname, calc, got in err:
err_msg += ("Referenced file '%s' has wrong contents.\n"
"Got:\n%s\n\nExpected:\n%s" % (fname, got, calc))
if sig_warn:
# we know not ours only because wrong recid header used & not BIP-137 compliant
story = "Correctly signed, but not by this Coldcard. %s" % sig_warn
await ux_show_story('\n\n'.join(m for m in [err_msg, story, warn_msg] if m), title=title)
async def verify_txt_sig_file(filename):
# copy message into memory
try:
with CardSlot() as card:
with card.open(filename, 'rt') as fd:
text = fd.read()
except CardMissingError:
await needs_microsd()
return
except Exception as e:
await ux_show_story('Error: ' + str(e))
return
await verify_armored_signed_msg(text)
async def try_push_tx(data, txid, txn_sha=None):
from glob import settings, PSRAM, NFC
# if NFC PushTx is enabled, do that w/o questions.
from glob import settings, PSRAM, NFC
url = settings.get('ptxurl', False)
if NFC and url:
try:
@ -734,8 +260,8 @@ async def try_push_tx(data, txid, txn_sha=None):
await NFC.share_push_tx(url, txid, data, txn_sha)
return True
except: pass # continue normally if it fails, perhaps too big?
return False
return False
class ApproveTransaction(UserAuthorizedAction):
def __init__(self, psbt_len, flags=0x0, approved_cb=None, psbt_sha=None, cb_kws=None):
@ -1123,7 +649,7 @@ class ApproveTransaction(UserAuthorizedAction):
if chk:
# append the signature
digest = ngu.hash.sha256s(chk.digest())
sig = sign_message_digest(digest, 'm', None, AF_CLASSIC)[0]
sig, _ = sign_message_digest(digest, 'm', None, AF_CLASSIC)
fd.write(b2a_base64(sig).decode('ascii').strip())
fd.write('\n')

View File

@ -11,7 +11,7 @@ from ux import ux_show_story, ux_enter_bip32_index, the_ux, ux_confirm, ux_drama
from menu import MenuItem, MenuSystem
from ubinascii import hexlify as b2a_hex
from ubinascii import b2a_base64
from auth import write_sig_file
from msgsign import write_sig_file
from utils import chunk_writer, xfp2str, swab32
from charcodes import KEY_QR, KEY_NFC, KEY_CANCEL

View File

@ -8,7 +8,7 @@ from ucollections import OrderedDict
from utils import xfp2str, swab32, chunk_writer
from ux import ux_show_story
from glob import settings
from auth import write_sig_file
from msgsign import write_sig_file
from public_constants import AF_CLASSIC, AF_P2WPKH, AF_P2WPKH_P2SH, AF_P2WSH, AF_P2WSH_P2SH, AF_P2SH
from charcodes import KEY_NFC, KEY_CANCEL, KEY_QR
from ownership import OWNERSHIP

View File

@ -5,6 +5,7 @@ freeze_as_mpy('', [
'actions.py',
'address_explorer.py',
'auth.py',
'msgsign.py',
'backups.py',
'callgate.py',
'chains.py',

519
shared/msgsign.py Normal file
View File

@ -0,0 +1,519 @@
# (c) Copyright 2025 by Coinkite Inc. This file is covered by license found in COPYING-CC.
#
# Signatures over text ... not transactions.
#
import stash, chains, sys, gc, ngu, ujson, version
from ubinascii import b2a_base64, a2b_base64
from ubinascii import hexlify as b2a_hex
from ubinascii import unhexlify as a2b_hex
from uhashlib import sha256
from public_constants import MSG_SIGNING_MAX_LENGTH
from public_constants import AF_CLASSIC, AF_P2WPKH, AF_P2WPKH_P2SH
from charcodes import KEY_QR, KEY_NFC, KEY_ENTER, KEY_CANCEL
from ux import ux_show_story, OK, X, ux_enter_bip32_index
from utils import problem_file_line, to_ascii_printable, show_single_address
from files import CardSlot, CardMissingError, needs_microsd
def rfc_signature_template(msg, addr, sig):
# RFC2440 <https://www.ietf.org/rfc/rfc2440.txt> style signatures, popular
# since the genesis block, but not really part of any BIP as far as I know.
#
return [
"-----BEGIN BITCOIN SIGNED MESSAGE-----\n",
"%s\n" % msg,
"-----BEGIN BITCOIN SIGNATURE-----\n",
"%s\n" % addr,
"%s\n" % sig,
"-----END BITCOIN SIGNATURE-----\n"
]
def parse_armored_signature_file(contents):
# XXX limited parser: will fail w/ messages containing dashes
sep = "-----"
assert contents.count(sep) == 6, "Armor text MUST be surrounded by exactly five (5) dashes."
temp = contents.split(sep)
msg = temp[2].strip()
addr_sig = temp[4].strip()
addr, sig_str = addr_sig.split()
return msg, addr, sig_str
def verify_signature(msg, addr, sig_str):
# Look at a base64 signature, and given address. Do full verification.
# - raise on errors
# - return warnings as string: can only be mismatch between addr format encoded in recid
warnings = ""
script = None
hash160 = None
invalid_addr_fmt_msg = "Invalid address format - must be one of p2pkh, p2sh-p2wpkh, or p2wpkh."
invalid_addr = "Invalid signature for message."
if addr[0] in "1mn":
addr_fmt = AF_CLASSIC
decoded_addr = ngu.codecs.b58_decode(addr)
hash160 = decoded_addr[1:] # remove prefix
elif addr.startswith("bc1q") or addr.startswith("tb1q") or addr.startswith("bcrt1q"):
if len(addr) > 44: # testnet/mainnet max singlesig len 42, regtest 44
# p2wsh
raise ValueError(invalid_addr_fmt_msg)
addr_fmt = AF_P2WPKH
_, _, hash160 = ngu.codecs.segwit_decode(addr)
elif addr[0] in "32":
addr_fmt = AF_P2WPKH_P2SH
decoded_addr = ngu.codecs.b58_decode(addr)
script = decoded_addr[1:] # remove prefix
else:
raise ValueError(invalid_addr_fmt_msg)
try:
sig_bytes = a2b_base64(sig_str)
if not sig_bytes or len(sig_bytes) != 65:
# can return b'' in case of wrong, can also raise
raise ValueError("invalid encoding")
header_byte = sig_bytes[0]
header_base = chains.current_chain().sig_hdr_base(addr_fmt)
if (header_byte - header_base) not in (0, 1, 2, 3):
# wrong header value only - this can still verify OK
warnings += "Specified address format does not match signature header byte format."
# least two significant bits
rec_id = (header_byte - 27) & 0x03
# need to normalize it to 31 base for ngu
new_header_byte = 31 + rec_id
sig = ngu.secp256k1.signature(bytes([new_header_byte]) + sig_bytes[1:])
except ValueError as e:
raise ValueError("Parsing signature failed - %s." % str(e))
digest = chains.current_chain().hash_message(msg.encode('ascii'))
try:
rec_pubkey = sig.verify_recover(digest)
except ValueError as e:
raise ValueError("Invalid signature for msg - %s." % str(e))
rec_pubkey_bytes = rec_pubkey.to_bytes()
rec_hash160 = ngu.hash.hash160(rec_pubkey_bytes)
if script:
target = bytes([0, 20]) + rec_hash160
target = ngu.hash.hash160(target)
if target != script:
raise ValueError(invalid_addr)
else:
if rec_hash160 != hash160:
raise ValueError(invalid_addr)
return warnings
async def verify_armored_signed_msg(contents, digest_check=True):
# Verify on-disk checksums of files listed inside a signed file.
# - digest_check=False for NFC cases, where we do not have filesystem
from glob import dis
dis.fullscreen("Verifying...")
try:
msg, addr, sig_str = parse_armored_signature_file(contents)
except Exception as e:
e_line = problem_file_line(e)
await ux_show_story("Malformed signature file. %s %s" % (str(e), e_line), title="FAILURE")
return
try:
sig_warn = verify_signature(msg, addr, sig_str)
except Exception as e:
await ux_show_story(str(e), title="ERROR")
return
title = "CORRECT"
warn_msg = ""
err_msg = ""
story = "Good signature by address:\n%s" % show_single_address(addr)
if digest_check:
digest_prob = verify_signed_file_digest(msg)
if digest_prob:
err, digest_warn = digest_prob
if digest_warn:
title = "WARNING"
wmsg_base = "not present. Contents verification not possible."
if len(digest_warn) == 1:
fname = digest_warn[0][0]
warn_msg += "'%s' is %s" % (fname, wmsg_base)
else:
warn_msg += "Files:\n" + "\n".join("> %s" % fname for fname, _ in digest_warn)
warn_msg += "\nare %s" % wmsg_base
if err:
title = "ERROR"
for fname, calc, got in err:
err_msg += ("Referenced file '%s' has wrong contents.\n"
"Got:\n%s\n\nExpected:\n%s" % (fname, got, calc))
if sig_warn:
# we know not ours only because wrong recid header used & not BIP-137 compliant
story = "Correctly signed, but not by this Coldcard. %s" % sig_warn
await ux_show_story('\n\n'.join(m for m in [err_msg, story, warn_msg] if m), title=title)
async def verify_txt_sig_file(filename):
# copy message into memory
try:
with CardSlot() as card:
with card.open(filename, 'rt') as fd:
text = fd.read()
except CardMissingError:
await needs_microsd()
return
except Exception as e:
await ux_show_story('Error: ' + str(e))
return
await verify_armored_signed_msg(text)
async def msg_sign_ux_get_subpath(addr_fmt):
# Ask for account number, and maybe change component of path for signature.
# - return full derivation path to be used.
purpose = chains.af_to_bip44_purpose(addr_fmt)
chain_n = chains.current_chain().b44_cointype
acct = await ux_enter_bip32_index('Account Number:') or 0
ch = await ux_show_story(title="Change?",
msg="Press (0) to use internal/change address,"
" %s to use external/receive address." % OK, escape="0")
change = 1 if ch == '0' else 0
idx = await ux_enter_bip32_index('Index Number:') or 0
return "m/%dh/%dh/%dh/%d/%d" % (purpose, chain_n, acct, change, idx)
def sign_export_contents(content_list, deriv, addr_fmt, pk=None):
# Return signed message over hashes of files.
msg2sign = make_signature_file_msg(content_list)
bitcoin_digest = chains.current_chain().hash_message(msg2sign)
sig_bytes, addr = sign_message_digest(bitcoin_digest, deriv, "Signing...", addr_fmt, pk=pk)
sig = b2a_base64(sig_bytes).decode().strip()
return rfc_signature_template(addr=addr, msg=msg2sign.decode(), sig=sig)
def verify_signed_file_digest(msg):
# Look inside a list of hashs and file names, and
# verify at their actual hashes and return list of issues if any.
from files import CardSlot
parsed_msg = parse_signature_file_msg(msg)
if not parsed_msg:
# not our format
return
try:
err, warn = [], []
with CardSlot() as card:
for digest, fname in parsed_msg:
path = card.abs_path(fname)
if not card.exists(path):
warn.append((fname, None))
continue
path = card.abs_path(fname)
md = sha256()
with open(path, "rb") as f:
while True:
chunk = f.read(1024)
if not chunk:
break
md.update(chunk)
h = b2a_hex(md.digest()).decode().strip()
if h != digest:
err.append((fname, h, digest))
except:
# fail silently if issues with reading files or SD issues
# no digest checking
return
return err, warn
def write_sig_file(content_list, derive=None, addr_fmt=AF_CLASSIC, pk=None, sig_name=None):
from glob import dis
if derive is None:
ct = chains.current_chain().b44_cointype
derive = "m/44'/%d'/0'/0/0" % ct
fpath = content_list[0][1]
if len(content_list) > 1:
# we're signing contents of more files - need generic name for sig file
assert sig_name
sig_nice = sig_name + ".sig"
sig_fpath = fpath.rsplit("/", 1)[0] + "/" + sig_nice
else:
sig_fpath = fpath.rsplit(".", 1)[0] + ".sig"
sig_nice = sig_fpath.split("/")[-1]
sig_gen = sign_export_contents([(h, f.split("/")[-1]) for h, f in content_list],
derive, addr_fmt, pk=pk)
with open(sig_fpath, 'wt') as fd:
for i, part in enumerate(sig_gen):
fd.write(part)
return sig_nice
def validate_text_for_signing(text, only_printable=True):
# Check for some UX/UI traps in the message itself.
# - messages must be short and ascii only. Our charset is limited
# - too many spaces, leading/trailing can be an issue
# MSG_MAX_SPACES = 4 # impt. compared to -=- positioning
result = to_ascii_printable(text, only_printable=only_printable)
length = len(result)
assert length >= 2, "msg too short (min. 2)"
assert length <= MSG_SIGNING_MAX_LENGTH, "msg too long (max. %d)" % MSG_SIGNING_MAX_LENGTH
assert " " not in result, 'too many spaces together in msg(max. 3)'
# other confusion w/ whitepace
assert result[0] != ' ', 'leading space(s) in msg'
assert result[-1] != ' ', 'trailing space(s) in msg'
# looks ok
return result
def addr_fmt_from_subpath(subpath):
if not subpath:
af = "p2pkh"
elif subpath[:4] == "m/84":
af = "p2wpkh"
elif subpath[:4] == "m/49":
af = "p2sh-p2wpkh"
else:
af = "p2pkh"
return af
def parse_msg_sign_request(data):
subpath = ""
addr_fmt = None
is_json = False
# sparrow compat
if "signmessage" in data:
try:
mark, subpath, *msg_line = data.split(" ", 2)
assert mark == "signmessage"
# subpath will be verified & cleaned later
assert msg_line[0][:6] == "ascii:"
text = msg_line[0][6:]
return text, subpath, addr_fmt_from_subpath(subpath), is_json
except:pass
# ===
try:
data_dict = ujson.loads(data.strip())
text = data_dict.get("msg", None)
if text is None:
raise AssertionError("MSG required")
subpath = data_dict.get("subpath", subpath)
addr_fmt = data_dict.get("addr_fmt", addr_fmt)
is_json = True
except ValueError:
lines = data.split("\n")
assert len(lines) >= 1, "min 1 line"
assert len(lines) <= 3, "max 3 lines"
if len(lines) == 1:
text = lines[0]
elif len(lines) == 2:
text, subpath = lines
else:
text, subpath, addr_fmt = lines
if not addr_fmt:
addr_fmt = addr_fmt_from_subpath(subpath)
if not subpath:
subpath = chains.STD_DERIVATIONS[addr_fmt]
subpath = subpath.format(
coin_type=chains.current_chain().b44_cointype,
account=0, change=0, idx=0
)
return text, subpath, addr_fmt, is_json
def make_signature_file_msg(content_list):
# list of tuples consisting of (hash, file_name)
return b"\n".join([
b2a_hex(h) + b" " + fname.encode()
for h, fname in content_list
])
def parse_signature_file_msg(msg):
# only succeed for our format digest + 2 spaces + fname
try:
res = []
lines = msg.split('\n')
for ln in lines:
d, fn = ln.split(' ')
# should not need to strip if our file format, so dont
# is hex? is 32 bytes long?
assert len(a2b_hex(d)) == 32
res.append((d, fn))
return res
except:
return
def sign_message_digest(digest, subpath, prompt, addr_fmt=AF_CLASSIC, pk=None):
# do the signature itself!
from glob import dis
ch = chains.current_chain()
if prompt:
dis.fullscreen(prompt, percent=.25)
if pk is None:
with stash.SensitiveValues() as sv:
node = sv.derive_path(subpath)
dis.progress_sofar(50, 100)
pk = node.privkey()
addr = ch.address(node, addr_fmt)
else:
# if private key is provided, derivation subpath is ignored
# and given private key is used for signing.
node = ngu.hdnode.HDNode().from_chaincode_privkey(bytes(32), pk)
dis.progress_sofar(50, 100)
addr = ch.address(node, addr_fmt)
dis.progress_sofar(75, 100)
rv = ngu.secp256k1.sign(pk, digest, 0).to_bytes()
# AF_CLASSIC header byte base 31 is returned by default from ngu - NOOP
if addr_fmt != AF_CLASSIC:
# ngu only produces header base for compressed p2pkh, anyways get only rec_id
rv = bytearray(rv)
rec_id = (rv[0] - 27) & 0x03
rv[0] = rec_id + ch.sig_hdr_base(addr_fmt=addr_fmt)
dis.progress_bar_show(1)
return rv, addr
async def ux_sign_msg(txt, approved_cb=None, kill_menu=True):
from menu import MenuSystem, MenuItem
from ux import the_ux
async def done(_1, _2, item):
from auth import approve_msg_sign
text, af = item.arg
subpath = await msg_sign_ux_get_subpath(af)
await approve_msg_sign(text, subpath, af, approved_cb=approved_cb,
kill_menu=kill_menu, only_printable=False)
# pick address format
rv = [
MenuItem(chains.addr_fmt_label(af), f=done, arg=(txt, af))
for af in chains.SINGLESIG_AF
]
the_ux.push(MenuSystem(rv))
async def msg_signing_done(signature, address, text):
from ux import import_export_prompt
ch = await import_export_prompt("Signed Msg", is_import=False,
no_qr=not version.has_qwerty)
if ch == KEY_CANCEL:
return
if isinstance(ch, dict):
await sd_sign_msg_done(signature, address, text, "msg_sign", **ch)
elif version.has_qr and ch == KEY_QR:
from ux_q1 import qr_msg_sign_done
await qr_msg_sign_done(signature, address, text)
elif ch in KEY_NFC+"3":
from glob import NFC
if NFC:
await NFC.msg_sign_done(signature, address, text)
async def sign_with_own_address(subpath, addr_fmt):
# used for cases where we already have the key picked, but need the message:
# * address_explorer custom path
# * positive ownership test
from glob import dis
to_sign = await ux_input_text("", scan_ok=True, prompt="Enter MSG") # max len is 100 only here
if not to_sign: return
await approve_msg_sign(to_sign, subpath, addr_fmt, approved_cb=msg_signing_done, kill_menu=True)
async def sd_sign_msg_done(signature, address, text, base=None, orig_path=None,
slot_b=None, force_vdisk=False):
from glob import dis
dis.fullscreen('Generating...')
out_fn = None
sig = b2a_base64(signature).decode('ascii').strip()
while 1:
# try to put back into same spot
# add -signed to end.
target_fname = base + '-signed.txt'
lst = [orig_path]
if orig_path:
lst.append(None)
for path in lst:
try:
with CardSlot(readonly=True, slot_b=slot_b, force_vdisk=force_vdisk) as card:
out_full, out_fn = card.pick_filename(target_fname, path)
out_path = path
if out_full: break
except CardMissingError:
prob = 'Missing card.\n\n'
out_fn = None
if not out_fn:
# need them to insert a card
prob = ''
else:
# attempt write-out
try:
dis.fullscreen("Saving...")
with CardSlot(slot_b=slot_b, force_vdisk=force_vdisk) as card:
with card.open(out_full, 'wt') as fd:
# save in full RFC style
# gen length is 6
gen = rfc_signature_template(addr=address, msg=text, sig=sig)
for i, part in enumerate(gen):
fd.write(part)
# success and done!
break
except OSError as exc:
prob = 'Failed to write!\n\n%s\n\n' % exc
sys.print_exception(exc)
# fall through to try again
# prompt them to input another card?
ch = await ux_show_story(prob + "Please insert an SDCard to receive signed message, "
"and press %s." % OK, title="Need Card")
if ch == 'x':
await ux_aborted()
return
# done.
msg = "Created new file:\n\n%s" % out_fn
await ux_show_story(msg, title='File Signed')
# EOF

View File

@ -716,14 +716,14 @@ class NFCHandler:
msg_sign_request=winner)
async def msg_sign_done(self, signature, address, text):
from auth import rfc_signature_template_gen
from msgsign import rfc_signature_template
sig = b2a_base64(signature).decode('ascii').strip()
armored_str = "".join(rfc_signature_template_gen(addr=address, msg=text, sig=sig))
armored_str = "".join(rfc_signature_template(addr=address, msg=text, sig=sig))
await self.share_text(armored_str)
async def verify_sig_nfc(self):
from auth import verify_armored_signed_msg
from msgsign import verify_armored_signed_msg
f = lambda x: x.decode().strip() if b"SIGNED MESSAGE" in x else None
winner = await self._nfc_reader(f, 'Unable to find signed message.')

View File

@ -318,7 +318,7 @@ class NoteContentBase:
await start_export([self])
async def sign_txt_msg(self, a, b, item):
from auth import ux_sign_msg, msg_signing_done
from msgsign import ux_sign_msg, msg_signing_done
txt = item.arg
await ux_sign_msg(txt, approved_cb=msg_signing_done, kill_menu=False)
@ -537,7 +537,7 @@ class NoteContent(NoteContentBase):
async def start_export(notes):
# Save out notes/passwords
from glob import NFC
from auth import write_sig_file
from msgsign import write_sig_file
import ujson as json
from ux_q1 import show_bbqr_codes

View File

@ -340,7 +340,7 @@ class OwnershipCache:
is_alnum=(wallet.addr_fmt & (AFC_BECH32 | AFC_BECH32M)),
msg=addr, is_addrs=True)
elif not is_ms and (ch == "0"): # only singlesig
from auth import sign_with_own_address
from msgsign import sign_with_own_address
await sign_with_own_address(sp, wallet.addr_fmt)
else:
break

View File

@ -83,7 +83,7 @@ class PaperWalletMaker:
try:
import ngu
from auth import write_sig_file
from msgsign import write_sig_file
from chains import current_chain
from serializations import hash160
from stash import blank_object

View File

@ -972,13 +972,14 @@ class QRScannerInteraction:
if what == "vmsg":
data, = vals
from auth import verify_armored_signed_msg
from msgsign import verify_armored_signed_msg
await verify_armored_signed_msg(data)
return
if what == "smsg":
data, = vals
from auth import approve_msg_sign, msg_signing_done
from auth import approve_msg_sign,
from msgsign import msg_signing_done
await approve_msg_sign(None, None, None,
msg_sign_request=data, kill_menu=True,
approved_cb=msg_signing_done)
@ -1122,7 +1123,7 @@ async def ux_visualize_wif(wif_str, kp, compressed, testnet):
async def qr_msg_sign_done(signature, address, text):
from ux import ux_show_story
from auth import rfc_signature_template_gen
from msgsign import rfc_signature_template
from export import export_by_qr
sig = b2a_base64(signature).decode('ascii').strip()
@ -1134,12 +1135,13 @@ async def qr_msg_sign_done(signature, address, text):
if ch == "y":
await export_by_qr(sig, "Signature", "U")
if ch == "0":
armored_str = "".join(rfc_signature_template_gen(addr=address, msg=text,
armored_str = "".join(rfc_signature_template(addr=address, msg=text,
sig=sig))
await show_bbqr_codes("U", armored_str, "Armored MSG")
async def qr_sign_msg(txt):
from auth import ux_sign_msg
from msgsign import ux_sign_msg
await ux_sign_msg(txt, approved_cb=qr_msg_sign_done, kill_menu=True)
async def ux_visualize_textqr(txt, maxlen=MSG_SIGNING_MAX_LENGTH):
@ -1156,8 +1158,6 @@ async def ux_visualize_textqr(txt, maxlen=MSG_SIGNING_MAX_LENGTH):
msg = "%s\n\nAbove is text that was scanned. " % txt
if escape:
msg += " Press (0) to sign the text. "
else:
msg += "We can't do any more with it."
ch = await ux_show_story(title="Simple Text", msg=msg, escape=escape)
if escape and (ch == "0"):

View File

@ -932,7 +932,6 @@ def test_verify_signature_file_truncated(way, microsd_path, cap_story, verify_ar
else:
assert title == "FAILURE"
assert "Armor text MUST be surrounded by exactly five (5) dashes" in story
assert "auth.py" in story
@pytest.mark.parametrize("msg", ["this is the message to sign", "this is meessage to sign\n with newline", "a"*200])