diff --git a/CHANGELOG.md b/CHANGELOG.md index 5db1b64..e5b3075 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ ## Unreleased +## v0.5.1 - 2026-05-08 + +- Add reusable `backup` helpers for age identities, encrypted JSONL/Gzip shards, + manifests, recipient tracking, shard hash verification, and stale shard + cleanup. + - Add reusable `embed` providers for OpenAI, OpenAI-compatible endpoints, Ollama, and llama.cpp, including probe diagnostics and rate-limit errors. - Add reusable `vector` helpers for float32 blobs, dimension validation, diff --git a/README.md b/README.md index 689d4d6..781a256 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ See `docs/boundary.md` for the crawlkit-versus-app ownership boundary. - `config`: standard TOML config paths, runtime dirs, and token diagnostics. - `store`: SQLite open/read-only/transaction/query helpers. - `snapshot`: `manifest.json` plus JSONL/Gzip table snapshot export, file fingerprints, full import, and planned incremental shard import. +- `backup`: age-encrypted JSONL/Gzip shards, backup manifests, recipient/identity helpers, and shard restore verification. - `mirror`: clone/init/pull/commit/push helpers for private snapshot repos. - `state`: generic crawler cursor and freshness records. - `embed`: reusable OpenAI-compatible, Ollama, and llama.cpp embedding providers plus local probe diagnostics. 
diff --git a/backup/backup.go b/backup/backup.go
new file mode 100644
index 0000000..c961789
--- /dev/null
+++ b/backup/backup.go
@@ -0,0 +1,409 @@
+// Package backup writes and restores age-encrypted JSONL/Gzip backup shards
+// together with the manifest.json that indexes them.
+package backup
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"os"
+	"path"
+	"path/filepath"
+	"reflect"
+	"sort"
+	"strings"
+	"time"
+)
+
+// FormatVersion is the manifest format WriteSnapshot writes and the only one
+// ReadSnapshot accepts.
+const FormatVersion = 1
+
+// Config names the backup repository directory, the age identity file used
+// to decrypt, and the age recipients shards are encrypted to.
+type Config struct {
+	Repo       string
+	Identity   string
+	Recipients []string
+}
+
+// Manifest is the content of manifest.json: per-table row counts plus one
+// entry per shard file.
+type Manifest struct {
+	Format     int            `json:"format"`
+	Encrypted  bool           `json:"encrypted"`
+	Exported   time.Time      `json:"exported"`
+	Recipients []string       `json:"recipients,omitempty"`
+	Counts     map[string]int `json:"counts"`
+	Shards     []ShardEntry   `json:"shards"`
+}
+
+// Shard is one table slice to back up. Rows must be a slice; Path is the
+// slash-separated location below the repository, as checked by
+// ResolveShardPath.
+type Shard struct {
+	Table string
+	Path  string
+	Rows  any
+}
+
+// ShardEntry records a written shard. SHA256 is the hex digest of the
+// plaintext JSONL and Bytes is the size of the encrypted file.
+type ShardEntry struct {
+	Table  string `json:"table"`
+	Path   string `json:"path"`
+	Rows   int    `json:"rows"`
+	SHA256 string `json:"sha256"`
+	Bytes  int64  `json:"bytes"`
+}
+
+// DecodedShard pairs a manifest entry with its decrypted JSONL.
+type DecodedShard struct {
+	Entry     ShardEntry
+	Plaintext []byte
+}
+
+// WriteSnapshot encodes each shard as JSONL, stores it encrypted below
+// cfg.Repo, and returns the new manifest. Ciphertext is reused for shards
+// whose plaintext hash and recipient set match old. When the result is
+// equivalent to old, old is returned, manifest.json is not rewritten and
+// stale shards are not removed. A canceled ctx stops the run between shards.
+func WriteSnapshot(ctx context.Context, cfg Config, shards []Shard, old Manifest) (Manifest, error) {
+	recipients := normalizedStrings(cfg.Recipients)
+	reuseEncrypted := sameStrings(old.Recipients, recipients)
+	manifest := Manifest{
+		Format:     FormatVersion,
+		Encrypted:  true,
+		Exported:   time.Now().UTC(),
+		Recipients: recipients,
+		Counts:     map[string]int{},
+	}
+	for _, shard := range shards {
+		// Earlier versions ignored ctx entirely, so a nil ctx stays legal.
+		if ctx != nil {
+			if err := ctx.Err(); err != nil {
+				return Manifest{}, err
+			}
+		}
+		plaintext, rows, err := EncodeJSONL(shard.Rows)
+		if err != nil {
+			return Manifest{}, fmt.Errorf("encode %s: %w", shard.Table, err)
+		}
+		entry, err := WriteShard(cfg, old, shard.Table, shard.Path, plaintext, rows, reuseEncrypted)
+		if err != nil {
+			return Manifest{}, err
+		}
+		manifest.Counts[shard.Table] += rows
+		manifest.Shards = append(manifest.Shards, entry)
+	}
+	sort.Slice(manifest.Shards, func(i, j int) bool { return manifest.Shards[i].Path < manifest.Shards[j].Path })
+	if EquivalentManifest(old, manifest) {
+		return old, nil
+	}
+	if err := RemoveStaleShards(cfg.Repo, manifest.Shards); err != nil {
+		return Manifest{}, err
+	}
+	if err := WriteManifest(cfg.Repo, manifest); err != nil {
+		return Manifest{}, err
+	}
+	return manifest, nil
+}
+
+// ReadSnapshot decrypts every shard listed in manifest with cfg.Identity and
+// rejects any shard whose plaintext hash differs from the manifest.
+func ReadSnapshot(cfg Config, manifest Manifest) ([]DecodedShard, error) {
+	if manifest.Format != FormatVersion {
+		return nil, fmt.Errorf("unsupported backup format %d", manifest.Format)
+	}
+	var out []DecodedShard
+	for _, shard := range manifest.Shards {
+		plaintext, err := DecryptShardFile(cfg, shard)
+		if err != nil {
+			return nil, err
+		}
+		if got := SHA256Hex(plaintext); got != shard.SHA256 {
+			return nil, fmt.Errorf("backup shard hash mismatch for %s", shard.Path)
+		}
+		out = append(out, DecodedShard{Entry: shard, Plaintext: plaintext})
+	}
+	return out, nil
+}
+
+// WriteShard encrypts plaintext to cfg.Recipients and stores it at rel below
+// cfg.Repo. When reuseEncrypted is set, old lists rel with the same plaintext
+// hash, and the file still exists, the existing ciphertext is kept instead of
+// being re-encrypted.
+func WriteShard(cfg Config, old Manifest, table, rel string, plaintext []byte, rows int, reuseEncrypted bool) (ShardEntry, error) {
+	hash := SHA256Hex(plaintext)
+	target, err := ResolveShardPath(cfg.Repo, rel)
+	if err != nil {
+		return ShardEntry{}, err
+	}
+	if oldEntry, ok := old.Entry(rel); reuseEncrypted && ok && oldEntry.SHA256 == hash {
+		if info, err := os.Stat(target); err == nil {
+			// Only the ciphertext is reused; describe the shard as the
+			// caller does now so the entry agrees with manifest counts.
+			oldEntry.Table, oldEntry.Rows = table, rows
+			oldEntry.Bytes = info.Size()
+			return oldEntry, nil
+		}
+	}
+	encrypted, _, err := EncryptShard(plaintext, cfg.Recipients)
+	if err != nil {
+		return ShardEntry{}, err
+	}
+	if err := os.MkdirAll(filepath.Dir(target), 0o700); err != nil {
+		return ShardEntry{}, err
+	}
+	if err := os.WriteFile(target, encrypted, 0o600); err != nil {
+		return ShardEntry{}, err
+	}
+	return ShardEntry{Table: table, Path: rel, Rows: rows, SHA256: hash, Bytes: int64(len(encrypted))}, nil
+}
+
+// DecryptShardFile reads the shard named by the manifest entry and returns
+// its decrypted, decompressed JSONL.
+func DecryptShardFile(cfg Config, shard ShardEntry) ([]byte, error) {
+	target, err := ResolveShardPath(cfg.Repo, shard.Path)
+	if err != nil {
+		return nil, err
+	}
+	ciphertext, err := os.ReadFile(target) // #nosec G304 -- ResolveShardPath confines manifest-controlled paths below data/.
+	if err != nil {
+		return nil, err
+	}
+	return DecryptShard(ciphertext, cfg.Identity)
+}
+
+// ResolveShardPath maps a slash-separated manifest path to a file below
+// repo/data. It rejects paths that are absolute, climb out of the repository,
+// are not under data/, or do not end in .age.
+func ResolveShardPath(repo, rel string) (string, error) {
+	clean := path.Clean(strings.TrimSpace(rel))
+	if clean == "." || clean == ".." || strings.HasPrefix(clean, "../") || path.IsAbs(clean) {
+		return "", fmt.Errorf("backup shard path escapes backup root: %s", rel)
+	}
+	if !strings.HasPrefix(clean, "data/") || !strings.HasSuffix(clean, ".age") {
+		return "", fmt.Errorf("invalid backup shard path: %s", rel)
+	}
+	full := filepath.Join(repo, filepath.FromSlash(clean))
+	root := filepath.Clean(filepath.Join(repo, "data"))
+	parent := filepath.Clean(filepath.Dir(full))
+	if parent != root && !strings.HasPrefix(parent, root+string(filepath.Separator)) {
+		return "", fmt.Errorf("backup shard path escapes backup root: %s", rel)
+	}
+	return full, nil
+}
+
+// EncodeJSONL marshals each element of the slice rows as one JSON line and
+// returns the bytes together with the row count.
+func EncodeJSONL(rows any) ([]byte, int, error) {
+	value := reflect.ValueOf(rows)
+	if value.Kind() != reflect.Slice {
+		return nil, 0, fmt.Errorf("unsupported JSONL rows %T", rows)
+	}
+	var buf bytes.Buffer
+	enc := json.NewEncoder(&buf)
+	for i := 0; i < value.Len(); i++ {
+		if err := enc.Encode(value.Index(i).Interface()); err != nil {
+			return nil, 0, err
+		}
+	}
+	return buf.Bytes(), value.Len(), nil
+}
+
+// DecodeJSONL appends every JSON value in plaintext to *out. The input is
+// decoded as a stream rather than line by line, so a row of any size that
+// EncodeJSONL wrote can be restored and blank lines are skipped.
+func DecodeJSONL[T any](plaintext []byte, out *[]T) error {
+	dec := json.NewDecoder(bytes.NewReader(plaintext))
+	for {
+		var value T
+		if err := dec.Decode(&value); err != nil {
+			if errors.Is(err, io.EOF) {
+				return nil
+			}
+			return err
+		}
+		*out = append(*out, value)
+	}
+}
+
+// ReadManifest loads manifest.json from repo.
+func ReadManifest(repo string) (Manifest, error) {
+	data, err := os.ReadFile(filepath.Join(repo, "manifest.json")) // #nosec G304 -- repo is configured by caller.
+	if err != nil {
+		return Manifest{}, err
+	}
+	var manifest Manifest
+	if err := json.Unmarshal(data, &manifest); err != nil {
+		return Manifest{}, err
+	}
+	return manifest, nil
+}
+
+// WriteManifest stores manifest as indented JSON in repo/manifest.json.
+func WriteManifest(repo string, manifest Manifest) error {
+	data, err := json.MarshalIndent(manifest, "", " ")
+	if err != nil {
+		return err
+	}
+	data = append(data, '\n')
+	return os.WriteFile(filepath.Join(repo, "manifest.json"), data, 0o600)
+}
+
+// Entry returns the shard entry whose Path equals path, if any.
+func (m Manifest) Entry(path string) (ShardEntry, bool) {
+	for _, shard := range m.Shards {
+		if shard.Path == path {
+			return shard, true
+		}
+	}
+	return ShardEntry{}, false
+}
+
+// EquivalentManifest reports whether a and b describe the same backup,
+// ignoring the export time and encrypted shard sizes. Shards are compared
+// position by position.
+func EquivalentManifest(a, b Manifest) bool {
+	if a.Format != b.Format || a.Encrypted != b.Encrypted || !sameStrings(a.Recipients, b.Recipients) || !sameCounts(a.Counts, b.Counts) || len(a.Shards) != len(b.Shards) {
+		return false
+	}
+	for i := range a.Shards {
+		left, right := a.Shards[i], b.Shards[i]
+		left.Bytes, right.Bytes = 0, 0
+		if left != right {
+			return false
+		}
+	}
+	return true
+}
+
+// RemoveStaleShards deletes every .age file below repo/data that is not
+// referenced by shards.
+func RemoveStaleShards(repo string, shards []ShardEntry) error {
+	keep := map[string]struct{}{}
+	for _, shard := range shards {
+		keep[filepath.Clean(filepath.Join(repo, filepath.FromSlash(shard.Path)))] = struct{}{}
+		// WriteShard stores the file at the path ResolveShardPath returns
+		// while the manifest keeps the caller's spelling. Keep that
+		// location too, or a path with surrounding whitespace would have
+		// its freshly written shard deleted as stale.
+		if target, err := ResolveShardPath(repo, shard.Path); err == nil {
+			keep[filepath.Clean(target)] = struct{}{}
+		}
+	}
+	root := filepath.Join(repo, "data")
+	if _, err := os.Stat(root); os.IsNotExist(err) {
+		return nil
+	}
+	var stale []string
+	if err := filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error {
+		if err != nil || d == nil || d.IsDir() {
+			return err
+		}
+		if !strings.HasSuffix(path, ".age") {
+			return nil
+		}
+		clean := filepath.Clean(path)
+		if _, ok := keep[clean]; ok {
+			return nil
+		}
+		stale = append(stale, clean)
+		return nil
+	}); err != nil {
+		return err
+	}
+	for _, path := range stale {
+		rel, err := filepath.Rel(root, path)
+		if err != nil || rel == "." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) || filepath.IsAbs(rel) {
+			return fmt.Errorf("stale shard path escapes backup root: %s", path)
+		}
+		if err := os.Remove(path); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// EncryptShard gzips plaintext and encrypts it to the given age recipients.
+// It returns the ciphertext and the hex SHA-256 of the plaintext.
+func EncryptShard(plaintext []byte, recipients []string) ([]byte, string, error) {
+	return encryptShard(plaintext, recipients)
+}
+
+// DecryptShard decrypts ciphertext with the age identity stored at
+// identityPath and returns the decompressed plaintext.
+func DecryptShard(ciphertext []byte, identityPath string) ([]byte, error) {
+	return decryptShard(ciphertext, identityPath)
+}
+
+// SHA256Hex returns the lowercase hex SHA-256 digest of data.
+func SHA256Hex(data []byte) string {
+	return sha256Hex(data)
+}
+
+// normalizedStrings trims, drops empty and duplicate values, and sorts.
+func normalizedStrings(values []string) []string {
+	seen := map[string]struct{}{}
+	out := make([]string, 0, len(values))
+	for _, value := range values {
+		value = strings.TrimSpace(value)
+		if value == "" {
+			continue
+		}
+		if _, ok := seen[value]; ok {
+			continue
+		}
+		seen[value] = struct{}{}
+		out = append(out, value)
+	}
+	sort.Strings(out)
+	return out
+}
+
+// sameStrings reports whether a and b hold the same normalized set.
+func sameStrings(a, b []string) bool {
+	a, b = normalizedStrings(a), normalizedStrings(b)
+	if len(a) != len(b) {
+		return false
+	}
+	for i := range a {
+		if a[i] != b[i] {
+			return false
+		}
+	}
+	return true
+}
+
+// sameCounts reports whether a and b hold identical per-table counts.
+func sameCounts(a, b map[string]int) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	for key, left := range a {
+		if right, ok := b[key]; !ok || right != left {
+			return false
+		}
+	}
+	return true
+}
+
+// expandHome replaces a leading "~" or "~/" with the user's home directory
+// and returns p unchanged when the home directory cannot be determined.
+func expandHome(p string) string {
+	if p == "~" {
+		if home, err := os.UserHomeDir(); err == nil {
+			return home
+		}
+	}
+	if after, ok := strings.CutPrefix(p, "~/"); ok {
+		if home, err := os.UserHomeDir(); err == nil {
+			return filepath.Join(home, after)
+		}
+	}
+	return p
+}
diff --git a/backup/backup_test.go b/backup/backup_test.go
new file mode 100644
index 0000000..4fe57fd
--- /dev/null
+++ b/backup/backup_test.go
@@ -0,0 +1,90 @@
+package backup
+
+import (
+	"context"
+	"os"
+	"path/filepath"
+	"testing"
+)
+
+type row struct {
+	ID   string `json:"id"`
+	Body string `json:"body"`
+}
+
+func TestWriteReadEncryptedSnapshot(t *testing.T) {
+	dir := t.TempDir()
+	identity := filepath.Join(dir, "age.key")
+	recipient, err := EnsureIdentity(identity)
+	if err != nil {
+		t.Fatal(err)
+	}
+	cfg := Config{Repo: filepath.Join(dir, "repo"), Identity: identity, Recipients: []string{recipient}}
+	if err := os.MkdirAll(cfg.Repo, 0o700); err != nil {
+		t.Fatal(err)
+	}
+	manifest, err := WriteSnapshot(context.Background(), cfg, []Shard{
+		{Table: "messages", Path: "data/messages/2026/05.jsonl.gz.age", Rows: []row{{ID: "1", Body: "hello"}}},
+	}, Manifest{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if manifest.Counts["messages"] != 1 || len(manifest.Shards) != 1 {
+		t.Fatalf("unexpected manifest: %+v", manifest)
+	}
+	decoded, err := ReadSnapshot(cfg, manifest)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(decoded) != 1 {
+		t.Fatalf("unexpected shard count: %d", len(decoded))
+	}
+	var rows []row
+	if err := DecodeJSONL(decoded[0].Plaintext, &rows); err != nil {
+		t.Fatal(err)
+	}
+	if len(rows) != 1 || rows[0].Body != "hello" {
+		t.Fatalf("unexpected rows: %+v", rows)
+	}
+}
+
+func TestResolveShardPathRejectsEscapes(t *testing.T) {
+	for _, rel := range []string{"../x.age", "data/../x.age", "data/x.txt", "/data/x.age"} {
+		if _, err := ResolveShardPath(t.TempDir(), rel); err == nil {
+			t.Fatalf("expected error for %q", rel)
+		}
+	}
+}
+
+func TestDecodeJSONLSkipsBlankLines(t *testing.T) {
+	var rows []row
+	if err := DecodeJSONL([]byte("{\"id\":\"1\"}\n\n{\"id\":\"2\"}\n"), &rows); err != nil {
+		t.Fatal(err)
+	}
+	if len(rows) != 2 || rows[1].ID != "2" {
+		t.Fatalf("unexpected rows: %+v", rows)
+	}
+}
+
+func TestRemoveStaleShardsKeepsResolvedPath(t *testing.T) {
+	repo := t.TempDir()
+	kept := filepath.Join(repo, "data", "kept.age")
+	stale := filepath.Join(repo, "data", "stale.age")
+	if err := os.MkdirAll(filepath.Join(repo, "data"), 0o700); err != nil {
+		t.Fatal(err)
+	}
+	for _, name := range []string{kept, stale} {
+		if err := os.WriteFile(name, []byte("x"), 0o600); err != nil {
+			t.Fatal(err)
+		}
+	}
+	if err := RemoveStaleShards(repo, []ShardEntry{{Path: " data/kept.age "}}); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := os.Stat(kept); err != nil {
+		t.Fatalf("kept shard removed: %v", err)
+	}
+	if _, err := os.Stat(stale); !os.IsNotExist(err) {
+		t.Fatalf("stale shard still present: %v", err)
+	}
+}
diff --git a/backup/crypto.go b/backup/crypto.go
new file mode 100644
index 0000000..2607fcd
--- /dev/null
+++ b/backup/crypto.go
@@ -0,0 +1,177 @@
+package backup
+
+import (
+	"bytes"
+	"compress/gzip"
+	"crypto/sha256"
+	"encoding/hex"
+	"errors"
+	"fmt"
+	"io"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"strings"
+	"time"
+
+	"filippo.io/age"
+)
+
+// EnsureIdentity returns the recipient for the age identity stored at path,
+// generating and saving a new X25519 identity when the file does not exist.
+// The key file is created exclusively, so an identity that appears between
+// the existence check and the write is adopted instead of overwritten.
+func EnsureIdentity(path string) (string, error) {
+	path = expandHome(path)
+	recipient, err := recipientFromFile(path)
+	if err == nil || !errors.Is(err, fs.ErrNotExist) {
+		return recipient, err
+	}
+
+	identity, err := age.GenerateX25519Identity()
+	if err != nil {
+		return "", err
+	}
+	if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
+		return "", err
+	}
+	file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o600) // #nosec G304 -- path is the configured local age identity file.
+	if errors.Is(err, fs.ErrExist) {
+		// Lost a creation race; overwriting would destroy the other key.
+		return recipientFromFile(path)
+	}
+	if err != nil {
+		return "", err
+	}
+	if _, err := file.WriteString(identity.String() + "\n"); err != nil {
+		_ = file.Close() // the write error is the one worth reporting
+		return "", err
+	}
+	if err := file.Close(); err != nil {
+		return "", err
+	}
+	return identity.Recipient().String(), nil
+}
+
+// RecipientFromIdentity returns the recipient for the age identity stored at
+// path, expanding a leading "~".
+func RecipientFromIdentity(path string) (string, error) {
+	return recipientFromFile(expandHome(path))
+}
+
+// recipientFromFile reads the age identity at path, which must already be
+// home-expanded, and returns its recipient string.
+func recipientFromFile(path string) (string, error) {
+	data, err := os.ReadFile(path) // #nosec G304 -- path is the configured local age identity file.
+	if err != nil {
+		return "", err
+	}
+	identity, err := parseIdentity(data)
+	if err != nil {
+		return "", err
+	}
+	return identity.Recipient().String(), nil
+}
+
+// encryptShard gzips plaintext, encrypts the result to the parsed age
+// recipients, and returns the ciphertext with the plaintext's hex SHA-256.
+func encryptShard(plaintext []byte, recipientStrings []string) ([]byte, string, error) {
+	recipients, err := parseRecipients(recipientStrings)
+	if err != nil {
+		return nil, "", err
+	}
+	var compressed bytes.Buffer
+	gz := gzip.NewWriter(&compressed)
+	gz.ModTime = time.Unix(0, 0).UTC()
+	if _, err := gz.Write(plaintext); err != nil {
+		return nil, "", fmt.Errorf("compress shard: %w", err)
+	}
+	if err := gz.Close(); err != nil {
+		return nil, "", fmt.Errorf("compress shard: %w", err)
+	}
+
+	var encrypted bytes.Buffer
+	w, err := age.Encrypt(&encrypted, recipients...)
+	if err != nil {
+		return nil, "", err
+	}
+	// A dropped write error here would yield a truncated, unrestorable shard.
+	if _, err := w.Write(compressed.Bytes()); err != nil {
+		return nil, "", fmt.Errorf("encrypt shard: %w", err)
+	}
+	if err := w.Close(); err != nil {
+		return nil, "", err
+	}
+	return encrypted.Bytes(), sha256Hex(plaintext), nil
+}
+
+// decryptShard decrypts ciphertext with the first identity in the file at
+// identityPath and returns the gunzipped plaintext.
+func decryptShard(ciphertext []byte, identityPath string) ([]byte, error) {
+	data, err := os.ReadFile(expandHome(identityPath)) // #nosec G304 -- path is the configured local age identity file.
+	if err != nil {
+		return nil, err
+	}
+	identity, err := parseIdentity(data)
+	if err != nil {
+		return nil, err
+	}
+	r, err := age.Decrypt(bytes.NewReader(ciphertext), identity)
+	if err != nil {
+		return nil, err
+	}
+	gz, err := gzip.NewReader(r)
+	if err != nil {
+		return nil, err
+	}
+	defer func() { _ = gz.Close() }()
+	plaintext, err := io.ReadAll(gz)
+	if err != nil {
+		return nil, err
+	}
+	return plaintext, nil
+}
+
+// parseRecipients parses the non-empty values as X25519 recipients and
+// requires at least one.
+func parseRecipients(values []string) ([]age.Recipient, error) {
+	var out []age.Recipient
+	for _, value := range values {
+		value = strings.TrimSpace(value)
+		if value == "" {
+			continue
+		}
+		recipient, err := age.ParseX25519Recipient(value)
+		if err != nil {
+			return nil, fmt.Errorf("parse age recipient: %w", err)
+		}
+		out = append(out, recipient)
+	}
+	if len(out) == 0 {
+		return nil, fmt.Errorf("at least one age recipient is required")
+	}
+	return out, nil
+}
+
+// parseIdentity returns the first X25519 identity in data, skipping blank
+// lines and # comments.
+func parseIdentity(data []byte) (*age.X25519Identity, error) {
+	for _, line := range strings.Split(string(data), "\n") {
+		line = strings.TrimSpace(line)
+		if line == "" || strings.HasPrefix(line, "#") {
+			continue
+		}
+		identity, err := age.ParseX25519Identity(line)
+		if err != nil {
+			return nil, fmt.Errorf("parse age identity: %w", err)
+		}
+		return identity, nil
+	}
+	return nil, fmt.Errorf("age identity file is empty")
+}
+
+// sha256Hex returns the lowercase hex SHA-256 digest of data.
+func sha256Hex(data []byte) string {
+	sum := sha256.Sum256(data)
+	return hex.EncodeToString(sum[:])
+}
diff --git a/go.mod b/go.mod
index 69c6e83..7e85ac0 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module github.com/openclaw/crawlkit
 go 1.26.2
 
 require (
+	filippo.io/age v1.3.1
 	github.com/charmbracelet/bubbles v1.0.0
 	github.com/charmbracelet/bubbletea v1.3.10
 	github.com/charmbracelet/lipgloss v1.1.0
@@ -14,6 +15,7 @@ require (
 )
 
 require (
+	filippo.io/hpke v0.4.0 // indirect
 	github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
 	github.com/charmbracelet/colorprofile v0.4.1 // indirect
 	github.com/charmbracelet/x/cellbuf v0.0.15 // indirect
@@ -32,8 +34,9 @@ require ( github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect + golang.org/x/crypto v0.45.0 // indirect golang.org/x/sys v0.42.0 // indirect - golang.org/x/text v0.3.8 // indirect + golang.org/x/text v0.31.0 // indirect modernc.org/libc v1.72.0 // indirect modernc.org/mathutil v1.7.1 // indirect modernc.org/memory v1.11.0 // indirect diff --git a/go.sum b/go.sum index 9a25b75..2404760 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,9 @@ +c2sp.org/CCTV/age v0.0.0-20251208015420-e9274a7bdbfd h1:ZLsPO6WdZ5zatV4UfVpr7oAwLGRZ+sebTUruuM4Ra3M= +c2sp.org/CCTV/age v0.0.0-20251208015420-e9274a7bdbfd/go.mod h1:SrHC2C7r5GkDk8R+NFVzYy/sdj0Ypg9htaPXQq5Cqeo= +filippo.io/age v1.3.1 h1:hbzdQOJkuaMEpRCLSN1/C5DX74RPcNCk6oqhKMXmZi0= +filippo.io/age v1.3.1/go.mod h1:EZorDTYUxt836i3zdori5IJX/v2Lj6kWFU0cfh6C0D4= +filippo.io/hpke v0.4.0 h1:p575VVQ6ted4pL+it6M00V/f2qTZITO0zgmdKCkd5+A= +filippo.io/hpke v0.4.0/go.mod h1:EmAN849/P3qdeK+PCMkDpDm83vRHM5cDipBJ8xbQLVY= github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiErDT4WkJ2k= github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8= github.com/charmbracelet/bubbles v1.0.0 h1:12J8/ak/uCZEMQ6KU7pcfwceyjLlWsDLAxB5fXonfvc= @@ -52,6 +58,8 @@ github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM= +golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= +golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= golang.org/x/exp v0.0.0-20231006140011-7918f672742d 
h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= @@ -61,8 +69,8 @@ golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= -golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY= -golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= +golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= modernc.org/cc/v4 v4.27.3 h1:uNCgn37E5U09mTv1XgskEVUJ8ADKpmFMPxzGJ0TSo+U=