Compare commits: main...feat/adapt (5 commits)

| SHA1 |
|---|
| b3e8c998c3 |
| 218bfdeac8 |
| b2eee1521b |
| a8a8a95243 |
| f399bf6d27 |

@@ -4,6 +4,7 @@

 ### Added

+- Added an authenticated coordinator control WebSocket for low-latency run attach streams and lease heartbeats, with HTTP polling/heartbeat fallback for older brokers.
 - Added mediated egress commands and browser wiring so Linux desktop leases can proxy selected app traffic through the operator machine via the coordinator bridge.
 - Added WebVNC portal clipboard controls for sending local clipboard text into the remote session and copying remote clipboard text back to the local browser.
 - Added rescue-first desktop/WebVNC failure output that names the failing layer and prints exact `rescue:` or native VNC fallback commands when bridges, viewers, browser launches, VNC targets, or input stacks hang.
@@ -11,6 +12,7 @@

 ### Fixed

+- Fixed `crabbox run` transport chatter by keeping SSH multiplexers alive longer, retrying fallback SSH ports for streaming commands, and batching stdout/stderr preview events into larger coordinator chunks.
 - Fixed `egress start --coordinator` so live public-route egress starts work when the local default coordinator is Cloudflare Access-protected.
 - Fixed Tailscale exit-node bootstrap paths to prefer tailnet metadata and fail clearly when remote exit-node egress is not active.
 - Fixed `run --no-sync` timing summaries so they report `sync_skipped=true`.

@@ -10,8 +10,10 @@ crabbox attach run_abcdef123456 --poll 500ms

 ## Behavior

-`attach` polls the coordinator for new run events on a fixed interval,
-prints them as they arrive, and exits when the run finishes.
+`attach` follows coordinator run events, prints them as they arrive, and exits
+when the run finishes. Newer brokers stream events over the authenticated
+coordinator control WebSocket; older brokers or dropped sockets fall back to
+the HTTPS events API from the last printed sequence.

 - stdout and stderr preview events are written back to stdout and stderr,
   preserving the stream split;
@@ -20,8 +22,8 @@ prints them as they arrive, and exits when the run finishes.
   message;
 - when the run has already finished, attach prints any remaining events
   and exits;
-- when the run is still active, attach polls until it sees a `finish`
-  event.
+- when the run is still active, attach waits for streamed events or polls until
+  it sees the run finish.
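
The "resume from the last printed sequence" behavior can be pictured as a single cursor shared by both transports. The following is a minimal sketch under assumptions: `Event` and `applyBatch` are hypothetical names for illustration, not the crabbox types. It shows why an overlapping batch from the HTTPS fallback does not print an event twice.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for a coordinator run event.
type Event struct {
	Seq  int
	Data string
}

// applyBatch emits every event newer than the cursor and returns the
// advanced cursor. Events at or below the cursor are skipped, so a batch
// that overlaps an earlier one is harmless.
func applyBatch(after int, batch []Event, emit func(Event)) int {
	for _, ev := range batch {
		if ev.Seq <= after {
			continue // already printed by an earlier batch or transport
		}
		after = ev.Seq
		emit(ev)
	}
	return after
}

func main() {
	var printed []string
	emit := func(ev Event) { printed = append(printed, ev.Data) }

	cursor := 0
	// Batch received over the streaming transport.
	cursor = applyBatch(cursor, []Event{{Seq: 1, Data: "a"}, {Seq: 2, Data: "b"}}, emit)
	// Overlapping batch received later from the polling fallback.
	cursor = applyBatch(cursor, []Event{{Seq: 2, Data: "b"}, {Seq: 3, Data: "c"}}, emit)

	fmt.Println(cursor, printed)
}
```

Because the cursor only moves forward, the caller can hand the same value to whichever transport it uses next.
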

 `attach` is not detached command execution. It follows the events the
 original CLI is emitting; if that CLI process dies, the run state remains
@@ -40,7 +42,7 @@ after completion.
 ```text
 --id <run-id>       run id (also accepted as a positional argument)
 --after <seq>       resume after this event sequence number
---poll <duration>   polling interval, default 1s
+--poll <duration>   fallback poll interval and WebSocket idle check, default 1s
 ```

 ## Use Cases

@@ -36,11 +36,17 @@ Cloud machines are vanilla Ubuntu runners that hold no broker secrets. They are

 The CLI talks to the broker over HTTPS, then talks **directly** to the leased runner over SSH and rsync. The runner never calls the broker; that path stays one-way.

+For long-lived coordinator interactions, newer CLIs also open one authenticated
+WebSocket to the Fleet Durable Object at `/v1/control`. That socket carries
+run-event attach streams and lease heartbeats so high-latency links do less
+request polling. HTTPS endpoints remain canonical storage and compatibility
+fallbacks, so older CLIs and older brokers still work.
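
The messages on that socket are small JSON envelopes keyed by `type`. As an illustration only, the sketch below encodes a `subscribe_run` request and decodes part of a `run_events` reply using the field names that appear elsewhere in this change; the struct and function names are hypothetical and the reply struct is deliberately trimmed.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// subscribeRun is a hypothetical struct for the subscribe request; the JSON
// field names follow the ones used in this diff.
type subscribeRun struct {
	Type  string `json:"type"`
	RunID string `json:"runID"`
	After int    `json:"after"`
	Limit int    `json:"limit"`
}

// runEvents is a trimmed, hypothetical view of a run_events reply. Fields it
// does not declare (such as "events") are ignored when decoding.
type runEvents struct {
	Type    string `json:"type"`
	RunID   string `json:"runID"`
	NextSeq int    `json:"nextSeq"`
}

// encodeSubscribe builds the JSON text for a subscribe_run message.
func encodeSubscribe(runID string, after int) (string, error) {
	data, err := json.Marshal(subscribeRun{Type: "subscribe_run", RunID: runID, After: after, Limit: 100})
	return string(data), err
}

// decodeRunEvents parses the envelope fields of a run_events message.
func decodeRunEvents(raw string) (runEvents, error) {
	var msg runEvents
	err := json.Unmarshal([]byte(raw), &msg)
	return msg, err
}

func main() {
	out, _ := encodeSubscribe("run_123", 0)
	fmt.Println(out)

	msg, _ := decodeRunEvents(`{"type":"run_events","runID":"run_123","events":[],"nextSeq":4}`)
	fmt.Println(msg.Type, msg.NextSeq)
}
```
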
+
 ## Ownership

 | Layer | Owns |
 |:------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| **CLI** | config + flags; per-lease SSH key; SSH readiness; Git seeding + rsync; sync fingerprints + sanity checks; remote command + streaming; heartbeats; release |
+| **CLI** | config + flags; per-lease SSH key; SSH readiness; Git seeding + rsync; sync fingerprints + sanity checks; remote command + streaming; control WebSocket when available; HTTP fallback; release |
 | **Broker** | request auth + identity; serialized lease state; provider credentials; machine create/delete; lease expiry; pool/status/inspect; usage; spend caps |
 | **Provider** | raw compute: Hetzner Cloud servers or AWS EC2 instances |
 | **Runner** | nothing durable for brokered boxes: Linux prepared by cloud-init with SSH, Git, rsync, curl, jq, `/work/crabbox`; AWS Windows/WSL2/macOS targets have provider-specific bootstrap; static targets are existing SSH hosts; project runtimes come from repo-owned setup |

@@ -9,6 +9,19 @@ Read when:

 Crabbox performance comes from avoiding repeated setup, keeping the sync small, choosing available capacity, and reusing project-defined hydration when it matters.

+## High-Latency Links
+
+Crabbox should not require a special slow-network mode. The CLI keeps SSH as
+the universal command transport, but uses SSH ControlMaster with a longer
+persist window so repeated probes, sync helpers, and commands avoid paying a
+new handshake every time. Streaming commands retry coordinator-provided
+fallback ports just like readiness and helper probes.
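
The fallback-port retry can be sketched as a loop that moves to the next port only when `ssh` itself failed to connect. This is an illustrative sketch, not the CLI's implementation: `runWithFallback` is a hypothetical helper, and it assumes exit status 255 (which `ssh` returns for its own errors) is the only retry signal. A remote command that exits 255 would be indistinguishable under this simplification.

```go
package main

import "fmt"

// sshTransportExit is the status ssh returns when ssh itself hits an error,
// as opposed to the remote command's own exit status.
const sshTransportExit = 255

// runWithFallback tries each port in order and stops at the first attempt
// that does not look like a transport failure. It returns the last port
// tried and that attempt's exit code.
func runWithFallback(ports []string, run func(port string) int) (usedPort string, code int) {
	code = sshTransportExit
	for _, port := range ports {
		usedPort = port
		code = run(port)
		if code != sshTransportExit {
			return usedPort, code // success, or a real remote exit status
		}
	}
	return usedPort, code
}

func main() {
	var attempts []string
	// Fake runner: the primary port is unreachable, the fallback works.
	fake := func(port string) int {
		attempts = append(attempts, port)
		if port == "2222" {
			return sshTransportExit
		}
		return 0
	}

	port, code := runWithFallback([]string{"2222", "22"}, fake)
	fmt.Println(port, code, attempts)
}
```
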
+
+When the broker supports it, `crabbox attach` and lease heartbeats use one
+authenticated coordinator WebSocket instead of repeated HTTP polls. If the
+socket cannot connect or drops, the CLI resumes through the existing HTTPS API
+from the last acknowledged run-event sequence.
+
 ## Warm Leases

 Use `warmup` for repeated agent loops:

internal/cli/control_ws.go
Normal file, 223 lines added
@@ -0,0 +1,223 @@
+package cli
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"strings"
+	"time"
+
+	"nhooyr.io/websocket"
+)
+
+const coordinatorControlDialTimeout = 1500 * time.Millisecond
+
+type coordinatorControlConn struct {
+	conn *websocket.Conn
+}
+
+type coordinatorControlMessage struct {
+	Type               string                `json:"type"`
+	Protocol           int                   `json:"protocol,omitempty"`
+	ClientID           string                `json:"clientID,omitempty"`
+	RunID              string                `json:"runID,omitempty"`
+	Events             []CoordinatorRunEvent `json:"events,omitempty"`
+	NextSeq            int                   `json:"nextSeq,omitempty"`
+	LeaseID            string                `json:"leaseID,omitempty"`
+	OK                 bool                  `json:"ok,omitempty"`
+	ExpiresAt          string                `json:"expiresAt,omitempty"`
+	Code               string                `json:"code,omitempty"`
+	Message            string                `json:"message,omitempty"`
+	Error              string                `json:"error,omitempty"`
+	IdleTimeoutSeconds int                   `json:"idleTimeoutSeconds,omitempty"`
+	Telemetry          *LeaseTelemetry       `json:"telemetry,omitempty"`
+}
+
+func dialCoordinatorControl(ctx context.Context, coord *CoordinatorClient) (*coordinatorControlConn, error) {
+	endpoint, err := coordinatorControlURL(coord.BaseURL)
+	if err != nil {
+		return nil, err
+	}
+	headers := http.Header{}
+	coord.addRequestHeaders(headers)
+	opts := &websocket.DialOptions{
+		HTTPHeader: headers,
+	}
+	if coord.Client != nil {
+		opts.HTTPClient = coord.Client
+	}
+	conn, resp, err := websocket.Dial(ctx, endpoint, opts)
+	if err != nil {
+		if resp != nil && resp.Body != nil {
+			_ = resp.Body.Close()
+		}
+		return nil, err
+	}
+	return &coordinatorControlConn{conn: conn}, nil
+}
+
+func coordinatorControlURL(baseURL string) (string, error) {
+	base, err := url.Parse(baseURL)
+	if err != nil {
+		return "", err
+	}
+	switch base.Scheme {
+	case "http":
+		base.Scheme = "ws"
+	case "https":
+		base.Scheme = "wss"
+	case "ws", "wss":
+	default:
+		return "", fmt.Errorf("unsupported coordinator scheme %q", base.Scheme)
+	}
+	base.Path = strings.TrimRight(base.Path, "/") + "/v1/control"
+	base.RawQuery = ""
+	base.Fragment = ""
+	return base.String(), nil
+}
+
+func (c *coordinatorControlConn) close() {
+	if c == nil || c.conn == nil {
+		return
+	}
+	_ = c.conn.Close(websocket.StatusNormalClosure, "")
+}
+
+func (c *coordinatorControlConn) write(ctx context.Context, payload any) error {
+	data, err := json.Marshal(payload)
+	if err != nil {
+		return err
+	}
+	return c.conn.Write(ctx, websocket.MessageText, data)
+}
+
+func (c *coordinatorControlConn) read(ctx context.Context) (coordinatorControlMessage, error) {
+	typ, data, err := c.conn.Read(ctx)
+	if err != nil {
+		return coordinatorControlMessage{}, err
+	}
+	if typ != websocket.MessageText {
+		return coordinatorControlMessage{}, fmt.Errorf("control websocket sent non-text frame")
+	}
+	var msg coordinatorControlMessage
+	if err := json.Unmarshal(data, &msg); err != nil {
+		return coordinatorControlMessage{}, err
+	}
+	return msg, nil
+}
+
+func followRunControlWebSocket(ctx context.Context, coord *CoordinatorClient, runID string, after int, poll time.Duration, stdout, stderr io.Writer) (int, bool, bool, error) {
+	dialCtx, cancel := context.WithTimeout(ctx, coordinatorControlDialTimeout)
+	control, err := dialCoordinatorControl(dialCtx, coord)
+	cancel()
+	if err != nil {
+		if ctx.Err() != nil {
+			return after, false, false, ctx.Err()
+		}
+		return after, false, false, nil
+	}
+	defer control.close()
+	nextAfter := after
+	writeCtx, writeCancel := context.WithTimeout(ctx, 5*time.Second)
+	err = control.write(writeCtx, map[string]any{
+		"type":  "subscribe_run",
+		"runID": runID,
+		"after": nextAfter,
+		"limit": 100,
+	})
+	writeCancel()
+	if err != nil {
+		return nextAfter, false, true, nil
+	}
+	for {
+		readCtx, readCancel := context.WithTimeout(ctx, poll)
+		msg, err := control.read(readCtx)
+		readCancel()
+		if err != nil {
+			if ctx.Err() != nil {
+				return nextAfter, false, true, ctx.Err()
+			}
+			if errors.Is(err, context.DeadlineExceeded) {
+				done, err := coordinatorRunDone(ctx, coord, runID)
+				if err != nil {
+					return nextAfter, false, true, err
+				}
+				if done {
+					return nextAfter, true, true, nil
+				}
+				continue
+			}
+			done, doneErr := coordinatorRunDone(ctx, coord, runID)
+			if doneErr != nil {
+				return nextAfter, false, true, doneErr
+			}
+			if done {
+				return nextAfter, true, true, nil
+			}
+			return nextAfter, false, true, nil
+		}
+		switch msg.Type {
+		case "hello", "pong", "heartbeat":
+			continue
+		case "error":
+			return nextAfter, false, true, nil
+		case "run_events":
+			for _, event := range msg.Events {
+				if event.Seq <= nextAfter {
+					continue
+				}
+				nextAfter = event.Seq
+				printAttachEvent(stdout, stderr, event)
+			}
+			ackCtx, ackCancel := context.WithTimeout(ctx, 2*time.Second)
+			_ = control.write(ackCtx, map[string]any{
+				"type":  "ack",
+				"runID": runID,
+				"seq":   nextAfter,
+			})
+			ackCancel()
+		}
+	}
+}
+
+func coordinatorRunDone(ctx context.Context, coord *CoordinatorClient, runID string) (bool, error) {
+	run, err := coord.Run(ctx, runID)
+	if err != nil {
+		return false, err
+	}
+	return run.State != "running", nil
+}
+
+func (c *coordinatorControlConn) heartbeat(ctx context.Context, leaseID string, idleTimeout *time.Duration, telemetry *LeaseTelemetry) error {
+	payload := coordinatorControlMessage{
+		Type:      "heartbeat",
+		LeaseID:   leaseID,
+		Telemetry: telemetry,
+	}
+	if idleTimeout != nil && *idleTimeout > 0 {
+		payload.IdleTimeoutSeconds = int(idleTimeout.Seconds())
+	}
+	if err := c.write(ctx, payload); err != nil {
+		return err
+	}
+	for {
+		msg, err := c.read(ctx)
+		if err != nil {
+			return err
+		}
+		if msg.Type != "heartbeat" {
+			continue
+		}
+		if !msg.OK {
+			if msg.Error != "" {
+				return fmt.Errorf("control heartbeat failed: %s", msg.Error)
+			}
+			return fmt.Errorf("control heartbeat failed")
+		}
+		return nil
+	}
+}

@@ -897,16 +897,7 @@ func (c *CoordinatorClient) doHTTP(ctx context.Context, method, path string, dat
 	if hasBody {
 		req.Header.Set("Content-Type", "application/json")
 	}
-	if c.Token != "" {
-		req.Header.Set("Authorization", "Bearer "+c.Token)
-	}
-	c.addAccessHeaders(req.Header)
-	if owner := localCoordinatorOwner(); owner != "" {
-		req.Header.Set("X-Crabbox-Owner", owner)
-	}
-	if org := os.Getenv("CRABBOX_ORG"); org != "" {
-		req.Header.Set("X-Crabbox-Org", org)
-	}
+	c.addRequestHeaders(req.Header)
 	resp, err := c.Client.Do(req)
 	if err != nil {
 		return err
@@ -915,6 +906,19 @@ func (c *CoordinatorClient) doHTTP(ctx context.Context, method, path string, dat
 	return decodeCoordinatorResponse(method, path, resp.StatusCode, resp.Body, out)
 }

+func (c *CoordinatorClient) addRequestHeaders(headers http.Header) {
+	if c.Token != "" {
+		headers.Set("Authorization", "Bearer "+c.Token)
+	}
+	c.addAccessHeaders(headers)
+	if owner := localCoordinatorOwner(); owner != "" {
+		headers.Set("X-Crabbox-Owner", owner)
+	}
+	if org := os.Getenv("CRABBOX_ORG"); org != "" {
+		headers.Set("X-Crabbox-Org", org)
+	}
+}
+
 func (c *CoordinatorClient) doCurl(ctx context.Context, method, path string, data []byte, hasBody bool, out any) error {
 	config, cleanup, err := c.curlConfig(method, path, data, hasBody)
 	if err != nil {

@@ -13,6 +13,8 @@ import (
 	"strings"
 	"testing"
 	"time"
+
+	"nhooyr.io/websocket"
 )

 func TestCoordinatorMachineIDAcceptsStringOrNumber(t *testing.T) {
@@ -295,6 +297,10 @@ func TestCoordinatorAppendRunTelemetry(t *testing.T) {
 func TestCoordinatorHeartbeatTouchesImmediately(t *testing.T) {
 	touches := make(chan struct{}, 1)
 	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.URL.Path == "/v1/control" {
+			http.NotFound(w, r)
+			return
+		}
 		if r.URL.Path != "/v1/leases/cbx_123/heartbeat" {
 			t.Fatalf("unexpected path %s", r.URL.Path)
 		}
@@ -318,6 +324,10 @@ func TestCoordinatorHeartbeatTouchesImmediately(t *testing.T) {
 func TestCoordinatorHeartbeatIncludesTelemetry(t *testing.T) {
 	bodies := make(chan string, 1)
 	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.URL.Path == "/v1/control" {
+			http.NotFound(w, r)
+			return
+		}
 		if r.URL.Path != "/v1/leases/cbx_123/heartbeat" {
 			t.Fatalf("unexpected path %s", r.URL.Path)
 		}
@@ -346,6 +356,59 @@ func TestCoordinatorHeartbeatIncludesTelemetry(t *testing.T) {
 	}
 }

+func TestCoordinatorHeartbeatUsesControlWebSocket(t *testing.T) {
+	bodies := make(chan string, 1)
+	httpHeartbeats := make(chan struct{}, 1)
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch {
+		case r.Method == http.MethodGet && r.URL.Path == "/v1/control":
+			conn, err := websocket.Accept(w, r, nil)
+			if err != nil {
+				t.Errorf("accept control websocket: %v", err)
+				return
+			}
+			defer conn.Close(websocket.StatusNormalClosure, "")
+			_, data, err := conn.Read(r.Context())
+			if err != nil {
+				t.Errorf("read control heartbeat: %v", err)
+				return
+			}
+			bodies <- string(data)
+			_ = conn.Write(r.Context(), websocket.MessageText, []byte(`{"type":"heartbeat","leaseID":"cbx_123","ok":true,"expiresAt":"2026-05-01T00:30:00Z"}`))
+			<-r.Context().Done()
+		case r.Method == http.MethodPost && r.URL.Path == "/v1/leases/cbx_123/heartbeat":
+			httpHeartbeats <- struct{}{}
+			w.Header().Set("Content-Type", "application/json")
+			_, _ = w.Write([]byte(`{"lease":{"id":"cbx_123","provider":"aws","state":"active","expiresAt":"2026-05-01T00:30:00Z"}}`))
+		default:
+			t.Fatalf("unexpected path %s", r.URL.Path)
+		}
+	}))
+	defer server.Close()
+
+	load := 0.77
+	collector := func(context.Context) (*LeaseTelemetry, error) {
+		return &LeaseTelemetry{Load1: &load}, nil
+	}
+	client := CoordinatorClient{BaseURL: server.URL, Client: server.Client()}
+	stop := startCoordinatorHeartbeat(context.Background(), &client, "cbx_123", 30*time.Minute, nil, collector, io.Discard)
+	defer stop()
+
+	select {
+	case body := <-bodies:
+		if !strings.Contains(body, `"type":"heartbeat"`) || !strings.Contains(body, `"load1":0.77`) {
+			t.Fatalf("control heartbeat body=%s", body)
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatal("heartbeat did not use control websocket")
+	}
+	select {
+	case <-httpHeartbeats:
+		t.Fatal("heartbeat fell back to HTTP despite websocket success")
+	default:
+	}
+}
+
 func TestCoordinatorLeaseWatchCancelsWhenLeaseReleased(t *testing.T) {
 	oldInterval := coordinatorLeaseWatchInterval
 	coordinatorLeaseWatchInterval = 10 * time.Millisecond

@@ -152,6 +152,13 @@ func (a App) attach(ctx context.Context, args []string) error {
 		return err
 	}
 	nextAfter := *after
+	if wsAfter, done, used, err := followRunControlWebSocket(ctx, coord, *runID, nextAfter, *poll, a.Stdout, a.Stderr); err != nil {
+		return err
+	} else if done {
+		return nil
+	} else if used {
+		nextAfter = wsAfter
+	}
 	for {
 		events, err := coord.RunEvents(ctx, *runID, nextAfter, 100)
 		if err != nil {

@@ -7,6 +7,8 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"testing"
+
+	"nhooyr.io/websocket"
 )

 func TestEventsCommandPassesPagination(t *testing.T) {
@@ -39,6 +41,8 @@ func TestAttachCommandReplaysOutputAndStopsWhenRunFinished(t *testing.T) {
 	eventCalls := 0
 	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		switch {
+		case r.Method == http.MethodGet && r.URL.Path == "/v1/control":
+			http.NotFound(w, r)
 		case r.Method == http.MethodGet && r.URL.Path == "/v1/runs/run_123/events":
 			eventCalls++
 			if eventCalls == 1 {
@@ -80,3 +84,62 @@ func TestAttachCommandReplaysOutputAndStopsWhenRunFinished(t *testing.T) {
 		t.Fatalf("eventCalls=%d, want 2", eventCalls)
 	}
 }
+
+func TestAttachCommandStreamsOverControlWebSocket(t *testing.T) {
+	controlCalls := 0
+	eventCalls := 0
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		switch {
+		case r.Method == http.MethodGet && r.URL.Path == "/v1/control":
+			controlCalls++
+			conn, err := websocket.Accept(w, r, nil)
+			if err != nil {
+				t.Errorf("accept control websocket: %v", err)
+				return
+			}
+			defer conn.Close(websocket.StatusNormalClosure, "")
+			for {
+				_, data, err := conn.Read(r.Context())
+				if err != nil {
+					return
+				}
+				var msg map[string]any
+				if err := json.Unmarshal(data, &msg); err != nil {
+					t.Errorf("control message JSON: %v", err)
+					return
+				}
+				if msg["type"] == "subscribe_run" {
+					if msg["runID"] != "run_123" {
+						t.Errorf("runID=%v", msg["runID"])
+					}
+					_ = conn.Write(r.Context(), websocket.MessageText, []byte(`{"type":"run_events","runID":"run_123","events":[{"runID":"run_123","seq":1,"type":"stdout","stream":"stdout","data":"hello ws\n","createdAt":"2026-05-02T00:00:00Z"}],"nextSeq":1}`))
+				}
+			}
+		case r.Method == http.MethodGet && r.URL.Path == "/v1/runs/run_123":
+			_, _ = w.Write([]byte(`{"run":{"id":"run_123","leaseID":"cbx_123","owner":"peter@example.com","org":"openclaw","provider":"aws","class":"standard","serverType":"t3.small","command":["true"],"state":"succeeded","phase":"finished","logBytes":0,"logTruncated":false,"startedAt":"2026-05-02T00:00:00Z"}}`))
+		case r.Method == http.MethodGet && r.URL.Path == "/v1/runs/run_123/events":
+			eventCalls++
+			_, _ = w.Write([]byte(`{"events":[]}`))
+		default:
+			t.Fatalf("unexpected request %s %s", r.Method, r.URL.String())
+		}
+	}))
+	defer server.Close()
+	t.Setenv("CRABBOX_COORDINATOR", server.URL)
+	t.Setenv("CRABBOX_COORDINATOR_TOKEN", "")
+
+	var stdout, stderr bytes.Buffer
+	app := App{Stdout: &stdout, Stderr: &stderr}
+	if err := app.attach(context.Background(), []string{"run_123", "--poll", "1ms"}); err != nil {
+		t.Fatal(err)
+	}
+	if stdout.String() != "hello ws\n" {
+		t.Fatalf("stdout=%q", stdout.String())
+	}
+	if controlCalls != 1 {
+		t.Fatalf("controlCalls=%d, want 1", controlCalls)
+	}
+	if eventCalls != 0 {
+		t.Fatalf("HTTP eventCalls=%d, want websocket attach to avoid polling events", eventCalls)
+	}
+}

@@ -852,16 +852,42 @@ func startCoordinatorHeartbeat(ctx context.Context, coord *CoordinatorClient, le
 	done := make(chan struct{})
 	go func() {
 		defer close(done)
+		var control *coordinatorControlConn
+		defer func() {
+			if control != nil {
+				control.close()
+			}
+		}()
 		ticker := time.NewTicker(interval)
 		defer ticker.Stop()
 		for {
 			telemetry := collectLeaseTelemetryBestEffort(rootCtx, telemetryCollector)
 			callCtx, heartbeatCancel := context.WithTimeout(rootCtx, 20*time.Second)
 			var err error
+			var idleTimeoutOverride *time.Duration
 			if updateIdleTimeout != nil {
-				_, err = coord.UpdateLeaseIdleTimeoutWithTelemetry(callCtx, leaseID, *updateIdleTimeout, telemetry)
+				idleTimeoutOverride = updateIdleTimeout
+			}
+			if control == nil {
+				dialCtx, dialCancel := context.WithTimeout(callCtx, coordinatorControlDialTimeout)
+				control, _ = dialCoordinatorControl(dialCtx, coord)
+				dialCancel()
+			}
+			if control != nil {
+				err = control.heartbeat(callCtx, leaseID, idleTimeoutOverride, telemetry)
+				if err != nil {
+					control.close()
+					control = nil
+				}
+			}
+			if control == nil {
+				if updateIdleTimeout != nil {
+					_, err = coord.UpdateLeaseIdleTimeoutWithTelemetry(callCtx, leaseID, *updateIdleTimeout, telemetry)
+				} else {
+					_, err = coord.TouchLeaseWithTelemetry(callCtx, leaseID, telemetry)
+				}
 			} else {
-				_, err = coord.TouchLeaseWithTelemetry(callCtx, leaseID, telemetry)
+				err = nil
 			}
 			heartbeatCancel()
 			if err != nil && rootCtx.Err() == nil {

@@ -9,7 +9,7 @@ import (
 )

 const (
-	runEventOutputChunkBytes = 4096
+	runEventOutputChunkBytes = 16 * 1024
 	runEventOutputMaxBytes   = 64 * 1024
 	runEventOutputQueueSize  = 32
 	runEventOutputPostWait   = 2 * time.Second

@@ -329,20 +329,23 @@ func runSSHInput(ctx context.Context, target SSHTarget, remote string, input io.

 func runSSHStream(ctx context.Context, target SSHTarget, remote string, stdout, stderr io.Writer) int {
 	remote = wrapRemoteForTarget(target, remote)
-	cmd := exec.CommandContext(ctx, "ssh", sshArgs(target, remote)...)
-	cmd.Stdout = stdout
-	cmd.Stderr = stderr
-	err := cmd.Run()
-	if err == nil {
-		return 0
-	}
-	var exitErr *exec.ExitError
-	if ok := asExitError(err, &exitErr); ok {
-		if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
-			return status.ExitStatus()
+	lastCode := 7
+	for _, port := range sshPortCandidates(target.Port, target.FallbackPorts) {
+		probe := target
+		probe.Port = port
+		cmd := exec.CommandContext(ctx, "ssh", sshArgs(probe, remote)...)
+		cmd.Stdout = stdout
+		cmd.Stderr = stderr
+		err := cmd.Run()
+		if err == nil {
+			return 0
+		}
+		lastCode = exitCode(err)
+		if !shouldRetrySSHPort(err) {
+			return lastCode
 		}
 	}
-	return 7
+	return lastCode
 }

 func sshArgs(target SSHTarget, remote string) []string {
@@ -380,7 +383,7 @@ func sshBaseArgsWithOptions(target SSHTarget, connectTimeout, connectionAttempts
 	} else {
 		args = append(args,
 			"-o", "ControlMaster=auto",
-			"-o", "ControlPersist=60s",
+			"-o", "ControlPersist=10m",
 			"-o", "ControlPath="+sshControlPath(target),
 		)
 	}

@@ -223,7 +223,7 @@ func TestSSHArgsIncludeReliabilityOptions(t *testing.T) {
 		"ServerAliveInterval=15",
 		"ServerAliveCountMax=2",
 		"ControlMaster=auto",
-		"ControlPersist=60s",
+		"ControlPersist=10m",
 		"ControlPath=",
 		"crabbox-ssh-",
 		"-%C",
@@ -279,6 +279,54 @@ func TestShouldRetrySSHPortOnlyForTransportExit(t *testing.T) {
 	}
 }

+func TestRunSSHStreamRetriesFallbackPorts(t *testing.T) {
+	dir := t.TempDir()
+	sshPath := filepath.Join(dir, "ssh")
+	portsPath := filepath.Join(dir, "ports")
+	script := `#!/bin/sh
+port=""
+while [ "$#" -gt 0 ]; do
+if [ "$1" = "-p" ]; then
+shift
+port="$1"
+fi
+shift
+done
+printf '%s\n' "$port" >> "$CRABBOX_FAKE_SSH_PORTS"
+if [ "$port" = "2222" ]; then
+exit 255
+fi
+printf 'ok\n'
+exit 0
+`
+	if err := os.WriteFile(sshPath, []byte(script), 0o755); err != nil {
+		t.Fatal(err)
+	}
+	t.Setenv("PATH", dir+string(os.PathListSeparator)+os.Getenv("PATH"))
+	t.Setenv("CRABBOX_FAKE_SSH_PORTS", portsPath)
+
+	var stdout, stderr bytes.Buffer
+	code := runSSHStream(context.Background(), SSHTarget{
+		User:          "crabbox",
+		Host:          "203.0.113.10",
+		Port:          "2222",
+		FallbackPorts: []string{"22"},
+	}, "true", &stdout, &stderr)
+	if code != 0 {
+		t.Fatalf("runSSHStream exit=%d stderr=%q", code, stderr.String())
+	}
+	if stdout.String() != "ok\n" {
+		t.Fatalf("stdout=%q want ok", stdout.String())
+	}
+	ports, err := os.ReadFile(portsPath)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if string(ports) != "2222\n22\n" {
+		t.Fatalf("ports=%q want fallback sequence", string(ports))
+	}
+}
+
 func TestSSHCommandLineRedactsSecretAuthUser(t *testing.T) {
 	target := SSHTarget{
 		User: "tok_live_secret",

@@ -192,7 +192,26 @@ type BridgeAttachment =
   | { kind: "code-agent"; leaseID: string }
   | { kind: "code-viewer"; leaseID: string; id: string }
   | { kind: "egress-host"; leaseID: string; sessionID: string }
-  | { kind: "egress-client"; leaseID: string; sessionID: string };
+  | { kind: "egress-client"; leaseID: string; sessionID: string }
+  | {
+      kind: "control";
+      clientID: string;
+      owner: string;
+      org: string;
+      admin?: boolean;
+      subscriptions?: Record<string, number>;
+    };
+
+type ControlMessage =
+  | { type: "subscribe_run"; runID?: string; after?: number; limit?: number }
+  | { type: "ack"; runID?: string; seq?: number }
+  | {
+      type: "heartbeat";
+      leaseID?: string;
+      idleTimeoutSeconds?: number;
+      telemetry?: Partial<LeaseTelemetry>;
+    }
+  | { type: "ping" };

 interface WebVNCEvent {
   at: string;
@@ -212,6 +231,7 @@ export class FleetDurableObject implements DurableObject {
   private readonly egressHosts = new Map<string, WebSocket>();
   private readonly egressClients = new Map<string, WebSocket>();
   private readonly egressSessions = new Map<string, EgressSessionStatus>();
+  private readonly controlSockets = new Map<string, WebSocket>();

   constructor(
     private readonly state: DurableObjectState,
@@ -253,6 +273,9 @@ export class FleetDurableObject implements DurableObject {
     if (method === "GET" && parts.join("/") === "v1/whoami") {
       return this.whoami(request);
     }
+    if (method === "GET" && parts.join("/") === "v1/control") {
+      return await this.controlSocket(request);
+    }
     if (method === "GET" && parts.join("/") === "v1/admin/leases") {
       return await this.adminLeases(request);
     }
@@ -454,6 +477,9 @@ export class FleetDurableObject implements DurableObject {
         this.egressClients.set(egressSocketKey(attachment.leaseID, attachment.sessionID), socket);
         this.trackEgressSession(attachment);
         break;
+      case "control":
+        this.controlSockets.set(attachment.clientID, socket);
+        break;
     }
   }

@@ -501,6 +527,142 @@ export class FleetDurableObject implements DurableObject {
     this.egressSessions.set(leaseID, sessionStatus);
   }

+  private async controlSocket(request: Request): Promise<Response> {
+    if (request.headers.get("upgrade")?.toLowerCase() !== "websocket") {
+      return json({ error: "websocket_required" }, { status: 426 });
+    }
+    const pair = new WebSocketPair();
+    const client = pair[0];
+    const server = pair[1];
+    const attachment: BridgeAttachment = {
+      kind: "control",
+      clientID: crypto.randomUUID(),
+      owner: requestOwner(request),
+      org: requestOrg(request, this.env),
+      admin: isAdminRequest(request),
+      subscriptions: {},
+    };
+    this.controlSockets.set(attachment.clientID, server);
+    this.acceptBridgeWebSocket(server, attachment);
+    sendControl(server, {
+      type: "hello",
+      protocol: 1,
+      clientID: attachment.clientID,
+    });
+    return new Response(null, { status: 101, webSocket: client });
+  }
+
private async handleControlMessage(
|
||||
socket: WebSocket,
|
||||
attachment: Extract<BridgeAttachment, { kind: "control" }>,
|
||||
message: string | ArrayBuffer | Blob,
|
||||
): Promise<void> {
|
||||
if (typeof message !== "string") {
|
||||
sendControl(socket, {
|
||||
type: "error",
|
||||
code: "invalid_message",
|
||||
message: "expected JSON text",
|
||||
});
|
||||
return;
|
||||
}
|
||||
let input: ControlMessage;
|
||||
try {
|
||||
input = JSON.parse(message) as ControlMessage;
|
||||
} catch {
|
||||
sendControl(socket, { type: "error", code: "invalid_json", message: "invalid JSON" });
|
||||
return;
|
||||
}
|
||||
switch (input.type) {
|
||||
case "subscribe_run":
|
||||
await this.subscribeControlRun(socket, attachment, input);
|
||||
return;
|
||||
case "ack":
|
||||
this.ackControlRun(socket, attachment, input);
|
||||
return;
|
||||
case "heartbeat":
|
||||
await this.controlHeartbeat(socket, attachment, input);
|
||||
return;
|
||||
case "ping":
|
||||
sendControl(socket, { type: "pong" });
|
||||
return;
|
||||
default:
|
||||
sendControl(socket, {
|
||||
type: "error",
|
||||
code: "unknown_type",
|
||||
message: "unknown control message",
|
||||
});
|
||||
}
|
||||
}
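
The order of checks in `handleControlMessage` can be exercised without a Durable Object. The following is a minimal sketch, not the PR's code: the `ControlMessage` union is not shown in this diff, so the field lists below are assumptions, and `classifyControlMessage` is a hypothetical helper that returns either the message type the handler would route on or the error code it would send back.

```typescript
// Assumed shapes: the real ControlMessage union is defined elsewhere in the PR.
type ControlMessage =
  | { type: "subscribe_run"; runID?: string; after?: number; limit?: number }
  | { type: "ack"; runID?: string; seq?: number }
  | { type: "heartbeat"; leaseID?: string; idleTimeoutSeconds?: number }
  | { type: "ping" };

type Classified =
  | { ok: true; type: ControlMessage["type"] }
  | { ok: false; code: "invalid_message" | "invalid_json" | "unknown_type" };

const knownTypes = ["subscribe_run", "ack", "heartbeat", "ping"] as const;

// Hypothetical helper mirroring the handler: binary frames are rejected first,
// then unparseable JSON, then any type outside the known set.
function classifyControlMessage(message: string | ArrayBuffer): Classified {
  if (typeof message !== "string") {
    return { ok: false, code: "invalid_message" };
  }
  let input: unknown;
  try {
    input = JSON.parse(message);
  } catch {
    return { ok: false, code: "invalid_json" };
  }
  // Optional chaining also covers a JSON `null` payload, which has no `type`.
  const type = (input as { type?: unknown } | null)?.type;
  const known = knownTypes.find((candidate) => candidate === type);
  return known ? { ok: true, type: known } : { ok: false, code: "unknown_type" };
}
```

For example, `classifyControlMessage('{"type":"ping"}')` yields `{ ok: true, type: "ping" }`, while a binary frame yields the `invalid_message` code.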

  private async subscribeControlRun(
    socket: WebSocket,
    attachment: Extract<BridgeAttachment, { kind: "control" }>,
    input: Extract<ControlMessage, { type: "subscribe_run" }>,
  ): Promise<void> {
    const runID = typeof input.runID === "string" ? input.runID : "";
    const run = runID ? await this.getRun(runID) : undefined;
    if (!run || !this.runVisibleToControl(run, attachment)) {
      sendControl(socket, { type: "error", code: "not_found", message: "run not found" });
      return;
    }
    const after = finiteControlNumber(input.after) ?? 0;
    const limit = Math.min(finiteControlNumber(input.limit) ?? 100, 500);
    const events = await this.runEvents(runID, after, limit);
    const nextSeq = events.at(-1)?.seq ?? after;
    attachment.subscriptions = { ...attachment.subscriptions, [runID]: nextSeq };
    this.serializeBridgeAttachment(socket, attachment);
    sendControl(socket, { type: "run_events", runID, events, nextSeq });
  }
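
The replay step in `subscribeControlRun` reduces to a small cursor computation. The sketch below is illustrative only: `replayPage` and the `RunEvent` shape are hypothetical, and it assumes `runEvents(runID, after, limit)` returns events with `seq` greater than `after`, in ascending order, capped at `limit` (that method is not shown in this diff).

```typescript
// Assumed minimal event shape; the real RunEventRecord carries more fields.
interface RunEvent {
  seq: number;
  type: string;
}

// Same normalization as finiteControlNumber in this diff.
function finiteControlNumber(value: number | undefined): number | undefined {
  return Number.isFinite(value) && value !== undefined && value >= 0
    ? Math.trunc(value)
    : undefined;
}

// Hypothetical pure version of the replay step: select events after the cursor,
// cap the page at 500 (default 100), and report where the cursor ends up.
function replayPage(stored: RunEvent[], after?: number, limit?: number) {
  const cursor = finiteControlNumber(after) ?? 0;
  const pageSize = Math.min(finiteControlNumber(limit) ?? 100, 500);
  const events = stored.filter((event) => event.seq > cursor).slice(0, pageSize);
  // With no new events the cursor stays where the client left it.
  const nextSeq = events.length > 0 ? events[events.length - 1].seq : cursor;
  return { events, nextSeq };
}
```

Because the stored cursor is set to `nextSeq`, a capped page leaves the subscription at the last event actually returned. A client that receives a full page may want to subscribe again from `nextSeq`; whether the CLI does so is not visible in this hunk.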

  private ackControlRun(
    socket: WebSocket,
    attachment: Extract<BridgeAttachment, { kind: "control" }>,
    input: Extract<ControlMessage, { type: "ack" }>,
  ): void {
    if (typeof input.runID !== "string") {
      return;
    }
    const seq = finiteControlNumber(input.seq);
    if (seq === undefined) {
      return;
    }
    attachment.subscriptions = { ...attachment.subscriptions, [input.runID]: seq };
    this.serializeBridgeAttachment(socket, attachment);
  }

  private async controlHeartbeat(
    socket: WebSocket,
    attachment: Extract<BridgeAttachment, { kind: "control" }>,
    input: Extract<ControlMessage, { type: "heartbeat" }>,
  ): Promise<void> {
    const leaseID = typeof input.leaseID === "string" ? input.leaseID : "";
    const lease = leaseID ? await this.resolveLeaseForControl(leaseID, attachment) : undefined;
    if (!lease) {
      sendControl(socket, { type: "heartbeat", leaseID, ok: false, error: "not_found" });
      return;
    }
    const heartbeat: { idleTimeoutSeconds?: number; telemetry?: Partial<LeaseTelemetry> } = {};
    if (input.idleTimeoutSeconds !== undefined) {
      heartbeat.idleTimeoutSeconds = input.idleTimeoutSeconds;
    }
    if (input.telemetry !== undefined) {
      heartbeat.telemetry = input.telemetry;
    }
    await this.applyLeaseHeartbeat(lease, heartbeat);
    sendControl(socket, {
      type: "heartbeat",
      leaseID: lease.id,
      ok: true,
      expiresAt: lease.expiresAt,
    });
  }

  private serializeBridgeAttachment(socket: WebSocket, attachment: BridgeAttachment): void {
    if (typeof socket.serializeAttachment === "function") {
      socket.serializeAttachment(attachment);
    }
  }

  private async handleBridgeMessage(
    socket: WebSocket,
    attachment: BridgeAttachment,
@ -548,6 +710,9 @@ export class FleetDurableObject implements DurableObject {
          this.egressHosts.get(egressSocketKey(attachment.leaseID, attachment.sessionID)),
        );
        break;
      case "control":
        await this.handleControlMessage(socket, attachment, message);
        break;
    }
    void socket;
  }
@ -576,6 +741,11 @@ export class FleetDurableObject implements DurableObject {
      case "egress-client":
        this.clearEgressClient(attachment.leaseID, attachment.sessionID, socket);
        break;
      case "control":
        if (this.controlSockets.get(attachment.clientID) === socket) {
          this.controlSockets.delete(attachment.clientID);
        }
        break;
    }
  }

@ -737,25 +907,7 @@ export class FleetDurableObject implements DurableObject {
         idleTimeoutSeconds?: number;
         telemetry?: Partial<LeaseTelemetry>;
       }>(request);
-      const now = new Date();
-      const requestedIdleTimeoutSeconds = body.idleTimeoutSeconds;
-      if (
-        Number.isFinite(requestedIdleTimeoutSeconds) &&
-        requestedIdleTimeoutSeconds !== undefined &&
-        requestedIdleTimeoutSeconds > 0
-      ) {
-        lease.idleTimeoutSeconds = clampLeaseSeconds(requestedIdleTimeoutSeconds, 86_400);
-      }
-      const telemetry = sanitizeLeaseTelemetry(body.telemetry, now);
-      if (telemetry) {
-        lease.telemetry = telemetry;
-        lease.telemetryHistory = appendLeaseTelemetryHistory(lease.telemetryHistory, telemetry);
-      }
-      lease.updatedAt = now.toISOString();
-      lease.lastTouchedAt = now.toISOString();
-      lease.expiresAt = recomputeLeaseExpiresAt(lease, now).toISOString();
-      await this.putLease(lease);
-      await this.scheduleAlarm();
+      await this.applyLeaseHeartbeat(lease, body);
       return json({ lease });
     }
     if (method === "POST" && action === "tailscale") {
@ -778,6 +930,34 @@ export class FleetDurableObject implements DurableObject {
    return json({ error: "not_found" }, { status: 404 });
  }

  private async applyLeaseHeartbeat(
    lease: LeaseRecord,
    input: {
      idleTimeoutSeconds?: number;
      telemetry?: Partial<LeaseTelemetry>;
    },
  ): Promise<void> {
    const now = new Date();
    const requestedIdleTimeoutSeconds = input.idleTimeoutSeconds;
    if (
      Number.isFinite(requestedIdleTimeoutSeconds) &&
      requestedIdleTimeoutSeconds !== undefined &&
      requestedIdleTimeoutSeconds > 0
    ) {
      lease.idleTimeoutSeconds = clampLeaseSeconds(requestedIdleTimeoutSeconds, 86_400);
    }
    const telemetry = sanitizeLeaseTelemetry(input.telemetry, now);
    if (telemetry) {
      lease.telemetry = telemetry;
      lease.telemetryHistory = appendLeaseTelemetryHistory(lease.telemetryHistory, telemetry);
    }
    lease.updatedAt = now.toISOString();
    lease.lastTouchedAt = now.toISOString();
    lease.expiresAt = recomputeLeaseExpiresAt(lease, now).toISOString();
    await this.putLease(lease);
    await this.scheduleAlarm();
  }

  private async prepareTailscaleConfig(
    config: ReturnType<typeof leaseConfig>,
    input: LeaseRequest,
@ -2514,6 +2694,34 @@ export class FleetDurableObject implements DurableObject {
    return matches[0];
  }

  private async resolveLeaseForControl(
    identifier: string,
    attachment: Extract<BridgeAttachment, { kind: "control" }>,
  ): Promise<LeaseRecord | undefined> {
    const exact = await this.getLease(identifier);
    if (exact) {
      return this.leaseVisibleToControl(exact, attachment) ? exact : undefined;
    }
    const slug = normalizeLeaseSlug(identifier);
    if (!slug) {
      return undefined;
    }
    const now = Date.now();
    const matches = (await this.leaseRecords()).filter(
      (lease) =>
        lease.state === "active" &&
        Date.parse(lease.expiresAt) > now &&
        normalizeLeaseSlug(lease.slug) === slug &&
        this.leaseVisibleToControl(lease, attachment),
    );
    if (matches.length > 1) {
      throw new Error(
        `ambiguous slug ${slug}: ${matches.map((lease) => `${lease.id}:${lease.owner}`).join(", ")}`,
      );
    }
    return matches[0];
  }

  private async leaseRecords(): Promise<LeaseRecord[]> {
    const leases = await this.state.storage.list<LeaseRecord>({ prefix: "lease:" });
    return [...leases.values()];
@ -2594,6 +2802,24 @@ export class FleetDurableObject implements DurableObject {
    );
  }

  private runVisibleToControl(
    run: RunRecord,
    attachment: Extract<BridgeAttachment, { kind: "control" }>,
  ): boolean {
    return Boolean(
      attachment.admin || (run.owner === attachment.owner && run.org === attachment.org),
    );
  }

  private leaseVisibleToControl(
    lease: LeaseRecord,
    attachment: Extract<BridgeAttachment, { kind: "control" }>,
  ): boolean {
    return Boolean(
      attachment.admin || (lease.owner === attachment.owner && lease.org === attachment.org),
    );
  }
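
Both visibility checks apply the same rule: admins see everything, everyone else must match owner and org. A minimal sketch of that rule as one predicate follows; `Owned`, `ControlIdentity`, and `visibleToControl` are hypothetical names used only for illustration and do not appear in the PR.

```typescript
// Assumed minimal shapes; the real RunRecord and LeaseRecord carry more fields.
interface Owned {
  owner: string;
  org: string;
}

interface ControlIdentity {
  owner: string;
  org: string;
  admin?: boolean;
}

// Hypothetical shared predicate: admin bypasses the check, otherwise both
// owner and org must match the identity captured when the socket was opened.
function visibleToControl(record: Owned, identity: ControlIdentity): boolean {
  return Boolean(
    identity.admin || (record.owner === identity.owner && record.org === identity.org),
  );
}
```

Note that a matching owner in a different org is not enough, which is why `subscribe_run` answers `not_found` rather than revealing that the run exists.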

  private async putLease(lease: LeaseRecord): Promise<void> {
    await this.state.storage.put(leaseKey(lease.id), lease);
  }
@ -2629,9 +2855,34 @@ export class FleetDurableObject implements DurableObject {
    run.lastEventAt = now;
    await this.state.storage.put(runEventKey(run.id, seq), event);
    await this.putRun(run);
    this.broadcastRunEvent(run, event);
    return event;
  }

  private broadcastRunEvent(run: RunRecord, event: RunEventRecord): void {
    for (const socket of this.controlSockets.values()) {
      if (socket.readyState !== WebSocket.OPEN) {
        continue;
      }
      const attachment = bridgeAttachment(socket);
      if (!attachment || attachment.kind !== "control") {
        continue;
      }
      const after = attachment.subscriptions?.[run.id];
      if (after === undefined || after >= event.seq || !this.runVisibleToControl(run, attachment)) {
        continue;
      }
      attachment.subscriptions = { ...attachment.subscriptions, [run.id]: event.seq };
      this.serializeBridgeAttachment(socket, attachment);
      sendControl(socket, {
        type: "run_events",
        runID: run.id,
        events: [event],
        nextSeq: event.seq,
      });
    }
  }
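
The per-socket decision in `broadcastRunEvent` is a cursor comparison: push only when the socket has subscribed to the run and has not yet seen this sequence number. A minimal sketch under the assumption that `subscriptions` maps run IDs to the last delivered `seq`; `advanceCursor` is a hypothetical helper, and the visibility and `readyState` checks are left out.

```typescript
// Assumed shape: run ID -> last sequence number already delivered.
type Subscriptions = Partial<Record<string, number>>;

// Hypothetical helper: returns the updated cursors when the event should be
// pushed, or undefined when the socket is unsubscribed or already has it.
function advanceCursor(
  subscriptions: Subscriptions,
  runID: string,
  seq: number,
): Subscriptions | undefined {
  const after = subscriptions[runID];
  if (after === undefined || after >= seq) {
    return undefined;
  }
  // Copy instead of mutating, matching the spread used in the diff.
  return { ...subscriptions, [runID]: seq };
}
```

The `after >= event.seq` guard is what keeps an event from being delivered twice when it was already included in the replay sent for `subscribe_run`.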

  private provider(provider: Provider, region = "eu-west-1"): CloudProvider {
    const testProvider = this.testProviders[provider];
    if (testProvider) {
@ -3134,18 +3385,41 @@ function bridgeAttachment(socket: WebSocket): BridgeAttachment | undefined {
      return typeof attachment.leaseID === "string" && typeof attachment.sessionID === "string"
        ? attachment
        : undefined;
    case "control":
      return typeof attachment.clientID === "string" &&
        typeof attachment.owner === "string" &&
        typeof attachment.org === "string"
        ? {
            ...attachment,
            subscriptions:
              attachment.subscriptions && typeof attachment.subscriptions === "object"
                ? attachment.subscriptions
                : {},
          }
        : undefined;
    default:
      return undefined;
  }
}

function bridgeTags(attachment: BridgeAttachment): string[] {
  if (attachment.kind === "control") {
    return [`control:${attachment.clientID}`, `owner:${attachment.owner}`, `org:${attachment.org}`];
  }
  if (attachment.kind === "egress-host" || attachment.kind === "egress-client") {
    return [`lease:${attachment.leaseID}`, `session:${attachment.sessionID}`, attachment.kind];
  }
  return [`lease:${attachment.leaseID}`, attachment.kind];
}

function sendControl(socket: WebSocket, payload: unknown): void {
  try {
    socket.send(JSON.stringify(payload));
  } catch {
    closeSocket(socket, 1011, "control send failed");
  }
}

function bytesToBase64(bytes: Uint8Array): string {
  let binary = "";
  for (const byte of bytes) {
@ -3282,6 +3556,12 @@ function finiteQueryNumber(value: string | null): number | undefined {
  return Number.isFinite(parsed) && parsed >= 0 ? Math.trunc(parsed) : undefined;
}

function finiteControlNumber(value: number | undefined): number | undefined {
  return Number.isFinite(value) && value !== undefined && value >= 0
    ? Math.trunc(value)
    : undefined;
}
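
A few sample inputs make the normalization concrete. The function body below is copied from the hunk above; the `samples` array and its values are chosen here purely for illustration.

```typescript
// Copied from the diff: accept only finite, non-negative numbers, truncated.
function finiteControlNumber(value: number | undefined): number | undefined {
  return Number.isFinite(value) && value !== undefined && value >= 0
    ? Math.trunc(value)
    : undefined;
}

// Illustrative inputs: fractions are truncated, everything else is dropped.
const samples: Array<number | undefined> = [0, 3.9, -1, Number.NaN, Infinity, undefined];
const normalized = samples.map((value) => finiteControlNumber(value));
// normalized is [0, 3, undefined, undefined, undefined, undefined]
```

Callers then supply their own default with `??`, as `subscribeControlRun` does for `after` and `limit`.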

function boundedEgressString(value: string | undefined): string | undefined {
  const normalized = String(value ?? "").trim();
  if (!normalized) {

@ -61,6 +61,40 @@ class MemoryStorage {
  }
}

class FakeWebSocket {
  readyState = WebSocket.OPEN;
  private attachment: unknown;
  private readonly sent: string[] = [];

  constructor(attachment?: unknown) {
    this.attachment = attachment;
  }

  send(data: string): void {
    this.sent.push(data);
  }

  close(): void {
    this.readyState = WebSocket.CLOSED;
  }

  accept(): void {}

  addEventListener(): void {}

  serializeAttachment(attachment: unknown): void {
    this.attachment = attachment;
  }

  deserializeAttachment(): unknown {
    return this.attachment;
  }

  sentJSON(): unknown[] {
    return this.sent.map((value) => JSON.parse(value) as unknown);
  }
}

describe("fleet lease identity and idle", () => {
  it("creates leases through the public route with slug and idle metadata", async () => {
    const storage = new MemoryStorage();
@ -1972,6 +2006,89 @@ describe("fleet run history", () => {
    ]);
  });

  it("streams run events and lease heartbeats over a control websocket", async () => {
    const storage = new MemoryStorage();
    const fleet = testFleet(storage);
    const headers = {
      "x-crabbox-owner": "peter@example.com",
      "x-crabbox-org": "openclaw",
    };
    storage.seed(
      "lease:cbx_000000000001",
      testLease({
        id: "cbx_000000000001",
        slug: "blue-lobster",
        owner: "peter@example.com",
        org: "openclaw",
        expiresAt: new Date(Date.now() + 60 * 60 * 1000).toISOString(),
      }),
    );
    storage.seed(
      "run:run_000000000001",
      testRun({
        id: "run_000000000001",
        leaseID: "cbx_000000000001",
        owner: "peter@example.com",
        org: "openclaw",
        eventCount: 1,
      }),
    );
    storage.seed("runevent:run_000000000001:000000000001", {
      runID: "run_000000000001",
      seq: 1,
      type: "run.started",
      phase: "starting",
      createdAt: "2026-05-01T00:00:00.000Z",
    });
    const socket = new FakeWebSocket({
      kind: "control",
      clientID: "ctrl_1",
      owner: "peter@example.com",
      org: "openclaw",
      subscriptions: {},
    });
    (
      fleet as unknown as {
        controlSockets: Map<string, WebSocket>;
      }
    ).controlSockets.set("ctrl_1", socket as unknown as WebSocket);

    await fleet.webSocketMessage(
      socket as unknown as WebSocket,
      JSON.stringify({ type: "subscribe_run", runID: "run_000000000001", after: 0 }),
    );
    expect(socket.sentJSON()[0]).toMatchObject({
      type: "run_events",
      runID: "run_000000000001",
      nextSeq: 1,
      events: [{ seq: 1, type: "run.started" }],
    });

    await fleet.fetch(
      request("POST", "/v1/runs/run_000000000001/events", {
        headers,
        body: { type: "stdout", stream: "stdout", data: "ok\n" },
      }),
    );
    expect(socket.sentJSON()[1]).toMatchObject({
      type: "run_events",
      runID: "run_000000000001",
      nextSeq: 2,
      events: [{ seq: 2, type: "stdout", data: "ok\n" }],
    });

    await fleet.webSocketMessage(
      socket as unknown as WebSocket,
      JSON.stringify({ type: "heartbeat", leaseID: "blue-lobster", idleTimeoutSeconds: 900 }),
    );
    expect(socket.sentJSON()[2]).toMatchObject({
      type: "heartbeat",
      leaseID: "cbx_000000000001",
      ok: true,
    });
    expect(storage.value<LeaseRecord>("lease:cbx_000000000001")?.idleTimeoutSeconds).toBe(900);
  });

  it("records finished runs and serves logs", async () => {
    const fleet = testFleet();
    const ownerHeaders = {