fix(cli): serialize discrawl sync runs
This commit is contained in:
parent
28676d38f3
commit
59f42cb0ab
2
go.mod
2
go.mod
@ -7,6 +7,7 @@ require (
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
github.com/pelletier/go-toml/v2 v2.3.0
|
||||
github.com/stretchr/testify v1.11.1
|
||||
golang.org/x/sys v0.43.0
|
||||
golang.org/x/text v0.36.0
|
||||
modernc.org/sqlite v1.50.0
|
||||
)
|
||||
@ -22,7 +23,6 @@ require (
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
golang.org/x/crypto v0.50.0 // indirect
|
||||
golang.org/x/sys v0.43.0 // indirect
|
||||
golang.org/x/tools v0.44.0 // indirect
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
|
||||
@ -143,6 +143,12 @@ func (r *runtime) runSync(args []string) error {
|
||||
SkipMembers: syncSkipsMembers(*skipMembers, defaultLatest),
|
||||
LatestOnly: syncLatestOnly(*latestOnly, defaultLatest),
|
||||
}
|
||||
return r.withSyncLock(func() error {
|
||||
return r.runSyncLocked(sources, opts)
|
||||
})
|
||||
}
|
||||
|
||||
func (r *runtime) runSyncLocked(sources syncSources, opts syncer.SyncOptions) error {
|
||||
var apiStats *syncer.SyncStats
|
||||
if sources.discord {
|
||||
shouldClose := r.client == nil
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
goruntime "runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -655,6 +656,33 @@ func TestSyncImportsGitShareBeforeLiveDiscord(t *testing.T) {
|
||||
require.Contains(t, contents, "live discord filled the delta")
|
||||
}
|
||||
|
||||
func TestSyncLockSerializesConcurrentRuns(t *testing.T) {
|
||||
if goruntime.GOOS == "windows" {
|
||||
t.Skip("sync lock is currently a no-op on Windows")
|
||||
}
|
||||
ctx := context.Background()
|
||||
dir := t.TempDir()
|
||||
cfg := config.Default()
|
||||
cfg.DBPath = filepath.Join(dir, "discrawl.db")
|
||||
cfgPath := filepath.Join(dir, "config.toml")
|
||||
require.NoError(t, config.Write(cfgPath, cfg))
|
||||
|
||||
rt := &runtime{
|
||||
ctx: ctx,
|
||||
configPath: cfgPath,
|
||||
cfg: cfg,
|
||||
}
|
||||
firstRelease, err := acquireSyncLock(ctx, filepath.Join(dir, ".discrawl-sync.lock"))
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = firstRelease() }()
|
||||
|
||||
waitCtx, cancel := context.WithTimeout(ctx, 25*time.Millisecond)
|
||||
defer cancel()
|
||||
rt.ctx = waitCtx
|
||||
err = rt.withSyncLock(func() error { return nil })
|
||||
require.ErrorIs(t, err, context.DeadlineExceeded)
|
||||
}
|
||||
|
||||
func seedCLIStore(t *testing.T, path string) *store.Store {
|
||||
t.Helper()
|
||||
ctx := context.Background()
|
||||
|
||||
37
internal/cli/sync_lock.go
Normal file
37
internal/cli/sync_lock.go
Normal file
@ -0,0 +1,37 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/steipete/discrawl/internal/config"
|
||||
)
|
||||
|
||||
func (r *runtime) withSyncLock(fn func() error) error {
|
||||
lockPath, err := r.syncLockPath()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
release, err := acquireSyncLock(r.ctx, lockPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() { _ = release() }()
|
||||
return fn()
|
||||
}
|
||||
|
||||
func (r *runtime) syncLockPath() (string, error) {
|
||||
dbPath, err := config.ExpandPath(r.cfg.DBPath)
|
||||
if err != nil {
|
||||
return "", configErr(err)
|
||||
}
|
||||
return filepath.Join(filepath.Dir(dbPath), ".discrawl-sync.lock"), nil
|
||||
}
|
||||
|
||||
func syncLockErr(ctx context.Context, path string) error {
|
||||
if ctx.Err() != nil {
|
||||
return fmt.Errorf("wait for sync lock %s: %w", path, ctx.Err())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
9
internal/cli/sync_lock_other.go
Normal file
9
internal/cli/sync_lock_other.go
Normal file
@ -0,0 +1,9 @@
|
||||
//go:build !unix && !windows
|
||||
|
||||
package cli
|
||||
|
||||
import "context"
|
||||
|
||||
func acquireSyncLock(context.Context, string) (func() error, error) {
|
||||
return func() error { return nil }, nil
|
||||
}
|
||||
53
internal/cli/sync_lock_unix.go
Normal file
53
internal/cli/sync_lock_unix.go
Normal file
@ -0,0 +1,53 @@
|
||||
//go:build unix
|
||||
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
func acquireSyncLock(ctx context.Context, path string) (func() error, error) {
|
||||
file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0o600)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open sync lock: %w", err)
|
||||
}
|
||||
locked := false
|
||||
defer func() {
|
||||
if !locked {
|
||||
_ = file.Close()
|
||||
}
|
||||
}()
|
||||
ticker := time.NewTicker(200 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
err = unix.Flock(int(file.Fd()), unix.LOCK_EX|unix.LOCK_NB)
|
||||
if err == nil {
|
||||
locked = true
|
||||
_, _ = file.Seek(0, 0)
|
||||
_ = file.Truncate(0)
|
||||
_, _ = fmt.Fprintf(file, "pid=%d\n", os.Getpid())
|
||||
return func() error {
|
||||
unlockErr := unix.Flock(int(file.Fd()), unix.LOCK_UN)
|
||||
closeErr := file.Close()
|
||||
if unlockErr != nil {
|
||||
return unlockErr
|
||||
}
|
||||
return closeErr
|
||||
}, nil
|
||||
}
|
||||
if !errors.Is(err, unix.EWOULDBLOCK) && !errors.Is(err, unix.EAGAIN) {
|
||||
return nil, fmt.Errorf("acquire sync lock: %w", err)
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, syncLockErr(ctx, path)
|
||||
case <-ticker.C:
|
||||
}
|
||||
}
|
||||
}
|
||||
51
internal/cli/sync_lock_windows.go
Normal file
51
internal/cli/sync_lock_windows.go
Normal file
@ -0,0 +1,51 @@
|
||||
//go:build windows
|
||||
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sys/windows"
|
||||
)
|
||||
|
||||
func acquireSyncLock(ctx context.Context, path string) (func() error, error) {
|
||||
file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0o600)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open sync lock: %w", err)
|
||||
}
|
||||
locked := false
|
||||
defer func() {
|
||||
if !locked {
|
||||
_ = file.Close()
|
||||
}
|
||||
}()
|
||||
ticker := time.NewTicker(200 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
handle := windows.Handle(file.Fd())
|
||||
overlapped := &windows.Overlapped{}
|
||||
for {
|
||||
err = windows.LockFileEx(handle, windows.LOCKFILE_EXCLUSIVE_LOCK|windows.LOCKFILE_FAIL_IMMEDIATELY, 0, 1, 0, overlapped)
|
||||
if err == nil {
|
||||
locked = true
|
||||
_, _ = file.Seek(0, 0)
|
||||
_ = file.Truncate(0)
|
||||
_, _ = fmt.Fprintf(file, "pid=%d\n", os.Getpid())
|
||||
return func() error {
|
||||
unlockErr := windows.UnlockFileEx(handle, 0, 1, 0, overlapped)
|
||||
closeErr := file.Close()
|
||||
if unlockErr != nil {
|
||||
return unlockErr
|
||||
}
|
||||
return closeErr
|
||||
}, nil
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, syncLockErr(ctx, path)
|
||||
case <-ticker.C:
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user