Refactor to make hwilib more like a library

Moves the command argument parsing to hwi.py. Instead of each function
taking args and client, they now take client and whatever else they
need. hwi.py handles the conversion from args and client to the actual
arguments.
This commit is contained in:
Andrew Chow 2019-01-06 12:59:26 -05:00
parent 486d2fa367
commit 2ddd4ed41a
9 changed files with 202 additions and 174 deletions

9
hwi.py
View File

@ -2,11 +2,6 @@
# Hardware wallet interaction script
from hwilib.commands import process_commands
from hwilib.cli import main
import sys
import json
if __name__ == '__main__':
result = process_commands(sys.argv[1:])
print(json.dumps(result))
main()

162
hwilib/cli.py Normal file
View File

@ -0,0 +1,162 @@
#! /usr/bin/env python3
from .commands import backup_device, displayaddress, enumerate, find_device, \
get_client, getmasterxpub, getxpub, getkeypool, restore_device, setup_device, \
signmessage, signtx, wipe_device, NO_DEVICE_PATH, DEVICE_CONN_ERROR, NO_PASSWORD, \
UNKNWON_DEVICE_TYPE
import argparse
import getpass
import logging
import json
import sys
def backup_device_handler(args, client):
return backup_device(client, label=args.label, backup_passphrase=args.backup_passphrase)
def displayaddress_handler(args, client):
return displayaddress(client, path=args.path, sh_wpkh=args.sh_wpkh, wpkh=args.wpkh)
def enumerate_handler(args):
return enumerate(password=args.password)
def getmasterxpub_handler(args, client):
return getmasterxpub(client)
def getxpub_handler(args, client):
return getxpub(client, path=args.path)
def getkeypool_handler(args, client):
return getkeypool(client, path=args.path, start=args.start, end=args.end, internal=args.internal, keypool=args.keypool, account=args.account, sh_wpkh=args.sh_wpkh, wpkh=args.wpkh)
def restore_device_handler(args, client):
return restore_device(client, label=args.label)
def setup_device_handler(args, client):
return setup_device(client, label=args.label, backup_passphrase=args.backup_passphrase)
def signmessage_handler(args, client):
return signmessage(client, message=args.message, path=args.path)
def signtx_handler(args, client):
return signtx(client, psbt=args.psbt)
def wipe_device_handler(args, client):
return wipe_device(client)
def process_commands(args):
parser = argparse.ArgumentParser(description='Access and send commands to a hardware wallet device. Responses are in JSON format')
parser.add_argument('--device-path', '-d', help='Specify the device path of the device to connect to')
parser.add_argument('--device-type', '-t', help='Specify the type of device that will be connected. If `--device-path` not given, the first device of this type enumerated is used.')
parser.add_argument('--password', '-p', help='Device password if it has one (e.g. DigitalBitbox)', default='')
parser.add_argument('--stdinpass', help='Enter the device password on the command line', action='store_true')
parser.add_argument('--testnet', help='Use testnet prefixes', action='store_true')
parser.add_argument('--debug', help='Print debug statements', action='store_true')
parser.add_argument('--fingerprint', '-f', help='Specify the device to connect to using the first 4 bytes of the hash160 of the master public key. It will connect to the first device that matches this fingerprint.')
subparsers = parser.add_subparsers(description='Commands', dest='command')
# work-around to make subparser required
subparsers.required = True
enumerate_parser = subparsers.add_parser('enumerate', help='List all available devices')
enumerate_parser.set_defaults(func=enumerate_handler)
getmasterxpub_parser = subparsers.add_parser('getmasterxpub', help='Get the extended public key at m/44\'/0\'/0\'')
getmasterxpub_parser.set_defaults(func=getmasterxpub_handler)
signtx_parser = subparsers.add_parser('signtx', help='Sign a PSBT')
signtx_parser.add_argument('psbt', help='The Partially Signed Bitcoin Transaction to sign')
signtx_parser.set_defaults(func=signtx_handler)
getxpub_parser = subparsers.add_parser('getxpub', help='Get an extended public key')
getxpub_parser.add_argument('path', help='The BIP 32 derivation path to derive the key at')
getxpub_parser.set_defaults(func=getxpub_handler)
signmsg_parser = subparsers.add_parser('signmessage', help='Sign a message')
signmsg_parser.add_argument('message', help='The message to sign')
signmsg_parser.add_argument('path', help='The BIP 32 derivation path of the key to sign the message with')
signmsg_parser.set_defaults(func=signmessage_handler)
getkeypool_parser = subparsers.add_parser('getkeypool', help='Get JSON array of keys that can be imported to Bitcoin Core with importmulti')
getkeypool_parser.add_argument('--keypool', action='store_true', help='Indicates that the keys are to be imported to the keypool')
getkeypool_parser.add_argument('--internal', action='store_true', help='Indicates that the keys are change keys')
getkeypool_parser.add_argument('--sh_wpkh', action='store_true', help='Generate p2sh-nested segwit addresses (default path: m/49h/0h/0h/[0,1]/*)')
getkeypool_parser.add_argument('--wpkh', action='store_true', help='Generate bech32 addresses (default path: m/84h/0h/0h/[0,1]/*)')
getkeypool_parser.add_argument('--account', help='BIP43 account (default: 0)', type=int, default=0)
getkeypool_parser.add_argument('--path', help='Derivation path, default follows BIP43 convention, e.g. m/84h/0h/0h/1/* with --wpkh --internal')
getkeypool_parser.add_argument('start', type=int, help='The index to start at.')
getkeypool_parser.add_argument('end', type=int, help='The index to end at.')
getkeypool_parser.set_defaults(func=getkeypool_handler)
displayaddr_parser = subparsers.add_parser('displayaddress', help='Display an address')
displayaddr_parser.add_argument('path', help='The BIP 32 derivation path of the key embedded in the address')
displayaddr_parser.add_argument('--sh_wpkh', action='store_true', help='Display the p2sh-nested segwit address associated with this key path')
displayaddr_parser.add_argument('--wpkh', action='store_true', help='Display the bech32 version of the address associated with this key path')
displayaddr_parser.set_defaults(func=displayaddress_handler)
setupdev_parser = subparsers.add_parser('setup', help='Setup a device. Passphrase protection uses the password given by -p')
setupdev_parser.add_argument('--label', '-l', help='The name to give to the device', default='')
setupdev_parser.add_argument('--backup_passphrase', '-b', help='The passphrase to use for the backup, if applicable', default='')
setupdev_parser.set_defaults(func=setup_device_handler)
wipedev_parser = subparsers.add_parser('wipe', help='Wipe a device')
wipedev_parser.set_defaults(func=wipe_device_handler)
restore_parser = subparsers.add_parser('restore', help='Initiate the device restoring process')
restore_parser.add_argument('--label', '-l', help='The name to give to the device', default='')
restore_parser.set_defaults(func=restore_device_handler)
backup_parser = subparsers.add_parser('backup', help='Initiate the device backup creation process')
backup_parser.add_argument('--label', '-l', help='The name to give to the device', default='')
backup_parser.add_argument('--backup_passphrase', '-b', help='The passphrase to use for the backup, if applicable', default='')
backup_parser.set_defaults(func=backup_device_handler)
args = parser.parse_args(args)
device_path = args.device_path
device_type = args.device_type
password = args.password
command = args.command
# Setup debug logging
logging.basicConfig(level=logging.DEBUG if args.debug else logging.WARNING)
# Enter the password on stdin
if args.stdinpass:
password = getpass.getpass('Enter your device password: ')
args.password = password
# List all available hardware wallet devices
if command == 'enumerate':
return args.func(args)
# Auto detect if we are using fingerprint or type to identify device
if args.fingerprint or (args.device_type and not args.device_path):
client = find_device(args.device_path, args.password, args.device_type, args.fingerprint)
if not client:
return {'error':'Could not find device with specified fingerprint','code':DEVICE_CONN_ERROR}
elif args.device_type and args.device_path:
try:
client = get_client(device_type, device_path, password)
except NoPasswordError as e:
return {'error':str(e),'code':NO_PASSWORD}
except UnknownDeviceError as e:
return {'error':str(e),'code':UNKNWON_DEVICE_TYPE}
except Exception as e:
return {'error':str(e),'code':DEVICE_CONN_ERROR}
else:
return {'error':'You must specify a device type or fingerprint for all commands except enumerate','code':NO_DEVICE_PATH}
client.is_testnet = args.testnet
# Do the commands
result = args.func(args, client)
# Close the device
client.close()
return result
def main():
result = process_commands(sys.argv[1:])
print(json.dumps(result))

View File

@ -2,14 +2,8 @@
# Hardware wallet interaction script
import argparse
import hid
import json
import sys
import logging
import glob
import importlib
import getpass
from .serializations import PSBT, Base64ToHex, HexToBase64, hash160
from .base58 import xpub_to_address, xpub_to_pub_hex, get_xpub_fingerprint_as_id, get_xpub_fingerprint_hex
@ -47,7 +41,7 @@ def get_client(device_type, device_path, password=''):
return client
# Get a list of all available hardware wallets
def enumerate(args):
def enumerate(password=''):
result = []
# Gets the module names of all the files in devices/
@ -57,23 +51,22 @@ def enumerate(args):
for module in modules:
try:
imported_dev = importlib.import_module('.devices.' + module, __package__)
result.extend(imported_dev.enumerate(args.password))
result.extend(imported_dev.enumerate(password))
except ImportError as e:
pass # Ignore ImportErrors, the user may not have all device dependencies installed
return result
# Fingerprint or device type required
def find_device(args):
assert(args.device_path is None)
devices = enumerate(args)
def find_device(device_path, password='', device_type=None, fingerprint=None):
devices = enumerate(password)
for d in devices:
if args.device_type is not None and d['type'] != args.device_type:
if device_type is not None and d['type'] != device_type:
continue
try:
client = get_client(d['type'], d['path'], args.password)
client = get_client(d['type'], d['path'], password)
master_xpub = client.get_pubkey_at_path('m/0h')['xpub']
master_fpr = get_xpub_fingerprint_hex(master_xpub)
if args.fingerprint and master_fpr != args.fingerprint:
if fingerprint and master_fpr != fingerprint:
client.close()
continue
else:
@ -82,17 +75,17 @@ def find_device(args):
pass # Ignore things we wouldn't get fingerprints for
return None
def getmasterxpub(args, client):
def getmasterxpub(client):
try:
return client.get_master_xpub()
except NotImplementedError as e:
return {'error': str(e), 'code': NOT_IMPLEMENTED}
def signtx(args, client):
def signtx(client, psbt):
# Deserialize the transaction
try:
tx = PSBT()
tx.deserialize(args.psbt)
tx.deserialize(psbt)
return client.sign_tx(tx)
except NotImplementedError as e:
return {'error': str(e), 'code': NOT_IMPLEMENTED}
@ -101,31 +94,22 @@ def signtx(args, client):
except Exception as e:
return {'error': str(e), 'code': BAD_ARGUMENT}
def getxpub(args, client):
def getxpub(client, path):
try:
return client.get_pubkey_at_path(args.path)
return client.get_pubkey_at_path(path)
except NotImplementedError as e:
return {'error': str(e), 'code': NOT_IMPLEMENTED}
def signmessage(args, client):
def signmessage(client, message, path):
try:
return client.sign_message(args.message, args.path)
return client.sign_message(message, path)
except NotImplementedError as e:
return {'error': str(e), 'code': NOT_IMPLEMENTED}
except ValueError as e:
return {'error': str(e), 'code': BAD_ARGUMENT}
def getkeypool(args, client):
# args[0]; start index (e.g. 0)
# args[1]: end index (e.g. 1000)
path = args.path
start = args.start
end = args.end
internal = args.internal
keypool = args.keypool
account = args.account or 0
if args.sh_wpkh == True and args.wpkh == True:
def getkeypool(client, path, start, end, internal=False, keypool=False, account=0, sh_wpkh=False, wpkh=True):
if sh_wpkh == True and wpkh == True:
return {'error':'Both `--wpkh` and `--sh_wpkh` can not be selected at the same time.','code':BAD_ARGUMENT}
try:
@ -139,15 +123,15 @@ def getkeypool(args, client):
path = "m/"
# Purpose
if args.wpkh == True:
if wpkh == True:
path += "84'/"
elif args.sh_wpkh == True:
elif sh_wpkh == True:
path += "49'/"
else:
path += "44'/"
# Coin type
if args.testnet == True:
if client.is_testnet == True:
path += "1'/"
else:
path += "0'/"
@ -156,7 +140,7 @@ def getkeypool(args, client):
path += str(account) + '\'/'
# Receive or change
if args.internal == True:
if internal == True:
path += "1/*"
else:
path += "0/*"
@ -183,9 +167,9 @@ def getkeypool(args, client):
descriptor_open = 'pkh('
descriptor_close = ')'
if args.wpkh == True:
if wpkh == True:
descriptor_open = 'wpkh('
elif args.sh_wpkh == True:
elif sh_wpkh == True:
descriptor_open = 'sh(wpkh('
descriptor_close = '))'
@ -198,14 +182,14 @@ def getkeypool(args, client):
import_data.append(this_import)
return import_data
def displayaddress(args, client):
if args.sh_wpkh == True and args.wpkh == True:
def displayaddress(client, path, sh_wpkh=False, wpkh=False):
if sh_wpkh == True and wpkh == True:
return {'error':'Both `--wpkh` and `--sh_wpkh` can not be selected at the same time.','code':BAD_ARGUMENT}
return client.display_address(args.path, args.sh_wpkh, args.wpkh)
return client.display_address(path, sh_wpkh, wpkh)
def setup_device(args, client):
def setup_device(client, label='', backup_passphrase=''):
try:
return client.setup_device(args.label, args.backup_passphrase)
return client.setup_device(label, backup_passphrase)
except UnavailableActionError as e:
return {'error': str(e), 'code': UNAVAILABLE_ACTION}
except DeviceAlreadyInitError as e:
@ -213,15 +197,15 @@ def setup_device(args, client):
except ValueError as e:
return {'error': str(e), 'code': BAD_ARGUMENT}
def wipe_device(args, client):
def wipe_device(client):
try:
return client.wipe_device()
except UnavailableActionError as e:
return {'error': str(e), 'code': UNAVAILABLE_ACTION}
def restore_device(args, client):
def restore_device(client, label):
try:
return client.restore_device(args.label)
return client.restore_device(label)
except UnavailableActionError as e:
return {'error': str(e), 'code': UNAVAILABLE_ACTION}
except DeviceAlreadyInitError as e:
@ -229,123 +213,10 @@ def restore_device(args, client):
except ValueError as e:
return {'error': str(e), 'code': BAD_ARGUMENT}
def backup_device(args, client):
def backup_device(client, label='', backup_passphrase=''):
try:
return client.backup_device(args.label, args.backup_passphrase)
return client.backup_device(label, backup_passphrase)
except UnavailableActionError as e:
return {'error': str(e), 'code': UNAVAILABLE_ACTION}
except ValueError as e:
return {'error': str(e), 'code': BAD_ARGUMENT}
def process_commands(args):
parser = argparse.ArgumentParser(description='Access and send commands to a hardware wallet device. Responses are in JSON format')
parser.add_argument('--device-path', '-d', help='Specify the device path of the device to connect to')
parser.add_argument('--device-type', '-t', help='Specify the type of device that will be connected. If `--device-path` not given, the first device of this type enumerated is used.')
parser.add_argument('--password', '-p', help='Device password if it has one (e.g. DigitalBitbox)', default='')
parser.add_argument('--stdinpass', help='Enter the device password on the command line', action='store_true')
parser.add_argument('--testnet', help='Use testnet prefixes', action='store_true')
parser.add_argument('--debug', help='Print debug statements', action='store_true')
parser.add_argument('--fingerprint', '-f', help='Specify the device to connect to using the first 4 bytes of the hash160 of the master public key. It will connect to the first device that matches this fingerprint.')
subparsers = parser.add_subparsers(description='Commands', dest='command')
# work-around to make subparser required
subparsers.required = True
enumerate_parser = subparsers.add_parser('enumerate', help='List all available devices')
enumerate_parser.set_defaults(func=enumerate)
getmasterxpub_parser = subparsers.add_parser('getmasterxpub', help='Get the extended public key at m/44\'/0\'/0\'')
getmasterxpub_parser.set_defaults(func=getmasterxpub)
signtx_parser = subparsers.add_parser('signtx', help='Sign a PSBT')
signtx_parser.add_argument('psbt', help='The Partially Signed Bitcoin Transaction to sign')
signtx_parser.set_defaults(func=signtx)
getxpub_parser = subparsers.add_parser('getxpub', help='Get an extended public key')
getxpub_parser.add_argument('path', help='The BIP 32 derivation path to derive the key at')
getxpub_parser.set_defaults(func=getxpub)
signmsg_parser = subparsers.add_parser('signmessage', help='Sign a message')
signmsg_parser.add_argument('message', help='The message to sign')
signmsg_parser.add_argument('path', help='The BIP 32 derivation path of the key to sign the message with')
signmsg_parser.set_defaults(func=signmessage)
getkeypool_parser = subparsers.add_parser('getkeypool', help='Get JSON array of keys that can be imported to Bitcoin Core with importmulti')
getkeypool_parser.add_argument('--keypool', action='store_true', help='Indicates that the keys are to be imported to the keypool')
getkeypool_parser.add_argument('--internal', action='store_true', help='Indicates that the keys are change keys')
getkeypool_parser.add_argument('--sh_wpkh', action='store_true', help='Generate p2sh-nested segwit addresses (default path: m/49h/0h/0h/[0,1]/*)')
getkeypool_parser.add_argument('--wpkh', action='store_true', help='Generate bech32 addresses (default path: m/84h/0h/0h/[0,1]/*)')
getkeypool_parser.add_argument('--account', help='BIP43 account (default: 0)')
getkeypool_parser.add_argument('--path', help='Derivation path, default follows BIP43 convention, e.g. m/84h/0h/0h/1/* with --wpkh --internal')
getkeypool_parser.add_argument('start', type=int, help='The index to start at.')
getkeypool_parser.add_argument('end', type=int, help='The index to end at.')
getkeypool_parser.set_defaults(func=getkeypool)
displayaddr_parser = subparsers.add_parser('displayaddress', help='Display an address')
displayaddr_parser.add_argument('path', help='The BIP 32 derivation path of the key embedded in the address')
displayaddr_parser.add_argument('--sh_wpkh', action='store_true', help='Display the p2sh-nested segwit address associated with this key path')
displayaddr_parser.add_argument('--wpkh', action='store_true', help='Display the bech32 version of the address associated with this key path')
displayaddr_parser.set_defaults(func=displayaddress)
setupdev_parser = subparsers.add_parser('setup', help='Setup a device. Passphrase protection uses the password given by -p')
setupdev_parser.add_argument('--label', '-l', help='The name to give to the device', default='')
setupdev_parser.add_argument('--backup_passphrase', '-b', help='The passphrase to use for the backup, if applicable', default='')
setupdev_parser.set_defaults(func=setup_device)
wipedev_parser = subparsers.add_parser('wipe', help='Wipe a device')
wipedev_parser.set_defaults(func=wipe_device)
restore_parser = subparsers.add_parser('restore', help='Initiate the device restoring process')
restore_parser.add_argument('--label', '-l', help='The name to give to the device', default='')
restore_parser.set_defaults(func=restore_device)
backup_parser = subparsers.add_parser('backup', help='Initiate the device backup creation process')
backup_parser.add_argument('--label', '-l', help='The name to give to the device', default='')
backup_parser.add_argument('--backup_passphrase', '-b', help='The passphrase to use for the backup, if applicable', default='')
backup_parser.set_defaults(func=backup_device)
args = parser.parse_args(args)
device_path = args.device_path
device_type = args.device_type
password = args.password
command = args.command
# Setup debug logging
logging.basicConfig(level=logging.DEBUG if args.debug else logging.WARNING)
# Enter the password on stdin
if args.stdinpass:
password = getpass.getpass('Enter your device password: ')
args.password = password
# List all available hardware wallet devices
if command == 'enumerate':
return args.func(args)
# Auto detect if we are using fingerprint or type to identify device
if args.fingerprint or (args.device_type and not args.device_path):
client = find_device(args)
if not client:
return {'error':'Could not find device with specified fingerprint','code':DEVICE_CONN_ERROR}
elif args.device_type and args.device_path:
try:
client = get_client(device_type, device_path, password)
except NoPasswordError as e:
return {'error':str(e),'code':NO_PASSWORD}
except UnknownDeviceError as e:
return {'error':str(e),'code':UNKNWON_DEVICE_TYPE}
except Exception as e:
return {'error':str(e),'code':DEVICE_CONN_ERROR}
else:
return {'error':'You must specify a device type or fingerprint for all commands except enumerate','code':NO_DEVICE_PATH}
client.is_testnet = args.testnet
# Do the commands
result = args.func(args, client)
# Close the device
client.close()
return result

View File

@ -7,7 +7,7 @@ import subprocess
import time
import unittest
from hwilib.commands import process_commands
from hwilib.cli import process_commands
from ckcc.protocol import CCProtocolPacker
from ckcc.client import ColdcardDevice
from test_device import DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignMessage, TestSignTx

View File

@ -9,7 +9,7 @@ import time
import unittest
from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException
from hwilib.commands import process_commands
from hwilib.cli import process_commands
from hwilib.serializations import PSBT
# Class for emulator control

View File

@ -10,7 +10,7 @@ import unittest
from test_device import DeviceTestCase, start_bitcoind, TestDeviceConnect, TestGetKeypool, TestSignTx, TestSignMessage
from hwilib.commands import process_commands
from hwilib.cli import process_commands
from hwilib.devices.digitalbitbox import BitboxSimulator, send_plain, send_encrypt
def digitalbitbox_test_suite(rpc, userpass, simulator):

View File

@ -15,7 +15,7 @@ from keepkeylib.transport_udp import UDPTransport
from keepkeylib.client import KeepKeyDebugClient
from test_device import DeviceEmulator, DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignTx
from hwilib.commands import process_commands
from hwilib.cli import process_commands
class KeepkeyEmulator(DeviceEmulator):
def __init__(self, emulator_path):

View File

@ -12,7 +12,7 @@ import unittest
from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException
from test_device import DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignTx
from hwilib.commands import process_commands
from hwilib.cli import process_commands
def ledger_test_suite(rpc, userpass):
# Look for real ledger using HWI API(self-referential, but no other way)

View File

@ -17,7 +17,7 @@ from trezorlib.debuglink import TrezorClientDebugLink, load_device_by_mnemonic,
from trezorlib import device
from test_device import DeviceEmulator, DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignTx
from hwilib.commands import process_commands
from hwilib.cli import process_commands
class TrezorEmulator(DeviceEmulator):
def __init__(self, path):