Compare commits
1 Commits
ci/update-
...
perf/trans
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e1019e0dca |
@ -63,26 +63,28 @@ func Ingest(ctx context.Context, st *store.Store, path, cacheDir string) (Summar
|
||||
}
|
||||
defer db.Close()
|
||||
s := Summary{Source: source}
|
||||
if err := st.DeferPageFTS(ctx, func() error {
|
||||
if s.Spaces, err = ingestSpaces(ctx, st, db); err != nil {
|
||||
return err
|
||||
}
|
||||
if s.Users, err = ingestUsers(ctx, st, db); err != nil {
|
||||
return err
|
||||
}
|
||||
if s.Teams, err = ingestTeams(ctx, st, db); err != nil {
|
||||
return err
|
||||
}
|
||||
if s.Collections, err = ingestCollections(ctx, st, db); err != nil {
|
||||
return err
|
||||
}
|
||||
if s.Pages, s.Blocks, s.RawRecords, err = ingestBlocks(ctx, st, db); err != nil {
|
||||
return err
|
||||
}
|
||||
if s.Comments, err = ingestComments(ctx, st, db); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
if err := st.WithTransaction(ctx, func() error {
|
||||
return st.DeferPageFTS(ctx, func() error {
|
||||
if s.Spaces, err = ingestSpaces(ctx, st, db); err != nil {
|
||||
return err
|
||||
}
|
||||
if s.Users, err = ingestUsers(ctx, st, db); err != nil {
|
||||
return err
|
||||
}
|
||||
if s.Teams, err = ingestTeams(ctx, st, db); err != nil {
|
||||
return err
|
||||
}
|
||||
if s.Collections, err = ingestCollections(ctx, st, db); err != nil {
|
||||
return err
|
||||
}
|
||||
if s.Pages, s.Blocks, s.RawRecords, err = ingestBlocks(ctx, st, db); err != nil {
|
||||
return err
|
||||
}
|
||||
if s.Comments, err = ingestComments(ctx, st, db); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
return s, err
|
||||
}
|
||||
|
||||
@ -7,7 +7,7 @@ import (
|
||||
)
|
||||
|
||||
func (s *Store) Pages(ctx context.Context) ([]Page, error) {
|
||||
rows, err := s.db.QueryContext(ctx, `select id, space_id, parent_id, parent_table, collection_id, title, url, icon, cover,
|
||||
rows, err := s.queryContext(ctx, `select id, space_id, parent_id, parent_table, collection_id, title, url, icon, cover,
|
||||
properties_json, created_time, last_edited_time, alive, source, raw_json, synced_at
|
||||
from pages where alive = 1 order by coalesce(last_edited_time, 0) desc, title`)
|
||||
if err != nil {
|
||||
@ -29,7 +29,7 @@ func (s *Store) Pages(ctx context.Context) ([]Page, error) {
|
||||
}
|
||||
|
||||
func (s *Store) Collections(ctx context.Context) ([]Collection, error) {
|
||||
rows, err := s.db.QueryContext(ctx, `select id, space_id, parent_id, parent_table, name, schema_json, format_json, raw_json, source, synced_at
|
||||
rows, err := s.queryContext(ctx, `select id, space_id, parent_id, parent_table, name, schema_json, format_json, raw_json, source, synced_at
|
||||
from collections order by lower(coalesce(name, id)), id`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -48,13 +48,13 @@ func (s *Store) Collections(ctx context.Context) ([]Collection, error) {
|
||||
|
||||
func (s *Store) Collection(ctx context.Context, id string) (Collection, error) {
|
||||
var c Collection
|
||||
err := s.db.QueryRowContext(ctx, `select id, space_id, parent_id, parent_table, name, schema_json, format_json, raw_json, source, synced_at
|
||||
err := s.queryRowContext(ctx, `select id, space_id, parent_id, parent_table, name, schema_json, format_json, raw_json, source, synced_at
|
||||
from collections where id = ?`, id).Scan(&c.ID, &c.SpaceID, &c.ParentID, &c.ParentTable, &c.Name, &c.SchemaJSON, &c.FormatJSON, &c.RawJSON, &c.Source, &c.SyncedAt)
|
||||
return c, err
|
||||
}
|
||||
|
||||
func (s *Store) CollectionPages(ctx context.Context, collectionID string) ([]Page, error) {
|
||||
rows, err := s.db.QueryContext(ctx, `select id, space_id, parent_id, parent_table, collection_id, title, url, icon, cover,
|
||||
rows, err := s.queryContext(ctx, `select id, space_id, parent_id, parent_table, collection_id, title, url, icon, cover,
|
||||
properties_json, created_time, last_edited_time, alive, source, raw_json, synced_at
|
||||
from pages where collection_id = ? and alive = 1 order by coalesce(last_edited_time, 0) desc, title`, collectionID)
|
||||
if err != nil {
|
||||
@ -76,7 +76,7 @@ func (s *Store) CollectionPages(ctx context.Context, collectionID string) ([]Pag
|
||||
}
|
||||
|
||||
func (s *Store) PageBlocks(ctx context.Context, pageID string) ([]Block, error) {
|
||||
rows, err := s.db.QueryContext(ctx, `select id, page_id, space_id, parent_id, parent_table, type, text, properties_json,
|
||||
rows, err := s.queryContext(ctx, `select id, page_id, space_id, parent_id, parent_table, type, text, properties_json,
|
||||
content_json, format_json, display_order, created_time, last_edited_time, alive, source, raw_json, synced_at
|
||||
from blocks where page_id = ? and alive = 1 order by parent_id, display_order, created_time, id`, pageID)
|
||||
if err != nil {
|
||||
@ -98,7 +98,7 @@ func (s *Store) PageBlocks(ctx context.Context, pageID string) ([]Block, error)
|
||||
}
|
||||
|
||||
func (s *Store) PageComments(ctx context.Context, pageID string) ([]Comment, error) {
|
||||
rows, err := s.db.QueryContext(ctx, `select id, page_id, space_id, parent_id, text, created_by_id,
|
||||
rows, err := s.queryContext(ctx, `select id, page_id, space_id, parent_id, text, created_by_id,
|
||||
created_time, last_edited_time, alive, raw_json, source, synced_at
|
||||
from comments where page_id = ? and alive = 1 order by created_time, id`, pageID)
|
||||
if err != nil {
|
||||
@ -124,7 +124,7 @@ func (s *Store) SpaceName(ctx context.Context, id string) (string, error) {
|
||||
return "default", nil
|
||||
}
|
||||
var name sql.NullString
|
||||
err := s.db.QueryRowContext(ctx, `select name from spaces where id = ?`, id).Scan(&name)
|
||||
err := s.queryRowContext(ctx, `select name from spaces where id = ?`, id).Scan(&name)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return "space-" + shortID(id), nil
|
||||
@ -142,7 +142,7 @@ func (s *Store) TeamName(ctx context.Context, id string) (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
var name sql.NullString
|
||||
err := s.db.QueryRowContext(ctx, `select name from teams where id = ?`, id).Scan(&name)
|
||||
err := s.queryRowContext(ctx, `select name from teams where id = ?`, id).Scan(&name)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return "team-" + shortID(id), nil
|
||||
@ -174,7 +174,7 @@ func (s *Store) resolveTeamID(ctx context.Context, table, id, collectionID strin
|
||||
switch table {
|
||||
case "block":
|
||||
var parentID, parentTable sql.NullString
|
||||
err := s.db.QueryRowContext(ctx, `select parent_id, parent_table from blocks where id = ?`, id).Scan(&parentID, &parentTable)
|
||||
err := s.queryRowContext(ctx, `select parent_id, parent_table from blocks where id = ?`, id).Scan(&parentID, &parentTable)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return "", nil
|
||||
@ -184,7 +184,7 @@ func (s *Store) resolveTeamID(ctx context.Context, table, id, collectionID strin
|
||||
return s.resolveTeamID(ctx, parentTable.String, parentID.String, "", seen)
|
||||
case "collection", "database", "data_source":
|
||||
var parentID, parentTable sql.NullString
|
||||
err := s.db.QueryRowContext(ctx, `select parent_id, parent_table from collections where id = ?`, id).Scan(&parentID, &parentTable)
|
||||
err := s.queryRowContext(ctx, `select parent_id, parent_table from collections where id = ?`, id).Scan(&parentID, &parentTable)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return "", nil
|
||||
|
||||
@ -18,6 +18,7 @@ const schemaVersion = 1
|
||||
|
||||
type Store struct {
|
||||
db *sql.DB
|
||||
tx *sql.Tx
|
||||
path string
|
||||
deferredFTS int
|
||||
deferredFTSPages map[string]bool
|
||||
@ -88,6 +89,45 @@ func (s *Store) DB() *sql.DB {
|
||||
return s.db
|
||||
}
|
||||
|
||||
func (s *Store) execContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
|
||||
if s.tx != nil {
|
||||
return s.tx.ExecContext(ctx, query, args...)
|
||||
}
|
||||
return s.db.ExecContext(ctx, query, args...)
|
||||
}
|
||||
|
||||
func (s *Store) queryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
|
||||
if s.tx != nil {
|
||||
return s.tx.QueryContext(ctx, query, args...)
|
||||
}
|
||||
return s.db.QueryContext(ctx, query, args...)
|
||||
}
|
||||
|
||||
func (s *Store) queryRowContext(ctx context.Context, query string, args ...any) *sql.Row {
|
||||
if s.tx != nil {
|
||||
return s.tx.QueryRowContext(ctx, query, args...)
|
||||
}
|
||||
return s.db.QueryRowContext(ctx, query, args...)
|
||||
}
|
||||
|
||||
func (s *Store) WithTransaction(ctx context.Context, fn func() error) error {
|
||||
if s.tx != nil {
|
||||
return fn()
|
||||
}
|
||||
tx, err := s.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.tx = tx
|
||||
err = fn()
|
||||
s.tx = nil
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return err
|
||||
}
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (s *Store) Close() error {
|
||||
if s == nil || s.db == nil {
|
||||
return nil
|
||||
@ -252,12 +292,12 @@ func (s *Store) init(ctx context.Context) error {
|
||||
`create virtual table if not exists comment_fts using fts5(comment_id unindexed, page_id unindexed, body)`,
|
||||
}
|
||||
for _, stmt := range stmts {
|
||||
if _, err := s.db.ExecContext(ctx, stmt); err != nil {
|
||||
if _, err := s.execContext(ctx, stmt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
var current int
|
||||
row := s.db.QueryRowContext(ctx, `select value from meta where key = 'schema_version'`)
|
||||
row := s.queryRowContext(ctx, `select value from meta where key = 'schema_version'`)
|
||||
err := row.Scan(¤t)
|
||||
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
||||
return err
|
||||
@ -271,20 +311,20 @@ func (s *Store) init(ctx context.Context) error {
|
||||
if err := s.ensureColumn(ctx, "collections", "parent_table", "text"); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := s.db.ExecContext(ctx, `create index if not exists blocks_page_alive_order on blocks(page_id, alive, parent_id, display_order, created_time, id)`); err != nil {
|
||||
if _, err := s.execContext(ctx, `create index if not exists blocks_page_alive_order on blocks(page_id, alive, parent_id, display_order, created_time, id)`); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := s.db.ExecContext(ctx, `create index if not exists blocks_page_alive_created on blocks(page_id, alive, created_time, id)`); err != nil {
|
||||
if _, err := s.execContext(ctx, `create index if not exists blocks_page_alive_created on blocks(page_id, alive, created_time, id)`); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := s.db.ExecContext(ctx, `insert or replace into meta(key, value) values('schema_version', ?)`, schemaVersion); err != nil {
|
||||
if _, err := s.execContext(ctx, `insert or replace into meta(key, value) values('schema_version', ?)`, schemaVersion); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) ensureColumn(ctx context.Context, table, column, definition string) error {
|
||||
rows, err := s.db.QueryContext(ctx, `pragma table_info(`+table+`)`)
|
||||
rows, err := s.queryContext(ctx, `pragma table_info(`+table+`)`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -305,7 +345,7 @@ func (s *Store) ensureColumn(ctx context.Context, table, column, definition stri
|
||||
if err := rows.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = s.db.ExecContext(ctx, `alter table `+table+` add column `+column+` `+definition)
|
||||
_, err = s.execContext(ctx, `alter table `+table+` add column `+column+` `+definition)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -325,7 +365,7 @@ func IntBool(v int) bool {
|
||||
}
|
||||
|
||||
func (s *Store) UpsertSpace(ctx context.Context, x Space) error {
|
||||
_, err := s.db.ExecContext(ctx, `insert into spaces(id, name, raw_json, source, synced_at)
|
||||
_, err := s.execContext(ctx, `insert into spaces(id, name, raw_json, source, synced_at)
|
||||
values (?, ?, ?, ?, ?)
|
||||
on conflict(id) do update set name=excluded.name, raw_json=excluded.raw_json, source=excluded.source, synced_at=excluded.synced_at`,
|
||||
x.ID, x.Name, x.RawJSON, x.Source, x.SyncedAt)
|
||||
@ -333,7 +373,7 @@ func (s *Store) UpsertSpace(ctx context.Context, x Space) error {
|
||||
}
|
||||
|
||||
func (s *Store) UpsertUser(ctx context.Context, x User) error {
|
||||
_, err := s.db.ExecContext(ctx, `insert into users(id, name, email, raw_json, source, synced_at)
|
||||
_, err := s.execContext(ctx, `insert into users(id, name, email, raw_json, source, synced_at)
|
||||
values (?, ?, ?, ?, ?, ?)
|
||||
on conflict(id) do update set name=excluded.name, email=excluded.email, raw_json=excluded.raw_json, source=excluded.source, synced_at=excluded.synced_at`,
|
||||
x.ID, x.Name, x.Email, x.RawJSON, x.Source, x.SyncedAt)
|
||||
@ -341,7 +381,7 @@ func (s *Store) UpsertUser(ctx context.Context, x User) error {
|
||||
}
|
||||
|
||||
func (s *Store) UpsertTeam(ctx context.Context, x Team) error {
|
||||
_, err := s.db.ExecContext(ctx, `insert into teams(id, space_id, parent_id, parent_table, name, raw_json, source, synced_at)
|
||||
_, err := s.execContext(ctx, `insert into teams(id, space_id, parent_id, parent_table, name, raw_json, source, synced_at)
|
||||
values (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
on conflict(id) do update set
|
||||
space_id=excluded.space_id,
|
||||
@ -356,7 +396,7 @@ func (s *Store) UpsertTeam(ctx context.Context, x Team) error {
|
||||
}
|
||||
|
||||
func (s *Store) UpsertPage(ctx context.Context, x Page) error {
|
||||
_, err := s.db.ExecContext(ctx, `insert into pages(
|
||||
_, err := s.execContext(ctx, `insert into pages(
|
||||
id, space_id, parent_id, parent_table, collection_id, title, url, icon, cover, properties_json,
|
||||
created_time, last_edited_time, alive, source, raw_json, synced_at)
|
||||
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
@ -385,7 +425,7 @@ func (s *Store) UpsertPage(ctx context.Context, x Page) error {
|
||||
}
|
||||
|
||||
func (s *Store) UpsertBlock(ctx context.Context, x Block) error {
|
||||
_, err := s.db.ExecContext(ctx, `insert into blocks(
|
||||
_, err := s.execContext(ctx, `insert into blocks(
|
||||
id, page_id, space_id, parent_id, parent_table, type, text, properties_json, content_json, format_json,
|
||||
display_order, created_time, last_edited_time, alive, source, raw_json, synced_at)
|
||||
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
@ -418,7 +458,7 @@ func (s *Store) UpsertBlock(ctx context.Context, x Block) error {
|
||||
}
|
||||
|
||||
func (s *Store) UpsertCollection(ctx context.Context, x Collection) error {
|
||||
_, err := s.db.ExecContext(ctx, `insert into collections(id, space_id, parent_id, parent_table, name, schema_json, format_json, raw_json, source, synced_at)
|
||||
_, err := s.execContext(ctx, `insert into collections(id, space_id, parent_id, parent_table, name, schema_json, format_json, raw_json, source, synced_at)
|
||||
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
on conflict(id) do update set space_id=excluded.space_id, parent_id=excluded.parent_id, parent_table=excluded.parent_table, name=excluded.name,
|
||||
schema_json=excluded.schema_json, format_json=excluded.format_json, raw_json=excluded.raw_json,
|
||||
@ -428,7 +468,7 @@ func (s *Store) UpsertCollection(ctx context.Context, x Collection) error {
|
||||
}
|
||||
|
||||
func (s *Store) UpsertComment(ctx context.Context, x Comment) error {
|
||||
_, err := s.db.ExecContext(ctx, `insert into comments(id, page_id, space_id, parent_id, text, created_by_id, created_time, last_edited_time, alive, raw_json, source, synced_at)
|
||||
_, err := s.execContext(ctx, `insert into comments(id, page_id, space_id, parent_id, text, created_by_id, created_time, last_edited_time, alive, raw_json, source, synced_at)
|
||||
values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
on conflict(id) do update set page_id=excluded.page_id, space_id=excluded.space_id, parent_id=excluded.parent_id,
|
||||
text=excluded.text, created_by_id=excluded.created_by_id, created_time=excluded.created_time,
|
||||
@ -438,16 +478,16 @@ func (s *Store) UpsertComment(ctx context.Context, x Comment) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = s.db.ExecContext(ctx, `delete from comment_fts where comment_id = ?`, x.ID)
|
||||
_, err = s.execContext(ctx, `delete from comment_fts where comment_id = ?`, x.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = s.db.ExecContext(ctx, `insert into comment_fts(comment_id, page_id, body) values (?, ?, ?)`, x.ID, x.PageID, x.Text)
|
||||
_, err = s.execContext(ctx, `insert into comment_fts(comment_id, page_id, body) values (?, ?, ?)`, x.ID, x.PageID, x.Text)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) UpsertRawRecord(ctx context.Context, x RawRecord) error {
|
||||
_, err := s.db.ExecContext(ctx, `insert into raw_records(source, record_table, record_id, parent_id, space_id, raw_json, synced_at)
|
||||
_, err := s.execContext(ctx, `insert into raw_records(source, record_table, record_id, parent_id, space_id, raw_json, synced_at)
|
||||
values (?, ?, ?, ?, ?, ?, ?)
|
||||
on conflict(source, record_table, record_id) do update set parent_id=excluded.parent_id, space_id=excluded.space_id,
|
||||
raw_json=excluded.raw_json, synced_at=excluded.synced_at`,
|
||||
@ -456,7 +496,7 @@ func (s *Store) UpsertRawRecord(ctx context.Context, x RawRecord) error {
|
||||
}
|
||||
|
||||
func (s *Store) SetSyncState(ctx context.Context, source, entityType, entityID, cursor string) error {
|
||||
_, err := s.db.ExecContext(ctx, `insert into sync_state(source, entity_type, entity_id, cursor, synced_at)
|
||||
_, err := s.execContext(ctx, `insert into sync_state(source, entity_type, entity_id, cursor, synced_at)
|
||||
values (?, ?, ?, ?, ?)
|
||||
on conflict(source, entity_type, entity_id) do update set cursor=excluded.cursor, synced_at=excluded.synced_at`,
|
||||
source, entityType, entityID, cursor, NowMS())
|
||||
@ -503,7 +543,7 @@ func (s *Store) markPageFTS(ctx context.Context, pageID string) error {
|
||||
|
||||
func (s *Store) refreshPageFTS(ctx context.Context, pageID string) error {
|
||||
var title string
|
||||
if err := s.db.QueryRowContext(ctx, `select title from pages where id = ?`, pageID).Scan(&title); err != nil {
|
||||
if err := s.queryRowContext(ctx, `select title from pages where id = ?`, pageID).Scan(&title); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return nil
|
||||
}
|
||||
@ -514,10 +554,10 @@ func (s *Store) refreshPageFTS(ctx context.Context, pageID string) error {
|
||||
return err
|
||||
}
|
||||
parts := pageBlockTextParts(pageID, blocks)
|
||||
if _, err := s.db.ExecContext(ctx, `delete from page_fts where page_id = ?`, pageID); err != nil {
|
||||
if _, err := s.execContext(ctx, `delete from page_fts where page_id = ?`, pageID); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = s.db.ExecContext(ctx, `insert into page_fts(page_id, title, body) values (?, ?, ?)`, pageID, title, strings.Join(parts, "\n"))
|
||||
_, err = s.execContext(ctx, `insert into page_fts(page_id, title, body) values (?, ?, ?)`, pageID, title, strings.Join(parts, "\n"))
|
||||
return err
|
||||
}
|
||||
|
||||
@ -574,7 +614,7 @@ func (s *Store) Search(ctx context.Context, q string, limit int) ([]SearchResult
|
||||
if limit <= 0 {
|
||||
limit = 20
|
||||
}
|
||||
rows, err := s.db.QueryContext(ctx, `select 'page', page_id, title, snippet(page_fts, 2, '[', ']', '...', 16)
|
||||
rows, err := s.queryContext(ctx, `select 'page', page_id, title, snippet(page_fts, 2, '[', ']', '...', 16)
|
||||
from page_fts where page_fts match ? limit ?`, q, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -592,10 +632,10 @@ func (s *Store) Search(ctx context.Context, q string, limit int) ([]SearchResult
|
||||
}
|
||||
|
||||
func (s *Store) RebuildFTS(ctx context.Context) error {
|
||||
if _, err := s.db.ExecContext(ctx, `delete from page_fts`); err != nil {
|
||||
if _, err := s.execContext(ctx, `delete from page_fts`); err != nil {
|
||||
return err
|
||||
}
|
||||
rows, err := s.db.QueryContext(ctx, `select id from pages`)
|
||||
rows, err := s.queryContext(ctx, `select id from pages`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -616,10 +656,10 @@ func (s *Store) RebuildFTS(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if _, err := s.db.ExecContext(ctx, `delete from comment_fts`); err != nil {
|
||||
if _, err := s.execContext(ctx, `delete from comment_fts`); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = s.db.ExecContext(ctx, `insert into comment_fts(comment_id, page_id, body) select id, page_id, text from comments where alive = 1`)
|
||||
_, err = s.execContext(ctx, `insert into comment_fts(comment_id, page_id, body) select id, page_id, text from comments where alive = 1`)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -639,11 +679,11 @@ func (s *Store) Status(ctx context.Context) (Status, error) {
|
||||
{`select count(*) from raw_records`, &status.RawRecords},
|
||||
}
|
||||
for _, count := range counts {
|
||||
if err := s.db.QueryRowContext(ctx, count.query).Scan(count.dest); err != nil {
|
||||
if err := s.queryRowContext(ctx, count.query).Scan(count.dest); err != nil {
|
||||
return Status{}, err
|
||||
}
|
||||
}
|
||||
if err := s.db.QueryRowContext(ctx, `select coalesce(max(synced_at), 0) from sync_state`).Scan(&status.LastSyncAt); err != nil {
|
||||
if err := s.queryRowContext(ctx, `select coalesce(max(synced_at), 0) from sync_state`).Scan(&status.LastSyncAt); err != nil {
|
||||
return Status{}, err
|
||||
}
|
||||
status.DBBytes = fileSize(s.path)
|
||||
@ -661,12 +701,12 @@ func (s *Store) Optimize(ctx context.Context, vacuum bool) (MaintenanceSummary,
|
||||
`pragma optimize`,
|
||||
`analyze`,
|
||||
} {
|
||||
if _, err := s.db.ExecContext(ctx, stmt); err != nil {
|
||||
if _, err := s.execContext(ctx, stmt); err != nil {
|
||||
return MaintenanceSummary{}, err
|
||||
}
|
||||
}
|
||||
if vacuum {
|
||||
if _, err := s.db.ExecContext(ctx, `vacuum`); err != nil {
|
||||
if _, err := s.execContext(ctx, `vacuum`); err != nil {
|
||||
return MaintenanceSummary{}, err
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,6 +2,7 @@ package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
@ -69,6 +70,45 @@ func TestStoreDefersPageFTSRefresh(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestStoreTransactionCommitsAndRollsBack(t *testing.T) {
|
||||
st, err := Open(filepath.Join(t.TempDir(), "notcrawl.db"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer st.Close()
|
||||
ctx := context.Background()
|
||||
now := NowMS()
|
||||
if err := st.WithTransaction(ctx, func() error {
|
||||
return st.UpsertPage(ctx, Page{ID: "commit", Title: "Commit", Alive: true, Source: "test", SyncedAt: now})
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var count int
|
||||
if err := st.DB().QueryRowContext(ctx, `select count(*) from pages where id = 'commit'`).Scan(&count); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if count != 1 {
|
||||
t.Fatalf("expected committed page, got %d", count)
|
||||
}
|
||||
|
||||
sentinel := errors.New("rollback")
|
||||
err = st.WithTransaction(ctx, func() error {
|
||||
if err := st.UpsertPage(ctx, Page{ID: "rollback", Title: "Rollback", Alive: true, Source: "test", SyncedAt: now}); err != nil {
|
||||
return err
|
||||
}
|
||||
return sentinel
|
||||
})
|
||||
if !errors.Is(err, sentinel) {
|
||||
t.Fatalf("expected rollback error, got %v", err)
|
||||
}
|
||||
if err := st.DB().QueryRowContext(ctx, `select count(*) from pages where id = 'rollback'`).Scan(&count); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if count != 0 {
|
||||
t.Fatalf("expected rolled back page, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStoreOrdersBlocksByDisplayOrder(t *testing.T) {
|
||||
st, err := Open(filepath.Join(t.TempDir(), "notcrawl.db"))
|
||||
if err != nil {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user