Add Keepkey to Trezorlib and have KeepkeyClient use TrezorClient
This commit is contained in:
parent
2693ae34f6
commit
f0c5aa7d3c
@ -1,462 +1,36 @@
|
||||
# KeepKey interaction script
|
||||
|
||||
from ..hwwclient import HardwareWalletClient
|
||||
from ..errors import BadArgumentError, DeviceAlreadyInitError, DeviceAlreadyUnlockedError, DeviceConnectionError, UnavailableActionError, DeviceNotReadyError
|
||||
from keepkeylib.transport_hid import HidTransport
|
||||
from keepkeylib.transport_udp import UDPTransport
|
||||
from keepkeylib.client import BaseClient, DebugWireMixin, DebugLinkMixin, ProtocolMixin, TextUIMixin
|
||||
from keepkeylib import tools
|
||||
from keepkeylib import messages_pb2 as messages, types_pb2 as proto
|
||||
from keepkeylib.tx_api import TxApi
|
||||
from ..base58 import get_xpub_fingerprint, decode, to_address, xpub_main_2_test, get_xpub_fingerprint_hex
|
||||
from ..serializations import ser_uint256, uint256_from_str
|
||||
from .. import bech32
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
|
||||
KEEPKEY_VENDOR_ID = 0x2B24
|
||||
KEEPKEY_DEVICE_ID = 0x0001
|
||||
from .trezorlib.transport import enumerate_devices
|
||||
from .trezor import TrezorClient
|
||||
from ..base58 import get_xpub_fingerprint_hex
|
||||
|
||||
py_enumerate = enumerate # Need to use the enumerate built-in but there's another function already named that
|
||||
|
||||
PIN_MATRIX_DESCRIPTION = """
|
||||
Use the numeric keypad to describe number positions. The layout is:
|
||||
7 8 9
|
||||
4 5 6
|
||||
1 2 3
|
||||
""".strip()
|
||||
|
||||
class TxAPIPSBT(TxApi):
|
||||
|
||||
def __init__(self, psbt):
|
||||
super().__init__('bitcoin_psbt', None)
|
||||
self.psbt = psbt
|
||||
|
||||
def get_tx(self, txhash):
|
||||
# Find index of the input
|
||||
for i, input in py_enumerate(self.psbt.tx.vin):
|
||||
if input.prevout.hash == uint256_from_str(binascii.unhexlify(txhash)[::-1]):
|
||||
break
|
||||
|
||||
psbt_in = self.psbt.inputs[i]
|
||||
t = proto.TransactionType()
|
||||
if psbt_in.non_witness_utxo:
|
||||
assert(psbt_in.non_witness_utxo.sha256 == uint256_from_str(binascii.unhexlify(txhash)[::-1]))
|
||||
tx = psbt_in.non_witness_utxo
|
||||
|
||||
t.version = tx.nVersion
|
||||
t.lock_time = tx.nLockTime
|
||||
|
||||
for vin in tx.vin:
|
||||
i = t.inputs.add()
|
||||
i.prev_hash = ser_uint256(vin.prevout.hash)[::-1]
|
||||
i.prev_index = vin.prevout.n
|
||||
i.script_sig = vin.scriptSig
|
||||
i.sequence = vin.nSequence
|
||||
|
||||
for vout in tx.vout:
|
||||
o = t.bin_outputs.add()
|
||||
o.amount = vout.nValue
|
||||
o.script_pubkey = vout.scriptPubKey
|
||||
elif psbt_in.witness_utxo:
|
||||
# HACK: the library looks up this info for all inputs. we just need to appease it for segwit stuff
|
||||
t.version = 1
|
||||
t.lock_time = 0
|
||||
o = t.bin_outputs.add()
|
||||
o.amount = psbt_in.witness_utxo.nValue
|
||||
o.script_pubkey = psbt_in.witness_utxo.scriptPubKey
|
||||
else:
|
||||
raise BadArgumentError('{} is not an input in this transaction'.format(txhash))
|
||||
|
||||
return t
|
||||
|
||||
# Only handles up to 15 of 15
|
||||
def parse_multisig(script):
|
||||
# Get m
|
||||
m = script[0] - 80
|
||||
if m < 1 or m > 15:
|
||||
return (False, None)
|
||||
|
||||
# Get pubkeys and build HDNodePathType
|
||||
pubkeys = []
|
||||
offset = 1
|
||||
while True:
|
||||
pubkey_len = script[offset]
|
||||
if pubkey_len != 33:
|
||||
break
|
||||
offset += 1
|
||||
key = script[offset:offset + 33]
|
||||
offset += 33
|
||||
|
||||
hd_node = proto.HDNodeType(depth=0, fingerprint=0, child_num=0, chain_code=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', public_key=key)
|
||||
pubkeys.append(proto.HDNodePathType(node=hd_node, address_n=[]))
|
||||
|
||||
# Check things at the end
|
||||
n = script[offset] - 80
|
||||
if n != len(pubkeys):
|
||||
return (False, None)
|
||||
offset += 1
|
||||
op_cms = script[offset]
|
||||
if op_cms != 174:
|
||||
return (False, None)
|
||||
|
||||
# Build MultisigRedeemScriptType and return it
|
||||
multisig = proto.MultisigRedeemScriptType(m=m, signatures=[b''] * n, pubkeys=pubkeys)
|
||||
return (True, multisig)
|
||||
|
||||
# Doesn't init device
|
||||
class NoInitMixin(ProtocolMixin):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def init_device(self):
|
||||
pass
|
||||
|
||||
def actual_init_device(self):
|
||||
return super().init_device()
|
||||
|
||||
class KeepKey(NoInitMixin, TextUIMixin, BaseClient):
|
||||
pass
|
||||
|
||||
class KeepKeyDebug(NoInitMixin, DebugLinkMixin, DebugWireMixin, BaseClient):
|
||||
pass
|
||||
|
||||
def keepkey_exception(f):
|
||||
def func(*args, **kwargs):
|
||||
try:
|
||||
return f(*args, **kwargs)
|
||||
except ValueError as e:
|
||||
raise BadArgumentError(str(e))
|
||||
except OSError as e:
|
||||
raise DeviceConnectionError(str(e))
|
||||
return func
|
||||
|
||||
# This class extends the HardwareWalletClient for Digital Bitbox specific things
|
||||
class KeepkeyClient(HardwareWalletClient):
|
||||
|
||||
class KeepkeyClient(TrezorClient):
|
||||
def __init__(self, path, password=''):
|
||||
super(KeepkeyClient, self).__init__(path, password)
|
||||
if path.startswith('hid:'):
|
||||
path = path[4:]
|
||||
transport = HidTransport((path.encode(), None))
|
||||
self.client = KeepKey(transport)
|
||||
elif path.startswith('udp:'):
|
||||
path = path[4:]
|
||||
transport = UDPTransport(path)
|
||||
# Use the debug client for the simulator
|
||||
self.client = KeepKeyDebug(transport)
|
||||
# Get the debug link
|
||||
ip, port = path.split(':')
|
||||
new_port = int(port) + 1
|
||||
debug_transport = UDPTransport('{}:{}'.format(ip, new_port))
|
||||
self.client.set_debuglink(debug_transport)
|
||||
else:
|
||||
raise IOError('Unknown device transport')
|
||||
|
||||
# if it wasn't able to find a client, throw an error
|
||||
if not self.client:
|
||||
raise IOError("no Device")
|
||||
|
||||
self.password = password
|
||||
os.environ['PASSPHRASE'] = password
|
||||
|
||||
def _check_unlocked(self):
|
||||
self.client.actual_init_device()
|
||||
if self.client.features.pin_protection and not self.client.features.pin_cached:
|
||||
raise DeviceNotReadyError('Keepkey is locked. Unlock by using \'promptpin\' and then \'sendpin\'.')
|
||||
|
||||
# Must return a dict with the xpub
|
||||
# Retrieves the public key at the specified BIP 32 derivation path
|
||||
@keepkey_exception
|
||||
def get_pubkey_at_path(self, path):
|
||||
self._check_unlocked()
|
||||
path = path.replace('h', '\'')
|
||||
path = path.replace('H', '\'')
|
||||
expanded_path = tools.parse_path(path)
|
||||
output = self.client.get_public_node(expanded_path)
|
||||
if self.is_testnet:
|
||||
return {'xpub':xpub_main_2_test(output.xpub)}
|
||||
else:
|
||||
return {'xpub':output.xpub}
|
||||
|
||||
# Must return a hex string with the signed transaction
|
||||
# The tx must be in the combined unsigned transaction format
|
||||
@keepkey_exception
|
||||
def sign_tx(self, tx):
|
||||
self._check_unlocked()
|
||||
|
||||
# Get this devices master key fingerprint
|
||||
master_key = self.client.get_public_node([0])
|
||||
master_fp = get_xpub_fingerprint(master_key.xpub)
|
||||
|
||||
# Do multiple passes for multisig
|
||||
passes = 1
|
||||
p = 0
|
||||
|
||||
while p < passes:
|
||||
# Prepare inputs
|
||||
inputs = []
|
||||
to_ignore = []
|
||||
for input_num, (psbt_in, txin) in py_enumerate(list(zip(tx.inputs, tx.tx.vin))):
|
||||
txinputtype = proto.TxInputType()
|
||||
|
||||
# Set the input stuff
|
||||
txinputtype.prev_hash = ser_uint256(txin.prevout.hash)[::-1]
|
||||
txinputtype.prev_index = txin.prevout.n
|
||||
txinputtype.sequence = txin.nSequence
|
||||
|
||||
# Detrermine spend type
|
||||
scriptcode = b''
|
||||
if psbt_in.non_witness_utxo:
|
||||
utxo = psbt_in.non_witness_utxo.vout[txin.prevout.n]
|
||||
txinputtype.script_type = proto.SPENDADDRESS
|
||||
scriptcode = utxo.scriptPubKey
|
||||
txinputtype.amount = psbt_in.non_witness_utxo.vout[txin.prevout.n].nValue
|
||||
elif psbt_in.witness_utxo:
|
||||
utxo = psbt_in.witness_utxo
|
||||
# Check if the output is p2sh
|
||||
if psbt_in.witness_utxo.is_p2sh():
|
||||
txinputtype.script_type = proto.SPENDP2SHWITNESS
|
||||
else:
|
||||
txinputtype.script_type = proto.SPENDWITNESS
|
||||
scriptcode = psbt_in.witness_utxo.scriptPubKey
|
||||
txinputtype.amount = psbt_in.witness_utxo.nValue
|
||||
|
||||
# Set the script
|
||||
if psbt_in.witness_script:
|
||||
scriptcode = psbt_in.witness_script
|
||||
elif psbt_in.redeem_script:
|
||||
scriptcode = psbt_in.redeem_script
|
||||
|
||||
def ignore_input():
|
||||
txinputtype.address_n.extend([0x80000000])
|
||||
txinputtype.ClearField('multisig')
|
||||
txinputtype.script_type = proto.SPENDWITNESS
|
||||
inputs.append(txinputtype)
|
||||
to_ignore.append(input_num)
|
||||
|
||||
# Check for multisig
|
||||
is_ms, multisig = parse_multisig(scriptcode)
|
||||
if is_ms:
|
||||
# Add to txinputtype
|
||||
txinputtype.multisig.CopyFrom(multisig)
|
||||
if psbt_in.non_witness_utxo:
|
||||
if utxo.is_p2sh:
|
||||
txinputtype.script_type = proto.SPENDMULTISIG
|
||||
else:
|
||||
# Cannot sign bare multisig, ignore it
|
||||
ignore_input()
|
||||
continue
|
||||
elif not is_ms and psbt_in.non_witness_utxo and not utxo.is_p2pkh:
|
||||
# Cannot sign unknown spk, ignore it
|
||||
ignore_input()
|
||||
continue
|
||||
elif not is_ms and psbt_in.witness_utxo and psbt_in.witness_script:
|
||||
# Cannot sign unknown witness script, ignore it
|
||||
ignore_input()
|
||||
continue
|
||||
|
||||
# Find key to sign with
|
||||
found = False
|
||||
our_keys = 0
|
||||
for key in psbt_in.hd_keypaths.keys():
|
||||
keypath = psbt_in.hd_keypaths[key]
|
||||
if keypath[0] == master_fp and key not in psbt_in.partial_sigs:
|
||||
if not found:
|
||||
txinputtype.address_n.extend(keypath[1:])
|
||||
found = True
|
||||
our_keys += 1
|
||||
|
||||
# Determine if we need to do more passes to sign everything
|
||||
if our_keys > passes:
|
||||
passes = our_keys
|
||||
|
||||
if not found:
|
||||
# This input is not one of ours
|
||||
ignore_input()
|
||||
continue
|
||||
|
||||
# append to inputs
|
||||
inputs.append(txinputtype)
|
||||
|
||||
# address version byte
|
||||
if self.is_testnet:
|
||||
p2pkh_version = b'\x6f'
|
||||
p2sh_version = b'\xc4'
|
||||
bech32_hrp = 'tb'
|
||||
else:
|
||||
p2pkh_version = b'\x00'
|
||||
p2sh_version = b'\x05'
|
||||
bech32_hrp = 'bc'
|
||||
|
||||
# prepare outputs
|
||||
outputs = []
|
||||
for out in tx.tx.vout:
|
||||
txoutput = proto.TxOutputType()
|
||||
txoutput.amount = out.nValue
|
||||
txoutput.script_type = proto.PAYTOADDRESS
|
||||
if out.is_p2pkh():
|
||||
txoutput.address = to_address(out.scriptPubKey[3:23], p2pkh_version)
|
||||
txoutput.script_type = 0
|
||||
elif out.is_p2sh():
|
||||
txoutput.address = to_address(out.scriptPubKey[2:22], p2sh_version)
|
||||
txoutput.script_type = 1
|
||||
else:
|
||||
wit, ver, prog = out.is_witness()
|
||||
if wit:
|
||||
txoutput.address = bech32.encode(bech32_hrp, ver, prog)
|
||||
else:
|
||||
raise BadArgumentError("Output is not an address")
|
||||
|
||||
# append to outputs
|
||||
outputs.append(txoutput)
|
||||
|
||||
# Sign the transaction
|
||||
self.client.set_tx_api(TxAPIPSBT(tx))
|
||||
if self.is_testnet:
|
||||
signed_tx = self.client.sign_tx("Testnet", inputs, outputs, tx.tx.nVersion, tx.tx.nLockTime)
|
||||
else:
|
||||
signed_tx = self.client.sign_tx("Bitcoin", inputs, outputs, tx.tx.nVersion, tx.tx.nLockTime)
|
||||
|
||||
# Each input has one signature
|
||||
for input_num, (psbt_in, sig) in py_enumerate(list(zip(tx.inputs, signed_tx[0]))):
|
||||
if input_num in to_ignore:
|
||||
continue
|
||||
for pubkey in psbt_in.hd_keypaths.keys():
|
||||
fp = psbt_in.hd_keypaths[pubkey][0]
|
||||
if fp == master_fp and pubkey not in psbt_in.partial_sigs:
|
||||
psbt_in.partial_sigs[pubkey] = sig + b'\x01'
|
||||
break
|
||||
|
||||
p += 1
|
||||
|
||||
return {'psbt':tx.serialize()}
|
||||
|
||||
# Must return a base64 encoded string with the signed message
|
||||
# The message can be any string
|
||||
@keepkey_exception
|
||||
def sign_message(self, message, keypath):
|
||||
self._check_unlocked()
|
||||
keypath = keypath.replace('h', '\'')
|
||||
keypath = keypath.replace('H', '\'')
|
||||
expanded_path = tools.parse_path(keypath)
|
||||
result = self.client.sign_message('Bitcoin', expanded_path, message)
|
||||
return {'signature': base64.b64encode(result.signature).decode('utf-8')}
|
||||
|
||||
# Display address of specified type on the device. Only supports single-key based addresses.
|
||||
@keepkey_exception
|
||||
def display_address(self, keypath, p2sh_p2wpkh, bech32):
|
||||
self._check_unlocked()
|
||||
keypath = keypath.replace('h', '\'')
|
||||
keypath = keypath.replace('H', '\'')
|
||||
expanded_path = tools.parse_path(keypath)
|
||||
address = self.client.get_address(
|
||||
"Testnet" if self.is_testnet else "Bitcoin",
|
||||
expanded_path,
|
||||
show_display=True,
|
||||
script_type=proto.SPENDWITNESS if bech32 else (proto.SPENDP2SHWITNESS if p2sh_p2wpkh else proto.SPENDADDRESS)
|
||||
)
|
||||
return {'address': address}
|
||||
|
||||
# Setup a new device
|
||||
@keepkey_exception
|
||||
def setup_device(self, label='', passphrase=''):
|
||||
self.client.actual_init_device()
|
||||
if self.client.features.initialized:
|
||||
raise DeviceAlreadyInitError('Device is already initialized. Use wipe first and try again')
|
||||
self.client.reset_device(False, 256, bool(self.password), True, label, 'english')
|
||||
return {'success': True}
|
||||
|
||||
# Wipe this device
|
||||
@keepkey_exception
|
||||
def wipe_device(self):
|
||||
self._check_unlocked()
|
||||
self.client.wipe_device()
|
||||
return {'success': True}
|
||||
|
||||
# Restore device from mnemonic or xprv
|
||||
@keepkey_exception
|
||||
def restore_device(self, label=''):
|
||||
self.client.recovery_device(False, 24, bool(self.password), True, label, 'english')
|
||||
return {'success': True}
|
||||
|
||||
# Begin backup process
|
||||
@keepkey_exception
|
||||
def backup_device(self, label='', passphrase=''):
|
||||
raise UnavailableActionError('The Keepkey does not support creating a backup via software')
|
||||
|
||||
# Close the device
|
||||
def close(self):
|
||||
self.client.close()
|
||||
|
||||
# Prompt for a pin on device
|
||||
@keepkey_exception
|
||||
def prompt_pin(self):
|
||||
self.client.actual_init_device()
|
||||
if not self.client.features.pin_protection:
|
||||
raise DeviceAlreadyUnlockedError('This device does not need a PIN')
|
||||
if self.client.features.pin_cached:
|
||||
raise DeviceAlreadyUnlockedError('The PIN has already been sent to this device')
|
||||
print('Use \'sendpin\' to provide the number positions for the PIN as displayed on your device\'s screen', file=sys.stderr)
|
||||
print(PIN_MATRIX_DESCRIPTION, file=sys.stderr)
|
||||
self.client.call_raw(messages.Ping(message=b'ping', button_protection=False, pin_protection=True, passphrase_protection=False))
|
||||
return {'success': True}
|
||||
|
||||
# Send the pin
|
||||
@keepkey_exception
|
||||
def send_pin(self, pin):
|
||||
if not pin.isdigit():
|
||||
raise BadArgumentError("Non-numeric PIN provided")
|
||||
resp = self.client.call_raw(messages.PinMatrixAck(pin=pin))
|
||||
if isinstance(resp, messages.Failure):
|
||||
self.client.features = self.client.call_raw(messages.GetFeatures())
|
||||
if isinstance(self.client.features, messages.Features):
|
||||
if not self.client.features.pin_protection:
|
||||
raise DeviceAlreadyUnlockedError('This device does not need a PIN')
|
||||
if self.client.features.pin_cached:
|
||||
raise DeviceAlreadyUnlockedError('The PIN has already been sent to this device')
|
||||
return {'success': False}
|
||||
return {'success': True}
|
||||
self.type = 'Keepkey'
|
||||
|
||||
def enumerate(password=''):
|
||||
results = []
|
||||
paths = []
|
||||
for d in HidTransport.enumerate():
|
||||
paths.append('hid:{}'.format(d[0].decode()))
|
||||
|
||||
# Try to open the simulator device and conenct to it
|
||||
try:
|
||||
sim_dev = UDPTransport('127.0.0.1:21324')
|
||||
sim_dev.socket.sendall(b"PINGPING")
|
||||
resp = sim_dev.socket.recv(8)
|
||||
if resp != b'PONGPONG':
|
||||
pass
|
||||
paths.append('udp:127.0.0.1:21324')
|
||||
except:
|
||||
pass
|
||||
|
||||
for path in paths:
|
||||
for dev in enumerate_devices():
|
||||
d_data = {}
|
||||
|
||||
d_data['type'] = 'keepkey'
|
||||
d_data['path'] = path
|
||||
d_data['path'] = dev.get_path()
|
||||
|
||||
client = None
|
||||
try:
|
||||
client = KeepkeyClient(path, password)
|
||||
client.client.actual_init_device()
|
||||
client = KeepkeyClient(d_data['path'], password)
|
||||
client.client.init_device()
|
||||
if not 'keepkey' in client.client.features.vendor:
|
||||
continue
|
||||
if client.client.features.initialized:
|
||||
master_xpub = client.get_pubkey_at_path('m/0h')['xpub']
|
||||
d_data['fingerprint'] = get_xpub_fingerprint_hex(master_xpub)
|
||||
else:
|
||||
d_data['error'] = 'Not initialized'
|
||||
except Exception as e:
|
||||
if str(e) == 'Unsupported device':
|
||||
continue
|
||||
d_data['error'] = "Could not open client or get fingerprint information: " + str(e)
|
||||
|
||||
if client:
|
||||
|
||||
@ -86,12 +86,12 @@ class TrezorClient(HardwareWalletClient):
|
||||
raise IOError("no Device")
|
||||
|
||||
self.password = password
|
||||
self.client.open()
|
||||
self.type = 'Trezor'
|
||||
|
||||
def _check_unlocked(self):
|
||||
self.client.init_device()
|
||||
if self.client.features.pin_protection and not self.client.features.pin_cached:
|
||||
raise DeviceNotReadyError('Trezor is locked. Unlock by using \'promptpin\' and then \'sendpin\'.')
|
||||
raise DeviceNotReadyError('{} is locked. Unlock by using \'promptpin\' and then \'sendpin\'.'.format(self.type))
|
||||
|
||||
# Must return a dict with the xpub
|
||||
# Retrieves the public key at the specified BIP 32 derivation path
|
||||
@ -338,7 +338,7 @@ class TrezorClient(HardwareWalletClient):
|
||||
|
||||
# Begin backup process
|
||||
def backup_device(self, label='', passphrase=''):
|
||||
raise UnavailableActionError('The Trezor does not support creating a backup via software')
|
||||
raise UnavailableActionError('The {} does not support creating a backup via software'.format(self.type))
|
||||
|
||||
# Close the device
|
||||
@trezor_exception
|
||||
@ -348,6 +348,7 @@ class TrezorClient(HardwareWalletClient):
|
||||
# Prompt for a pin on device
|
||||
@trezor_exception
|
||||
def prompt_pin(self):
|
||||
self.client.open()
|
||||
self.client.init_device()
|
||||
if not self.client.features.pin_protection:
|
||||
raise DeviceAlreadyUnlockedError('This device does not need a PIN')
|
||||
@ -361,16 +362,17 @@ class TrezorClient(HardwareWalletClient):
|
||||
# Send the pin
|
||||
@trezor_exception
|
||||
def send_pin(self, pin):
|
||||
self.client.features = self.client.call_raw(proto.GetFeatures())
|
||||
if isinstance(self.client.features, proto.Features):
|
||||
if not self.client.features.pin_protection:
|
||||
raise DeviceAlreadyUnlockedError('This device does not need a PIN')
|
||||
if self.client.features.pin_cached:
|
||||
raise DeviceAlreadyUnlockedError('The PIN has already been sent to this device')
|
||||
self.client.open()
|
||||
if not pin.isdigit():
|
||||
raise BadArgumentError("Non-numeric PIN provided")
|
||||
resp = self.client.call_raw(proto.PinMatrixAck(pin=pin))
|
||||
if isinstance(resp, proto.Failure):
|
||||
self.client.features = self.client.call_raw(proto.GetFeatures())
|
||||
if isinstance(self.client.features, proto.Features):
|
||||
if not self.client.features.pin_protection:
|
||||
raise DeviceAlreadyUnlockedError('This device does not need a PIN')
|
||||
if self.client.features.pin_cached:
|
||||
raise DeviceAlreadyUnlockedError('The PIN has already been sent to this device')
|
||||
return {'success': False}
|
||||
return {'success': True}
|
||||
|
||||
@ -386,14 +388,13 @@ def enumerate(password=''):
|
||||
try:
|
||||
client = TrezorClient(d_data['path'], password)
|
||||
client.client.init_device()
|
||||
if not 'trezor' in client.client.features.vendor:
|
||||
continue
|
||||
if client.client.features.initialized:
|
||||
master_xpub = client.get_pubkey_at_path('m/0h')['xpub']
|
||||
d_data['fingerprint'] = get_xpub_fingerprint_hex(master_xpub)
|
||||
else:
|
||||
d_data['error'] = 'Not initialized'
|
||||
except TypeError as e:
|
||||
if dev.get_path().startswith('udp:'):
|
||||
continue
|
||||
except Exception as e:
|
||||
d_data['error'] = "Could not open client or get fingerprint information: " + str(e)
|
||||
|
||||
|
||||
@ -10,3 +10,4 @@ This stripped down version was made at commit [d5c2636f0d1b7da3cb94a4eff6169d77f
|
||||
- Include the compiled protobuf definitions instead of making them on install
|
||||
- Removed functions that HWI does not use or plan to use
|
||||
- Changed `TrezorClient` from calling `init_device()` (HWI needs this behavior and doing it in the library makes this simpler)
|
||||
- Add Keepkey support. Some fields of some messages had to be removed to support both the Keepkey and the Trezor in the same library
|
||||
|
||||
@ -4,5 +4,6 @@ __version__ = "0.11.1"
|
||||
MINIMUM_FIRMWARE_VERSION = {
|
||||
"1": (1, 6, 1),
|
||||
"T": (2, 0, 10),
|
||||
"K1-14AM": (0, 0, 0)
|
||||
}
|
||||
# fmt: on
|
||||
|
||||
@ -27,7 +27,7 @@ if sys.version_info.major < 3:
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
VENDORS = ("bitcointrezor.com", "trezor.io")
|
||||
VENDORS = ("bitcointrezor.com", "trezor.io", "keepkey.com")
|
||||
MAX_PASSPHRASE_LENGTH = 50
|
||||
|
||||
DEPRECATION_ERROR = """
|
||||
|
||||
@ -156,9 +156,9 @@ def reset(
|
||||
pin_protection=True,
|
||||
label=None,
|
||||
language="english",
|
||||
u2f_counter=0,
|
||||
skip_backup=False,
|
||||
no_backup=False,
|
||||
# u2f_counter=0,
|
||||
# skip_backup=False,
|
||||
# no_backup=False,
|
||||
):
|
||||
if client.features.initialized:
|
||||
raise RuntimeError(
|
||||
@ -179,9 +179,9 @@ def reset(
|
||||
pin_protection=bool(pin_protection),
|
||||
language=language,
|
||||
label=label,
|
||||
u2f_counter=u2f_counter,
|
||||
skip_backup=bool(skip_backup),
|
||||
no_backup=bool(no_backup),
|
||||
# u2f_counter=u2f_counter,
|
||||
# skip_backup=bool(skip_backup),
|
||||
# no_backup=bool(no_backup),
|
||||
)
|
||||
|
||||
resp = client.call(msg)
|
||||
|
||||
@ -20,7 +20,7 @@ class DebugLinkState(p.MessageType):
|
||||
reset_entropy: bytes = None,
|
||||
recovery_fake_word: str = None,
|
||||
recovery_word_pos: int = None,
|
||||
reset_word_pos: int = None,
|
||||
# reset_word_pos: int = None,
|
||||
) -> None:
|
||||
self.layout = layout
|
||||
self.pin = pin
|
||||
@ -32,7 +32,7 @@ class DebugLinkState(p.MessageType):
|
||||
self.reset_entropy = reset_entropy
|
||||
self.recovery_fake_word = recovery_fake_word
|
||||
self.recovery_word_pos = recovery_word_pos
|
||||
self.reset_word_pos = reset_word_pos
|
||||
# self.reset_word_pos = reset_word_pos
|
||||
|
||||
@classmethod
|
||||
def get_fields(cls):
|
||||
@ -47,5 +47,5 @@ class DebugLinkState(p.MessageType):
|
||||
8: ('reset_entropy', p.BytesType, 0),
|
||||
9: ('recovery_fake_word', p.UnicodeType, 0),
|
||||
10: ('recovery_word_pos', p.UVarintType, 0),
|
||||
11: ('reset_word_pos', p.UVarintType, 0),
|
||||
# 11: ('reset_word_pos', p.UVarintType, 0),
|
||||
}
|
||||
|
||||
@ -83,15 +83,15 @@ class Features(p.MessageType):
|
||||
15: ('imported', p.BoolType, 0),
|
||||
16: ('pin_cached', p.BoolType, 0),
|
||||
17: ('passphrase_cached', p.BoolType, 0),
|
||||
18: ('firmware_present', p.BoolType, 0),
|
||||
19: ('needs_backup', p.BoolType, 0),
|
||||
20: ('flags', p.UVarintType, 0),
|
||||
# 18: ('firmware_present', p.BoolType, 0),
|
||||
# 19: ('needs_backup', p.BoolType, 0),
|
||||
# 20: ('flags', p.UVarintType, 0),
|
||||
21: ('model', p.UnicodeType, 0),
|
||||
22: ('fw_major', p.UVarintType, 0),
|
||||
23: ('fw_minor', p.UVarintType, 0),
|
||||
24: ('fw_patch', p.UVarintType, 0),
|
||||
25: ('fw_vendor', p.UnicodeType, 0),
|
||||
26: ('fw_vendor_keys', p.BytesType, 0),
|
||||
27: ('unfinished_backup', p.BoolType, 0),
|
||||
28: ('no_backup', p.BoolType, 0),
|
||||
# 22: ('fw_major', p.UVarintType, 0),
|
||||
# 23: ('fw_minor', p.UVarintType, 0),
|
||||
# 24: ('fw_patch', p.UVarintType, 0),
|
||||
# 25: ('fw_vendor', p.UnicodeType, 0),
|
||||
# 26: ('fw_vendor_keys', p.BytesType, 0),
|
||||
# 27: ('unfinished_backup', p.BoolType, 0),
|
||||
# 28: ('no_backup', p.BoolType, 0),
|
||||
}
|
||||
|
||||
@ -14,9 +14,9 @@ class ResetDevice(p.MessageType):
|
||||
pin_protection: bool = None,
|
||||
language: str = None,
|
||||
label: str = None,
|
||||
u2f_counter: int = None,
|
||||
skip_backup: bool = None,
|
||||
no_backup: bool = None,
|
||||
# u2f_counter: int = None,
|
||||
# skip_backup: bool = None,
|
||||
# no_backup: bool = None,
|
||||
) -> None:
|
||||
self.display_random = display_random
|
||||
self.strength = strength
|
||||
@ -24,9 +24,9 @@ class ResetDevice(p.MessageType):
|
||||
self.pin_protection = pin_protection
|
||||
self.language = language
|
||||
self.label = label
|
||||
self.u2f_counter = u2f_counter
|
||||
self.skip_backup = skip_backup
|
||||
self.no_backup = no_backup
|
||||
# self.u2f_counter = u2f_counter
|
||||
# self.skip_backup = skip_backup
|
||||
# self.no_backup = no_backup
|
||||
|
||||
@classmethod
|
||||
def get_fields(cls):
|
||||
@ -37,7 +37,7 @@ class ResetDevice(p.MessageType):
|
||||
4: ('pin_protection', p.BoolType, 0),
|
||||
5: ('language', p.UnicodeType, 0), # default=english
|
||||
6: ('label', p.UnicodeType, 0),
|
||||
7: ('u2f_counter', p.UVarintType, 0),
|
||||
8: ('skip_backup', p.BoolType, 0),
|
||||
9: ('no_backup', p.BoolType, 0),
|
||||
# 7: ('u2f_counter', p.UVarintType, 0),
|
||||
# 8: ('skip_backup', p.BoolType, 0),
|
||||
# 9: ('no_backup', p.BoolType, 0),
|
||||
}
|
||||
|
||||
@ -26,8 +26,9 @@ LOG = logging.getLogger(__name__)
|
||||
DEV_TREZOR1 = (0x534C, 0x0001)
|
||||
DEV_TREZOR2 = (0x1209, 0x53C1)
|
||||
DEV_TREZOR2_BL = (0x1209, 0x53C0)
|
||||
DEV_KEEPKEY = (0x2B24, 0x0001)
|
||||
|
||||
TREZORS = {DEV_TREZOR1, DEV_TREZOR2, DEV_TREZOR2_BL}
|
||||
TREZORS = {DEV_TREZOR1, DEV_TREZOR2, DEV_TREZOR2_BL, DEV_KEEPKEY}
|
||||
|
||||
UDEV_RULES_STR = """
|
||||
Do you have udev rules installed?
|
||||
|
||||
@ -19,7 +19,7 @@ import sys
|
||||
import time
|
||||
from typing import Any, Dict, Iterable
|
||||
|
||||
from . import DEV_TREZOR1, UDEV_RULES_STR, TransportException
|
||||
from . import DEV_TREZOR1, DEV_KEEPKEY, UDEV_RULES_STR, TransportException
|
||||
from .protocol import ProtocolBasedTransport, ProtocolV1
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -130,7 +130,7 @@ class HidTransport(ProtocolBasedTransport):
|
||||
devices = []
|
||||
for dev in hid.enumerate(0, 0):
|
||||
usb_id = (dev["vendor_id"], dev["product_id"])
|
||||
if usb_id != DEV_TREZOR1:
|
||||
if usb_id != DEV_TREZOR1 and usb_id != DEV_KEEPKEY:
|
||||
continue
|
||||
if debug:
|
||||
if not is_debuglink(dev):
|
||||
|
||||
1
setup.py
1
setup.py
@ -16,7 +16,6 @@ setuptools.setup(
|
||||
install_requires=[
|
||||
'hidapi', # HID API needed in general
|
||||
'btchip-python', # Ledger Nano S
|
||||
'keepkey>=6.0.1', # KeepKey
|
||||
'ckcc-protocol[cli]', # Coldcard
|
||||
'pyaes',
|
||||
'ecdsa', # Needed for Ledger but their library does not install it
|
||||
|
||||
@ -3,7 +3,6 @@
|
||||
import argparse
|
||||
import atexit
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import subprocess
|
||||
@ -11,9 +10,11 @@ import sys
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from keepkeylib.transport_udp import UDPTransport
|
||||
from keepkeylib.client import KeepKeyDebugClient
|
||||
from keepkeylib import messages_pb2 as messages
|
||||
from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException
|
||||
from hwilib.devices.trezorlib.transport import enumerate_devices
|
||||
from hwilib.devices.trezorlib.transport.udp import UdpTransport
|
||||
from hwilib.devices.trezorlib.debuglink import TrezorClientDebugLink, load_device_by_mnemonic, load_device_by_xprv
|
||||
from hwilib.devices.trezorlib import device, messages
|
||||
from test_device import DeviceEmulator, DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignMessage, TestSignTx
|
||||
|
||||
from hwilib.cli import process_commands
|
||||
@ -21,17 +22,16 @@ from hwilib.devices.keepkey import KeepkeyClient
|
||||
|
||||
from types import MethodType
|
||||
|
||||
def pin_matrix(self, code=None):
|
||||
def get_pin(self, code=None):
|
||||
if self.pin:
|
||||
pin = self.debug.encode_pin(self.pin)
|
||||
return self.debuglink.encode_pin(self.pin)
|
||||
else:
|
||||
pin = self.debug.read_pin_encoded()
|
||||
return messages.PinMatrixAck(pin=pin)
|
||||
return self.debuglink.read_pin_encoded()
|
||||
|
||||
class KeepkeyEmulator(DeviceEmulator):
|
||||
def __init__(self, emulator_path):
|
||||
def __init__(self, path):
|
||||
self.emulator_path = path
|
||||
self.emulator_proc = None
|
||||
self.emulator_path = emulator_path
|
||||
|
||||
def start(self):
|
||||
# Start the Keepkey emulator
|
||||
@ -50,25 +50,21 @@ class KeepkeyEmulator(DeviceEmulator):
|
||||
except Exception:
|
||||
time.sleep(0.05)
|
||||
|
||||
# Redirect stdout to /dev/null as the keepkey lib kind of spammy
|
||||
sys.stdout = open(os.devnull, 'w')
|
||||
|
||||
# Setup the emulator
|
||||
sim_dev = UDPTransport('127.0.0.1:21324')
|
||||
sim_dev.buffer = b'' # HACK to work around a bug in the keepkey library
|
||||
sim_dev_debug = UDPTransport('127.0.0.1:21325')
|
||||
sim_dev_debug.buffer = b'' # HACK to work around a bug in the keepkey library
|
||||
client = KeepKeyDebugClient(sim_dev)
|
||||
client.set_debuglink(sim_dev_debug)
|
||||
client.wipe_device()
|
||||
client.load_device_by_mnemonic(mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='', passphrase_protection=False, label='test', language='english') # From Trezor device tests
|
||||
for dev in enumerate_devices():
|
||||
# Find the udp transport, that's the emulator
|
||||
if isinstance(dev, UdpTransport):
|
||||
wirelink = dev
|
||||
break
|
||||
client = TrezorClientDebugLink(wirelink)
|
||||
client.init_device()
|
||||
device.wipe(client)
|
||||
load_device_by_mnemonic(client=client, mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='', passphrase_protection=False, label='test') # From Trezor device tests
|
||||
return client
|
||||
|
||||
def stop(self):
|
||||
self.emulator_proc.kill()
|
||||
self.emulator_proc.wait()
|
||||
# Redirect stdout back to stdout
|
||||
sys.stdout = sys.__stdout__
|
||||
|
||||
class KeepkeyTestCase(unittest.TestCase):
|
||||
def __init__(self, emulator, methodName='runTest'):
|
||||
@ -97,15 +93,15 @@ class TestKeepkeyGetxpub(KeepkeyTestCase):
|
||||
|
||||
def tearDown(self):
|
||||
self.emulator.stop()
|
||||
|
||||
|
||||
def test_getxpub(self):
|
||||
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data/bip32_vectors.json'), encoding='utf-8') as f:
|
||||
vectors = json.load(f)
|
||||
for vec in vectors:
|
||||
with self.subTest(vector=vec):
|
||||
# Setup with xprv
|
||||
self.client.wipe_device()
|
||||
self.client.load_device_by_xprv(xprv=vec['xprv'], pin='', passphrase_protection=False, label='test', language='english')
|
||||
device.wipe(self.client)
|
||||
load_device_by_xprv(client=self.client, xprv=vec['xprv'], pin='', passphrase_protection=False, label='test', language='english')
|
||||
|
||||
# Test getmasterxpub
|
||||
gmxp_res = process_commands(['-t', 'keepkey', '-d', 'udp:127.0.0.1:21324', 'getmasterxpub'])
|
||||
@ -116,7 +112,7 @@ class TestKeepkeyGetxpub(KeepkeyTestCase):
|
||||
gxp_res = process_commands(['-t', 'keepkey', '-d', 'udp:127.0.0.1:21324', 'getxpub', path_vec['path']])
|
||||
self.assertEqual(gxp_res['xpub'], path_vec['xpub'])
|
||||
|
||||
# Trezor specific management (setup, wipe, restore, backup, promptpin, sendpin) command tests
|
||||
# Keepkey specific management (setup, wipe, restore, backup, promptpin, sendpin) command tests
|
||||
class TestKeepkeyManCommands(KeepkeyTestCase):
|
||||
def setUp(self):
|
||||
self.client = self.emulator.start()
|
||||
@ -136,10 +132,10 @@ class TestKeepkeyManCommands(KeepkeyTestCase):
|
||||
self.assertTrue(result['success'])
|
||||
|
||||
# Setup
|
||||
k_client = KeepkeyClient('udp:127.0.0.1:21324', 'test')
|
||||
k_client.client.callback_PinMatrixRequest = MethodType(pin_matrix, k_client.client)
|
||||
k_client.client.pin = '1234'
|
||||
result = k_client.setup_device()
|
||||
t_client = KeepkeyClient('udp:127.0.0.1:21324', 'test')
|
||||
t_client.client.ui.get_pin = MethodType(get_pin, t_client.client.ui)
|
||||
t_client.client.ui.pin = '1234'
|
||||
result = t_client.setup_device()
|
||||
self.assertTrue(result['success'])
|
||||
|
||||
# Make sure device is init, setup should fail
|
||||
@ -164,13 +160,13 @@ class TestKeepkeyManCommands(KeepkeyTestCase):
|
||||
self.assertEqual(result['code'], -11)
|
||||
|
||||
# Set a PIN
|
||||
self.client.wipe_device()
|
||||
self.client.load_device_by_mnemonic(mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='1234', passphrase_protection=False, label='test', language='english')
|
||||
device.wipe(self.client)
|
||||
load_device_by_mnemonic(client=self.client, mnemonic='alcohol woman abuse must during monitor noble actual mixed trade anger aisle', pin='1234', passphrase_protection=False, label='test')
|
||||
self.client.call(messages.ClearSession())
|
||||
result = process_commands(self.dev_args + ['promptpin'])
|
||||
self.assertTrue(result['success'])
|
||||
|
||||
# Invalid pin
|
||||
# Invalid pins
|
||||
result = process_commands(self.dev_args + ['sendpin', 'notnum'])
|
||||
self.assertEqual(result['error'], 'Non-numeric PIN provided')
|
||||
self.assertEqual(result['code'], -7)
|
||||
@ -189,6 +185,7 @@ class TestKeepkeyManCommands(KeepkeyTestCase):
|
||||
self.assertTrue(result['success'])
|
||||
|
||||
# Send the PIN
|
||||
self.client.open()
|
||||
pin = self.client.debug.encode_pin('1234')
|
||||
result = process_commands(self.dev_args + ['sendpin', pin])
|
||||
self.assertTrue(result['success'])
|
||||
|
||||
Loading…
Reference in New Issue
Block a user