From 40317aa538342d5d197e7038293017acc964cbf7 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 8 May 2026 09:49:13 +0100 Subject: [PATCH] fix: keep read commands available during tail --- CHANGELOG.md | 1 + internal/cli/analytics.go | 4 +- internal/cli/cli.go | 128 +++++++++++++++++++++++++++--- internal/cli/cli_test.go | 57 +++++++++++++ internal/cli/query_sync.go | 15 ++++ internal/cli/sync_lock.go | 23 ++++++ internal/cli/sync_lock_other.go | 4 + internal/cli/sync_lock_unix.go | 26 ++++++ internal/cli/sync_lock_windows.go | 25 ++++++ internal/store/store.go | 7 +- 10 files changed, 277 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d6bd043..f51bafa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ ### Fixes - Made the terminal browser more useful and accurate: default guild scoping, newest-message startup, compact panes, selected-message detail panes, count-header sorting, local/remote status labels, right-click actions, Discord message URLs, row labels, direct-message pane labels, mention rendering, inline mention resolution, attachment details, and reply-context hydration without broad thread scans. +- Kept read-only commands such as `search`, `messages`, and safe `sql` usable while `tail` or another writer holds the sync lock. Thanks @PrinceOfEgypt. - Kept `tui --help`, status, and terminal-browser reads safe for fresh or missing local databases without triggering Git snapshot auto-update. - Kept local-only snapshot rows filtered during shared archive imports and forwarded snapshot import progress through the crawlkit import path. - Made stale Git snapshot imports plan shard deltas from crawlkit file fingerprints or Git object identity, so routine shared-archive refreshes import changed message tail shards instead of rebuilding every table and FTS index. diff --git a/internal/cli/analytics.go b/internal/cli/analytics.go index 463734a..7309271 100644 --- a/internal/cli/analytics.go +++ b/internal/cli/analytics.go @@ -20,11 +20,11 @@ func (r *runtime) runAnalytics(args []string) error { subArgs := args[1:] switch subcommand { case "quiet": - return r.withLocalStoreDefaultLocked(true, true, func() error { + return r.withLocalStoreRead(true, func() error { return r.runAnalyticsQuiet(subArgs) }) case "trends": - return r.withLocalStoreDefaultLocked(true, true, func() error { + return r.withLocalStoreRead(true, func() error { return r.runAnalyticsTrends(subArgs) }) default: diff --git a/internal/cli/cli.go b/internal/cli/cli.go index 76f4614..66ac63b 100644 --- a/internal/cli/cli.go +++ b/internal/cli/cli.go @@ -155,7 +155,7 @@ func (r *runtime) dispatch(rest []string) error { return r.withLocalStoreLocked(false, func() error { return r.runWiretap(rest[1:]) }) case "search": autoShareUpdate := !hasBoolFlag(rest[1:], "--dm") - return r.withLocalStoreDefaultLocked(autoShareUpdate, autoShareUpdate, func() error { return r.runSearch(rest[1:]) }) + return r.withLocalStoreRead(autoShareUpdate, func() error { return r.runSearch(rest[1:]) }) case "tui": if hasHelpArg(rest[1:]) { return r.runTUI(rest[1:]) @@ -166,27 +166,30 @@ func (r *runtime) dispatch(rest []string) error { return r.withServicesAutoLocked(true, true, true, func() error { return r.runMessages(rest[1:]) }) } autoShareUpdate := !hasBoolFlag(rest[1:], "--dm") - return r.withLocalStoreDefaultLocked(autoShareUpdate, autoShareUpdate, func() error { return r.runMessages(rest[1:]) }) + return r.withLocalStoreRead(autoShareUpdate, func() error { return r.runMessages(rest[1:]) }) case "digest": - return r.withLocalStoreDefaultLocked(true, true, func() error { return r.runDigest(rest[1:]) }) + return r.withLocalStoreRead(true, func() error { return r.runDigest(rest[1:]) }) case "analytics": return r.runAnalytics(rest[1:]) case "dms": - return r.withLocalStoreDefault(false, func() error { return r.runDirectMessages(rest[1:]) }) + return r.withLocalStoreRead(false, func() error { return r.runDirectMessages(rest[1:]) }) case "mentions": - return r.withLocalStoreLocked(true, func() error { return r.runMentions(rest[1:]) }) + return r.withLocalStoreRead(true, func() error { return r.runMentions(rest[1:]) }) case "embed": return r.withLocalStoreLocked(true, func() error { return r.runEmbed(rest[1:]) }) case "sql": - return r.withLocalStoreLocked(true, func() error { return r.runSQL(rest[1:]) }) + if boolFlagEnabled(rest[1:], "--unsafe") { + return r.withLocalStoreLocked(true, func() error { return r.runSQL(rest[1:]) }) + } + return r.withLocalStoreRead(true, func() error { return r.runSQL(rest[1:]) }) case "members": - return r.withLocalStoreLocked(true, func() error { return r.runMembers(rest[1:]) }) + return r.withLocalStoreRead(true, func() error { return r.runMembers(rest[1:]) }) case "channels": - return r.withLocalStoreLocked(true, func() error { return r.runChannels(rest[1:]) }) + return r.withLocalStoreRead(true, func() error { return r.runChannels(rest[1:]) }) case "status": return r.withLocalStoreReadOnly(func() error { return r.runStatus(rest[1:]) }) case "report": - return r.withLocalStoreLocked(true, func() error { return r.runReport(rest[1:]) }) + return r.withLocalStoreRead(true, func() error { return r.runReport(rest[1:]) }) case "publish": return r.withServicesAutoLocked(false, false, true, func() error { return r.runPublish(rest[1:]) }) case "subscribe": @@ -220,6 +223,37 @@ func (r *runtime) withLocalStoreDefaultLocked(autoShareUpdate, lockDB bool, fn f return r.withLocalStoreUpdateLocked(boolShareUpdateMode(autoShareUpdate), lockDB, fn) } +func (r *runtime) withLocalStoreRead(autoShareUpdate bool, fn func() error) error { + return r.withLocalStoreReadUpdate(boolShareUpdateMode(autoShareUpdate), fn) +} + +func (r *runtime) withLocalStoreReadUpdate(updateMode shareUpdateMode, fn func() error) error { + cfg, err := config.Load(r.configPath) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + return configErr(err) + } + cfg = config.Default() + if err := cfg.Normalize(); err != nil { + return configErr(err) + } + } + if err := config.EnsureRuntimeDirs(cfg); err != nil { + return configErr(err) + } + dbPath, err := config.ExpandPath(cfg.DBPath) + if err != nil { + return configErr(err) + } + r.cfg = cfg + if r.shouldAutoUpdateShare(updateMode) { + if err := r.autoUpdateShareIfLockAvailable(dbPath, updateMode); err != nil { + return err + } + } + return r.openLocalStoreReadOnly(dbPath, fn) +} + func (r *runtime) withLocalStoreUpdateLocked(updateMode shareUpdateMode, lockDB bool, fn func() error) error { cfg, err := config.Load(r.configPath) if err != nil { @@ -247,6 +281,38 @@ func (r *runtime) withLocalStoreUpdateLocked(updateMode shareUpdateMode, lockDB return r.openLocalStore(dbPath, updateMode, fn) } +func (r *runtime) shouldAutoUpdateShare(mode shareUpdateMode) bool { + return os.Getenv("DISCRAWL_NO_AUTO_UPDATE") != "1" && + r.cfg.ShareEnabled() && + (mode == shareUpdateForce || mode == shareUpdateAuto || (mode == shareUpdateConfigured && r.cfg.Share.AutoUpdate)) +} + +func (r *runtime) autoUpdateShareIfLockAvailable(dbPath string, updateMode shareUpdateMode) error { + locked, err := r.tryWithSyncLock(func() error { + storeFactory := r.openStore + if storeFactory == nil { + storeFactory = store.Open + } + var openErr error + r.store, openErr = storeFactory(r.ctx, dbPath) + if openErr != nil { + return dbErr(openErr) + } + defer func() { + _ = r.store.Close() + r.store = nil + }() + return r.autoUpdateShare(updateMode) + }) + if err != nil { + return err + } + if !locked { + r.logger.Info("share update skipped; sync lock is held") + } + return nil +} + func (r *runtime) openLocalStore(dbPath string, updateMode shareUpdateMode, fn func() error) error { storeFactory := r.openStore if storeFactory == nil { @@ -295,6 +361,50 @@ func (r *runtime) withLocalStoreReadOnly(fn func() error) error { return fn() } +func (r *runtime) openLocalStoreReadOnly(dbPath string, fn func() error) error { + var openErr error + r.store, openErr = store.OpenReadOnly(r.ctx, dbPath) + if openErr != nil { + if errors.Is(openErr, os.ErrNotExist) { + storeFactory := r.openStore + if storeFactory == nil { + storeFactory = store.Open + } + r.store, openErr = storeFactory(r.ctx, dbPath) + if openErr == nil { + defer func() { _ = r.store.Close() }() + return fn() + } + } + if errors.Is(openErr, store.ErrSchemaVersionMismatch) { + if err := r.withSyncLock(func() error { + storeFactory := r.openStore + if storeFactory == nil { + storeFactory = store.Open + } + var migrateErr error + r.store, migrateErr = storeFactory(r.ctx, dbPath) + if migrateErr != nil { + return dbErr(migrateErr) + } + closeErr := r.store.Close() + r.store = nil + return closeErr + }); err != nil { + return err + } + r.store, openErr = store.OpenReadOnly(r.ctx, dbPath) + if openErr == nil { + defer func() { _ = r.store.Close() }() + return fn() + } + } + return dbErr(openErr) + } + defer func() { _ = r.store.Close() }() + return fn() +} + func (r *runtime) withServicesAuto(withDiscord, autoShareUpdate bool, fn func() error) error { return r.withServicesAutoLocked(withDiscord, autoShareUpdate, false, fn) } diff --git a/internal/cli/cli_test.go b/internal/cli/cli_test.go index b5b77dd..8481c37 100644 --- a/internal/cli/cli_test.go +++ b/internal/cli/cli_test.go @@ -1053,6 +1053,63 @@ func TestSyncLockSerializesConcurrentRuns(t *testing.T) { require.ErrorIs(t, err, context.DeadlineExceeded) } +func TestReadCommandsDoNotWaitForSyncLock(t *testing.T) { + if goruntime.GOOS == "windows" { + t.Skip("sync lock timing is flaky on Windows") + } + ctx := context.Background() + dir := t.TempDir() + cfg := config.Default() + cfg.DBPath = filepath.Join(dir, "discrawl.db") + cfgPath := filepath.Join(dir, "config.toml") + require.NoError(t, config.Write(cfgPath, cfg)) + + s := seedCLIStore(t, cfg.DBPath) + require.NoError(t, s.Close()) + + firstRelease, err := acquireSyncLock(ctx, filepath.Join(dir, ".discrawl-sync.lock")) + require.NoError(t, err) + defer func() { _ = firstRelease() }() + + for _, args := range [][]string{ + {"--config", cfgPath, "search", "automatic"}, + {"--config", cfgPath, "messages", "--channel", "general", "--last", "1"}, + {"--config", cfgPath, "sql", "select count(*) as total from messages"}, + } { + runCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) + var out bytes.Buffer + err := Run(runCtx, args, &out, &bytes.Buffer{}) + cancel() + require.NoError(t, err, args) + require.NotEmpty(t, out.String(), args) + } +} + +func TestReadCommandsMigrateOlderLocalStore(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + cfg := config.Default() + cfg.DBPath = filepath.Join(dir, "discrawl.db") + cfgPath := filepath.Join(dir, "config.toml") + require.NoError(t, config.Write(cfgPath, cfg)) + + s := seedCLIStore(t, cfg.DBPath) + _, err := s.DB().ExecContext(ctx, `pragma user_version = 1`) + require.NoError(t, err) + require.NoError(t, s.Close()) + + var out bytes.Buffer + require.NoError(t, Run(ctx, []string{"--config", cfgPath, "search", "automatic"}, &out, &bytes.Buffer{})) + require.Contains(t, out.String(), "automatic updates work") + + reader, err := store.OpenReadOnly(ctx, cfg.DBPath) + require.NoError(t, err) + defer func() { _ = reader.Close() }() + var version int + require.NoError(t, reader.DB().QueryRowContext(ctx, `pragma user_version`).Scan(&version)) + require.Equal(t, 2, version) +} + func seedCLIStore(t *testing.T, path string) *store.Store { t.Helper() ctx := context.Background() diff --git a/internal/cli/query_sync.go b/internal/cli/query_sync.go index 87f46ec..d6c40b4 100644 --- a/internal/cli/query_sync.go +++ b/internal/cli/query_sync.go @@ -97,6 +97,21 @@ func hasBoolFlag(args []string, name string) bool { return false } +func boolFlagEnabled(args []string, name string) bool { + for _, arg := range args { + if arg == name { + return true + } + if raw, ok := strings.CutPrefix(arg, name+"="); ok { + switch strings.ToLower(strings.TrimSpace(raw)) { + case "1", "t", "true", "y", "yes", "on": + return true + } + } + } + return false +} + func hasHelpArg(args []string) bool { for _, arg := range args { if arg == "help" || arg == "--help" || arg == "-h" { diff --git a/internal/cli/sync_lock.go b/internal/cli/sync_lock.go index 3d81414..44fa5f5 100644 --- a/internal/cli/sync_lock.go +++ b/internal/cli/sync_lock.go @@ -34,6 +34,29 @@ func (r *runtime) withSyncLock(fn func() error) error { return fn() } +func (r *runtime) tryWithSyncLock(fn func() error) (bool, error) { + if r.dbLockHeld { + return true, fn() + } + lockPath, err := r.syncLockPath() + if err != nil { + return false, err + } + release, locked, err := tryAcquireSyncLock(lockPath) + if err != nil || !locked { + return locked, err + } + r.dbLockHeld = true + r.lockStarted = r.nowUTC() + r.setSyncLockPhase("locked") + defer func() { + r.dbLockHeld = false + r.lockStarted = time.Time{} + _ = release() + }() + return true, fn() +} + func (r *runtime) setSyncLockPhase(phase string) { if !r.dbLockHeld { return diff --git a/internal/cli/sync_lock_other.go b/internal/cli/sync_lock_other.go index d321c82..e95878b 100644 --- a/internal/cli/sync_lock_other.go +++ b/internal/cli/sync_lock_other.go @@ -7,3 +7,7 @@ import "context" func acquireSyncLock(context.Context, string) (func() error, error) { return func() error { return nil }, nil } + +func tryAcquireSyncLock(string) (func() error, bool, error) { + return func() error { return nil }, true, nil +} diff --git a/internal/cli/sync_lock_unix.go b/internal/cli/sync_lock_unix.go index cc6d797..eda26fa 100644 --- a/internal/cli/sync_lock_unix.go +++ b/internal/cli/sync_lock_unix.go @@ -51,3 +51,29 @@ func acquireSyncLock(ctx context.Context, path string) (func() error, error) { } } } + +func tryAcquireSyncLock(path string) (func() error, bool, error) { + file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0o600) + if err != nil { + return nil, false, fmt.Errorf("open sync lock: %w", err) + } + err = unix.Flock(int(file.Fd()), unix.LOCK_EX|unix.LOCK_NB) + if err != nil { + _ = file.Close() + if errors.Is(err, unix.EWOULDBLOCK) || errors.Is(err, unix.EAGAIN) { + return nil, false, nil + } + return nil, false, fmt.Errorf("acquire sync lock: %w", err) + } + _, _ = file.Seek(0, 0) + _ = file.Truncate(0) + _, _ = fmt.Fprintf(file, "pid=%d\n", os.Getpid()) + return func() error { + unlockErr := unix.Flock(int(file.Fd()), unix.LOCK_UN) + closeErr := file.Close() + if unlockErr != nil { + return unlockErr + } + return closeErr + }, true, nil +} diff --git a/internal/cli/sync_lock_windows.go b/internal/cli/sync_lock_windows.go index f596753..cb3cdd1 100644 --- a/internal/cli/sync_lock_windows.go +++ b/internal/cli/sync_lock_windows.go @@ -49,3 +49,28 @@ func acquireSyncLock(ctx context.Context, path string) (func() error, error) { } } } + +func tryAcquireSyncLock(path string) (func() error, bool, error) { + file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0o600) + if err != nil { + return nil, false, fmt.Errorf("open sync lock: %w", err) + } + handle := windows.Handle(file.Fd()) + overlapped := &windows.Overlapped{} + err = windows.LockFileEx(handle, windows.LOCKFILE_EXCLUSIVE_LOCK|windows.LOCKFILE_FAIL_IMMEDIATELY, 0, 1, 0, overlapped) + if err != nil { + _ = file.Close() + return nil, false, nil + } + _, _ = file.Seek(0, 0) + _ = file.Truncate(0) + _, _ = fmt.Fprintf(file, "pid=%d\n", os.Getpid()) + return func() error { + unlockErr := windows.UnlockFileEx(handle, 0, 1, 0, overlapped) + closeErr := file.Close() + if unlockErr != nil { + return unlockErr + } + return closeErr + }, true, nil +} diff --git a/internal/store/store.go b/internal/store/store.go index fcabea4..21d59fc 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -3,6 +3,7 @@ package store import ( "context" "database/sql" + "errors" "fmt" "hash/fnv" "strconv" @@ -18,6 +19,8 @@ const ( storeSchemaVersion = 2 ) +var ErrSchemaVersionMismatch = errors.New("database schema version mismatch") + type Store struct { db *sql.DB path string @@ -135,7 +138,7 @@ func OpenReadOnly(ctx context.Context, path string) (*Store, error) { return nil, err } else if version != storeSchemaVersion { _ = base.Close() - return nil, fmt.Errorf("database schema version mismatch: got %d want %d", version, storeSchemaVersion) + return nil, fmt.Errorf("%w: got %d want %d", ErrSchemaVersionMismatch, version, storeSchemaVersion) } return store, nil } @@ -179,7 +182,7 @@ func (s *Store) migrate(ctx context.Context) error { if version, err := s.schemaVersion(ctx); err != nil { return err } else if version != storeSchemaVersion { - return fmt.Errorf("database schema version mismatch: got %d want %d", version, storeSchemaVersion) + return fmt.Errorf("%w: got %d want %d", ErrSchemaVersionMismatch, version, storeSchemaVersion) } if err := s.applyQueryIndexMigration(ctx); err != nil { return err