# (c) Copyright 2021 by Coinkite Inc. This file is covered by license found in COPYING-CC.
#
# nfc.py -- Add some NFC tag-like features to Mk4
#
# - using ST ST25DV64KC
# - on its own I2C bus (not shared)
# - has GPIO signal "??" which is multipurpose on its own pin
# - this chip chosen because it can disable RF interaction
#
import utime, ngu, ndef, stash, chains
from uasyncio import sleep_ms
import uasyncio as asyncio
from ustruct import pack, unpack
from ubinascii import unhexlify as a2b_hex
from ubinascii import b2a_base64, a2b_base64
from ux import ux_show_story, ux_wait_keydown, OK, X
from utils import B2A, problem_file_line, txid_from_fname
from public_constants import AF_CLASSIC
from charcodes import KEY_ENTER, KEY_CANCEL

# practical limit for things to share: 8k part, minus overhead
MAX_NFC_SIZE = const(8000)

# i2c address (7-bits) is not simple...
# - assume defaults of E0=1 and I2C_DEVICE_CODE=0xa
# - also 0x2d which isn't documented and no idea what it is
I2C_ADDR_USER = const(0x53)
I2C_ADDR_SYS = const(0x57)
I2C_ADDR_RF_ON = const(0x51)
I2C_ADDR_RF_OFF = const(0x55)

# Dynamic regs
GPO_CTRL_Dyn = const(0x2000)    # GPO control
EH_CTRL_Dyn = const(0x2002)     # Energy Harvesting management & usage status
RF_MNGT_Dyn = const(0x2003)     # RF interface usage management
I2C_SSO_Dyn = const(0x2004)     # I2C security session status
IT_STS_Dyn = const(0x2005)      # Interrupt Status
MB_CTRL_Dyn = const(0x2006)     # Fast transfer mode control and status
MB_LEN_Dyn = const(0x2007)      # Length of fast transfer mode message

# Sys config area
GPO1_CFG = const(0x00)          # GPIO config 1
GPO2_CFG = const(0x01)          # GPIO config 2
RF_MNGT = const(0x03)           # RF interface state after Power ON
I2C_CFG = const(0x0e)
I2C_PWD = const(0x900)          # I2C security session password, 8 bytes


class NFCHandler:
    # Driver and UX glue for the NFC chip: raw I2C access, RF on/off control,
    # and the share/receive flows built on top of NDEF records.

    def __init__(self):
        # Open the I2C bus and the ED (event detect) pin; no chip traffic happens here.
        from machine import I2C, Pin
        self.i2c = I2C(1, freq=400000)
        self.last_edge = 0
        self.pin_ed = Pin('NFC_ED', mode=Pin.IN, pull=Pin.PULL_UP)

        try:
            # Q1 and maybe later Mk4's have a light
            self.active_led = Pin('NFC_ACTIVE', mode=Pin.OUT, value=0)
        except ValueError:
            # no such pin on this board: keep a callable so callers need not care
            self.active_led = lambda n: None

        # track time of last edge
        def _irq(x):
            self.last_edge = utime.ticks_ms()
        self.pin_ed.irq(_irq, Pin.IRQ_FALLING)

    @classmethod
    def startup(cls):
        # Create the handler and publish it as glob.NFC, but only if setup() succeeds.
        import glob
        n = cls()
        try:
            n.setup()
            glob.NFC = n
        except BaseException as exc:
            # i2c comms errors probably
            #sys.print_exception(exc)       # debug only remove me
            print("NFC absent/disabled")
            del n

    def shutdown(self):
        # we aren't wanted anymore
        self.set_rf_disable(True)

        import glob
        glob.NFC = None

    # flash memory access (fixed tag data): 0x0 to 0x2000
    def read(self, offset, count):
        return self.i2c.readfrom_mem(I2C_ADDR_USER, offset, count, addrsize=16)

    def write(self, offset, data):
        # various limits in place here? Not clear
        self.i2c.writeto_mem(I2C_ADDR_USER, offset, data, addrsize=16)

    async def big_write(self, data):
        # write lots to start of flash (new ndef records)
        for pos in range(0, len(data), 256):
            here = memoryview(data)[pos:pos+256]
            self.i2c.writeto_mem(I2C_ADDR_USER, pos, here, addrsize=16)

            # 6ms per 16 byte row, worst case, so ~100ms here!
            await self.wait_ready()

    async def wipe(self, full_wipe):
        # Tag value is stored in flash cells, so want to clear
        # once we're done in case it's sensitive. But too slow to
        # clear entire chip most of time, just do first 512 bytes,
        # and dont wait for last to complete
        from glob import dis

        here = bytes(256)

        # User memory is 0x0000..0x1fff. This bound used to be 8196, which made the
        # loop issue one extra 256-byte write at 0x2000: that is the start of the
        # dynamic registers (GPO_CTRL_Dyn), not tag storage.
        end = 8192

        for pos in range(0, end, 256):
            self.i2c.writeto_mem(I2C_ADDR_USER, pos, here, addrsize=16)

            if (pos == 256) and not full_wipe:
                break

            # 6ms per 16 byte row, worst case, so ~100ms here per iter! 3.2seconds total
            if full_wipe:
                # count the chunk just written, so the bar is full after the last one
                dis.progress_bar_show((pos + 256) / end)

            await self.wait_ready()

    # system config area (flash cells, but affect operation): table 12
    def read_config(self, offset, count):
        return self.i2c.readfrom_mem(I2C_ADDR_SYS, offset, count, addrsize=16)

    def write_config(self, offset, data):
        # not all areas are writable
        self.i2c.writeto_mem(I2C_ADDR_SYS, offset, data, addrsize=16)

    def read_config1(self, offset):
        # single-byte variant of read_config(); returns an int
        return self.i2c.readfrom_mem(I2C_ADDR_SYS, offset, 1, addrsize=16)[0]

    def write_config1(self, offset, value):
        # single-byte variant of write_config(); value is an int
        self.i2c.writeto_mem(I2C_ADDR_SYS, offset, bytes([value]), addrsize=16)

    # dynamic registers (state control, bytes): table 13
    def read_dyn(self, offset):
        assert 0x2000 <= offset < 0x2008
        return self.i2c.readfrom_mem(I2C_ADDR_USER, offset, 1, addrsize=16)[0]

    def write_dyn(self, offset, val):
        assert 0x2000 <= offset < 0x2008
        m = bytes([val])
        self.i2c.writeto_mem(I2C_ADDR_USER, offset, m, addrsize=16)

    def is_rf_disabled(self):
        # not checking if disable/sleep vs. off
        return (self.read_dyn(RF_MNGT_Dyn) != 0)

    def set_rf_disable(self, val):
        # Turn the RF interface off (val true) or back on (val false).
        # - raises RuntimeError("timeout") if the chip will not re-enable

        # set light to match state
        self.active_led(not val)

        # using stronger "off" rather than sleep/disable
        if val:
            self.i2c.writeto(I2C_ADDR_RF_OFF, b'')
            assert self.read_dyn(RF_MNGT_Dyn) & 0x4
            return

        # re-enable (turn on)
        for _ in range(10):
            try:
                self.i2c.writeto(I2C_ADDR_RF_ON, b'')
                self.write_dyn(RF_MNGT_Dyn, 0)
                assert self.read_dyn(RF_MNGT_Dyn) == 0x0
                return
            except (AssertionError, OSError):
                # assertion, OSError(ENODEV)
                # handle no-ACK cases (sometimes, after bigger write to flash)
                utime.sleep_ms(25)

        raise RuntimeError("timeout")

    def send_pw(self, pw=None):
        # show we know a password (but sent cleartext, very lame)
        # - keeping as zeros for now, so pointless anyway
        pw = pw or bytes(8)
        assert len(pw) == 8
        msg = pw + b'\x09' + pw
        self.write_config(I2C_PWD, msg)

        return (self.read_dyn(I2C_SSO_Dyn) & 0x1 == 0x1)       # else "wrong pw"

    def get_uid(self):
        # Unique id for chip. Required for RF protocol.
        # - needs self.uid, which setup() reads from the chip
        return ':'.join('%02x'% i for i in reversed(self.uid))

    def dump_ndef(self):
        # dump what we are showing, skipping the CCFILE and wrapping
        # - used in test cases, and psbt rx
        taste = self.read(0, 16)
        st, ll, _, _ = ndef.ccfile_decode(taste)
        return self.read(st, ll)

    def firsttime_setup(self):
        # always setup IC_RF_SWITCHOFF_EN bit in I2C_CFG register
        # - so we can module RF support with special i2c addresses
        # - keep default other bits: 0x1a (i2c base address)
        self.write_config1(I2C_CFG, 0x3a)
        utime.sleep_ms(10)      # required

        # set to no RF when first powered up (so CC is quiet when system unpowered)
        # - side-effect: sets rf to sleep now too
        self.write_config1(RF_MNGT, 2)
        utime.sleep_ms(10)      # might be needed?

        # XXX locking stuff?

    def setup(self):
        # check if present, alive
        self.uid = self.read_config(0x18, 8)
        assert self.uid[-1] == 0xe0     # ST manu code

        # read size of memory
        # NOTE(review): the text of this file was lost from the middle of the next
        # statement up to the size check in share_signed_txn(). Everything between
        # the two NOTE(review) markers is a reconstruction: confirm it against
        # version control before merging.
        # - presumably MEM_SIZE is a 16-bit LE count of 4-byte blocks, minus one
        mem_size = (unpack('<H', self.read_config(0x14, 2))[0] + 1) * 4
        assert mem_size == 8192         # presumably: require the 64k-bit part

        self.set_rf_disable(1)

        # presumably a password must be presented before config writes are accepted
        self.send_pw()

        # presumably: apply one-time config only when the chip does not have it yet
        if self.read_config1(I2C_CFG) != 0x3a:
            self.firsttime_setup()

    async def share_loop(self, n, **kws):
        # NOTE(review): reconstructed. Five methods call self.share_loop() but no
        # definition survived. Presumably it re-shares until the user cancels.
        while 1:
            done = await self.share_start(n, **kws)
            if done:
                break

    async def share_signed_txn(self, txid, file_offset, txn_len, txn_sha):
        # NOTE(review): signature reconstructed from the call in share_file() and the
        # names used below; the reconstructed region ends at the size check.
        # we just signed something, share it over NFC
        if txn_len >= MAX_NFC_SIZE:
            await ux_show_story("Transaction is too large to share via NFC")
            return

        n = ndef.ndefMaker()

        line2 = None
        if txid is not None:
            n.add_text('Signed Transaction: ' + txid)
            n.add_custom('bitcoin.org:txid', a2b_hex(txid))     # want binary
            line2 = self.txid_line2(txid)

        n.add_custom('bitcoin.org:sha256', txn_sha)
        n.add_large_object('bitcoin.org:txn', file_offset, txn_len)

        return await self.share_loop(n, line2=line2)

    @staticmethod
    def txid_line2(txid):
        # short on-screen form of a txid: first and last 8 chars
        return "Signed TXID: %s⋯%s" % (txid[0:8], txid[-8:])

    async def share_push_tx(self, url, txid, txn, txn_sha, line2=None):
        # Given a signed TXN, we convert to URL which a web backend can broadcast directly
        # - using base64url encoding
        # - just appends to provided URL
        # - keeps showing it until they press CANCEL
        # - may fail late if txn is too big.. not clear what limit is
        #
        from utils import b2a_base64url
        from chains import current_chain

        is_https = url.startswith('https://')
        if is_https:
            # scheme is passed to add_url() as a flag instead
            url = url[8:]

        url += 't=' + b2a_base64url(txn) + '&c=' + b2a_base64url(txn_sha[-8:])

        ch = current_chain()
        if ch.ctype != 'BTC':
            url += '&n=' + ch.ctype         # XTN or XRT

        if len(url) >= MAX_NFC_SIZE:
            # ignoring overhead, this will not fit: so fail
            raise ValueError("too big")

        n = ndef.ndefMaker()
        n.add_url(url, https=is_https)

        if line2 is None:
            line2 = self.txid_line2(txid)

        await self.share_loop(n, prompt="Tap to broadcast, CANCEL when done", line2=line2)

    async def push_tx_from_file(self):
        # Pick (signed txn) file from SD card and broadcast via PushTx
        # - assumes .txn extension (required)
        # - hex encoding or binary
        # - txid is filename, if 64 chars long; else shown on-screen
        # - assumes txn on same chain as this CC is; ie. not testnet typically
        from actions import file_picker
        from files import CardSlot, CardMissingError, needs_microsd
        from glob import settings

        def is_suitable(fname):
            return fname.lower().endswith('.txn')

        url = settings.get('ptxurl', False)
        assert url      # or else not in menu, cant get here.

        while 1:
            fn = await file_picker(min_size=10, max_size=MAX_NFC_SIZE*2, taster=is_suitable)
            if not fn:
                return

            basename = fn.split('/')[-1]

            try:
                with CardSlot() as card:
                    with open(fn, 'rb') as fp:
                        data = fp.read(MAX_NFC_SIZE*2).strip()  # newlines and carriage returns
                        assert len(data) < MAX_NFC_SIZE*2, "bad read"
            except CardMissingError:
                await needs_microsd()
                return
            except Exception as e:
                await ux_show_story(
                    title="ERROR",
                    msg='Read failed!\n\n%s\n%s' % (e, problem_file_line(e))
                )
                return

            # maybe decode
            # targeting last three zero bytes of tx version
            if data[2:8] == b'000000':
                # it's a txn, and we wrote as hex
                data = a2b_hex(data)
            elif data[1:4] == bytes(3):
                # looks like binary
                pass
            else:
                raise ValueError("Doesn't look like txn?")

            sha = ngu.hash.sha256s(data)
            txid = txid_from_fname(basename)

            line2 = None
            if not txid:
                # assume a random filename, and not easy to recalc txid here
                # so show filename instead
                line2 = 'File: ' + basename
                if len(line2) > 34:     # CHARS_W
                    line2 = line2[:32]+'⋯'      # 34-2=32 => because double-width char

            await self.share_push_tx(url, txid, data, sha, line2=line2)

    async def share_psbt(self, file_offset, psbt_len, psbt_sha, label=None):
        # we just signed something, share it over NFC
        if psbt_len >= MAX_NFC_SIZE:
            await ux_show_story("PSBT is too large to share via NFC")
            return

        n = ndef.ndefMaker()
        label = label or 'Partly signed PSBT'
        n.add_text(label)
        n.add_custom('bitcoin.org:sha256', psbt_sha)
        n.add_large_object('bitcoin.org:psbt', file_offset, psbt_len)

        return await self.share_loop(n, line2=label)

    async def share_json(self, json_data, **kws):
        # a text file of JSON for programs to read
        n = ndef.ndefMaker()
        n.add_mime_data('application/json', json_data)

        return await self.share_loop(n, **kws)

    async def share_text(self, data, **kws):
        # share text from a list of values
        # - just a text file, no multiple records; max usability!
        n = ndef.ndefMaker()
        n.add_text(data)

        return await self.share_loop(n, **kws)

    async def wait_ready(self):
        # block until chip ready to continue (ACK happens)
        # - especially after any flash write, which is very slow: 5.5ms per 16byte
        while 1:
            try:
                self.i2c.readfrom_mem(I2C_ADDR_USER, 0, 0, addrsize=16)
                return
            except OSError:
                await sleep_ms(3)

    async def setup_gpio(self):
        # setup GPIO (ED) signal for detecting activity
        # - GPO1_CFG seems to be a flash cell, and takes time to write
        want = 0x1 | 0x80 | 0x04        # enable, and RF_ACTIVITY_EN | RF_WRITE_EN
        if self.read_config1(GPO1_CFG) != want:
            self.write_config1(GPO1_CFG, want)
            # not clear how much delay is needed, but need some
            await self.wait_ready()

        self.last_edge = 0
        self.write_dyn(GPO_CTRL_Dyn, 0x01)      # GPO_EN
        self.read_dyn(IT_STS_Dyn)               # clear interrupt

    async def ux_animation(self, allow_enter=True, prompt=None, line2=None, is_secret=False,
                           exit_after_activity=True, min_delay=1000):
        # Run the pretty animation, and detect both when we are written, and/or key to exit/abort.
        # - similar when "read" and then removed from field
        # - return T if aborted by user
        # - RF is enabled on entry and disabled again before returning
        from glob import dis

        await self.wait_ready()
        self.set_rf_disable(0)
        await self.setup_gpio()

        if dis.has_lcd:
            dis.real_clear()        # bugfix
            dis.text(None, -2, prompt or 'Tap phone to screen, or CANCEL.', dark=True)
            if line2:
                dis.text(None, -3, line2)
        else:
            from graphics_mk4 import Graphics
            from version import mk_num
            frames = [getattr(Graphics, 'mk%d_nfc_%d'%(mk_num, i)) for i in range(1, 5)]

        aborted = True
        phase = -1
        last_activity = None

        # (ms) How long to wait after RF field comes and goes
        # - user can press OK during this period if they know they are done

        while 1:
            if dis.has_lcd:
                phase = (phase + 1) % 2
                dis.image(None, 59, 'nfc_%d' % phase)
            else:
                dis.clear()
                phase = (phase + 1) % 4
                dis.icon(0, 8, frames[phase])

            # NOTE(review): nesting was lost in the source; kept at loop level so the
            # text drawn above on LCD units gets flushed too. Confirm.
            dis.show()

            # wait for key or 250ms animation delay
            ch = await ux_wait_keydown(KEY_ENTER+KEY_CANCEL+'xy', 250)

            if self.last_edge:
                # the ED pin fired since we last looked
                self.last_edge = 0

                # detect various types of RF activity, so we can clear screen automatically
                await self.wait_ready()
                try:
                    events = self.read_dyn(IT_STS_Dyn)
                except OSError:     # ENODEV
                    #print("r_dyn fail")
                    events = 0

                if events & 0x02:
                    # 0x2 = RF activity
                    last_activity = utime.ticks_ms()

            # X or OK to quit, but with slightly different meanings
            if ch:
                if ch in 'x'+KEY_CANCEL:
                    aborted = True
                    break
                elif allow_enter and ch in 'y'+KEY_ENTER:
                    aborted = False
                    break

            if exit_after_activity and last_activity:
                dt = utime.ticks_diff(utime.ticks_ms(), last_activity)
                if dt >= min_delay:
                    # They achieved some RF activity and then nothing for some time, so
                    # we are done w/ success.
                    aborted = False
                    break

        self.set_rf_disable(1)

        return aborted

    async def share_start(self, ndef_obj, **kws):
        # do the UX while we are sharing a value over NFC
        # - assumption is people know what they are scanning
        # - x key to abort early, but also self-clears
        await self.big_write(ndef_obj.bytes())

        return await self.ux_animation(**kws)

    async def start_nfc_rx(self, **kws):
        # Pretend to be a big warm empty tag ready to be stuffed with data
        # - returns the bytes written, or None if aborted / nothing usable
        await self.big_write(ndef.CC_WR_FILE)

        # wait until something is written
        aborted = await self.ux_animation(min_delay=3000, **kws)
        if aborted:
            return

        # read CCFILE area (header)
        prob = taste = ''
        try:
            taste = self.read(0, 16)
            st, ll, _, _ = ndef.ccfile_decode(taste)
        except Exception as e:
            # robustness; need to handle all failures here
            prob = str(e)
            ll = None

        if not ll or prob:
            # they wrote nothing / failed write something we could parse
            msg = "No tag data was written?"
            if taste:
                msg += '\n\n' + B2A(taste)
            if prob:
                msg += '\n\n' + prob
            await ux_show_story(msg, title="Sorry!")
            return

        # copy to ram, wipe
        rv = self.read(st, ll)
        await self.wipe(False)

        return rv

    async def start_psbt_rx(self):
        # Receive a PSBT over NFC, copy it to the txn input area, start the approval UX.
        from auth import psbt_encoding_taster, TXN_INPUT_OFFSET
        from auth import UserAuthorizedAction, ApproveTransaction
        from ux import the_ux
        from sffile import SFFile

        data = await self.start_nfc_rx()
        if not data:
            return

        psbt_in = None
        psbt_sha = None
        try:
            for urn, msg, meta in ndef.record_parser(data):
                if len(msg) > 100:
                    # attempt to decode any large object, ignore type for max compat
                    try:
                        decoder, output_encoder, psbt_len = \
                            psbt_encoding_taster(msg[0:10], len(msg))
                        psbt_in = msg
                    except ValueError:
                        continue

                if urn == 'urn:nfc:ext:bitcoin.org:sha256' and len(msg) == 32:
                    # probably produced by another Coldcard: SHA256 over expected contents
                    psbt_sha = bytes(msg)
        except Exception:
            pass        # dont crash when given garbage

        if psbt_in is None:
            await ux_show_story("Could not find PSBT in what was written.", title="Sorry!")
            return

        # decode into PSRAM
        # - decoder/output_encoder/psbt_len are bound whenever psbt_in is
        total = 0
        with SFFile(TXN_INPUT_OFFSET, max_size=psbt_len) as out:
            if not decoder:
                total = out.write(psbt_in)
            else:
                for here in decoder.more(psbt_in):
                    total += out.write(here)

        # might have been whitespace inflating initial estimate of PSBT size, adjust
        assert total <= psbt_len
        psbt_len = total

        # start signing UX
        UserAuthorizedAction.cleanup()
        UserAuthorizedAction.active_request = ApproveTransaction(
            psbt_len, psbt_sha=psbt_sha, input_method="nfc",
            output_encoder=output_encoder
        )
        # kill any menu stack, and put our thing at the top
        the_ux.push(UserAuthorizedAction.active_request)

    @classmethod
    async def selftest(cls):
        # Check for chip present, field present .. and that it works
        # - important: do not allow user (tester) to quit without sending anything over link
        n = cls()
        n.setup()
        assert n.uid

        nn = ndef.ndefMaker()
        nn.add_text("NFC is working: %s" % n.get_uid())
        aborted = await n.share_start(nn, allow_enter=False)
        assert not aborted, "Aborted"

    async def share_file(self):
        # Pick file from SD card and share over NFC...
        # - how it is shared depends on the file extension
        from actions import file_picker
        from files import CardSlot, CardMissingError, needs_microsd

        def is_suitable(fname):
            f = fname.lower()
            return f.endswith('.psbt') or f.endswith('.txn') \
                or f.endswith('.txt') or f.endswith('.json') or f.endswith('.sig')

        while 1:
            fn = await file_picker(min_size=10, max_size=MAX_NFC_SIZE, taster=is_suitable)
            if not fn:
                return

            basename = fn.split('/')[-1]
            ext = fn.split('.')[-1].lower()

            try:
                with CardSlot() as card:
                    with open(fn, 'rb') as fp:
                        data = fp.read(MAX_NFC_SIZE)
            except CardMissingError:
                await needs_microsd()
                return

            if ext == 'txn':
                txid = txid_from_fname(basename)
                if data[2:8] == b'000000':
                    # it's a txn, and we wrote as hex
                    data = a2b_hex(data)
                else:
                    assert data[1:4] == bytes(3)

                sha = ngu.hash.sha256s(data)
                await self.share_signed_txn(txid, data, len(data), sha)
            elif ext == 'psbt':
                sha = ngu.hash.sha256s(data)
                await self.share_psbt(data, len(data), sha, label="PSBT file: " + basename)
            elif ext in ('txt', 'sig'):
                await self.share_text(data.decode())
            elif ext == 'json':
                await self.share_json(data.decode())
            else:
                raise ValueError(ext)

    async def import_multisig_nfc(self, *a):
        # user is pushing a file downloaded from another CC over NFC
        # - would need an NFC app in between for the sneakernet step
        # get some data
        def f(m):
            if len(m) < 70:
                return
            m = m.decode()
            # multi( catches both multi( and sortedmulti(
            if 'pub' in m or "multi(" in m:
                return m

        winner = await self._nfc_reader(f, 'Unable to find multisig descriptor.')

        if winner:
            from auth import maybe_enroll_xpub
            try:
                maybe_enroll_xpub(config=winner)
            except Exception as e:
                #import sys; sys.print_exception(e)
                await ux_show_story('Failed to import.\n\n%s\n%s' % (e, problem_file_line(e)))

    async def import_ephemeral_seed_words_nfc(self, *a):
        # Read space-separated seed words over NFC and load them as an ephemeral seed.
        def f(m):
            sm = m.decode().strip().split(" ")
            if len(sm) in stash.SEED_LEN_OPTS:
                return sm

        winner = await self._nfc_reader(f, 'Unable to find seed words')

        if winner:
            try:
                from seed import set_ephemeral_seed_words
                await set_ephemeral_seed_words(winner, origin='NFC Import')
            except Exception as e:
                #import sys; sys.print_exception(e)
                await ux_show_story('Failed to import.\n\n%s\n%s' % (e, problem_file_line(e)))

    async def address_show_and_share(self):
        # Read a derivation path, optionally followed by a newline and an address
        # format, then show that address.
        from auth import show_address

        def f(m):
            sm = m.decode().split("\n")
            if 1 <= len(sm) <= 2:
                return sm

        winner = await self._nfc_reader(f, 'Expected address and derivation path.')

        if not winner:
            return

        if len(winner) == 1:
            subpath = winner[0]
            addr_fmt = AF_CLASSIC
        else:
            subpath, addr_fmt_str = winner
            try:
                addr_fmt = chains.parse_addr_fmt_str(addr_fmt_str)
            except AssertionError as e:
                await ux_show_story(str(e))
                return

        active_request = show_address(addr_fmt, subpath, restore_menu=True)

        from ux import the_ux
        the_ux.push(active_request)
        await the_ux.interact()     # need this otherwise NFC animation takes over

    async def start_msg_sign(self):
        # Read a message-signing request (one to three lines) and start approval.
        from auth import approve_msg_sign

        def f(m):
            m = m.decode()
            split_msg = m.split("\n")
            if 1 <= len(split_msg) <= 3:
                return m

        winner = await self._nfc_reader(f, 'Unable to find correctly formated message to sign.')

        if not winner:
            return

        await approve_msg_sign(None, None, None, approved_cb=self.msg_sign_done,
                               msg_sign_request=winner)

    async def msg_sign_done(self, signature, address, text):
        # approved_cb for start_msg_sign(): share the armored signed message over NFC
        from msgsign import rfc_signature_template

        sig = b2a_base64(signature).decode('ascii').strip()
        armored_str = "".join(rfc_signature_template(addr=address, msg=text, sig=sig))
        await self.share_text(armored_str)

    async def verify_sig_nfc(self):
        # Read an armored signed message over NFC and verify it.
        from msgsign import verify_armored_signed_msg

        f = lambda x: x.decode().strip() if b"SIGNED MESSAGE" in x else None
        winner = await self._nfc_reader(f, 'Unable to find signed message.')

        if winner:
            await verify_armored_signed_msg(winner, digest_check=False)

    async def read_address(self):
        # Read an address or BIP-21 url and parse out addr (just one)
        from utils import decode_bip21_text

        def f(m):
            m = m.decode()
            what, vals = decode_bip21_text(m)
            if what == 'addr':
                return vals

        winner = await self._nfc_reader(f, 'Unable to find address from NFC data.')

        return winner

    async def verify_address_nfc(self):
        # Get an address or complete bip-21 url even and search it... slow.
        res = await self.read_address()
        if not res:
            return

        _, addr, args = res

        from ownership import OWNERSHIP
        await OWNERSHIP.search_ux(addr, args)

    async def read_extended_private_key(self):
        # first record containing "prv" wins; returned as stripped text
        f = lambda x: x.decode().strip() if b"prv" in x else None
        return await self._nfc_reader(f, 'Unable to find extended private key.')

    async def read_tapsigner_b64_backup(self):
        # first record of 150..280 bytes that decodes as base64 wins; returns the binary
        f = lambda x: a2b_base64(x.decode()) if 150 <= len(x) <= 280 else None
        return await self._nfc_reader(f, 'Unable to find base64 encoded TAPSIGNER backup.')

    async def read_bip322_msg(self):
        # first record that decodes as text wins
        f = lambda x: x.decode()
        return await self._nfc_reader(f, 'Unable to find BIP-322 message.')

    async def read_wif(self):
        # only compressed WIFs allowed
        f = lambda x: x.decode() if len(x) >= 51 else None
        return await self._nfc_reader(f, 'Unable to find WIF key(s).')

    async def _nfc_reader(self, func, fail_msg):
        # Receive data over NFC and return func(record) for the first record where
        # that is not None.
        # - func gets each record's payload as bytes; if it raises, record is skipped
        # - shows fail_msg and returns None when nothing (or only a falsy value) is found
        data = await self.start_nfc_rx()
        if not data:
            return

        winner = None
        try:
            for urn, msg, meta in ndef.record_parser(data):
                msg = bytes(msg)
                try:
                    r = func(msg)
                    if r is not None:
                        winner = r
                        break
                except Exception:
                    # this record is not what func wants; try the next one
                    pass
        except Exception:
            pass        # dont crash when given garbage

        if not winner:
            await ux_show_story(fail_msg)
            return

        return winner

# EOF