feat: add cloudflare coordinator

Peter Steinberger 2026-04-30 17:49:29 +01:00
parent 9afd1e41dc
commit b4cd356443
27 changed files with 3964 additions and 28 deletions


@ -49,3 +49,35 @@ jobs:
- name: Build
run: go build -trimpath -o /tmp/crabbox ./cmd/crabbox
worker:
name: Worker
runs-on: ubuntu-latest
timeout-minutes: 15
steps:
- name: Check out
uses: actions/checkout@v6
- name: Set up Node
uses: actions/setup-node@v6
with:
node-version: 24
cache: npm
cache-dependency-path: worker/package-lock.json
- name: Install
run: npm ci
working-directory: worker
- name: Typecheck
run: npm run check
working-directory: worker
- name: Test
run: npm test
working-directory: worker
- name: Build
run: npm run build
working-directory: worker

3
.gitignore vendored

@ -1,5 +1,6 @@
bin/
dist/
node_modules/
.wrangler/
*.test
coverage.out


@ -2,7 +2,7 @@
Crabbox is an open source remote testbox runner for OpenClaw maintainers. It gives a Blacksmith Testboxes-style local loop on owned Hetzner capacity: provision or reuse a warm Linux box, sync the current dirty checkout, run a command remotely, stream output, and clean up.
The current implementation is a direct Go CLI that talks to Hetzner Cloud and SSH. The planned shared control plane is a Cloudflare Worker plus Durable Object coordinator; see [docs/mvp-plan.md](docs/mvp-plan.md).
The current implementation is a Go CLI plus a Cloudflare Worker/Durable Object coordinator. The CLI can also fall back to direct Hetzner Cloud calls when `CRABBOX_COORDINATOR` is unset.
## Status
@ -14,6 +14,9 @@ Working today:
- `crabbox stop`
- `crabbox pool list`
- `crabbox machine cleanup`
- Cloudflare Worker coordinator on Workers/Durable Objects
- bearer-token coordinator auth for automation
- Cloudflare route for `crabbox.clawd.bot/*`
- Hetzner server provisioning with class fallback
- cloud-init bootstrap for Node 22, pnpm, Docker, Git, and rsync
- rsync overlay of local dirty worktrees
@ -22,8 +25,6 @@ Working today:
Not yet done:
- Cloudflare coordinator API
- Durable Object shared lease store
- `crabbox login`
- GitHub Actions/OIDC-compatible execution
- untrusted multi-tenant isolation
@ -36,6 +37,7 @@ Prerequisites:
- `git`, `ssh`, `rsync`, and `curl`
- Hetzner token in `HCLOUD_TOKEN` or `HETZNER_TOKEN`
- SSH key at `~/.ssh/id_ed25519`, or set `CRABBOX_SSH_KEY`
- deployed coordinator env in `CRABBOX_COORDINATOR` and `CRABBOX_COORDINATOR_TOKEN`
Build:
@ -49,6 +51,13 @@ Check local prerequisites and Hetzner access:
bin/crabbox doctor
```
Use the deployed coordinator:
```sh
export CRABBOX_COORDINATOR=https://crabbox-coordinator.steipete.workers.dev
bin/crabbox pool list
```
Warm a reusable OpenClaw testbox:
```sh
@ -80,18 +89,54 @@ beast ccx63, ccx53, ccx43, cpx62, cx53
During verification, Hetzner rejected `ccx63`, `ccx53`, and `ccx43` because of the account dedicated-core quota, so Crabbox fell back to `cpx62`.
## OpenClaw Verification
## Cloudflare Deployment
Verified from `/Users/steipete/Projects/openclaw` on a warm fallback `cpx62` runner:
Worker source lives in `worker/`.
Local checks:
```sh
CI=1 /usr/bin/time -p /Users/steipete/Projects/crabbox/bin/crabbox run --id cbx_f782c469c9ce -- pnpm test:changed:max
npm ci --prefix worker
npm run check --prefix worker
npm test --prefix worker
npm run build --prefix worker
```
Deploy:
```sh
export CLOUDFLARE_API_TOKEN="$CRABBOX_CLOUDFLARE_API_TOKEN"
export CLOUDFLARE_ACCOUNT_ID="$CRABBOX_CLOUDFLARE_ACCOUNT_ID"
npx wrangler deploy --config worker/wrangler.jsonc
```
Required Worker secrets:
```text
HETZNER_TOKEN
CRABBOX_SHARED_TOKEN
```
The Worker is deployed at:
```text
https://crabbox-coordinator.steipete.workers.dev
```
The Cloudflare route `crabbox.clawd.bot/*` is also attached and currently protected by Cloudflare Access.
## OpenClaw Verification
Verified from `/Users/steipete/Projects/openclaw` on a Cloudflare-created fallback `cpx62` runner:
```sh
CI=1 /usr/bin/time -p /Users/steipete/Projects/crabbox/bin/crabbox run --id cbx_f60f47cbc879 -- pnpm test:changed:max
```
Result:
- 61 Vitest shards completed successfully.
- End-to-end wall time was 93.17 seconds.
- End-to-end warm wall time was 93.66 seconds through the Cloudflare coordinator path.
- The timing includes rsync scan, remote Git hydration, command execution, and output streaming.
For true Blacksmith Testboxes parity, raise the Hetzner dedicated-core quota and re-run on `ccx63`.
@ -103,6 +148,8 @@ Environment variables:
```text
HCLOUD_TOKEN or HETZNER_TOKEN Hetzner Cloud API token
CRABBOX_PROFILE default openclaw-check
CRABBOX_COORDINATOR optional coordinator URL
CRABBOX_COORDINATOR_TOKEN optional coordinator bearer token
CRABBOX_DEFAULT_CLASS default beast
CRABBOX_HETZNER_LOCATION default fsn1
CRABBOX_HETZNER_IMAGE default ubuntu-24.04
@ -130,6 +177,10 @@ gofmt -w $(git ls-files '*.go')
go vet ./...
go test -race ./...
go build -trimpath -o bin/crabbox ./cmd/crabbox
npm ci --prefix worker
npm run check --prefix worker
npm test --prefix worker
npm run build --prefix worker
```
CI runs the same checks on pushes and pull requests.


@ -176,6 +176,8 @@ CRABBOX_LOG
Provider/deploy variables live outside normal CLI operation:
```text
CRABBOX_COORDINATOR
CRABBOX_COORDINATOR_TOKEN
CRABBOX_CLOUDFLARE_API_TOKEN
CRABBOX_CLOUDFLARE_ACCOUNT_ID
CRABBOX_CLOUDFLARE_ZONE_ID
@ -208,4 +210,3 @@ JSON output:
```
No progress bars when stdout is not a TTY.


@ -68,6 +68,7 @@ Current local status:
- Core Cloudflare, Hetzner, and GitHub tokens are present in local `~/.profile`.
- The Crabbox Cloudflare token is mirrored to MacBook Pro `~/.profile`.
- `CRABBOX_COORDINATOR` and `CRABBOX_COORDINATOR_TOKEN` are present in local and MacBook Pro `~/.profile`.
- GitHub OAuth client ID and secret are present in local and MacBook Pro `~/.profile`.
- Cloudflare Access GitHub IdP is created.
- Cloudflare Access fallback app is created for `crabbox.clawd.bot`.
@ -223,20 +224,34 @@ Deployment should:
6. Configure route/custom domain on `crabbox.clawd.bot`.
7. Verify `/v1/health` on the fallback domain.
Use `npx wrangler` from the Worker package unless `wrangler` is installed globally. Do not assume `hcloud` is installed; the implementation can use the Hetzner API directly from Go.
Use `npx wrangler` from the Worker package unless `wrangler` is installed globally. Do not assume `hcloud` is installed; the implementation can use the Hetzner API directly from Go or from the Worker.
Current deployed coordinator:
```text
https://crabbox-coordinator.steipete.workers.dev
crabbox.clawd.bot/* -> crabbox-coordinator, protected by Cloudflare Access
```
Current Worker secrets:
```text
HETZNER_TOKEN
CRABBOX_SHARED_TOKEN
```
## Verified OpenClaw Run
Warm-run command from `/Users/steipete/Projects/openclaw`:
Warm-run command from `/Users/steipete/Projects/openclaw` through the Cloudflare coordinator:
```sh
CI=1 /usr/bin/time -p /Users/steipete/Projects/crabbox/bin/crabbox run --id cbx_f782c469c9ce -- pnpm test:changed:max
CI=1 /usr/bin/time -p /Users/steipete/Projects/crabbox/bin/crabbox run --id cbx_f60f47cbc879 -- pnpm test:changed:max
```
Result:
- 61 Vitest shards completed successfully.
- End-to-end wall time: 93.17 seconds.
- End-to-end warm wall time: 93.66 seconds.
- Runner class: requested `beast`, actual fallback `cpx62`.
- Sync path: rsync overlay plus remote Git hydrate for shallow checkout merge-base support.


@ -171,7 +171,7 @@ And proves:
## Known Current Infra Facts
- Direct CLI execution is implemented and verified. It can create/reuse a Hetzner server, bootstrap it, sync a local checkout with rsync, hydrate shallow Git history enough for changed-test detection, run commands over SSH, stream output, and release/delete leases.
- The Cloudflare coordinator and Durable Object lease store remain the intended shared-control-plane path, but are not on the verified execution path yet.
- The Cloudflare coordinator and Durable Object lease store are implemented and deployed. The CLI uses them when `CRABBOX_COORDINATOR` is set, and falls back to direct Hetzner otherwise.
- Intended primary domain: `crabbox.openclaw.ai`.
- Current Cloudflare-manageable fallback domain: `crabbox.clawd.bot`.
- `openclaw.ai` is currently not visible as a Cloudflare zone in the available account; DNS is on Namecheap nameservers.
@ -184,22 +184,21 @@ And proves:
- GitHub OAuth client ID and secret are present in local and MacBook Pro `~/.profile`.
- Cloudflare Access GitHub IdP `GitHub OpenClaw` exists.
- Cloudflare Access app `Crabbox Coordinator` exists for `crabbox.clawd.bot`.
- Worker `crabbox-coordinator` is deployed at `https://crabbox-coordinator.steipete.workers.dev` and routed from `crabbox.clawd.bot/*`.
- Coordinator bearer auth uses `CRABBOX_COORDINATOR_TOKEN` locally and `CRABBOX_SHARED_TOKEN` in the Worker.
- Hetzner token is available in local and Mac Studio `~/.profile`.
- The Hetzner account currently hits a dedicated-core quota/resource limit for `ccx63`, `ccx53`, and `ccx43`. The `beast` class falls back to `cpx62` until quota is raised.
- Public SSH on port 22 was not usable from the tested network path; cloud-init opens SSH on port 2222 and the CLI uses that by default.
- OpenClaw verification on the fallback `cpx62` runner passed `CI=1 pnpm test:changed:max`, completing 61 Vitest shards in 93.17 seconds end-to-end for a warm run, including rsync scan and remote Git hydration.
- OpenClaw verification through the Cloudflare coordinator on the fallback `cpx62` runner passed `CI=1 pnpm test:changed:max`, completing 61 Vitest shards in 93.66 seconds end-to-end for a warm run, including rsync scan and remote Git hydration.
- GitHub org slug is `openclaw`.
- `wrangler` and `hcloud` are not assumed to be globally installed; use `npx wrangler` and direct Hetzner API or document install steps.
## Next Implementation Milestones
1. Raise Hetzner dedicated-core quota so `beast` can use `ccx63` instead of falling back to `cpx62`.
2. Add Cloudflare Worker API skeleton with local tests.
3. Add Durable Object lease store.
4. Move direct Hetzner lease lifecycle behind the coordinator for shared pool use.
5. Bind the Worker behind the existing Cloudflare Access fallback app.
6. Configure the fallback route/custom domain on `crabbox.clawd.bot`.
7. Add `crabbox login` and Access token handling.
8. Add one-shot `run --profile` cleanup semantics for ephemeral servers.
9. Add heartbeat support for long-running commands.
10. Re-run OpenClaw `pnpm test:changed:max` on `ccx63` and compare against Blacksmith Testboxes.
2. Add `crabbox login` and Cloudflare Access token handling.
3. Add Cloudflare Access service-token support for non-browser CLI use on `crabbox.clawd.bot`.
4. Add heartbeat support for long-running commands.
5. Add one-shot `run --profile` cleanup semantics coverage in integration tests.
6. Add coordinator admin cleanup/drain endpoints.
7. Re-run OpenClaw `pnpm test:changed:max` on `ccx63` and compare against Blacksmith Testboxes.


@ -21,6 +21,7 @@ MVP:
- GitHub Access IdP is configured for the `openclaw` org.
- Coordinator validates `Cf-Access-Jwt-Assertion`.
- Coordinator maps Access identity to lease owner.
- Workers.dev automation currently uses a shared bearer token while `crabbox login` and Access service-token support are completed.
Target:
@ -51,6 +52,7 @@ Rules:
- Never accept secret values as command-line flag values.
- Never log env values.
- Redact known secret-looking strings in diagnostics.
- `CRABBOX_SHARED_TOKEN` is stored as a Worker secret; local clients use `CRABBOX_COORDINATOR_TOKEN`.
Profile allowlist example:


@ -58,6 +58,8 @@ Usage:
Environment:
HCLOUD_TOKEN or HETZNER_TOKEN
CRABBOX_COORDINATOR, optional Cloudflare coordinator URL
CRABBOX_COORDINATOR_TOKEN, optional coordinator bearer token
CRABBOX_SSH_KEY, default ~/.ssh/id_ed25519
CRABBOX_DEFAULT_CLASS, default beast`)
}


@ -10,6 +10,8 @@ type Config struct {
Profile string
Class string
ServerType string
Coordinator string
CoordToken string
Location string
Image string
SSHUser string
@ -32,6 +34,8 @@ func defaultConfig() Config {
Profile: getenv("CRABBOX_PROFILE", "openclaw-check"),
Class: class,
ServerType: serverTypeForClass(class),
Coordinator: os.Getenv("CRABBOX_COORDINATOR"),
CoordToken: os.Getenv("CRABBOX_COORDINATOR_TOKEN"),
Location: getenv("CRABBOX_HETZNER_LOCATION", "fsn1"),
Image: getenv("CRABBOX_HETZNER_IMAGE", "ubuntu-24.04"),
SSHUser: getenv("CRABBOX_SSH_USER", "crabbox"),

171
internal/cli/coordinator.go Normal file

@ -0,0 +1,171 @@
package cli
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
)
type CoordinatorClient struct {
BaseURL string
Token string
Client *http.Client
}
type CoordinatorLease struct {
ID string `json:"id"`
Profile string `json:"profile"`
Class string `json:"class"`
ServerType string `json:"serverType"`
ServerID int64 `json:"serverID"`
ServerName string `json:"serverName"`
Host string `json:"host"`
SSHUser string `json:"sshUser"`
SSHPort string `json:"sshPort"`
WorkRoot string `json:"workRoot"`
Keep bool `json:"keep"`
State string `json:"state"`
ExpiresAt string `json:"expiresAt"`
}
type CoordinatorMachine struct {
ID int64 `json:"id"`
Name string `json:"name"`
Status string `json:"status"`
ServerType string `json:"serverType"`
Host string `json:"host"`
Labels map[string]string `json:"labels"`
}
func newCoordinatorClient(cfg Config) (*CoordinatorClient, bool, error) {
if cfg.Coordinator == "" {
return nil, false, nil
}
base, err := url.Parse(cfg.Coordinator)
if err != nil {
return nil, true, exit(2, "invalid CRABBOX_COORDINATOR: %v", err)
}
if base.Scheme == "" || base.Host == "" {
return nil, true, exit(2, "CRABBOX_COORDINATOR must be an absolute URL")
}
base.Path = strings.TrimRight(base.Path, "/")
return &CoordinatorClient{
BaseURL: strings.TrimRight(base.String(), "/"),
Token: cfg.CoordToken,
Client: &http.Client{Timeout: 5 * time.Minute},
}, true, nil
}
func (c *CoordinatorClient) CreateLease(ctx context.Context, cfg Config, publicKey string, keep bool) (CoordinatorLease, error) {
var res struct {
Lease CoordinatorLease `json:"lease"`
}
err := c.do(ctx, http.MethodPost, "/v1/leases", map[string]any{
"profile": cfg.Profile,
"class": cfg.Class,
"serverType": cfg.ServerType,
"location": cfg.Location,
"image": cfg.Image,
"sshUser": cfg.SSHUser,
"sshPort": cfg.SSHPort,
"providerKey": cfg.ProviderKey,
"workRoot": cfg.WorkRoot,
"ttlSeconds": int(cfg.TTL.Seconds()),
"keep": keep,
"sshPublicKey": publicKey,
}, &res)
return res.Lease, err
}
func (c *CoordinatorClient) GetLease(ctx context.Context, id string) (CoordinatorLease, error) {
var res struct {
Lease CoordinatorLease `json:"lease"`
}
err := c.do(ctx, http.MethodGet, "/v1/leases/"+url.PathEscape(id), nil, &res)
return res.Lease, err
}
func (c *CoordinatorClient) ReleaseLease(ctx context.Context, id string, deleteServer bool) (CoordinatorLease, error) {
var res struct {
Lease CoordinatorLease `json:"lease"`
}
err := c.do(ctx, http.MethodPost, "/v1/leases/"+url.PathEscape(id)+"/release", map[string]any{"delete": deleteServer}, &res)
return res.Lease, err
}
func (c *CoordinatorClient) Pool(ctx context.Context) ([]CoordinatorMachine, error) {
var res struct {
Machines []CoordinatorMachine `json:"machines"`
}
err := c.do(ctx, http.MethodGet, "/v1/pool", nil, &res)
return res.Machines, err
}
func (c *CoordinatorClient) Health(ctx context.Context) error {
var res map[string]any
return c.do(ctx, http.MethodGet, "/v1/health", nil, &res)
}
func (c *CoordinatorClient) do(ctx context.Context, method, path string, body any, out any) error {
var r *bytes.Reader
if body != nil {
data, err := json.Marshal(body)
if err != nil {
return err
}
r = bytes.NewReader(data)
} else {
r = bytes.NewReader(nil)
}
req, err := http.NewRequestWithContext(ctx, method, c.BaseURL+path, r)
if err != nil {
return err
}
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
if c.Token != "" {
req.Header.Set("Authorization", "Bearer "+c.Token)
}
resp, err := c.Client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
data, _ := io.ReadAll(io.LimitReader(resp.Body, 600))
msg := strings.TrimSpace(string(data))
if msg != "" {
return fmt.Errorf("coordinator %s %s: http %d: %s", method, path, resp.StatusCode, msg)
}
return fmt.Errorf("coordinator %s %s: http %d", method, path, resp.StatusCode)
}
if out != nil {
if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
return err
}
}
return nil
}
func leaseToServerTarget(lease CoordinatorLease, cfg Config) (Server, SSHTarget, string) {
server := Server{
ID: lease.ServerID,
Name: lease.ServerName,
Status: lease.State,
Labels: map[string]string{
"lease": lease.ID,
"keep": fmt.Sprint(lease.Keep),
},
}
server.PublicNet.IPv4.IP = lease.Host
server.ServerType.Name = lease.ServerType
target := SSHTarget{User: lease.SSHUser, Host: lease.Host, Key: cfg.SSHKey, Port: lease.SSHPort}
return server, target, lease.ID
}


@ -25,6 +25,18 @@ func (a App) doctor(ctx context.Context, args []string) error {
}
cfg := defaultConfig()
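// Use a distinct name for the "coordinator configured" flag so that the
// assignments below update the outer ok instead of a shadowed copy.
if coord, useCoordinator, err := newCoordinatorClient(cfg); err != nil {
fmt.Fprintf(a.Stdout, "failed coord %v\n", err)
ok = false
} else if useCoordinator {
if err := coord.Health(ctx); err != nil {
fmt.Fprintf(a.Stdout, "failed coord %v\n", err)
ok = false
} else {
fmt.Fprintf(a.Stdout, "ok coord %s\n", cfg.Coordinator)
}
}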
if _, err := os.Stat(cfg.SSHKey); err != nil {
fmt.Fprintf(a.Stdout, "missing ssh key %s\n", cfg.SSHKey)
ok = false


@ -15,6 +15,23 @@ func (a App) pool(ctx context.Context, args []string) error {
if err := fs.Parse(args[1:]); err != nil {
return exit(2, "%v", err)
}
cfg := defaultConfig()
if coord, ok, err := newCoordinatorClient(cfg); err != nil {
return err
} else if ok {
machines, err := coord.Pool(ctx)
if err != nil {
return err
}
if *jsonOut {
return json.NewEncoder(a.Stdout).Encode(machines)
}
for _, s := range machines {
fmt.Fprintf(a.Stdout, "%-10d %-28s %-12s %-8s %-15s lease=%s keep=%s\n",
s.ID, s.Name, s.Status, s.ServerType, s.Host, s.Labels["lease"], s.Labels["keep"])
}
return nil
}
client, err := newHetznerClient()
if err != nil {
return err


@ -24,7 +24,18 @@ func (a App) warmup(ctx context.Context, args []string) error {
cfg.ServerType = serverTypeForClass(*class)
cfg.TTL = *ttl
server, target, leaseID, err := a.acquire(ctx, cfg, *keep)
coord, useCoordinator, err := newCoordinatorClient(cfg)
if err != nil {
return err
}
var server Server
var target SSHTarget
var leaseID string
if useCoordinator {
server, target, leaseID, err = a.acquireCoordinator(ctx, cfg, coord, *keep)
} else {
server, target, leaseID, err = a.acquire(ctx, cfg, *keep)
}
if err != nil {
return err
}
@ -64,10 +75,26 @@ func (a App) runCommand(ctx context.Context, args []string) error {
var leaseID string
var err error
acquired := false
coord, useCoordinator, err := newCoordinatorClient(cfg)
if err != nil {
return err
}
if *leaseIDFlag != "" {
server, target, leaseID, err = a.findLease(ctx, cfg, *leaseIDFlag)
if useCoordinator {
var lease CoordinatorLease
lease, err = coord.GetLease(ctx, *leaseIDFlag)
if err == nil {
server, target, leaseID = leaseToServerTarget(lease, cfg)
}
} else {
server, target, leaseID, err = a.findLease(ctx, cfg, *leaseIDFlag)
}
} else {
server, target, leaseID, err = a.acquire(ctx, cfg, *keep)
if useCoordinator {
server, target, leaseID, err = a.acquireCoordinator(ctx, cfg, coord, *keep)
} else {
server, target, leaseID, err = a.acquire(ctx, cfg, *keep)
}
acquired = true
}
if err != nil {
@ -77,7 +104,11 @@ func (a App) runCommand(ctx context.Context, args []string) error {
defer func() {
if !*keep {
fmt.Fprintf(a.Stderr, "releasing %s server=%d\n", leaseID, server.ID)
_ = deleteByID(context.Background(), server.ID)
if useCoordinator {
_, _ = coord.ReleaseLease(context.Background(), leaseID, true)
} else {
_ = deleteByID(context.Background(), server.ID)
}
}
}()
}
@ -112,6 +143,24 @@ func (a App) runCommand(ctx context.Context, args []string) error {
return nil
}
func (a App) acquireCoordinator(ctx context.Context, cfg Config, coord *CoordinatorClient, keep bool) (Server, SSHTarget, string, error) {
publicKey, err := publicKeyFor(cfg.SSHKey)
if err != nil {
return Server{}, SSHTarget{}, "", err
}
fmt.Fprintf(a.Stderr, "coordinator lease class=%s preferred_type=%s keep=%v\n", cfg.Class, cfg.ServerType, keep)
lease, err := coord.CreateLease(ctx, cfg, publicKey, keep)
if err != nil {
return Server{}, SSHTarget{}, "", err
}
server, target, leaseID := leaseToServerTarget(lease, cfg)
fmt.Fprintf(a.Stderr, "leased %s server=%d type=%s ip=%s via coordinator\n", leaseID, server.ID, server.ServerType.Name, target.Host)
if err := waitForSSH(ctx, target, a.Stderr); err != nil {
return Server{}, SSHTarget{}, "", err
}
return server, target, leaseID, nil
}
func (a App) acquire(ctx context.Context, cfg Config, keep bool) (Server, SSHTarget, string, error) {
client, err := newHetznerClient()
if err != nil {
@ -202,6 +251,16 @@ func (a App) stop(ctx context.Context, args []string) error {
return exit(2, "usage: crabbox stop <lease-or-server-id>")
}
cfg := defaultConfig()
if coord, ok, err := newCoordinatorClient(cfg); err != nil {
return err
} else if ok {
lease, err := coord.ReleaseLease(ctx, args[0], true)
if err != nil {
return err
}
fmt.Fprintf(a.Stderr, "released lease=%s server=%d\n", lease.ID, lease.ServerID)
return nil
}
server, _, leaseID, err := a.findLease(ctx, cfg, args[0])
if err != nil {
return err

2794
worker/package-lock.json generated Normal file

File diff suppressed because it is too large.

18
worker/package.json Normal file

@ -0,0 +1,18 @@
{
"name": "@openclaw/crabbox-worker",
"version": "0.1.0",
"private": true,
"type": "module",
"scripts": {
"build": "wrangler deploy --dry-run --outdir dist",
"check": "tsc --noEmit",
"deploy": "wrangler deploy",
"test": "vitest run"
},
"devDependencies": {
"@cloudflare/workers-types": "^4.20260430.0",
"typescript": "^5.9.3",
"vitest": "^4.0.10",
"wrangler": "^4.50.0"
}
}

54
worker/src/bootstrap.ts Normal file

@ -0,0 +1,54 @@
import type { LeaseConfig } from "./config";
export function cloudInit(config: LeaseConfig): string {
return `#cloud-config
package_update: true
package_upgrade: false
users:
- name: ${config.sshUser}
groups: sudo
shell: /bin/bash
sudo: ['ALL=(ALL) NOPASSWD:ALL']
ssh_authorized_keys:
- ${config.sshPublicKey}
packages:
- openssh-server
- ca-certificates
- curl
- git
- rsync
- build-essential
- docker.io
- jq
write_files:
- path: /etc/ssh/sshd_config.d/99-crabbox-port.conf
permissions: '0644'
content: |
Port 22
Port ${config.sshPort}
PasswordAuthentication no
- path: /usr/local/bin/crabbox-ready
permissions: '0755'
content: |
#!/usr/bin/env bash
set -euo pipefail
node --version
pnpm --version
git --version
rsync --version >/dev/null
docker --version
runcmd:
- mkdir -p ${config.workRoot} /var/cache/crabbox/pnpm /var/cache/crabbox/npm
- chown -R ${config.sshUser}:${config.sshUser} ${config.workRoot} /var/cache/crabbox
- systemctl enable --now ssh
- systemctl restart ssh
- systemctl enable --now docker
- usermod -aG docker ${config.sshUser}
- curl -fsSL https://deb.nodesource.com/setup_22.x | bash -
- apt-get install -y nodejs
- corepack enable
- corepack prepare pnpm@10.33.2 --activate
- sudo -u ${config.sshUser} bash -lc 'pnpm config set store-dir /var/cache/crabbox/pnpm'
- crabbox-ready
`;
}

66
worker/src/config.ts Normal file

@ -0,0 +1,66 @@
import type { LeaseRequest } from "./types";
export interface LeaseConfig {
profile: string;
class: string;
serverType: string;
location: string;
image: string;
sshUser: string;
sshPort: string;
providerKey: string;
workRoot: string;
ttlSeconds: number;
keep: boolean;
sshPublicKey: string;
}
export function leaseConfig(input: LeaseRequest): LeaseConfig {
const machineClass = input.class ?? "beast";
const serverType = input.serverType ?? serverTypeForClass(machineClass);
const ttlSeconds = clampTTL(input.ttlSeconds ?? 5400);
const sshPublicKey = input.sshPublicKey?.trim() ?? "";
if (!sshPublicKey) {
throw new Error("sshPublicKey is required");
}
return {
profile: input.profile ?? "openclaw-check",
class: machineClass,
serverType,
location: input.location ?? "fsn1",
image: input.image ?? "ubuntu-24.04",
sshUser: input.sshUser ?? "crabbox",
sshPort: input.sshPort ?? "2222",
providerKey: input.providerKey ?? "crabbox-steipete",
workRoot: input.workRoot ?? "/work/crabbox",
ttlSeconds,
keep: input.keep ?? false,
sshPublicKey
};
}
export function serverTypeForClass(machineClass: string): string {
return serverTypeCandidatesForClass(machineClass)[0] ?? machineClass;
}
export function serverTypeCandidatesForClass(machineClass: string): string[] {
switch (machineClass) {
case "standard":
return ["ccx33", "cpx62", "cx53"];
case "fast":
return ["ccx43", "cpx62", "cx53"];
case "large":
return ["ccx53", "ccx43", "cpx62", "cx53"];
case "beast":
return ["ccx63", "ccx53", "ccx43", "cpx62", "cx53"];
default:
return [machineClass];
}
}
function clampTTL(ttlSeconds: number): number {
if (!Number.isFinite(ttlSeconds) || ttlSeconds <= 0) {
return 5400;
}
return Math.min(Math.trunc(ttlSeconds), 86_400);
}

179
worker/src/fleet.ts Normal file

@ -0,0 +1,179 @@
import { leaseConfig } from "./config";
import { errorMessage, json, pathParts, readJson, requestOwner } from "./http";
import { HetznerClient } from "./hetzner";
import type { Env, LeaseRecord, LeaseRequest } from "./types";
const fleetID = "default";
export class FleetDurableObject implements DurableObject {
constructor(
private readonly state: DurableObjectState,
private readonly env: Env
) {}
async fetch(request: Request): Promise<Response> {
try {
const parts = pathParts(request);
const method = request.method.toUpperCase();
if (method === "GET" && parts.join("/") === "v1/health") {
return json({ ok: true, fleet: fleetID });
}
if (method === "GET" && parts.join("/") === "v1/pool") {
return await this.pool();
}
if (method === "GET" && parts.join("/") === "v1/leases") {
return await this.listLeases();
}
if (method === "POST" && parts.join("/") === "v1/leases") {
return await this.createLease(request);
}
if (parts[0] === "v1" && parts[1] === "leases" && parts[2]) {
return await this.leaseRoute(request, parts[2], parts[3]);
}
return json({ error: "not_found" }, { status: 404 });
} catch (error) {
return json({ error: errorMessage(error) }, { status: 500 });
}
}
async alarm(): Promise<void> {
await this.expireLeases();
await this.scheduleAlarm();
}
private async createLease(request: Request): Promise<Response> {
const owner = requestOwner(request);
const input = await readJson<LeaseRequest>(request);
const config = leaseConfig(input);
const leaseID = newLeaseID();
const client = new HetznerClient(this.env);
const { server, serverType } = await client.createServerWithFallback(config, leaseID, owner);
const now = new Date();
const record: LeaseRecord = {
id: leaseID,
owner,
profile: config.profile,
class: config.class,
serverType,
serverID: server.id,
serverName: server.name,
host: server.public_net.ipv4.ip,
sshUser: config.sshUser,
sshPort: config.sshPort,
workRoot: config.workRoot,
keep: config.keep,
state: "active",
createdAt: now.toISOString(),
updatedAt: now.toISOString(),
expiresAt: new Date(now.getTime() + config.ttlSeconds * 1000).toISOString()
};
await this.putLease(record);
await this.scheduleAlarm();
return json({ lease: record }, { status: 201 });
}
private async leaseRoute(request: Request, leaseID: string, action?: string): Promise<Response> {
const method = request.method.toUpperCase();
if (method === "GET" && action === undefined) {
const lease = await this.getLease(leaseID);
return lease ? json({ lease }) : json({ error: "not_found" }, { status: 404 });
}
if (method === "POST" && action === "heartbeat") {
const lease = await this.requireLease(leaseID);
lease.updatedAt = new Date().toISOString();
await this.putLease(lease);
return json({ lease });
}
if (method === "POST" && action === "release") {
return this.releaseLease(request, leaseID);
}
return json({ error: "not_found" }, { status: 404 });
}
private async releaseLease(request: Request, leaseID: string): Promise<Response> {
const lease = await this.requireLease(leaseID);
const body = await optionalJson<{ delete?: boolean }>(request);
const shouldDelete = body.delete ?? !lease.keep;
if (shouldDelete && lease.state === "active") {
await new HetznerClient(this.env).deleteServer(lease.serverID);
}
lease.state = "released";
lease.updatedAt = new Date().toISOString();
await this.putLease(lease);
return json({ lease });
}
private async pool(): Promise<Response> {
const client = new HetznerClient(this.env);
const servers = await client.listCrabboxServers();
return json({ machines: servers.map((server) => client.toMachine(server)) });
}
private async listLeases(): Promise<Response> {
const leases = await this.state.storage.list<LeaseRecord>({ prefix: "lease:" });
return json({ leases: [...leases.values()] });
}
private async expireLeases(): Promise<void> {
const leases = await this.state.storage.list<LeaseRecord>({ prefix: "lease:" });
const now = Date.now();
const client = new HetznerClient(this.env);
for (const lease of leases.values()) {
if (lease.state !== "active" || Date.parse(lease.expiresAt) > now) {
continue;
}
if (!lease.keep) {
await client.deleteServer(lease.serverID).catch(() => undefined);
}
lease.state = "expired";
lease.updatedAt = new Date().toISOString();
await this.putLease(lease);
}
}
private async scheduleAlarm(): Promise<void> {
const leases = await this.state.storage.list<LeaseRecord>({ prefix: "lease:" });
const activeExpiries = [...leases.values()]
.filter((lease) => lease.state === "active")
.map((lease) => Date.parse(lease.expiresAt))
.filter((time) => Number.isFinite(time));
if (activeExpiries.length === 0) {
await this.state.storage.deleteAlarm();
return;
}
await this.state.storage.setAlarm(Math.min(...activeExpiries));
}
private async getLease(leaseID: string): Promise<LeaseRecord | undefined> {
return this.state.storage.get<LeaseRecord>(leaseKey(leaseID));
}
private async requireLease(leaseID: string): Promise<LeaseRecord> {
const lease = await this.getLease(leaseID);
if (!lease) {
throw new Error(`lease not found: ${leaseID}`);
}
return lease;
}
private async putLease(lease: LeaseRecord): Promise<void> {
await this.state.storage.put(leaseKey(lease.id), lease);
}
}
function leaseKey(leaseID: string): string {
return `lease:${leaseID}`;
}
function newLeaseID(): string {
const bytes = new Uint8Array(6);
crypto.getRandomValues(bytes);
return `cbx_${[...bytes].map((byte) => byte.toString(16).padStart(2, "0")).join("")}`;
}
async function optionalJson<T>(request: Request): Promise<T> {
if (!request.headers.get("content-type")?.includes("application/json")) {
return {} as T;
}
return readJson<T>(request);
}

215
worker/src/hetzner.ts Normal file

@ -0,0 +1,215 @@
import { cloudInit } from "./bootstrap";
import { serverTypeCandidatesForClass, type LeaseConfig } from "./config";
import type { Env, HetznerSSHKey, HetznerServer, MachineView } from "./types";
interface HetznerListServersResponse {
servers: HetznerServer[];
}
interface HetznerListSSHKeysResponse {
ssh_keys: HetznerSSHKey[];
}
interface HetznerSSHKeyResponse {
ssh_key: HetznerSSHKey;
}
interface HetznerServerResponse {
server: HetznerServer;
}
export class HetznerClient {
private readonly token: string;
constructor(env: Env) {
if (!env.HETZNER_TOKEN) {
throw new Error("HETZNER_TOKEN secret is required");
}
this.token = env.HETZNER_TOKEN;
}
async listCrabboxServers(): Promise<HetznerServer[]> {
const query = new URLSearchParams({
label_selector: "crabbox=true",
per_page: "100"
});
const response = await this.request<HetznerListServersResponse>("GET", `/servers?${query}`);
return response.servers;
}
async ensureSSHKey(name: string, publicKey: string): Promise<HetznerSSHKey> {
const byName = await this.request<HetznerListSSHKeysResponse>(
"GET",
`/ssh_keys?${new URLSearchParams({ name })}`
);
for (const key of byName.ssh_keys) {
if (key.name === name) {
if (key.public_key.trim() !== publicKey.trim()) {
throw new Error(`hetzner ssh key ${name} exists with different public key`);
}
return key;
}
}
const all = await this.request<HetznerListSSHKeysResponse>(
"GET",
`/ssh_keys?${new URLSearchParams({ per_page: "100" })}`
);
for (const key of all.ssh_keys) {
if (key.public_key.trim() === publicKey.trim()) {
return key;
}
}
const created = await this.request<HetznerSSHKeyResponse>("POST", "/ssh_keys", {
name,
public_key: publicKey,
labels: {
crabbox: "true",
created_by: "crabbox"
}
});
return created.ssh_key;
}
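// Tries the requested server type first, then the remaining candidates for the class.
// It stops at the first success or the first non-retryable error and reports every failure.
async createServerWithFallback(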
config: LeaseConfig,
leaseID: string,
owner: string
): Promise<{ server: HetznerServer; serverType: string }> {
const key = await this.ensureSSHKey(config.providerKey, config.sshPublicKey);
const resolvedConfig = { ...config, providerKey: key.name };
const candidates = prependUnique(
resolvedConfig.serverType,
serverTypeCandidatesForClass(resolvedConfig.class)
);
const failures: string[] = [];
for (const serverType of candidates) {
try {
const server = await this.createServer({ ...resolvedConfig, serverType }, leaseID, owner);
return { server, serverType };
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
failures.push(`${serverType}: ${message}`);
if (!isRetryableProvisioningError(message)) {
break;
}
}
}
throw new Error(failures.join("; "));
}
async getServer(id: number): Promise<HetznerServer> {
return (await this.request<HetznerServerResponse>("GET", `/servers/${id}`)).server;
}
async waitForServerIP(id: number): Promise<HetznerServer> {
const deadline = Date.now() + 60_000;
while (Date.now() < deadline) {
const server = await this.getServer(id);
if (server.public_net.ipv4.ip) {
return server;
}
await sleep(2_000);
}
throw new Error(`timed out waiting for server IP: ${id}`);
}
async deleteServer(id: number): Promise<void> {
await this.request<void>("DELETE", `/servers/${id}`);
}
toMachine(server: HetznerServer): MachineView {
return {
id: server.id,
name: server.name,
status: server.status,
serverType: server.server_type.name,
host: server.public_net.ipv4.ip,
labels: server.labels
};
}
private async createServer(
config: LeaseConfig,
leaseID: string,
owner: string
): Promise<HetznerServer> {
const name = `crabbox-${leaseID}`.replaceAll("_", "-");
const labels = {
crabbox: "true",
profile: config.profile,
class: config.class,
server_type: config.serverType,
lease: leaseID,
state: "leased",
keep: String(config.keep),
owner: sanitizeLabel(owner),
created_by: "crabbox"
};
const response = await this.request<HetznerServerResponse>("POST", "/servers", {
name,
server_type: config.serverType,
image: config.image,
location: config.location,
labels,
ssh_keys: [config.providerKey],
user_data: cloudInit(config),
start_after_create: true,
public_net: {
enable_ipv4: true,
enable_ipv6: false
}
});
return response.server.public_net.ipv4.ip
? response.server
: await this.waitForServerIP(response.server.id);
}
private async request<T>(method: string, path: string, body?: unknown): Promise<T> {
const init: RequestInit = {
method,
headers: {
authorization: `Bearer ${this.token}`,
"content-type": "application/json"
}
};
if (body !== undefined) {
init.body = JSON.stringify(body);
}
const response = await fetch(`https://api.hetzner.cloud/v1${path}`, init);
if (!response.ok) {
throw new Error(`hetzner ${method} ${path}: http ${response.status}: ${await safeBody(response)}`);
}
if (response.status === 204) {
return undefined as T;
}
return (await response.json()) as T;
}
}
export function isRetryableProvisioningError(message: string): boolean {
return (
message.includes("dedicated_core_limit") ||
message.includes("resource_limit_exceeded") ||
message.includes("server_type_not_available") ||
message.includes("location_not_available")
);
}
function prependUnique(first: string, rest: string[]): string[] {
return [first, ...rest.filter((value) => value !== first)];
}
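// Hetzner label values allow only [a-zA-Z0-9_.-] (no "@"), at most 63 characters, alphanumeric at both ends.
function sanitizeLabel(value: string): string {
const cleaned = value.replaceAll(/[^a-zA-Z0-9_.-]/g, "_").slice(0, 63);
return cleaned.replace(/^[^a-zA-Z0-9]+|[^a-zA-Z0-9]+$/g, "") || "unknown";
}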
async function safeBody(response: Response): Promise<string> {
const text = await response.text();
return text.length > 500 ? `${text.slice(0, 500)}...` : text;
}
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
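
Review note: the fallback loop in `createServerWithFallback` can be exercised in isolation. The following is a minimal sketch, not the code in this commit. It assumes a synchronous, hypothetical `provision` callback in place of the real Hetzner request, and `provisionWithFallback`, `Provision`, and `RETRYABLE_CODES` are illustrative names. The retryable substrings are copied from `isRetryableProvisioningError`.

```typescript
// Hypothetical stand-in for the provider call; it throws an Error on failure.
type Provision<T> = (serverType: string) => T;

// Same substrings that isRetryableProvisioningError looks for.
const RETRYABLE_CODES = [
  "dedicated_core_limit",
  "resource_limit_exceeded",
  "server_type_not_available",
  "location_not_available"
];

function isRetryable(message: string): boolean {
  return RETRYABLE_CODES.some((code) => message.includes(code));
}

function provisionWithFallback<T>(
  preferred: string,
  classCandidates: string[],
  provision: Provision<T>
): { result: T; serverType: string } {
  // Preferred type goes first; duplicates of it are dropped from the rest.
  const candidates = [preferred, ...classCandidates.filter((c) => c !== preferred)];
  const failures: string[] = [];
  for (const serverType of candidates) {
    try {
      return { result: provision(serverType), serverType };
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      failures.push(`${serverType}: ${message}`);
      // Errors such as bad credentials will not improve with another type.
      if (!isRetryable(message)) {
        break;
      }
    }
  }
  throw new Error(failures.join("; "));
}

// Usage: ccx63 is out of capacity, so the loop moves on to ccx53.
const attempted: string[] = [];
const outcome = provisionWithFallback("ccx63", ["ccx63", "ccx53", "ccx43"], (serverType) => {
  attempted.push(serverType);
  if (serverType === "ccx63") {
    throw new Error("http 412: resource_limit_exceeded");
  }
  return `server-on-${serverType}`;
});
console.log(outcome.serverType, attempted);
```

Matching on substrings of the error message keeps the classifier simple, at the cost of depending on the provider's error codes appearing verbatim in the response body.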

42
worker/src/http.ts Normal file

@ -0,0 +1,42 @@
export function json(data: unknown, init: ResponseInit = {}): Response {
const headers = new Headers(init.headers);
headers.set("content-type", "application/json; charset=utf-8");
return new Response(JSON.stringify(data), { ...init, headers });
}
export function text(message: string, status = 200): Response {
return new Response(message, {
status,
headers: { "content-type": "text/plain; charset=utf-8" }
});
}
export async function readJson<T>(request: Request): Promise<T> {
const value = (await request.json()) as unknown;
return value as T;
}
export function bearerToken(request: Request): string {
const header = request.headers.get("authorization") ?? "";
const [scheme, token] = header.split(" ", 2);
if (scheme?.toLowerCase() !== "bearer" || !token) {
return "";
}
return token;
}
// Prefers the Cloudflare Access identity header, then the client-supplied x-crabbox-owner header.
export function requestOwner(request: Request): string {
return (
request.headers.get("cf-access-authenticated-user-email") ??
request.headers.get("x-crabbox-owner") ??
"unknown"
);
}
export function pathParts(request: Request): string[] {
return new URL(request.url).pathname.split("/").filter(Boolean);
}
export function errorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}
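
Review note: the header parsing in `bearerToken` has a few edge cases that are easy to miss. The sketch below mirrors its logic on plain strings so it runs without a `Request` object. `parseBearer` is a hypothetical name used only for illustration and is not part of this commit.

```typescript
// String-only variant of the bearerToken logic (illustrative, not the exported helper).
function parseBearer(header: string | null): string {
  const [scheme, token] = (header ?? "").split(" ", 2);
  if (scheme?.toLowerCase() !== "bearer" || !token) {
    return "";
  }
  return token;
}

// The scheme is matched case-insensitively.
console.log(parseBearer("Bearer secret"));
console.log(parseBearer("bearer secret"));

// Other schemes, a missing token, and a missing header all yield "".
console.log(parseBearer("Basic abc"));
console.log(parseBearer("Bearer"));
console.log(parseBearer(null));

// Two spaces produce an empty second segment, so this is rejected as well.
console.log(parseBearer("Bearer  secret"));
```

Returning an empty string instead of throwing lets the caller treat every malformed header the same way as a wrong token.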

27
worker/src/index.ts Normal file

@ -0,0 +1,27 @@
import { FleetDurableObject } from "./fleet";
import { bearerToken, json } from "./http";
import type { Env } from "./types";
export { FleetDurableObject };
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
if (request.method === "GET" && url.pathname === "/v1/health") {
return json({ ok: true, service: "crabbox-coordinator" });
}
if (!isAuthorized(request, env)) {
return json({ error: "unauthorized" }, { status: 401 });
}
const id = env.FLEET.idFromName("default");
return env.FLEET.get(id).fetch(request);
}
};
// Auth is open when CRABBOX_SHARED_TOKEN is unset; otherwise the bearer token must match exactly.
export function isAuthorized(request: Request, env: Pick<Env, "CRABBOX_SHARED_TOKEN">): boolean {
const expected = env.CRABBOX_SHARED_TOKEN;
if (!expected) {
return true;
}
return bearerToken(request) === expected;
}

70
worker/src/types.ts Normal file

@ -0,0 +1,70 @@
export interface Env {
FLEET: DurableObjectNamespace;
HETZNER_TOKEN: string;
CRABBOX_SHARED_TOKEN?: string;
}
export interface LeaseRequest {
profile?: string;
class?: string;
serverType?: string;
location?: string;
image?: string;
sshUser?: string;
sshPort?: string;
providerKey?: string;
workRoot?: string;
ttlSeconds?: number;
keep?: boolean;
sshPublicKey?: string;
}
export interface LeaseRecord {
id: string;
owner: string;
profile: string;
class: string;
serverType: string;
serverID: number;
serverName: string;
host: string;
sshUser: string;
sshPort: string;
workRoot: string;
keep: boolean;
state: "active" | "released" | "expired" | "failed";
createdAt: string;
updatedAt: string;
expiresAt: string;
}
export interface HetznerServer {
id: number;
name: string;
status: string;
labels: Record<string, string>;
public_net: {
ipv4: {
ip: string;
};
};
server_type: {
name: string;
};
}
export interface HetznerSSHKey {
id: number;
name: string;
fingerprint: string;
public_key: string;
}
export interface MachineView {
id: number;
name: string;
status: string;
serverType: string;
host: string;
labels: Record<string, string>;
}


@ -0,0 +1,32 @@
import { describe, expect, it } from "vitest";
import { leaseConfig, serverTypeCandidatesForClass, serverTypeForClass } from "../src/config";
describe("machine class config", () => {
it("maps known classes to preferred Hetzner candidates", () => {
expect(serverTypeForClass("beast")).toBe("ccx63");
expect(serverTypeCandidatesForClass("beast")).toEqual([
"ccx63",
"ccx53",
"ccx43",
"cpx62",
"cx53"
]);
});
it("treats an unknown class as an explicit server type", () => {
expect(serverTypeCandidatesForClass("cpx62")).toEqual(["cpx62"]);
});
});
describe("lease config", () => {
it("requires an ssh public key", () => {
expect(() => leaseConfig({})).toThrow("sshPublicKey is required");
});
it("uses strict defaults and clamps ttl", () => {
const config = leaseConfig({ sshPublicKey: "ssh-ed25519 test", ttlSeconds: 999_999 });
expect(config.profile).toBe("openclaw-check");
expect(config.sshPort).toBe("2222");
expect(config.ttlSeconds).toBe(86_400);
});
});

18
worker/test/http.test.ts Normal file

@ -0,0 +1,18 @@
import { describe, expect, it } from "vitest";
import { isAuthorized } from "../src";
describe("coordinator auth", () => {
it("allows requests when no shared token is configured", () => {
const request = new Request("https://example.test/v1/pool");
expect(isAuthorized(request, {})).toBe(true);
});
it("requires the configured bearer token", () => {
const denied = new Request("https://example.test/v1/pool");
const allowed = new Request("https://example.test/v1/pool", {
headers: { authorization: "Bearer secret" }
});
expect(isAuthorized(denied, { CRABBOX_SHARED_TOKEN: "secret" })).toBe(false);
expect(isAuthorized(allowed, { CRABBOX_SHARED_TOKEN: "secret" })).toBe(true);
});
});

19
worker/tsconfig.json Normal file

@ -0,0 +1,19 @@
{
"compilerOptions": {
"target": "ES2023",
"module": "ESNext",
"moduleResolution": "Bundler",
"lib": ["ES2023"],
"types": ["@cloudflare/workers-types"],
"strict": true,
"exactOptionalPropertyTypes": true,
"noFallthroughCasesInSwitch": true,
"noImplicitOverride": true,
"noImplicitReturns": true,
"noPropertyAccessFromIndexSignature": true,
"noUncheckedIndexedAccess": true,
"useUnknownInCatchVariables": true,
"skipLibCheck": false
},
"include": ["src/**/*.ts"]
}

8
worker/vitest.config.ts Normal file

@ -0,0 +1,8 @@
import { defineConfig } from "vitest/config";
export default defineConfig({
test: {
environment: "node",
globals: false
}
});

28
worker/wrangler.jsonc Normal file

@ -0,0 +1,28 @@
{
"$schema": "./node_modules/wrangler/config-schema.json",
"name": "crabbox-coordinator",
"main": "src/index.ts",
"compatibility_date": "2026-04-30",
"workers_dev": true,
"preview_urls": false,
"routes": [
{
"pattern": "crabbox.clawd.bot/*",
"zone_name": "clawd.bot"
}
],
"durable_objects": {
"bindings": [
{
"name": "FLEET",
"class_name": "FleetDurableObject"
}
]
},
"migrations": [
{
"tag": "v1",
"new_sqlite_classes": ["FleetDurableObject"]
}
]
}