Add Keepkey to Trezorlib and have KeepkeyClient use TrezorClient

This commit is contained in:
Andrew Chow 2019-02-08 10:42:48 -05:00
parent 2693ae34f6
commit f0c5aa7d3c
13 changed files with 90 additions and 516 deletions

View File

@ -1,462 +1,36 @@
# KeepKey interaction script
from ..hwwclient import HardwareWalletClient
from ..errors import BadArgumentError, DeviceAlreadyInitError, DeviceAlreadyUnlockedError, DeviceConnectionError, UnavailableActionError, DeviceNotReadyError
from keepkeylib.transport_hid import HidTransport
from keepkeylib.transport_udp import UDPTransport
from keepkeylib.client import BaseClient, DebugWireMixin, DebugLinkMixin, ProtocolMixin, TextUIMixin
from keepkeylib import tools
from keepkeylib import messages_pb2 as messages, types_pb2 as proto
from keepkeylib.tx_api import TxApi
from ..base58 import get_xpub_fingerprint, decode, to_address, xpub_main_2_test, get_xpub_fingerprint_hex
from ..serializations import ser_uint256, uint256_from_str
from .. import bech32
import base64
import binascii
import json
import os
import sys
KEEPKEY_VENDOR_ID = 0x2B24
KEEPKEY_DEVICE_ID = 0x0001
from .trezorlib.transport import enumerate_devices
from .trezor import TrezorClient
from ..base58 import get_xpub_fingerprint_hex
py_enumerate = enumerate # Need to use the enumerate built-in but there's another function already named that
PIN_MATRIX_DESCRIPTION = """
Use the numeric keypad to describe number positions. The layout is:
7 8 9
4 5 6
1 2 3
""".strip()
class TxAPIPSBT(TxApi):
def __init__(self, psbt):
super().__init__('bitcoin_psbt', None)
self.psbt = psbt
def get_tx(self, txhash):
# Find index of the input
for i, input in py_enumerate(self.psbt.tx.vin):
if input.prevout.hash == uint256_from_str(binascii.unhexlify(txhash)[::-1]):
break
psbt_in = self.psbt.inputs[i]
t = proto.TransactionType()
if psbt_in.non_witness_utxo:
assert(psbt_in.non_witness_utxo.sha256 == uint256_from_str(binascii.unhexlify(txhash)[::-1]))
tx = psbt_in.non_witness_utxo
t.version = tx.nVersion
t.lock_time = tx.nLockTime
for vin in tx.vin:
i = t.inputs.add()
i.prev_hash = ser_uint256(vin.prevout.hash)[::-1]
i.prev_index = vin.prevout.n
i.script_sig = vin.scriptSig
i.sequence = vin.nSequence
for vout in tx.vout:
o = t.bin_outputs.add()
o.amount = vout.nValue
o.script_pubkey = vout.scriptPubKey
elif psbt_in.witness_utxo:
# HACK: the library looks up this info for all inputs. we just need to appease it for segwit stuff
t.version = 1
t.lock_time = 0
o = t.bin_outputs.add()
o.amount = psbt_in.witness_utxo.nValue
o.script_pubkey = psbt_in.witness_utxo.scriptPubKey
else:
raise BadArgumentError('{} is not an input in this transaction'.format(txhash))
return t
# Only handles up to 15 of 15
def parse_multisig(script):
# Get m
m = script[0] - 80
if m < 1 or m > 15:
return (False, None)
# Get pubkeys and build HDNodePathType
pubkeys = []
offset = 1
while True:
pubkey_len = script[offset]
if pubkey_len != 33:
break
offset += 1
key = script[offset:offset + 33]
offset += 33
hd_node = proto.HDNodeType(depth=0, fingerprint=0, child_num=0, chain_code=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', public_key=key)
pubkeys.append(proto.HDNodePathType(node=hd_node, address_n=[]))
# Check things at the end
n = script[offset] - 80
if n != len(pubkeys):
return (False, None)
offset += 1
op_cms = script[offset]
if op_cms != 174:
return (False, None)
# Build MultisigRedeemScriptType and return it
multisig = proto.MultisigRedeemScriptType(m=m, signatures=[b''] * n, pubkeys=pubkeys)
return (True, multisig)
# Doesn't init device
class NoInitMixin(ProtocolMixin):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def init_device(self):
pass
def actual_init_device(self):
return super().init_device()
class KeepKey(NoInitMixin, TextUIMixin, BaseClient):
pass
class KeepKeyDebug(NoInitMixin, DebugLinkMixin, DebugWireMixin, BaseClient):
pass
def keepkey_exception(f):
def func(*args, **kwargs):
try:
return f(*args, **kwargs)
except ValueError as e:
raise BadArgumentError(str(e))
except OSError as e:
raise DeviceConnectionError(str(e))
return func
# This class extends the HardwareWalletClient for Digital Bitbox specific things
class KeepkeyClient(HardwareWalletClient):
class KeepkeyClient(TrezorClient):
def __init__(self, path, password=''):
super(KeepkeyClient, self).__init__(path, password)
if path.startswith('hid:'):
path = path[4:]
transport = HidTransport((path.encode(), None))
self.client = KeepKey(transport)
elif path.startswith('udp:'):
path = path[4:]
transport = UDPTransport(path)
# Use the debug client for the simulator
self.client = KeepKeyDebug(transport)
# Get the debug link
ip, port = path.split(':')
new_port = int(port) + 1
debug_transport = UDPTransport('{}:{}'.format(ip, new_port))
self.client.set_debuglink(debug_transport)
else:
raise IOError('Unknown device transport')
# if it wasn't able to find a client, throw an error
if not self.client:
raise IOError("no Device")
self.password = password
os.environ['PASSPHRASE'] = password
def _check_unlocked(self):
self.client.actual_init_device()
if self.client.features.pin_protection and not self.client.features.pin_cached:
raise DeviceNotReadyError('Keepkey is locked. Unlock by using \'promptpin\' and then \'sendpin\'.')
# Must return a dict with the xpub
# Retrieves the public key at the specified BIP 32 derivation path
@keepkey_exception
def get_pubkey_at_path(self, path):
self._check_unlocked()
path = path.replace('h', '\'')
path = path.replace('H', '\'')
expanded_path = tools.parse_path(path)
output = self.client.get_public_node(expanded_path)
if self.is_testnet:
return {'xpub':xpub_main_2_test(output.xpub)}
else:
return {'xpub':output.xpub}
# Must return a hex string with the signed transaction
# The tx must be in the combined unsigned transaction format
@keepkey_exception
def sign_tx(self, tx):
self._check_unlocked()
# Get this devices master key fingerprint
master_key = self.client.get_public_node([0])
master_fp = get_xpub_fingerprint(master_key.xpub)
# Do multiple passes for multisig
passes = 1
p = 0
while p < passes:
# Prepare inputs
inputs = []
to_ignore = []
for input_num, (psbt_in, txin) in py_enumerate(list(zip(tx.inputs, tx.tx.vin))):
txinputtype = proto.TxInputType()
# Set the input stuff
txinputtype.prev_hash = ser_uint256(txin.prevout.hash)[::-1]
txinputtype.prev_index = txin.prevout.n
txinputtype.sequence = txin.nSequence
# Detrermine spend type
scriptcode = b''
if psbt_in.non_witness_utxo:
utxo = psbt_in.non_witness_utxo.vout[txin.prevout.n]
txinputtype.script_type = proto.SPENDADDRESS
scriptcode = utxo.scriptPubKey
txinputtype.amount = psbt_in.non_witness_utxo.vout[txin.prevout.n].nValue
elif psbt_in.witness_utxo:
utxo = psbt_in.witness_utxo
# Check if the output is p2sh
if psbt_in.witness_utxo.is_p2sh():
txinputtype.script_type = proto.SPENDP2SHWITNESS
else:
txinputtype.script_type = proto.SPENDWITNESS
scriptcode = psbt_in.witness_utxo.scriptPubKey
txinputtype.amount = psbt_in.witness_utxo.nValue
# Set the script
if psbt_in.witness_script:
scriptcode = psbt_in.witness_script
elif psbt_in.redeem_script:
scriptcode = psbt_in.redeem_script
def ignore_input():
txinputtype.address_n.extend([0x80000000])
txinputtype.ClearField('multisig')
txinputtype.script_type = proto.SPENDWITNESS
inputs.append(txinputtype)
to_ignore.append(input_num)
# Check for multisig
is_ms, multisig = parse_multisig(scriptcode)
if is_ms:
# Add to txinputtype
txinputtype.multisig.CopyFrom(multisig)
if psbt_in.non_witness_utxo:
if utxo.is_p2sh:
txinputtype.script_type = proto.SPENDMULTISIG
else:
# Cannot sign bare multisig, ignore it
ignore_input()
continue
elif not is_ms and psbt_in.non_witness_utxo and not utxo.is_p2pkh:
# Cannot sign unknown spk, ignore it
ignore_input()
continue
elif not is_ms and psbt_in.witness_utxo and psbt_in.witness_script:
# Cannot sign unknown witness script, ignore it
ignore_input()
continue
# Find key to sign with
found = False
our_keys = 0
for key in psbt_in.hd_keypaths.keys():
keypath = psbt_in.hd_keypaths[key]
if keypath[0] == master_fp and key not in psbt_in.partial_sigs:
if not found:
txinputtype.address_n.extend(keypath[1:])
found = True
our_keys += 1
# Determine if we need to do more passes to sign everything
if our_keys > passes:
passes = our_keys
if not found:
# This input is not one of ours
ignore_input()
continue
# append to inputs
inputs.append(txinputtype)
# address version byte
if self.is_testnet:
p2pkh_version = b'\x6f'
p2sh_version = b'\xc4'
bech32_hrp = 'tb'
else:
p2pkh_version = b'\x00'
p2sh_version = b'\x05'
bech32_hrp = 'bc'
# prepare outputs
outputs = []
for out in tx.tx.vout:
txoutput = proto.TxOutputType()
txoutput.amount = out.nValue
txoutput.script_type = proto.PAYTOADDRESS
if out.is_p2pkh():
txoutput.address = to_address(out.scriptPubKey[3:23], p2pkh_version)
txoutput.script_type = 0
elif out.is_p2sh():
txoutput.address = to_address(out.scriptPubKey[2:22], p2sh_version)
txoutput.script_type = 1
else:
wit, ver, prog = out.is_witness()
if wit:
txoutput.address = bech32.encode(bech32_hrp, ver, prog)
else:
raise BadArgumentError("Output is not an address")
# append to outputs
outputs.append(txoutput)
# Sign the transaction
self.client.set_tx_api(TxAPIPSBT(tx))
if self.is_testnet:
signed_tx = self.client.sign_tx("Testnet", inputs, outputs, tx.tx.nVersion, tx.tx.nLockTime)
else:
signed_tx = self.client.sign_tx("Bitcoin", inputs, outputs, tx.tx.nVersion, tx.tx.nLockTime)
# Each input has one signature
for input_num, (psbt_in, sig) in py_enumerate(list(zip(tx.inputs, signed_tx[0]))):
if input_num in to_ignore:
continue
for pubkey in psbt_in.hd_keypaths.keys():
fp = psbt_in.hd_keypaths[pubkey][0]
if fp == master_fp and pubkey not in psbt_in.partial_sigs:
psbt_in.partial_sigs[pubkey] = sig + b'\x01'
break
p += 1
return {'psbt':tx.serialize()}
# Must return a base64 encoded string with the signed message
# The message can be any string
@keepkey_exception
def sign_message(self, message, keypath):
self._check_unlocked()
keypath = keypath.replace('h', '\'')
keypath = keypath.replace('H', '\'')
expanded_path = tools.parse_path(keypath)
result = self.client.sign_message('Bitcoin', expanded_path, message)
return {'signature': base64.b64encode(result.signature).decode('utf-8')}
# Display address of specified type on the device. Only supports single-key based addresses.
@keepkey_exception
def display_address(self, keypath, p2sh_p2wpkh, bech32):
self._check_unlocked()
keypath = keypath.replace('h', '\'')
keypath = keypath.replace('H', '\'')
expanded_path = tools.parse_path(keypath)
address = self.client.get_address(
"Testnet" if self.is_testnet else "Bitcoin",
expanded_path,
show_display=True,
script_type=proto.SPENDWITNESS if bech32 else (proto.SPENDP2SHWITNESS if p2sh_p2wpkh else proto.SPENDADDRESS)
)
return {'address': address}
# Setup a new device
@keepkey_exception
def setup_device(self, label='', passphrase=''):
self.client.actual_init_device()
if self.client.features.initialized:
raise DeviceAlreadyInitError('Device is already initialized. Use wipe first and try again')
self.client.reset_device(False, 256, bool(self.password), True, label, 'english')
return {'success': True}
# Wipe this device
@keepkey_exception
def wipe_device(self):
self._check_unlocked()
self.client.wipe_device()
return {'success': True}
# Restore device from mnemonic or xprv
@keepkey_exception
def restore_device(self, label=''):
self.client.recovery_device(False, 24, bool(self.password), True, label, 'english')
return {'success': True}
# Begin backup process
@keepkey_exception
def backup_device(self, label='', passphrase=''):
raise UnavailableActionError('The Keepkey does not support creating a backup via software')
# Close the device
def close(self):
self.client.close()
# Prompt for a pin on device
@keepkey_exception
def prompt_pin(self):
self.client.actual_init_device()
if not self.client.features.pin_protection:
raise DeviceAlreadyUnlockedError('This device does not need a PIN')
if self.client.features.pin_cached:
raise DeviceAlreadyUnlockedError('The PIN has already been sent to this device')
print('Use \'sendpin\' to provide the number positions for the PIN as displayed on your device\'s screen', file=sys.stderr)
print(PIN_MATRIX_DESCRIPTION, file=sys.stderr)
self.client.call_raw(messages.Ping(message=b'ping', button_protection=False, pin_protection=True, passphrase_protection=False))
return {'success': True}
# Send the pin
@keepkey_exception
def send_pin(self, pin):
if not pin.isdigit():
raise BadArgumentError("Non-numeric PIN provided")
resp = self.client.call_raw(messages.PinMatrixAck(pin=pin))
if isinstance(resp, messages.Failure):
self.client.features = self.client.call_raw(messages.GetFeatures())
if isinstance(self.client.features, messages.Features):
if not self.client.features.pin_protection:
raise DeviceAlreadyUnlockedError('This device does not need a PIN')
if self.client.features.pin_cached:
raise DeviceAlreadyUnlockedError('The PIN has already been sent to this device')
return {'success': False}
return {'success': True}
self.type = 'Keepkey'
def enumerate(password=''):
results = []
paths = []
for d in HidTransport.enumerate():
paths.append('hid:{}'.format(d[0].decode()))
# Try to open the simulator device and conenct to it
try:
sim_dev = UDPTransport('127.0.0.1:21324')
sim_dev.socket.sendall(b"PINGPING")
resp = sim_dev.socket.recv(8)
if resp != b'PONGPONG':
pass
paths.append('udp:127.0.0.1:21324')
except:
pass
for path in paths:
for dev in enumerate_devices():
d_data = {}
d_data['type'] = 'keepkey'
d_data['path'] = path
d_data['path'] = dev.get_path()
client = None
try:
client = KeepkeyClient(path, password)
client.client.actual_init_device()
client = KeepkeyClient(d_data['path'], password)
client.client.init_device()
if not 'keepkey' in client.client.features.vendor:
continue
if client.client.features.initialized:
master_xpub = client.get_pubkey_at_path('m/0h')['xpub']
d_data['fingerprint'] = get_xpub_fingerprint_hex(master_xpub)
else:
d_data['error'] = 'Not initialized'
except Exception as e:
if str(e) == 'Unsupported device':
continue
d_data['error'] = "Could not open client or get fingerprint information: " + str(e)
if client:

View File

@ -86,12 +86,12 @@ class TrezorClient(HardwareWalletClient):
raise IOError("no Device")
self.password = password
self.client.open()
self.type = 'Trezor'
def _check_unlocked(self):
self.client.init_device()
if self.client.features.pin_protection and not self.client.features.pin_cached:
raise DeviceNotReadyError('Trezor is locked. Unlock by using \'promptpin\' and then \'sendpin\'.')
raise DeviceNotReadyError('{} is locked. Unlock by using \'promptpin\' and then \'sendpin\'.'.format(self.type))
# Must return a dict with the xpub
# Retrieves the public key at the specified BIP 32 derivation path
@ -338,7 +338,7 @@ class TrezorClient(HardwareWalletClient):
# Begin backup process
def backup_device(self, label='', passphrase=''):
raise UnavailableActionError('The Trezor does not support creating a backup via software')
raise UnavailableActionError('The {} does not support creating a backup via software'.format(self.type))
# Close the device
@trezor_exception
@ -348,6 +348,7 @@ class TrezorClient(HardwareWalletClient):
# Prompt for a pin on device
@trezor_exception
def prompt_pin(self):
self.client.open()
self.client.init_device()
if not self.client.features.pin_protection:
raise DeviceAlreadyUnlockedError('This device does not need a PIN')
@ -361,16 +362,17 @@ class TrezorClient(HardwareWalletClient):
# Send the pin
@trezor_exception
def send_pin(self, pin):
self.client.features = self.client.call_raw(proto.GetFeatures())
if isinstance(self.client.features, proto.Features):
if not self.client.features.pin_protection:
raise DeviceAlreadyUnlockedError('This device does not need a PIN')
if self.client.features.pin_cached:
raise DeviceAlreadyUnlockedError('The PIN has already been sent to this device')
self.client.open()
if not pin.isdigit():
raise BadArgumentError("Non-numeric PIN provided")
resp = self.client.call_raw(proto.PinMatrixAck(pin=pin))
if isinstance(resp, proto.Failure):
self.client.features = self.client.call_raw(proto.GetFeatures())
if isinstance(self.client.features, proto.Features):
if not self.client.features.pin_protection:
raise DeviceAlreadyUnlockedError('This device does not need a PIN')
if self.client.features.pin_cached:
raise DeviceAlreadyUnlockedError('The PIN has already been sent to this device')
return {'success': False}
return {'success': True}
@ -386,14 +388,13 @@ def enumerate(password=''):
try:
client = TrezorClient(d_data['path'], password)
client.client.init_device()
if not 'trezor' in client.client.features.vendor:
continue
if client.client.features.initialized:
master_xpub = client.get_pubkey_at_path('m/0h')['xpub']
d_data['fingerprint'] = get_xpub_fingerprint_hex(master_xpub)
else:
d_data['error'] = 'Not initialized'
except TypeError as e:
if dev.get_path().startswith('udp:'):
continue
except Exception as e:
d_data['error'] = "Could not open client or get fingerprint information: " + str(e)

View File

@ -10,3 +10,4 @@ This stripped down version was made at commit [d5c2636f0d1b7da3cb94a4eff6169d77f
- Include the compiled protobuf definitions instead of making them on install
- Removed functions that HWI does not use or plan to use
- Changed `TrezorClient` from calling `init_device()` (HWI needs this behavior and doing it in the library makes this simpler)
- Add Keepkey support. Some fields of some messages had to be removed to support both the Keepkey and the Trezor in the same library

View File

@ -4,5 +4,6 @@ __version__ = "0.11.1"
MINIMUM_FIRMWARE_VERSION = {
"1": (1, 6, 1),
"T": (2, 0, 10),
"K1-14AM": (0, 0, 0)
}
# fmt: on

View File

@ -27,7 +27,7 @@ if sys.version_info.major < 3:
LOG = logging.getLogger(__name__)
VENDORS = ("bitcointrezor.com", "trezor.io")
VENDORS = ("bitcointrezor.com", "trezor.io", "keepkey.com")
MAX_PASSPHRASE_LENGTH = 50
DEPRECATION_ERROR = """

View File

@ -156,9 +156,9 @@ def reset(
pin_protection=True,
label=None,
language="english",
u2f_counter=0,
skip_backup=False,
no_backup=False,
# u2f_counter=0,
# skip_backup=False,
# no_backup=False,
):
if client.features.initialized:
raise RuntimeError(
@ -179,9 +179,9 @@ def reset(
pin_protection=bool(pin_protection),
language=language,
label=label,
u2f_counter=u2f_counter,
skip_backup=bool(skip_backup),
no_backup=bool(no_backup),
# u2f_counter=u2f_counter,
# skip_backup=bool(skip_backup),
# no_backup=bool(no_backup),
)
resp = client.call(msg)

View File

@ -20,7 +20,7 @@ class DebugLinkState(p.MessageType):
reset_entropy: bytes = None,
recovery_fake_word: str = None,
recovery_word_pos: int = None,
reset_word_pos: int = None,
# reset_word_pos: int = None,
) -> None:
self.layout = layout
self.pin = pin
@ -32,7 +32,7 @@ class DebugLinkState(p.MessageType):
self.reset_entropy = reset_entropy
self.recovery_fake_word = recovery_fake_word
self.recovery_word_pos = recovery_word_pos
self.reset_word_pos = reset_word_pos
# self.reset_word_pos = reset_word_pos
@classmethod
def get_fields(cls):
@ -47,5 +47,5 @@ class DebugLinkState(p.MessageType):
8: ('reset_entropy', p.BytesType, 0),
9: ('recovery_fake_word', p.UnicodeType, 0),
10: ('recovery_word_pos', p.UVarintType, 0),
11: ('reset_word_pos', p.UVarintType, 0),
# 11: ('reset_word_pos', p.UVarintType, 0),
}

View File

@ -83,15 +83,15 @@ class Features(p.MessageType):
15: ('imported', p.BoolType, 0),
16: ('pin_cached', p.BoolType, 0),
17: ('passphrase_cached', p.BoolType, 0),
18: ('firmware_present', p.BoolType, 0),
19: ('needs_backup', p.BoolType, 0),
20: ('flags', p.UVarintType, 0),
# 18: ('firmware_present', p.BoolType, 0),
# 19: ('needs_backup', p.BoolType, 0),
# 20: ('flags', p.UVarintType, 0),
21: ('model', p.UnicodeType, 0),
22: ('fw_major', p.UVarintType, 0),
23: ('fw_minor', p.UVarintType, 0),
24: ('fw_patch', p.UVarintType, 0),
25: ('fw_vendor', p.UnicodeType, 0),
26: ('fw_vendor_keys', p.BytesType, 0),
27: ('unfinished_backup', p.BoolType, 0),
28: ('no_backup', p.BoolType, 0),
# 22: ('fw_major', p.UVarintType, 0),
# 23: ('fw_minor', p.UVarintType, 0),
# 24: ('fw_patch', p.UVarintType, 0),
# 25: ('fw_vendor', p.UnicodeType, 0),
# 26: ('fw_vendor_keys', p.BytesType, 0),
# 27: ('unfinished_backup', p.BoolType, 0),
# 28: ('no_backup', p.BoolType, 0),
}

View File

@ -14,9 +14,9 @@ class ResetDevice(p.MessageType):
pin_protection: bool = None,
language: str = None,
label: str = None,
u2f_counter: int = None,
skip_backup: bool = None,
no_backup: bool = None,
# u2f_counter: int = None,
# skip_backup: bool = None,
# no_backup: bool = None,
) -> None:
self.display_random = display_random
self.strength = strength
@ -24,9 +24,9 @@ class ResetDevice(p.MessageType):
self.pin_protection = pin_protection
self.language = language
self.label = label
self.u2f_counter = u2f_counter
self.skip_backup = skip_backup
self.no_backup = no_backup
# self.u2f_counter = u2f_counter
# self.skip_backup = skip_backup
# self.no_backup = no_backup
@classmethod
def get_fields(cls):
@ -37,7 +37,7 @@ class ResetDevice(p.MessageType):
4: ('pin_protection', p.BoolType, 0),
5: ('language', p.UnicodeType, 0), # default=english
6: ('label', p.UnicodeType, 0),
7: ('u2f_counter', p.UVarintType, 0),
8: ('skip_backup', p.BoolType, 0),
9: ('no_backup', p.BoolType, 0),
# 7: ('u2f_counter', p.UVarintType, 0),
# 8: ('skip_backup', p.BoolType, 0),
# 9: ('no_backup', p.BoolType, 0),
}

View File

@ -26,8 +26,9 @@ LOG = logging.getLogger(__name__)
DEV_TREZOR1 = (0x534C, 0x0001)
DEV_TREZOR2 = (0x1209, 0x53C1)
DEV_TREZOR2_BL = (0x1209, 0x53C0)
DEV_KEEPKEY = (0x2B24, 0x0001)
TREZORS = {DEV_TREZOR1, DEV_TREZOR2, DEV_TREZOR2_BL}
TREZORS = {DEV_TREZOR1, DEV_TREZOR2, DEV_TREZOR2_BL, DEV_KEEPKEY}
UDEV_RULES_STR = """
Do you have udev rules installed?

View File

@ -19,7 +19,7 @@ import sys
import time
from typing import Any, Dict, Iterable
from . import DEV_TREZOR1, UDEV_RULES_STR, TransportException
from . import DEV_TREZOR1, DEV_KEEPKEY, UDEV_RULES_STR, TransportException
from .protocol import ProtocolBasedTransport, ProtocolV1
LOG = logging.getLogger(__name__)
@ -130,7 +130,7 @@ class HidTransport(ProtocolBasedTransport):
devices = []
for dev in hid.enumerate(0, 0):
usb_id = (dev["vendor_id"], dev["product_id"])
if usb_id != DEV_TREZOR1:
if usb_id != DEV_TREZOR1 and usb_id != DEV_KEEPKEY:
continue
if debug:
if not is_debuglink(dev):

View File

@ -16,7 +16,6 @@ setuptools.setup(
install_requires=[
'hidapi', # HID API needed in general
'btchip-python', # Ledger Nano S
'keepkey>=6.0.1', # KeepKey
'ckcc-protocol[cli]', # Coldcard
'pyaes',
'ecdsa', # Needed for Ledger but their library does not install it

View File

@ -3,7 +3,6 @@
import argparse
import atexit
import json
import logging
import os
import socket
import subprocess
@ -11,9 +10,11 @@ import sys
import time
import unittest
from keepkeylib.transport_udp import UDPTransport
from keepkeylib.client import KeepKeyDebugClient
from keepkeylib import messages_pb2 as messages
from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException
from hwilib.devices.trezorlib.transport import enumerate_devices
from hwilib.devices.trezorlib.transport.udp import UdpTransport
from hwilib.devices.trezorlib.debuglink import TrezorClientDebugLink, load_device_by_mnemonic, load_device_by_xprv
from hwilib.devices.trezorlib import device, messages
from test_device import DeviceEmulator, DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignMessage, TestSignTx
from hwilib.cli import process_commands
@ -21,17 +22,16 @@ from hwilib.devices.keepkey import KeepkeyClient
from types import MethodType
def pin_matrix(self, code=None):
def get_pin(self, code=None):
if self.pin:
pin = self.debug.encode_pin(self.pin)
return self.debuglink.encode_pin(self.pin)
else:
pin = self.debug.read_pin_encoded()
return messages.PinMatrixAck(pin=pin)
return self.debuglink.read_pin_encoded()
class KeepkeyEmulator(DeviceEmulator):
def __init__(self, emulator_path):
def __init__(self, path):
self.emulator_path = path
self.emulator_proc = None
self.emulator_path = emulator_path
def start(self):
# Start the Keepkey emulator
@ -50,25 +50,21 @@ class KeepkeyEmulator(DeviceEmulator):
except Exception:
time.sleep(0.05)
# Redirect stdout to /dev/null as the keepkey lib kind of spammy
sys.stdout = open(os.devnull, 'w')
# Setup the emulator
sim_dev = UDPTransport('127.0.0.1:21324')
sim_dev.buffer = b'' # HACK to work around a bug in the keepkey library
sim_dev_debug = UDPTransport('127.0.0.1:21325')
sim_dev_debug.buffer = b'' # HACK to work around a bug in the keepkey library
client = KeepKeyDebugClient(sim_dev)
client.set_debuglink(sim_dev_debug)
client.wipe_device()
client.load_device_by_mnemonic(mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='', passphrase_protection=False, label='test', language='english') # From Trezor device tests
for dev in enumerate_devices():
# Find the udp transport, that's the emulator
if isinstance(dev, UdpTransport):
wirelink = dev
break
client = TrezorClientDebugLink(wirelink)
client.init_device()
device.wipe(client)
load_device_by_mnemonic(client=client, mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='', passphrase_protection=False, label='test') # From Trezor device tests
return client
def stop(self):
self.emulator_proc.kill()
self.emulator_proc.wait()
# Redirect stdout back to stdout
sys.stdout = sys.__stdout__
class KeepkeyTestCase(unittest.TestCase):
def __init__(self, emulator, methodName='runTest'):
@ -97,15 +93,15 @@ class TestKeepkeyGetxpub(KeepkeyTestCase):
def tearDown(self):
self.emulator.stop()
def test_getxpub(self):
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data/bip32_vectors.json'), encoding='utf-8') as f:
vectors = json.load(f)
for vec in vectors:
with self.subTest(vector=vec):
# Setup with xprv
self.client.wipe_device()
self.client.load_device_by_xprv(xprv=vec['xprv'], pin='', passphrase_protection=False, label='test', language='english')
device.wipe(self.client)
load_device_by_xprv(client=self.client, xprv=vec['xprv'], pin='', passphrase_protection=False, label='test', language='english')
# Test getmasterxpub
gmxp_res = process_commands(['-t', 'keepkey', '-d', 'udp:127.0.0.1:21324', 'getmasterxpub'])
@ -116,7 +112,7 @@ class TestKeepkeyGetxpub(KeepkeyTestCase):
gxp_res = process_commands(['-t', 'keepkey', '-d', 'udp:127.0.0.1:21324', 'getxpub', path_vec['path']])
self.assertEqual(gxp_res['xpub'], path_vec['xpub'])
# Trezor specific management (setup, wipe, restore, backup, promptpin, sendpin) command tests
# Keepkey specific management (setup, wipe, restore, backup, promptpin, sendpin) command tests
class TestKeepkeyManCommands(KeepkeyTestCase):
def setUp(self):
self.client = self.emulator.start()
@ -136,10 +132,10 @@ class TestKeepkeyManCommands(KeepkeyTestCase):
self.assertTrue(result['success'])
# Setup
k_client = KeepkeyClient('udp:127.0.0.1:21324', 'test')
k_client.client.callback_PinMatrixRequest = MethodType(pin_matrix, k_client.client)
k_client.client.pin = '1234'
result = k_client.setup_device()
t_client = KeepkeyClient('udp:127.0.0.1:21324', 'test')
t_client.client.ui.get_pin = MethodType(get_pin, t_client.client.ui)
t_client.client.ui.pin = '1234'
result = t_client.setup_device()
self.assertTrue(result['success'])
# Make sure device is init, setup should fail
@ -164,13 +160,13 @@ class TestKeepkeyManCommands(KeepkeyTestCase):
self.assertEqual(result['code'], -11)
# Set a PIN
self.client.wipe_device()
self.client.load_device_by_mnemonic(mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='1234', passphrase_protection=False, label='test', language='english')
device.wipe(self.client)
load_device_by_mnemonic(client=self.client, mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='1234', passphrase_protection=False, label='test')
self.client.call(messages.ClearSession())
result = process_commands(self.dev_args + ['promptpin'])
self.assertTrue(result['success'])
# Invalid pin
# Invalid pins
result = process_commands(self.dev_args + ['sendpin', 'notnum'])
self.assertEqual(result['error'], 'Non-numeric PIN provided')
self.assertEqual(result['code'], -7)
@ -189,6 +185,7 @@ class TestKeepkeyManCommands(KeepkeyTestCase):
self.assertTrue(result['success'])
# Send the PIN
self.client.open()
pin = self.client.debug.encode_pin('1234')
result = process_commands(self.dev_args + ['sendpin', pin])
self.assertTrue(result['success'])