Merge #125: Test how the CLI deals with really big transactions

1a2b67d Test how a very large transaction is handled (Andrew Chow)
c69c0ee Add travis job for cli interface test (Andrew Chow)
e594a0b Allow specifiying a different interface for using HWI in tests (Andrew Chow)

Pull request description:

  It was mentioned in the #bitcoin-core-dev IRC channel a few days ago that really big transactions can cause problems if it was entered as an argument in the shell. This PR adds to the test suite the option to run the tests with different interfaces to allow us to test this. It can use the "library" interface which is what we currently do. This interface calls `process_commands` directly. The `cli` interface uses `subprocess` to run the `hwi` command in the shell to test the command line interface. This will emulate entering a command in the shell. This allows us to test the big transaction case which has been added. This case only tests the interface and not whether the transaction is signed. This test should pass when using the `library` interface but not when using the `cli`. Both interfaces will also be tested on travis as separate jobs.

Tree-SHA512: 47c85c3b4df66aaa552dc55028267024ea53a56f8028bc0e161fd390ea3e96166f3d9157f320bbf922c63fd5726f13c3918c75cbd3a1311c1536477c0605dcee
This commit is contained in:
Andrew Chow 2019-02-28 21:47:52 -05:00
commit 8d738ee31c
No known key found for this signature in database
GPG Key ID: 17565732E08E5E41
8 changed files with 222 additions and 150 deletions

View File

@ -49,5 +49,9 @@ install:
- cd test; ./setup_environment.sh; cd ..
- pip uninstall -y trezor # Hack to get rid of master branch version of trezor that is installed for trezor-mcu build
- python setup.py install
script:
- cd test; ./run_tests.py
jobs:
include:
- name: With process_commands interface
script: cd test; ./run_tests.py --interface=library
- name: With command line interface
script: cd test; ./run_tests.py --interface=cli

View File

@ -32,6 +32,7 @@ dbb_group.add_argument('--no_bitbox', help='Do not run Digital Bitbox test with
dbb_group.add_argument('--bitbox', help='Path to Digital bitbox simulator.', default='work/mcu/build/bin/simulator')
parser.add_argument('--bitcoind', help='Path to bitcoind.', default='work/bitcoin/src/bitcoind')
parser.add_argument('--interface', help='Which interface to send commands over', choices=['library', 'cli'], default='library')
args = parser.parse_args()
# Run tests
@ -45,14 +46,14 @@ if not args.no_trezor or not args.no_coldcard or args.ledger or not args.no_bitb
rpc, userpass = start_bitcoind(args.bitcoind)
if not args.no_trezor:
suite.addTest(trezor_test_suite(args.trezor, rpc, userpass))
suite.addTest(trezor_test_suite(args.trezor, rpc, userpass, args.interface))
if not args.no_coldcard:
suite.addTest(coldcard_test_suite(args.coldcard, rpc, userpass))
suite.addTest(coldcard_test_suite(args.coldcard, rpc, userpass, args.interface))
if args.ledger:
suite.addTest(ledger_test_suite(rpc, userpass))
suite.addTest(ledger_test_suite(rpc, userpass, args.interface))
if not args.no_bitbox:
suite.addTest(digitalbitbox_test_suite(rpc, userpass, args.bitbox))
suite.addTest(digitalbitbox_test_suite(rpc, userpass, args.bitbox, args.interface))
if not args.no_keepkey:
suite.addTest(keepkey_test_suite(args.keepkey, rpc, userpass))
suite.addTest(keepkey_test_suite(args.keepkey, rpc, userpass, args.interface))
result = unittest.TextTestRunner(stream=sys.stdout, verbosity=2).run(suite)
sys.exit(not result.wasSuccessful())

View File

@ -12,7 +12,7 @@ from hwilib.devices.ckcc.protocol import CCProtocolPacker
from hwilib.devices.ckcc.client import ColdcardDevice
from test_device import DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignMessage, TestSignTx
def coldcard_test_suite(simulator, rpc, userpass):
def coldcard_test_suite(simulator, rpc, userpass, interface):
# Start the Coldcard simulator
simulator_proc = subprocess.Popen(['python3', os.path.basename(simulator)], cwd=os.path.dirname(simulator), stdout=subprocess.DEVNULL)
# Wait for simulator to be up
@ -30,39 +30,39 @@ def coldcard_test_suite(simulator, rpc, userpass):
# Coldcard specific management command tests
class TestColdcardManCommands(DeviceTestCase):
def test_setup(self):
result = process_commands(self.dev_args + ['setup'])
result = self.do_command(self.dev_args + ['setup'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['error'], 'The Coldcard does not support software setup')
self.assertEqual(result['code'], -9)
def test_wipe(self):
result = process_commands(self.dev_args + ['wipe'])
result = self.do_command(self.dev_args + ['wipe'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['error'], 'The Coldcard does not support wiping via software')
self.assertEqual(result['code'], -9)
def test_restore(self):
result = process_commands(self.dev_args + ['restore'])
result = self.do_command(self.dev_args + ['restore'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['error'], 'The Coldcard does not support restoring via software')
self.assertEqual(result['code'], -9)
def test_backup(self):
result = process_commands(self.dev_args + ['backup'])
result = self.do_command(self.dev_args + ['backup'])
self.assertTrue(result['success'])
self.assertIn('The backup has been written to', result['message'])
def test_pin(self):
result = process_commands(self.dev_args + ['promptpin'])
result = self.do_command(self.dev_args + ['promptpin'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['error'], 'The Coldcard does not need a PIN sent from the host')
self.assertEqual(result['code'], -9)
result = process_commands(self.dev_args + ['sendpin', '1234'])
result = self.do_command(self.dev_args + ['sendpin', '1234'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['error'], 'The Coldcard does not need a PIN sent from the host')
@ -70,22 +70,23 @@ def coldcard_test_suite(simulator, rpc, userpass):
# Generic device tests
suite = unittest.TestSuite()
suite.addTest(DeviceTestCase.parameterize(TestColdcardManCommands, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', ''))
suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd'))
suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd'))
suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd'))
suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd'))
suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd'))
suite.addTest(DeviceTestCase.parameterize(TestColdcardManCommands, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', '', interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd', interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd', interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd', interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd', interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, 'coldcard', '/tmp/ckcc-simulator.sock', '0f056943', 'tpubDDpWvmUrPZrhSPmUzCMBHffvC3HyMAPnWDSAQNBTnj1iZeJa7BZQEttFiP4DS4GCcXQHezdXhn86Hj6LHX5EDstXPWrMaSneRWM8yUf6NFd', interface=interface))
return suite
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Test Coldcard implementation')
parser.add_argument('simulator', help='Path to the Coldcard simulator')
parser.add_argument('bitcoind', help='Path to bitcoind binary')
parser.add_argument('--interface', help='Which interface to send commands over', choices=['library', 'cli'], default='library')
args = parser.parse_args()
# Start bitcoind
rpc, userpass = start_bitcoind(args.bitcoind)
suite = coldcard_test_suite(args.simulator, rpc, userpass)
suite = coldcard_test_suite(args.simulator, rpc, userpass, args.interface)
unittest.TextTestRunner(verbosity=2).run(suite)

View File

@ -1,6 +1,7 @@
#! /usr/bin/env python3
import atexit
import json
import os
import shutil
import subprocess
@ -50,7 +51,7 @@ def start_bitcoind(bitcoind_path):
return (rpc, userpass)
class DeviceTestCase(unittest.TestCase):
def __init__(self, rpc, rpc_userpass, type, path, fingerprint, master_xpub, password = '', emulator=None, methodName='runTest'):
def __init__(self, rpc, rpc_userpass, type, path, fingerprint, master_xpub, password = '', emulator=None, interface='library', methodName='runTest'):
super(DeviceTestCase, self).__init__(methodName)
self.rpc = rpc
self.rpc_userpass = rpc_userpass
@ -66,16 +67,30 @@ class DeviceTestCase(unittest.TestCase):
self.emulator = DeviceEmulator()
if password:
self.dev_args.extend(['-p', password])
self.interface = interface
@staticmethod
def parameterize(testclass, rpc, rpc_userpass, type, path, fingerprint, master_xpub, password = '', emulator=None):
def parameterize(testclass, rpc, rpc_userpass, type, path, fingerprint, master_xpub, password = '', interface='library', emulator=None):
testloader = unittest.TestLoader()
testnames = testloader.getTestCaseNames(testclass)
suite = unittest.TestSuite()
for name in testnames:
suite.addTest(testclass(rpc, rpc_userpass, type, path, fingerprint, master_xpub, password, emulator, name))
suite.addTest(testclass(rpc, rpc_userpass, type, path, fingerprint, master_xpub, password, emulator, interface, name))
return suite
def do_command(self, args):
if self.interface == 'cli':
proc = subprocess.Popen(['hwi ' + ' '.join(args)], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, shell=True)
result = proc.communicate()
return json.loads(result[0].decode())
else:
return process_commands(args)
def get_password_args(self):
if self.password:
return ['-p', self.password]
return []
def __str__(self):
return '{}: {}'.format(self.type, super().__str__())
@ -90,7 +105,7 @@ class TestDeviceConnect(DeviceTestCase):
self.emulator.stop()
def test_enumerate(self):
enum_res = process_commands(['-p', self.password, 'enumerate'])
enum_res = self.do_command(self.get_password_args() + ['enumerate'])
found = False
for device in enum_res:
if device['type'] == self.type and device['path'] == self.path and device['fingerprint'] == self.fingerprint:
@ -99,31 +114,31 @@ class TestDeviceConnect(DeviceTestCase):
self.assertTrue(found)
def test_no_type(self):
gmxp_res = process_commands(['getmasterxpub'])
gmxp_res = self.do_command(['getmasterxpub'])
self.assertIn('error', gmxp_res)
self.assertEqual(gmxp_res['error'], 'You must specify a device type or fingerprint for all commands except enumerate')
self.assertIn('code', gmxp_res)
self.assertEqual(gmxp_res['code'], -1)
def test_path_type(self):
gmxp_res = process_commands(['-t', self.type, '-d', self.path, '-p', self.password, 'getmasterxpub'])
gmxp_res = self.do_command(self.get_password_args() + ['-t', self.type, '-d', self.path, 'getmasterxpub'])
self.assertEqual(gmxp_res['xpub'], self.master_xpub)
def test_fingerprint_autodetect(self):
gmxp_res = process_commands(['-f', self.fingerprint, '-p', self.password, 'getmasterxpub'])
gmxp_res = self.do_command(self.get_password_args() + ['-f', self.fingerprint, 'getmasterxpub'])
self.assertEqual(gmxp_res['xpub'], self.master_xpub)
# Nonexistent fingerprint
gmxp_res = process_commands(['-f', '0000ffff', '-p', self.password, 'getmasterxpub'])
gmxp_res = self.do_command(self.get_password_args() + ['-f', '0000ffff', 'getmasterxpub'])
self.assertEqual(gmxp_res['error'], 'Could not find device with specified fingerprint')
self.assertEqual(gmxp_res['code'], -3)
def test_type_only_autodetect(self):
gmxp_res = process_commands(['-t', self.type, '-p', self.password, 'getmasterxpub'])
gmxp_res = self.do_command(self.get_password_args() + ['-t', self.type, 'getmasterxpub'])
self.assertEqual(gmxp_res['xpub'], self.master_xpub)
# Unknown device type
gmxp_res = process_commands(['-t', 'fakedev', '-d', 'fakepath', 'getmasterxpub'])
gmxp_res = self.do_command(['-t', 'fakedev', '-d', 'fakepath', 'getmasterxpub'])
self.assertEqual(gmxp_res['error'], 'Unknown device type specified')
self.assertEqual(gmxp_res['code'], -4)
@ -142,17 +157,17 @@ class TestGetKeypool(DeviceTestCase):
self.emulator.stop()
def test_getkeypool_bad_args(self):
result = process_commands(self.dev_args + ['getkeypool', '--sh_wpkh', '--wpkh', '0', '20'])
result = self.do_command(self.dev_args + ['getkeypool', '--sh_wpkh', '--wpkh', '0', '20'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['code'], -7)
def test_getkeypool(self):
non_keypool_desc = process_commands(self.dev_args + ['getkeypool', '0', '20'])
non_keypool_desc = self.do_command(self.dev_args + ['getkeypool', '0', '20'])
import_result = self.wpk_rpc.importmulti(non_keypool_desc)
self.assertTrue(import_result[0]['success'])
keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '0', '20'])
keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '0', '20'])
import_result = self.wpk_rpc.importmulti(keypool_desc)
self.assertFalse(import_result[0]['success'])
@ -162,75 +177,75 @@ class TestGetKeypool(DeviceTestCase):
addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress())
self.assertEqual(addr_info['hdkeypath'], "m/44'/1'/0'/0/{}".format(i))
keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--internal', '0', '20'])
keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--internal', '0', '20'])
import_result = self.wrpc.importmulti(keypool_desc)
self.assertTrue(import_result[0]['success'])
for i in range(0, 21):
addr_info = self.wrpc.getaddressinfo(self.wrpc.getrawchangeaddress())
self.assertEqual(addr_info['hdkeypath'], "m/44'/1'/0'/1/{}".format(i))
keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '0', '20'])
keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '0', '20'])
import_result = self.wrpc.importmulti(keypool_desc)
self.assertTrue(import_result[0]['success'])
for i in range(0, 21):
addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress())
self.assertEqual(addr_info['hdkeypath'], "m/49'/1'/0'/0/{}".format(i))
keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '--internal', '0', '20'])
keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '--internal', '0', '20'])
import_result = self.wrpc.importmulti(keypool_desc)
self.assertTrue(import_result[0]['success'])
for i in range(0, 21):
addr_info = self.wrpc.getaddressinfo(self.wrpc.getrawchangeaddress())
self.assertEqual(addr_info['hdkeypath'], "m/49'/1'/0'/1/{}".format(i))
keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--wpkh', '0', '20'])
keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--wpkh', '0', '20'])
import_result = self.wrpc.importmulti(keypool_desc)
self.assertTrue(import_result[0]['success'])
for i in range(0, 21):
addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress())
self.assertEqual(addr_info['hdkeypath'], "m/84'/1'/0'/0/{}".format(i))
keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--wpkh', '--internal', '0', '20'])
keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--wpkh', '--internal', '0', '20'])
import_result = self.wrpc.importmulti(keypool_desc)
self.assertTrue(import_result[0]['success'])
for i in range(0, 21):
addr_info = self.wrpc.getaddressinfo(self.wrpc.getrawchangeaddress())
self.assertEqual(addr_info['hdkeypath'], "m/84'/1'/0'/1/{}".format(i))
keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '--account', '3', '0', '20'])
keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '--account', '3', '0', '20'])
import_result = self.wrpc.importmulti(keypool_desc)
self.assertTrue(import_result[0]['success'])
for i in range(0, 21):
addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress())
self.assertEqual(addr_info['hdkeypath'], "m/49'/1'/3'/0/{}".format(i))
keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '--internal', '--account', '3', '0', '20'])
keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '--internal', '--account', '3', '0', '20'])
import_result = self.wrpc.importmulti(keypool_desc)
self.assertTrue(import_result[0]['success'])
for i in range(0, 21):
addr_info = self.wrpc.getaddressinfo(self.wrpc.getrawchangeaddress())
self.assertEqual(addr_info['hdkeypath'], "m/49'/1'/3'/1/{}".format(i))
keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--wpkh', '--account', '3', '0', '20'])
keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--wpkh', '--account', '3', '0', '20'])
import_result = self.wrpc.importmulti(keypool_desc)
self.assertTrue(import_result[0]['success'])
for i in range(0, 21):
addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress())
self.assertEqual(addr_info['hdkeypath'], "m/84'/1'/3'/0/{}".format(i))
keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--wpkh', '--internal', '--account', '3', '0', '20'])
keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--wpkh', '--internal', '--account', '3', '0', '20'])
import_result = self.wrpc.importmulti(keypool_desc)
self.assertTrue(import_result[0]['success'])
for i in range(0, 21):
addr_info = self.wrpc.getaddressinfo(self.wrpc.getrawchangeaddress())
self.assertEqual(addr_info['hdkeypath'], "m/84'/1'/3'/1/{}".format(i))
keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--path', 'm/0h/0h/4h/*', '0', '20'])
keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--path', 'm/0h/0h/4h/*', '0', '20'])
import_result = self.wrpc.importmulti(keypool_desc)
self.assertTrue(import_result[0]['success'])
for i in range(0, 21):
addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress())
self.assertEqual(addr_info['hdkeypath'], "m/0'/0'/4'/{}".format(i))
keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--path', '/0h/0h/4h/*', '0', '20'])
keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--path', '/0h/0h/4h/*', '0', '20'])
self.assertEqual(keypool_desc['error'], 'Path must start with m/')
self.assertEqual(keypool_desc['code'], -7)
keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--path', 'm/0h/0h/4h/', '0', '20'])
keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--path', 'm/0h/0h/4h/', '0', '20'])
self.assertEqual(keypool_desc['error'], 'Path must end with /*')
self.assertEqual(keypool_desc['code'], -7)
@ -251,7 +266,7 @@ class TestSignTx(DeviceTestCase):
def _generate_and_finalize(self, unknown_inputs, psbt):
if not unknown_inputs:
# Just do the normal signing process to test "all inputs" case
sign_res = process_commands(self.dev_args + ['signtx', psbt['psbt']])
sign_res = self.do_command(self.dev_args + ['signtx', psbt['psbt']])
finalize_res = self.wrpc.finalizepsbt(sign_res['psbt'])
else:
# Sign only input one on first pass
@ -280,11 +295,11 @@ class TestSignTx(DeviceTestCase):
second_psbt = second_psbt.serialize()
# First will always have something to sign
first_sign_res = process_commands(self.dev_args + ['signtx', first_psbt])
first_sign_res = self.do_command(self.dev_args + ['signtx', first_psbt])
self.assertTrue(single_input == self.wrpc.finalizepsbt(first_sign_res['psbt'])['complete'])
# Second may have nothing to sign (1 input case)
# and also may throw an error(e.g., ColdCard)
second_sign_res = process_commands(self.dev_args + ['signtx', second_psbt])
second_sign_res = self.do_command(self.dev_args + ['signtx', second_psbt])
if 'psbt' in second_sign_res:
self.assertTrue(not self.wrpc.finalizepsbt(second_sign_res['psbt'])['complete'])
combined_psbt = self.wrpc.combinepsbt([first_sign_res['psbt'], second_sign_res['psbt']])
@ -300,10 +315,10 @@ class TestSignTx(DeviceTestCase):
def _test_signtx(self, input_type, multisig):
# Import some keys to the watch only wallet and send coins to them
keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '30', '40'])
keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '30', '40'])
import_result = self.wrpc.importmulti(keypool_desc)
self.assertTrue(import_result[0]['success'])
keypool_desc = process_commands(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '--internal', '30', '40'])
keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--keypool', '--sh_wpkh', '--internal', '30', '40'])
import_result = self.wrpc.importmulti(keypool_desc)
self.assertTrue(import_result[0]['success'])
sh_wpkh_addr = self.wrpc.getnewaddress('', 'p2sh-segwit')
@ -389,6 +404,35 @@ class TestSignTx(DeviceTestCase):
else:
self._test_signtx("all", self.type in supports_multisig)
# Make a huge transaction which might cause some problems with different interfaces
def test_big_tx(self):
# make a huge transaction that is unrelated to the hardware wallet
outputs = []
num_inputs = 60
for i in range(0, num_inputs):
outputs.append({self.wpk_rpc.getnewaddress('', 'legacy'): 0.001})
psbt = self.wpk_rpc.walletcreatefundedpsbt([], outputs, 0, {}, True)['psbt']
psbt = self.wpk_rpc.walletprocesspsbt(psbt)['psbt']
tx = self.wpk_rpc.finalizepsbt(psbt)['hex']
txid = self.wpk_rpc.sendrawtransaction(tx)
inputs = []
for i in range(0, num_inputs):
inputs.append({'txid': txid, 'vout': i})
psbt = self.wpk_rpc.walletcreatefundedpsbt(inputs, [{self.wpk_rpc.getnewaddress('', 'legacy'): 0.001 * num_inputs}], 0, {'subtractFeeFromOutputs': [0]}, True)['psbt']
# For cli, this should throw an exception
try:
result = self.do_command(self.dev_args + ['signtx', psbt])
if self.interface == 'cli':
self.fail('Big tx did not cause CLI to error')
if self.type == 'coldcard':
self.assertEqual(result['code'], -7)
else:
self.assertNotIn('code', result)
self.assertNotIn('error', result)
except OSError as e:
if self.interface == 'cli':
pass
class TestDisplayAddress(DeviceTestCase):
def setUp(self):
self.emulator.start()
@ -397,18 +441,18 @@ class TestDisplayAddress(DeviceTestCase):
self.emulator.stop()
def test_display_address_bad_args(self):
result = process_commands(self.dev_args + ['displayaddress', '--sh_wpkh', '--wpkh', '--path', 'm/49h/1h/0h/0/0'])
result = self.do_command(self.dev_args + ['displayaddress', '--sh_wpkh', '--wpkh', '--path', 'm/49h/1h/0h/0/0'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['code'], -7)
def test_display_address_path(self):
process_commands(self.dev_args + ['displayaddress', '--path', 'm/44h/1h/0h/0/0'])
process_commands(self.dev_args + ['displayaddress', '--sh_wpkh', '--path', 'm/49h/1h/0h/0/0'])
process_commands(self.dev_args + ['displayaddress', '--wpkh', '--path', 'm/84h/1h/0h/0/0'])
self.do_command(self.dev_args + ['displayaddress', '--path', 'm/44h/1h/0h/0/0'])
self.do_command(self.dev_args + ['displayaddress', '--sh_wpkh', '--path', 'm/49h/1h/0h/0/0'])
self.do_command(self.dev_args + ['displayaddress', '--wpkh', '--path', 'm/84h/1h/0h/0/0'])
def test_display_address_bad_path(self):
result = process_commands(self.dev_args + ['displayaddress', '--path', 'f'])
result = self.do_command(self.dev_args + ['displayaddress', '--path', 'f'])
self.assertEquals(result['code'], -7)
def test_display_address_descriptor(self):
@ -454,8 +498,8 @@ class TestSignMessage(DeviceTestCase):
self.emulator.stop()
def test_sign_msg(self):
process_commands(self.dev_args + ['signmessage', 'Message signing test', 'm/44h/1h/0h/0/0'])
self.do_command(self.dev_args + ['signmessage', '"Message signing test"', 'm/44h/1h/0h/0/0'])
def test_bad_path(self):
result = process_commands(self.dev_args + ['signmessage', 'Message signing test', 'f'])
result = self.do_command(self.dev_args + ['signmessage', '"Message signing test"', 'f'])
self.assertEquals(result['code'], -7)

View File

@ -13,7 +13,7 @@ from test_device import DeviceTestCase, start_bitcoind, TestDeviceConnect, TestG
from hwilib.cli import process_commands
from hwilib.devices.digitalbitbox import BitboxSimulator, send_plain, send_encrypt
def digitalbitbox_test_suite(rpc, userpass, simulator):
def digitalbitbox_test_suite(rpc, userpass, simulator, interface):
# Start the Digital bitbox simulator
simulator_proc = subprocess.Popen(['./' + os.path.basename(simulator), '../../tests/sd_files/'], cwd=os.path.dirname(simulator), stderr=subprocess.DEVNULL)
# Wait for simulator to be up
@ -44,27 +44,27 @@ def digitalbitbox_test_suite(rpc, userpass, simulator):
# DigitalBitbox specific management command tests
class TestDBBManCommands(DeviceTestCase):
def test_restore(self):
result = process_commands(self.dev_args + ['restore'])
result = self.do_command(self.dev_args + ['restore'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['error'], 'The Digital Bitbox does not support restoring via software')
self.assertEqual(result['code'], -9)
def test_pin(self):
result = process_commands(self.dev_args + ['promptpin'])
result = self.do_command(self.dev_args + ['promptpin'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['error'], 'The Digital Bitbox does not need a PIN sent from the host')
self.assertEqual(result['code'], -9)
result = process_commands(self.dev_args + ['sendpin', '1234'])
result = self.do_command(self.dev_args + ['sendpin', '1234'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['error'], 'The Digital Bitbox does not need a PIN sent from the host')
self.assertEqual(result['code'], -9)
def test_display(self):
result = process_commands(self.dev_args + ['displayaddress', '--path', 'm/0h'])
result = self.do_command(self.dev_args + ['displayaddress', '--path', 'm/0h'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['error'], 'The Digital Bitbox does not have a screen to display addresses on')
@ -72,24 +72,24 @@ def digitalbitbox_test_suite(rpc, userpass, simulator):
def test_setup_wipe(self):
# Device is init, setup should fail
result = process_commands(self.dev_args + ['setup', '--label', 'setup_test', '--backup_passphrase', 'testpass'])
result = self.do_command(self.dev_args + ['setup', '--label', 'setup_test', '--backup_passphrase', 'testpass'])
self.assertEquals(result['code'], -10)
self.assertEquals(result['error'], 'Device is already initialized. Use wipe first and try again')
# Wipe
result = process_commands(self.dev_args + ['wipe'])
result = self.do_command(self.dev_args + ['wipe'])
self.assertTrue(result['success'])
# Check arguments
result = process_commands(self.dev_args + ['setup', '--label', 'setup_test'])
result = self.do_command(self.dev_args + ['setup', '--label', 'setup_test'])
self.assertEquals(result['code'], -7)
self.assertEquals(result['error'], 'The label and backup passphrase for a new Digital Bitbox wallet must be specified and cannot be empty')
result = process_commands(self.dev_args + ['setup', '--backup_passphrase', 'testpass'])
result = self.do_command(self.dev_args + ['setup', '--backup_passphrase', 'testpass'])
self.assertEquals(result['code'], -7)
self.assertEquals(result['error'], 'The label and backup passphrase for a new Digital Bitbox wallet must be specified and cannot be empty')
# Setup
result = process_commands(self.dev_args + ['setup', '--label', 'setup_test', '--backup_passphrase', 'testpass'])
result = self.do_command(self.dev_args + ['setup', '--label', 'setup_test', '--backup_passphrase', 'testpass'])
self.assertTrue(result['success'])
# Reset back to original
@ -99,48 +99,49 @@ def digitalbitbox_test_suite(rpc, userpass, simulator):
send_encrypt(json.dumps({"seed":{"source":"backup","filename":"test_backup.pdf","key":"key"}}), '0000', dev)
# Make sure device is init, setup should fail
result = process_commands(self.dev_args + ['setup', '--label', 'setup_test', '--backup_passphrase', 'testpass'])
result = self.do_command(self.dev_args + ['setup', '--label', 'setup_test', '--backup_passphrase', 'testpass'])
self.assertEquals(result['code'], -10)
self.assertEquals(result['error'], 'Device is already initialized. Use wipe first and try again')
def test_backup(self):
# Check arguments
result = process_commands(self.dev_args + ['backup', '--label', 'backup_test'])
result = self.do_command(self.dev_args + ['backup', '--label', 'backup_test'])
self.assertEquals(result['code'], -7)
self.assertEquals(result['error'], 'The label and backup passphrase for a Digital Bitbox backup must be specified and cannot be empty')
result = process_commands(self.dev_args + ['backup', '--backup_passphrase', 'key'])
result = self.do_command(self.dev_args + ['backup', '--backup_passphrase', 'key'])
self.assertEquals(result['code'], -7)
self.assertEquals(result['error'], 'The label and backup passphrase for a Digital Bitbox backup must be specified and cannot be empty')
# Wipe
result = process_commands(self.dev_args + ['wipe'])
result = self.do_command(self.dev_args + ['wipe'])
self.assertTrue(result['success'])
# Setup
result = process_commands(self.dev_args + ['setup', '--label', 'backup_test', '--backup_passphrase', 'testpass'])
result = self.do_command(self.dev_args + ['setup', '--label', 'backup_test', '--backup_passphrase', 'testpass'])
self.assertTrue(result['success'])
# make the backup
result = process_commands(self.dev_args + ['backup', '--label', 'backup_test_backup', '--backup_passphrase', 'testpass'])
result = self.do_command(self.dev_args + ['backup', '--label', 'backup_test_backup', '--backup_passphrase', 'testpass'])
self.assertTrue(result['success'])
# Generic Device tests
suite = unittest.TestSuite()
suite.addTest(DeviceTestCase.parameterize(TestDBBManCommands, rpc, userpass, type, path, fingerprint, master_xpub, '0000'))
suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, type, path, fingerprint, master_xpub, '0000'))
suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, type, path, fingerprint, master_xpub, '0000'))
suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, type, path, fingerprint, master_xpub, '0000'))
suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, type, path, fingerprint, master_xpub, '0000'))
suite.addTest(DeviceTestCase.parameterize(TestDBBManCommands, rpc, userpass, type, path, fingerprint, master_xpub, '0000', interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, type, path, fingerprint, master_xpub, '0000', interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, type, path, fingerprint, master_xpub, '0000', interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, type, path, fingerprint, master_xpub, '0000', interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, type, path, fingerprint, master_xpub, '0000', interface=interface))
return suite
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Test Digital Bitbox implementation')
parser.add_argument('simulator', help='Path to simulator binary')
parser.add_argument('bitcoind', help='Path to bitcoind binary')
parser.add_argument('--interface', help='Which interface to send commands over', choices=['library', 'cli'], default='library')
args = parser.parse_args()
# Start bitcoind
rpc, userpass = start_bitcoind(args.bitcoind)
suite = digitalbitbox_test_suite(rpc, userpass, args.simulator)
suite = digitalbitbox_test_suite(rpc, userpass, args.simulator, args.interface)
unittest.TextTestRunner(verbosity=2).run(suite)

View File

@ -67,19 +67,28 @@ class KeepkeyEmulator(DeviceEmulator):
self.emulator_proc.wait()
class KeepkeyTestCase(unittest.TestCase):
def __init__(self, emulator, methodName='runTest'):
def __init__(self, emulator, interface='library', methodName='runTest'):
super(KeepkeyTestCase, self).__init__(methodName)
self.emulator = emulator
self.interface = interface
@staticmethod
def parameterize(testclass, emulator):
def parameterize(testclass, emulator, interface='library'):
testloader = unittest.TestLoader()
testnames = testloader.getTestCaseNames(testclass)
suite = unittest.TestSuite()
for name in testnames:
suite.addTest(testclass(emulator, name))
suite.addTest(testclass(emulator, interface, name))
return suite
def do_command(self, args):
if self.interface == 'cli':
proc = subprocess.Popen(['hwi ' + ' '.join(args)], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, shell=True)
result = proc.communicate()
return json.loads(result[0].decode())
else:
return process_commands(args)
def __str__(self):
return 'keepkey: {}'.format(super().__str__())
@ -104,12 +113,12 @@ class TestKeepkeyGetxpub(KeepkeyTestCase):
load_device_by_xprv(client=self.client, xprv=vec['xprv'], pin='', passphrase_protection=False, label='test', language='english')
# Test getmasterxpub
gmxp_res = process_commands(['-t', 'keepkey', '-d', 'udp:127.0.0.1:21324', 'getmasterxpub'])
gmxp_res = self.do_command(['-t', 'keepkey', '-d', 'udp:127.0.0.1:21324', 'getmasterxpub'])
self.assertEqual(gmxp_res['xpub'], vec['master_xpub'])
# Test the path derivs
for path_vec in vec['vectors']:
gxp_res = process_commands(['-t', 'keepkey', '-d', 'udp:127.0.0.1:21324', 'getxpub', path_vec['path']])
gxp_res = self.do_command(['-t', 'keepkey', '-d', 'udp:127.0.0.1:21324', 'getxpub', path_vec['path']])
self.assertEqual(gxp_res['xpub'], path_vec['xpub'])
# Keepkey specific management (setup, wipe, restore, backup, promptpin, sendpin) command tests
@ -123,12 +132,12 @@ class TestKeepkeyManCommands(KeepkeyTestCase):
def test_setup_wipe(self):
# Device is init, setup should fail
result = process_commands(self.dev_args + ['setup'])
result = self.do_command(self.dev_args + ['setup'])
self.assertEquals(result['code'], -10)
self.assertEquals(result['error'], 'Device is already initialized. Use wipe first and try again')
# Wipe
result = process_commands(self.dev_args + ['wipe'])
result = self.do_command(self.dev_args + ['wipe'])
self.assertTrue(result['success'])
# Setup
@ -139,12 +148,12 @@ class TestKeepkeyManCommands(KeepkeyTestCase):
self.assertTrue(result['success'])
# Make sure device is init, setup should fail
result = process_commands(self.dev_args + ['setup'])
result = self.do_command(self.dev_args + ['setup'])
self.assertEquals(result['code'], -10)
self.assertEquals(result['error'], 'Device is already initialized. Use wipe first and try again')
def test_backup(self):
result = process_commands(self.dev_args + ['backup'])
result = self.do_command(self.dev_args + ['backup'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['error'], 'The Keepkey does not support creating a backup via software')
@ -152,10 +161,10 @@ class TestKeepkeyManCommands(KeepkeyTestCase):
def test_pins(self):
# There's no PIN
result = process_commands(self.dev_args + ['--debug', 'promptpin'])
result = self.do_command(self.dev_args + ['--debug', 'promptpin'])
self.assertEqual(result['error'], 'This device does not need a PIN')
self.assertEqual(result['code'], -11)
result = process_commands(self.dev_args + ['sendpin', '1234'])
result = self.do_command(self.dev_args + ['sendpin', '1234'])
self.assertEqual(result['error'], 'This device does not need a PIN')
self.assertEqual(result['code'], -11)
@ -163,42 +172,42 @@ class TestKeepkeyManCommands(KeepkeyTestCase):
device.wipe(self.client)
load_device_by_mnemonic(client=self.client, mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='1234', passphrase_protection=False, label='test')
self.client.call(messages.ClearSession())
result = process_commands(self.dev_args + ['promptpin'])
result = self.do_command(self.dev_args + ['promptpin'])
self.assertTrue(result['success'])
# Invalid pins
result = process_commands(self.dev_args + ['sendpin', 'notnum'])
result = self.do_command(self.dev_args + ['sendpin', 'notnum'])
self.assertEqual(result['error'], 'Non-numeric PIN provided')
self.assertEqual(result['code'], -7)
result = process_commands(self.dev_args + ['sendpin', '00000'])
result = self.do_command(self.dev_args + ['sendpin', '00000'])
self.assertFalse(result['success'])
# Make sure we get a needs pin message
result = process_commands(self.dev_args + ['getxpub', 'm/0h'])
result = self.do_command(self.dev_args + ['getxpub', 'm/0h'])
self.assertEqual(result['code'], -12)
self.assertEqual(result['error'], 'Keepkey is locked. Unlock by using \'promptpin\' and then \'sendpin\'.')
# Prompt pin
self.client.call(messages.ClearSession())
result = process_commands(self.dev_args + ['promptpin'])
result = self.do_command(self.dev_args + ['promptpin'])
self.assertTrue(result['success'])
# Send the PIN
self.client.open()
pin = self.client.debug.encode_pin('1234')
result = process_commands(self.dev_args + ['sendpin', pin])
result = self.do_command(self.dev_args + ['sendpin', pin])
self.assertTrue(result['success'])
# Sending PIN after unlock
result = process_commands(self.dev_args + ['promptpin'])
result = self.do_command(self.dev_args + ['promptpin'])
self.assertEqual(result['error'], 'The PIN has already been sent to this device')
self.assertEqual(result['code'], -11)
result = process_commands(self.dev_args + ['sendpin', '1234'])
result = self.do_command(self.dev_args + ['sendpin', '1234'])
self.assertEqual(result['error'], 'The PIN has already been sent to this device')
self.assertEqual(result['code'], -11)
def keepkey_test_suite(emulator, rpc, userpass):
def keepkey_test_suite(emulator, rpc, userpass, interface):
# Redirect stderr to /dev/null as it's super spammy
sys.stderr = open(os.devnull, 'w')
@ -211,23 +220,24 @@ def keepkey_test_suite(emulator, rpc, userpass):
# Generic Device tests
suite = unittest.TestSuite()
suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(KeepkeyTestCase.parameterize(TestKeepkeyGetxpub, emulator=dev_emulator))
suite.addTest(KeepkeyTestCase.parameterize(TestKeepkeyManCommands, emulator=dev_emulator))
suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface))
suite.addTest(KeepkeyTestCase.parameterize(TestKeepkeyGetxpub, emulator=dev_emulator, interface=interface))
suite.addTest(KeepkeyTestCase.parameterize(TestKeepkeyManCommands, emulator=dev_emulator, interface=interface))
return suite
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Test Keepkey implementation')
parser.add_argument('emulator', help='Path to the Keepkey emulator')
parser.add_argument('bitcoind', help='Path to bitcoind binary')
parser.add_argument('--interface', help='Which interface to send commands over', choices=['library', 'cli'], default='library')
args = parser.parse_args()
# Start bitcoind
rpc, userpass = start_bitcoind(args.bitcoind)
suite = keepkey_test_suite(args.emulator, rpc, userpass)
suite = keepkey_test_suite(args.emulator, rpc, userpass, args.interface)
unittest.TextTestRunner(stream=sys.stdout, verbosity=2).run(suite)

View File

@ -14,7 +14,7 @@ from test_device import DeviceTestCase, start_bitcoind, TestDeviceConnect, TestD
from hwilib.cli import process_commands
def ledger_test_suite(rpc, userpass):
def ledger_test_suite(rpc, userpass, interface):
# Look for real ledger using HWI API(self-referential, but no other way)
enum_res = process_commands(['enumerate'])
path = None
@ -31,41 +31,41 @@ def ledger_test_suite(rpc, userpass):
# Ledger specific disabled command tests
class TestLedgerDisabledCommands(DeviceTestCase):
def test_pin(self):
result = process_commands(self.dev_args + ['promptpin'])
result = self.do_command(self.dev_args + ['promptpin'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['error'], 'The Ledger Nano S does not need a PIN sent from the host')
self.assertEqual(result['code'], -9)
result = process_commands(self.dev_args + ['sendpin', '1234'])
result = self.do_command(self.dev_args + ['sendpin', '1234'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['error'], 'The Ledger Nano S does not need a PIN sent from the host')
self.assertEqual(result['code'], -9)
def test_setup(self):
result = process_commands(self.dev_args + ['setup'])
result = self.do_command(self.dev_args + ['setup'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['error'], 'The Ledger Nano S does not support software setup')
self.assertEqual(result['code'], -9)
def test_wipe(self):
result = process_commands(self.dev_args + ['wipe'])
result = self.do_command(self.dev_args + ['wipe'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['error'], 'The Ledger Nano S does not support wiping via software')
self.assertEqual(result['code'], -9)
def test_restore(self):
result = process_commands(self.dev_args + ['restore'])
result = self.do_command(self.dev_args + ['restore'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['error'], 'The Ledger Nano S does not support restoring via software')
self.assertEqual(result['code'], -9)
def test_backup(self):
result = process_commands(self.dev_args + ['backup'])
result = self.do_command(self.dev_args + ['backup'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['error'], 'The Ledger Nano S does not support creating a backup via software')
@ -73,21 +73,22 @@ def ledger_test_suite(rpc, userpass):
# Generic Device tests
suite = unittest.TestSuite()
suite.addTest(DeviceTestCase.parameterize(TestLedgerDisabledCommands, rpc, userpass, 'ledger', path, fingerprint, master_xpub))
suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, 'ledger', path, fingerprint, master_xpub))
suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, 'ledger', path, fingerprint, master_xpub))
suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, 'ledger', path, fingerprint, master_xpub))
suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, 'ledger', path, fingerprint, master_xpub))
suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, 'ledger', path, fingerprint, master_xpub))
suite.addTest(DeviceTestCase.parameterize(TestLedgerDisabledCommands, rpc, userpass, 'ledger', path, fingerprint, master_xpub, interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, 'ledger', path, fingerprint, master_xpub, interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, 'ledger', path, fingerprint, master_xpub, interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, 'ledger', path, fingerprint, master_xpub, interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, 'ledger', path, fingerprint, master_xpub, interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, 'ledger', path, fingerprint, master_xpub, interface=interface))
return suite
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Test Ledger implementation')
parser.add_argument('bitcoind', help='Path to bitcoind binary')
parser.add_argument('--interface', help='Which interface to send commands over', choices=['library', 'cli'], default='library')
args = parser.parse_args()
# Start bitcoind
rpc, userpass = start_bitcoind(args.bitcoind)
suite = ledger_test_suite(rpc, userpass)
suite = ledger_test_suite(rpc, userpass, args.interface)
unittest.TextTestRunner(verbosity=2).run(suite)

View File

@ -67,19 +67,28 @@ class TrezorEmulator(DeviceEmulator):
self.emulator_proc.wait()
class TrezorTestCase(unittest.TestCase):
def __init__(self, emulator, methodName='runTest'):
def __init__(self, emulator, interface='library', methodName='runTest'):
super(TrezorTestCase, self).__init__(methodName)
self.emulator = emulator
self.interface = interface
@staticmethod
def parameterize(testclass, emulator):
def parameterize(testclass, emulator, interface='library'):
testloader = unittest.TestLoader()
testnames = testloader.getTestCaseNames(testclass)
suite = unittest.TestSuite()
for name in testnames:
suite.addTest(testclass(emulator, name))
suite.addTest(testclass(emulator, interface, name))
return suite
def do_command(self, args):
if self.interface == 'cli':
proc = subprocess.Popen(['hwi ' + ' '.join(args)], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, shell=True)
result = proc.communicate()
return json.loads(result[0].decode())
else:
return process_commands(args)
def __str__(self):
return 'trezor: {}'.format(super().__str__())
@ -104,12 +113,12 @@ class TestTrezorGetxpub(TrezorTestCase):
load_device_by_xprv(client=self.client, xprv=vec['xprv'], pin='', passphrase_protection=False, label='test', language='english')
# Test getmasterxpub
gmxp_res = process_commands(['-t', 'trezor', '-d', 'udp:127.0.0.1:21324', 'getmasterxpub'])
gmxp_res = self.do_command(['-t', 'trezor', '-d', 'udp:127.0.0.1:21324', 'getmasterxpub'])
self.assertEqual(gmxp_res['xpub'], vec['master_xpub'])
# Test the path derivs
for path_vec in vec['vectors']:
gxp_res = process_commands(['-t', 'trezor', '-d', 'udp:127.0.0.1:21324', 'getxpub', path_vec['path']])
gxp_res = self.do_command(['-t', 'trezor', '-d', 'udp:127.0.0.1:21324', 'getxpub', path_vec['path']])
self.assertEqual(gxp_res['xpub'], path_vec['xpub'])
# Trezor specific management (setup, wipe, restore, backup, promptpin, sendpin) command tests
@ -123,12 +132,12 @@ class TestTrezorManCommands(TrezorTestCase):
def test_setup_wipe(self):
# Device is init, setup should fail
result = process_commands(self.dev_args + ['setup'])
result = self.do_command(self.dev_args + ['setup'])
self.assertEquals(result['code'], -10)
self.assertEquals(result['error'], 'Device is already initialized. Use wipe first and try again')
# Wipe
result = process_commands(self.dev_args + ['wipe'])
result = self.do_command(self.dev_args + ['wipe'])
self.assertTrue(result['success'])
# Setup
@ -139,12 +148,12 @@ class TestTrezorManCommands(TrezorTestCase):
self.assertTrue(result['success'])
# Make sure device is init, setup should fail
result = process_commands(self.dev_args + ['setup'])
result = self.do_command(self.dev_args + ['setup'])
self.assertEquals(result['code'], -10)
self.assertEquals(result['error'], 'Device is already initialized. Use wipe first and try again')
def test_backup(self):
result = process_commands(self.dev_args + ['backup'])
result = self.do_command(self.dev_args + ['backup'])
self.assertIn('error', result)
self.assertIn('code', result)
self.assertEqual(result['error'], 'The Trezor does not support creating a backup via software')
@ -152,10 +161,10 @@ class TestTrezorManCommands(TrezorTestCase):
def test_pins(self):
# There's no PIN
result = process_commands(self.dev_args + ['--debug', 'promptpin'])
result = self.do_command(self.dev_args + ['--debug', 'promptpin'])
self.assertEqual(result['error'], 'This device does not need a PIN')
self.assertEqual(result['code'], -11)
result = process_commands(self.dev_args + ['sendpin', '1234'])
result = self.do_command(self.dev_args + ['sendpin', '1234'])
self.assertEqual(result['error'], 'This device does not need a PIN')
self.assertEqual(result['code'], -11)
@ -163,42 +172,42 @@ class TestTrezorManCommands(TrezorTestCase):
device.wipe(self.client)
load_device_by_mnemonic(client=self.client, mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='1234', passphrase_protection=False, label='test')
self.client.call(messages.ClearSession())
result = process_commands(self.dev_args + ['promptpin'])
result = self.do_command(self.dev_args + ['promptpin'])
self.assertTrue(result['success'])
# Invalid pins
result = process_commands(self.dev_args + ['sendpin', 'notnum'])
result = self.do_command(self.dev_args + ['sendpin', 'notnum'])
self.assertEqual(result['error'], 'Non-numeric PIN provided')
self.assertEqual(result['code'], -7)
result = process_commands(self.dev_args + ['sendpin', '00000'])
result = self.do_command(self.dev_args + ['sendpin', '00000'])
self.assertFalse(result['success'])
# Make sure we get a needs pin message
result = process_commands(self.dev_args + ['getxpub', 'm/0h'])
result = self.do_command(self.dev_args + ['getxpub', 'm/0h'])
self.assertEqual(result['code'], -12)
self.assertEqual(result['error'], 'Trezor is locked. Unlock by using \'promptpin\' and then \'sendpin\'.')
# Prompt pin
self.client.call(messages.ClearSession())
result = process_commands(self.dev_args + ['promptpin'])
result = self.do_command(self.dev_args + ['promptpin'])
self.assertTrue(result['success'])
# Send the PIN
self.client.open()
pin = self.client.debug.encode_pin('1234')
result = process_commands(self.dev_args + ['sendpin', pin])
result = self.do_command(self.dev_args + ['sendpin', pin])
self.assertTrue(result['success'])
# Sending PIN after unlock
result = process_commands(self.dev_args + ['promptpin'])
result = self.do_command(self.dev_args + ['promptpin'])
self.assertEqual(result['error'], 'The PIN has already been sent to this device')
self.assertEqual(result['code'], -11)
result = process_commands(self.dev_args + ['sendpin', '1234'])
result = self.do_command(self.dev_args + ['sendpin', '1234'])
self.assertEqual(result['error'], 'The PIN has already been sent to this device')
self.assertEqual(result['code'], -11)
def trezor_test_suite(emulator, rpc, userpass):
def trezor_test_suite(emulator, rpc, userpass, interface):
# Redirect stderr to /dev/null as it's super spammy
sys.stderr = open(os.devnull, 'w')
@ -211,23 +220,24 @@ def trezor_test_suite(emulator, rpc, userpass):
# Generic Device tests
suite = unittest.TestSuite()
suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(TrezorTestCase.parameterize(TestTrezorGetxpub, emulator=dev_emulator))
suite.addTest(TrezorTestCase.parameterize(TestTrezorManCommands, emulator=dev_emulator))
suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface))
suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface))
suite.addTest(TrezorTestCase.parameterize(TestTrezorGetxpub, emulator=dev_emulator, interface=interface))
suite.addTest(TrezorTestCase.parameterize(TestTrezorManCommands, emulator=dev_emulator, interface=interface))
return suite
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Test Trezor implementation')
parser.add_argument('emulator', help='Path to the Trezor emulator')
parser.add_argument('bitcoind', help='Path to bitcoind binary')
parser.add_argument('--interface', help='Which interface to send commands over', choices=['library', 'cli'], default='library')
args = parser.parse_args()
# Start bitcoind
rpc, userpass = start_bitcoind(args.bitcoind)
suite = trezor_test_suite(args.emulator, rpc, userpass)
suite = trezor_test_suite(args.emulator, rpc, userpass, args.interface)
unittest.TextTestRunner(stream=sys.stdout, verbosity=2).run(suite)