#!/usr/bin/env python3
"""Reference demo orchestrator for the mineracks multisig HSM — interactive REAL-SIGNET policy HSM.

All 3 sims are HSM policy signers (any 2 sign → real 2-of-3 failover). Chain access is the fleet's synced
signet node (RPC over your private network). Keyless coordinator; on-device policy gates.

Endpoints (nginx maps /api/ -> :8099/):
  GET /status -> JSON {balance_sat, devices, quorum_online, quorum_needed, policy, fund_addr, sink_addr,
      authorized, global, surge}
  GET /spend?amount=SATS&dest=sink|other|custom[&addr=ADDR][&code=TOTP] -> SSE; builds+signs a real-signet
      spend. dest=other pays an off-whitelist address (HSM refuses); over max_amount refuses.
      dest=custom and code= both require a signed-in session.
  GET /signmsg?text=... -> SSE; 2-of-3 sign a message (proof of control, no funds move)
  GET /device?n=N&on=0|1 -> toggle a signer on/off (failover / quorum demo)
  GET /policy?max_sat=&per_period_sat=&period_min= -> set policy (max-amount + velocity) + RE-ARM all sims
  GET /globalpolicy?per_period_sat=&period_min=&reset=1 -> set / reset the coordinator-side global cap
  GET /surgepolicy?max_sat=&per_period_sat=&global_per_period_sat= -> SSE; set the TOTP surge tier (signed-in)
  GET /authorize?addr=ADDR -> SSE; add a signed-in user's address to the whitelist + RE-ARM all sims
  GET /reboot?n=N -> SSE; restart one signer and wait for auto-heal
  GET /totp_enroll -> JSON {secret, otpauth} for the signed-in owner
  GET /auth/me, GET /auth/nostr/challenge, POST /auth/nostr/verify -> Nostr sign-in
"""
import json, os, io, time, re, subprocess, threading, urllib.parse, hashlib, hmac, secrets
from contextlib import contextmanager
from base64 import b64decode, b64encode, b32decode, b32encode
from http.cookies import SimpleCookie
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from ckcc.client import ColdcardDevice
from ckcc.protocol import CCProtocolPacker
from ckcc.constants import USER_AUTH_TOTP
from ckcc.cli import real_file_upload

# ---- paths and node access ---------------------------------------------------
HOME = os.path.expanduser('~'); V = '27.0'      # V is the version component of the bitcoin-cli path below
CLI = HOME + '/bitcoin-%s/bin/bitcoin-cli' % V
CKCC = HOME + '/ccsim-venv/bin/ckcc'
WALLET_FILE = HOME + '/cksim/ckms23.txt'; POLICY_FILE = HOME + '/cksim/policy.json'
DEMO_RIG = HOME + '/cksim/demo_rig.sh'
WALLET = 'ckms23-signet'                        # name of the node wallet that watches the 2-of-3
# RPC connection settings come from the environment, with local defaults.
RPC_HOST = os.environ.get('RPC_HOST', '127.0.0.1'); RPC_PORT = os.environ.get('RPC_PORT', '38332')
RPC_USER = os.environ.get('RPC_USER', 'rpcuser'); RPC_PASS = os.environ.get('RPC_PASS', '')

# ---- signers -----------------------------------------------------------------
SOCK = {1: '/tmp/cksim-1.sock', 2: '/tmp/cksim-2.sock', 3: '/tmp/cksim-3.sock'}   # signer number -> sim socket
LET = {1: 'A', 2: 'B', 3: 'C'}; ALL = (1, 2, 3)                                   # display letter per signer
DEFAULT_SAT = 2000; MSG_PATH = "m/48h/1h/0h/2h/0/0"
# Key-origin prefix -> tpub for each cosigner; both are substituted into the wsh(sortedmulti(2,...)) descriptor.
# The values here are placeholders. NOTE(review): the keys look like master fingerprints — confirm.
TPUB = {'00000001': 'tpubREPLACE_WITH_YOUR_SIGNER_1_XPUB',
        '00000002': 'tpubREPLACE_WITH_YOUR_SIGNER_2_XPUB',
        '00000003': 'tpubREPLACE_WITH_YOUR_SIGNER_3_XPUB'}

# ---- shared mutable state ----------------------------------------------------
LOCK = threading.Lock()                          # serialises device access (poller, heal, spend, re-arm)
ENABLED = {1: True, 2: True, 3: True}            # per-signer on/off switch, toggled by /device
POLICY = {'max_sat': 8000, 'per_period_sat': 500000, 'period_min': 60}   # tier-1 on-device limits
_WALLET_READY = [False]                          # one-element list used as a mutable "wallet set up" flag

# Coordinator-side GLOBAL velocity cap — the AUTHORITATIVE spend limit across ALL signers, and the epic's
# hardest open problem. On-device per-signer velocity (POLICY['per_period_sat']) is only a backstop: under a
# rotating "any 2 of 3" the three devices' local counters DRIFT (each tracks only the txns IT signed), and a
# Coldcard decrements its counter the moment it signs — even if that PSBT is later dropped and never
# broadcast. So no device's counter reflects true global outflow. The real cap therefore lives HERE in the
# keyless coordinator and counts ONLY real broadcasts (the ledger) — so dropped PSBTs never burn budget.
GLOBAL = {'per_period_sat': 50000, 'period_min': 60}

# TOTP-gated SURGE tier: a signed-in owner supplies a TOTP code to unlock a higher on-device tier (bigger
# per-txn cap + velocity) WITH a human in the loop — routine flows stay automated. The coordinator only
# RELAYS the code (via user_auth) to the signers; the secret lives on the devices + the owner's authenticator.
SURGE = {'max_sat': 200000, 'per_period_sat': 2000000}   # tier-2 device limits (TOTP required)
SURGE_GLOBAL = {'per_period_sat': 1000000}               # coordinator global cap when a TOTP is present
TOTP_SECRET_FILE = HOME + '/cksim/multisig-totp.secret'  # owner's TOTP secret (chmod 600, NOT in git)
LEDGER_FILE = HOME + '/cksim/spend_ledger.json'
RESET_TS = [0]   # demo-only: "Reset counter" moves the window start to now (production never resets)
_TX_CACHE = {'t': 0.0, 'txs': None}   # 't' = time of the last successful RPC read, 'txs' = its result


def ledger_load():
    # Return the local spend ledger as a list of dict entries ({'ts', 'amount', 'txid'}).
    # A missing, unreadable or malformed file yields []. Anything that is not a list of dicts is dropped
    # so callers can always concatenate the result and call .get() on its entries.
    try:
        with open(LEDGER_FILE) as f:
            led = json.load(f)
    except (OSError, ValueError):
        return []
    if not isinstance(led, list):
        return []
    return [e for e in led if isinstance(e, dict)]


def ledger_append(amount_sat, txid):
    # Record one broadcast spend (keeping the newest 5000 entries) and invalidate the tx cache.
    # The file is written to a temp name and renamed into place so a concurrent ledger_load() never sees a
    # half-written ledger. A failed write is logged, not raised: the spend has already been broadcast.
    entry = {'ts': int(time.time()), 'amount': int(amount_sat), 'txid': txid}
    led = (ledger_load() + [entry])[-5000:]
    tmp = LEDGER_FILE + '.tmp'
    try:
        with open(tmp, 'w') as f:
            f.write(json.dumps(led))
        os.replace(tmp, LEDGER_FILE)
    except OSError as e:
        print('ledger: write failed (%s)' % str(e)[:120], flush=True)
    _TX_CACHE['txs'] = None   # force the next global_spent to re-read the chain (so the gate sees this spend)


def _recent_sends():
    # cache the node's recent wallet txns ~4s so /status polling doesn't hammer RPC
    # Returns the listtransactions result, or None when the RPC call fails (the caller then falls back to
    # the local ledger).
    if time.time() - _TX_CACHE['t'] < 4 and _TX_CACHE['txs'] is not None:
        return _TX_CACHE['txs']
    try:
        txs = json.loads(bc('listtransactions', '*', '300', '0', 'true', wallet=WALLET))
    except Exception:
        return None
    _TX_CACHE['t'] = time.time()
    _TX_CACHE['txs'] = txs
    return txs


def global_spent():
    # CHAIN-DERIVED authoritative period outflow. The watch-only wallet's on-chain 'send' total over the
    # window IS ground truth, so ANY coordinator replica on ANY host recomputes the same number straight from
    # the blockchain — no single-host ledger to lose (no SPOF) and nothing to tamper. The local ledger is only
    # a cache, used if the node RPC is briefly unreachable. (Change back to the 2-of-3 is recognised as
    # internal by the descriptor wallet, so only real external sends — the refills — are counted.)
    #
    # MEMPOOL counts too: an unconfirmed (0-conf) send appears in listtransactions the moment it broadcasts,
    # so a burst of spends inside one block interval can't slip past the cap. We exclude only abandoned /
    # conflicted (confirmations < 0) sends, so a genuinely dropped tx doesn't permanently burn budget.
    #
    # Returns the total, in sats, sent since the window start (the later of now - period and RESET_TS).
    cut = max(int(time.time()) - GLOBAL['period_min'] * 60, RESET_TS[0])
    txs = _recent_sends()
    if txs is not None:
        return sum(abs(int(round(t.get('amount', 0) * 1e8))) for t in txs
                   if t.get('category') == 'send' and t.get('time', 0) >= cut
                   and not t.get('abandoned') and t.get('confirmations', 0) >= 0)
    return sum(int(e.get('amount', 0)) for e in ledger_load() if e.get('ts', 0) >= cut)


# ---- Nostr (NIP-07) sign-in -------------------------------------------------
# Optional opt-in identity (same pattern as walletplayground): anonymous use is unaffected. A signed-in user
# may authorise their OWN signet address onto every signer's on-device whitelist (a re-arm), instead of being
# confined to the sink. Self-contained pure-Python BIP-340 verify (no crypto dep). The rig is shared, so the
# authorised set is global; it's capped and the page says so. The HMAC secret lives only on the VM (chmod
# 600, NOT in git): a leak only lets someone forge a session, never move funds (the keys are on the devices).
SECRET_FILE = HOME + '/cksim/multisig-auth.secret'
COOKIE = 'ms_session'; CHALLENGE_TTL = 300; SESSION_TTL = 60 * 60 * 24 * 30
EXTRA_WL = []   # authorised custom destination addresses (in addition to the sink); capped, global
PUBKEYS = {}    # uid -> nostr pubkey (in-memory, display only)


def _load_secret():
    # Return the HMAC key used for challenges and session cookies. An existing key of at least 32 bytes
    # is reused; otherwise a fresh one is generated and written with mode 0600.
    try:
        with open(SECRET_FILE, 'rb') as f:
            s = f.read().strip()
        if len(s) >= 32:
            return s
    except FileNotFoundError:
        pass
    s = secrets.token_hex(32).encode()
    fd = os.open(SECRET_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'wb') as f:
        f.write(s)
    return s


SECRET = _load_secret()

# secp256k1 BIP-340 schnorr verify (verify-only reference impl)
_p = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F
_n = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
_G = (0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798,
      0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8)


def _modinv(a, m):
    # Modular inverse via Fermat's little theorem (m must be prime; here it is always _p).
    return pow(a, m - 2, m)


def _padd(P1, P2):
    # Add two affine curve points; None stands for the point at infinity.
    if P1 is None:
        return P2
    if P2 is None:
        return P1
    if P1[0] == P2[0] and P1[1] != P2[1]:
        return None   # P + (-P)
    if P1 == P2:
        lam = (3 * P1[0] * P1[0] * _modinv(2 * P1[1], _p)) % _p
    else:
        lam = ((P2[1] - P1[1]) * _modinv(P2[0] - P1[0], _p)) % _p
    x3 = (lam * lam - P1[0] - P2[0]) % _p
    return (x3, (lam * (P1[0] - x3) - P1[1]) % _p)


def _pmul(P, k):
    # Scalar multiplication k*P by double-and-add; returns None (infinity) for k == 0.
    R = None
    while k:
        if k & 1:
            R = _padd(R, P)
        P = _padd(P, P)
        k >>= 1
    return R


def _lift_x(x):
    # Return the curve point with x-coordinate x and even y, or None if x is out of range / off-curve.
    if x >= _p:
        return None
    y_sq = (pow(x, 3, _p) + 7) % _p
    y = pow(y_sq, (_p + 1) // 4, _p)
    if pow(y, 2, _p) != y_sq:
        return None
    return (x, y if y % 2 == 0 else _p - y)


def _tagged(tag, msg):
    # Tagged hash: sha256(sha256(tag) || sha256(tag) || msg).
    th = hashlib.sha256(tag.encode()).digest()
    return hashlib.sha256(th + th + msg).digest()


def schnorr_verify(msg32, pub32, sig64):
    # Verify a 64-byte schnorr signature over a 32-byte message for a 32-byte x-only public key.
    # Returns False (never raises) for wrong lengths, an invalid key or out-of-range r / s.
    if len(msg32) != 32 or len(pub32) != 32 or len(sig64) != 64:
        return False
    P = _lift_x(int.from_bytes(pub32, 'big'))
    if P is None:
        return False
    r = int.from_bytes(sig64[:32], 'big')
    s = int.from_bytes(sig64[32:], 'big')
    if r >= _p or s >= _n:
        return False
    e = int.from_bytes(_tagged('BIP0340/challenge', sig64[:32] + pub32 + msg32), 'big') % _n
    R = _padd(_pmul(_G, s), _pmul(P, _n - e))   # R = s*G - e*P
    return R is not None and R[1] % 2 == 0 and R[0] == r


def nostr_event_id(ev):
    # Hex sha256 of the compact JSON array [0, pubkey, created_at, kind, tags, content].
    # Raises KeyError when a field is missing from ev.
    s = json.dumps([0, ev['pubkey'], ev['created_at'], ev['kind'], ev['tags'], ev['content']],
                   separators=(',', ':'), ensure_ascii=False)
    return hashlib.sha256(s.encode('utf-8')).hexdigest()


def _mac(msg):
    # First 32 hex chars of HMAC-SHA256(SECRET, msg).
    return hmac.new(SECRET, msg.encode(), hashlib.sha256).hexdigest()[:32]


def _mac_ok(mac, msg):
    # Constant-time check that the untrusted string `mac` is the MAC of `msg`.
    # The comparison is done on bytes: hmac.compare_digest raises TypeError when given a str containing
    # non-ASCII characters, and both the session cookie and the challenge come straight from the client.
    # Input that cannot be encoded (or is not a str) is treated as a plain mismatch.
    try:
        return hmac.compare_digest(mac.encode(), _mac(msg).encode())
    except (UnicodeError, AttributeError):
        return False


def make_challenge():
    # Mint a self-authenticating sign-in challenge: "<nonce>.<unix ts>.<mac>".
    nonce = secrets.token_hex(16)
    ts = str(int(time.time()))
    return '%s.%s.%s' % (nonce, ts, _mac('%s.%s' % (nonce, ts)))


def check_challenge(ch):
    # True when ch is a challenge minted by make_challenge() within CHALLENGE_TTL seconds.
    # Any malformed input returns False rather than raising.
    try:
        nonce, ts, mac = ch.split('.')
    except (ValueError, AttributeError):
        return False
    if not _mac_ok(mac, '%s.%s' % (nonce, ts)):
        return False
    try:
        return abs(int(time.time()) - int(ts)) <= CHALLENGE_TTL
    except ValueError:
        return False


def make_session(uid):
    # Build the session cookie value "<uid>.<expiry unix ts>.<mac>".
    exp = str(int(time.time()) + SESSION_TTL)
    return '%s.%s.%s' % (uid, exp, _mac('%s.%s' % (uid, exp)))


def read_session(cookie_header):
    # Return the uid carried by a valid, unexpired session cookie, else None.
    # Any malformed or forged cookie yields None rather than an exception.
    if not cookie_header:
        return None
    c = SimpleCookie()
    try:
        c.load(cookie_header)
    except Exception:
        return None
    if COOKIE not in c:
        return None
    try:
        uid, exp, mac = c[COOKIE].value.split('.')
    except ValueError:
        return None
    if not _mac_ok(mac, '%s.%s' % (uid, exp)):
        return None
    try:
        if int(time.time()) > int(exp):
            return None
    except ValueError:
        return None
    return uid


def uid_for(provider, subject):
    # Stable opaque user id: "<provider>:<first 24 hex chars of sha256('<provider>:<subject>')>".
    return provider + ':' + hashlib.sha256(('%s:%s' % (provider, subject)).encode()).hexdigest()[:24]


def add_wl(addr):
    # Add addr to the shared extra whitelist, evicting the oldest entries beyond 25.
    if addr not in EXTRA_WL:
        EXTRA_WL.append(addr)
    while len(EXTRA_WL) > 25:
        EXTRA_WL.pop(0)


def totp_secret():
    # owner TOTP shared secret — generated once, persisted on the VM only (chmod 600, never in git), enrolled
    # on each signer + shown to the signed-in owner for their authenticator. (Demo simplification: production
    # enrols via the device's own QR so the coordinator never sees the secret.)
    try:
        with open(TOTP_SECRET_FILE) as f:
            s = f.read().strip()
        if s:
            return s
    except Exception:
        pass   # missing / unreadable -> mint a new secret below
    s = b32encode(secrets.token_bytes(20)).decode()
    fd = os.open(TOTP_SECRET_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        f.write(s)
    return s


def bc(*a, wallet=None):
    # Run bitcoin-cli on signet with the given arguments (optionally against a named wallet) and return
    # its stripped stdout. Raises RuntimeError on a non-zero exit; subprocess.TimeoutExpired after 120s.
    base = [CLI, '-signet', '-rpcconnect=%s' % RPC_HOST, '-rpcport=%s' % RPC_PORT,
            '-rpcuser=%s' % RPC_USER, '-rpcpassword=%s' % RPC_PASS]
    if wallet:
        base.append('-rpcwallet=%s' % wallet)
    r = subprocess.run(base + list(a), capture_output=True, text=True, timeout=120)
    if r.returncode:
        raise RuntimeError('bitcoin-cli %s: %s' % (a[:1], r.stderr.strip()[:160]))
    return r.stdout.strip()


def descstr(b):
    # 2-of-3 wsh(sortedmulti) descriptor (without checksum) for branch b (0 = receive, 1 = change).
    keys = ','.join('[%s/48h/1h/0h/2h]%s/%d/*' % (x, p, b) for x, p in TPUB.items())
    return 'wsh(sortedmulti(2,%s))' % keys


def _ck(d):
    # Return descriptor d in the form reported by getdescriptorinfo (checksum appended).
    return json.loads(bc('getdescriptorinfo', d))['descriptor']


def ensure_wallet():
    # Make sure the watch-only 2-of-3 wallet is loaded on the node; done once per process.
    # NOTE(review): the source this was reconstructed from had lost its indentation. The descriptor import
    # is placed in the "wallet had to be created" branch, mirroring ensure_sink() — confirm against the
    # original layout.
    if _WALLET_READY[0]:
        return
    if WALLET not in json.loads(bc('listwallets')):
        try:
            bc('loadwallet', WALLET)
        except RuntimeError:
            bc('createwallet', WALLET, 'true', 'true', '', 'false', 'true')
            ts = int(time.time()) - 14400   # rescan from four hours back
            bc('importdescriptors', json.dumps([
                {'desc': _ck(descstr(0)), 'active': True, 'internal': False, 'timestamp': ts, 'range': [0, 30]},
                {'desc': _ck(descstr(1)), 'active': True, 'internal': True, 'timestamp': ts, 'range': [0, 30]}]),
                wallet=WALLET)
    _WALLET_READY[0] = True


def signet_addr():
    # Funding address of the 2-of-3, read from disk; '(addr missing)' when the file cannot be read.
    try:
        with open(HOME + '/cksim/signet_addr.txt') as f:
            return f.read().strip()
    except Exception:
        return '(addr missing)'


def balance_sat():
    # Return (list of unspent outputs incl. 0-conf, their total value in sats) for the 2-of-3 wallet.
    ensure_wallet()
    us = json.loads(bc('listunspent', '0', wallet=WALLET))
    return us, int(round(sum(u['amount'] for u in us) * 1e8))


# A single-sig "sink" wallet on the fleet node receives the demo spends (EXTERNAL to the 2-of-3, so the
# Coldcard counts real outflow and max_amount actually bites). Each run first sweeps the sink back to the
# 2-of-3 funding address, so funds circulate and the demo is self-sustaining (depletes only by fees).
SINK = 'ckms-sink'; SINK_ADDR_FILE = HOME + '/cksim/sink_addr.txt'


def ensure_sink():
    # Make sure the sink wallet is loaded on the node, creating it if it cannot be loaded.
    if SINK not in json.loads(bc('listwallets')):
        try:
            bc('loadwallet', SINK)
        except RuntimeError:
            bc('createwallet', SINK)


def sink_addr():
    # STABLE whitelisted destination — reused (persisted) so it can live in each Coldcard's on-device
    # address whitelist; an anonymous demo spend may only pay this address.
    try:
        with open(SINK_ADDR_FILE) as f:
            a = f.read().strip()
        if a:
            return a
    except (OSError, ValueError):
        pass   # not persisted yet (or unreadable) -> mint one below
    ensure_sink()
    a = bc('getnewaddress', wallet=SINK)
    # Close the file before returning so the address is on disk for the next reader.
    with open(SINK_ADDR_FILE, 'w') as f:
        f.write(a)
    return a


def other_addr():
    # A fresh sink-wallet address deliberately NOT on the whitelist (the whitelist-block demo). The spend to
    # it is refused on-device, so no coins ever land here.
    ensure_sink()
    return bc('getnewaddress', wallet=SINK)


def reclaim_sink():
    # Sweep CONFIRMED sink coins back (minconf=1 -> trusted, so no untrusted-0-conf "-6" errors and no
    # ever-growing unconfirmed chain). Sink funds become reclaimable one signet block after each demo spend.
    # Best effort: a failure never blocks the spend that triggered it, but it is logged so a sink that has
    # stopped recirculating funds is visible.
    try:
        ensure_sink()
        if json.loads(bc('listunspent', '1', wallet=SINK)):
            bc('-named', 'sendall', 'recipients=' + json.dumps([signet_addr()]), 'fee_rate=1',
               'options=' + json.dumps({'send_max': True}), wallet=SINK)
    except Exception as e:
        print('reclaim-sink: skipped (%s)' % str(e)[:120], flush=True)


# ---- device helpers ----------------------------------------------------------
# The ckcc simulator client BINDS a per-(simpid,ourpid) unix socket with only 5 instance slots; a leaked
# (un-closed) ColdcardDevice holds its slot until process exit (atexit), so opening many without closing
# eventually exhausts the slots -> "[Errno 98] Address already in use". So EVERY device is opened via this
# context manager and closed immediately after use — never leak a ColdcardDevice in a long-running process.
@contextmanager
def dev(n):
    # Open signer n's simulator connection for the duration of the with-block and always close it
    # afterwards; errors raised by close() itself are ignored.
    d = ColdcardDevice(sn=SOCK[n])
    try:
        yield d
    finally:
        try:
            d.close()
        except Exception:
            pass


def is_active(n):
    # True when signer n currently reports HSM mode active.
    with dev(n) as d:
        return bool(hstat_raw(d).get('active'))


def E(d, c):
    # Send source text c to the simulator as an unencrypted EXEC request; the reply is returned as text.
    r = d.send_recv(b'EXEC' + c.encode(), encrypt=False)
    return r.decode() if isinstance(r, (bytes, bytearray)) else r


def Vv(d, c):
    # Send expression text c to the simulator as an EVAL request; the reply is returned as stripped text.
    r = d.send_recv(b'EVAL' + c.encode())
    return (r.decode() if isinstance(r, (bytes, bytearray)) else r).strip()


def K(d, k):
    # Inject a single simulated key press.
    d.send_recv(CCProtocolPacker.sim_keypress(k.encode('ascii')))


def hstat_parse(raw):
    # Decode a raw hsm_status reply (bytes or str) into a dict.
    # Always returns a dict: callers immediately call .get() on the result, so an undecodable reply or a
    # JSON value that is not an object (null, a list, a number) is mapped to {}.
    try:
        text = raw.decode() if isinstance(raw, (bytes, bytearray)) else raw
        st = json.loads(text)
    except (TypeError, ValueError):
        return {}
    return st if isinstance(st, dict) else {}


def hstat_raw(d):
    # Query the device's HSM status; {} on any communication or decoding failure.
    try:
        return hstat_parse(d.send_recv(CCProtocolPacker.hsm_status()))
    except Exception:
        return {}


def hstat(n):
    # Compact status for signer n: {'active', 'approved', 'refused'}. The counters are read from either
    # spelling of the status keys ('approvals'/'approved', 'refusals'/'refused') and default to 0.
    with dev(n) as d:
        s = hstat_raw(d)
    return {'active': bool(s.get('active')),
            'approved': s.get('approvals', s.get('approved', 0)),
            'refused': s.get('refusals', s.get('refused', 0))}


# The sim sockets are single-client, so concurrent device reads (status poll vs a running spend) collide
# and make a live signer look offline. A single background poller reads the devices under LOCK and caches
# the result; the /status endpoint serves the cache (never touches a device). During a spend (which holds
# LOCK) the poller simply waits — no contention.
STATUS_CACHE = {n: {'active': False, 'approved': 0, 'refused': 0} for n in ALL}   # last polled status per signer
HEAL = {n: 0.0 for n in ALL}   # last auto-heal attempt per signer (cooldown)
_LAST_HEALTH = ['']            # last health line printed, so unchanged lines are not repeated


def _quorum():
    # Number of signers that are both enabled and cached as HSM-active.
    return sum(1 for n in ALL if ENABLED[n] and STATUS_CACHE[n]['active'])


def status_poller():
    # Endless loop: every 2s refresh STATUS_CACHE for all signers under LOCK, then emit a health line.
    tick = 0
    while True:
        time.sleep(2); tick += 1
        with LOCK:
            for n in ALL:
                try: STATUS_CACHE[n] = hstat(n)
                except Exception:   # unreachable/dead signer -> mark inactive so quorum + heal see the truth
                    STATUS_CACHE[n] = {'active': False, 'approved': STATUS_CACHE[n]['approved'],
                                       'refused': STATUS_CACHE[n]['refused']}
        # quorum-health line for Loki/Grafana/Sentinel — on change, plus a ~30s heartbeat
        q = _quorum(); ready = sum(1 for n in ALL if STATUS_CACHE[n]['active'])
        lvl = 'OK' if q >= 3 else ('WARN' if q == 2 else 'CRIT')
        msg = 'multisig-health quorum=%d needed=2 signers_ready=%d/3 status=%s' % (q, ready, lvl)
        if msg != _LAST_HEALTH[0] or tick % 15 == 0:
            _LAST_HEALTH[0] = msg; print(msg, flush=True)


def heal_loop():
    # Boot-to-signing-ready auto-heal: re-arm any ENABLED signer that is reachable but not HSM-active (e.g. it
    # rebooted and came back online). The quorum self-heals unattended. Cooldown avoids hammering; a signer
    # whose process is gone can't be healed here (it raises) — the rig must relaunch it first.
    time.sleep(24)
    while True:
        time.sleep(10)
        for n in ALL:
            # skip disabled signers, signers already active, and signers tried less than 25s ago
            if not ENABLED[n] or STATUS_CACHE[n]['active'] or time.time() - HEAL[n] < 25: continue
            HEAL[n] = time.time()
            try:
                with LOCK:
                    # re-check under the lock: the poller may have seen it come back in the meantime
                    if STATUS_CACHE[n]['active']: continue
                    print('auto-heal: signer %d not signing-ready — re-arming…' % n, flush=True)
                    arm_one(n, lambda *a, **k: None)
                    print('auto-heal: signer %d re-armed to signing-ready' % n, flush=True)
            except Exception as e:
                print('auto-heal: signer %d still unreachable (%s)' % (n, str(e)[:60]), flush=True)


def story(d):
    # Text currently shown on the simulator's display, as a repr() string.
    return E(d, "RV.write(repr(sim_display.story))")


def ms_count(d):
    # Number of multisig wallets registered on the device; 0 when it cannot be read.
    try: return int(Vv(d, "len(settings.get('multisig',[]))") or 0)
    except Exception: return 0


def write_policy():
    # Rule dimensions, all enforced ON-DEVICE: max_amount (per-txn), per_period+period (velocity),
    # whitelist (allowed destinations — change back to ckms23 is auto-recognised and not gated by it).
    # Two-tier, FIRST-MATCH ordered: rule#1 = automated baseline (no users); rule#2 = TOTP-gated surge
    # (higher cap + velocity, requires the "owner" to present a code). A small spend matches rule#1 and
    # auto-signs; one that exceeds it falls through to rule#2 and needs the TOTP.
    # The policy is built from POLICY, SURGE, the sink address and EXTRA_WL, and written to POLICY_FILE.
    wl = [sink_addr()] + list(EXTRA_WL)
    pol = {"must_log": True, "period": POLICY['period_min'],
           "msg_paths": ["any"],   # allow on-device message signing under HSM (proof-of-control demo)
           "rules": [
               {"max_amount": POLICY['max_sat'], "per_period": POLICY['per_period_sat'],
                "wallet": "ckms23", "whitelist": wl},
               {"max_amount": SURGE['max_sat'], "per_period": SURGE['per_period_sat'],
                "wallet": "ckms23", "whitelist": wl, "users": ["owner"], "min_users": 1}]}
    open(POLICY_FILE, 'w').write(json.dumps(pol))


def arm_one(n, log):
    # Bring signer n into HSM mode with the policy in POLICY_FILE; a no-op when it is already active.
    # `log(text, cls)` receives progress messages. Steps: register the multisig wallet if the device has
    # none, enable HSM commands, enrol the "owner" TOTP user, upload the policy, then press through the
    # on-screen confirmations.
    with dev(n) as d:
        if hstat_raw(d).get('active'): return
        need_reg = ms_count(d) < 1
    if need_reg:
        log('coldcard %s · registering the 2-of-3 wallet…' % LET[n], 'c')
        # the upload runs through the ckcc command-line tool, which opens its own connection
        subprocess.run([CKCC, '-c', SOCK[n], 'upload', '-m', WALLET_FILE], capture_output=True)
        time.sleep(1.2)
        with dev(n) as d2:
            # press 'y' until the wallet shows up as registered (at most 10 tries)
            for _ in range(10):
                if ms_count(d2) >= 1: break
                K(d2, 'y'); time.sleep(0.8)
    with dev(n) as d:
        E(d, "settings.put('hsmcmd', True); settings.save(); RV.write('ok')")
        try:   # enrol the "owner" TOTP user (referenced by the surge rule) — before HSM start, fresh device
            d.send_recv(CCProtocolPacker.create_user(b'owner', USER_AUTH_TOTP, b32decode(totp_secret(), casefold=True)))
        except Exception: pass
        log('coldcard %s · entering HSM mode (auto ≤%d · surge ≤%d w/ TOTP)…' % (LET[n], POLICY['max_sat'], SURGE['max_sat']), 'c')
        flen, sha = real_file_upload(open(POLICY_FILE, 'rb'), d)
        d.send_recv(CCProtocolPacker.hsm_start(flen, sha)); time.sleep(1.3)
        # answer the on-screen prompts until the device reports HSM active (at most 18 rounds)
        for _ in range(18):
            if hstat_raw(d).get('active'): break
            s = story(d); m = re.search(r'Press \((\d)\) to save', s)
            if m: K(d, m.group(1)); time.sleep(1.4); continue
            if 'Press OK to enable' in s or s.startswith("('Start HSM?'"): K(d, 'y'); time.sleep(1.4); continue
            time.sleep(0.4)
        # NOTE(review): this is logged even when the loop above ran out without seeing 'active'.
        log('coldcard %s · HSM ACTIVE ✓' % LET[n], 'g')


def arm_all(log):
    # Write the current policy file and arm every signer with it.
    write_policy()
    for n in ALL: arm_one(n, log)


def rearm_all(log):
    # Apply a changed policy: rewrite the policy file, run the DEMO_RIG script, then arm every signer.
    write_policy()
    log('restarting all signers to load the new policy…', 'c')
    subprocess.run(['bash', DEMO_RIG], capture_output=True, timeout=150)
    time.sleep(3)
    for n in ALL: arm_one(n, log)
    log('policy applied · ≤%d sats/txn · ≤%d sats per %d min · whitelist enforced on all signers'
        % (POLICY['max_sat'], POLICY['per_period_sat'], POLICY['period_min']), 'g')


def _reason(d, fallback):
    # The device records WHY it refused in hsm_status.last_refusal (e.g. "over per-txn limit",
    # "exceeds velocity", "address not allowed", "has 1 warning(s)"). Surface it — far clearer than the
    # generic ckcc "You refused permission" string, and it makes the policy dimension obvious in the demo.
    # Returns at most 90 characters of that reason, or `fallback` when none is available.
    try:
        time.sleep(0.2)
        r = re.sub(r'^\s*Rejected:\s*', '', hstat_raw(d).get('last_refusal') or '').strip()
        if r: return r[:90]
    except Exception: pass
    return fallback


def sign_hsm(n, psbt, code=None):
    # Ask signer n to sign the raw PSBT bytes under its HSM policy.
    # Returns ('signed', base64 PSBT) or ('refused', reason text).
    with dev(n) as d:
        if code:   # relay the owner's TOTP so the device's surge rule (min_users) is satisfied
            try: d.send_recv(CCProtocolPacker.user_auth(b'owner', code.encode('ascii'), int(time.time()) // 30))
            except Exception: pass
        # refusal counter before the attempt; an increase during polling means the policy said no
        before = hstat_raw(d).get('refusals', hstat_raw(d).get('refused', 0))
        flen, sha = real_file_upload(io.BytesIO(psbt), d)
        try: d.send_recv(CCProtocolPacker.sign_transaction(flen, sha, flags=0x0), timeout=None)
        except Exception as e: return ('refused', _reason(d, str(e)[:80]))
        # poll for the result: up to 40 rounds, 0.4s apart
        for _ in range(40):
            time.sleep(0.4)
            try: r = d.send_recv(CCProtocolPacker.get_signed_txn(), timeout=None)
            except Exception as e: return ('refused', _reason(d, str(e)[:80]))
            if r is not None:
                rl, rsha = r
                return ('signed', b64encode(d.download_file(rl, rsha, file_number=1)).decode())
            cur = hstat_raw(d)
            if cur.get('refusals', cur.get('refused', 0)) > before:
                return ('refused', _reason(d, 'on-device policy denied the spend'))
        return ('refused', 'no signature (policy)')


# ---- streamed spend ----------------------------------------------------------
def sse_spend(send, amount_sat, dest_kind='sink', custom_addr=None, code=None):
    # Build, sign and broadcast one spend, streaming progress through `send` (a callable taking a dict).
    #   amount_sat  - requested amount; raised to at least 546
    #   dest_kind   - 'sink' (default), 'other' (fresh non-whitelisted address) or 'custom'
    #   custom_addr - destination used when dest_kind == 'custom'
    #   code        - TOTP code; when present the spend is treated as a surge-tier spend
    # Every exit path sends a final {'end': True}; refusals also carry {'refused': True}.
    # NOTE(review): custom_addr is not checked against EXTRA_WL here — enforcement appears to rest on the
    # on-device whitelist; confirm.
    def log(t, cls=''): send({'log': t, 'cls': cls})
    def step(n): send({'step': n});
    def done(n): send({'done_step': n})
    amount_sat = max(546, int(amount_sat))
    amount_btc = amount_sat / 1e8
    surge = bool(code)
    over = amount_sat > POLICY['max_sat']
    off_list = dest_kind == 'other'
    avail = [n for n in ALL if ENABLED[n] and is_active(n)]
    if len(avail) < 2:
        log('quorum lost · only %d policy signer(s) online — a 2-of-3 spend needs 2.' % len(avail), 'y')
        log('(this is the failure-domain property: lose any one signer and you keep operating; lose two and funds are safe but frozen.)', 'c')
        send({'end': True}); return
    signers = avail[:2]
    # COORDINATOR global velocity gate — the authoritative cap, enforced BEFORE any signer is asked, and
    # counting only real broadcasts. Off-list spends are skipped (the devices refuse them on the whitelist).
    if not off_list:
        cap = SURGE_GLOBAL['per_period_sat'] if surge else GLOBAL['per_period_sat']
        spent = global_spent()
        if spent + amount_sat > cap:
            log('coordinator %s velocity: this %d-sat spend would push the %d-min total to %d > the %d-sat cap.'
                % ('surge' if surge else 'global', amount_sat, GLOBAL['period_min'], spent + amount_sat, cap), 'y')
            log('blocked at the keyless coordinator, before any signer is asked — per-device velocity is only a backstop; the real cap is global and counts only broadcast txns, so dropped PSBTs never burn budget.', 'c')
            send({'refused': True, 'end': True}); return
    reclaim_sink()   # pull back funds from prior demo spends -> self-sustaining
    us, bal = balance_sat()
    if bal < amount_sat + 500:
        log('the 2-of-3 wallet needs confirmed signet coins. Send some to:', 'y'); log(' ' + signet_addr(), 'a')
        send({'end': True}); return
    step(1)
    if surge:
        log('SURGE: building a %d-sat PSBT — TOTP-authorised tier 2 (cap %d, velocity %d)…'
            % (amount_sat, SURGE['max_sat'], SURGE['per_period_sat']), 'c')
    elif over:
        log('building a PSBT that EXCEEDS the automated tier: %d sats > the %d-sat cap (no TOTP)…'
            % (amount_sat, POLICY['max_sat']), 'c')
    else:
        log('building a PSBT within policy: %d sats (limit %d)…' % (amount_sat, POLICY['max_sat']), 'c')
    if dest_kind == 'custom' and custom_addr:
        dest = custom_addr   # the signed-in user's authorised (whitelisted) address
        log('paying your authorised address %s (on the whitelist)…' % custom_addr, 'c')
    elif off_list:
        dest = other_addr()   # NOT on the on-device whitelist -> the HSM should refuse the payee
        log('paying an address that is NOT on the policy whitelist — the signers should refuse it…', 'c')
    else:
        dest = sink_addr()   # EXTERNAL to the 2-of-3 (whitelisted) -> real outflow, policy bites
    # Seed the spend with the LARGEST UTXO. Coldcard's HSM rejects ANY PSBT carrying a warning, and "Big Fee"
    # fires when the network fee exceeds 5% of total value. Core's default minimal coin-selection would grab a
    # tiny fragmented UTXO (fee then ~6%+), so we pin the biggest UTXO as the seed input (add_inputs tops up if
    # needed) -> fee is a fraction of a percent. fee_rate=1 for extra headroom. include_unsafe: spend our own
    # 0-conf coins (sink-reclaim returns funds as untrusted 0-conf).
    seed = max(us, key=lambda u: u['amount'])
    try:
        funded = json.loads(bc('walletcreatefundedpsbt',
                               json.dumps([{'txid': seed['txid'], 'vout': seed['vout']}]),
                               json.dumps([{dest: round(amount_btc, 8)}]), '0',
                               json.dumps({'fee_rate': 1, 'change_type': 'bech32', 'include_unsafe': True, 'add_inputs': True}),
                               wallet=WALLET))
    except RuntimeError as e:
        m = str(e)
        if 'Insufficient' in m or 'mempool-chain' in m or 'too-long' in m or 'too long' in m:
            log('the demo wallet is briefly tied up in unconfirmed transactions — give the next signet block ~10 min, or top up the address.', 'y')
        else:
            log('build error: ' + m[:140], 'e')
        send({'end': True}); return
    psbt = b64decode(funded['psbt']); done(1)
    step(2); log('coordinator fans the PSBT to signers %s (it holds no keys)…' % '+'.join(LET[n] for n in signers), 'c'); time.sleep(0.4); done(2)
    step(3)
    if surge:
        log('relaying your TOTP authorisation to each signer (the coordinator never holds the secret)…', 'c')
    sigs = []; refusals = 0
    for n in signers:
        log('coldcard %s · evaluating against its on-device policy…' % LET[n], 'c')
        kind, payload = sign_hsm(n, psbt, code)
        if kind == 'signed':
            sigs.append(payload); log('coldcard %s · signed ✓' % LET[n], 'g')
        else:
            refusals += 1; log('coldcard %s · REFUSED ✗ — %s' % (LET[n], payload), 'y')
    if refusals:
        log('policy held: the spend was blocked on-device. No transaction created.', 'y')
        log('// the device refused to sign — exactly what a programmable HSM is for.', 'c')
        send({'refused': True, 'end': True}); return
    done(3)
    step(4); fin = json.loads(bc('finalizepsbt', bc('combinepsbt', json.dumps(sigs))))
    if not fin.get('complete'):
        log('ERROR: signatures did not finalize', 'e'); send({'end': True}); return
    log('combined 2 partial signatures · transaction complete ✓', 'g'); done(4)
    step(5); txid = bc('sendrawtransaction', fin['hex'])
    if not off_list: ledger_append(amount_sat, txid)   # only real broadcasts count toward the global cap
    log('broadcast to signet ✓ txid ' + txid, 'a')
    log('watch it confirm live → mempool.space/signet/tx/' + txid, 'g')
    if surge:
        log('// TOTP-authorised surge: 2 of 3 signed a larger spend with the owner in the loop — still bounded by the on-device surge ceiling.', 'c')
    else:
        log('// two independent Coldcards moved real signet coins under policy — no human in the loop.', 'c')
    done(5); send({'end': True})


# ---- streamed message signing (2-of-3 proof of control, no funds moved) ------
def sign_msg_one(n, text):
    # Have signer n sign `text` at MSG_PATH through the ckcc command-line tool.
    # Returns ('signed', (address, signature)) taken from the last two non-empty output lines, or
    # ('fail', reason) on a non-zero exit.
    # NOTE(review): `text` is passed as a bare argument; a value starting with '-' may be parsed by ckcc as
    # an option — confirm and, if so, guard it.
    r = subprocess.run([CKCC, '-c', SOCK[n], 'msg', text, '-p', MSG_PATH],
                       capture_output=True, text=True, timeout=45)
    if r.returncode != 0:
        return ('fail', (r.stderr.strip() or r.stdout.strip() or 'refused')[:90])
    lines = [l.strip() for l in r.stdout.replace('\r', '\n').splitlines() if l.strip() and 'Waiting' not in l]
    sig = lines[-1] if lines else ''
    addr = lines[-2] if len(lines) >= 2 else ''
    return ('signed', (addr, sig))


def sse_signmsg(send, text):
    # Ask two available signers to sign the same message and stream the outcome through `send`.
    # `text` is trimmed to 140 characters; an empty value falls back to a default challenge string.
    def log(t, cls=''): send({'log': t, 'cls': cls})
    text = (text or '').strip()[:140] or 'mineracks 2-of-3 proof-of-control'
    avail = [n for n in ALL if ENABLED[n] and is_active(n)]
    if len(avail) < 2:
        log('quorum lost · only %d signer(s) online — need 2 keyholders to attest.' % len(avail), 'y'); send({'end': True}); return
    signers = avail[:2]
    log('$ coordinator sign-message', 'c')
    log('challenge: "%s"' % text, 'a')
    log('asking %s to each sign it with their key in the 2-of-3 wallet (HSM · no human)…' % '+'.join(LET[n] for n in signers), 'c')
    got = 0
    for n in signers:
        kind, payload = sign_msg_one(n, text)
        if kind == 'signed':
            addr, sig = payload; got += 1
            log('coldcard %s attested ✓' % LET[n], 'g')
            log(' address ' + addr, 'a')
            log(' signature ' + sig, 'a')
        else:
            log('coldcard %s could not sign — %s' % (LET[n], payload), 'y')
    if got >= 2:
        log('2 of 3 keyholders signed the same challenge ✓ — verifiable proof of 2-of-3 control, no funds moved.', 'g')
        log('// verify each signature against its address with any Bitcoin signed-message tool.', 'c')
    send({'end': True})


# ---- http --------------------------------------------------------------------
def status_json():
    # Snapshot for /status. Device fields come from STATUS_CACHE (no device is touched here);
    # balance_sat is -1 when the node wallet cannot be read.
    try: _, bal = balance_sat()
    except Exception: bal = -1
    devs = {}
    for n in ALL:
        h = STATUS_CACHE[n]
        devs[str(n)] = {'on': ENABLED[n], 'active': h['active'], 'approved': h['approved'], 'refused': h['refused']}
    online = sum(1 for n in ALL if ENABLED[n] and devs[str(n)]['active'])
    return {'balance_sat': bal, 'devices': devs, 'quorum_online': online, 'quorum_needed': 2,
            'policy': {'max_sat': POLICY['max_sat'], 'per_period_sat': POLICY['per_period_sat'],
                       'period_min': POLICY['period_min']},
            'fund_addr': signet_addr(), 'sink_addr': sink_addr(), 'authorized': list(EXTRA_WL),
            'global': {'per_period_sat': GLOBAL['per_period_sat'], 'period_min': GLOBAL['period_min'],
                       'spent': global_spent()},
            'surge': {'max_sat': SURGE['max_sat'], 'per_period_sat': SURGE['per_period_sat'],
                      'global_per_period_sat': SURGE_GLOBAL['per_period_sat']}}


class H(BaseHTTPRequestHandler):
    # HTTP handler: JSON endpoints plus server-sent-event streams for the long-running actions.

    def _json(self, obj, code=200, set_cookie=None):
        # Send obj as a JSON response with the given status code and optional Set-Cookie header.
        b = json.dumps(obj).encode(); self.send_response(code)
        self.send_header('Content-Type', 'application/json'); self.send_header('Cache-Control', 'no-cache')
        if set_cookie: self.send_header('Set-Cookie', set_cookie)
        self.send_header('Content-Length', str(len(b))); self.end_headers(); self.wfile.write(b)

    def _uid(self):
        # uid of the signed-in user taken from the session cookie, or None.
        return read_session(self.headers.get('Cookie', ''))

    def _sse(self):
        # Start an event-stream response and return send(obj), which writes one "data: <json>" event.
        self.send_response(200); self.send_header('Content-Type', 'text/event-stream')
        self.send_header('Cache-Control', 'no-cache'); self.end_headers()
        def send(obj):
            self.wfile.write(('data: %s\n\n' % json.dumps(obj)).encode()); self.wfile.flush()
        return send

    def do_GET(self):
        # Route on the path (trailing slash ignored); unknown paths get a bare 404.
        u = urllib.parse.urlparse(self.path); p = u.path.rstrip('/'); q = urllib.parse.parse_qs(u.query)
        if p in ('', '/status'):
            try: self._json(status_json())
            except Exception as e: self._json({'error': str(e)[:120]}, 500)
            return
        if p == '/auth/me':
            uid = self._uid(); self._json({'user': uid, 'pubkey': PUBKEYS.get(uid) if uid else None}); return
        if p == '/auth/nostr/challenge':
            self._json({'challenge': make_challenge()}); return
        if p == '/totp_enroll':
            # signed-in users only: hand out the TOTP secret for an authenticator app
            if not self._uid(): self._json({'error': 'sign in required'}, 401); return
            sec = totp_secret()
            self._json({'secret': sec, 'otpauth': 'otpauth://totp/multisigHSM:owner?secret=%s&issuer=multisigHSM&period=30&digits=6' % sec}); return
        if p == '/device':
            # NOTE(review): a non-numeric n makes int() raise here and nothing on this path catches it.
            n = int(q.get('n', [0])[0]); on = q.get('on', ['1'])[0] == '1'
            if n in ALL: ENABLED[n] = on
            self._json({'ok': True, 'devices': status_json()['devices']}); return
        if p == '/globalpolicy':
            if q.get('reset', [''])[0] == '1':
                RESET_TS[0] = int(time.time()); _TX_CACHE['txs'] = None   # move the window start to now (demo)
                try: open(LEDGER_FILE, 'w').write('[]')
                except Exception: pass
            # each value is clamped to a range; an unparsable value leaves the setting unchanged
            try: GLOBAL['per_period_sat'] = max(546, min(10**10, int(q.get('per_period_sat', [GLOBAL['per_period_sat']])[0])))
            except Exception: pass
            try: GLOBAL['period_min'] = max(1, min(10080, int(q.get('period_min', [GLOBAL['period_min']])[0])))
            except Exception: pass
            self._json({'ok': True, 'global': status_json()['global']}); return
        if p == '/policy':
            # clamp and store the tier-1 limits, then re-arm every signer under LOCK
            try: POLICY['max_sat'] = max(546, min(100000000, int(q.get('max_sat', [POLICY['max_sat']])[0])))
            except Exception: pass
            try: POLICY['per_period_sat'] = max(546, min(1000000000, int(q.get('per_period_sat', [POLICY['per_period_sat']])[0])))
            except Exception: pass
            try: POLICY['period_min'] = max(1, min(1440, int(q.get('period_min', [POLICY['period_min']])[0])))
            except Exception: pass
            send = self._sse()
            try:
                with LOCK: rearm_all(lambda t, c='': send({'log': t, 'cls': c}))
                send({'end': True})
            except Exception as e:
                try: send({'log': 'policy re-arm error: %s' % str(e)[:160], 'cls': 'e'}); send({'end': True})
                except Exception: pass
            return
        if p == '/surgepolicy':
            send = self._sse()
            if not self._uid():
                send({'log': 'sign in with Nostr to configure the TOTP surge tier.', 'cls': 'y'}); send({'end': True}); return
            # surge limits are floored at the corresponding lower-tier values
            try: SURGE['max_sat'] = max(POLICY['max_sat'], min(10**9, int(q.get('max_sat', [SURGE['max_sat']])[0])))
            except Exception: pass
            try: SURGE['per_period_sat'] = max(SURGE['max_sat'], min(10**10, int(q.get('per_period_sat', [SURGE['per_period_sat']])[0])))
            except Exception: pass
            try: SURGE_GLOBAL['per_period_sat'] = max(GLOBAL['per_period_sat'], min(10**10, int(q.get('global_per_period_sat', [SURGE_GLOBAL['per_period_sat']])[0])))
            except Exception: pass
            try:
                with LOCK: rearm_all(lambda t, c='': send({'log': t, 'cls': c}))
                send({'log': 'surge tier set · TOTP unlocks ≤%d sats/txn · ≤%d sats/period' % (SURGE['max_sat'], SURGE['per_period_sat']), 'cls': 'g'})
                send({'end': True})
            except Exception as e:
                try: send({'log': 'surge re-arm error: %s' % str(e)[:160], 'cls': 'e'}); send({'end': True})
                except Exception: pass
            return
        if p == '/authorize':
            send = self._sse()
            try:
                uid = self._uid()
                if not uid:
                    send({'log': 'sign in with Nostr first to authorise your own address.', 'cls': 'y'}); send({'end': True}); return
                addr = q.get('addr', [''])[0].strip()
                # the node decides whether the address is valid
                try: valid = json.loads(bc('validateaddress', addr)).get('isvalid', False)
                except Exception: valid = False
                if not valid:
                    send({'log': 'that is not a valid signet address.', 'cls': 'y'}); send({'end': True}); return
                with LOCK:
                    add_wl(addr)
                    send({'log': 'adding %s to the policy whitelist on all 3 signers…' % addr, 'cls': 'c'})
                    rearm_all(lambda t, c='': send({'log': t, 'cls': c}))
                send({'log': 'your address is now whitelisted — you can pay it; anonymous users still cannot.', 'cls': 'g'})
                send({'authorized': addr, 'end': True})
            except Exception as e:
                try: send({'log': 'authorize error: %s' % str(e)[:160], 'cls': 'e'}); send({'end': True})
                except Exception: pass
            return
        if p == '/reboot':
            send = self._sse()
            try:
                n = int(q.get('n', [0])[0])
                if n not in ALL:
                    send({'log': 'bad signer', 'cls': 'e'}); send({'end': True}); return
                send({'log': 'rebooting signer %s — power-cycling the device (loses HSM mode)…' % LET[n], 'cls': 'c'}); send({'step': 0})
                HEAL[n] = 0   # let auto-heal act immediately
                subprocess.run(['bash', DEMO_RIG, str(n)], capture_output=True, timeout=120)
                send({'log': 'signer %s back online but NOT signing-ready (fresh boot, no HSM) — quorum is now %d/3.' % (LET[n], _quorum()), 'cls': 'y'})
                send({'log': 'the coordinator detects it and AUTO-HEALS — re-registering the wallet, re-enrolling, re-entering HSM, unattended…', 'cls': 'c'})
                # wait (up to 45 checks, 1.5s apart) for the cached status to show the signer active again
                ok = False
                for _ in range(45):
                    time.sleep(1.5)
                    if STATUS_CACHE[n]['active']: ok = True; break
                if ok:
                    send({'log': 'auto-heal complete: signer %s is signing-ready again ✓ — quorum restored to %d/3.' % (LET[n], _quorum()), 'cls': 'g'})
                    send({'log': '// boot-to-signing-ready: no human re-armed it. A rebooted signer self-rejoins the quorum.', 'cls': 'c'})
                else:
                    send({'log': 'signer %s not back yet — auto-heal keeps retrying every ~10s in the background.' % LET[n], 'cls': 'y'})
                send({'end': True})
            except Exception as e:
                try: send({'log': 'reboot error: %s' % str(e)[:120], 'cls': 'e'}); send({'end': True})
                except Exception: pass
            return
        if p in ('/spend', '/signmsg'):
            send = self._sse()
            try:
                if p == '/spend':
                    try: amount = int(q.get('amount', [DEFAULT_SAT])[0])
                    except Exception: amount = DEFAULT_SAT
                    dk = q.get('dest', ['sink'])[0]
                    code = (q.get('code', [''])[0] or '').strip() or None
                    uid = self._uid()
                    # a TOTP code needs a session and must be 6 digits; a custom destination needs a session
                    if code and not uid:
                        send({'log': 'sign in with Nostr to use a TOTP surge authorisation.', 'cls': 'y'}); send({'end': True})
                    elif code and not (code.isdigit() and len(code) == 6):
                        send({'log': 'a TOTP code is 6 digits.', 'cls': 'y'}); send({'end': True})
                    elif dk == 'custom':
                        addr = q.get('addr', [''])[0].strip()
                        if not uid or not addr:
                            send({'log': 'sign in with Nostr and authorise an address first.', 'cls': 'y'}); send({'end': True})
                        else:
                            with LOCK: sse_spend(send, amount, 'custom', addr, code)
                    elif dk == 'other':
                        with LOCK: sse_spend(send, amount, 'other', None, code)
                    else:
                        with LOCK: sse_spend(send, amount, 'sink', None, code)
                else:
                    with LOCK: sse_signmsg(send, q.get('text', [''])[0])
            except Exception as e:
                try: send({'log': 'ERROR: %s' % str(e)[:200], 'cls': 'e'}); send({'end': True})
                except Exception: pass
            return
        self.send_response(404); self.end_headers()

    def do_POST(self):
        # Reads the request body (Content-Length bytes) and handles POST /auth/nostr/verify.
        # NOTE(review): this method continues beyond the lines reproduced here; only these statements were
        # reviewed.
        p = urllib.parse.urlparse(self.path).path.rstrip('/')
        n = int(self.headers.get('Content-Length', 0) or 0)
        body = self.rfile.read(n) if n else b''
        if p == '/auth/nostr/verify':
            try: ev = json.loads(body)
            except Exception: return self._json({'error': 'bad json'}, 400)
            try:
                # the event id must match its content, and the content must be a fresh challenge we minted
                if nostr_event_id(ev) != ev['id']: return self._json({'error': 'bad event id'}, 400)
                if not check_challenge(ev.get('content', '')): return self._json({'error': 'stale or invalid challenge'}, 401)
                ok = schnorr_verify(bytes.fromhex(ev['id']), bytes.fromhex(ev['pubkey']), bytes.fromhex(ev['sig']))
            except (KeyError, ValueError): return self._json({'error': 'malformed event'}, 400)
if not ok: return self._json({'error': 'signature verification failed'}, 401) uid = uid_for('nostr', ev['pubkey']); PUBKEYS[uid] = ev['pubkey'] cookie = '%s=%s; Path=/; Max-Age=%d; HttpOnly; Secure; SameSite=Lax' % (COOKIE, make_session(uid), SESSION_TTL) return self._json({'user': uid, 'pubkey': ev['pubkey']}, 200, cookie) if p == '/auth/logout': return self._json({'ok': True}, 200, '%s=; Path=/; Max-Age=0; HttpOnly; Secure; SameSite=Lax' % COOKIE) self.send_response(404); self.end_headers() def log_message(self, *a): pass def startup(): time.sleep(8) try: ensure_wallet() except Exception as e: print('wallet init:', e, flush=True) with LOCK: try: arm_all(lambda *a, **k: None) except Exception as e: print('startup arm:', e, flush=True) if __name__ == '__main__': threading.Thread(target=startup, daemon=True).start() threading.Thread(target=status_poller, daemon=True).start() threading.Thread(target=heal_loop, daemon=True).start() print('interactive signet orchestrator on :8099', flush=True) ThreadingHTTPServer(('127.0.0.1', 8099), H).serve_forever()