fix: keep read commands available during tail
This commit is contained in:
parent
fb969672e0
commit
40317aa538
@ -12,6 +12,7 @@
|
||||
### Fixes
|
||||
|
||||
- Made the terminal browser more useful and accurate: default guild scoping, newest-message startup, compact panes, selected-message detail panes, count-header sorting, local/remote status labels, right-click actions, Discord message URLs, row labels, direct-message pane labels, mention rendering, inline mention resolution, attachment details, and reply-context hydration without broad thread scans.
|
||||
- Kept read-only commands such as `search`, `messages`, and safe `sql` usable while `tail` or another writer holds the sync lock. Thanks @PrinceOfEgypt.
|
||||
- Kept `tui --help`, status, and terminal-browser reads safe for fresh or missing local databases without triggering Git snapshot auto-update.
|
||||
- Kept local-only snapshot rows filtered during shared archive imports and forwarded snapshot import progress through the crawlkit import path.
|
||||
- Made stale Git snapshot imports plan shard deltas from crawlkit file fingerprints or Git object identity, so routine shared-archive refreshes import changed message tail shards instead of rebuilding every table and FTS index.
|
||||
|
||||
@ -20,11 +20,11 @@ func (r *runtime) runAnalytics(args []string) error {
|
||||
subArgs := args[1:]
|
||||
switch subcommand {
|
||||
case "quiet":
|
||||
return r.withLocalStoreDefaultLocked(true, true, func() error {
|
||||
return r.withLocalStoreRead(true, func() error {
|
||||
return r.runAnalyticsQuiet(subArgs)
|
||||
})
|
||||
case "trends":
|
||||
return r.withLocalStoreDefaultLocked(true, true, func() error {
|
||||
return r.withLocalStoreRead(true, func() error {
|
||||
return r.runAnalyticsTrends(subArgs)
|
||||
})
|
||||
default:
|
||||
|
||||
@ -155,7 +155,7 @@ func (r *runtime) dispatch(rest []string) error {
|
||||
return r.withLocalStoreLocked(false, func() error { return r.runWiretap(rest[1:]) })
|
||||
case "search":
|
||||
autoShareUpdate := !hasBoolFlag(rest[1:], "--dm")
|
||||
return r.withLocalStoreDefaultLocked(autoShareUpdate, autoShareUpdate, func() error { return r.runSearch(rest[1:]) })
|
||||
return r.withLocalStoreRead(autoShareUpdate, func() error { return r.runSearch(rest[1:]) })
|
||||
case "tui":
|
||||
if hasHelpArg(rest[1:]) {
|
||||
return r.runTUI(rest[1:])
|
||||
@ -166,27 +166,30 @@ func (r *runtime) dispatch(rest []string) error {
|
||||
return r.withServicesAutoLocked(true, true, true, func() error { return r.runMessages(rest[1:]) })
|
||||
}
|
||||
autoShareUpdate := !hasBoolFlag(rest[1:], "--dm")
|
||||
return r.withLocalStoreDefaultLocked(autoShareUpdate, autoShareUpdate, func() error { return r.runMessages(rest[1:]) })
|
||||
return r.withLocalStoreRead(autoShareUpdate, func() error { return r.runMessages(rest[1:]) })
|
||||
case "digest":
|
||||
return r.withLocalStoreDefaultLocked(true, true, func() error { return r.runDigest(rest[1:]) })
|
||||
return r.withLocalStoreRead(true, func() error { return r.runDigest(rest[1:]) })
|
||||
case "analytics":
|
||||
return r.runAnalytics(rest[1:])
|
||||
case "dms":
|
||||
return r.withLocalStoreDefault(false, func() error { return r.runDirectMessages(rest[1:]) })
|
||||
return r.withLocalStoreRead(false, func() error { return r.runDirectMessages(rest[1:]) })
|
||||
case "mentions":
|
||||
return r.withLocalStoreLocked(true, func() error { return r.runMentions(rest[1:]) })
|
||||
return r.withLocalStoreRead(true, func() error { return r.runMentions(rest[1:]) })
|
||||
case "embed":
|
||||
return r.withLocalStoreLocked(true, func() error { return r.runEmbed(rest[1:]) })
|
||||
case "sql":
|
||||
return r.withLocalStoreLocked(true, func() error { return r.runSQL(rest[1:]) })
|
||||
if boolFlagEnabled(rest[1:], "--unsafe") {
|
||||
return r.withLocalStoreLocked(true, func() error { return r.runSQL(rest[1:]) })
|
||||
}
|
||||
return r.withLocalStoreRead(true, func() error { return r.runSQL(rest[1:]) })
|
||||
case "members":
|
||||
return r.withLocalStoreLocked(true, func() error { return r.runMembers(rest[1:]) })
|
||||
return r.withLocalStoreRead(true, func() error { return r.runMembers(rest[1:]) })
|
||||
case "channels":
|
||||
return r.withLocalStoreLocked(true, func() error { return r.runChannels(rest[1:]) })
|
||||
return r.withLocalStoreRead(true, func() error { return r.runChannels(rest[1:]) })
|
||||
case "status":
|
||||
return r.withLocalStoreReadOnly(func() error { return r.runStatus(rest[1:]) })
|
||||
case "report":
|
||||
return r.withLocalStoreLocked(true, func() error { return r.runReport(rest[1:]) })
|
||||
return r.withLocalStoreRead(true, func() error { return r.runReport(rest[1:]) })
|
||||
case "publish":
|
||||
return r.withServicesAutoLocked(false, false, true, func() error { return r.runPublish(rest[1:]) })
|
||||
case "subscribe":
|
||||
@ -220,6 +223,37 @@ func (r *runtime) withLocalStoreDefaultLocked(autoShareUpdate, lockDB bool, fn f
|
||||
return r.withLocalStoreUpdateLocked(boolShareUpdateMode(autoShareUpdate), lockDB, fn)
|
||||
}
|
||||
|
||||
func (r *runtime) withLocalStoreRead(autoShareUpdate bool, fn func() error) error {
|
||||
return r.withLocalStoreReadUpdate(boolShareUpdateMode(autoShareUpdate), fn)
|
||||
}
|
||||
|
||||
func (r *runtime) withLocalStoreReadUpdate(updateMode shareUpdateMode, fn func() error) error {
|
||||
cfg, err := config.Load(r.configPath)
|
||||
if err != nil {
|
||||
if !errors.Is(err, os.ErrNotExist) {
|
||||
return configErr(err)
|
||||
}
|
||||
cfg = config.Default()
|
||||
if err := cfg.Normalize(); err != nil {
|
||||
return configErr(err)
|
||||
}
|
||||
}
|
||||
if err := config.EnsureRuntimeDirs(cfg); err != nil {
|
||||
return configErr(err)
|
||||
}
|
||||
dbPath, err := config.ExpandPath(cfg.DBPath)
|
||||
if err != nil {
|
||||
return configErr(err)
|
||||
}
|
||||
r.cfg = cfg
|
||||
if r.shouldAutoUpdateShare(updateMode) {
|
||||
if err := r.autoUpdateShareIfLockAvailable(dbPath, updateMode); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return r.openLocalStoreReadOnly(dbPath, fn)
|
||||
}
|
||||
|
||||
func (r *runtime) withLocalStoreUpdateLocked(updateMode shareUpdateMode, lockDB bool, fn func() error) error {
|
||||
cfg, err := config.Load(r.configPath)
|
||||
if err != nil {
|
||||
@ -247,6 +281,38 @@ func (r *runtime) withLocalStoreUpdateLocked(updateMode shareUpdateMode, lockDB
|
||||
return r.openLocalStore(dbPath, updateMode, fn)
|
||||
}
|
||||
|
||||
func (r *runtime) shouldAutoUpdateShare(mode shareUpdateMode) bool {
|
||||
return os.Getenv("DISCRAWL_NO_AUTO_UPDATE") != "1" &&
|
||||
r.cfg.ShareEnabled() &&
|
||||
(mode == shareUpdateForce || mode == shareUpdateAuto || (mode == shareUpdateConfigured && r.cfg.Share.AutoUpdate))
|
||||
}
|
||||
|
||||
func (r *runtime) autoUpdateShareIfLockAvailable(dbPath string, updateMode shareUpdateMode) error {
|
||||
locked, err := r.tryWithSyncLock(func() error {
|
||||
storeFactory := r.openStore
|
||||
if storeFactory == nil {
|
||||
storeFactory = store.Open
|
||||
}
|
||||
var openErr error
|
||||
r.store, openErr = storeFactory(r.ctx, dbPath)
|
||||
if openErr != nil {
|
||||
return dbErr(openErr)
|
||||
}
|
||||
defer func() {
|
||||
_ = r.store.Close()
|
||||
r.store = nil
|
||||
}()
|
||||
return r.autoUpdateShare(updateMode)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !locked {
|
||||
r.logger.Info("share update skipped; sync lock is held")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *runtime) openLocalStore(dbPath string, updateMode shareUpdateMode, fn func() error) error {
|
||||
storeFactory := r.openStore
|
||||
if storeFactory == nil {
|
||||
@ -295,6 +361,50 @@ func (r *runtime) withLocalStoreReadOnly(fn func() error) error {
|
||||
return fn()
|
||||
}
|
||||
|
||||
func (r *runtime) openLocalStoreReadOnly(dbPath string, fn func() error) error {
|
||||
var openErr error
|
||||
r.store, openErr = store.OpenReadOnly(r.ctx, dbPath)
|
||||
if openErr != nil {
|
||||
if errors.Is(openErr, os.ErrNotExist) {
|
||||
storeFactory := r.openStore
|
||||
if storeFactory == nil {
|
||||
storeFactory = store.Open
|
||||
}
|
||||
r.store, openErr = storeFactory(r.ctx, dbPath)
|
||||
if openErr == nil {
|
||||
defer func() { _ = r.store.Close() }()
|
||||
return fn()
|
||||
}
|
||||
}
|
||||
if errors.Is(openErr, store.ErrSchemaVersionMismatch) {
|
||||
if err := r.withSyncLock(func() error {
|
||||
storeFactory := r.openStore
|
||||
if storeFactory == nil {
|
||||
storeFactory = store.Open
|
||||
}
|
||||
var migrateErr error
|
||||
r.store, migrateErr = storeFactory(r.ctx, dbPath)
|
||||
if migrateErr != nil {
|
||||
return dbErr(migrateErr)
|
||||
}
|
||||
closeErr := r.store.Close()
|
||||
r.store = nil
|
||||
return closeErr
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
r.store, openErr = store.OpenReadOnly(r.ctx, dbPath)
|
||||
if openErr == nil {
|
||||
defer func() { _ = r.store.Close() }()
|
||||
return fn()
|
||||
}
|
||||
}
|
||||
return dbErr(openErr)
|
||||
}
|
||||
defer func() { _ = r.store.Close() }()
|
||||
return fn()
|
||||
}
|
||||
|
||||
func (r *runtime) withServicesAuto(withDiscord, autoShareUpdate bool, fn func() error) error {
|
||||
return r.withServicesAutoLocked(withDiscord, autoShareUpdate, false, fn)
|
||||
}
|
||||
|
||||
@ -1053,6 +1053,63 @@ func TestSyncLockSerializesConcurrentRuns(t *testing.T) {
|
||||
require.ErrorIs(t, err, context.DeadlineExceeded)
|
||||
}
|
||||
|
||||
func TestReadCommandsDoNotWaitForSyncLock(t *testing.T) {
|
||||
if goruntime.GOOS == "windows" {
|
||||
t.Skip("sync lock timing is flaky on Windows")
|
||||
}
|
||||
ctx := context.Background()
|
||||
dir := t.TempDir()
|
||||
cfg := config.Default()
|
||||
cfg.DBPath = filepath.Join(dir, "discrawl.db")
|
||||
cfgPath := filepath.Join(dir, "config.toml")
|
||||
require.NoError(t, config.Write(cfgPath, cfg))
|
||||
|
||||
s := seedCLIStore(t, cfg.DBPath)
|
||||
require.NoError(t, s.Close())
|
||||
|
||||
firstRelease, err := acquireSyncLock(ctx, filepath.Join(dir, ".discrawl-sync.lock"))
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = firstRelease() }()
|
||||
|
||||
for _, args := range [][]string{
|
||||
{"--config", cfgPath, "search", "automatic"},
|
||||
{"--config", cfgPath, "messages", "--channel", "general", "--last", "1"},
|
||||
{"--config", cfgPath, "sql", "select count(*) as total from messages"},
|
||||
} {
|
||||
runCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
|
||||
var out bytes.Buffer
|
||||
err := Run(runCtx, args, &out, &bytes.Buffer{})
|
||||
cancel()
|
||||
require.NoError(t, err, args)
|
||||
require.NotEmpty(t, out.String(), args)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadCommandsMigrateOlderLocalStore(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
dir := t.TempDir()
|
||||
cfg := config.Default()
|
||||
cfg.DBPath = filepath.Join(dir, "discrawl.db")
|
||||
cfgPath := filepath.Join(dir, "config.toml")
|
||||
require.NoError(t, config.Write(cfgPath, cfg))
|
||||
|
||||
s := seedCLIStore(t, cfg.DBPath)
|
||||
_, err := s.DB().ExecContext(ctx, `pragma user_version = 1`)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, s.Close())
|
||||
|
||||
var out bytes.Buffer
|
||||
require.NoError(t, Run(ctx, []string{"--config", cfgPath, "search", "automatic"}, &out, &bytes.Buffer{}))
|
||||
require.Contains(t, out.String(), "automatic updates work")
|
||||
|
||||
reader, err := store.OpenReadOnly(ctx, cfg.DBPath)
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = reader.Close() }()
|
||||
var version int
|
||||
require.NoError(t, reader.DB().QueryRowContext(ctx, `pragma user_version`).Scan(&version))
|
||||
require.Equal(t, 2, version)
|
||||
}
|
||||
|
||||
func seedCLIStore(t *testing.T, path string) *store.Store {
|
||||
t.Helper()
|
||||
ctx := context.Background()
|
||||
|
||||
@ -97,6 +97,21 @@ func hasBoolFlag(args []string, name string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func boolFlagEnabled(args []string, name string) bool {
|
||||
for _, arg := range args {
|
||||
if arg == name {
|
||||
return true
|
||||
}
|
||||
if raw, ok := strings.CutPrefix(arg, name+"="); ok {
|
||||
switch strings.ToLower(strings.TrimSpace(raw)) {
|
||||
case "1", "t", "true", "y", "yes", "on":
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func hasHelpArg(args []string) bool {
|
||||
for _, arg := range args {
|
||||
if arg == "help" || arg == "--help" || arg == "-h" {
|
||||
|
||||
@ -34,6 +34,29 @@ func (r *runtime) withSyncLock(fn func() error) error {
|
||||
return fn()
|
||||
}
|
||||
|
||||
func (r *runtime) tryWithSyncLock(fn func() error) (bool, error) {
|
||||
if r.dbLockHeld {
|
||||
return true, fn()
|
||||
}
|
||||
lockPath, err := r.syncLockPath()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
release, locked, err := tryAcquireSyncLock(lockPath)
|
||||
if err != nil || !locked {
|
||||
return locked, err
|
||||
}
|
||||
r.dbLockHeld = true
|
||||
r.lockStarted = r.nowUTC()
|
||||
r.setSyncLockPhase("locked")
|
||||
defer func() {
|
||||
r.dbLockHeld = false
|
||||
r.lockStarted = time.Time{}
|
||||
_ = release()
|
||||
}()
|
||||
return true, fn()
|
||||
}
|
||||
|
||||
func (r *runtime) setSyncLockPhase(phase string) {
|
||||
if !r.dbLockHeld {
|
||||
return
|
||||
|
||||
@ -7,3 +7,7 @@ import "context"
|
||||
func acquireSyncLock(context.Context, string) (func() error, error) {
|
||||
return func() error { return nil }, nil
|
||||
}
|
||||
|
||||
func tryAcquireSyncLock(string) (func() error, bool, error) {
|
||||
return func() error { return nil }, true, nil
|
||||
}
|
||||
|
||||
@ -51,3 +51,29 @@ func acquireSyncLock(ctx context.Context, path string) (func() error, error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func tryAcquireSyncLock(path string) (func() error, bool, error) {
|
||||
file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0o600)
|
||||
if err != nil {
|
||||
return nil, false, fmt.Errorf("open sync lock: %w", err)
|
||||
}
|
||||
err = unix.Flock(int(file.Fd()), unix.LOCK_EX|unix.LOCK_NB)
|
||||
if err != nil {
|
||||
_ = file.Close()
|
||||
if errors.Is(err, unix.EWOULDBLOCK) || errors.Is(err, unix.EAGAIN) {
|
||||
return nil, false, nil
|
||||
}
|
||||
return nil, false, fmt.Errorf("acquire sync lock: %w", err)
|
||||
}
|
||||
_, _ = file.Seek(0, 0)
|
||||
_ = file.Truncate(0)
|
||||
_, _ = fmt.Fprintf(file, "pid=%d\n", os.Getpid())
|
||||
return func() error {
|
||||
unlockErr := unix.Flock(int(file.Fd()), unix.LOCK_UN)
|
||||
closeErr := file.Close()
|
||||
if unlockErr != nil {
|
||||
return unlockErr
|
||||
}
|
||||
return closeErr
|
||||
}, true, nil
|
||||
}
|
||||
|
||||
@ -49,3 +49,28 @@ func acquireSyncLock(ctx context.Context, path string) (func() error, error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func tryAcquireSyncLock(path string) (func() error, bool, error) {
|
||||
file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0o600)
|
||||
if err != nil {
|
||||
return nil, false, fmt.Errorf("open sync lock: %w", err)
|
||||
}
|
||||
handle := windows.Handle(file.Fd())
|
||||
overlapped := &windows.Overlapped{}
|
||||
err = windows.LockFileEx(handle, windows.LOCKFILE_EXCLUSIVE_LOCK|windows.LOCKFILE_FAIL_IMMEDIATELY, 0, 1, 0, overlapped)
|
||||
if err != nil {
|
||||
_ = file.Close()
|
||||
return nil, false, nil
|
||||
}
|
||||
_, _ = file.Seek(0, 0)
|
||||
_ = file.Truncate(0)
|
||||
_, _ = fmt.Fprintf(file, "pid=%d\n", os.Getpid())
|
||||
return func() error {
|
||||
unlockErr := windows.UnlockFileEx(handle, 0, 1, 0, overlapped)
|
||||
closeErr := file.Close()
|
||||
if unlockErr != nil {
|
||||
return unlockErr
|
||||
}
|
||||
return closeErr
|
||||
}, true, nil
|
||||
}
|
||||
|
||||
@ -3,6 +3,7 @@ package store
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"strconv"
|
||||
@ -18,6 +19,8 @@ const (
|
||||
storeSchemaVersion = 2
|
||||
)
|
||||
|
||||
var ErrSchemaVersionMismatch = errors.New("database schema version mismatch")
|
||||
|
||||
type Store struct {
|
||||
db *sql.DB
|
||||
path string
|
||||
@ -135,7 +138,7 @@ func OpenReadOnly(ctx context.Context, path string) (*Store, error) {
|
||||
return nil, err
|
||||
} else if version != storeSchemaVersion {
|
||||
_ = base.Close()
|
||||
return nil, fmt.Errorf("database schema version mismatch: got %d want %d", version, storeSchemaVersion)
|
||||
return nil, fmt.Errorf("%w: got %d want %d", ErrSchemaVersionMismatch, version, storeSchemaVersion)
|
||||
}
|
||||
return store, nil
|
||||
}
|
||||
@ -179,7 +182,7 @@ func (s *Store) migrate(ctx context.Context) error {
|
||||
if version, err := s.schemaVersion(ctx); err != nil {
|
||||
return err
|
||||
} else if version != storeSchemaVersion {
|
||||
return fmt.Errorf("database schema version mismatch: got %d want %d", version, storeSchemaVersion)
|
||||
return fmt.Errorf("%w: got %d want %d", ErrSchemaVersionMismatch, version, storeSchemaVersion)
|
||||
}
|
||||
if err := s.applyQueryIndexMigration(ctx); err != nil {
|
||||
return err
|
||||
|
||||
Loading…
Reference in New Issue
Block a user