From 4d976d782ba2f2c409fe3b8fedced4fd16e59cdd Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 8 May 2026 07:10:12 +0100 Subject: [PATCH] feat(snapshot): add incremental shard import planning --- AGENTS.md | 2 +- CHANGELOG.md | 2 + README.md | 4 +- docs/boundary.md | 5 +- docs/publishing.md | 18 +- go.mod | 2 +- snapshot/snapshot.go | 345 ++++++++++++++++++++++++++++++++++++-- snapshot/snapshot_test.go | 154 ++++++++++++++++- 8 files changed, 503 insertions(+), 29 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index e17e409..b5c5e46 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -41,7 +41,7 @@ GOWORK=off go test -count=1 ./... For release readiness, also verify the public module tag: ```bash -GOPROXY=https://proxy.golang.org GONOSUMDB= go list -m github.com/vincentkoc/crawlkit@v0.4.0 +GOPROXY=https://proxy.golang.org GONOSUMDB= go list -m github.com/openclaw/crawlkit@v0.5.0 ``` ## Downstream Compatibility diff --git a/CHANGELOG.md b/CHANGELOG.md index b430262..c959115 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- Add snapshot file fingerprints and an incremental import planner/executor so downstream apps can import changed JSONL/Gzip shards without deleting every table. +- Move the module path to `github.com/openclaw/crawlkit`. - Bump routine Go module dependencies. ## v0.4.1 - 2026-05-06 diff --git a/README.md b/README.md index 7f8436e..de15960 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ safe desktop-cache snapshot utilities. ## Install ```bash -go get github.com/vincentkoc/crawlkit@latest +go get github.com/openclaw/crawlkit@latest ``` Go packages are published by tagging this repository. There is no separate @@ -22,7 +22,7 @@ See `docs/boundary.md` for the crawlkit-versus-app ownership boundary. - `config`: standard TOML config paths, runtime dirs, and token diagnostics. - `store`: SQLite open/read-only/transaction/query helpers. -- `snapshot`: `manifest.json` plus JSONL/Gzip table snapshot export and import. +- `snapshot`: `manifest.json` plus JSONL/Gzip table snapshot export, file fingerprints, full import, and planned incremental shard import. - `mirror`: clone/init/pull/commit/push helpers for private snapshot repos. - `state`: generic crawler cursor and freshness records. - `output`: text/json/log output helpers. diff --git a/docs/boundary.md b/docs/boundary.md index e43039c..6c54635 100644 --- a/docs/boundary.md +++ b/docs/boundary.md @@ -28,8 +28,9 @@ parsers, and product-specific ranking in the apps. schema-version checks, transactions, safe identifier quoting, and generic query helpers. - Snapshot packing: manifest format, JSONL/Gzip shards, table filters, - import progress, sidecar registration, backward-compatible manifest reads, - and import callbacks. + per-file fingerprints, import progress, incremental import planning, + sidecar registration, backward-compatible manifest reads, and import + callbacks. - Git mirror mechanics: clone/init, pull, origin management, path-scoped commits, push retry behavior, and portable SQLite checkout cleanup. - Sync freshness semantics: cursor/freshness records, stale checks, manifest diff --git a/docs/publishing.md b/docs/publishing.md index 7b27e8b..31612cb 100644 --- a/docs/publishing.md +++ b/docs/publishing.md @@ -21,30 +21,30 @@ go test ./... 6. Tag the next semver release from `main`: ```bash -git tag -s v0.4.0 +git tag -s v0.5.0 git push origin main -git push origin v0.4.0 +git push origin v0.5.0 ``` 7. Prime and verify module proxy visibility: ```bash -GOPROXY=https://proxy.golang.org go list -m github.com/vincentkoc/crawlkit@v0.4.0 -go list -m github.com/vincentkoc/crawlkit@v0.4.0 +GOPROXY=https://proxy.golang.org go list -m github.com/openclaw/crawlkit@v0.5.0 +go list -m github.com/openclaw/crawlkit@v0.5.0 ``` 8. Bump downstream apps to the new tag and commit their `go.mod`/`go.sum` updates: ```bash -go get github.com/vincentkoc/crawlkit@v0.4.0 +go get github.com/openclaw/crawlkit@v0.5.0 go mod tidy ``` `pkg.go.dev` indexes public modules automatically after the tag is reachable. -Use a patch tag such as `v0.3.17` only for narrow bug fixes on the existing API. -Use a minor tag such as `v0.4.0` for broad shared TUI or crawler infrastructure -changes. This branch is a `v0.4.0`-shaped release. +Use a patch tag only for narrow bug fixes on the existing API. Use a minor tag +for broad crawler infrastructure changes. The module-path move needs a new tag +on `openclaw/crawlkit` before downstream apps can drop local `replace` lines. ## Versioning @@ -52,5 +52,5 @@ Keep `v0.x.y` while the downstream crawler rewires are still settling. If the module ever reaches `v2`, Go requires the module path to become: ```text -github.com/vincentkoc/crawlkit/v2 +github.com/openclaw/crawlkit/v2 ``` diff --git a/go.mod b/go.mod index 257dba4..69c6e83 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/vincentkoc/crawlkit +module github.com/openclaw/crawlkit go 1.26.2 diff --git a/snapshot/snapshot.go b/snapshot/snapshot.go index ac55cd6..3005d09 100644 --- a/snapshot/snapshot.go +++ b/snapshot/snapshot.go @@ -4,10 +4,13 @@ import ( "bufio" "compress/gzip" "context" + "crypto/sha256" "database/sql" + "encoding/hex" "encoding/json" "errors" "fmt" + "hash" "io" "os" "path/filepath" @@ -15,7 +18,7 @@ import ( "strings" "time" - "github.com/vincentkoc/crawlkit/store" + "github.com/openclaw/crawlkit/store" ) const ManifestName = "manifest.json" @@ -38,6 +41,7 @@ type ImportOptions struct { DeleteTables []string DeleteTable DeleteFunc Filter RowFilter + ImportRow RowImportFunc Progress func(ImportProgress) BeforeImport func(context.Context, *sql.Tx) error AfterImport func(context.Context, *sql.Tx) error @@ -45,6 +49,8 @@ type ImportOptions struct { type RowFilter func(table string, row map[string]any) (bool, error) +type RowImportFunc func(ctx context.Context, tx *sql.Tx, table string, row map[string]any) error + type DeleteFunc func(ctx context.Context, tx *sql.Tx, table string) error type ImportProgress struct { @@ -72,15 +78,58 @@ type Manifest struct { } type TableManifest struct { - Name string `json:"name"` - File string `json:"file,omitempty"` - Files []string `json:"files"` - Columns []string `json:"columns"` - Rows int `json:"rows"` + Name string `json:"name"` + File string `json:"file,omitempty"` + Files []string `json:"files"` + FileManifests []FileManifest `json:"file_manifests,omitempty"` + Columns []string `json:"columns"` + Rows int `json:"rows"` +} + +type FileManifest struct { + Path string `json:"path"` + Rows int `json:"rows"` + Size int64 `json:"size,omitempty"` + SHA256 string `json:"sha256,omitempty"` } var ErrNoManifest = errors.New("pack manifest not found") +type TableImportMode string + +const ( + TableImportSkip TableImportMode = "skip" + TableImportReplace TableImportMode = "replace" + TableImportFiles TableImportMode = "files" +) + +type ImportPlan struct { + Full bool + Reason string + Tables []TableImportPlan +} + +type TableImportPlan struct { + Table TableManifest + Mode TableImportMode + Files []FileManifest + Reason string +} + +type IncrementalImportOptions struct { + DB *sql.DB + RootDir string + Previous Manifest + Current Manifest + Plan ImportPlan + DeleteTable DeleteFunc + Filter RowFilter + ImportRow RowImportFunc + Progress func(ImportProgress) + BeforeImport func(context.Context, *sql.Tx) error + AfterImport func(context.Context, *sql.Tx) error +} + func Export(ctx context.Context, opts ExportOptions) (Manifest, error) { if opts.DB == nil { return Manifest{}, errors.New("db is required") @@ -170,7 +219,7 @@ func Import(ctx context.Context, opts ImportOptions) (Manifest, error) { } } for _, table := range manifest.Tables { - rows, err := importTable(ctx, tx, opts.RootDir, table, opts.Filter, opts.Progress) + rows, err := importTable(ctx, tx, opts.RootDir, table, opts.Filter, opts.ImportRow, opts.Progress) if err != nil { return Manifest{}, err } @@ -188,6 +237,130 @@ func Import(ctx context.Context, opts ImportOptions) (Manifest, error) { return manifest, nil } +func PlanIncrementalImport(previous, current Manifest) ImportPlan { + if current.Version != previous.Version { + return ImportPlan{Full: true, Reason: "manifest version changed"} + } + previousTables := make(map[string]TableManifest, len(previous.Tables)) + for _, table := range previous.Tables { + previousTables[table.Name] = table + } + currentTables := make(map[string]TableManifest, len(current.Tables)) + for _, table := range current.Tables { + currentTables[table.Name] = table + } + for name := range previousTables { + if _, ok := currentTables[name]; !ok { + return ImportPlan{Full: true, Reason: "table removed: " + name} + } + } + plan := ImportPlan{} + for _, table := range current.Tables { + previousTable, ok := previousTables[table.Name] + if !ok { + plan.Tables = append(plan.Tables, TableImportPlan{ + Table: table, + Mode: TableImportReplace, + Files: tableFileManifests(table), + Reason: "new table", + }) + continue + } + tablePlan := planTableIncrement(previousTable, table) + plan.Tables = append(plan.Tables, tablePlan) + } + return plan +} + +func (p ImportPlan) Changed() bool { + if p.Full { + return true + } + for _, table := range p.Tables { + if table.Mode != TableImportSkip { + return true + } + } + return false +} + +func ImportIncremental(ctx context.Context, opts IncrementalImportOptions) (Manifest, ImportPlan, error) { + if opts.DB == nil { + return Manifest{}, ImportPlan{}, errors.New("db is required") + } + current := opts.Current + var err error + if len(current.Tables) == 0 { + current, err = ReadManifest(opts.RootDir) + if err != nil { + return Manifest{}, ImportPlan{}, err + } + } + plan := opts.Plan + if len(plan.Tables) == 0 && !plan.Full && plan.Reason == "" { + plan = PlanIncrementalImport(opts.Previous, current) + } + if plan.Full { + return Manifest{}, plan, errors.New("incremental import requires a non-full plan: " + plan.Reason) + } + if !plan.Changed() { + return current, plan, nil + } + tx, err := opts.DB.BeginTx(ctx, nil) + if err != nil { + return Manifest{}, plan, fmt.Errorf("begin incremental import tx: %w", err) + } + committed := false + defer func() { + if !committed { + _ = tx.Rollback() + } + }() + if opts.BeforeImport != nil { + if err := opts.BeforeImport(ctx, tx); err != nil { + return Manifest{}, plan, err + } + } + for _, tablePlan := range plan.Tables { + switch tablePlan.Mode { + case TableImportSkip: + continue + case TableImportReplace: + if err := deleteImportTable(ctx, tx, tablePlan.Table.Name, opts.DeleteTable); err != nil { + return Manifest{}, plan, err + } + rows, err := importTable(ctx, tx, opts.RootDir, tablePlan.Table, opts.Filter, opts.ImportRow, opts.Progress) + if err != nil { + return Manifest{}, plan, err + } + reportImportProgress(opts.Progress, ImportProgress{Phase: "table_done", Table: tablePlan.Table.Name, Rows: rows, TotalRows: tablePlan.Table.Rows}) + case TableImportFiles: + table := tablePlan.Table + table.File = "" + table.Files = fileManifestPaths(tablePlan.Files) + table.FileManifests = tablePlan.Files + table.Rows = fileManifestRows(tablePlan.Files) + rows, err := importTable(ctx, tx, opts.RootDir, table, opts.Filter, opts.ImportRow, opts.Progress) + if err != nil { + return Manifest{}, plan, err + } + reportImportProgress(opts.Progress, ImportProgress{Phase: "table_done", Table: tablePlan.Table.Name, Rows: rows, TotalRows: table.Rows}) + default: + return Manifest{}, plan, fmt.Errorf("unknown table import mode %q for %s", tablePlan.Mode, tablePlan.Table.Name) + } + } + if opts.AfterImport != nil { + if err := opts.AfterImport(ctx, tx); err != nil { + return Manifest{}, plan, err + } + } + if err := tx.Commit(); err != nil { + return Manifest{}, plan, fmt.Errorf("commit incremental import tx: %w", err) + } + committed = true + return current, plan, nil +} + func ReadManifest(rootDir string) (Manifest, error) { data, err := os.ReadFile(filepath.Join(rootDir, ManifestName)) if errors.Is(err, os.ErrNotExist) { @@ -278,10 +451,10 @@ func exportTable(ctx context.Context, db *sql.DB, rootDir, table string, maxShar if err := writer.close(); err != nil { return TableManifest{}, err } - return TableManifest{Name: table, Files: writer.files, Columns: cols, Rows: count}, nil + return TableManifest{Name: table, Files: writer.files, FileManifests: writer.fileManifests, Columns: cols, Rows: count}, nil } -func importTable(ctx context.Context, tx *sql.Tx, rootDir string, table TableManifest, filter RowFilter, progress func(ImportProgress)) (int, error) { +func importTable(ctx context.Context, tx *sql.Tx, rootDir string, table TableManifest, filter RowFilter, importRow RowImportFunc, progress func(ImportProgress)) (int, error) { files := table.Files if len(files) == 0 && strings.TrimSpace(table.File) != "" { files = []string{table.File} @@ -299,7 +472,7 @@ func importTable(ctx context.Context, tx *sql.Tx, rootDir string, table TableMan } fileProgress := ImportProgress{Phase: "file_start", Table: table.Name, File: rel, FileIndex: index + 1, FileCount: len(files), TotalRows: table.Rows} reportImportProgress(progress, fileProgress) - rows, err := importJSONLGzip(ctx, tx, file, table.Name, filter) + rows, err := importJSONLGzip(ctx, tx, file, table.Name, filter, importRow) if err != nil { _ = file.Close() return totalRows, err @@ -315,7 +488,7 @@ func importTable(ctx context.Context, tx *sql.Tx, rootDir string, table TableMan return totalRows, nil } -func importJSONLGzip(ctx context.Context, tx *sql.Tx, reader io.Reader, table string, filter RowFilter) (int, error) { +func importJSONLGzip(ctx context.Context, tx *sql.Tx, reader io.Reader, table string, filter RowFilter, importRow RowImportFunc) (int, error) { gz, err := gzip.NewReader(reader) if err != nil { return 0, fmt.Errorf("open gzip for %s: %w", table, err) @@ -341,7 +514,11 @@ func importJSONLGzip(ctx context.Context, tx *sql.Tx, reader io.Reader, table st continue } } - if err := insertRow(ctx, tx, table, row); err != nil { + importFunc := importRow + if importFunc == nil { + importFunc = insertRow + } + if err := importFunc(ctx, tx, table, row); err != nil { return rows, err } rows++ @@ -358,6 +535,16 @@ func reportImportProgress(progress func(ImportProgress), event ImportProgress) { } } +func deleteImportTable(ctx context.Context, tx *sql.Tx, table string, deleteTable DeleteFunc) error { + if deleteTable != nil { + return deleteTable(ctx, tx, table) + } + if _, err := tx.ExecContext(ctx, "delete from "+store.QuoteIdent(table)); err != nil { + return fmt.Errorf("clear table %s: %w", table, err) + } + return nil +} + func insertRow(ctx context.Context, tx *sql.Tx, table string, row map[string]any) error { cols := make([]string, 0, len(row)) for col := range row { @@ -391,8 +578,11 @@ type shardWriter struct { nextShard int rowsInShard int files []string + fileManifests []FileManifest + currentRel string file *os.File counter *countingWriter + hasher hash.Hash gz *gzip.Writer } @@ -415,8 +605,10 @@ func (w *shardWriter) open() error { w.nextShard++ w.rowsInShard = 0 w.files = append(w.files, rel) + w.currentRel = rel w.file = file - w.counter = &countingWriter{w: file} + w.hasher = sha256.New() + w.counter = &countingWriter{w: io.MultiWriter(file, w.hasher)} w.gz = gzip.NewWriter(w.counter) return nil } @@ -459,6 +651,17 @@ func (w *shardWriter) close() error { if closeErr != nil { return fmt.Errorf("close shard: %w", closeErr) } + if w.currentRel != "" && w.counter != nil && w.hasher != nil { + w.fileManifests = append(w.fileManifests, FileManifest{ + Path: w.currentRel, + Rows: w.rowsInShard, + Size: w.counter.n, + SHA256: hex.EncodeToString(w.hasher.Sum(nil)), + }) + } + w.currentRel = "" + w.counter = nil + w.hasher = nil return nil } @@ -481,3 +684,119 @@ func exportValue(value any) any { return v } } + +func planTableIncrement(previous, current TableManifest) TableImportPlan { + if !sameStrings(previous.Columns, current.Columns) { + return TableImportPlan{Table: current, Mode: TableImportReplace, Files: tableFileManifests(current), Reason: "columns changed"} + } + previousFiles := tableFileManifests(previous) + currentFiles := tableFileManifests(current) + if len(previousFiles) == 0 && len(currentFiles) == 0 { + return TableImportPlan{Table: current, Mode: TableImportSkip, Reason: "unchanged"} + } + if !allFilesHaveFingerprints(previousFiles) || !allFilesHaveFingerprints(currentFiles) { + return TableImportPlan{Table: current, Mode: TableImportReplace, Files: currentFiles, Reason: "missing file fingerprints"} + } + if sameFileManifests(previousFiles, currentFiles) { + return TableImportPlan{Table: current, Mode: TableImportSkip, Reason: "unchanged"} + } + if len(currentFiles) < len(previousFiles) { + return TableImportPlan{Table: current, Mode: TableImportReplace, Files: currentFiles, Reason: "files removed"} + } + for i := 0; i < len(previousFiles)-1; i++ { + if !sameFileManifest(previousFiles[i], currentFiles[i]) { + return TableImportPlan{Table: current, Mode: TableImportReplace, Files: currentFiles, Reason: "non-tail file changed"} + } + } + changed := make([]FileManifest, 0, len(currentFiles)-len(previousFiles)+1) + if len(previousFiles) > 0 { + oldTail := previousFiles[len(previousFiles)-1] + newTail := currentFiles[len(previousFiles)-1] + if oldTail.Path != newTail.Path { + return TableImportPlan{Table: current, Mode: TableImportReplace, Files: currentFiles, Reason: "tail path changed"} + } + if !sameFileManifest(oldTail, newTail) { + if newTail.Rows < oldTail.Rows { + return TableImportPlan{Table: current, Mode: TableImportReplace, Files: currentFiles, Reason: "tail rows removed"} + } + changed = append(changed, newTail) + } + } + for i := len(previousFiles); i < len(currentFiles); i++ { + changed = append(changed, currentFiles[i]) + } + if len(changed) == 0 { + return TableImportPlan{Table: current, Mode: TableImportSkip, Reason: "unchanged"} + } + return TableImportPlan{Table: current, Mode: TableImportFiles, Files: changed, Reason: "tail files changed"} +} + +func tableFileManifests(table TableManifest) []FileManifest { + if len(table.FileManifests) > 0 { + out := make([]FileManifest, len(table.FileManifests)) + copy(out, table.FileManifests) + return out + } + files := table.Files + if len(files) == 0 && strings.TrimSpace(table.File) != "" { + files = []string{table.File} + } + out := make([]FileManifest, 0, len(files)) + for _, file := range files { + out = append(out, FileManifest{Path: file}) + } + return out +} + +func allFilesHaveFingerprints(files []FileManifest) bool { + for _, file := range files { + if file.Path == "" || file.SHA256 == "" { + return false + } + } + return true +} + +func sameFileManifests(a, b []FileManifest) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if !sameFileManifest(a[i], b[i]) { + return false + } + } + return true +} + +func sameFileManifest(a, b FileManifest) bool { + return a.Path == b.Path && a.Rows == b.Rows && a.Size == b.Size && a.SHA256 == b.SHA256 +} + +func fileManifestPaths(files []FileManifest) []string { + paths := make([]string, 0, len(files)) + for _, file := range files { + paths = append(paths, file.Path) + } + return paths +} + +func fileManifestRows(files []FileManifest) int { + rows := 0 + for _, file := range files { + rows += file.Rows + } + return rows +} + +func sameStrings(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} diff --git a/snapshot/snapshot_test.go b/snapshot/snapshot_test.go index cea1829..d288475 100644 --- a/snapshot/snapshot_test.go +++ b/snapshot/snapshot_test.go @@ -10,7 +10,7 @@ import ( "testing" "time" - "github.com/vincentkoc/crawlkit/store" + "github.com/openclaw/crawlkit/store" ) func TestExportImportTablesWithFilter(t *testing.T) { @@ -95,6 +95,158 @@ func TestExportRotatesShards(t *testing.T) { if len(manifest.Tables[0].Files) < 2 { t.Fatalf("expected multiple shards, got %+v", manifest.Tables[0].Files) } + if len(manifest.Tables[0].FileManifests) != len(manifest.Tables[0].Files) { + t.Fatalf("file manifests = %+v, files = %+v", manifest.Tables[0].FileManifests, manifest.Tables[0].Files) + } + for _, file := range manifest.Tables[0].FileManifests { + if file.Path == "" || file.Rows == 0 || file.Size == 0 || len(file.SHA256) != 64 { + t.Fatalf("bad file manifest = %+v", file) + } + } +} + +func TestPlanIncrementalImportDetectsTailFiles(t *testing.T) { + previous := Manifest{ + Version: 1, + Tables: []TableManifest{{ + Name: "things", + Columns: []string{"id", "body"}, + Rows: 2, + Files: []string{"tables/things/000000.jsonl.gz"}, + FileManifests: []FileManifest{{ + Path: "tables/things/000000.jsonl.gz", + Rows: 2, + Size: 100, + SHA256: "old", + }}, + }}, + } + current := Manifest{ + Version: 1, + Tables: []TableManifest{{ + Name: "things", + Columns: []string{"id", "body"}, + Rows: 3, + Files: []string{"tables/things/000000.jsonl.gz"}, + FileManifests: []FileManifest{{ + Path: "tables/things/000000.jsonl.gz", + Rows: 3, + Size: 120, + SHA256: "new", + }}, + }}, + } + plan := PlanIncrementalImport(previous, current) + if plan.Full || len(plan.Tables) != 1 { + t.Fatalf("plan = %+v", plan) + } + table := plan.Tables[0] + if table.Mode != TableImportFiles || len(table.Files) != 1 || table.Files[0].SHA256 != "new" { + t.Fatalf("table plan = %+v", table) + } +} + +func TestPlanIncrementalImportReplacesUnsafeChanges(t *testing.T) { + previous := Manifest{ + Version: 1, + Tables: []TableManifest{{ + Name: "things", + Columns: []string{"id", "body"}, + Rows: 2, + Files: []string{"tables/things/000000.jsonl.gz"}, + FileManifests: []FileManifest{{ + Path: "tables/things/000000.jsonl.gz", + Rows: 2, + Size: 100, + SHA256: "old", + }}, + }}, + } + current := Manifest{ + Version: 1, + Tables: []TableManifest{{ + Name: "things", + Columns: []string{"id", "body"}, + Rows: 1, + Files: []string{"tables/things/000000.jsonl.gz"}, + FileManifests: []FileManifest{{ + Path: "tables/things/000000.jsonl.gz", + Rows: 1, + Size: 100, + SHA256: "new", + }}, + }}, + } + plan := PlanIncrementalImport(previous, current) + if plan.Full || len(plan.Tables) != 1 || plan.Tables[0].Mode != TableImportReplace { + t.Fatalf("plan = %+v", plan) + } +} + +func TestImportIncrementalImportsOnlyPlannedFiles(t *testing.T) { + ctx := context.Background() + src, err := store.Open(ctx, store.Options{ + Path: filepath.Join(t.TempDir(), "src.db"), + Schema: `create table things(id text primary key, body text not null);`, + }) + if err != nil { + t.Fatal(err) + } + defer src.Close() + mustExec(t, src.DB(), `insert into things(id, body) values('one', 'same')`) + mustExec(t, src.DB(), `insert into things(id, body) values('two', 'old')`) + root := t.TempDir() + previous, err := Export(ctx, ExportOptions{ + DB: src.DB(), + RootDir: root, + Tables: []string{"things"}, + }) + if err != nil { + t.Fatal(err) + } + + dst, err := store.Open(ctx, store.Options{ + Path: filepath.Join(t.TempDir(), "dst.db"), + Schema: `create table things(id text primary key, body text not null);`, + }) + if err != nil { + t.Fatal(err) + } + defer dst.Close() + if _, err := Import(ctx, ImportOptions{DB: dst.DB(), RootDir: root}); err != nil { + t.Fatal(err) + } + mustExec(t, dst.DB(), `insert into things(id, body) values('local', 'keep')`) + + mustExec(t, src.DB(), `update things set body = 'new' where id = 'two'`) + mustExec(t, src.DB(), `insert into things(id, body) values('three', 'added')`) + current, err := Export(ctx, ExportOptions{ + DB: src.DB(), + RootDir: root, + Tables: []string{"things"}, + }) + if err != nil { + t.Fatal(err) + } + _, plan, err := ImportIncremental(ctx, IncrementalImportOptions{ + DB: dst.DB(), + RootDir: root, + Previous: previous, + Current: current, + }) + if err != nil { + t.Fatal(err) + } + if len(plan.Tables) != 1 || plan.Tables[0].Mode != TableImportFiles { + t.Fatalf("plan = %+v", plan) + } + var got string + if err := dst.DB().QueryRowContext(ctx, `select group_concat(id || ':' || body, ',') from (select id, body from things order by id)`).Scan(&got); err != nil { + t.Fatal(err) + } + if got != "local:keep,one:same,three:added,two:new" { + t.Fatalf("things = %q", got) + } } func TestImportHooks(t *testing.T) {