# (c) Copyright 2020 by Coinkite Inc. This file is covered by license found in COPYING-CC. # import pytest, glob, time, sys, random, re from pprint import pprint from ckcc.protocol import CCProtocolPacker, CCProtoError from helpers import B2A, U2SAT, prandom from api import bitcoind, match_key, bitcoind_finalizer, bitcoind_analyze, bitcoind_decode, explora from api import bitcoind_wallet, bitcoind_d_wallet from binascii import b2a_hex, a2b_hex from constants import * # lock down randomness random.seed(42) def pytest_addoption(parser): parser.addoption("--dev", action="store_true", default=False, help="run on real dev") parser.addoption("--sim", action="store_true", default=True, help="run on simulator") parser.addoption("--manual", action="store_true", default=False, help="operator must press keys on real CC") parser.addoption("--mk", default=3, help="Assume mark N hardware") parser.addoption("--duress", action="store_true", default=False, help="assume logged-in with duress PIN") parser.addoption("--ms-danger", action="store_true", default=False, help="Operate with multisig checks off") @pytest.fixture(scope='session') def dev(request): # a connected Coldcard (via USB) .. or the simulator # use command line --sim or --dev to pick, default is sim from ckcc_protocol.client import ColdcardDevice config = request.config if config.getoption("--dev"): return ColdcardDevice() else: # manually get the simulator fixture simulator = request.getfixturevalue('simulator') return simulator @pytest.fixture(scope='session') def simulator(request): # get a connection to simulator (only, never USB dev) from ckcc_protocol.client import ColdcardDevice if not request.config.getoption("--sim") or request.config.getoption("--dev"): raise pytest.skip('need simulator for this test, have real device') try: return ColdcardDevice(sn=SIM_PATH) except: print("Simulator is required for this test") raise pytest.fail('missing simulator') @pytest.fixture(scope='module') def sim_exec(dev): # run code in the simulator's interpretor def doit(cmd): s = dev.send_recv(b'EXEC' + cmd.encode('utf-8')) return s.decode('utf-8') if not isinstance(s, str) else s return doit @pytest.fixture(scope='module') def sim_eval(dev): # eval an expression in the simulator's interpretor def doit(cmd, timeout=None): return dev.send_recv(b'EVAL' + cmd.encode('utf-8'), timeout=timeout).decode('utf-8') return doit @pytest.fixture(scope='module') def sim_execfile(simulator): # run a whole file in the simulator's interpretor import os def doit(fname, timeout=None): fn = os.path.realpath(fname) hook = 'execfile("%s")' % fn return simulator.send_recv(b'EXEC' + hook.encode('utf-8'), timeout=timeout).decode('utf-8') return doit @pytest.fixture(scope='module') def is_simulator(dev): def doit(): return hasattr(dev.dev, 'pipe') return doit @pytest.fixture(scope='module') def sim_card_ejected(sim_exec, is_simulator): def doit(ejected): if not is_simulator(): # assuming no card on device if not ejected: raise pytest.fail('cant insert on real dev') else: return # see unix/frozen-modules/pyb.py class SDCard cmd = f'import pyb; pyb.SDCard.ejected={ejected}; RV.write("ok")' assert sim_exec(cmd) == 'ok' yield doit if is_simulator(): doit(False) @pytest.fixture(scope='module') def send_ux_abort(simulator): def doit(): # simulator has special USB command # - this is a special "key" simulator.send_recv(CCProtocolPacker.sim_ux_abort()) return doit @pytest.fixture(scope='module') def need_keypress(dev, request): def doit(k, timeout=1000): if request.config.getoption("--manual"): # need actual user interaction print("==> NOW, on the Coldcard, press key: %r (then enter here)" % k, file=sys.stderr) input() else: # simulator has special USB command, and can be used on real device w/ enuf setup dev.send_recv(CCProtocolPacker.sim_keypress(k.encode('ascii')), timeout=timeout) if 0: # try to use debug interface to simulate the press # XXX for some reason, picocom must **already** be running for this to work. # - otherwise, this locks up devs = list(glob.glob('/dev/tty.usbmodem*')) if len(devs) == 1: with open(devs[0], 'wb', 0) as fd: fd.write(k.encode('ascii')) else: raise pytest.fail('need to provide keypresses') return doit @pytest.fixture(scope='module') def enter_number(need_keypress): def doit(number): number = str(number) if not isinstance(number, str) else number for d in number: need_keypress(d) need_keypress('y') return doit @pytest.fixture(scope='module') def master_xpub(dev): if hasattr(dev.dev, 'pipe'): # this works better against simulator in HSM mode, where the xpub cmd may be disabled return simulator_fixed_xpub r = dev.send_recv(CCProtocolPacker.get_xpub('m'), timeout=None, encrypt=1) assert r[1:4] == 'pub', r if r[0:4] == dev.master_xpub[0:4]: assert r == dev.master_xpub elif dev.master_xpub: # testnet vs. mainnet difference from pycoin.key.BIP32Node import BIP32Node a = BIP32Node.from_wallet_key(r) b = BIP32Node.from_wallet_key(dev.master_xpub) assert a.secret_exponent() == b.secret_exponent() return r @pytest.fixture(scope='module') def unit_test(sim_execfile): def doit(filename): rv = sim_execfile(filename) if rv: pytest.fail(rv) return doit @pytest.fixture(scope='module') def get_settings(sim_execfile): # get all settings def doit(): from json import loads resp = sim_execfile('devtest/get-settings.py') assert 'Traceback' not in resp return loads(resp) return doit @pytest.fixture(scope='module') def get_setting(sim_execfile, sim_exec): # get an indivudal setting def doit(name): from json import loads sim_exec('import main; main.SKEY = %r; ' % name) resp = sim_execfile('devtest/get-setting.py') assert 'Traceback' not in resp return loads(resp) return doit @pytest.fixture(scope='module') def addr_vs_path(master_xpub): from pycoin.key.BIP32Node import BIP32Node from ckcc_protocol.constants import AF_CLASSIC, AFC_PUBKEY, AF_P2WPKH, AFC_SCRIPT from ckcc_protocol.constants import AF_P2WPKH_P2SH, AF_P2SH, AF_P2WSH, AF_P2WSH_P2SH from bech32 import bech32_decode, convertbits from pycoin.encoding import a2b_hashed_base58, hash160 from pycoin.key.BIP32Node import PublicPrivateMismatchError from hashlib import sha256 def doit(given_addr, path=None, addr_fmt=None, script=None): if not script: try: # prefer using xpub if we can mk = BIP32Node.from_wallet_key(master_xpub) sk = mk.subkey_for_path(path[2:]) except PublicPrivateMismatchError: mk = BIP32Node.from_wallet_key(simulator_fixed_xprv) sk = mk.subkey_for_path(path[2:]) if addr_fmt == AF_CLASSIC: # easy assert sk.address() == given_addr elif addr_fmt & AFC_PUBKEY: pkh = sk.hash160(use_uncompressed=False) if addr_fmt == AF_P2WPKH: hrp, data = bech32_decode(given_addr) decoded = convertbits(data[1:], 5, 8, False) assert hrp in {'tb', 'bc' } assert bytes(decoded[-20:]) == pkh else: assert addr_fmt == AF_P2WPKH_P2SH assert given_addr[0] in '23' expect = a2b_hashed_base58(given_addr)[1:] assert len(expect) == 20 assert hash160(b'\x00\x14' + pkh) == expect elif addr_fmt & AFC_SCRIPT: assert script, 'need a redeem/witness script' if addr_fmt == AF_P2SH: assert given_addr[0] in '23' expect = a2b_hashed_base58(given_addr)[1:] assert hash160(script) == expect elif addr_fmt == AF_P2WSH: hrp, data = bech32_decode(given_addr) assert hrp in {'tb', 'bc' } decoded = convertbits(data[1:], 5, 8, False) assert bytes(decoded[-32:]) == sha256(script).digest() elif addr_fmt == AF_P2WSH_P2SH: assert given_addr[0] in '23' expect = a2b_hashed_base58(given_addr)[1:] assert hash160(b'\x00\x20' + sha256(script).digest()) == expect else: raise pytest.fail(f'not ready for {addr_fmt:x} yet') else: raise ValueError(addr_fmt) return sk if not script else None return doit @pytest.fixture(scope='module') def capture_enabled(sim_eval): # need to have sim_display imported early, see unix/frozen-modules/ckcc # - could be xfail or xskip here assert sim_eval("'sim_display' in sys.modules") == 'True' @pytest.fixture(scope='module') def cap_menu(sim_execfile): "Return menu items as a list" def doit(): return sim_execfile('devtest/cap-menu.py').split('\n') return doit @pytest.fixture(scope='module') def cap_screen(sim_execfile): def doit(): return sim_execfile('devtest/cap-screen.py') return doit @pytest.fixture(scope='module') def cap_story(sim_execfile): # returns (title, body) of whatever story is being actively shown def doit(): return sim_execfile('devtest/cap-story.py').split('\0', 1) return doit @pytest.fixture(scope='module') def cap_image(sim_execfile): def flip(raw): reorg = bytearray(128*64) j = 0 for y in range(64//8): for by in range(8): for x in range(128): reorg[j] = 255 if (raw[x+(128*y)] & (1 << by)) else 0 j += 1 return bytes(reorg) # returns Pillow image of whatever story is being actively shown on OLED def doit(): from PIL import Image raw = a2b_hex(sim_execfile('devtest/cap-image.py')) assert len(raw) == (128*64//8) return Image.frombytes('L', (128,64), flip(raw), 'raw') return doit QR_HISTORY = [] @pytest.fixture(scope='session') def qr_quality_check(): # Use this with cap_screen_qr print("QR codes will be captured and shown at end of run.") yield None # quick test: # py.test test_drv_entro.py -k test_path_index --ff -k '0-64-bytes' # global QR_HISTORY if not QR_HISTORY: return import textwrap from PIL import Image, ImageOps, ImageFont, ImageDraw w,h = QR_HISTORY[0][1].size count = len(QR_HISTORY) TH = 32 scale=3 rv = Image.new('RGB', (w*scale, ((h*scale)+TH)*count), color=(64,64,64)) y = 0 fnt = ImageFont.truetype('Courier', size=10) dr = ImageDraw.Draw(rv) mw = int((w*scale) / dr.textsize('M', fnt)[0]) for test_name, img in QR_HISTORY: if '[' in test_name: test_name = test_name[test_name.index('['):].replace(' (call)','') else: test_name = test_name.replace(' (call)','') img = img.resize((w*scale,h*scale), resample=Image.NEAREST) rv.paste(img, (0, y)) y += (h*scale) dr.multiline_text((4, y+3), textwrap.fill(test_name, mw), font=fnt, fill=(0,255,0)) y += TH #rv = rv.resize(tuple(c*4 for c in rv.size), resample=Image.NEAREST) rv.save('debug/all-qrs.png') rv.show() @pytest.fixture(scope='module') def cap_screen_qr(cap_image): def doit(x=0, w=64): # NOTE: version=4 QR is pixel doubled to be 66x66 with 2 missing lines at bottom # LATER: not doing that anymore; v=3 doubled, all higher 1:1 pixels (tiny) global QR_HISTORY try: import zbar except ImportError: raise pytest.skip('need zbar-py module') import numpy, os from PIL import ImageOps # see orig_img = cap_image() # document it if x < 10: # removes dups: happen when same image samples for two different # QR's in side-by-side mode tname = os.environ.get('PYTEST_CURRENT_TEST') QR_HISTORY.append( (tname, orig_img) ) img = orig_img.crop( (x, 0, x+w, w) ) img = ImageOps.expand(img, 16, 0) img = img.resize( (256, 256)) img.save('debug/last-qr.png') #img.show() scanner = zbar.Scanner() np = numpy.array(img.getdata(), 'uint8').reshape(img.width, img.height) for sym, value, *_ in scanner.scan(np): assert sym == 'QR-Code', 'unexpected symbology: ' + sym return value # bytes, could be binary # debug: check debug/last-qr.png raise pytest.fail('qr code not found') return doit @pytest.fixture(scope='module') def get_pp_sofar(sim_execfile): # get entry value for bip39 passphrase def doit(): resp = sim_execfile('devtest/get_pp_sofar.py') assert 'Error' not in resp return resp return doit @pytest.fixture(scope='module') def get_secrets(sim_execfile): # returns big dict based on what we'd normally put into a backup file. def doit(): from json import loads rv = dict() resp = sim_execfile('devtest/get-secrets.py') assert 'Error' not in resp for ln in resp.split('\n'): ln = ln.strip() if '#' in ln: ln = ln[0:ln.index('#')] if not ln: continue assert ' = ' in ln n, v = ln.split(' = ', 1) rv[n] = loads(v) return rv return doit @pytest.fixture def goto_home(cap_menu, need_keypress, pick_menu_item): def doit(): # get to top, force a redraw for i in range(10): need_keypress('x') time.sleep(.01) # required # special case to get out of passphrase menu if 'CANCEL' in cap_menu(): pick_menu_item('CANCEL') time.sleep(.01) need_keypress('y') need_keypress('0') # check menu contents m = cap_menu() assert 'Ready To Sign' in m return doit @pytest.fixture def pick_menu_item(cap_menu, need_keypress): WRAP_IF_OVER = 16 # see ../shared/menu.py def doit(text): need_keypress('0') m = cap_menu() if text not in m: raise KeyError(text, "%r not in menu: %r" % (text, m)) m_pos = m.index(text) if len(m) > WRAP_IF_OVER and m_pos > (len(m)//2): # use wrap around, work up from bottom for n in range(len(m) - m_pos): need_keypress('5') time.sleep(.01) # required need_keypress('y') time.sleep(.01) # required else: # go down for n in range(m_pos): need_keypress('8') time.sleep(.01) # required need_keypress('y') time.sleep(.01) # required return doit @pytest.fixture(scope='module') def microsd_path(simulator): # open a file from the simulated microsd def doit(fn): return '../unix/work/MicroSD/' + fn return doit @pytest.fixture(scope='module') def open_microsd(simulator, microsd_path): # open a file from the simulated microsd def doit(fn, mode='rb'): return open(microsd_path(fn), mode) return doit @pytest.fixture(scope="function") def set_master_key(sim_exec, sim_execfile, simulator, reset_seed_words): # load simulator w/ a specific bip32 master key def doit(prv): assert prv[1:4] == 'prv' sim_exec('import main; main.TPRV = %r; ' % prv) rv = sim_execfile('devtest/set_tprv.py') if rv: pytest.fail(rv) simulator.start_encryption() simulator.check_mitm() #print("sim xfp: 0x%08x" % simulator.master_fingerprint) return simulator.master_fingerprint yield doit # Important cleanup: restore normal key, because other tests assume that # - actually need seed words for all tests reset_seed_words() @pytest.fixture(scope="function") def set_xfp(sim_exec, sim_execfile, simulator, reset_seed_words): # set the XFP, without really knowing the private keys # - won't be able to sign, but should accept PSBT for signing def doit(xfp): assert len(xfp) == 8, "expect 8 hex digits" import struct need_xfp, = struct.unpack(">> ', ln self.sio.timeout = 0.250 def eval(self, cmd, max_time=3): # send a command, wait for it to finish (next prompt) and eval the response print("eval: %r" % cmd) self.sio.write(cmd.encode('ascii') + b'\r') self.sio.timeout = max_time lines = [] while 1: resp = self.sio.readline().decode('ascii') if resp.startswith('>>> '): break lines.append(resp) if any('Traceback' in l for l in lines): raise RuntimeError(''.join(lines)) if len(lines) == 0: raise RuntimeError("timeout/got nothing") if len(lines) == 1: # cmd printed nothing, meaning it returned None and REPL hid that assert lines[0].startswith(cmd), lines return None try: return eval(lines[-1]) except: raise RuntimeError(''.join(lines)) def exec(self, cmd, proc_time=1): # send a (one line) command and read the one-line response print("exec: %r" % cmd) self.sio.write(cmd.encode('ascii') + b'\r') self.sio.timeout = 0.2 echo = self.sio.readline() #print("echo: %r" % echo.decode('ascii')) assert cmd.encode('ascii') in echo self.sio.timeout = proc_time resp = self.sio.readline().decode('ascii') #print("resp: %r" % resp) return resp return USBRepl() @pytest.fixture() def decode_with_bitcoind(bitcoind): def doit(raw_txn): # verify our understanding of a TXN (and esp its outputs) matches # the same values as what bitcoind generates try: return bitcoind.decoderawtransaction(B2A(raw_txn)) except ConnectionResetError: # bitcoind sleeps on us sometimes, give it another chance. return bitcoind.decoderawtransaction(B2A(raw_txn)) return doit @pytest.fixture() def decode_psbt_with_bitcoind(bitcoind): def doit(raw_psbt): # verify our understanding of a PSBT against bitcoind from base64 import b64encode try: return bitcoind.decodepsbt(b64encode(raw_psbt).decode('ascii')) except ConnectionResetError: # bitcoind sleeps on us sometimes, give it another chance. return bitcoind.decodepsbt(b64encode(raw_psbt).decode('ascii')) return doit @pytest.fixture() def check_against_bitcoind(bitcoind, sim_exec, sim_execfile): def doit(hex_txn, fee, num_warn=0, change_outs=None, dests=[]): # verify our understanding of a TXN (and esp its outputs) matches # the same values as what bitcoind generates try: decode = bitcoind.decoderawtransaction(hex_txn) except ConnectionResetError: # bitcoind sleeps on us sometimes, give it another chance. decode = bitcoind.decoderawtransaction(hex_txn) #print("Bitcoin code says:", end=''); pprint(decode) if dests: # check we got right destination address(es) for outn, expect_addr in dests: assert decode['vout'][outn]['scriptPubKey']['addresses'][0] == expect_addr # leverage bitcoind's transaction decoding ex = dict( lock_time = decode['locktime'], had_witness = False, # input txn doesn't have them, typical? num_inputs = len(decode['vin']), num_outputs = len(decode['vout']), miner_fee = U2SAT(fee), warnings_expected = num_warn, total_value_out = sum(U2SAT(i['value']) for i in decode['vout']), destinations = [(U2SAT(i['value']), i['scriptPubKey']['addresses'][0]) for i in decode['vout']], ) if change_outs is not None: ex['change_outs'] = set(change_outs) # need this for reliability time.sleep(0.01) # check we understood it right rv= sim_exec('import main; main.EXPECT = %r; ' % ex) if rv: pytest.fail(rv) rv = sim_execfile('devtest/check_decode.py') if rv: pytest.fail(rv) print(" [checks out against bitcoind] ") return decode return doit @pytest.fixture def try_sign_microsd(open_microsd, cap_story, pick_menu_item, goto_home, need_keypress, microsd_path): # like "try_sign" but use "air gapped" file transfer via microSD def doit(f_or_data, accept=True, finalize=False, accept_ms_import=False, complete=False, encoding='binary', del_after=0): if f_or_data[0:5] == b'psbt\xff': ip = f_or_data filename = 'memory' else: filename = f_or_data ip = open(f_or_data, 'rb').read() if ip[0:10] == b'70736274ff': ip = a2b_hex(ip.strip()) assert ip[0:5] == b'psbt\xff' psbtname = 'ftrysign' # population control from glob import glob; import os pat = microsd_path(psbtname+'*.psbt') for f in glob(pat): assert 'psbt' in f os.unlink(f) if encoding == 'hex': ip = b2a_hex(ip) elif encoding == 'base64': from base64 import b64encode, b64decode ip = b64encode(ip) else: assert encoding == 'binary' with open_microsd(psbtname+'.psbt', 'wb') as sd: sd.write(ip) goto_home() pick_menu_item('Ready To Sign') time.sleep(.1) _, story = cap_story() if 'Choose PSBT file' in story: need_keypress('y') time.sleep(.1) pick_menu_item(psbtname+'.psbt') time.sleep(.1) if accept_ms_import: # XXX would be better to do cap_story here, but that would limit test to simulator need_keypress('y') time.sleep(0.050) title, story = cap_story() assert title == 'OK TO SEND?' if accept != None: need_keypress('y' if accept else 'x') if accept == False: time.sleep(0.050) # look for "Aborting..." ?? return ip, None, None # wait for it to finish for r in range(10): time.sleep(0.1) title, story = cap_story() if title == 'PSBT Signed': break else: assert False, 'timed out' txid = None lines = story.split('\n') if 'Final TXID:' in lines: txid = lines[-1] result_fname = lines[-4] else: result_fname = lines[-1] result = open_microsd(result_fname, 'rb').read() if encoding == 'hex' or finalize: result = a2b_hex(result.strip()) elif encoding == 'base64': result = b64decode(result) else: assert encoding == 'binary' in_file = microsd_path(psbtname+'.psbt') # read back final product if finalize: if del_after: if not txid: txid = re.findall('[0-9a-f]{64}', result_fname)[0] assert result_fname == txid+'.txn' assert not os.path.exists(in_file) else: assert 'final' in result_fname assert os.path.exists(in_file) from pycoin.tx.Tx import Tx # parse it a little assert result[0:4] != b'psbt', 'still a PSBT, but asked for finalize' t = Tx.from_bin(result) assert t.version in [1, 2] assert t.id() == txid else: assert result[0:5] == b'psbt\xff' if complete: assert '-signed' in result_fname else: assert '-part' in result_fname if del_after: assert not os.path.exists(in_file) from psbt import BasicPSBT was = BasicPSBT().parse(ip) now = BasicPSBT().parse(result) assert was.txn == now.txn assert was != now return ip, result, txid return doit @pytest.fixture def try_sign(start_sign, end_sign): def doit(filename_or_data, accept=True, finalize=False, accept_ms_import=False): ip = start_sign(filename_or_data, finalize=finalize) return ip, end_sign(accept, finalize=finalize, accept_ms_import=accept_ms_import) return doit @pytest.fixture def start_sign(dev): def doit(filename, finalize=False, stxn_flags=0x0): if filename[0:5] == b'psbt\xff': ip = filename filename = 'memory' else: ip = open(filename, 'rb').read() if ip[0:10] == b'70736274ff': ip = a2b_hex(ip.strip()) assert ip[0:5] == b'psbt\xff' ll, sha = dev.upload_file(ip) dev.send_recv(CCProtocolPacker.sign_transaction(ll, sha, finalize, flags=stxn_flags)) return ip return doit @pytest.fixture def end_sign(dev, need_keypress): from ckcc_protocol.protocol import CCUserRefused def doit(accept=True, in_psbt=None, finalize=False, accept_ms_import=False, expect_txn=True): if accept_ms_import: # XXX would be better to do cap_story here, but that would limit test to simulator need_keypress('y') time.sleep(0.050) if accept != None: need_keypress('y' if accept else 'x', timeout=None) if accept == False: with pytest.raises(CCUserRefused): done = None while done == None: time.sleep(0.050) done = dev.send_recv(CCProtocolPacker.get_signed_txn(), timeout=None) return else: done = None while done == None: time.sleep(0.00) done = dev.send_recv(CCProtocolPacker.get_signed_txn(), timeout=None) assert len(done) == 2 resp_len, chk = done psbt_out = dev.download_file(resp_len, chk) if not expect_txn: # skip checks; it's text return psbt_out sigs = [] if not finalize: from psbt import BasicPSBT tp = BasicPSBT().parse(psbt_out) assert tp is not None for i in tp.inputs: sigs.extend(i.part_sigs.values()) else: from pycoin.tx.Tx import Tx # parse it res = psbt_out assert res[0:4] != b'psbt', 'still a PSBT, but asked for finalize' t = Tx.from_bin(res) assert t.version in [1, 2] # TODO: pull out signatures from signed txn, but pycoin not helpful on that for sig in sigs: assert len(sig) <= 71, "overly long signature observed" return psbt_out return doit # use these for hardware version support @pytest.fixture(scope='session') def is_mark1(request): return int(request.config.getoption('--mk')) == 1 @pytest.fixture(scope='session') def is_mark2(request): return int(request.config.getoption('--mk')) == 2 @pytest.fixture(scope='session') def is_mark3(request): return int(request.config.getoption('--mk')) == 3 # useful fixtures related to multisig from test_multisig import (import_ms_wallet, make_multisig, offer_ms_import, fake_ms_txn, make_ms_address, clear_ms, make_myself_wallet) from test_bip39pw import set_bip39_pw, clear_bip39_pw #EOF