Compare commits

...

1 Commits

Author SHA1 Message Date
Vincent Koc
e1019e0dca
perf(sync): transact desktop ingestion
Some checks failed
Validation / validate (push) Has been cancelled
2026-04-27 12:27:05 -07:00
4 changed files with 142 additions and 60 deletions

View File

@ -63,26 +63,28 @@ func Ingest(ctx context.Context, st *store.Store, path, cacheDir string) (Summar
}
defer db.Close()
s := Summary{Source: source}
if err := st.DeferPageFTS(ctx, func() error {
if s.Spaces, err = ingestSpaces(ctx, st, db); err != nil {
return err
}
if s.Users, err = ingestUsers(ctx, st, db); err != nil {
return err
}
if s.Teams, err = ingestTeams(ctx, st, db); err != nil {
return err
}
if s.Collections, err = ingestCollections(ctx, st, db); err != nil {
return err
}
if s.Pages, s.Blocks, s.RawRecords, err = ingestBlocks(ctx, st, db); err != nil {
return err
}
if s.Comments, err = ingestComments(ctx, st, db); err != nil {
return err
}
return nil
if err := st.WithTransaction(ctx, func() error {
return st.DeferPageFTS(ctx, func() error {
if s.Spaces, err = ingestSpaces(ctx, st, db); err != nil {
return err
}
if s.Users, err = ingestUsers(ctx, st, db); err != nil {
return err
}
if s.Teams, err = ingestTeams(ctx, st, db); err != nil {
return err
}
if s.Collections, err = ingestCollections(ctx, st, db); err != nil {
return err
}
if s.Pages, s.Blocks, s.RawRecords, err = ingestBlocks(ctx, st, db); err != nil {
return err
}
if s.Comments, err = ingestComments(ctx, st, db); err != nil {
return err
}
return nil
})
}); err != nil {
return s, err
}

View File

@ -7,7 +7,7 @@ import (
)
func (s *Store) Pages(ctx context.Context) ([]Page, error) {
rows, err := s.db.QueryContext(ctx, `select id, space_id, parent_id, parent_table, collection_id, title, url, icon, cover,
rows, err := s.queryContext(ctx, `select id, space_id, parent_id, parent_table, collection_id, title, url, icon, cover,
properties_json, created_time, last_edited_time, alive, source, raw_json, synced_at
from pages where alive = 1 order by coalesce(last_edited_time, 0) desc, title`)
if err != nil {
@ -29,7 +29,7 @@ func (s *Store) Pages(ctx context.Context) ([]Page, error) {
}
func (s *Store) Collections(ctx context.Context) ([]Collection, error) {
rows, err := s.db.QueryContext(ctx, `select id, space_id, parent_id, parent_table, name, schema_json, format_json, raw_json, source, synced_at
rows, err := s.queryContext(ctx, `select id, space_id, parent_id, parent_table, name, schema_json, format_json, raw_json, source, synced_at
from collections order by lower(coalesce(name, id)), id`)
if err != nil {
return nil, err
@ -48,13 +48,13 @@ func (s *Store) Collections(ctx context.Context) ([]Collection, error) {
func (s *Store) Collection(ctx context.Context, id string) (Collection, error) {
var c Collection
err := s.db.QueryRowContext(ctx, `select id, space_id, parent_id, parent_table, name, schema_json, format_json, raw_json, source, synced_at
err := s.queryRowContext(ctx, `select id, space_id, parent_id, parent_table, name, schema_json, format_json, raw_json, source, synced_at
from collections where id = ?`, id).Scan(&c.ID, &c.SpaceID, &c.ParentID, &c.ParentTable, &c.Name, &c.SchemaJSON, &c.FormatJSON, &c.RawJSON, &c.Source, &c.SyncedAt)
return c, err
}
func (s *Store) CollectionPages(ctx context.Context, collectionID string) ([]Page, error) {
rows, err := s.db.QueryContext(ctx, `select id, space_id, parent_id, parent_table, collection_id, title, url, icon, cover,
rows, err := s.queryContext(ctx, `select id, space_id, parent_id, parent_table, collection_id, title, url, icon, cover,
properties_json, created_time, last_edited_time, alive, source, raw_json, synced_at
from pages where collection_id = ? and alive = 1 order by coalesce(last_edited_time, 0) desc, title`, collectionID)
if err != nil {
@ -76,7 +76,7 @@ func (s *Store) CollectionPages(ctx context.Context, collectionID string) ([]Pag
}
func (s *Store) PageBlocks(ctx context.Context, pageID string) ([]Block, error) {
rows, err := s.db.QueryContext(ctx, `select id, page_id, space_id, parent_id, parent_table, type, text, properties_json,
rows, err := s.queryContext(ctx, `select id, page_id, space_id, parent_id, parent_table, type, text, properties_json,
content_json, format_json, display_order, created_time, last_edited_time, alive, source, raw_json, synced_at
from blocks where page_id = ? and alive = 1 order by parent_id, display_order, created_time, id`, pageID)
if err != nil {
@ -98,7 +98,7 @@ func (s *Store) PageBlocks(ctx context.Context, pageID string) ([]Block, error)
}
func (s *Store) PageComments(ctx context.Context, pageID string) ([]Comment, error) {
rows, err := s.db.QueryContext(ctx, `select id, page_id, space_id, parent_id, text, created_by_id,
rows, err := s.queryContext(ctx, `select id, page_id, space_id, parent_id, text, created_by_id,
created_time, last_edited_time, alive, raw_json, source, synced_at
from comments where page_id = ? and alive = 1 order by created_time, id`, pageID)
if err != nil {
@ -124,7 +124,7 @@ func (s *Store) SpaceName(ctx context.Context, id string) (string, error) {
return "default", nil
}
var name sql.NullString
err := s.db.QueryRowContext(ctx, `select name from spaces where id = ?`, id).Scan(&name)
err := s.queryRowContext(ctx, `select name from spaces where id = ?`, id).Scan(&name)
if err != nil {
if err == sql.ErrNoRows {
return "space-" + shortID(id), nil
@ -142,7 +142,7 @@ func (s *Store) TeamName(ctx context.Context, id string) (string, error) {
return "", nil
}
var name sql.NullString
err := s.db.QueryRowContext(ctx, `select name from teams where id = ?`, id).Scan(&name)
err := s.queryRowContext(ctx, `select name from teams where id = ?`, id).Scan(&name)
if err != nil {
if err == sql.ErrNoRows {
return "team-" + shortID(id), nil
@ -174,7 +174,7 @@ func (s *Store) resolveTeamID(ctx context.Context, table, id, collectionID strin
switch table {
case "block":
var parentID, parentTable sql.NullString
err := s.db.QueryRowContext(ctx, `select parent_id, parent_table from blocks where id = ?`, id).Scan(&parentID, &parentTable)
err := s.queryRowContext(ctx, `select parent_id, parent_table from blocks where id = ?`, id).Scan(&parentID, &parentTable)
if err != nil {
if err == sql.ErrNoRows {
return "", nil
@ -184,7 +184,7 @@ func (s *Store) resolveTeamID(ctx context.Context, table, id, collectionID strin
return s.resolveTeamID(ctx, parentTable.String, parentID.String, "", seen)
case "collection", "database", "data_source":
var parentID, parentTable sql.NullString
err := s.db.QueryRowContext(ctx, `select parent_id, parent_table from collections where id = ?`, id).Scan(&parentID, &parentTable)
err := s.queryRowContext(ctx, `select parent_id, parent_table from collections where id = ?`, id).Scan(&parentID, &parentTable)
if err != nil {
if err == sql.ErrNoRows {
return "", nil

View File

@ -18,6 +18,7 @@ const schemaVersion = 1
type Store struct {
db *sql.DB
tx *sql.Tx
path string
deferredFTS int
deferredFTSPages map[string]bool
@ -88,6 +89,45 @@ func (s *Store) DB() *sql.DB {
return s.db
}
// execContext executes a statement that returns no rows. It uses s.db when no
// transaction is attached to the store (s.tx is nil) and s.tx otherwise, so
// callers do not need to know whether they are running inside WithTransaction.
func (s *Store) execContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
	if s.tx == nil {
		return s.db.ExecContext(ctx, query, args...)
	}
	return s.tx.ExecContext(ctx, query, args...)
}
// queryContext runs a query that returns rows. It uses s.db when no
// transaction is attached to the store (s.tx is nil) and s.tx otherwise.
// The caller owns the returned *sql.Rows.
func (s *Store) queryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
	if s.tx == nil {
		return s.db.QueryContext(ctx, query, args...)
	}
	return s.tx.QueryContext(ctx, query, args...)
}
// queryRowContext runs a query expected to return at most one row. It uses
// s.db when no transaction is attached to the store (s.tx is nil) and s.tx
// otherwise. Errors surface when the returned *sql.Row is scanned.
func (s *Store) queryRowContext(ctx context.Context, query string, args ...any) *sql.Row {
	if s.tx == nil {
		return s.db.QueryRowContext(ctx, query, args...)
	}
	return s.tx.QueryRowContext(ctx, query, args...)
}
// WithTransaction runs fn with a database transaction attached to the store,
// so that statements issued through execContext, queryContext and
// queryRowContext during fn go through that transaction.
//
// If a transaction is already attached (a nested call), fn simply runs inside
// it and the outermost call decides whether to commit or roll back.
//
// Otherwise a new transaction is started with default options. It is committed
// when fn returns nil and rolled back when fn returns an error or panics. The
// error from fn, from starting the transaction, or from committing it is
// returned unchanged.
//
// NOTE(review): s.tx is read and written here without synchronization, so
// concurrent use of one Store from several goroutines would race — confirm the
// Store is only used from a single goroutine.
func (s *Store) WithTransaction(ctx context.Context, fn func() error) error {
	if s.tx != nil {
		return fn()
	}
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	s.tx = tx

	rollback := true
	defer func() {
		// Runs on every exit path, including a panic inside fn, so the store
		// never keeps pointing at a finished or abandoned transaction.
		s.tx = nil
		if rollback {
			// The rollback error is deliberately dropped: the error (or panic)
			// that caused the rollback is the one the caller needs to see.
			_ = tx.Rollback()
		}
	}()

	if err := fn(); err != nil {
		return err
	}
	// fn succeeded: detach the transaction before committing so the store is
	// back on the plain database handle whatever Commit returns.
	rollback = false
	s.tx = nil
	return tx.Commit()
}
func (s *Store) Close() error {
if s == nil || s.db == nil {
return nil
@ -252,12 +292,12 @@ func (s *Store) init(ctx context.Context) error {
`create virtual table if not exists comment_fts using fts5(comment_id unindexed, page_id unindexed, body)`,
}
for _, stmt := range stmts {
if _, err := s.db.ExecContext(ctx, stmt); err != nil {
if _, err := s.execContext(ctx, stmt); err != nil {
return err
}
}
var current int
row := s.db.QueryRowContext(ctx, `select value from meta where key = 'schema_version'`)
row := s.queryRowContext(ctx, `select value from meta where key = 'schema_version'`)
err := row.Scan(&current)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return err
@ -271,20 +311,20 @@ func (s *Store) init(ctx context.Context) error {
if err := s.ensureColumn(ctx, "collections", "parent_table", "text"); err != nil {
return err
}
if _, err := s.db.ExecContext(ctx, `create index if not exists blocks_page_alive_order on blocks(page_id, alive, parent_id, display_order, created_time, id)`); err != nil {
if _, err := s.execContext(ctx, `create index if not exists blocks_page_alive_order on blocks(page_id, alive, parent_id, display_order, created_time, id)`); err != nil {
return err
}
if _, err := s.db.ExecContext(ctx, `create index if not exists blocks_page_alive_created on blocks(page_id, alive, created_time, id)`); err != nil {
if _, err := s.execContext(ctx, `create index if not exists blocks_page_alive_created on blocks(page_id, alive, created_time, id)`); err != nil {
return err
}
if _, err := s.db.ExecContext(ctx, `insert or replace into meta(key, value) values('schema_version', ?)`, schemaVersion); err != nil {
if _, err := s.execContext(ctx, `insert or replace into meta(key, value) values('schema_version', ?)`, schemaVersion); err != nil {
return err
}
return nil
}
func (s *Store) ensureColumn(ctx context.Context, table, column, definition string) error {
rows, err := s.db.QueryContext(ctx, `pragma table_info(`+table+`)`)
rows, err := s.queryContext(ctx, `pragma table_info(`+table+`)`)
if err != nil {
return err
}
@ -305,7 +345,7 @@ func (s *Store) ensureColumn(ctx context.Context, table, column, definition stri
if err := rows.Err(); err != nil {
return err
}
_, err = s.db.ExecContext(ctx, `alter table `+table+` add column `+column+` `+definition)
_, err = s.execContext(ctx, `alter table `+table+` add column `+column+` `+definition)
return err
}
@ -325,7 +365,7 @@ func IntBool(v int) bool {
}
func (s *Store) UpsertSpace(ctx context.Context, x Space) error {
_, err := s.db.ExecContext(ctx, `insert into spaces(id, name, raw_json, source, synced_at)
_, err := s.execContext(ctx, `insert into spaces(id, name, raw_json, source, synced_at)
values (?, ?, ?, ?, ?)
on conflict(id) do update set name=excluded.name, raw_json=excluded.raw_json, source=excluded.source, synced_at=excluded.synced_at`,
x.ID, x.Name, x.RawJSON, x.Source, x.SyncedAt)
@ -333,7 +373,7 @@ func (s *Store) UpsertSpace(ctx context.Context, x Space) error {
}
func (s *Store) UpsertUser(ctx context.Context, x User) error {
_, err := s.db.ExecContext(ctx, `insert into users(id, name, email, raw_json, source, synced_at)
_, err := s.execContext(ctx, `insert into users(id, name, email, raw_json, source, synced_at)
values (?, ?, ?, ?, ?, ?)
on conflict(id) do update set name=excluded.name, email=excluded.email, raw_json=excluded.raw_json, source=excluded.source, synced_at=excluded.synced_at`,
x.ID, x.Name, x.Email, x.RawJSON, x.Source, x.SyncedAt)
@ -341,7 +381,7 @@ func (s *Store) UpsertUser(ctx context.Context, x User) error {
}
func (s *Store) UpsertTeam(ctx context.Context, x Team) error {
_, err := s.db.ExecContext(ctx, `insert into teams(id, space_id, parent_id, parent_table, name, raw_json, source, synced_at)
_, err := s.execContext(ctx, `insert into teams(id, space_id, parent_id, parent_table, name, raw_json, source, synced_at)
values (?, ?, ?, ?, ?, ?, ?, ?)
on conflict(id) do update set
space_id=excluded.space_id,
@ -356,7 +396,7 @@ func (s *Store) UpsertTeam(ctx context.Context, x Team) error {
}
func (s *Store) UpsertPage(ctx context.Context, x Page) error {
_, err := s.db.ExecContext(ctx, `insert into pages(
_, err := s.execContext(ctx, `insert into pages(
id, space_id, parent_id, parent_table, collection_id, title, url, icon, cover, properties_json,
created_time, last_edited_time, alive, source, raw_json, synced_at)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
@ -385,7 +425,7 @@ func (s *Store) UpsertPage(ctx context.Context, x Page) error {
}
func (s *Store) UpsertBlock(ctx context.Context, x Block) error {
_, err := s.db.ExecContext(ctx, `insert into blocks(
_, err := s.execContext(ctx, `insert into blocks(
id, page_id, space_id, parent_id, parent_table, type, text, properties_json, content_json, format_json,
display_order, created_time, last_edited_time, alive, source, raw_json, synced_at)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
@ -418,7 +458,7 @@ func (s *Store) UpsertBlock(ctx context.Context, x Block) error {
}
func (s *Store) UpsertCollection(ctx context.Context, x Collection) error {
_, err := s.db.ExecContext(ctx, `insert into collections(id, space_id, parent_id, parent_table, name, schema_json, format_json, raw_json, source, synced_at)
_, err := s.execContext(ctx, `insert into collections(id, space_id, parent_id, parent_table, name, schema_json, format_json, raw_json, source, synced_at)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
on conflict(id) do update set space_id=excluded.space_id, parent_id=excluded.parent_id, parent_table=excluded.parent_table, name=excluded.name,
schema_json=excluded.schema_json, format_json=excluded.format_json, raw_json=excluded.raw_json,
@ -428,7 +468,7 @@ func (s *Store) UpsertCollection(ctx context.Context, x Collection) error {
}
func (s *Store) UpsertComment(ctx context.Context, x Comment) error {
_, err := s.db.ExecContext(ctx, `insert into comments(id, page_id, space_id, parent_id, text, created_by_id, created_time, last_edited_time, alive, raw_json, source, synced_at)
_, err := s.execContext(ctx, `insert into comments(id, page_id, space_id, parent_id, text, created_by_id, created_time, last_edited_time, alive, raw_json, source, synced_at)
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
on conflict(id) do update set page_id=excluded.page_id, space_id=excluded.space_id, parent_id=excluded.parent_id,
text=excluded.text, created_by_id=excluded.created_by_id, created_time=excluded.created_time,
@ -438,16 +478,16 @@ func (s *Store) UpsertComment(ctx context.Context, x Comment) error {
if err != nil {
return err
}
_, err = s.db.ExecContext(ctx, `delete from comment_fts where comment_id = ?`, x.ID)
_, err = s.execContext(ctx, `delete from comment_fts where comment_id = ?`, x.ID)
if err != nil {
return err
}
_, err = s.db.ExecContext(ctx, `insert into comment_fts(comment_id, page_id, body) values (?, ?, ?)`, x.ID, x.PageID, x.Text)
_, err = s.execContext(ctx, `insert into comment_fts(comment_id, page_id, body) values (?, ?, ?)`, x.ID, x.PageID, x.Text)
return err
}
func (s *Store) UpsertRawRecord(ctx context.Context, x RawRecord) error {
_, err := s.db.ExecContext(ctx, `insert into raw_records(source, record_table, record_id, parent_id, space_id, raw_json, synced_at)
_, err := s.execContext(ctx, `insert into raw_records(source, record_table, record_id, parent_id, space_id, raw_json, synced_at)
values (?, ?, ?, ?, ?, ?, ?)
on conflict(source, record_table, record_id) do update set parent_id=excluded.parent_id, space_id=excluded.space_id,
raw_json=excluded.raw_json, synced_at=excluded.synced_at`,
@ -456,7 +496,7 @@ func (s *Store) UpsertRawRecord(ctx context.Context, x RawRecord) error {
}
func (s *Store) SetSyncState(ctx context.Context, source, entityType, entityID, cursor string) error {
_, err := s.db.ExecContext(ctx, `insert into sync_state(source, entity_type, entity_id, cursor, synced_at)
_, err := s.execContext(ctx, `insert into sync_state(source, entity_type, entity_id, cursor, synced_at)
values (?, ?, ?, ?, ?)
on conflict(source, entity_type, entity_id) do update set cursor=excluded.cursor, synced_at=excluded.synced_at`,
source, entityType, entityID, cursor, NowMS())
@ -503,7 +543,7 @@ func (s *Store) markPageFTS(ctx context.Context, pageID string) error {
func (s *Store) refreshPageFTS(ctx context.Context, pageID string) error {
var title string
if err := s.db.QueryRowContext(ctx, `select title from pages where id = ?`, pageID).Scan(&title); err != nil {
if err := s.queryRowContext(ctx, `select title from pages where id = ?`, pageID).Scan(&title); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil
}
@ -514,10 +554,10 @@ func (s *Store) refreshPageFTS(ctx context.Context, pageID string) error {
return err
}
parts := pageBlockTextParts(pageID, blocks)
if _, err := s.db.ExecContext(ctx, `delete from page_fts where page_id = ?`, pageID); err != nil {
if _, err := s.execContext(ctx, `delete from page_fts where page_id = ?`, pageID); err != nil {
return err
}
_, err = s.db.ExecContext(ctx, `insert into page_fts(page_id, title, body) values (?, ?, ?)`, pageID, title, strings.Join(parts, "\n"))
_, err = s.execContext(ctx, `insert into page_fts(page_id, title, body) values (?, ?, ?)`, pageID, title, strings.Join(parts, "\n"))
return err
}
@ -574,7 +614,7 @@ func (s *Store) Search(ctx context.Context, q string, limit int) ([]SearchResult
if limit <= 0 {
limit = 20
}
rows, err := s.db.QueryContext(ctx, `select 'page', page_id, title, snippet(page_fts, 2, '[', ']', '...', 16)
rows, err := s.queryContext(ctx, `select 'page', page_id, title, snippet(page_fts, 2, '[', ']', '...', 16)
from page_fts where page_fts match ? limit ?`, q, limit)
if err != nil {
return nil, err
@ -592,10 +632,10 @@ func (s *Store) Search(ctx context.Context, q string, limit int) ([]SearchResult
}
func (s *Store) RebuildFTS(ctx context.Context) error {
if _, err := s.db.ExecContext(ctx, `delete from page_fts`); err != nil {
if _, err := s.execContext(ctx, `delete from page_fts`); err != nil {
return err
}
rows, err := s.db.QueryContext(ctx, `select id from pages`)
rows, err := s.queryContext(ctx, `select id from pages`)
if err != nil {
return err
}
@ -616,10 +656,10 @@ func (s *Store) RebuildFTS(ctx context.Context) error {
return err
}
}
if _, err := s.db.ExecContext(ctx, `delete from comment_fts`); err != nil {
if _, err := s.execContext(ctx, `delete from comment_fts`); err != nil {
return err
}
_, err = s.db.ExecContext(ctx, `insert into comment_fts(comment_id, page_id, body) select id, page_id, text from comments where alive = 1`)
_, err = s.execContext(ctx, `insert into comment_fts(comment_id, page_id, body) select id, page_id, text from comments where alive = 1`)
return err
}
@ -639,11 +679,11 @@ func (s *Store) Status(ctx context.Context) (Status, error) {
{`select count(*) from raw_records`, &status.RawRecords},
}
for _, count := range counts {
if err := s.db.QueryRowContext(ctx, count.query).Scan(count.dest); err != nil {
if err := s.queryRowContext(ctx, count.query).Scan(count.dest); err != nil {
return Status{}, err
}
}
if err := s.db.QueryRowContext(ctx, `select coalesce(max(synced_at), 0) from sync_state`).Scan(&status.LastSyncAt); err != nil {
if err := s.queryRowContext(ctx, `select coalesce(max(synced_at), 0) from sync_state`).Scan(&status.LastSyncAt); err != nil {
return Status{}, err
}
status.DBBytes = fileSize(s.path)
@ -661,12 +701,12 @@ func (s *Store) Optimize(ctx context.Context, vacuum bool) (MaintenanceSummary,
`pragma optimize`,
`analyze`,
} {
if _, err := s.db.ExecContext(ctx, stmt); err != nil {
if _, err := s.execContext(ctx, stmt); err != nil {
return MaintenanceSummary{}, err
}
}
if vacuum {
if _, err := s.db.ExecContext(ctx, `vacuum`); err != nil {
if _, err := s.execContext(ctx, `vacuum`); err != nil {
return MaintenanceSummary{}, err
}
}

View File

@ -2,6 +2,7 @@ package store
import (
"context"
"errors"
"os"
"path/filepath"
"testing"
@ -69,6 +70,45 @@ func TestStoreDefersPageFTSRefresh(t *testing.T) {
}
}
// TestStoreTransactionCommitsAndRollsBack checks both outcomes of
// Store.WithTransaction: a page upserted in a callback that returns nil must
// be visible afterwards, and a page upserted in a callback that returns an
// error must not be, with that error handed back to the caller.
func TestStoreTransactionCommitsAndRollsBack(t *testing.T) {
	st, err := Open(filepath.Join(t.TempDir(), "notcrawl.db"))
	if err != nil {
		t.Fatal(err)
	}
	defer st.Close()

	ctx := context.Background()
	now := NowMS()

	// countRows runs a count query directly on the underlying *sql.DB, so the
	// result reflects what was actually committed.
	countRows := func(query string) int {
		var n int
		if scanErr := st.DB().QueryRowContext(ctx, query).Scan(&n); scanErr != nil {
			t.Fatal(scanErr)
		}
		return n
	}

	// Commit path: the callback succeeds, so the row must persist.
	commitErr := st.WithTransaction(ctx, func() error {
		return st.UpsertPage(ctx, Page{ID: "commit", Title: "Commit", Alive: true, Source: "test", SyncedAt: now})
	})
	if commitErr != nil {
		t.Fatal(commitErr)
	}
	if got := countRows(`select count(*) from pages where id = 'commit'`); got != 1 {
		t.Fatalf("expected committed page, got %d", got)
	}

	// Rollback path: the upsert succeeds but the callback then fails, so the
	// row must be discarded and the sentinel error propagated.
	sentinel := errors.New("rollback")
	err = st.WithTransaction(ctx, func() error {
		upsertErr := st.UpsertPage(ctx, Page{ID: "rollback", Title: "Rollback", Alive: true, Source: "test", SyncedAt: now})
		if upsertErr != nil {
			return upsertErr
		}
		return sentinel
	})
	if !errors.Is(err, sentinel) {
		t.Fatalf("expected rollback error, got %v", err)
	}
	if got := countRows(`select count(*) from pages where id = 'rollback'`); got != 0 {
		t.Fatalf("expected rolled back page, got %d", got)
	}
}
func TestStoreOrdersBlocksByDisplayOrder(t *testing.T) {
st, err := Open(filepath.Join(t.TempDir(), "notcrawl.db"))
if err != nil {