From bb51c9ea120dfbe6d2acc4114a9223ba8dad12dd Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Tue, 5 May 2026 17:21:30 -0700 Subject: [PATCH] feat(state): add legacy sync adapters --- CHANGELOG.md | 2 + docs/boundary.md | 5 +- state/adapters.go | 191 ++++++++++++++++++++++++++++++++++++++++++++ state/state_test.go | 78 ++++++++++++++++++ 4 files changed, 274 insertions(+), 2 deletions(-) create mode 100644 state/adapters.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 94515df..21e6173 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ inference, sync state, snapshots, SQLite, and git mirrors. - Add safer `mirror` helpers for origin updates, existing-origin pulls, path-scoped commits, and portable SQLite sidecar cleanup. +- Add `state.ScopedStore` and `state.CursorStore` adapters for legacy sync + state table shapes used by downstream apps. - Add `tui`, a shared Bubble Tea terminal archive browser used by the crawl apps for consistent `tui` command behavior. - Improve `tui` rows with compact column rendering, pane-specific scrolling, and full-height pane borders. - Tune `tui` pane colors and mouse-wheel buffering to better match the `gitcrawl` terminal browser feel. diff --git a/docs/boundary.md b/docs/boundary.md index 4fe9068..bbcfb9f 100644 --- a/docs/boundary.md +++ b/docs/boundary.md @@ -81,8 +81,9 @@ parsers, and product-specific ranking in the apps. 3. Add state adapters instead of one forced schema. Keep the current source/entity/value schema as the canonical new shape, but add adapters for `scope -> cursor` and `source/entity/cursor/synced_at` - stores. This avoids risky migrations while making freshness and stale checks - uniform. + stores. `state.ScopedStore` and `state.CursorStore` cover those legacy + shapes so apps can share freshness and stale checks without risky + migrations. 4. Extract embeddings and vector search. Start from `discrawl/internal/embed` for provider clients and from diff --git a/state/adapters.go b/state/adapters.go new file mode 100644 index 0000000..a17b6b1 --- /dev/null +++ b/state/adapters.go @@ -0,0 +1,191 @@ +package state + +import ( + "context" + "database/sql" + "fmt" + "time" +) + +const ScopedSchema = ` +create table if not exists sync_state ( + scope text primary key, + cursor text not null, + updated_at text not null +); +create index if not exists idx_sync_state_updated_at on sync_state(updated_at desc); +` + +const CursorSchema = ` +create table if not exists sync_state ( + source text not null, + entity_type text not null, + entity_id text not null, + cursor text not null, + synced_at text not null, + primary key (source, entity_type, entity_id) +); +create index if not exists idx_sync_state_synced_at on sync_state(synced_at desc); +` + +type ScopedStore struct { + db execQuerier + now func() time.Time +} + +type ScopedRecord struct { + Scope string `json:"scope"` + Cursor string `json:"cursor"` + UpdatedAt time.Time `json:"updated_at"` +} + +type CursorStore struct { + db execQuerier + now func() time.Time +} + +type CursorRecord struct { + Source string `json:"source"` + EntityType string `json:"entity_type"` + EntityID string `json:"entity_id"` + Cursor string `json:"cursor"` + SyncedAt time.Time `json:"synced_at"` +} + +func EnsureScopedSchema(ctx context.Context, db execQuerier) error { + if _, err := db.ExecContext(ctx, ScopedSchema); err != nil { + return fmt.Errorf("ensure scoped sync_state schema: %w", err) + } + return nil +} + +func EnsureCursorSchema(ctx context.Context, db execQuerier) error { + if _, err := db.ExecContext(ctx, CursorSchema); err != nil { + return fmt.Errorf("ensure cursor sync_state schema: %w", err) + } + return nil +} + +func NewScoped(db execQuerier) *ScopedStore { + return NewScopedWithClock(db, nil) +} + +func NewScopedWithClock(db execQuerier, now func() time.Time) *ScopedStore { + if now == nil { + now = func() time.Time { return time.Now().UTC() } + } + return &ScopedStore{db: db, now: now} +} + +func (s *ScopedStore) Set(ctx context.Context, scope, cursor string) error { + updatedAt := s.now().UTC() + _, err := s.db.ExecContext(ctx, ` +insert into sync_state(scope, cursor, updated_at) +values (?, ?, ?) +on conflict(scope) do update set + cursor = excluded.cursor, + updated_at = excluded.updated_at +`, scope, cursor, updatedAt.Format(time.RFC3339Nano)) + if err != nil { + return fmt.Errorf("set scoped sync state: %w", err) + } + return nil +} + +func (s *ScopedStore) Get(ctx context.Context, scope string) (ScopedRecord, bool, error) { + var rec ScopedRecord + var updatedAt string + err := s.db.QueryRowContext(ctx, ` +select scope, cursor, updated_at +from sync_state +where scope = ? +`, scope).Scan(&rec.Scope, &rec.Cursor, &updatedAt) + if err == sql.ErrNoRows { + return ScopedRecord{}, false, nil + } + if err != nil { + return ScopedRecord{}, false, err + } + parsed, err := time.Parse(time.RFC3339Nano, updatedAt) + if err != nil { + return ScopedRecord{}, false, fmt.Errorf("parse scoped sync state updated_at: %w", err) + } + rec.UpdatedAt = parsed + return rec, true, nil +} + +func (s *ScopedStore) IsStale(ctx context.Context, scope string, maxAge time.Duration) (bool, error) { + rec, ok, err := s.Get(ctx, scope) + if err != nil { + return false, err + } + if !ok { + return true, nil + } + if maxAge <= 0 { + return false, nil + } + return s.now().UTC().Sub(rec.UpdatedAt) > maxAge, nil +} + +func NewCursor(db execQuerier) *CursorStore { + return NewCursorWithClock(db, nil) +} + +func NewCursorWithClock(db execQuerier, now func() time.Time) *CursorStore { + if now == nil { + now = func() time.Time { return time.Now().UTC() } + } + return &CursorStore{db: db, now: now} +} + +func (s *CursorStore) Set(ctx context.Context, source, entityType, entityID, cursor string) error { + syncedAt := s.now().UTC() + _, err := s.db.ExecContext(ctx, ` +insert into sync_state(source, entity_type, entity_id, cursor, synced_at) +values (?, ?, ?, ?, ?) +on conflict(source, entity_type, entity_id) do update set + cursor = excluded.cursor, + synced_at = excluded.synced_at +`, source, entityType, entityID, cursor, syncedAt.Format(time.RFC3339Nano)) + if err != nil { + return fmt.Errorf("set cursor sync state: %w", err) + } + return nil +} + +func (s *CursorStore) Get(ctx context.Context, source, entityType, entityID string) (CursorRecord, bool, error) { + var rec CursorRecord + var syncedAt string + err := s.db.QueryRowContext(ctx, ` +select source, entity_type, entity_id, cursor, synced_at +from sync_state +where source = ? and entity_type = ? and entity_id = ? +`, source, entityType, entityID).Scan(&rec.Source, &rec.EntityType, &rec.EntityID, &rec.Cursor, &syncedAt) + if err == sql.ErrNoRows { + return CursorRecord{}, false, nil + } + if err != nil { + return CursorRecord{}, false, err + } + parsed, err := time.Parse(time.RFC3339Nano, syncedAt) + if err != nil { + return CursorRecord{}, false, fmt.Errorf("parse cursor sync state synced_at: %w", err) + } + rec.SyncedAt = parsed + return rec, true, nil +} + +func (s *CursorStore) IsStale(ctx context.Context, source, entityType, entityID string, maxAge time.Duration) (bool, error) { + rec, ok, err := s.Get(ctx, source, entityType, entityID) + if err != nil { + return false, err + } + if !ok { + return true, nil + } + if maxAge <= 0 { + return false, nil + } + return s.now().UTC().Sub(rec.SyncedAt) > maxAge, nil +} diff --git a/state/state_test.go b/state/state_test.go index 94de38b..908f0e3 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -48,3 +48,81 @@ func TestSetGetAndStale(t *testing.T) { t.Fatal("old record reported fresh") } } + +func TestScopedStoreSetGetAndStale(t *testing.T) { + ctx := context.Background() + db, err := sql.Open("sqlite", "file:"+filepath.Join(t.TempDir(), "scoped.db")) + if err != nil { + t.Fatal(err) + } + defer db.Close() + if err := EnsureScopedSchema(ctx, db); err != nil { + t.Fatal(err) + } + now := time.Date(2026, 5, 1, 12, 0, 0, 0, time.UTC) + store := NewScopedWithClock(db, func() time.Time { return now }) + if err := store.Set(ctx, "share:last_import_at", "2026-05-01T12:00:00Z"); err != nil { + t.Fatal(err) + } + rec, ok, err := store.Get(ctx, "share:last_import_at") + if err != nil { + t.Fatal(err) + } + if !ok || rec.Cursor == "" { + t.Fatalf("record not found: %+v", rec) + } + stale, err := store.IsStale(ctx, "share:last_import_at", time.Hour) + if err != nil { + t.Fatal(err) + } + if stale { + t.Fatal("fresh scoped record reported stale") + } + store.now = func() time.Time { return now.Add(2 * time.Hour) } + stale, err = store.IsStale(ctx, "share:last_import_at", time.Hour) + if err != nil { + t.Fatal(err) + } + if !stale { + t.Fatal("old scoped record reported fresh") + } +} + +func TestCursorStoreSetGetAndStale(t *testing.T) { + ctx := context.Background() + db, err := sql.Open("sqlite", "file:"+filepath.Join(t.TempDir(), "cursor.db")) + if err != nil { + t.Fatal(err) + } + defer db.Close() + if err := EnsureCursorSchema(ctx, db); err != nil { + t.Fatal(err) + } + now := time.Date(2026, 5, 1, 12, 0, 0, 0, time.UTC) + store := NewCursorWithClock(db, func() time.Time { return now }) + if err := store.Set(ctx, "share", "manifest", "generated_at", "2026-05-01T12:00:00Z"); err != nil { + t.Fatal(err) + } + rec, ok, err := store.Get(ctx, "share", "manifest", "generated_at") + if err != nil { + t.Fatal(err) + } + if !ok || rec.Cursor == "" { + t.Fatalf("record not found: %+v", rec) + } + stale, err := store.IsStale(ctx, "share", "manifest", "generated_at", time.Hour) + if err != nil { + t.Fatal(err) + } + if stale { + t.Fatal("fresh cursor record reported stale") + } + store.now = func() time.Time { return now.Add(2 * time.Hour) } + stale, err = store.IsStale(ctx, "share", "manifest", "generated_at", time.Hour) + if err != nil { + t.Fatal(err) + } + if !stale { + t.Fatal("old cursor record reported fresh") + } +}