192 lines
5.0 KiB
Go
192 lines
5.0 KiB
Go
package state
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"time"
|
|
)
|
|
|
|
const ScopedSchema = `
|
|
create table if not exists sync_state (
|
|
scope text primary key,
|
|
cursor text not null,
|
|
updated_at text not null
|
|
);
|
|
create index if not exists idx_sync_state_updated_at on sync_state(updated_at desc);
|
|
`
|
|
|
|
const CursorSchema = `
|
|
create table if not exists sync_state (
|
|
source text not null,
|
|
entity_type text not null,
|
|
entity_id text not null,
|
|
cursor text not null,
|
|
synced_at text not null,
|
|
primary key (source, entity_type, entity_id)
|
|
);
|
|
create index if not exists idx_sync_state_synced_at on sync_state(synced_at desc);
|
|
`
|
|
|
|
type ScopedStore struct {
|
|
db execQuerier
|
|
now func() time.Time
|
|
}
|
|
|
|
type ScopedRecord struct {
|
|
Scope string `json:"scope"`
|
|
Cursor string `json:"cursor"`
|
|
UpdatedAt time.Time `json:"updated_at"`
|
|
}
|
|
|
|
type CursorStore struct {
|
|
db execQuerier
|
|
now func() time.Time
|
|
}
|
|
|
|
type CursorRecord struct {
|
|
Source string `json:"source"`
|
|
EntityType string `json:"entity_type"`
|
|
EntityID string `json:"entity_id"`
|
|
Cursor string `json:"cursor"`
|
|
SyncedAt time.Time `json:"synced_at"`
|
|
}
|
|
|
|
func EnsureScopedSchema(ctx context.Context, db execQuerier) error {
|
|
if _, err := db.ExecContext(ctx, ScopedSchema); err != nil {
|
|
return fmt.Errorf("ensure scoped sync_state schema: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func EnsureCursorSchema(ctx context.Context, db execQuerier) error {
|
|
if _, err := db.ExecContext(ctx, CursorSchema); err != nil {
|
|
return fmt.Errorf("ensure cursor sync_state schema: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func NewScoped(db execQuerier) *ScopedStore {
|
|
return NewScopedWithClock(db, nil)
|
|
}
|
|
|
|
func NewScopedWithClock(db execQuerier, now func() time.Time) *ScopedStore {
|
|
if now == nil {
|
|
now = func() time.Time { return time.Now().UTC() }
|
|
}
|
|
return &ScopedStore{db: db, now: now}
|
|
}
|
|
|
|
func (s *ScopedStore) Set(ctx context.Context, scope, cursor string) error {
|
|
updatedAt := s.now().UTC()
|
|
_, err := s.db.ExecContext(ctx, `
|
|
insert into sync_state(scope, cursor, updated_at)
|
|
values (?, ?, ?)
|
|
on conflict(scope) do update set
|
|
cursor = excluded.cursor,
|
|
updated_at = excluded.updated_at
|
|
`, scope, cursor, updatedAt.Format(time.RFC3339Nano))
|
|
if err != nil {
|
|
return fmt.Errorf("set scoped sync state: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *ScopedStore) Get(ctx context.Context, scope string) (ScopedRecord, bool, error) {
|
|
var rec ScopedRecord
|
|
var updatedAt string
|
|
err := s.db.QueryRowContext(ctx, `
|
|
select scope, cursor, updated_at
|
|
from sync_state
|
|
where scope = ?
|
|
`, scope).Scan(&rec.Scope, &rec.Cursor, &updatedAt)
|
|
if err == sql.ErrNoRows {
|
|
return ScopedRecord{}, false, nil
|
|
}
|
|
if err != nil {
|
|
return ScopedRecord{}, false, err
|
|
}
|
|
parsed, err := time.Parse(time.RFC3339Nano, updatedAt)
|
|
if err != nil {
|
|
return ScopedRecord{}, false, fmt.Errorf("parse scoped sync state updated_at: %w", err)
|
|
}
|
|
rec.UpdatedAt = parsed
|
|
return rec, true, nil
|
|
}
|
|
|
|
func (s *ScopedStore) IsStale(ctx context.Context, scope string, maxAge time.Duration) (bool, error) {
|
|
rec, ok, err := s.Get(ctx, scope)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if !ok {
|
|
return true, nil
|
|
}
|
|
if maxAge <= 0 {
|
|
return false, nil
|
|
}
|
|
return s.now().UTC().Sub(rec.UpdatedAt) > maxAge, nil
|
|
}
|
|
|
|
func NewCursor(db execQuerier) *CursorStore {
|
|
return NewCursorWithClock(db, nil)
|
|
}
|
|
|
|
func NewCursorWithClock(db execQuerier, now func() time.Time) *CursorStore {
|
|
if now == nil {
|
|
now = func() time.Time { return time.Now().UTC() }
|
|
}
|
|
return &CursorStore{db: db, now: now}
|
|
}
|
|
|
|
func (s *CursorStore) Set(ctx context.Context, source, entityType, entityID, cursor string) error {
|
|
syncedAt := s.now().UTC()
|
|
_, err := s.db.ExecContext(ctx, `
|
|
insert into sync_state(source, entity_type, entity_id, cursor, synced_at)
|
|
values (?, ?, ?, ?, ?)
|
|
on conflict(source, entity_type, entity_id) do update set
|
|
cursor = excluded.cursor,
|
|
synced_at = excluded.synced_at
|
|
`, source, entityType, entityID, cursor, syncedAt.Format(time.RFC3339Nano))
|
|
if err != nil {
|
|
return fmt.Errorf("set cursor sync state: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *CursorStore) Get(ctx context.Context, source, entityType, entityID string) (CursorRecord, bool, error) {
|
|
var rec CursorRecord
|
|
var syncedAt string
|
|
err := s.db.QueryRowContext(ctx, `
|
|
select source, entity_type, entity_id, cursor, synced_at
|
|
from sync_state
|
|
where source = ? and entity_type = ? and entity_id = ?
|
|
`, source, entityType, entityID).Scan(&rec.Source, &rec.EntityType, &rec.EntityID, &rec.Cursor, &syncedAt)
|
|
if err == sql.ErrNoRows {
|
|
return CursorRecord{}, false, nil
|
|
}
|
|
if err != nil {
|
|
return CursorRecord{}, false, err
|
|
}
|
|
parsed, err := time.Parse(time.RFC3339Nano, syncedAt)
|
|
if err != nil {
|
|
return CursorRecord{}, false, fmt.Errorf("parse cursor sync state synced_at: %w", err)
|
|
}
|
|
rec.SyncedAt = parsed
|
|
return rec, true, nil
|
|
}
|
|
|
|
func (s *CursorStore) IsStale(ctx context.Context, source, entityType, entityID string, maxAge time.Duration) (bool, error) {
|
|
rec, ok, err := s.Get(ctx, source, entityType, entityID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if !ok {
|
|
return true, nil
|
|
}
|
|
if maxAge <= 0 {
|
|
return false, nil
|
|
}
|
|
return s.now().UTC().Sub(rec.SyncedAt) > maxAge, nil
|
|
}
|