feat(telemetry): record run samples
This commit is contained in:
parent
24848f10cb
commit
101b9c18b4
@ -585,6 +585,14 @@ func (c *CoordinatorClient) FinishRun(ctx context.Context, runID string, exitCod
|
||||
return res.Run, err
|
||||
}
|
||||
|
||||
func (c *CoordinatorClient) AppendRunTelemetry(ctx context.Context, runID string, telemetry *LeaseTelemetry) (CoordinatorRun, error) {
|
||||
var res CoordinatorRunResponse
|
||||
err := c.do(ctx, http.MethodPost, "/v1/runs/"+url.PathEscape(runID)+"/telemetry", map[string]any{
|
||||
"telemetry": telemetry,
|
||||
}, &res)
|
||||
return res.Run, err
|
||||
}
|
||||
|
||||
func (c *CoordinatorClient) AppendRunEvent(ctx context.Context, runID string, input CoordinatorRunEventInput) (CoordinatorRunEvent, error) {
|
||||
var res CoordinatorRunEventResponse
|
||||
err := c.do(ctx, http.MethodPost, "/v1/runs/"+url.PathEscape(runID)+"/events", input, &res)
|
||||
|
||||
@ -269,6 +269,28 @@ func TestCoordinatorTouchAndUpdateHeartbeatBodies(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCoordinatorAppendRunTelemetry(t *testing.T) {
|
||||
var body string
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPost || r.URL.Path != "/v1/runs/run_123/telemetry" {
|
||||
t.Fatalf("unexpected request %s %s", r.Method, r.URL.Path)
|
||||
}
|
||||
data, _ := io.ReadAll(r.Body)
|
||||
body = string(data)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, _ = w.Write([]byte(`{"run":{"id":"run_123","leaseID":"cbx_123","owner":"peter@example.com","org":"openclaw","provider":"aws","class":"standard","serverType":"t3.small","command":["sleep","60"],"state":"running","logBytes":0,"logTruncated":false,"startedAt":"2026-05-02T00:00:00Z"}}`))
|
||||
}))
|
||||
defer server.Close()
|
||||
client := CoordinatorClient{BaseURL: server.URL, Client: server.Client()}
|
||||
load := 0.42
|
||||
if _, err := client.AppendRunTelemetry(context.Background(), "run_123", &LeaseTelemetry{Load1: &load}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !strings.Contains(body, `"telemetry"`) || !strings.Contains(body, `"load1":0.42`) {
|
||||
t.Fatalf("append telemetry body=%q", body)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCoordinatorHeartbeatTouchesImmediately(t *testing.T) {
|
||||
touches := make(chan struct{}, 1)
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
@ -342,6 +342,7 @@ func (a App) runCommand(ctx context.Context, args []string) (err error) {
|
||||
}
|
||||
}
|
||||
recorder.CaptureTelemetryStart(ctx, target)
|
||||
recorder.StartTelemetrySampler(ctx, target)
|
||||
recorder.Event("sync.started", "sync", "")
|
||||
timings.syncSteps.sshReady = time.Since(stepStart)
|
||||
excludes, err := syncExcludes(repo.Root, cfg)
|
||||
@ -485,6 +486,7 @@ afterSync:
|
||||
}
|
||||
}
|
||||
recorder.CaptureTelemetryStart(ctx, target)
|
||||
recorder.StartTelemetrySampler(ctx, target)
|
||||
if *noSync {
|
||||
mkdirCommand := remoteMkdir(workdir)
|
||||
if isWindowsNativeTarget(target) {
|
||||
|
||||
@ -9,19 +9,25 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
const runTelemetrySampleInterval = 15 * time.Second
|
||||
|
||||
type runRecorder struct {
|
||||
coord *CoordinatorClient
|
||||
command []string
|
||||
runID string
|
||||
stderr io.Writer
|
||||
deferUntilLease bool
|
||||
eventsMu sync.Mutex
|
||||
eventsDisabled bool
|
||||
finished bool
|
||||
warned bool
|
||||
warnMu sync.Mutex
|
||||
output *runOutputEventQueue
|
||||
telemetryStart *LeaseTelemetry
|
||||
coord *CoordinatorClient
|
||||
command []string
|
||||
runID string
|
||||
stderr io.Writer
|
||||
deferUntilLease bool
|
||||
eventsMu sync.Mutex
|
||||
eventsDisabled bool
|
||||
finished bool
|
||||
warned bool
|
||||
warnMu sync.Mutex
|
||||
output *runOutputEventQueue
|
||||
telemetryStart *LeaseTelemetry
|
||||
telemetryMu sync.Mutex
|
||||
telemetrySamples []*LeaseTelemetry
|
||||
telemetryCancel func()
|
||||
telemetryDone chan struct{}
|
||||
}
|
||||
|
||||
func newRunRecorder(ctx context.Context, coord *CoordinatorClient, cfg Config, command []string, stderr io.Writer) *runRecorder {
|
||||
@ -100,6 +106,41 @@ func (r *runRecorder) CaptureTelemetryStart(ctx context.Context, target SSHTarge
|
||||
return
|
||||
}
|
||||
r.telemetryStart = collectLeaseTelemetryBestEffort(ctx, leaseTelemetryCollectorForTarget(target))
|
||||
r.recordTelemetrySample(r.telemetryStart)
|
||||
r.appendTelemetryBestEffort(r.telemetryStart)
|
||||
}
|
||||
|
||||
func (r *runRecorder) StartTelemetrySampler(ctx context.Context, target SSHTarget) {
|
||||
if r == nil || r.coord == nil || r.runID == "" {
|
||||
return
|
||||
}
|
||||
r.telemetryMu.Lock()
|
||||
if r.telemetryCancel != nil {
|
||||
r.telemetryMu.Unlock()
|
||||
return
|
||||
}
|
||||
sampleCtx, cancel := context.WithCancel(ctx)
|
||||
done := make(chan struct{})
|
||||
r.telemetryCancel = cancel
|
||||
r.telemetryDone = done
|
||||
r.telemetryMu.Unlock()
|
||||
|
||||
collector := leaseTelemetryCollectorForTarget(target)
|
||||
go func() {
|
||||
defer close(done)
|
||||
ticker := time.NewTicker(runTelemetrySampleInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
sample := collectLeaseTelemetryBestEffort(sampleCtx, collector)
|
||||
r.recordTelemetrySample(sample)
|
||||
r.appendTelemetryBestEffort(sample)
|
||||
case <-sampleCtx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (r *runRecorder) attachRun(run CoordinatorRun) {
|
||||
@ -121,10 +162,12 @@ func (r *runRecorder) Finish(ctx context.Context, target SSHTarget, exitCode int
|
||||
}
|
||||
r.waitForOutputEvents(runEventOutputPostWait)
|
||||
r.finished = true
|
||||
r.stopTelemetrySampler()
|
||||
telemetryEnd := collectLeaseTelemetryBestEffort(ctx, leaseTelemetryCollectorForTarget(target))
|
||||
r.recordTelemetrySample(telemetryEnd)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if _, err := r.coord.FinishRun(ctx, r.runID, exitCode, sync, command, log, truncated, results, runTelemetrySummary(r.telemetryStart, telemetryEnd)); err != nil {
|
||||
if _, err := r.coord.FinishRun(ctx, r.runID, exitCode, sync, command, log, truncated, results, runTelemetrySummary(r.telemetryStart, telemetryEnd, r.telemetrySnapshot())); err != nil {
|
||||
r.warn("run history finish failed for %s: %v", r.runID, err)
|
||||
}
|
||||
}
|
||||
@ -155,6 +198,69 @@ func (r *runRecorder) warn(format string, args ...any) {
|
||||
fmt.Fprintf(r.stderr, "warning: "+format+"\n", args...)
|
||||
}
|
||||
|
||||
func (r *runRecorder) recordTelemetrySample(sample *LeaseTelemetry) {
|
||||
if r == nil || sample == nil || sample.CapturedAt == "" {
|
||||
return
|
||||
}
|
||||
r.telemetryMu.Lock()
|
||||
defer r.telemetryMu.Unlock()
|
||||
for index, existing := range r.telemetrySamples {
|
||||
if existing != nil && existing.CapturedAt == sample.CapturedAt {
|
||||
r.telemetrySamples[index] = sample
|
||||
return
|
||||
}
|
||||
}
|
||||
r.telemetrySamples = append(r.telemetrySamples, sample)
|
||||
if len(r.telemetrySamples) > 60 {
|
||||
r.telemetrySamples = r.telemetrySamples[len(r.telemetrySamples)-60:]
|
||||
}
|
||||
}
|
||||
|
||||
func (r *runRecorder) telemetrySnapshot() []*LeaseTelemetry {
|
||||
if r == nil {
|
||||
return nil
|
||||
}
|
||||
r.telemetryMu.Lock()
|
||||
defer r.telemetryMu.Unlock()
|
||||
if len(r.telemetrySamples) == 0 {
|
||||
return nil
|
||||
}
|
||||
samples := make([]*LeaseTelemetry, len(r.telemetrySamples))
|
||||
copy(samples, r.telemetrySamples)
|
||||
return samples
|
||||
}
|
||||
|
||||
func (r *runRecorder) appendTelemetryBestEffort(sample *LeaseTelemetry) {
|
||||
if r == nil || r.coord == nil || r.runID == "" || sample == nil {
|
||||
return
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if _, err := r.coord.AppendRunTelemetry(ctx, r.runID, sample); err != nil && !isCoordinatorNotFoundError(err) {
|
||||
r.warn("run telemetry append failed for %s: %v", r.runID, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *runRecorder) stopTelemetrySampler() {
|
||||
if r == nil {
|
||||
return
|
||||
}
|
||||
r.telemetryMu.Lock()
|
||||
cancel := r.telemetryCancel
|
||||
done := r.telemetryDone
|
||||
r.telemetryCancel = nil
|
||||
r.telemetryDone = nil
|
||||
r.telemetryMu.Unlock()
|
||||
if cancel == nil {
|
||||
return
|
||||
}
|
||||
cancel()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(time.Second):
|
||||
}
|
||||
}
|
||||
|
||||
func (r *runRecorder) waitForOutputEvents(timeout time.Duration) {
|
||||
if r == nil || r.output == nil {
|
||||
return
|
||||
|
||||
@ -26,8 +26,9 @@ type LeaseTelemetry struct {
|
||||
}
|
||||
|
||||
type RunTelemetrySummary struct {
|
||||
Start *LeaseTelemetry `json:"start,omitempty"`
|
||||
End *LeaseTelemetry `json:"end,omitempty"`
|
||||
Start *LeaseTelemetry `json:"start,omitempty"`
|
||||
End *LeaseTelemetry `json:"end,omitempty"`
|
||||
Samples []*LeaseTelemetry `json:"samples,omitempty"`
|
||||
}
|
||||
|
||||
type leaseTelemetryCollector func(context.Context) (*LeaseTelemetry, error)
|
||||
@ -170,11 +171,11 @@ func leaseTelemetryStatusSummary(telemetry *LeaseTelemetry) string {
|
||||
return strings.Join(parts, " ")
|
||||
}
|
||||
|
||||
func runTelemetrySummary(start, end *LeaseTelemetry) *RunTelemetrySummary {
|
||||
if start == nil && end == nil {
|
||||
func runTelemetrySummary(start, end *LeaseTelemetry, samples []*LeaseTelemetry) *RunTelemetrySummary {
|
||||
if start == nil && end == nil && len(samples) == 0 {
|
||||
return nil
|
||||
}
|
||||
return &RunTelemetrySummary{Start: start, End: end}
|
||||
return &RunTelemetrySummary{Start: start, End: end, Samples: samples}
|
||||
}
|
||||
|
||||
func runTelemetryStatusSummary(telemetry *RunTelemetrySummary) string {
|
||||
@ -182,6 +183,9 @@ func runTelemetryStatusSummary(telemetry *RunTelemetrySummary) string {
|
||||
return ""
|
||||
}
|
||||
current := telemetry.End
|
||||
if current == nil {
|
||||
current = latestTelemetrySample(telemetry.Samples)
|
||||
}
|
||||
if current == nil {
|
||||
current = telemetry.Start
|
||||
}
|
||||
@ -206,6 +210,13 @@ func runTelemetryStatusSummary(telemetry *RunTelemetrySummary) string {
|
||||
return strings.Join(parts, " ")
|
||||
}
|
||||
|
||||
func latestTelemetrySample(samples []*LeaseTelemetry) *LeaseTelemetry {
|
||||
if len(samples) == 0 {
|
||||
return nil
|
||||
}
|
||||
return samples[len(samples)-1]
|
||||
}
|
||||
|
||||
func telemetryDeltaBytes(start, end *LeaseTelemetry, metric string) string {
|
||||
if start == nil || end == nil {
|
||||
return ""
|
||||
|
||||
@ -36,6 +36,7 @@ import type {
|
||||
RunEventRequest,
|
||||
RunFinishRequest,
|
||||
RunRecord,
|
||||
RunTelemetryRequest,
|
||||
RunTelemetrySummary,
|
||||
TestFailure,
|
||||
TestResultSummary,
|
||||
@ -47,6 +48,7 @@ const fleetID = "default";
|
||||
const maxStoredRunLogBytes = 8 * 1024 * 1024;
|
||||
const runLogChunkBytes = 64 * 1024;
|
||||
const maxLeaseTelemetryHistory = 60;
|
||||
const maxRunTelemetrySamples = 60;
|
||||
const webVNCTicketTTLSeconds = 120;
|
||||
const codeTicketTTLSeconds = 120;
|
||||
const maxPendingWebVNCBytes = 1024 * 1024;
|
||||
@ -1518,12 +1520,30 @@ export class FleetDurableObject implements DurableObject {
|
||||
const event = await this.appendRunEventRecord(run, input);
|
||||
return json({ event }, { status: 201 });
|
||||
}
|
||||
if (method === "POST" && action === "telemetry") {
|
||||
return this.appendRunTelemetry(request, runID);
|
||||
}
|
||||
if (method === "POST" && action === "finish") {
|
||||
return this.finishRun(request, runID);
|
||||
}
|
||||
return json({ error: "not_found" }, { status: 404 });
|
||||
}
|
||||
|
||||
private async appendRunTelemetry(request: Request, runID: string): Promise<Response> {
|
||||
const run = await this.getRun(runID);
|
||||
if (!run || !this.runVisibleToRequest(run, request)) {
|
||||
return notFound();
|
||||
}
|
||||
const input = await readJson<RunTelemetryRequest>(request);
|
||||
const telemetry = sanitizeLeaseTelemetry(input.telemetry, new Date());
|
||||
if (!telemetry) {
|
||||
return json({ error: "invalid_telemetry" }, { status: 400 });
|
||||
}
|
||||
run.telemetry = appendRunTelemetrySample(run.telemetry, telemetry);
|
||||
await this.putRun(run);
|
||||
return json({ run });
|
||||
}
|
||||
|
||||
private async finishRun(request: Request, runID: string): Promise<Response> {
|
||||
const run = await this.getRun(runID);
|
||||
if (!run || !this.runVisibleToRequest(run, request)) {
|
||||
@ -1555,7 +1575,7 @@ export class FleetDurableObject implements DurableObject {
|
||||
}
|
||||
const telemetry = sanitizeRunTelemetry(input.telemetry, now);
|
||||
if (telemetry) {
|
||||
run.telemetry = telemetry;
|
||||
run.telemetry = mergeRunTelemetry(run.telemetry, telemetry);
|
||||
}
|
||||
await this.writeRunLog(runID, logInput.log);
|
||||
await this.putRun(run);
|
||||
@ -2639,7 +2659,12 @@ function sanitizeRunTelemetry(
|
||||
}
|
||||
const start = sanitizeLeaseTelemetry(input.start, now);
|
||||
const end = sanitizeLeaseTelemetry(input.end, now);
|
||||
if (!start && !end) {
|
||||
const samples = Array.isArray(input.samples)
|
||||
? input.samples
|
||||
.map((sample) => sanitizeLeaseTelemetry(sample, now))
|
||||
.filter((sample): sample is LeaseTelemetry => sample !== undefined)
|
||||
: [];
|
||||
if (!start && !end && samples.length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
const telemetry: RunTelemetrySummary = {};
|
||||
@ -2649,19 +2674,65 @@ function sanitizeRunTelemetry(
|
||||
if (end) {
|
||||
telemetry.end = end;
|
||||
}
|
||||
if (samples.length > 0) {
|
||||
telemetry.samples = boundedTelemetrySamples(samples, maxRunTelemetrySamples);
|
||||
}
|
||||
return telemetry;
|
||||
}
|
||||
|
||||
function mergeRunTelemetry(
|
||||
existing: RunTelemetrySummary | undefined,
|
||||
incoming: RunTelemetrySummary,
|
||||
): RunTelemetrySummary {
|
||||
const telemetry: RunTelemetrySummary = {
|
||||
...(existing ?? {}),
|
||||
...incoming,
|
||||
};
|
||||
telemetry.samples = boundedTelemetrySamples(
|
||||
[
|
||||
...((existing?.samples ?? []).filter(Boolean) as LeaseTelemetry[]),
|
||||
...((incoming.samples ?? []).filter(Boolean) as LeaseTelemetry[]),
|
||||
],
|
||||
maxRunTelemetrySamples,
|
||||
);
|
||||
if (telemetry.samples.length === 0) {
|
||||
delete telemetry.samples;
|
||||
}
|
||||
return telemetry;
|
||||
}
|
||||
|
||||
function appendRunTelemetrySample(
|
||||
telemetry: RunTelemetrySummary | undefined,
|
||||
sample: LeaseTelemetry,
|
||||
): RunTelemetrySummary {
|
||||
const next: RunTelemetrySummary = { ...(telemetry ?? {}) };
|
||||
next.samples = boundedTelemetrySamples([...(next.samples ?? []), sample], maxRunTelemetrySamples);
|
||||
if (!next.start) {
|
||||
next.start = sample;
|
||||
}
|
||||
return next;
|
||||
}
|
||||
|
||||
function appendLeaseTelemetryHistory(
|
||||
history: LeaseTelemetry[] | undefined,
|
||||
telemetry: LeaseTelemetry,
|
||||
): LeaseTelemetry[] {
|
||||
const existing = Array.isArray(history) ? history : [];
|
||||
const next = [
|
||||
...existing.filter((sample) => sample && sample.capturedAt !== telemetry.capturedAt),
|
||||
telemetry,
|
||||
].toSorted((left, right) => left.capturedAt.localeCompare(right.capturedAt));
|
||||
return next.slice(-maxLeaseTelemetryHistory);
|
||||
return boundedTelemetrySamples(
|
||||
[...(Array.isArray(history) ? history : []), telemetry],
|
||||
maxLeaseTelemetryHistory,
|
||||
);
|
||||
}
|
||||
|
||||
function boundedTelemetrySamples(samples: LeaseTelemetry[], max: number): LeaseTelemetry[] {
|
||||
const byTime = new Map<string, LeaseTelemetry>();
|
||||
for (const sample of samples) {
|
||||
if (sample?.capturedAt) {
|
||||
byTime.set(sample.capturedAt, sample);
|
||||
}
|
||||
}
|
||||
return [...byTime.values()]
|
||||
.toSorted((left, right) => left.capturedAt.localeCompare(right.capturedAt))
|
||||
.slice(-max);
|
||||
}
|
||||
|
||||
function sanitizeTelemetryTimestamp(value: string | undefined, now: Date): string {
|
||||
|
||||
@ -884,11 +884,12 @@ function formatTelemetryValue(value: number, unit: string): string {
|
||||
|
||||
function runTelemetryPanel(telemetry: RunRecord["telemetry"]): string {
|
||||
if (!telemetry) {
|
||||
return `<div class="run-telemetry-grid"><div class="run-metric" data-muted="true"><span>telemetry</span><strong>not sampled</strong><small>no box metrics</small></div></div>`;
|
||||
return `<div class="run-telemetry-panel"><div class="run-telemetry-grid"><div class="run-metric" data-muted="true"><span>telemetry</span><strong>not sampled</strong><small>no box metrics</small></div></div></div>`;
|
||||
}
|
||||
const current = telemetry.end || telemetry.start;
|
||||
const samples = runTelemetrySamples(telemetry);
|
||||
const current = telemetry.end || samples.at(-1) || telemetry.start;
|
||||
if (!current) {
|
||||
return `<div class="run-telemetry-grid"><div class="run-metric" data-muted="true"><span>telemetry</span><strong>not sampled</strong><small>no box metrics</small></div></div>`;
|
||||
return `<div class="run-telemetry-panel"><div class="run-telemetry-grid"><div class="run-metric" data-muted="true"><span>telemetry</span><strong>not sampled</strong><small>no box metrics</small></div></div></div>`;
|
||||
}
|
||||
const memory = telemetryStorage(
|
||||
current.memoryUsedBytes,
|
||||
@ -896,11 +897,36 @@ function runTelemetryPanel(telemetry: RunRecord["telemetry"]): string {
|
||||
current.memoryPercent,
|
||||
);
|
||||
const disk = telemetryStorage(current.diskUsedBytes, current.diskTotalBytes, current.diskPercent);
|
||||
return `<div class="run-telemetry-grid">
|
||||
${runMetric("load", telemetryLoad(current), "1 / 5 / 15")}
|
||||
${runMetric("memory", memory, telemetryDeltaLabel("delta", telemetry.start?.memoryUsedBytes, telemetry.end?.memoryUsedBytes))}
|
||||
${runMetric("disk", disk, telemetryDeltaLabel("delta", telemetry.start?.diskUsedBytes, telemetry.end?.diskUsedBytes))}
|
||||
${runMetric("sampled", current.capturedAt ? relativeTime(current.capturedAt) : undefined, current.capturedAt || "no timestamp")}
|
||||
const sampleLabel =
|
||||
samples.length > 1
|
||||
? `${samples.length} samples`
|
||||
: samples.length === 1
|
||||
? "1 sample"
|
||||
: "no history";
|
||||
return `<div class="run-telemetry-panel">
|
||||
<div class="run-telemetry-grid">
|
||||
${runMetric("load", telemetryLoad(current), "1 / 5 / 15")}
|
||||
${runMetric("memory", memory, telemetryDeltaLabel("delta", telemetry.start?.memoryUsedBytes, telemetry.end?.memoryUsedBytes))}
|
||||
${runMetric("disk", disk, telemetryDeltaLabel("delta", telemetry.start?.diskUsedBytes, telemetry.end?.diskUsedBytes))}
|
||||
${runMetric("sampled", current.capturedAt ? relativeTime(current.capturedAt) : undefined, sampleLabel)}
|
||||
</div>
|
||||
<div class="run-telemetry-trends">
|
||||
${telemetrySparkline(
|
||||
"load",
|
||||
samples.map((sample) => sample.load1),
|
||||
"load",
|
||||
)}
|
||||
${telemetrySparkline(
|
||||
"memory",
|
||||
samples.map((sample) => sample.memoryPercent),
|
||||
"%",
|
||||
)}
|
||||
${telemetrySparkline(
|
||||
"disk",
|
||||
samples.map((sample) => sample.diskPercent),
|
||||
"%",
|
||||
)}
|
||||
</div>
|
||||
</div>`;
|
||||
}
|
||||
|
||||
@ -925,7 +951,7 @@ function runTelemetryCell(telemetry: RunRecord["telemetry"]): string {
|
||||
if (!telemetry) {
|
||||
return "-";
|
||||
}
|
||||
const current = telemetry.end || telemetry.start;
|
||||
const current = telemetry.end || runTelemetrySamples(telemetry).at(-1) || telemetry.start;
|
||||
if (!current) {
|
||||
return "-";
|
||||
}
|
||||
@ -943,6 +969,21 @@ function runTelemetryCell(telemetry: RunRecord["telemetry"]): string {
|
||||
return parts.length ? parts.join(" · ") : "-";
|
||||
}
|
||||
|
||||
function runTelemetrySamples(telemetry: RunRecord["telemetry"]): LeaseTelemetrySample[] {
|
||||
if (!telemetry) {
|
||||
return [];
|
||||
}
|
||||
const byTime = new Map<string, LeaseTelemetrySample>();
|
||||
for (const sample of [telemetry.start, ...(telemetry.samples ?? []), telemetry.end]) {
|
||||
if (sample?.capturedAt) {
|
||||
byTime.set(sample.capturedAt, sample);
|
||||
}
|
||||
}
|
||||
return [...byTime.values()].toSorted((left, right) =>
|
||||
left.capturedAt.localeCompare(right.capturedAt),
|
||||
);
|
||||
}
|
||||
|
||||
function telemetryDelta(start: number | undefined, end: number | undefined): string | undefined {
|
||||
if (start === undefined || end === undefined) {
|
||||
return undefined;
|
||||
@ -1118,13 +1159,17 @@ function html(title: string, body: string, status = 200, nonce = ""): Response {
|
||||
.run-artifact-card .run-artifacts { gap:6px; padding:8px; }
|
||||
.run-artifact-card .button { width:100%; }
|
||||
.run-artifact-card .result-grid { grid-column:1 / -1; }
|
||||
.run-telemetry-grid { display:grid; grid-template-columns:repeat(4,minmax(0,1fr)); border-top:1px solid var(--line-soft); background:var(--panel-2); }
|
||||
.run-telemetry-panel { border-top:1px solid var(--line-soft); background:var(--panel-2); }
|
||||
.run-telemetry-grid { display:grid; grid-template-columns:repeat(4,minmax(0,1fr)); }
|
||||
.run-metric { min-width:0; padding:7px 10px; border-right:1px solid var(--line-soft); }
|
||||
.run-metric:last-child { border-right:0; }
|
||||
.run-metric span { display:block; color:var(--muted); font-size:10px; font-weight:700; letter-spacing:0.04em; text-transform:uppercase; }
|
||||
.run-metric strong { display:block; margin-top:2px; overflow:hidden; text-overflow:ellipsis; white-space:nowrap; font-size:13px; }
|
||||
.run-metric small { display:block; margin-top:1px; overflow:hidden; text-overflow:ellipsis; white-space:nowrap; color:var(--muted); font-size:11px; }
|
||||
.run-metric[data-muted="true"] { grid-column:1 / -1; }
|
||||
.run-telemetry-trends { display:grid; grid-template-columns:repeat(3,minmax(0,1fr)); border-top:1px solid var(--line-soft); }
|
||||
.run-telemetry-trends .telemetry-line { grid-template-columns:54px minmax(0,1fr) 46px; padding:7px 10px; border-right:1px solid var(--line-soft); }
|
||||
.run-telemetry-trends .telemetry-line:last-child { border-right:0; }
|
||||
.result-grid { display:grid; grid-template-columns:repeat(2,minmax(0,1fr)); gap:0; margin:4px -14px -14px; border-top:1px solid var(--line-soft); }
|
||||
.result-grid div { padding:8px 10px; border-bottom:1px solid var(--line-soft); }
|
||||
.result-grid dt { color:var(--muted); font-size:11px; text-transform:uppercase; margin-bottom:3px; }
|
||||
@ -1223,6 +1268,9 @@ function html(title: string, body: string, status = 200, nonce = ""): Response {
|
||||
.run-shell .detail-grid { grid-template-columns:1fr; }
|
||||
.run-shell .meta-grid { grid-template-columns:repeat(2,minmax(0,1fr)); }
|
||||
.run-telemetry-grid { grid-template-columns:repeat(2,minmax(0,1fr)); }
|
||||
.run-telemetry-trends { grid-template-columns:1fr; }
|
||||
.run-telemetry-trends .telemetry-line { border-right:0; border-bottom:1px solid var(--line-soft); }
|
||||
.run-telemetry-trends .telemetry-line:last-child { border-bottom:0; }
|
||||
}
|
||||
@media (max-width: 760px) {
|
||||
main { width:min(100vw - 20px, 1180px); padding:10px 0; }
|
||||
|
||||
@ -110,6 +110,7 @@ export interface LeaseTelemetry {
|
||||
export interface RunTelemetrySummary {
|
||||
start?: LeaseTelemetry;
|
||||
end?: LeaseTelemetry;
|
||||
samples?: LeaseTelemetry[];
|
||||
}
|
||||
|
||||
export interface LeaseRecord {
|
||||
@ -234,6 +235,10 @@ export interface RunFinishRequest {
|
||||
telemetry?: RunTelemetrySummary;
|
||||
}
|
||||
|
||||
export interface RunTelemetryRequest {
|
||||
telemetry?: Partial<LeaseTelemetry>;
|
||||
}
|
||||
|
||||
export interface RunEventRecord {
|
||||
runID: string;
|
||||
seq: number;
|
||||
|
||||
@ -1540,6 +1540,84 @@ describe("fleet run history", () => {
|
||||
expect(await logs.text()).toBe("ok\n");
|
||||
});
|
||||
|
||||
it("appends live run telemetry samples and preserves them on finish", async () => {
|
||||
const storage = new MemoryStorage();
|
||||
const fleet = testFleet(storage);
|
||||
const ownerHeaders = {
|
||||
"x-crabbox-owner": "peter@example.com",
|
||||
"x-crabbox-org": "openclaw",
|
||||
};
|
||||
storage.seed(
|
||||
"lease:cbx_000000000001",
|
||||
testLease({
|
||||
id: "cbx_000000000001",
|
||||
slug: "blue-lobster",
|
||||
owner: "peter@example.com",
|
||||
org: "openclaw",
|
||||
}),
|
||||
);
|
||||
const create = await fleet.fetch(
|
||||
request("POST", "/v1/runs", {
|
||||
headers: ownerHeaders,
|
||||
body: { leaseID: "cbx_000000000001", command: ["sleep", "60"] },
|
||||
}),
|
||||
);
|
||||
const { run } = (await create.json()) as { run: RunRecord };
|
||||
|
||||
const firstSample = await fleet.fetch(
|
||||
request("POST", `/v1/runs/${run.id}/telemetry`, {
|
||||
headers: ownerHeaders,
|
||||
body: {
|
||||
telemetry: {
|
||||
capturedAt: "2026-05-01T00:00:10Z",
|
||||
source: "ssh-linux",
|
||||
load1: 0.4,
|
||||
memoryPercent: 40,
|
||||
},
|
||||
},
|
||||
}),
|
||||
);
|
||||
expect(firstSample.status).toBe(200);
|
||||
const sampled = (await firstSample.json()) as { run: RunRecord };
|
||||
expect(sampled.run.telemetry?.start).toMatchObject({ load1: 0.4, memoryPercent: 40 });
|
||||
expect(sampled.run.telemetry?.samples).toHaveLength(1);
|
||||
|
||||
await fleet.fetch(
|
||||
request("POST", `/v1/runs/${run.id}/telemetry`, {
|
||||
headers: ownerHeaders,
|
||||
body: {
|
||||
telemetry: {
|
||||
capturedAt: "2026-05-01T00:00:20Z",
|
||||
source: "ssh-linux",
|
||||
load1: 0.9,
|
||||
memoryPercent: 55,
|
||||
},
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
const finish = await fleet.fetch(
|
||||
request("POST", `/v1/runs/${run.id}/finish`, {
|
||||
headers: ownerHeaders,
|
||||
body: {
|
||||
exitCode: 0,
|
||||
telemetry: {
|
||||
end: {
|
||||
capturedAt: "2026-05-01T00:00:30Z",
|
||||
source: "ssh-linux",
|
||||
load1: 1.2,
|
||||
memoryPercent: 60,
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
);
|
||||
expect(finish.status).toBe(200);
|
||||
const finished = (await finish.json()) as { run: RunRecord };
|
||||
expect(finished.run.telemetry?.end).toMatchObject({ load1: 1.2, memoryPercent: 60 });
|
||||
expect(finished.run.telemetry?.samples?.map((sample) => sample.load1)).toEqual([0.4, 0.9]);
|
||||
});
|
||||
|
||||
it("accepts Go nil slices in passing test results", async () => {
|
||||
const fleet = testFleet();
|
||||
const create = await fleet.fetch(
|
||||
|
||||
Loading…
Reference in New Issue
Block a user