315 lines
14 KiB
Python
Executable File
315 lines
14 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
|
|
import argparse
|
|
import atexit
|
|
import json
|
|
import os
|
|
import shlex
|
|
import socket
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import unittest
|
|
|
|
from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException
|
|
from hwilib.devices.trezorlib.transport import enumerate_devices
|
|
from hwilib.devices.trezorlib.transport.udp import UdpTransport
|
|
from hwilib.devices.trezorlib.debuglink import TrezorClientDebugLink, load_device_by_mnemonic, load_device_by_xprv
|
|
from hwilib.devices.trezorlib import device, messages
|
|
from test_device import DeviceEmulator, DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignMessage, TestSignTx
|
|
|
|
from hwilib.cli import process_commands
|
|
from hwilib.devices.keepkey import KeepkeyClient
|
|
|
|
from types import MethodType
|
|
|
|
def get_pin(self, code=None):
    """Answer a PIN request using the debug link instead of a human.

    Written to be bound onto a client UI object (see ``MethodType`` usage in
    the setup test), so ``self`` is that UI object and must expose ``pin``
    and ``debuglink`` attributes.

    :param code: Unused; presumably kept so the signature matches the UI
        method this function replaces.
    :returns: ``debuglink.encode_pin(self.pin)`` when a PIN has been
        configured on the UI object, otherwise whatever
        ``debuglink.read_pin_encoded()`` reports.
    """
    # No PIN configured on the UI object: ask the debug link for it.
    if not self.pin:
        return self.debuglink.read_pin_encoded()
    return self.debuglink.encode_pin(self.pin)
|
|
|
|
class KeepkeyEmulator(DeviceEmulator):
    """Start, provision and stop a Keepkey emulator process for the tests.

    The emulator is expected to answer on UDP 127.0.0.1:21324.
    """

    def __init__(self, path):
        """Record the emulator location; the process is not started here.

        :param path: Path to the Keepkey emulator executable.
        """
        self.emulator_path = path
        self.emulator_proc = None

    def start(self):
        """Launch the emulator, wait for it to respond and load the test seed.

        :returns: A ``TrezorClientDebugLink`` connected to the emulator, wiped
            and loaded with the fixed test mnemonic (no PIN, no passphrase).
        :raises RuntimeError: If the emulator process exits before answering,
            or if no UDP transport shows up when enumerating devices.
        """
        # Start the Keepkey emulator from its own directory. ``or None`` keeps
        # a bare file name (empty dirname) from being passed as ``cwd=''``.
        self.emulator_proc = subprocess.Popen(
            ['./' + os.path.basename(self.emulator_path)],
            cwd=os.path.dirname(self.emulator_path) or None,
            stdout=subprocess.DEVNULL,
        )
        try:
            self._wait_until_responsive()

            # Setup the emulator
            # Find the udp transport, that's the emulator
            wirelink = next(
                (dev for dev in enumerate_devices() if isinstance(dev, UdpTransport)),
                None,
            )
            if wirelink is None:
                raise RuntimeError('Could not find the UDP transport of the Keepkey emulator')
        except RuntimeError:
            # Do not leave an orphaned emulator behind when startup fails.
            self.stop()
            raise

        client = TrezorClientDebugLink(wirelink)
        client.init_device()
        device.wipe(client)
        load_device_by_mnemonic(client=client, mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='', passphrase_protection=False, label='test')  # From Trezor device tests
        return client

    def _wait_until_responsive(self):
        """Ping the emulator over UDP until it answers ``PONGPONG``.

        :raises RuntimeError: If the emulator process terminates while waiting.
        """
        # From https://github.com/trezor/trezor-mcu/blob/master/script/wait_for_emulator.py
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.connect(('127.0.0.1', 21324))
            # Non-blocking: a missing reply raises instead of stalling.
            sock.settimeout(0)
            while True:
                exit_code = self.emulator_proc.poll()
                if exit_code is not None:
                    # Nothing will ever answer the ping; fail instead of spinning forever.
                    raise RuntimeError('Keepkey emulator exited with code {} before becoming ready'.format(exit_code))
                try:
                    sock.sendall(b"PINGPING")
                    if sock.recv(8) == b"PONGPONG":
                        return
                except OSError:
                    # Not listening yet / no reply yet: back off briefly and retry.
                    time.sleep(0.05)

    def stop(self):
        """Kill the emulator process and reap it; a no-op if never started."""
        if self.emulator_proc is None:
            return
        self.emulator_proc.kill()
        self.emulator_proc.wait()
|
|
|
|
class KeepkeyTestCase(unittest.TestCase):
    """Base class for Keepkey tests that need an emulator and an HWI interface.

    Instances carry the emulator object and the name of the interface
    (``'library'``, ``'cli'``, ``'bindist'`` or ``'stdin'``) that
    :meth:`do_command` uses to reach HWI.
    """

    def __init__(self, emulator, interface='library', methodName='runTest'):
        """Create one test instance.

        :param emulator: Emulator object the test starts/stops.
        :param interface: How HWI commands are issued; see :meth:`do_command`.
        :param methodName: Name of the test method this instance runs.
        """
        super().__init__(methodName)
        self.emulator = emulator
        self.interface = interface

    @staticmethod
    def parameterize(testclass, emulator, interface='library'):
        """Build a suite with one ``testclass`` instance per test method.

        :param testclass: A ``KeepkeyTestCase`` subclass.
        :param emulator: Passed to every created instance.
        :param interface: Passed to every created instance.
        :returns: A ``unittest.TestSuite`` holding the instances.
        """
        suite = unittest.TestSuite()
        for name in unittest.TestLoader().getTestCaseNames(testclass):
            suite.addTest(testclass(emulator, interface, name))
        return suite

    @staticmethod
    def _run_hwi(argv, stdin_text=None):
        """Run an HWI executable and return its stdout parsed as JSON.

        The argument vector is handed to the process directly (no shell), so
        no quoting is needed. stderr is discarded.

        :param argv: Program and arguments, as a list of strings.
        :param stdin_text: Optional text to feed on stdin; when ``None`` the
            child's stdin is not redirected.
        """
        proc = subprocess.Popen(
            argv,
            stdin=subprocess.PIPE if stdin_text is not None else None,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        )
        stdout, _ = proc.communicate(stdin_text.encode() if stdin_text is not None else None)
        return json.loads(stdout.decode())

    def do_command(self, args):
        """Execute an HWI command through the configured interface.

        :param args: HWI command-line arguments as a list of strings.
        :returns: The decoded JSON result for the process-based interfaces, or
            the return value of ``process_commands`` for any other interface
            value (including the default ``'library'``).
        """
        if self.interface == 'cli':
            return self._run_hwi(['hwi'] + list(args))
        if self.interface == 'bindist':
            return self._run_hwi(['../dist/hwi'] + list(args))
        if self.interface == 'stdin':
            # In --stdin mode the arguments are sent one per line.
            return self._run_hwi(['hwi', '--stdin'], stdin_text='\n'.join(args) + '\n')
        return process_commands(args)

    def __str__(self):
        return 'keepkey: {}'.format(super().__str__())

    def __repr__(self):
        return 'keepkey: {}'.format(super().__repr__())
|
|
|
|
# Keepkey specific getxpub test because this requires device specific thing to set xprvs
|
|
class TestKeepkeyGetxpub(KeepkeyTestCase):
    """Compare ``getmasterxpub``/``getxpub`` output with stored BIP 32 vectors.

    Each vector's xprv is loaded into the emulator through the debug link
    before the commands are issued.
    """

    def setUp(self):
        # Every test gets a freshly started emulator and its debug client.
        self.client = self.emulator.start()

    def tearDown(self):
        self.emulator.stop()

    def test_getxpub(self):
        """Load each xprv and check the master xpub and every derived xpub."""
        base_args = ['-t', 'keepkey', '-d', 'udp:127.0.0.1:21324']

        # The vectors live in data/ next to this file.
        here = os.path.dirname(os.path.realpath(__file__))
        vectors_path = os.path.join(here, 'data/bip32_vectors.json')
        with open(vectors_path, encoding='utf-8') as vectors_file:
            all_vectors = json.load(vectors_file)

        for case in all_vectors:
            with self.subTest(vector=case):
                # Setup with xprv
                device.wipe(self.client)
                load_device_by_xprv(
                    client=self.client,
                    xprv=case['xprv'],
                    pin='',
                    passphrase_protection=False,
                    label='test',
                    language='english',
                )

                # Test getmasterxpub
                master = self.do_command(base_args + ['getmasterxpub'])
                self.assertEqual(master['xpub'], case['master_xpub'])

                # Test the path derivs
                for derivation in case['vectors']:
                    derived = self.do_command(base_args + ['getxpub', derivation['path']])
                    self.assertEqual(derived['xpub'], derivation['xpub'])
|
|
|
|
# Keepkey specific management (setup, wipe, restore, backup, promptpin, sendpin) command tests
|
|
class TestKeepkeyManCommands(KeepkeyTestCase):
    """Tests for the setup, wipe, backup, PIN and passphrase handling commands.

    Every test runs against a freshly started emulator that ``setUp`` obtains
    from ``self.emulator.start()``.
    """

    # Address of the emulator; also the ``path`` it reports in ``enumerate``.
    _EMULATOR_PATH = 'udp:127.0.0.1:21324'
    # Seed used whenever a test re-loads the device (from Trezor device tests).
    _MNEMONIC = 'alcohol woman abuse must during monitor noble actual mixed trade anger aisle'

    def setUp(self):
        self.client = self.emulator.start()
        self.dev_args = ['-t', 'keepkey', '-d', self._EMULATOR_PATH]

    def tearDown(self):
        self.emulator.stop()

    def _emulator_entries(self, devices):
        """Return the ``enumerate`` entries that describe the Keepkey emulator.

        :param devices: Decoded result of the ``enumerate`` command.
        """
        return [dev for dev in devices if dev['type'] == 'keepkey' and dev['path'] == self._EMULATOR_PATH]

    def test_setup_wipe(self):
        """``setup`` is refused on an initialized device and works after ``wipe``."""
        # Device is init, setup should fail
        result = self.do_command(self.dev_args + ['-i', 'setup'])
        self.assertEqual(result['code'], -10)
        self.assertEqual(result['error'], 'Device is already initialized. Use wipe first and try again')

        # Wipe
        result = self.do_command(self.dev_args + ['wipe'])
        self.assertTrue(result['success'])

        # Setup: drive the client directly so the PIN prompt can be answered
        # through the debug link rather than interactively.
        t_client = KeepkeyClient(self._EMULATOR_PATH, 'test')
        t_client.client.ui.get_pin = MethodType(get_pin, t_client.client.ui)
        t_client.client.ui.pin = '1234'
        result = t_client.setup_device()
        self.assertTrue(result['success'])

        # Make sure device is init, setup should fail
        result = self.do_command(self.dev_args + ['-i', 'setup'])
        self.assertEqual(result['code'], -10)
        self.assertEqual(result['error'], 'Device is already initialized. Use wipe first and try again')

    def test_backup(self):
        """``backup`` reports that the Keepkey cannot be backed up via software."""
        result = self.do_command(self.dev_args + ['backup'])
        self.assertIn('error', result)
        self.assertIn('code', result)
        self.assertEqual(result['error'], 'The Keepkey does not support creating a backup via software')
        self.assertEqual(result['code'], -9)

    def test_pins(self):
        """Walk through ``promptpin``/``sendpin`` without and with a PIN set."""
        # There's no PIN
        result = self.do_command(self.dev_args + ['--debug', 'promptpin'])
        self.assertEqual(result['error'], 'This device does not need a PIN')
        self.assertEqual(result['code'], -11)
        result = self.do_command(self.dev_args + ['sendpin', '1234'])
        self.assertEqual(result['error'], 'This device does not need a PIN')
        self.assertEqual(result['code'], -11)
        result = self.do_command(self.dev_args + ['enumerate'])
        for dev in self._emulator_entries(result):
            self.assertFalse(dev['needs_pin_sent'])

        # Set a PIN
        device.wipe(self.client)
        load_device_by_mnemonic(client=self.client, mnemonic=self._MNEMONIC, pin='1234', passphrase_protection=False, label='test')
        self.client.call(messages.ClearSession())
        result = self.do_command(self.dev_args + ['enumerate'])
        for dev in self._emulator_entries(result):
            self.assertTrue(dev['needs_pin_sent'])
        result = self.do_command(self.dev_args + ['promptpin'])
        self.assertTrue(result['success'])

        # Invalid pins
        result = self.do_command(self.dev_args + ['sendpin', 'notnum'])
        self.assertEqual(result['error'], 'Non-numeric PIN provided')
        self.assertEqual(result['code'], -7)

        result = self.do_command(self.dev_args + ['sendpin', '00000'])
        self.assertFalse(result['success'])

        # Make sure we get a needs pin message
        result = self.do_command(self.dev_args + ['getxpub', 'm/0h'])
        self.assertEqual(result['code'], -12)
        self.assertEqual(result['error'], 'Keepkey is locked. Unlock by using \'promptpin\' and then \'sendpin\'.')

        # Prompt pin
        self.client.call(messages.ClearSession())
        result = self.do_command(self.dev_args + ['promptpin'])
        self.assertTrue(result['success'])

        # Send the PIN, encoded through the debug link
        self.client.open()
        pin = self.client.debug.encode_pin('1234')
        result = self.do_command(self.dev_args + ['sendpin', pin])
        self.assertTrue(result['success'])

        result = self.do_command(self.dev_args + ['enumerate'])
        for dev in self._emulator_entries(result):
            self.assertFalse(dev['needs_pin_sent'])

        # Sending PIN after unlock
        result = self.do_command(self.dev_args + ['promptpin'])
        self.assertEqual(result['error'], 'The PIN has already been sent to this device')
        self.assertEqual(result['code'], -11)
        result = self.do_command(self.dev_args + ['sendpin', '1234'])
        self.assertEqual(result['error'], 'The PIN has already been sent to this device')
        self.assertEqual(result['code'], -11)

    def test_passphrase(self):
        """Check ``needs_passphrase_sent`` and the fingerprint under ``-p``."""
        # There's no passphrase
        result = self.do_command(self.dev_args + ['enumerate'])
        for dev in self._emulator_entries(result):
            self.assertFalse(dev['needs_passphrase_sent'])
            self.assertEqual(dev['fingerprint'], '95d8f670')
        # Setting a passphrase won't change the fingerprint
        result = self.do_command(self.dev_args + ['-p', 'pass', 'enumerate'])
        for dev in self._emulator_entries(result):
            self.assertFalse(dev['needs_passphrase_sent'])
            self.assertEqual(dev['fingerprint'], '95d8f670')

        # Set a passphrase
        device.wipe(self.client)
        self.client.set_passphrase('pass')
        load_device_by_mnemonic(client=self.client, mnemonic=self._MNEMONIC, pin='', passphrase_protection=True, label='test')
        self.client.call(messages.ClearSession())

        # A passphrase will need to be sent
        result = self.do_command(self.dev_args + ['enumerate'])
        for dev in self._emulator_entries(result):
            self.assertTrue(dev['needs_passphrase_sent'])
        result = self.do_command(self.dev_args + ['-p', 'pass', 'enumerate'])
        for dev in self._emulator_entries(result):
            self.assertFalse(dev['needs_passphrase_sent'])
            # Remembered so the later enumerations can be compared against it.
            fpr = dev['fingerprint']
        # A different passphrase would not change the fingerprint
        result = self.do_command(self.dev_args + ['-p', 'pass2', 'enumerate'])
        for dev in self._emulator_entries(result):
            self.assertFalse(dev['needs_passphrase_sent'])
            self.assertEqual(dev['fingerprint'], fpr)

        # Clearing the session and starting a new one with a new passphrase should change the fingerprint
        self.client.call(messages.ClearSession())
        result = self.do_command(self.dev_args + ['-p', 'pass3', 'enumerate'])
        for dev in self._emulator_entries(result):
            self.assertFalse(dev['needs_passphrase_sent'])
            self.assertNotEqual(dev['fingerprint'], fpr)
|
|
|
|
def keepkey_test_suite(emulator, rpc, userpass, interface):
    """Assemble the complete Keepkey test suite.

    As a side effect, ``sys.stderr`` is replaced by a handle on the null
    device for the rest of the process.

    :param emulator: Path to the Keepkey emulator executable.
    :param rpc: Forwarded to ``DeviceTestCase.parameterize``.
    :param userpass: Forwarded to ``DeviceTestCase.parameterize``.
    :param interface: Interface name forwarded to every test case.
    :returns: A ``unittest.TestSuite`` with the generic device tests followed
        by the Keepkey-specific ones.
    """
    # Redirect stderr to /dev/null as it's super spammy
    sys.stderr = open(os.devnull, 'w')

    dev_emulator = KeepkeyEmulator(emulator)

    # Device info for tests: type, path, fingerprint, master xpub
    device_info = (
        'keepkey',
        'udp:127.0.0.1:21324',
        '95d8f670',
        'xpub6D1weXBcFAo8CqBbpP4TbH5sxQH8ZkqC5pDEvJ95rNNBZC9zrKmZP2fXMuve7ZRBe18pWQQsGg68jkq24mZchHwYENd8cCiSb71u3KD4AFH',
    )

    suite = unittest.TestSuite()

    # Generic Device tests
    generic_cases = (TestDeviceConnect, TestGetKeypool, TestSignTx, TestDisplayAddress, TestSignMessage)
    for generic_case in generic_cases:
        suite.addTest(DeviceTestCase.parameterize(generic_case, rpc, userpass, *device_info, emulator=dev_emulator, interface=interface))

    # Keepkey specific tests
    for keepkey_case in (TestKeepkeyGetxpub, TestKeepkeyManCommands):
        suite.addTest(KeepkeyTestCase.parameterize(keepkey_case, emulator=dev_emulator, interface=interface))

    return suite
|
|
|
|
if __name__ == '__main__':
    # Command line: <emulator> <bitcoind> [--interface NAME]
    parser = argparse.ArgumentParser(description='Test Keepkey implementation')
    parser.add_argument('emulator', help='Path to the Keepkey emulator')
    parser.add_argument('bitcoind', help='Path to bitcoind binary')
    # 'stdin' is handled by KeepkeyTestCase.do_command, so it is selectable here too.
    parser.add_argument('--interface', help='Which interface to send commands over', choices=['library', 'cli', 'bindist', 'stdin'], default='library')
    args = parser.parse_args()

    # Start bitcoind
    rpc, userpass = start_bitcoind(args.bitcoind)

    suite = keepkey_test_suite(args.emulator, rpc, userpass, args.interface)
    result = unittest.TextTestRunner(stream=sys.stdout, verbosity=2).run(suite)
    # Exit non-zero when any test failed or errored so callers (e.g. CI) notice.
    sys.exit(not result.wasSuccessful())
|