From e594a0b4b3c85b5498fe7a20d3365b40e4216eb1 Mon Sep 17 00:00:00 2001 From: Andrew Chow Date: Thu, 14 Feb 2019 22:43:45 -0500 Subject: [PATCH 1/3] Allow specifiying a different interface for using HWI in tests --- test/run_tests.py | 11 ++--- test/test_coldcard.py | 29 +++++++------ test/test_device.py | 89 ++++++++++++++++++++++---------------- test/test_digitalbitbox.py | 45 +++++++++---------- test/test_keepkey.py | 66 ++++++++++++++++------------ test/test_ledger.py | 29 +++++++------ test/test_trezor.py | 66 ++++++++++++++++------------ 7 files changed, 187 insertions(+), 148 deletions(-) diff --git a/test/run_tests.py b/test/run_tests.py index 0bf8561..a7afea1 100755 --- a/test/run_tests.py +++ b/test/run_tests.py @@ -32,6 +32,7 @@ dbb_group.add_argument('--no_bitbox', help='Do not run Digital Bitbox test with dbb_group.add_argument('--bitbox', help='Path to Digital bitbox simulator.', default='work/mcu/build/bin/simulator') parser.add_argument('--bitcoind', help='Path to bitcoind.', default='work/bitcoin/src/bitcoind') +parser.add_argument('--interface', help='Which interface to send commands over', choices=['library', 'cli'], default='library') args = parser.parse_args() # Run tests @@ -45,14 +46,14 @@ if not args.no_trezor or not args.no_coldcard or args.ledger or not args.no_bitb rpc, userpass = start_bitcoind(args.bitcoind) if not args.no_trezor: - suite.addTest(trezor_test_suite(args.trezor, rpc, userpass)) + suite.addTest(trezor_test_suite(args.trezor, rpc, userpass, args.interface)) if not args.no_coldcard: - suite.addTest(coldcard_test_suite(args.coldcard, rpc, userpass)) + suite.addTest(coldcard_test_suite(args.coldcard, rpc, userpass, args.interface)) if args.ledger: - suite.addTest(ledger_test_suite(rpc, userpass)) + suite.addTest(ledger_test_suite(rpc, userpass, args.interface)) if not args.no_bitbox: - suite.addTest(digitalbitbox_test_suite(rpc, userpass, args.bitbox)) + suite.addTest(digitalbitbox_test_suite(rpc, userpass, args.bitbox, args.interface)) if not args.no_keepkey: - suite.addTest(keepkey_test_suite(args.keepkey, rpc, userpass)) + suite.addTest(keepkey_test_suite(args.keepkey, rpc, userpass, args.interface)) result = unittest.TextTestRunner(stream=sys.stdout, verbosity=2).run(suite) sys.exit(not result.wasSuccessful()) diff --git a/test/test_coldcard.py b/test/test_coldcard.py index 19c52d2..d111765 100755 --- a/test/test_coldcard.py +++ b/test/test_coldcard.py @@ -12,7 +12,7 @@ from hwilib.devices.ckcc.protocol import CCProtocolPacker from hwilib.devices.ckcc.client import ColdcardDevice from test_device import DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignMessage, TestSignTx -def coldcard_test_suite(simulator, rpc, userpass): +def coldcard_test_suite(simulator, rpc, userpass, interface): # Start the Coldcard simulator simulator_proc = subprocess.Popen(['python3', os.path.basename(simulator)], cwd=os.path.dirname(simulator), stdout=subprocess.DEVNULL) # Wait for simulator to be up @@ -30,39 +30,39 @@ def coldcard_test_suite(simulator, rpc, userpass): # Coldcard specific management command tests class TestColdcardManCommands(DeviceTestCase): def test_setup(self): - result = process_commands(self.dev_args + ['setup']) + result = self.do_command(self.dev_args + ['setup']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Coldcard does not support software setup') self.assertEqual(result['code'], -9) def test_wipe(self): - result = process_commands(self.dev_args + ['wipe']) + result = self.do_command(self.dev_args + ['wipe']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Coldcard does not support wiping via software') self.assertEqual(result['code'], -9) def test_restore(self): - result = process_commands(self.dev_args + ['restore']) + result = self.do_command(self.dev_args + ['restore']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Coldcard does not support restoring via software') self.assertEqual(result['code'], -9) def test_backup(self): - result = process_commands(self.dev_args + ['backup']) + result = self.do_command(self.dev_args + ['backup']) self.assertTrue(result['success']) self.assertIn('The backup has been written to', result['message']) def test_pin(self): - result = process_commands(self.dev_args + ['promptpin']) + result = self.do_command(self.dev_args + ['promptpin']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Coldcard does not need a PIN sent from the host') self.assertEqual(result['code'], -9) - result = process_commands(self.dev_args + ['sendpin', '1234']) + result = self.do_command(self.dev_args + ['sendpin', '1234']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Coldcard does not need a PIN sent from the host') @@ -70,22 +70,23 @@ def coldcard_test_suite(simulator, rpc, userpass): # Generic device tests suite = unittest.TestSuite() - suite.addTest(DeviceTestCase.parameterize(TestColdcardManCommands, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', '')) - suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd')) - suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd')) - suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd')) - suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd')) - suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd')) + suite.addTest(DeviceTestCase.parameterize(TestColdcardManCommands, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', '', interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd', interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd', interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd', interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd', interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd', interface=interface)) return suite if __name__ == '__main__': parser = argparse.ArgumentParser(description='Test Coldcard implementation') parser.add_argument('simulator', help='Path to the Coldcard simulator') parser.add_argument('bitcoind', help='Path to bitcoind binary') + parser.add_argument('--interface', help='Which interface to send commands over', choices=['library', 'cli'], default='library') args = parser.parse_args() # Start bitcoind rpc, userpass = start_bitcoind(args.bitcoind) - suite = coldcard_test_suite(args.simulator, rpc, userpass) + suite = coldcard_test_suite(args.simulator, rpc, userpass, args.interface) unittest.TextTestRunner(verbosity=2).run(suite) diff --git a/test/test_device.py b/test/test_device.py index 22d364c..30e0765 100644 --- a/test/test_device.py +++ b/test/test_device.py @@ -1,6 +1,7 @@ #! /usr/bin/env python3 import atexit +import json import os import shutil import subprocess @@ -50,7 +51,7 @@ def start_bitcoind(bitcoind_path): return (rpc, userpass) class DeviceTestCase(unittest.TestCase): - def __init__(self, rpc, rpc_userpass, type, path, fingerprint, master_xpub, password = '', emulator=None, methodName='runTest'): + def __init__(self, rpc, rpc_userpass, type, path, fingerprint, master_xpub, password = '', emulator=None, interface='library', methodName='runTest'): super(DeviceTestCase, self).__init__(methodName) self.rpc = rpc self.rpc_userpass = rpc_userpass @@ -66,16 +67,30 @@ class DeviceTestCase(unittest.TestCase): self.emulator = DeviceEmulator() if password: self.dev_args.extend(['-p', password]) + self.interface = interface @staticmethod - def parameterize(testclass, rpc, rpc_userpass, type, path, fingerprint, master_xpub, password = '', emulator=None): + def parameterize(testclass, rpc, rpc_userpass, type, path, fingerprint, master_xpub, password = '', interface='library', emulator=None): testloader = unittest.TestLoader() testnames = testloader.getTestCaseNames(testclass) suite = unittest.TestSuite() for name in testnames: - suite.addTest(testclass(rpc, rpc_userpass, type, path, fingerprint, master_xpub, password, emulator, name)) + suite.addTest(testclass(rpc, rpc_userpass, type, path, fingerprint, master_xpub, password, emulator, interface, name)) return suite + def do_command(self, args): + if self.interface == 'cli': + proc = subprocess.Popen(['hwi ' + ' '.join(args)], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, shell=True) + result = proc.communicate() + return json.loads(result[0].decode()) + else: + return process_commands(args) + + def get_password_args(self): + if self.password: + return ['-p', self.password] + return [] + def __str__(self): return '{}: {}'.format(self.type, super().__str__()) @@ -90,7 +105,7 @@ class TestDeviceConnect(DeviceTestCase): self.emulator.stop() def test_enumerate(self): - enum_res = process_commands(['-p', self.password, 'enumerate']) + enum_res = self.do_command(self.get_password_args() + ['enumerate']) found = False for device in enum_res: if device['type'] == self.type and device['path'] == self.path and device['fingerprint'] == self.fingerprint: @@ -99,31 +114,31 @@ class TestDeviceConnect(DeviceTestCase): self.assertTrue(found) def test_no_type(self): - gmxp_res = process_commands(['getmasterxpub']) + gmxp_res = self.do_command(['getmasterxpub']) self.assertIn('error', gmxp_res) self.assertEqual(gmxp_res['error'], 'You must specify a device type or fingerprint for all commands except enumerate') self.assertIn('code', gmxp_res) self.assertEqual(gmxp_res['code'], -1) def test_path_type(self): - gmxp_res = process_commands(['-t', self.type, '-d', self.path, '-p', self.password, 'getmasterxpub']) + gmxp_res = self.do_command(self.get_password_args() + ['-t', self.type, '-d', self.path, 'getmasterxpub']) self.assertEqual(gmxp_res['xpub'], self.master_xpub) def test_fingerprint_autodetect(self): - gmxp_res = process_commands(['-f', self.fingerprint, '-p', self.password, 'getmasterxpub']) + gmxp_res = self.do_command(self.get_password_args() + ['-f', self.fingerprint, 'getmasterxpub']) self.assertEqual(gmxp_res['xpub'], self.master_xpub) # Nonexistent fingerprint - gmxp_res = process_commands(['-f', '0000ffff', '-p', self.password, 'getmasterxpub']) + gmxp_res = self.do_command(self.get_password_args() + ['-f', '0000ffff', 'getmasterxpub']) self.assertEqual(gmxp_res['error'], 'Could not find device with specified fingerprint') self.assertEqual(gmxp_res['code'], -3) def test_type_only_autodetect(self): - gmxp_res = process_commands(['-t', self.type, '-p', self.password, 'getmasterxpub']) + gmxp_res = self.do_command(self.get_password_args() + ['-t', self.type, 'getmasterxpub']) self.assertEqual(gmxp_res['xpub'], self.master_xpub) # Unknown device type - gmxp_res = process_commands(['-t', 'fakedev', '-d', 'fakepath', 'getmasterxpub']) + gmxp_res = self.do_command(['-t', 'fakedev', '-d', 'fakepath', 'getmasterxpub']) self.assertEqual(gmxp_res['error'], 'Unknown device type specified') self.assertEqual(gmxp_res['code'], -4) @@ -142,17 +157,17 @@ class TestGetKeypool(DeviceTestCase): self.emulator.stop() def test_getkeypool_bad_args(self): - result = process_commands(self.dev_args + ['getkeypool', '--sh_wpkh', '--wpkh', '0', '20']) + result = self.do_command(self.dev_args + ['getkeypool', '--sh_wpkh', '--wpkh', '0', '20']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['code'], -7) def test_getkeypool(self): - non_keypool_desc = process_commands(self.dev_args + ['getkeypool', '0', '20']) + non_keypool_desc = self.do_command(self.dev_args + ['getkeypool', '0', '20']) import_result = self.wpk_rpc.importmulti(non_keypool_desc) self.assertTrue(import_result[0]['success']) - keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '0', '20']) + keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '0', '20']) import_result = self.wpk_rpc.importmulti(keypool_desc) self.assertFalse(import_result[0]['success']) @@ -162,75 +177,75 @@ class TestGetKeypool(DeviceTestCase): addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress()) self.assertEqual(addr_info['hdkeypath'], "m/44'/1'/0'/0/{}".format(i)) - keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--internal', '0', '20']) + keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--internal', '0', '20']) import_result = self.wrpc.importmulti(keypool_desc) self.assertTrue(import_result[0]['success']) for i in range(0, 21): addr_info = self.wrpc.getaddressinfo(self.wrpc.getrawchangeaddress()) self.assertEqual(addr_info['hdkeypath'], "m/44'/1'/0'/1/{}".format(i)) - keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '0', '20']) + keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '0', '20']) import_result = self.wrpc.importmulti(keypool_desc) self.assertTrue(import_result[0]['success']) for i in range(0, 21): addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress()) self.assertEqual(addr_info['hdkeypath'], "m/49'/1'/0'/0/{}".format(i)) - keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '--internal', '0', '20']) + keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '--internal', '0', '20']) import_result = self.wrpc.importmulti(keypool_desc) self.assertTrue(import_result[0]['success']) for i in range(0, 21): addr_info = self.wrpc.getaddressinfo(self.wrpc.getrawchangeaddress()) self.assertEqual(addr_info['hdkeypath'], "m/49'/1'/0'/1/{}".format(i)) - keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--wpkh', '0', '20']) + keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--wpkh', '0', '20']) import_result = self.wrpc.importmulti(keypool_desc) self.assertTrue(import_result[0]['success']) for i in range(0, 21): addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress()) self.assertEqual(addr_info['hdkeypath'], "m/84'/1'/0'/0/{}".format(i)) - keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--wpkh', '--internal', '0', '20']) + keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--wpkh', '--internal', '0', '20']) import_result = self.wrpc.importmulti(keypool_desc) self.assertTrue(import_result[0]['success']) for i in range(0, 21): addr_info = self.wrpc.getaddressinfo(self.wrpc.getrawchangeaddress()) self.assertEqual(addr_info['hdkeypath'], "m/84'/1'/0'/1/{}".format(i)) - keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '--account', '3', '0', '20']) + keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '--account', '3', '0', '20']) import_result = self.wrpc.importmulti(keypool_desc) self.assertTrue(import_result[0]['success']) for i in range(0, 21): addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress()) self.assertEqual(addr_info['hdkeypath'], "m/49'/1'/3'/0/{}".format(i)) - keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '--internal', '--account', '3', '0', '20']) + keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '--internal', '--account', '3', '0', '20']) import_result = self.wrpc.importmulti(keypool_desc) self.assertTrue(import_result[0]['success']) for i in range(0, 21): addr_info = self.wrpc.getaddressinfo(self.wrpc.getrawchangeaddress()) self.assertEqual(addr_info['hdkeypath'], "m/49'/1'/3'/1/{}".format(i)) - keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--wpkh', '--account', '3', '0', '20']) + keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--wpkh', '--account', '3', '0', '20']) import_result = self.wrpc.importmulti(keypool_desc) self.assertTrue(import_result[0]['success']) for i in range(0, 21): addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress()) self.assertEqual(addr_info['hdkeypath'], "m/84'/1'/3'/0/{}".format(i)) - keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--wpkh', '--internal', '--account', '3', '0', '20']) + keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--wpkh', '--internal', '--account', '3', '0', '20']) import_result = self.wrpc.importmulti(keypool_desc) self.assertTrue(import_result[0]['success']) for i in range(0, 21): addr_info = self.wrpc.getaddressinfo(self.wrpc.getrawchangeaddress()) self.assertEqual(addr_info['hdkeypath'], "m/84'/1'/3'/1/{}".format(i)) - keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--path', 'm/0h/0h/4h/*', '0', '20']) + keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--path', 'm/0h/0h/4h/*', '0', '20']) import_result = self.wrpc.importmulti(keypool_desc) self.assertTrue(import_result[0]['success']) for i in range(0, 21): addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress()) self.assertEqual(addr_info['hdkeypath'], "m/0'/0'/4'/{}".format(i)) - keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--path', '/0h/0h/4h/*', '0', '20']) + keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--path', '/0h/0h/4h/*', '0', '20']) self.assertEqual(keypool_desc['error'], 'Path must start with m/') self.assertEqual(keypool_desc['code'], -7) - keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--path', 'm/0h/0h/4h/', '0', '20']) + keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--path', 'm/0h/0h/4h/', '0', '20']) self.assertEqual(keypool_desc['error'], 'Path must end with /*') self.assertEqual(keypool_desc['code'], -7) @@ -251,7 +266,7 @@ class TestSignTx(DeviceTestCase): def _generate_and_finalize(self, unknown_inputs, psbt): if not unknown_inputs: # Just do the normal signing process to test "all inputs" case - sign_res = process_commands(self.dev_args + ['signtx', psbt['psbt']]) + sign_res = self.do_command(self.dev_args + ['signtx', psbt['psbt']]) finalize_res = self.wrpc.finalizepsbt(sign_res['psbt']) else: # Sign only input one on first pass @@ -280,11 +295,11 @@ class TestSignTx(DeviceTestCase): second_psbt = second_psbt.serialize() # First will always have something to sign - first_sign_res = process_commands(self.dev_args + ['signtx', first_psbt]) + first_sign_res = self.do_command(self.dev_args + ['signtx', first_psbt]) self.assertTrue(single_input == self.wrpc.finalizepsbt(first_sign_res['psbt'])['complete']) # Second may have nothing to sign (1 input case) # and also may throw an error(e.g., ColdCard) - second_sign_res = process_commands(self.dev_args + ['signtx', second_psbt]) + second_sign_res = self.do_command(self.dev_args + ['signtx', second_psbt]) if 'psbt' in second_sign_res: self.assertTrue(not self.wrpc.finalizepsbt(second_sign_res['psbt'])['complete']) combined_psbt = self.wrpc.combinepsbt([first_sign_res['psbt'], second_sign_res['psbt']]) @@ -300,10 +315,10 @@ class TestSignTx(DeviceTestCase): def _test_signtx(self, input_type, multisig): # Import some keys to the watch only wallet and send coins to them - keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '30', '40']) + keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '30', '40']) import_result = self.wrpc.importmulti(keypool_desc) self.assertTrue(import_result[0]['success']) - keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '--internal', '30', '40']) + keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '--internal', '30', '40']) import_result = self.wrpc.importmulti(keypool_desc) self.assertTrue(import_result[0]['success']) sh_wpkh_addr = self.wrpc.getnewaddress('', 'p2sh-segwit') @@ -397,18 +412,18 @@ class TestDisplayAddress(DeviceTestCase): self.emulator.stop() def test_display_address_bad_args(self): - result = process_commands(self.dev_args + ['displayaddress', '--sh_wpkh', '--wpkh', '--path', 'm/49h/1h/0h/0/0']) + result = self.do_command(self.dev_args + ['displayaddress', '--sh_wpkh', '--wpkh', '--path', 'm/49h/1h/0h/0/0']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['code'], -7) def test_display_address_path(self): - process_commands(self.dev_args + ['displayaddress', '--path', 'm/44h/1h/0h/0/0']) - process_commands(self.dev_args + ['displayaddress', '--sh_wpkh', '--path', 'm/49h/1h/0h/0/0']) - process_commands(self.dev_args + ['displayaddress', '--wpkh', '--path', 'm/84h/1h/0h/0/0']) + self.do_command(self.dev_args + ['displayaddress', '--path', 'm/44h/1h/0h/0/0']) + self.do_command(self.dev_args + ['displayaddress', '--sh_wpkh', '--path', 'm/49h/1h/0h/0/0']) + self.do_command(self.dev_args + ['displayaddress', '--wpkh', '--path', 'm/84h/1h/0h/0/0']) def test_display_address_bad_path(self): - result = process_commands(self.dev_args + ['displayaddress', '--path', 'f']) + result = self.do_command(self.dev_args + ['displayaddress', '--path', 'f']) self.assertEquals(result['code'], -7) def test_display_address_descriptor(self): @@ -454,8 +469,8 @@ class TestSignMessage(DeviceTestCase): self.emulator.stop() def test_sign_msg(self): - process_commands(self.dev_args + ['signmessage', 'Message signing test', 'm/44h/1h/0h/0/0']) + self.do_command(self.dev_args + ['signmessage', '"Message signing test"', 'm/44h/1h/0h/0/0']) def test_bad_path(self): - result = process_commands(self.dev_args + ['signmessage', 'Message signing test', 'f']) + result = self.do_command(self.dev_args + ['signmessage', '"Message signing test"', 'f']) self.assertEquals(result['code'], -7) diff --git a/test/test_digitalbitbox.py b/test/test_digitalbitbox.py index 2a86380..de9c960 100755 --- a/test/test_digitalbitbox.py +++ b/test/test_digitalbitbox.py @@ -13,7 +13,7 @@ from test_device import DeviceTestCase, start_bitcoind, TestDeviceConnect, TestG from hwilib.cli import process_commands from hwilib.devices.digitalbitbox import BitboxSimulator, send_plain, send_encrypt -def digitalbitbox_test_suite(rpc, userpass, simulator): +def digitalbitbox_test_suite(rpc, userpass, simulator, interface): # Start the Digital bitbox simulator simulator_proc = subprocess.Popen(['./' + os.path.basename(simulator), '../../tests/sd_files/'], cwd=os.path.dirname(simulator), stderr=subprocess.DEVNULL) # Wait for simulator to be up @@ -44,27 +44,27 @@ def digitalbitbox_test_suite(rpc, userpass, simulator): # DigitalBitbox specific management command tests class TestDBBManCommands(DeviceTestCase): def test_restore(self): - result = process_commands(self.dev_args + ['restore']) + result = self.do_command(self.dev_args + ['restore']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Digital Bitbox does not support restoring via software') self.assertEqual(result['code'], -9) def test_pin(self): - result = process_commands(self.dev_args + ['promptpin']) + result = self.do_command(self.dev_args + ['promptpin']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Digital Bitbox does not need a PIN sent from the host') self.assertEqual(result['code'], -9) - result = process_commands(self.dev_args + ['sendpin', '1234']) + result = self.do_command(self.dev_args + ['sendpin', '1234']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Digital Bitbox does not need a PIN sent from the host') self.assertEqual(result['code'], -9) def test_display(self): - result = process_commands(self.dev_args + ['displayaddress', '--path', 'm/0h']) + result = self.do_command(self.dev_args + ['displayaddress', '--path', 'm/0h']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Digital Bitbox does not have a screen to display addresses on') @@ -72,72 +72,73 @@ def digitalbitbox_test_suite(rpc, userpass, simulator): def test_setup_wipe(self): # Device is init, setup should fail - result = process_commands(self.dev_args + ['setup', '--label', 'setup_test', '--backup_passphrase', 'testpass']) + result = self.do_command(self.dev_args + ['setup', '--label', 'setup_test', '--backup_passphrase', 'testpass']) self.assertEquals(result['code'], -10) self.assertEquals(result['error'], 'Device is already initialized. Use wipe first and try again') # Wipe - result = process_commands(self.dev_args + ['wipe']) + result = self.do_command(self.dev_args + ['wipe']) self.assertTrue(result['success']) # Check arguments - result = process_commands(self.dev_args + ['setup', '--label', 'setup_test']) + result = self.do_command(self.dev_args + ['setup', '--label', 'setup_test']) self.assertEquals(result['code'], -7) self.assertEquals(result['error'], 'The label and backup passphrase for a new Digital Bitbox wallet must be specified and cannot be empty') - result = process_commands(self.dev_args + ['setup', '--backup_passphrase', 'testpass']) + result = self.do_command(self.dev_args + ['setup', '--backup_passphrase', 'testpass']) self.assertEquals(result['code'], -7) self.assertEquals(result['error'], 'The label and backup passphrase for a new Digital Bitbox wallet must be specified and cannot be empty') # Setup - result = process_commands(self.dev_args + ['setup', '--label', 'setup_test', '--backup_passphrase', 'testpass']) + result = self.do_command(self.dev_args + ['setup', '--label', 'setup_test', '--backup_passphrase', 'testpass']) self.assertTrue(result['success']) # Reset back to original send_encrypt(json.dumps({"seed":{"source":"backup","filename":"test_backup.pdf","key":"key"}}), '0000', dev) # Make sure device is init, setup should fail - result = process_commands(self.dev_args + ['setup', '--label', 'setup_test', '--backup_passphrase', 'testpass']) + result = self.do_command(self.dev_args + ['setup', '--label', 'setup_test', '--backup_passphrase', 'testpass']) self.assertEquals(result['code'], -10) self.assertEquals(result['error'], 'Device is already initialized. Use wipe first and try again') def test_backup(self): # Check arguments - result = process_commands(self.dev_args + ['backup', '--label', 'backup_test']) + result = self.do_command(self.dev_args + ['backup', '--label', 'backup_test']) self.assertEquals(result['code'], -7) self.assertEquals(result['error'], 'The label and backup passphrase for a Digital Bitbox backup must be specified and cannot be empty') - result = process_commands(self.dev_args + ['backup', '--backup_passphrase', 'key']) + result = self.do_command(self.dev_args + ['backup', '--backup_passphrase', 'key']) self.assertEquals(result['code'], -7) self.assertEquals(result['error'], 'The label and backup passphrase for a Digital Bitbox backup must be specified and cannot be empty') # Wipe - result = process_commands(self.dev_args + ['wipe']) + result = self.do_command(self.dev_args + ['wipe']) self.assertTrue(result['success']) # Setup - result = process_commands(self.dev_args + ['setup', '--label', 'backup_test', '--backup_passphrase', 'testpass']) + result = self.do_command(self.dev_args + ['setup', '--label', 'backup_test', '--backup_passphrase', 'testpass']) self.assertTrue(result['success']) # make the backup - result = process_commands(self.dev_args + ['backup', '--label', 'backup_test_backup', '--backup_passphrase', 'testpass']) + result = self.do_command(self.dev_args + ['backup', '--label', 'backup_test_backup', '--backup_passphrase', 'testpass']) self.assertTrue(result['success']) # Generic Device tests suite = unittest.TestSuite() - suite.addTest(DeviceTestCase.parameterize(TestDBBManCommands, rpc, userpass, type, path, fingerprint, master_xpub, '0000')) - suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, type, path, fingerprint, master_xpub, '0000')) - suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, type, path, fingerprint, master_xpub, '0000')) - suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, type, path, fingerprint, master_xpub, '0000')) - suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, type, path, fingerprint, master_xpub, '0000')) + suite.addTest(DeviceTestCase.parameterize(TestDBBManCommands, rpc, userpass, type, path, fingerprint, master_xpub, '0000', interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, type, path, fingerprint, master_xpub, '0000', interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, type, path, fingerprint, master_xpub, '0000', interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, type, path, fingerprint, master_xpub, '0000', interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, type, path, fingerprint, master_xpub, '0000', interface=interface)) return suite if __name__ == '__main__': parser = argparse.ArgumentParser(description='Test Digital Bitbox implementation') parser.add_argument('simulator', help='Path to simulator binary') parser.add_argument('bitcoind', help='Path to bitcoind binary') + parser.add_argument('--interface', help='Which interface to send commands over', choices=['library', 'cli'], default='library') args = parser.parse_args() # Start bitcoind rpc, userpass = start_bitcoind(args.bitcoind) - suite = digitalbitbox_test_suite(rpc, userpass, args.simulator) + suite = digitalbitbox_test_suite(rpc, userpass, args.simulator, args.interface) unittest.TextTestRunner(verbosity=2).run(suite) diff --git a/test/test_keepkey.py b/test/test_keepkey.py index d412873..2525228 100755 --- a/test/test_keepkey.py +++ b/test/test_keepkey.py @@ -67,19 +67,28 @@ class KeepkeyEmulator(DeviceEmulator): self.emulator_proc.wait() class KeepkeyTestCase(unittest.TestCase): - def __init__(self, emulator, methodName='runTest'): + def __init__(self, emulator, interface='library', methodName='runTest'): super(KeepkeyTestCase, self).__init__(methodName) self.emulator = emulator + self.interface = interface @staticmethod - def parameterize(testclass, emulator): + def parameterize(testclass, emulator, interface='library'): testloader = unittest.TestLoader() testnames = testloader.getTestCaseNames(testclass) suite = unittest.TestSuite() for name in testnames: - suite.addTest(testclass(emulator, name)) + suite.addTest(testclass(emulator, interface, name)) return suite + def do_command(self, args): + if self.interface == 'cli': + proc = subprocess.Popen(['hwi ' + ' '.join(args)], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, shell=True) + result = proc.communicate() + return json.loads(result[0].decode()) + else: + return process_commands(args) + def __str__(self): return 'keepkey: {}'.format(super().__str__()) @@ -104,12 +113,12 @@ class TestKeepkeyGetxpub(KeepkeyTestCase): load_device_by_xprv(client=self.client, xprv=vec['xprv'], pin='', passphrase_protection=False, label='test', language='english') # Test getmasterxpub - gmxp_res = process_commands(['-t', 'keepkey', '-d', 'udp:127.0.0.1:21324', 'getmasterxpub']) + gmxp_res = self.do_command(['-t', 'keepkey', '-d', 'udp:127.0.0.1:21324', 'getmasterxpub']) self.assertEqual(gmxp_res['xpub'], vec['master_xpub']) # Test the path derivs for path_vec in vec['vectors']: - gxp_res = process_commands(['-t', 'keepkey', '-d', 'udp:127.0.0.1:21324', 'getxpub', path_vec['path']]) + gxp_res = self.do_command(['-t', 'keepkey', '-d', 'udp:127.0.0.1:21324', 'getxpub', path_vec['path']]) self.assertEqual(gxp_res['xpub'], path_vec['xpub']) # Keepkey specific management (setup, wipe, restore, backup, promptpin, sendpin) command tests @@ -123,12 +132,12 @@ class TestKeepkeyManCommands(KeepkeyTestCase): def test_setup_wipe(self): # Device is init, setup should fail - result = process_commands(self.dev_args + ['setup']) + result = self.do_command(self.dev_args + ['setup']) self.assertEquals(result['code'], -10) self.assertEquals(result['error'], 'Device is already initialized. Use wipe first and try again') # Wipe - result = process_commands(self.dev_args + ['wipe']) + result = self.do_command(self.dev_args + ['wipe']) self.assertTrue(result['success']) # Setup @@ -139,12 +148,12 @@ class TestKeepkeyManCommands(KeepkeyTestCase): self.assertTrue(result['success']) # Make sure device is init, setup should fail - result = process_commands(self.dev_args + ['setup']) + result = self.do_command(self.dev_args + ['setup']) self.assertEquals(result['code'], -10) self.assertEquals(result['error'], 'Device is already initialized. Use wipe first and try again') def test_backup(self): - result = process_commands(self.dev_args + ['backup']) + result = self.do_command(self.dev_args + ['backup']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Keepkey does not support creating a backup via software') @@ -152,10 +161,10 @@ class TestKeepkeyManCommands(KeepkeyTestCase): def test_pins(self): # There's no PIN - result = process_commands(self.dev_args + ['--debug', 'promptpin']) + result = self.do_command(self.dev_args + ['--debug', 'promptpin']) self.assertEqual(result['error'], 'This device does not need a PIN') self.assertEqual(result['code'], -11) - result = process_commands(self.dev_args + ['sendpin', '1234']) + result = self.do_command(self.dev_args + ['sendpin', '1234']) self.assertEqual(result['error'], 'This device does not need a PIN') self.assertEqual(result['code'], -11) @@ -163,42 +172,42 @@ class TestKeepkeyManCommands(KeepkeyTestCase): device.wipe(self.client) load_device_by_mnemonic(client=self.client, mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='1234', passphrase_protection=False, label='test') self.client.call(messages.ClearSession()) - result = process_commands(self.dev_args + ['promptpin']) + result = self.do_command(self.dev_args + ['promptpin']) self.assertTrue(result['success']) # Invalid pins - result = process_commands(self.dev_args + ['sendpin', 'notnum']) + result = self.do_command(self.dev_args + ['sendpin', 'notnum']) self.assertEqual(result['error'], 'Non-numeric PIN provided') self.assertEqual(result['code'], -7) - result = process_commands(self.dev_args + ['sendpin', '00000']) + result = self.do_command(self.dev_args + ['sendpin', '00000']) self.assertFalse(result['success']) # Make sure we get a needs pin message - result = process_commands(self.dev_args + ['getxpub', 'm/0h']) + result = self.do_command(self.dev_args + ['getxpub', 'm/0h']) self.assertEqual(result['code'], -12) self.assertEqual(result['error'], 'Keepkey is locked. Unlock by using \'promptpin\' and then \'sendpin\'.') # Prompt pin self.client.call(messages.ClearSession()) - result = process_commands(self.dev_args + ['promptpin']) + result = self.do_command(self.dev_args + ['promptpin']) self.assertTrue(result['success']) # Send the PIN self.client.open() pin = self.client.debug.encode_pin('1234') - result = process_commands(self.dev_args + ['sendpin', pin]) + result = self.do_command(self.dev_args + ['sendpin', pin]) self.assertTrue(result['success']) # Sending PIN after unlock - result = process_commands(self.dev_args + ['promptpin']) + result = self.do_command(self.dev_args + ['promptpin']) self.assertEqual(result['error'], 'The PIN has already been sent to this device') self.assertEqual(result['code'], -11) - result = process_commands(self.dev_args + ['sendpin', '1234']) + result = self.do_command(self.dev_args + ['sendpin', '1234']) self.assertEqual(result['error'], 'The PIN has already been sent to this device') self.assertEqual(result['code'], -11) -def keepkey_test_suite(emulator, rpc, userpass): +def keepkey_test_suite(emulator, rpc, userpass, interface): # Redirect stderr to /dev/null as it's super spammy sys.stderr = open(os.devnull, 'w') @@ -211,23 +220,24 @@ def keepkey_test_suite(emulator, rpc, userpass): # Generic Device tests suite = unittest.TestSuite() - suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator)) - suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator)) - suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator)) - suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator)) - suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator)) - suite.addTest(KeepkeyTestCase.parameterize(TestKeepkeyGetxpub, emulator=dev_emulator)) - suite.addTest(KeepkeyTestCase.parameterize(TestKeepkeyManCommands, emulator=dev_emulator)) + suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface)) + suite.addTest(KeepkeyTestCase.parameterize(TestKeepkeyGetxpub, emulator=dev_emulator, interface=interface)) + suite.addTest(KeepkeyTestCase.parameterize(TestKeepkeyManCommands, emulator=dev_emulator, interface=interface)) return suite if __name__ == '__main__': parser = argparse.ArgumentParser(description='Test Keepkey implementation') parser.add_argument('emulator', help='Path to the Keepkey emulator') parser.add_argument('bitcoind', help='Path to bitcoind binary') + parser.add_argument('--interface', help='Which interface to send commands over', choices=['library', 'cli'], default='library') args = parser.parse_args() # Start bitcoind rpc, userpass = start_bitcoind(args.bitcoind) - suite = keepkey_test_suite(args.emulator, rpc, userpass) + suite = keepkey_test_suite(args.emulator, rpc, userpass, args.interface) unittest.TextTestRunner(stream=sys.stdout, verbosity=2).run(suite) diff --git a/test/test_ledger.py b/test/test_ledger.py index 06445da..e6233ff 100755 --- a/test/test_ledger.py +++ b/test/test_ledger.py @@ -14,7 +14,7 @@ from test_device import DeviceTestCase, start_bitcoind, TestDeviceConnect, TestD from hwilib.cli import process_commands -def ledger_test_suite(rpc, userpass): +def ledger_test_suite(rpc, userpass, interface): # Look for real ledger using HWI API(self-referential, but no other way) enum_res = process_commands(['enumerate']) path = None @@ -31,41 +31,41 @@ def ledger_test_suite(rpc, userpass): # Ledger specific disabled command tests class TestLedgerDisabledCommands(DeviceTestCase): def test_pin(self): - result = process_commands(self.dev_args + ['promptpin']) + result = self.do_command(self.dev_args + ['promptpin']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Ledger Nano S does not need a PIN sent from the host') self.assertEqual(result['code'], -9) - result = process_commands(self.dev_args + ['sendpin', '1234']) + result = self.do_command(self.dev_args + ['sendpin', '1234']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Ledger Nano S does not need a PIN sent from the host') self.assertEqual(result['code'], -9) def test_setup(self): - result = process_commands(self.dev_args + ['setup']) + result = self.do_command(self.dev_args + ['setup']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Ledger Nano S does not support software setup') self.assertEqual(result['code'], -9) def test_wipe(self): - result = process_commands(self.dev_args + ['wipe']) + result = self.do_command(self.dev_args + ['wipe']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Ledger Nano S does not support wiping via software') self.assertEqual(result['code'], -9) def test_restore(self): - result = process_commands(self.dev_args + ['restore']) + result = self.do_command(self.dev_args + ['restore']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Ledger Nano S does not support restoring via software') self.assertEqual(result['code'], -9) def test_backup(self): - result = process_commands(self.dev_args + ['backup']) + result = self.do_command(self.dev_args + ['backup']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Ledger Nano S does not support creating a backup via software') @@ -73,21 +73,22 @@ def ledger_test_suite(rpc, userpass): # Generic Device tests suite = unittest.TestSuite() - suite.addTest(DeviceTestCase.parameterize(TestLedgerDisabledCommands, rpc, userpass, 'ledger', path, fingerprint, master_xpub)) - suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, 'ledger', path, fingerprint, master_xpub)) - suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, 'ledger', path, fingerprint, master_xpub)) - suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, 'ledger', path, fingerprint, master_xpub)) - suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, 'ledger', path, fingerprint, master_xpub)) - suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, 'ledger', path, fingerprint, master_xpub)) + suite.addTest(DeviceTestCase.parameterize(TestLedgerDisabledCommands, rpc, userpass, 'ledger', path, fingerprint, master_xpub, interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, 'ledger', path, fingerprint, master_xpub, interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, 'ledger', path, fingerprint, master_xpub, interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, 'ledger', path, fingerprint, master_xpub, interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, 'ledger', path, fingerprint, master_xpub, interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, 'ledger', path, fingerprint, master_xpub, interface=interface)) return suite if __name__ == '__main__': parser = argparse.ArgumentParser(description='Test Ledger implementation') parser.add_argument('bitcoind', help='Path to bitcoind binary') + parser.add_argument('--interface', help='Which interface to send commands over', choices=['library', 'cli'], default='library') args = parser.parse_args() # Start bitcoind rpc, userpass = start_bitcoind(args.bitcoind) - suite = ledger_test_suite(rpc, userpass) + suite = ledger_test_suite(rpc, userpass, args.interface) unittest.TextTestRunner(verbosity=2).run(suite) diff --git a/test/test_trezor.py b/test/test_trezor.py index fd8291d..d4e271a 100755 --- a/test/test_trezor.py +++ b/test/test_trezor.py @@ -67,19 +67,28 @@ class TrezorEmulator(DeviceEmulator): self.emulator_proc.wait() class TrezorTestCase(unittest.TestCase): - def __init__(self, emulator, methodName='runTest'): + def __init__(self, emulator, interface='library', methodName='runTest'): super(TrezorTestCase, self).__init__(methodName) self.emulator = emulator + self.interface = interface @staticmethod - def parameterize(testclass, emulator): + def parameterize(testclass, emulator, interface='library'): testloader = unittest.TestLoader() testnames = testloader.getTestCaseNames(testclass) suite = unittest.TestSuite() for name in testnames: - suite.addTest(testclass(emulator, name)) + suite.addTest(testclass(emulator, interface, name)) return suite + def do_command(self, args): + if self.interface == 'cli': + proc = subprocess.Popen(['hwi ' + ' '.join(args)], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, shell=True) + result = proc.communicate() + return json.loads(result[0].decode()) + else: + return process_commands(args) + def __str__(self): return 'trezor: {}'.format(super().__str__()) @@ -104,12 +113,12 @@ class TestTrezorGetxpub(TrezorTestCase): load_device_by_xprv(client=self.client, xprv=vec['xprv'], pin='', passphrase_protection=False, label='test', language='english') # Test getmasterxpub - gmxp_res = process_commands(['-t', 'trezor', '-d', 'udp:127.0.0.1:21324', 'getmasterxpub']) + gmxp_res = self.do_command(['-t', 'trezor', '-d', 'udp:127.0.0.1:21324', 'getmasterxpub']) self.assertEqual(gmxp_res['xpub'], vec['master_xpub']) # Test the path derivs for path_vec in vec['vectors']: - gxp_res = process_commands(['-t', 'trezor', '-d', 'udp:127.0.0.1:21324', 'getxpub', path_vec['path']]) + gxp_res = self.do_command(['-t', 'trezor', '-d', 'udp:127.0.0.1:21324', 'getxpub', path_vec['path']]) self.assertEqual(gxp_res['xpub'], path_vec['xpub']) # Trezor specific management (setup, wipe, restore, backup, promptpin, sendpin) command tests @@ -123,12 +132,12 @@ class TestTrezorManCommands(TrezorTestCase): def test_setup_wipe(self): # Device is init, setup should fail - result = process_commands(self.dev_args + ['setup']) + result = self.do_command(self.dev_args + ['setup']) self.assertEquals(result['code'], -10) self.assertEquals(result['error'], 'Device is already initialized. Use wipe first and try again') # Wipe - result = process_commands(self.dev_args + ['wipe']) + result = self.do_command(self.dev_args + ['wipe']) self.assertTrue(result['success']) # Setup @@ -139,12 +148,12 @@ class TestTrezorManCommands(TrezorTestCase): self.assertTrue(result['success']) # Make sure device is init, setup should fail - result = process_commands(self.dev_args + ['setup']) + result = self.do_command(self.dev_args + ['setup']) self.assertEquals(result['code'], -10) self.assertEquals(result['error'], 'Device is already initialized. Use wipe first and try again') def test_backup(self): - result = process_commands(self.dev_args + ['backup']) + result = self.do_command(self.dev_args + ['backup']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Trezor does not support creating a backup via software') @@ -152,10 +161,10 @@ class TestTrezorManCommands(TrezorTestCase): def test_pins(self): # There's no PIN - result = process_commands(self.dev_args + ['--debug', 'promptpin']) + result = self.do_command(self.dev_args + ['--debug', 'promptpin']) self.assertEqual(result['error'], 'This device does not need a PIN') self.assertEqual(result['code'], -11) - result = process_commands(self.dev_args + ['sendpin', '1234']) + result = self.do_command(self.dev_args + ['sendpin', '1234']) self.assertEqual(result['error'], 'This device does not need a PIN') self.assertEqual(result['code'], -11) @@ -163,42 +172,42 @@ class TestTrezorManCommands(TrezorTestCase): device.wipe(self.client) load_device_by_mnemonic(client=self.client, mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='1234', passphrase_protection=False, label='test') self.client.call(messages.ClearSession()) - result = process_commands(self.dev_args + ['promptpin']) + result = self.do_command(self.dev_args + ['promptpin']) self.assertTrue(result['success']) # Invalid pins - result = process_commands(self.dev_args + ['sendpin', 'notnum']) + result = self.do_command(self.dev_args + ['sendpin', 'notnum']) self.assertEqual(result['error'], 'Non-numeric PIN provided') self.assertEqual(result['code'], -7) - result = process_commands(self.dev_args + ['sendpin', '00000']) + result = self.do_command(self.dev_args + ['sendpin', '00000']) self.assertFalse(result['success']) # Make sure we get a needs pin message - result = process_commands(self.dev_args + ['getxpub', 'm/0h']) + result = self.do_command(self.dev_args + ['getxpub', 'm/0h']) self.assertEqual(result['code'], -12) self.assertEqual(result['error'], 'Trezor is locked. Unlock by using \'promptpin\' and then \'sendpin\'.') # Prompt pin self.client.call(messages.ClearSession()) - result = process_commands(self.dev_args + ['promptpin']) + result = self.do_command(self.dev_args + ['promptpin']) self.assertTrue(result['success']) # Send the PIN self.client.open() pin = self.client.debug.encode_pin('1234') - result = process_commands(self.dev_args + ['sendpin', pin]) + result = self.do_command(self.dev_args + ['sendpin', pin]) self.assertTrue(result['success']) # Sending PIN after unlock - result = process_commands(self.dev_args + ['promptpin']) + result = self.do_command(self.dev_args + ['promptpin']) self.assertEqual(result['error'], 'The PIN has already been sent to this device') self.assertEqual(result['code'], -11) - result = process_commands(self.dev_args + ['sendpin', '1234']) + result = self.do_command(self.dev_args + ['sendpin', '1234']) self.assertEqual(result['error'], 'The PIN has already been sent to this device') self.assertEqual(result['code'], -11) -def trezor_test_suite(emulator, rpc, userpass): +def trezor_test_suite(emulator, rpc, userpass, interface): # Redirect stderr to /dev/null as it's super spammy sys.stderr = open(os.devnull, 'w') @@ -211,23 +220,24 @@ def trezor_test_suite(emulator, rpc, userpass): # Generic Device tests suite = unittest.TestSuite() - suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator)) - suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator)) - suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator)) - suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator)) - suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator)) - suite.addTest(TrezorTestCase.parameterize(TestTrezorGetxpub, emulator=dev_emulator)) - suite.addTest(TrezorTestCase.parameterize(TestTrezorManCommands, emulator=dev_emulator)) + suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface)) + suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface)) + suite.addTest(TrezorTestCase.parameterize(TestTrezorGetxpub, emulator=dev_emulator, interface=interface)) + suite.addTest(TrezorTestCase.parameterize(TestTrezorManCommands, emulator=dev_emulator, interface=interface)) return suite if __name__ == '__main__': parser = argparse.ArgumentParser(description='Test Trezor implementation') parser.add_argument('emulator', help='Path to the Trezor emulator') parser.add_argument('bitcoind', help='Path to bitcoind binary') + parser.add_argument('--interface', help='Which interface to send commands over', choices=['library', 'cli'], default='library') args = parser.parse_args() # Start bitcoind rpc, userpass = start_bitcoind(args.bitcoind) - suite = trezor_test_suite(args.emulator, rpc, userpass) + suite = trezor_test_suite(args.emulator, rpc, userpass, args.interface) unittest.TextTestRunner(stream=sys.stdout, verbosity=2).run(suite) From c69c0ee9202177c7cb7a9880347f5c06ee16fa05 Mon Sep 17 00:00:00 2001 From: Andrew Chow Date: Thu, 14 Feb 2019 23:06:27 -0500 Subject: [PATCH 2/3] Add travis job for cli interface test --- .travis.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index f1705f1..aa45112 100644 --- a/.travis.yml +++ b/.travis.yml @@ -49,5 +49,9 @@ install: - cd test; ./setup_environment.sh; cd .. - pip uninstall -y trezor # Hack to get rid of master branch version of trezor that is installed for trezor-mcu build - python setup.py install -script: - - cd test; ./run_tests.py +jobs: + include: + - name: With process_commands interface + script: cd test; ./run_tests.py --interface=library + - name: With command line interface + script: cd test; ./run_tests.py --interface=cli From 1a2b67d1f35226be50dde32d47105bfa06ce0725 Mon Sep 17 00:00:00 2001 From: Andrew Chow Date: Thu, 14 Feb 2019 23:48:24 -0500 Subject: [PATCH 3/3] Test how a very large transaction is handled --- test/test_device.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/test/test_device.py b/test/test_device.py index 30e0765..3f8b9bc 100644 --- a/test/test_device.py +++ b/test/test_device.py @@ -404,6 +404,35 @@ class TestSignTx(DeviceTestCase): else: self._test_signtx("all", self.type in supports_multisig) + # Make a huge transaction which might cause some problems with different interfaces + def test_big_tx(self): + # make a huge transaction that is unrelated to the hardware wallet + outputs = [] + num_inputs = 60 + for i in range(0, num_inputs): + outputs.append({self.wpk_rpc.getnewaddress('', 'legacy'): 0.001}) + psbt = self.wpk_rpc.walletcreatefundedpsbt([], outputs, 0, {}, True)['psbt'] + psbt = self.wpk_rpc.walletprocesspsbt(psbt)['psbt'] + tx = self.wpk_rpc.finalizepsbt(psbt)['hex'] + txid = self.wpk_rpc.sendrawtransaction(tx) + inputs = [] + for i in range(0, num_inputs): + inputs.append({'txid': txid, 'vout': i}) + psbt = self.wpk_rpc.walletcreatefundedpsbt(inputs, [{self.wpk_rpc.getnewaddress('', 'legacy'): 0.001 * num_inputs}], 0, {'subtractFeeFromOutputs': [0]}, True)['psbt'] + # For cli, this should throw an exception + try: + result = self.do_command(self.dev_args + ['signtx', psbt]) + if self.interface == 'cli': + self.fail('Big tx did not cause CLI to error') + if self.type == 'coldcard': + self.assertEqual(result['code'], -7) + else: + self.assertNotIn('code', result) + self.assertNotIn('error', result) + except OSError as e: + if self.interface == 'cli': + pass + class TestDisplayAddress(DeviceTestCase): def setUp(self): self.emulator.start()