""" Low-level CKBunker client. This talks CKBunker's own WebSocket protocol — the same one its Vue.js web UI uses. It is NOT a wrapper around upstream CKBunker's Python SDK; at the time of writing (v0.9.1) the upstream `ckbunker` CLI has a broken import path and there is no packaged client library. See docs/PROTOCOL.md for why a hand- rolled WebSocket client is the right choice here. The Client is intentionally minimal: one HTTP GET to obtain a session cookie and WebSocket URL, one WebSocket connection per operation (or one shared session if you ask for batch mode), and a dozen message types. It surfaces signing outcomes as a SignResult dataclass so the harness can assert on specific outcomes — including *expected rejections*, which matter as much as successes when validating a policy. """ from __future__ import annotations import asyncio import base64 import enum import hashlib import json import re import time from contextlib import asynccontextmanager from dataclasses import dataclass, field from typing import AsyncIterator try: import requests except ImportError as e: # pragma: no cover raise SystemExit("requests is required: pip install requests") from e try: import websockets except ImportError as e: # pragma: no cover raise SystemExit("websockets is required: pip install websockets") from e # --------------------------------------------------------------------------- # Data types # --------------------------------------------------------------------------- class SignStatus(enum.Enum): """Outcome of a signing attempt. SIGNED — Coldcard signed and returned a PSBT / finalised tx. REJECTED — Coldcard returned a rejection (policy violation, bad TOTP, missing user auth). The `reason` field carries the human-readable reason string from the Coldcard. TIMEOUT — No response within the signing deadline. WS_ERROR — WebSocket/transport failure before we got to a decision. """ SIGNED = "signed" REJECTED = "rejected" TIMEOUT = "timeout" WS_ERROR = "ws_error" @dataclass class SignResult: status: SignStatus signed_bytes: bytes | None = None # when status == SIGNED and not finalised signed_hex: str | None = None # when status == SIGNED and finalised reason: str | None = None # when status == REJECTED error: str | None = None # when status == WS_ERROR / TIMEOUT elapsed_seconds: float = 0.0 raw_frames: list[str] = field(default_factory=list) # captured frames for debugging def ok(self) -> bool: return self.status == SignStatus.SIGNED def is_expected_rejection(self, expect_phrase: str | None = None) -> bool: """True if the Coldcard rejected AND the reason contains the expected phrase. When validating policy you usually want to assert *the specific rejection reason* matches (e.g. "rule #1: need user(s) confirmation"), not just that some rejection happened. """ if self.status != SignStatus.REJECTED: return False if expect_phrase is None: return True return (self.reason or "").lower().find(expect_phrase.lower()) != -1 @dataclass class MessageSignResult: status: SignStatus address: str | None = None signature: str | None = None reason: str | None = None error: str | None = None # --------------------------------------------------------------------------- # Client # --------------------------------------------------------------------------- class Client: """CKBunker signing client. Typical use: client = Client("http://100.80.63.14:9823", totp_secret="...") async with client.session() as session: result = await session.sign_psbt(psbt_bytes, use_totp=True) For batch signing reuse the same `session`; it keeps the WebSocket open. """ def __init__( self, base_url: str, *, cf_access_client_id: str | None = None, cf_access_client_secret: str | None = None, totp_secret: str | None = None, user: str = "mineracks", verbose: bool = False, ) -> None: self.base_url = base_url.rstrip("/") self.cf_id = cf_access_client_id self.cf_secret = cf_access_client_secret self.totp_secret = totp_secret self.user = user self.verbose = verbose # -- HTTP: session cookie + WebSocket URL ------------------------------- def _cf_headers(self) -> dict[str, str]: headers: dict[str, str] = {} if self.cf_id: headers["CF-Access-Client-Id"] = self.cf_id if self.cf_secret: headers["CF-Access-Client-Secret"] = self.cf_secret return headers def fetch_ws_endpoint(self, timeout: float = 15.0) -> tuple[str, str]: """Hit the CKBunker root page and return (ws_url, cookie_header). Why: CKBunker's aiohttp session binds the Vue app to a cookie. The WebSocket URL is embedded in the page HTML (path like /websocket/). The same cookie must be presented on the WS upgrade or the server rejects the connection. """ resp = requests.get( self.base_url + "/", headers=self._cf_headers(), timeout=timeout, ) resp.raise_for_status() cookies = "; ".join(f"{k}={v}" for k, v in resp.cookies.items()) ws_url = self._extract_ws_url(resp.text) return ws_url, cookies def _extract_ws_url(self, html: str) -> str: """Find the WebSocket path in the CKBunker page HTML. CKBunker embeds the WS path in the rendered template. We accept several spellings so newer CKBunker revisions don't silently break us. """ patterns = [ r"['\"](/websocket/[A-Za-z0-9+/=_-]+)['\"]", r"ws_url\s*[=:]\s*['\"]([^'\"]+)['\"]", r"new WebSocket\([^)]*['\"]([^'\"]*websocket[^'\"]*)['\"]", ] for pat in patterns: m = re.search(pat, html) if not m: continue path = m.group(1) if path.startswith("/"): host = self.base_url.replace("https://", "").replace("http://", "") scheme = "wss" if self.base_url.startswith("https") else "ws" return f"{scheme}://{host}{path}" return path # Fallback — catches early regressions where CKBunker drops the token. host = self.base_url.replace("https://", "").replace("http://", "") scheme = "wss" if self.base_url.startswith("https") else "ws" return f"{scheme}://{host}/websocket/" # -- TOTP --------------------------------------------------------------- def current_totp(self) -> tuple[str, int, int]: """Generate a TOTP (code, remaining_seconds, window_counter). The window_counter matches what CKBunker's server computes as `int(time.time()) // 30`, which it expects in auth_offer_guess args[1]. """ if not self.totp_secret: raise RuntimeError("TOTP requested but no secret configured") import pyotp totp = pyotp.TOTP(self.totp_secret) code = totp.now() remaining = totp.interval - (int(time.time()) % totp.interval) window = int(time.time()) // 30 return code, remaining, window # -- Session context ---------------------------------------------------- @asynccontextmanager async def session(self) -> AsyncIterator["_Session"]: """Open an authenticated WebSocket session. Use the yielded `_Session` for one or more sign_psbt / sign_message calls. The WebSocket closes cleanly on exit from the `async with`. """ ws_url, cookies = self.fetch_ws_endpoint() if self.verbose: print(f"[ws] {ws_url}") extra_headers = self._cf_headers() if cookies: extra_headers["Cookie"] = cookies async with websockets.connect( ws_url, additional_headers=extra_headers, ping_interval=10, ping_timeout=30, close_timeout=10, ) as ws: session = _Session(ws, self) await session._handshake() yield session # --------------------------------------------------------------------------- # Session — owns one open WebSocket # --------------------------------------------------------------------------- class _Session: """One open WebSocket, with helpers for the operations the harness needs.""" def __init__(self, ws: "websockets.WebSocketClientProtocol", client: Client) -> None: self._ws = ws self._client = client self._frames: list[str] = [] async def _send(self, action: str, args: list) -> None: payload = json.dumps({"action": action, "args": args}) if self._client.verbose: print(f"[→] {payload[:200]}") await self._ws.send(payload) async def _recv(self, timeout: float) -> dict | None: try: msg = await asyncio.wait_for(self._ws.recv(), timeout=timeout) except asyncio.TimeoutError: return None if not isinstance(msg, str): return None self._frames.append(msg) if self._client.verbose: print(f"[←] {msg[:200]}") try: return json.loads(msg) except json.JSONDecodeError: return None async def _drain(self, seconds: float = 1.0) -> None: deadline = time.time() + seconds while time.time() < deadline: if await self._recv(timeout=0.5) is None: break async def _handshake(self) -> None: """Send `_connected` and drain the initial HSM status frame.""" await self._send("_connected", ["/"]) await self._drain(1.0) # -- Public operations ------------------------------------------------ async def sign_psbt( self, psbt_bytes: bytes, *, use_totp: bool = False, totp_code: str | None = None, finalize: bool = False, timeout_seconds: float = 30.0, ) -> SignResult: """Upload and attempt to sign a PSBT. When `use_totp=True` the client will auto-generate a code from the configured TOTP secret (unless `totp_code` is passed explicitly). Returns a SignResult regardless of outcome — rejections are not exceptions. """ start = time.time() try: psbt_b64 = base64.b64encode(psbt_bytes).decode("ascii") psbt_sha = hashlib.sha256(psbt_bytes).hexdigest() # Step 1 — upload PSBT await self._send("upload_psbt", [len(psbt_bytes), psbt_sha, psbt_b64]) await self._drain(2.0) # Step 2 — optional TOTP authorisation if use_totp or totp_code: if not totp_code: totp_code, _remaining, _window = self._client.current_totp() window = int(time.time()) // 30 await self._send("auth_offer_guess", [0, window, totp_code]) await self._drain(2.0) # Step 3 — submit for signing # Args shape (observed in CKBunker 0.9.1): # [psbt_sha, broadcast, finalize, download] await self._send("submit_psbt", [psbt_sha, False, finalize, True]) # Step 4 — poll for decision deadline = start + timeout_seconds while time.time() < deadline: data = await self._recv(timeout=5.0) if data is None: continue # Rejection surfaces via a modal dialog containing "Rejected". if "show_modal" in data and "html" in data: html = data["html"] if "Failed" in html or "Rejected" in html: reason = self._extract_reason(html) return SignResult( status=SignStatus.REJECTED, reason=reason, elapsed_seconds=time.time() - start, raw_frames=list(self._frames), ) # Success surfaces via a local_download frame. if "local_download" in data: dl = data["local_download"] raw = dl.get("data", "") is_b64 = dl.get("is_b64", False) if finalize: return SignResult( status=SignStatus.SIGNED, signed_hex=raw, elapsed_seconds=time.time() - start, raw_frames=list(self._frames), ) decoded = base64.b64decode(raw) if is_b64 else raw.encode() return SignResult( status=SignStatus.SIGNED, signed_bytes=decoded, elapsed_seconds=time.time() - start, raw_frames=list(self._frames), ) return SignResult( status=SignStatus.TIMEOUT, error=f"no decision within {timeout_seconds}s", elapsed_seconds=time.time() - start, raw_frames=list(self._frames), ) except Exception as e: return SignResult( status=SignStatus.WS_ERROR, error=f"{type(e).__name__}: {e}", elapsed_seconds=time.time() - start, raw_frames=list(self._frames), ) async def sign_message( self, message: str, *, derivation_path: str = "m/84'/0'/0'/1", address_format: str = "segwit", timeout_seconds: float = 20.0, ) -> MessageSignResult: """Sign a text message. Coldcard policy must allow the derivation path. Returns (address, signature) on success. CKBunker 0.9.1 surfaces these in a `message_signed` or `show_result` frame depending on version; we accept either. """ start = time.time() try: await self._send( "sign_message", [message, derivation_path, address_format], ) deadline = start + timeout_seconds while time.time() < deadline: data = await self._recv(timeout=5.0) if data is None: continue if "show_modal" in data and "html" in data: html = data["html"] if "Failed" in html or "Rejected" in html: return MessageSignResult( status=SignStatus.REJECTED, reason=self._extract_reason(html), ) # Two possible success shapes. if "message_signed" in data: ms = data["message_signed"] return MessageSignResult( status=SignStatus.SIGNED, address=ms.get("address"), signature=ms.get("signature"), ) if "local_download" in data: dl = data["local_download"] raw = dl.get("data", "") # The signed message usually comes back as # "\n
\n" on separate lines. parts = raw.strip().splitlines() if len(parts) >= 2: return MessageSignResult( status=SignStatus.SIGNED, signature=parts[0], address=parts[1], ) return MessageSignResult( status=SignStatus.TIMEOUT, error=f"no signature within {timeout_seconds}s", ) except Exception as e: return MessageSignResult( status=SignStatus.WS_ERROR, error=f"{type(e).__name__}: {e}", ) # -- Helpers ---------------------------------------------------------- @staticmethod def _extract_reason(html: str) -> str: """Pull a human-readable rejection reason out of a CKBunker modal. CKBunker renders rejections as HTML like:

Rejected by Coldcard.

Rejected: rule #1: need user(s) confirmation, rule #2: ...

We keep only the "Rejected: ..." line because that is the verbatim policy decision from the Coldcard. """ m = re.findall(r"Rejected[^<]*", html) if not m: return html[:200] # The policy line is usually the *last* "Rejected:" match. for line in reversed(m): if ":" in line: return line.strip() return m[-1].strip() def captured_frames(self) -> list[str]: """All raw JSON frames received this session — useful for debugging.""" return list(self._frames)