Compare commits

...

5 Commits

| Author | SHA1 | Message | Date |
|:-------|:-----|:--------|:-----|
| Vincent Koc | b3e8c998c3 | fix: satisfy worker control lint | 2026-05-09 07:43:58 +08:00 |
| Vincent Koc | 218bfdeac8 | docs: document adaptive control transport | 2026-05-09 07:42:03 +08:00 |
| Vincent Koc | b2eee1521b | feat: use control websocket for attach | 2026-05-09 07:39:08 +08:00 |
| Vincent Koc | a8a8a95243 | feat: add coordinator control websocket | 2026-05-09 07:32:24 +08:00 |
| Vincent Koc | f399bf6d27 | fix: reduce ssh run transport chatter | 2026-05-07 20:03:08 -07:00 |
15 changed files with 910 additions and 53 deletions


@ -4,6 +4,7 @@
### Added
- Added an authenticated coordinator control WebSocket for low-latency run attach streams and lease heartbeats, with HTTP polling/heartbeat fallback for older brokers.
- Added mediated egress commands and browser wiring so Linux desktop leases can proxy selected app traffic through the operator machine via the coordinator bridge.
- Added WebVNC portal clipboard controls for sending local clipboard text into the remote session and copying remote clipboard text back to the local browser.
- Added rescue-first desktop/WebVNC failure output that names the failing layer and prints exact `rescue:` or native VNC fallback commands when bridges, viewers, browser launches, VNC targets, or input stacks hang.
@ -11,6 +12,7 @@
### Fixed
- Fixed `crabbox run` transport chatter by keeping SSH multiplexers alive longer, retrying fallback SSH ports for streaming commands, and batching stdout/stderr preview events into larger coordinator chunks.
- Fixed `egress start --coordinator` so live public-route egress starts work when the local default coordinator is Cloudflare Access-protected.
- Fixed Tailscale exit-node bootstrap paths to prefer tailnet metadata and fail clearly when remote exit-node egress is not active.
- Fixed `run --no-sync` timing summaries so they report `sync_skipped=true`.


@ -10,8 +10,10 @@ crabbox attach run_abcdef123456 --poll 500ms
## Behavior
`attach` polls the coordinator for new run events on a fixed interval,
prints them as they arrive, and exits when the run finishes.
`attach` follows coordinator run events, prints them as they arrive, and exits
when the run finishes. Newer brokers stream events over the authenticated
coordinator control WebSocket; with older brokers, or when the socket drops,
`attach` falls back to the HTTPS events API and resumes after the last printed
sequence number.
- stdout and stderr preview events are written back to stdout and stderr,
preserving the stream split;
@ -20,8 +22,8 @@ prints them as they arrive, and exits when the run finishes.
message;
- when the run has already finished, attach prints any remaining events
and exits;
- when the run is still active, attach polls until it sees a `finish`
event.
- when the run is still active, attach waits for streamed events or polls until
it sees the run finish.
`attach` is not detached command execution. It follows the events the
original CLI is emitting; if that CLI process dies, the run state remains
@ -40,7 +42,7 @@ after completion.
```text
--id <run-id> run id (also accepted as a positional argument)
--after <seq> resume after this event sequence number
--poll <duration> polling interval, default 1s
--poll <duration> fallback poll interval and WebSocket idle check, default 1s
```
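The resume behavior described under Behavior depends on a single sequence cursor shared by both transports. The following is a minimal sketch of that idea, not the CLI's actual code: `runEvent` and `advance` are hypothetical names, and the only assumption taken from this change is that events carry a monotonically increasing `seq`.

```go
package main

import "fmt"

// runEvent is a hypothetical stand-in for the CLI's run event type.
type runEvent struct {
	Seq  int
	Data string
}

// advance returns the events newer than last, plus the updated cursor.
// Events at or below the cursor were already printed and are skipped.
func advance(last int, batch []runEvent) ([]runEvent, int) {
	var fresh []runEvent
	for _, ev := range batch {
		if ev.Seq <= last {
			continue // already printed on the other transport
		}
		fresh = append(fresh, ev)
		last = ev.Seq
	}
	return fresh, last
}

func main() {
	// The WebSocket delivered events 1 and 2 before dropping.
	_, last := advance(0, []runEvent{{Seq: 1, Data: "a"}, {Seq: 2, Data: "b"}})
	// The HTTPS fallback replays event 2 and then delivers event 3.
	fresh, last := advance(last, []runEvent{{Seq: 2, Data: "b"}, {Seq: 3, Data: "c"}})
	fmt.Println(len(fresh), fresh[0].Data, last)
}
```

Because the cursor only moves forward, a replayed event from the fallback transport is never printed twice.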
## Use Cases


@ -36,11 +36,17 @@ Cloud machines are vanilla Ubuntu runners that hold no broker secrets. They are
The CLI talks to the broker over HTTPS, then talks **directly** to the leased runner over SSH and rsync. The runner never calls the broker; that path stays one-way.
For long-lived coordinator interactions, newer CLIs also open one authenticated
WebSocket to the Fleet Durable Object at `/v1/control`. That socket carries
run-event attach streams and lease heartbeats so high-latency links do less
request polling. HTTPS endpoints remain canonical storage and compatibility
fallbacks, so older CLIs and older brokers still work.
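As a rough illustration of what travels over `/v1/control`, the sketch below encodes the two client messages used for attach streams. The JSON field names (`type`, `runID`, `after`, `limit`, `seq`) are the ones the CLI code in this change sends; the Go type names and the `encode` helper are illustrative assumptions, not part of the codebase.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// subscribeRun asks the coordinator for run events after a sequence number.
// The struct name is hypothetical; the JSON keys match this change.
type subscribeRun struct {
	Type  string `json:"type"`
	RunID string `json:"runID"`
	After int    `json:"after"`
	Limit int    `json:"limit"`
}

// ack tells the coordinator which sequence number the client has printed.
type ack struct {
	Type  string `json:"type"`
	RunID string `json:"runID"`
	Seq   int    `json:"seq"`
}

// encode marshals a message to the JSON text frame payload.
func encode(v any) string {
	data, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(data)
}

func main() {
	fmt.Println(encode(subscribeRun{Type: "subscribe_run", RunID: "run_123", After: 0, Limit: 100}))
	fmt.Println(encode(ack{Type: "ack", RunID: "run_123", Seq: 1}))
}
```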
## Ownership
| Layer | Owns |
|:------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **CLI** | config + flags; per-lease SSH key; SSH readiness; Git seeding + rsync; sync fingerprints + sanity checks; remote command + streaming; heartbeats; release |
| **CLI** | config + flags; per-lease SSH key; SSH readiness; Git seeding + rsync; sync fingerprints + sanity checks; remote command + streaming; control WebSocket when available; HTTP fallback; release |
| **Broker** | request auth + identity; serialized lease state; provider credentials; machine create/delete; lease expiry; pool/status/inspect; usage; spend caps |
| **Provider** | raw compute: Hetzner Cloud servers or AWS EC2 instances |
| **Runner** | nothing durable for brokered boxes: Linux prepared by cloud-init with SSH, Git, rsync, curl, jq, `/work/crabbox`; AWS Windows/WSL2/macOS targets have provider-specific bootstrap; static targets are existing SSH hosts; project runtimes come from repo-owned setup |


@ -9,6 +9,19 @@ Read when:
Crabbox performance comes from avoiding repeated setup, keeping the sync small, choosing available capacity, and reusing project-defined hydration when it matters.
## High-Latency Links
Crabbox should not require a special slow-network mode. The CLI keeps SSH as
the universal command transport, but uses SSH ControlMaster with a longer
persist window so repeated probes, sync helpers, and commands avoid paying a
new handshake every time. Streaming commands retry coordinator-provided
fallback ports just like readiness and helper probes.
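The fallback-port retry can be pictured as an ordered candidate list that is tried until one port works. This is a sketch under assumptions: `portCandidates` and `firstReachable` are hypothetical helpers written for illustration and may differ from the CLI's own `sshPortCandidates` and retry logic.

```go
package main

import "fmt"

// portCandidates is a hypothetical helper: primary port first, then the
// fallbacks, skipping blanks and duplicates.
func portCandidates(primary string, fallbacks []string) []string {
	seen := map[string]bool{}
	var out []string
	for _, p := range append([]string{primary}, fallbacks...) {
		if p == "" || seen[p] {
			continue
		}
		seen[p] = true
		out = append(out, p)
	}
	return out
}

// firstReachable tries each candidate in order; try reports whether the
// connection on that port worked.
func firstReachable(ports []string, try func(port string) bool) (string, bool) {
	for _, p := range ports {
		if try(p) {
			return p, true
		}
	}
	return "", false
}

func main() {
	ports := portCandidates("2222", []string{"22", "2222"})
	// Pretend only port 22 accepts connections.
	port, ok := firstReachable(ports, func(p string) bool { return p == "22" })
	fmt.Println(ports, port, ok)
}
```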
When the broker supports it, `crabbox attach` and lease heartbeats use one
authenticated coordinator WebSocket instead of repeated HTTP polls. If the
socket cannot connect or drops, the CLI resumes through the existing HTTPS API
from the last acknowledged run-event sequence.
## Warm Leases
Use `warmup` for repeated agent loops:

internal/cli/control_ws.go

@ -0,0 +1,223 @@
package cli
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"nhooyr.io/websocket"
)
const coordinatorControlDialTimeout = 1500 * time.Millisecond
type coordinatorControlConn struct {
conn *websocket.Conn
}
type coordinatorControlMessage struct {
Type string `json:"type"`
Protocol int `json:"protocol,omitempty"`
ClientID string `json:"clientID,omitempty"`
RunID string `json:"runID,omitempty"`
Events []CoordinatorRunEvent `json:"events,omitempty"`
NextSeq int `json:"nextSeq,omitempty"`
LeaseID string `json:"leaseID,omitempty"`
OK bool `json:"ok,omitempty"`
ExpiresAt string `json:"expiresAt,omitempty"`
Code string `json:"code,omitempty"`
Message string `json:"message,omitempty"`
Error string `json:"error,omitempty"`
IdleTimeoutSeconds int `json:"idleTimeoutSeconds,omitempty"`
Telemetry *LeaseTelemetry `json:"telemetry,omitempty"`
}
func dialCoordinatorControl(ctx context.Context, coord *CoordinatorClient) (*coordinatorControlConn, error) {
endpoint, err := coordinatorControlURL(coord.BaseURL)
if err != nil {
return nil, err
}
headers := http.Header{}
coord.addRequestHeaders(headers)
opts := &websocket.DialOptions{
HTTPHeader: headers,
}
if coord.Client != nil {
opts.HTTPClient = coord.Client
}
conn, resp, err := websocket.Dial(ctx, endpoint, opts)
if err != nil {
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
return nil, err
}
return &coordinatorControlConn{conn: conn}, nil
}
func coordinatorControlURL(baseURL string) (string, error) {
base, err := url.Parse(baseURL)
if err != nil {
return "", err
}
switch base.Scheme {
case "http":
base.Scheme = "ws"
case "https":
base.Scheme = "wss"
case "ws", "wss":
default:
return "", fmt.Errorf("unsupported coordinator scheme %q", base.Scheme)
}
base.Path = strings.TrimRight(base.Path, "/") + "/v1/control"
base.RawQuery = ""
base.Fragment = ""
return base.String(), nil
}
func (c *coordinatorControlConn) close() {
if c == nil || c.conn == nil {
return
}
_ = c.conn.Close(websocket.StatusNormalClosure, "")
}
func (c *coordinatorControlConn) write(ctx context.Context, payload any) error {
data, err := json.Marshal(payload)
if err != nil {
return err
}
return c.conn.Write(ctx, websocket.MessageText, data)
}
func (c *coordinatorControlConn) read(ctx context.Context) (coordinatorControlMessage, error) {
typ, data, err := c.conn.Read(ctx)
if err != nil {
return coordinatorControlMessage{}, err
}
if typ != websocket.MessageText {
return coordinatorControlMessage{}, fmt.Errorf("control websocket sent non-text frame")
}
var msg coordinatorControlMessage
if err := json.Unmarshal(data, &msg); err != nil {
return coordinatorControlMessage{}, err
}
return msg, nil
}
func followRunControlWebSocket(ctx context.Context, coord *CoordinatorClient, runID string, after int, poll time.Duration, stdout, stderr io.Writer) (int, bool, bool, error) {
dialCtx, cancel := context.WithTimeout(ctx, coordinatorControlDialTimeout)
control, err := dialCoordinatorControl(dialCtx, coord)
cancel()
if err != nil {
if ctx.Err() != nil {
return after, false, false, ctx.Err()
}
return after, false, false, nil
}
defer control.close()
nextAfter := after
writeCtx, writeCancel := context.WithTimeout(ctx, 5*time.Second)
err = control.write(writeCtx, map[string]any{
"type": "subscribe_run",
"runID": runID,
"after": nextAfter,
"limit": 100,
})
writeCancel()
if err != nil {
return nextAfter, false, true, nil
}
for {
readCtx, readCancel := context.WithTimeout(ctx, poll)
msg, err := control.read(readCtx)
readCancel()
if err != nil {
if ctx.Err() != nil {
return nextAfter, false, true, ctx.Err()
}
if errors.Is(err, context.DeadlineExceeded) {
done, err := coordinatorRunDone(ctx, coord, runID)
if err != nil {
return nextAfter, false, true, err
}
if done {
return nextAfter, true, true, nil
}
continue
}
done, doneErr := coordinatorRunDone(ctx, coord, runID)
if doneErr != nil {
return nextAfter, false, true, doneErr
}
if done {
return nextAfter, true, true, nil
}
return nextAfter, false, true, nil
}
switch msg.Type {
case "hello", "pong", "heartbeat":
continue
case "error":
return nextAfter, false, true, nil
case "run_events":
for _, event := range msg.Events {
if event.Seq <= nextAfter {
continue
}
nextAfter = event.Seq
printAttachEvent(stdout, stderr, event)
}
ackCtx, ackCancel := context.WithTimeout(ctx, 2*time.Second)
_ = control.write(ackCtx, map[string]any{
"type": "ack",
"runID": runID,
"seq": nextAfter,
})
ackCancel()
}
}
}
func coordinatorRunDone(ctx context.Context, coord *CoordinatorClient, runID string) (bool, error) {
run, err := coord.Run(ctx, runID)
if err != nil {
return false, err
}
return run.State != "running", nil
}
func (c *coordinatorControlConn) heartbeat(ctx context.Context, leaseID string, idleTimeout *time.Duration, telemetry *LeaseTelemetry) error {
payload := coordinatorControlMessage{
Type: "heartbeat",
LeaseID: leaseID,
Telemetry: telemetry,
}
if idleTimeout != nil && *idleTimeout > 0 {
payload.IdleTimeoutSeconds = int(idleTimeout.Seconds())
}
if err := c.write(ctx, payload); err != nil {
return err
}
for {
msg, err := c.read(ctx)
if err != nil {
return err
}
if msg.Type != "heartbeat" {
continue
}
if !msg.OK {
if msg.Error != "" {
return fmt.Errorf("control heartbeat failed: %s", msg.Error)
}
return fmt.Errorf("control heartbeat failed")
}
return nil
}
}
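To make the scheme mapping in `coordinatorControlURL` concrete, the standalone sketch below restates the same logic under the name `controlURL` and runs it on a few inputs. The host names are placeholders chosen for illustration, not real coordinator addresses.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// controlURL restates the http(s) to ws(s) mapping for illustration.
func controlURL(baseURL string) (string, error) {
	base, err := url.Parse(baseURL)
	if err != nil {
		return "", err
	}
	switch base.Scheme {
	case "http":
		base.Scheme = "ws"
	case "https":
		base.Scheme = "wss"
	case "ws", "wss":
		// Already a WebSocket scheme; keep it.
	default:
		return "", fmt.Errorf("unsupported coordinator scheme %q", base.Scheme)
	}
	// Append the control path and drop any query string or fragment.
	base.Path = strings.TrimRight(base.Path, "/") + "/v1/control"
	base.RawQuery = ""
	base.Fragment = ""
	return base.String(), nil
}

func main() {
	inputs := []string{
		"https://broker.example.com/",   // placeholder host
		"http://localhost:8787/api?x=1", // placeholder local broker
		"ftp://broker.example.com",      // unsupported scheme
	}
	for _, in := range inputs {
		out, err := controlURL(in)
		fmt.Printf("%s -> %q (err: %v)\n", in, out, err)
	}
}
```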


@ -897,16 +897,7 @@ func (c *CoordinatorClient) doHTTP(ctx context.Context, method, path string, dat
if hasBody {
req.Header.Set("Content-Type", "application/json")
}
if c.Token != "" {
req.Header.Set("Authorization", "Bearer "+c.Token)
}
c.addAccessHeaders(req.Header)
if owner := localCoordinatorOwner(); owner != "" {
req.Header.Set("X-Crabbox-Owner", owner)
}
if org := os.Getenv("CRABBOX_ORG"); org != "" {
req.Header.Set("X-Crabbox-Org", org)
}
c.addRequestHeaders(req.Header)
resp, err := c.Client.Do(req)
if err != nil {
return err
@ -915,6 +906,19 @@ func (c *CoordinatorClient) doHTTP(ctx context.Context, method, path string, dat
return decodeCoordinatorResponse(method, path, resp.StatusCode, resp.Body, out)
}
func (c *CoordinatorClient) addRequestHeaders(headers http.Header) {
if c.Token != "" {
headers.Set("Authorization", "Bearer "+c.Token)
}
c.addAccessHeaders(headers)
if owner := localCoordinatorOwner(); owner != "" {
headers.Set("X-Crabbox-Owner", owner)
}
if org := os.Getenv("CRABBOX_ORG"); org != "" {
headers.Set("X-Crabbox-Org", org)
}
}
func (c *CoordinatorClient) doCurl(ctx context.Context, method, path string, data []byte, hasBody bool, out any) error {
config, cleanup, err := c.curlConfig(method, path, data, hasBody)
if err != nil {


@ -13,6 +13,8 @@ import (
"strings"
"testing"
"time"
"nhooyr.io/websocket"
)
func TestCoordinatorMachineIDAcceptsStringOrNumber(t *testing.T) {
@ -295,6 +297,10 @@ func TestCoordinatorAppendRunTelemetry(t *testing.T) {
func TestCoordinatorHeartbeatTouchesImmediately(t *testing.T) {
touches := make(chan struct{}, 1)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/v1/control" {
http.NotFound(w, r)
return
}
if r.URL.Path != "/v1/leases/cbx_123/heartbeat" {
t.Fatalf("unexpected path %s", r.URL.Path)
}
@ -318,6 +324,10 @@ func TestCoordinatorHeartbeatTouchesImmediately(t *testing.T) {
func TestCoordinatorHeartbeatIncludesTelemetry(t *testing.T) {
bodies := make(chan string, 1)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/v1/control" {
http.NotFound(w, r)
return
}
if r.URL.Path != "/v1/leases/cbx_123/heartbeat" {
t.Fatalf("unexpected path %s", r.URL.Path)
}
@ -346,6 +356,59 @@ func TestCoordinatorHeartbeatIncludesTelemetry(t *testing.T) {
}
}
func TestCoordinatorHeartbeatUsesControlWebSocket(t *testing.T) {
bodies := make(chan string, 1)
httpHeartbeats := make(chan struct{}, 1)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.Method == http.MethodGet && r.URL.Path == "/v1/control":
conn, err := websocket.Accept(w, r, nil)
if err != nil {
t.Errorf("accept control websocket: %v", err)
return
}
defer conn.Close(websocket.StatusNormalClosure, "")
_, data, err := conn.Read(r.Context())
if err != nil {
t.Errorf("read control heartbeat: %v", err)
return
}
bodies <- string(data)
_ = conn.Write(r.Context(), websocket.MessageText, []byte(`{"type":"heartbeat","leaseID":"cbx_123","ok":true,"expiresAt":"2026-05-01T00:30:00Z"}`))
<-r.Context().Done()
case r.Method == http.MethodPost && r.URL.Path == "/v1/leases/cbx_123/heartbeat":
httpHeartbeats <- struct{}{}
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"lease":{"id":"cbx_123","provider":"aws","state":"active","expiresAt":"2026-05-01T00:30:00Z"}}`))
default:
t.Errorf("unexpected path %s", r.URL.Path)
}
}))
defer server.Close()
load := 0.77
collector := func(context.Context) (*LeaseTelemetry, error) {
return &LeaseTelemetry{Load1: &load}, nil
}
client := CoordinatorClient{BaseURL: server.URL, Client: server.Client()}
stop := startCoordinatorHeartbeat(context.Background(), &client, "cbx_123", 30*time.Minute, nil, collector, io.Discard)
defer stop()
select {
case body := <-bodies:
if !strings.Contains(body, `"type":"heartbeat"`) || !strings.Contains(body, `"load1":0.77`) {
t.Fatalf("control heartbeat body=%s", body)
}
case <-time.After(2 * time.Second):
t.Fatal("heartbeat did not use control websocket")
}
select {
case <-httpHeartbeats:
t.Fatal("heartbeat fell back to HTTP despite websocket success")
default:
}
}
func TestCoordinatorLeaseWatchCancelsWhenLeaseReleased(t *testing.T) {
oldInterval := coordinatorLeaseWatchInterval
coordinatorLeaseWatchInterval = 10 * time.Millisecond


@ -152,6 +152,13 @@ func (a App) attach(ctx context.Context, args []string) error {
return err
}
nextAfter := *after
if wsAfter, done, used, err := followRunControlWebSocket(ctx, coord, *runID, nextAfter, *poll, a.Stdout, a.Stderr); err != nil {
return err
} else if done {
return nil
} else if used {
nextAfter = wsAfter
}
for {
events, err := coord.RunEvents(ctx, *runID, nextAfter, 100)
if err != nil {


@ -7,6 +7,8 @@ import (
"net/http"
"net/http/httptest"
"testing"
"nhooyr.io/websocket"
)
func TestEventsCommandPassesPagination(t *testing.T) {
@ -39,6 +41,8 @@ func TestAttachCommandReplaysOutputAndStopsWhenRunFinished(t *testing.T) {
eventCalls := 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.Method == http.MethodGet && r.URL.Path == "/v1/control":
http.NotFound(w, r)
case r.Method == http.MethodGet && r.URL.Path == "/v1/runs/run_123/events":
eventCalls++
if eventCalls == 1 {
@ -80,3 +84,62 @@ func TestAttachCommandReplaysOutputAndStopsWhenRunFinished(t *testing.T) {
t.Fatalf("eventCalls=%d, want 2", eventCalls)
}
}
func TestAttachCommandStreamsOverControlWebSocket(t *testing.T) {
controlCalls := 0
eventCalls := 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case r.Method == http.MethodGet && r.URL.Path == "/v1/control":
controlCalls++
conn, err := websocket.Accept(w, r, nil)
if err != nil {
t.Errorf("accept control websocket: %v", err)
return
}
defer conn.Close(websocket.StatusNormalClosure, "")
for {
_, data, err := conn.Read(r.Context())
if err != nil {
return
}
var msg map[string]any
if err := json.Unmarshal(data, &msg); err != nil {
t.Errorf("control message JSON: %v", err)
return
}
if msg["type"] == "subscribe_run" {
if msg["runID"] != "run_123" {
t.Errorf("runID=%v", msg["runID"])
}
_ = conn.Write(r.Context(), websocket.MessageText, []byte(`{"type":"run_events","runID":"run_123","events":[{"runID":"run_123","seq":1,"type":"stdout","stream":"stdout","data":"hello ws\n","createdAt":"2026-05-02T00:00:00Z"}],"nextSeq":1}`))
}
}
case r.Method == http.MethodGet && r.URL.Path == "/v1/runs/run_123":
_, _ = w.Write([]byte(`{"run":{"id":"run_123","leaseID":"cbx_123","owner":"peter@example.com","org":"openclaw","provider":"aws","class":"standard","serverType":"t3.small","command":["true"],"state":"succeeded","phase":"finished","logBytes":0,"logTruncated":false,"startedAt":"2026-05-02T00:00:00Z"}}`))
case r.Method == http.MethodGet && r.URL.Path == "/v1/runs/run_123/events":
eventCalls++
_, _ = w.Write([]byte(`{"events":[]}`))
default:
t.Errorf("unexpected request %s %s", r.Method, r.URL.String())
}
}))
defer server.Close()
t.Setenv("CRABBOX_COORDINATOR", server.URL)
t.Setenv("CRABBOX_COORDINATOR_TOKEN", "")
var stdout, stderr bytes.Buffer
app := App{Stdout: &stdout, Stderr: &stderr}
if err := app.attach(context.Background(), []string{"run_123", "--poll", "1ms"}); err != nil {
t.Fatal(err)
}
if stdout.String() != "hello ws\n" {
t.Fatalf("stdout=%q", stdout.String())
}
if controlCalls != 1 {
t.Fatalf("controlCalls=%d, want 1", controlCalls)
}
if eventCalls != 0 {
t.Fatalf("HTTP eventCalls=%d, want websocket attach to avoid polling events", eventCalls)
}
}


@ -852,16 +852,42 @@ func startCoordinatorHeartbeat(ctx context.Context, coord *CoordinatorClient, le
done := make(chan struct{})
go func() {
defer close(done)
var control *coordinatorControlConn
defer func() {
if control != nil {
control.close()
}
}()
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
telemetry := collectLeaseTelemetryBestEffort(rootCtx, telemetryCollector)
callCtx, heartbeatCancel := context.WithTimeout(rootCtx, 20*time.Second)
var err error
var idleTimeoutOverride *time.Duration
if updateIdleTimeout != nil {
_, err = coord.UpdateLeaseIdleTimeoutWithTelemetry(callCtx, leaseID, *updateIdleTimeout, telemetry)
idleTimeoutOverride = updateIdleTimeout
}
if control == nil {
dialCtx, dialCancel := context.WithTimeout(callCtx, coordinatorControlDialTimeout)
control, _ = dialCoordinatorControl(dialCtx, coord)
dialCancel()
}
if control != nil {
err = control.heartbeat(callCtx, leaseID, idleTimeoutOverride, telemetry)
if err != nil {
control.close()
control = nil
}
}
if control == nil {
if updateIdleTimeout != nil {
_, err = coord.UpdateLeaseIdleTimeoutWithTelemetry(callCtx, leaseID, *updateIdleTimeout, telemetry)
} else {
_, err = coord.TouchLeaseWithTelemetry(callCtx, leaseID, telemetry)
}
} else {
_, err = coord.TouchLeaseWithTelemetry(callCtx, leaseID, telemetry)
err = nil
}
heartbeatCancel()
if err != nil && rootCtx.Err() == nil {


@ -9,7 +9,7 @@ import (
)
const (
runEventOutputChunkBytes = 4096
runEventOutputChunkBytes = 16 * 1024
runEventOutputMaxBytes = 64 * 1024
runEventOutputQueueSize = 32
runEventOutputPostWait = 2 * time.Second


@ -329,20 +329,23 @@ func runSSHInput(ctx context.Context, target SSHTarget, remote string, input io.
func runSSHStream(ctx context.Context, target SSHTarget, remote string, stdout, stderr io.Writer) int {
remote = wrapRemoteForTarget(target, remote)
cmd := exec.CommandContext(ctx, "ssh", sshArgs(target, remote)...)
cmd.Stdout = stdout
cmd.Stderr = stderr
err := cmd.Run()
if err == nil {
return 0
}
var exitErr *exec.ExitError
if ok := asExitError(err, &exitErr); ok {
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
return status.ExitStatus()
lastCode := 7
for _, port := range sshPortCandidates(target.Port, target.FallbackPorts) {
probe := target
probe.Port = port
cmd := exec.CommandContext(ctx, "ssh", sshArgs(probe, remote)...)
cmd.Stdout = stdout
cmd.Stderr = stderr
err := cmd.Run()
if err == nil {
return 0
}
lastCode = exitCode(err)
if !shouldRetrySSHPort(err) {
return lastCode
}
}
return 7
return lastCode
}
func sshArgs(target SSHTarget, remote string) []string {
@ -380,7 +383,7 @@ func sshBaseArgsWithOptions(target SSHTarget, connectTimeout, connectionAttempts
} else {
args = append(args,
"-o", "ControlMaster=auto",
"-o", "ControlPersist=60s",
"-o", "ControlPersist=10m",
"-o", "ControlPath="+sshControlPath(target),
)
}


@ -223,7 +223,7 @@ func TestSSHArgsIncludeReliabilityOptions(t *testing.T) {
"ServerAliveInterval=15",
"ServerAliveCountMax=2",
"ControlMaster=auto",
"ControlPersist=60s",
"ControlPersist=10m",
"ControlPath=",
"crabbox-ssh-",
"-%C",
@ -279,6 +279,54 @@ func TestShouldRetrySSHPortOnlyForTransportExit(t *testing.T) {
}
}
func TestRunSSHStreamRetriesFallbackPorts(t *testing.T) {
dir := t.TempDir()
sshPath := filepath.Join(dir, "ssh")
portsPath := filepath.Join(dir, "ports")
script := `#!/bin/sh
port=""
while [ "$#" -gt 0 ]; do
if [ "$1" = "-p" ]; then
shift
port="$1"
fi
shift
done
printf '%s\n' "$port" >> "$CRABBOX_FAKE_SSH_PORTS"
if [ "$port" = "2222" ]; then
exit 255
fi
printf 'ok\n'
exit 0
`
if err := os.WriteFile(sshPath, []byte(script), 0o755); err != nil {
t.Fatal(err)
}
t.Setenv("PATH", dir+string(os.PathListSeparator)+os.Getenv("PATH"))
t.Setenv("CRABBOX_FAKE_SSH_PORTS", portsPath)
var stdout, stderr bytes.Buffer
code := runSSHStream(context.Background(), SSHTarget{
User: "crabbox",
Host: "203.0.113.10",
Port: "2222",
FallbackPorts: []string{"22"},
}, "true", &stdout, &stderr)
if code != 0 {
t.Fatalf("runSSHStream exit=%d stderr=%q", code, stderr.String())
}
if stdout.String() != "ok\n" {
t.Fatalf("stdout=%q want ok", stdout.String())
}
ports, err := os.ReadFile(portsPath)
if err != nil {
t.Fatal(err)
}
if string(ports) != "2222\n22\n" {
t.Fatalf("ports=%q want fallback sequence", string(ports))
}
}
func TestSSHCommandLineRedactsSecretAuthUser(t *testing.T) {
target := SSHTarget{
User: "tok_live_secret",


@ -192,7 +192,26 @@ type BridgeAttachment =
| { kind: "code-agent"; leaseID: string }
| { kind: "code-viewer"; leaseID: string; id: string }
| { kind: "egress-host"; leaseID: string; sessionID: string }
| { kind: "egress-client"; leaseID: string; sessionID: string };
| { kind: "egress-client"; leaseID: string; sessionID: string }
| {
kind: "control";
clientID: string;
owner: string;
org: string;
admin?: boolean;
subscriptions?: Record<string, number>;
};
type ControlMessage =
| { type: "subscribe_run"; runID?: string; after?: number; limit?: number }
| { type: "ack"; runID?: string; seq?: number }
| {
type: "heartbeat";
leaseID?: string;
idleTimeoutSeconds?: number;
telemetry?: Partial<LeaseTelemetry>;
}
| { type: "ping" };
interface WebVNCEvent {
at: string;
@ -212,6 +231,7 @@ export class FleetDurableObject implements DurableObject {
private readonly egressHosts = new Map<string, WebSocket>();
private readonly egressClients = new Map<string, WebSocket>();
private readonly egressSessions = new Map<string, EgressSessionStatus>();
private readonly controlSockets = new Map<string, WebSocket>();
constructor(
private readonly state: DurableObjectState,
@ -253,6 +273,9 @@ export class FleetDurableObject implements DurableObject {
if (method === "GET" && parts.join("/") === "v1/whoami") {
return this.whoami(request);
}
if (method === "GET" && parts.join("/") === "v1/control") {
return await this.controlSocket(request);
}
if (method === "GET" && parts.join("/") === "v1/admin/leases") {
return await this.adminLeases(request);
}
@ -454,6 +477,9 @@ export class FleetDurableObject implements DurableObject {
this.egressClients.set(egressSocketKey(attachment.leaseID, attachment.sessionID), socket);
this.trackEgressSession(attachment);
break;
case "control":
this.controlSockets.set(attachment.clientID, socket);
break;
}
}
@ -501,6 +527,142 @@ export class FleetDurableObject implements DurableObject {
this.egressSessions.set(leaseID, sessionStatus);
}
private async controlSocket(request: Request): Promise<Response> {
if (request.headers.get("upgrade")?.toLowerCase() !== "websocket") {
return json({ error: "websocket_required" }, { status: 426 });
}
const pair = new WebSocketPair();
const client = pair[0];
const server = pair[1];
const attachment: BridgeAttachment = {
kind: "control",
clientID: crypto.randomUUID(),
owner: requestOwner(request),
org: requestOrg(request, this.env),
admin: isAdminRequest(request),
subscriptions: {},
};
this.controlSockets.set(attachment.clientID, server);
this.acceptBridgeWebSocket(server, attachment);
sendControl(server, {
type: "hello",
protocol: 1,
clientID: attachment.clientID,
});
return new Response(null, { status: 101, webSocket: client });
}
private async handleControlMessage(
socket: WebSocket,
attachment: Extract<BridgeAttachment, { kind: "control" }>,
message: string | ArrayBuffer | Blob,
): Promise<void> {
if (typeof message !== "string") {
sendControl(socket, {
type: "error",
code: "invalid_message",
message: "expected JSON text",
});
return;
}
let input: ControlMessage;
try {
input = JSON.parse(message) as ControlMessage;
} catch {
sendControl(socket, { type: "error", code: "invalid_json", message: "invalid JSON" });
return;
}
switch (input.type) {
case "subscribe_run":
await this.subscribeControlRun(socket, attachment, input);
return;
case "ack":
this.ackControlRun(socket, attachment, input);
return;
case "heartbeat":
await this.controlHeartbeat(socket, attachment, input);
return;
case "ping":
sendControl(socket, { type: "pong" });
return;
default:
sendControl(socket, {
type: "error",
code: "unknown_type",
message: "unknown control message",
});
}
}
private async subscribeControlRun(
socket: WebSocket,
attachment: Extract<BridgeAttachment, { kind: "control" }>,
input: Extract<ControlMessage, { type: "subscribe_run" }>,
): Promise<void> {
const runID = typeof input.runID === "string" ? input.runID : "";
const run = runID ? await this.getRun(runID) : undefined;
if (!run || !this.runVisibleToControl(run, attachment)) {
sendControl(socket, { type: "error", code: "not_found", message: "run not found" });
return;
}
const after = finiteControlNumber(input.after) ?? 0;
const limit = Math.min(finiteControlNumber(input.limit) ?? 100, 500);
const events = await this.runEvents(runID, after, limit);
const nextSeq = events.at(-1)?.seq ?? after;
attachment.subscriptions = { ...attachment.subscriptions, [runID]: nextSeq };
this.serializeBridgeAttachment(socket, attachment);
sendControl(socket, { type: "run_events", runID, events, nextSeq });
}
private ackControlRun(
socket: WebSocket,
attachment: Extract<BridgeAttachment, { kind: "control" }>,
input: Extract<ControlMessage, { type: "ack" }>,
): void {
if (typeof input.runID !== "string") {
return;
}
const seq = finiteControlNumber(input.seq);
if (seq === undefined) {
return;
}
attachment.subscriptions = { ...attachment.subscriptions, [input.runID]: seq };
this.serializeBridgeAttachment(socket, attachment);
}
private async controlHeartbeat(
socket: WebSocket,
attachment: Extract<BridgeAttachment, { kind: "control" }>,
input: Extract<ControlMessage, { type: "heartbeat" }>,
): Promise<void> {
const leaseID = typeof input.leaseID === "string" ? input.leaseID : "";
const lease = leaseID ? await this.resolveLeaseForControl(leaseID, attachment) : undefined;
if (!lease) {
sendControl(socket, { type: "heartbeat", leaseID, ok: false, error: "not_found" });
return;
}
const heartbeat: { idleTimeoutSeconds?: number; telemetry?: Partial<LeaseTelemetry> } = {};
if (input.idleTimeoutSeconds !== undefined) {
heartbeat.idleTimeoutSeconds = input.idleTimeoutSeconds;
}
if (input.telemetry !== undefined) {
heartbeat.telemetry = input.telemetry;
}
await this.applyLeaseHeartbeat(lease, heartbeat);
sendControl(socket, {
type: "heartbeat",
leaseID: lease.id,
ok: true,
expiresAt: lease.expiresAt,
});
}
private serializeBridgeAttachment(socket: WebSocket, attachment: BridgeAttachment): void {
if (typeof socket.serializeAttachment === "function") {
socket.serializeAttachment(attachment);
}
}
private async handleBridgeMessage(
socket: WebSocket,
attachment: BridgeAttachment,
@ -548,6 +710,9 @@ export class FleetDurableObject implements DurableObject {
this.egressHosts.get(egressSocketKey(attachment.leaseID, attachment.sessionID)),
);
break;
case "control":
await this.handleControlMessage(socket, attachment, message);
break;
}
void socket;
}
@ -576,6 +741,11 @@ export class FleetDurableObject implements DurableObject {
case "egress-client":
this.clearEgressClient(attachment.leaseID, attachment.sessionID, socket);
break;
case "control":
if (this.controlSockets.get(attachment.clientID) === socket) {
this.controlSockets.delete(attachment.clientID);
}
break;
}
}
@ -737,25 +907,7 @@ export class FleetDurableObject implements DurableObject {
idleTimeoutSeconds?: number;
telemetry?: Partial<LeaseTelemetry>;
}>(request);
const now = new Date();
const requestedIdleTimeoutSeconds = body.idleTimeoutSeconds;
if (
Number.isFinite(requestedIdleTimeoutSeconds) &&
requestedIdleTimeoutSeconds !== undefined &&
requestedIdleTimeoutSeconds > 0
) {
lease.idleTimeoutSeconds = clampLeaseSeconds(requestedIdleTimeoutSeconds, 86_400);
}
const telemetry = sanitizeLeaseTelemetry(body.telemetry, now);
if (telemetry) {
lease.telemetry = telemetry;
lease.telemetryHistory = appendLeaseTelemetryHistory(lease.telemetryHistory, telemetry);
}
lease.updatedAt = now.toISOString();
lease.lastTouchedAt = now.toISOString();
lease.expiresAt = recomputeLeaseExpiresAt(lease, now).toISOString();
await this.putLease(lease);
await this.scheduleAlarm();
await this.applyLeaseHeartbeat(lease, body);
return json({ lease });
}
if (method === "POST" && action === "tailscale") {
@ -778,6 +930,34 @@ export class FleetDurableObject implements DurableObject {
return json({ error: "not_found" }, { status: 404 });
}
private async applyLeaseHeartbeat(
lease: LeaseRecord,
input: {
idleTimeoutSeconds?: number;
telemetry?: Partial<LeaseTelemetry>;
},
): Promise<void> {
const now = new Date();
const requestedIdleTimeoutSeconds = input.idleTimeoutSeconds;
if (
Number.isFinite(requestedIdleTimeoutSeconds) &&
requestedIdleTimeoutSeconds !== undefined &&
requestedIdleTimeoutSeconds > 0
) {
lease.idleTimeoutSeconds = clampLeaseSeconds(requestedIdleTimeoutSeconds, 86_400);
}
const telemetry = sanitizeLeaseTelemetry(input.telemetry, now);
if (telemetry) {
lease.telemetry = telemetry;
lease.telemetryHistory = appendLeaseTelemetryHistory(lease.telemetryHistory, telemetry);
}
lease.updatedAt = now.toISOString();
lease.lastTouchedAt = now.toISOString();
lease.expiresAt = recomputeLeaseExpiresAt(lease, now).toISOString();
await this.putLease(lease);
await this.scheduleAlarm();
}
private async prepareTailscaleConfig(
config: ReturnType<typeof leaseConfig>,
input: LeaseRequest,
@ -2514,6 +2694,34 @@ export class FleetDurableObject implements DurableObject {
return matches[0];
}
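// Resolves a lease for a control client: exact ID first, then a slug match
// among active, unexpired leases that are visible to this client.
private async resolveLeaseForControl(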
identifier: string,
attachment: Extract<BridgeAttachment, { kind: "control" }>,
): Promise<LeaseRecord | undefined> {
const exact = await this.getLease(identifier);
if (exact) {
return this.leaseVisibleToControl(exact, attachment) ? exact : undefined;
}
const slug = normalizeLeaseSlug(identifier);
if (!slug) {
return undefined;
}
const now = Date.now();
const matches = (await this.leaseRecords()).filter(
(lease) =>
lease.state === "active" &&
Date.parse(lease.expiresAt) > now &&
normalizeLeaseSlug(lease.slug) === slug &&
this.leaseVisibleToControl(lease, attachment),
);
if (matches.length > 1) {
throw new Error(
`ambiguous slug ${slug}: ${matches.map((lease) => `${lease.id}:${lease.owner}`).join(", ")}`,
);
}
return matches[0];
}
private async leaseRecords(): Promise<LeaseRecord[]> {
const leases = await this.state.storage.list<LeaseRecord>({ prefix: "lease:" });
return [...leases.values()];
@ -2594,6 +2802,24 @@ export class FleetDurableObject implements DurableObject {
);
}
private runVisibleToControl(
run: RunRecord,
attachment: Extract<BridgeAttachment, { kind: "control" }>,
): boolean {
return Boolean(
attachment.admin || (run.owner === attachment.owner && run.org === attachment.org),
);
}
private leaseVisibleToControl(
lease: LeaseRecord,
attachment: Extract<BridgeAttachment, { kind: "control" }>,
): boolean {
return Boolean(
attachment.admin || (lease.owner === attachment.owner && lease.org === attachment.org),
);
}
private async putLease(lease: LeaseRecord): Promise<void> {
await this.state.storage.put(leaseKey(lease.id), lease);
}
@ -2629,9 +2855,34 @@ export class FleetDurableObject implements DurableObject {
run.lastEventAt = now;
await this.state.storage.put(runEventKey(run.id, seq), event);
await this.putRun(run);
this.broadcastRunEvent(run, event);
return event;
}
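// Pushes a newly appended event to every open control socket that is
// subscribed to this run, has a cursor behind event.seq, and may see the run.
private broadcastRunEvent(run: RunRecord, event: RunEventRecord): void {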
for (const socket of this.controlSockets.values()) {
if (socket.readyState !== WebSocket.OPEN) {
continue;
}
const attachment = bridgeAttachment(socket);
if (!attachment || attachment.kind !== "control") {
continue;
}
const after = attachment.subscriptions?.[run.id];
if (after === undefined || after >= event.seq || !this.runVisibleToControl(run, attachment)) {
continue;
}
attachment.subscriptions = { ...attachment.subscriptions, [run.id]: event.seq };
this.serializeBridgeAttachment(socket, attachment);
sendControl(socket, {
type: "run_events",
runID: run.id,
events: [event],
nextSeq: event.seq,
});
}
}
private provider(provider: Provider, region = "eu-west-1"): CloudProvider {
const testProvider = this.testProviders[provider];
if (testProvider) {
@ -3134,18 +3385,41 @@ function bridgeAttachment(socket: WebSocket): BridgeAttachment | undefined {
return typeof attachment.leaseID === "string" && typeof attachment.sessionID === "string"
? attachment
: undefined;
case "control":
return typeof attachment.clientID === "string" &&
typeof attachment.owner === "string" &&
typeof attachment.org === "string"
? {
...attachment,
subscriptions:
attachment.subscriptions && typeof attachment.subscriptions === "object"
? attachment.subscriptions
: {},
}
: undefined;
default:
return undefined;
}
}
function bridgeTags(attachment: BridgeAttachment): string[] {
if (attachment.kind === "control") {
return [`control:${attachment.clientID}`, `owner:${attachment.owner}`, `org:${attachment.org}`];
}
if (attachment.kind === "egress-host" || attachment.kind === "egress-client") {
return [`lease:${attachment.leaseID}`, `session:${attachment.sessionID}`, attachment.kind];
}
return [`lease:${attachment.leaseID}`, attachment.kind];
}
// Sends a JSON control frame; closes the socket with code 1011 if send throws.
function sendControl(socket: WebSocket, payload: unknown): void {
try {
socket.send(JSON.stringify(payload));
} catch {
closeSocket(socket, 1011, "control send failed");
}
}
function bytesToBase64(bytes: Uint8Array): string {
let binary = "";
for (const byte of bytes) {
@ -3282,6 +3556,12 @@ function finiteQueryNumber(value: string | null): number | undefined {
return Number.isFinite(parsed) && parsed >= 0 ? Math.trunc(parsed) : undefined;
}
// Accepts only finite, non-negative numbers and truncates them to integers.
function finiteControlNumber(value: number | undefined): number | undefined {
return Number.isFinite(value) && value !== undefined && value >= 0
? Math.trunc(value)
: undefined;
}
function boundedEgressString(value: string | undefined): string | undefined {
const normalized = String(value ?? "").trim();
if (!normalized) {
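
Reviewer sketch (not part of the commit): the delivery rule in `broadcastRunEvent` can be read as a per-socket cursor filter. The block below is a minimal, simplified model of that rule. `ControlClient`, `RunEvent`, and `deliver` are hypothetical names introduced only for illustration, and the owner/org fields are placed on the event here for brevity, whereas the diff checks them on the run record.

```typescript
// Hypothetical, simplified model of the subscription cursor check.
// None of these types exist in the diff; they only mirror its logic.
interface ControlClient {
  owner: string;
  org: string;
  admin?: boolean;
  // Maps a run ID to the last event sequence number already delivered.
  subscriptions: Record<string, number>;
}

interface RunEvent {
  runID: string;
  seq: number;
  owner: string;
  org: string;
}

// Returns true and advances the cursor when the event should be pushed.
function deliver(client: ControlClient, event: RunEvent): boolean {
  const after = client.subscriptions[event.runID];
  const visible =
    Boolean(client.admin) || (client.owner === event.owner && client.org === event.org);
  // Skip when not subscribed, when the event was already delivered, or when
  // the client is not allowed to see the run.
  if (after === undefined || after >= event.seq || !visible) {
    return false;
  }
  // Copy-on-write update, as in the diff, so the stored cursor is replaced.
  client.subscriptions = { ...client.subscriptions, [event.runID]: event.seq };
  return true;
}
```

Under this reading, a client that subscribed with `after: 1` skips a replay of event 1, receives event 2 once, and then ignores any repeat of event 2, which matches the `nextSeq` progression the control WebSocket test asserts.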

View File

@ -61,6 +61,40 @@ class MemoryStorage {
}
}
class FakeWebSocket {
readyState = WebSocket.OPEN;
private attachment: unknown;
private readonly sent: string[] = [];
constructor(attachment?: unknown) {
this.attachment = attachment;
}
send(data: string): void {
this.sent.push(data);
}
close(): void {
this.readyState = WebSocket.CLOSED;
}
accept(): void {}
addEventListener(): void {}
serializeAttachment(attachment: unknown): void {
this.attachment = attachment;
}
deserializeAttachment(): unknown {
return this.attachment;
}
sentJSON(): unknown[] {
return this.sent.map((value) => JSON.parse(value) as unknown);
}
}
describe("fleet lease identity and idle", () => {
it("creates leases through the public route with slug and idle metadata", async () => {
const storage = new MemoryStorage();
@ -1972,6 +2006,89 @@ describe("fleet run history", () => {
]);
});
it("streams run events and lease heartbeats over a control websocket", async () => {
const storage = new MemoryStorage();
const fleet = testFleet(storage);
const headers = {
"x-crabbox-owner": "peter@example.com",
"x-crabbox-org": "openclaw",
};
storage.seed(
"lease:cbx_000000000001",
testLease({
id: "cbx_000000000001",
slug: "blue-lobster",
owner: "peter@example.com",
org: "openclaw",
expiresAt: new Date(Date.now() + 60 * 60 * 1000).toISOString(),
}),
);
storage.seed(
"run:run_000000000001",
testRun({
id: "run_000000000001",
leaseID: "cbx_000000000001",
owner: "peter@example.com",
org: "openclaw",
eventCount: 1,
}),
);
storage.seed("runevent:run_000000000001:000000000001", {
runID: "run_000000000001",
seq: 1,
type: "run.started",
phase: "starting",
createdAt: "2026-05-01T00:00:00.000Z",
});
const socket = new FakeWebSocket({
kind: "control",
clientID: "ctrl_1",
owner: "peter@example.com",
org: "openclaw",
subscriptions: {},
});
(
fleet as unknown as {
controlSockets: Map<string, WebSocket>;
}
).controlSockets.set("ctrl_1", socket as unknown as WebSocket);
await fleet.webSocketMessage(
socket as unknown as WebSocket,
JSON.stringify({ type: "subscribe_run", runID: "run_000000000001", after: 0 }),
);
expect(socket.sentJSON()[0]).toMatchObject({
type: "run_events",
runID: "run_000000000001",
nextSeq: 1,
events: [{ seq: 1, type: "run.started" }],
});
await fleet.fetch(
request("POST", "/v1/runs/run_000000000001/events", {
headers,
body: { type: "stdout", stream: "stdout", data: "ok\n" },
}),
);
expect(socket.sentJSON()[1]).toMatchObject({
type: "run_events",
runID: "run_000000000001",
nextSeq: 2,
events: [{ seq: 2, type: "stdout", data: "ok\n" }],
});
await fleet.webSocketMessage(
socket as unknown as WebSocket,
JSON.stringify({ type: "heartbeat", leaseID: "blue-lobster", idleTimeoutSeconds: 900 }),
);
expect(socket.sentJSON()[2]).toMatchObject({
type: "heartbeat",
leaseID: "cbx_000000000001",
ok: true,
});
expect(storage.value<LeaseRecord>("lease:cbx_000000000001")?.idleTimeoutSeconds).toBe(900);
});
it("records finished runs and serves logs", async () => {
const fleet = testFleet();
const ownerHeaders = {