#! /usr/bin/env python3 import argparse import atexit import json import os import socket import subprocess import sys import time import unittest from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException from hwilib.devices.trezorlib.transport import enumerate_devices from hwilib.devices.trezorlib.transport.udp import UdpTransport from hwilib.devices.trezorlib.debuglink import TrezorClientDebugLink, load_device_by_mnemonic, load_device_by_xprv from hwilib.devices.trezorlib import device, messages from test_device import DeviceEmulator, DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignMessage, TestSignTx from hwilib.cli import process_commands from hwilib.devices.keepkey import KeepkeyClient from types import MethodType def get_pin(self, code=None): if self.pin: return self.debuglink.encode_pin(self.pin) else: return self.debuglink.read_pin_encoded() class KeepkeyEmulator(DeviceEmulator): def __init__(self, path): self.emulator_path = path self.emulator_proc = None def start(self): # Start the Keepkey emulator self.emulator_proc = subprocess.Popen(['./' + os.path.basename(self.emulator_path)], cwd=os.path.dirname(self.emulator_path), stdout=subprocess.DEVNULL) # Wait for emulator to be up # From https://github.com/trezor/trezor-mcu/blob/master/script/wait_for_emulator.py sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.connect(('127.0.0.1', 21324)) sock.settimeout(0) while True: try: sock.sendall(b"PINGPING") r = sock.recv(8) if r == b"PONGPONG": break except Exception: time.sleep(0.05) # Setup the emulator for dev in enumerate_devices(): # Find the udp transport, that's the emulator if isinstance(dev, UdpTransport): wirelink = dev break client = TrezorClientDebugLink(wirelink) client.init_device() device.wipe(client) load_device_by_mnemonic(client=client, mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='', passphrase_protection=False, label='test') # From Trezor device tests return client def stop(self): self.emulator_proc.kill() self.emulator_proc.wait() class KeepkeyTestCase(unittest.TestCase): def __init__(self, emulator, interface='library', methodName='runTest'): super(KeepkeyTestCase, self).__init__(methodName) self.emulator = emulator self.interface = interface @staticmethod def parameterize(testclass, emulator, interface='library'): testloader = unittest.TestLoader() testnames = testloader.getTestCaseNames(testclass) suite = unittest.TestSuite() for name in testnames: suite.addTest(testclass(emulator, interface, name)) return suite def do_command(self, args): if self.interface == 'cli': proc = subprocess.Popen(['hwi ' + ' '.join(args)], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, shell=True) result = proc.communicate() return json.loads(result[0].decode()) elif self.interface == 'bindist': proc = subprocess.Popen(['../dist/hwi ' + ' '.join(args)], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, shell=True) result = proc.communicate() return json.loads(result[0].decode()) elif self.interface == 'stdin': input_str = '\n'.join(args) + '\n' proc = subprocess.Popen(['hwi', '--stdin'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) result = proc.communicate(input_str.encode()) return json.loads(result[0].decode()) else: return process_commands(args) def __str__(self): return 'keepkey: {}'.format(super().__str__()) def __repr__(self): return 'keepkey: {}'.format(super().__repr__()) # Keepkey specific getxpub test because this requires device specific thing to set xprvs class TestKeepkeyGetxpub(KeepkeyTestCase): def setUp(self): self.client = self.emulator.start() def tearDown(self): self.emulator.stop() def test_getxpub(self): with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data/bip32_vectors.json'), encoding='utf-8') as f: vectors = json.load(f) for vec in vectors: with self.subTest(vector=vec): # Setup with xprv device.wipe(self.client) load_device_by_xprv(client=self.client, xprv=vec['xprv'], pin='', passphrase_protection=False, label='test', language='english') # Test getmasterxpub gmxp_res = self.do_command(['-t', 'keepkey', '-d', 'udp:127.0.0.1:21324', 'getmasterxpub']) self.assertEqual(gmxp_res['xpub'], vec['master_xpub']) # Test the path derivs for path_vec in vec['vectors']: gxp_res = self.do_command(['-t', 'keepkey', '-d', 'udp:127.0.0.1:21324', 'getxpub', path_vec['path']]) self.assertEqual(gxp_res['xpub'], path_vec['xpub']) # Keepkey specific management (setup, wipe, restore, backup, promptpin, sendpin) command tests class TestKeepkeyManCommands(KeepkeyTestCase): def setUp(self): self.client = self.emulator.start() self.dev_args = ['-t', 'keepkey', '-d', 'udp:127.0.0.1:21324'] def tearDown(self): self.emulator.stop() def test_setup_wipe(self): # Device is init, setup should fail result = self.do_command(self.dev_args + ['-i', 'setup']) self.assertEquals(result['code'], -10) self.assertEquals(result['error'], 'Device is already initialized. Use wipe first and try again') # Wipe result = self.do_command(self.dev_args + ['wipe']) self.assertTrue(result['success']) # Setup t_client = KeepkeyClient('udp:127.0.0.1:21324', 'test') t_client.client.ui.get_pin = MethodType(get_pin, t_client.client.ui) t_client.client.ui.pin = '1234' result = t_client.setup_device() self.assertTrue(result['success']) # Make sure device is init, setup should fail result = self.do_command(self.dev_args + ['-i', 'setup']) self.assertEquals(result['code'], -10) self.assertEquals(result['error'], 'Device is already initialized. Use wipe first and try again') def test_backup(self): result = self.do_command(self.dev_args + ['backup']) self.assertIn('error', result) self.assertIn('code', result) self.assertEqual(result['error'], 'The Keepkey does not support creating a backup via software') self.assertEqual(result['code'], -9) def test_pins(self): # There's no PIN result = self.do_command(self.dev_args + ['--debug', 'promptpin']) self.assertEqual(result['error'], 'This device does not need a PIN') self.assertEqual(result['code'], -11) result = self.do_command(self.dev_args + ['sendpin', '1234']) self.assertEqual(result['error'], 'This device does not need a PIN') self.assertEqual(result['code'], -11) # Set a PIN device.wipe(self.client) load_device_by_mnemonic(client=self.client, mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='1234', passphrase_protection=False, label='test') self.client.call(messages.ClearSession()) result = self.do_command(self.dev_args + ['promptpin']) self.assertTrue(result['success']) # Invalid pins result = self.do_command(self.dev_args + ['sendpin', 'notnum']) self.assertEqual(result['error'], 'Non-numeric PIN provided') self.assertEqual(result['code'], -7) result = self.do_command(self.dev_args + ['sendpin', '00000']) self.assertFalse(result['success']) # Make sure we get a needs pin message result = self.do_command(self.dev_args + ['getxpub', 'm/0h']) self.assertEqual(result['code'], -12) self.assertEqual(result['error'], 'Keepkey is locked. Unlock by using \'promptpin\' and then \'sendpin\'.') # Prompt pin self.client.call(messages.ClearSession()) result = self.do_command(self.dev_args + ['promptpin']) self.assertTrue(result['success']) # Send the PIN self.client.open() pin = self.client.debug.encode_pin('1234') result = self.do_command(self.dev_args + ['sendpin', pin]) self.assertTrue(result['success']) # Sending PIN after unlock result = self.do_command(self.dev_args + ['promptpin']) self.assertEqual(result['error'], 'The PIN has already been sent to this device') self.assertEqual(result['code'], -11) result = self.do_command(self.dev_args + ['sendpin', '1234']) self.assertEqual(result['error'], 'The PIN has already been sent to this device') self.assertEqual(result['code'], -11) def keepkey_test_suite(emulator, rpc, userpass, interface): # Redirect stderr to /dev/null as it's super spammy sys.stderr = open(os.devnull, 'w') # Device info for tests type = 'keepkey' path = 'udp:127.0.0.1:21324' fingerprint = '95d8f670' master_xpub = 'xpub6D1weXBcFAo8CqBbpP4TbH5sxQH8ZkqC5pDEvJ95rNNBZC9zrKmZP2fXMuve7ZRBe18pWQQsGg68jkq24mZchHwYENd8cCiSb71u3KD4AFH' dev_emulator = KeepkeyEmulator(emulator) # Generic Device tests suite = unittest.TestSuite() suite.addTest(DeviceTestCase.parameterize(TestDeviceConnect, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface)) suite.addTest(DeviceTestCase.parameterize(TestGetKeypool, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface)) suite.addTest(DeviceTestCase.parameterize(TestSignTx, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface)) suite.addTest(DeviceTestCase.parameterize(TestDisplayAddress, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface)) suite.addTest(DeviceTestCase.parameterize(TestSignMessage, rpc, userpass, type, path, fingerprint, master_xpub, emulator=dev_emulator, interface=interface)) suite.addTest(KeepkeyTestCase.parameterize(TestKeepkeyGetxpub, emulator=dev_emulator, interface=interface)) suite.addTest(KeepkeyTestCase.parameterize(TestKeepkeyManCommands, emulator=dev_emulator, interface=interface)) return suite if __name__ == '__main__': parser = argparse.ArgumentParser(description='Test Keepkey implementation') parser.add_argument('emulator', help='Path to the Keepkey emulator') parser.add_argument('bitcoind', help='Path to bitcoind binary') parser.add_argument('--interface', help='Which interface to send commands over', choices=['library', 'cli', 'bindist'], default='library') args = parser.parse_args() # Start bitcoind rpc, userpass = start_bitcoind(args.bitcoind) suite = keepkey_test_suite(args.emulator, rpc, userpass, args.interface) unittest.TextTestRunner(stream=sys.stdout, verbosity=2).run(suite)