Initial import: CKBunker HSM validation harness
WebSocket client + CLI harness + pytest suite that exercises each axis of a CKBunker + Coldcard Mk4 policy and asserts the expected outcomes, including the critical negative test that a large PSBT without TOTP is rejected with a specific 'rule #1: need user(s) confirmation' reason. Configuration via .env / YAML / CLI flags, two pre-crafted test PSBTs as fixtures (generation guide in fixtures/README.md), dashboard counter scraper as sanity check, design rationale in docs/.
This commit is contained in:
commit
9d380f5013
33
.env.example
Normal file
@ -0,0 +1,33 @@
|
||||
# CKBunker base URL.
|
||||
# - For Tailscale/private ingress use http://<tailnet-ip>:9823
|
||||
# - For public Cloudflare-fronted deployment use https://your.hostname
|
||||
# Tailscale is strongly preferred for this harness because Cloudflare Access
|
||||
# with service tokens does not pass the WebSocket upgrade cleanly.
|
||||
CKBUNKER_URL=http://100.80.63.14:9823
|
||||
|
||||
# Cloudflare Access service token (only needed if hitting a CF-Access-protected URL).
|
||||
# Leave blank when talking to the Tailscale IP directly.
|
||||
CF_ACCESS_CLIENT_ID=
|
||||
CF_ACCESS_CLIENT_SECRET=
|
||||
|
||||
# TOTP shared secret for the HSM user.
|
||||
# Issued by `ckcc user -t -q <username>` during enrolment (base32 string).
|
||||
# The harness uses this to auto-generate codes for Rule #1 tests.
|
||||
TOTP_SECRET=
|
||||
|
||||
# HSM user that matches the user named in the Coldcard's policy (typically the
|
||||
# one TOTP is bound to).
|
||||
HSM_USER=mineracks
|
||||
|
||||
# Path to a pre-crafted "small" PSBT whose value is <= your auto-approve cap
|
||||
# (Rule #2 equivalent). See fixtures/README.md for how to generate this.
|
||||
SMALL_PSBT_PATH=fixtures/small.psbt
|
||||
|
||||
# Path to a pre-crafted "large" PSBT whose value exceeds the auto-approve cap
|
||||
# but fits inside the 2FA-authorised cap (Rule #1 equivalent).
|
||||
LARGE_PSBT_PATH=fixtures/large.psbt
|
||||
|
||||
# Optional: Sparrow/Bitcoin Core address to verify a signed test message against.
|
||||
# Must match the derivation path below and belong to the Coldcard seed.
|
||||
MESSAGE_SIGN_ADDRESS=
|
||||
MESSAGE_SIGN_PATH=m/84'/0'/0'/1
|
||||
59
.gitignore
vendored
Normal file
@ -0,0 +1,59 @@
|
||||
# Secrets — never commit
|
||||
.env
|
||||
.env.local
|
||||
*.pem
|
||||
*.key
|
||||
config.yaml
|
||||
config.local.yaml
|
||||
|
||||
# PSBTs that might hold real tx data
|
||||
fixtures/*.psbt
|
||||
fixtures/*.tx
|
||||
!fixtures/README.md
|
||||
|
||||
# Signed output
|
||||
signed/
|
||||
*_signed.psbt
|
||||
*_signed.tx
|
||||
|
||||
# Python
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
*.so
|
||||
.Python
|
||||
venv/
|
||||
env/
|
||||
.venv/
|
||||
ENV/
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
|
||||
# Testing
|
||||
.pytest_cache/
|
||||
.coverage
|
||||
htmlcov/
|
||||
.tox/
|
||||
|
||||
# IDE
|
||||
.vscode/
|
||||
.idea/
|
||||
*.swp
|
||||
.DS_Store
|
||||
|
||||
# Local working dirs
|
||||
tmp/
|
||||
scratch/
|
||||
21
LICENSE
Normal file
@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2026 Mineracks
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
454
README.md
Normal file
@ -0,0 +1,454 @@
|
||||
# mineracks-ckbunker-hsm-sign
|
||||
|
||||
**Production validation test harness for [CKBunker](https://github.com/Coldcard/ckbunker) + [Coldcard Mk4](https://coldcardwallet.com) HSM deployments.**
|
||||
|
||||
Runs a short, structured sequence of tests against a live CKBunker and exits non-zero if anything fails. Designed to be run once after setup, periodically from a monitor, or as a CI gate on configuration changes — so a silently-broken policy doesn't stay silent.
|
||||
|
||||
> **The critical test**: a transaction above your auto-approve cap is submitted without 2FA. The Coldcard must reject it with a specific `rule #1: need user(s) confirmation` error. If it signs, something is catastrophically wrong with your policy and the harness exits with a loud failure.
|
||||
|
||||
---
|
||||
|
||||
## Table of contents
|
||||
|
||||
- [What this is, what it isn't](#what-this-is-what-it-isnt)
|
||||
- [The test sequence](#the-test-sequence)
|
||||
- [Requirements](#requirements)
|
||||
- [Quick start](#quick-start)
|
||||
- [Configuration](#configuration)
|
||||
- [Generating test PSBTs](#generating-test-psbts)
|
||||
- [Running as a CLI](#running-as-a-cli)
|
||||
- [Running under pytest](#running-under-pytest)
|
||||
- [Example output](#example-output)
|
||||
- [Using it as a library](#using-it-as-a-library)
|
||||
- [CI integration](#ci-integration)
|
||||
- [Design rationale](#design-rationale)
|
||||
- [Troubleshooting](#troubleshooting)
|
||||
- [Project layout](#project-layout)
|
||||
- [License](#license)
|
||||
|
||||
---
|
||||
|
||||
## What this is, what it isn't
|
||||
|
||||
### Is
|
||||
|
||||
- A **validation harness** for a CKBunker + Coldcard HSM that you already
|
||||
have set up and policy-loaded.
|
||||
- A **reusable WebSocket client library** for CKBunker
|
||||
([`ckbunker_hsm_sign/client.py`](ckbunker_hsm_sign/client.py)) that you
|
||||
can import into your own automation (BTCPay plugins, n8n scripts, custom
|
||||
signers).
|
||||
- A set of **pytest tests** that assert each axis of the policy works.
|
||||
|
||||
### Isn't
|
||||
|
||||
- Not a setup tool — use upstream CKBunker's docs to get your bunker
|
||||
running and your policy loaded first.
|
||||
- Not a key / seed tool — it never sees the seed and doesn't try to.
|
||||
- Not a PSBT creator — you supply the test fixtures. See
|
||||
[fixtures/README.md](fixtures/README.md) for how to make them.
|
||||
- Not a broadcaster — `submit_psbt` is always called with `broadcast=False`.
|
||||
Nothing in this harness reaches the mempool.
|
||||
|
||||
---
|
||||
|
||||
## The test sequence
|
||||
|
||||
| # | Test                         | What it asserts |
|---|------------------------------|-----------------|
| 1 | `connectivity`               | HTTP on the CKBunker URL answers and exposes a WebSocket path. Session cookie is obtainable. |
| 2 | `message_signing`            | An arbitrary test message signs on your policy-allowed BIP32 path. Cheapest Coldcard reachability test. |
| 3 | `rule2_auto_approve`         | A PSBT ≤ your auto-approve cap signs **without** any TOTP. |
| 4 | `rule1_without_totp_rejects` | A PSBT above your auto-approve cap is **rejected** when no TOTP is supplied. **The critical assertion.** |
| 5 | `rule1_with_totp_signs`      | The same PSBT signs when a fresh TOTP code is submitted. |
| 6 | `counters_tracked`           | Server-visible `Approvals` / `Refusals` counters moved by the expected amounts during tests 3–5. |
|
||||
|
||||
Tests 3–5 together **exercise both sides of every policy rule** in under a minute.
|
||||
|
||||
Tests are independently skippable via `config.yaml` or the `--tests` / `--skip` flags.
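
As an illustration of what test 6 checks, the counter movement can be derived from which signing tests actually ran. This is only a sketch: `expected_deltas` and `counters_ok` are hypothetical names, not part of the harness, and it assumes each successful signature adds exactly one approval and each policy rejection adds exactly one refusal, which is consistent with the example output but not confirmed for every CKBunker version.

```python
# Hypothetical helper names; the test names are the ones from the table above.
APPROVING = {"rule2_auto_approve", "rule1_with_totp_signs"}
REFUSING = {"rule1_without_totp_rejects"}


def expected_deltas(tests_run):
    """Return how far each dashboard counter should move for the tests that ran."""
    ran = set(tests_run)
    return {
        "approvals": len(ran & APPROVING),
        "refusals": len(ran & REFUSING),
    }


def counters_ok(before, after, tests_run):
    """Compare the observed counter movement with the expected movement."""
    want = expected_deltas(tests_run)
    return all(after[name] - before[name] == want[name] for name in want)


full_run = ["rule2_auto_approve", "rule1_without_totp_rejects", "rule1_with_totp_signs"]
print(expected_deltas(full_run))
```

If a test is skipped, the expected movement shrinks with it, so a partial run does not produce a false counter failure.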
|
||||
|
||||
---
|
||||
|
||||
## Requirements
|
||||
|
||||
- **A running CKBunker** (tested against `v0.9.1`, commit `8526755`).
|
||||
- **A Coldcard Mk4** paired to the CKBunker, in HSM mode, with a
|
||||
**two-tier policy** loaded. The harness's default expectations match the
|
||||
pattern documented in [`docs/POLICY_RECOMMENDATIONS.md`](docs/POLICY_RECOMMENDATIONS.md),
|
||||
but the thresholds are configurable.
|
||||
- **Python 3.10+**.
|
||||
- **Network access** to the CKBunker's private ingress (Tailscale / VPN).
|
||||
The harness works via a Cloudflare-Access-fronted public URL for HTTP
|
||||
but WebSocket signing over CF Access with service tokens is unreliable —
|
||||
see [docs/WHY.md](docs/WHY.md).
|
||||
- **Two pre-crafted test PSBTs** — see [fixtures/README.md](fixtures/README.md).
|
||||
- **The TOTP shared secret** for the user named in your policy (required
|
||||
for test 5 only; test 4 runs without it).
|
||||
|
||||
---
|
||||
|
||||
## Quick start
|
||||
|
||||
```bash
|
||||
git clone https://git.mineracks.com/mineracks/mineracks-ckbunker-hsm-sign.git
|
||||
cd mineracks-ckbunker-hsm-sign
|
||||
|
||||
python3 -m venv venv
|
||||
source venv/bin/activate
|
||||
pip install -r requirements.txt
|
||||
|
||||
cp .env.example .env
|
||||
$EDITOR .env # set CKBUNKER_URL, TOTP_SECRET, etc.
|
||||
|
||||
# Generate or copy in two PSBTs — see fixtures/README.md
|
||||
# fixtures/small.psbt (≤ auto-approve cap)
|
||||
# fixtures/large.psbt (> auto-approve cap, ≤ user-auth cap)
|
||||
|
||||
./hsm_validate.py
|
||||
```
|
||||
|
||||
A full run takes 10–30 seconds once the bunker and Coldcard are warm.
|
||||
|
||||
---
|
||||
|
||||
## Configuration
|
||||
|
||||
Three sources, in precedence order (highest wins):
|
||||
|
||||
1. **CLI flags** — `--url`, `--tests`, `--skip`, `--verbose`, …
|
||||
2. **`config.yaml`** (optional) — passed via `--config`. See [`config.example.yaml`](config.example.yaml).
|
||||
3. **`.env`** (auto-loaded from the CWD if present). See [`.env.example`](.env.example).
|
||||
|
||||
The same loader is used by `pytest`, so whatever you configure for the CLI
|
||||
applies to the test suite too.
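
The precedence order can be pictured as a layered merge in which later layers override earlier ones. The sketch below is not the harness's actual loader: `merge_settings` and its argument names are hypothetical, and treating `None` as "flag not given" is an assumption made for the illustration.

```python
from typing import Any, Mapping, Optional


def merge_settings(
    env: Mapping[str, Any],
    yaml_cfg: Optional[Mapping[str, Any]] = None,
    cli: Optional[Mapping[str, Any]] = None,
) -> dict:
    """Merge three layers: .env first, then YAML, then CLI flags.

    A value of None (for example an unset CLI flag) never overrides
    a value supplied by a lower layer.
    """
    merged: dict = {}
    for layer in (env, yaml_cfg or {}, cli or {}):
        for key, value in layer.items():
            if value is not None:
                merged[key] = value
    return merged


settings = merge_settings(
    env={"url": "http://100.80.63.14:9823", "user": "mineracks"},
    yaml_cfg={"url": "http://10.0.0.14:9823"},
    cli={"url": None, "verbose": True},
)
print(settings)
```

Here the YAML URL wins over the `.env` URL because no `--url` flag was given, while `user` falls through from `.env` untouched.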
|
||||
|
||||
### Required settings
|
||||
|
||||
| Setting      | Source                   | Required for                                          |
|--------------|--------------------------|-------------------------------------------------------|
| CKBunker URL | `CKBUNKER_URL` / `--url` | all tests                                             |
| Small PSBT   | `SMALL_PSBT_PATH`        | `rule2_auto_approve`                                  |
| Large PSBT   | `LARGE_PSBT_PATH`        | `rule1_without_totp_rejects`, `rule1_with_totp_signs` |
| TOTP secret  | `TOTP_SECRET`            | `rule1_with_totp_signs`                               |
| HSM user     | `HSM_USER`               | anywhere that user auth is involved                   |
|
||||
|
||||
### Optional settings
|
||||
|
||||
| Setting              | Source                    | Purpose                                |
|----------------------|---------------------------|----------------------------------------|
| Cloudflare Access id | `CF_ACCESS_CLIENT_ID`     | HTTP through CF Access (not WS)        |
| CF Access secret     | `CF_ACCESS_CLIENT_SECRET` | HTTP through CF Access (not WS)        |
| Message sign path    | `MESSAGE_SIGN_PATH`       | `message_signing` uses this derivation |
| Message sign address | `MESSAGE_SIGN_ADDRESS`    | If set, verified against signature     |
| Verbose frames       | `--verbose` / `-v`        | Dump every WebSocket frame to stdout   |
| Save signed PSBTs    | `--save-signed <dir>`     | Keep the signed outputs for inspection |
|
||||
|
||||
---
|
||||
|
||||
## Generating test PSBTs
|
||||
|
||||
See [`fixtures/README.md`](fixtures/README.md) for three methods (Sparrow,
|
||||
`bitcoin-cli`, reusing stale UTXOs). The short version:
|
||||
|
||||
1. Build a **watch-only wallet** from your Coldcard xpub in Sparrow.
|
||||
2. Construct two payments from that wallet to any address you control:
|
||||
- One just under your auto-approve cap (`small.psbt`).
|
||||
- One comfortably above the cap but inside the user-auth cap (`large.psbt`).
|
||||
3. Export both as **PSBT** (binary or base64) into `fixtures/`.
|
||||
|
||||
The harness never broadcasts; it signs, optionally writes the signed
|
||||
result to disk, and discards. `large.psbt` can be re-used indefinitely —
|
||||
the rejection path is deterministic regardless of UTXO state.
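
Because fixtures may be exported as either binary or base64, a loader has to tell the two apart. One way to do that, sketched here under the assumption that the file holds a single PSBT, is to look for the five magic bytes that every PSBT starts with (BIP 174). `decode_psbt` is a hypothetical helper, not a function from this package.

```python
import base64
import binascii

PSBT_MAGIC = b"psbt\xff"  # the five magic bytes at the start of every PSBT


def decode_psbt(data: bytes) -> bytes:
    """Return raw PSBT bytes from either a binary or a base64 export."""
    if data.startswith(PSBT_MAGIC):
        return data  # already binary
    # Drop all whitespace so wrapped or newline-terminated base64 still decodes.
    compact = b"".join(data.split())
    try:
        raw = base64.b64decode(compact, validate=True)
    except (binascii.Error, ValueError) as exc:
        raise ValueError("not a binary or base64 PSBT") from exc
    if not raw.startswith(PSBT_MAGIC):
        raise ValueError("decoded data does not start with the PSBT magic bytes")
    return raw
```

A caller would typically pass `Path("fixtures/small.psbt").read_bytes()` and hand the result to the signing call.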
|
||||
|
||||
---
|
||||
|
||||
## Running as a CLI
|
||||
|
||||
```bash
|
||||
# Full run
|
||||
./hsm_validate.py
|
||||
|
||||
# With a config file
|
||||
./hsm_validate.py --config config.yaml
|
||||
|
||||
# Override a single setting
|
||||
./hsm_validate.py --url http://10.0.0.14:9823
|
||||
|
||||
# Only the critical negative test
|
||||
./hsm_validate.py --tests rule1_without_totp_rejects
|
||||
|
||||
# Everything except the TOTP sign test (e.g. during TOTP rotation)
|
||||
./hsm_validate.py --skip rule1_with_totp_signs
|
||||
|
||||
# Very verbose (dumps every WebSocket frame)
|
||||
./hsm_validate.py --verbose
|
||||
|
||||
# Save signed PSBTs for inspection
|
||||
./hsm_validate.py --save-signed /tmp/hsm-validate-signed
|
||||
```
|
||||
|
||||
Exit codes:
|
||||
|
||||
- `0` — all enabled tests passed (or were skipped).
|
||||
- `1` — at least one test failed.
|
||||
- `2` — configuration error.
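
The mapping from test outcomes to these exit codes is small enough to sketch. `Outcome` and `exit_code` below are hypothetical names used for illustration only; the real reporter may be structured differently.

```python
from enum import Enum


class Outcome(Enum):
    PASSED = "passed"
    FAILED = "failed"
    SKIPPED = "skipped"


def exit_code(outcomes, config_error=False):
    """Map a run to the documented exit codes: 0 ok, 1 failure, 2 bad config."""
    if config_error:
        return 2
    if any(outcome is Outcome.FAILED for outcome in outcomes):
        return 1
    # Passed and skipped tests both count as success.
    return 0


print(exit_code([Outcome.PASSED, Outcome.SKIPPED]))
```

A wrapper script can then branch on the process status alone without parsing the report text.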
|
||||
|
||||
---
|
||||
|
||||
## Running under pytest
|
||||
|
||||
```bash
|
||||
pip install pytest pytest-asyncio
|
||||
pytest -v tests/
|
||||
```
|
||||
|
||||
The pytest session reads the same `.env` / `config.yaml` that the CLI does.
|
||||
Each test file corresponds to one test in the CLI sequence:
|
||||
|
||||
```
|
||||
tests/test_01_connectivity.py
|
||||
tests/test_02_message_signing.py
|
||||
tests/test_03_rule2_auto_approve.py
|
||||
tests/test_04_rule1_without_totp_rejects.py ← the critical negative test
|
||||
tests/test_05_rule1_with_totp_signs.py
|
||||
tests/test_06_counters_tracked.py
|
||||
```
|
||||
|
||||
Run only the critical test:
|
||||
|
||||
```bash
|
||||
pytest -v tests/test_04_rule1_without_totp_rejects.py
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Example output
|
||||
|
||||
```text
|
||||
Target: http://100.80.63.14:9823
|
||||
User: mineracks
|
||||
Policy: ≤10000 sats auto, ≤100000 sats with TOTP
|
||||
────────────────────────────────────────────────────────────────────────
|
||||
✓ connectivity HTTP + WS endpoint reachable (0.3s)
|
||||
WebSocket URL: ws://100.80.63.14:9823/websocket/CBG5KH5BCCG6W3BXDH5QQY5Q
|
||||
Session cookies: yes
|
||||
✓ message_signing signed via Coldcard (0.9s)
|
||||
Address: bc1qy926zzc4yw8f0gd6tvdy2fm0hr4a4tx3u4963h
|
||||
Signature: JyeJVJuBuVB0M79FFDLrfz10j7NtGRSac+7Oj0dpyZ/MePoh...
|
||||
✓ rule2_auto_approve signed without TOTP (395 bytes) (1.1s)
|
||||
✓ rule1_without_totp_rejects rejected as expected — Rejected: rule #1: need user(s) confirmation, rule #2: would exceed period spending (1.2s)
|
||||
✓ rule1_with_totp_signs signed with TOTP (395 bytes) (1.4s)
|
||||
✓ counters_tracked dashboard counters moved as expected (0.4s)
|
||||
Approvals: 2 → 4 (Δ2)
|
||||
Refusals: 0 → 1 (Δ1)
|
||||
Amount spent: 0.00009 → 0.00109 BTC
|
||||
────────────────────────────────────────────────────────────────────────
|
||||
|
||||
6 passed, 0 failed, 0 skipped
|
||||
```
|
||||
|
||||
A failure — the one you actually want to catch — looks like this:
|
||||
|
||||
```text
|
||||
✗ rule1_without_totp_rejects policy NOT enforced: large PSBT was signed without TOTP — STOP AND INVESTIGATE
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Using it as a library
|
||||
|
||||
The WebSocket client is reusable standalone:
|
||||
|
||||
```python
|
||||
import asyncio
from pathlib import Path
from ckbunker_hsm_sign import Client

async def main():
    client = Client(
        base_url="http://100.80.63.14:9823",
        totp_secret="JBSWY3DPEHPK3PXP",
    )
    async with client.session() as session:
        psbt = Path("mytx.psbt").read_bytes()
        result = await session.sign_psbt(psbt, use_totp=True)
        if result.ok():
            Path("signed.psbt").write_bytes(result.signed_bytes)
        else:
            print("sign failed:", result.status.value, result.reason)

asyncio.run(main())
|
||||
```
|
||||
|
||||
Batch signing is just sequential sign calls inside the same session —
|
||||
the WebSocket stays open.
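
A batch loop over one open session might look like the sketch below. It relies only on the `session.sign_psbt(...)` call and the `result.ok()` method shown in the example above; `sign_batch` itself is a hypothetical helper, and stopping at the first failure is a design choice made for the illustration rather than documented behaviour.

```python
import asyncio


async def sign_batch(session, psbts, use_totp=False):
    """Sign PSBTs one after another on an already-open session.

    Stops at the first result whose ok() is false and returns everything
    collected so far, including that failing result.
    """
    results = []
    for psbt in psbts:
        result = await session.sign_psbt(psbt, use_totp=use_totp)
        results.append(result)
        if not result.ok():
            break
    return results
```

Inside `async with client.session() as session:` this would be called as `results = await sign_batch(session, [psbt_a, psbt_b])`.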
|
||||
|
||||
See [`docs/PROTOCOL.md`](docs/PROTOCOL.md) for the full protocol reference.
|
||||
|
||||
---
|
||||
|
||||
## CI integration
|
||||
|
||||
The CLI exits 0/1/2, which is all a CI runner needs. Minimal examples:
|
||||
|
||||
### Gitea Actions / GitHub Actions
|
||||
|
||||
```yaml
|
||||
name: validate-hsm
on:
  schedule: [{ cron: "0 6 * * *" }]   # 6 AM daily
  workflow_dispatch:

jobs:
  validate:
    runs-on: self-hosted   # needs Tailscale access
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.12" }
      - run: pip install -r requirements.txt
      - run: ./hsm_validate.py
        env:
          CKBUNKER_URL: ${{ secrets.CKBUNKER_URL }}
          TOTP_SECRET: ${{ secrets.TOTP_SECRET }}
          SMALL_PSBT_PATH: fixtures/small.psbt
          LARGE_PSBT_PATH: fixtures/large.psbt
|
||||
```
|
||||
|
||||
### Cron / oncall monitor
|
||||
|
||||
```cron
|
||||
# Every hour, email oncall if anything fails
|
||||
17 * * * * cd /opt/hsm-validate && ./hsm_validate.py >/tmp/hsm.out 2>&1 || mail -s "HSM validation FAILED" oncall@example.com < /tmp/hsm.out
|
||||
```
|
||||
|
||||
### Woodpecker / Drone
|
||||
|
||||
```yaml
|
||||
steps:
  - name: validate
    image: python:3.12
    commands:
      - pip install -r requirements.txt
      - ./hsm_validate.py
    secrets: [ ckbunker_url, totp_secret ]
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Design rationale
|
||||
|
||||
Full reasoning lives in [`docs/WHY.md`](docs/WHY.md). Short version:
|
||||
|
||||
- **Explicit rejection assertions**, not "sign succeeded / no error".
|
||||
Policy failures are silent unless you check for the *specific* rejection
|
||||
reason.
|
||||
- **Two-tier policy as the default assumption**: auto-approve under X,
|
||||
TOTP under Y, reject above. This matches what most HSM-backed Bitcoin
|
||||
operations look like; adjust thresholds in config.
|
||||
- **Pre-crafted fixtures** instead of PSBT generation — keeps the harness
|
||||
deployment-agnostic and avoids needing the Coldcard's xpub / spendable
|
||||
UTXOs at harness-build time.
|
||||
- **Hand-rolled WebSocket client** — upstream CKBunker doesn't ship a
|
||||
Python client library; the `ckbunker` console script has a broken
|
||||
import path in v0.9.1.
|
||||
- **No broadcast, ever** — the harness always calls `submit_psbt` with
|
||||
`broadcast=False`. A validation run doesn't touch the mempool.
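
To make the first point concrete, the critical check has to distinguish four situations rather than two. The sketch below uses plain strings that mirror the `SignStatus` values (`signed`, `rejected`, `timeout`, `ws_error`); `check_rule1_rejection` is a hypothetical function written for this illustration, not the harness's own assertion.

```python
EXPECTED_PHRASE = "rule #1: need user(s) confirmation"


def check_rule1_rejection(status, reason):
    """Return (passed, message) for the large-PSBT-without-TOTP test."""
    if status == "signed":
        # The worst case: the policy did not stop the spend.
        return False, "policy NOT enforced: large PSBT was signed without TOTP"
    if status != "rejected":
        # Timeouts and transport errors prove nothing about the policy.
        return False, f"inconclusive: no decision from the Coldcard ({status})"
    if EXPECTED_PHRASE not in (reason or "").lower():
        # Rejected, but possibly for an unrelated reason such as a bad PSBT.
        return False, f"rejected, but for an unexpected reason: {reason!r}"
    return True, f"rejected as expected: {reason}"


print(check_rule1_rejection("rejected", "rule #1: need user(s) confirmation"))
```

Treating a timeout as a failure rather than a pass matters here: an unreachable Coldcard also "does not sign", and a looser check would report that as an enforced policy.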
|
||||
|
||||
---
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### "HTTP fetch failed: 403"
|
||||
|
||||
You're hitting a Cloudflare-Access-protected URL without service token
|
||||
credentials. Either set `CF_ACCESS_CLIENT_ID` + `CF_ACCESS_CLIENT_SECRET`
|
||||
or switch `CKBUNKER_URL` to the private ingress (Tailscale IP).
|
||||
|
||||
### "timeout: no decision within 30s"
|
||||
|
||||
- Coldcard is not responding — check `lsusb | grep Coinkite` on the VM.
|
||||
- CKBunker is running but the Coldcard was detached after VM boot.
|
||||
Re-attach USB passthrough.
|
||||
- `ckbunker.service` is in a restart loop. Check `journalctl -u ckbunker`.
|
||||
|
||||
### `rule1_without_totp_rejects` → FAIL: "policy NOT enforced"
|
||||
|
||||
Stop the harness. Immediately verify the policy on the Coldcard:
|
||||
|
||||
1. Exit HSM mode via the Boot-to-HSM escape code (press `X`, code, `✔`
|
||||
within 60s of power on).
|
||||
2. Menu → Advanced → HSM → review installed policy.
|
||||
3. If the policy is missing or the user-auth rule is gone, reload it
|
||||
from your policy YAML via MicroSD.
|
||||
|
||||
### `message_signing` passes but PSBT tests fail
|
||||
|
||||
Coldcard is reachable but probably in a weird mode. Check the Coldcard's
|
||||
own screen for an error banner. Usually solved by a service restart:
|
||||
|
||||
```bash
|
||||
sudo systemctl restart ckbunker
|
||||
```
|
||||
|
||||
### Counters test skipped
|
||||
|
||||
Your CKBunker version renders the dashboard differently from what the
|
||||
scraper's regexes expect. This is a soft skip — the signing tests
|
||||
already prove correctness. File an issue with the page HTML if you want
|
||||
scraper support for your version.
|
||||
|
||||
### "TOTP_SECRET not configured" but I set it
|
||||
|
||||
`TOTP_SECRET` must be a **base32** secret (usually 16+ chars, letters A-Z
|
||||
and digits 2-7). If you stored a QR-code URL, extract the `secret=…`
|
||||
parameter from it.
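
Extracting and sanity-checking the secret can be done with the standard library alone. This is a sketch under the assumption that the stored value is either a bare base32 string or an `otpauth://` URL; `extract_totp_secret` is a hypothetical helper and the URL in the usage line is made up for the example.

```python
import base64
from urllib.parse import parse_qs, urlparse


def extract_totp_secret(value: str) -> str:
    """Return the base32 secret from a bare secret or an otpauth:// URL."""
    value = value.strip()
    if value.lower().startswith("otpauth://"):
        params = parse_qs(urlparse(value).query)
        if "secret" not in params:
            raise ValueError("otpauth URL has no secret= parameter")
        value = params["secret"][0]
    secret = value.replace(" ", "").upper()
    # Validate by decoding; pad to a multiple of 8 characters first.
    padded = secret + "=" * (-len(secret) % 8)
    base64.b32decode(padded)  # raises binascii.Error (a ValueError) if invalid
    return secret


print(extract_totp_secret(
    "otpauth://totp/Example:user?secret=JBSWY3DPEHPK3PXP&issuer=Example"
))
```

The returned string is what belongs in `TOTP_SECRET`.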
|
||||
|
||||
---
|
||||
|
||||
## Project layout
|
||||
|
||||
```
|
||||
.
|
||||
├── README.md ← this file
|
||||
├── LICENSE ← MIT
|
||||
├── requirements.txt
|
||||
├── pyproject.toml ← optional `pip install -e .`
|
||||
├── .env.example ← environment variable template
|
||||
├── config.example.yaml ← YAML config template
|
||||
├── hsm_validate.py ← CLI entry point
|
||||
│
|
||||
├── ckbunker_hsm_sign/ ← library
|
||||
│ ├── __init__.py
|
||||
│ ├── client.py ← WebSocket + HTTP client
|
||||
│ ├── config.py ← .env + YAML loader
|
||||
│ ├── harness.py ← CLI test runner / reporter
|
||||
│ └── scraper.py ← dashboard counter scraper
|
||||
│
|
||||
├── tests/ ← pytest suite (same tests, different runner)
|
||||
│ ├── conftest.py
|
||||
│ ├── test_01_connectivity.py
|
||||
│ ├── test_02_message_signing.py
|
||||
│ ├── test_03_rule2_auto_approve.py
|
||||
│ ├── test_04_rule1_without_totp_rejects.py ← the critical negative test
|
||||
│ ├── test_05_rule1_with_totp_signs.py
|
||||
│ └── test_06_counters_tracked.py
|
||||
│
|
||||
├── fixtures/
|
||||
│ └── README.md ← how to generate test PSBTs
|
||||
│
|
||||
└── docs/
|
||||
├── PROTOCOL.md ← CKBunker WebSocket protocol reference
|
||||
├── WHY.md ← design rationale
|
||||
└── POLICY_RECOMMENDATIONS.md ← how to design a two-tier policy
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## License
|
||||
|
||||
MIT — see [`LICENSE`](LICENSE).
|
||||
|
||||
This project is not affiliated with Coinkite or the Coldcard team. "Coldcard"
|
||||
and "CKBunker" are products of [Coinkite Inc.](https://coinkite.com). This
|
||||
harness is an independent validation tool.
|
||||
23
ckbunker_hsm_sign/__init__.py
Normal file
@ -0,0 +1,23 @@
|
||||
"""CKBunker HSM production validation harness.
|
||||
|
||||
Public API:
|
||||
Client — low-level WebSocket client (connect, upload, sign)
|
||||
SignResult — dataclass describing one signing attempt
|
||||
Harness — high-level test runner used by hsm_validate.py
|
||||
load_config — merge env + YAML + CLI into a Config object
|
||||
"""
|
||||
|
||||
from .client import Client, SignResult, SignStatus
|
||||
from .harness import Harness
|
||||
from .config import Config, load_config
|
||||
|
||||
__all__ = [
|
||||
"Client",
|
||||
"SignResult",
|
||||
"SignStatus",
|
||||
"Harness",
|
||||
"Config",
|
||||
"load_config",
|
||||
]
|
||||
|
||||
__version__ = "1.0.0"
|
||||
459
ckbunker_hsm_sign/client.py
Normal file
@ -0,0 +1,459 @@
|
||||
"""
|
||||
Low-level CKBunker client.
|
||||
|
||||
This talks CKBunker's own WebSocket protocol — the same one its Vue.js web UI
|
||||
uses. It is NOT a wrapper around upstream CKBunker's Python SDK; at the time
|
||||
of writing (v0.9.1) the upstream `ckbunker` CLI has a broken import path and
|
||||
there is no packaged client library. See docs/PROTOCOL.md for why a hand-
|
||||
rolled WebSocket client is the right choice here.
|
||||
|
||||
The Client is intentionally minimal: one HTTP GET to obtain a session cookie
|
||||
and WebSocket URL, one WebSocket connection per operation (or one shared
|
||||
session if you ask for batch mode), and a dozen message types. It surfaces
|
||||
signing outcomes as a SignResult dataclass so the harness can assert on
|
||||
specific outcomes — including *expected rejections*, which matter as much as
|
||||
successes when validating a policy.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import enum
|
||||
import hashlib
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
from contextlib import asynccontextmanager
|
||||
from dataclasses import dataclass, field
|
||||
from typing import AsyncIterator
|
||||
|
||||
try:
    import requests
except ImportError as e:  # pragma: no cover
    raise SystemExit("requests is required: pip install requests") from e

try:
    import websockets
except ImportError as e:  # pragma: no cover
    raise SystemExit("websockets is required: pip install websockets") from e
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data types
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class SignStatus(enum.Enum):
    """Outcome of a signing attempt.

    SIGNED    — Coldcard signed and returned a PSBT / finalised tx.
    REJECTED  — Coldcard returned a rejection (policy violation, bad
                TOTP, missing user auth). The `reason` field carries the
                human-readable reason string from the Coldcard.
    TIMEOUT   — No response within the signing deadline.
    WS_ERROR  — WebSocket/transport failure before we got to a decision.
    """

    SIGNED = "signed"
    REJECTED = "rejected"
    TIMEOUT = "timeout"
    WS_ERROR = "ws_error"
|
||||
|
||||
|
||||
@dataclass
class SignResult:
    status: SignStatus
    signed_bytes: bytes | None = None  # when status == SIGNED and not finalised
    signed_hex: str | None = None      # when status == SIGNED and finalised
    reason: str | None = None          # when status == REJECTED
    error: str | None = None           # when status == WS_ERROR / TIMEOUT
    elapsed_seconds: float = 0.0
    raw_frames: list[str] = field(default_factory=list)  # captured frames for debugging

    def ok(self) -> bool:
        return self.status == SignStatus.SIGNED

    def is_expected_rejection(self, expect_phrase: str | None = None) -> bool:
        """True if the Coldcard rejected AND the reason contains the expected phrase.

        When validating policy you usually want to assert *the specific
        rejection reason* matches (e.g. "rule #1: need user(s) confirmation"),
        not just that some rejection happened.
        """
        if self.status != SignStatus.REJECTED:
            return False
        if expect_phrase is None:
            return True
        # Case-insensitive substring match against the Coldcard's reason string.
        return expect_phrase.lower() in (self.reason or "").lower()
|
||||
|
||||
|
||||
@dataclass
class MessageSignResult:
    status: SignStatus
    address: str | None = None
    signature: str | None = None
    reason: str | None = None
    error: str | None = None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Client
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class Client:
    """CKBunker signing client.

    Typical use:

        client = Client("http://100.80.63.14:9823", totp_secret="...")
        async with client.session() as session:
            result = await session.sign_psbt(psbt_bytes, use_totp=True)

    For batch signing reuse the same `session`; it keeps the WebSocket open.
    """

    def __init__(
        self,
        base_url: str,
        *,
        cf_access_client_id: str | None = None,
        cf_access_client_secret: str | None = None,
        totp_secret: str | None = None,
        user: str = "mineracks",
        verbose: bool = False,
    ) -> None:
        self.base_url = base_url.rstrip("/")
        self.cf_id = cf_access_client_id
        self.cf_secret = cf_access_client_secret
        self.totp_secret = totp_secret
        self.user = user
        self.verbose = verbose
|
||||
|
||||
# -- HTTP: session cookie + WebSocket URL -------------------------------
|
||||
|
||||
    def _cf_headers(self) -> dict[str, str]:
        headers: dict[str, str] = {}
        if self.cf_id:
            headers["CF-Access-Client-Id"] = self.cf_id
        if self.cf_secret:
            headers["CF-Access-Client-Secret"] = self.cf_secret
        return headers

    def fetch_ws_endpoint(self, timeout: float = 15.0) -> tuple[str, str]:
        """Hit the CKBunker root page and return (ws_url, cookie_header).

        Why: CKBunker's aiohttp session binds the Vue app to a cookie. The
        WebSocket URL is embedded in the page HTML (path like
        /websocket/<TOKEN>). The same cookie must be presented on the WS
        upgrade or the server rejects the connection.
        """
        resp = requests.get(
            self.base_url + "/",
            headers=self._cf_headers(),
            timeout=timeout,
        )
        resp.raise_for_status()
        cookies = "; ".join(f"{k}={v}" for k, v in resp.cookies.items())

        ws_url = self._extract_ws_url(resp.text)
        return ws_url, cookies
|
||||
|
||||
    def _extract_ws_url(self, html: str) -> str:
        """Find the WebSocket path in the CKBunker page HTML.

        CKBunker embeds the WS path in the rendered template. We accept
        several spellings so newer CKBunker revisions don't silently break us.
        """
        patterns = [
            r"['\"](/websocket/[A-Za-z0-9+/=_-]+)['\"]",
            r"ws_url\s*[=:]\s*['\"]([^'\"]+)['\"]",
            r"new WebSocket\([^)]*['\"]([^'\"]*websocket[^'\"]*)['\"]",
        ]
        for pat in patterns:
            m = re.search(pat, html)
            if not m:
                continue
            path = m.group(1)
            if path.startswith("/"):
                host = self.base_url.replace("https://", "").replace("http://", "")
                scheme = "wss" if self.base_url.startswith("https") else "ws"
                return f"{scheme}://{host}{path}"
            return path

        # Fallback — catches early regressions where CKBunker drops the token.
        host = self.base_url.replace("https://", "").replace("http://", "")
        scheme = "wss" if self.base_url.startswith("https") else "ws"
        return f"{scheme}://{host}/websocket/"
|
||||
|
||||
# -- TOTP ---------------------------------------------------------------
|
||||
|
||||
    def current_totp(self) -> tuple[str, int, int]:
        """Generate a TOTP (code, remaining_seconds, window_counter).

        The window_counter matches what CKBunker's server computes as
        `int(time.time()) // 30`, which it expects in auth_offer_guess args[1].
        """
        if not self.totp_secret:
            raise RuntimeError("TOTP requested but no secret configured")
        import pyotp
        totp = pyotp.TOTP(self.totp_secret)
        # Read the clock once so the code, the remaining time, and the window
        # counter all refer to the same instant, even across a 30 s boundary.
        now = int(time.time())
        code = totp.at(now)
        remaining = totp.interval - (now % totp.interval)
        window = now // 30
        return code, remaining, window
|
||||
|
||||
# -- Session context ----------------------------------------------------
|
||||
|
||||
    @asynccontextmanager
    async def session(self) -> AsyncIterator["_Session"]:
        """Open an authenticated WebSocket session.

        Use the yielded `_Session` for one or more sign_psbt / sign_message
        calls. The WebSocket closes cleanly on exit from the `async with`.
        """
        ws_url, cookies = self.fetch_ws_endpoint()
        if self.verbose:
            print(f"[ws] {ws_url}")

        extra_headers = self._cf_headers()
        if cookies:
            extra_headers["Cookie"] = cookies

        async with websockets.connect(
            ws_url,
            additional_headers=extra_headers,
            ping_interval=10,
            ping_timeout=30,
            close_timeout=10,
        ) as ws:
            session = _Session(ws, self)
            await session._handshake()
            yield session


# ---------------------------------------------------------------------------
# Session: owns one open WebSocket
# ---------------------------------------------------------------------------

class _Session:
    """One open WebSocket, with helpers for the operations the harness needs."""

    def __init__(self, ws: "websockets.WebSocketClientProtocol", client: Client) -> None:
        self._ws = ws
        self._client = client
        self._frames: list[str] = []

    async def _send(self, action: str, args: list) -> None:
        payload = json.dumps({"action": action, "args": args})
        if self._client.verbose:
            print(f"[→] {payload[:200]}")
        await self._ws.send(payload)

    async def _recv(self, timeout: float) -> dict | None:
        try:
            msg = await asyncio.wait_for(self._ws.recv(), timeout=timeout)
        except asyncio.TimeoutError:
            return None
        if not isinstance(msg, str):
            return None
        self._frames.append(msg)
        if self._client.verbose:
            print(f"[←] {msg[:200]}")
        try:
            return json.loads(msg)
        except json.JSONDecodeError:
            return None

    async def _drain(self, seconds: float = 1.0) -> None:
        deadline = time.time() + seconds
        while time.time() < deadline:
            if await self._recv(timeout=0.5) is None:
                break

    async def _handshake(self) -> None:
        """Send `_connected` and drain the initial HSM status frame."""
        await self._send("_connected", ["/"])
        await self._drain(1.0)

    # -- Public operations ------------------------------------------------

    async def sign_psbt(
        self,
        psbt_bytes: bytes,
        *,
        use_totp: bool = False,
        totp_code: str | None = None,
        finalize: bool = False,
        timeout_seconds: float = 30.0,
    ) -> SignResult:
        """Upload and attempt to sign a PSBT.

        When `use_totp=True` the client will auto-generate a code from the
        configured TOTP secret (unless `totp_code` is passed explicitly).
        Returns a SignResult regardless of outcome; rejections are not
        exceptions.
        """
        start = time.time()
        try:
            psbt_b64 = base64.b64encode(psbt_bytes).decode("ascii")
            psbt_sha = hashlib.sha256(psbt_bytes).hexdigest()

            # Step 1: upload PSBT
            await self._send("upload_psbt", [len(psbt_bytes), psbt_sha, psbt_b64])
            await self._drain(2.0)

            # Step 2: optional TOTP authorisation
            if use_totp or totp_code:
                if totp_code:
                    # Caller supplied the code; derive the window from the clock.
                    window = int(time.time()) // 30
                else:
                    # Take code and window from the same call so they agree.
                    totp_code, _remaining, window = self._client.current_totp()
                await self._send("auth_offer_guess", [0, window, totp_code])
                await self._drain(2.0)

            # Step 3: submit for signing
            # Args shape (observed in CKBunker 0.9.1):
            # [psbt_sha, broadcast, finalize, download]
            await self._send("submit_psbt", [psbt_sha, False, finalize, True])

            # Step 4: poll for decision
            deadline = start + timeout_seconds
            while time.time() < deadline:
                data = await self._recv(timeout=5.0)
                if data is None:
                    continue

                # Rejection surfaces via a modal dialog containing "Failed"
                # or "Rejected".
                if "show_modal" in data and "html" in data:
                    html = data["html"]
                    if "Failed" in html or "Rejected" in html:
                        reason = self._extract_reason(html)
                        return SignResult(
                            status=SignStatus.REJECTED,
                            reason=reason,
                            elapsed_seconds=time.time() - start,
                            raw_frames=list(self._frames),
                        )

                # Success surfaces via a local_download frame.
                if "local_download" in data:
                    dl = data["local_download"]
                    raw = dl.get("data", "")
                    is_b64 = dl.get("is_b64", False)
                    if finalize:
                        return SignResult(
                            status=SignStatus.SIGNED,
                            signed_hex=raw,
                            elapsed_seconds=time.time() - start,
                            raw_frames=list(self._frames),
                        )
                    decoded = base64.b64decode(raw) if is_b64 else raw.encode()
                    return SignResult(
                        status=SignStatus.SIGNED,
                        signed_bytes=decoded,
                        elapsed_seconds=time.time() - start,
                        raw_frames=list(self._frames),
                    )

            return SignResult(
                status=SignStatus.TIMEOUT,
                error=f"no decision within {timeout_seconds}s",
                elapsed_seconds=time.time() - start,
                raw_frames=list(self._frames),
            )
        except Exception as e:
            return SignResult(
                status=SignStatus.WS_ERROR,
                error=f"{type(e).__name__}: {e}",
                elapsed_seconds=time.time() - start,
                raw_frames=list(self._frames),
            )

    async def sign_message(
        self,
        message: str,
        *,
        derivation_path: str = "m/84'/0'/0'/1",
        address_format: str = "segwit",
        timeout_seconds: float = 20.0,
    ) -> MessageSignResult:
        """Sign a text message. Coldcard policy must allow the derivation path.

        Returns a MessageSignResult carrying the address and signature on
        success. Depending on the CKBunker version these arrive in either a
        `message_signed` frame or a `local_download` frame; we accept either.
        """
        start = time.time()
        try:
            await self._send(
                "sign_message",
                [message, derivation_path, address_format],
            )
            deadline = start + timeout_seconds
            while time.time() < deadline:
                data = await self._recv(timeout=5.0)
                if data is None:
                    continue

                if "show_modal" in data and "html" in data:
                    html = data["html"]
                    if "Failed" in html or "Rejected" in html:
                        return MessageSignResult(
                            status=SignStatus.REJECTED,
                            reason=self._extract_reason(html),
                        )

                # Two possible success shapes.
                if "message_signed" in data:
                    ms = data["message_signed"]
                    return MessageSignResult(
                        status=SignStatus.SIGNED,
                        address=ms.get("address"),
                        signature=ms.get("signature"),
                    )
                if "local_download" in data:
                    dl = data["local_download"]
                    raw = dl.get("data", "")
                    # The signed message usually comes back as
                    # "<signature>\n<address>\n<message>" on separate lines.
                    parts = raw.strip().splitlines()
                    if len(parts) >= 2:
                        return MessageSignResult(
                            status=SignStatus.SIGNED,
                            signature=parts[0],
                            address=parts[1],
                        )

            return MessageSignResult(
                status=SignStatus.TIMEOUT,
                error=f"no signature within {timeout_seconds}s",
            )
        except Exception as e:
            return MessageSignResult(
                status=SignStatus.WS_ERROR,
                error=f"{type(e).__name__}: {e}",
            )

    # -- Helpers ----------------------------------------------------------

    @staticmethod
    def _extract_reason(html: str) -> str:
        """Pull a human-readable rejection reason out of a CKBunker modal.

        CKBunker renders rejections as HTML like:
            <p>Rejected by Coldcard.</p>
            <p>Rejected: rule #1: need user(s) confirmation, rule #2: ...</p>
        We keep only the "Rejected: ..." line because that is the verbatim
        policy decision from the Coldcard.
        """
        m = re.findall(r"Rejected[^<]*", html)
        if not m:
            return html[:200]
        # The policy line is usually the *last* "Rejected:" match.
        for line in reversed(m):
            if ":" in line:
                return line.strip()
        return m[-1].strip()

    def captured_frames(self) -> list[str]:
        """All raw JSON frames received this session, useful for debugging."""
        return list(self._frames)
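The `current_totp` helper in the client above returns a code together with the 30-second window counter that `auth_offer_guess` expects. As a rough illustration of how those values relate, the sketch below computes them with the standard library only, following RFC 6238 with HMAC-SHA1. The harness itself uses `pyotp`; the function name `totp_at` and its parameters are hypothetical and not part of the client.

```python
import base64
import hashlib
import hmac
import struct


def totp_at(secret_b32: str, unix_time: int, step: int = 30, digits: int = 6) -> tuple:
    """Return (code, remaining_seconds, window_counter) for a given time.

    Hypothetical helper for illustration: RFC 6238 TOTP with HMAC-SHA1.
    """
    window = unix_time // step             # same counter as int(time.time()) // 30
    remaining = step - (unix_time % step)  # seconds until the code rolls over
    key = base64.b32decode(secret_b32, casefold=True)
    digest = hmac.new(key, struct.pack(">Q", window), hashlib.sha1).digest()
    offset = digest[-1] & 0x0F             # dynamic truncation (RFC 4226)
    value = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    code = str(value % 10 ** digits).zfill(digits)
    return code, remaining, window


if __name__ == "__main__":
    # RFC 6238 test secret ("12345678901234567890"), evaluated at t = 59 s.
    secret = base64.b32encode(b"12345678901234567890").decode("ascii")
    print(totp_at(secret, 59))
```

Because the code and the window counter are derived from one timestamp, they cannot disagree across a 30-second boundary, which is the property the client relies on when it sends both values in one frame.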
167
ckbunker_hsm_sign/config.py
Normal file
@ -0,0 +1,167 @@
"""Configuration loading.

Three sources, in precedence order (highest wins):
  1. CLI flags
  2. YAML file (if --config path is provided)
  3. Environment / .env

Each source is optional. The harness fails with a clear error if something
it actually needs is missing at test-run time, not up-front, so running
`hsm_validate.py --tests connectivity` works with almost no config.
"""

from __future__ import annotations

import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

try:
    import yaml
except ImportError:
    yaml = None


@dataclass
class PolicyExpectations:
    auto_approve_per_txn_sats: int = 10_000
    auto_approve_per_period_sats: int = 50_000
    user_authorised_per_txn_sats: int = 100_000
    user_authorised_per_period_sats: int = 500_000
    velocity_minutes: int = 1440
    message_signing: bool = True


@dataclass
class Config:
    url: str = "http://127.0.0.1:9823"
    cf_client_id: str | None = None
    cf_client_secret: str | None = None

    totp_secret: str | None = None
    user: str = "mineracks"
    message_sign_path: str = "m/84'/0'/0'/1"
    message_sign_address: str | None = None

    small_psbt_path: str = "fixtures/small.psbt"
    large_psbt_path: str = "fixtures/large.psbt"

    policy: PolicyExpectations = field(default_factory=PolicyExpectations)

    tests: dict[str, bool] = field(
        default_factory=lambda: {
            "connectivity": True,
            "message_signing": True,
            "rule2_auto_approve": True,
            "rule1_without_totp_rejects": True,
            "rule1_with_totp_signs": True,
            "counters_tracked": True,
        }
    )

    verbose: bool = False
    save_signed_dir: str | None = None


def _load_dotenv(path: Path) -> dict[str, str]:
    """Tiny .env parser; we don't want python-dotenv as a dependency."""
    out: dict[str, str] = {}
    if not path.exists():
        return out
    for line in path.read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        k, v = line.split("=", 1)
        v = v.strip().strip('"').strip("'")
        out[k.strip()] = v
    return out


def _apply_env(cfg: Config, env: dict[str, str]) -> None:
    def get(k: str, default: Any = None) -> Any:
        # Values from the .env file win over the process environment.
        return env.get(k, os.environ.get(k, default))

    cfg.url = get("CKBUNKER_URL", cfg.url)
    cfg.cf_client_id = get("CF_ACCESS_CLIENT_ID", cfg.cf_client_id) or None
    cfg.cf_client_secret = get("CF_ACCESS_CLIENT_SECRET", cfg.cf_client_secret) or None
    cfg.totp_secret = get("TOTP_SECRET", cfg.totp_secret) or None
    cfg.user = get("HSM_USER", cfg.user)
    cfg.message_sign_path = get("MESSAGE_SIGN_PATH", cfg.message_sign_path)
    cfg.message_sign_address = get("MESSAGE_SIGN_ADDRESS", cfg.message_sign_address) or None
    cfg.small_psbt_path = get("SMALL_PSBT_PATH", cfg.small_psbt_path)
    cfg.large_psbt_path = get("LARGE_PSBT_PATH", cfg.large_psbt_path)


def _apply_yaml(cfg: Config, data: dict) -> None:
    if not data:
        return
    # `or {}` guards against a section that is present but empty (YAML null).
    bunker = data.get("ckbunker", {}) or {}
    cfg.url = bunker.get("url", cfg.url)
    # A YAML null (as in config.example.yaml) must not erase credentials
    # that were already loaded from the environment.
    cfg.cf_client_id = bunker.get("cf_access_client_id") or cfg.cf_client_id
    cfg.cf_client_secret = bunker.get("cf_access_client_secret") or cfg.cf_client_secret

    hsm = data.get("hsm", {}) or {}
    cfg.user = hsm.get("user", cfg.user)
    cfg.message_sign_path = hsm.get("message_sign_path", cfg.message_sign_path)
    cfg.message_sign_address = hsm.get("message_sign_address", cfg.message_sign_address)

    pol = data.get("policy", {}) or {}
    aa = pol.get("auto_approve", {}) or {}
    ua = pol.get("user_authorised", {}) or {}
    cfg.policy.auto_approve_per_txn_sats = aa.get(
        "per_txn_sats", cfg.policy.auto_approve_per_txn_sats
    )
    cfg.policy.auto_approve_per_period_sats = aa.get(
        "per_period_sats", cfg.policy.auto_approve_per_period_sats
    )
    cfg.policy.user_authorised_per_txn_sats = ua.get(
        "per_txn_sats", cfg.policy.user_authorised_per_txn_sats
    )
    cfg.policy.user_authorised_per_period_sats = ua.get(
        "per_period_sats", cfg.policy.user_authorised_per_period_sats
    )
    cfg.policy.velocity_minutes = pol.get("velocity_minutes", cfg.policy.velocity_minutes)
    cfg.policy.message_signing = pol.get("message_signing", cfg.policy.message_signing)

    fx = data.get("fixtures", {}) or {}
    cfg.small_psbt_path = fx.get("small_psbt", cfg.small_psbt_path)
    cfg.large_psbt_path = fx.get("large_psbt", cfg.large_psbt_path)

    tests = data.get("tests", {}) or {}
    for k, v in tests.items():
        if k in cfg.tests:
            cfg.tests[k] = bool(v)

    out = data.get("output", {}) or {}
    cfg.verbose = bool(out.get("verbose", cfg.verbose))
    cfg.save_signed_dir = out.get("save_signed_dir", cfg.save_signed_dir)


def load_config(
    *,
    yaml_path: Path | None = None,
    dotenv_path: Path | None = Path(".env"),
    overrides: dict[str, Any] | None = None,
) -> Config:
    cfg = Config()

    env = _load_dotenv(dotenv_path) if dotenv_path else {}
    _apply_env(cfg, env)

    if yaml_path:
        if yaml is None:
            raise SystemExit("PyYAML required to read --config. pip install PyYAML")
        with open(yaml_path) as f:
            data = yaml.safe_load(f) or {}
        _apply_yaml(cfg, data)

    if overrides:
        for k, v in overrides.items():
            if v is None:
                continue
            if hasattr(cfg, k):
                setattr(cfg, k, v)

    return cfg
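The precedence order described in the `config.py` docstring (environment lowest, then YAML, then CLI flags) can be pictured as layering dictionaries. The sketch below is a simplified stand-in, not the harness's loader: `layer_settings` and the sample values are hypothetical, and it skips `None` in every layer, which `load_config` itself only does for its CLI `overrides`.

```python
from typing import Any, Dict


def layer_settings(*sources: Dict[str, Any]) -> Dict[str, Any]:
    """Merge settings dicts; later sources win, but None never overrides."""
    merged: Dict[str, Any] = {}
    for source in sources:
        for key, value in source.items():
            if value is None:
                continue  # an unset flag or a YAML null leaves the earlier value alone
            merged[key] = value
    return merged


if __name__ == "__main__":
    env = {"url": "http://127.0.0.1:9823", "user": "mineracks"}   # lowest precedence
    yaml_cfg = {"url": "http://100.80.63.14:9823", "user": None}  # middle
    cli = {"url": None, "verbose": True}                          # highest precedence
    print(layer_settings(env, yaml_cfg, cli))
```

Passing the sources from lowest to highest precedence keeps the merge a single loop, and ignoring `None` lets an argument parser hand over every flag without wiping values set further down.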
354
ckbunker_hsm_sign/harness.py
Normal file
@ -0,0 +1,354 @@
"""Test harness: runs the validation sequence and reports results.

Each test method returns a TestOutcome. The harness tallies them and exits
non-zero if anything failed, so it slots into CI / cron monitors without
extra wiring.
"""

from __future__ import annotations

import asyncio
import enum
import os
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable

from .client import Client, SignStatus
from .config import Config
from .scraper import fetch_counters, DashboardCounters


# ---------------------------------------------------------------------------
# Outcome
# ---------------------------------------------------------------------------

class Verdict(enum.Enum):
    PASS = "pass"
    FAIL = "fail"
    SKIP = "skip"


@dataclass
class TestOutcome:
    name: str
    verdict: Verdict
    detail: str = ""
    elapsed_seconds: float = 0.0
    subpoints: list[str] = field(default_factory=list)

    def print(self) -> None:
        icon = {"pass": "✓", "fail": "✗", "skip": "·"}[self.verdict.value]
        colour = {"pass": "\033[32m", "fail": "\033[31m", "skip": "\033[90m"}[self.verdict.value]
        reset = "\033[0m"
        line = f"{colour}{icon} {self.name:<42}{reset} {self.detail}"
        if self.elapsed_seconds:
            line += f" \033[90m({self.elapsed_seconds:.1f}s)\033[0m"
        print(line)
        for sub in self.subpoints:
            print(f" {sub}")


# ---------------------------------------------------------------------------
# Harness
# ---------------------------------------------------------------------------

class Harness:
    def __init__(self, config: Config) -> None:
        self.cfg = config
        self.client = Client(
            base_url=config.url,
            cf_access_client_id=config.cf_client_id,
            cf_access_client_secret=config.cf_client_secret,
            totp_secret=config.totp_secret,
            user=config.user,
            verbose=config.verbose,
        )

    # -- Driver ---------------------------------------------------------

    def run_all(self) -> list[TestOutcome]:
        """Run enabled tests in the defined order, return outcomes."""
        outcomes: list[TestOutcome] = []

        def run(name: str, fn: Callable[[], TestOutcome]) -> None:
            if not self.cfg.tests.get(name, True):
                out = TestOutcome(name=name, verdict=Verdict.SKIP, detail="(disabled in config)")
                outcomes.append(out)
                out.print()
                return
            try:
                out = fn()
            except Exception as e:
                out = TestOutcome(
                    name=name, verdict=Verdict.FAIL,
                    detail=f"unexpected: {type(e).__name__}: {e}",
                )
            outcomes.append(out)
            out.print()

        print(f"\nTarget: {self.cfg.url}")
        print(f"User: {self.cfg.user}")
        print(f"Policy: ≤{self.cfg.policy.auto_approve_per_txn_sats} sats auto, "
              f"≤{self.cfg.policy.user_authorised_per_txn_sats} sats with TOTP")
        print("─" * 72)

        run("connectivity", self.t_connectivity)
        run("message_signing", lambda: asyncio.run(self.t_message_signing()))
        counters_before = self._snapshot_counters()
        run("rule2_auto_approve", lambda: asyncio.run(self.t_rule2_auto_approve()))
        run("rule1_without_totp_rejects", lambda: asyncio.run(self.t_rule1_without_totp_rejects()))
        run("rule1_with_totp_signs", lambda: asyncio.run(self.t_rule1_with_totp_signs()))
        counters_after = self._snapshot_counters()
        run("counters_tracked",
            lambda: self.t_counters_tracked(counters_before, counters_after))

        print("─" * 72)
        passed = sum(1 for o in outcomes if o.verdict == Verdict.PASS)
        failed = sum(1 for o in outcomes if o.verdict == Verdict.FAIL)
        skipped = sum(1 for o in outcomes if o.verdict == Verdict.SKIP)
        print(f"\n {passed} passed, {failed} failed, {skipped} skipped\n")
        return outcomes

    # -- Individual tests ------------------------------------------------

    def t_connectivity(self) -> TestOutcome:
        """TCP + HTTP + WS reachable with any configured credentials."""
        start = time.time()
        try:
            ws_url, cookies = self.client.fetch_ws_endpoint()
        except Exception as e:
            return TestOutcome(
                name="connectivity", verdict=Verdict.FAIL,
                detail=f"HTTP fetch failed: {e}",
                elapsed_seconds=time.time() - start,
            )
        sub = [f"WebSocket URL: {ws_url}",
               f"Session cookies: {'yes' if cookies else 'none — auth may fail'}"]
        return TestOutcome(
            name="connectivity", verdict=Verdict.PASS,
            detail="HTTP + WS endpoint reachable",
            elapsed_seconds=time.time() - start, subpoints=sub,
        )

    async def t_message_signing(self) -> TestOutcome:
        """Coldcard can sign an arbitrary message via the policy's allowed path.

        This is the cheapest end-to-end proof that the VM can reach the
        Coldcard and the Coldcard is willing to sign on the declared path.
        """
        if not self.cfg.policy.message_signing:
            return TestOutcome(
                name="message_signing", verdict=Verdict.SKIP,
                detail="(policy.message_signing = false)",
            )
        start = time.time()
        async with self.client.session() as session:
            res = await session.sign_message(
                message="hsm-validate sanity test",
                derivation_path=self.cfg.message_sign_path,
            )
        elapsed = time.time() - start
        if res.status == SignStatus.SIGNED and res.signature:
            sub = [f"Address: {res.address}",
                   f"Signature: {res.signature[:48]}..."]
            return TestOutcome(
                name="message_signing", verdict=Verdict.PASS,
                detail="signed via Coldcard", elapsed_seconds=elapsed, subpoints=sub,
            )
        return TestOutcome(
            name="message_signing", verdict=Verdict.FAIL,
            detail=f"status={res.status.value} reason={res.reason or res.error}",
            elapsed_seconds=elapsed,
        )

    async def t_rule2_auto_approve(self) -> TestOutcome:
        """Sub-threshold PSBT should sign WITHOUT any user auth."""
        start = time.time()
        path = Path(self.cfg.small_psbt_path)
        if not path.exists():
            return TestOutcome(
                name="rule2_auto_approve", verdict=Verdict.SKIP,
                detail=f"fixture not found: {path} (see fixtures/README.md)",
            )
        psbt = _read_psbt(path)
        async with self.client.session() as session:
            res = await session.sign_psbt(psbt, use_totp=False)
        elapsed = time.time() - start
        if res.ok():
            self._save_signed("rule2_auto_approve", res.signed_bytes)
            return TestOutcome(
                name="rule2_auto_approve", verdict=Verdict.PASS,
                detail=f"signed without TOTP ({len(res.signed_bytes or b'')} bytes)",
                elapsed_seconds=elapsed,
            )
        return TestOutcome(
            name="rule2_auto_approve", verdict=Verdict.FAIL,
            detail=f"expected SIGNED, got {res.status.value}: {res.reason or res.error}",
            elapsed_seconds=elapsed,
        )

    async def t_rule1_without_totp_rejects(self) -> TestOutcome:
        """Above-threshold PSBT without TOTP must be rejected by the Coldcard.

        This is the single most important assertion in the harness: it
        confirms policy is active and the user-auth rule is enforced.
        """
        start = time.time()
        path = Path(self.cfg.large_psbt_path)
        if not path.exists():
            return TestOutcome(
                name="rule1_without_totp_rejects", verdict=Verdict.SKIP,
                detail=f"fixture not found: {path}",
            )
        psbt = _read_psbt(path)
        async with self.client.session() as session:
            res = await session.sign_psbt(psbt, use_totp=False)
        elapsed = time.time() - start
        if res.is_expected_rejection("rule #1"):
            return TestOutcome(
                name="rule1_without_totp_rejects", verdict=Verdict.PASS,
                detail=f"rejected as expected — {res.reason}",
                elapsed_seconds=elapsed,
            )
        if res.ok():
            return TestOutcome(
                name="rule1_without_totp_rejects", verdict=Verdict.FAIL,
                detail="policy NOT enforced: large PSBT was signed without TOTP — "
                       "STOP AND INVESTIGATE",
                elapsed_seconds=elapsed,
            )
        return TestOutcome(
            name="rule1_without_totp_rejects", verdict=Verdict.FAIL,
            detail=f"unexpected outcome {res.status.value}: {res.reason or res.error}",
            elapsed_seconds=elapsed,
        )

    async def t_rule1_with_totp_signs(self) -> TestOutcome:
        """Above-threshold PSBT WITH a fresh TOTP code should sign."""
        start = time.time()
        if not self.cfg.totp_secret:
            return TestOutcome(
                name="rule1_with_totp_signs", verdict=Verdict.SKIP,
                detail="TOTP_SECRET not configured",
            )
        path = Path(self.cfg.large_psbt_path)
        if not path.exists():
            return TestOutcome(
                name="rule1_with_totp_signs", verdict=Verdict.SKIP,
                detail=f"fixture not found: {path}",
            )
        psbt = _read_psbt(path)
        async with self.client.session() as session:
            res = await session.sign_psbt(psbt, use_totp=True)
        elapsed = time.time() - start
        if res.ok():
            self._save_signed("rule1_with_totp", res.signed_bytes)
            return TestOutcome(
                name="rule1_with_totp_signs", verdict=Verdict.PASS,
                detail=f"signed with TOTP ({len(res.signed_bytes or b'')} bytes)",
                elapsed_seconds=elapsed,
            )
        return TestOutcome(
            name="rule1_with_totp_signs", verdict=Verdict.FAIL,
            detail=f"expected SIGNED, got {res.status.value}: {res.reason or res.error}",
            elapsed_seconds=elapsed,
        )

    def t_counters_tracked(
        self,
        before: DashboardCounters | None,
        after: DashboardCounters | None,
    ) -> TestOutcome:
        """The server-visible counters should reflect the signings we did."""
        if before is None or after is None:
            return TestOutcome(
                name="counters_tracked", verdict=Verdict.SKIP,
                detail="could not scrape dashboard — this CKBunker version may "
                       "render counters differently",
            )
        approvals_delta = _delta(before.approvals, after.approvals)
        refusals_delta = _delta(before.refusals, after.refusals)
        sub = [
            f"Approvals: {before.approvals} → {after.approvals} (Δ{approvals_delta})",
            f"Refusals: {before.refusals} → {after.refusals} (Δ{refusals_delta})",
        ]
        if before.amount_spent_btc is not None and after.amount_spent_btc is not None:
            sub.append(
                f"Amount spent: {before.amount_spent_btc} → {after.amount_spent_btc} BTC"
            )

        # We expect one approval per enabled signing test (the small PSBT and
        # the large PSBT with TOTP) and one refusal for the large PSBT without
        # TOTP. The counts follow the config toggles, so a test that was
        # enabled but skipped at run time still counts as expected here.
        expected_approvals = int(self.cfg.tests.get("rule2_auto_approve", True)) \
            + int(self.cfg.tests.get("rule1_with_totp_signs", True))
        expected_refusals = int(self.cfg.tests.get("rule1_without_totp_rejects", True))

        if approvals_delta is None or refusals_delta is None:
            return TestOutcome(
                name="counters_tracked", verdict=Verdict.SKIP,
                detail="counter values missing", subpoints=sub,
            )
        if approvals_delta >= expected_approvals and refusals_delta >= expected_refusals:
            return TestOutcome(
                name="counters_tracked", verdict=Verdict.PASS,
                detail="dashboard counters moved as expected", subpoints=sub,
            )
        return TestOutcome(
            name="counters_tracked", verdict=Verdict.FAIL,
            detail=(f"expected ≥{expected_approvals} approvals and "
                    f"≥{expected_refusals} refusals, saw "
                    f"Δ{approvals_delta}/Δ{refusals_delta}"),
            subpoints=sub,
        )

    # -- Helpers --------------------------------------------------------

    def _snapshot_counters(self) -> DashboardCounters | None:
        try:
            return fetch_counters(
                self.cfg.url,
                cf_client_id=self.cfg.cf_client_id,
                cf_client_secret=self.cfg.cf_client_secret,
            )
        except Exception:
            return None

    def _save_signed(self, label: str, data: bytes | None) -> None:
        if not self.cfg.save_signed_dir or not data:
            return
        d = Path(self.cfg.save_signed_dir)
        d.mkdir(parents=True, exist_ok=True)
        (d / f"{label}.psbt").write_bytes(data)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _delta(before: int | None, after: int | None) -> int | None:
    if before is None or after is None:
        return None
    return after - before


def _read_psbt(path: Path) -> bytes:
    """Load a PSBT in any common encoding (binary, base64, hex)."""
    raw = path.read_bytes()
    if raw[:5] == b"psbt\xff":
        return raw
    import base64
    try:
        decoded = base64.b64decode(raw.strip())
        if decoded[:5] == b"psbt\xff":
            return decoded
    except Exception:
        pass
    try:
        decoded = bytes.fromhex(raw.strip().decode("ascii"))
        if decoded[:5] == b"psbt\xff":
            return decoded
    except Exception:
        pass
    raise SystemExit(f"{path} does not contain a valid PSBT")
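The `_read_psbt` helper in the harness accepts a fixture in binary, base64 or hex form by checking for the five-byte PSBT magic `psbt\xff` after each decoding attempt. The standalone sketch below shows the same idea while also reporting which encoding matched; `decode_psbt` and `PSBT_MAGIC` are hypothetical names, and it raises `ValueError` where the harness exits.

```python
import base64
import binascii

PSBT_MAGIC = b"psbt\xff"  # every PSBT starts with these five bytes


def decode_psbt(raw: bytes) -> tuple:
    """Return (encoding_name, binary_psbt); raise ValueError if not a PSBT."""
    if raw.startswith(PSBT_MAGIC):
        return "binary", raw
    text = raw.strip()  # tolerate a trailing newline in text fixtures
    try:
        decoded = bytes.fromhex(text.decode("ascii"))
        if decoded.startswith(PSBT_MAGIC):
            return "hex", decoded
    except ValueError:  # also covers UnicodeDecodeError
        pass
    try:
        decoded = base64.b64decode(text, validate=True)
        if decoded.startswith(PSBT_MAGIC):
            return "base64", decoded
    except (binascii.Error, ValueError):
        pass
    raise ValueError("input does not contain a valid PSBT")


if __name__ == "__main__":
    sample = PSBT_MAGIC + b"\x01\x02"  # not a real transaction, just the magic
    for blob in (sample, base64.b64encode(sample), sample.hex().encode("ascii")):
        print(decode_psbt(blob)[0])
```

Checking the magic after decoding, rather than trusting that a decode succeeded, matters because many hex strings are also syntactically valid base64.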
89
ckbunker_hsm_sign/scraper.py
Normal file
@ -0,0 +1,89 @@
"""Dashboard scraper.

The harness wants to verify that *the server-visible counters moved* after
each test: a sanity check against "signer returned a PSBT but the server
didn't actually account for it". CKBunker renders these counters into the
top of every page, so we just do a regex pass over the HTML.

This is intentionally tolerant: CKBunker versions vary slightly in the
markup. If we can't find a value we return None, and the counter assertions
in the harness treat that as a soft skip rather than a hard fail.
"""

from __future__ import annotations

import re
from dataclasses import dataclass

import requests


@dataclass
class DashboardCounters:
    approvals: int | None
    refusals: int | None
    amount_spent_btc: float | None
    period_ends: str | None


def fetch_counters(
    base_url: str,
    *,
    cf_client_id: str | None = None,
    cf_client_secret: str | None = None,
    timeout: float = 15.0,
) -> DashboardCounters:
    headers: dict[str, str] = {}
    if cf_client_id:
        headers["CF-Access-Client-Id"] = cf_client_id
    if cf_client_secret:
        headers["CF-Access-Client-Secret"] = cf_client_secret

    resp = requests.get(base_url.rstrip("/") + "/", headers=headers, timeout=timeout)
    resp.raise_for_status()
    html = resp.text

    return DashboardCounters(
        approvals=_pluck_int(html, ["Approvals"]),
        refusals=_pluck_int(html, ["Refusals"]),
        amount_spent_btc=_pluck_btc(html, ["Amount Spent"]),
        period_ends=_pluck_text(html, ["Period Ends"]),
    )


def _pluck_int(html: str, labels: list[str]) -> int | None:
    # Matches either:
    #   <th>Approvals</th> ... <td>2</td>
    #   <div>Approvals</div><div class="...">2</div>
    # keeps a small search window after each label.
    for label in labels:
        m = re.search(rf"{re.escape(label)}.{{0,500}}?>\s*(\d+)\s*<", html, re.S)
        if m:
            try:
                return int(m.group(1))
            except ValueError:
                continue
    return None


def _pluck_btc(html: str, labels: list[str]) -> float | None:
    for label in labels:
        m = re.search(
            rf"{re.escape(label)}.{{0,500}}?>\s*([0-9]+\.[0-9]+)\s*BTC",
            html,
            re.S,
        )
        if m:
            try:
                return float(m.group(1))
            except ValueError:
                continue
    return None


def _pluck_text(html: str, labels: list[str]) -> str | None:
    for label in labels:
        m = re.search(rf"{re.escape(label)}.{{0,500}}?>\s*([^<\s][^<]{{0,40}}?)\s*<", html, re.S)
        if m:
            return m.group(1).strip()
    return None
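To make the label-then-value regex used by `_pluck_int` easier to follow, here is a small self-contained run against made-up markup. The `SAMPLE` HTML is an assumption about what a dashboard table might look like, not a capture from a real CKBunker page, and `pluck_int` is a simplified single-label variant of the scraper's helper.

```python
from __future__ import annotations

import re

# Hypothetical dashboard fragment, for illustration only.
SAMPLE = """
<table>
  <tr><th>Approvals</th><td>2</td></tr>
  <tr><th>Refusals</th><td>1</td></tr>
</table>
"""


def pluck_int(html: str, label: str) -> int | None:
    """Find the first integer rendered as element text shortly after `label`."""
    # Lazy `.{0,500}?` skips the closing </th> and the opening <td>, then the
    # capture group takes the digits that sit between `>` and `<`.
    m = re.search(rf"{re.escape(label)}.{{0,500}}?>\s*(\d+)\s*<", html, re.S)
    return int(m.group(1)) if m else None


if __name__ == "__main__":
    for label in ("Approvals", "Refusals", "Amount Spent"):
        print(label, pluck_int(SAMPLE, label))
```

One consequence of the 500-character window is worth keeping in mind: if a label is present but its own cell holds no integer, the pattern can match a number belonging to a later row, so a surprising counter value is a reason to inspect the raw HTML.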
55
config.example.yaml
Normal file
@ -0,0 +1,55 @@
# Example harness configuration.
# Copy to config.yaml and edit for your deployment, or use environment
# variables (.env). CLI flags take precedence over YAML, YAML over env.
#
# The harness reads this to know what outcomes to ASSERT against your
# policy. If your policy shape differs from what's described here, edit
# these expectations rather than forcing your policy to fit the defaults.

ckbunker:
  url: http://100.80.63.14:9823
  # Only needed if ckbunker.url is behind Cloudflare Access. Leave null for
  # direct Tailscale access.
  cf_access_client_id: null
  cf_access_client_secret: null

hsm:
  user: mineracks                    # user declared in your Coldcard policy
  message_sign_path: "m/84'/0'/0'/1"

# What the harness should ASSERT about the installed policy. These must
# MATCH the policy actually loaded onto the Coldcard, or tests will fail.
policy:
  # Rule #2 equivalent: auto-approve (no user auth).
  auto_approve:
    per_txn_sats: 10000              # 0.0001 BTC
    per_period_sats: 50000           # 0.0005 BTC

  # Rule #1 equivalent: requires TOTP from `hsm.user`.
  user_authorised:
    per_txn_sats: 100000             # 0.001 BTC
    per_period_sats: 500000          # 0.005 BTC

  velocity_minutes: 1440             # 24 hours

  message_signing: true              # set false if your policy blocks it

# Paths to pre-crafted test PSBTs (see fixtures/README.md).
fixtures:
  small_psbt: fixtures/small.psbt    # value <= auto_approve.per_txn_sats
  large_psbt: fixtures/large.psbt    # value > auto_approve.per_txn_sats
                                     # and <= user_authorised.per_txn_sats

# Test selection. Set false to skip a test that doesn't apply.
tests:
  connectivity: true
  message_signing: true
  rule2_auto_approve: true
  rule1_without_totp_rejects: true
  rule1_with_totp_signs: true
  counters_tracked: true

# Output control.
output:
  verbose: false                     # dump every WebSocket frame
  save_signed_dir: null              # set a path to keep the signed PSBTs
127
docs/POLICY_RECOMMENDATIONS.md
Normal file
@ -0,0 +1,127 @@
# Policy design recommendations

Not exhaustive — consult the upstream
[Coldcard HSM docs](https://coldcardwallet.com/docs/ckbunker-hsm) for the
full grammar. This file captures the two-tier pattern the harness is
designed around and why it's a reasonable starting point for a signing
HSM that backs automation.

## The two-tier pattern

```
Rule #2 (auto-sign, no user auth)
    per-txn ≤ X sats
    period  ≤ N × X sats   (N ≈ 5, so a handful of small sends per window)

Rule #1 (user-auth via TOTP)
    per-txn ≤ Y sats       (Y ≫ X, but still a small fraction of custody)
    period  ≤ M × Y sats

(implicit) Rule #3: anything else is rejected — on-device keypad/MicroSD
                    required to authorise.
```
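
As a rough model of how the tiers interact, the sketch below classifies a send against the two caps. It illustrates the pattern only and is not the Coldcard's actual rule evaluation: `TierPolicy`, `classify`, and the separate per-rule spend counters are hypothetical, and the default values mirror `config.example.yaml`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TierPolicy:
    # Hypothetical container; defaults mirror config.example.yaml.
    auto_per_txn: int = 10_000       # X
    auto_per_period: int = 50_000    # N * X
    user_per_txn: int = 100_000      # Y
    user_per_period: int = 500_000   # M * Y


def classify(amount: int, auto_spent: int, user_spent: int,
             policy: TierPolicy = TierPolicy()) -> str:
    """Return 'auto', 'totp' or 'reject' for a send of `amount` sats.

    `auto_spent` and `user_spent` are the sats already spent under each
    rule in the current velocity window (an assumption of this sketch).
    """
    if (amount <= policy.auto_per_txn
            and auto_spent + amount <= policy.auto_per_period):
        return "auto"    # Rule #2: signs with no user auth
    if (amount <= policy.user_per_txn
            and user_spent + amount <= policy.user_per_period):
        return "totp"    # Rule #1: signs only with a valid TOTP code
    return "reject"      # implicit Rule #3: physical presence required


print(classify(9_000, 0, 0))        # small send, fresh window
print(classify(9_000, 45_000, 0))   # Rule #2 period budget exhausted
print(classify(150_000, 0, 0))      # above the user-auth cap
```

The second call shows the fall-through case: a send that fits the per-txn cap but not the remaining Rule #2 period budget becomes a Rule #1 event.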

### Why two tiers

- **Single-tier "always require TOTP"** makes the HSM useless for
  automation: every BTCPay callback, every n8n webhook, every monitoring
  script wakes a human.
- **Single-tier "always auto-sign"** is indistinguishable from a hot
  wallet with extra steps.
- Two tiers let routine small sends go through untouched while keeping
  human-in-the-loop pressure on anything larger.

### Picking X (auto-approve cap)

Rule of thumb: the **most expensive single automated action** you're
comfortable with happening unattended. Examples:

| Automation | Sensible X (sats) |
|----------------------------------------|-------------------|
| Lightning channel rebalance | 50,000 – 200,000 |
| BTCPay invoice settlement | 10,000 – 50,000 |
| Routine small withdrawals (newsletter) | 5,000 – 20,000 |
| Dev test sends | 1,000 – 5,000 |

Pick **the smallest X** that covers your routine traffic. Anything
larger is a Rule #1 event — worth waking the TOTP holder for.

### Picking N (period multiplier)

- Too low (N=1): first sign empties the period budget, second sign fails
  even though it's within the per-txn cap.
- Too high (N≥10): an attacker who steals the VM can drain the budget
  faster than a human will notice.
- Reasonable: N = 3 to 5. Combined with a 24 h velocity window, this
caps the *catastrophic* loss from a VM compromise at ~5×X per day.
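
The bound above is plain arithmetic, and a small helper makes it easy to try other values. This is a sketch under one assumption: an attacker can spend the full Rule #2 period budget once per velocity window. The function name and parameters are illustrative and not part of the harness.

```python
def worst_case_daily_loss(x_sats: int, n: int, velocity_minutes: int = 1440) -> int:
    """Upper bound on unattended (Rule #2) spend per 24 h, in sats."""
    period_cap = n * x_sats
    # Number of velocity windows that can begin within one day (ceiling division).
    windows_per_day = -(-1440 // velocity_minutes)
    return period_cap * windows_per_day


print(worst_case_daily_loss(10_000, 5))        # 24 h window
print(worst_case_daily_loss(10_000, 5, 240))   # 4 h window, same period cap
```

With the period cap left unchanged, a shorter window lets the budget be spent more often per day, so the two settings are best chosen together.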

### Picking Y (user-auth cap)

A hard ceiling on what TOTP alone can authorise. For custody above Y,
the only path is keypad + MicroSD — physical presence at the device.

Common shapes:

- **Operational float** wallet: Y = 10×X. Big enough to cover a busy
  day; small enough that losing the TOTP secret isn't an existential
  problem.
- **Hot reserve**: Y = 0 (no Rule #1). Forces all non-routine sends
  through physical presence.

## Velocity period

The Coldcard resets counters after `velocity_minutes` of wall-clock time.
1440 (24 h) is the standard choice. Shorter windows (60–240 min) make
the HSM safer during active use but noisier during quiet periods
(routine sends hit the reset mid-day). Longer windows (> 24 h) make a
compromise more painful to recover from (stolen budget persists).

## Message signing

Useful for:

- proving control of an address to auditors / regulators
- proof-of-reserves (signed message with timestamp)
- sanity-checking Coldcard reachability (the harness's
  `message_signing` test)

Usually safe to enable on any path — message signing doesn't spend
funds. If you need to restrict it, the policy supports a BIP32 path
regex.

## Boot-to-HSM

**Always enable** for production. Without it, anyone with physical
access to the device (and the PIN) can navigate out of HSM mode by
tapping the menu.

**Always set a 6-digit escape code** — a device that can never leave HSM
mode is terrifying and operationally wrong (you will need to enrol new
users, update policy, etc.). The escape code must be typed within 60
seconds of Coldcard boot, which is a reasonable safety margin.

**Record the escape code in a separate place from the seed backup.** A
password manager on the TOTP holder's phone is fine; not the same piece
of paper as the seed words.

## Logging

- **MicroSD logging ON** — on-device audit trail that survives VM
  compromise. Keeps a tamper-evident record even if the VM is tampered
  with. Costs: you must physically eject the MicroSD to review it.
- **Fail-if-cant-log OFF** — otherwise a MicroSD hiccup halts signing.
  Default is fine.

## Storage locker read count

CKBunker encrypts its local state with a key held in the Coldcard's
Storage Locker. The Locker has a **read counter** — typical policies
allow 13 reads before the Locker self-wipes. This means:

- CKBunker can restart up to 13 times before you need to re-install the
  policy.
- Heavy debugging (restarting CKBunker to try things) burns reads fast.
- After policy reinstall, the counter resets.

Monitor restart frequency. If you find yourself restarting CKBunker
often, investigate *why* rather than spending Locker reads.
185
docs/PROTOCOL.md
Normal file
@ -0,0 +1,185 @@
# CKBunker WebSocket protocol

**Target version**: CKBunker `v0.9.1` (commit `8526755`, 2024-08-06).
This document is reverse-engineered from the running server + its Vue.js
front-end. There is no formal protocol spec upstream — if a newer CKBunker
release changes shapes, the client in [`client.py`](../ckbunker_hsm_sign/client.py)
is where you'll need to adapt.

## Connection setup

1. **HTTP GET `/`** — pick up the aiohttp session cookie and the WebSocket
   URL. The Vue template embeds the URL as `/websocket/<TOKEN>` — the
   client's `_extract_ws_url` greps for that pattern (plus two fallbacks
   for older spellings).
2. **WebSocket connect** to that URL with the session cookie in `Cookie:`.
   Without the cookie the server may accept the upgrade but ignore the
   first action — symptom is a client that hangs forever on `_connected`.
3. Optional Cloudflare Access headers (`CF-Access-Client-Id`,
   `CF-Access-Client-Secret`) if the CKBunker is behind CF Access.

> **Cloudflare Access + WebSocket**: in practice CF Access with *service
> tokens* is unreliable on the WS upgrade. For automation, use a direct
> private ingress (Tailscale, WireGuard, VPN) rather than the CF-fronted
> hostname.
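
Step 1 can be sketched with the standard library alone. This is not the client's actual `_extract_ws_url`: the function name, the token character set in the regex, and the sample page are assumptions made for illustration, and the fallbacks for older spellings are omitted.

```python
from __future__ import annotations

import re
from urllib.parse import urljoin, urlsplit, urlunsplit

# Assumed token alphabet; the real token format is not documented upstream.
_WS_PATH = re.compile(r"/websocket/[A-Za-z0-9_\-]+")


def extract_ws_url(base_url: str, html: str) -> str | None:
    """Find `/websocket/<TOKEN>` in the page and return it as a ws(s):// URL."""
    m = _WS_PATH.search(html)
    if m is None:
        return None
    parts = urlsplit(urljoin(base_url, m.group(0)))
    scheme = "wss" if parts.scheme == "https" else "ws"
    return urlunsplit((scheme, parts.netloc, parts.path, "", ""))


# Hypothetical page fragment, only to exercise the function.
page = '<script>var WS_URL = "/websocket/abc123";</script>'
print(extract_ws_url("http://100.80.63.14:9823", page))
```

The session cookie from the same GET would then be passed along on the WebSocket connect, as step 2 describes.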

## Frame format

All frames are JSON objects. Client → server frames have the shape:

```json
{"action": "<action_name>", "args": [...]}
```

Server → client frames have no `action` key; they carry one or more
UI-update fields that the Vue app consumes:

| Server field | Meaning |
|--------------------|--------------------------------------------------------|
| `vue_app_cb` | "Vue app callback" — UI state refresh (counters, etc.) |
| `show_modal` | Render a modal dialog; its `html` field carries body |
| `local_download` | Hand the browser a file; used to return signed PSBTs |
| `message_signed` | (some versions) Returned by `sign_message` |
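
A minimal sketch of both directions, assuming nothing beyond the shapes described above. The helper names `encode_frame` and `server_fields` are hypothetical and not taken from `client.py`.

```python
from __future__ import annotations

import json

# Server -> client fields listed in the table above.
SERVER_FIELDS = ("vue_app_cb", "show_modal", "local_download", "message_signed")


def encode_frame(action: str, *args) -> str:
    """Serialise a client -> server frame."""
    return json.dumps({"action": action, "args": list(args)})


def server_fields(raw: str) -> list[str]:
    """List the known UI-update fields present in a server -> client frame."""
    frame = json.loads(raw)
    return [name for name in SERVER_FIELDS if name in frame]


print(encode_frame("_connected", "/"))
print(server_fields('{"vue_app_cb": {}, "show_modal": {"html": "<p>hi</p>"}}'))
```

Because one server frame may carry several fields, the second helper returns a list rather than a single kind.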

## Action catalogue

### `_connected`

Sent once immediately after the WebSocket upgrade. Tells the server which
page the client is "on", so it can push the right `vue_app_cb` refreshes.

```json
{"action": "_connected", "args": ["/"]}
```

The server replies with one or more `vue_app_cb` frames describing the
current HSM status (approvals, refusals, amount spent, period ends).

### `upload_psbt`

Uploads a PSBT into the server's working slot. The PSBT is base64 and
must match the declared SHA-256 — the server rejects mismatches.

```json
{"action": "upload_psbt", "args": [<size_bytes>, "<sha256_hex>", "<base64_psbt>"]}
```

Response: a `vue_app_cb` confirming the slot is populated and the
preview fields are rendered. There is no explicit acknowledgement besides the UI
update.
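
The upload frame can be assembled as below. This is a sketch, not the client's code: it assumes the size and SHA-256 are taken over the raw binary PSBT rather than over the base64 text, which the description above does not spell out, and `build_upload_frame` is a hypothetical name.

```python
import base64
import hashlib
import json


def build_upload_frame(psbt: bytes) -> str:
    """Build an `upload_psbt` frame from raw (binary) PSBT bytes."""
    # Assumption: size and hash describe the binary PSBT, not its base64 form.
    digest = hashlib.sha256(psbt).hexdigest()
    encoded = base64.b64encode(psbt).decode("ascii")
    return json.dumps({"action": "upload_psbt",
                       "args": [len(psbt), digest, encoded]})


# Placeholder bytes with the PSBT magic prefix; not a real transaction.
frame = json.loads(build_upload_frame(b"psbt\xff\x00"))
print(frame["args"][0], frame["args"][1][:16])
```

The same hex digest is what `submit_psbt` later expects as its first argument.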

### `auth_offer_guess`

Offers a TOTP code for the currently-loaded PSBT. The three args are
`(slot_index, time_window_counter, code_string)`:

```json
{"action": "auth_offer_guess", "args": [0, 57098745, "579322"]}
```

- `slot_index=0` — CKBunker supports multiple auth slots for multi-user
  policies; we only use one.
- `time_window_counter` — `int(time.time()) // 30`. Sending the 30-second
  window index with the code tells the server which window the code was
  generated for, so small clock skew does not invalidate it.
- `code_string` — the 6-digit code generated from the shared secret.

Response: usually silent if accepted; on rejection the server holds the
code in its internal state and only surfaces "bad code" once you try
`submit_psbt`.
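
To make the relationship between the window counter and the code concrete, here is a standard-library RFC 6238 sketch. The harness itself depends on `pyotp`; this hand-rolled `totp` and the `build_auth_frame` helper are illustrative stand-ins, and the example secret is the RFC 4226 test key rather than anything issued by `ckcc`.

```python
from __future__ import annotations

import base64
import hashlib
import hmac
import json
import struct
import time


def totp(secret_b32: str, counter: int, digits: int = 6) -> str:
    """RFC 6238 TOTP (HMAC-SHA1) for an explicit 30-second window counter."""
    key = base64.b32decode(secret_b32.upper())
    mac = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = mac[-1] & 0x0F                      # dynamic truncation offset
    value = struct.unpack(">I", mac[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(value % 10 ** digits).zfill(digits)


def build_auth_frame(secret_b32: str, now: float | None = None, slot: int = 0) -> str:
    """Build an `auth_offer_guess` frame for the current 30-second window."""
    counter = int(time.time() if now is None else now) // 30
    return json.dumps({"action": "auth_offer_guess",
                       "args": [slot, counter, totp(secret_b32, counter)]})


# RFC 4226 test key ("12345678901234567890"), base32-encoded for the example.
secret = base64.b32encode(b"12345678901234567890").decode("ascii")
print(build_auth_frame(secret, now=59))
```

Deriving the counter and the code from the same timestamp keeps the two arguments consistent even if the call straddles a window boundary.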

### `submit_psbt`

Commits to signing. The server hands the PSBT to the Coldcard for
evaluation.

```json
{"action": "submit_psbt", "args": ["<sha256>", <broadcast>, <finalize>, <download>]}
```

- `<sha256>` — must match the previously-uploaded PSBT.
- `<broadcast>` (bool) — have the server push the signed tx to a node. We
  always send `false` (we never want the harness to broadcast).
- `<finalize>` (bool) — Coldcard combines and finalises, returns raw hex
  instead of PSBT.
- `<download>` (bool) — request the signed bytes back in a
  `local_download` frame. We always send `true`.

Response: one of
- `local_download` — success. Fields: `data` (bytes or hex), `is_b64` flag.
- `show_modal` with `html` containing `"Rejected"` — Coldcard refused.
The human-readable reason follows "Rejected:" in the HTML.
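
The submit step and a coarse reading of the reply might look like this. It is a sketch with two assumptions: `finalize` defaults to `False` (the text above only fixes `broadcast` and `download`), and the modal body sits at `frame["show_modal"]["html"]`. Both helper names are hypothetical.

```python
import json


def build_submit_frame(sha256_hex: str, finalize: bool = False) -> str:
    """Build a `submit_psbt` frame that never broadcasts and always downloads."""
    return json.dumps({"action": "submit_psbt",
                       "args": [sha256_hex, False, finalize, True]})


def outcome(frame: dict) -> str:
    """Classify one server frame as 'signed', 'rejected' or 'pending'."""
    if "local_download" in frame:
        return "signed"
    modal = frame.get("show_modal")
    # Assumed layout: the modal is a dict whose `html` key holds the body.
    if isinstance(modal, dict) and "Rejected" in modal.get("html", ""):
        return "rejected"
    return "pending"   # e.g. a vue_app_cb refresh; keep reading frames


print(build_submit_frame("ab" * 32))
print(outcome({"show_modal": {"html": "<p>Rejected: bad TOTP code</p>"}}))
```

Returning "pending" for anything else lets a caller loop over incoming frames until a decisive one arrives or its timeout expires.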

### `sign_message`

Message signing on an allowed derivation path:

```json
{"action": "sign_message", "args": ["<text>", "<bip32_path>", "<addr_format>"]}
```

- `<addr_format>` — `"segwit"`, `"classic"`, or `"p2sh"`.

Response shapes differ between CKBunker versions:

- Newer: `message_signed` frame with `{address, signature}`.
- Older: `local_download` with a three-line body: `signature\naddress\nmessage`.

The client handles both.
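
One way to accept both shapes is sketched below. The field layout is assumed from the two bullets above: a `message_signed` dict with `address` and `signature`, and a `local_download` dict whose `data` is plain text. The function name is hypothetical and base64-encoded downloads are not handled.

```python
from __future__ import annotations


def parse_signed_message(frame: dict) -> dict | None:
    """Return {'address', 'signature'} from either response shape, else None."""
    if "message_signed" in frame:                 # newer shape
        body = frame["message_signed"]
        return {"address": body["address"], "signature": body["signature"]}
    if "local_download" in frame:                 # older shape
        text = frame["local_download"]["data"]    # assumed to be plain text
        # The message comes last and may itself contain newlines.
        signature, address, _message = text.split("\n", 2)
        return {"address": address, "signature": signature}
    return None


# Placeholder values, only to exercise both branches.
newer = {"message_signed": {"address": "bc1qexample", "signature": "SIG"}}
older = {"local_download": {"data": "SIG\nbc1qexample\nhello\nworld"}}
print(parse_signed_message(newer) == parse_signed_message(older))
```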

## Response parsing notes

### Rejection text

Coldcard rejection reasons come back embedded in a rendered HTML modal. The
grammar is stable:

```
Rejected by Coldcard.
Rejected: <reason[, reason...]>
```

Common reasons observed:

| Reason | Meaning |
|-----------------------------------------------------------|-------------------------------------------------|
| `rule #1: need user(s) confirmation` | Rule #1 applies, no user auth supplied |
| `rule #2: would exceed period spending` | Rule #2 cap hit, falls through to Rule #1 |
| `bad TOTP code` | TOTP was supplied but didn't verify |
| `policy refuses this path` | Message signing on a disallowed path |
| `not enough funds` | UTXOs for the PSBT aren't available |
| `warnings rejected` | PSBT carries a warning and policy doesn't allow |

The harness's `SignResult.is_expected_rejection("rule #1")` does a
case-insensitive substring match so the actual rejection reason can be
asserted without overfitting to exact Coldcard firmware wording.
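
The grammar above can be parsed with a tag-stripping regex. This is an illustrative sketch, not the harness's parser: `rejection_reasons` and `matches_rejection` are hypothetical names, the sample modal markup is invented, and reasons are assumed not to contain commas.

```python
from __future__ import annotations

import html as html_lib
import re


def rejection_reasons(modal_html: str) -> list[str]:
    """Extract the comma-separated reasons that follow 'Rejected:'."""
    text = html_lib.unescape(re.sub(r"<[^>]+>", " ", modal_html))
    m = re.search(r"Rejected:\s*(.+)", text)
    if m is None:
        return []
    return [part.strip() for part in m.group(1).split(",") if part.strip()]


def matches_rejection(modal_html: str, needle: str) -> bool:
    """Case-insensitive substring match against any extracted reason."""
    return any(needle.lower() in r.lower() for r in rejection_reasons(modal_html))


# Invented markup; real CKBunker modals may be structured differently.
modal = ("<p>Rejected by Coldcard.</p><p>Rejected: rule #1: need user(s) "
         "confirmation, rule #2: would exceed period spending</p>")
print(rejection_reasons(modal))
print(matches_rejection(modal, "RULE #1"))
```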

### The "Amount Spent" display bug

CKBunker 0.9.1 occasionally renders `Amount Spent` as the sum of the Rule #1
and Rule #2 period caps instead of actual cumulative spend. The Coldcard's
internal velocity counter is authoritative. The harness does **not** rely
on the amount field for any assertion — it checks `Approvals` and
`Refusals` deltas only, which are accurate.

## Timing

Coldcard signing is fast but not instant — typical round-trip under 1s for
small PSBTs, 2–5s for TOTP-authorised PSBTs. The harness uses a 30-second
timeout for sign attempts, 20 seconds for message signing. If you see
timeouts regularly, check:

- USB passthrough is still attached (`lsusb | grep d13e` on the VM)
- the Coldcard isn't blocked on a screen prompt (it shouldn't be in HSM mode)
- `ckbunker.service` isn't restarting under load

## What this protocol can't do

- **No policy introspection over the wire.** The installed policy is only
  visible via the UI (and the Coldcard keypad/MicroSD log). This harness
  therefore relies on the operator declaring expected thresholds in
  `config.yaml` and asserts outcomes against those declared values.
- **No atomic batch sign.** Each PSBT is submitted one at a time. The
  WebSocket can be reused, but each `submit_psbt` call is independent. This is
  fine — the Coldcard enforces per-txn limits anyway.
- **No policy change.** There is no protocol action for editing the
  policy. This is intentional; policy changes go through keypad + MicroSD.
159
docs/WHY.md
Normal file
@ -0,0 +1,159 @@
# Why this harness exists, and why it's written the way it is

## Why a harness at all

The Coldcard HSM's whole value proposition is that the **policy on the
device is what enforces safety** — not the VM, not the network, not the
operator. That's a great story, until someone mis-installs a policy file
and nobody notices because the "happy path" (small, auto-approved txs)
still works.

Failure modes this harness is designed to catch:

1. **Policy rule collapse** — the auto-approve rule (Rule #2) is loaded
   but the user-auth rule (Rule #1) is missing or weakened, so large
   transactions sign without 2FA. The **`rule1_without_totp_rejects`
   test** is the single most important assertion: it attempts to sign an
   above-threshold transaction without TOTP and requires a specific
   rejection reason.

2. **TOTP secret drift** — authenticator app rotated, backup unclear, or
   a policy rewrite issued a new secret without updating the operator's
   phone. The **`rule1_with_totp_signs` test** catches this before you
   need to send a real transaction.

3. **Coldcard USB detach** — Proxmox USB passthrough occasionally
   detaches after host reboots. CKBunker starts, the UI renders, but the
   Coldcard isn't actually attached. The **`message_signing` test**
   catches this cheaply (no UTXO needed).

4. **Cloudflare Access regression** — an accident in the Zero Trust
   dashboard exposes the bunker to the internet. The harness doesn't
   directly test CF Access policy, but running it via the Tailscale IP
   while periodically curl-ing the public hostname catches the
   "SSO gate missing" case.

5. **Silent server rejection** — CKBunker returns an HTTP 200 with a
   rejection modal, not an HTTP error code. Automated clients that only
   check HTTP status can "succeed" against a server that refused to
   sign. The harness parses the modal and treats rejections as failures
   when a signature was expected.

## Why WebSocket, not HTTP

CKBunker's web UI and its signing protocol live on the same WebSocket
endpoint. The HTTP endpoints render HTML only. If you only speak HTTP
you can **watch** the counters but can't **cause** a sign. The harness
needs to cause signs — so WebSocket.

An unfortunate side-effect: Cloudflare Access with service tokens
doesn't pass the WebSocket upgrade cleanly. This is why the harness
assumes a private ingress (Tailscale) is available even for
CF-fronted deployments.

## Why a custom client and not upstream

Upstream CKBunker ships a `ckbunker` console script, but in `v0.9.1` it
has a broken import path (tries to `import main` from outside the
package). There is no packaged Python client. The 500-line client in
`ckbunker_hsm_sign/client.py` is hand-rolled against the observed
WebSocket protocol — small enough to audit, big enough to be useful,
and stable because CKBunker's own Vue front-end doesn't change often.

The cost: if upstream changes frame shapes, this harness will need an
update. The protocol doc (`PROTOCOL.md`) captures the current shapes so
future changes are easy to diff.

## Why the harness doesn't generate PSBTs

**Generating spendable PSBTs requires the Coldcard's xpub, a UTXO, and
a recipient.** That's significant state that differs per deployment. The
harness stays deployment-agnostic by accepting **pre-crafted PSBT
fixtures** (see [`fixtures/README.md`](../fixtures/README.md)).

This also means you don't risk spending real sats on a validation run.
The same `large.psbt` can be re-used indefinitely for the reject-path
test because the Coldcard rejects on **amount**, not UTXO availability.

## Why config over code

Every deployment has its own policy shape. Rather than hard-code
"10,000 sats" as the auto-approve cap, the harness reads thresholds
from `config.yaml` and asserts them against outcomes. If your Rule #2
per-txn cap is 50,000 sats, you:

1. Edit `config.yaml` — set `policy.auto_approve.per_txn_sats: 50000`.
2. Craft `small.psbt` at 49,000 sats and `large.psbt` at 100,000 sats.
3. Run the harness.

No code changes. The **outcomes** the harness asserts are framed as
"this PSBT should/shouldn't sign in this path", not "this specific sat
amount should sign".

## Why pytest AND a CLI

Different operators want different ergonomics:

- **`hsm_validate.py`** (CLI) — human-readable coloured output, runs the
  tests in order, exits 0/1/2. Good for oncall dashboards, cron monitors,
  demoing to stakeholders.
- **`pytest tests/`** — integrates with existing CI, produces JUnit XML,
  lets you parametrise against multiple environments. Good for
  automated deploy gates.

Both paths share the same client, fixtures, and config loader — there's
no duplication.

## Why the tests are numbered (`test_01`, `test_02` …)

pytest doesn't guarantee execution order across files. The numbered
prefixes ensure the order reads top-to-bottom when presented (by
collection order and by `pytest -v` output), matching the narrative
of the CLI harness. This helps when screenshotting a run for an
incident report — the sequence looks sensible.

## Why we scrape the dashboard at all

The counters test is a **sanity check against client-side deception**.
If a future bug in the client mis-identifies a rejection as a
signature (or vice versa), the dashboard deltas reveal it: the
Coldcard doesn't lie about whether it signed, and the dashboard
reflects Coldcard state. If the harness says "4 signs, 1 reject" but
the dashboard shows "0 signs, 0 rejects", something is wrong at the
network layer.

The scraper is tolerant: CKBunker versions vary in HTML shape, so if
the regex can't find the numbers the test skips rather than fails.
The real signing assertions already prove end-to-end correctness.
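
The delta check with its skip-on-missing behaviour can be sketched as follows. `Counters` here is a cut-down, hypothetical stand-in for whatever the scraper returns, and `check_deltas` is not the harness's actual function.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Counters:
    # Hypothetical subset of the scraped dashboard fields.
    approvals: int | None
    refusals: int | None


def check_deltas(before: Counters, after: Counters,
                 expected_signs: int, expected_rejects: int) -> str:
    """Return 'pass', 'fail', or 'skip' when the scraper found no numbers."""
    values = (before.approvals, before.refusals, after.approvals, after.refusals)
    if any(v is None for v in values):
        return "skip"
    ok = (after.approvals - before.approvals == expected_signs
          and after.refusals - before.refusals == expected_rejects)
    return "pass" if ok else "fail"


print(check_deltas(Counters(10, 2), Counters(14, 3), 4, 1))
print(check_deltas(Counters(10, 2), Counters(10, 2), 4, 1))
print(check_deltas(Counters(None, 2), Counters(14, 3), 4, 1))
```

Only `Approvals` and `Refusals` are compared, in line with the note in `PROTOCOL.md` that the amount field is not reliable in 0.9.1.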

## Why rejections aren't exceptions

A rejection is a successful policy evaluation — the **Coldcard did
exactly what it was configured to do**. Treating rejections as Python
exceptions would:

- force every call site into try/except
- conflate policy behaviour with transport errors (network, timeout)
- hide the rejection reason behind an exception type

Instead, `SignResult.status` is an enum with four values (`SIGNED`,
`REJECTED`, `TIMEOUT`, `WS_ERROR`) and the caller asserts the status it
expects. `is_expected_rejection("rule #1")` keeps the specific-reason
check terse.
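
A minimal reconstruction of that shape is shown below, for illustration only. The enum values and method name come from the text above; the enum class name `Status`, the `reason` field, and everything else are assumptions, and the real `SignResult` may carry more state.

```python
from dataclasses import dataclass
from enum import Enum


class Status(Enum):
    SIGNED = "signed"
    REJECTED = "rejected"
    TIMEOUT = "timeout"
    WS_ERROR = "ws_error"


@dataclass
class SignResult:
    status: Status
    reason: str = ""   # assumed field holding the Coldcard's rejection text

    def is_expected_rejection(self, needle: str) -> bool:
        """True only for a policy rejection whose reason contains `needle`."""
        return (self.status is Status.REJECTED
                and needle.lower() in self.reason.lower())


result = SignResult(Status.REJECTED, "rule #1: need user(s) confirmation")
print(result.is_expected_rejection("Rule #1"))
print(SignResult(Status.TIMEOUT).is_expected_rejection("rule #1"))
```

A timeout never counts as an expected rejection here, which keeps transport problems from masquerading as policy behaviour.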

## Why "don't broadcast" is the default

`submit_psbt` accepts a `broadcast=True` flag that asks CKBunker to
push the signed tx. The harness always sends `broadcast=false`. A
validation run should never touch the mempool. Operators who want to
drive real signings via this client should use it directly, not via
the harness.

## Why there's no CI/CD templating

Every shop's CI is different (GitHub Actions, Drone, Gitea Actions,
Jenkins, Woodpecker). Providing a single-vendor pipeline template
would add maintenance burden without saving meaningful integration
time. The `hsm_validate.py` CLI returns exit code 0 on success, 1 on
failure, and 2 on a configuration error, which is all any CI needs.
Integration examples live in the README.
102
fixtures/README.md
Normal file
@ -0,0 +1,102 @@
# Test PSBTs — how to generate them

The harness needs **two pre-crafted PSBTs**:

| Fixture | Amount | Policy path expected |
|------------------|------------------|------------------------------|
| `small.psbt` | ≤ auto-approve cap (e.g. 9,000 sats if your Rule #2 cap is 10,000) | Signs without TOTP |
| `large.psbt` | > auto-approve cap, ≤ user-auth cap (e.g. 100,000 sats) | Rejected without TOTP; signs with TOTP |

Both PSBTs must:

- be **spendable by the Coldcard** bound to your CKBunker (same seed / xpub)
- spend to **an address you control** (or a burn address — they are test
  inputs, you never broadcast them)
- use a real UTXO the Coldcard can see (watch-only wallet)

---

## Method 1 — Sparrow Wallet (recommended for first-time setup)

1. In Sparrow, open or create a **watch-only wallet** loaded with your
   Coldcard's xpub. (The Coldcard's HSM-Mode QR or a `coldcard.txt` export
   works.)
2. Send yourself a small amount on testnet **or** signet so you have a UTXO
   to spend without losing real sats. (For mainnet demos, 10k sats is
   worth roughly AUD $10 at a BTC price near AUD $100,000.)
3. Build two transactions:
   - `Small demo` — pay **9,000 sats** (or 90% of your Rule #2 per-txn cap)
     to any receive address in the same wallet. Sparrow → Send → *Save PSBT*
     → write to `fixtures/small.psbt`.
   - `Large demo` — pay **100,000 sats** (or mid-range of your Rule #1 cap)
     the same way. Save as `fixtures/large.psbt`.
4. Both PSBTs should show **Coldcard as a required signer** in Sparrow.

> Do NOT broadcast these. The harness signs them, but you verify the
> signatures in Sparrow and then discard — there's no reason to spend real
> sats on a validation run.

---

## Method 2 — bitcoind (CI / automation)

If you're wiring the harness into CI against a regtest or signet
deployment, scripting PSBT generation is a one-off:

```bash
#!/usr/bin/env bash
# Requires bitcoin-cli and jq on PATH; bitcoin-cli must point at a node
# that sees your wallet.
set -euo pipefail

WALLET="ckbunker-watch"
FEE_RATE=10  # sat/vB

recipient=$(bitcoin-cli -rpcwallet="$WALLET" getnewaddress)

small_raw=$(bitcoin-cli -rpcwallet="$WALLET" walletcreatefundedpsbt \
  '[]' "[{\"$recipient\":0.00009000}]" 0 \
  "{\"fee_rate\":$FEE_RATE}" | jq -r '.psbt')
echo "$small_raw" | base64 -d > fixtures/small.psbt

large_raw=$(bitcoin-cli -rpcwallet="$WALLET" walletcreatefundedpsbt \
  '[]' "[{\"$recipient\":0.00100000}]" 0 \
  "{\"fee_rate\":$FEE_RATE}" | jq -r '.psbt')
echo "$large_raw" | base64 -d > fixtures/large.psbt
```

---

## Method 3 — use the same PSBT file over and over

Nothing in the harness requires the PSBT to be spendable *right now* for the
reject-path test (`test_04`). The Coldcard rejects on **amount**, not on
whether the UTXO is still unspent. So:

- `small.psbt` can be reused until the UTXO is spent elsewhere.
- `large.psbt` can be reused indefinitely — every validation run that tests
  Rule #1 rejection produces a rejection regardless of UTXO state.

If you run the full suite frequently, consider crafting `large.psbt`
deliberately against an **already-spent UTXO** so the success path
(`test_05`) fails at signature verification (not policy evaluation) —
this is arguably safer than running with signable funds live.

---

## File format

Either **binary** (`psbt\xff...` magic bytes) or **base64**-encoded text is
accepted by the harness — it auto-detects via magic bytes. Sparrow exports
binary by default; bitcoin-cli returns base64.
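
Magic-byte detection of this kind can be sketched in a few lines. This is an assumption about how such a loader could work rather than the harness's own code; `load_psbt_bytes` is a hypothetical name.

```python
import base64
import binascii

PSBT_MAGIC = b"psbt\xff"


def load_psbt_bytes(raw: bytes) -> bytes:
    """Return binary PSBT bytes from either a binary or a base64 fixture."""
    if raw.startswith(PSBT_MAGIC):
        return raw                                   # already binary
    try:
        decoded = base64.b64decode(raw.strip(), validate=True)
    except binascii.Error as exc:
        raise ValueError("not a binary or base64 PSBT") from exc
    if not decoded.startswith(PSBT_MAGIC):
        raise ValueError("base64 payload is not a PSBT")
    return decoded


# Placeholder bytes carrying only the magic prefix, not a real transaction.
binary = PSBT_MAGIC + b"\x00"
text = base64.b64encode(binary) + b"\n"
print(load_psbt_bytes(binary) == load_psbt_bytes(text))
```

Checking the magic bytes again after decoding rejects arbitrary base64 files that merely happen to decode cleanly.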

---

## What NOT to do

- Do not commit real PSBTs to git — `.gitignore` already blocks `*.psbt` in
  this directory.
- Do not use a PSBT that spends a UTXO you can't afford to move. The
  harness does not broadcast, but a leaked signed PSBT *can* be broadcast
  by anyone.
- Do not reuse production keys for generating fixtures — prefer testnet
  or signet.
94
hsm_validate.py
Normal file
@ -0,0 +1,94 @@
#!/usr/bin/env python3
"""CKBunker HSM production validator — CLI entrypoint.

Runs a short, structured sequence of tests against a live CKBunker + Coldcard
deployment and exits non-zero if anything fails. Safe to run in CI or as a
periodic monitor; all signing uses pre-crafted test PSBTs that you supply.

Usage:
    ./hsm_validate.py                       # env/.env only
    ./hsm_validate.py --config config.yaml
    ./hsm_validate.py --url http://10.x.y.z:9823 --tests connectivity message_signing

Exits:
    0   all enabled tests passed (or were skipped)
    1   at least one test failed
    2   configuration error
"""

from __future__ import annotations

import argparse
import sys
from pathlib import Path

from ckbunker_hsm_sign import Harness, load_config
from ckbunker_hsm_sign.harness import Verdict


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(
        description="Validate a CKBunker + Coldcard HSM deployment",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__.split("Usage:")[1],
    )
    p.add_argument("--config", type=Path, default=None,
                   help="YAML configuration file (see config.example.yaml)")
    p.add_argument("--env", type=Path, default=Path(".env"),
                   help="dotenv file to read (default: .env)")
    p.add_argument("--url", default=None,
                   help="override CKBunker URL")
    p.add_argument("--tests", nargs="+", default=None,
                   help="only run these tests (by name)")
    p.add_argument("--skip", nargs="+", default=None,
                   help="skip these tests (by name)")
    p.add_argument("--verbose", "-v", action="store_true",
                   help="dump every WebSocket frame")
    p.add_argument("--save-signed", default=None,
                   help="write signed PSBTs from sign tests into this directory")
    p.add_argument("--list-tests", action="store_true",
                   help="print test names and exit")
    return p.parse_args()


def main() -> int:
    args = parse_args()

    try:
        overrides = {
            "url": args.url,
            # Only override when -v is given, so `verbose: true` in YAML survives.
            "verbose": args.verbose or None,
            "save_signed_dir": args.save_signed,
        }
        cfg = load_config(
            yaml_path=args.config,
            dotenv_path=args.env if args.env.exists() else None,
            overrides={k: v for k, v in overrides.items() if v is not None},
        )
    except SystemExit as e:
        print(f"configuration error: {e}", file=sys.stderr)
        return 2

    if args.list_tests:
        for name in cfg.tests:
            print(name)
        return 0

    if args.tests:
        for k in cfg.tests:
            cfg.tests[k] = k in args.tests
    if args.skip:
        for k in args.skip:
            if k in cfg.tests:
                cfg.tests[k] = False

    harness = Harness(cfg)
    outcomes = harness.run_all()

    if any(o.verdict == Verdict.FAIL for o in outcomes):
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())
48
pyproject.toml
Normal file
@ -0,0 +1,48 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"

[project]
name = "ckbunker-hsm-sign"
version = "1.0.0"
description = "Production validation test harness for CKBunker + Coldcard Mk4 HSM deployments"
readme = "README.md"
requires-python = ">=3.10"
license = { text = "MIT" }
authors = [
    { name = "Mineracks" },
]
keywords = ["bitcoin", "hsm", "coldcard", "ckbunker", "signing", "testing"]
classifiers = [
    "Development Status :: 4 - Beta",
    "License :: OSI Approved :: MIT License",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Programming Language :: Python :: 3.12",
    "Topic :: Security :: Cryptography",
    "Topic :: Software Development :: Testing",
]
dependencies = [
    "websockets>=12.0",
    "pyotp>=2.9.0",
    "requests>=2.31.0",
    "PyYAML>=6.0",
]

[project.optional-dependencies]
test = ["pytest>=8.0.0", "pytest-asyncio>=0.23.0"]

[project.scripts]
hsm-validate = "ckbunker_hsm_sign.cli:main"

[project.urls]
Source = "https://git.mineracks.com/mineracks/mineracks-ckbunker-hsm-sign"

[tool.setuptools]
packages = ["ckbunker_hsm_sign"]
py-modules = ["hsm_validate"]

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
6
requirements.txt
Normal file
@ -0,0 +1,6 @@
websockets>=12.0
pyotp>=2.9.0
requests>=2.31.0
PyYAML>=6.0
pytest>=8.0.0
pytest-asyncio>=0.23.0
82
tests/conftest.py
Normal file
@ -0,0 +1,82 @@
"""Pytest fixtures.

Each test module requests the `client` and/or `cfg` fixtures defined here.
Running `pytest` against a CKBunker deployment picks up configuration from the
same sources as the CLI harness: .env first, then `config.yaml` if present.
"""

from __future__ import annotations

import asyncio
import base64
from pathlib import Path

import pytest

from ckbunker_hsm_sign import Client, load_config
from ckbunker_hsm_sign.config import Config


@pytest.fixture(scope="session")
def cfg() -> Config:
    yaml_path = Path("config.yaml")
    return load_config(
        yaml_path=yaml_path if yaml_path.exists() else None,
        dotenv_path=Path(".env") if Path(".env").exists() else None,
    )


@pytest.fixture(scope="session")
def client(cfg: Config) -> Client:
    return Client(
        base_url=cfg.url,
        cf_access_client_id=cfg.cf_client_id,
        cf_access_client_secret=cfg.cf_client_secret,
        totp_secret=cfg.totp_secret,
        user=cfg.user,
        verbose=cfg.verbose,
    )


@pytest.fixture
def event_loop():
    """Give each test its own event loop; WebSockets don't love being shared."""
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()


def _read_psbt(path: Path) -> bytes:
    raw = path.read_bytes()
    if raw[:5] == b"psbt\xff":  # already binary
        return raw
    try:
        decoded = base64.b64decode(raw.strip())
        if decoded[:5] == b"psbt\xff":
            return decoded
    except ValueError:  # binascii.Error is a ValueError subclass
        pass
    try:
        decoded = bytes.fromhex(raw.strip().decode("ascii"))
        if decoded[:5] == b"psbt\xff":
            return decoded
    except ValueError:  # covers bad hex digits and non-ASCII input
        pass
    pytest.skip(f"{path} is not a valid PSBT")
    return b""  # unreachable


@pytest.fixture
def small_psbt(cfg: Config) -> bytes:
    path = Path(cfg.small_psbt_path)
    if not path.exists():
        pytest.skip(f"{path} not found; see fixtures/README.md")
    return _read_psbt(path)


@pytest.fixture
def large_psbt(cfg: Config) -> bytes:
    path = Path(cfg.large_psbt_path)
    if not path.exists():
        pytest.skip(f"{path} not found; see fixtures/README.md")
    return _read_psbt(path)
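The fixture loader accepts a PSBT in binary, base64, or hex form and recognises it by the five magic bytes `psbt\xff` defined in BIP 174. Below is a minimal standalone sketch of the same detection, written so it can be tried without pytest. `decode_psbt` and `PSBT_MAGIC` are names introduced here for illustration; unlike `_read_psbt`, this version uses strict base64 validation and returns `None` rather than skipping a test.

```python
import base64
from typing import Optional

PSBT_MAGIC = b"psbt\xff"  # BIP 174 magic bytes plus separator


def decode_psbt(raw: bytes) -> Optional[bytes]:
    """Return binary PSBT bytes from binary, base64, or hex input, else None."""
    if raw.startswith(PSBT_MAGIC):
        return raw
    text = raw.strip()
    decoders = (
        lambda b: base64.b64decode(b, validate=True),
        lambda b: bytes.fromhex(b.decode("ascii")),
    )
    for decode in decoders:
        try:
            candidate = decode(text)
        except ValueError:  # binascii.Error and UnicodeDecodeError both derive from it
            continue
        # A hex string is also valid base64, so only the magic check is decisive.
        if candidate.startswith(PSBT_MAGIC):
            return candidate
    return None
```

Trying base64 before hex is safe because a wrongly decoded candidate fails the magic check and the loop moves on to the next decoder.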
12
tests/test_01_connectivity.py
Normal file
@ -0,0 +1,12 @@
"""Basic reachability: CKBunker answers HTTP and exposes a WebSocket URL."""

from ckbunker_hsm_sign import Client


def test_http_endpoint_reachable(client: Client):
    ws_url, cookies = client.fetch_ws_endpoint()
    assert ws_url.startswith(("ws://", "wss://")), ws_url
    # A session cookie is not strictly required by CKBunker, but its absence
    # often means we were silently rate-limited or hit the wrong hostname.
    # Surface it so the operator notices.
    assert cookies is not None  # even "" is acceptable; None means parse failure
25
tests/test_02_message_signing.py
Normal file
@ -0,0 +1,25 @@
"""Message signing: the cheapest live proof that the Coldcard is reachable
and willing to sign under the policy."""

import pytest

from ckbunker_hsm_sign import Client, SignStatus
from ckbunker_hsm_sign.config import Config


@pytest.mark.asyncio
async def test_signs_message_on_allowed_path(client: Client, cfg: Config):
    if not cfg.policy.message_signing:
        pytest.skip("policy.message_signing is disabled in config")

    async with client.session() as session:
        res = await session.sign_message(
            message="hsm-validate unit test",
            derivation_path=cfg.message_sign_path,
        )

    assert res.status == SignStatus.SIGNED, (res.status, res.reason, res.error)
    assert res.signature, "no signature returned"
    # An address is nice-to-have; some CKBunker versions omit it for QR-only paths.
    if res.address:
        assert res.address.startswith(("bc1", "1", "3")), res.address
18
tests/test_03_rule2_auto_approve.py
Normal file
@ -0,0 +1,18 @@
"""Rule #2 equivalent: a sub-threshold PSBT must sign without any 2FA."""

import pytest

from ckbunker_hsm_sign import Client, SignStatus


@pytest.mark.asyncio
async def test_small_psbt_signs_without_totp(client: Client, small_psbt: bytes):
    async with client.session() as session:
        res = await session.sign_psbt(small_psbt, use_totp=False)

    assert res.status == SignStatus.SIGNED, (
        f"expected SIGNED, got {res.status.value}: {res.reason or res.error}"
    )
    assert res.signed_bytes, "no signed bytes returned"
    # The returned bytes should still be a valid PSBT envelope.
    assert res.signed_bytes[:5] == b"psbt\xff", res.signed_bytes[:5]
27
tests/test_04_rule1_without_totp_rejects.py
Normal file
@ -0,0 +1,27 @@
"""The critical negative test: a transaction that exceeds the auto-approve
cap must be *rejected* by the Coldcard when TOTP is absent.

If this test passes, your policy is doing its job. If it fails by reporting
SIGNED, stop everything and review the policy on-device: you are running
with no 2FA gate on Rule #1-sized spends.
"""

import pytest

from ckbunker_hsm_sign import Client, SignStatus


@pytest.mark.asyncio
async def test_large_psbt_without_totp_is_rejected(client: Client, large_psbt: bytes):
    async with client.session() as session:
        res = await session.sign_psbt(large_psbt, use_totp=False)

    # Fail LOUDLY if the policy didn't stop this.
    assert res.status != SignStatus.SIGNED, (
        "POLICY NOT ENFORCED: large PSBT signed without TOTP. "
        "Check the Coldcard's installed policy immediately."
    )
    assert res.is_expected_rejection("rule #1"), (
        "expected a 'rule #1: need user(s) confirmation' rejection, "
        f"got status={res.status.value} reason={res.reason!r}"
    )
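The negative test makes two separate claims: the PSBT was not signed, and the refusal came from the expected rule rather than from a timeout or transport error. The sketch below shows one way such a check could be structured. The `SignStatus` and `SignResult` types here are stand-ins invented for illustration, not the package's real classes, and the real `is_expected_rejection` may match reasons differently.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class SignStatus(Enum):
    # Stand-in values; the real enum lives in ckbunker_hsm_sign.
    SIGNED = "signed"
    REJECTED = "rejected"
    ERROR = "error"


@dataclass
class SignResult:
    status: SignStatus
    reason: Optional[str] = None

    def is_expected_rejection(self, needle: str) -> bool:
        """True only for a policy refusal whose reason mentions `needle`."""
        if self.status is not SignStatus.REJECTED:
            return False  # signed, or failed for an unrelated reason
        return needle.lower() in (self.reason or "").lower()
```

Keeping the status check separate from the reason match means a transport error that happens to mention "rule #1" does not count as a policy refusal.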
20
tests/test_05_rule1_with_totp_signs.py
Normal file
@ -0,0 +1,20 @@
"""Rule #1 equivalent: with a fresh TOTP code the same large PSBT signs."""

import pytest

from ckbunker_hsm_sign import Client, SignStatus
from ckbunker_hsm_sign.config import Config


@pytest.mark.asyncio
async def test_large_psbt_signs_with_totp(client: Client, large_psbt: bytes, cfg: Config):
    if not cfg.totp_secret:
        pytest.skip("TOTP_SECRET not configured")

    async with client.session() as session:
        res = await session.sign_psbt(large_psbt, use_totp=True)

    assert res.status == SignStatus.SIGNED, (
        f"expected SIGNED, got {res.status.value}: {res.reason or res.error}"
    )
    assert res.signed_bytes and res.signed_bytes[:5] == b"psbt\xff"
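The harness depends on `pyotp` to turn `TOTP_SECRET` into a code. To show what a "fresh" code means, here is a standard-library sketch of the RFC 6238 computation, assuming the common defaults (HMAC-SHA-1, 30-second step, 6 digits), which are also `pyotp`'s defaults. The `totp` function and its parameters are introduced here for illustration and are not part of the package.

```python
import base64
import hashlib
import hmac
import struct
import time
from typing import Optional


def totp(secret_b32: str, at: Optional[float] = None, step: int = 30, digits: int = 6) -> str:
    """Compute an RFC 6238 TOTP code for a base32 shared secret."""
    key = base64.b32decode(secret_b32.upper())
    now = time.time() if at is None else at
    counter = int(now // step)  # number of whole time steps since the epoch
    digest = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = digest[-1] & 0x0F  # dynamic truncation, RFC 4226 section 5.3
    value = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(value % 10 ** digits).zfill(digits)
```

A code changes whenever the step counter does, so one generated just before a 30-second boundary may already be stale when it reaches the device. Generating the code immediately before sending the sign request keeps that window small.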
25
tests/test_06_counters_tracked.py
Normal file
@ -0,0 +1,25 @@
"""Dashboard counters should be readable and sane after the signing tests.

Soft test: if the scraper can't read the dashboard on your CKBunker version,
this skips rather than fails; the real signing tests already prove the
end-to-end path worked.
"""

import pytest

from ckbunker_hsm_sign.config import Config
from ckbunker_hsm_sign.scraper import fetch_counters


def test_counters_read(cfg: Config):
    counters = fetch_counters(
        cfg.url,
        cf_client_id=cfg.cf_client_id,
        cf_client_secret=cfg.cf_client_secret,
    )
    if counters.approvals is None and counters.refusals is None:
        pytest.skip("could not parse dashboard counters on this CKBunker version")
    assert counters.approvals is None or counters.approvals >= 0
    assert counters.refusals is None or counters.refusals >= 0
    if counters.amount_spent_btc is not None:
        assert counters.amount_spent_btc >= 0