Open-source 2-of-3 policy-enforced threshold HSM: auto-signs cold→hot treasury refills under on-device Coldcard policy, no human in the loop. Includes the full operator manual + quick-start, the reference coordinator/signing code, and a signer-host bootstrap. No keys, seeds, or secrets — placeholders only. Live signet demo: https://multisighsm.mineracks.com Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
776 lines
43 KiB
Python
776 lines
43 KiB
Python
#!/usr/bin/env python3
|
|
"""Reference demo orchestrator for the mineracks multisig HSM — interactive REAL-SIGNET policy HSM.
|
|
|
|
All 3 sims are HSM policy signers (any 2 sign → real 2-of-3 failover). Chain access is the fleet's
|
|
synced signet node (RPC over your private network). Keyless coordinator; on-device policy gates.
|
|
|
|
Endpoints (nginx maps /api/ -> :8099/):
|
|
GET /status -> JSON {balance_sat, devices, quorum_online, quorum_needed, policy, fund_addr, sink_addr, authorized, global, surge}
|
|
GET /spend?amount=SATS&dest=sink|other -> SSE; builds+signs a real-signet spend. dest=other pays an
|
|
off-whitelist address (HSM refuses); over max_amount refuses.
|
|
GET /signmsg?text=... -> SSE; 2-of-3 sign a message (proof of control, no funds move)
|
|
GET /device?n=N&on=0|1 -> toggle a signer on/off (failover / quorum demo)
|
|
GET /policy?max_sat=&per_period_sat=&period_min= -> set policy (max-amount + velocity) + RE-ARM all sims
|
|
"""
|
|
import json, os, io, time, re, subprocess, threading, urllib.parse, hashlib, hmac, secrets
|
|
from contextlib import contextmanager
|
|
from base64 import b64decode, b64encode, b32decode, b32encode
|
|
from http.cookies import SimpleCookie
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from ckcc.client import ColdcardDevice
|
|
from ckcc.protocol import CCProtocolPacker
|
|
from ckcc.constants import USER_AUTH_TOTP
|
|
from ckcc.cli import real_file_upload
|
|
|
|
# ---- filesystem layout ---------------------------------------------------------
HOME = os.path.expanduser('~')
V = '27.0'  # version string embedded in the bitcoin-cli install path below
CLI = HOME + '/bitcoin-%s/bin/bitcoin-cli' % V
CKCC = HOME + '/ccsim-venv/bin/ckcc'
WALLET_FILE = HOME + '/cksim/ckms23.txt'
POLICY_FILE = HOME + '/cksim/policy.json'
DEMO_RIG = HOME + '/cksim/demo_rig.sh'

# ---- node RPC endpoint (overridable through the environment) -------------------
WALLET = 'ckms23-signet'
RPC_HOST = os.environ.get('RPC_HOST', '127.0.0.1')
RPC_PORT = os.environ.get('RPC_PORT', '38332')
RPC_USER = os.environ.get('RPC_USER', 'rpcuser')
RPC_PASS = os.environ.get('RPC_PASS', '')

# ---- signer layout: signer number -> simulator socket / display letter ---------
ALL = (1, 2, 3)
SOCK = {n: '/tmp/cksim-%d.sock' % n for n in (1, 2, 3)}
LET = dict(zip((1, 2, 3), 'ABC'))
DEFAULT_SAT = 2000
MSG_PATH = "m/48h/1h/0h/2h/0/0"  # derivation path handed to `ckcc msg` by sign_msg_one

# key-origin fingerprint -> xpub, spliced into the descriptor by descstr() (placeholders only)
TPUB = {
    '00000001': 'tpubREPLACE_WITH_YOUR_SIGNER_1_XPUB',
    '00000002': 'tpubREPLACE_WITH_YOUR_SIGNER_2_XPUB',
    '00000003': 'tpubREPLACE_WITH_YOUR_SIGNER_3_XPUB',
}

# ---- mutable runtime state -----------------------------------------------------
LOCK = threading.Lock()
ENABLED = {n: True for n in (1, 2, 3)}  # per-signer on/off switch (toggled by /device)
POLICY = {'max_sat': 8000, 'per_period_sat': 500000, 'period_min': 60}
_WALLET_READY = [False]  # one-element list so ensure_wallet() can flip it in place
|
|
|
|
# Coordinator-side GLOBAL velocity cap — the AUTHORITATIVE spend limit across ALL signers, and the epic's
# hardest open problem. On-device per-signer velocity (POLICY['per_period_sat']) is only a backstop: under a
# rotating "any 2 of 3" the three devices' local counters DRIFT (each tracks only the txns IT signed), and a
# Coldcard decrements its counter the moment it signs — even if that PSBT is later dropped and never
# broadcast. So no device's counter reflects true global outflow. The real cap therefore lives HERE in the
# keyless coordinator and counts ONLY real broadcasts (the ledger) — so dropped PSBTs never burn budget.
# How the total is computed: global_spent() sums the node wallet's 'send' transactions inside the window and
# falls back to the local ledger file only when that RPC call fails.
GLOBAL = {'per_period_sat': 50000, 'period_min': 60}
# TOTP-gated SURGE tier: a signed-in owner supplies a TOTP code to unlock a higher on-device tier (bigger
# per-txn cap + velocity) WITH a human in the loop — routine flows stay automated. The coordinator only
# RELAYS the code (via user_auth) to the signers; the secret lives on the devices + the owner's authenticator.
SURGE = {'max_sat': 200000, 'per_period_sat': 2000000}  # tier-2 device limits (TOTP required)
SURGE_GLOBAL = {'per_period_sat': 1000000}  # coordinator global cap when a TOTP is present
TOTP_SECRET_FILE = HOME + '/cksim/multisig-totp.secret'  # owner's TOTP secret (chmod 600, NOT in git)
LEDGER_FILE = HOME + '/cksim/spend_ledger.json'  # JSON list of {'ts', 'amount', 'txid'} written by ledger_append()
RESET_TS = [0]  # demo-only: "Reset counter" moves the window start to now (production never resets)
_TX_CACHE = {'t': 0.0, 'txs': None}  # last listtransactions result and when it was fetched (see _recent_sends)
|
|
def ledger_load():
    """Return the persisted spend ledger as a list of entry dicts.

    Best-effort: a missing, unreadable or malformed ledger yields ``[]``. A file
    that parses but is not a JSON list is also treated as empty, because callers
    concatenate and iterate the result as a list of dicts.
    """
    try:
        # `with` closes the handle deterministically instead of leaving it to the GC.
        with open(LEDGER_FILE) as f:
            led = json.load(f)
    except (OSError, ValueError):  # ValueError covers JSONDecodeError and decode errors
        return []
    return led if isinstance(led, list) else []
|
|
def ledger_append(amount_sat, txid):
    """Record one broadcast spend in the on-disk ledger and invalidate the tx cache.

    The ledger keeps at most the newest 5000 entries. Persisting is best-effort:
    a write failure is reported on stdout but never raised, because the spend
    has already been broadcast by the time this is called.
    """
    entry = {'ts': int(time.time()), 'amount': int(amount_sat), 'txid': txid}
    led = (ledger_load() + [entry])[-5000:]
    tmp = LEDGER_FILE + '.tmp'
    try:
        # Write to a sibling file and rename over the target, so a crash mid-write
        # cannot leave a truncated or empty ledger behind.
        with open(tmp, 'w') as f:
            f.write(json.dumps(led))
        os.replace(tmp, LEDGER_FILE)
    except Exception as e:
        print('ledger: could not persist spend %s (%s)' % (txid, str(e)[:80]), flush=True)
    _TX_CACHE['txs'] = None  # force the next global_spent to re-read the chain (so the gate sees this spend)
|
|
def _recent_sends():
    """Return the node wallet's recent transactions, served from a ~4 s cache.

    The cache keeps /status polling from hammering the RPC. Returns ``None``
    (and leaves the cache untouched) when the RPC call or JSON parse fails.
    """
    cached = _TX_CACHE['txs']
    age = time.time() - _TX_CACHE['t']
    if cached is not None and age < 4:
        return cached
    try:
        fresh = json.loads(bc('listtransactions', '*', '300', '0', 'true', wallet=WALLET))
    except Exception:
        return None
    _TX_CACHE['t'] = time.time()
    _TX_CACHE['txs'] = fresh
    return fresh
|
|
def global_spent():
    """Return the satoshis sent out of the wallet inside the current velocity window.

    CHAIN-DERIVED: the watch-only wallet's 'send' transactions over the window
    are the primary source, so any coordinator replica recomputes the same
    figure from the node. The local ledger is only a fallback for when the
    node RPC is unreachable.

    Unconfirmed (0-conf) sends count, so a burst inside one block interval
    cannot slip past the cap. Only abandoned or conflicted (confirmations < 0)
    sends are excluded, so a genuinely dropped tx does not burn budget.
    """
    period_start = int(time.time()) - GLOBAL['period_min'] * 60
    cut = max(period_start, RESET_TS[0])  # a demo "reset" can only move the window start forward
    txs = _recent_sends()
    if txs is None:
        # RPC unavailable: fall back to the locally recorded broadcasts.
        return sum(int(e.get('amount', 0)) for e in ledger_load() if e.get('ts', 0) >= cut)
    total = 0
    for t in txs:
        if t.get('category') != 'send' or t.get('time', 0) < cut:
            continue
        if t.get('abandoned') or t.get('confirmations', 0) < 0:
            continue
        # amounts arrive in BTC as floats; convert to whole satoshis and take the magnitude
        total += abs(int(round(t.get('amount', 0) * 1e8)))
    return total
|
|
|
|
# ---- Nostr (NIP-07) sign-in -------------------------------------------------
|
|
# Optional opt-in identity (same pattern as walletplayground): anonymous use is unaffected. A signed-in user
|
|
# may authorise their OWN signet address onto every signer's on-device whitelist (a re-arm), instead of being
|
|
# confined to the sink. Self-contained pure-Python BIP-340 verify (no crypto dep). The rig is shared, so the
|
|
# authorised set is global; it's capped and the page says so. The HMAC secret lives only on the VM (chmod
|
|
# 600, NOT in git): a leak only lets someone forge a session, never move funds (the keys are on the devices).
|
|
SECRET_FILE = HOME + '/cksim/multisig-auth.secret'  # HMAC key backing challenges and session cookies
COOKIE = 'ms_session'              # name of the session cookie read by read_session()
CHALLENGE_TTL = 300                # seconds a sign-in challenge stays acceptable
SESSION_TTL = 30 * 24 * 60 * 60    # seconds until a session cookie expires (30 days)
EXTRA_WL = []  # authorised custom destination addresses (in addition to the sink); capped, global
PUBKEYS = {}   # uid -> nostr pubkey (in-memory, display only)
|
|
|
|
def _load_secret():
    """Return the HMAC secret as bytes, creating and persisting one on first use.

    An existing file is used only if it holds at least 32 bytes after
    stripping; otherwise a fresh 64-hex-character secret is generated and
    written to SECRET_FILE with owner-only permissions.
    """
    try:
        with open(SECRET_FILE, 'rb') as f:  # close the handle deterministically
            s = f.read().strip()
        if len(s) >= 32:
            return s
    except FileNotFoundError:
        pass
    s = secrets.token_hex(32).encode()
    fd = os.open(SECRET_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'wb') as f:
        # The mode passed to os.open() only applies when the file is created; force 0600
        # so a pre-existing (too-short) file with lax permissions cannot expose the new secret.
        os.fchmod(f.fileno(), 0o600)
        f.write(s)
    return s
|
|
# HMAC key used by _mac(); loaded (or generated on first run) at import time.
SECRET = _load_secret()

# secp256k1 BIP-340 schnorr verify (verify-only reference impl)
# _p: modulus used for all point-coordinate arithmetic in _padd() / _lift_x().
_p = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F
# _n: scalar modulus; schnorr_verify() rejects s >= _n and reduces the challenge e modulo it.
_n = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141
# _G: base point (x, y) that schnorr_verify() multiplies by s.
_G = (0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798,
      0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8)
|
|
def _modinv(a, m):
    """Return a**(m-2) mod m, which is the modular inverse of a when m is prime."""
    exponent = m - 2
    return pow(a, exponent, m)
|
|
def _padd(P1, P2):
    """Add two affine curve points modulo _p; ``None`` stands for the point at infinity."""
    if P1 is None:
        return P2
    if P2 is None:
        return P1
    x1, y1 = P1[0], P1[1]
    x2, y2 = P2[0], P2[1]
    if x1 == x2 and y1 != y2:
        return None  # P2 is the negation of P1
    if P1 == P2:
        slope = (3 * x1 * x1 * _modinv(2 * y1, _p)) % _p  # tangent (doubling)
    else:
        slope = ((y2 - y1) * _modinv(x2 - x1, _p)) % _p  # chord
    x3 = (slope * slope - x1 - x2) % _p
    y3 = (slope * (x1 - x3) - y1) % _p
    return (x3, y3)
|
|
def _pmul(P, k):
    """Multiply point P by the non-negative scalar k using double-and-add (LSB first).

    Returns ``None`` (the point at infinity) for k == 0.
    """
    acc = None
    addend = P
    while k:
        low_bit = k & 1
        if low_bit:
            acc = _padd(acc, addend)
        addend = _padd(addend, addend)
        k >>= 1
    return acc
|
|
def _lift_x(x):
    """Return the curve point with x-coordinate x and an even y, or ``None``.

    ``None`` is returned when x is out of range (>= _p) or when x**3 + 7 has
    no square root modulo _p.
    """
    if x >= _p:
        return None
    rhs = (pow(x, 3, _p) + 7) % _p
    root = pow(rhs, (_p + 1) // 4, _p)  # candidate square root
    if pow(root, 2, _p) != rhs:
        return None
    if root % 2 == 0:
        return (x, root)
    return (x, _p - root)
|
|
def _tagged(tag, msg):
    """Return SHA256(SHA256(tag) || SHA256(tag) || msg) as raw bytes."""
    tag_hash = hashlib.sha256(tag.encode()).digest()
    hasher = hashlib.sha256()
    hasher.update(tag_hash)
    hasher.update(tag_hash)
    hasher.update(msg)
    return hasher.digest()
|
|
def schnorr_verify(msg32, pub32, sig64):
    """Verify a 64-byte Schnorr signature over a 32-byte message for an x-only public key.

    Returns True only when every length and range check passes and the
    recomputed point R = s*G - e*P has an even y and x equal to r.
    """
    if not (len(msg32) == 32 and len(pub32) == 32 and len(sig64) == 64):
        return False
    P = _lift_x(int.from_bytes(pub32, 'big'))
    if P is None:
        return False
    r_bytes, s_bytes = sig64[:32], sig64[32:]
    r = int.from_bytes(r_bytes, 'big')
    s = int.from_bytes(s_bytes, 'big')
    if r >= _p or s >= _n:
        return False
    digest = _tagged('BIP0340/challenge', r_bytes + pub32 + msg32)
    e = int.from_bytes(digest, 'big') % _n
    # s*G + (n - e)*P is s*G - e*P
    R = _padd(_pmul(_G, s), _pmul(P, _n - e))
    if R is None:
        return False
    return R[1] % 2 == 0 and R[0] == r
|
|
def nostr_event_id(ev):
    """Return the hex SHA-256 of the event's canonical JSON array serialisation.

    The array is [0, pubkey, created_at, kind, tags, content], dumped without
    whitespace and without ASCII-escaping. Raises KeyError if a field is missing.
    """
    fields = ('pubkey', 'created_at', 'kind', 'tags', 'content')
    payload = [0]
    for name in fields:
        payload.append(ev[name])
    serialized = json.dumps(payload, separators=(',', ':'), ensure_ascii=False)
    return hashlib.sha256(serialized.encode('utf-8')).hexdigest()
|
|
def _mac(msg):
    """Return the first 32 hex characters of HMAC-SHA256(SECRET, msg)."""
    tag = hmac.new(SECRET, msg.encode(), hashlib.sha256)
    return tag.hexdigest()[:32]
|
|
def make_challenge():
    """Return a fresh ``nonce.timestamp.mac`` sign-in challenge string."""
    nonce = secrets.token_hex(16)
    ts = str(int(time.time()))
    signed_part = '%s.%s' % (nonce, ts)
    return '.'.join((nonce, ts, _mac(signed_part)))
|
|
def check_challenge(ch):
    """Return True if ``ch`` is a challenge this server issued within CHALLENGE_TTL seconds.

    ``ch`` is attacker-controlled, so every malformed shape (wrong type, wrong
    number of fields, undecodable characters, non-numeric timestamp) returns
    False instead of raising.
    """
    try:
        nonce, ts, mac = ch.split('.')
    except (ValueError, AttributeError, TypeError):  # TypeError: bytes input
        return False
    try:
        expected = _mac('%s.%s' % (nonce, ts))
        # Compare as bytes: hmac.compare_digest() raises TypeError when a str
        # argument contains non-ASCII characters.
        if not hmac.compare_digest(mac.encode('utf-8'), expected.encode('utf-8')):
            return False
        issued = int(ts)
    except (ValueError, TypeError):  # UnicodeEncodeError is a ValueError
        return False
    return abs(int(time.time()) - issued) <= CHALLENGE_TTL
|
|
def make_session(uid):
    """Return a ``uid.expiry.mac`` session token valid for SESSION_TTL seconds from now."""
    expires_at = int(time.time()) + SESSION_TTL
    exp = str(expires_at)
    signed_part = '%s.%s' % (uid, exp)
    return '%s.%s.%s' % (uid, exp, _mac(signed_part))
|
|
def read_session(cookie_header):
    """Return the uid carried by a valid, unexpired session cookie, else ``None``.

    The Cookie header is untrusted: a missing cookie, a malformed token, a bad
    MAC, an unparsable expiry or an expired session all yield ``None``.
    """
    if not cookie_header:
        return None
    jar = SimpleCookie()
    try:
        jar.load(cookie_header)
    except Exception:
        return None
    if COOKIE not in jar:
        return None
    try:
        uid, exp, mac = jar[COOKIE].value.split('.')
        expected = _mac('%s.%s' % (uid, exp))
        # Compare as bytes: hmac.compare_digest() raises TypeError on a non-ASCII str,
        # which a quoted cookie value can contain.
        if not hmac.compare_digest(mac.encode('utf-8'), expected.encode('utf-8')):
            return None
        if int(time.time()) > int(exp):
            return None
    except (ValueError, TypeError):  # bad field count, non-numeric expiry, undecodable text
        return None
    return uid
|
|
def uid_for(provider, subject):
    """Return ``provider:<first 24 hex chars of SHA-256("provider:subject")>``."""
    material = ('%s:%s' % (provider, subject)).encode()
    short_hash = hashlib.sha256(material).hexdigest()[:24]
    return provider + ':' + short_hash
|
|
def add_wl(addr):
    """Append ``addr`` to the shared EXTRA_WL list if absent, keeping only the newest 25."""
    if addr in EXTRA_WL:
        return
    EXTRA_WL.append(addr)
    overflow = len(EXTRA_WL) - 25
    if overflow > 0:
        del EXTRA_WL[:overflow]  # evict the oldest entries first
|
|
def totp_secret():
    """Return the owner's base32 TOTP shared secret, generating it on first use.

    owner TOTP shared secret — generated once, persisted on the VM only (chmod 600, never in git),
    enrolled on each signer + shown to the signed-in owner for their authenticator. (Demo
    simplification: production enrols via the device's own QR so the coordinator never sees
    the secret.)
    """
    try:
        with open(TOTP_SECRET_FILE) as f:  # close the handle deterministically
            s = f.read().strip()
        if s:
            return s
    except Exception:
        pass  # missing or unreadable file: fall through and (re)generate
    s = b32encode(secrets.token_bytes(20)).decode()
    fd = os.open(TOTP_SECRET_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        # os.open()'s mode is ignored for a pre-existing file; enforce owner-only access
        # before the secret is written.
        os.fchmod(f.fileno(), 0o600)
        f.write(s)
    return s
|
|
|
|
def bc(*a, wallet=None):
    """Run ``bitcoin-cli -signet <a...>`` against the configured node and return its stdout.

    Args:
        *a: RPC method name followed by its already-stringified arguments.
        wallet: optional wallet name, passed as ``-rpcwallet``.

    Returns:
        The command's standard output with surrounding whitespace stripped.

    Raises:
        RuntimeError: the command exited non-zero (message carries up to 160 chars of stderr).
        subprocess.TimeoutExpired: the command ran longer than 120 s.
    """
    # The RPC password is fed on stdin (-stdinrpcpass) rather than as -rpcpassword=...,
    # so it never appears in the process list.
    base = [CLI, '-signet', '-rpcconnect=%s' % RPC_HOST, '-rpcport=%s' % RPC_PORT,
            '-rpcuser=%s' % RPC_USER, '-stdinrpcpass']
    if wallet:
        base.append('-rpcwallet=%s' % wallet)
    r = subprocess.run(base + list(a), input=RPC_PASS + '\n',
                       capture_output=True, text=True, timeout=120)
    if r.returncode:
        raise RuntimeError('bitcoin-cli %s: %s' % (a[:1], r.stderr.strip()[:160]))
    return r.stdout.strip()
|
|
|
|
def descstr(b):
    """Build the 2-of-N ``wsh(sortedmulti(...))`` descriptor for branch ``b``.

    Each TPUB entry contributes ``[fingerprint/48h/1h/0h/2h]xpub/<b>/*``.
    """
    parts = []
    for fingerprint, xpub in TPUB.items():
        parts.append('[%s/48h/1h/0h/2h]%s/%d/*' % (fingerprint, xpub, b))
    return 'wsh(sortedmulti(2,%s))' % ','.join(parts)
|
|
def _ck(d):
    """Return the 'descriptor' field of the node's ``getdescriptorinfo`` reply for ``d``."""
    info = json.loads(bc('getdescriptorinfo', d))
    return info['descriptor']
|
|
def ensure_wallet():
    """Make sure the WALLET wallet is loaded on the node; does the work once per process.

    If the wallet is not listed, it is loaded; if loading fails it is created
    and both descriptors (receive branch 0, change branch 1) are imported with
    a timestamp four hours in the past.
    """
    if _WALLET_READY[0]: return
    if WALLET not in json.loads(bc('listwallets')):
        try: bc('loadwallet', WALLET)
        except RuntimeError:
            # NOTE(review): the source's indentation was lost; create + import are assumed to
            # belong to this fallback branch (a successful loadwallet skips them) — confirm.
            # NOTE(review): positional createwallet flags presumably mean disable_private_keys=true,
            # blank=true, passphrase='', avoid_reuse=false, descriptors=true — verify against the node.
            bc('createwallet', WALLET, 'true', 'true', '', 'false', 'true')
            ts = int(time.time()) - 14400  # 14400 s = 4 h before now
            bc('importdescriptors', json.dumps([
                {'desc': _ck(descstr(0)), 'active': True, 'internal': False, 'timestamp': ts, 'range': [0, 30]},
                {'desc': _ck(descstr(1)), 'active': True, 'internal': True, 'timestamp': ts, 'range': [0, 30]}]), wallet=WALLET)
    _WALLET_READY[0] = True
|
|
def signet_addr():
    """Return the 2-of-3 funding address stored in ~/cksim/signet_addr.txt.

    Returns the literal placeholder ``'(addr missing)'`` when the file cannot be read.
    """
    try:
        with open(HOME + '/cksim/signet_addr.txt') as f:  # close the handle deterministically
            return f.read().strip()
    except Exception:
        return '(addr missing)'
|
|
def balance_sat():
    """Return ``(utxos, total_sat)`` for the 2-of-3 wallet, unconfirmed coins included.

    ``utxos`` is the node's ``listunspent 0`` result; ``total_sat`` is the sum
    of their BTC amounts converted to whole satoshis.
    """
    ensure_wallet()
    utxos = json.loads(bc('listunspent', '0', wallet=WALLET))
    total_btc = sum(u['amount'] for u in utxos)
    return utxos, int(round(total_btc * 1e8))
|
|
|
|
# A single-sig "sink" wallet on the fleet node receives the demo spends (EXTERNAL to the 2-of-3, so the
|
|
# Coldcard counts real outflow and max_amount actually bites). Each run first sweeps the sink back to the
|
|
# 2-of-3 funding address, so funds circulate and the demo is self-sustaining (depletes only by fees).
|
|
SINK = 'ckms-sink'  # name of the node wallet that receives the demo spends
SINK_ADDR_FILE = HOME + '/cksim/sink_addr.txt'  # where sink_addr() persists the reusable sink address
|
|
def ensure_sink():
    """Make sure the SINK wallet is available on the node: load it, or create it if loading fails."""
    loaded = json.loads(bc('listwallets'))
    if SINK in loaded:
        return
    try:
        bc('loadwallet', SINK)
    except RuntimeError:
        bc('createwallet', SINK)
|
|
def sink_addr():
    """Return the persisted sink address, creating and storing one on first use.

    STABLE whitelisted destination — reused (persisted) so it can live in each Coldcard's
    on-device address whitelist; an anonymous demo spend may only pay this address.
    """
    try:
        with open(SINK_ADDR_FILE) as f:  # close the handle deterministically
            a = f.read().strip()
        if a:
            return a
    except Exception:
        pass  # missing or unreadable file: fall through and mint a new address
    ensure_sink()
    a = bc('getnewaddress', wallet=SINK)
    with open(SINK_ADDR_FILE, 'w') as f:  # flushed and closed before returning
        f.write(a)
    return a
|
|
def other_addr():
    """Return a fresh SINK-wallet address that is deliberately NOT on the whitelist.

    Used by the whitelist-block demo: the spend to it is refused on-device, so
    no coins ever land here.
    """
    ensure_sink()
    fresh = bc('getnewaddress', wallet=SINK)
    return fresh
|
|
def reclaim_sink():
    """Best-effort sweep of the sink wallet's confirmed coins back to the funding address.

    Sweep CONFIRMED sink coins back (minconf=1 -> trusted, so no untrusted-0-conf "-6" errors
    and no ever-growing unconfirmed chain). Sink funds become reclaimable one signet block
    after each demo spend. Every failure is swallowed: the sweep is a convenience and must
    never block a spend.
    """
    try:
        ensure_sink()
        confirmed = json.loads(bc('listunspent', '1', wallet=SINK))
        if not confirmed:
            return
        recipients = 'recipients=' + json.dumps([signet_addr()])
        options = 'options=' + json.dumps({'send_max': True})
        bc('-named', 'sendall', recipients, 'fee_rate=1', options, wallet=SINK)
    except Exception:
        pass
|
|
|
|
# ---- device helpers ----------------------------------------------------------
|
|
# The ckcc simulator client BINDS a per-(simpid,ourpid) unix socket with only 5 instance slots; a leaked
|
|
# (un-closed) ColdcardDevice holds its slot until process exit (atexit), so opening many without closing
|
|
# eventually exhausts the slots -> "[Errno 98] Address already in use". So EVERY device is opened via this
|
|
# context manager and closed immediately after use — never leak a ColdcardDevice in a long-running process.
|
|
@contextmanager
def dev(n):
    """Context manager yielding a ColdcardDevice for signer ``n`` and always closing it.

    The device is opened on SOCK[n]; any error raised by ``close()`` is ignored
    so cleanup never masks the caller's own exception.
    """
    handle = ColdcardDevice(sn=SOCK[n])
    try:
        yield handle
    finally:
        try:
            handle.close()
        except Exception:
            pass
|
|
def is_active(n):
    """Return True if signer ``n`` is reachable and reports HSM mode active.

    A signer that cannot be opened or queried counts as inactive rather than
    raising — the same way status_poller() treats a failed read — so one dead
    signer does not abort the quorum check in the spend / sign-message flows.
    """
    try:
        with dev(n) as d:
            return bool(hstat_raw(d).get('active'))
    except Exception:
        return False
|
|
def E(d, c):
    """Send ``EXEC`` + code string ``c`` to device ``d`` unencrypted; return the reply.

    A bytes/bytearray reply is decoded to str; any other reply is returned as-is.
    """
    reply = d.send_recv(b'EXEC' + c.encode(), encrypt=False)
    if isinstance(reply, (bytes, bytearray)):
        return reply.decode()
    return reply
|
|
def Vv(d, c):
    """Send ``EVAL`` + expression string ``c`` to device ``d``; return the stripped text reply."""
    reply = d.send_recv(b'EVAL' + c.encode())
    if isinstance(reply, (bytes, bytearray)):
        reply = reply.decode()
    return reply.strip()
|
|
def K(d, k):
    """Send the simulated keypress ``k`` (ASCII text) to device ``d``."""
    keycode = k.encode('ascii')
    d.send_recv(CCProtocolPacker.sim_keypress(keycode))
|
|
def hstat_raw(d):
    """Return device ``d``'s HSM status reply parsed from JSON, or ``{}`` on any failure."""
    try:
        raw = d.send_recv(CCProtocolPacker.hsm_status())
        if isinstance(raw, (bytes, bytearray)):
            raw = raw.decode()
        return json.loads(raw)
    except Exception:
        return {}
|
|
def hstat(n):
    """Return ``{'active', 'approved', 'refused'}`` for signer ``n``.

    The counters are read from 'approvals' / 'refusals', falling back to
    'approved' / 'refused', and default to 0 when absent.
    """
    with dev(n) as d:
        raw = hstat_raw(d)
    approved = raw.get('approvals', raw.get('approved', 0))
    refused = raw.get('refusals', raw.get('refused', 0))
    return {'active': bool(raw.get('active')), 'approved': approved, 'refused': refused}
|
|
|
|
# The sim sockets are single-client, so concurrent device reads (status poll vs a running spend) collide
|
|
# and make a live signer look offline. A single background poller reads the devices under LOCK and caches
|
|
# the result; the /status endpoint serves the cache (never touches a device). During a spend (which holds
|
|
# LOCK) the poller simply waits — no contention.
|
|
# Last known per-signer status, refreshed by status_poller() and served by /status.
STATUS_CACHE = {}
for _signer in ALL:
    STATUS_CACHE[_signer] = {'active': False, 'approved': 0, 'refused': 0}
del _signer
HEAL = dict.fromkeys(ALL, 0.0)  # last auto-heal attempt per signer (cooldown)
_LAST_HEALTH = ['']  # last quorum-health line printed (one-element list, mutated in place)
|
|
def _quorum():
    """Count the signers that are both switched on (ENABLED) and cached as HSM-active."""
    count = 0
    for n in ALL:
        if ENABLED[n] and STATUS_CACHE[n]['active']:
            count += 1
    return count
|
|
def status_poller():
    """Background loop: refresh STATUS_CACHE from the signers every ~2 s and print a quorum-health line.

    Never returns. Device reads happen while holding LOCK, so they wait for any
    operation that also takes LOCK. A signer whose read raises is cached as
    inactive, with its previous approved/refused counters kept.
    """
    tick = 0
    while True:
        time.sleep(2); tick += 1
        with LOCK:
            for n in ALL:
                try: STATUS_CACHE[n] = hstat(n)
                except Exception:  # unreachable/dead signer -> mark inactive so quorum + heal see the truth
                    STATUS_CACHE[n] = {'active': False, 'approved': STATUS_CACHE[n]['approved'], 'refused': STATUS_CACHE[n]['refused']}
        # NOTE(review): the source's indentation was lost; the health line is assumed to run
        # after LOCK is released (it only reads the cache) — confirm.
        # quorum-health line for Loki/Grafana/Sentinel — on change, plus a ~30s heartbeat
        q = _quorum(); ready = sum(1 for n in ALL if STATUS_CACHE[n]['active'])
        # q counts enabled+active signers; ready counts active signers regardless of the enable switch
        lvl = 'OK' if q >= 3 else ('WARN' if q == 2 else 'CRIT')
        msg = 'multisig-health quorum=%d needed=2 signers_ready=%d/3 status=%s' % (q, ready, lvl)
        if msg != _LAST_HEALTH[0] or tick % 15 == 0:  # 15 ticks x 2 s sleep = the ~30 s heartbeat
            _LAST_HEALTH[0] = msg; print(msg, flush=True)
|
|
|
|
def heal_loop():
    """Background loop that re-arms enabled signers which are not HSM-active. Never returns.

    Boot-to-signing-ready auto-heal: re-arm any ENABLED signer that is reachable but not HSM-active
    (e.g. it rebooted and came back online). The quorum self-heals unattended. Cooldown avoids
    hammering; a signer whose process is gone can't be healed here (it raises) — the rig must
    relaunch it first.
    """
    time.sleep(24)  # initial delay before the first pass
    while True:
        time.sleep(10)
        for n in ALL:
            # skip signers that are switched off, already active, or were attempted < 25 s ago
            if not ENABLED[n] or STATUS_CACHE[n]['active'] or time.time() - HEAL[n] < 25: continue
            HEAL[n] = time.time()  # stamp the attempt up front so a failure still starts the cooldown
            try:
                with LOCK:
                    # re-check under the lock: the cache may have been refreshed while we waited for it
                    if STATUS_CACHE[n]['active']: continue
                    print('auto-heal: signer %d not signing-ready — re-arming…' % n, flush=True)
                    arm_one(n, lambda *a, **k: None)  # no-op logger: there is no SSE client to stream to
                    print('auto-heal: signer %d re-armed to signing-ready' % n, flush=True)
            except Exception as e:
                print('auto-heal: signer %d still unreachable (%s)' % (n, str(e)[:60]), flush=True)
|
|
def story(d):
    """Return the repr of the simulator's current on-screen story, fetched through E()."""
    snippet = "RV.write(repr(sim_display.story))"
    return E(d, snippet)
|
|
def ms_count(d):
    """Return how many multisig wallets device ``d`` reports in its settings; 0 on any failure."""
    try:
        raw = Vv(d, "len(settings.get('multisig',[]))")
        return int(raw or 0)
    except Exception:
        return 0
|
|
|
|
def write_policy():
    """Serialise the current two-tier HSM policy to POLICY_FILE as JSON.

    Rule dimensions, all enforced ON-DEVICE: max_amount (per-txn), per_period+period (velocity),
    whitelist (allowed destinations — change back to ckms23 is auto-recognised and not gated by it).
    Two-tier, FIRST-MATCH ordered: rule#1 = automated baseline (no users); rule#2 = TOTP-gated surge
    (higher cap + velocity, requires the "owner" to present a code). A small spend matches rule#1 and
    auto-signs; one that exceeds it falls through to rule#2 and needs the TOTP.
    """
    wl = [sink_addr()] + list(EXTRA_WL)
    pol = {"must_log": True, "period": POLICY['period_min'],
           "msg_paths": ["any"],  # allow on-device message signing under HSM (proof-of-control demo)
           "rules": [
               {"max_amount": POLICY['max_sat'], "per_period": POLICY['per_period_sat'],
                "wallet": "ckms23", "whitelist": wl},
               {"max_amount": SURGE['max_sat'], "per_period": SURGE['per_period_sat'],
                "wallet": "ckms23", "whitelist": wl, "users": ["owner"], "min_users": 1}]}
    # `with` guarantees the policy is flushed and closed before a signer is asked to load it.
    with open(POLICY_FILE, 'w') as f:
        f.write(json.dumps(pol))
|
|
|
|
def arm_one(n, log):
    """Bring signer ``n`` into HSM mode under the policy currently stored in POLICY_FILE.

    Steps: return early if the device is already active; register the 2-of-3
    wallet if the device has none; enable HSM commands; enrol the "owner" TOTP
    user; upload the policy and start HSM mode, answering the simulator's
    confirmation prompts.

    Args:
        n: signer number (key of SOCK / LET).
        log: callable ``log(text, css_class)`` receiving progress lines.
    """
    with dev(n) as d:
        if hstat_raw(d).get('active'):
            return
        need_reg = ms_count(d) < 1
    # NOTE(review): indentation was lost in the source; registration is assumed to run with no
    # device handle open, since it shells out to ckcc on the same socket — confirm.
    if need_reg:
        log('coldcard %s · registering the 2-of-3 wallet…' % LET[n], 'c')
        subprocess.run([CKCC, '-c', SOCK[n], 'upload', '-m', WALLET_FILE], capture_output=True)
        time.sleep(1.2)
        with dev(n) as d2:
            for _ in range(10):  # confirm on the simulator until the wallet shows up
                if ms_count(d2) >= 1:
                    break
                K(d2, 'y'); time.sleep(0.8)
    with dev(n) as d:
        E(d, "settings.put('hsmcmd', True); settings.save(); RV.write('ok')")
        try:  # enrol the "owner" TOTP user (referenced by the surge rule) — before HSM start, fresh device
            d.send_recv(CCProtocolPacker.create_user(b'owner', USER_AUTH_TOTP, b32decode(totp_secret(), casefold=True)))
        except Exception:
            pass
        log('coldcard %s · entering HSM mode (auto ≤%d · surge ≤%d w/ TOTP)…' % (LET[n], POLICY['max_sat'], SURGE['max_sat']), 'c')
        with open(POLICY_FILE, 'rb') as policy_fh:  # was opened and never closed
            flen, sha = real_file_upload(policy_fh, d)
        d.send_recv(CCProtocolPacker.hsm_start(flen, sha)); time.sleep(1.3)
        active = False
        for _ in range(18):
            if hstat_raw(d).get('active'):
                active = True
                break
            s = story(d); m = re.search(r'Press \((\d)\) to save', s)
            if m:
                K(d, m.group(1)); time.sleep(1.4); continue
            if 'Press OK to enable' in s or s.startswith("('Start HSM?'"):
                K(d, 'y'); time.sleep(1.4); continue
            time.sleep(0.4)
        else:
            # the last attempt may have pressed a key: take one final reading
            active = bool(hstat_raw(d).get('active'))
    # Report success only when the device confirms it; previously "ACTIVE" was logged
    # even when every attempt was exhausted.
    if active:
        log('coldcard %s · HSM ACTIVE ✓' % LET[n], 'g')
    else:
        log('coldcard %s · HSM did not become active' % LET[n], 'y')
|
|
|
|
def arm_all(log):
    """Write the current policy file, then arm every signer in ALL, reporting via ``log``."""
    write_policy()
    for signer in ALL:
        arm_one(signer, log)
|
|
|
|
def rearm_all(log):
    """Apply a changed policy: rewrite the policy file, relaunch the rig, re-arm every signer.

    The rig script gets up to 150 s, followed by a 3 s settle delay before the
    signers are armed one by one. Progress goes to ``log(text, css_class)``.
    """
    write_policy()
    log('restarting all signers to load the new policy…', 'c')
    subprocess.run(['bash', DEMO_RIG], capture_output=True, timeout=150)
    time.sleep(3)
    for signer in ALL:
        arm_one(signer, log)
    limits = (POLICY['max_sat'], POLICY['per_period_sat'], POLICY['period_min'])
    log('policy applied · ≤%d sats/txn · ≤%d sats per %d min · whitelist enforced on all signers'
        % limits, 'g')
|
|
|
|
def _reason(d, fallback):
    """Return the device's own refusal reason (at most 90 chars), else ``fallback``.

    The device records WHY it refused in hsm_status.last_refusal (e.g. "over per-txn limit",
    "exceeds velocity", "address not allowed", "has 1 warning(s)"). Surface it — far clearer than
    the generic ckcc "You refused permission" string, and it makes the policy dimension obvious
    in the demo. A leading "Rejected:" prefix is stripped.
    """
    try:
        time.sleep(0.2)
        raw = hstat_raw(d).get('last_refusal') or ''
        cleaned = re.sub(r'^\s*Rejected:\s*', '', raw).strip()
        if cleaned:
            return cleaned[:90]
    except Exception:
        pass
    return fallback
|
|
|
|
def sign_hsm(n, psbt, code=None):
    """Ask signer ``n`` to sign ``psbt`` under its on-device policy.

    Args:
        n: signer number.
        psbt: raw PSBT bytes.
        code: optional owner TOTP code relayed to the device before signing.

    Returns:
        ``('signed', base64_psbt)`` on success, or ``('refused', reason)`` when
        the device errors, its refusal counter increases, or no signature
        appears within 40 polls of 0.4 s each.
    """
    with dev(n) as d:
        if code:  # relay the owner's TOTP so the device's surge rule (min_users) is satisfied
            try:
                d.send_recv(CCProtocolPacker.user_auth(b'owner', code.encode('ascii'), int(time.time()) // 30))
            except Exception:
                pass  # best-effort: without a valid auth the device's own policy decides
        # One status read for the baseline (was two device round-trips whose replies could differ).
        snap = hstat_raw(d)
        before = snap.get('refusals', snap.get('refused', 0))
        flen, sha = real_file_upload(io.BytesIO(psbt), d)
        try:
            d.send_recv(CCProtocolPacker.sign_transaction(flen, sha, flags=0x0), timeout=None)
        except Exception as e:
            return ('refused', _reason(d, str(e)[:80]))
        for _ in range(40):
            time.sleep(0.4)
            try:
                r = d.send_recv(CCProtocolPacker.get_signed_txn(), timeout=None)
            except Exception as e:
                return ('refused', _reason(d, str(e)[:80]))
            if r is not None:
                rl, rsha = r
                return ('signed', b64encode(d.download_file(rl, rsha, file_number=1)).decode())
            cur = hstat_raw(d)
            if cur.get('refusals', cur.get('refused', 0)) > before:
                return ('refused', _reason(d, 'on-device policy denied the spend'))
        return ('refused', 'no signature (policy)')
|
|
|
|
# ---- streamed spend ----------------------------------------------------------
|
|
def sse_spend(send, amount_sat, dest_kind='sink', custom_addr=None, code=None):
    """Build, policy-sign and broadcast one spend, streaming progress through ``send``.

    Args:
        send: callable taking one JSON-serialisable dict (one SSE event).
        amount_sat: requested amount; raised to the 546-sat minimum.
        dest_kind: 'sink' (whitelisted sink), 'custom' (pay ``custom_addr``) or
            'other' (fresh off-whitelist address the signers should refuse).
        custom_addr: destination used when ``dest_kind == 'custom'``.
        code: optional owner TOTP code; when present the surge tier applies.

    The stream ends with an event carrying ``'end': True`` on every handled
    path. Failures of the combine/finalize/broadcast RPC calls are reported in
    the stream like PSBT-build failures, instead of escaping as an exception
    with the stream left unterminated.
    """
    def log(t, cls=''): send({'log': t, 'cls': cls})
    def step(n): send({'step': n})
    def done(n): send({'done_step': n})

    amount_sat = max(546, int(amount_sat))
    amount_btc = amount_sat / 1e8
    surge = bool(code)
    over = amount_sat > POLICY['max_sat']
    off_list = dest_kind == 'other'

    avail = [n for n in ALL if ENABLED[n] and is_active(n)]
    if len(avail) < 2:
        log('quorum lost · only %d policy signer(s) online — a 2-of-3 spend needs 2.' % len(avail), 'y')
        log('(this is the failure-domain property: lose any one signer and you keep operating; lose two and funds are safe but frozen.)', 'c')
        send({'end': True}); return
    signers = avail[:2]

    # COORDINATOR global velocity gate — the authoritative cap, enforced BEFORE any signer is asked, and
    # counting only real broadcasts. Off-list spends are skipped (the devices refuse them on the whitelist).
    if not off_list:
        cap = SURGE_GLOBAL['per_period_sat'] if surge else GLOBAL['per_period_sat']
        spent = global_spent()
        if spent + amount_sat > cap:
            log('coordinator %s velocity: this %d-sat spend would push the %d-min total to %d > the %d-sat cap.'
                % ('surge' if surge else 'global', amount_sat, GLOBAL['period_min'], spent + amount_sat, cap), 'y')
            log('blocked at the keyless coordinator, before any signer is asked — per-device velocity is only a backstop; the real cap is global and counts only broadcast txns, so dropped PSBTs never burn budget.', 'c')
            send({'refused': True, 'end': True}); return

    reclaim_sink()  # pull back funds from prior demo spends -> self-sustaining
    us, bal = balance_sat()
    if bal < amount_sat + 500:
        log('the 2-of-3 wallet needs confirmed signet coins. Send some to:', 'y'); log(' ' + signet_addr(), 'a')
        send({'end': True}); return

    step(1)
    if surge:
        log('SURGE: building a %d-sat PSBT — TOTP-authorised tier 2 (cap %d, velocity %d)…' % (amount_sat, SURGE['max_sat'], SURGE['per_period_sat']), 'c')
    elif over:
        log('building a PSBT that EXCEEDS the automated tier: %d sats > the %d-sat cap (no TOTP)…' % (amount_sat, POLICY['max_sat']), 'c')
    else:
        log('building a PSBT within policy: %d sats (limit %d)…' % (amount_sat, POLICY['max_sat']), 'c')
    if dest_kind == 'custom' and custom_addr:
        dest = custom_addr  # the signed-in user's authorised (whitelisted) address
        log('paying your authorised address %s (on the whitelist)…' % custom_addr, 'c')
    elif off_list:
        dest = other_addr()  # NOT on the on-device whitelist -> the HSM should refuse the payee
        log('paying an address that is NOT on the policy whitelist — the signers should refuse it…', 'c')
    else:
        dest = sink_addr()  # EXTERNAL to the 2-of-3 (whitelisted) -> real outflow, policy bites
    # Seed the spend with the LARGEST UTXO. Coldcard's HSM rejects ANY PSBT carrying a warning, and "Big Fee"
    # fires when the network fee exceeds 5% of total value. Core's default minimal coin-selection would grab a
    # tiny fragmented UTXO (fee then ~6%+), so we pin the biggest UTXO as the seed input (add_inputs tops up if
    # needed) -> fee is a fraction of a percent. fee_rate=1 for extra headroom. include_unsafe: spend our own
    # 0-conf coins (sink-reclaim returns funds as untrusted 0-conf).
    seed = max(us, key=lambda u: u['amount'])
    try:
        funded = json.loads(bc('walletcreatefundedpsbt',
                               json.dumps([{'txid': seed['txid'], 'vout': seed['vout']}]),
                               json.dumps([{dest: round(amount_btc, 8)}]), '0',
                               json.dumps({'fee_rate': 1, 'change_type': 'bech32',
                                           'include_unsafe': True, 'add_inputs': True}), wallet=WALLET))
    except RuntimeError as e:
        m = str(e)
        if 'Insufficient' in m or 'mempool-chain' in m or 'too-long' in m or 'too long' in m:
            log('the demo wallet is briefly tied up in unconfirmed transactions — give the next signet block ~10 min, or top up the address.', 'y')
        else:
            log('build error: ' + m[:140], 'e')
        send({'end': True}); return
    psbt = b64decode(funded['psbt']); done(1)

    step(2); log('coordinator fans the PSBT to signers %s (it holds no keys)…' % '+'.join(LET[n] for n in signers), 'c'); time.sleep(0.4); done(2)

    step(3)
    if surge: log('relaying your TOTP authorisation to each signer (the coordinator never holds the secret)…', 'c')
    sigs = []; refusals = 0
    for n in signers:
        log('coldcard %s · evaluating against its on-device policy…' % LET[n], 'c')
        kind, payload = sign_hsm(n, psbt, code)
        if kind == 'signed':
            sigs.append(payload); log('coldcard %s · signed ✓' % LET[n], 'g')
        else:
            refusals += 1; log('coldcard %s · REFUSED ✗ — %s' % (LET[n], payload), 'y')
    if refusals:
        log('policy held: the spend was blocked on-device. No transaction created.', 'y')
        log('// the device refused to sign — exactly what a programmable HSM is for.', 'c')
        send({'refused': True, 'end': True}); return
    done(3)

    step(4)
    try:
        fin = json.loads(bc('finalizepsbt', bc('combinepsbt', json.dumps(sigs))))
    except RuntimeError as e:
        # an RPC failure here used to propagate and leave the stream without its 'end' event
        log('ERROR: could not combine/finalize the signatures — ' + str(e)[:140], 'e')
        send({'end': True}); return
    if not fin.get('complete'):
        log('ERROR: signatures did not finalize', 'e'); send({'end': True}); return
    log('combined 2 partial signatures · transaction complete ✓', 'g'); done(4)

    step(5)
    try:
        txid = bc('sendrawtransaction', fin['hex'])
    except RuntimeError as e:
        # nothing was broadcast, so nothing is recorded against the global cap
        log('broadcast error: ' + str(e)[:140], 'e')
        send({'end': True}); return
    if not off_list: ledger_append(amount_sat, txid)  # only real broadcasts count toward the global cap
    log('broadcast to signet ✓ txid ' + txid, 'a')
    log('watch it confirm live → mempool.space/signet/tx/' + txid, 'g')
    if surge:
        log('// TOTP-authorised surge: 2 of 3 signed a larger spend with the owner in the loop — still bounded by the on-device surge ceiling.', 'c')
    else:
        log('// two independent Coldcards moved real signet coins under policy — no human in the loop.', 'c')
    done(5); send({'end': True})
|
|
|
|
# ---- streamed message signing (2-of-3 proof of control, no funds moved) ------
|
|
def sign_msg_one(n, text):
    """Ask signer ``n`` to sign ``text`` at MSG_PATH through the ``ckcc msg`` command.

    Returns:
        ``('signed', (address, signature))`` taken from the last two non-empty
        output lines (lines containing 'Waiting' are ignored), or
        ``('fail', reason)`` when the command exits non-zero, times out after
        45 s, or cannot be launched.
    """
    # `text` comes from the HTTP query string. Options go first and `--` ends option
    # parsing, so a message starting with '-' is signed as text rather than parsed as a flag.
    cmd = [CKCC, '-c', SOCK[n], 'msg', '-p', MSG_PATH, '--', text]
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=45)
    except (subprocess.TimeoutExpired, OSError) as e:
        # callers unpack a (kind, payload) tuple; report the failure instead of raising
        return ('fail', (str(e) or 'refused')[:90])
    if r.returncode != 0:
        return ('fail', (r.stderr.strip() or r.stdout.strip() or 'refused')[:90])
    lines = [l.strip() for l in r.stdout.replace('\r', '\n').splitlines() if l.strip() and 'Waiting' not in l]
    sig = lines[-1] if lines else ''
    addr = lines[-2] if len(lines) >= 2 else ''
    return ('signed', (addr, sig))
|
|
|
|
def sse_signmsg(send, text):
    """Have two available signers sign ``text`` and stream the outcome through ``send``.

    ``text`` is stripped and cut to 140 characters; an empty result falls back
    to a default challenge. The stream always ends with ``{'end': True}``.
    """
    def log(t, cls=''):
        send({'log': t, 'cls': cls})

    text = (text or '').strip()[:140] or 'mineracks 2-of-3 proof-of-control'
    avail = [n for n in ALL if ENABLED[n] and is_active(n)]
    if len(avail) < 2:
        log('quorum lost · only %d signer(s) online — need 2 keyholders to attest.' % len(avail), 'y')
        send({'end': True})
        return
    signers = avail[:2]
    names = '+'.join(LET[n] for n in signers)
    log('$ coordinator sign-message', 'c')
    log('challenge: "%s"' % text, 'a')
    log('asking %s to each sign it with their key in the 2-of-3 wallet (HSM · no human)…' % names, 'c')
    got = 0
    for n in signers:
        kind, payload = sign_msg_one(n, text)
        if kind != 'signed':
            log('coldcard %s could not sign — %s' % (LET[n], payload), 'y')
            continue
        addr, sig = payload
        got += 1
        log('coldcard %s attested ✓' % LET[n], 'g')
        log(' address ' + addr, 'a')
        log(' signature ' + sig, 'a')
    if got >= 2:
        # NOTE(review): indentation was lost in the source; both summary lines are assumed to be
        # conditional on two successful signatures — confirm.
        log('2 of 3 keyholders signed the same challenge ✓ — verifiable proof of 2-of-3 control, no funds moved.', 'g')
        log('// verify each signature against its address with any Bitcoin signed-message tool.', 'c')
    send({'end': True})
|
|
|
|
# ---- http --------------------------------------------------------------------
|
|
def status_json():
    """Assemble the /status payload from cached signer state, policy settings and wallet data.

    The balance is reported as -1 when it cannot be read. Signer state comes
    from STATUS_CACHE only; no device is contacted here.
    """
    try:
        bal = balance_sat()[1]
    except Exception:
        bal = -1
    devs = {}
    for n in ALL:
        cached = STATUS_CACHE[n]
        devs[str(n)] = {'on': ENABLED[n], 'active': cached['active'],
                        'approved': cached['approved'], 'refused': cached['refused']}
    online = len([n for n in ALL if ENABLED[n] and devs[str(n)]['active']])
    policy = {key: POLICY[key] for key in ('max_sat', 'per_period_sat', 'period_min')}
    fund = signet_addr()
    sink = sink_addr()
    authorized = list(EXTRA_WL)
    global_info = {'per_period_sat': GLOBAL['per_period_sat'], 'period_min': GLOBAL['period_min'],
                   'spent': global_spent()}
    surge_info = {'max_sat': SURGE['max_sat'], 'per_period_sat': SURGE['per_period_sat'],
                  'global_per_period_sat': SURGE_GLOBAL['per_period_sat']}
    return {'balance_sat': bal, 'devices': devs, 'quorum_online': online, 'quorum_needed': 2,
            'policy': policy,
            'fund_addr': fund, 'sink_addr': sink, 'authorized': authorized,
            'global': global_info,
            'surge': surge_info}
|
|
|
|
class H(BaseHTTPRequestHandler):
    """HTTP handler for the demo orchestrator: JSON endpoints plus SSE streams.

    GET routes answer either with a single JSON document (``_json``) or with
    a ``text/event-stream`` whose events are JSON objects; every stream ends
    with an ``{'end': True}`` event. POST is used only for Nostr sign-in and
    logout.
    """

    # Upper bound on an accepted POST body. NOTE(review): 64 KiB is assumed
    # to be ample for a single signed Nostr event — adjust if that is wrong.
    MAX_BODY = 65536

    # ---- small helpers -------------------------------------------------------

    @staticmethod
    def _clamped_int(q, key, current, lo, hi):
        """Return query value ``key`` as an int clamped with ``max(lo, min(hi, v))``.

        ``q`` is a ``parse_qs``-style mapping (values are lists). When the key
        is absent, ``current`` itself is parsed and clamped; when the value
        cannot be parsed as an int, ``current`` is returned untouched so a bad
        parameter leaves the setting as it was.
        """
        try:
            return max(lo, min(hi, int(q.get(key, [current])[0])))
        except (TypeError, ValueError):
            return current

    @staticmethod
    def _is_totp_code(code):
        """Return True only for exactly six ASCII digits.

        ``str.isdigit`` alone also accepts non-ASCII digit characters, which
        are not a valid TOTP code, hence the extra ``isascii`` check.
        """
        return len(code) == 6 and code.isascii() and code.isdigit()

    @staticmethod
    def _sse_logger(send):
        """Adapt an SSE ``send`` callable to the ``(text, cls='')`` log signature."""
        return lambda t, c='': send({'log': t, 'cls': c})

    @staticmethod
    def _sse_error(send, message):
        """Report ``message`` on the stream and end it, ignoring a dead client."""
        try:
            send({'log': message, 'cls': 'e'})
            send({'end': True})
        except Exception:
            pass  # the stream is already broken; nothing more can be reported

    def _json(self, obj, code=200, set_cookie=None):
        """Send ``obj`` as a JSON response with status ``code``.

        ``set_cookie``, when truthy, is emitted verbatim as a Set-Cookie header.
        """
        payload = json.dumps(obj).encode()
        self.send_response(code)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Cache-Control', 'no-cache')
        if set_cookie:
            self.send_header('Set-Cookie', set_cookie)
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def _uid(self):
        """Return what ``read_session`` yields for the request's Cookie header.

        Callers treat a falsy result as "not signed in".
        """
        return read_session(self.headers.get('Cookie', ''))

    def _sse(self):
        """Start a ``text/event-stream`` response and return its ``send(obj)``.

        ``send`` writes one ``data: <json>`` event and flushes immediately.
        """
        self.send_response(200)
        self.send_header('Content-Type', 'text/event-stream')
        self.send_header('Cache-Control', 'no-cache')
        self.end_headers()

        def send(obj):
            self.wfile.write(('data: %s\n\n' % json.dumps(obj)).encode())
            self.wfile.flush()

        return send

    # ---- GET -----------------------------------------------------------------

    def do_GET(self):
        """Route a GET request by path; unknown paths get a bare 404."""
        u = urllib.parse.urlparse(self.path)
        p = u.path.rstrip('/')
        q = urllib.parse.parse_qs(u.query)

        if p in ('', '/status'):
            try:
                self._json(status_json())
            except Exception as e:
                self._json({'error': str(e)[:120]}, 500)
            return

        if p == '/auth/me':
            uid = self._uid()
            self._json({'user': uid, 'pubkey': PUBKEYS.get(uid) if uid else None})
            return

        if p == '/auth/nostr/challenge':
            self._json({'challenge': make_challenge()})
            return

        if p == '/totp_enroll':
            if not self._uid():
                self._json({'error': 'sign in required'}, 401)
                return
            sec = totp_secret()
            self._json({'secret': sec,
                        'otpauth': 'otpauth://totp/multisigHSM:owner?secret=%s&issuer=multisigHSM&period=30&digits=6' % sec})
            return

        if p == '/device':
            # A non-numeric ``n`` used to raise out of the handler and drop the
            # connection; answer with a proper 400 instead.
            try:
                n = int(q.get('n', [0])[0])
            except (TypeError, ValueError):
                self._json({'error': 'bad signer'}, 400)
                return
            on = q.get('on', ['1'])[0] == '1'
            if n in ALL:
                ENABLED[n] = on
            self._json({'ok': True, 'devices': status_json()['devices']})
            return

        if p == '/globalpolicy':
            if q.get('reset', [''])[0] == '1':
                RESET_TS[0] = int(time.time())  # move the window start to now (demo)
                _TX_CACHE['txs'] = None
                try:
                    # ``with`` closes the handle promptly, flushing the write.
                    with open(LEDGER_FILE, 'w') as ledger:
                        ledger.write('[]')
                except Exception:
                    pass  # best-effort: the reset proceeds even if the ledger write fails
            GLOBAL['per_period_sat'] = self._clamped_int(
                q, 'per_period_sat', GLOBAL['per_period_sat'], 546, 10**10)
            GLOBAL['period_min'] = self._clamped_int(
                q, 'period_min', GLOBAL['period_min'], 1, 10080)
            self._json({'ok': True, 'global': status_json()['global']})
            return

        if p == '/policy':
            POLICY['max_sat'] = self._clamped_int(
                q, 'max_sat', POLICY['max_sat'], 546, 100000000)
            POLICY['per_period_sat'] = self._clamped_int(
                q, 'per_period_sat', POLICY['per_period_sat'], 546, 1000000000)
            POLICY['period_min'] = self._clamped_int(
                q, 'period_min', POLICY['period_min'], 1, 1440)
            send = self._sse()
            try:
                with LOCK:
                    rearm_all(self._sse_logger(send))
                send({'end': True})
            except Exception as e:
                self._sse_error(send, 'policy re-arm error: %s' % str(e)[:160])
            return

        if p == '/surgepolicy':
            send = self._sse()
            if not self._uid():
                send({'log': 'sign in with Nostr to configure the TOTP surge tier.', 'cls': 'y'})
                send({'end': True})
                return
            # Each surge limit is floored at the value it extends, so the order
            # of these three assignments matters.
            SURGE['max_sat'] = self._clamped_int(
                q, 'max_sat', SURGE['max_sat'], POLICY['max_sat'], 10**9)
            SURGE['per_period_sat'] = self._clamped_int(
                q, 'per_period_sat', SURGE['per_period_sat'], SURGE['max_sat'], 10**10)
            SURGE_GLOBAL['per_period_sat'] = self._clamped_int(
                q, 'global_per_period_sat', SURGE_GLOBAL['per_period_sat'],
                GLOBAL['per_period_sat'], 10**10)
            try:
                with LOCK:
                    rearm_all(self._sse_logger(send))
                send({'log': 'surge tier set · TOTP unlocks ≤%d sats/txn · ≤%d sats/period' % (SURGE['max_sat'], SURGE['per_period_sat']), 'cls': 'g'})
                send({'end': True})
            except Exception as e:
                self._sse_error(send, 'surge re-arm error: %s' % str(e)[:160])
            return

        if p == '/authorize':
            send = self._sse()
            try:
                uid = self._uid()
                if not uid:
                    send({'log': 'sign in with Nostr first to authorise your own address.', 'cls': 'y'})
                    send({'end': True})
                    return
                addr = q.get('addr', [''])[0].strip()
                try:
                    valid = json.loads(bc('validateaddress', addr)).get('isvalid', False)
                except Exception:
                    valid = False  # any failure to validate counts as "not valid"
                if not valid:
                    send({'log': 'that is not a valid signet address.', 'cls': 'y'})
                    send({'end': True})
                    return
                with LOCK:
                    add_wl(addr)
                    send({'log': 'adding %s to the policy whitelist on all 3 signers…' % addr, 'cls': 'c'})
                    rearm_all(self._sse_logger(send))
                send({'log': 'your address is now whitelisted — you can pay it; anonymous users still cannot.', 'cls': 'g'})
                send({'authorized': addr, 'end': True})
            except Exception as e:
                self._sse_error(send, 'authorize error: %s' % str(e)[:160])
            return

        if p == '/reboot':
            send = self._sse()
            try:
                n = int(q.get('n', [0])[0])
                if n not in ALL:
                    send({'log': 'bad signer', 'cls': 'e'})
                    send({'end': True})
                    return
                send({'log': 'rebooting signer %s — power-cycling the device (loses HSM mode)…' % LET[n], 'cls': 'c'})
                send({'step': 0})
                HEAL[n] = 0  # let auto-heal act immediately
                subprocess.run(['bash', DEMO_RIG, str(n)], capture_output=True, timeout=120)
                send({'log': 'signer %s back online but NOT signing-ready (fresh boot, no HSM) — quorum is now %d/3.' % (LET[n], _quorum()), 'cls': 'y'})
                send({'log': 'the coordinator detects it and AUTO-HEALS — re-registering the wallet, re-enrolling, re-entering HSM, unattended…', 'cls': 'c'})
                # Poll the cached status for up to 45 x 1.5 s for the signer
                # to be reported active again.
                ok = False
                for _ in range(45):
                    time.sleep(1.5)
                    if STATUS_CACHE[n]['active']:
                        ok = True
                        break
                if ok:
                    send({'log': 'auto-heal complete: signer %s is signing-ready again ✓ — quorum restored to %d/3.' % (LET[n], _quorum()), 'cls': 'g'})
                    send({'log': '// boot-to-signing-ready: no human re-armed it. A rebooted signer self-rejoins the quorum.', 'cls': 'c'})
                else:
                    send({'log': 'signer %s not back yet — auto-heal keeps retrying every ~10s in the background.' % LET[n], 'cls': 'y'})
                send({'end': True})
            except Exception as e:
                self._sse_error(send, 'reboot error: %s' % str(e)[:120])
            return

        if p in ('/spend', '/signmsg'):
            send = self._sse()
            try:
                if p == '/spend':
                    try:
                        amount = int(q.get('amount', [DEFAULT_SAT])[0])
                    except Exception:
                        amount = DEFAULT_SAT
                    dk = q.get('dest', ['sink'])[0]
                    code = (q.get('code', [''])[0] or '').strip() or None
                    uid = self._uid()
                    if code and not uid:
                        send({'log': 'sign in with Nostr to use a TOTP surge authorisation.', 'cls': 'y'})
                        send({'end': True})
                    elif code and not self._is_totp_code(code):
                        send({'log': 'a TOTP code is 6 digits.', 'cls': 'y'})
                        send({'end': True})
                    elif dk == 'custom':
                        addr = q.get('addr', [''])[0].strip()
                        if not uid or not addr:
                            send({'log': 'sign in with Nostr and authorise an address first.', 'cls': 'y'})
                            send({'end': True})
                        else:
                            with LOCK:
                                sse_spend(send, amount, 'custom', addr, code)
                    else:
                        # Anything other than 'custom'/'other' pays the sink.
                        with LOCK:
                            sse_spend(send, amount, 'other' if dk == 'other' else 'sink', None, code)
                else:
                    with LOCK:
                        sse_signmsg(send, q.get('text', [''])[0])
            except Exception as e:
                self._sse_error(send, 'ERROR: %s' % str(e)[:200])
            return

        self.send_response(404)
        self.end_headers()

    # ---- POST ----------------------------------------------------------------

    def do_POST(self):
        """Handle ``/auth/nostr/verify`` and ``/auth/logout``; 404 otherwise.

        The request body is untrusted: a missing, non-numeric or negative
        Content-Length yields 400 (a negative length would otherwise make
        ``rfile.read`` block until EOF), and a length above ``MAX_BODY``
        yields 413 without reading the body.
        """
        p = urllib.parse.urlparse(self.path).path.rstrip('/')
        try:
            n = int(self.headers.get('Content-Length', 0) or 0)
        except (TypeError, ValueError):
            n = -1
        if n < 0:
            self.close_connection = True
            return self._json({'error': 'bad content-length'}, 400)
        if n > self.MAX_BODY:
            self.close_connection = True  # the unread body must not be parsed as a request
            return self._json({'error': 'request body too large'}, 413)
        body = self.rfile.read(n) if n else b''

        if p == '/auth/nostr/verify':
            try:
                ev = json.loads(body)
            except (ValueError, RecursionError):
                return self._json({'error': 'bad json'}, 400)
            # Valid JSON need not be an object; everything below indexes it
            # by key, so reject other shapes up front.
            if not isinstance(ev, dict):
                return self._json({'error': 'malformed event'}, 400)
            try:
                if nostr_event_id(ev) != ev['id']:
                    return self._json({'error': 'bad event id'}, 400)
                if not check_challenge(ev.get('content', '')):
                    return self._json({'error': 'stale or invalid challenge'}, 401)
                ok = schnorr_verify(bytes.fromhex(ev['id']), bytes.fromhex(ev['pubkey']), bytes.fromhex(ev['sig']))
            except (KeyError, ValueError, TypeError, AttributeError):
                # Missing fields, non-hex strings, or fields of the wrong type.
                return self._json({'error': 'malformed event'}, 400)
            if not ok:
                return self._json({'error': 'signature verification failed'}, 401)
            uid = uid_for('nostr', ev['pubkey'])
            PUBKEYS[uid] = ev['pubkey']
            cookie = '%s=%s; Path=/; Max-Age=%d; HttpOnly; Secure; SameSite=Lax' % (COOKIE, make_session(uid), SESSION_TTL)
            return self._json({'user': uid, 'pubkey': ev['pubkey']}, 200, cookie)

        if p == '/auth/logout':
            # Max-Age=0 expires the session cookie on the client.
            return self._json({'ok': True}, 200, '%s=; Path=/; Max-Age=0; HttpOnly; Secure; SameSite=Lax' % COOKIE)

        self.send_response(404)
        self.end_headers()

    def log_message(self, *a):
        """Suppress the base class's per-request logging."""
        pass
|
|
|
|
def startup():
    """One-shot initialisation, run on a background thread at launch.

    Waits 8 seconds, ensures the wallet exists, then arms all signers while
    holding ``LOCK``. Failures in either step are printed and do not stop
    the other step.
    """
    def _discard(*args, **kwargs):
        # arm_all is given a logger that throws its messages away.
        return None

    # NOTE(review): fixed delay presumably gives the signers/node time to
    # come up before first contact — confirm.
    time.sleep(8)

    try:
        ensure_wallet()
    except Exception as err:
        print('wallet init:', err, flush=True)

    with LOCK:
        try:
            arm_all(_discard)
        except Exception as err:
            print('startup arm:', err, flush=True)
|
|
|
|
if __name__ == '__main__':
    # Launch the background workers as daemon threads, in this order, so
    # they never keep the process alive once the server stops.
    for worker in (startup, status_poller, heal_loop):
        threading.Thread(target=worker, daemon=True).start()

    print('interactive signet orchestrator on :8099', flush=True)

    # Bound to loopback only; the module docstring notes nginx maps /api/
    # onto this port.
    server = ThreadingHTTPServer(('127.0.0.1', 8099), H)
    server.serve_forever()
|