Refactor tests to optionally have emulator start and stop for each test

The trezor and keepkey emulators use the same port so they cannot
be run at the same time. To work around this, the emulators are
instead started and stopped before and after each test using
unittest's setUp and tearDown functions. However, other devices
which do not have conflicts can still be run at test suite creation
time. This is still done for the coldcard.

Furthermore, since the trezor and keepkey both create and use
emulator.img files in the current working directory, when they are
started, the working directory is changed to be the one containing
the emulator executable to avoid conflicting emulator.img files
This commit is contained in:
Andrew Chow 2019-01-10 21:12:29 -05:00
parent a3f31aae60
commit 8ddf434489
3 changed files with 190 additions and 105 deletions

View File

@ -12,6 +12,13 @@ from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException
from hwilib.commands import process_commands
from hwilib.serializations import PSBT
# Class for emulator control
class DeviceEmulator():
def start(self):
pass
def stop(self):
pass
def start_bitcoind(bitcoind_path):
datadir = tempfile.mkdtemp()
bitcoind_proc = subprocess.Popen([bitcoind_path, '-regtest', '-datadir=' + datadir, '-noprinttoconsole'])
@ -42,7 +49,7 @@ def start_bitcoind(bitcoind_path):
return (rpc, userpass)
class DeviceTestCase(unittest.TestCase):
def __init__(self, rpc, rpc_userpass, type, path, fingerprint, master_xpub, password = '', methodName='runTest'):
def __init__(self, rpc, rpc_userpass, type, path, fingerprint, master_xpub, password = '', emulator=None, methodName='runTest'):
super(DeviceTestCase, self).__init__(methodName)
self.rpc = rpc
self.rpc_userpass = rpc_userpass
@ -52,19 +59,29 @@ class DeviceTestCase(unittest.TestCase):
self.master_xpub = master_xpub
self.password = password
self.dev_args = ['-t', self.type, '-d', self.path]
if emulator:
self.emulator = emulator
else:
self.emulator = DeviceEmulator()
if password:
self.dev_args.extend(['-p', password])
@staticmethod
def parameterize(testclass, rpc, rpc_userpass, type, path, fingerprint, master_xpub, password = ''):
def parameterize(testclass, rpc, rpc_userpass, type, path, fingerprint, master_xpub, password = '', emulator=None):
testloader = unittest.TestLoader()
testnames = testloader.getTestCaseNames(testclass)
suite = unittest.TestSuite()
for name in testnames:
suite.addTest(testclass(rpc, rpc_userpass, type, path, fingerprint, master_xpub, password, name))
suite.addTest(testclass(rpc, rpc_userpass, type, path, fingerprint, master_xpub, password, emulator, name))
return suite
class TestDeviceConnect(DeviceTestCase):
def setUp(self):
self.emulator.start()
def tearDown(self):
self.emulator.stop()
def test_enumerate(self):
enum_res = process_commands(['-p', self.password, 'enumerate'])
found = False
@ -102,6 +119,10 @@ class TestGetKeypool(DeviceTestCase):
self.wpk_rpc = AuthServiceProxy('http://{}@127.0.0.1:18443/wallet/'.format(self.rpc_userpass))
if '--testnet' not in self.dev_args:
self.dev_args.append('--testnet')
self.emulator.start()
def tearDown(self):
self.emulator.stop()
def test_getkeypool_bad_args(self):
result = process_commands(self.dev_args + ['getkeypool', '--sh_wpkh', '--wpkh', '0', '20'])
@ -205,6 +226,10 @@ class TestSignTx(DeviceTestCase):
self.wpk_rpc = AuthServiceProxy('http://{}@127.0.0.1:18443/wallet/'.format(self.rpc_userpass))
if '--testnet' not in self.dev_args:
self.dev_args.append('--testnet')
self.emulator.start()
def tearDown(self):
self.emulator.stop()
def _generate_and_finalize(self, unknown_inputs, psbt):
if not unknown_inputs:
@ -342,6 +367,11 @@ class TestSignTx(DeviceTestCase):
self._test_signtx("all", self.type in supports_multisig)
class TestDisplayAddress(DeviceTestCase):
def setUp(self):
self.emulator.start()
def tearDown(self):
self.emulator.stop()
def test_display_address_bad_args(self):
result = process_commands(self.dev_args + ['displayaddress', '--sh_wpkh', '--wpkh', 'm/49h/1h/0h/0/0'])
@ -355,6 +385,11 @@ class TestDisplayAddress(DeviceTestCase):
process_commands(self.dev_args + ['displayaddress', '--wpkh', 'm/84h/1h/0h/0/0'])
class TestSignMessage(DeviceTestCase):
def setUp(self):
self.emulator.start()
def tearDown(self):
self.emulator.stop()
def test_sign_msg(self):
process_commands(self.dev_args + ['signmessage', 'Message signing test', 'm/44h/1h/0h/0/0'])

View File

@ -13,48 +13,73 @@ import unittest
from keepkeylib.transport_udp import UDPTransport
from keepkeylib.client import KeepKeyDebugClient
from test_device import DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignTx
from test_device import DeviceEmulator, DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignTx
from hwilib.commands import process_commands
emulator_proc = None
class KeepkeyEmulator(DeviceEmulator):
def __init__(self, emulator_path):
self.emulator_proc = None
self.emulator_path = emulator_path
def start_emulator():
# Start the Keepkey emulator
emulator_proc = subprocess.Popen([emulator])
# Wait for emulator to be up
# From https://github.com/trezor/trezor-mcu/blob/master/script/wait_for_emulator.py
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.connect(('127.0.0.1', 21324))
sock.settimeout(0)
while True:
try:
sock.sendall(b"PINGPING")
r = sock.recv(8)
if r == b"PONGPONG":
break
except Exception:
time.sleep(0.05)
def start(self):
# Start the Keepkey emulator
self.emulator_proc = subprocess.Popen(['./' + os.path.basename(self.emulator_path)], cwd=os.path.dirname(self.emulator_path), stdout=subprocess.DEVNULL)
# Wait for emulator to be up
# From https://github.com/trezor/trezor-mcu/blob/master/script/wait_for_emulator.py
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.connect(('127.0.0.1', 21324))
sock.settimeout(0)
while True:
try:
sock.sendall(b"PINGPING")
r = sock.recv(8)
if r == b"PONGPONG":
break
except Exception:
time.sleep(0.05)
# Setup the emulator
sim_dev = UDPTransport('127.0.0.1:21324')
sim_dev.buffer = b'' # HACK to work around a bug in the keepkey library
sim_dev_debug = UDPTransport('127.0.0.1:21325')
sim_dev_debug.buffer = b'' # HACK to work around a bug in the keepkey library
client = KeepKeyDebugClient(sim_dev)
client.set_debuglink(sim_dev_debug)
client.wipe_device()
client.load_device_by_mnemonic(mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='', passphrase_protection=False, label='test', language='english') # From Trezor device tests
return client
# Redirect stdout to /dev/null as the keepkey lib kind of spammy
sys.stdout = open(os.devnull, 'w')
def stop_emulator():
if emulator_proc:
emulator_proc.kill()
# Setup the emulator
sim_dev = UDPTransport('127.0.0.1:21324')
sim_dev.buffer = b'' # HACK to work around a bug in the keepkey library
sim_dev_debug = UDPTransport('127.0.0.1:21325')
sim_dev_debug.buffer = b'' # HACK to work around a bug in the keepkey library
client = KeepKeyDebugClient(sim_dev)
client.set_debuglink(sim_dev_debug)
client.wipe_device()
client.load_device_by_mnemonic(mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='', passphrase_protection=False, label='test', language='english') # From Trezor device tests
return client
def stop(self):
self.emulator_proc.kill()
self.emulator_proc.wait()
# Redirect stdout back to stdout
sys.stdout = sys.__stdout__
class KeepkeyTestCase(unittest.TestCase):
def __init__(self, emulator, methodName='runTest'):
super(KeepkeyTestCase, self).__init__(methodName)
self.emulator = emulator
@staticmethod
def parameterize(testclass, emulator):
testloader = unittest.TestLoader()
testnames = testloader.getTestCaseNames(testclass)
suite = unittest.TestSuite()
for name in testnames:
suite.addTest(testclass(emulator, name))
return suite
# Keepkey specific getxpub test because this requires device specific thing to set xprvs
class TestKeepkeyGetxpub(unittest.TestCase):
class TestKeepkeyGetxpub(KeepkeyTestCase):
def setUp(self):
self.client = start_emulator()
self.client = self.emulator.start()
def tearDown(self):
self.emulator.stop()
def test_getxpub(self):
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data/bip32_vectors.json'), encoding='utf-8') as f:
@ -83,14 +108,15 @@ def keepkey_test_suite(emulator, rpc, userpass):
path = 'udp:127.0.0.1:21324'
fingerprint = '95d8f670'
master_xpub = 'xpub6D1weXBcFAo8CqBbpP4TbH5sxQH8ZkqC5pDEvJ95rNNBZC9zrKmZP2fXMuve7ZRBe18pWQQsGg68jkq24mZchHwYENd8cCiSb71u3KD4AFH'
dev_emulator = KeepkeyEmulator(emulator)
# Generic Device tests
suite = unittest.TestSuite()
suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, type, path, fingerprint, master_xpub))
suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, type, path, fingerprint, master_xpub))
suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, type, path, fingerprint, master_xpub))
suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, type, path, fingerprint, master_xpub))
suite.addTest(TestKeepkeyGetxpub())
suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(KeepkeyTestCase.parameterize(TestKeepkeyGetxpub, emulator=dev_emulator))
return suite
if __name__ == '__main__':

View File

@ -6,6 +6,7 @@ import json
import os
import socket
import subprocess
import sys
import time
import unittest
@ -14,82 +15,105 @@ from trezorlib.transport import enumerate_devices
from trezorlib.transport.udp import UdpTransport
from trezorlib.debuglink import TrezorClientDebugLink, load_device_by_mnemonic, load_device_by_xprv
from trezorlib import device
from test_device import DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignTx
from test_device import DeviceEmulator, DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignTx
from hwilib.commands import process_commands
def trezor_test_suite(emulator, rpc, userpass):
# Start the Trezor emulator
emulator_proc = subprocess.Popen([emulator])
# Wait for emulator to be up
# From https://github.com/trezor/trezor-mcu/blob/master/script/wait_for_emulator.py
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.connect(('127.0.0.1', 21324))
sock.settimeout(0)
while True:
try:
sock.sendall(b"PINGPING")
r = sock.recv(8)
if r == b"PONGPONG":
class TrezorEmulator(DeviceEmulator):
def __init__(self, path):
self.emulator_path = path
self.emulator_proc = None
def start(self):
# Start the Trezor emulator
self.emulator_proc = subprocess.Popen(['./' + os.path.basename(self.emulator_path)], cwd=os.path.dirname(self.emulator_path))
# Wait for emulator to be up
# From https://github.com/trezor/trezor-mcu/blob/master/script/wait_for_emulator.py
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.connect(('127.0.0.1', 21324))
sock.settimeout(0)
while True:
try:
sock.sendall(b"PINGPING")
r = sock.recv(8)
if r == b"PONGPONG":
break
except Exception:
time.sleep(0.05)
# Setup the emulator
for dev in enumerate_devices():
# Find the udp transport, that's the emulator
if isinstance(dev, UdpTransport):
wirelink = dev
break
except Exception:
time.sleep(0.05)
# Cleanup
def cleanup_emulator():
emulator_proc.kill()
atexit.register(cleanup_emulator)
client = TrezorClientDebugLink(wirelink)
device.wipe(client)
load_device_by_mnemonic(client=client, mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='', passphrase_protection=False, label='test') # From Trezor device tests
return client
# Setup the emulator
for dev in enumerate_devices():
# Find the udp transport, that's the emulator
if isinstance(dev, UdpTransport):
wirelink = dev
break
client = TrezorClientDebugLink(wirelink)
device.wipe(client)
load_device_by_mnemonic(client=client, mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='', passphrase_protection=False, label='test') # From Trezor device tests
def stop(self):
self.emulator_proc.kill()
self.emulator_proc.wait()
class TrezorTestCase(unittest.TestCase):
def __init__(self, client, methodName='runTest'):
super(TrezorTestCase, self).__init__(methodName)
self.client = client
class TrezorTestCase(unittest.TestCase):
def __init__(self, emulator, methodName='runTest'):
super(TrezorTestCase, self).__init__(methodName)
self.emulator = emulator
@staticmethod
def parameterize(testclass, client):
testloader = unittest.TestLoader()
testnames = testloader.getTestCaseNames(testclass)
suite = unittest.TestSuite()
for name in testnames:
suite.addTest(testclass(client, name))
return suite
@staticmethod
def parameterize(testclass, emulator):
testloader = unittest.TestLoader()
testnames = testloader.getTestCaseNames(testclass)
suite = unittest.TestSuite()
for name in testnames:
suite.addTest(testclass(emulator, name))
return suite
# Trezor specific getxpub test because this requires device specific thing to set xprvs
class TestTrezorGetxpub(TrezorTestCase):
def test_getxpub(self):
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data/bip32_vectors.json'), encoding='utf-8') as f:
vectors = json.load(f)
for vec in vectors:
with self.subTest(vector=vec):
# Setup with xprv
device.wipe(self.client)
load_device_by_xprv(client=self.client, xprv=vec['xprv'], pin='', passphrase_protection=False, label='test', language='english')
# Trezor specific getxpub test because this requires device specific thing to set xprvs
class TestTrezorGetxpub(TrezorTestCase):
def setUp(self):
self.client = self.emulator.start()
# Test getmasterxpub
gmxp_res = process_commands(['-t', 'trezor', '-d', 'udp:127.0.0.1:21324', 'getmasterxpub'])
self.assertEqual(gmxp_res['xpub'], vec['master_xpub'])
def tearDown(self):
self.emulator.stop()
# Test the path derivs
for path_vec in vec['vectors']:
gxp_res = process_commands(['-t', 'trezor', '-d', 'udp:127.0.0.1:21324', 'getxpub', path_vec['path']])
self.assertEqual(gxp_res['xpub'], path_vec['xpub'])
def test_getxpub(self):
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data/bip32_vectors.json'), encoding='utf-8') as f:
vectors = json.load(f)
for vec in vectors:
with self.subTest(vector=vec):
# Setup with xprv
device.wipe(self.client)
load_device_by_xprv(client=self.client, xprv=vec['xprv'], pin='', passphrase_protection=False, label='test', language='english')
# Test getmasterxpub
gmxp_res = process_commands(['-t', 'trezor', '-d', 'udp:127.0.0.1:21324', 'getmasterxpub'])
self.assertEqual(gmxp_res['xpub'], vec['master_xpub'])
# Test the path derivs
for path_vec in vec['vectors']:
gxp_res = process_commands(['-t', 'trezor', '-d', 'udp:127.0.0.1:21324', 'getxpub', path_vec['path']])
self.assertEqual(gxp_res['xpub'], path_vec['xpub'])
def trezor_test_suite(emulator, rpc, userpass):
# Redirect stderr to /dev/null as it's super spammy
sys.stderr = open(os.devnull, 'w')
# Device info for tests
type = 'trezor'
path = 'udp:127.0.0.1:21324'
fingerprint = '95d8f670'
master_xpub = 'xpub6D1weXBcFAo8CqBbpP4TbH5sxQH8ZkqC5pDEvJ95rNNBZC9zrKmZP2fXMuve7ZRBe18pWQQsGg68jkq24mZchHwYENd8cCiSb71u3KD4AFH'
dev_emulator = TrezorEmulator(emulator)
# Generic Device tests
suite = unittest.TestSuite()
suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, 'trezor', 'udp:127.0.0.1:21324', '95d8f670', 'xpub6D1weXBcFAo8CqBbpP4TbH5sxQH8ZkqC5pDEvJ95rNNBZC9zrKmZP2fXMuve7ZRBe18pWQQsGg68jkq24mZchHwYENd8cCiSb71u3KD4AFH'))
suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, 'trezor', 'udp:127.0.0.1:21324', '95d8f670', 'xpub6D1weXBcFAo8CqBbpP4TbH5sxQH8ZkqC5pDEvJ95rNNBZC9zrKmZP2fXMuve7ZRBe18pWQQsGg68jkq24mZchHwYENd8cCiSb71u3KD4AFH'))
suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, 'trezor', 'udp:127.0.0.1:21324', '95d8f670', 'xpub6D1weXBcFAo8CqBbpP4TbH5sxQH8ZkqC5pDEvJ95rNNBZC9zrKmZP2fXMuve7ZRBe18pWQQsGg68jkq24mZchHwYENd8cCiSb71u3KD4AFH'))
suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, 'trezor', 'udp:127.0.0.1:21324', '95d8f670', 'xpub6D1weXBcFAo8CqBbpP4TbH5sxQH8ZkqC5pDEvJ95rNNBZC9zrKmZP2fXMuve7ZRBe18pWQQsGg68jkq24mZchHwYENd8cCiSb71u3KD4AFH'))
suite.addTest(TrezorTestCase.parameterize(TestTrezorGetxpub, client))
suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator))
suite.addTest(TrezorTestCase.parameterize(TestTrezorGetxpub, emulator=dev_emulator))
return suite
if __name__ == '__main__':
@ -102,4 +126,4 @@ if __name__ == '__main__':
rpc, userpass = start_bitcoind(args.bitcoind)
suite = trezor_test_suite(args.emulator, rpc, userpass)
unittest.TextTestRunner().run(suite)
unittest.TextTestRunner(stream=sys.stdout).run(suite)