diff --git a/hwi.py b/hwi.py index 5172df0..4f9552b 100755 --- a/hwi.py +++ b/hwi.py @@ -2,11 +2,6 @@ # Hardware wallet interaction script -from hwilib.commands import process_commands +from hwilib.cli import main -import sys -import json - -if __name__ == '__main__': - result = process_commands(sys.argv[1:]) - print(json.dumps(result)) +main() diff --git a/hwilib/cli.py b/hwilib/cli.py new file mode 100644 index 0000000..06a2a60 --- /dev/null +++ b/hwilib/cli.py @@ -0,0 +1,162 @@ +#! /usr/bin/env python3 + +from .commands import backup_device, displayaddress, enumerate, find_device, \ + get_client, getmasterxpub, getxpub, getkeypool, restore_device, setup_device, \ + signmessage, signtx, wipe_device, NO_DEVICE_PATH, DEVICE_CONN_ERROR, NO_PASSWORD, \ + UNKNWON_DEVICE_TYPE + +import argparse +import getpass +import logging +import json +import sys + +def backup_device_handler(args, client): + return backup_device(client, label=args.label, backup_passphrase=args.backup_passphrase) + +def displayaddress_handler(args, client): + return displayaddress(client, path=args.path, sh_wpkh=args.sh_wpkh, wpkh=args.wpkh) + +def enumerate_handler(args): + return enumerate(password=args.password) + +def getmasterxpub_handler(args, client): + return getmasterxpub(client) + +def getxpub_handler(args, client): + return getxpub(client, path=args.path) + +def getkeypool_handler(args, client): + return getkeypool(client, path=args.path, start=args.start, end=args.end, internal=args.internal, keypool=args.keypool, account=args.account, sh_wpkh=args.sh_wpkh, wpkh=args.wpkh) + +def restore_device_handler(args, client): + return restore_device(client, label=args.label) + +def setup_device_handler(args, client): + return setup_device(client, label=args.label, backup_passphrase=args.backup_passphrase) + +def signmessage_handler(args, client): + return signmessage(client, message=args.message, path=args.path) + +def signtx_handler(args, client): + return signtx(client, psbt=args.psbt) + +def wipe_device_handler(args, client): + return wipe_device(client) + +def process_commands(args): + parser = argparse.ArgumentParser(description='Access and send commands to a hardware wallet device. Responses are in JSON format') + parser.add_argument('--device-path', '-d', help='Specify the device path of the device to connect to') + parser.add_argument('--device-type', '-t', help='Specify the type of device that will be connected. If `--device-path` not given, the first device of this type enumerated is used.') + parser.add_argument('--password', '-p', help='Device password if it has one (e.g. DigitalBitbox)', default='') + parser.add_argument('--stdinpass', help='Enter the device password on the command line', action='store_true') + parser.add_argument('--testnet', help='Use testnet prefixes', action='store_true') + parser.add_argument('--debug', help='Print debug statements', action='store_true') + parser.add_argument('--fingerprint', '-f', help='Specify the device to connect to using the first 4 bytes of the hash160 of the master public key. It will connect to the first device that matches this fingerprint.') + + subparsers = parser.add_subparsers(description='Commands', dest='command') + # work-around to make subparser required + subparsers.required = True + + enumerate_parser = subparsers.add_parser('enumerate', help='List all available devices') + enumerate_parser.set_defaults(func=enumerate_handler) + + getmasterxpub_parser = subparsers.add_parser('getmasterxpub', help='Get the extended public key at m/44\'/0\'/0\'') + getmasterxpub_parser.set_defaults(func=getmasterxpub_handler) + + signtx_parser = subparsers.add_parser('signtx', help='Sign a PSBT') + signtx_parser.add_argument('psbt', help='The Partially Signed Bitcoin Transaction to sign') + signtx_parser.set_defaults(func=signtx_handler) + + getxpub_parser = subparsers.add_parser('getxpub', help='Get an extended public key') + getxpub_parser.add_argument('path', help='The BIP 32 derivation path to derive the key at') + getxpub_parser.set_defaults(func=getxpub_handler) + + signmsg_parser = subparsers.add_parser('signmessage', help='Sign a message') + signmsg_parser.add_argument('message', help='The message to sign') + signmsg_parser.add_argument('path', help='The BIP 32 derivation path of the key to sign the message with') + signmsg_parser.set_defaults(func=signmessage_handler) + + getkeypool_parser = subparsers.add_parser('getkeypool', help='Get JSON array of keys that can be imported to Bitcoin Core with importmulti') + getkeypool_parser.add_argument('--keypool', action='store_true', help='Indicates that the keys are to be imported to the keypool') + getkeypool_parser.add_argument('--internal', action='store_true', help='Indicates that the keys are change keys') + getkeypool_parser.add_argument('--sh_wpkh', action='store_true', help='Generate p2sh-nested segwit addresses (default path: m/49h/0h/0h/[0,1]/*)') + getkeypool_parser.add_argument('--wpkh', action='store_true', help='Generate bech32 addresses (default path: m/84h/0h/0h/[0,1]/*)') + getkeypool_parser.add_argument('--account', help='BIP43 account (default: 0)', type=int, default=0) + getkeypool_parser.add_argument('--path', help='Derivation path, default follows BIP43 convention, e.g. m/84h/0h/0h/1/* with --wpkh --internal') + getkeypool_parser.add_argument('start', type=int, help='The index to start at.') + getkeypool_parser.add_argument('end', type=int, help='The index to end at.') + getkeypool_parser.set_defaults(func=getkeypool_handler) + + displayaddr_parser = subparsers.add_parser('displayaddress', help='Display an address') + displayaddr_parser.add_argument('path', help='The BIP 32 derivation path of the key embedded in the address') + displayaddr_parser.add_argument('--sh_wpkh', action='store_true', help='Display the p2sh-nested segwit address associated with this key path') + displayaddr_parser.add_argument('--wpkh', action='store_true', help='Display the bech32 version of the address associated with this key path') + displayaddr_parser.set_defaults(func=displayaddress_handler) + + setupdev_parser = subparsers.add_parser('setup', help='Setup a device. Passphrase protection uses the password given by -p') + setupdev_parser.add_argument('--label', '-l', help='The name to give to the device', default='') + setupdev_parser.add_argument('--backup_passphrase', '-b', help='The passphrase to use for the backup, if applicable', default='') + setupdev_parser.set_defaults(func=setup_device_handler) + + wipedev_parser = subparsers.add_parser('wipe', help='Wipe a device') + wipedev_parser.set_defaults(func=wipe_device_handler) + + restore_parser = subparsers.add_parser('restore', help='Initiate the device restoring process') + restore_parser.add_argument('--label', '-l', help='The name to give to the device', default='') + restore_parser.set_defaults(func=restore_device_handler) + + backup_parser = subparsers.add_parser('backup', help='Initiate the device backup creation process') + backup_parser.add_argument('--label', '-l', help='The name to give to the device', default='') + backup_parser.add_argument('--backup_passphrase', '-b', help='The passphrase to use for the backup, if applicable', default='') + backup_parser.set_defaults(func=backup_device_handler) + + args = parser.parse_args(args) + + device_path = args.device_path + device_type = args.device_type + password = args.password + command = args.command + + # Setup debug logging + logging.basicConfig(level=logging.DEBUG if args.debug else logging.WARNING) + + # Enter the password on stdin + if args.stdinpass: + password = getpass.getpass('Enter your device password: ') + args.password = password + + # List all available hardware wallet devices + if command == 'enumerate': + return args.func(args) + + # Auto detect if we are using fingerprint or type to identify device + if args.fingerprint or (args.device_type and not args.device_path): + client = find_device(args.device_path, args.password, args.device_type, args.fingerprint) + if not client: + return {'error':'Could not find device with specified fingerprint','code':DEVICE_CONN_ERROR} + elif args.device_type and args.device_path: + try: + client = get_client(device_type, device_path, password) + except NoPasswordError as e: + return {'error':str(e),'code':NO_PASSWORD} + except UnknownDeviceError as e: + return {'error':str(e),'code':UNKNWON_DEVICE_TYPE} + except Exception as e: + return {'error':str(e),'code':DEVICE_CONN_ERROR} + else: + return {'error':'You must specify a device type or fingerprint for all commands except enumerate','code':NO_DEVICE_PATH} + + client.is_testnet = args.testnet + + # Do the commands + result = args.func(args, client) + + # Close the device + client.close() + + return result + +def main(): + result = process_commands(sys.argv[1:]) + print(json.dumps(result)) diff --git a/hwilib/commands.py b/hwilib/commands.py index 8096a5a..15d445b 100644 --- a/hwilib/commands.py +++ b/hwilib/commands.py @@ -2,14 +2,8 @@ # Hardware wallet interaction script -import argparse -import hid -import json -import sys -import logging import glob import importlib -import getpass from .serializations import PSBT, Base64ToHex, HexToBase64, hash160 from .base58 import xpub_to_address, xpub_to_pub_hex, get_xpub_fingerprint_as_id, get_xpub_fingerprint_hex @@ -47,7 +41,7 @@ def get_client(device_type, device_path, password=''): return client # Get a list of all available hardware wallets -def enumerate(args): +def enumerate(password=''): result = [] # Gets the module names of all the files in devices/ @@ -57,23 +51,22 @@ def enumerate(args): for module in modules: try: imported_dev = importlib.import_module('.devices.' + module, __package__) - result.extend(imported_dev.enumerate(args.password)) + result.extend(imported_dev.enumerate(password)) except ImportError as e: pass # Ignore ImportErrors, the user may not have all device dependencies installed return result # Fingerprint or device type required -def find_device(args): - assert(args.device_path is None) - devices = enumerate(args) +def find_device(device_path, password='', device_type=None, fingerprint=None): + devices = enumerate(password) for d in devices: - if args.device_type is not None and d['type'] != args.device_type: + if device_type is not None and d['type'] != device_type: continue try: - client = get_client(d['type'], d['path'], args.password) + client = get_client(d['type'], d['path'], password) master_xpub = client.get_pubkey_at_path('m/0h')['xpub'] master_fpr = get_xpub_fingerprint_hex(master_xpub) - if args.fingerprint and master_fpr != args.fingerprint: + if fingerprint and master_fpr != fingerprint: client.close() continue else: @@ -82,17 +75,17 @@ def find_device(args): pass # Ignore things we wouldn't get fingerprints for return None -def getmasterxpub(args, client): +def getmasterxpub(client): try: return client.get_master_xpub() except NotImplementedError as e: return {'error': str(e), 'code': NOT_IMPLEMENTED} -def signtx(args, client): +def signtx(client, psbt): # Deserialize the transaction try: tx = PSBT() - tx.deserialize(args.psbt) + tx.deserialize(psbt) return client.sign_tx(tx) except NotImplementedError as e: return {'error': str(e), 'code': NOT_IMPLEMENTED} @@ -101,31 +94,22 @@ def signtx(args, client): except Exception as e: return {'error': str(e), 'code': BAD_ARGUMENT} -def getxpub(args, client): +def getxpub(client, path): try: - return client.get_pubkey_at_path(args.path) + return client.get_pubkey_at_path(path) except NotImplementedError as e: return {'error': str(e), 'code': NOT_IMPLEMENTED} -def signmessage(args, client): +def signmessage(client, message, path): try: - return client.sign_message(args.message, args.path) + return client.sign_message(message, path) except NotImplementedError as e: return {'error': str(e), 'code': NOT_IMPLEMENTED} except ValueError as e: return {'error': str(e), 'code': BAD_ARGUMENT} -def getkeypool(args, client): - # args[0]; start index (e.g. 0) - # args[1]: end index (e.g. 1000) - path = args.path - start = args.start - end = args.end - internal = args.internal - keypool = args.keypool - account = args.account or 0 - - if args.sh_wpkh == True and args.wpkh == True: +def getkeypool(client, path, start, end, internal=False, keypool=False, account=0, sh_wpkh=False, wpkh=True): + if sh_wpkh == True and wpkh == True: return {'error':'Both `--wpkh` and `--sh_wpkh` can not be selected at the same time.','code':BAD_ARGUMENT} try: @@ -139,15 +123,15 @@ def getkeypool(args, client): path = "m/" # Purpose - if args.wpkh == True: + if wpkh == True: path += "84'/" - elif args.sh_wpkh == True: + elif sh_wpkh == True: path += "49'/" else: path += "44'/" # Coin type - if args.testnet == True: + if client.is_testnet == True: path += "1'/" else: path += "0'/" @@ -156,7 +140,7 @@ def getkeypool(args, client): path += str(account) + '\'/' # Receive or change - if args.internal == True: + if internal == True: path += "1/*" else: path += "0/*" @@ -183,9 +167,9 @@ def getkeypool(args, client): descriptor_open = 'pkh(' descriptor_close = ')' - if args.wpkh == True: + if wpkh == True: descriptor_open = 'wpkh(' - elif args.sh_wpkh == True: + elif sh_wpkh == True: descriptor_open = 'sh(wpkh(' descriptor_close = '))' @@ -198,14 +182,14 @@ def getkeypool(args, client): import_data.append(this_import) return import_data -def displayaddress(args, client): - if args.sh_wpkh == True and args.wpkh == True: +def displayaddress(client, path, sh_wpkh=False, wpkh=False): + if sh_wpkh == True and wpkh == True: return {'error':'Both `--wpkh` and `--sh_wpkh` can not be selected at the same time.','code':BAD_ARGUMENT} - return client.display_address(args.path, args.sh_wpkh, args.wpkh) + return client.display_address(path, sh_wpkh, wpkh) -def setup_device(args, client): +def setup_device(client, label='', backup_passphrase=''): try: - return client.setup_device(args.label, args.backup_passphrase) + return client.setup_device(label, backup_passphrase) except UnavailableActionError as e: return {'error': str(e), 'code': UNAVAILABLE_ACTION} except DeviceAlreadyInitError as e: @@ -213,15 +197,15 @@ def setup_device(args, client): except ValueError as e: return {'error': str(e), 'code': BAD_ARGUMENT} -def wipe_device(args, client): +def wipe_device(client): try: return client.wipe_device() except UnavailableActionError as e: return {'error': str(e), 'code': UNAVAILABLE_ACTION} -def restore_device(args, client): +def restore_device(client, label): try: - return client.restore_device(args.label) + return client.restore_device(label) except UnavailableActionError as e: return {'error': str(e), 'code': UNAVAILABLE_ACTION} except DeviceAlreadyInitError as e: @@ -229,123 +213,10 @@ def restore_device(args, client): except ValueError as e: return {'error': str(e), 'code': BAD_ARGUMENT} -def backup_device(args, client): +def backup_device(client, label='', backup_passphrase=''): try: - return client.backup_device(args.label, args.backup_passphrase) + return client.backup_device(label, backup_passphrase) except UnavailableActionError as e: return {'error': str(e), 'code': UNAVAILABLE_ACTION} except ValueError as e: return {'error': str(e), 'code': BAD_ARGUMENT} - -def process_commands(args): - parser = argparse.ArgumentParser(description='Access and send commands to a hardware wallet device. Responses are in JSON format') - parser.add_argument('--device-path', '-d', help='Specify the device path of the device to connect to') - parser.add_argument('--device-type', '-t', help='Specify the type of device that will be connected. If `--device-path` not given, the first device of this type enumerated is used.') - parser.add_argument('--password', '-p', help='Device password if it has one (e.g. DigitalBitbox)', default='') - parser.add_argument('--stdinpass', help='Enter the device password on the command line', action='store_true') - parser.add_argument('--testnet', help='Use testnet prefixes', action='store_true') - parser.add_argument('--debug', help='Print debug statements', action='store_true') - parser.add_argument('--fingerprint', '-f', help='Specify the device to connect to using the first 4 bytes of the hash160 of the master public key. It will connect to the first device that matches this fingerprint.') - - subparsers = parser.add_subparsers(description='Commands', dest='command') - # work-around to make subparser required - subparsers.required = True - - enumerate_parser = subparsers.add_parser('enumerate', help='List all available devices') - enumerate_parser.set_defaults(func=enumerate) - - getmasterxpub_parser = subparsers.add_parser('getmasterxpub', help='Get the extended public key at m/44\'/0\'/0\'') - getmasterxpub_parser.set_defaults(func=getmasterxpub) - - signtx_parser = subparsers.add_parser('signtx', help='Sign a PSBT') - signtx_parser.add_argument('psbt', help='The Partially Signed Bitcoin Transaction to sign') - signtx_parser.set_defaults(func=signtx) - - getxpub_parser = subparsers.add_parser('getxpub', help='Get an extended public key') - getxpub_parser.add_argument('path', help='The BIP 32 derivation path to derive the key at') - getxpub_parser.set_defaults(func=getxpub) - - signmsg_parser = subparsers.add_parser('signmessage', help='Sign a message') - signmsg_parser.add_argument('message', help='The message to sign') - signmsg_parser.add_argument('path', help='The BIP 32 derivation path of the key to sign the message with') - signmsg_parser.set_defaults(func=signmessage) - - getkeypool_parser = subparsers.add_parser('getkeypool', help='Get JSON array of keys that can be imported to Bitcoin Core with importmulti') - getkeypool_parser.add_argument('--keypool', action='store_true', help='Indicates that the keys are to be imported to the keypool') - getkeypool_parser.add_argument('--internal', action='store_true', help='Indicates that the keys are change keys') - getkeypool_parser.add_argument('--sh_wpkh', action='store_true', help='Generate p2sh-nested segwit addresses (default path: m/49h/0h/0h/[0,1]/*)') - getkeypool_parser.add_argument('--wpkh', action='store_true', help='Generate bech32 addresses (default path: m/84h/0h/0h/[0,1]/*)') - getkeypool_parser.add_argument('--account', help='BIP43 account (default: 0)') - getkeypool_parser.add_argument('--path', help='Derivation path, default follows BIP43 convention, e.g. m/84h/0h/0h/1/* with --wpkh --internal') - getkeypool_parser.add_argument('start', type=int, help='The index to start at.') - getkeypool_parser.add_argument('end', type=int, help='The index to end at.') - getkeypool_parser.set_defaults(func=getkeypool) - - displayaddr_parser = subparsers.add_parser('displayaddress', help='Display an address') - displayaddr_parser.add_argument('path', help='The BIP 32 derivation path of the key embedded in the address') - displayaddr_parser.add_argument('--sh_wpkh', action='store_true', help='Display the p2sh-nested segwit address associated with this key path') - displayaddr_parser.add_argument('--wpkh', action='store_true', help='Display the bech32 version of the address associated with this key path') - displayaddr_parser.set_defaults(func=displayaddress) - - setupdev_parser = subparsers.add_parser('setup', help='Setup a device. Passphrase protection uses the password given by -p') - setupdev_parser.add_argument('--label', '-l', help='The name to give to the device', default='') - setupdev_parser.add_argument('--backup_passphrase', '-b', help='The passphrase to use for the backup, if applicable', default='') - setupdev_parser.set_defaults(func=setup_device) - - wipedev_parser = subparsers.add_parser('wipe', help='Wipe a device') - wipedev_parser.set_defaults(func=wipe_device) - - restore_parser = subparsers.add_parser('restore', help='Initiate the device restoring process') - restore_parser.add_argument('--label', '-l', help='The name to give to the device', default='') - restore_parser.set_defaults(func=restore_device) - - backup_parser = subparsers.add_parser('backup', help='Initiate the device backup creation process') - backup_parser.add_argument('--label', '-l', help='The name to give to the device', default='') - backup_parser.add_argument('--backup_passphrase', '-b', help='The passphrase to use for the backup, if applicable', default='') - backup_parser.set_defaults(func=backup_device) - - args = parser.parse_args(args) - - device_path = args.device_path - device_type = args.device_type - password = args.password - command = args.command - - # Setup debug logging - logging.basicConfig(level=logging.DEBUG if args.debug else logging.WARNING) - - # Enter the password on stdin - if args.stdinpass: - password = getpass.getpass('Enter your device password: ') - args.password = password - - # List all available hardware wallet devices - if command == 'enumerate': - return args.func(args) - - # Auto detect if we are using fingerprint or type to identify device - if args.fingerprint or (args.device_type and not args.device_path): - client = find_device(args) - if not client: - return {'error':'Could not find device with specified fingerprint','code':DEVICE_CONN_ERROR} - elif args.device_type and args.device_path: - try: - client = get_client(device_type, device_path, password) - except NoPasswordError as e: - return {'error':str(e),'code':NO_PASSWORD} - except UnknownDeviceError as e: - return {'error':str(e),'code':UNKNWON_DEVICE_TYPE} - except Exception as e: - return {'error':str(e),'code':DEVICE_CONN_ERROR} - else: - return {'error':'You must specify a device type or fingerprint for all commands except enumerate','code':NO_DEVICE_PATH} - - client.is_testnet = args.testnet - - # Do the commands - result = args.func(args, client) - - # Close the device - client.close() - - return result diff --git a/test/test_coldcard.py b/test/test_coldcard.py index 91559a0..9273b29 100755 --- a/test/test_coldcard.py +++ b/test/test_coldcard.py @@ -7,7 +7,7 @@ import subprocess import time import unittest -from hwilib.commands import process_commands +from hwilib.cli import process_commands from ckcc.protocol import CCProtocolPacker from ckcc.client import ColdcardDevice from test_device import DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignMessage, TestSignTx diff --git a/test/test_device.py b/test/test_device.py index 462a606..f3b95a8 100644 --- a/test/test_device.py +++ b/test/test_device.py @@ -9,7 +9,7 @@ import time import unittest from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException -from hwilib.commands import process_commands +from hwilib.cli import process_commands from hwilib.serializations import PSBT # Class for emulator control diff --git a/test/test_digitalbitbox.py b/test/test_digitalbitbox.py index 666b857..287c395 100755 --- a/test/test_digitalbitbox.py +++ b/test/test_digitalbitbox.py @@ -10,7 +10,7 @@ import unittest from test_device import DeviceTestCase, start_bitcoind, TestDeviceConnect, TestGetKeypool, TestSignTx, TestSignMessage -from hwilib.commands import process_commands +from hwilib.cli import process_commands from hwilib.devices.digitalbitbox import BitboxSimulator, send_plain, send_encrypt def digitalbitbox_test_suite(rpc, userpass, simulator): diff --git a/test/test_keepkey.py b/test/test_keepkey.py index 748257c..9af770a 100755 --- a/test/test_keepkey.py +++ b/test/test_keepkey.py @@ -15,7 +15,7 @@ from keepkeylib.transport_udp import UDPTransport from keepkeylib.client import KeepKeyDebugClient from test_device import DeviceEmulator, DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignTx -from hwilib.commands import process_commands +from hwilib.cli import process_commands class KeepkeyEmulator(DeviceEmulator): def __init__(self, emulator_path): diff --git a/test/test_ledger.py b/test/test_ledger.py index 3f25716..c636f32 100755 --- a/test/test_ledger.py +++ b/test/test_ledger.py @@ -12,7 +12,7 @@ import unittest from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException from test_device import DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignTx -from hwilib.commands import process_commands +from hwilib.cli import process_commands def ledger_test_suite(rpc, userpass): # Look for real ledger using HWI API(self-referential, but no other way) diff --git a/test/test_trezor.py b/test/test_trezor.py index dd7be0c..9908143 100755 --- a/test/test_trezor.py +++ b/test/test_trezor.py @@ -17,7 +17,7 @@ from trezorlib.debuglink import TrezorClientDebugLink, load_device_by_mnemonic, from trezorlib import device from test_device import DeviceEmulator, DeviceTestCase, start_bitcoind, TestDeviceConnect, TestDisplayAddress, TestGetKeypool, TestSignTx -from hwilib.commands import process_commands +from hwilib.cli import process_commands class TrezorEmulator(DeviceEmulator): def __init__(self, path):