# (c) Copyright 2022 by Coinkite Inc. This file is covered by license found in COPYING-CC. """ Run conveniently tests against simulator. Tests are run module after module. If any tests fail, it will try to re-run those failed test with fresh simulator. Has to be run from firmware/testing directory. Do not forget to comment/uncomment line in pytest.ini. . ENV/bin/activate python run_sim_tests.py --help python run_sim_tests.py --veryslow # run ONLY very slow tests python run_sim_tests.py --onetime # run ONLY onetime tests (each will get its own simulator) python run_sim_tests.py --onetime --veryslow # run both onetime and very slow python run_sim_tests.py -m test_nfc.py # run only nfc tests python run_sim_tests.py -m test_nfc.py -m test_hsm.py # run nfc and hsm tests python run_sim_tests.py -m all # run all tests but not onetime and not very slow (cca 40 minutes) python run_sim_tests.py # same as with '-m all' above --> most useful python run_sim_tests.py -m all --onetime --veryslow # run all tests (cca 252 minutes) python run_sim_tests.py -m test_multisig.py -k cosigning # run only tests that match expression from test_multisig.py python run_sim_tests.py -m test_export.py --pdb # run only export tests and attach debugger python run_sim_tests.py -m test_attended.py --q1 -w 6 --login # run attended test + all login tests python run_sim_tests.py -w 6 --q1 --headless # run in headless mode (skips QR code checks) Onetime/veryslow tests are completely separated form the rest of the test suite. When using -m/--module do not expect the --onetime/--veryslow to apply. If --onetime/--veryslow is specified, these test will run at the end or alone. python run_sim_tests.py --collect onetime # just print all onetime tests to stdout python run_sim_tests.py --collect veryslow # just print all veryslow tests to stdout python run_sim_tests.py --collect manual # just print all manual tests to stdout Make sure to run manual test if you want to state that your changes passed all the tests. Testing on multiple simulators in parallel python run_sim_tests.py --q1 --multiproc # to run all Q tests in parallel (default num-proc=14 simulators) python run_sim_tests.py --multiproc --num-proc 6 # to run all Mk4 tests in parallel max 6 simulators at once python run_sim_tests.py -m test_addr.py -m test_bbqr.py --multiproc # just desired test python run_sim_tests.py --q1 -m test_sign.py --multiproc # just desired test python run_sim_tests --multiproc --turbo # turbo causes both Mk4 & Q tests to run simultaneously (turbo doubles num-procs) python run_sim_tests --multiproc --turbo # all Mk4 & Q tests run in 60 minutes total!! python run_sim_tests --multiproc --turbo -m test_addr.py -m test_ux.py # will spawn 4 simulators: one Q and one Mk4 for address tests & one Q and one Mk4 for ux tests Console output has some useful info: * when job is started it will print its PID * when job is done you'll get elapsed time from start (test duration) * when all is done - complete test session duration ``` $ python run_sim_tests.py -m test_addr.py -m test_drv_entro.py -m test_usb.py --multiproc --turbo started: Mk4 test_addr.py 38824 started: Q test_addr.py 38935 started: Mk4 test_drv_entro.py 39042 started: Q test_drv_entro.py 39150 started: Mk4 test_usb.py 39257 started: Q test_usb.py 39364 done: Mk4 test_usb.py 0:00:06.043072 done: Q test_usb.py 0:00:06.081147 done: Mk4 test_addr.py 0:00:51.141250 done: Q test_addr.py 0:01:03.185571 done: Mk4 test_drv_entro.py 0:03:24.234521 done: Q test_drv_entro.py 0:03:30.278795 elapsed: 0:03:50.308146 ``` After jobs are finished, or even during execution you can inspect `/tmp/cc-simulators` directory: * contains simulator work directories named as of specific simulator * log directories where pytest output is piped * mk4_logs * q1_logs ``` $ pwd /tmp/cc-simulators $ ls 38824 38935 39042 39150 39257 39364 mk4_logs q1_logs $ ls 39042/* 39042/debug: last-qr.png 39042/MicroSD: drv-hex-idx0-2.txt drv-pw-idx0.txt drv-words-idx0-2.txt drv-words-idx0.txt drv-hex-idx0.txt drv-wif-idx0.txt drv-words-idx0-3.txt drv-xprv-idx0.txt 39042/settings: 39042/VirtDisk: README.md $ ls mk4_logs/ test_addr.py.log test_drv_entro.py.log test_usb.py.log ``` To parse only failures use below cmd in {mk4,q1}_logs directory: ``` for f in $(ls); do x=`grep -n "short test summary info" $f | grep -Eo '^[^:]+'`; if [ -n "$x" ];then tail -n +"$x" $f | grep -E '^FAILED|^ERROR';fi ;done ``` """ import os, time, glob, json, pytest, atexit, signal, argparse, subprocess, contextlib, shutil from datetime import timedelta from typing import List from pytest import ExitCode SIM_INIT_WAIT = 2 # 2 seconds, can be tweaked via cmdline arguments ( -w 6 ) DEFAULT_PYTEST_MARKS = "not onetime and not veryslow and not manual" @contextlib.contextmanager def pushd(new_dir): previous_dir = os.getcwd() os.chdir(new_dir) try: yield finally: os.chdir(previous_dir) def clean_directory(pth): for root, dirs, files in os.walk(pth): for f in files: os.unlink(os.path.join(root, f)) for d in dirs: shutil.rmtree(os.path.join(root, d)) def remove_all_client_sockets(): with pushd("/tmp"): for fn in glob.glob("ckcc-client*.sock"): os.remove(fn) def remove_cautious(fpath: str) -> None: if os.path.basename(fpath) in ["README.md", ".gitignore"]: # Do not remove README.md or .gitignore" return os.remove(fpath) def clean_sim_data(): with pushd("../unix/work"): for path, dirnames, filenames in os.walk("."): for filename in filenames: filepath = os.path.join(path, filename) remove_cautious(filepath) print("Work directory cleaned up") def collect_marked_tests(mark: str) -> List[str]: plugin = PytestCollectMarked(mark=mark) with open(os.devnull, 'w') as dev_null: with contextlib.redirect_stdout(dev_null): pytest.main( ['-m', plugin.mark, '--collect-only', "--no-header", "--no-summary"], plugins=[plugin] ) return plugin.collected def get_last_failed() -> List[str]: with open(".pytest_cache/v/cache/lastfailed", "r") as f: res = f.read() last_failed = json.loads(res) return list(last_failed.keys()) def is_ok(ec: ExitCode) -> bool: if ec in [ExitCode.OK, ExitCode.NO_TESTS_COLLECTED]: return True return False def _run_pytest_tests(test_module: str, pytest_marks: str, pytest_k: str, pdb: bool, failed_first: bool, psbt2=False, is_Q=False, headless=False, sim_socket=None) -> ExitCode: cmd_list = [ "--cache-clear", "-m", pytest_marks, "--sim", test_module if test_module is not None else "" ] if pytest_k: cmd_list += ["-k", pytest_k] if pdb: cmd_list.append("--pdb") if failed_first: cmd_list.append("--ff") if psbt2: cmd_list.append("--psbt2") if is_Q: cmd_list.insert(0, "--Q") # only changes behavior in login_settings_test if headless: cmd_list.append("--headless") if sim_socket: cmd_list.append("--sim-socket") cmd_list.append(sim_socket) return pytest.main(cmd_list) def _run_coldcard_tests(test_module: str, simulator_args: List[str], pytest_k: str, pdb: bool, failed_first: bool, psbt2=False, is_Q=False, headless=False, pytest_marks: str = DEFAULT_PYTEST_MARKS, sim_segregate=False) -> ExitCode: sock_path = None if simulator_args is not None: sim = ColdcardSimulator(args=simulator_args, headless=headless, segregate=sim_segregate) sim.start() time.sleep(1) sock_path = sim.socket exit_code = _run_pytest_tests(test_module, pytest_marks, pytest_k, pdb, failed_first, psbt2, is_Q, headless, sock_path) if simulator_args is not None: sim.stop() time.sleep(1) clean_sim_data() remove_all_client_sockets() return exit_code def run_coldcard_tests(test_module=None, simulator_args=None, pytest_k=None, pdb=False, failed_first=False, psbt2=False, is_Q=False, headless=False, pytest_marks=DEFAULT_PYTEST_MARKS): failed = [] exit_code = _run_coldcard_tests(test_module, simulator_args, pytest_k, pdb, failed_first, psbt2, is_Q, headless, pytest_marks) if not is_ok(exit_code): # no success, no nothing - give failed another try, each alone with its own simulator last_failed = get_last_failed() print("Running failed from last run", last_failed) exit_codes = [] for failed_test in last_failed: exit_code_2 = _run_coldcard_tests(failed_test, simulator_args, pytest_k, pdb, failed_first, psbt2, is_Q, headless, pytest_marks) exit_codes.append(exit_code_2) if not is_ok(exit_code_2): failed.append(failed_test) if all([ec == ExitCode.OK for ec in exit_codes]): exit_code = ExitCode.OK return exit_code, failed class PytestCollectMarked: def __init__(self, mark): self.mark = mark self.collected = [] def pytest_collection_modifyitems(self, items): for item in items: for marker in item.own_markers: if marker.name == self.mark: self.collected.append(item.nodeid) class ColdcardSimulator: def __init__(self,args=None, headless=False, segregate=False): self.proc = None self.args = args self.headless = headless self.segregate = segregate self.socket = "/tmp/ckcc-simulator.sock" def start(self, start_wait=None): # here we are in testing directory cmd_list = [ "python", "simulator.py" ] if self.args is not None: cmd_list.extend(self.args) if self.headless: cmd_list.append("--headless") if self.segregate: cmd_list.append("--segregate") self.proc = subprocess.Popen( cmd_list, # this needs to be in firmware/unix - expected to be run from firmware/testing cwd="../unix", preexec_fn=os.setsid, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) time.sleep(start_wait or SIM_INIT_WAIT) if self.segregate: self.socket = "/tmp/ckcc-simulator-%d.sock" % self.proc.pid atexit.register(self.stop) def stop(self): pp = self.proc.poll() if pp is None: os.killpg(os.getpgid(self.proc.pid), signal.SIGTERM) os.waitpid(os.getpgid(self.proc.pid), 0) atexit.unregister(self.stop) def main(): parser = argparse.ArgumentParser(description="Run tests against simulated Coldcard") parser.add_argument("-w", "--sim-init-wait", type=int, help="Choose how much to sleep after simulator is started") parser.add_argument("-m", "--module", action="append", help="Choose only n modules to run") parser.add_argument("--pdb", action="store_true", help="Go to debugger on failure") parser.add_argument("--q1", action="store_true", help="Simulate a Q instead of Mk5 COLDCARD") parser.add_argument("--mk4", action="store_true", help="Simulate a Mk4 instead of Mk5 COLDCARD") parser.add_argument("--psbt2", action="store_true", help="`fake_txn` produces PSBTv2") parser.add_argument("--ff", action="store_true", help="Run the last failures first") parser.add_argument("--onetime", action="store_true", default=False, help="run tests marked as 'onetime'") parser.add_argument("--veryslow", action="store_true", default=False, help="run 'login_settings_tests.py'") parser.add_argument("--login", action="store_true", default=False, help="run 'login_settings_tests'") parser.add_argument("--clone", action="store_true", default=False, help="run 'clone_tests'") parser.add_argument("--seedless", action="store_true", default=False, help="run 'seedless_tests'") parser.add_argument("--collect", type=str, metavar="MARK", help="Collect marked test and print them to stdout") parser.add_argument("-k", "--pytest-k", type=str, metavar="EXPRESSION", default=None, help="only run tests which match the given substring expression") parser.add_argument("--headless", action="store_true", default=False, help="run simulator instance in headless mode") parser.add_argument("--multiproc", action="store_true", default=False, help="Run tests & simulators in parallel") parser.add_argument("--num-proc", type=int, default=14, help="How many executors/simulators to run in parallel in --multiproc mode") parser.add_argument("--turbo", action="store_true", default=False, help="Both Mk4 and Q at the same time") args = parser.parse_args() if args.sim_init_wait: global SIM_INIT_WAIT SIM_INIT_WAIT = args.sim_init_wait if args.collect: # when collect is in argument - do just collect and exit print(collect_marked_tests(args.collect)) return if args.module is None and (args.onetime is False and args.veryslow is False and args.login is False and args.clone is False and args.seedless is False): args.module = ["all"] DEFAULT_SIMULATOR_ARGS = ["--eff", "--set", "nfc=1"] if args.q1: DEFAULT_SIMULATOR_ARGS.append('--q1') if args.module is None: test_modules = [] elif len(args.module) == 1 and args.module[0].lower() == "all": test_modules = glob.glob("test_*.py") assert test_modules, "please run in ../testing subdir" else: for fn in args.module: if not os.path.exists(fn): raise RuntimeError(f"{fn} does not exist") test_modules = args.module # test_pincodes.py can only be run against real device # test_rng.py not needed when using simulator # test_rolls.py should be run alone as it does not need simulator # set diff test_modules = set(test_modules) - {"test_rng.py", "test_pincodes.py", "test_rolls.py"} module_args = [] for test_module in sorted(list(test_modules)): sim_args = DEFAULT_SIMULATOR_ARGS if test_module in ["test_bsms.py", "test_address_explorer.py", "test_export.py", "test_multisig.py", "test_ux.py", "test_wif.py"]: sim_args = DEFAULT_SIMULATOR_ARGS + ["--set", "vidsk=1"] if test_module == "test_vdisk.py": sim_args = ["--eject"] + DEFAULT_SIMULATOR_ARGS + ["--set", "vidsk=1"] if test_module == "test_bip39pw.py": sim_args = [] if test_module in ["test_unit.py", "test_se2.py", "test_backup.py", "test_teleport.py", "test_hobble.py", "test_sssp.py"]: # test_nvram_mk4 needs to run without --eff # se2 duress wallet activated as ephemeral seed requires proper `settings.load` sim_args = ["--set", "nfc=1"] if test_module in ["test_ephemeral.py", "test_notes.py", "test_ccc.py"]: # proper `settings.load` _ virtual disk sim_args = ["--set", "nfc=1", "--set", "vidsk=1"] # by default Mk5 is run if args.q1 and '--q1' not in sim_args: sim_args.append('--q1') elif args.mk4 and '--mk4' not in sim_args: sim_args.append("--mk4") module_args.append((test_module, sim_args, args.pytest_k, args.pdb, args.ff, args.psbt2, args.q1, args.headless)) if args.multiproc: start_time = time.time() def add_to_queue(module_name, simulator_args, queue): if module_name == "test_multisig.py": # split takes too much time queue.append((0, [module_name, simulator_args, "not tutorial and not airgapped and not ms_address and not descriptor_export", ""])) queue.append((0, [module_name, simulator_args, "airgapped", "-sep1"])) queue.append((0, [module_name, simulator_args, "tutorial", "-sep2"])) queue.append((0, [module_name, simulator_args, "ms_address", "-sep3"])) queue.append((0, [module_name, simulator_args, "descriptor_export", "-sep4"])) elif module_name == "test_seed_xor.py": # split takes too much time queue.append((0, [module_name, simulator_args, "test_import_xor", "-sep1"])) queue.append((0, [module_name, simulator_args, "not test_import_xor", ""])) elif module_name in ["test_export.py", "test_ephemeral.py", "test_sign.py", "test_msg.py", "test_backup.py"]: # higher priority queue.append((1, [module_name, simulator_args, None, ""])) else: # standard priority queue.append((2, [module_name, simulator_args, None, ""])) # will clear everything there from previous runs tmp_dir = "/tmp/cc-simulators" clean_directory(tmp_dir) # clean it mk4_log_dir = f"{tmp_dir}/mk4_logs" mk5_log_dir = f"{tmp_dir}/mk5_logs" q1_log_dir = f"{tmp_dir}/q1_logs" os.makedirs(mk4_log_dir, exist_ok=True) os.makedirs(mk5_log_dir, exist_ok=True) os.makedirs(q1_log_dir, exist_ok=True) q = [] # build priority queue for mod_name, sim_args, *_ in module_args: if args.turbo: if "--q1" in sim_args: add_to_queue(mod_name, sim_args, q) add_to_queue(mod_name, [i for i in sim_args if i == "--q1"], q) else: add_to_queue(mod_name, sim_args, q) add_to_queue(mod_name, sim_args + ["--q1"], q) else: add_to_queue(mod_name, sim_args, q) # sort queue by priority, highest priority elements at the end q = [i[1] for i in sorted(q, reverse=True)] num_proc = args.num_proc if args.turbo: # double num-proc num_proc *= 2 procs = [] while True: # create as many processes as allowed by --num-proc (default=14) if q and (len(procs) < num_proc): # start simulators first q_chunks = [] for _ in range (num_proc - len(procs)): try: mn, sim_args, k, mod_add = q.pop() # remove element except IndexError: # priority queue is empty break sim = ColdcardSimulator(sim_args, segregate=True) sim.start(start_wait=0) if "--q1" in sim_args: ld = q1_log_dir elif "--mk4" in sim_args: ld = mk4_log_dir else: ld = mk5_log_dir q_chunks.append((sim, mn, mod_add, k, ld)) time.sleep(5) for sim, mn, mod_add, k, log_dir in q_chunks: assert sim.socket out_log_path = f"{log_dir}/%s.log" % (mn + mod_add) out_fd = open(out_log_path, "w") cmd_list = ["pytest", "--cache-clear", "-m", DEFAULT_PYTEST_MARKS, "--sim", mn, "--sim-socket", sim.socket] if k: cmd_list.extend(["-k", k]) p = subprocess.Popen(cmd_list, preexec_fn=os.setsid, stdout=out_fd, stderr=out_fd) if "q1" in log_dir: mark = "Q" elif "mk5" in log_dir: mark = "Mk5" else: mark = "Mk4" procs.append((mn+mod_add, p, out_fd, sim, mark, time.time())) print(f'started: {mark:<6}{mn+mod_add:<30}{sim.socket.split("-")[-1].split(".")[0]:<10}') if not procs and not q: # done break i = 0 while i < len(procs): mn, p, out_fd, sim, mark, st = procs[i] if p.poll() is None: # still running i += 1 continue else: # done p.communicate() out_fd.close() sim.stop() del procs[i] print(f"done: {mark:<6}{mn:<30}{str(timedelta(seconds=time.time()-st)):<15}") time.sleep(3) # multiprocess done print(f"\n\nelapsed: {str(timedelta(seconds=time.time()-start_time))}") return result = [] for arguments in module_args: test_module = arguments[0] print("Started", test_module) ec, failed_tests = run_coldcard_tests(*arguments) result.append((test_module, ec, failed_tests)) print("Done", test_module) print(80 * "=") # run veryslow is specified if args.veryslow: print("started veryslow tests") ec, failed_tests = run_coldcard_tests(test_module=None, pytest_marks="veryslow", pytest_k=args.pytest_k, pdb=args.pdb, simulator_args=DEFAULT_SIMULATOR_ARGS, failed_first=args.ff, psbt2=args.psbt2, headless=args.headless) result.append(("veryslow", ec, failed_tests)) # run onetime is specified (each test against its own simulator) if args.onetime: print("started onetime tests") onetime_tests = collect_marked_tests("onetime") for onetime_test in onetime_tests: ec, failed_tests = run_coldcard_tests(test_module=onetime_test, pdb=args.pdb, failed_first=args.ff, pytest_marks="onetime", simulator_args=DEFAULT_SIMULATOR_ARGS, psbt2=args.psbt2, headless=args.headless) result.append((f"onetime: {onetime_test}", ec, failed_tests)) if args.login: print("start login settings tests") ec, failed_tests = run_coldcard_tests(test_module="login_settings_tests.py", pdb=args.pdb, failed_first=args.ff, pytest_k=args.pytest_k, is_Q=True if args.q1 else False, headless=args.headless) result.append((f"login_settings_tests", ec, failed_tests)) if args.clone: print("start clone tests") ec, failed_tests = run_coldcard_tests(test_module="clone_tests.py", pdb=args.pdb, failed_first=args.ff, pytest_k=args.pytest_k, headless=args.headless) result.append((f"clone_tests", ec, failed_tests)) if args.seedless: print("start seedless tests") ec, failed_tests = run_coldcard_tests(test_module="seedless_tests.py", pdb=args.pdb, failed_first=args.ff, pytest_k=args.pytest_k, headless=args.headless) result.append((f"seedless_tests", ec, failed_tests)) print("All done") any_failed = False for module, ec, failed in result: if not failed: continue print(f"FAILED {module:40s} {failed}") any_failed = True if any_failed is False: print("SUCCESS") print() if __name__ == "__main__": main() # sim = ColdcardSimulator(args=["--eff", "--segregate"]) # sim.start() # import pdb;pdb.set_trace() # x = 5 # EOF