#! /usr/bin/env python3
"""Shared device test cases that drive HWI commands against a regtest bitcoind."""

import atexit
import json
import os
import shlex
import shutil
import subprocess
import tempfile
import time
import unittest

from authproxy import AuthServiceProxy, JSONRPCException
from hwilib.base58 import xpub_to_pub_hex
from hwilib.cli import process_commands
from hwilib.descriptor import AddChecksum
from hwilib.serializations import PSBT

SUPPORTS_MS_DISPLAY = {'trezor_1', 'keepkey', 'coldcard', 'trezor_t'}
SUPPORTS_XPUB_MS_DISPLAY = {'trezor_t'}


# Class for emulator control
class DeviceEmulator():
    """No-op emulator controller; used when a test case is given no emulator."""

    def start(self):
        pass

    def stop(self):
        pass


def start_bitcoind(bitcoind_path):
    """Start a regtest bitcoind in a temporary datadir and return ``(rpc, userpass)``.

    ``rpc`` is an ``AuthServiceProxy`` for the node and ``userpass`` is the
    ``user:password`` string read from the node's cookie file. The process is
    killed and its datadir removed at interpreter exit.

    Raises:
        RuntimeError: if bitcoind exits before it has written its cookie file.
    """
    datadir = tempfile.mkdtemp()
    bitcoind_proc = subprocess.Popen([bitcoind_path, '-regtest', '-datadir=' + datadir, '-noprinttoconsole', '-fallbackfee=0.0002'])

    def cleanup_bitcoind():
        bitcoind_proc.kill()
        # Reap the process before deleting the directory it was writing to
        bitcoind_proc.wait()
        shutil.rmtree(datadir)

    atexit.register(cleanup_bitcoind)

    # Wait for cookie file to be created
    cookie_path = os.path.join(datadir, 'regtest', '.cookie')
    while not os.path.exists(cookie_path):
        # Without this check a bitcoind that failed to start would hang the tests forever
        if bitcoind_proc.poll() is not None:
            raise RuntimeError('bitcoind exited with code {} before creating its cookie file'.format(bitcoind_proc.returncode))
        time.sleep(0.5)

    # Read .cookie file to get user and pass
    with open(cookie_path) as f:
        userpass = f.readline().strip()
    rpc = AuthServiceProxy('http://{}@127.0.0.1:18443'.format(userpass))

    # Wait for bitcoind to be ready
    ready = False
    while not ready:
        try:
            rpc.getblockchaininfo()
            ready = True
        except JSONRPCException:
            time.sleep(0.5)

    # Make sure there are blocks and coins available
    rpc.generatetoaddress(101, rpc.getnewaddress())
    return (rpc, userpass)


class DeviceTestCase(unittest.TestCase):
    """Base test case carrying the device and RPC parameters shared by all tests."""

    def __init__(self, rpc, rpc_userpass, type, full_type, path, fingerprint, master_xpub, password='', emulator=None, interface='library', methodName='runTest'):
        super().__init__(methodName)
        self.rpc = rpc
        self.rpc_userpass = rpc_userpass
        self.type = type
        self.full_type = full_type
        self.path = path
        self.fingerprint = fingerprint
        self.master_xpub = master_xpub
        self.password = password
        self.dev_args = ['-t', self.type, '-d', self.path, '--testnet']
        if emulator:
            self.emulator = emulator
        else:
            self.emulator = DeviceEmulator()
        if password:
            self.dev_args.extend(['-p', password])
        self.interface = interface

    @staticmethod
    def parameterize(testclass, rpc, rpc_userpass, type, full_type, path, fingerprint, master_xpub, password='', interface='library', emulator=None):
        """Return a suite with one ``testclass`` instance per test method, all sharing the given parameters."""
        testloader = unittest.TestLoader()
        testnames = testloader.getTestCaseNames(testclass)
        suite = unittest.TestSuite()
        for name in testnames:
            suite.addTest(testclass(rpc, rpc_userpass, type, full_type, path, fingerprint, master_xpub, password, emulator, interface, name))
        return suite

    def do_command(self, args):
        """Run an HWI command through ``self.interface`` and return its decoded result.

        The 'cli' and 'bindist' interfaces run a shell command line, 'stdin'
        feeds the arguments to ``hwi --stdin`` one per line, and any other
        value calls ``process_commands`` directly.
        """
        cli_args = [shlex.quote(arg) for arg in args]
        if self.interface == 'cli':
            proc = subprocess.Popen(['hwi ' + ' '.join(cli_args)], stdout=subprocess.PIPE, shell=True)
            result = proc.communicate()
            return json.loads(result[0].decode())
        elif self.interface == 'bindist':
            proc = subprocess.Popen(['../dist/hwi ' + ' '.join(cli_args)], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, shell=True)
            result = proc.communicate()
            return json.loads(result[0].decode())
        elif self.interface == 'stdin':
            input_str = '\n'.join(args) + '\n'
            proc = subprocess.Popen(['hwi', '--stdin'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
            result = proc.communicate(input_str.encode())
            return json.loads(result[0].decode())
        else:
            return process_commands(args)

    def get_password_args(self):
        """Return ``['-p', password]`` when a password is set, otherwise an empty list."""
        if self.password:
            return ['-p', self.password]
        return []

    def __str__(self):
        return '{}: {}'.format(self.full_type, super().__str__())

    def __repr__(self):
        return '{}: {}'.format(self.full_type, super().__repr__())

    def setup_wallets(self):
        """Create a descriptor wallet without private keys for this test and set up wallet RPC proxies.

        ``self.wrpc`` targets the newly created wallet; ``self.wpk_rpc``
        targets the ``/wallet/`` endpoint with an empty wallet name.
        """
        wallet_name = '{}_{}_test'.format(self.full_type, self.id())
        self.rpc.createwallet(wallet_name=wallet_name, disable_private_keys=True, descriptors=True)
        self.wrpc = AuthServiceProxy('http://{}@127.0.0.1:18443/wallet/{}'.format(self.rpc_userpass, wallet_name))
        self.wpk_rpc = AuthServiceProxy('http://{}@127.0.0.1:18443/wallet/'.format(self.rpc_userpass))

    def setUp(self):
        self.emulator.start()

    def tearDown(self):
        self.emulator.stop()


class TestDeviceConnect(DeviceTestCase):
    """Tests for enumerating devices and selecting one by type, path or fingerprint."""

    def test_enumerate(self):
        enum_res = self.do_command(self.get_password_args() + ['enumerate'])
        found = False
        for device in enum_res:
            # .get() so that an entry lacking a fingerprint is skipped instead of raising KeyError
            if (device['type'] == self.type or device['model'] == self.type) and device['path'] == self.path and device.get('fingerprint') == self.fingerprint:
                self.assertIn('type', device)
                self.assertIn('model', device)
                self.assertIn('path', device)
                self.assertIn('needs_pin_sent', device)
                self.assertIn('needs_passphrase_sent', device)
                self.assertNotIn('error', device)
                self.assertNotIn('code', device)
                found = True
        self.assertTrue(found)

    def test_no_type(self):
        gmxp_res = self.do_command(['getmasterxpub'])
        self.assertIn('error', gmxp_res)
        self.assertEqual(gmxp_res['error'], 'You must specify a device type or fingerprint for all commands except enumerate')
        self.assertIn('code', gmxp_res)
        self.assertEqual(gmxp_res['code'], -1)

    def test_path_type(self):
        gmxp_res = self.do_command(self.get_password_args() + ['-t', self.type, '-d', self.path, 'getmasterxpub'])
        self.assertEqual(gmxp_res['xpub'], self.master_xpub)

    def test_fingerprint_autodetect(self):
        gmxp_res = self.do_command(self.get_password_args() + ['-f', self.fingerprint, 'getmasterxpub'])
        self.assertEqual(gmxp_res['xpub'], self.master_xpub)

        # Nonexistent fingerprint
        gmxp_res = self.do_command(self.get_password_args() + ['-f', '0000ffff', 'getmasterxpub'])
        self.assertEqual(gmxp_res['error'], 'Could not find device with specified fingerprint')
        self.assertEqual(gmxp_res['code'], -3)

    def test_type_only_autodetect(self):
        gmxp_res = self.do_command(self.get_password_args() + ['-t', self.type, 'getmasterxpub'])
        self.assertEqual(gmxp_res['xpub'], self.master_xpub)

        # Unknown device type
        gmxp_res = self.do_command(['-t', 'fakedev', '-d', 'fakepath', 'getmasterxpub'])
        self.assertEqual(gmxp_res['error'], 'Unknown device type specified')
        self.assertEqual(gmxp_res['code'], -4)


class TestGetKeypool(DeviceTestCase):
    """Tests that `getkeypool` output imports into a wallet and yields the expected key paths."""

    def setUp(self):
        super().setUp()
        self.setup_wallets()

    def test_getkeypool(self):
        pkh_keypool_desc = self.do_command(self.dev_args + ['getkeypool', '0', '20'])
        import_result = self.wrpc.importdescriptors(pkh_keypool_desc)
        self.assertTrue(import_result[0]['success'])
        for _ in range(0, 21):
            addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress('', 'legacy'))
            self.assertTrue(addr_info['hdkeypath'].startswith("m/44'/1'/0'/0/"))
            addr_info = self.wrpc.getaddressinfo(self.wrpc.getrawchangeaddress('legacy'))
            self.assertTrue(addr_info['hdkeypath'].startswith("m/44'/1'/0'/1/"))

        shwpkh_keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--sh_wpkh', '0', '20'])
        import_result = self.wrpc.importdescriptors(shwpkh_keypool_desc)
        self.assertTrue(import_result[0]['success'])
        for _ in range(0, 21):
            addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress('', 'p2sh-segwit'))
            self.assertTrue(addr_info['hdkeypath'].startswith("m/49'/1'/0'/0/"))
            addr_info = self.wrpc.getaddressinfo(self.wrpc.getrawchangeaddress('p2sh-segwit'))
            self.assertTrue(addr_info['hdkeypath'].startswith("m/49'/1'/0'/1/"))

        wpkh_keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--wpkh', '0', '20'])
        import_result = self.wrpc.importdescriptors(wpkh_keypool_desc)
        self.assertTrue(import_result[0]['success'])
        for _ in range(0, 21):
            addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress('', 'bech32'))
            self.assertTrue(addr_info['hdkeypath'].startswith("m/84'/1'/0'/0/"))
            addr_info = self.wrpc.getaddressinfo(self.wrpc.getrawchangeaddress('bech32'))
            self.assertTrue(addr_info['hdkeypath'].startswith("m/84'/1'/0'/1/"))

        # Test that `--all` option gives the "concatenation" of previous three calls
        all_keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--all', '0', '20'])
        self.assertEqual(all_keypool_desc, pkh_keypool_desc + wpkh_keypool_desc + shwpkh_keypool_desc)

        keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--sh_wpkh', '--account', '3', '0', '20'])
        import_result = self.wrpc.importdescriptors(keypool_desc)
        self.assertTrue(import_result[0]['success'])
        for _ in range(0, 21):
            addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress('', 'p2sh-segwit'))
            self.assertTrue(addr_info['hdkeypath'].startswith("m/49'/1'/3'/0/"))
            addr_info = self.wrpc.getaddressinfo(self.wrpc.getrawchangeaddress('p2sh-segwit'))
            self.assertTrue(addr_info['hdkeypath'].startswith("m/49'/1'/3'/1/"))

        keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--wpkh', '--account', '3', '0', '20'])
        import_result = self.wrpc.importdescriptors(keypool_desc)
        self.assertTrue(import_result[0]['success'])
        for _ in range(0, 21):
            addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress('', 'bech32'))
            self.assertTrue(addr_info['hdkeypath'].startswith("m/84'/1'/3'/0/"))
            addr_info = self.wrpc.getaddressinfo(self.wrpc.getrawchangeaddress('bech32'))
            self.assertTrue(addr_info['hdkeypath'].startswith("m/84'/1'/3'/1/"))

        keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--path', 'm/0h/0h/4h/*', '0', '20'])
        import_result = self.wrpc.importdescriptors(keypool_desc)
        self.assertTrue(import_result[0]['success'])
        for _ in range(0, 21):
            addr_info = self.wrpc.getaddressinfo(self.wrpc.getnewaddress('', 'legacy'))
            self.assertTrue(addr_info['hdkeypath'].startswith("m/0'/0'/4'/"))

        keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--path', '/0h/0h/4h/*', '0', '20'])
        self.assertEqual(keypool_desc['error'], 'Path must start with m/')
        self.assertEqual(keypool_desc['code'], -7)

        keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--path', 'm/0h/0h/4h/', '0', '20'])
        self.assertEqual(keypool_desc['error'], 'Path must end with /*')
        self.assertEqual(keypool_desc['code'], -7)


class TestGetDescriptors(DeviceTestCase):
    """Tests that `getdescriptors` returns ranged, solvable receive and internal descriptors."""

    def tearDown(self):
        self.emulator.stop()

    def test_getdescriptors(self):
        descriptors = self.do_command(self.dev_args + ['getdescriptors'])

        self.assertIn('receive', descriptors)
        self.assertIn('internal', descriptors)
        self.assertEqual(len(descriptors['receive']), 3)
        self.assertEqual(len(descriptors['internal']), 3)

        for descriptor in descriptors['receive']:
            info_result = self.rpc.getdescriptorinfo(descriptor)
            self.assertTrue(info_result['isrange'])
            self.assertTrue(info_result['issolvable'])

        for descriptor in descriptors['internal']:
            info_result = self.rpc.getdescriptorinfo(descriptor)
            self.assertTrue(info_result['isrange'])
            self.assertTrue(info_result['issolvable'])


class TestSignTx(DeviceTestCase):
    """Tests for `signtx` over single-sig, multisig and partially-foreign PSBTs."""

    def setUp(self):
        super().setUp()
        self.setup_wallets()

    def _generate_and_finalize(self, unknown_inputs, psbt):
        """Sign ``psbt`` with the device, finalize it, and return the final transaction hex.

        With ``unknown_inputs`` false the PSBT is signed in one pass. Otherwise
        it is signed in two passes, each with some inputs' key origins blanked,
        and the two results are combined.
        """
        if not unknown_inputs:
            # Just do the normal signing process to test "all inputs" case
            sign_res = self.do_command(self.dev_args + ['signtx', psbt['psbt']])
            finalize_res = self.wrpc.finalizepsbt(sign_res['psbt'])
        else:
            # Sign only input one on first pass
            # then rest on second pass to test ability to successfully
            # ignore inputs that are not its own. Then combine both
            # signing passes to ensure they are actually properly being
            # partially signed at each step.
            first_psbt = PSBT()
            first_psbt.deserialize(psbt['psbt'])
            second_psbt = PSBT()
            second_psbt.deserialize(psbt['psbt'])

            # Blank master fingerprint to make hww fail to sign
            # Single input PSBTs will be fully signed by first signer
            for psbt_input in first_psbt.inputs[1:]:
                for pubkey, path in psbt_input.hd_keypaths.items():
                    psbt_input.hd_keypaths[pubkey] = [0] + path[1:]
            for pubkey, path in second_psbt.inputs[0].hd_keypaths.items():
                second_psbt.inputs[0].hd_keypaths[pubkey] = [0] + path[1:]

            single_input = len(first_psbt.inputs) == 1

            # Process the psbts
            first_psbt = first_psbt.serialize()
            second_psbt = second_psbt.serialize()

            # First will always have something to sign
            first_sign_res = self.do_command(self.dev_args + ['signtx', first_psbt])
            self.assertEqual(single_input, self.wrpc.finalizepsbt(first_sign_res['psbt'])['complete'])
            # Second may have nothing to sign (1 input case)
            # and also may throw an error(e.g., ColdCard)
            second_sign_res = self.do_command(self.dev_args + ['signtx', second_psbt])
            if 'psbt' in second_sign_res:
                self.assertFalse(self.wrpc.finalizepsbt(second_sign_res['psbt'])['complete'])
                combined_psbt = self.wrpc.combinepsbt([first_sign_res['psbt'], second_sign_res['psbt']])
            else:
                self.assertIn('error', second_sign_res)
                combined_psbt = first_sign_res['psbt']

            finalize_res = self.wrpc.finalizepsbt(combined_psbt)
        self.assertTrue(finalize_res['complete'])
        self.assertTrue(self.wrpc.testmempoolaccept([finalize_res['hex']])[0]["allowed"])
        return finalize_res['hex']

    def _make_multisigs(self):
        """Build 2-of-3 sortedmulti descriptors from device keys and check them against `createmultisig`.

        Returns ``(sh_desc, sh_addr, sh_wsh_desc, sh_wsh_addr, wsh_desc, wsh_addr)``.
        """
        desc_pubkeys = []
        sorted_pubkeys = []
        for i in range(0, 3):
            path = "/48h/1h/{}h/0/0".format(i)
            origin = '{}{}'.format(self.fingerprint, path)
            xpub = self.do_command(self.dev_args + ["--expert", "getxpub", "m{}".format(path)])
            desc_pubkeys.append("[{}]{}".format(origin, xpub["pubkey"]))
            sorted_pubkeys.append(xpub["pubkey"])
        sorted_pubkeys.sort()

        # The descriptors list the keys in different orders; each derived address
        # is still asserted equal to the one createmultisig gives for the sorted keys
        sh_desc = AddChecksum("sh(sortedmulti(2,{},{},{}))".format(desc_pubkeys[0], desc_pubkeys[1], desc_pubkeys[2]))
        sh_ms_info = self.rpc.createmultisig(2, sorted_pubkeys, "legacy")
        self.assertEqual(self.rpc.deriveaddresses(sh_desc)[0], sh_ms_info["address"])

        sh_wsh_desc = AddChecksum("sh(wsh(sortedmulti(2,{},{},{})))".format(desc_pubkeys[1], desc_pubkeys[2], desc_pubkeys[0]))
        sh_wsh_ms_info = self.rpc.createmultisig(2, sorted_pubkeys, "p2sh-segwit")
        self.assertEqual(self.rpc.deriveaddresses(sh_wsh_desc)[0], sh_wsh_ms_info["address"])

        wsh_desc = AddChecksum("wsh(sortedmulti(2,{},{},{}))".format(desc_pubkeys[2], desc_pubkeys[1], desc_pubkeys[0]))
        wsh_ms_info = self.rpc.createmultisig(2, sorted_pubkeys, "bech32")
        self.assertEqual(self.rpc.deriveaddresses(wsh_desc)[0], wsh_ms_info["address"])

        return sh_desc, sh_ms_info["address"], sh_wsh_desc, sh_wsh_ms_info["address"], wsh_desc, wsh_ms_info["address"]

    def _test_signtx(self, input_type, multisig, external):
        """Fund watch-only addresses of ``input_type`` and sign PSBTs spending them.

        ``input_type`` is 'legacy', 'segwit' or 'all'; ``multisig`` also funds
        the multisig addresses; ``external`` additionally runs the two-pass
        signing flow of ``_generate_and_finalize``.
        """
        # Import some keys to the watch only wallet and send coins to them
        keypool_desc = self.do_command(self.dev_args + ['getkeypool', '--all', '30', '50'])
        import_result = self.wrpc.importdescriptors(keypool_desc)
        self.assertTrue(import_result[0]['success'])
        sh_wpkh_addr = self.wrpc.getnewaddress('', 'p2sh-segwit')
        wpkh_addr = self.wrpc.getnewaddress('', 'bech32')
        pkh_addr = self.wrpc.getnewaddress('', 'legacy')

        sh_multi_desc, sh_multi_addr, sh_wsh_multi_desc, sh_wsh_multi_addr, wsh_multi_desc, wsh_multi_addr = self._make_multisigs()

        sh_multi_import = {'desc': sh_multi_desc, "timestamp": "now", "label": "shmulti"}
        sh_wsh_multi_import = {'desc': sh_wsh_multi_desc, "timestamp": "now", "label": "shwshmulti"}
        wsh_multi_import = {'desc': wsh_multi_desc, "timestamp": "now", "label": "wshmulti"}
        multi_result = self.wrpc.importdescriptors([sh_multi_import, sh_wsh_multi_import, wsh_multi_import])
        self.assertTrue(multi_result[0]['success'])
        self.assertTrue(multi_result[1]['success'])
        self.assertTrue(multi_result[2]['success'])

        in_amt = 3
        out_amt = in_amt // 3
        number_inputs = 0
        # Single-sig
        if input_type == 'segwit' or input_type == 'all':
            self.wpk_rpc.sendtoaddress(sh_wpkh_addr, in_amt)
            self.wpk_rpc.sendtoaddress(wpkh_addr, in_amt)
            number_inputs += 2
        if input_type == 'legacy' or input_type == 'all':
            self.wpk_rpc.sendtoaddress(pkh_addr, in_amt)
            number_inputs += 1
        # Now do segwit/legacy multisig
        if multisig:
            if input_type == 'legacy' or input_type == 'all':
                self.wpk_rpc.sendtoaddress(sh_multi_addr, in_amt)
                number_inputs += 1
            if input_type == 'segwit' or input_type == 'all':
                self.wpk_rpc.sendtoaddress(wsh_multi_addr, in_amt)
                self.wpk_rpc.sendtoaddress(sh_wsh_multi_addr, in_amt)
                number_inputs += 2

        self.wpk_rpc.generatetoaddress(6, self.wpk_rpc.getnewaddress())

        # Spend different amounts, requiring 1 to 3 inputs
        for i in range(number_inputs):
            # Create a psbt spending the above
            if i == number_inputs - 1:
                self.assertEqual((i + 1) * in_amt, self.wrpc.getbalance("*", 0, True))
            psbt = self.wrpc.walletcreatefundedpsbt([], [{self.wpk_rpc.getnewaddress('', 'legacy'): (i + 1) * out_amt}, {self.wpk_rpc.getnewaddress('', 'p2sh-segwit'): (i + 1) * out_amt}, {self.wpk_rpc.getnewaddress('', 'bech32'): (i + 1) * out_amt}], 0, {'includeWatching': True, 'subtractFeeFromOutputs': [0, 1, 2]}, True)

            if external:
                # Sign with unknown inputs in two steps
                self._generate_and_finalize(True, psbt)
            # Sign all inputs all at once
            final_tx = self._generate_and_finalize(False, psbt)

        # Send off final tx to sweep the wallet
        self.wrpc.sendrawtransaction(final_tx)

    # Test wrapper to avoid mixed-inputs signing for Ledger
    def test_signtx(self):
        supports_mixed = {'coldcard', 'trezor_1', 'digitalbitbox', 'keepkey', 'trezor_t'}
        supports_multisig = {'ledger', 'trezor_1', 'digitalbitbox', 'keepkey', 'coldcard', 'trezor_t'}
        supports_external = {'ledger', 'trezor_1', 'digitalbitbox', 'keepkey', 'coldcard', 'trezor_t'}
        self._test_signtx("legacy", self.full_type in supports_multisig, self.full_type in supports_external)
        self._test_signtx("segwit", self.full_type in supports_multisig, self.full_type in supports_external)
        if self.full_type in supports_mixed:
            self._test_signtx("all", self.full_type in supports_multisig, self.full_type in supports_external)

    # Make a huge transaction which might cause some problems with different interfaces
    def test_big_tx(self):
        # make a huge transaction that is unrelated to the hardware wallet
        outputs = []
        num_inputs = 60
        for i in range(0, num_inputs):
            outputs.append({self.wpk_rpc.getnewaddress('', 'legacy'): 0.001})
        psbt = self.wpk_rpc.walletcreatefundedpsbt([], outputs, 0, {}, True)['psbt']
        psbt = self.wpk_rpc.walletprocesspsbt(psbt)['psbt']
        tx = self.wpk_rpc.finalizepsbt(psbt)['hex']
        txid = self.wpk_rpc.sendrawtransaction(tx)
        inputs = []
        for i in range(0, num_inputs):
            inputs.append({'txid': txid, 'vout': i})
        psbt = self.wpk_rpc.walletcreatefundedpsbt(inputs, [{self.wpk_rpc.getnewaddress('', 'legacy'): 0.001 * num_inputs}], 0, {'subtractFeeFromOutputs': [0]}, True)['psbt']
        # For cli, this should throw an exception
        try:
            result = self.do_command(self.dev_args + ['signtx', psbt])
            if self.interface == 'cli':
                self.fail('Big tx did not cause CLI to error')
            if self.type == 'coldcard':
                self.assertEqual(result['code'], -7)
            else:
                self.assertNotIn('code', result)
                self.assertNotIn('error', result)
        except OSError:
            # NOTE(review): the OSError is ignored for every interface, not only
            # 'cli', because the condition below only guards a `pass`
            if self.interface == 'cli':
                pass


class TestDisplayAddress(DeviceTestCase):
    """Tests for `displayaddress` with paths, descriptors and multisig scripts."""

    def test_display_address_bad_args(self):
        result = self.do_command(self.dev_args + ['displayaddress', '--sh_wpkh', '--wpkh', '--path', 'm/49h/1h/0h/0/0'])
        self.assertIn('error', result)
        self.assertIn('code', result)
        self.assertEqual(result['code'], -7)

    def test_display_address_path(self):
        result = self.do_command(self.dev_args + ['displayaddress', '--path', 'm/44h/1h/0h/0/0'])
        self.assertNotIn('error', result)
        self.assertNotIn('code', result)
        self.assertIn('address', result)

        result = self.do_command(self.dev_args + ['displayaddress', '--sh_wpkh', '--path', 'm/49h/1h/0h/0/0'])
        self.assertNotIn('error', result)
        self.assertNotIn('code', result)
        self.assertIn('address', result)

        result = self.do_command(self.dev_args + ['displayaddress', '--wpkh', '--path', 'm/84h/1h/0h/0/0'])
        self.assertNotIn('error', result)
        self.assertNotIn('code', result)
        self.assertIn('address', result)

    def test_display_address_bad_path(self):
        result = self.do_command(self.dev_args + ['displayaddress', '--path', 'f'])
        self.assertEqual(result['code'], -7)

    def test_display_address_descriptor(self):
        account_xpub = self.do_command(self.dev_args + ['getxpub', 'm/84h/1h/0h'])['xpub']
        p2sh_segwit_account_xpub = self.do_command(self.dev_args + ['getxpub', 'm/49h/1h/0h'])['xpub']
        legacy_account_xpub = self.do_command(self.dev_args + ['getxpub', 'm/44h/1h/0h'])['xpub']

        # Native SegWit address using xpub:
        result = self.do_command(self.dev_args + ['displayaddress', '--desc', 'wpkh([' + self.fingerprint + '/84h/1h/0h]' + account_xpub + '/0/0)'])
        self.assertNotIn('error', result)
        self.assertNotIn('code', result)
        self.assertIn('address', result)

        # Native SegWit address using hex encoded pubkey:
        result = self.do_command(self.dev_args + ['displayaddress', '--desc', 'wpkh([' + self.fingerprint + '/84h/1h/0h]' + xpub_to_pub_hex(account_xpub) + '/0/0)'])
        self.assertNotIn('error', result)
        self.assertNotIn('code', result)
        self.assertIn('address', result)

        # P2SH wrapped SegWit address using xpub:
        result = self.do_command(self.dev_args + ['displayaddress', '--desc', 'sh(wpkh([' + self.fingerprint + '/49h/1h/0h]' + p2sh_segwit_account_xpub + '/0/0))'])
        self.assertNotIn('error', result)
        self.assertNotIn('code', result)
        self.assertIn('address', result)

        # Legacy address
        result = self.do_command(self.dev_args + ['displayaddress', '--desc', 'pkh([' + self.fingerprint + '/44h/1h/0h]' + legacy_account_xpub + '/0/0)'])
        self.assertNotIn('error', result)
        self.assertNotIn('code', result)
        self.assertIn('address', result)

        # Should check xpub
        result = self.do_command(self.dev_args + ['displayaddress', '--desc', 'wpkh([' + self.fingerprint + '/84h/1h/0h]' + "not_and_xpub" + '/0/0)'])
        self.assertIn('error', result)
        self.assertIn('code', result)
        self.assertEqual(result['code'], -7)

        # Should check hex pub
        # NOTE(review): this passes the same "not_and_xpub" key as the xpub check above
        result = self.do_command(self.dev_args + ['displayaddress', '--desc', 'wpkh([' + self.fingerprint + '/84h/1h/0h]' + "not_and_xpub" + '/0/0)'])
        self.assertIn('error', result)
        self.assertIn('code', result)
        self.assertEqual(result['code'], -7)

        # Should check fingerprint
        # The result must be captured here, otherwise the assertions re-check the previous call
        result = self.do_command(self.dev_args + ['displayaddress', '--desc', 'wpkh([00000000/84h/1h/0h]' + account_xpub + '/0/0)'])
        self.assertIn('error', result)
        self.assertIn('code', result)
        self.assertEqual(result['code'], -7)

    def _make_single_multisig(self, addrtype):
        """Build one 2-of-3 sortedmulti for ``addrtype`` ('pkh', 'sh_wpkh' or 'wpkh').

        Returns ``(address, descriptor, redeem_script, path)`` where ``path`` is
        the comma-joined key origins ordered by sorted pubkey.
        """
        desc_pubkeys = []
        sorted_pubkeys = []
        for i in range(0, 3):
            path = "/48h/1h/{}h/0/0".format(i)
            origin = '{}{}'.format(self.fingerprint, path)
            xpub = self.do_command(self.dev_args + ["--expert", "getxpub", "m{}".format(path)])
            desc_pubkeys.append("[{}]{}".format(origin, xpub["pubkey"]))
            sorted_pubkeys.append((xpub["pubkey"], origin))
        sorted_pubkeys.sort(key=lambda tup: tup[0])

        if addrtype == "pkh":
            desc = AddChecksum("sh(sortedmulti(2,{},{},{}))".format(desc_pubkeys[0], desc_pubkeys[1], desc_pubkeys[2]))
            ms_info = self.rpc.createmultisig(2, [x[0] for x in sorted_pubkeys], "legacy")
        elif addrtype == "sh_wpkh":
            desc = AddChecksum("sh(wsh(sortedmulti(2,{},{},{})))".format(desc_pubkeys[1], desc_pubkeys[2], desc_pubkeys[0]))
            ms_info = self.rpc.createmultisig(2, [x[0] for x in sorted_pubkeys], "p2sh-segwit")
        elif addrtype == "wpkh":
            desc = AddChecksum("wsh(sortedmulti(2,{},{},{}))".format(desc_pubkeys[2], desc_pubkeys[1], desc_pubkeys[0]))
            ms_info = self.rpc.createmultisig(2, [x[0] for x in sorted_pubkeys], "bech32")
        else:
            self.fail("Oops the test is broken")

        self.assertEqual(self.rpc.deriveaddresses(desc)[0], ms_info["address"])

        path = "{},{},{}".format(sorted_pubkeys[0][1], sorted_pubkeys[1][1], sorted_pubkeys[2][1])

        return ms_info["address"], desc, ms_info["redeemScript"], path

    def test_display_address_multisig(self):
        if self.full_type not in SUPPORTS_MS_DISPLAY:
            raise unittest.SkipTest("{} does not support multisig display".format(self.full_type))

        for addrtype in ["pkh", "sh_wpkh", "wpkh"]:
            for use_desc in [True, False]:
                with self.subTest(addrtype=addrtype, use_desc=use_desc):
                    addr, desc, rs, path = self._make_single_multisig(addrtype)

                    if use_desc:
                        args = ['displayaddress', '--desc', desc]
                    else:
                        args = ['displayaddress', '--path', path, '--redeem_script', rs]
                        if addrtype != "pkh":
                            args.append("--{}".format(addrtype))

                    result = self.do_command(self.dev_args + args)
                    self.assertNotIn('error', result)
                    self.assertNotIn('code', result)
                    self.assertIn('address', result)

                    if addrtype == "wpkh":
                        # removes prefix and checksum since regtest gives
                        # prefix `bcrt` on Bitcoin Core while wallets return testnet `tb` prefix
                        self.assertEqual(addr[4:58], result['address'][2:56])
                    else:
                        self.assertEqual(addr, result['address'])

    def test_display_address_xpub_multisig(self):
        if self.full_type not in SUPPORTS_XPUB_MS_DISPLAY:
            raise unittest.SkipTest("{} does not support multsig display with xpubs".format(self.full_type))

        account_xpub = self.do_command(self.dev_args + ['getxpub', 'm/48h/1h/0h'])['xpub']
        desc = 'wsh(multi(2,[' + self.fingerprint + '/48h/1h/0h]' + account_xpub + '/0/0,[' + self.fingerprint + '/48h/1h/0h]' + account_xpub + '/1/0))'
        result = self.do_command(self.dev_args + ['displayaddress', '--desc', desc])
        self.assertNotIn('error', result)
        self.assertNotIn('code', result)
        self.assertIn('address', result)

        addr = self.rpc.deriveaddresses(AddChecksum(desc))[0]
        # removes prefix and checksum since regtest gives
        # prefix `bcrt` on Bitcoin Core while wallets return testnet `tb` prefix
        self.assertEqual(addr[4:58], result['address'][2:56])


class TestSignMessage(DeviceTestCase):
    """Tests for `signmessage`."""

    def test_sign_msg(self):
        self.do_command(self.dev_args + ['signmessage', '"Message signing test"', 'm/44h/1h/0h/0/0'])

    def test_bad_path(self):
        result = self.do_command(self.dev_args + ['signmessage', '"Message signing test"', 'f'])
        self.assertEqual(result['code'], -7)