diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ede327..7117eed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - Added per-lease portal detail pages with bridge status, pasteable commands, recent run links, and a stop action. - Added portal run detail pages with command metadata, result summaries, dense viewport-fitted portal tables, provider/OS badges, active/ended/provider/target filters, sticky portal chrome, and copyable retained log previews. - Added admin portal visibility for non-owned runner leases, including `mine`/`system` filters and matching detail/code/VNC drilldowns for operator sessions. +- Added latest lease telemetry snapshots for coordinator-backed Linux leases, including load, memory, disk, and uptime in `status --json` and the portal detail view. - Added `.crabboxignore` for repo-local sync-only exclude patterns shared by `run` and `sync-plan`. - Documented the prebaked runner image boundary: provider-owned AMIs/snapshots hold machine capabilities while repo/runtime caches stay in QA workflows or warm leases. diff --git a/docs/architecture.md b/docs/architecture.md index e57bfb1..d3db3ba 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -42,7 +42,7 @@ leased machine 8. CLI seeds remote Git when possible, compares sync fingerprints, and syncs changed files with `rsync --delete`. 9. CLI runs sync sanity and configured base-ref hydration. 10. CLI runs the command over SSH and streams stdout/stderr. -11. CLI heartbeats while the command runs; heartbeats touch `lastTouchedAt` and recompute idle expiry up to the TTL cap. +11. CLI heartbeats while the command runs; heartbeats touch `lastTouchedAt`, recompute idle expiry up to the TTL cap, and attach a best-effort latest Linux telemetry snapshot when SSH is reachable. 12. CLI releases the lease when done. 13. Durable Object alarm cleans up stale leases and expired machines. @@ -72,6 +72,8 @@ POST /v1/admin/leases/{id-or-slug}/delete Admin endpoints and `GET /v1/pool` require the separate admin token. GitHub browser-login tokens are user tokens for normal lease operations and are minted only after allowed GitHub org membership is verified. User-token list, exact-ID lookup, slug lookup, heartbeat, release, run history, logs, and usage are scoped to the token owner/org. +Heartbeat bodies may include a `telemetry` object. The coordinator stores only the latest sanitized snapshot on the lease record. Current CLI snapshots include Linux load average, memory use, root-disk use, uptime, source, and capture timestamp. + ## Durable Object State Use one fleet Durable Object for MVP. It owns all atomic scheduling decisions. diff --git a/docs/commands/status.md b/docs/commands/status.md index 26e84eb..c2e5898 100644 --- a/docs/commands/status.md +++ b/docs/commands/status.md @@ -30,4 +30,6 @@ Flags: ``` Human and JSON output include the selected network. With Tailscale metadata, -status also prints the tailnet host/state. +status also prints the tailnet host/state. For coordinator-backed Linux leases +that have received a recent heartbeat, status also includes the latest +best-effort telemetry snapshot: load, memory, disk, uptime, and capture age. diff --git a/docs/orchestrator.md b/docs/orchestrator.md index dc167f2..99acf2a 100644 --- a/docs/orchestrator.md +++ b/docs/orchestrator.md @@ -43,6 +43,8 @@ The Worker stores coordinator leases as `active`, `released`, `expired`, or `fai `crabbox warmup --idle-timeout 30m` and `crabbox run --idle-timeout 30m` set inactivity expiry. `--ttl` is a separate maximum wall-clock lifetime. The CLI sends coordinator heartbeats while a lease is in use; each heartbeat updates `lastTouchedAt` and recomputes `expiresAt = min(createdAt + ttl, lastTouchedAt + idleTimeout)`. +For Linux leases, heartbeats also attach a best-effort latest telemetry snapshot when SSH is reachable. The Durable Object keeps only the latest sanitized load, memory, disk, uptime, source, and capture timestamp on the lease record; it is not a time-series store. + Direct-provider mode does not have a central heartbeat or alarm. It labels machines with `created_at`, `last_touched_at`, `idle_timeout_secs`, `expires_at`, `state`, `lease`, and `slug`; `crabbox cleanup` uses those labels conservatively. ## Cleanup diff --git a/internal/cli/actions.go b/internal/cli/actions.go index b75e4b5..db1ca71 100644 --- a/internal/cli/actions.go +++ b/internal/cli/actions.go @@ -81,7 +81,7 @@ func (a App) actionsHydrate(ctx context.Context, args []string) error { if coord, ok, err := newTargetCoordinatorClient(cfg); err != nil { return err } else if ok { - stopHeartbeat := startCoordinatorHeartbeat(ctx, coord, leaseID, cfg.IdleTimeout, nil, a.Stderr) + stopHeartbeat := startCoordinatorHeartbeat(ctx, coord, leaseID, cfg.IdleTimeout, nil, leaseTelemetryCollectorForTarget(target), a.Stderr) defer stopHeartbeat() } else { a.touchActiveLeaseBestEffort(ctx, cfg, server, leaseID) diff --git a/internal/cli/coordinator.go b/internal/cli/coordinator.go index 4fa0ea3..870daaf 100644 --- a/internal/cli/coordinator.go +++ b/internal/cli/coordinator.go @@ -57,6 +57,7 @@ type CoordinatorLease struct { UpdatedAt string `json:"updatedAt,omitempty"` LastTouchedAt string `json:"lastTouchedAt,omitempty"` ExpiresAt string `json:"expiresAt"` + Telemetry *LeaseTelemetry `json:"telemetry,omitempty"` } type ProvisioningAttempt struct { @@ -378,26 +379,37 @@ func (c *CoordinatorClient) ReleaseLease(ctx context.Context, id string, deleteS } func (c *CoordinatorClient) TouchLease(ctx context.Context, id string) (CoordinatorLease, error) { - return c.heartbeatLease(ctx, id, nil) + return c.heartbeatLease(ctx, id, nil, nil) +} + +func (c *CoordinatorClient) TouchLeaseWithTelemetry(ctx context.Context, id string, telemetry *LeaseTelemetry) (CoordinatorLease, error) { + return c.heartbeatLease(ctx, id, nil, telemetry) } func (c *CoordinatorClient) UpdateLeaseIdleTimeout(ctx context.Context, id string, idleTimeout time.Duration) (CoordinatorLease, error) { - return c.heartbeatLease(ctx, id, &idleTimeout) + return c.heartbeatLease(ctx, id, &idleTimeout, nil) } -func (c *CoordinatorClient) heartbeatLease(ctx context.Context, id string, idleTimeout *time.Duration) (CoordinatorLease, error) { +func (c *CoordinatorClient) UpdateLeaseIdleTimeoutWithTelemetry(ctx context.Context, id string, idleTimeout time.Duration, telemetry *LeaseTelemetry) (CoordinatorLease, error) { + return c.heartbeatLease(ctx, id, &idleTimeout, telemetry) +} + +func (c *CoordinatorClient) heartbeatLease(ctx context.Context, id string, idleTimeout *time.Duration, telemetry *LeaseTelemetry) (CoordinatorLease, error) { var res struct { Lease CoordinatorLease `json:"lease"` } - err := c.do(ctx, http.MethodPost, "/v1/leases/"+url.PathEscape(id)+"/heartbeat", heartbeatRequestBody(idleTimeout), &res) + err := c.do(ctx, http.MethodPost, "/v1/leases/"+url.PathEscape(id)+"/heartbeat", heartbeatRequestBody(idleTimeout, telemetry), &res) return res.Lease, err } -func heartbeatRequestBody(idleTimeout *time.Duration) map[string]any { +func heartbeatRequestBody(idleTimeout *time.Duration, telemetry *LeaseTelemetry) map[string]any { body := map[string]any{} if idleTimeout != nil && *idleTimeout > 0 { body["idleTimeoutSeconds"] = int(idleTimeout.Seconds()) } + if telemetry != nil { + body["telemetry"] = telemetry + } return body } diff --git a/internal/cli/coordinator_test.go b/internal/cli/coordinator_test.go index 6977eab..2a9965c 100644 --- a/internal/cli/coordinator_test.go +++ b/internal/cli/coordinator_test.go @@ -222,14 +222,19 @@ func TestCoordinatorHTTPAddsAccessHeaders(t *testing.T) { } func TestHeartbeatRequestBodyOmitsIdleTimeoutForTouch(t *testing.T) { - if body := heartbeatRequestBody(nil); len(body) != 0 { + if body := heartbeatRequestBody(nil, nil); len(body) != 0 { t.Fatalf("touch heartbeat body=%v, want empty", body) } idleTimeout := 45 * time.Minute - body := heartbeatRequestBody(&idleTimeout) + body := heartbeatRequestBody(&idleTimeout, nil) if body["idleTimeoutSeconds"] != 2700 { t.Fatalf("heartbeat body=%v, want idle timeout seconds", body) } + load := 0.42 + body = heartbeatRequestBody(nil, &LeaseTelemetry{Load1: &load}) + if body["telemetry"] == nil { + t.Fatalf("heartbeat body=%v, want telemetry", body) + } } func TestCoordinatorTouchAndUpdateHeartbeatBodies(t *testing.T) { @@ -248,10 +253,14 @@ func TestCoordinatorTouchAndUpdateHeartbeatBodies(t *testing.T) { if _, err := client.TouchLease(context.Background(), "cbx_123"); err != nil { t.Fatal(err) } + load := 0.42 + if _, err := client.TouchLeaseWithTelemetry(context.Background(), "cbx_123", &LeaseTelemetry{Load1: &load}); err != nil { + t.Fatal(err) + } if _, err := client.UpdateLeaseIdleTimeout(context.Background(), "cbx_123", 45*time.Minute); err != nil { t.Fatal(err) } - if len(bodies) != 2 || bodies[0] != "{}" || !strings.Contains(bodies[1], `"idleTimeoutSeconds":2700`) { + if len(bodies) != 3 || bodies[0] != "{}" || !strings.Contains(bodies[1], `"load1":0.42`) || !strings.Contains(bodies[2], `"idleTimeoutSeconds":2700`) { t.Fatalf("heartbeat bodies=%q", bodies) } } @@ -269,7 +278,7 @@ func TestCoordinatorHeartbeatTouchesImmediately(t *testing.T) { defer server.Close() client := CoordinatorClient{BaseURL: server.URL, Client: server.Client()} - stop := startCoordinatorHeartbeat(context.Background(), &client, "cbx_123", 30*time.Minute, nil, io.Discard) + stop := startCoordinatorHeartbeat(context.Background(), &client, "cbx_123", 30*time.Minute, nil, nil, io.Discard) defer stop() select { @@ -279,6 +288,37 @@ func TestCoordinatorHeartbeatTouchesImmediately(t *testing.T) { } } +func TestCoordinatorHeartbeatIncludesTelemetry(t *testing.T) { + bodies := make(chan string, 1) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/v1/leases/cbx_123/heartbeat" { + t.Fatalf("unexpected path %s", r.URL.Path) + } + data, _ := io.ReadAll(r.Body) + bodies <- string(data) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"lease":{"id":"cbx_123","provider":"aws","state":"active","expiresAt":"2026-05-01T00:30:00Z"}}`)) + })) + defer server.Close() + + load := 0.77 + collector := func(context.Context) (*LeaseTelemetry, error) { + return &LeaseTelemetry{Load1: &load}, nil + } + client := CoordinatorClient{BaseURL: server.URL, Client: server.Client()} + stop := startCoordinatorHeartbeat(context.Background(), &client, "cbx_123", 30*time.Minute, nil, collector, io.Discard) + defer stop() + + select { + case body := <-bodies: + if !strings.Contains(body, `"load1":0.77`) { + t.Fatalf("heartbeat body=%s, want telemetry", body) + } + case <-time.After(2 * time.Second): + t.Fatal("heartbeat did not touch immediately") + } +} + func TestCoordinatorLeaseWatchCancelsWhenLeaseReleased(t *testing.T) { oldInterval := coordinatorLeaseWatchInterval coordinatorLeaseWatchInterval = 10 * time.Millisecond diff --git a/internal/cli/run.go b/internal/cli/run.go index 97a84c7..4a707f9 100644 --- a/internal/cli/run.go +++ b/internal/cli/run.go @@ -299,7 +299,7 @@ func (a App) runCommand(ctx context.Context, args []string) (err error) { return recordFailure(err) } } - stopHeartbeat := startCoordinatorHeartbeat(ctx, coord, leaseID, cfg.IdleTimeout, heartbeatIdleTimeout, a.Stderr) + stopHeartbeat := startCoordinatorHeartbeat(ctx, coord, leaseID, cfg.IdleTimeout, heartbeatIdleTimeout, leaseTelemetryCollectorForTarget(target), a.Stderr) defer stopHeartbeat() } @@ -710,7 +710,7 @@ func (a App) acquireCoordinator(ctx context.Context, cfg Config, coord *Coordina } waitCtx, cancelWait := context.WithCancelCause(ctx) defer cancelWait(nil) - stopHeartbeat := startCoordinatorHeartbeat(waitCtx, coord, leaseID, cfg.IdleTimeout, nil, a.Stderr) + stopHeartbeat := startCoordinatorHeartbeat(waitCtx, coord, leaseID, cfg.IdleTimeout, nil, leaseTelemetryCollectorForTarget(target), a.Stderr) defer stopHeartbeat() stopLeaseWatch := startCoordinatorLeaseWatch(waitCtx, coord, leaseID, cancelWait, a.Stderr) defer stopLeaseWatch() @@ -845,7 +845,7 @@ func (a App) releaseAcquiredLeaseBestEffort(ctx context.Context, cfg Config, coo removeLeaseClaim(leaseID) } -func startCoordinatorHeartbeat(ctx context.Context, coord *CoordinatorClient, leaseID string, idleTimeout time.Duration, updateIdleTimeout *time.Duration, stderr io.Writer) func() { +func startCoordinatorHeartbeat(ctx context.Context, coord *CoordinatorClient, leaseID string, idleTimeout time.Duration, updateIdleTimeout *time.Duration, telemetryCollector leaseTelemetryCollector, stderr io.Writer) func() { rootCtx, cancel := context.WithCancel(ctx) interval := heartbeatInterval(idleTimeout) done := make(chan struct{}) @@ -854,12 +854,13 @@ func startCoordinatorHeartbeat(ctx context.Context, coord *CoordinatorClient, le ticker := time.NewTicker(interval) defer ticker.Stop() for { + telemetry := collectLeaseTelemetryBestEffort(rootCtx, telemetryCollector) callCtx, heartbeatCancel := context.WithTimeout(rootCtx, 20*time.Second) var err error if updateIdleTimeout != nil { - _, err = coord.UpdateLeaseIdleTimeout(callCtx, leaseID, *updateIdleTimeout) + _, err = coord.UpdateLeaseIdleTimeoutWithTelemetry(callCtx, leaseID, *updateIdleTimeout, telemetry) } else { - _, err = coord.TouchLease(callCtx, leaseID) + _, err = coord.TouchLeaseWithTelemetry(callCtx, leaseID, telemetry) } heartbeatCancel() if err != nil && rootCtx.Err() == nil { diff --git a/internal/cli/status.go b/internal/cli/status.go index a148b58..cd22303 100644 --- a/internal/cli/status.go +++ b/internal/cli/status.go @@ -49,7 +49,11 @@ func (a App) status(ctx context.Context, args []string) error { if state.Tailscale != nil && state.Tailscale.Enabled { tailscale = fmt.Sprintf(" tailscale=%s", blank(tailscaleTargetHost(*state.Tailscale), blank(state.Tailscale.State, "requested"))) } - fmt.Fprintf(a.Stdout, "%s slug=%s provider=%s target=%s windows_mode=%s state=%s type=%s host=%s network=%s%s ready=%t has_host=%t idle_for=%s idle_timeout=%s expires=%s\n", state.ID, blank(state.Slug, "-"), state.Provider, state.TargetOS, blank(state.WindowsMode, "-"), state.State, state.ServerType, state.Host, state.Network, tailscale, state.Ready, state.HasHost, blank(state.IdleFor, "-"), blank(state.IdleTimeout, "-"), blank(state.ExpiresAt, "-")) + telemetry := leaseTelemetryStatusSummary(state.Telemetry) + if telemetry != "" { + telemetry = " " + telemetry + } + fmt.Fprintf(a.Stdout, "%s slug=%s provider=%s target=%s windows_mode=%s state=%s type=%s host=%s network=%s%s ready=%t has_host=%t idle_for=%s idle_timeout=%s expires=%s%s\n", state.ID, blank(state.Slug, "-"), state.Provider, state.TargetOS, blank(state.WindowsMode, "-"), state.State, state.ServerType, state.Host, state.Network, tailscale, state.Ready, state.HasHost, blank(state.IdleFor, "-"), blank(state.IdleTimeout, "-"), blank(state.ExpiresAt, "-"), telemetry) } if !*wait || state.Ready { return nil @@ -85,6 +89,7 @@ type statusView struct { Labels map[string]string `json:"labels,omitempty"` HasHost bool `json:"hasHost"` Ready bool `json:"ready"` + Telemetry *LeaseTelemetry `json:"telemetry,omitempty"` } func (a App) leaseStatus(ctx context.Context, cfg Config, id string) (statusView, error) { @@ -127,6 +132,7 @@ func (a App) leaseStatus(ctx context.Context, cfg Config, id string) (statusView Labels: map[string]string{"keep": fmt.Sprint(lease.Keep)}, HasHost: hasHost, Ready: ready, + Telemetry: lease.Telemetry, }, nil } server, target, leaseID, err := a.findLease(ctx, cfg, id) diff --git a/internal/cli/telemetry.go b/internal/cli/telemetry.go new file mode 100644 index 0000000..9831040 --- /dev/null +++ b/internal/cli/telemetry.go @@ -0,0 +1,181 @@ +package cli + +import ( + "context" + "fmt" + "strconv" + "strings" + "time" +) + +const leaseTelemetryTimeout = 5 * time.Second + +type LeaseTelemetry struct { + CapturedAt string `json:"capturedAt,omitempty"` + Source string `json:"source,omitempty"` + Load1 *float64 `json:"load1,omitempty"` + Load5 *float64 `json:"load5,omitempty"` + Load15 *float64 `json:"load15,omitempty"` + MemoryUsedBytes *int64 `json:"memoryUsedBytes,omitempty"` + MemoryTotalBytes *int64 `json:"memoryTotalBytes,omitempty"` + MemoryPercent *float64 `json:"memoryPercent,omitempty"` + DiskUsedBytes *int64 `json:"diskUsedBytes,omitempty"` + DiskTotalBytes *int64 `json:"diskTotalBytes,omitempty"` + DiskPercent *float64 `json:"diskPercent,omitempty"` + UptimeSeconds *int64 `json:"uptimeSeconds,omitempty"` +} + +type leaseTelemetryCollector func(context.Context) (*LeaseTelemetry, error) + +func leaseTelemetryCollectorForTarget(target SSHTarget) leaseTelemetryCollector { + return func(ctx context.Context) (*LeaseTelemetry, error) { + return collectLeaseTelemetry(ctx, target) + } +} + +func collectLeaseTelemetry(ctx context.Context, target SSHTarget) (*LeaseTelemetry, error) { + if target.Host == "" { + return nil, nil + } + if target.TargetOS != "" && target.TargetOS != targetLinux { + return nil, nil + } + output, err := runSSHOutput(ctx, target, remoteLeaseTelemetryScript()) + if err != nil { + return nil, err + } + return parseLeaseTelemetry(output, time.Now().UTC(), "ssh-linux"), nil +} + +func collectLeaseTelemetryBestEffort(ctx context.Context, collector leaseTelemetryCollector) *LeaseTelemetry { + if collector == nil { + return nil + } + callCtx, cancel := context.WithTimeout(ctx, leaseTelemetryTimeout) + defer cancel() + telemetry, err := collector(callCtx) + if err != nil { + return nil + } + return telemetry +} + +func remoteLeaseTelemetryScript() string { + return `set +e +if [ -r /proc/loadavg ]; then awk '{print "load1="$1; print "load5="$2; print "load15="$3}' /proc/loadavg; fi +if [ -r /proc/meminfo ]; then awk ' + /^MemTotal:/ { total=$2*1024 } + /^MemAvailable:/ { available=$2*1024 } + END { + if (total > 0) { + used=total-available + if (used < 0) used=0 + printf "memoryTotalBytes=%.0f\n", total + printf "memoryUsedBytes=%.0f\n", used + printf "memoryPercent=%.2f\n", used*100/total + } + }' /proc/meminfo; fi +df -PB1 / 2>/dev/null | awk 'NR==2 { print "diskTotalBytes="$2; print "diskUsedBytes="$3; if ($2 > 0) printf "diskPercent=%.2f\n", $3*100/$2 }' +if [ -r /proc/uptime ]; then awk '{printf "uptimeSeconds=%.0f\n", $1}' /proc/uptime; fi` +} + +func parseLeaseTelemetry(output string, capturedAt time.Time, source string) *LeaseTelemetry { + telemetry := LeaseTelemetry{ + CapturedAt: capturedAt.Format(time.RFC3339), + Source: source, + } + hasMetric := false + for _, line := range strings.Split(output, "\n") { + key, raw, ok := strings.Cut(strings.TrimSpace(line), "=") + if !ok || key == "" || raw == "" { + continue + } + switch key { + case "load1": + hasMetric = setFloat(&telemetry.Load1, raw) || hasMetric + case "load5": + hasMetric = setFloat(&telemetry.Load5, raw) || hasMetric + case "load15": + hasMetric = setFloat(&telemetry.Load15, raw) || hasMetric + case "memoryUsedBytes": + hasMetric = setInt64(&telemetry.MemoryUsedBytes, raw) || hasMetric + case "memoryTotalBytes": + hasMetric = setInt64(&telemetry.MemoryTotalBytes, raw) || hasMetric + case "memoryPercent": + hasMetric = setFloat(&telemetry.MemoryPercent, raw) || hasMetric + case "diskUsedBytes": + hasMetric = setInt64(&telemetry.DiskUsedBytes, raw) || hasMetric + case "diskTotalBytes": + hasMetric = setInt64(&telemetry.DiskTotalBytes, raw) || hasMetric + case "diskPercent": + hasMetric = setFloat(&telemetry.DiskPercent, raw) || hasMetric + case "uptimeSeconds": + hasMetric = setInt64(&telemetry.UptimeSeconds, raw) || hasMetric + } + } + if !hasMetric { + return nil + } + return &telemetry +} + +func setFloat(dst **float64, raw string) bool { + value, err := strconv.ParseFloat(strings.TrimSpace(raw), 64) + if err != nil || value < 0 { + return false + } + *dst = &value + return true +} + +func setInt64(dst **int64, raw string) bool { + value, err := strconv.ParseFloat(strings.TrimSpace(raw), 64) + if err != nil || value < 0 { + return false + } + rounded := int64(value) + *dst = &rounded + return true +} + +func leaseTelemetryStatusSummary(telemetry *LeaseTelemetry) string { + if telemetry == nil { + return "" + } + parts := []string{} + if telemetry.Load1 != nil { + parts = append(parts, fmt.Sprintf("load=%.2f", *telemetry.Load1)) + } + if telemetry.MemoryUsedBytes != nil && telemetry.MemoryTotalBytes != nil { + parts = append(parts, fmt.Sprintf("mem=%s/%s", formatBytesCompact(*telemetry.MemoryUsedBytes), formatBytesCompact(*telemetry.MemoryTotalBytes))) + } else if telemetry.MemoryPercent != nil { + parts = append(parts, fmt.Sprintf("mem=%.0f%%", *telemetry.MemoryPercent)) + } + if telemetry.DiskUsedBytes != nil && telemetry.DiskTotalBytes != nil { + parts = append(parts, fmt.Sprintf("disk=%s/%s", formatBytesCompact(*telemetry.DiskUsedBytes), formatBytesCompact(*telemetry.DiskTotalBytes))) + } else if telemetry.DiskPercent != nil { + parts = append(parts, fmt.Sprintf("disk=%.0f%%", *telemetry.DiskPercent)) + } + if telemetry.UptimeSeconds != nil { + parts = append(parts, fmt.Sprintf("uptime=%s", formatSecondsDuration(int(*telemetry.UptimeSeconds)))) + } + if telemetry.CapturedAt != "" { + parts = append(parts, "telemetry="+blank(idleForString(telemetry.CapturedAt, time.Now()), "now")) + } + return strings.Join(parts, " ") +} + +func formatBytesCompact(value int64) string { + const unit = 1024 + if value < unit { + return fmt.Sprintf("%dB", value) + } + f := float64(value) + for _, suffix := range []string{"KiB", "MiB", "GiB", "TiB"} { + f /= unit + if f < unit { + return fmt.Sprintf("%.1f%s", f, suffix) + } + } + return fmt.Sprintf("%.1fPiB", f/unit) +} diff --git a/internal/cli/telemetry_test.go b/internal/cli/telemetry_test.go new file mode 100644 index 0000000..c30d15b --- /dev/null +++ b/internal/cli/telemetry_test.go @@ -0,0 +1,46 @@ +package cli + +import ( + "strings" + "testing" + "time" +) + +func TestParseLeaseTelemetry(t *testing.T) { + telemetry := parseLeaseTelemetry(strings.Join([]string{ + "load1=0.12", + "load5=0.34", + "load15=0.56", + "memoryUsedBytes=1073741824", + "memoryTotalBytes=2147483648", + "memoryPercent=50.00", + "diskUsedBytes=3221225472", + "diskTotalBytes=10737418240", + "diskPercent=30.00", + "uptimeSeconds=3661", + }, "\n"), time.Date(2026, 5, 5, 1, 2, 3, 0, time.UTC), "test") + if telemetry == nil { + t.Fatal("telemetry nil") + } + if telemetry.CapturedAt != "2026-05-05T01:02:03Z" || telemetry.Source != "test" { + t.Fatalf("metadata=%#v", telemetry) + } + if telemetry.Load1 == nil || *telemetry.Load1 != 0.12 { + t.Fatalf("load1=%v", telemetry.Load1) + } + if telemetry.MemoryUsedBytes == nil || *telemetry.MemoryUsedBytes != 1073741824 { + t.Fatalf("memoryUsedBytes=%v", telemetry.MemoryUsedBytes) + } + if telemetry.DiskPercent == nil || *telemetry.DiskPercent != 30 { + t.Fatalf("diskPercent=%v", telemetry.DiskPercent) + } + if telemetry.UptimeSeconds == nil || *telemetry.UptimeSeconds != 3661 { + t.Fatalf("uptimeSeconds=%v", telemetry.UptimeSeconds) + } +} + +func TestParseLeaseTelemetrySkipsEmptyMetrics(t *testing.T) { + if telemetry := parseLeaseTelemetry("noise\nload1=-1\n", time.Now(), "test"); telemetry != nil { + t.Fatalf("telemetry=%#v, want nil", telemetry) + } +} diff --git a/worker/src/fleet.ts b/worker/src/fleet.ts index 945d129..4a3c20a 100644 --- a/worker/src/fleet.ts +++ b/worker/src/fleet.ts @@ -25,6 +25,7 @@ import type { Env, LeaseRecord, LeaseRequest, + LeaseTelemetry, Provider, ProviderImage, ProviderMachine, @@ -533,7 +534,10 @@ export class FleetDurableObject implements DurableObject { if (!lease) { return notFound(); } - const body = await optionalJson<{ idleTimeoutSeconds?: number }>(request); + const body = await optionalJson<{ + idleTimeoutSeconds?: number; + telemetry?: Partial; + }>(request); const now = new Date(); const requestedIdleTimeoutSeconds = body.idleTimeoutSeconds; if ( @@ -543,6 +547,10 @@ export class FleetDurableObject implements DurableObject { ) { lease.idleTimeoutSeconds = clampLeaseSeconds(requestedIdleTimeoutSeconds, 86_400); } + const telemetry = sanitizeLeaseTelemetry(body.telemetry, now); + if (telemetry) { + lease.telemetry = telemetry; + } lease.updatedAt = now.toISOString(); lease.lastTouchedAt = now.toISOString(); lease.expiresAt = recomputeLeaseExpiresAt(lease, now).toISOString(); @@ -2556,6 +2564,65 @@ function clampLeaseSeconds(value: number | undefined, max: number): number { return Math.min(Math.trunc(value), max); } +function sanitizeLeaseTelemetry( + input: Partial | undefined, + now: Date, +): LeaseTelemetry | undefined { + if (!input || typeof input !== "object") { + return undefined; + } + const telemetry: LeaseTelemetry = { + capturedAt: sanitizeTelemetryTimestamp(input.capturedAt, now), + }; + const source = typeof input.source === "string" ? input.source.trim() : ""; + if (source) { + telemetry.source = source.slice(0, 32); + } + let hasMetric = false; + for (const [key, max] of [ + ["load1", 10_000], + ["load5", 10_000], + ["load15", 10_000], + ["memoryPercent", 100], + ["diskPercent", 100], + ] as const) { + const value = sanitizeTelemetryNumber(input[key], max); + if (value !== undefined) { + telemetry[key] = value; + hasMetric = true; + } + } + for (const key of [ + "memoryUsedBytes", + "memoryTotalBytes", + "diskUsedBytes", + "diskTotalBytes", + "uptimeSeconds", + ] as const) { + const value = sanitizeTelemetryNumber(input[key], Number.MAX_SAFE_INTEGER); + if (value !== undefined) { + telemetry[key] = Math.trunc(value); + hasMetric = true; + } + } + return hasMetric ? telemetry : undefined; +} + +function sanitizeTelemetryTimestamp(value: string | undefined, now: Date): string { + const parsed = Date.parse(value ?? ""); + if (!Number.isFinite(parsed)) { + return now.toISOString(); + } + return new Date(parsed).toISOString(); +} + +function sanitizeTelemetryNumber(value: unknown, max: number): number | undefined { + if (typeof value !== "number" || !Number.isFinite(value) || value < 0) { + return undefined; + } + return Math.min(value, max); +} + function allocateLeaseSlug( requested: string, leaseID: string, diff --git a/worker/src/portal.ts b/worker/src/portal.ts index cc2b247..357b066 100644 --- a/worker/src/portal.ts +++ b/worker/src/portal.ts @@ -129,6 +129,7 @@ export function portalLeaseDetail( ${metaRow("host", lease.host || "pending")} ${metaRow("ssh", lease.sshPort ? `${lease.sshUser || "crabbox"}@${lease.host || "host"}:${lease.sshPort}` : "pending")} ${metaRow("work root", lease.workRoot || "pending")} + ${leaseTelemetryRows(lease.telemetry)} ${metaRow("expires", shortTime(lease.expiresAt))} ${ @@ -720,6 +721,53 @@ function targetBadge(target: string | undefined, windowsMode?: string): string { return `${targetIcon(value)}${escapeHTML(label)}`; } +function leaseTelemetryRows(telemetry: LeaseRecord["telemetry"]): string { + if (!telemetry) { + return ""; + } + return [ + metaRow("load", telemetryLoad(telemetry)), + metaRow( + "memory", + telemetryStorage( + telemetry.memoryUsedBytes, + telemetry.memoryTotalBytes, + telemetry.memoryPercent, + ), + ), + metaRow( + "disk", + telemetryStorage(telemetry.diskUsedBytes, telemetry.diskTotalBytes, telemetry.diskPercent), + ), + metaRow( + "uptime", + telemetry.uptimeSeconds !== undefined ? formatSeconds(telemetry.uptimeSeconds) : undefined, + ), + metaRow("seen", telemetry.capturedAt ? relativeTime(telemetry.capturedAt) : undefined), + ].join(""); +} + +function telemetryLoad(telemetry: LeaseRecord["telemetry"]): string | undefined { + if (!telemetry || telemetry.load1 === undefined) { + return undefined; + } + const load5 = telemetry.load5 === undefined ? "" : ` / ${telemetry.load5.toFixed(2)}`; + const load15 = telemetry.load15 === undefined ? "" : ` / ${telemetry.load15.toFixed(2)}`; + return `${telemetry.load1.toFixed(2)}${load5}${load15}`; +} + +function telemetryStorage( + used: number | undefined, + total: number | undefined, + percent: number | undefined, +): string | undefined { + if (used !== undefined && total !== undefined) { + const percentLabel = percent === undefined ? "" : ` (${Math.round(percent)}%)`; + return `${formatBytes(used)} / ${formatBytes(total)}${percentLabel}`; + } + return percent === undefined ? undefined : `${Math.round(percent)}%`; +} + function providerIcon(provider: string): string { if (provider === "aws") { return ``; @@ -1231,6 +1279,22 @@ function formatDuration(value: number | undefined): string { return `${minutes}m ${rest}s`; } +function formatSeconds(value: number): string { + const seconds = Math.max(0, Math.round(value)); + if (seconds < 60) { + return `${seconds}s`; + } + const minutes = Math.floor(seconds / 60); + if (minutes < 60) { + return `${minutes}m`; + } + const hours = Math.floor(minutes / 60); + if (hours < 48) { + return `${hours}h`; + } + return `${Math.floor(hours / 24)}d`; +} + function formatExitCode(value: number | undefined): string { return Number.isFinite(value) ? String(value) : "-"; } @@ -1242,7 +1306,10 @@ function formatBytes(value: number): string { if (value < 1024 * 1024) { return `${(value / 1024).toFixed(1)} KiB`; } - return `${(value / 1024 / 1024).toFixed(1)} MiB`; + if (value < 1024 * 1024 * 1024) { + return `${(value / 1024 / 1024).toFixed(1)} MiB`; + } + return `${(value / 1024 / 1024 / 1024).toFixed(1)} GiB`; } function truncate(value: string, maxLength: number): string { diff --git a/worker/src/types.ts b/worker/src/types.ts index 63ae1ef..ebd35ab 100644 --- a/worker/src/types.ts +++ b/worker/src/types.ts @@ -90,6 +90,21 @@ export type Provider = "hetzner" | "aws"; export type TargetOS = "linux" | "macos" | "windows"; export type WindowsMode = "normal" | "wsl2"; +export interface LeaseTelemetry { + capturedAt: string; + source?: string; + load1?: number; + load5?: number; + load15?: number; + memoryUsedBytes?: number; + memoryTotalBytes?: number; + memoryPercent?: number; + diskUsedBytes?: number; + diskTotalBytes?: number; + diskPercent?: number; + uptimeSeconds?: number; +} + export interface LeaseRecord { id: string; slug?: string; @@ -127,6 +142,7 @@ export interface LeaseRecord { updatedAt: string; lastTouchedAt?: string; expiresAt: string; + telemetry?: LeaseTelemetry; releasedAt?: string; endedAt?: string; } diff --git a/worker/test/fleet.test.ts b/worker/test/fleet.test.ts index 161df6e..ff16968 100644 --- a/worker/test/fleet.test.ts +++ b/worker/test/fleet.test.ts @@ -409,7 +409,17 @@ describe("fleet lease identity and idle", () => { "x-crabbox-owner": "peter@example.com", "x-crabbox-org": "openclaw", }, - body: { idleTimeoutSeconds: 2400 }, + body: { + idleTimeoutSeconds: 2400, + telemetry: { + capturedAt: "2026-05-05T01:02:03Z", + source: "ssh-linux", + load1: 0.42, + memoryUsedBytes: 1024, + memoryTotalBytes: 2048, + memoryPercent: 50, + }, + }, }), ); expect(heartbeat.status).toBe(200); @@ -417,6 +427,14 @@ describe("fleet lease identity and idle", () => { expect(lease.id).toBe("cbx_000000000001"); expect(lease.slug).toBe("blue-lobster"); expect(lease.idleTimeoutSeconds).toBe(2400); + expect(lease.telemetry).toMatchObject({ + capturedAt: "2026-05-05T01:02:03.000Z", + source: "ssh-linux", + load1: 0.42, + memoryUsedBytes: 1024, + memoryTotalBytes: 2048, + memoryPercent: 50, + }); expect(Date.parse(lease.expiresAt)).toBeGreaterThan(expiresAt.getTime()); }); @@ -476,6 +494,20 @@ describe("fleet lease identity and idle", () => { org: "openclaw", desktop: true, code: true, + telemetry: { + capturedAt: new Date(Date.now() - 15_000).toISOString(), + source: "ssh-linux", + load1: 0.42, + load5: 0.24, + load15: 0.12, + memoryUsedBytes: 1024, + memoryTotalBytes: 2048, + memoryPercent: 50, + diskUsedBytes: 1024 * 1024 * 1024, + diskTotalBytes: 4 * 1024 * 1024 * 1024, + diskPercent: 25, + uptimeSeconds: 3600, + }, expiresAt: new Date(Date.now() + 60 * 60 * 1000).toISOString(), }), ); @@ -682,6 +714,20 @@ describe("fleet lease identity and idle", () => { org: "openclaw", desktop: true, code: true, + telemetry: { + capturedAt: new Date(Date.now() - 15_000).toISOString(), + source: "ssh-linux", + load1: 0.42, + load5: 0.24, + load15: 0.12, + memoryUsedBytes: 1024, + memoryTotalBytes: 2048, + memoryPercent: 50, + diskUsedBytes: 1024 * 1024 * 1024, + diskTotalBytes: 4 * 1024 * 1024 * 1024, + diskPercent: 25, + uptimeSeconds: 3600, + }, expiresAt: new Date(Date.now() + 60 * 60 * 1000).toISOString(), }), ); @@ -754,6 +800,10 @@ describe("fleet lease identity and idle", () => { ); expect(body).toContain('data-provider="hetzner"'); expect(body).toContain('data-target="linux"'); + expect(body).toContain("
load
0.42 / 0.24 / 0.12
"); + expect(body).toContain("
memory
1.0 KiB / 2.0 KiB (50%)
"); + expect(body).toContain("
disk
1.0 GiB / 4.0 GiB (25%)
"); + expect(body).toContain("
uptime
1h
"); expect(body).toContain("table-search"); expect(body).toContain("/portal/runs/run_000000000001"); expect(body).toContain("/portal/runs/run_000000000001/logs");