feat(portal): record lease telemetry snapshots
This commit is contained in:
parent
45f607b43a
commit
7b699f4cda
@ -11,6 +11,7 @@
|
||||
- Added per-lease portal detail pages with bridge status, pasteable commands, recent run links, and a stop action.
|
||||
- Added portal run detail pages with command metadata, result summaries, dense viewport-fitted portal tables, provider/OS badges, active/ended/provider/target filters, sticky portal chrome, and copyable retained log previews.
|
||||
- Added admin portal visibility for non-owned runner leases, including `mine`/`system` filters and matching detail/code/VNC drilldowns for operator sessions.
|
||||
- Added latest lease telemetry snapshots for coordinator-backed Linux leases, including load, memory, disk, and uptime in `status --json` and the portal detail view.
|
||||
- Added `.crabboxignore` for repo-local sync-only exclude patterns shared by `run` and `sync-plan`.
|
||||
- Documented the prebaked runner image boundary: provider-owned AMIs/snapshots hold machine capabilities while repo/runtime caches stay in QA workflows or warm leases.
|
||||
|
||||
|
||||
@ -42,7 +42,7 @@ leased machine
|
||||
8. CLI seeds remote Git when possible, compares sync fingerprints, and syncs changed files with `rsync --delete`.
|
||||
9. CLI runs sync sanity and configured base-ref hydration.
|
||||
10. CLI runs the command over SSH and streams stdout/stderr.
|
||||
11. CLI heartbeats while the command runs; heartbeats touch `lastTouchedAt` and recompute idle expiry up to the TTL cap.
|
||||
11. CLI heartbeats while the command runs; heartbeats touch `lastTouchedAt`, recompute idle expiry up to the TTL cap, and attach a best-effort latest Linux telemetry snapshot when SSH is reachable.
|
||||
12. CLI releases the lease when done.
|
||||
13. Durable Object alarm cleans up stale leases and expired machines.
|
||||
|
||||
@ -72,6 +72,8 @@ POST /v1/admin/leases/{id-or-slug}/delete
|
||||
|
||||
Admin endpoints and `GET /v1/pool` require the separate admin token. GitHub browser-login tokens are user tokens for normal lease operations and are minted only after allowed GitHub org membership is verified. User-token list, exact-ID lookup, slug lookup, heartbeat, release, run history, logs, and usage are scoped to the token owner/org.
|
||||
|
||||
Heartbeat bodies may include a `telemetry` object. The coordinator stores only the latest sanitized snapshot on the lease record. Current CLI snapshots include Linux load average, memory use, root-disk use, uptime, source, and capture timestamp.
|
||||
|
||||
## Durable Object State
|
||||
|
||||
Use one fleet Durable Object for MVP. It owns all atomic scheduling decisions.
|
||||
|
||||
@ -30,4 +30,6 @@ Flags:
|
||||
```
|
||||
|
||||
Human and JSON output include the selected network. With Tailscale metadata,
|
||||
status also prints the tailnet host/state.
|
||||
status also prints the tailnet host/state. For coordinator-backed Linux leases
|
||||
that have received a recent heartbeat, status also includes the latest
|
||||
best-effort telemetry snapshot: load, memory, disk, uptime, and capture age.
|
||||
|
||||
@ -43,6 +43,8 @@ The Worker stores coordinator leases as `active`, `released`, `expired`, or `fai
|
||||
|
||||
`crabbox warmup --idle-timeout 30m` and `crabbox run --idle-timeout 30m` set inactivity expiry. `--ttl` is a separate maximum wall-clock lifetime. The CLI sends coordinator heartbeats while a lease is in use; each heartbeat updates `lastTouchedAt` and recomputes `expiresAt = min(createdAt + ttl, lastTouchedAt + idleTimeout)`.
|
||||
|
||||
For Linux leases, heartbeats also attach a best-effort latest telemetry snapshot when SSH is reachable. The Durable Object keeps only the latest sanitized load, memory, disk, uptime, source, and capture timestamp on the lease record; it is not a time-series store.
|
||||
|
||||
Direct-provider mode does not have a central heartbeat or alarm. It labels machines with `created_at`, `last_touched_at`, `idle_timeout_secs`, `expires_at`, `state`, `lease`, and `slug`; `crabbox cleanup` uses those labels conservatively.
|
||||
|
||||
## Cleanup
|
||||
|
||||
@ -81,7 +81,7 @@ func (a App) actionsHydrate(ctx context.Context, args []string) error {
|
||||
if coord, ok, err := newTargetCoordinatorClient(cfg); err != nil {
|
||||
return err
|
||||
} else if ok {
|
||||
stopHeartbeat := startCoordinatorHeartbeat(ctx, coord, leaseID, cfg.IdleTimeout, nil, a.Stderr)
|
||||
stopHeartbeat := startCoordinatorHeartbeat(ctx, coord, leaseID, cfg.IdleTimeout, nil, leaseTelemetryCollectorForTarget(target), a.Stderr)
|
||||
defer stopHeartbeat()
|
||||
} else {
|
||||
a.touchActiveLeaseBestEffort(ctx, cfg, server, leaseID)
|
||||
|
||||
@ -57,6 +57,7 @@ type CoordinatorLease struct {
|
||||
UpdatedAt string `json:"updatedAt,omitempty"`
|
||||
LastTouchedAt string `json:"lastTouchedAt,omitempty"`
|
||||
ExpiresAt string `json:"expiresAt"`
|
||||
Telemetry *LeaseTelemetry `json:"telemetry,omitempty"`
|
||||
}
|
||||
|
||||
type ProvisioningAttempt struct {
|
||||
@ -378,26 +379,37 @@ func (c *CoordinatorClient) ReleaseLease(ctx context.Context, id string, deleteS
|
||||
}
|
||||
|
||||
func (c *CoordinatorClient) TouchLease(ctx context.Context, id string) (CoordinatorLease, error) {
|
||||
return c.heartbeatLease(ctx, id, nil)
|
||||
return c.heartbeatLease(ctx, id, nil, nil)
|
||||
}
|
||||
|
||||
func (c *CoordinatorClient) TouchLeaseWithTelemetry(ctx context.Context, id string, telemetry *LeaseTelemetry) (CoordinatorLease, error) {
|
||||
return c.heartbeatLease(ctx, id, nil, telemetry)
|
||||
}
|
||||
|
||||
func (c *CoordinatorClient) UpdateLeaseIdleTimeout(ctx context.Context, id string, idleTimeout time.Duration) (CoordinatorLease, error) {
|
||||
return c.heartbeatLease(ctx, id, &idleTimeout)
|
||||
return c.heartbeatLease(ctx, id, &idleTimeout, nil)
|
||||
}
|
||||
|
||||
func (c *CoordinatorClient) heartbeatLease(ctx context.Context, id string, idleTimeout *time.Duration) (CoordinatorLease, error) {
|
||||
func (c *CoordinatorClient) UpdateLeaseIdleTimeoutWithTelemetry(ctx context.Context, id string, idleTimeout time.Duration, telemetry *LeaseTelemetry) (CoordinatorLease, error) {
|
||||
return c.heartbeatLease(ctx, id, &idleTimeout, telemetry)
|
||||
}
|
||||
|
||||
func (c *CoordinatorClient) heartbeatLease(ctx context.Context, id string, idleTimeout *time.Duration, telemetry *LeaseTelemetry) (CoordinatorLease, error) {
|
||||
var res struct {
|
||||
Lease CoordinatorLease `json:"lease"`
|
||||
}
|
||||
err := c.do(ctx, http.MethodPost, "/v1/leases/"+url.PathEscape(id)+"/heartbeat", heartbeatRequestBody(idleTimeout), &res)
|
||||
err := c.do(ctx, http.MethodPost, "/v1/leases/"+url.PathEscape(id)+"/heartbeat", heartbeatRequestBody(idleTimeout, telemetry), &res)
|
||||
return res.Lease, err
|
||||
}
|
||||
|
||||
func heartbeatRequestBody(idleTimeout *time.Duration) map[string]any {
|
||||
func heartbeatRequestBody(idleTimeout *time.Duration, telemetry *LeaseTelemetry) map[string]any {
|
||||
body := map[string]any{}
|
||||
if idleTimeout != nil && *idleTimeout > 0 {
|
||||
body["idleTimeoutSeconds"] = int(idleTimeout.Seconds())
|
||||
}
|
||||
if telemetry != nil {
|
||||
body["telemetry"] = telemetry
|
||||
}
|
||||
return body
|
||||
}
|
||||
|
||||
|
||||
@ -222,14 +222,19 @@ func TestCoordinatorHTTPAddsAccessHeaders(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHeartbeatRequestBodyOmitsIdleTimeoutForTouch(t *testing.T) {
|
||||
if body := heartbeatRequestBody(nil); len(body) != 0 {
|
||||
if body := heartbeatRequestBody(nil, nil); len(body) != 0 {
|
||||
t.Fatalf("touch heartbeat body=%v, want empty", body)
|
||||
}
|
||||
idleTimeout := 45 * time.Minute
|
||||
body := heartbeatRequestBody(&idleTimeout)
|
||||
body := heartbeatRequestBody(&idleTimeout, nil)
|
||||
if body["idleTimeoutSeconds"] != 2700 {
|
||||
t.Fatalf("heartbeat body=%v, want idle timeout seconds", body)
|
||||
}
|
||||
load := 0.42
|
||||
body = heartbeatRequestBody(nil, &LeaseTelemetry{Load1: &load})
|
||||
if body["telemetry"] == nil {
|
||||
t.Fatalf("heartbeat body=%v, want telemetry", body)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCoordinatorTouchAndUpdateHeartbeatBodies(t *testing.T) {
|
||||
@ -248,10 +253,14 @@ func TestCoordinatorTouchAndUpdateHeartbeatBodies(t *testing.T) {
|
||||
if _, err := client.TouchLease(context.Background(), "cbx_123"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
load := 0.42
|
||||
if _, err := client.TouchLeaseWithTelemetry(context.Background(), "cbx_123", &LeaseTelemetry{Load1: &load}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := client.UpdateLeaseIdleTimeout(context.Background(), "cbx_123", 45*time.Minute); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(bodies) != 2 || bodies[0] != "{}" || !strings.Contains(bodies[1], `"idleTimeoutSeconds":2700`) {
|
||||
if len(bodies) != 3 || bodies[0] != "{}" || !strings.Contains(bodies[1], `"load1":0.42`) || !strings.Contains(bodies[2], `"idleTimeoutSeconds":2700`) {
|
||||
t.Fatalf("heartbeat bodies=%q", bodies)
|
||||
}
|
||||
}
|
||||
@ -269,7 +278,7 @@ func TestCoordinatorHeartbeatTouchesImmediately(t *testing.T) {
|
||||
defer server.Close()
|
||||
|
||||
client := CoordinatorClient{BaseURL: server.URL, Client: server.Client()}
|
||||
stop := startCoordinatorHeartbeat(context.Background(), &client, "cbx_123", 30*time.Minute, nil, io.Discard)
|
||||
stop := startCoordinatorHeartbeat(context.Background(), &client, "cbx_123", 30*time.Minute, nil, nil, io.Discard)
|
||||
defer stop()
|
||||
|
||||
select {
|
||||
@ -279,6 +288,37 @@ func TestCoordinatorHeartbeatTouchesImmediately(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCoordinatorHeartbeatIncludesTelemetry(t *testing.T) {
|
||||
bodies := make(chan string, 1)
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path != "/v1/leases/cbx_123/heartbeat" {
|
||||
t.Fatalf("unexpected path %s", r.URL.Path)
|
||||
}
|
||||
data, _ := io.ReadAll(r.Body)
|
||||
bodies <- string(data)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, _ = w.Write([]byte(`{"lease":{"id":"cbx_123","provider":"aws","state":"active","expiresAt":"2026-05-01T00:30:00Z"}}`))
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
load := 0.77
|
||||
collector := func(context.Context) (*LeaseTelemetry, error) {
|
||||
return &LeaseTelemetry{Load1: &load}, nil
|
||||
}
|
||||
client := CoordinatorClient{BaseURL: server.URL, Client: server.Client()}
|
||||
stop := startCoordinatorHeartbeat(context.Background(), &client, "cbx_123", 30*time.Minute, nil, collector, io.Discard)
|
||||
defer stop()
|
||||
|
||||
select {
|
||||
case body := <-bodies:
|
||||
if !strings.Contains(body, `"load1":0.77`) {
|
||||
t.Fatalf("heartbeat body=%s, want telemetry", body)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("heartbeat did not touch immediately")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCoordinatorLeaseWatchCancelsWhenLeaseReleased(t *testing.T) {
|
||||
oldInterval := coordinatorLeaseWatchInterval
|
||||
coordinatorLeaseWatchInterval = 10 * time.Millisecond
|
||||
|
||||
@ -299,7 +299,7 @@ func (a App) runCommand(ctx context.Context, args []string) (err error) {
|
||||
return recordFailure(err)
|
||||
}
|
||||
}
|
||||
stopHeartbeat := startCoordinatorHeartbeat(ctx, coord, leaseID, cfg.IdleTimeout, heartbeatIdleTimeout, a.Stderr)
|
||||
stopHeartbeat := startCoordinatorHeartbeat(ctx, coord, leaseID, cfg.IdleTimeout, heartbeatIdleTimeout, leaseTelemetryCollectorForTarget(target), a.Stderr)
|
||||
defer stopHeartbeat()
|
||||
}
|
||||
|
||||
@ -710,7 +710,7 @@ func (a App) acquireCoordinator(ctx context.Context, cfg Config, coord *Coordina
|
||||
}
|
||||
waitCtx, cancelWait := context.WithCancelCause(ctx)
|
||||
defer cancelWait(nil)
|
||||
stopHeartbeat := startCoordinatorHeartbeat(waitCtx, coord, leaseID, cfg.IdleTimeout, nil, a.Stderr)
|
||||
stopHeartbeat := startCoordinatorHeartbeat(waitCtx, coord, leaseID, cfg.IdleTimeout, nil, leaseTelemetryCollectorForTarget(target), a.Stderr)
|
||||
defer stopHeartbeat()
|
||||
stopLeaseWatch := startCoordinatorLeaseWatch(waitCtx, coord, leaseID, cancelWait, a.Stderr)
|
||||
defer stopLeaseWatch()
|
||||
@ -845,7 +845,7 @@ func (a App) releaseAcquiredLeaseBestEffort(ctx context.Context, cfg Config, coo
|
||||
removeLeaseClaim(leaseID)
|
||||
}
|
||||
|
||||
func startCoordinatorHeartbeat(ctx context.Context, coord *CoordinatorClient, leaseID string, idleTimeout time.Duration, updateIdleTimeout *time.Duration, stderr io.Writer) func() {
|
||||
func startCoordinatorHeartbeat(ctx context.Context, coord *CoordinatorClient, leaseID string, idleTimeout time.Duration, updateIdleTimeout *time.Duration, telemetryCollector leaseTelemetryCollector, stderr io.Writer) func() {
|
||||
rootCtx, cancel := context.WithCancel(ctx)
|
||||
interval := heartbeatInterval(idleTimeout)
|
||||
done := make(chan struct{})
|
||||
@ -854,12 +854,13 @@ func startCoordinatorHeartbeat(ctx context.Context, coord *CoordinatorClient, le
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
telemetry := collectLeaseTelemetryBestEffort(rootCtx, telemetryCollector)
|
||||
callCtx, heartbeatCancel := context.WithTimeout(rootCtx, 20*time.Second)
|
||||
var err error
|
||||
if updateIdleTimeout != nil {
|
||||
_, err = coord.UpdateLeaseIdleTimeout(callCtx, leaseID, *updateIdleTimeout)
|
||||
_, err = coord.UpdateLeaseIdleTimeoutWithTelemetry(callCtx, leaseID, *updateIdleTimeout, telemetry)
|
||||
} else {
|
||||
_, err = coord.TouchLease(callCtx, leaseID)
|
||||
_, err = coord.TouchLeaseWithTelemetry(callCtx, leaseID, telemetry)
|
||||
}
|
||||
heartbeatCancel()
|
||||
if err != nil && rootCtx.Err() == nil {
|
||||
|
||||
@ -49,7 +49,11 @@ func (a App) status(ctx context.Context, args []string) error {
|
||||
if state.Tailscale != nil && state.Tailscale.Enabled {
|
||||
tailscale = fmt.Sprintf(" tailscale=%s", blank(tailscaleTargetHost(*state.Tailscale), blank(state.Tailscale.State, "requested")))
|
||||
}
|
||||
fmt.Fprintf(a.Stdout, "%s slug=%s provider=%s target=%s windows_mode=%s state=%s type=%s host=%s network=%s%s ready=%t has_host=%t idle_for=%s idle_timeout=%s expires=%s\n", state.ID, blank(state.Slug, "-"), state.Provider, state.TargetOS, blank(state.WindowsMode, "-"), state.State, state.ServerType, state.Host, state.Network, tailscale, state.Ready, state.HasHost, blank(state.IdleFor, "-"), blank(state.IdleTimeout, "-"), blank(state.ExpiresAt, "-"))
|
||||
telemetry := leaseTelemetryStatusSummary(state.Telemetry)
|
||||
if telemetry != "" {
|
||||
telemetry = " " + telemetry
|
||||
}
|
||||
fmt.Fprintf(a.Stdout, "%s slug=%s provider=%s target=%s windows_mode=%s state=%s type=%s host=%s network=%s%s ready=%t has_host=%t idle_for=%s idle_timeout=%s expires=%s%s\n", state.ID, blank(state.Slug, "-"), state.Provider, state.TargetOS, blank(state.WindowsMode, "-"), state.State, state.ServerType, state.Host, state.Network, tailscale, state.Ready, state.HasHost, blank(state.IdleFor, "-"), blank(state.IdleTimeout, "-"), blank(state.ExpiresAt, "-"), telemetry)
|
||||
}
|
||||
if !*wait || state.Ready {
|
||||
return nil
|
||||
@ -85,6 +89,7 @@ type statusView struct {
|
||||
Labels map[string]string `json:"labels,omitempty"`
|
||||
HasHost bool `json:"hasHost"`
|
||||
Ready bool `json:"ready"`
|
||||
Telemetry *LeaseTelemetry `json:"telemetry,omitempty"`
|
||||
}
|
||||
|
||||
func (a App) leaseStatus(ctx context.Context, cfg Config, id string) (statusView, error) {
|
||||
@ -127,6 +132,7 @@ func (a App) leaseStatus(ctx context.Context, cfg Config, id string) (statusView
|
||||
Labels: map[string]string{"keep": fmt.Sprint(lease.Keep)},
|
||||
HasHost: hasHost,
|
||||
Ready: ready,
|
||||
Telemetry: lease.Telemetry,
|
||||
}, nil
|
||||
}
|
||||
server, target, leaseID, err := a.findLease(ctx, cfg, id)
|
||||
|
||||
181
internal/cli/telemetry.go
Normal file
181
internal/cli/telemetry.go
Normal file
@ -0,0 +1,181 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const leaseTelemetryTimeout = 5 * time.Second
|
||||
|
||||
type LeaseTelemetry struct {
|
||||
CapturedAt string `json:"capturedAt,omitempty"`
|
||||
Source string `json:"source,omitempty"`
|
||||
Load1 *float64 `json:"load1,omitempty"`
|
||||
Load5 *float64 `json:"load5,omitempty"`
|
||||
Load15 *float64 `json:"load15,omitempty"`
|
||||
MemoryUsedBytes *int64 `json:"memoryUsedBytes,omitempty"`
|
||||
MemoryTotalBytes *int64 `json:"memoryTotalBytes,omitempty"`
|
||||
MemoryPercent *float64 `json:"memoryPercent,omitempty"`
|
||||
DiskUsedBytes *int64 `json:"diskUsedBytes,omitempty"`
|
||||
DiskTotalBytes *int64 `json:"diskTotalBytes,omitempty"`
|
||||
DiskPercent *float64 `json:"diskPercent,omitempty"`
|
||||
UptimeSeconds *int64 `json:"uptimeSeconds,omitempty"`
|
||||
}
|
||||
|
||||
type leaseTelemetryCollector func(context.Context) (*LeaseTelemetry, error)
|
||||
|
||||
func leaseTelemetryCollectorForTarget(target SSHTarget) leaseTelemetryCollector {
|
||||
return func(ctx context.Context) (*LeaseTelemetry, error) {
|
||||
return collectLeaseTelemetry(ctx, target)
|
||||
}
|
||||
}
|
||||
|
||||
func collectLeaseTelemetry(ctx context.Context, target SSHTarget) (*LeaseTelemetry, error) {
|
||||
if target.Host == "" {
|
||||
return nil, nil
|
||||
}
|
||||
if target.TargetOS != "" && target.TargetOS != targetLinux {
|
||||
return nil, nil
|
||||
}
|
||||
output, err := runSSHOutput(ctx, target, remoteLeaseTelemetryScript())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return parseLeaseTelemetry(output, time.Now().UTC(), "ssh-linux"), nil
|
||||
}
|
||||
|
||||
func collectLeaseTelemetryBestEffort(ctx context.Context, collector leaseTelemetryCollector) *LeaseTelemetry {
|
||||
if collector == nil {
|
||||
return nil
|
||||
}
|
||||
callCtx, cancel := context.WithTimeout(ctx, leaseTelemetryTimeout)
|
||||
defer cancel()
|
||||
telemetry, err := collector(callCtx)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return telemetry
|
||||
}
|
||||
|
||||
func remoteLeaseTelemetryScript() string {
|
||||
return `set +e
|
||||
if [ -r /proc/loadavg ]; then awk '{print "load1="$1; print "load5="$2; print "load15="$3}' /proc/loadavg; fi
|
||||
if [ -r /proc/meminfo ]; then awk '
|
||||
/^MemTotal:/ { total=$2*1024 }
|
||||
/^MemAvailable:/ { available=$2*1024 }
|
||||
END {
|
||||
if (total > 0) {
|
||||
used=total-available
|
||||
if (used < 0) used=0
|
||||
printf "memoryTotalBytes=%.0f\n", total
|
||||
printf "memoryUsedBytes=%.0f\n", used
|
||||
printf "memoryPercent=%.2f\n", used*100/total
|
||||
}
|
||||
}' /proc/meminfo; fi
|
||||
df -PB1 / 2>/dev/null | awk 'NR==2 { print "diskTotalBytes="$2; print "diskUsedBytes="$3; if ($2 > 0) printf "diskPercent=%.2f\n", $3*100/$2 }'
|
||||
if [ -r /proc/uptime ]; then awk '{printf "uptimeSeconds=%.0f\n", $1}' /proc/uptime; fi`
|
||||
}
|
||||
|
||||
func parseLeaseTelemetry(output string, capturedAt time.Time, source string) *LeaseTelemetry {
|
||||
telemetry := LeaseTelemetry{
|
||||
CapturedAt: capturedAt.Format(time.RFC3339),
|
||||
Source: source,
|
||||
}
|
||||
hasMetric := false
|
||||
for _, line := range strings.Split(output, "\n") {
|
||||
key, raw, ok := strings.Cut(strings.TrimSpace(line), "=")
|
||||
if !ok || key == "" || raw == "" {
|
||||
continue
|
||||
}
|
||||
switch key {
|
||||
case "load1":
|
||||
hasMetric = setFloat(&telemetry.Load1, raw) || hasMetric
|
||||
case "load5":
|
||||
hasMetric = setFloat(&telemetry.Load5, raw) || hasMetric
|
||||
case "load15":
|
||||
hasMetric = setFloat(&telemetry.Load15, raw) || hasMetric
|
||||
case "memoryUsedBytes":
|
||||
hasMetric = setInt64(&telemetry.MemoryUsedBytes, raw) || hasMetric
|
||||
case "memoryTotalBytes":
|
||||
hasMetric = setInt64(&telemetry.MemoryTotalBytes, raw) || hasMetric
|
||||
case "memoryPercent":
|
||||
hasMetric = setFloat(&telemetry.MemoryPercent, raw) || hasMetric
|
||||
case "diskUsedBytes":
|
||||
hasMetric = setInt64(&telemetry.DiskUsedBytes, raw) || hasMetric
|
||||
case "diskTotalBytes":
|
||||
hasMetric = setInt64(&telemetry.DiskTotalBytes, raw) || hasMetric
|
||||
case "diskPercent":
|
||||
hasMetric = setFloat(&telemetry.DiskPercent, raw) || hasMetric
|
||||
case "uptimeSeconds":
|
||||
hasMetric = setInt64(&telemetry.UptimeSeconds, raw) || hasMetric
|
||||
}
|
||||
}
|
||||
if !hasMetric {
|
||||
return nil
|
||||
}
|
||||
return &telemetry
|
||||
}
|
||||
|
||||
func setFloat(dst **float64, raw string) bool {
|
||||
value, err := strconv.ParseFloat(strings.TrimSpace(raw), 64)
|
||||
if err != nil || value < 0 {
|
||||
return false
|
||||
}
|
||||
*dst = &value
|
||||
return true
|
||||
}
|
||||
|
||||
func setInt64(dst **int64, raw string) bool {
|
||||
value, err := strconv.ParseFloat(strings.TrimSpace(raw), 64)
|
||||
if err != nil || value < 0 {
|
||||
return false
|
||||
}
|
||||
rounded := int64(value)
|
||||
*dst = &rounded
|
||||
return true
|
||||
}
|
||||
|
||||
func leaseTelemetryStatusSummary(telemetry *LeaseTelemetry) string {
|
||||
if telemetry == nil {
|
||||
return ""
|
||||
}
|
||||
parts := []string{}
|
||||
if telemetry.Load1 != nil {
|
||||
parts = append(parts, fmt.Sprintf("load=%.2f", *telemetry.Load1))
|
||||
}
|
||||
if telemetry.MemoryUsedBytes != nil && telemetry.MemoryTotalBytes != nil {
|
||||
parts = append(parts, fmt.Sprintf("mem=%s/%s", formatBytesCompact(*telemetry.MemoryUsedBytes), formatBytesCompact(*telemetry.MemoryTotalBytes)))
|
||||
} else if telemetry.MemoryPercent != nil {
|
||||
parts = append(parts, fmt.Sprintf("mem=%.0f%%", *telemetry.MemoryPercent))
|
||||
}
|
||||
if telemetry.DiskUsedBytes != nil && telemetry.DiskTotalBytes != nil {
|
||||
parts = append(parts, fmt.Sprintf("disk=%s/%s", formatBytesCompact(*telemetry.DiskUsedBytes), formatBytesCompact(*telemetry.DiskTotalBytes)))
|
||||
} else if telemetry.DiskPercent != nil {
|
||||
parts = append(parts, fmt.Sprintf("disk=%.0f%%", *telemetry.DiskPercent))
|
||||
}
|
||||
if telemetry.UptimeSeconds != nil {
|
||||
parts = append(parts, fmt.Sprintf("uptime=%s", formatSecondsDuration(int(*telemetry.UptimeSeconds))))
|
||||
}
|
||||
if telemetry.CapturedAt != "" {
|
||||
parts = append(parts, "telemetry="+blank(idleForString(telemetry.CapturedAt, time.Now()), "now"))
|
||||
}
|
||||
return strings.Join(parts, " ")
|
||||
}
|
||||
|
||||
func formatBytesCompact(value int64) string {
|
||||
const unit = 1024
|
||||
if value < unit {
|
||||
return fmt.Sprintf("%dB", value)
|
||||
}
|
||||
f := float64(value)
|
||||
for _, suffix := range []string{"KiB", "MiB", "GiB", "TiB"} {
|
||||
f /= unit
|
||||
if f < unit {
|
||||
return fmt.Sprintf("%.1f%s", f, suffix)
|
||||
}
|
||||
}
|
||||
return fmt.Sprintf("%.1fPiB", f/unit)
|
||||
}
|
||||
46
internal/cli/telemetry_test.go
Normal file
46
internal/cli/telemetry_test.go
Normal file
@ -0,0 +1,46 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestParseLeaseTelemetry(t *testing.T) {
|
||||
telemetry := parseLeaseTelemetry(strings.Join([]string{
|
||||
"load1=0.12",
|
||||
"load5=0.34",
|
||||
"load15=0.56",
|
||||
"memoryUsedBytes=1073741824",
|
||||
"memoryTotalBytes=2147483648",
|
||||
"memoryPercent=50.00",
|
||||
"diskUsedBytes=3221225472",
|
||||
"diskTotalBytes=10737418240",
|
||||
"diskPercent=30.00",
|
||||
"uptimeSeconds=3661",
|
||||
}, "\n"), time.Date(2026, 5, 5, 1, 2, 3, 0, time.UTC), "test")
|
||||
if telemetry == nil {
|
||||
t.Fatal("telemetry nil")
|
||||
}
|
||||
if telemetry.CapturedAt != "2026-05-05T01:02:03Z" || telemetry.Source != "test" {
|
||||
t.Fatalf("metadata=%#v", telemetry)
|
||||
}
|
||||
if telemetry.Load1 == nil || *telemetry.Load1 != 0.12 {
|
||||
t.Fatalf("load1=%v", telemetry.Load1)
|
||||
}
|
||||
if telemetry.MemoryUsedBytes == nil || *telemetry.MemoryUsedBytes != 1073741824 {
|
||||
t.Fatalf("memoryUsedBytes=%v", telemetry.MemoryUsedBytes)
|
||||
}
|
||||
if telemetry.DiskPercent == nil || *telemetry.DiskPercent != 30 {
|
||||
t.Fatalf("diskPercent=%v", telemetry.DiskPercent)
|
||||
}
|
||||
if telemetry.UptimeSeconds == nil || *telemetry.UptimeSeconds != 3661 {
|
||||
t.Fatalf("uptimeSeconds=%v", telemetry.UptimeSeconds)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseLeaseTelemetrySkipsEmptyMetrics(t *testing.T) {
|
||||
if telemetry := parseLeaseTelemetry("noise\nload1=-1\n", time.Now(), "test"); telemetry != nil {
|
||||
t.Fatalf("telemetry=%#v, want nil", telemetry)
|
||||
}
|
||||
}
|
||||
@ -25,6 +25,7 @@ import type {
|
||||
Env,
|
||||
LeaseRecord,
|
||||
LeaseRequest,
|
||||
LeaseTelemetry,
|
||||
Provider,
|
||||
ProviderImage,
|
||||
ProviderMachine,
|
||||
@ -533,7 +534,10 @@ export class FleetDurableObject implements DurableObject {
|
||||
if (!lease) {
|
||||
return notFound();
|
||||
}
|
||||
const body = await optionalJson<{ idleTimeoutSeconds?: number }>(request);
|
||||
const body = await optionalJson<{
|
||||
idleTimeoutSeconds?: number;
|
||||
telemetry?: Partial<LeaseTelemetry>;
|
||||
}>(request);
|
||||
const now = new Date();
|
||||
const requestedIdleTimeoutSeconds = body.idleTimeoutSeconds;
|
||||
if (
|
||||
@ -543,6 +547,10 @@ export class FleetDurableObject implements DurableObject {
|
||||
) {
|
||||
lease.idleTimeoutSeconds = clampLeaseSeconds(requestedIdleTimeoutSeconds, 86_400);
|
||||
}
|
||||
const telemetry = sanitizeLeaseTelemetry(body.telemetry, now);
|
||||
if (telemetry) {
|
||||
lease.telemetry = telemetry;
|
||||
}
|
||||
lease.updatedAt = now.toISOString();
|
||||
lease.lastTouchedAt = now.toISOString();
|
||||
lease.expiresAt = recomputeLeaseExpiresAt(lease, now).toISOString();
|
||||
@ -2556,6 +2564,65 @@ function clampLeaseSeconds(value: number | undefined, max: number): number {
|
||||
return Math.min(Math.trunc(value), max);
|
||||
}
|
||||
|
||||
function sanitizeLeaseTelemetry(
|
||||
input: Partial<LeaseTelemetry> | undefined,
|
||||
now: Date,
|
||||
): LeaseTelemetry | undefined {
|
||||
if (!input || typeof input !== "object") {
|
||||
return undefined;
|
||||
}
|
||||
const telemetry: LeaseTelemetry = {
|
||||
capturedAt: sanitizeTelemetryTimestamp(input.capturedAt, now),
|
||||
};
|
||||
const source = typeof input.source === "string" ? input.source.trim() : "";
|
||||
if (source) {
|
||||
telemetry.source = source.slice(0, 32);
|
||||
}
|
||||
let hasMetric = false;
|
||||
for (const [key, max] of [
|
||||
["load1", 10_000],
|
||||
["load5", 10_000],
|
||||
["load15", 10_000],
|
||||
["memoryPercent", 100],
|
||||
["diskPercent", 100],
|
||||
] as const) {
|
||||
const value = sanitizeTelemetryNumber(input[key], max);
|
||||
if (value !== undefined) {
|
||||
telemetry[key] = value;
|
||||
hasMetric = true;
|
||||
}
|
||||
}
|
||||
for (const key of [
|
||||
"memoryUsedBytes",
|
||||
"memoryTotalBytes",
|
||||
"diskUsedBytes",
|
||||
"diskTotalBytes",
|
||||
"uptimeSeconds",
|
||||
] as const) {
|
||||
const value = sanitizeTelemetryNumber(input[key], Number.MAX_SAFE_INTEGER);
|
||||
if (value !== undefined) {
|
||||
telemetry[key] = Math.trunc(value);
|
||||
hasMetric = true;
|
||||
}
|
||||
}
|
||||
return hasMetric ? telemetry : undefined;
|
||||
}
|
||||
|
||||
function sanitizeTelemetryTimestamp(value: string | undefined, now: Date): string {
|
||||
const parsed = Date.parse(value ?? "");
|
||||
if (!Number.isFinite(parsed)) {
|
||||
return now.toISOString();
|
||||
}
|
||||
return new Date(parsed).toISOString();
|
||||
}
|
||||
|
||||
function sanitizeTelemetryNumber(value: unknown, max: number): number | undefined {
|
||||
if (typeof value !== "number" || !Number.isFinite(value) || value < 0) {
|
||||
return undefined;
|
||||
}
|
||||
return Math.min(value, max);
|
||||
}
|
||||
|
||||
function allocateLeaseSlug(
|
||||
requested: string,
|
||||
leaseID: string,
|
||||
|
||||
@ -129,6 +129,7 @@ export function portalLeaseDetail(
|
||||
${metaRow("host", lease.host || "pending")}
|
||||
${metaRow("ssh", lease.sshPort ? `${lease.sshUser || "crabbox"}@${lease.host || "host"}:${lease.sshPort}` : "pending")}
|
||||
${metaRow("work root", lease.workRoot || "pending")}
|
||||
${leaseTelemetryRows(lease.telemetry)}
|
||||
${metaRow("expires", shortTime(lease.expiresAt))}
|
||||
</dl>
|
||||
${
|
||||
@ -720,6 +721,53 @@ function targetBadge(target: string | undefined, windowsMode?: string): string {
|
||||
return `<span class="icon-label" data-target="${escapeHTML(value)}">${targetIcon(value)}<span>${escapeHTML(label)}</span></span>`;
|
||||
}
|
||||
|
||||
function leaseTelemetryRows(telemetry: LeaseRecord["telemetry"]): string {
|
||||
if (!telemetry) {
|
||||
return "";
|
||||
}
|
||||
return [
|
||||
metaRow("load", telemetryLoad(telemetry)),
|
||||
metaRow(
|
||||
"memory",
|
||||
telemetryStorage(
|
||||
telemetry.memoryUsedBytes,
|
||||
telemetry.memoryTotalBytes,
|
||||
telemetry.memoryPercent,
|
||||
),
|
||||
),
|
||||
metaRow(
|
||||
"disk",
|
||||
telemetryStorage(telemetry.diskUsedBytes, telemetry.diskTotalBytes, telemetry.diskPercent),
|
||||
),
|
||||
metaRow(
|
||||
"uptime",
|
||||
telemetry.uptimeSeconds !== undefined ? formatSeconds(telemetry.uptimeSeconds) : undefined,
|
||||
),
|
||||
metaRow("seen", telemetry.capturedAt ? relativeTime(telemetry.capturedAt) : undefined),
|
||||
].join("");
|
||||
}
|
||||
|
||||
function telemetryLoad(telemetry: LeaseRecord["telemetry"]): string | undefined {
|
||||
if (!telemetry || telemetry.load1 === undefined) {
|
||||
return undefined;
|
||||
}
|
||||
const load5 = telemetry.load5 === undefined ? "" : ` / ${telemetry.load5.toFixed(2)}`;
|
||||
const load15 = telemetry.load15 === undefined ? "" : ` / ${telemetry.load15.toFixed(2)}`;
|
||||
return `${telemetry.load1.toFixed(2)}${load5}${load15}`;
|
||||
}
|
||||
|
||||
function telemetryStorage(
|
||||
used: number | undefined,
|
||||
total: number | undefined,
|
||||
percent: number | undefined,
|
||||
): string | undefined {
|
||||
if (used !== undefined && total !== undefined) {
|
||||
const percentLabel = percent === undefined ? "" : ` (${Math.round(percent)}%)`;
|
||||
return `${formatBytes(used)} / ${formatBytes(total)}${percentLabel}`;
|
||||
}
|
||||
return percent === undefined ? undefined : `${Math.round(percent)}%`;
|
||||
}
|
||||
|
||||
function providerIcon(provider: string): string {
|
||||
if (provider === "aws") {
|
||||
return `<svg viewBox="0 0 24 24" aria-hidden="true"><path d="M4 15.5c3.8 2.2 9.1 2.5 14.8.9"/><path d="M17.5 13.2 20 16l-3.7.7"/><path d="M7 8.5h10l1.8 4H5.2z"/></svg>`;
|
||||
@ -1231,6 +1279,22 @@ function formatDuration(value: number | undefined): string {
|
||||
return `${minutes}m ${rest}s`;
|
||||
}
|
||||
|
||||
function formatSeconds(value: number): string {
|
||||
const seconds = Math.max(0, Math.round(value));
|
||||
if (seconds < 60) {
|
||||
return `${seconds}s`;
|
||||
}
|
||||
const minutes = Math.floor(seconds / 60);
|
||||
if (minutes < 60) {
|
||||
return `${minutes}m`;
|
||||
}
|
||||
const hours = Math.floor(minutes / 60);
|
||||
if (hours < 48) {
|
||||
return `${hours}h`;
|
||||
}
|
||||
return `${Math.floor(hours / 24)}d`;
|
||||
}
|
||||
|
||||
function formatExitCode(value: number | undefined): string {
|
||||
return Number.isFinite(value) ? String(value) : "-";
|
||||
}
|
||||
@ -1242,7 +1306,10 @@ function formatBytes(value: number): string {
|
||||
if (value < 1024 * 1024) {
|
||||
return `${(value / 1024).toFixed(1)} KiB`;
|
||||
}
|
||||
return `${(value / 1024 / 1024).toFixed(1)} MiB`;
|
||||
if (value < 1024 * 1024 * 1024) {
|
||||
return `${(value / 1024 / 1024).toFixed(1)} MiB`;
|
||||
}
|
||||
return `${(value / 1024 / 1024 / 1024).toFixed(1)} GiB`;
|
||||
}
|
||||
|
||||
function truncate(value: string, maxLength: number): string {
|
||||
|
||||
@ -90,6 +90,21 @@ export type Provider = "hetzner" | "aws";
|
||||
export type TargetOS = "linux" | "macos" | "windows";
|
||||
export type WindowsMode = "normal" | "wsl2";
|
||||
|
||||
export interface LeaseTelemetry {
|
||||
capturedAt: string;
|
||||
source?: string;
|
||||
load1?: number;
|
||||
load5?: number;
|
||||
load15?: number;
|
||||
memoryUsedBytes?: number;
|
||||
memoryTotalBytes?: number;
|
||||
memoryPercent?: number;
|
||||
diskUsedBytes?: number;
|
||||
diskTotalBytes?: number;
|
||||
diskPercent?: number;
|
||||
uptimeSeconds?: number;
|
||||
}
|
||||
|
||||
export interface LeaseRecord {
|
||||
id: string;
|
||||
slug?: string;
|
||||
@ -127,6 +142,7 @@ export interface LeaseRecord {
|
||||
updatedAt: string;
|
||||
lastTouchedAt?: string;
|
||||
expiresAt: string;
|
||||
telemetry?: LeaseTelemetry;
|
||||
releasedAt?: string;
|
||||
endedAt?: string;
|
||||
}
|
||||
|
||||
@ -409,7 +409,17 @@ describe("fleet lease identity and idle", () => {
|
||||
"x-crabbox-owner": "peter@example.com",
|
||||
"x-crabbox-org": "openclaw",
|
||||
},
|
||||
body: { idleTimeoutSeconds: 2400 },
|
||||
body: {
|
||||
idleTimeoutSeconds: 2400,
|
||||
telemetry: {
|
||||
capturedAt: "2026-05-05T01:02:03Z",
|
||||
source: "ssh-linux",
|
||||
load1: 0.42,
|
||||
memoryUsedBytes: 1024,
|
||||
memoryTotalBytes: 2048,
|
||||
memoryPercent: 50,
|
||||
},
|
||||
},
|
||||
}),
|
||||
);
|
||||
expect(heartbeat.status).toBe(200);
|
||||
@ -417,6 +427,14 @@ describe("fleet lease identity and idle", () => {
|
||||
expect(lease.id).toBe("cbx_000000000001");
|
||||
expect(lease.slug).toBe("blue-lobster");
|
||||
expect(lease.idleTimeoutSeconds).toBe(2400);
|
||||
expect(lease.telemetry).toMatchObject({
|
||||
capturedAt: "2026-05-05T01:02:03.000Z",
|
||||
source: "ssh-linux",
|
||||
load1: 0.42,
|
||||
memoryUsedBytes: 1024,
|
||||
memoryTotalBytes: 2048,
|
||||
memoryPercent: 50,
|
||||
});
|
||||
expect(Date.parse(lease.expiresAt)).toBeGreaterThan(expiresAt.getTime());
|
||||
});
|
||||
|
||||
@ -476,6 +494,20 @@ describe("fleet lease identity and idle", () => {
|
||||
org: "openclaw",
|
||||
desktop: true,
|
||||
code: true,
|
||||
telemetry: {
|
||||
capturedAt: new Date(Date.now() - 15_000).toISOString(),
|
||||
source: "ssh-linux",
|
||||
load1: 0.42,
|
||||
load5: 0.24,
|
||||
load15: 0.12,
|
||||
memoryUsedBytes: 1024,
|
||||
memoryTotalBytes: 2048,
|
||||
memoryPercent: 50,
|
||||
diskUsedBytes: 1024 * 1024 * 1024,
|
||||
diskTotalBytes: 4 * 1024 * 1024 * 1024,
|
||||
diskPercent: 25,
|
||||
uptimeSeconds: 3600,
|
||||
},
|
||||
expiresAt: new Date(Date.now() + 60 * 60 * 1000).toISOString(),
|
||||
}),
|
||||
);
|
||||
@ -682,6 +714,20 @@ describe("fleet lease identity and idle", () => {
|
||||
org: "openclaw",
|
||||
desktop: true,
|
||||
code: true,
|
||||
telemetry: {
|
||||
capturedAt: new Date(Date.now() - 15_000).toISOString(),
|
||||
source: "ssh-linux",
|
||||
load1: 0.42,
|
||||
load5: 0.24,
|
||||
load15: 0.12,
|
||||
memoryUsedBytes: 1024,
|
||||
memoryTotalBytes: 2048,
|
||||
memoryPercent: 50,
|
||||
diskUsedBytes: 1024 * 1024 * 1024,
|
||||
diskTotalBytes: 4 * 1024 * 1024 * 1024,
|
||||
diskPercent: 25,
|
||||
uptimeSeconds: 3600,
|
||||
},
|
||||
expiresAt: new Date(Date.now() + 60 * 60 * 1000).toISOString(),
|
||||
}),
|
||||
);
|
||||
@ -754,6 +800,10 @@ describe("fleet lease identity and idle", () => {
|
||||
);
|
||||
expect(body).toContain('data-provider="hetzner"');
|
||||
expect(body).toContain('data-target="linux"');
|
||||
expect(body).toContain("<dt>load</dt><dd>0.42 / 0.24 / 0.12</dd>");
|
||||
expect(body).toContain("<dt>memory</dt><dd>1.0 KiB / 2.0 KiB (50%)</dd>");
|
||||
expect(body).toContain("<dt>disk</dt><dd>1.0 GiB / 4.0 GiB (25%)</dd>");
|
||||
expect(body).toContain("<dt>uptime</dt><dd>1h</dd>");
|
||||
expect(body).toContain("table-search");
|
||||
expect(body).toContain("/portal/runs/run_000000000001");
|
||||
expect(body).toContain("/portal/runs/run_000000000001/logs");
|
||||
|
||||
Loading…
Reference in New Issue
Block a user