From 1808bef68f35981eebef684a9807d4215faa5ecf Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 21 Apr 2026 05:07:39 +0100 Subject: [PATCH] feat: add git-backed archive sharing --- .github/workflows/publish-discord-backup.yml | 54 ++ README.md | 27 + internal/cli/cli.go | 53 ++ internal/cli/cli_test.go | 131 +++++ internal/cli/share_commands.go | 191 +++++++ internal/config/config.go | 28 + internal/config/config_test.go | 6 + internal/share/share.go | 517 +++++++++++++++++++ internal/share/share_test.go | 214 ++++++++ internal/store/store.go | 20 + internal/store/store_test.go | 52 ++ 11 files changed, 1293 insertions(+) create mode 100644 .github/workflows/publish-discord-backup.yml create mode 100644 internal/cli/share_commands.go create mode 100644 internal/share/share.go create mode 100644 internal/share/share_test.go diff --git a/.github/workflows/publish-discord-backup.yml b/.github/workflows/publish-discord-backup.yml new file mode 100644 index 0000000..5f65bbc --- /dev/null +++ b/.github/workflows/publish-discord-backup.yml @@ -0,0 +1,54 @@ +name: publish-discord-backup + +on: + schedule: + - cron: "*/15 * * * *" + workflow_dispatch: + +permissions: + contents: read + +concurrency: + group: publish-discord-backup + cancel-in-progress: false + +jobs: + publish: + runs-on: ubuntu-latest + timeout-minutes: 60 + steps: + - name: Checkout discrawl + uses: actions/checkout@v6.0.2 + + - name: Setup Go + uses: actions/setup-go@v6.3.0 + with: + go-version-file: go.mod + cache: true + + - name: Configure Git identity + run: | + git config --global user.name "discrawl publisher" + git config --global user.email "discrawl-publisher@users.noreply.github.com" + + - name: Sync and publish Discord archive + env: + DISCORD_BOT_TOKEN: ${{ secrets.DISCORD_BOT_TOKEN }} + BACKUP_REMOTE: https://x-access-token:${{ secrets.DISCORD_BACKUP_TOKEN }}@github.com/openclaw/discord-backup.git + CONFIG: ${{ runner.temp }}/discrawl/config.toml + DB: ${{ runner.temp }}/discrawl/discrawl.db + BACKUP_REPO: ${{ runner.temp }}/discord-backup + run: | + mkdir -p "$(dirname "$CONFIG")" + git clone "$BACKUP_REMOTE" "$BACKUP_REPO" + if [ -f "$BACKUP_REPO/manifest.json" ]; then + go run ./cmd/discrawl --config "$CONFIG" subscribe --repo "$BACKUP_REPO" "$BACKUP_REMOTE" + else + go run ./cmd/discrawl --config "$CONFIG" init --db "$DB" + fi + go run ./cmd/discrawl --config "$CONFIG" sync --all + go run ./cmd/discrawl --config "$CONFIG" publish \ + --repo "$BACKUP_REPO" \ + --remote "$BACKUP_REMOTE" \ + --message "sync: discord archive" \ + --push diff --git a/README.md b/README.md index 2133bff..966bc10 100644 --- a/README.md +++ b/README.md @@ -317,6 +317,26 @@ Shows local archive status. discrawl status ``` +### Git-backed sharing + +`discrawl` can publish the SQLite archive as sharded, compressed NDJSON snapshots in a private Git repo, then auto-import that repo before local read commands. + +Publisher: + +```bash +discrawl publish --remote https://github.com/openclaw/discord-backup.git --push +``` + +Subscriber: + +```bash +discrawl subscribe https://github.com/openclaw/discord-backup.git +discrawl search "launch checklist" +discrawl messages --channel general --hours 24 +``` + +Once `share.remote` is configured, read commands auto-fetch and import when the local share import is older than `share.stale_after` (default `15m`). `discrawl update` forces the same pull/import step manually. + ### `doctor` Checks config, auth, DB, and FTS wiring. @@ -360,6 +380,13 @@ provider = "openai" model = "text-embedding-3-small" api_key_env = "OPENAI_API_KEY" batch_size = 64 + +[share] +remote = "" +repo_path = "~/.discrawl/share" +branch = "main" +auto_update = true +stale_after = "15m" ``` The value above is an example. `init` writes an auto-sized default based on the host: `min(32, max(8, GOMAXPROCS*2))`. diff --git a/internal/cli/cli.go b/internal/cli/cli.go index a49c711..65361ec 100644 --- a/internal/cli/cli.go +++ b/internal/cli/cli.go @@ -12,6 +12,7 @@ import ( "github.com/bwmarrin/discordgo" "github.com/steipete/discrawl/internal/config" "github.com/steipete/discrawl/internal/discord" + "github.com/steipete/discrawl/internal/share" "github.com/steipete/discrawl/internal/store" "github.com/steipete/discrawl/internal/syncer" ) @@ -136,6 +137,12 @@ func (r *runtime) dispatch(rest []string) error { return r.withServices(false, func() error { return r.runChannels(rest[1:]) }) case "status": return r.withServices(false, func() error { return r.runStatus(rest[1:]) }) + case "publish": + return r.withServicesAuto(false, false, func() error { return r.runPublish(rest[1:]) }) + case "subscribe": + return r.runSubscribe(rest[1:]) + case "update": + return r.withServicesAuto(false, false, func() error { return r.runUpdate(rest[1:]) }) case "doctor": return r.runDoctor(rest[1:]) default: @@ -144,6 +151,10 @@ func (r *runtime) dispatch(rest []string) error { } func (r *runtime) withServices(withDiscord bool, fn func() error) error { + return r.withServicesAuto(withDiscord, !withDiscord, fn) +} + +func (r *runtime) withServicesAuto(withDiscord, autoShareUpdate bool, fn func() error) error { cfg, err := config.Load(r.configPath) if err != nil { return configErr(err) @@ -165,6 +176,11 @@ func (r *runtime) withServices(withDiscord bool, fn func() error) error { return dbErr(err) } defer func() { _ = r.store.Close() }() + if autoShareUpdate { + if err := r.autoUpdateShare(); err != nil { + return err + } + } if withDiscord { discordFactory := r.newDiscord if discordFactory == nil { @@ -194,3 +210,40 @@ func (r *runtime) withServices(withDiscord bool, fn func() error) error { } return fn() } + +func (r *runtime) autoUpdateShare() error { + if !r.cfg.ShareEnabled() || !r.cfg.Share.AutoUpdate { + return nil + } + staleAfter, err := time.ParseDuration(r.cfg.Share.StaleAfter) + if err != nil { + return configErr(fmt.Errorf("invalid share.stale_after: %w", err)) + } + if !share.NeedsImport(r.ctx, r.store, staleAfter) { + return nil + } + opts, err := r.shareOptions() + if err != nil { + return err + } + if err := share.Pull(r.ctx, opts); err != nil { + return err + } + _, err = share.Import(r.ctx, r.store, opts) + if errors.Is(err, share.ErrNoManifest) { + return nil + } + return err +} + +func (r *runtime) shareOptions() (share.Options, error) { + repoPath, err := config.ExpandPath(r.cfg.Share.RepoPath) + if err != nil { + return share.Options{}, configErr(err) + } + return share.Options{ + RepoPath: repoPath, + Remote: r.cfg.Share.Remote, + Branch: r.cfg.Share.Branch, + }, nil +} diff --git a/internal/cli/cli_test.go b/internal/cli/cli_test.go index ef913b8..f7c2826 100644 --- a/internal/cli/cli_test.go +++ b/internal/cli/cli_test.go @@ -5,6 +5,7 @@ import ( "context" "log/slog" "os" + "os/exec" "path/filepath" "testing" "time" @@ -14,6 +15,7 @@ import ( "github.com/steipete/discrawl/internal/config" discordclient "github.com/steipete/discrawl/internal/discord" + "github.com/steipete/discrawl/internal/share" "github.com/steipete/discrawl/internal/store" "github.com/steipete/discrawl/internal/syncer" ) @@ -131,6 +133,135 @@ func TestStatusSearchSQLAndListings(t *testing.T) { } } +func TestReadCommandsAutoImportStaleShare(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + sourceDB := filepath.Join(dir, "source.db") + source := seedCLIStore(t, sourceDB) + defer func() { _ = source.Close() }() + + workRepo := filepath.Join(dir, "work") + remoteRepo := filepath.Join(dir, "remote.git") + opts := share.Options{RepoPath: workRepo, Branch: "main"} + _, err := share.Export(ctx, source, opts) + require.NoError(t, err) + runGit(t, workRepo, "config", "user.name", "discrawl test") + runGit(t, workRepo, "config", "user.email", "discrawl@example.com") + committed, err := share.Commit(ctx, opts, "test: snapshot") + require.NoError(t, err) + require.True(t, committed) + runGit(t, dir, "init", "--bare", remoteRepo) + runGit(t, workRepo, "remote", "add", "origin", remoteRepo) + runGit(t, workRepo, "push", "-u", "origin", "main") + + cfgPath := filepath.Join(dir, "config.toml") + cfg := config.Default() + cfg.DBPath = filepath.Join(dir, "reader.db") + cfg.Share.Remote = remoteRepo + cfg.Share.RepoPath = filepath.Join(dir, "reader-share") + cfg.Share.StaleAfter = "15m" + cfg.Share.AutoUpdate = true + require.NoError(t, config.Write(cfgPath, cfg)) + + var out bytes.Buffer + require.NoError(t, Run(ctx, []string{"--config", cfgPath, "search", "automatic"}, &out, &bytes.Buffer{})) + require.Contains(t, out.String(), "automatic updates work") + + reader, err := store.Open(ctx, cfg.DBPath) + require.NoError(t, err) + defer func() { _ = reader.Close() }() + lastImport, err := reader.GetSyncState(ctx, share.LastImportSyncScope) + require.NoError(t, err) + require.NotEmpty(t, lastImport) +} + +func TestShareCommandsPublishSubscribeAndUpdate(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + remoteRepo := filepath.Join(dir, "remote.git") + runGit(t, dir, "init", "--bare", remoteRepo) + + cfgPath := filepath.Join(dir, "config.toml") + cfg := config.Default() + cfg.DBPath = filepath.Join(dir, "publisher.db") + cfg.Share.Remote = remoteRepo + cfg.Share.RepoPath = filepath.Join(dir, "publisher-share") + require.NoError(t, config.Write(cfgPath, cfg)) + publisher := seedCLIStore(t, cfg.DBPath) + require.NoError(t, publisher.Close()) + + var out bytes.Buffer + require.NoError(t, Run(ctx, []string{ + "--config", cfgPath, + "publish", + "--repo", cfg.Share.RepoPath, + "--remote", remoteRepo, + "--no-commit", + }, &out, &bytes.Buffer{})) + require.FileExists(t, filepath.Join(cfg.Share.RepoPath, share.ManifestName)) + + runGit(t, cfg.Share.RepoPath, "config", "user.name", "discrawl test") + runGit(t, cfg.Share.RepoPath, "config", "user.email", "discrawl@example.com") + committed, err := share.Commit(ctx, share.Options{RepoPath: cfg.Share.RepoPath, Remote: remoteRepo, Branch: "main"}, "test: snapshot") + require.NoError(t, err) + require.True(t, committed) + require.NoError(t, share.Push(ctx, share.Options{RepoPath: cfg.Share.RepoPath, Remote: remoteRepo, Branch: "main"})) + + readerCfgPath := filepath.Join(dir, "reader.toml") + require.NoError(t, Run(ctx, []string{ + "--config", readerCfgPath, + "subscribe", + "--repo", filepath.Join(dir, "reader-share"), + "--no-import", + remoteRepo, + }, &bytes.Buffer{}, &bytes.Buffer{})) + readerCfg, err := config.Load(readerCfgPath) + require.NoError(t, err) + require.Equal(t, remoteRepo, readerCfg.Share.Remote) + + readerCfg.DBPath = filepath.Join(dir, "reader.db") + require.NoError(t, config.Write(readerCfgPath, readerCfg)) + out.Reset() + require.NoError(t, Run(ctx, []string{"--config", readerCfgPath, "update"}, &out, &bytes.Buffer{})) + require.Contains(t, out.String(), "generated_at") + out.Reset() + require.NoError(t, Run(ctx, []string{"--config", readerCfgPath, "search", "automatic"}, &out, &bytes.Buffer{})) + require.Contains(t, out.String(), "automatic updates work") +} + +func seedCLIStore(t *testing.T, path string) *store.Store { + t.Helper() + ctx := context.Background() + s, err := store.Open(ctx, path) + require.NoError(t, err) + now := time.Now().UTC() + require.NoError(t, s.UpsertGuild(ctx, store.GuildRecord{ID: "g1", Name: "Guild", RawJSON: `{}`})) + require.NoError(t, s.UpsertChannel(ctx, store.ChannelRecord{ID: "c1", GuildID: "g1", Kind: "text", Name: "general", RawJSON: `{}`})) + require.NoError(t, s.UpsertMessage(ctx, store.MessageRecord{ + ID: "m100", + GuildID: "g1", + ChannelID: "c1", + ChannelName: "general", + AuthorID: "u1", + AuthorName: "Peter", + MessageType: 0, + CreatedAt: now.Format(time.RFC3339Nano), + Content: "automatic updates work", + NormalizedContent: "automatic updates work", + RawJSON: `{}`, + })) + return s +} + +func runGit(t *testing.T, dir string, args ...string) { + t.Helper() + // #nosec G204 -- fixed git argv in test setup. + cmd := exec.Command("git", args...) + cmd.Dir = dir + out, err := cmd.CombinedOutput() + require.NoError(t, err, string(out)) +} + type fakeDiscordClient struct { guilds []*discordgo.UserGuild self *discordgo.User diff --git a/internal/cli/share_commands.go b/internal/cli/share_commands.go new file mode 100644 index 0000000..5f2162c --- /dev/null +++ b/internal/cli/share_commands.go @@ -0,0 +1,191 @@ +package cli + +import ( + "flag" + "fmt" + "io" + "os" + + "github.com/steipete/discrawl/internal/config" + "github.com/steipete/discrawl/internal/share" + "github.com/steipete/discrawl/internal/store" +) + +const defaultShareRemote = "https://github.com/openclaw/discord-backup.git" + +func (r *runtime) runPublish(args []string) error { + fs := flag.NewFlagSet("publish", flag.ContinueOnError) + fs.SetOutput(io.Discard) + repoPath := fs.String("repo", r.cfg.Share.RepoPath, "") + remote := fs.String("remote", r.cfg.Share.Remote, "") + branch := fs.String("branch", r.cfg.Share.Branch, "") + message := fs.String("message", "", "") + noCommit := fs.Bool("no-commit", false, "") + push := fs.Bool("push", false, "") + if err := fs.Parse(args); err != nil { + return usageErr(err) + } + if fs.NArg() != 0 { + return usageErr(fmt.Errorf("publish takes no positional arguments")) + } + opts, err := shareOptionsFromFlags(*repoPath, *remote, *branch) + if err != nil { + return err + } + manifest, err := share.Export(r.ctx, r.store, opts) + if err != nil { + return err + } + committed := false + if !*noCommit { + msg := *message + if msg == "" { + msg = "sync: discord archive" + } + committed, err = share.Commit(r.ctx, opts, msg) + if err != nil { + return err + } + } + if *push { + if err := share.Push(r.ctx, opts); err != nil { + return err + } + } + return r.print(map[string]any{ + "repo_path": opts.RepoPath, + "remote": opts.Remote, + "generated_at": manifest.GeneratedAt, + "tables": manifest.Tables, + "committed": committed, + "pushed": *push, + }) +} + +func (r *runtime) runSubscribe(args []string) error { + fs := flag.NewFlagSet("subscribe", flag.ContinueOnError) + fs.SetOutput(io.Discard) + repoPath := fs.String("repo", "", "") + branch := fs.String("branch", "main", "") + noImport := fs.Bool("no-import", false, "") + if err := fs.Parse(args); err != nil { + return usageErr(err) + } + remote := defaultShareRemote + if fs.NArg() > 1 { + return usageErr(fmt.Errorf("subscribe takes at most one remote")) + } + if fs.NArg() == 1 { + remote = fs.Arg(0) + } + cfg, err := loadConfigOrDefault(r.configPath) + if err != nil { + return err + } + if *repoPath != "" { + cfg.Share.RepoPath = *repoPath + } + cfg.Share.Remote = remote + cfg.Share.Branch = *branch + cfg.Share.AutoUpdate = true + if cfg.Share.StaleAfter == "" { + cfg.Share.StaleAfter = "15m" + } + if err := config.Write(r.configPath, cfg); err != nil { + return configErr(err) + } + if *noImport { + return r.print(map[string]any{"config_path": r.configPath, "remote": remote, "repo_path": cfg.Share.RepoPath}) + } + if err := config.EnsureRuntimeDirs(cfg); err != nil { + return configErr(err) + } + dbPath, err := config.ExpandPath(cfg.DBPath) + if err != nil { + return configErr(err) + } + s, err := store.Open(r.ctx, dbPath) + if err != nil { + return dbErr(err) + } + defer func() { _ = s.Close() }() + expandedRepo, err := config.ExpandPath(cfg.Share.RepoPath) + if err != nil { + return configErr(err) + } + opts := share.Options{RepoPath: expandedRepo, Remote: cfg.Share.Remote, Branch: cfg.Share.Branch} + if err := share.Pull(r.ctx, opts); err != nil { + return err + } + manifest, err := share.Import(r.ctx, s, opts) + if err != nil { + return err + } + return r.print(map[string]any{ + "config_path": r.configPath, + "repo_path": opts.RepoPath, + "remote": opts.Remote, + "generated_at": manifest.GeneratedAt, + "tables": manifest.Tables, + }) +} + +func (r *runtime) runUpdate(args []string) error { + fs := flag.NewFlagSet("update", flag.ContinueOnError) + fs.SetOutput(io.Discard) + repoPath := fs.String("repo", r.cfg.Share.RepoPath, "") + remote := fs.String("remote", r.cfg.Share.Remote, "") + branch := fs.String("branch", r.cfg.Share.Branch, "") + if err := fs.Parse(args); err != nil { + return usageErr(err) + } + if fs.NArg() != 0 { + return usageErr(fmt.Errorf("update takes no positional arguments")) + } + opts, err := shareOptionsFromFlags(*repoPath, *remote, *branch) + if err != nil { + return err + } + if err := share.Pull(r.ctx, opts); err != nil { + return err + } + manifest, err := share.Import(r.ctx, r.store, opts) + if err != nil { + return err + } + return r.print(map[string]any{ + "repo_path": opts.RepoPath, + "remote": opts.Remote, + "generated_at": manifest.GeneratedAt, + "tables": manifest.Tables, + }) +} + +func shareOptionsFromFlags(repoPath, remote, branch string) (share.Options, error) { + expandedRepo, err := config.ExpandPath(repoPath) + if err != nil { + return share.Options{}, configErr(err) + } + if remote == "" { + remote = defaultShareRemote + } + if branch == "" { + branch = "main" + } + return share.Options{RepoPath: expandedRepo, Remote: remote, Branch: branch}, nil +} + +func loadConfigOrDefault(path string) (config.Config, error) { + cfg, err := config.Load(path) + if err == nil { + return cfg, nil + } + if !os.IsNotExist(err) { + return config.Config{}, configErr(err) + } + cfg = config.Default() + if err := cfg.Normalize(); err != nil { + return config.Config{}, configErr(err) + } + return cfg, nil +} diff --git a/internal/config/config.go b/internal/config/config.go index 322d3d4..15d5436 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -29,6 +29,7 @@ type Config struct { Discord DiscordConfig `toml:"discord"` Sync SyncConfig `toml:"sync"` Search SearchConfig `toml:"search"` + Share ShareConfig `toml:"share"` } type DiscordConfig struct { @@ -50,6 +51,14 @@ type SearchConfig struct { Embeddings EmbeddingsConfig `toml:"embeddings"` } +type ShareConfig struct { + Remote string `toml:"remote,omitempty"` + RepoPath string `toml:"repo_path,omitempty"` + Branch string `toml:"branch,omitempty"` + AutoUpdate bool `toml:"auto_update"` + StaleAfter string `toml:"stale_after"` +} + type EmbeddingsConfig struct { Enabled bool `toml:"enabled"` Provider string `toml:"provider"` @@ -118,6 +127,12 @@ func Default() Config { BatchSize: 64, }, }, + Share: ShareConfig{ + RepoPath: filepath.Join(base, "share"), + Branch: "main", + AutoUpdate: true, + StaleAfter: "15m", + }, } } @@ -236,6 +251,15 @@ func (c *Config) Normalize() error { if c.Search.Embeddings.BatchSize <= 0 { c.Search.Embeddings.BatchSize = 64 } + if c.Share.RepoPath == "" { + c.Share.RepoPath = Default().Share.RepoPath + } + if c.Share.Branch == "" { + c.Share.Branch = "main" + } + if c.Share.StaleAfter == "" { + c.Share.StaleAfter = "15m" + } c.GuildIDs = uniqueStrings(c.GuildIDs) return nil } @@ -258,6 +282,10 @@ func (c Config) AttachmentTextEnabled() bool { return c.Sync.AttachmentText == nil || *c.Sync.AttachmentText } +func (c Config) ShareEnabled() bool { + return strings.TrimSpace(c.Share.Remote) != "" +} + func EnsureRuntimeDirs(cfg Config) error { paths := []string{cfg.CacheDir, cfg.LogDir, filepath.Dir(cfg.DBPath)} for _, path := range paths { diff --git a/internal/config/config_test.go b/internal/config/config_test.go index cf2442b..a43e3fd 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -23,6 +23,12 @@ func TestNormalizeFillsDefaults(t *testing.T) { require.NotNil(t, cfg.Sync.AttachmentText) require.True(t, *cfg.Sync.AttachmentText) require.Equal(t, "fts", cfg.Search.DefaultMode) + require.Equal(t, "main", cfg.Share.Branch) + require.Equal(t, "15m", cfg.Share.StaleAfter) + require.True(t, Default().Share.AutoUpdate) + require.False(t, cfg.ShareEnabled()) + cfg.Share.Remote = "git@example.com:org/archive.git" + require.True(t, cfg.ShareEnabled()) } func TestResolveDiscordTokenPrefersOpenClaw(t *testing.T) { diff --git a/internal/share/share.go b/internal/share/share.go new file mode 100644 index 0000000..d57e127 --- /dev/null +++ b/internal/share/share.go @@ -0,0 +1,517 @@ +package share + +import ( + "compress/gzip" + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/steipete/discrawl/internal/store" +) + +const ( + ManifestName = "manifest.json" + LastImportSyncScope = "share:last_import_at" +) + +var ErrNoManifest = errors.New("share manifest not found") + +const shardFlushRows = 1024 + +var maxShardBytes int64 = 40 * 1024 * 1024 + +var SnapshotTables = []string{ + "guilds", + "channels", + "members", + "messages", + "message_events", + "message_attachments", + "mention_events", + "sync_state", + "embedding_jobs", +} + +type Options struct { + RepoPath string + Remote string + Branch string +} + +type Manifest struct { + Version int `json:"version"` + GeneratedAt time.Time `json:"generated_at"` + Tables []TableManifest `json:"tables"` + Files map[string]string `json:"files,omitempty"` +} + +type TableManifest struct { + Name string `json:"name"` + File string `json:"file,omitempty"` + Files []string `json:"files,omitempty"` + Columns []string `json:"columns"` + Rows int `json:"rows"` +} + +func EnsureRepo(ctx context.Context, opts Options) error { + if strings.TrimSpace(opts.RepoPath) == "" { + return fmt.Errorf("share repo path is empty") + } + if _, err := os.Stat(filepath.Join(opts.RepoPath, ".git")); err == nil { + return nil + } + if strings.TrimSpace(opts.Remote) != "" { + if err := os.MkdirAll(filepath.Dir(opts.RepoPath), 0o755); err != nil { + return fmt.Errorf("mkdir share parent: %w", err) + } + if err := run(ctx, "", "git", "clone", opts.Remote, opts.RepoPath); err != nil { + return err + } + if strings.TrimSpace(opts.Branch) != "" { + if err := run(ctx, opts.RepoPath, "git", "checkout", "-B", opts.Branch); err != nil { + return err + } + } + return nil + } + if err := os.MkdirAll(opts.RepoPath, 0o755); err != nil { + return fmt.Errorf("mkdir share repo: %w", err) + } + if err := run(ctx, opts.RepoPath, "git", "init"); err != nil { + return err + } + if strings.TrimSpace(opts.Branch) != "" { + if err := run(ctx, opts.RepoPath, "git", "checkout", "-B", opts.Branch); err != nil { + return err + } + } + return nil +} + +func Pull(ctx context.Context, opts Options) error { + if strings.TrimSpace(opts.Remote) == "" { + return nil + } + if err := EnsureRepo(ctx, opts); err != nil { + return err + } + if err := run(ctx, opts.RepoPath, "git", "fetch", "--prune", "origin"); err != nil { + return err + } + branch := opts.Branch + if strings.TrimSpace(branch) == "" { + branch = "main" + } + remoteRef := "refs/remotes/origin/" + branch + if _, err := output(ctx, opts.RepoPath, "git", "rev-parse", "--verify", remoteRef); err != nil { + return run(ctx, opts.RepoPath, "git", "checkout", "-B", branch) + } + if err := run(ctx, opts.RepoPath, "git", "checkout", "-B", branch, "origin/"+branch); err != nil { + return err + } + return run(ctx, opts.RepoPath, "git", "pull", "--ff-only", "origin", branch) +} + +func Commit(ctx context.Context, opts Options, message string) (bool, error) { + if err := run(ctx, opts.RepoPath, "git", "add", "."); err != nil { + return false, err + } + out, err := output(ctx, opts.RepoPath, "git", "status", "--porcelain") + if err != nil { + return false, err + } + if strings.TrimSpace(out) == "" { + return false, nil + } + if strings.TrimSpace(message) == "" { + message = "sync: discord archive" + } + if err := run(ctx, opts.RepoPath, "git", "commit", "-m", message); err != nil { + return false, err + } + return true, nil +} + +func Push(ctx context.Context, opts Options) error { + branch := opts.Branch + if strings.TrimSpace(branch) == "" { + branch = "main" + } + return run(ctx, opts.RepoPath, "git", "push", "-u", "origin", branch) +} + +func Export(ctx context.Context, s *store.Store, opts Options) (Manifest, error) { + if err := EnsureRepo(ctx, opts); err != nil { + return Manifest{}, err + } + dataDir := filepath.Join(opts.RepoPath, "tables") + if err := os.RemoveAll(dataDir); err != nil { + return Manifest{}, fmt.Errorf("reset tables dir: %w", err) + } + if err := os.MkdirAll(dataDir, 0o755); err != nil { + return Manifest{}, fmt.Errorf("mkdir tables dir: %w", err) + } + manifest := Manifest{ + Version: 1, + GeneratedAt: time.Now().UTC(), + Files: map[string]string{"manifest": ManifestName}, + } + for _, table := range SnapshotTables { + entry, err := exportTable(ctx, s.DB(), dataDir, table) + if err != nil { + return Manifest{}, err + } + manifest.Tables = append(manifest.Tables, entry) + } + body, err := json.MarshalIndent(manifest, "", " ") + if err != nil { + return Manifest{}, err + } + body = append(body, '\n') + if err := os.WriteFile(filepath.Join(opts.RepoPath, ManifestName), body, 0o600); err != nil { + return Manifest{}, fmt.Errorf("write manifest: %w", err) + } + return manifest, nil +} + +func Import(ctx context.Context, s *store.Store, opts Options) (Manifest, error) { + manifest, err := ReadManifest(opts.RepoPath) + if err != nil { + return Manifest{}, err + } + tx, err := s.DB().BeginTx(ctx, nil) + if err != nil { + return Manifest{}, err + } + committed := false + defer func() { + if !committed { + _ = tx.Rollback() + } + }() + for _, table := range []string{"message_fts", "member_fts"} { + if _, err := tx.ExecContext(ctx, "delete from "+table); err != nil { + return Manifest{}, fmt.Errorf("clear %s: %w", table, err) + } + } + for i := len(SnapshotTables) - 1; i >= 0; i-- { + table := SnapshotTables[i] + if _, err := tx.ExecContext(ctx, "delete from "+table); err != nil { + return Manifest{}, fmt.Errorf("clear %s: %w", table, err) + } + } + for _, table := range manifest.Tables { + if err := importTable(ctx, tx, opts.RepoPath, table); err != nil { + return Manifest{}, err + } + } + if err := tx.Commit(); err != nil { + return Manifest{}, err + } + committed = true + if err := s.RebuildSearchIndexes(ctx); err != nil { + return Manifest{}, err + } + if err := s.SetSyncState(ctx, LastImportSyncScope, time.Now().UTC().Format(time.RFC3339Nano)); err != nil { + return Manifest{}, err + } + return manifest, nil +} + +func ReadManifest(repoPath string) (Manifest, error) { + data, err := os.ReadFile(filepath.Join(repoPath, ManifestName)) + if err != nil { + if os.IsNotExist(err) { + return Manifest{}, ErrNoManifest + } + return Manifest{}, fmt.Errorf("read share manifest: %w", err) + } + var manifest Manifest + if err := json.Unmarshal(data, &manifest); err != nil { + return Manifest{}, fmt.Errorf("parse share manifest: %w", err) + } + if manifest.Version != 1 { + return Manifest{}, fmt.Errorf("unsupported share manifest version %d", manifest.Version) + } + return manifest, nil +} + +func NeedsImport(ctx context.Context, s *store.Store, staleAfter time.Duration) bool { + if staleAfter <= 0 { + staleAfter = 15 * time.Minute + } + last, err := s.GetSyncState(ctx, LastImportSyncScope) + if err != nil || strings.TrimSpace(last) == "" { + return true + } + t, err := time.Parse(time.RFC3339Nano, last) + if err != nil { + return true + } + return time.Since(t) >= staleAfter +} + +func exportTable(ctx context.Context, db *sql.DB, dataDir, table string) (TableManifest, error) { + query := "select * from " + table + rows, err := db.QueryContext(ctx, query) + if err != nil { + return TableManifest{}, fmt.Errorf("query %s: %w", table, err) + } + defer func() { _ = rows.Close() }() + columns, err := rows.Columns() + if err != nil { + return TableManifest{}, fmt.Errorf("columns %s: %w", table, err) + } + tableDir := filepath.Join(dataDir, table) + if err := os.MkdirAll(tableDir, 0o755); err != nil { + return TableManifest{}, fmt.Errorf("mkdir %s: %w", table, err) + } + writer := tableShardWriter{dataDir: dataDir, table: table} + if err := writer.open(); err != nil { + return TableManifest{}, err + } + defer func() { _ = writer.close() }() + + count := 0 + values := make([]any, len(columns)) + ptrs := make([]any, len(columns)) + for i := range values { + ptrs[i] = &values[i] + } + for rows.Next() { + if err := rows.Scan(ptrs...); err != nil { + return TableManifest{}, fmt.Errorf("scan %s: %w", table, err) + } + row := make(map[string]any, len(columns)) + for i, column := range columns { + row[column] = exportValue(values[i]) + } + body, err := json.Marshal(row) + if err != nil { + return TableManifest{}, fmt.Errorf("marshal %s row: %w", table, err) + } + if err := writer.rotateIfNeeded(); err != nil { + return TableManifest{}, err + } + if _, err := writer.Write(body); err != nil { + return TableManifest{}, fmt.Errorf("write %s row: %w", table, err) + } + if _, err := writer.Write([]byte{'\n'}); err != nil { + return TableManifest{}, fmt.Errorf("write %s newline: %w", table, err) + } + count++ + if err := writer.finishRow(); err != nil { + return TableManifest{}, err + } + } + if err := rows.Err(); err != nil { + return TableManifest{}, fmt.Errorf("iterate %s: %w", table, err) + } + if err := writer.close(); err != nil { + return TableManifest{}, err + } + return TableManifest{Name: table, Files: writer.files, Columns: columns, Rows: count}, nil +} + +func importTable(ctx context.Context, tx *sql.Tx, repoPath string, table TableManifest) error { + files := table.Files + if len(files) == 0 && strings.TrimSpace(table.File) != "" { + files = []string{table.File} + } + if len(files) == 0 { + return fmt.Errorf("manifest table %s has no files", table.Name) + } + stmt, err := tx.PrepareContext(ctx, insertSQL(table.Name, table.Columns)) + if err != nil { + return fmt.Errorf("prepare import %s: %w", table.Name, err) + } + defer func() { _ = stmt.Close() }() + for _, rel := range files { + if err := importTableFile(ctx, stmt, repoPath, table, rel); err != nil { + return err + } + } + return nil +} + +func importTableFile(ctx context.Context, stmt *sql.Stmt, repoPath string, table TableManifest, rel string) error { + path := filepath.Join(repoPath, filepath.FromSlash(rel)) + file, err := os.Open(path) + if err != nil { + return fmt.Errorf("open %s: %w", rel, err) + } + defer func() { _ = file.Close() }() + gz, err := gzip.NewReader(file) + if err != nil { + return fmt.Errorf("read gzip %s: %w", rel, err) + } + defer func() { _ = gz.Close() }() + dec := json.NewDecoder(gz) + dec.UseNumber() + for { + row := map[string]any{} + err := dec.Decode(&row) + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("decode %s: %w", rel, err) + } + values := make([]any, len(table.Columns)) + for i, column := range table.Columns { + values[i] = importValue(row[column]) + } + if _, err := stmt.ExecContext(ctx, values...); err != nil { + return fmt.Errorf("insert %s: %w", table.Name, err) + } + } + return nil +} + +type tableShardWriter struct { + dataDir string + table string + nextShard int + rowsInShard int + files []string + file *os.File + counter *countingWriter + gz *gzip.Writer +} + +func (w *tableShardWriter) open() error { + rel := filepath.ToSlash(filepath.Join("tables", w.table, fmt.Sprintf("%06d.jsonl.gz", w.nextShard))) + path := filepath.Join(w.dataDir, w.table, fmt.Sprintf("%06d.jsonl.gz", w.nextShard)) + file, err := os.Create(path) + if err != nil { + return fmt.Errorf("create %s: %w", rel, err) + } + w.nextShard++ + w.rowsInShard = 0 + w.files = append(w.files, rel) + w.file = file + w.counter = &countingWriter{w: file} + w.gz = gzip.NewWriter(w.counter) + return nil +} + +func (w *tableShardWriter) Write(p []byte) (int, error) { + return w.gz.Write(p) +} + +func (w *tableShardWriter) rotateIfNeeded() error { + if maxShardBytes <= 0 || w.rowsInShard == 0 || w.counter.n < maxShardBytes { + return nil + } + if err := w.close(); err != nil { + return err + } + return w.open() +} + +func (w *tableShardWriter) finishRow() error { + w.rowsInShard++ + if maxShardBytes > 1024*1024 && w.rowsInShard%shardFlushRows != 0 { + return nil + } + if err := w.gz.Flush(); err != nil { + return fmt.Errorf("flush %s shard: %w", w.table, err) + } + return nil +} + +func (w *tableShardWriter) close() error { + var closeErr error + if w.gz != nil { + if err := w.gz.Close(); err != nil { + closeErr = err + } + w.gz = nil + } + if w.file != nil { + if err := w.file.Close(); err != nil && closeErr == nil { + closeErr = err + } + w.file = nil + } + if closeErr != nil { + return fmt.Errorf("close %s shard: %w", w.table, closeErr) + } + return nil +} + +type countingWriter struct { + w io.Writer + n int64 +} + +func (w *countingWriter) Write(p []byte) (int, error) { + n, err := w.w.Write(p) + w.n += int64(n) + return n, err +} + +func exportValue(value any) any { + switch v := value.(type) { + case []byte: + return string(v) + default: + return v + } +} + +func importValue(value any) any { + switch v := value.(type) { + case json.Number: + if i, err := strconv.ParseInt(v.String(), 10, 64); err == nil { + return i + } + if f, err := strconv.ParseFloat(v.String(), 64); err == nil { + return f + } + return v.String() + default: + return v + } +} + +func insertSQL(table string, columns []string) string { + quoted := make([]string, len(columns)) + placeholders := make([]string, len(columns)) + for i, column := range columns { + quoted[i] = quoteIdent(column) + placeholders[i] = "?" + } + return "insert into " + quoteIdent(table) + "(" + strings.Join(quoted, ",") + ") values(" + strings.Join(placeholders, ",") + ")" +} + +func quoteIdent(s string) string { + return `"` + strings.ReplaceAll(s, `"`, `""`) + `"` +} + +func run(ctx context.Context, dir, name string, args ...string) error { + out, err := output(ctx, dir, name, args...) + if err != nil { + return fmt.Errorf("%s %s: %w\n%s", name, strings.Join(args, " "), err, strings.TrimSpace(out)) + } + return nil +} + +func output(ctx context.Context, dir, name string, args ...string) (string, error) { + // #nosec G204 -- discrawl invokes the Git executable with argv, never through a shell. + cmd := exec.CommandContext(ctx, name, args...) + if dir != "" { + cmd.Dir = dir + } + body, err := cmd.CombinedOutput() + return string(body), err +} diff --git a/internal/share/share_test.go b/internal/share/share_test.go new file mode 100644 index 0000000..7f70636 --- /dev/null +++ b/internal/share/share_test.go @@ -0,0 +1,214 @@ +package share + +import ( + "context" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/steipete/discrawl/internal/store" +) + +func TestExportImportRoundTrip(t *testing.T) { + ctx := context.Background() + src := seedStore(t, filepath.Join(t.TempDir(), "src.db")) + defer func() { _ = src.Close() }() + + repo := filepath.Join(t.TempDir(), "share") + manifest, err := Export(ctx, src, Options{RepoPath: repo, Branch: "main"}) + require.NoError(t, err) + require.NotEmpty(t, manifest.Tables) + require.FileExists(t, filepath.Join(repo, ManifestName)) + require.NotEmpty(t, tableEntry(t, manifest, "messages").Files) + + dst, err := store.Open(ctx, filepath.Join(t.TempDir(), "dst.db")) + require.NoError(t, err) + defer func() { _ = dst.Close() }() + + imported, err := Import(ctx, dst, Options{RepoPath: repo, Branch: "main"}) + require.NoError(t, err) + require.Equal(t, manifest.GeneratedAt, imported.GeneratedAt) + + results, err := dst.SearchMessages(ctx, store.SearchOptions{Query: "launch", Limit: 10}) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, "m1", results[0].MessageID) + + mentions, err := dst.ListMentions(ctx, store.MentionListOptions{Target: "Ops", Limit: 10}) + require.NoError(t, err) + require.Len(t, mentions, 1) + + lastImport, err := dst.GetSyncState(ctx, LastImportSyncScope) + require.NoError(t, err) + require.NotEmpty(t, lastImport) + require.False(t, NeedsImport(ctx, dst, 15*time.Minute)) +} + +func TestExportShardsLargeTables(t *testing.T) { + ctx := context.Background() + prevMaxShardBytes := maxShardBytes + maxShardBytes = 150 + t.Cleanup(func() { maxShardBytes = prevMaxShardBytes }) + + src := seedStore(t, filepath.Join(t.TempDir(), "src.db")) + defer func() { _ = src.Close() }() + now := time.Now().UTC() + for i := range 25 { + id := "extra-" + strconv.Itoa(i) + require.NoError(t, src.UpsertMessages(ctx, []store.MessageMutation{{ + Record: store.MessageRecord{ + ID: id, + GuildID: "g1", + ChannelID: "c1", + ChannelName: "general", + AuthorID: "u1", + AuthorName: "Peter", + MessageType: 0, + CreatedAt: now.Add(time.Duration(i) * time.Second).Format(time.RFC3339Nano), + Content: strings.Repeat("unique launch shard payload "+id+" ", 8), + NormalizedContent: strings.Repeat("unique launch shard payload "+id+" ", 8), + RawJSON: `{}`, + }, + EventType: "upsert", + PayloadJSON: `{"id":"` + id + `"}`, + Options: store.WriteOptions{AppendEvent: true}, + }})) + } + + repo := filepath.Join(t.TempDir(), "share") + manifest, err := Export(ctx, src, Options{RepoPath: repo, Branch: "main"}) + require.NoError(t, err) + messages := tableEntry(t, manifest, "messages") + require.Greater(t, len(messages.Files), 1) + require.Empty(t, messages.File) + for _, rel := range messages.Files { + info, err := os.Stat(filepath.Join(repo, filepath.FromSlash(rel))) + require.NoError(t, err) + require.Less(t, info.Size(), int64(100*1024*1024)) + } + + dst, err := store.Open(ctx, filepath.Join(t.TempDir(), "dst.db")) + require.NoError(t, err) + defer func() { _ = dst.Close() }() + _, err = Import(ctx, dst, Options{RepoPath: repo, Branch: "main"}) + require.NoError(t, err) + results, err := dst.SearchMessages(ctx, store.SearchOptions{Query: "shard payload", Limit: 50}) + require.NoError(t, err) + require.Len(t, results, 25) +} + +func TestGitCommitDetectsNoChanges(t *testing.T) { + ctx := context.Background() + src := seedStore(t, filepath.Join(t.TempDir(), "src.db")) + defer func() { _ = src.Close() }() + + repo := filepath.Join(t.TempDir(), "share") + opts := Options{RepoPath: repo, Branch: "main"} + _, err := Export(ctx, src, opts) + require.NoError(t, err) + // #nosec G204 -- fixed git argv in test setup. + require.NoError(t, exec.Command("git", "-C", repo, "config", "user.name", "discrawl test").Run()) + // #nosec G204 -- fixed git argv in test setup. + require.NoError(t, exec.Command("git", "-C", repo, "config", "user.email", "discrawl@example.com").Run()) + + committed, err := Commit(ctx, opts, "test: snapshot") + require.NoError(t, err) + require.True(t, committed) + + committed, err = Commit(ctx, opts, "test: snapshot") + require.NoError(t, err) + require.False(t, committed) +} + +func TestPullAndPushWithBareRemote(t *testing.T) { + ctx := context.Background() + src := seedStore(t, filepath.Join(t.TempDir(), "src.db")) + defer func() { _ = src.Close() }() + + dir := t.TempDir() + remote := filepath.Join(dir, "remote.git") + // #nosec G204 -- fixed git argv in test setup. + require.NoError(t, exec.Command("git", "-C", dir, "init", "--bare", remote).Run()) + + publisher := filepath.Join(dir, "publisher") + opts := Options{RepoPath: publisher, Remote: remote, Branch: "main"} + _, err := Export(ctx, src, opts) + require.NoError(t, err) + // #nosec G204 -- fixed git argv in test setup. + require.NoError(t, exec.Command("git", "-C", publisher, "config", "user.name", "discrawl test").Run()) + // #nosec G204 -- fixed git argv in test setup. + require.NoError(t, exec.Command("git", "-C", publisher, "config", "user.email", "discrawl@example.com").Run()) + committed, err := Commit(ctx, opts, "test: snapshot") + require.NoError(t, err) + require.True(t, committed) + require.NoError(t, Push(ctx, opts)) + + subscriber := filepath.Join(dir, "subscriber") + subOpts := Options{RepoPath: subscriber, Remote: remote, Branch: "main"} + require.NoError(t, Pull(ctx, subOpts)) + require.FileExists(t, filepath.Join(subscriber, ManifestName)) +} + +func seedStore(t *testing.T, path string) *store.Store { + t.Helper() + ctx := context.Background() + s, err := store.Open(ctx, path) + require.NoError(t, err) + now := time.Now().UTC() + require.NoError(t, s.UpsertGuild(ctx, store.GuildRecord{ID: "g1", Name: "Guild", RawJSON: `{}`})) + require.NoError(t, s.UpsertChannel(ctx, store.ChannelRecord{ID: "c1", GuildID: "g1", Kind: "text", Name: "general", RawJSON: `{}`})) + require.NoError(t, s.UpsertMember(ctx, store.MemberRecord{ + GuildID: "g1", + UserID: "u1", + Username: "peter", + DisplayName: "Peter", + RoleIDsJSON: `[]`, + RawJSON: `{"bio":"Runs launch ops"}`, + })) + require.NoError(t, s.UpsertMessages(ctx, []store.MessageMutation{{ + Record: store.MessageRecord{ + ID: "m1", + GuildID: "g1", + ChannelID: "c1", + ChannelName: "general", + AuthorID: "u1", + AuthorName: "Peter", + MessageType: 0, + CreatedAt: now.Format(time.RFC3339Nano), + Content: "launch checklist ready", + NormalizedContent: "launch checklist ready", + RawJSON: `{}`, + }, + EventType: "upsert", + PayloadJSON: `{"id":"m1"}`, + Options: store.WriteOptions{AppendEvent: true}, + Mentions: []store.MentionEventRecord{{ + MessageID: "m1", + GuildID: "g1", + ChannelID: "c1", + AuthorID: "u1", + TargetType: "role", + TargetID: "r1", + TargetName: "Ops", + EventAt: now.Format(time.RFC3339Nano), + }}, + }})) + return s +} + +func tableEntry(t *testing.T, manifest Manifest, name string) TableManifest { + t.Helper() + for _, table := range manifest.Tables { + if table.Name == name { + return table + } + } + t.Fatalf("table %s not found", name) + return TableManifest{} +} diff --git a/internal/store/store.go b/internal/store/store.go index 6d2ec62..ecfa97c 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -217,6 +217,26 @@ func (s *Store) migrate(ctx context.Context) error { return nil } +func (s *Store) RebuildSearchIndexes(ctx context.Context) error { + if err := s.rebuildFTS(ctx); err != nil { + return err + } + if err := s.rebuildMemberFTS(ctx); err != nil { + return err + } + now := time.Now().UTC().Format(timeLayout) + if _, err := s.db.ExecContext(ctx, ` + insert into sync_state(scope, cursor, updated_at) + values(?, ?, ?), (?, ?, ?) + on conflict(scope) do update set + cursor=excluded.cursor, + updated_at=excluded.updated_at + `, "schema:message_fts_rowid_version", messageFTSVersion, now, "schema:member_fts_rowid_version", memberFTSVersion, now); err != nil { + return fmt.Errorf("stamp search index versions: %w", err) + } + return nil +} + func (s *Store) schemaVersion(ctx context.Context) (int, error) { var version int if err := s.db.QueryRowContext(ctx, `pragma user_version`).Scan(&version); err != nil { diff --git a/internal/store/store_test.go b/internal/store/store_test.go index 70607a7..35ae5c8 100644 --- a/internal/store/store_test.go +++ b/internal/store/store_test.go @@ -214,6 +214,58 @@ func TestCheckMessageFTSProbe(t *testing.T) { require.NoError(t, s.CheckMessageFTS(ctx)) } +func TestRebuildSearchIndexesAndGuildCounts(t *testing.T) { + t.Parallel() + + ctx := context.Background() + s, err := Open(ctx, filepath.Join(t.TempDir(), "discrawl.db")) + require.NoError(t, err) + defer func() { _ = s.Close() }() + + require.NoError(t, s.UpsertGuild(ctx, GuildRecord{ID: "g1", Name: "Guild", RawJSON: `{}`})) + require.NoError(t, s.UpsertChannel(ctx, ChannelRecord{ID: "c1", GuildID: "g1", Kind: "text", Name: "general", RawJSON: `{}`})) + require.NoError(t, s.UpsertMember(ctx, MemberRecord{ + GuildID: "g1", + UserID: "u1", + Username: "peter", + DisplayName: "Peter", + RoleIDsJSON: `[]`, + RawJSON: `{"bio":"Searchable profile"}`, + })) + require.NoError(t, s.UpsertMessage(ctx, MessageRecord{ + ID: "1458939673664684210", + GuildID: "g1", + ChannelID: "c1", + ChannelName: "general", + AuthorID: "u1", + AuthorName: "Peter", + MessageType: 0, + CreatedAt: time.Now().UTC().Format(time.RFC3339Nano), + Content: "rebuildable launch text", + NormalizedContent: "rebuildable launch text", + RawJSON: `{}`, + })) + + _, err = s.DB().ExecContext(ctx, `delete from message_fts`) + require.NoError(t, err) + _, err = s.DB().ExecContext(ctx, `delete from member_fts`) + require.NoError(t, err) + require.NoError(t, s.RebuildSearchIndexes(ctx)) + + results, err := s.SearchMessages(ctx, SearchOptions{Query: "rebuildable", Limit: 10}) + require.NoError(t, err) + require.Len(t, results, 1) + members, err := s.Members(ctx, "g1", "Searchable", 10) + require.NoError(t, err) + require.Len(t, members, 1) + channelCount, err := s.GuildChannelCount(ctx, "g1") + require.NoError(t, err) + require.Equal(t, 1, channelCount) + memberCount, err := s.GuildMemberCount(ctx, "g1") + require.NoError(t, err) + require.Equal(t, 1, memberCount) +} + func TestOpenSetsSchemaVersion(t *testing.T) { t.Parallel()