From 4443bb9a11cbe90be772365f46627000a9e08473 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sat, 2 May 2026 19:19:53 -0700 Subject: [PATCH] feat(progress): add ci-safe tracker --- progress/progress.go | 176 ++++++++++++++++++++++++++++++++++++++ progress/progress_test.go | 88 +++++++++++++++++++ 2 files changed, 264 insertions(+) create mode 100644 progress/progress.go create mode 100644 progress/progress_test.go diff --git a/progress/progress.go b/progress/progress.go new file mode 100644 index 0000000..5c0730c --- /dev/null +++ b/progress/progress.go @@ -0,0 +1,176 @@ +// Package progress provides CI-safe progress logging for crawler jobs. +package progress + +import ( + "fmt" + "log/slog" + "sync" + "time" +) + +const defaultLogEvery = 30 * time.Second + +type Options struct { + Name string + Unit string + Total int64 + LogEvery time.Duration + MinDelta int64 + Attrs []any + Now func() time.Time +} + +type Tracker struct { + logger *slog.Logger + name string + unit string + total int64 + logEvery time.Duration + minDelta int64 + attrs []any + now func() time.Time + + mu sync.Mutex + started time.Time + lastLog time.Time + lastDone int64 + done int64 +} + +func New(logger *slog.Logger, opts Options) *Tracker { + if logger == nil { + return nil + } + opts = normalizeOptions(opts) + now := opts.Now() + t := &Tracker{ + logger: logger, + name: opts.Name, + unit: opts.Unit, + total: opts.Total, + logEvery: opts.LogEvery, + minDelta: opts.MinDelta, + attrs: append([]any(nil), opts.Attrs...), + now: opts.Now, + started: now, + lastLog: now, + } + t.log("started", now, 0, nil) + return t +} + +func (t *Tracker) Add(delta int64, attrs ...any) { + if t == nil || delta == 0 { + return + } + t.Set(t.current()+delta, attrs...) +} + +func (t *Tracker) Set(done int64, attrs ...any) { + if t == nil { + return + } + now := t.now() + t.mu.Lock() + if done < 0 { + done = 0 + } + if t.total > 0 && done > t.total { + done = t.total + } + t.done = done + shouldLog := done == t.total || + done == 0 || + done-t.lastDone >= t.minDelta || + now.Sub(t.lastLog) >= t.logEvery + if !shouldLog { + t.mu.Unlock() + return + } + t.lastDone = done + t.lastLog = now + t.mu.Unlock() + t.log("progress", now, done, attrs) +} + +func (t *Tracker) Finish(err error, attrs ...any) { + if t == nil { + return + } + now := t.now() + done := t.current() + if t.total > 0 && err == nil { + done = t.total + } + state := "finished" + if err != nil { + state = "failed" + attrs = append(attrs, "err", err) + } + t.log(state, now, done, attrs) +} + +func (t *Tracker) current() int64 { + t.mu.Lock() + defer t.mu.Unlock() + return t.done +} + +func (t *Tracker) log(state string, now time.Time, done int64, attrs []any) { + all := []any{ + "name", t.name, + "state", state, + "done", done, + "elapsed", now.Sub(t.started).Round(time.Second).String(), + } + if t.unit != "" { + all = append(all, "unit", t.unit) + } + if t.total > 0 { + all = append(all, + "total", t.total, + "remaining", max(t.total-done, 0), + "percent", Percent(done, t.total), + "completion", Completion(done, t.total), + ) + } + all = append(all, t.attrs...) + all = append(all, attrs...) + t.logger.Info(t.name+" progress", all...) +} + +func Percent(done, total int64) string { + if total <= 0 { + return "" + } + if done < 0 { + done = 0 + } + if done > total { + done = total + } + return fmt.Sprintf("%.1f", float64(done)*100/float64(total)) +} + +func Completion(done, total int64) string { + if total <= 0 { + return "" + } + return Percent(done, total) + "%" +} + +func normalizeOptions(opts Options) Options { + if opts.Name == "" { + opts.Name = "job" + } + if opts.LogEvery <= 0 { + opts.LogEvery = defaultLogEvery + } + if opts.MinDelta <= 0 { + opts.MinDelta = 1 + } + if opts.Now == nil { + opts.Now = time.Now + } + return opts +} diff --git a/progress/progress_test.go b/progress/progress_test.go new file mode 100644 index 0000000..47c5958 --- /dev/null +++ b/progress/progress_test.go @@ -0,0 +1,88 @@ +package progress + +import ( + "bytes" + "errors" + "log/slog" + "strings" + "testing" + "time" +) + +func TestTrackerLogsCIStablePercentLines(t *testing.T) { + var out bytes.Buffer + now := time.Date(2026, 5, 2, 12, 0, 0, 0, time.UTC) + logger := testLogger(&out) + + tracker := New(logger, Options{ + Name: "sync", + Unit: "channels", + Total: 4, + MinDelta: 2, + Attrs: []any{"source", "api"}, + Now: func() time.Time { return now }, + }) + tracker.Set(1) + tracker.Set(2, "phase", "messages") + tracker.Finish(nil) + + logs := out.String() + if strings.Count(logs, `msg="sync progress"`) != 3 { + t.Fatalf("expected start, threshold, and finish lines:\n%s", logs) + } + for _, want := range []string{ + `state=started`, + `state=progress`, + `state=finished`, + `unit=channels`, + `done=2`, + `total=4`, + `percent=50.0`, + `completion=50.0%`, + `source=api`, + `phase=messages`, + } { + if !strings.Contains(logs, want) { + t.Fatalf("missing %q in logs:\n%s", want, logs) + } + } +} + +func TestTrackerFinishReportsFailure(t *testing.T) { + var out bytes.Buffer + logger := testLogger(&out) + tracker := New(logger, Options{Name: "import", Total: 10}) + + tracker.Set(3) + tracker.Finish(errors.New("boom")) + + logs := out.String() + for _, want := range []string{`state=failed`, `done=3`, `percent=30.0`, `err=boom`} { + if !strings.Contains(logs, want) { + t.Fatalf("missing %q in logs:\n%s", want, logs) + } + } +} + +func TestCompletionClampsToRange(t *testing.T) { + if got := Completion(-1, 4); got != "0.0%" { + t.Fatalf("low clamp = %s", got) + } + if got := Completion(5, 4); got != "100.0%" { + t.Fatalf("high clamp = %s", got) + } + if got := Completion(1, 0); got != "" { + t.Fatalf("unknown total = %q", got) + } +} + +func testLogger(out *bytes.Buffer) *slog.Logger { + return slog.New(slog.NewTextHandler(out, &slog.HandlerOptions{ + ReplaceAttr: func(_ []string, attr slog.Attr) slog.Attr { + if attr.Key == slog.TimeKey { + return slog.Attr{} + } + return attr + }, + })) +}