feat: add git-backed archive sharing

This commit is contained in:
Peter Steinberger 2026-04-21 05:07:39 +01:00
parent 5698de3d58
commit 1808bef68f
No known key found for this signature in database
11 changed files with 1293 additions and 0 deletions

View File

@ -0,0 +1,54 @@
name: publish-discord-backup
on:
schedule:
- cron: "*/15 * * * *"
workflow_dispatch:
permissions:
contents: read
concurrency:
group: publish-discord-backup
cancel-in-progress: false
jobs:
publish:
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
- name: Checkout discrawl
uses: actions/checkout@v6.0.2
- name: Setup Go
uses: actions/setup-go@v6.3.0
with:
go-version-file: go.mod
cache: true
- name: Configure Git identity
run: |
git config --global user.name "discrawl publisher"
git config --global user.email "discrawl-publisher@users.noreply.github.com"
- name: Sync and publish Discord archive
env:
DISCORD_BOT_TOKEN: ${{ secrets.DISCORD_BOT_TOKEN }}
BACKUP_REMOTE: https://x-access-token:${{ secrets.DISCORD_BACKUP_TOKEN }}@github.com/openclaw/discord-backup.git
CONFIG: ${{ runner.temp }}/discrawl/config.toml
DB: ${{ runner.temp }}/discrawl/discrawl.db
BACKUP_REPO: ${{ runner.temp }}/discord-backup
run: |
mkdir -p "$(dirname "$CONFIG")"
git clone "$BACKUP_REMOTE" "$BACKUP_REPO"
if [ -f "$BACKUP_REPO/manifest.json" ]; then
go run ./cmd/discrawl --config "$CONFIG" subscribe --repo "$BACKUP_REPO" "$BACKUP_REMOTE"
else
go run ./cmd/discrawl --config "$CONFIG" init --db "$DB"
fi
go run ./cmd/discrawl --config "$CONFIG" sync --all
go run ./cmd/discrawl --config "$CONFIG" publish \
--repo "$BACKUP_REPO" \
--remote "$BACKUP_REMOTE" \
--message "sync: discord archive" \
--push

View File

@ -317,6 +317,26 @@ Shows local archive status.
discrawl status
```
### Git-backed sharing
`discrawl` can publish the SQLite archive as sharded, compressed NDJSON snapshots in a private Git repo, then auto-import that repo before local read commands.
Publisher:
```bash
discrawl publish --remote https://github.com/openclaw/discord-backup.git --push
```
Subscriber:
```bash
discrawl subscribe https://github.com/openclaw/discord-backup.git
discrawl search "launch checklist"
discrawl messages --channel general --hours 24
```
Once `share.remote` is configured, read commands auto-fetch and import when the local share import is older than `share.stale_after` (default `15m`). `discrawl update` forces the same pull/import step manually.
### `doctor`
Checks config, auth, DB, and FTS wiring.
@ -360,6 +380,13 @@ provider = "openai"
model = "text-embedding-3-small"
api_key_env = "OPENAI_API_KEY"
batch_size = 64
[share]
remote = ""
repo_path = "~/.discrawl/share"
branch = "main"
auto_update = true
stale_after = "15m"
```
The value above is an example. `init` writes an auto-sized default based on the host: `min(32, max(8, GOMAXPROCS*2))`.

View File

@ -12,6 +12,7 @@ import (
"github.com/bwmarrin/discordgo"
"github.com/steipete/discrawl/internal/config"
"github.com/steipete/discrawl/internal/discord"
"github.com/steipete/discrawl/internal/share"
"github.com/steipete/discrawl/internal/store"
"github.com/steipete/discrawl/internal/syncer"
)
@ -136,6 +137,12 @@ func (r *runtime) dispatch(rest []string) error {
return r.withServices(false, func() error { return r.runChannels(rest[1:]) })
case "status":
return r.withServices(false, func() error { return r.runStatus(rest[1:]) })
case "publish":
return r.withServicesAuto(false, false, func() error { return r.runPublish(rest[1:]) })
case "subscribe":
return r.runSubscribe(rest[1:])
case "update":
return r.withServicesAuto(false, false, func() error { return r.runUpdate(rest[1:]) })
case "doctor":
return r.runDoctor(rest[1:])
default:
@ -144,6 +151,10 @@ func (r *runtime) dispatch(rest []string) error {
}
func (r *runtime) withServices(withDiscord bool, fn func() error) error {
return r.withServicesAuto(withDiscord, !withDiscord, fn)
}
func (r *runtime) withServicesAuto(withDiscord, autoShareUpdate bool, fn func() error) error {
cfg, err := config.Load(r.configPath)
if err != nil {
return configErr(err)
@ -165,6 +176,11 @@ func (r *runtime) withServices(withDiscord bool, fn func() error) error {
return dbErr(err)
}
defer func() { _ = r.store.Close() }()
if autoShareUpdate {
if err := r.autoUpdateShare(); err != nil {
return err
}
}
if withDiscord {
discordFactory := r.newDiscord
if discordFactory == nil {
@ -194,3 +210,40 @@ func (r *runtime) withServices(withDiscord bool, fn func() error) error {
}
return fn()
}
func (r *runtime) autoUpdateShare() error {
if !r.cfg.ShareEnabled() || !r.cfg.Share.AutoUpdate {
return nil
}
staleAfter, err := time.ParseDuration(r.cfg.Share.StaleAfter)
if err != nil {
return configErr(fmt.Errorf("invalid share.stale_after: %w", err))
}
if !share.NeedsImport(r.ctx, r.store, staleAfter) {
return nil
}
opts, err := r.shareOptions()
if err != nil {
return err
}
if err := share.Pull(r.ctx, opts); err != nil {
return err
}
_, err = share.Import(r.ctx, r.store, opts)
if errors.Is(err, share.ErrNoManifest) {
return nil
}
return err
}
func (r *runtime) shareOptions() (share.Options, error) {
repoPath, err := config.ExpandPath(r.cfg.Share.RepoPath)
if err != nil {
return share.Options{}, configErr(err)
}
return share.Options{
RepoPath: repoPath,
Remote: r.cfg.Share.Remote,
Branch: r.cfg.Share.Branch,
}, nil
}

View File

@ -5,6 +5,7 @@ import (
"context"
"log/slog"
"os"
"os/exec"
"path/filepath"
"testing"
"time"
@ -14,6 +15,7 @@ import (
"github.com/steipete/discrawl/internal/config"
discordclient "github.com/steipete/discrawl/internal/discord"
"github.com/steipete/discrawl/internal/share"
"github.com/steipete/discrawl/internal/store"
"github.com/steipete/discrawl/internal/syncer"
)
@ -131,6 +133,135 @@ func TestStatusSearchSQLAndListings(t *testing.T) {
}
}
func TestReadCommandsAutoImportStaleShare(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
sourceDB := filepath.Join(dir, "source.db")
source := seedCLIStore(t, sourceDB)
defer func() { _ = source.Close() }()
workRepo := filepath.Join(dir, "work")
remoteRepo := filepath.Join(dir, "remote.git")
opts := share.Options{RepoPath: workRepo, Branch: "main"}
_, err := share.Export(ctx, source, opts)
require.NoError(t, err)
runGit(t, workRepo, "config", "user.name", "discrawl test")
runGit(t, workRepo, "config", "user.email", "discrawl@example.com")
committed, err := share.Commit(ctx, opts, "test: snapshot")
require.NoError(t, err)
require.True(t, committed)
runGit(t, dir, "init", "--bare", remoteRepo)
runGit(t, workRepo, "remote", "add", "origin", remoteRepo)
runGit(t, workRepo, "push", "-u", "origin", "main")
cfgPath := filepath.Join(dir, "config.toml")
cfg := config.Default()
cfg.DBPath = filepath.Join(dir, "reader.db")
cfg.Share.Remote = remoteRepo
cfg.Share.RepoPath = filepath.Join(dir, "reader-share")
cfg.Share.StaleAfter = "15m"
cfg.Share.AutoUpdate = true
require.NoError(t, config.Write(cfgPath, cfg))
var out bytes.Buffer
require.NoError(t, Run(ctx, []string{"--config", cfgPath, "search", "automatic"}, &out, &bytes.Buffer{}))
require.Contains(t, out.String(), "automatic updates work")
reader, err := store.Open(ctx, cfg.DBPath)
require.NoError(t, err)
defer func() { _ = reader.Close() }()
lastImport, err := reader.GetSyncState(ctx, share.LastImportSyncScope)
require.NoError(t, err)
require.NotEmpty(t, lastImport)
}
func TestShareCommandsPublishSubscribeAndUpdate(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
remoteRepo := filepath.Join(dir, "remote.git")
runGit(t, dir, "init", "--bare", remoteRepo)
cfgPath := filepath.Join(dir, "config.toml")
cfg := config.Default()
cfg.DBPath = filepath.Join(dir, "publisher.db")
cfg.Share.Remote = remoteRepo
cfg.Share.RepoPath = filepath.Join(dir, "publisher-share")
require.NoError(t, config.Write(cfgPath, cfg))
publisher := seedCLIStore(t, cfg.DBPath)
require.NoError(t, publisher.Close())
var out bytes.Buffer
require.NoError(t, Run(ctx, []string{
"--config", cfgPath,
"publish",
"--repo", cfg.Share.RepoPath,
"--remote", remoteRepo,
"--no-commit",
}, &out, &bytes.Buffer{}))
require.FileExists(t, filepath.Join(cfg.Share.RepoPath, share.ManifestName))
runGit(t, cfg.Share.RepoPath, "config", "user.name", "discrawl test")
runGit(t, cfg.Share.RepoPath, "config", "user.email", "discrawl@example.com")
committed, err := share.Commit(ctx, share.Options{RepoPath: cfg.Share.RepoPath, Remote: remoteRepo, Branch: "main"}, "test: snapshot")
require.NoError(t, err)
require.True(t, committed)
require.NoError(t, share.Push(ctx, share.Options{RepoPath: cfg.Share.RepoPath, Remote: remoteRepo, Branch: "main"}))
readerCfgPath := filepath.Join(dir, "reader.toml")
require.NoError(t, Run(ctx, []string{
"--config", readerCfgPath,
"subscribe",
"--repo", filepath.Join(dir, "reader-share"),
"--no-import",
remoteRepo,
}, &bytes.Buffer{}, &bytes.Buffer{}))
readerCfg, err := config.Load(readerCfgPath)
require.NoError(t, err)
require.Equal(t, remoteRepo, readerCfg.Share.Remote)
readerCfg.DBPath = filepath.Join(dir, "reader.db")
require.NoError(t, config.Write(readerCfgPath, readerCfg))
out.Reset()
require.NoError(t, Run(ctx, []string{"--config", readerCfgPath, "update"}, &out, &bytes.Buffer{}))
require.Contains(t, out.String(), "generated_at")
out.Reset()
require.NoError(t, Run(ctx, []string{"--config", readerCfgPath, "search", "automatic"}, &out, &bytes.Buffer{}))
require.Contains(t, out.String(), "automatic updates work")
}
func seedCLIStore(t *testing.T, path string) *store.Store {
t.Helper()
ctx := context.Background()
s, err := store.Open(ctx, path)
require.NoError(t, err)
now := time.Now().UTC()
require.NoError(t, s.UpsertGuild(ctx, store.GuildRecord{ID: "g1", Name: "Guild", RawJSON: `{}`}))
require.NoError(t, s.UpsertChannel(ctx, store.ChannelRecord{ID: "c1", GuildID: "g1", Kind: "text", Name: "general", RawJSON: `{}`}))
require.NoError(t, s.UpsertMessage(ctx, store.MessageRecord{
ID: "m100",
GuildID: "g1",
ChannelID: "c1",
ChannelName: "general",
AuthorID: "u1",
AuthorName: "Peter",
MessageType: 0,
CreatedAt: now.Format(time.RFC3339Nano),
Content: "automatic updates work",
NormalizedContent: "automatic updates work",
RawJSON: `{}`,
}))
return s
}
func runGit(t *testing.T, dir string, args ...string) {
t.Helper()
// #nosec G204 -- fixed git argv in test setup.
cmd := exec.Command("git", args...)
cmd.Dir = dir
out, err := cmd.CombinedOutput()
require.NoError(t, err, string(out))
}
type fakeDiscordClient struct {
guilds []*discordgo.UserGuild
self *discordgo.User

View File

@ -0,0 +1,191 @@
package cli
import (
"flag"
"fmt"
"io"
"os"
"github.com/steipete/discrawl/internal/config"
"github.com/steipete/discrawl/internal/share"
"github.com/steipete/discrawl/internal/store"
)
const defaultShareRemote = "https://github.com/openclaw/discord-backup.git"
func (r *runtime) runPublish(args []string) error {
fs := flag.NewFlagSet("publish", flag.ContinueOnError)
fs.SetOutput(io.Discard)
repoPath := fs.String("repo", r.cfg.Share.RepoPath, "")
remote := fs.String("remote", r.cfg.Share.Remote, "")
branch := fs.String("branch", r.cfg.Share.Branch, "")
message := fs.String("message", "", "")
noCommit := fs.Bool("no-commit", false, "")
push := fs.Bool("push", false, "")
if err := fs.Parse(args); err != nil {
return usageErr(err)
}
if fs.NArg() != 0 {
return usageErr(fmt.Errorf("publish takes no positional arguments"))
}
opts, err := shareOptionsFromFlags(*repoPath, *remote, *branch)
if err != nil {
return err
}
manifest, err := share.Export(r.ctx, r.store, opts)
if err != nil {
return err
}
committed := false
if !*noCommit {
msg := *message
if msg == "" {
msg = "sync: discord archive"
}
committed, err = share.Commit(r.ctx, opts, msg)
if err != nil {
return err
}
}
if *push {
if err := share.Push(r.ctx, opts); err != nil {
return err
}
}
return r.print(map[string]any{
"repo_path": opts.RepoPath,
"remote": opts.Remote,
"generated_at": manifest.GeneratedAt,
"tables": manifest.Tables,
"committed": committed,
"pushed": *push,
})
}
func (r *runtime) runSubscribe(args []string) error {
fs := flag.NewFlagSet("subscribe", flag.ContinueOnError)
fs.SetOutput(io.Discard)
repoPath := fs.String("repo", "", "")
branch := fs.String("branch", "main", "")
noImport := fs.Bool("no-import", false, "")
if err := fs.Parse(args); err != nil {
return usageErr(err)
}
remote := defaultShareRemote
if fs.NArg() > 1 {
return usageErr(fmt.Errorf("subscribe takes at most one remote"))
}
if fs.NArg() == 1 {
remote = fs.Arg(0)
}
cfg, err := loadConfigOrDefault(r.configPath)
if err != nil {
return err
}
if *repoPath != "" {
cfg.Share.RepoPath = *repoPath
}
cfg.Share.Remote = remote
cfg.Share.Branch = *branch
cfg.Share.AutoUpdate = true
if cfg.Share.StaleAfter == "" {
cfg.Share.StaleAfter = "15m"
}
if err := config.Write(r.configPath, cfg); err != nil {
return configErr(err)
}
if *noImport {
return r.print(map[string]any{"config_path": r.configPath, "remote": remote, "repo_path": cfg.Share.RepoPath})
}
if err := config.EnsureRuntimeDirs(cfg); err != nil {
return configErr(err)
}
dbPath, err := config.ExpandPath(cfg.DBPath)
if err != nil {
return configErr(err)
}
s, err := store.Open(r.ctx, dbPath)
if err != nil {
return dbErr(err)
}
defer func() { _ = s.Close() }()
expandedRepo, err := config.ExpandPath(cfg.Share.RepoPath)
if err != nil {
return configErr(err)
}
opts := share.Options{RepoPath: expandedRepo, Remote: cfg.Share.Remote, Branch: cfg.Share.Branch}
if err := share.Pull(r.ctx, opts); err != nil {
return err
}
manifest, err := share.Import(r.ctx, s, opts)
if err != nil {
return err
}
return r.print(map[string]any{
"config_path": r.configPath,
"repo_path": opts.RepoPath,
"remote": opts.Remote,
"generated_at": manifest.GeneratedAt,
"tables": manifest.Tables,
})
}
func (r *runtime) runUpdate(args []string) error {
fs := flag.NewFlagSet("update", flag.ContinueOnError)
fs.SetOutput(io.Discard)
repoPath := fs.String("repo", r.cfg.Share.RepoPath, "")
remote := fs.String("remote", r.cfg.Share.Remote, "")
branch := fs.String("branch", r.cfg.Share.Branch, "")
if err := fs.Parse(args); err != nil {
return usageErr(err)
}
if fs.NArg() != 0 {
return usageErr(fmt.Errorf("update takes no positional arguments"))
}
opts, err := shareOptionsFromFlags(*repoPath, *remote, *branch)
if err != nil {
return err
}
if err := share.Pull(r.ctx, opts); err != nil {
return err
}
manifest, err := share.Import(r.ctx, r.store, opts)
if err != nil {
return err
}
return r.print(map[string]any{
"repo_path": opts.RepoPath,
"remote": opts.Remote,
"generated_at": manifest.GeneratedAt,
"tables": manifest.Tables,
})
}
func shareOptionsFromFlags(repoPath, remote, branch string) (share.Options, error) {
expandedRepo, err := config.ExpandPath(repoPath)
if err != nil {
return share.Options{}, configErr(err)
}
if remote == "" {
remote = defaultShareRemote
}
if branch == "" {
branch = "main"
}
return share.Options{RepoPath: expandedRepo, Remote: remote, Branch: branch}, nil
}
func loadConfigOrDefault(path string) (config.Config, error) {
cfg, err := config.Load(path)
if err == nil {
return cfg, nil
}
if !os.IsNotExist(err) {
return config.Config{}, configErr(err)
}
cfg = config.Default()
if err := cfg.Normalize(); err != nil {
return config.Config{}, configErr(err)
}
return cfg, nil
}

View File

@ -29,6 +29,7 @@ type Config struct {
Discord DiscordConfig `toml:"discord"`
Sync SyncConfig `toml:"sync"`
Search SearchConfig `toml:"search"`
Share ShareConfig `toml:"share"`
}
type DiscordConfig struct {
@ -50,6 +51,14 @@ type SearchConfig struct {
Embeddings EmbeddingsConfig `toml:"embeddings"`
}
type ShareConfig struct {
Remote string `toml:"remote,omitempty"`
RepoPath string `toml:"repo_path,omitempty"`
Branch string `toml:"branch,omitempty"`
AutoUpdate bool `toml:"auto_update"`
StaleAfter string `toml:"stale_after"`
}
type EmbeddingsConfig struct {
Enabled bool `toml:"enabled"`
Provider string `toml:"provider"`
@ -118,6 +127,12 @@ func Default() Config {
BatchSize: 64,
},
},
Share: ShareConfig{
RepoPath: filepath.Join(base, "share"),
Branch: "main",
AutoUpdate: true,
StaleAfter: "15m",
},
}
}
@ -236,6 +251,15 @@ func (c *Config) Normalize() error {
if c.Search.Embeddings.BatchSize <= 0 {
c.Search.Embeddings.BatchSize = 64
}
if c.Share.RepoPath == "" {
c.Share.RepoPath = Default().Share.RepoPath
}
if c.Share.Branch == "" {
c.Share.Branch = "main"
}
if c.Share.StaleAfter == "" {
c.Share.StaleAfter = "15m"
}
c.GuildIDs = uniqueStrings(c.GuildIDs)
return nil
}
@ -258,6 +282,10 @@ func (c Config) AttachmentTextEnabled() bool {
return c.Sync.AttachmentText == nil || *c.Sync.AttachmentText
}
func (c Config) ShareEnabled() bool {
return strings.TrimSpace(c.Share.Remote) != ""
}
func EnsureRuntimeDirs(cfg Config) error {
paths := []string{cfg.CacheDir, cfg.LogDir, filepath.Dir(cfg.DBPath)}
for _, path := range paths {

View File

@ -23,6 +23,12 @@ func TestNormalizeFillsDefaults(t *testing.T) {
require.NotNil(t, cfg.Sync.AttachmentText)
require.True(t, *cfg.Sync.AttachmentText)
require.Equal(t, "fts", cfg.Search.DefaultMode)
require.Equal(t, "main", cfg.Share.Branch)
require.Equal(t, "15m", cfg.Share.StaleAfter)
require.True(t, Default().Share.AutoUpdate)
require.False(t, cfg.ShareEnabled())
cfg.Share.Remote = "git@example.com:org/archive.git"
require.True(t, cfg.ShareEnabled())
}
func TestResolveDiscordTokenPrefersOpenClaw(t *testing.T) {

517
internal/share/share.go Normal file
View File

@ -0,0 +1,517 @@
package share
import (
"compress/gzip"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/steipete/discrawl/internal/store"
)
const (
ManifestName = "manifest.json"
LastImportSyncScope = "share:last_import_at"
)
var ErrNoManifest = errors.New("share manifest not found")
const shardFlushRows = 1024
var maxShardBytes int64 = 40 * 1024 * 1024
var SnapshotTables = []string{
"guilds",
"channels",
"members",
"messages",
"message_events",
"message_attachments",
"mention_events",
"sync_state",
"embedding_jobs",
}
type Options struct {
RepoPath string
Remote string
Branch string
}
type Manifest struct {
Version int `json:"version"`
GeneratedAt time.Time `json:"generated_at"`
Tables []TableManifest `json:"tables"`
Files map[string]string `json:"files,omitempty"`
}
type TableManifest struct {
Name string `json:"name"`
File string `json:"file,omitempty"`
Files []string `json:"files,omitempty"`
Columns []string `json:"columns"`
Rows int `json:"rows"`
}
func EnsureRepo(ctx context.Context, opts Options) error {
if strings.TrimSpace(opts.RepoPath) == "" {
return fmt.Errorf("share repo path is empty")
}
if _, err := os.Stat(filepath.Join(opts.RepoPath, ".git")); err == nil {
return nil
}
if strings.TrimSpace(opts.Remote) != "" {
if err := os.MkdirAll(filepath.Dir(opts.RepoPath), 0o755); err != nil {
return fmt.Errorf("mkdir share parent: %w", err)
}
if err := run(ctx, "", "git", "clone", opts.Remote, opts.RepoPath); err != nil {
return err
}
if strings.TrimSpace(opts.Branch) != "" {
if err := run(ctx, opts.RepoPath, "git", "checkout", "-B", opts.Branch); err != nil {
return err
}
}
return nil
}
if err := os.MkdirAll(opts.RepoPath, 0o755); err != nil {
return fmt.Errorf("mkdir share repo: %w", err)
}
if err := run(ctx, opts.RepoPath, "git", "init"); err != nil {
return err
}
if strings.TrimSpace(opts.Branch) != "" {
if err := run(ctx, opts.RepoPath, "git", "checkout", "-B", opts.Branch); err != nil {
return err
}
}
return nil
}
func Pull(ctx context.Context, opts Options) error {
if strings.TrimSpace(opts.Remote) == "" {
return nil
}
if err := EnsureRepo(ctx, opts); err != nil {
return err
}
if err := run(ctx, opts.RepoPath, "git", "fetch", "--prune", "origin"); err != nil {
return err
}
branch := opts.Branch
if strings.TrimSpace(branch) == "" {
branch = "main"
}
remoteRef := "refs/remotes/origin/" + branch
if _, err := output(ctx, opts.RepoPath, "git", "rev-parse", "--verify", remoteRef); err != nil {
return run(ctx, opts.RepoPath, "git", "checkout", "-B", branch)
}
if err := run(ctx, opts.RepoPath, "git", "checkout", "-B", branch, "origin/"+branch); err != nil {
return err
}
return run(ctx, opts.RepoPath, "git", "pull", "--ff-only", "origin", branch)
}
func Commit(ctx context.Context, opts Options, message string) (bool, error) {
if err := run(ctx, opts.RepoPath, "git", "add", "."); err != nil {
return false, err
}
out, err := output(ctx, opts.RepoPath, "git", "status", "--porcelain")
if err != nil {
return false, err
}
if strings.TrimSpace(out) == "" {
return false, nil
}
if strings.TrimSpace(message) == "" {
message = "sync: discord archive"
}
if err := run(ctx, opts.RepoPath, "git", "commit", "-m", message); err != nil {
return false, err
}
return true, nil
}
func Push(ctx context.Context, opts Options) error {
branch := opts.Branch
if strings.TrimSpace(branch) == "" {
branch = "main"
}
return run(ctx, opts.RepoPath, "git", "push", "-u", "origin", branch)
}
func Export(ctx context.Context, s *store.Store, opts Options) (Manifest, error) {
if err := EnsureRepo(ctx, opts); err != nil {
return Manifest{}, err
}
dataDir := filepath.Join(opts.RepoPath, "tables")
if err := os.RemoveAll(dataDir); err != nil {
return Manifest{}, fmt.Errorf("reset tables dir: %w", err)
}
if err := os.MkdirAll(dataDir, 0o755); err != nil {
return Manifest{}, fmt.Errorf("mkdir tables dir: %w", err)
}
manifest := Manifest{
Version: 1,
GeneratedAt: time.Now().UTC(),
Files: map[string]string{"manifest": ManifestName},
}
for _, table := range SnapshotTables {
entry, err := exportTable(ctx, s.DB(), dataDir, table)
if err != nil {
return Manifest{}, err
}
manifest.Tables = append(manifest.Tables, entry)
}
body, err := json.MarshalIndent(manifest, "", " ")
if err != nil {
return Manifest{}, err
}
body = append(body, '\n')
if err := os.WriteFile(filepath.Join(opts.RepoPath, ManifestName), body, 0o600); err != nil {
return Manifest{}, fmt.Errorf("write manifest: %w", err)
}
return manifest, nil
}
func Import(ctx context.Context, s *store.Store, opts Options) (Manifest, error) {
manifest, err := ReadManifest(opts.RepoPath)
if err != nil {
return Manifest{}, err
}
tx, err := s.DB().BeginTx(ctx, nil)
if err != nil {
return Manifest{}, err
}
committed := false
defer func() {
if !committed {
_ = tx.Rollback()
}
}()
for _, table := range []string{"message_fts", "member_fts"} {
if _, err := tx.ExecContext(ctx, "delete from "+table); err != nil {
return Manifest{}, fmt.Errorf("clear %s: %w", table, err)
}
}
for i := len(SnapshotTables) - 1; i >= 0; i-- {
table := SnapshotTables[i]
if _, err := tx.ExecContext(ctx, "delete from "+table); err != nil {
return Manifest{}, fmt.Errorf("clear %s: %w", table, err)
}
}
for _, table := range manifest.Tables {
if err := importTable(ctx, tx, opts.RepoPath, table); err != nil {
return Manifest{}, err
}
}
if err := tx.Commit(); err != nil {
return Manifest{}, err
}
committed = true
if err := s.RebuildSearchIndexes(ctx); err != nil {
return Manifest{}, err
}
if err := s.SetSyncState(ctx, LastImportSyncScope, time.Now().UTC().Format(time.RFC3339Nano)); err != nil {
return Manifest{}, err
}
return manifest, nil
}
func ReadManifest(repoPath string) (Manifest, error) {
data, err := os.ReadFile(filepath.Join(repoPath, ManifestName))
if err != nil {
if os.IsNotExist(err) {
return Manifest{}, ErrNoManifest
}
return Manifest{}, fmt.Errorf("read share manifest: %w", err)
}
var manifest Manifest
if err := json.Unmarshal(data, &manifest); err != nil {
return Manifest{}, fmt.Errorf("parse share manifest: %w", err)
}
if manifest.Version != 1 {
return Manifest{}, fmt.Errorf("unsupported share manifest version %d", manifest.Version)
}
return manifest, nil
}
func NeedsImport(ctx context.Context, s *store.Store, staleAfter time.Duration) bool {
if staleAfter <= 0 {
staleAfter = 15 * time.Minute
}
last, err := s.GetSyncState(ctx, LastImportSyncScope)
if err != nil || strings.TrimSpace(last) == "" {
return true
}
t, err := time.Parse(time.RFC3339Nano, last)
if err != nil {
return true
}
return time.Since(t) >= staleAfter
}
func exportTable(ctx context.Context, db *sql.DB, dataDir, table string) (TableManifest, error) {
query := "select * from " + table
rows, err := db.QueryContext(ctx, query)
if err != nil {
return TableManifest{}, fmt.Errorf("query %s: %w", table, err)
}
defer func() { _ = rows.Close() }()
columns, err := rows.Columns()
if err != nil {
return TableManifest{}, fmt.Errorf("columns %s: %w", table, err)
}
tableDir := filepath.Join(dataDir, table)
if err := os.MkdirAll(tableDir, 0o755); err != nil {
return TableManifest{}, fmt.Errorf("mkdir %s: %w", table, err)
}
writer := tableShardWriter{dataDir: dataDir, table: table}
if err := writer.open(); err != nil {
return TableManifest{}, err
}
defer func() { _ = writer.close() }()
count := 0
values := make([]any, len(columns))
ptrs := make([]any, len(columns))
for i := range values {
ptrs[i] = &values[i]
}
for rows.Next() {
if err := rows.Scan(ptrs...); err != nil {
return TableManifest{}, fmt.Errorf("scan %s: %w", table, err)
}
row := make(map[string]any, len(columns))
for i, column := range columns {
row[column] = exportValue(values[i])
}
body, err := json.Marshal(row)
if err != nil {
return TableManifest{}, fmt.Errorf("marshal %s row: %w", table, err)
}
if err := writer.rotateIfNeeded(); err != nil {
return TableManifest{}, err
}
if _, err := writer.Write(body); err != nil {
return TableManifest{}, fmt.Errorf("write %s row: %w", table, err)
}
if _, err := writer.Write([]byte{'\n'}); err != nil {
return TableManifest{}, fmt.Errorf("write %s newline: %w", table, err)
}
count++
if err := writer.finishRow(); err != nil {
return TableManifest{}, err
}
}
if err := rows.Err(); err != nil {
return TableManifest{}, fmt.Errorf("iterate %s: %w", table, err)
}
if err := writer.close(); err != nil {
return TableManifest{}, err
}
return TableManifest{Name: table, Files: writer.files, Columns: columns, Rows: count}, nil
}
func importTable(ctx context.Context, tx *sql.Tx, repoPath string, table TableManifest) error {
files := table.Files
if len(files) == 0 && strings.TrimSpace(table.File) != "" {
files = []string{table.File}
}
if len(files) == 0 {
return fmt.Errorf("manifest table %s has no files", table.Name)
}
stmt, err := tx.PrepareContext(ctx, insertSQL(table.Name, table.Columns))
if err != nil {
return fmt.Errorf("prepare import %s: %w", table.Name, err)
}
defer func() { _ = stmt.Close() }()
for _, rel := range files {
if err := importTableFile(ctx, stmt, repoPath, table, rel); err != nil {
return err
}
}
return nil
}
func importTableFile(ctx context.Context, stmt *sql.Stmt, repoPath string, table TableManifest, rel string) error {
path := filepath.Join(repoPath, filepath.FromSlash(rel))
file, err := os.Open(path)
if err != nil {
return fmt.Errorf("open %s: %w", rel, err)
}
defer func() { _ = file.Close() }()
gz, err := gzip.NewReader(file)
if err != nil {
return fmt.Errorf("read gzip %s: %w", rel, err)
}
defer func() { _ = gz.Close() }()
dec := json.NewDecoder(gz)
dec.UseNumber()
for {
row := map[string]any{}
err := dec.Decode(&row)
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("decode %s: %w", rel, err)
}
values := make([]any, len(table.Columns))
for i, column := range table.Columns {
values[i] = importValue(row[column])
}
if _, err := stmt.ExecContext(ctx, values...); err != nil {
return fmt.Errorf("insert %s: %w", table.Name, err)
}
}
return nil
}
type tableShardWriter struct {
dataDir string
table string
nextShard int
rowsInShard int
files []string
file *os.File
counter *countingWriter
gz *gzip.Writer
}
func (w *tableShardWriter) open() error {
rel := filepath.ToSlash(filepath.Join("tables", w.table, fmt.Sprintf("%06d.jsonl.gz", w.nextShard)))
path := filepath.Join(w.dataDir, w.table, fmt.Sprintf("%06d.jsonl.gz", w.nextShard))
file, err := os.Create(path)
if err != nil {
return fmt.Errorf("create %s: %w", rel, err)
}
w.nextShard++
w.rowsInShard = 0
w.files = append(w.files, rel)
w.file = file
w.counter = &countingWriter{w: file}
w.gz = gzip.NewWriter(w.counter)
return nil
}
func (w *tableShardWriter) Write(p []byte) (int, error) {
return w.gz.Write(p)
}
func (w *tableShardWriter) rotateIfNeeded() error {
if maxShardBytes <= 0 || w.rowsInShard == 0 || w.counter.n < maxShardBytes {
return nil
}
if err := w.close(); err != nil {
return err
}
return w.open()
}
func (w *tableShardWriter) finishRow() error {
w.rowsInShard++
if maxShardBytes > 1024*1024 && w.rowsInShard%shardFlushRows != 0 {
return nil
}
if err := w.gz.Flush(); err != nil {
return fmt.Errorf("flush %s shard: %w", w.table, err)
}
return nil
}
func (w *tableShardWriter) close() error {
var closeErr error
if w.gz != nil {
if err := w.gz.Close(); err != nil {
closeErr = err
}
w.gz = nil
}
if w.file != nil {
if err := w.file.Close(); err != nil && closeErr == nil {
closeErr = err
}
w.file = nil
}
if closeErr != nil {
return fmt.Errorf("close %s shard: %w", w.table, closeErr)
}
return nil
}
type countingWriter struct {
w io.Writer
n int64
}
func (w *countingWriter) Write(p []byte) (int, error) {
n, err := w.w.Write(p)
w.n += int64(n)
return n, err
}
func exportValue(value any) any {
switch v := value.(type) {
case []byte:
return string(v)
default:
return v
}
}
func importValue(value any) any {
switch v := value.(type) {
case json.Number:
if i, err := strconv.ParseInt(v.String(), 10, 64); err == nil {
return i
}
if f, err := strconv.ParseFloat(v.String(), 64); err == nil {
return f
}
return v.String()
default:
return v
}
}
func insertSQL(table string, columns []string) string {
quoted := make([]string, len(columns))
placeholders := make([]string, len(columns))
for i, column := range columns {
quoted[i] = quoteIdent(column)
placeholders[i] = "?"
}
return "insert into " + quoteIdent(table) + "(" + strings.Join(quoted, ",") + ") values(" + strings.Join(placeholders, ",") + ")"
}
func quoteIdent(s string) string {
return `"` + strings.ReplaceAll(s, `"`, `""`) + `"`
}
func run(ctx context.Context, dir, name string, args ...string) error {
out, err := output(ctx, dir, name, args...)
if err != nil {
return fmt.Errorf("%s %s: %w\n%s", name, strings.Join(args, " "), err, strings.TrimSpace(out))
}
return nil
}
func output(ctx context.Context, dir, name string, args ...string) (string, error) {
// #nosec G204 -- discrawl invokes the Git executable with argv, never through a shell.
cmd := exec.CommandContext(ctx, name, args...)
if dir != "" {
cmd.Dir = dir
}
body, err := cmd.CombinedOutput()
return string(body), err
}

View File

@ -0,0 +1,214 @@
package share
import (
"context"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/steipete/discrawl/internal/store"
)
func TestExportImportRoundTrip(t *testing.T) {
ctx := context.Background()
src := seedStore(t, filepath.Join(t.TempDir(), "src.db"))
defer func() { _ = src.Close() }()
repo := filepath.Join(t.TempDir(), "share")
manifest, err := Export(ctx, src, Options{RepoPath: repo, Branch: "main"})
require.NoError(t, err)
require.NotEmpty(t, manifest.Tables)
require.FileExists(t, filepath.Join(repo, ManifestName))
require.NotEmpty(t, tableEntry(t, manifest, "messages").Files)
dst, err := store.Open(ctx, filepath.Join(t.TempDir(), "dst.db"))
require.NoError(t, err)
defer func() { _ = dst.Close() }()
imported, err := Import(ctx, dst, Options{RepoPath: repo, Branch: "main"})
require.NoError(t, err)
require.Equal(t, manifest.GeneratedAt, imported.GeneratedAt)
results, err := dst.SearchMessages(ctx, store.SearchOptions{Query: "launch", Limit: 10})
require.NoError(t, err)
require.Len(t, results, 1)
require.Equal(t, "m1", results[0].MessageID)
mentions, err := dst.ListMentions(ctx, store.MentionListOptions{Target: "Ops", Limit: 10})
require.NoError(t, err)
require.Len(t, mentions, 1)
lastImport, err := dst.GetSyncState(ctx, LastImportSyncScope)
require.NoError(t, err)
require.NotEmpty(t, lastImport)
require.False(t, NeedsImport(ctx, dst, 15*time.Minute))
}
func TestExportShardsLargeTables(t *testing.T) {
ctx := context.Background()
prevMaxShardBytes := maxShardBytes
maxShardBytes = 150
t.Cleanup(func() { maxShardBytes = prevMaxShardBytes })
src := seedStore(t, filepath.Join(t.TempDir(), "src.db"))
defer func() { _ = src.Close() }()
now := time.Now().UTC()
for i := range 25 {
id := "extra-" + strconv.Itoa(i)
require.NoError(t, src.UpsertMessages(ctx, []store.MessageMutation{{
Record: store.MessageRecord{
ID: id,
GuildID: "g1",
ChannelID: "c1",
ChannelName: "general",
AuthorID: "u1",
AuthorName: "Peter",
MessageType: 0,
CreatedAt: now.Add(time.Duration(i) * time.Second).Format(time.RFC3339Nano),
Content: strings.Repeat("unique launch shard payload "+id+" ", 8),
NormalizedContent: strings.Repeat("unique launch shard payload "+id+" ", 8),
RawJSON: `{}`,
},
EventType: "upsert",
PayloadJSON: `{"id":"` + id + `"}`,
Options: store.WriteOptions{AppendEvent: true},
}}))
}
repo := filepath.Join(t.TempDir(), "share")
manifest, err := Export(ctx, src, Options{RepoPath: repo, Branch: "main"})
require.NoError(t, err)
messages := tableEntry(t, manifest, "messages")
require.Greater(t, len(messages.Files), 1)
require.Empty(t, messages.File)
for _, rel := range messages.Files {
info, err := os.Stat(filepath.Join(repo, filepath.FromSlash(rel)))
require.NoError(t, err)
require.Less(t, info.Size(), int64(100*1024*1024))
}
dst, err := store.Open(ctx, filepath.Join(t.TempDir(), "dst.db"))
require.NoError(t, err)
defer func() { _ = dst.Close() }()
_, err = Import(ctx, dst, Options{RepoPath: repo, Branch: "main"})
require.NoError(t, err)
results, err := dst.SearchMessages(ctx, store.SearchOptions{Query: "shard payload", Limit: 50})
require.NoError(t, err)
require.Len(t, results, 25)
}
func TestGitCommitDetectsNoChanges(t *testing.T) {
ctx := context.Background()
src := seedStore(t, filepath.Join(t.TempDir(), "src.db"))
defer func() { _ = src.Close() }()
repo := filepath.Join(t.TempDir(), "share")
opts := Options{RepoPath: repo, Branch: "main"}
_, err := Export(ctx, src, opts)
require.NoError(t, err)
// #nosec G204 -- fixed git argv in test setup.
require.NoError(t, exec.Command("git", "-C", repo, "config", "user.name", "discrawl test").Run())
// #nosec G204 -- fixed git argv in test setup.
require.NoError(t, exec.Command("git", "-C", repo, "config", "user.email", "discrawl@example.com").Run())
committed, err := Commit(ctx, opts, "test: snapshot")
require.NoError(t, err)
require.True(t, committed)
committed, err = Commit(ctx, opts, "test: snapshot")
require.NoError(t, err)
require.False(t, committed)
}
func TestPullAndPushWithBareRemote(t *testing.T) {
ctx := context.Background()
src := seedStore(t, filepath.Join(t.TempDir(), "src.db"))
defer func() { _ = src.Close() }()
dir := t.TempDir()
remote := filepath.Join(dir, "remote.git")
// #nosec G204 -- fixed git argv in test setup.
require.NoError(t, exec.Command("git", "-C", dir, "init", "--bare", remote).Run())
publisher := filepath.Join(dir, "publisher")
opts := Options{RepoPath: publisher, Remote: remote, Branch: "main"}
_, err := Export(ctx, src, opts)
require.NoError(t, err)
// #nosec G204 -- fixed git argv in test setup.
require.NoError(t, exec.Command("git", "-C", publisher, "config", "user.name", "discrawl test").Run())
// #nosec G204 -- fixed git argv in test setup.
require.NoError(t, exec.Command("git", "-C", publisher, "config", "user.email", "discrawl@example.com").Run())
committed, err := Commit(ctx, opts, "test: snapshot")
require.NoError(t, err)
require.True(t, committed)
require.NoError(t, Push(ctx, opts))
subscriber := filepath.Join(dir, "subscriber")
subOpts := Options{RepoPath: subscriber, Remote: remote, Branch: "main"}
require.NoError(t, Pull(ctx, subOpts))
require.FileExists(t, filepath.Join(subscriber, ManifestName))
}
func seedStore(t *testing.T, path string) *store.Store {
t.Helper()
ctx := context.Background()
s, err := store.Open(ctx, path)
require.NoError(t, err)
now := time.Now().UTC()
require.NoError(t, s.UpsertGuild(ctx, store.GuildRecord{ID: "g1", Name: "Guild", RawJSON: `{}`}))
require.NoError(t, s.UpsertChannel(ctx, store.ChannelRecord{ID: "c1", GuildID: "g1", Kind: "text", Name: "general", RawJSON: `{}`}))
require.NoError(t, s.UpsertMember(ctx, store.MemberRecord{
GuildID: "g1",
UserID: "u1",
Username: "peter",
DisplayName: "Peter",
RoleIDsJSON: `[]`,
RawJSON: `{"bio":"Runs launch ops"}`,
}))
require.NoError(t, s.UpsertMessages(ctx, []store.MessageMutation{{
Record: store.MessageRecord{
ID: "m1",
GuildID: "g1",
ChannelID: "c1",
ChannelName: "general",
AuthorID: "u1",
AuthorName: "Peter",
MessageType: 0,
CreatedAt: now.Format(time.RFC3339Nano),
Content: "launch checklist ready",
NormalizedContent: "launch checklist ready",
RawJSON: `{}`,
},
EventType: "upsert",
PayloadJSON: `{"id":"m1"}`,
Options: store.WriteOptions{AppendEvent: true},
Mentions: []store.MentionEventRecord{{
MessageID: "m1",
GuildID: "g1",
ChannelID: "c1",
AuthorID: "u1",
TargetType: "role",
TargetID: "r1",
TargetName: "Ops",
EventAt: now.Format(time.RFC3339Nano),
}},
}}))
return s
}
func tableEntry(t *testing.T, manifest Manifest, name string) TableManifest {
t.Helper()
for _, table := range manifest.Tables {
if table.Name == name {
return table
}
}
t.Fatalf("table %s not found", name)
return TableManifest{}
}

View File

@ -217,6 +217,26 @@ func (s *Store) migrate(ctx context.Context) error {
return nil
}
func (s *Store) RebuildSearchIndexes(ctx context.Context) error {
if err := s.rebuildFTS(ctx); err != nil {
return err
}
if err := s.rebuildMemberFTS(ctx); err != nil {
return err
}
now := time.Now().UTC().Format(timeLayout)
if _, err := s.db.ExecContext(ctx, `
insert into sync_state(scope, cursor, updated_at)
values(?, ?, ?), (?, ?, ?)
on conflict(scope) do update set
cursor=excluded.cursor,
updated_at=excluded.updated_at
`, "schema:message_fts_rowid_version", messageFTSVersion, now, "schema:member_fts_rowid_version", memberFTSVersion, now); err != nil {
return fmt.Errorf("stamp search index versions: %w", err)
}
return nil
}
func (s *Store) schemaVersion(ctx context.Context) (int, error) {
var version int
if err := s.db.QueryRowContext(ctx, `pragma user_version`).Scan(&version); err != nil {

View File

@ -214,6 +214,58 @@ func TestCheckMessageFTSProbe(t *testing.T) {
require.NoError(t, s.CheckMessageFTS(ctx))
}
func TestRebuildSearchIndexesAndGuildCounts(t *testing.T) {
t.Parallel()
ctx := context.Background()
s, err := Open(ctx, filepath.Join(t.TempDir(), "discrawl.db"))
require.NoError(t, err)
defer func() { _ = s.Close() }()
require.NoError(t, s.UpsertGuild(ctx, GuildRecord{ID: "g1", Name: "Guild", RawJSON: `{}`}))
require.NoError(t, s.UpsertChannel(ctx, ChannelRecord{ID: "c1", GuildID: "g1", Kind: "text", Name: "general", RawJSON: `{}`}))
require.NoError(t, s.UpsertMember(ctx, MemberRecord{
GuildID: "g1",
UserID: "u1",
Username: "peter",
DisplayName: "Peter",
RoleIDsJSON: `[]`,
RawJSON: `{"bio":"Searchable profile"}`,
}))
require.NoError(t, s.UpsertMessage(ctx, MessageRecord{
ID: "1458939673664684210",
GuildID: "g1",
ChannelID: "c1",
ChannelName: "general",
AuthorID: "u1",
AuthorName: "Peter",
MessageType: 0,
CreatedAt: time.Now().UTC().Format(time.RFC3339Nano),
Content: "rebuildable launch text",
NormalizedContent: "rebuildable launch text",
RawJSON: `{}`,
}))
_, err = s.DB().ExecContext(ctx, `delete from message_fts`)
require.NoError(t, err)
_, err = s.DB().ExecContext(ctx, `delete from member_fts`)
require.NoError(t, err)
require.NoError(t, s.RebuildSearchIndexes(ctx))
results, err := s.SearchMessages(ctx, SearchOptions{Query: "rebuildable", Limit: 10})
require.NoError(t, err)
require.Len(t, results, 1)
members, err := s.Members(ctx, "g1", "Searchable", 10)
require.NoError(t, err)
require.Len(t, members, 1)
channelCount, err := s.GuildChannelCount(ctx, "g1")
require.NoError(t, err)
require.Equal(t, 1, channelCount)
memberCount, err := s.GuildMemberCount(ctx, "g1")
require.NoError(t, err)
require.Equal(t, 1, memberCount)
}
func TestOpenSetsSchemaVersion(t *testing.T) {
t.Parallel()