multisig-hsm/reference/orchestrator.py
mineracks 7a17ffd12e Initial public release — multisig HSM reference + recipe book
Open-source 2-of-3 policy-enforced threshold HSM: auto-signs cold→hot treasury
refills under on-device Coldcard policy, no human in the loop. Includes the full
operator manual + quick-start, the reference coordinator/signing code, and a
signer-host bootstrap. No keys, seeds, or secrets — placeholders only.

Live signet demo: https://multisighsm.mineracks.com

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 13:56:51 +10:00

776 lines
43 KiB
Python

#!/usr/bin/env python3
"""Reference demo orchestrator for the mineracks multisig HSM — interactive REAL-SIGNET policy HSM.
All 3 sims are HSM policy signers (any 2 sign → real 2-of-3 failover). Chain access is the fleet's
synced signet node (RPC over your private network). Keyless coordinator; on-device policy gates.
Endpoints (nginx maps /api/ -> :8099/):
GET /status -> JSON {balance_sat, devices, quorum, policy, sink_addr}
GET /spend?amount=SATS&dest=sink|other -> SSE; builds+signs a real-signet spend. dest=other pays an
off-whitelist address (HSM refuses); over max_amount refuses.
GET /signmsg?text=... -> SSE; 2-of-3 sign a message (proof of control, no funds move)
GET /device?n=N&on=0|1 -> toggle a signer on/off (failover / quorum demo)
GET /policy?max_sat=&per_period_sat=&period_min= -> set policy (max-amount + velocity) + RE-ARM all sims
"""
import json, os, io, time, re, subprocess, threading, urllib.parse, hashlib, hmac, secrets
from contextlib import contextmanager
from base64 import b64decode, b64encode, b32decode, b32encode
from http.cookies import SimpleCookie
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from ckcc.client import ColdcardDevice
from ckcc.protocol import CCProtocolPacker
from ckcc.constants import USER_AUTH_TOTP
from ckcc.cli import real_file_upload
HOME = os.path.expanduser('~'); V = '27.0'
CLI = HOME + '/bitcoin-%s/bin/bitcoin-cli' % V
CKCC = HOME + '/ccsim-venv/bin/ckcc'
WALLET_FILE = HOME + '/cksim/ckms23.txt'; POLICY_FILE = HOME + '/cksim/policy.json'
DEMO_RIG = HOME + '/cksim/demo_rig.sh'
WALLET = 'ckms23-signet'
RPC_HOST = os.environ.get('RPC_HOST', '127.0.0.1'); RPC_PORT = os.environ.get('RPC_PORT', '38332')
RPC_USER = os.environ.get('RPC_USER', 'rpcuser'); RPC_PASS = os.environ.get('RPC_PASS', '')
SOCK = {1: '/tmp/cksim-1.sock', 2: '/tmp/cksim-2.sock', 3: '/tmp/cksim-3.sock'}
LET = {1: 'A', 2: 'B', 3: 'C'}; ALL = (1, 2, 3)
DEFAULT_SAT = 2000; MSG_PATH = "m/48h/1h/0h/2h/0/0"
TPUB = {'00000001': 'tpubREPLACE_WITH_YOUR_SIGNER_1_XPUB',
'00000002': 'tpubREPLACE_WITH_YOUR_SIGNER_2_XPUB',
'00000003': 'tpubREPLACE_WITH_YOUR_SIGNER_3_XPUB'}
LOCK = threading.Lock()
ENABLED = {1: True, 2: True, 3: True}
POLICY = {'max_sat': 8000, 'per_period_sat': 500000, 'period_min': 60}
_WALLET_READY = [False]
# Coordinator-side GLOBAL velocity cap — the AUTHORITATIVE spend limit across ALL signers, and the epic's
# hardest open problem. On-device per-signer velocity (POLICY['per_period_sat']) is only a backstop: under a
# rotating "any 2 of 3" the three devices' local counters DRIFT (each tracks only the txns IT signed), and a
# Coldcard decrements its counter the moment it signs — even if that PSBT is later dropped and never
# broadcast. So no device's counter reflects true global outflow. The real cap therefore lives HERE in the
# keyless coordinator and counts ONLY real broadcasts (the ledger) — so dropped PSBTs never burn budget.
GLOBAL = {'per_period_sat': 50000, 'period_min': 60}
# TOTP-gated SURGE tier: a signed-in owner supplies a TOTP code to unlock a higher on-device tier (bigger
# per-txn cap + velocity) WITH a human in the loop — routine flows stay automated. The coordinator only
# RELAYS the code (via user_auth) to the signers; the secret lives on the devices + the owner's authenticator.
SURGE = {'max_sat': 200000, 'per_period_sat': 2000000} # tier-2 device limits (TOTP required)
SURGE_GLOBAL = {'per_period_sat': 1000000} # coordinator global cap when a TOTP is present
TOTP_SECRET_FILE = HOME + '/cksim/multisig-totp.secret' # owner's TOTP secret (chmod 600, NOT in git)
LEDGER_FILE = HOME + '/cksim/spend_ledger.json'
RESET_TS = [0] # demo-only: "Reset counter" moves the window start to now (production never resets)
_TX_CACHE = {'t': 0.0, 'txs': None}
def ledger_load():
try: return json.load(open(LEDGER_FILE))
except Exception: return []
def ledger_append(amount_sat, txid):
led = (ledger_load() + [{'ts': int(time.time()), 'amount': int(amount_sat), 'txid': txid}])[-5000:]
try: open(LEDGER_FILE, 'w').write(json.dumps(led))
except Exception: pass
_TX_CACHE['txs'] = None # force the next global_spent to re-read the chain (so the gate sees this spend)
def _recent_sends():
# cache the node's recent wallet txns ~4s so /status polling doesn't hammer RPC
if time.time() - _TX_CACHE['t'] < 4 and _TX_CACHE['txs'] is not None:
return _TX_CACHE['txs']
try: txs = json.loads(bc('listtransactions', '*', '300', '0', 'true', wallet=WALLET))
except Exception: return None
_TX_CACHE['t'] = time.time(); _TX_CACHE['txs'] = txs; return txs
def global_spent():
# CHAIN-DERIVED authoritative period outflow. The watch-only wallet's on-chain 'send' total over the
# window IS ground truth, so ANY coordinator replica on ANY host recomputes the same number straight from
# the blockchain — no single-host ledger to lose (no SPOF) and nothing to tamper. The local ledger is only
# a cache, used if the node RPC is briefly unreachable. (Change back to the 2-of-3 is recognised as
# internal by the descriptor wallet, so only real external sends — the refills — are counted.)
#
# MEMPOOL counts too: an unconfirmed (0-conf) send appears in listtransactions the moment it broadcasts,
# so a burst of spends inside one block interval can't slip past the cap. We exclude only abandoned /
# conflicted (confirmations < 0) sends, so a genuinely dropped tx doesn't permanently burn budget.
cut = max(int(time.time()) - GLOBAL['period_min'] * 60, RESET_TS[0])
txs = _recent_sends()
if txs is not None:
return sum(abs(int(round(t.get('amount', 0) * 1e8))) for t in txs
if t.get('category') == 'send' and t.get('time', 0) >= cut
and not t.get('abandoned') and t.get('confirmations', 0) >= 0)
return sum(int(e.get('amount', 0)) for e in ledger_load() if e.get('ts', 0) >= cut)
# ---- Nostr (NIP-07) sign-in -------------------------------------------------
# Optional opt-in identity (same pattern as walletplayground): anonymous use is unaffected. A signed-in user
# may authorise their OWN signet address onto every signer's on-device whitelist (a re-arm), instead of being
# confined to the sink. Self-contained pure-Python BIP-340 verify (no crypto dep). The rig is shared, so the
# authorised set is global; it's capped and the page says so. The HMAC secret lives only on the VM (chmod
# 600, NOT in git): a leak only lets someone forge a session, never move funds (the keys are on the devices).
SECRET_FILE = HOME + '/cksim/multisig-auth.secret'
COOKIE = 'ms_session'; CHALLENGE_TTL = 300; SESSION_TTL = 60 * 60 * 24 * 30
EXTRA_WL = [] # authorised custom destination addresses (in addition to the sink); capped, global
PUBKEYS = {} # uid -> nostr pubkey (in-memory, display only)
def _load_secret():
try:
s = open(SECRET_FILE, 'rb').read().strip()
if len(s) >= 32: return s
except FileNotFoundError: pass
s = secrets.token_hex(32).encode()
fd = os.open(SECRET_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
with os.fdopen(fd, 'wb') as f: f.write(s)
return s
SECRET = _load_secret()
# secp256k1 BIP-340 schnorr verify (verify-only reference impl)
_p = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F
_n = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
_G = (0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798,
0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8)
def _modinv(a, m): return pow(a, m - 2, m)
def _padd(P1, P2):
if P1 is None: return P2
if P2 is None: return P1
if P1[0] == P2[0] and P1[1] != P2[1]: return None
if P1 == P2: lam = (3 * P1[0] * P1[0] * _modinv(2 * P1[1], _p)) % _p
else: lam = ((P2[1] - P1[1]) * _modinv(P2[0] - P1[0], _p)) % _p
x3 = (lam * lam - P1[0] - P2[0]) % _p
return (x3, (lam * (P1[0] - x3) - P1[1]) % _p)
def _pmul(P, k):
R = None
while k:
if k & 1: R = _padd(R, P)
P = _padd(P, P); k >>= 1
return R
def _lift_x(x):
if x >= _p: return None
y_sq = (pow(x, 3, _p) + 7) % _p
y = pow(y_sq, (_p + 1) // 4, _p)
if pow(y, 2, _p) != y_sq: return None
return (x, y if y % 2 == 0 else _p - y)
def _tagged(tag, msg):
th = hashlib.sha256(tag.encode()).digest()
return hashlib.sha256(th + th + msg).digest()
def schnorr_verify(msg32, pub32, sig64):
if len(msg32) != 32 or len(pub32) != 32 or len(sig64) != 64: return False
P = _lift_x(int.from_bytes(pub32, 'big'))
if P is None: return False
r = int.from_bytes(sig64[:32], 'big'); s = int.from_bytes(sig64[32:], 'big')
if r >= _p or s >= _n: return False
e = int.from_bytes(_tagged('BIP0340/challenge', sig64[:32] + pub32 + msg32), 'big') % _n
R = _padd(_pmul(_G, s), _pmul(P, _n - e))
return R is not None and R[1] % 2 == 0 and R[0] == r
def nostr_event_id(ev):
s = json.dumps([0, ev['pubkey'], ev['created_at'], ev['kind'], ev['tags'], ev['content']],
separators=(',', ':'), ensure_ascii=False)
return hashlib.sha256(s.encode('utf-8')).hexdigest()
def _mac(msg): return hmac.new(SECRET, msg.encode(), hashlib.sha256).hexdigest()[:32]
def make_challenge():
nonce = secrets.token_hex(16); ts = str(int(time.time()))
return '%s.%s.%s' % (nonce, ts, _mac('%s.%s' % (nonce, ts)))
def check_challenge(ch):
try: nonce, ts, mac = ch.split('.')
except (ValueError, AttributeError): return False
if not hmac.compare_digest(mac, _mac('%s.%s' % (nonce, ts))): return False
return abs(int(time.time()) - int(ts)) <= CHALLENGE_TTL
def make_session(uid):
exp = str(int(time.time()) + SESSION_TTL)
return '%s.%s.%s' % (uid, exp, _mac('%s.%s' % (uid, exp)))
def read_session(cookie_header):
if not cookie_header: return None
c = SimpleCookie()
try: c.load(cookie_header)
except Exception: return None
if COOKIE not in c: return None
try: uid, exp, mac = c[COOKIE].value.split('.')
except ValueError: return None
if not hmac.compare_digest(mac, _mac('%s.%s' % (uid, exp))): return None
if int(time.time()) > int(exp): return None
return uid
def uid_for(provider, subject):
return provider + ':' + hashlib.sha256(('%s:%s' % (provider, subject)).encode()).hexdigest()[:24]
def add_wl(addr):
if addr not in EXTRA_WL:
EXTRA_WL.append(addr)
while len(EXTRA_WL) > 25: EXTRA_WL.pop(0)
def totp_secret():
# owner TOTP shared secret — generated once, persisted on the VM only (chmod 600, never in git), enrolled
# on each signer + shown to the signed-in owner for their authenticator. (Demo simplification: production
# enrols via the device's own QR so the coordinator never sees the secret.)
try:
s = open(TOTP_SECRET_FILE).read().strip()
if s: return s
except Exception: pass
s = b32encode(secrets.token_bytes(20)).decode()
fd = os.open(TOTP_SECRET_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
with os.fdopen(fd, 'w') as f: f.write(s)
return s
def bc(*a, wallet=None):
base = [CLI, '-signet', '-rpcconnect=%s' % RPC_HOST, '-rpcport=%s' % RPC_PORT,
'-rpcuser=%s' % RPC_USER, '-rpcpassword=%s' % RPC_PASS]
if wallet: base.append('-rpcwallet=%s' % wallet)
r = subprocess.run(base + list(a), capture_output=True, text=True, timeout=120)
if r.returncode: raise RuntimeError('bitcoin-cli %s: %s' % (a[:1], r.stderr.strip()[:160]))
return r.stdout.strip()
def descstr(b):
keys = ','.join('[%s/48h/1h/0h/2h]%s/%d/*' % (x, p, b) for x, p in TPUB.items())
return 'wsh(sortedmulti(2,%s))' % keys
def _ck(d): return json.loads(bc('getdescriptorinfo', d))['descriptor']
def ensure_wallet():
if _WALLET_READY[0]: return
if WALLET not in json.loads(bc('listwallets')):
try: bc('loadwallet', WALLET)
except RuntimeError:
bc('createwallet', WALLET, 'true', 'true', '', 'false', 'true')
ts = int(time.time()) - 14400
bc('importdescriptors', json.dumps([
{'desc': _ck(descstr(0)), 'active': True, 'internal': False, 'timestamp': ts, 'range': [0, 30]},
{'desc': _ck(descstr(1)), 'active': True, 'internal': True, 'timestamp': ts, 'range': [0, 30]}]), wallet=WALLET)
_WALLET_READY[0] = True
def signet_addr():
try: return open(HOME + '/cksim/signet_addr.txt').read().strip()
except Exception: return '(addr missing)'
def balance_sat():
ensure_wallet()
us = json.loads(bc('listunspent', '0', wallet=WALLET))
return us, int(round(sum(u['amount'] for u in us) * 1e8))
# A single-sig "sink" wallet on the fleet node receives the demo spends (EXTERNAL to the 2-of-3, so the
# Coldcard counts real outflow and max_amount actually bites). Each run first sweeps the sink back to the
# 2-of-3 funding address, so funds circulate and the demo is self-sustaining (depletes only by fees).
SINK = 'ckms-sink'; SINK_ADDR_FILE = HOME + '/cksim/sink_addr.txt'
def ensure_sink():
if SINK not in json.loads(bc('listwallets')):
try: bc('loadwallet', SINK)
except RuntimeError: bc('createwallet', SINK)
def sink_addr():
# STABLE whitelisted destination — reused (persisted) so it can live in each Coldcard's on-device
# address whitelist; an anonymous demo spend may only pay this address.
try:
a = open(SINK_ADDR_FILE).read().strip()
if a: return a
except Exception: pass
ensure_sink(); a = bc('getnewaddress', wallet=SINK); open(SINK_ADDR_FILE, 'w').write(a); return a
def other_addr():
# A fresh sink-wallet address deliberately NOT on the whitelist (the whitelist-block demo). The spend to
# it is refused on-device, so no coins ever land here.
ensure_sink(); return bc('getnewaddress', wallet=SINK)
def reclaim_sink():
# Sweep CONFIRMED sink coins back (minconf=1 -> trusted, so no untrusted-0-conf "-6" errors and no
# ever-growing unconfirmed chain). Sink funds become reclaimable one signet block after each demo spend.
try:
ensure_sink()
if json.loads(bc('listunspent', '1', wallet=SINK)):
bc('-named', 'sendall', 'recipients=' + json.dumps([signet_addr()]),
'fee_rate=1', 'options=' + json.dumps({'send_max': True}), wallet=SINK)
except Exception:
pass
# ---- device helpers ----------------------------------------------------------
# The ckcc simulator client BINDS a per-(simpid,ourpid) unix socket with only 5 instance slots; a leaked
# (un-closed) ColdcardDevice holds its slot until process exit (atexit), so opening many without closing
# eventually exhausts the slots -> "[Errno 98] Address already in use". So EVERY device is opened via this
# context manager and closed immediately after use — never leak a ColdcardDevice in a long-running process.
@contextmanager
def dev(n):
d = ColdcardDevice(sn=SOCK[n])
try:
yield d
finally:
try: d.close()
except Exception: pass
def is_active(n):
with dev(n) as d:
return bool(hstat_raw(d).get('active'))
def E(d, c):
r = d.send_recv(b'EXEC' + c.encode(), encrypt=False)
return r.decode() if isinstance(r, (bytes, bytearray)) else r
def Vv(d, c):
r = d.send_recv(b'EVAL' + c.encode())
return (r.decode() if isinstance(r, (bytes, bytearray)) else r).strip()
def K(d, k): d.send_recv(CCProtocolPacker.sim_keypress(k.encode('ascii')))
def hstat_raw(d):
try:
r = d.send_recv(CCProtocolPacker.hsm_status()); r = r.decode() if isinstance(r, (bytes, bytearray)) else r
return json.loads(r)
except Exception: return {}
def hstat(n):
with dev(n) as d:
s = hstat_raw(d)
return {'active': bool(s.get('active')),
'approved': s.get('approvals', s.get('approved', 0)),
'refused': s.get('refusals', s.get('refused', 0))}
# The sim sockets are single-client, so concurrent device reads (status poll vs a running spend) collide
# and make a live signer look offline. A single background poller reads the devices under LOCK and caches
# the result; the /status endpoint serves the cache (never touches a device). During a spend (which holds
# LOCK) the poller simply waits — no contention.
STATUS_CACHE = {n: {'active': False, 'approved': 0, 'refused': 0} for n in ALL}
HEAL = {n: 0.0 for n in ALL} # last auto-heal attempt per signer (cooldown)
_LAST_HEALTH = ['']
def _quorum():
return sum(1 for n in ALL if ENABLED[n] and STATUS_CACHE[n]['active'])
def status_poller():
tick = 0
while True:
time.sleep(2); tick += 1
with LOCK:
for n in ALL:
try: STATUS_CACHE[n] = hstat(n)
except Exception: # unreachable/dead signer -> mark inactive so quorum + heal see the truth
STATUS_CACHE[n] = {'active': False, 'approved': STATUS_CACHE[n]['approved'], 'refused': STATUS_CACHE[n]['refused']}
# quorum-health line for Loki/Grafana/Sentinel — on change, plus a ~30s heartbeat
q = _quorum(); ready = sum(1 for n in ALL if STATUS_CACHE[n]['active'])
lvl = 'OK' if q >= 3 else ('WARN' if q == 2 else 'CRIT')
msg = 'multisig-health quorum=%d needed=2 signers_ready=%d/3 status=%s' % (q, ready, lvl)
if msg != _LAST_HEALTH[0] or tick % 15 == 0:
_LAST_HEALTH[0] = msg; print(msg, flush=True)
def heal_loop():
# Boot-to-signing-ready auto-heal: re-arm any ENABLED signer that is reachable but not HSM-active (e.g. it
# rebooted and came back online). The quorum self-heals unattended. Cooldown avoids hammering; a signer
# whose process is gone can't be healed here (it raises) — the rig must relaunch it first.
time.sleep(24)
while True:
time.sleep(10)
for n in ALL:
if not ENABLED[n] or STATUS_CACHE[n]['active'] or time.time() - HEAL[n] < 25: continue
HEAL[n] = time.time()
try:
with LOCK:
if STATUS_CACHE[n]['active']: continue
print('auto-heal: signer %d not signing-ready — re-arming…' % n, flush=True)
arm_one(n, lambda *a, **k: None)
print('auto-heal: signer %d re-armed to signing-ready' % n, flush=True)
except Exception as e:
print('auto-heal: signer %d still unreachable (%s)' % (n, str(e)[:60]), flush=True)
def story(d): return E(d, "RV.write(repr(sim_display.story))")
def ms_count(d):
try: return int(Vv(d, "len(settings.get('multisig',[]))") or 0)
except Exception: return 0
def write_policy():
# Rule dimensions, all enforced ON-DEVICE: max_amount (per-txn), per_period+period (velocity),
# whitelist (allowed destinations — change back to ckms23 is auto-recognised and not gated by it).
# Two-tier, FIRST-MATCH ordered: rule#1 = automated baseline (no users); rule#2 = TOTP-gated surge
# (higher cap + velocity, requires the "owner" to present a code). A small spend matches rule#1 and
# auto-signs; one that exceeds it falls through to rule#2 and needs the TOTP.
wl = [sink_addr()] + list(EXTRA_WL)
pol = {"must_log": True, "period": POLICY['period_min'],
"msg_paths": ["any"], # allow on-device message signing under HSM (proof-of-control demo)
"rules": [
{"max_amount": POLICY['max_sat'], "per_period": POLICY['per_period_sat'],
"wallet": "ckms23", "whitelist": wl},
{"max_amount": SURGE['max_sat'], "per_period": SURGE['per_period_sat'],
"wallet": "ckms23", "whitelist": wl, "users": ["owner"], "min_users": 1}]}
open(POLICY_FILE, 'w').write(json.dumps(pol))
def arm_one(n, log):
with dev(n) as d:
if hstat_raw(d).get('active'): return
need_reg = ms_count(d) < 1
if need_reg:
log('coldcard %s · registering the 2-of-3 wallet…' % LET[n], 'c')
subprocess.run([CKCC, '-c', SOCK[n], 'upload', '-m', WALLET_FILE], capture_output=True)
time.sleep(1.2)
with dev(n) as d2:
for _ in range(10):
if ms_count(d2) >= 1: break
K(d2, 'y'); time.sleep(0.8)
with dev(n) as d:
E(d, "settings.put('hsmcmd', True); settings.save(); RV.write('ok')")
try: # enrol the "owner" TOTP user (referenced by the surge rule) — before HSM start, fresh device
d.send_recv(CCProtocolPacker.create_user(b'owner', USER_AUTH_TOTP, b32decode(totp_secret(), casefold=True)))
except Exception: pass
log('coldcard %s · entering HSM mode (auto ≤%d · surge ≤%d w/ TOTP)…' % (LET[n], POLICY['max_sat'], SURGE['max_sat']), 'c')
flen, sha = real_file_upload(open(POLICY_FILE, 'rb'), d)
d.send_recv(CCProtocolPacker.hsm_start(flen, sha)); time.sleep(1.3)
for _ in range(18):
if hstat_raw(d).get('active'): break
s = story(d); m = re.search(r'Press \((\d)\) to save', s)
if m: K(d, m.group(1)); time.sleep(1.4); continue
if 'Press OK to enable' in s or s.startswith("('Start HSM?'"): K(d, 'y'); time.sleep(1.4); continue
time.sleep(0.4)
log('coldcard %s · HSM ACTIVE ✓' % LET[n], 'g')
def arm_all(log):
write_policy()
for n in ALL: arm_one(n, log)
def rearm_all(log):
write_policy()
log('restarting all signers to load the new policy…', 'c')
subprocess.run(['bash', DEMO_RIG], capture_output=True, timeout=150)
time.sleep(3)
for n in ALL: arm_one(n, log)
log('policy applied · ≤%d sats/txn · ≤%d sats per %d min · whitelist enforced on all signers'
% (POLICY['max_sat'], POLICY['per_period_sat'], POLICY['period_min']), 'g')
def _reason(d, fallback):
# The device records WHY it refused in hsm_status.last_refusal (e.g. "over per-txn limit",
# "exceeds velocity", "address not allowed", "has 1 warning(s)"). Surface it — far clearer than the
# generic ckcc "You refused permission" string, and it makes the policy dimension obvious in the demo.
try:
time.sleep(0.2)
r = re.sub(r'^\s*Rejected:\s*', '', hstat_raw(d).get('last_refusal') or '').strip()
if r: return r[:90]
except Exception: pass
return fallback
def sign_hsm(n, psbt, code=None):
with dev(n) as d:
if code: # relay the owner's TOTP so the device's surge rule (min_users) is satisfied
try: d.send_recv(CCProtocolPacker.user_auth(b'owner', code.encode('ascii'), int(time.time()) // 30))
except Exception: pass
before = hstat_raw(d).get('refusals', hstat_raw(d).get('refused', 0))
flen, sha = real_file_upload(io.BytesIO(psbt), d)
try:
d.send_recv(CCProtocolPacker.sign_transaction(flen, sha, flags=0x0), timeout=None)
except Exception as e:
return ('refused', _reason(d, str(e)[:80]))
for _ in range(40):
time.sleep(0.4)
try:
r = d.send_recv(CCProtocolPacker.get_signed_txn(), timeout=None)
except Exception as e:
return ('refused', _reason(d, str(e)[:80]))
if r is not None:
rl, rsha = r
return ('signed', b64encode(d.download_file(rl, rsha, file_number=1)).decode())
cur = hstat_raw(d)
if cur.get('refusals', cur.get('refused', 0)) > before:
return ('refused', _reason(d, 'on-device policy denied the spend'))
return ('refused', 'no signature (policy)')
# ---- streamed spend ----------------------------------------------------------
def sse_spend(send, amount_sat, dest_kind='sink', custom_addr=None, code=None):
def log(t, cls=''): send({'log': t, 'cls': cls})
def step(n): send({'step': n});
def done(n): send({'done_step': n})
amount_sat = max(546, int(amount_sat))
amount_btc = amount_sat / 1e8
surge = bool(code)
over = amount_sat > POLICY['max_sat']
off_list = dest_kind == 'other'
avail = [n for n in ALL if ENABLED[n] and is_active(n)]
if len(avail) < 2:
log('quorum lost · only %d policy signer(s) online — a 2-of-3 spend needs 2.' % len(avail), 'y')
log('(this is the failure-domain property: lose any one signer and you keep operating; lose two and funds are safe but frozen.)', 'c')
send({'end': True}); return
signers = avail[:2]
# COORDINATOR global velocity gate — the authoritative cap, enforced BEFORE any signer is asked, and
# counting only real broadcasts. Off-list spends are skipped (the devices refuse them on the whitelist).
if not off_list:
cap = SURGE_GLOBAL['per_period_sat'] if surge else GLOBAL['per_period_sat']
spent = global_spent()
if spent + amount_sat > cap:
log('coordinator %s velocity: this %d-sat spend would push the %d-min total to %d > the %d-sat cap.'
% ('surge' if surge else 'global', amount_sat, GLOBAL['period_min'], spent + amount_sat, cap), 'y')
log('blocked at the keyless coordinator, before any signer is asked — per-device velocity is only a backstop; the real cap is global and counts only broadcast txns, so dropped PSBTs never burn budget.', 'c')
send({'refused': True, 'end': True}); return
reclaim_sink() # pull back funds from prior demo spends -> self-sustaining
us, bal = balance_sat()
if bal < amount_sat + 500:
log('the 2-of-3 wallet needs confirmed signet coins. Send some to:', 'y'); log(' ' + signet_addr(), 'a')
send({'end': True}); return
step(1)
if surge:
log('SURGE: building a %d-sat PSBT — TOTP-authorised tier 2 (cap %d, velocity %d)…' % (amount_sat, SURGE['max_sat'], SURGE['per_period_sat']), 'c')
elif over:
log('building a PSBT that EXCEEDS the automated tier: %d sats > the %d-sat cap (no TOTP)…' % (amount_sat, POLICY['max_sat']), 'c')
else:
log('building a PSBT within policy: %d sats (limit %d)…' % (amount_sat, POLICY['max_sat']), 'c')
if dest_kind == 'custom' and custom_addr:
dest = custom_addr # the signed-in user's authorised (whitelisted) address
log('paying your authorised address %s (on the whitelist)…' % custom_addr, 'c')
elif off_list:
dest = other_addr() # NOT on the on-device whitelist -> the HSM should refuse the payee
log('paying an address that is NOT on the policy whitelist — the signers should refuse it…', 'c')
else:
dest = sink_addr() # EXTERNAL to the 2-of-3 (whitelisted) -> real outflow, policy bites
# Seed the spend with the LARGEST UTXO. Coldcard's HSM rejects ANY PSBT carrying a warning, and "Big Fee"
# fires when the network fee exceeds 5% of total value. Core's default minimal coin-selection would grab a
# tiny fragmented UTXO (fee then ~6%+), so we pin the biggest UTXO as the seed input (add_inputs tops up if
# needed) -> fee is a fraction of a percent. fee_rate=1 for extra headroom. include_unsafe: spend our own
# 0-conf coins (sink-reclaim returns funds as untrusted 0-conf).
seed = max(us, key=lambda u: u['amount'])
try:
funded = json.loads(bc('walletcreatefundedpsbt',
json.dumps([{'txid': seed['txid'], 'vout': seed['vout']}]),
json.dumps([{dest: round(amount_btc, 8)}]), '0',
json.dumps({'fee_rate': 1, 'change_type': 'bech32',
'include_unsafe': True, 'add_inputs': True}), wallet=WALLET))
except RuntimeError as e:
m = str(e)
if 'Insufficient' in m or 'mempool-chain' in m or 'too-long' in m or 'too long' in m:
log('the demo wallet is briefly tied up in unconfirmed transactions — give the next signet block ~10 min, or top up the address.', 'y')
else:
log('build error: ' + m[:140], 'e')
send({'end': True}); return
psbt = b64decode(funded['psbt']); done(1)
step(2); log('coordinator fans the PSBT to signers %s (it holds no keys)…' % '+'.join(LET[n] for n in signers), 'c'); time.sleep(0.4); done(2)
step(3)
if surge: log('relaying your TOTP authorisation to each signer (the coordinator never holds the secret)…', 'c')
sigs = []; refusals = 0
for n in signers:
log('coldcard %s · evaluating against its on-device policy…' % LET[n], 'c')
kind, payload = sign_hsm(n, psbt, code)
if kind == 'signed':
sigs.append(payload); log('coldcard %s · signed ✓' % LET[n], 'g')
else:
refusals += 1; log('coldcard %s · REFUSED ✗ — %s' % (LET[n], payload), 'y')
if refusals:
log('policy held: the spend was blocked on-device. No transaction created.', 'y')
log('// the device refused to sign — exactly what a programmable HSM is for.', 'c')
send({'refused': True, 'end': True}); return
done(3)
step(4); fin = json.loads(bc('finalizepsbt', bc('combinepsbt', json.dumps(sigs))))
if not fin.get('complete'):
log('ERROR: signatures did not finalize', 'e'); send({'end': True}); return
log('combined 2 partial signatures · transaction complete ✓', 'g'); done(4)
step(5); txid = bc('sendrawtransaction', fin['hex'])
if not off_list: ledger_append(amount_sat, txid) # only real broadcasts count toward the global cap
log('broadcast to signet ✓ txid ' + txid, 'a')
log('watch it confirm live → mempool.space/signet/tx/' + txid, 'g')
if surge:
log('// TOTP-authorised surge: 2 of 3 signed a larger spend with the owner in the loop — still bounded by the on-device surge ceiling.', 'c')
else:
log('// two independent Coldcards moved real signet coins under policy — no human in the loop.', 'c')
done(5); send({'end': True})
# ---- streamed message signing (2-of-3 proof of control, no funds moved) ------
def sign_msg_one(n, text):
r = subprocess.run([CKCC, '-c', SOCK[n], 'msg', text, '-p', MSG_PATH], capture_output=True, text=True, timeout=45)
if r.returncode != 0:
return ('fail', (r.stderr.strip() or r.stdout.strip() or 'refused')[:90])
lines = [l.strip() for l in r.stdout.replace('\r', '\n').splitlines() if l.strip() and 'Waiting' not in l]
sig = lines[-1] if lines else ''
addr = lines[-2] if len(lines) >= 2 else ''
return ('signed', (addr, sig))
def sse_signmsg(send, text):
def log(t, cls=''): send({'log': t, 'cls': cls})
text = (text or '').strip()[:140] or 'mineracks 2-of-3 proof-of-control'
avail = [n for n in ALL if ENABLED[n] and is_active(n)]
if len(avail) < 2:
log('quorum lost · only %d signer(s) online — need 2 keyholders to attest.' % len(avail), 'y'); send({'end': True}); return
signers = avail[:2]
log('$ coordinator sign-message', 'c')
log('challenge: "%s"' % text, 'a')
log('asking %s to each sign it with their key in the 2-of-3 wallet (HSM · no human)…' % '+'.join(LET[n] for n in signers), 'c')
got = 0
for n in signers:
kind, payload = sign_msg_one(n, text)
if kind == 'signed':
addr, sig = payload; got += 1
log('coldcard %s attested ✓' % LET[n], 'g')
log(' address ' + addr, 'a')
log(' signature ' + sig, 'a')
else:
log('coldcard %s could not sign — %s' % (LET[n], payload), 'y')
if got >= 2:
log('2 of 3 keyholders signed the same challenge ✓ — verifiable proof of 2-of-3 control, no funds moved.', 'g')
log('// verify each signature against its address with any Bitcoin signed-message tool.', 'c')
send({'end': True})
# ---- http --------------------------------------------------------------------
def status_json():
try: _, bal = balance_sat()
except Exception: bal = -1
devs = {}
for n in ALL:
h = STATUS_CACHE[n]
devs[str(n)] = {'on': ENABLED[n], 'active': h['active'], 'approved': h['approved'], 'refused': h['refused']}
online = sum(1 for n in ALL if ENABLED[n] and devs[str(n)]['active'])
return {'balance_sat': bal, 'devices': devs, 'quorum_online': online, 'quorum_needed': 2,
'policy': {'max_sat': POLICY['max_sat'], 'per_period_sat': POLICY['per_period_sat'], 'period_min': POLICY['period_min']},
'fund_addr': signet_addr(), 'sink_addr': sink_addr(), 'authorized': list(EXTRA_WL),
'global': {'per_period_sat': GLOBAL['per_period_sat'], 'period_min': GLOBAL['period_min'], 'spent': global_spent()},
'surge': {'max_sat': SURGE['max_sat'], 'per_period_sat': SURGE['per_period_sat'],
'global_per_period_sat': SURGE_GLOBAL['per_period_sat']}}
class H(BaseHTTPRequestHandler):
def _json(self, obj, code=200, set_cookie=None):
b = json.dumps(obj).encode(); self.send_response(code)
self.send_header('Content-Type', 'application/json'); self.send_header('Cache-Control', 'no-cache')
if set_cookie: self.send_header('Set-Cookie', set_cookie)
self.send_header('Content-Length', str(len(b))); self.end_headers(); self.wfile.write(b)
def _uid(self): return read_session(self.headers.get('Cookie', ''))
def _sse(self):
self.send_response(200); self.send_header('Content-Type', 'text/event-stream')
self.send_header('Cache-Control', 'no-cache'); self.end_headers()
def send(obj): self.wfile.write(('data: %s\n\n' % json.dumps(obj)).encode()); self.wfile.flush()
return send
def do_GET(self):
u = urllib.parse.urlparse(self.path); p = u.path.rstrip('/'); q = urllib.parse.parse_qs(u.query)
if p in ('', '/status'):
try: self._json(status_json())
except Exception as e: self._json({'error': str(e)[:120]}, 500)
return
if p == '/auth/me':
uid = self._uid(); self._json({'user': uid, 'pubkey': PUBKEYS.get(uid) if uid else None}); return
if p == '/auth/nostr/challenge':
self._json({'challenge': make_challenge()}); return
if p == '/totp_enroll':
if not self._uid(): self._json({'error': 'sign in required'}, 401); return
sec = totp_secret()
self._json({'secret': sec,
'otpauth': 'otpauth://totp/multisigHSM:owner?secret=%s&issuer=multisigHSM&period=30&digits=6' % sec}); return
if p == '/device':
n = int(q.get('n', [0])[0]); on = q.get('on', ['1'])[0] == '1'
if n in ALL: ENABLED[n] = on
self._json({'ok': True, 'devices': status_json()['devices']}); return
if p == '/globalpolicy':
if q.get('reset', [''])[0] == '1':
RESET_TS[0] = int(time.time()); _TX_CACHE['txs'] = None # move the window start to now (demo)
try: open(LEDGER_FILE, 'w').write('[]')
except Exception: pass
try: GLOBAL['per_period_sat'] = max(546, min(10**10, int(q.get('per_period_sat', [GLOBAL['per_period_sat']])[0])))
except Exception: pass
try: GLOBAL['period_min'] = max(1, min(10080, int(q.get('period_min', [GLOBAL['period_min']])[0])))
except Exception: pass
self._json({'ok': True, 'global': status_json()['global']}); return
if p == '/policy':
try: POLICY['max_sat'] = max(546, min(100000000, int(q.get('max_sat', [POLICY['max_sat']])[0])))
except Exception: pass
try: POLICY['per_period_sat'] = max(546, min(1000000000, int(q.get('per_period_sat', [POLICY['per_period_sat']])[0])))
except Exception: pass
try: POLICY['period_min'] = max(1, min(1440, int(q.get('period_min', [POLICY['period_min']])[0])))
except Exception: pass
send = self._sse()
try:
with LOCK: rearm_all(lambda t, c='': send({'log': t, 'cls': c}))
send({'end': True})
except Exception as e:
try: send({'log': 'policy re-arm error: %s' % str(e)[:160], 'cls': 'e'}); send({'end': True})
except Exception: pass
return
if p == '/surgepolicy':
send = self._sse()
if not self._uid():
send({'log': 'sign in with Nostr to configure the TOTP surge tier.', 'cls': 'y'}); send({'end': True}); return
try: SURGE['max_sat'] = max(POLICY['max_sat'], min(10**9, int(q.get('max_sat', [SURGE['max_sat']])[0])))
except Exception: pass
try: SURGE['per_period_sat'] = max(SURGE['max_sat'], min(10**10, int(q.get('per_period_sat', [SURGE['per_period_sat']])[0])))
except Exception: pass
try: SURGE_GLOBAL['per_period_sat'] = max(GLOBAL['per_period_sat'], min(10**10, int(q.get('global_per_period_sat', [SURGE_GLOBAL['per_period_sat']])[0])))
except Exception: pass
try:
with LOCK: rearm_all(lambda t, c='': send({'log': t, 'cls': c}))
send({'log': 'surge tier set · TOTP unlocks ≤%d sats/txn · ≤%d sats/period' % (SURGE['max_sat'], SURGE['per_period_sat']), 'cls': 'g'})
send({'end': True})
except Exception as e:
try: send({'log': 'surge re-arm error: %s' % str(e)[:160], 'cls': 'e'}); send({'end': True})
except Exception: pass
return
if p == '/authorize':
send = self._sse()
try:
uid = self._uid()
if not uid:
send({'log': 'sign in with Nostr first to authorise your own address.', 'cls': 'y'}); send({'end': True}); return
addr = q.get('addr', [''])[0].strip()
try: valid = json.loads(bc('validateaddress', addr)).get('isvalid', False)
except Exception: valid = False
if not valid:
send({'log': 'that is not a valid signet address.', 'cls': 'y'}); send({'end': True}); return
with LOCK:
add_wl(addr)
send({'log': 'adding %s to the policy whitelist on all 3 signers…' % addr, 'cls': 'c'})
rearm_all(lambda t, c='': send({'log': t, 'cls': c}))
send({'log': 'your address is now whitelisted — you can pay it; anonymous users still cannot.', 'cls': 'g'})
send({'authorized': addr, 'end': True})
except Exception as e:
try: send({'log': 'authorize error: %s' % str(e)[:160], 'cls': 'e'}); send({'end': True})
except Exception: pass
return
if p == '/reboot':
send = self._sse()
try:
n = int(q.get('n', [0])[0])
if n not in ALL: send({'log': 'bad signer', 'cls': 'e'}); send({'end': True}); return
send({'log': 'rebooting signer %s — power-cycling the device (loses HSM mode)…' % LET[n], 'cls': 'c'}); send({'step': 0})
HEAL[n] = 0 # let auto-heal act immediately
subprocess.run(['bash', DEMO_RIG, str(n)], capture_output=True, timeout=120)
send({'log': 'signer %s back online but NOT signing-ready (fresh boot, no HSM) — quorum is now %d/3.' % (LET[n], _quorum()), 'cls': 'y'})
send({'log': 'the coordinator detects it and AUTO-HEALS — re-registering the wallet, re-enrolling, re-entering HSM, unattended…', 'cls': 'c'})
ok = False
for _ in range(45):
time.sleep(1.5)
if STATUS_CACHE[n]['active']: ok = True; break
if ok:
send({'log': 'auto-heal complete: signer %s is signing-ready again ✓ — quorum restored to %d/3.' % (LET[n], _quorum()), 'cls': 'g'})
send({'log': '// boot-to-signing-ready: no human re-armed it. A rebooted signer self-rejoins the quorum.', 'cls': 'c'})
else:
send({'log': 'signer %s not back yet — auto-heal keeps retrying every ~10s in the background.' % LET[n], 'cls': 'y'})
send({'end': True})
except Exception as e:
try: send({'log': 'reboot error: %s' % str(e)[:120], 'cls': 'e'}); send({'end': True})
except Exception: pass
return
if p in ('/spend', '/signmsg'):
send = self._sse()
try:
if p == '/spend':
try: amount = int(q.get('amount', [DEFAULT_SAT])[0])
except Exception: amount = DEFAULT_SAT
dk = q.get('dest', ['sink'])[0]
code = (q.get('code', [''])[0] or '').strip() or None
uid = self._uid()
if code and not uid:
send({'log': 'sign in with Nostr to use a TOTP surge authorisation.', 'cls': 'y'}); send({'end': True})
elif code and not (code.isdigit() and len(code) == 6):
send({'log': 'a TOTP code is 6 digits.', 'cls': 'y'}); send({'end': True})
elif dk == 'custom':
addr = q.get('addr', [''])[0].strip()
if not uid or not addr:
send({'log': 'sign in with Nostr and authorise an address first.', 'cls': 'y'}); send({'end': True})
else:
with LOCK: sse_spend(send, amount, 'custom', addr, code)
elif dk == 'other':
with LOCK: sse_spend(send, amount, 'other', None, code)
else:
with LOCK: sse_spend(send, amount, 'sink', None, code)
else:
with LOCK: sse_signmsg(send, q.get('text', [''])[0])
except Exception as e:
try: send({'log': 'ERROR: %s' % str(e)[:200], 'cls': 'e'}); send({'end': True})
except Exception: pass
return
self.send_response(404); self.end_headers()
def do_POST(self):
p = urllib.parse.urlparse(self.path).path.rstrip('/')
n = int(self.headers.get('Content-Length', 0) or 0)
body = self.rfile.read(n) if n else b''
if p == '/auth/nostr/verify':
try: ev = json.loads(body)
except Exception: return self._json({'error': 'bad json'}, 400)
try:
if nostr_event_id(ev) != ev['id']: return self._json({'error': 'bad event id'}, 400)
if not check_challenge(ev.get('content', '')): return self._json({'error': 'stale or invalid challenge'}, 401)
ok = schnorr_verify(bytes.fromhex(ev['id']), bytes.fromhex(ev['pubkey']), bytes.fromhex(ev['sig']))
except (KeyError, ValueError): return self._json({'error': 'malformed event'}, 400)
if not ok: return self._json({'error': 'signature verification failed'}, 401)
uid = uid_for('nostr', ev['pubkey']); PUBKEYS[uid] = ev['pubkey']
cookie = '%s=%s; Path=/; Max-Age=%d; HttpOnly; Secure; SameSite=Lax' % (COOKIE, make_session(uid), SESSION_TTL)
return self._json({'user': uid, 'pubkey': ev['pubkey']}, 200, cookie)
if p == '/auth/logout':
return self._json({'ok': True}, 200, '%s=; Path=/; Max-Age=0; HttpOnly; Secure; SameSite=Lax' % COOKIE)
self.send_response(404); self.end_headers()
def log_message(self, *a): pass
def startup():
time.sleep(8)
try: ensure_wallet()
except Exception as e: print('wallet init:', e, flush=True)
with LOCK:
try: arm_all(lambda *a, **k: None)
except Exception as e: print('startup arm:', e, flush=True)
if __name__ == '__main__':
threading.Thread(target=startup, daemon=True).start()
threading.Thread(target=status_poller, daemon=True).start()
threading.Thread(target=heal_loop, daemon=True).start()
print('interactive signet orchestrator on :8099', flush=True)
ThreadingHTTPServer(('127.0.0.1', 8099), H).serve_forever()