feat(state): add legacy sync adapters
This commit is contained in:
parent
8cda2498b2
commit
bb51c9ea12
@ -7,6 +7,8 @@
|
||||
inference, sync state, snapshots, SQLite, and git mirrors.
|
||||
- Add safer `mirror` helpers for origin updates, existing-origin pulls,
|
||||
path-scoped commits, and portable SQLite sidecar cleanup.
|
||||
- Add `state.ScopedStore` and `state.CursorStore` adapters for legacy sync
|
||||
state table shapes used by downstream apps.
|
||||
- Add `tui`, a shared Bubble Tea terminal archive browser used by the crawl apps for consistent `tui` command behavior.
|
||||
- Improve `tui` rows with compact column rendering, pane-specific scrolling, and full-height pane borders.
|
||||
- Tune `tui` pane colors and mouse-wheel buffering to better match the `gitcrawl` terminal browser feel.
|
||||
|
||||
@ -81,8 +81,9 @@ parsers, and product-specific ranking in the apps.
|
||||
3. Add state adapters instead of one forced schema.
|
||||
Keep the current source/entity/value schema as the canonical new shape, but
|
||||
add adapters for `scope -> cursor` and `source/entity/cursor/synced_at`
|
||||
stores. This avoids risky migrations while making freshness and stale checks
|
||||
uniform.
|
||||
stores. `state.ScopedStore` and `state.CursorStore` cover those legacy
|
||||
shapes so apps can share freshness and stale checks without risky
|
||||
migrations.
|
||||
|
||||
4. Extract embeddings and vector search.
|
||||
Start from `discrawl/internal/embed` for provider clients and from
|
||||
|
||||
191
state/adapters.go
Normal file
191
state/adapters.go
Normal file
@ -0,0 +1,191 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ScopedSchema is the DDL for the legacy scope-keyed sync_state table: one row
// per scope (the primary key) holding a cursor and an updated_at text column,
// plus an index on updated_at. Both statements use "if not exists".
// NOTE(review): CursorSchema declares a table with the same name (sync_state)
// but different columns, so the two schemas presumably target different
// databases — confirm before applying both to one file.
const ScopedSchema = `
|
||||
create table if not exists sync_state (
|
||||
scope text primary key,
|
||||
cursor text not null,
|
||||
updated_at text not null
|
||||
);
|
||||
create index if not exists idx_sync_state_updated_at on sync_state(updated_at desc);
|
||||
`
|
||||
|
||||
// CursorSchema is the DDL for the legacy sync_state table keyed by
// (source, entity_type, entity_id): each row holds a cursor and a synced_at
// text column, and an index is created on synced_at. Both statements use
// "if not exists".
// NOTE(review): ScopedSchema declares a table with the same name (sync_state)
// but different columns, so the two schemas presumably target different
// databases — confirm before applying both to one file.
const CursorSchema = `
|
||||
create table if not exists sync_state (
|
||||
source text not null,
|
||||
entity_type text not null,
|
||||
entity_id text not null,
|
||||
cursor text not null,
|
||||
synced_at text not null,
|
||||
primary key (source, entity_type, entity_id)
|
||||
);
|
||||
create index if not exists idx_sync_state_synced_at on sync_state(synced_at desc);
|
||||
`
|
||||
|
||||
type ScopedStore struct {
|
||||
db execQuerier
|
||||
now func() time.Time
|
||||
}
|
||||
|
||||
// ScopedRecord is one row of the scope-keyed sync_state table.
type ScopedRecord struct {
	// Scope is the row's primary key.
	Scope string `json:"scope"`

	// Cursor is the opaque sync position stored for the scope.
	Cursor string `json:"cursor"`

	// UpdatedAt is the time recorded when the cursor was last written.
	UpdatedAt time.Time `json:"updated_at"`
}
|
||||
|
||||
type CursorStore struct {
|
||||
db execQuerier
|
||||
now func() time.Time
|
||||
}
|
||||
|
||||
// CursorRecord is one row of the (source, entity_type, entity_id)-keyed
// sync_state table.
type CursorRecord struct {
	// Source, EntityType and EntityID together form the row's primary key.
	Source     string `json:"source"`
	EntityType string `json:"entity_type"`
	EntityID   string `json:"entity_id"`

	// Cursor is the opaque sync position stored for the key.
	Cursor string `json:"cursor"`

	// SyncedAt is the time recorded when the cursor was last written.
	SyncedAt time.Time `json:"synced_at"`
}
|
||||
|
||||
func EnsureScopedSchema(ctx context.Context, db execQuerier) error {
|
||||
if _, err := db.ExecContext(ctx, ScopedSchema); err != nil {
|
||||
return fmt.Errorf("ensure scoped sync_state schema: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func EnsureCursorSchema(ctx context.Context, db execQuerier) error {
|
||||
if _, err := db.ExecContext(ctx, CursorSchema); err != nil {
|
||||
return fmt.Errorf("ensure cursor sync_state schema: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewScoped(db execQuerier) *ScopedStore {
|
||||
return NewScopedWithClock(db, nil)
|
||||
}
|
||||
|
||||
func NewScopedWithClock(db execQuerier, now func() time.Time) *ScopedStore {
|
||||
if now == nil {
|
||||
now = func() time.Time { return time.Now().UTC() }
|
||||
}
|
||||
return &ScopedStore{db: db, now: now}
|
||||
}
|
||||
|
||||
func (s *ScopedStore) Set(ctx context.Context, scope, cursor string) error {
|
||||
updatedAt := s.now().UTC()
|
||||
_, err := s.db.ExecContext(ctx, `
|
||||
insert into sync_state(scope, cursor, updated_at)
|
||||
values (?, ?, ?)
|
||||
on conflict(scope) do update set
|
||||
cursor = excluded.cursor,
|
||||
updated_at = excluded.updated_at
|
||||
`, scope, cursor, updatedAt.Format(time.RFC3339Nano))
|
||||
if err != nil {
|
||||
return fmt.Errorf("set scoped sync state: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *ScopedStore) Get(ctx context.Context, scope string) (ScopedRecord, bool, error) {
|
||||
var rec ScopedRecord
|
||||
var updatedAt string
|
||||
err := s.db.QueryRowContext(ctx, `
|
||||
select scope, cursor, updated_at
|
||||
from sync_state
|
||||
where scope = ?
|
||||
`, scope).Scan(&rec.Scope, &rec.Cursor, &updatedAt)
|
||||
if err == sql.ErrNoRows {
|
||||
return ScopedRecord{}, false, nil
|
||||
}
|
||||
if err != nil {
|
||||
return ScopedRecord{}, false, err
|
||||
}
|
||||
parsed, err := time.Parse(time.RFC3339Nano, updatedAt)
|
||||
if err != nil {
|
||||
return ScopedRecord{}, false, fmt.Errorf("parse scoped sync state updated_at: %w", err)
|
||||
}
|
||||
rec.UpdatedAt = parsed
|
||||
return rec, true, nil
|
||||
}
|
||||
|
||||
func (s *ScopedStore) IsStale(ctx context.Context, scope string, maxAge time.Duration) (bool, error) {
|
||||
rec, ok, err := s.Get(ctx, scope)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !ok {
|
||||
return true, nil
|
||||
}
|
||||
if maxAge <= 0 {
|
||||
return false, nil
|
||||
}
|
||||
return s.now().UTC().Sub(rec.UpdatedAt) > maxAge, nil
|
||||
}
|
||||
|
||||
func NewCursor(db execQuerier) *CursorStore {
|
||||
return NewCursorWithClock(db, nil)
|
||||
}
|
||||
|
||||
func NewCursorWithClock(db execQuerier, now func() time.Time) *CursorStore {
|
||||
if now == nil {
|
||||
now = func() time.Time { return time.Now().UTC() }
|
||||
}
|
||||
return &CursorStore{db: db, now: now}
|
||||
}
|
||||
|
||||
func (s *CursorStore) Set(ctx context.Context, source, entityType, entityID, cursor string) error {
|
||||
syncedAt := s.now().UTC()
|
||||
_, err := s.db.ExecContext(ctx, `
|
||||
insert into sync_state(source, entity_type, entity_id, cursor, synced_at)
|
||||
values (?, ?, ?, ?, ?)
|
||||
on conflict(source, entity_type, entity_id) do update set
|
||||
cursor = excluded.cursor,
|
||||
synced_at = excluded.synced_at
|
||||
`, source, entityType, entityID, cursor, syncedAt.Format(time.RFC3339Nano))
|
||||
if err != nil {
|
||||
return fmt.Errorf("set cursor sync state: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *CursorStore) Get(ctx context.Context, source, entityType, entityID string) (CursorRecord, bool, error) {
|
||||
var rec CursorRecord
|
||||
var syncedAt string
|
||||
err := s.db.QueryRowContext(ctx, `
|
||||
select source, entity_type, entity_id, cursor, synced_at
|
||||
from sync_state
|
||||
where source = ? and entity_type = ? and entity_id = ?
|
||||
`, source, entityType, entityID).Scan(&rec.Source, &rec.EntityType, &rec.EntityID, &rec.Cursor, &syncedAt)
|
||||
if err == sql.ErrNoRows {
|
||||
return CursorRecord{}, false, nil
|
||||
}
|
||||
if err != nil {
|
||||
return CursorRecord{}, false, err
|
||||
}
|
||||
parsed, err := time.Parse(time.RFC3339Nano, syncedAt)
|
||||
if err != nil {
|
||||
return CursorRecord{}, false, fmt.Errorf("parse cursor sync state synced_at: %w", err)
|
||||
}
|
||||
rec.SyncedAt = parsed
|
||||
return rec, true, nil
|
||||
}
|
||||
|
||||
func (s *CursorStore) IsStale(ctx context.Context, source, entityType, entityID string, maxAge time.Duration) (bool, error) {
|
||||
rec, ok, err := s.Get(ctx, source, entityType, entityID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !ok {
|
||||
return true, nil
|
||||
}
|
||||
if maxAge <= 0 {
|
||||
return false, nil
|
||||
}
|
||||
return s.now().UTC().Sub(rec.SyncedAt) > maxAge, nil
|
||||
}
|
||||
@ -48,3 +48,81 @@ func TestSetGetAndStale(t *testing.T) {
|
||||
t.Fatal("old record reported fresh")
|
||||
}
|
||||
}
|
||||
|
||||
func TestScopedStoreSetGetAndStale(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
db, err := sql.Open("sqlite", "file:"+filepath.Join(t.TempDir(), "scoped.db"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer db.Close()
|
||||
if err := EnsureScopedSchema(ctx, db); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
now := time.Date(2026, 5, 1, 12, 0, 0, 0, time.UTC)
|
||||
store := NewScopedWithClock(db, func() time.Time { return now })
|
||||
if err := store.Set(ctx, "share:last_import_at", "2026-05-01T12:00:00Z"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rec, ok, err := store.Get(ctx, "share:last_import_at")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !ok || rec.Cursor == "" {
|
||||
t.Fatalf("record not found: %+v", rec)
|
||||
}
|
||||
stale, err := store.IsStale(ctx, "share:last_import_at", time.Hour)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if stale {
|
||||
t.Fatal("fresh scoped record reported stale")
|
||||
}
|
||||
store.now = func() time.Time { return now.Add(2 * time.Hour) }
|
||||
stale, err = store.IsStale(ctx, "share:last_import_at", time.Hour)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !stale {
|
||||
t.Fatal("old scoped record reported fresh")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCursorStoreSetGetAndStale(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
db, err := sql.Open("sqlite", "file:"+filepath.Join(t.TempDir(), "cursor.db"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer db.Close()
|
||||
if err := EnsureCursorSchema(ctx, db); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
now := time.Date(2026, 5, 1, 12, 0, 0, 0, time.UTC)
|
||||
store := NewCursorWithClock(db, func() time.Time { return now })
|
||||
if err := store.Set(ctx, "share", "manifest", "generated_at", "2026-05-01T12:00:00Z"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rec, ok, err := store.Get(ctx, "share", "manifest", "generated_at")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !ok || rec.Cursor == "" {
|
||||
t.Fatalf("record not found: %+v", rec)
|
||||
}
|
||||
stale, err := store.IsStale(ctx, "share", "manifest", "generated_at", time.Hour)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if stale {
|
||||
t.Fatal("fresh cursor record reported stale")
|
||||
}
|
||||
store.now = func() time.Time { return now.Add(2 * time.Hour) }
|
||||
stale, err = store.IsStale(ctx, "share", "manifest", "generated_at", time.Hour)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !stale {
|
||||
t.Fatal("old cursor record reported fresh")
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user