feat: add telegram archive cli
This commit is contained in:
parent
002e787416
commit
af7de2621d
4
.gitignore
vendored
Normal file
4
.gitignore
vendored
Normal file
@ -0,0 +1,4 @@
|
||||
/telecrawl
|
||||
coverage.out
|
||||
dist/
|
||||
.DS_Store
|
||||
50
README.md
Normal file
50
README.md
Normal file
@ -0,0 +1,50 @@
|
||||
# telecrawl
|
||||
|
||||
Telegram Desktop archive CLI.
|
||||
|
||||
`telecrawl` reads your local Telegram Desktop `tdata` through `opentele2` /
|
||||
Telethon, stores a searchable SQLite archive in `~/.telecrawl/telecrawl.db`,
|
||||
and can back it up to GitHub as encrypted age shards.
|
||||
|
||||
## Setup
|
||||
|
||||
```bash
|
||||
telecrawl deps install
|
||||
```
|
||||
|
||||
## Import
|
||||
|
||||
```bash
|
||||
telecrawl import
|
||||
telecrawl status
|
||||
telecrawl chats --limit 20
|
||||
telecrawl messages --limit 20
|
||||
telecrawl search "query"
|
||||
```
|
||||
|
||||
Import limits default to the latest 200 dialogs and 500 messages per dialog.
|
||||
Use `0` for no limit:
|
||||
|
||||
```bash
|
||||
telecrawl import --dialogs-limit 0 --messages-limit 0
|
||||
```
|
||||
|
||||
## Backup
|
||||
|
||||
Create `https://github.com/steipete/backup-telecrawl` first, then:
|
||||
|
||||
```bash
|
||||
telecrawl backup init
|
||||
telecrawl backup push
|
||||
```
|
||||
|
||||
Backup payloads are encrypted before Git sees them. Cleartext Git metadata is
|
||||
limited to manifest counts, shard paths, export time, public age recipients,
|
||||
encrypted sizes, and hashes.
|
||||
|
||||
Restore:
|
||||
|
||||
```bash
|
||||
telecrawl backup pull
|
||||
telecrawl status
|
||||
```
|
||||
21
cmd/telecrawl/main.go
Normal file
21
cmd/telecrawl/main.go
Normal file
@ -0,0 +1,21 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/openclaw/telecrawl/internal/cli"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||
err := cli.Run(ctx, os.Args[1:], os.Stdout, os.Stderr)
|
||||
stop()
|
||||
if err != nil {
|
||||
fmt.Fprintln(os.Stderr, err.Error())
|
||||
os.Exit(cli.ExitCode(err))
|
||||
}
|
||||
}
|
||||
23
go.mod
Normal file
23
go.mod
Normal file
@ -0,0 +1,23 @@
|
||||
module github.com/openclaw/telecrawl
|
||||
|
||||
go 1.26.2
|
||||
|
||||
require (
|
||||
filippo.io/age v1.3.1
|
||||
modernc.org/sqlite v1.50.0
|
||||
)
|
||||
|
||||
require (
|
||||
filippo.io/hpke v0.4.0 // indirect
|
||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
github.com/mattn/go-isatty v0.0.22 // indirect
|
||||
github.com/ncruces/go-strftime v1.0.0 // indirect
|
||||
github.com/openclaw/crawlkit v0.5.1
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
golang.org/x/crypto v0.45.0 // indirect
|
||||
golang.org/x/sys v0.42.0 // indirect
|
||||
modernc.org/libc v1.72.0 // indirect
|
||||
modernc.org/mathutil v1.7.1 // indirect
|
||||
modernc.org/memory v1.11.0 // indirect
|
||||
)
|
||||
60
go.sum
Normal file
60
go.sum
Normal file
@ -0,0 +1,60 @@
|
||||
c2sp.org/CCTV/age v0.0.0-20251208015420-e9274a7bdbfd h1:ZLsPO6WdZ5zatV4UfVpr7oAwLGRZ+sebTUruuM4Ra3M=
|
||||
c2sp.org/CCTV/age v0.0.0-20251208015420-e9274a7bdbfd/go.mod h1:SrHC2C7r5GkDk8R+NFVzYy/sdj0Ypg9htaPXQq5Cqeo=
|
||||
filippo.io/age v1.3.1 h1:hbzdQOJkuaMEpRCLSN1/C5DX74RPcNCk6oqhKMXmZi0=
|
||||
filippo.io/age v1.3.1/go.mod h1:EZorDTYUxt836i3zdori5IJX/v2Lj6kWFU0cfh6C0D4=
|
||||
filippo.io/hpke v0.4.0 h1:p575VVQ6ted4pL+it6M00V/f2qTZITO0zgmdKCkd5+A=
|
||||
filippo.io/hpke v0.4.0/go.mod h1:EmAN849/P3qdeK+PCMkDpDm83vRHM5cDipBJ8xbQLVY=
|
||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs=
|
||||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||
github.com/mattn/go-isatty v0.0.22 h1:j8l17JJ9i6VGPUFUYoTUKPSgKe/83EYU2zBC7YNKMw4=
|
||||
github.com/mattn/go-isatty v0.0.22/go.mod h1:ZXfXG4SQHsB/w3ZeOYbR0PrPwLy+n6xiMrJlRFqopa4=
|
||||
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
|
||||
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
||||
github.com/openclaw/crawlkit v0.5.1 h1:35RcK1mhQRAX79inlrkr2F7f0C/21HIOzQJhpS+fccA=
|
||||
github.com/openclaw/crawlkit v0.5.1/go.mod h1:nZ6mqFjdTODgYb15Q/NIAB8Ue/94YhvAAaUQy0NtdqY=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
|
||||
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
|
||||
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
|
||||
golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8=
|
||||
golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w=
|
||||
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
|
||||
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
|
||||
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
|
||||
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k=
|
||||
golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0=
|
||||
modernc.org/cc/v4 v4.27.3 h1:uNCgn37E5U09mTv1XgskEVUJ8ADKpmFMPxzGJ0TSo+U=
|
||||
modernc.org/cc/v4 v4.27.3/go.mod h1:3YjcbCqhoTTHPycJDRl2WZKKFj0nwcOIPBfEZK0Hdk8=
|
||||
modernc.org/ccgo/v4 v4.32.4 h1:L5OB8rpEX4ZsXEQwGozRfJyJSFHbbNVOoQ59DU9/KuU=
|
||||
modernc.org/ccgo/v4 v4.32.4/go.mod h1:lY7f+fiTDHfcv6YlRgSkxYfhs+UvOEEzj49jAn2TOx0=
|
||||
modernc.org/fileutil v1.4.0 h1:j6ZzNTftVS054gi281TyLjHPp6CPHr2KCxEXjEbD6SM=
|
||||
modernc.org/fileutil v1.4.0/go.mod h1:EqdKFDxiByqxLk8ozOxObDSfcVOv/54xDs/DUHdvCUU=
|
||||
modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI=
|
||||
modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito=
|
||||
modernc.org/gc/v3 v3.1.2 h1:ZtDCnhonXSZexk/AYsegNRV1lJGgaNZJuKjJSWKyEqo=
|
||||
modernc.org/gc/v3 v3.1.2/go.mod h1:HFK/6AGESC7Ex+EZJhJ2Gni6cTaYpSMmU/cT9RmlfYY=
|
||||
modernc.org/goabi0 v0.2.0 h1:HvEowk7LxcPd0eq6mVOAEMai46V+i7Jrj13t4AzuNks=
|
||||
modernc.org/goabi0 v0.2.0/go.mod h1:CEFRnnJhKvWT1c1JTI3Avm+tgOWbkOu5oPA8eH8LnMI=
|
||||
modernc.org/libc v1.72.0 h1:IEu559v9a0XWjw0DPoVKtXpO2qt5NVLAnFaBbjq+n8c=
|
||||
modernc.org/libc v1.72.0/go.mod h1:tTU8DL8A+XLVkEY3x5E/tO7s2Q/q42EtnNWda/L5QhQ=
|
||||
modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU=
|
||||
modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg=
|
||||
modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI=
|
||||
modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw=
|
||||
modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8=
|
||||
modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns=
|
||||
modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w=
|
||||
modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE=
|
||||
modernc.org/sqlite v1.50.0 h1:eMowQSWLK0MeiQTdmz3lqoF5dqclujdlIKeJA11+7oM=
|
||||
modernc.org/sqlite v1.50.0/go.mod h1:m0w8xhwYUVY3H6pSDwc3gkJ/irZT/0YEXwBlhaxQEew=
|
||||
modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0=
|
||||
modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A=
|
||||
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
|
||||
modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
|
||||
579
internal/backup/backup.go
Normal file
579
internal/backup/backup.go
Normal file
@ -0,0 +1,579 @@
|
||||
package backup
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/openclaw/telecrawl/internal/store"
|
||||
)
|
||||
|
||||
const formatVersion = 1
|
||||
|
||||
type Manifest struct {
|
||||
Format int `json:"format"`
|
||||
Encrypted bool `json:"encrypted"`
|
||||
Exported time.Time `json:"exported"`
|
||||
Recipients []string `json:"recipients,omitempty"`
|
||||
Counts Counts `json:"counts"`
|
||||
Shards []ShardEntry `json:"shards"`
|
||||
}
|
||||
|
||||
type Counts struct {
|
||||
Contacts int `json:"contacts"`
|
||||
Chats int `json:"chats"`
|
||||
Groups int `json:"groups"`
|
||||
Participants int `json:"participants"`
|
||||
Messages int `json:"messages"`
|
||||
}
|
||||
|
||||
type ShardEntry struct {
|
||||
Table string `json:"table"`
|
||||
Path string `json:"path"`
|
||||
Rows int `json:"rows"`
|
||||
SHA256 string `json:"sha256"`
|
||||
Bytes int64 `json:"bytes"`
|
||||
}
|
||||
|
||||
type Result struct {
|
||||
Repo string `json:"repo"`
|
||||
Changed bool `json:"changed"`
|
||||
Encrypted bool `json:"encrypted"`
|
||||
Shards int `json:"shards"`
|
||||
Messages int `json:"messages"`
|
||||
}
|
||||
|
||||
func Init(ctx context.Context, opts Options) (Config, string, error) {
|
||||
cfg, err := ResolveOptions(opts)
|
||||
if err != nil {
|
||||
return Config{}, "", err
|
||||
}
|
||||
recipient, err := EnsureIdentity(cfg.Identity)
|
||||
if err != nil {
|
||||
return Config{}, "", err
|
||||
}
|
||||
if len(cfg.Recipients) == 0 {
|
||||
cfg.Recipients = []string{recipient}
|
||||
}
|
||||
if err := SaveConfig(opts.ConfigPath, cfg); err != nil {
|
||||
return Config{}, "", err
|
||||
}
|
||||
if err := ensureRepo(ctx, cfg); err != nil {
|
||||
return Config{}, "", err
|
||||
}
|
||||
if err := writeBackupReadme(cfg.Repo); err != nil {
|
||||
return Config{}, "", err
|
||||
}
|
||||
_, err = commitAndPush(ctx, cfg, "docs: describe encrypted telecrawl backup", opts.Push)
|
||||
return cfg, recipient, err
|
||||
}
|
||||
|
||||
func Push(ctx context.Context, st *store.Store, opts Options) (Result, error) {
|
||||
cfg, err := ResolveOptions(opts)
|
||||
if err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
if len(cfg.Recipients) == 0 {
|
||||
recipient, err := RecipientFromIdentity(cfg.Identity)
|
||||
if err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
cfg.Recipients = []string{recipient}
|
||||
}
|
||||
if err := ensureRepo(ctx, cfg); err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
if err := writeBackupReadme(cfg.Repo); err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
oldManifest, _ := readManifest(cfg.Repo)
|
||||
data, err := st.ExportAll(ctx)
|
||||
if err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
manifest, err := writeSnapshot(ctx, cfg, data, oldManifest)
|
||||
if err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
changed, err := commitAndPush(ctx, cfg, "sync: update encrypted telecrawl backup", opts.Push)
|
||||
if err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
return Result{Repo: cfg.Repo, Changed: changed, Encrypted: true, Shards: len(manifest.Shards), Messages: manifest.Counts.Messages}, nil
|
||||
}
|
||||
|
||||
func Pull(ctx context.Context, st *store.Store, opts Options) (Result, error) {
|
||||
cfg, err := ResolveOptions(opts)
|
||||
if err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
if err := ensureRepo(ctx, cfg); err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
manifest, err := readManifest(cfg.Repo)
|
||||
if err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
data, err := readSnapshot(cfg, manifest)
|
||||
if err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
if err := data.Validate(); err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
if err := st.ImportSnapshot(ctx, data, "backup:"+cfg.Repo, manifest.Exported); err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
return Result{Repo: cfg.Repo, Changed: true, Encrypted: manifest.Encrypted, Shards: len(manifest.Shards), Messages: len(data.Messages)}, nil
|
||||
}
|
||||
|
||||
func Status(ctx context.Context, opts Options) (Manifest, string, error) {
|
||||
cfg, err := ResolveOptions(opts)
|
||||
if err != nil {
|
||||
return Manifest{}, "", err
|
||||
}
|
||||
if err := ensureRepo(ctx, cfg); err != nil {
|
||||
return Manifest{}, "", err
|
||||
}
|
||||
manifest, err := readManifest(cfg.Repo)
|
||||
if err != nil {
|
||||
return Manifest{}, "", err
|
||||
}
|
||||
return manifest, cfg.Repo, nil
|
||||
}
|
||||
|
||||
func writeSnapshot(ctx context.Context, cfg Config, data store.SnapshotData, old Manifest) (Manifest, error) {
|
||||
_ = ctx
|
||||
recipients := normalizedStrings(cfg.Recipients)
|
||||
reuseEncrypted := sameStrings(old.Recipients, recipients)
|
||||
var shards []ShardEntry
|
||||
add := func(table, rel string, rows any) error {
|
||||
plaintext, count, err := encodeJSONL(rows)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
entry, err := writeShard(cfg, old, table, rel, plaintext, count, reuseEncrypted)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
shards = append(shards, entry)
|
||||
return nil
|
||||
}
|
||||
staticTables := []struct {
|
||||
table string
|
||||
path string
|
||||
rows any
|
||||
}{
|
||||
{"contacts", "data/contacts.jsonl.gz.age", data.Contacts},
|
||||
{"chats", "data/chats.jsonl.gz.age", data.Chats},
|
||||
{"groups", "data/groups.jsonl.gz.age", data.Groups},
|
||||
{"group_participants", "data/group_participants.jsonl.gz.age", data.Participants},
|
||||
}
|
||||
for _, table := range staticTables {
|
||||
if err := add(table.table, table.path, table.rows); err != nil {
|
||||
return Manifest{}, err
|
||||
}
|
||||
}
|
||||
for _, shard := range messageShards(data.Messages) {
|
||||
if err := add("messages", shard.path, shard.messages); err != nil {
|
||||
return Manifest{}, err
|
||||
}
|
||||
}
|
||||
sort.Slice(shards, func(i, j int) bool { return shards[i].Path < shards[j].Path })
|
||||
manifest := Manifest{
|
||||
Format: formatVersion,
|
||||
Encrypted: true,
|
||||
Exported: time.Now().UTC(),
|
||||
Recipients: recipients,
|
||||
Counts: Counts{
|
||||
Contacts: len(data.Contacts),
|
||||
Chats: len(data.Chats),
|
||||
Groups: len(data.Groups),
|
||||
Participants: len(data.Participants),
|
||||
Messages: len(data.Messages),
|
||||
},
|
||||
Shards: shards,
|
||||
}
|
||||
if equivalentManifest(old, manifest) {
|
||||
return old, nil
|
||||
}
|
||||
if err := removeStaleShards(cfg.Repo, shards); err != nil {
|
||||
return Manifest{}, err
|
||||
}
|
||||
if err := writeManifest(cfg.Repo, manifest); err != nil {
|
||||
return Manifest{}, err
|
||||
}
|
||||
return manifest, nil
|
||||
}
|
||||
|
||||
func readSnapshot(cfg Config, manifest Manifest) (store.SnapshotData, error) {
|
||||
if manifest.Format != formatVersion {
|
||||
return store.SnapshotData{}, fmt.Errorf("unsupported backup format %d", manifest.Format)
|
||||
}
|
||||
var data store.SnapshotData
|
||||
for _, shard := range manifest.Shards {
|
||||
plaintext, err := decryptShardFile(cfg, shard)
|
||||
if err != nil {
|
||||
return store.SnapshotData{}, err
|
||||
}
|
||||
if got := sha256Hex(plaintext); got != shard.SHA256 {
|
||||
return store.SnapshotData{}, fmt.Errorf("backup shard hash mismatch for %s", shard.Path)
|
||||
}
|
||||
switch shard.Table {
|
||||
case "contacts":
|
||||
if err := decodeJSONL(plaintext, &data.Contacts); err != nil {
|
||||
return store.SnapshotData{}, err
|
||||
}
|
||||
case "chats":
|
||||
if err := decodeJSONL(plaintext, &data.Chats); err != nil {
|
||||
return store.SnapshotData{}, err
|
||||
}
|
||||
case "groups":
|
||||
if err := decodeJSONL(plaintext, &data.Groups); err != nil {
|
||||
return store.SnapshotData{}, err
|
||||
}
|
||||
case "group_participants":
|
||||
if err := decodeJSONL(plaintext, &data.Participants); err != nil {
|
||||
return store.SnapshotData{}, err
|
||||
}
|
||||
case "messages":
|
||||
var messages []store.Message
|
||||
if err := decodeJSONL(plaintext, &messages); err != nil {
|
||||
return store.SnapshotData{}, err
|
||||
}
|
||||
data.Messages = append(data.Messages, messages...)
|
||||
default:
|
||||
return store.SnapshotData{}, fmt.Errorf("unknown backup table %q", shard.Table)
|
||||
}
|
||||
}
|
||||
sort.Slice(data.Messages, func(i, j int) bool {
|
||||
if data.Messages[i].Timestamp.Equal(data.Messages[j].Timestamp) {
|
||||
return data.Messages[i].SourcePK < data.Messages[j].SourcePK
|
||||
}
|
||||
return data.Messages[i].Timestamp.Before(data.Messages[j].Timestamp)
|
||||
})
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func writeShard(cfg Config, old Manifest, table, rel string, plaintext []byte, rows int, reuseEncrypted bool) (ShardEntry, error) {
|
||||
hash := sha256Hex(plaintext)
|
||||
path, err := resolveShardPath(cfg.Repo, rel)
|
||||
if err != nil {
|
||||
return ShardEntry{}, err
|
||||
}
|
||||
if oldEntry, ok := old.entry(rel); reuseEncrypted && ok && oldEntry.SHA256 == hash {
|
||||
if info, err := os.Stat(path); err == nil {
|
||||
oldEntry.Bytes = info.Size()
|
||||
return oldEntry, nil
|
||||
}
|
||||
}
|
||||
encrypted, _, err := encryptShard(plaintext, cfg.Recipients)
|
||||
if err != nil {
|
||||
return ShardEntry{}, err
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
|
||||
return ShardEntry{}, err
|
||||
}
|
||||
if err := os.WriteFile(path, encrypted, 0o600); err != nil {
|
||||
return ShardEntry{}, err
|
||||
}
|
||||
return ShardEntry{Table: table, Path: rel, Rows: rows, SHA256: hash, Bytes: int64(len(encrypted))}, nil
|
||||
}
|
||||
|
||||
func decryptShardFile(cfg Config, shard ShardEntry) ([]byte, error) {
|
||||
path, err := resolveShardPath(cfg.Repo, shard.Path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ciphertext, err := os.ReadFile(path) // #nosec G304 -- resolveShardPath confines manifest-controlled shard paths to data/*.age inside the backup repo.
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return decryptShard(ciphertext, cfg.Identity)
|
||||
}
|
||||
|
||||
func resolveShardPath(repo, rel string) (string, error) {
|
||||
clean := path.Clean(strings.TrimSpace(rel))
|
||||
if clean == "." || clean == ".." || strings.HasPrefix(clean, "../") || path.IsAbs(clean) {
|
||||
return "", fmt.Errorf("backup shard path escapes backup root: %s", rel)
|
||||
}
|
||||
if !strings.HasPrefix(clean, "data/") || !strings.HasSuffix(clean, ".age") {
|
||||
return "", fmt.Errorf("invalid backup shard path: %s", rel)
|
||||
}
|
||||
full := filepath.Join(repo, filepath.FromSlash(clean))
|
||||
root := filepath.Clean(filepath.Join(repo, "data"))
|
||||
parent := filepath.Clean(filepath.Dir(full))
|
||||
if parent != root && !strings.HasPrefix(parent, root+string(filepath.Separator)) {
|
||||
return "", fmt.Errorf("backup shard path escapes backup root: %s", rel)
|
||||
}
|
||||
return full, nil
|
||||
}
|
||||
|
||||
func encodeJSONL(rows any) ([]byte, int, error) {
|
||||
value := reflect.ValueOf(rows)
|
||||
if value.Kind() != reflect.Slice {
|
||||
return nil, 0, fmt.Errorf("unsupported JSONL rows %T", rows)
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
enc := json.NewEncoder(&buf)
|
||||
for i := 0; i < value.Len(); i++ {
|
||||
if err := enc.Encode(value.Index(i).Interface()); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
return buf.Bytes(), value.Len(), nil
|
||||
}
|
||||
|
||||
func decodeJSONL[T any](plaintext []byte, out *[]T) error {
|
||||
scanner := bufio.NewScanner(bytes.NewReader(plaintext))
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), 16*1024*1024)
|
||||
for scanner.Scan() {
|
||||
var value T
|
||||
if err := json.Unmarshal(scanner.Bytes(), &value); err != nil {
|
||||
return err
|
||||
}
|
||||
*out = append(*out, value)
|
||||
}
|
||||
return scanner.Err()
|
||||
}
|
||||
|
||||
type messageShard struct {
|
||||
path string
|
||||
messages []store.Message
|
||||
}
|
||||
|
||||
func messageShards(messages []store.Message) []messageShard {
|
||||
buckets := map[string][]store.Message{}
|
||||
for _, message := range messages {
|
||||
t := message.Timestamp.UTC()
|
||||
year, month := "unknown", "00"
|
||||
if !t.IsZero() {
|
||||
year = fmt.Sprintf("%04d", t.Year())
|
||||
month = fmt.Sprintf("%02d", int(t.Month()))
|
||||
}
|
||||
rel := fmt.Sprintf("data/messages/%s/%s.jsonl.gz.age", year, month)
|
||||
buckets[rel] = append(buckets[rel], message)
|
||||
}
|
||||
paths := make([]string, 0, len(buckets))
|
||||
for path := range buckets {
|
||||
paths = append(paths, path)
|
||||
}
|
||||
sort.Strings(paths)
|
||||
out := make([]messageShard, 0, len(paths))
|
||||
for _, path := range paths {
|
||||
values := buckets[path]
|
||||
sort.Slice(values, func(i, j int) bool {
|
||||
if values[i].Timestamp.Equal(values[j].Timestamp) {
|
||||
return values[i].SourcePK < values[j].SourcePK
|
||||
}
|
||||
return values[i].Timestamp.Before(values[j].Timestamp)
|
||||
})
|
||||
out = append(out, messageShard{path: path, messages: values})
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func readManifest(repo string) (Manifest, error) {
|
||||
data, err := os.ReadFile(filepath.Join(repo, "manifest.json")) // #nosec G304 -- repo is the configured local backup repository.
|
||||
if err != nil {
|
||||
return Manifest{}, err
|
||||
}
|
||||
var manifest Manifest
|
||||
if err := json.Unmarshal(data, &manifest); err != nil {
|
||||
return Manifest{}, err
|
||||
}
|
||||
return manifest, nil
|
||||
}
|
||||
|
||||
func writeManifest(repo string, manifest Manifest) error {
|
||||
data, err := json.MarshalIndent(manifest, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data = append(data, '\n')
|
||||
return os.WriteFile(filepath.Join(repo, "manifest.json"), data, 0o600)
|
||||
}
|
||||
|
||||
func (m Manifest) entry(path string) (ShardEntry, bool) {
|
||||
for _, shard := range m.Shards {
|
||||
if shard.Path == path {
|
||||
return shard, true
|
||||
}
|
||||
}
|
||||
return ShardEntry{}, false
|
||||
}
|
||||
|
||||
func equivalentManifest(a, b Manifest) bool {
|
||||
if a.Format != b.Format || a.Encrypted != b.Encrypted || !sameStrings(a.Recipients, b.Recipients) || a.Counts != b.Counts || len(a.Shards) != len(b.Shards) {
|
||||
return false
|
||||
}
|
||||
for i := range a.Shards {
|
||||
left, right := a.Shards[i], b.Shards[i]
|
||||
left.Bytes, right.Bytes = 0, 0
|
||||
if left != right {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func normalizedStrings(values []string) []string {
|
||||
seen := map[string]struct{}{}
|
||||
out := make([]string, 0, len(values))
|
||||
for _, value := range values {
|
||||
value = strings.TrimSpace(value)
|
||||
if value == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[value]; ok {
|
||||
continue
|
||||
}
|
||||
seen[value] = struct{}{}
|
||||
out = append(out, value)
|
||||
}
|
||||
sort.Strings(out)
|
||||
return out
|
||||
}
|
||||
|
||||
func sameStrings(a, b []string) bool {
|
||||
a, b = normalizedStrings(a), normalizedStrings(b)
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
}
|
||||
for i := range a {
|
||||
if a[i] != b[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func removeStaleShards(repo string, shards []ShardEntry) error {
|
||||
keep := map[string]struct{}{}
|
||||
for _, shard := range shards {
|
||||
keep[filepath.Clean(filepath.Join(repo, filepath.FromSlash(shard.Path)))] = struct{}{}
|
||||
}
|
||||
root := filepath.Join(repo, "data")
|
||||
if _, err := os.Stat(root); os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
var stale []string
|
||||
if err := filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error {
|
||||
if err != nil || d == nil || d.IsDir() {
|
||||
return err
|
||||
}
|
||||
if !strings.HasSuffix(path, ".age") {
|
||||
return nil
|
||||
}
|
||||
clean := filepath.Clean(path)
|
||||
if _, ok := keep[clean]; ok {
|
||||
return nil
|
||||
}
|
||||
stale = append(stale, clean)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, path := range stale {
|
||||
rel, err := filepath.Rel(root, path)
|
||||
if err != nil || rel == "." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) || filepath.IsAbs(rel) {
|
||||
return fmt.Errorf("stale shard path escapes backup root: %s", path)
|
||||
}
|
||||
if err := os.Remove(path); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeBackupReadme(repo string) error {
|
||||
path := filepath.Join(repo, "README.md")
|
||||
if _, err := os.Stat(path); err == nil {
|
||||
return nil
|
||||
}
|
||||
const body = `# backup-telecrawl
|
||||
|
||||
Encrypted Git backup for a local telecrawl archive.
|
||||
|
||||
This repository is written by ` + "`telecrawl backup push`" + `. It is safe to keep on
|
||||
GitHub because the archive payload is encrypted before Git sees it.
|
||||
|
||||
## Layout
|
||||
|
||||
` + "```text" + `
|
||||
README.md
|
||||
manifest.json
|
||||
data/chats.jsonl.gz.age
|
||||
data/contacts.jsonl.gz.age
|
||||
data/groups.jsonl.gz.age
|
||||
data/group_participants.jsonl.gz.age
|
||||
data/messages/YYYY/MM.jsonl.gz.age
|
||||
` + "```" + `
|
||||
|
||||
` + "`manifest.json`" + ` is cleartext and contains format version, export time,
|
||||
public age recipients, table counts, shard paths, encrypted byte sizes, and
|
||||
plaintext hashes used for restore verification. Message text, contacts, chat
|
||||
names, participant IDs, and media metadata stay inside encrypted ` + "`*.jsonl.gz.age`" + ` shards.
|
||||
|
||||
## Security Model
|
||||
|
||||
Shard contents are JSONL, gzip-compressed with a fixed gzip timestamp, and
|
||||
encrypted with age for every configured public recipient. The local
|
||||
` + "`~/.telecrawl/age.key`" + ` identity is required to decrypt.
|
||||
|
||||
Git can still see manifest metadata: export time, public recipients, table
|
||||
names, row counts, shard paths, encrypted byte sizes, plaintext shard hashes,
|
||||
backup cadence, and which encrypted shards changed. Git cannot read message
|
||||
text, contacts, chat names, participant IDs, or media metadata without an age
|
||||
identity.
|
||||
|
||||
Anyone who can push to this repository can replace encrypted backup data with
|
||||
different data encrypted to your public recipient. Keep repository write access
|
||||
restricted and review unexpected backup commits. If an age identity is
|
||||
compromised, remove its public recipient and push a new backup; old Git history
|
||||
may still contain shards decryptable by the compromised key.
|
||||
|
||||
## Push
|
||||
|
||||
` + "```bash" + `
|
||||
telecrawl backup push
|
||||
` + "```" + `
|
||||
|
||||
The command pulls/rebases this checkout, refreshes the local telecrawl archive
|
||||
according to the normal sync policy, writes encrypted shards, updates the
|
||||
manifest, commits, and pushes this repository.
|
||||
|
||||
## Restore
|
||||
|
||||
` + "```bash" + `
|
||||
telecrawl backup pull
|
||||
` + "```" + `
|
||||
|
||||
` + "`backup pull`" + ` decrypts every shard with the local age identity, verifies the
|
||||
manifest hashes, validates the snapshot, and imports it into the configured
|
||||
telecrawl archive database.
|
||||
|
||||
## Recovery
|
||||
|
||||
Install telecrawl, clone this repo to the path in ` + "`~/.telecrawl/backup.json`" + `,
|
||||
restore the local age identity file, then run:
|
||||
|
||||
` + "```bash" + `
|
||||
telecrawl backup pull
|
||||
telecrawl --sync never status
|
||||
` + "```" + `
|
||||
|
||||
Do not commit the age identity. Only public ` + "`age1...`" + ` recipients belong in
|
||||
config; ` + "`AGE-SECRET-KEY-...`" + ` values must stay local or in a password manager.
|
||||
`
|
||||
return os.WriteFile(path, []byte(body), 0o600)
|
||||
}
|
||||
478
internal/backup/backup_test.go
Normal file
478
internal/backup/backup_test.go
Normal file
@ -0,0 +1,478 @@
|
||||
package backup
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"filippo.io/age"
|
||||
"github.com/openclaw/telecrawl/internal/store"
|
||||
)
|
||||
|
||||
func TestEncryptedBackupPushPull(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
source := openFixtureStore(t, "source.db")
|
||||
now := time.Date(2026, 4, 27, 12, 0, 0, 0, time.UTC)
|
||||
data := store.SnapshotData{
|
||||
Contacts: []store.Contact{{JID: "alice@s.whatsapp.net", FullName: "Alice", UpdatedAt: now}},
|
||||
Chats: []store.Chat{{JID: "chat@g.us", Kind: "group", Name: "Launch Group", LastMessageAt: now}},
|
||||
Groups: []store.Group{{JID: "chat@g.us", Name: "Launch Group", OwnerJID: "owner@s.whatsapp.net", CreatedAt: now}},
|
||||
Participants: []store.GroupParticipant{{GroupJID: "chat@g.us", UserJID: "alice@s.whatsapp.net", ContactName: "Alice", IsAdmin: true, IsActive: true}},
|
||||
Messages: []store.Message{
|
||||
{SourcePK: 1, ChatJID: "chat@g.us", ChatName: "Launch Group", MessageID: "a", SenderJID: "alice@s.whatsapp.net", SenderName: "Alice", Timestamp: now, Text: "secret launch text", RawType: 0, MessageType: "text"},
|
||||
},
|
||||
}
|
||||
if err := source.ImportSnapshot(ctx, data, "/fixture", now); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
remote := filepath.Join(t.TempDir(), "remote.git")
|
||||
runGit(t, "", "init", "--bare", remote)
|
||||
repo := filepath.Join(t.TempDir(), "backup")
|
||||
identity := filepath.Join(t.TempDir(), "age.key")
|
||||
configPath := filepath.Join(t.TempDir(), "backup.json")
|
||||
cfg, recipient, err := Init(ctx, Options{ConfigPath: configPath, Repo: repo, Remote: remote, Identity: identity, Push: false})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if cfg.Repo != repo || !strings.HasPrefix(recipient, "age1") {
|
||||
t.Fatalf("unexpected init cfg=%+v recipient=%q", cfg, recipient)
|
||||
}
|
||||
opts := Options{ConfigPath: configPath, Push: false}
|
||||
result, err := Push(ctx, source, opts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !result.Changed || result.Messages != 1 || result.Shards == 0 {
|
||||
t.Fatalf("unexpected push result: %+v", result)
|
||||
}
|
||||
second, err := Push(ctx, source, opts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if second.Changed {
|
||||
t.Fatalf("second push should be unchanged: %+v", second)
|
||||
}
|
||||
status, statusRepo, err := Status(ctx, opts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if statusRepo != repo || status.Counts.Messages != 1 {
|
||||
t.Fatalf("unexpected backup status repo=%s status=%+v", statusRepo, status)
|
||||
}
|
||||
manifest, err := readManifest(repo)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !manifest.Encrypted || manifest.Counts.Messages != 1 {
|
||||
t.Fatalf("unexpected manifest: %+v", manifest)
|
||||
}
|
||||
ciphertext, err := os.ReadFile(filepath.Join(repo, filepath.FromSlash(manifest.Shards[len(manifest.Shards)-1].Path))) // #nosec G304 -- test reads a generated shard path from its temp repo manifest.
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if strings.Contains(string(ciphertext), "secret launch text") {
|
||||
t.Fatal("encrypted shard contains plaintext")
|
||||
}
|
||||
|
||||
restored := openFixtureStore(t, "restored.db")
|
||||
pulled, err := Pull(ctx, restored, opts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if pulled.Messages != 1 {
|
||||
t.Fatalf("unexpected pull result: %+v", pulled)
|
||||
}
|
||||
results, err := restored.Search(ctx, store.MessageFilter{Query: "secret", Limit: 10})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(results) != 1 || results[0].Text != "secret launch text" {
|
||||
t.Fatalf("restore search mismatch: %+v", results)
|
||||
}
|
||||
|
||||
secondIdentity := filepath.Join(t.TempDir(), "second-age.key")
|
||||
secondRecipient, err := EnsureIdentity(secondIdentity)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
updatedCfg, err := LoadConfig(configPath)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
updatedCfg.Recipients = append(updatedCfg.Recipients, secondRecipient)
|
||||
if err := SaveConfig(configPath, updatedCfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
recipientChange, err := Push(ctx, source, opts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !recipientChange.Changed {
|
||||
t.Fatal("adding a recipient should re-encrypt unchanged shards")
|
||||
}
|
||||
secondRestored := openFixtureStore(t, "second-restored.db")
|
||||
secondPulled, err := Pull(ctx, secondRestored, Options{ConfigPath: configPath, Identity: secondIdentity})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if secondPulled.Messages != 1 {
|
||||
t.Fatalf("unexpected second-recipient pull result: %+v", secondPulled)
|
||||
}
|
||||
secondResults, err := secondRestored.Search(ctx, store.MessageFilter{Query: "secret", Limit: 10})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(secondResults) != 1 || secondResults[0].Text != "secret launch text" {
|
||||
t.Fatalf("second-recipient restore mismatch: %+v", secondResults)
|
||||
}
|
||||
sameRecipients, err := Push(ctx, source, opts)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if sameRecipients.Changed {
|
||||
t.Fatalf("unchanged recipients should not rewrite backup: %+v", sameRecipients)
|
||||
}
|
||||
|
||||
derivedRepo := filepath.Join(t.TempDir(), "derived-recipient")
|
||||
if err := os.MkdirAll(derivedRepo, 0o700); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
runGit(t, derivedRepo, "init")
|
||||
derived, err := Push(ctx, source, Options{Repo: derivedRepo, Identity: identity, Push: false})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !derived.Changed || derived.Messages != 1 {
|
||||
t.Fatalf("unexpected derived-recipient push: %+v", derived)
|
||||
}
|
||||
|
||||
data.Messages = append(data.Messages, store.Message{SourcePK: 2, ChatJID: "chat@g.us", ChatName: "Launch Group", MessageID: "b", SenderJID: "alice@s.whatsapp.net", SenderName: "Alice", Timestamp: now.Add(time.Second), Text: "second secret", RawType: 0, MessageType: "text"})
|
||||
if err := source.ImportSnapshot(ctx, data, "/fixture", now); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
pushed, err := Push(ctx, source, Options{ConfigPath: configPath, Push: true})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !pushed.Changed || pushed.Messages != 2 {
|
||||
t.Fatalf("unexpected pushed backup: %+v", pushed)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigRoundTrip(t *testing.T) {
|
||||
path := filepath.Join(t.TempDir(), "backup.json")
|
||||
cfg := DefaultConfig()
|
||||
cfg.Repo = "~/Projects/example"
|
||||
cfg.Recipients = []string{"age1example"}
|
||||
if err := SaveConfig(path, cfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
loaded, err := LoadConfig(path)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if loaded.Repo != cfg.Repo || loaded.Recipients[0] != "age1example" {
|
||||
t.Fatalf("config mismatch: %+v", loaded)
|
||||
}
|
||||
if DefaultConfigPath() == "" {
|
||||
t.Fatal("default config path should not be empty")
|
||||
}
|
||||
if expandHome("~") == "~" || !strings.Contains(expandHome("~/Projects/example"), "Projects") {
|
||||
t.Fatal("home expansion did not expand")
|
||||
}
|
||||
if _, err := LoadConfig(filepath.Join(t.TempDir(), "missing.json")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := LoadConfig(t.TempDir()); err == nil {
|
||||
t.Fatal("expected directory config load error")
|
||||
}
|
||||
if err := SaveConfig(t.TempDir(), cfg); err == nil {
|
||||
t.Fatal("expected directory config save error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCryptoHelpers(t *testing.T) {
|
||||
identity := filepath.Join(t.TempDir(), "age.key")
|
||||
recipient, err := EnsureIdentity(identity)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
again, err := EnsureIdentity(identity)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if again != recipient {
|
||||
t.Fatalf("recipient changed: %q != %q", again, recipient)
|
||||
}
|
||||
fromIdentity, err := RecipientFromIdentity(identity)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if fromIdentity != recipient {
|
||||
t.Fatalf("recipient mismatch: %q != %q", fromIdentity, recipient)
|
||||
}
|
||||
encrypted, hash, err := encryptShard([]byte("private text\n"), []string{recipient})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if hash != sha256Hex([]byte("private text\n")) || strings.Contains(string(encrypted), "private text") {
|
||||
t.Fatal("encrypted shard mismatch")
|
||||
}
|
||||
tmp := filepath.Join(t.TempDir(), "shard.age")
|
||||
if err := os.WriteFile(tmp, encrypted, 0o600); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
plain, err := decryptShard(encrypted, identity)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(plain) != "private text\n" {
|
||||
t.Fatalf("decrypt mismatch: %q", plain)
|
||||
}
|
||||
if _, _, err := encryptShard([]byte("x"), []string{"bad"}); err == nil {
|
||||
t.Fatal("expected bad recipient error")
|
||||
}
|
||||
if _, _, err := encryptShard([]byte("x"), nil); err == nil {
|
||||
t.Fatal("expected missing recipient encrypt error")
|
||||
}
|
||||
emptyIdentity := filepath.Join(t.TempDir(), "empty.key")
|
||||
if err := os.WriteFile(emptyIdentity, []byte("# comment\n\n"), 0o600); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := RecipientFromIdentity(emptyIdentity); err == nil {
|
||||
t.Fatal("expected empty identity error")
|
||||
}
|
||||
badIdentity := filepath.Join(t.TempDir(), "bad.key")
|
||||
if err := os.WriteFile(badIdentity, []byte("bad\n"), 0o600); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := EnsureIdentity(badIdentity); err == nil {
|
||||
t.Fatal("expected bad existing identity error")
|
||||
}
|
||||
if _, err := RecipientFromIdentity(filepath.Join(t.TempDir(), "missing.key")); err == nil {
|
||||
t.Fatal("expected missing identity error")
|
||||
}
|
||||
if _, err := RecipientFromIdentity(badIdentity); err == nil {
|
||||
t.Fatal("expected bad identity parse error")
|
||||
}
|
||||
if _, err := decryptShard([]byte("not age"), identity); err == nil {
|
||||
t.Fatal("expected bad ciphertext error")
|
||||
}
|
||||
otherIdentity := filepath.Join(t.TempDir(), "other.key")
|
||||
if _, err := EnsureIdentity(otherIdentity); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := decryptShard(encrypted, otherIdentity); err == nil {
|
||||
t.Fatal("expected wrong identity decrypt error")
|
||||
}
|
||||
recipientValue, err := age.ParseX25519Recipient(recipient)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var rawAge bytes.Buffer
|
||||
w, err := age.Encrypt(&rawAge, recipientValue)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := w.Write([]byte("not gzip")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := w.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := decryptShard(rawAge.Bytes(), identity); err == nil {
|
||||
t.Fatal("expected non-gzip decrypt error")
|
||||
}
|
||||
if _, err := EnsureIdentity(filepath.Join(t.TempDir(), "missing", "dir")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSnapshotErrorAndUtilityPaths(t *testing.T) {
|
||||
if _, _, err := encodeJSONL(1); err == nil {
|
||||
t.Fatal("expected unsupported JSONL row type")
|
||||
}
|
||||
var contacts []store.Contact
|
||||
if err := decodeJSONL([]byte("{bad json}\n"), &contacts); err == nil {
|
||||
t.Fatal("expected invalid JSONL error")
|
||||
}
|
||||
if err := removeStaleShards(t.TempDir(), nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if equivalentManifest(Manifest{Format: 1}, Manifest{Format: 2}) {
|
||||
t.Fatal("different manifests should not be equivalent")
|
||||
}
|
||||
if _, err := readSnapshot(Config{}, Manifest{Format: 99}); err == nil {
|
||||
t.Fatal("expected unsupported format error")
|
||||
}
|
||||
if _, err := readSnapshot(Config{}, Manifest{Format: formatVersion, Shards: []ShardEntry{{Table: "nope"}}}); err == nil {
|
||||
t.Fatal("expected shard read error")
|
||||
}
|
||||
identity := filepath.Join(t.TempDir(), "age.key")
|
||||
recipient, err := EnsureIdentity(identity)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
repo := t.TempDir()
|
||||
if _, err := resolveShardPath(repo, "../outside.age"); err == nil {
|
||||
t.Fatal("expected escaping shard path error")
|
||||
}
|
||||
if _, err := resolveShardPath(repo, "manifest.json"); err == nil {
|
||||
t.Fatal("expected invalid shard path error")
|
||||
}
|
||||
encrypted, hash, err := encryptShard([]byte("{}\n"), []string{recipient})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
shardPath := filepath.Join("data", "unknown.jsonl.gz.age")
|
||||
fullShardPath := filepath.Join(repo, shardPath)
|
||||
if err := os.MkdirAll(filepath.Dir(fullShardPath), 0o700); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.WriteFile(fullShardPath, encrypted, 0o600); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cfg := Config{Repo: repo, Identity: identity}
|
||||
unknownManifest := Manifest{Format: formatVersion, Shards: []ShardEntry{{Table: "unknown", Path: filepath.ToSlash(shardPath), SHA256: hash}}}
|
||||
if _, err := readSnapshot(cfg, unknownManifest); err == nil {
|
||||
t.Fatal("expected unknown table error")
|
||||
}
|
||||
badHashManifest := Manifest{Format: formatVersion, Shards: []ShardEntry{{Table: "contacts", Path: filepath.ToSlash(shardPath), SHA256: "bad"}}}
|
||||
if _, err := readSnapshot(cfg, badHashManifest); err == nil {
|
||||
t.Fatal("expected hash mismatch")
|
||||
}
|
||||
duplicatePlain, duplicateHash, err := encryptShard([]byte(`{"source_pk":1,"chat_jid":"chat","message_id":"a","timestamp":"2026-04-27T12:00:00Z","raw_type":0}`+"\n"+`{"source_pk":1,"chat_jid":"chat","message_id":"b","timestamp":"2026-04-27T12:00:01Z","raw_type":0}`+"\n"), []string{recipient})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
duplicatePath := filepath.Join("data", "duplicate.jsonl.gz.age")
|
||||
fullDuplicatePath := filepath.Join(repo, duplicatePath)
|
||||
if err := os.WriteFile(fullDuplicatePath, duplicatePlain, 0o600); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
duplicateManifest := Manifest{Format: formatVersion, Shards: []ShardEntry{{Table: "messages", Path: filepath.ToSlash(duplicatePath), SHA256: duplicateHash}}}
|
||||
duplicateData, err := readSnapshot(cfg, duplicateManifest)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := duplicateData.Validate(); err == nil {
|
||||
t.Fatal("expected duplicate restored data validation error")
|
||||
}
|
||||
if err := writeManifest(repo, Manifest{Format: formatVersion}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := readManifest(repo); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := readManifest(filepath.Join(repo, "missing")); err == nil {
|
||||
t.Fatal("expected missing manifest error")
|
||||
}
|
||||
unknown := store.Message{SourcePK: 1, ChatJID: "chat", MessageID: "a"}
|
||||
shards := messageShards([]store.Message{unknown})
|
||||
if len(shards) != 1 || !strings.Contains(shards[0].path, "unknown") {
|
||||
t.Fatalf("unexpected unknown-time shard: %+v", shards)
|
||||
}
|
||||
stalePath := filepath.Join(repo, "data", "stale.age")
|
||||
if err := os.WriteFile(stalePath, []byte("stale"), 0o600); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := removeStaleShards(repo, []ShardEntry{{Path: filepath.ToSlash(shardPath)}}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := os.Stat(stalePath); !os.IsNotExist(err) {
|
||||
t.Fatal("expected stale shard removal")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGitHelpersWithoutRemote(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
repo := t.TempDir()
|
||||
cfg := Config{Repo: repo}
|
||||
if err := ensureRepo(ctx, cfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
changed, err := commitAndPush(ctx, cfg, "test: no changes", false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if changed {
|
||||
t.Fatal("empty repo without changes should not commit")
|
||||
}
|
||||
}
|
||||
|
||||
func TestTopLevelErrorPaths(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
source := openFixtureStore(t, "source.db")
|
||||
badConfig := t.TempDir()
|
||||
if _, _, err := Init(ctx, Options{ConfigPath: badConfig}); err == nil {
|
||||
t.Fatal("expected init config load error")
|
||||
}
|
||||
if _, err := Push(ctx, source, Options{ConfigPath: badConfig}); err == nil {
|
||||
t.Fatal("expected push config load error")
|
||||
}
|
||||
if _, err := Pull(ctx, source, Options{ConfigPath: badConfig}); err == nil {
|
||||
t.Fatal("expected pull config load error")
|
||||
}
|
||||
if _, _, err := Status(ctx, Options{ConfigPath: badConfig}); err == nil {
|
||||
t.Fatal("expected status config load error")
|
||||
}
|
||||
|
||||
repo := t.TempDir()
|
||||
runGit(t, repo, "init")
|
||||
if _, err := Pull(ctx, source, Options{Repo: repo, Identity: filepath.Join(t.TempDir(), "age.key")}); err == nil {
|
||||
t.Fatal("expected missing manifest pull error")
|
||||
}
|
||||
if _, _, err := Status(ctx, Options{Repo: repo}); err == nil {
|
||||
t.Fatal("expected missing manifest status error")
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
if err := source.ImportSnapshot(ctx, store.SnapshotData{
|
||||
Chats: []store.Chat{{JID: "chat", Kind: "dm", Name: "Chat", LastMessageAt: now}},
|
||||
Messages: []store.Message{{SourcePK: 1, ChatJID: "chat", MessageID: "a", Timestamp: now, RawType: 0, Text: "hello"}},
|
||||
}, "/fixture", now); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := Push(ctx, source, Options{Repo: repo, Recipients: []string{"bad"}, Push: false}); err == nil {
|
||||
t.Fatal("expected bad recipient push error")
|
||||
}
|
||||
if err := source.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
identity := filepath.Join(t.TempDir(), "age.key")
|
||||
recipient, err := EnsureIdentity(identity)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := Push(ctx, source, Options{Repo: repo, Identity: identity, Recipients: []string{recipient}, Push: false}); err == nil {
|
||||
t.Fatal("expected closed store push error")
|
||||
}
|
||||
if err := ensureRepo(ctx, Config{}); err == nil {
|
||||
t.Fatal("expected empty repo path error")
|
||||
}
|
||||
if _, err := commitAndPush(ctx, Config{Repo: filepath.Join(t.TempDir(), "missing")}, "test", false); err == nil {
|
||||
t.Fatal("expected commit in missing repo error")
|
||||
}
|
||||
}
|
||||
|
||||
func openFixtureStore(t *testing.T, name string) *store.Store {
|
||||
t.Helper()
|
||||
st, err := store.Open(context.Background(), filepath.Join(t.TempDir(), name))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Cleanup(func() { _ = st.Close() })
|
||||
return st
|
||||
}
|
||||
|
||||
func runGit(t *testing.T, dir string, args ...string) {
|
||||
t.Helper()
|
||||
if err := git(context.Background(), dir, args...); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
119
internal/backup/config.go
Normal file
119
internal/backup/config.go
Normal file
@ -0,0 +1,119 @@
|
||||
package backup
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultRemote = "https://github.com/steipete/backup-telecrawl.git"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Repo string `json:"repo"`
|
||||
Remote string `json:"remote"`
|
||||
Identity string `json:"identity"`
|
||||
Recipients []string `json:"recipients"`
|
||||
}
|
||||
|
||||
type Options struct {
|
||||
ConfigPath string
|
||||
Repo string
|
||||
Remote string
|
||||
Identity string
|
||||
Recipients []string
|
||||
Push bool
|
||||
}
|
||||
|
||||
func DefaultConfig() Config {
|
||||
return Config{
|
||||
Repo: "~/Projects/backup-telecrawl",
|
||||
Remote: defaultRemote,
|
||||
Identity: "~/.telecrawl/age.key",
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultConfigPath() string {
|
||||
home, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
return "backup.json"
|
||||
}
|
||||
return filepath.Join(home, ".telecrawl", "backup.json")
|
||||
}
|
||||
|
||||
func LoadConfig(path string) (Config, error) {
|
||||
if strings.TrimSpace(path) == "" {
|
||||
path = DefaultConfigPath()
|
||||
}
|
||||
cfg := DefaultConfig()
|
||||
data, err := os.ReadFile(expandHome(path))
|
||||
if err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
return cfg, nil
|
||||
}
|
||||
return Config{}, err
|
||||
}
|
||||
if len(strings.TrimSpace(string(data))) == 0 {
|
||||
return cfg, nil
|
||||
}
|
||||
if err := json.Unmarshal(data, &cfg); err != nil {
|
||||
return Config{}, fmt.Errorf("read backup config: %w", err)
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func SaveConfig(path string, cfg Config) error {
|
||||
if strings.TrimSpace(path) == "" {
|
||||
path = DefaultConfigPath()
|
||||
}
|
||||
path = expandHome(path)
|
||||
if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
|
||||
return err
|
||||
}
|
||||
data, err := json.MarshalIndent(cfg, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data = append(data, '\n')
|
||||
return os.WriteFile(path, data, 0o600)
|
||||
}
|
||||
|
||||
func ResolveOptions(opts Options) (Config, error) {
|
||||
cfg, err := LoadConfig(opts.ConfigPath)
|
||||
if err != nil {
|
||||
return Config{}, err
|
||||
}
|
||||
if strings.TrimSpace(opts.Repo) != "" {
|
||||
cfg.Repo = opts.Repo
|
||||
}
|
||||
if strings.TrimSpace(opts.Remote) != "" {
|
||||
cfg.Remote = opts.Remote
|
||||
}
|
||||
if strings.TrimSpace(opts.Identity) != "" {
|
||||
cfg.Identity = opts.Identity
|
||||
}
|
||||
if len(opts.Recipients) > 0 {
|
||||
cfg.Recipients = opts.Recipients
|
||||
}
|
||||
cfg.Repo = expandHome(cfg.Repo)
|
||||
cfg.Identity = expandHome(cfg.Identity)
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func expandHome(path string) string {
|
||||
if path == "~" {
|
||||
if home, err := os.UserHomeDir(); err == nil {
|
||||
return home
|
||||
}
|
||||
}
|
||||
if after, ok := strings.CutPrefix(path, "~/"); ok {
|
||||
if home, err := os.UserHomeDir(); err == nil {
|
||||
return filepath.Join(home, after)
|
||||
}
|
||||
}
|
||||
return path
|
||||
}
|
||||
23
internal/backup/crypto.go
Normal file
23
internal/backup/crypto.go
Normal file
@ -0,0 +1,23 @@
|
||||
package backup
|
||||
|
||||
import ckbackup "github.com/openclaw/crawlkit/backup"
|
||||
|
||||
func EnsureIdentity(path string) (string, error) {
|
||||
return ckbackup.EnsureIdentity(path)
|
||||
}
|
||||
|
||||
func RecipientFromIdentity(path string) (string, error) {
|
||||
return ckbackup.RecipientFromIdentity(path)
|
||||
}
|
||||
|
||||
func encryptShard(plaintext []byte, recipientStrings []string) ([]byte, string, error) {
|
||||
return ckbackup.EncryptShard(plaintext, recipientStrings)
|
||||
}
|
||||
|
||||
func decryptShard(ciphertext []byte, identityPath string) ([]byte, error) {
|
||||
return ckbackup.DecryptShard(ciphertext, identityPath)
|
||||
}
|
||||
|
||||
func sha256Hex(data []byte) string {
|
||||
return ckbackup.SHA256Hex(data)
|
||||
}
|
||||
91
internal/backup/git.go
Normal file
91
internal/backup/git.go
Normal file
@ -0,0 +1,91 @@
|
||||
package backup
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func ensureRepo(ctx context.Context, cfg Config) error {
|
||||
if strings.TrimSpace(cfg.Repo) == "" {
|
||||
return fmt.Errorf("backup repo path is required")
|
||||
}
|
||||
if _, err := os.Stat(filepath.Join(cfg.Repo, ".git")); err == nil {
|
||||
pullErr := git(ctx, cfg.Repo, "pull", "--rebase")
|
||||
if pullErr != nil {
|
||||
hasHead := git(ctx, cfg.Repo, "rev-parse", "--verify", "HEAD") == nil
|
||||
if !hasHead {
|
||||
return nil
|
||||
}
|
||||
if strings.Contains(pullErr.Error(), "no tracking information") ||
|
||||
strings.Contains(pullErr.Error(), "No remote repository specified") ||
|
||||
strings.Contains(pullErr.Error(), "no such ref was fetched") {
|
||||
return nil
|
||||
}
|
||||
return pullErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if strings.TrimSpace(cfg.Remote) != "" {
|
||||
if err := os.MkdirAll(filepath.Dir(cfg.Repo), 0o700); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := git(ctx, "", "clone", cfg.Remote, cfg.Repo); err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
if err := os.MkdirAll(cfg.Repo, 0o700); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := git(ctx, cfg.Repo, "init"); err != nil {
|
||||
return err
|
||||
}
|
||||
if strings.TrimSpace(cfg.Remote) != "" {
|
||||
if err := git(ctx, cfg.Repo, "remote", "add", "origin", cfg.Remote); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func commitAndPush(ctx context.Context, cfg Config, message string, push bool) (bool, error) {
|
||||
if err := git(ctx, cfg.Repo, "add", "."); err != nil {
|
||||
return false, err
|
||||
}
|
||||
if err := git(ctx, cfg.Repo, "diff", "--cached", "--quiet"); err == nil {
|
||||
return false, nil
|
||||
}
|
||||
if err := git(ctx, cfg.Repo, "commit", "-m", message); err != nil {
|
||||
return false, err
|
||||
}
|
||||
if push {
|
||||
if err := git(ctx, cfg.Repo, "push", "-u", "origin", "HEAD"); err != nil {
|
||||
return true, err
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func git(ctx context.Context, dir string, args ...string) error {
|
||||
cmd := exec.CommandContext(ctx, "git", args...) // #nosec G204 -- telecrawl only passes fixed git subcommands plus configured repo paths.
|
||||
cmd.Dir = dir
|
||||
cmd.Env = append(os.Environ(),
|
||||
"GIT_AUTHOR_NAME=telecrawl",
|
||||
"GIT_AUTHOR_EMAIL=telecrawl@example.invalid",
|
||||
"GIT_COMMITTER_NAME=telecrawl",
|
||||
"GIT_COMMITTER_EMAIL=telecrawl@example.invalid",
|
||||
)
|
||||
var stderr bytes.Buffer
|
||||
cmd.Stderr = &stderr
|
||||
if err := cmd.Run(); err != nil {
|
||||
if stderr.Len() > 0 {
|
||||
return fmt.Errorf("git %s: %w: %s", strings.Join(args, " "), err, strings.TrimSpace(stderr.String()))
|
||||
}
|
||||
return fmt.Errorf("git %s: %w", strings.Join(args, " "), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
505
internal/cli/cli.go
Normal file
505
internal/cli/cli.go
Normal file
@ -0,0 +1,505 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/openclaw/telecrawl/internal/backup"
|
||||
"github.com/openclaw/telecrawl/internal/store"
|
||||
"github.com/openclaw/telecrawl/internal/telegramdesktop"
|
||||
)
|
||||
|
||||
type cliError struct {
|
||||
code int
|
||||
err error
|
||||
}
|
||||
|
||||
func (e *cliError) Error() string {
|
||||
return e.err.Error()
|
||||
}
|
||||
|
||||
func (e *cliError) Unwrap() error {
|
||||
return e.err
|
||||
}
|
||||
|
||||
func ExitCode(err error) int {
|
||||
if err == nil {
|
||||
return 0
|
||||
}
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return 1
|
||||
}
|
||||
var codeErr *cliError
|
||||
if errors.As(err, &codeErr) {
|
||||
return codeErr.code
|
||||
}
|
||||
return 1
|
||||
}
|
||||
|
||||
type runtime struct {
|
||||
ctx context.Context
|
||||
stdout io.Writer
|
||||
stderr io.Writer
|
||||
json bool
|
||||
dbPath string
|
||||
source string
|
||||
python string
|
||||
}
|
||||
|
||||
func Run(ctx context.Context, args []string, stdout, stderr io.Writer) error {
|
||||
if len(args) == 0 || args[0] == "help" || args[0] == "--help" || args[0] == "-h" {
|
||||
printUsage(stdout)
|
||||
return nil
|
||||
}
|
||||
global := flag.NewFlagSet("telecrawl", flag.ContinueOnError)
|
||||
global.SetOutput(io.Discard)
|
||||
jsonOut := global.Bool("json", false, "")
|
||||
dbPath := global.String("db", defaultDBPath(), "")
|
||||
source := global.String("source", "", "")
|
||||
python := global.String("python", "", "")
|
||||
versionFlag := global.Bool("version", false, "")
|
||||
if err := global.Parse(args); err != nil {
|
||||
return usageErr(err)
|
||||
}
|
||||
if *versionFlag {
|
||||
_, _ = io.WriteString(stdout, version+"\n")
|
||||
return nil
|
||||
}
|
||||
rest := global.Args()
|
||||
if len(rest) == 0 || rest[0] == "help" || rest[0] == "--help" || rest[0] == "-h" {
|
||||
printUsage(stdout)
|
||||
return nil
|
||||
}
|
||||
if rest[0] == "version" {
|
||||
_, _ = io.WriteString(stdout, version+"\n")
|
||||
return nil
|
||||
}
|
||||
r := &runtime{ctx: ctx, stdout: stdout, stderr: stderr, json: *jsonOut, dbPath: *dbPath, source: *source, python: *python}
|
||||
return r.dispatch(rest)
|
||||
}
|
||||
|
||||
func (r *runtime) dispatch(args []string) error {
|
||||
switch args[0] {
|
||||
case "import", "sync":
|
||||
return r.runImport(args[1:])
|
||||
case "doctor":
|
||||
return r.runDoctor(args[1:])
|
||||
case "status":
|
||||
return r.runStatus(args[1:])
|
||||
case "chats":
|
||||
return r.runChats(args[1:])
|
||||
case "messages":
|
||||
return r.runMessages(args[1:])
|
||||
case "search":
|
||||
return r.runSearch(args[1:])
|
||||
case "backup":
|
||||
return r.runBackup(args[1:])
|
||||
case "deps":
|
||||
return r.runDeps(args[1:])
|
||||
case "wiretap":
|
||||
return r.runImport(args[1:])
|
||||
default:
|
||||
return usageErr(fmt.Errorf("unknown command %q", args[0]))
|
||||
}
|
||||
}
|
||||
|
||||
func (r *runtime) runDeps(args []string) error {
|
||||
if len(args) != 1 || args[0] != "install" {
|
||||
return usageErr(errors.New("usage: telecrawl deps install"))
|
||||
}
|
||||
venv := filepath.Join(defaultBaseDir(), "venv")
|
||||
python, err := exec.LookPath("python3.11")
|
||||
if err != nil {
|
||||
python, err = exec.LookPath("python3")
|
||||
if err != nil {
|
||||
return errors.New("python3.11 or python3 required")
|
||||
}
|
||||
}
|
||||
if err := os.MkdirAll(defaultBaseDir(), 0o700); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := os.Stat(filepath.Join(venv, "bin", "python")); os.IsNotExist(err) {
|
||||
if err := runLogged(r.ctx, r.stderr, python, "-m", "venv", venv); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
pipPython := filepath.Join(venv, "bin", "python")
|
||||
if err := runLogged(r.ctx, r.stderr, pipPython, "-m", "pip", "install", "--upgrade", "pip"); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := runLogged(r.ctx, r.stderr, pipPython, "-m", "pip", "install", "opentele2", "telethon"); err != nil {
|
||||
return err
|
||||
}
|
||||
return r.print(map[string]any{"python": pipPython, "installed": true})
|
||||
}
|
||||
|
||||
func (r *runtime) withStore(fn func(*store.Store) error) error {
|
||||
st, err := store.Open(r.ctx, r.dbPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer st.Close()
|
||||
return fn(st)
|
||||
}
|
||||
|
||||
func (r *runtime) runDoctor(args []string) error {
|
||||
fs := flag.NewFlagSet("telecrawl doctor", flag.ContinueOnError)
|
||||
fs.SetOutput(io.Discard)
|
||||
path := fs.String("path", r.source, "")
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return usageErr(err)
|
||||
}
|
||||
return r.printProbe(telegramdesktop.Probe(r.ctx, telegramdesktop.Options{Path: *path}))
|
||||
}
|
||||
|
||||
func (r *runtime) runStatus(args []string) error {
|
||||
fs := flag.NewFlagSet("telecrawl status", flag.ContinueOnError)
|
||||
fs.SetOutput(io.Discard)
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return usageErr(err)
|
||||
}
|
||||
return r.withStore(func(st *store.Store) error {
|
||||
status, err := st.Status(r.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.print(status)
|
||||
})
|
||||
}
|
||||
|
||||
func (r *runtime) runImport(args []string) error {
|
||||
fs := flag.NewFlagSet("telecrawl import", flag.ContinueOnError)
|
||||
fs.SetOutput(io.Discard)
|
||||
path := fs.String("path", r.source, "")
|
||||
python := fs.String("python", r.python, "")
|
||||
dialogsLimit := fs.Int("dialogs-limit", 200, "")
|
||||
messagesLimit := fs.Int("messages-limit", 500, "")
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return usageErr(err)
|
||||
}
|
||||
if fs.NArg() != 0 {
|
||||
return usageErr(errors.New("import takes flags only"))
|
||||
}
|
||||
return r.withStore(func(st *store.Store) error {
|
||||
result, err := telegramdesktop.Import(r.ctx, telegramdesktop.ImportOptions{
|
||||
Path: *path,
|
||||
Python: *python,
|
||||
DialogsLimit: *dialogsLimit,
|
||||
MessagesLimit: *messagesLimit,
|
||||
}, st.Path())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := st.ReplaceAll(r.ctx, result.Stats, result.Chats, result.Messages); err != nil {
|
||||
return err
|
||||
}
|
||||
return r.print(result.Stats)
|
||||
})
|
||||
}
|
||||
|
||||
func (r *runtime) runChats(args []string) error {
|
||||
fs := flag.NewFlagSet("telecrawl chats", flag.ContinueOnError)
|
||||
fs.SetOutput(io.Discard)
|
||||
limit := fs.Int("limit", 50, "")
|
||||
unread := fs.Bool("unread", false, "")
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return usageErr(err)
|
||||
}
|
||||
return r.withStore(func(st *store.Store) error {
|
||||
chats, err := st.ListChats(r.ctx, *limit, *unread)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.print(chats)
|
||||
})
|
||||
}
|
||||
|
||||
func (r *runtime) runMessages(args []string) error {
|
||||
filter, err := r.messageFilter("telecrawl messages", args, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.withStore(func(st *store.Store) error {
|
||||
messages, err := st.Messages(r.ctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.print(messages)
|
||||
})
|
||||
}
|
||||
|
||||
func (r *runtime) runSearch(args []string) error {
|
||||
filter, err := r.messageFilter("telecrawl search", args, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.withStore(func(st *store.Store) error {
|
||||
messages, err := st.Search(r.ctx, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.print(messages)
|
||||
})
|
||||
}
|
||||
|
||||
func (r *runtime) messageFilter(name string, args []string, requireQuery bool) (store.MessageFilter, error) {
|
||||
fs := flag.NewFlagSet(name, flag.ContinueOnError)
|
||||
fs.SetOutput(io.Discard)
|
||||
var filter store.MessageFilter
|
||||
fs.StringVar(&filter.ChatJID, "chat", "", "")
|
||||
fs.StringVar(&filter.Sender, "sender", "", "")
|
||||
fs.IntVar(&filter.Limit, "limit", 50, "")
|
||||
after := fs.String("after", "", "")
|
||||
before := fs.String("before", "", "")
|
||||
fromMe := fs.Bool("from-me", false, "")
|
||||
fromThem := fs.Bool("from-them", false, "")
|
||||
fs.BoolVar(&filter.HasMedia, "media", false, "")
|
||||
fs.BoolVar(&filter.Asc, "asc", false, "")
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return filter, usageErr(err)
|
||||
}
|
||||
if requireQuery {
|
||||
if fs.NArg() != 1 {
|
||||
return filter, usageErr(errors.New("search takes exactly one query"))
|
||||
}
|
||||
filter.Query = fs.Arg(0)
|
||||
} else if fs.NArg() != 0 {
|
||||
return filter, usageErr(errors.New("messages takes flags only"))
|
||||
}
|
||||
if *after != "" {
|
||||
t, err := parseDate(*after)
|
||||
if err != nil {
|
||||
return filter, usageErr(err)
|
||||
}
|
||||
filter.After = &t
|
||||
}
|
||||
if *before != "" {
|
||||
t, err := parseDate(*before)
|
||||
if err != nil {
|
||||
return filter, usageErr(err)
|
||||
}
|
||||
filter.Before = &t
|
||||
}
|
||||
if *fromMe && *fromThem {
|
||||
return filter, usageErr(errors.New("--from-me and --from-them conflict"))
|
||||
}
|
||||
if *fromMe || *fromThem {
|
||||
v := *fromMe
|
||||
filter.FromMe = &v
|
||||
}
|
||||
return filter, nil
|
||||
}
|
||||
|
||||
func (r *runtime) runBackup(args []string) error {
|
||||
if len(args) == 0 {
|
||||
return usageErr(errors.New("backup needs subcommand: init, push, pull, status"))
|
||||
}
|
||||
switch args[0] {
|
||||
case "init":
|
||||
return r.backupInit(args[1:])
|
||||
case "push":
|
||||
return r.backupPush(args[1:])
|
||||
case "pull":
|
||||
return r.backupPull(args[1:])
|
||||
case "status":
|
||||
return r.backupStatus(args[1:])
|
||||
default:
|
||||
return usageErr(fmt.Errorf("unknown backup command %q", args[0]))
|
||||
}
|
||||
}
|
||||
|
||||
func backupFlags(name string) (*flag.FlagSet, *backup.Options, *bool) {
|
||||
fs := flag.NewFlagSet(name, flag.ContinueOnError)
|
||||
fs.SetOutput(io.Discard)
|
||||
opts := &backup.Options{}
|
||||
fs.StringVar(&opts.ConfigPath, "config", backup.DefaultConfigPath(), "")
|
||||
fs.StringVar(&opts.Repo, "repo", "", "")
|
||||
fs.StringVar(&opts.Remote, "remote", "", "")
|
||||
fs.StringVar(&opts.Identity, "identity", "", "")
|
||||
fs.Func("recipient", "", func(value string) error {
|
||||
opts.Recipients = append(opts.Recipients, value)
|
||||
return nil
|
||||
})
|
||||
noPush := fs.Bool("no-push", false, "")
|
||||
return fs, opts, noPush
|
||||
}
|
||||
|
||||
func (r *runtime) backupInit(args []string) error {
|
||||
fs, opts, noPush := backupFlags("telecrawl backup init")
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return usageErr(err)
|
||||
}
|
||||
opts.Push = !*noPush
|
||||
cfg, recipient, err := backup.Init(r.ctx, *opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.print(map[string]any{"repo": cfg.Repo, "remote": cfg.Remote, "identity": cfg.Identity, "recipient": recipient})
|
||||
}
|
||||
|
||||
func (r *runtime) backupPush(args []string) error {
|
||||
fs, opts, noPush := backupFlags("telecrawl backup push")
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return usageErr(err)
|
||||
}
|
||||
opts.Push = !*noPush
|
||||
return r.withStore(func(st *store.Store) error {
|
||||
result, err := backup.Push(r.ctx, st, *opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.print(result)
|
||||
})
|
||||
}
|
||||
|
||||
func (r *runtime) backupPull(args []string) error {
|
||||
fs, opts, _ := backupFlags("telecrawl backup pull")
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return usageErr(err)
|
||||
}
|
||||
return r.withStore(func(st *store.Store) error {
|
||||
result, err := backup.Pull(r.ctx, st, *opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.print(result)
|
||||
})
|
||||
}
|
||||
|
||||
func (r *runtime) backupStatus(args []string) error {
|
||||
fs, opts, _ := backupFlags("telecrawl backup status")
|
||||
if err := fs.Parse(args); err != nil {
|
||||
return usageErr(err)
|
||||
}
|
||||
manifest, repo, err := backup.Status(r.ctx, *opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.print(map[string]any{"repo": repo, "manifest": manifest})
|
||||
}
|
||||
|
||||
func (r *runtime) printProbe(report telegramdesktop.Report) error {
|
||||
if r.json {
|
||||
enc := json.NewEncoder(r.stdout)
|
||||
enc.SetIndent("", " ")
|
||||
return enc.Encode(report)
|
||||
}
|
||||
fmt.Fprintf(r.stdout, "path: %s\n", report.Path)
|
||||
fmt.Fprintf(r.stdout, "exists: %t\n", report.Exists)
|
||||
fmt.Fprintf(r.stdout, "accessible: %t\n", report.Accessible)
|
||||
fmt.Fprintf(r.stdout, "store: %s\n", report.Store)
|
||||
fmt.Fprintf(r.stdout, "sqlite_files: %d\n", report.SQLiteFiles)
|
||||
fmt.Fprintf(r.stdout, "tdesktop_files: %d\n", report.TDesktopFiles)
|
||||
fmt.Fprintf(r.stdout, "files_scanned: %d\n", report.FilesScanned)
|
||||
fmt.Fprintf(r.stdout, "bytes_scanned: %d\n", report.BytesScanned)
|
||||
if report.AccountDirs > 0 {
|
||||
fmt.Fprintf(r.stdout, "account_dirs: %d\n", report.AccountDirs)
|
||||
}
|
||||
if report.Error != "" {
|
||||
fmt.Fprintf(r.stdout, "error: %s\n", report.Error)
|
||||
}
|
||||
if report.Note != "" {
|
||||
fmt.Fprintf(r.stdout, "note: %s\n", report.Note)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *runtime) print(v any) error {
|
||||
enc := json.NewEncoder(r.stdout)
|
||||
if r.json {
|
||||
enc.SetIndent("", " ")
|
||||
return enc.Encode(v)
|
||||
}
|
||||
switch value := v.(type) {
|
||||
case store.Status:
|
||||
fmt.Fprintf(r.stdout, "db_path: %s\nchats: %d\nmessages: %d\nunread_chats: %d\nunread_messages: %d\nmedia_messages: %d\n",
|
||||
value.DBPath, value.Chats, value.Messages, value.UnreadChats, value.UnreadMessages, value.MediaMessages)
|
||||
if !value.OldestMessage.IsZero() {
|
||||
fmt.Fprintf(r.stdout, "oldest_message: %s\n", value.OldestMessage.Format(time.RFC3339))
|
||||
}
|
||||
if !value.NewestMessage.IsZero() {
|
||||
fmt.Fprintf(r.stdout, "newest_message: %s\n", value.NewestMessage.Format(time.RFC3339))
|
||||
}
|
||||
if !value.LastImportAt.IsZero() {
|
||||
fmt.Fprintf(r.stdout, "last_import_at: %s\n", value.LastImportAt.Format(time.RFC3339))
|
||||
}
|
||||
return nil
|
||||
case store.ImportStats:
|
||||
fmt.Fprintf(r.stdout, "source_path: %s\ndb_path: %s\nchats: %d\nmessages: %d\nmedia_messages: %d\nstarted_at: %s\nfinished_at: %s\n",
|
||||
value.SourcePath, value.DBPath, value.Chats, value.Messages, value.MediaMessages, value.StartedAt.Format(time.RFC3339), value.FinishedAt.Format(time.RFC3339))
|
||||
return nil
|
||||
default:
|
||||
enc.SetIndent("", " ")
|
||||
return enc.Encode(v)
|
||||
}
|
||||
}
|
||||
|
||||
func usageErr(err error) error {
|
||||
return &cliError{code: 2, err: err}
|
||||
}
|
||||
|
||||
func printUsage(w io.Writer) {
|
||||
_, _ = io.WriteString(w, `telecrawl: Telegram archive probe/import CLI
|
||||
|
||||
usage:
|
||||
telecrawl [--json] doctor [--path PATH]
|
||||
telecrawl [--json] import [--path PATH] [--dialogs-limit N] [--messages-limit N]
|
||||
telecrawl [--json] status
|
||||
telecrawl [--json] chats [--limit N] [--unread]
|
||||
telecrawl [--json] messages [--chat ID] [--limit N] [--after DATE]
|
||||
telecrawl [--json] search "query" [--chat ID]
|
||||
telecrawl [--json] backup init|push|pull|status
|
||||
telecrawl deps install
|
||||
telecrawl version
|
||||
|
||||
notes:
|
||||
import uses Telegram Desktop tdata via opentele2/Telethon
|
||||
backup writes encrypted age shards to a git repo
|
||||
`)
|
||||
}
|
||||
|
||||
func defaultDBPath() string {
|
||||
home, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
return "telecrawl.db"
|
||||
}
|
||||
return filepath.Join(home, ".telecrawl", "telecrawl.db")
|
||||
}
|
||||
|
||||
func parseDate(value string) (time.Time, error) {
|
||||
value = strings.TrimSpace(value)
|
||||
if t, err := time.Parse(time.RFC3339, value); err == nil {
|
||||
return t, nil
|
||||
}
|
||||
if t, err := time.Parse("2006-01-02", value); err == nil {
|
||||
return t, nil
|
||||
}
|
||||
return time.Time{}, fmt.Errorf("invalid date %q", value)
|
||||
}
|
||||
|
||||
func defaultBaseDir() string {
|
||||
home, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
return ".telecrawl"
|
||||
}
|
||||
return filepath.Join(home, ".telecrawl")
|
||||
}
|
||||
|
||||
func runLogged(ctx context.Context, stderr io.Writer, name string, args ...string) error {
|
||||
cmd := exec.CommandContext(ctx, name, args...) // #nosec G204 -- dependency install uses fixed commands.
|
||||
cmd.Stdout = stderr
|
||||
cmd.Stderr = stderr
|
||||
if err := cmd.Run(); err != nil {
|
||||
return fmt.Errorf("%s %s: %w", name, strings.Join(args, " "), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
3
internal/cli/version.go
Normal file
3
internal/cli/version.go
Normal file
@ -0,0 +1,3 @@
|
||||
package cli
|
||||
|
||||
var version = "dev"
|
||||
54
internal/store/export.go
Normal file
54
internal/store/export.go
Normal file
@ -0,0 +1,54 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
type SnapshotData struct {
|
||||
Contacts []Contact
|
||||
Chats []Chat
|
||||
Groups []Group
|
||||
Participants []GroupParticipant
|
||||
Messages []Message
|
||||
}
|
||||
|
||||
func (d SnapshotData) Validate() error {
|
||||
seen := map[int64]struct{}{}
|
||||
for _, message := range d.Messages {
|
||||
if message.SourcePK == 0 {
|
||||
return fmt.Errorf("message with empty source_pk")
|
||||
}
|
||||
if _, ok := seen[message.SourcePK]; ok {
|
||||
return fmt.Errorf("duplicate message source_pk %d", message.SourcePK)
|
||||
}
|
||||
seen[message.SourcePK] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) ExportAll(ctx context.Context) (SnapshotData, error) {
|
||||
chats, err := s.ListChats(ctx, int(^uint(0)>>1), false)
|
||||
if err != nil {
|
||||
return SnapshotData{}, err
|
||||
}
|
||||
messages, err := s.Messages(ctx, MessageFilter{Limit: int(^uint(0) >> 1), Asc: true})
|
||||
if err != nil {
|
||||
return SnapshotData{}, err
|
||||
}
|
||||
return SnapshotData{Chats: chats, Messages: messages}, nil
|
||||
}
|
||||
|
||||
func (s *Store) ImportSnapshot(ctx context.Context, data SnapshotData, sourcePath string, finishedAt time.Time) error {
|
||||
if finishedAt.IsZero() {
|
||||
finishedAt = time.Now().UTC()
|
||||
}
|
||||
stats := ImportStats{SourcePath: sourcePath, DBPath: s.Path(), Chats: len(data.Chats), Messages: len(data.Messages), StartedAt: finishedAt, FinishedAt: finishedAt}
|
||||
for _, message := range data.Messages {
|
||||
if message.MediaType != "" || message.MediaPath != "" || message.MediaURL != "" {
|
||||
stats.MediaMessages++
|
||||
}
|
||||
}
|
||||
return s.ReplaceAll(ctx, stats, data.Chats, data.Messages)
|
||||
}
|
||||
77
internal/store/schema.go
Normal file
77
internal/store/schema.go
Normal file
@ -0,0 +1,77 @@
|
||||
package store
|
||||
|
||||
const schemaSQL = `
|
||||
create table if not exists chats (
|
||||
id integer primary key,
|
||||
kind text not null,
|
||||
name text,
|
||||
username text,
|
||||
last_message_at integer,
|
||||
unread_count integer not null default 0,
|
||||
message_count integer not null default 0
|
||||
);
|
||||
|
||||
create table if not exists contacts (
|
||||
jid text primary key,
|
||||
phone text,
|
||||
full_name text,
|
||||
first_name text,
|
||||
last_name text,
|
||||
business_name text,
|
||||
username text,
|
||||
lid text,
|
||||
about_text text,
|
||||
updated_at integer
|
||||
);
|
||||
|
||||
create table if not exists groups (
|
||||
jid text primary key,
|
||||
name text,
|
||||
owner_jid text,
|
||||
created_at integer
|
||||
);
|
||||
|
||||
create table if not exists group_participants (
|
||||
group_jid text not null,
|
||||
user_jid text not null,
|
||||
contact_name text,
|
||||
first_name text,
|
||||
is_admin integer not null default 0,
|
||||
is_active integer not null default 0,
|
||||
primary key (group_jid, user_jid)
|
||||
);
|
||||
|
||||
create table if not exists messages (
|
||||
rowid integer primary key autoincrement,
|
||||
source_pk integer not null unique,
|
||||
chat_jid text not null,
|
||||
chat_name text,
|
||||
msg_id text not null,
|
||||
sender_jid text,
|
||||
sender_name text,
|
||||
ts integer not null,
|
||||
from_me integer not null,
|
||||
text text,
|
||||
raw_type integer not null default 0,
|
||||
message_type text,
|
||||
media_type text,
|
||||
media_title text,
|
||||
media_path text,
|
||||
media_url text,
|
||||
media_size integer,
|
||||
starred integer not null default 0
|
||||
);
|
||||
|
||||
create index if not exists idx_messages_chat_ts on messages(chat_jid, ts);
|
||||
create index if not exists idx_messages_chat_msg on messages(chat_jid, msg_id);
|
||||
create index if not exists idx_messages_ts on messages(ts);
|
||||
create index if not exists idx_messages_sender on messages(sender_jid);
|
||||
|
||||
create virtual table if not exists messages_fts using fts5(text, chat, sender, media);
|
||||
|
||||
create table if not exists sync_state (
|
||||
key text primary key,
|
||||
value text not null,
|
||||
updated_at integer not null
|
||||
);
|
||||
`
|
||||
349
internal/store/store.go
Normal file
349
internal/store/store.go
Normal file
@ -0,0 +1,349 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
_ "modernc.org/sqlite"
|
||||
)
|
||||
|
||||
const schemaVersion = 1
|
||||
|
||||
type Store struct {
|
||||
db *sql.DB
|
||||
path string
|
||||
}
|
||||
|
||||
type ImportStats struct {
|
||||
SourcePath string `json:"source_path"`
|
||||
DBPath string `json:"db_path"`
|
||||
Chats int `json:"chats"`
|
||||
Messages int `json:"messages"`
|
||||
MediaMessages int `json:"media_messages"`
|
||||
StartedAt time.Time `json:"started_at"`
|
||||
FinishedAt time.Time `json:"finished_at"`
|
||||
}
|
||||
|
||||
type Status struct {
|
||||
DBPath string `json:"db_path"`
|
||||
Chats int `json:"chats"`
|
||||
UnreadChats int `json:"unread_chats"`
|
||||
UnreadMessages int `json:"unread_messages"`
|
||||
Messages int `json:"messages"`
|
||||
MediaMessages int `json:"media_messages"`
|
||||
OldestMessage time.Time `json:"oldest_message,omitzero"`
|
||||
NewestMessage time.Time `json:"newest_message,omitzero"`
|
||||
LastImportAt time.Time `json:"last_import_at,omitzero"`
|
||||
LastSource string `json:"last_source,omitempty"`
|
||||
}
|
||||
|
||||
type Chat struct {
|
||||
JID string `json:"jid"`
|
||||
Kind string `json:"kind"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Username string `json:"username,omitempty"`
|
||||
LastMessageAt time.Time `json:"last_message_at,omitzero"`
|
||||
UnreadCount int `json:"unread_count"`
|
||||
MessageCount int `json:"message_count"`
|
||||
}
|
||||
|
||||
type Contact struct {
|
||||
JID string `json:"jid"`
|
||||
Phone string `json:"phone,omitempty"`
|
||||
FullName string `json:"full_name,omitempty"`
|
||||
FirstName string `json:"first_name,omitempty"`
|
||||
LastName string `json:"last_name,omitempty"`
|
||||
BusinessName string `json:"business_name,omitempty"`
|
||||
Username string `json:"username,omitempty"`
|
||||
LID string `json:"lid,omitempty"`
|
||||
AboutText string `json:"about_text,omitempty"`
|
||||
UpdatedAt time.Time `json:"updated_at,omitzero"`
|
||||
}
|
||||
|
||||
type Group struct {
|
||||
JID string `json:"jid"`
|
||||
Name string `json:"name,omitempty"`
|
||||
OwnerJID string `json:"owner_jid,omitempty"`
|
||||
CreatedAt time.Time `json:"created_at,omitzero"`
|
||||
}
|
||||
|
||||
type GroupParticipant struct {
|
||||
GroupJID string `json:"group_jid"`
|
||||
UserJID string `json:"user_jid"`
|
||||
ContactName string `json:"contact_name,omitempty"`
|
||||
FirstName string `json:"first_name,omitempty"`
|
||||
IsAdmin bool `json:"is_admin,omitempty"`
|
||||
IsActive bool `json:"is_active,omitempty"`
|
||||
}
|
||||
|
||||
type Message struct {
|
||||
SourcePK int64 `json:"source_pk"`
|
||||
ChatJID string `json:"chat_jid"`
|
||||
ChatName string `json:"chat_name,omitempty"`
|
||||
MessageID string `json:"message_id"`
|
||||
SenderJID string `json:"sender_jid,omitempty"`
|
||||
SenderName string `json:"sender_name,omitempty"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
FromMe bool `json:"from_me"`
|
||||
Text string `json:"text,omitempty"`
|
||||
RawType int `json:"raw_type"`
|
||||
MessageType string `json:"message_type,omitempty"`
|
||||
MediaType string `json:"media_type,omitempty"`
|
||||
MediaTitle string `json:"media_title,omitempty"`
|
||||
MediaPath string `json:"media_path,omitempty"`
|
||||
MediaURL string `json:"media_url,omitempty"`
|
||||
MediaSize int64 `json:"media_size,omitempty"`
|
||||
Starred bool `json:"starred,omitempty"`
|
||||
Snippet string `json:"snippet,omitempty"`
|
||||
}
|
||||
|
||||
type MessageFilter struct {
|
||||
Query string
|
||||
ChatJID string
|
||||
Sender string
|
||||
Limit int
|
||||
After *time.Time
|
||||
Before *time.Time
|
||||
FromMe *bool
|
||||
HasMedia bool
|
||||
Asc bool
|
||||
}
|
||||
|
||||
func Open(ctx context.Context, path string) (*Store, error) {
|
||||
if strings.TrimSpace(path) == "" {
|
||||
return nil, errors.New("db path is required")
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
|
||||
return nil, fmt.Errorf("mkdir db dir: %w", err)
|
||||
}
|
||||
dsn := fmt.Sprintf("file:%s?_pragma=foreign_keys(1)&_pragma=journal_mode(WAL)&_pragma=synchronous(NORMAL)&_pragma=busy_timeout(5000)", path)
|
||||
db, err := sql.Open("sqlite", dsn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
db.SetMaxOpenConns(1)
|
||||
if err := db.PingContext(ctx); err != nil {
|
||||
_ = db.Close()
|
||||
return nil, err
|
||||
}
|
||||
s := &Store{db: db, path: path}
|
||||
if _, err := db.ExecContext(ctx, schemaSQL); err != nil {
|
||||
_ = db.Close()
|
||||
return nil, err
|
||||
}
|
||||
if _, err := db.ExecContext(ctx, fmt.Sprintf("pragma user_version = %d", schemaVersion)); err != nil {
|
||||
_ = db.Close()
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *Store) Close() error { return s.db.Close() }
|
||||
func (s *Store) Path() string { return s.path }
|
||||
|
||||
func (s *Store) ReplaceAll(ctx context.Context, stats ImportStats, chats []Chat, messages []Message) error {
|
||||
tx, err := s.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rollback(tx)
|
||||
for _, q := range []string{"delete from messages_fts", "delete from messages", "delete from chats", "delete from contacts", "delete from groups", "delete from group_participants", "delete from sync_state"} {
|
||||
if _, err := tx.ExecContext(ctx, q); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, c := range chats {
|
||||
if _, err := tx.ExecContext(ctx, `insert into chats(id,kind,name,username,last_message_at,unread_count,message_count) values(?,?,?,?,?,?,?)`,
|
||||
parseInt64(c.JID), c.Kind, c.Name, c.Username, unix(c.LastMessageAt), c.UnreadCount, c.MessageCount); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, m := range messages {
|
||||
if _, err := tx.ExecContext(ctx, `insert into messages(source_pk,chat_jid,chat_name,msg_id,sender_jid,sender_name,ts,from_me,text,raw_type,message_type,media_type,media_title,media_path,media_url,media_size,starred) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)`,
|
||||
m.SourcePK, m.ChatJID, m.ChatName, m.MessageID, m.SenderJID, m.SenderName, unix(m.Timestamp), boolInt(m.FromMe), m.Text, m.RawType, m.MessageType, m.MediaType, m.MediaTitle, m.MediaPath, m.MediaURL, m.MediaSize, boolInt(m.Starred)); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx, `insert into messages_fts(rowid,text,chat,sender,media) values((select rowid from messages where source_pk=?),?,?,?,?)`,
|
||||
m.SourcePK, strings.TrimSpace(m.Text+" "+m.MediaTitle), m.ChatName, m.SenderName, m.MediaType); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
now := stats.FinishedAt
|
||||
if now.IsZero() {
|
||||
now = time.Now().UTC()
|
||||
}
|
||||
for key, value := range map[string]string{"last_import_at": now.Format(time.RFC3339Nano), "source_path": stats.SourcePath} {
|
||||
if _, err := tx.ExecContext(ctx, `insert into sync_state(key,value,updated_at) values(?,?,?)`, key, value, unix(now)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (s *Store) Status(ctx context.Context) (Status, error) {
|
||||
out := Status{DBPath: s.path}
|
||||
for _, c := range []struct {
|
||||
dst *int
|
||||
q string
|
||||
}{
|
||||
{&out.Chats, "select count(*) from chats"},
|
||||
{&out.UnreadChats, "select count(*) from chats where unread_count > 0"},
|
||||
{&out.UnreadMessages, "select coalesce(sum(unread_count), 0) from chats"},
|
||||
{&out.Messages, "select count(*) from messages"},
|
||||
{&out.MediaMessages, "select count(*) from messages where media_type <> ''"},
|
||||
} {
|
||||
if err := s.db.QueryRowContext(ctx, c.q).Scan(c.dst); err != nil {
|
||||
return out, err
|
||||
}
|
||||
}
|
||||
var oldest, newest sql.NullInt64
|
||||
if err := s.db.QueryRowContext(ctx, `select min(ts), max(ts) from messages`).Scan(&oldest, &newest); err != nil {
|
||||
return out, err
|
||||
}
|
||||
if oldest.Valid {
|
||||
out.OldestMessage = fromUnix(oldest.Int64)
|
||||
}
|
||||
if newest.Valid {
|
||||
out.NewestMessage = fromUnix(newest.Int64)
|
||||
}
|
||||
var lastImport string
|
||||
_ = s.db.QueryRowContext(ctx, `select value from sync_state where key='last_import_at'`).Scan(&lastImport)
|
||||
if t, err := time.Parse(time.RFC3339Nano, lastImport); err == nil {
|
||||
out.LastImportAt = t
|
||||
}
|
||||
_ = s.db.QueryRowContext(ctx, `select value from sync_state where key='source_path'`).Scan(&out.LastSource)
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *Store) ListChats(ctx context.Context, limit int, unread bool) ([]Chat, error) {
|
||||
if limit <= 0 {
|
||||
limit = 50
|
||||
}
|
||||
where := ""
|
||||
if unread {
|
||||
where = "where unread_count > 0"
|
||||
}
|
||||
rows, err := s.db.QueryContext(ctx, fmt.Sprintf(`select cast(id as text),kind,name,username,last_message_at,unread_count,message_count from chats %s order by last_message_at desc limit ?`, where), limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var out []Chat
|
||||
for rows.Next() {
|
||||
var c Chat
|
||||
var ts int64
|
||||
if err := rows.Scan(&c.JID, &c.Kind, &c.Name, &c.Username, &ts, &c.UnreadCount, &c.MessageCount); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.LastMessageAt = fromUnix(ts)
|
||||
out = append(out, c)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
func (s *Store) Messages(ctx context.Context, filter MessageFilter) ([]Message, error) {
|
||||
return s.messages(ctx, filter, false)
|
||||
}
|
||||
|
||||
func (s *Store) Search(ctx context.Context, filter MessageFilter) ([]Message, error) {
|
||||
if strings.TrimSpace(filter.Query) == "" {
|
||||
return nil, errors.New("search query required")
|
||||
}
|
||||
return s.messages(ctx, filter, true)
|
||||
}
|
||||
|
||||
func (s *Store) messages(ctx context.Context, filter MessageFilter, search bool) ([]Message, error) {
|
||||
if filter.Limit <= 0 {
|
||||
filter.Limit = 50
|
||||
}
|
||||
query := `select source_pk,chat_jid,chat_name,msg_id,sender_jid,sender_name,ts,from_me,text,raw_type,message_type,media_type,media_title,media_path,media_url,media_size,starred,'' from messages where 1=1`
|
||||
args := []any{}
|
||||
prefix := ""
|
||||
if search {
|
||||
query = `select m.source_pk,m.chat_jid,m.chat_name,m.msg_id,m.sender_jid,m.sender_name,m.ts,m.from_me,m.text,m.raw_type,m.message_type,m.media_type,m.media_title,m.media_path,m.media_url,m.media_size,m.starred,snippet(messages_fts,0,'[',']','...',12) from messages_fts f join messages m on m.rowid=f.rowid where messages_fts match ?`
|
||||
args = append(args, filter.Query)
|
||||
prefix = "m."
|
||||
}
|
||||
if filter.ChatJID != "" {
|
||||
query += " and " + prefix + "chat_jid = ?"
|
||||
args = append(args, filter.ChatJID)
|
||||
}
|
||||
if filter.Sender != "" {
|
||||
query += " and " + prefix + "sender_jid = ?"
|
||||
args = append(args, filter.Sender)
|
||||
}
|
||||
if filter.After != nil {
|
||||
query += " and " + prefix + "ts >= ?"
|
||||
args = append(args, unix(*filter.After))
|
||||
}
|
||||
if filter.Before != nil {
|
||||
query += " and " + prefix + "ts <= ?"
|
||||
args = append(args, unix(*filter.Before))
|
||||
}
|
||||
if filter.FromMe != nil {
|
||||
query += " and " + prefix + "from_me = ?"
|
||||
args = append(args, boolInt(*filter.FromMe))
|
||||
}
|
||||
if filter.HasMedia {
|
||||
query += " and " + prefix + "media_type <> ''"
|
||||
}
|
||||
if search {
|
||||
query += " order by bm25(messages_fts) limit ?"
|
||||
} else if filter.Asc {
|
||||
query += " order by ts asc, source_pk asc limit ?"
|
||||
} else {
|
||||
query += " order by ts desc, source_pk desc limit ?"
|
||||
}
|
||||
args = append(args, filter.Limit)
|
||||
rows, err := s.db.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var out []Message
|
||||
for rows.Next() {
|
||||
var m Message
|
||||
var ts int64
|
||||
var fromMe, starred int
|
||||
if err := rows.Scan(&m.SourcePK, &m.ChatJID, &m.ChatName, &m.MessageID, &m.SenderJID, &m.SenderName, &ts, &fromMe, &m.Text, &m.RawType, &m.MessageType, &m.MediaType, &m.MediaTitle, &m.MediaPath, &m.MediaURL, &m.MediaSize, &starred, &m.Snippet); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m.Timestamp = fromUnix(ts)
|
||||
m.FromMe = fromMe != 0
|
||||
m.Starred = starred != 0
|
||||
out = append(out, m)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
func boolInt(v bool) int {
|
||||
if v {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
}
|
||||
func unix(t time.Time) int64 {
|
||||
if t.IsZero() {
|
||||
return 0
|
||||
}
|
||||
return t.UTC().Unix()
|
||||
}
|
||||
func fromUnix(v int64) time.Time {
|
||||
if v <= 0 {
|
||||
return time.Time{}
|
||||
}
|
||||
return time.Unix(v, 0).UTC()
|
||||
}
|
||||
func rollback(tx *sql.Tx) { _ = tx.Rollback() }
|
||||
|
||||
func parseInt64(s string) int64 {
|
||||
var out int64
|
||||
_, _ = fmt.Sscan(s, &out)
|
||||
return out
|
||||
}
|
||||
213
internal/telegramdesktop/importer.go
Normal file
213
internal/telegramdesktop/importer.go
Normal file
@ -0,0 +1,213 @@
|
||||
package telegramdesktop
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
_ "embed"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/openclaw/telecrawl/internal/store"
|
||||
)
|
||||
|
||||
//go:embed scripts/import_tdata.py
|
||||
var importScript string
|
||||
|
||||
type ImportOptions struct {
|
||||
Path string
|
||||
Python string
|
||||
Session string
|
||||
DialogsLimit int
|
||||
MessagesLimit int
|
||||
}
|
||||
|
||||
type ImportResult struct {
|
||||
Stats store.ImportStats
|
||||
Chats []store.Chat
|
||||
Messages []store.Message
|
||||
}
|
||||
|
||||
type pyResult struct {
|
||||
SourcePath string `json:"source_path"`
|
||||
StartedAt string `json:"started_at"`
|
||||
FinishedAt string `json:"finished_at"`
|
||||
Chats []struct {
|
||||
ID string `json:"id"`
|
||||
Kind string `json:"kind"`
|
||||
Name string `json:"name"`
|
||||
Username string `json:"username"`
|
||||
LastMessageAt string `json:"last_message_at"`
|
||||
UnreadCount int `json:"unread_count"`
|
||||
MessageCount int `json:"message_count"`
|
||||
} `json:"chats"`
|
||||
Messages []struct {
|
||||
SourcePK int64 `json:"source_pk"`
|
||||
ChatID string `json:"chat_id"`
|
||||
ChatName string `json:"chat_name"`
|
||||
MessageID string `json:"message_id"`
|
||||
SenderID string `json:"sender_id"`
|
||||
SenderName string `json:"sender_name"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
FromMe bool `json:"from_me"`
|
||||
Text string `json:"text"`
|
||||
MessageType string `json:"message_type"`
|
||||
MediaType string `json:"media_type"`
|
||||
MediaTitle string `json:"media_title"`
|
||||
} `json:"messages"`
|
||||
}
|
||||
|
||||
func Import(ctx context.Context, opts ImportOptions, dbPath string) (ImportResult, error) {
|
||||
tdata := strings.TrimSpace(opts.Path)
|
||||
if tdata == "" {
|
||||
tdata = DefaultPath()
|
||||
}
|
||||
python, err := resolvePython(opts.Python)
|
||||
if err != nil {
|
||||
return ImportResult{}, err
|
||||
}
|
||||
session := strings.TrimSpace(opts.Session)
|
||||
if session == "" {
|
||||
session = defaultSessionPath(dbPath)
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Dir(session), 0o700); err != nil {
|
||||
return ImportResult{}, err
|
||||
}
|
||||
script, cleanup, err := writeTempScript()
|
||||
if err != nil {
|
||||
return ImportResult{}, err
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
args := []string{
|
||||
script,
|
||||
"--tdata", tdata,
|
||||
"--session", session,
|
||||
"--dialogs-limit", fmt.Sprint(opts.DialogsLimit),
|
||||
"--messages-limit", fmt.Sprint(opts.MessagesLimit),
|
||||
}
|
||||
cmd := exec.CommandContext(ctx, python, args...) // #nosec G204 -- python and args are explicit CLI configuration.
|
||||
var stdout, stderr bytes.Buffer
|
||||
cmd.Stdout = &stdout
|
||||
cmd.Stderr = &stderr
|
||||
if err := cmd.Run(); err != nil {
|
||||
msg := strings.TrimSpace(stderr.String())
|
||||
if strings.Contains(msg, "ModuleNotFoundError") || strings.Contains(msg, "No module named") {
|
||||
return ImportResult{}, fmt.Errorf("python dependency missing: run `%s -m pip install opentele2 telethon`: %s", python, msg)
|
||||
}
|
||||
if msg != "" {
|
||||
return ImportResult{}, fmt.Errorf("telegram import failed: %w: %s", err, msg)
|
||||
}
|
||||
return ImportResult{}, fmt.Errorf("telegram import failed: %w", err)
|
||||
}
|
||||
var raw pyResult
|
||||
if err := json.Unmarshal(stdout.Bytes(), &raw); err != nil {
|
||||
return ImportResult{}, fmt.Errorf("decode importer output: %w", err)
|
||||
}
|
||||
result := ImportResult{}
|
||||
started := parseTime(raw.StartedAt)
|
||||
finished := parseTime(raw.FinishedAt)
|
||||
result.Stats = store.ImportStats{SourcePath: raw.SourcePath, DBPath: dbPath, StartedAt: started, FinishedAt: finished}
|
||||
for _, c := range raw.Chats {
|
||||
result.Chats = append(result.Chats, store.Chat{
|
||||
JID: c.ID,
|
||||
Kind: c.Kind,
|
||||
Name: c.Name,
|
||||
Username: c.Username,
|
||||
LastMessageAt: parseTime(c.LastMessageAt),
|
||||
UnreadCount: c.UnreadCount,
|
||||
MessageCount: c.MessageCount,
|
||||
})
|
||||
}
|
||||
for _, m := range raw.Messages {
|
||||
msg := store.Message{
|
||||
SourcePK: m.SourcePK,
|
||||
ChatJID: m.ChatID,
|
||||
ChatName: m.ChatName,
|
||||
MessageID: m.MessageID,
|
||||
SenderJID: m.SenderID,
|
||||
SenderName: m.SenderName,
|
||||
Timestamp: parseTime(m.Timestamp),
|
||||
FromMe: m.FromMe,
|
||||
Text: m.Text,
|
||||
MessageType: m.MessageType,
|
||||
MediaType: m.MediaType,
|
||||
MediaTitle: m.MediaTitle,
|
||||
}
|
||||
if msg.MediaType != "" {
|
||||
result.Stats.MediaMessages++
|
||||
}
|
||||
result.Messages = append(result.Messages, msg)
|
||||
}
|
||||
result.Stats.Chats = len(result.Chats)
|
||||
result.Stats.Messages = len(result.Messages)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func resolvePython(configured string) (string, error) {
|
||||
if strings.TrimSpace(configured) != "" {
|
||||
return configured, nil
|
||||
}
|
||||
if env := strings.TrimSpace(os.Getenv("TELECRAWL_PYTHON")); env != "" {
|
||||
return env, nil
|
||||
}
|
||||
candidates := []string{
|
||||
filepath.Join(defaultBaseDir(), "venv", "bin", "python"),
|
||||
filepath.Join("/tmp", "telecrawl-opentele311", "bin", "python"),
|
||||
filepath.Join(os.TempDir(), "telecrawl-opentele311", "bin", "python"),
|
||||
"python3.11",
|
||||
"python3.12",
|
||||
"python3",
|
||||
}
|
||||
for _, candidate := range candidates {
|
||||
if path, err := exec.LookPath(candidate); err == nil {
|
||||
return path, nil
|
||||
}
|
||||
if strings.HasPrefix(candidate, string(filepath.Separator)) {
|
||||
if info, err := os.Stat(candidate); err == nil && !info.IsDir() {
|
||||
return candidate, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return "", errors.New("python not found; install python3.11 or set TELECRAWL_PYTHON")
|
||||
}
|
||||
|
||||
func defaultSessionPath(dbPath string) string {
|
||||
sum := sha256.Sum256([]byte(dbPath))
|
||||
return filepath.Join(defaultBaseDir(), "sessions", fmt.Sprintf("tdata-%x.session", sum[:6]))
|
||||
}
|
||||
|
||||
func defaultBaseDir() string {
|
||||
home, _ := os.UserHomeDir()
|
||||
return filepath.Join(home, ".telecrawl")
|
||||
}
|
||||
|
||||
func writeTempScript() (string, func(), error) {
|
||||
dir, err := os.MkdirTemp("", "telecrawl-import-*")
|
||||
if err != nil {
|
||||
return "", func() {}, err
|
||||
}
|
||||
path := filepath.Join(dir, "import_tdata.py")
|
||||
if err := os.WriteFile(path, []byte(importScript), 0o600); err != nil {
|
||||
_ = os.RemoveAll(dir)
|
||||
return "", func() {}, err
|
||||
}
|
||||
return path, func() { _ = os.RemoveAll(dir) }, nil
|
||||
}
|
||||
|
||||
func parseTime(value string) time.Time {
|
||||
value = strings.TrimSpace(value)
|
||||
if value == "" {
|
||||
return time.Time{}
|
||||
}
|
||||
if t, err := time.Parse(time.RFC3339Nano, value); err == nil {
|
||||
return t.UTC()
|
||||
}
|
||||
return time.Time{}
|
||||
}
|
||||
178
internal/telegramdesktop/probe.go
Normal file
178
internal/telegramdesktop/probe.go
Normal file
@ -0,0 +1,178 @@
|
||||
package telegramdesktop
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const maxProbeBytes = 16
|
||||
|
||||
type Options struct {
|
||||
Path string
|
||||
}
|
||||
|
||||
type Report struct {
|
||||
Path string `json:"path"`
|
||||
Exists bool `json:"exists"`
|
||||
Accessible bool `json:"accessible"`
|
||||
Store string `json:"store"`
|
||||
SQLiteFiles int `json:"sqlite_files"`
|
||||
TDesktopFiles int `json:"tdesktop_files"`
|
||||
AccountDirs int `json:"account_dirs,omitempty"`
|
||||
FilesScanned int `json:"files_scanned"`
|
||||
BytesScanned int64 `json:"bytes_scanned"`
|
||||
DryRun bool `json:"dry_run,omitempty"`
|
||||
Samples []Sample `json:"samples,omitempty"`
|
||||
Note string `json:"note,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
type Sample struct {
|
||||
Path string `json:"path"`
|
||||
Kind string `json:"kind"`
|
||||
Size int64 `json:"size"`
|
||||
}
|
||||
|
||||
func DefaultPath() string {
|
||||
home, _ := os.UserHomeDir()
|
||||
switch runtime.GOOS {
|
||||
case "darwin":
|
||||
return filepath.Join(home, "Library", "Application Support", "Telegram Desktop", "tdata")
|
||||
case "windows":
|
||||
if appData := strings.TrimSpace(os.Getenv("APPDATA")); appData != "" {
|
||||
return filepath.Join(appData, "Telegram Desktop", "tdata")
|
||||
}
|
||||
return filepath.Join(home, "AppData", "Roaming", "Telegram Desktop", "tdata")
|
||||
default:
|
||||
if dataHome := strings.TrimSpace(os.Getenv("XDG_DATA_HOME")); dataHome != "" {
|
||||
return filepath.Join(dataHome, "TelegramDesktop", "tdata")
|
||||
}
|
||||
return filepath.Join(home, ".local", "share", "TelegramDesktop", "tdata")
|
||||
}
|
||||
}
|
||||
|
||||
func Probe(ctx context.Context, opts Options) Report {
|
||||
path := strings.TrimSpace(opts.Path)
|
||||
if path == "" {
|
||||
path = DefaultPath()
|
||||
}
|
||||
report := Report{Path: path, Store: "missing"}
|
||||
info, err := os.Stat(path)
|
||||
if err != nil {
|
||||
report.Error = err.Error()
|
||||
return report
|
||||
}
|
||||
report.Exists = true
|
||||
if !info.IsDir() {
|
||||
report.Store = "unsupported-file"
|
||||
report.Error = "path is not a directory"
|
||||
return report
|
||||
}
|
||||
err = filepath.WalkDir(path, func(p string, entry fs.DirEntry, walkErr error) error {
|
||||
if walkErr != nil {
|
||||
if report.Error == "" {
|
||||
report.Error = walkErr.Error()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
if entry.IsDir() {
|
||||
if isLikelyAccountDir(entry.Name()) && p != path {
|
||||
report.AccountDirs++
|
||||
}
|
||||
return nil
|
||||
}
|
||||
info, err := entry.Info()
|
||||
if err != nil {
|
||||
if report.Error == "" {
|
||||
report.Error = err.Error()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
kind, ok := sniffFile(p)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
report.FilesScanned++
|
||||
report.BytesScanned += minInt64(info.Size(), maxProbeBytes)
|
||||
switch kind {
|
||||
case "sqlite":
|
||||
report.SQLiteFiles++
|
||||
case "tdesktop":
|
||||
report.TDesktopFiles++
|
||||
}
|
||||
if len(report.Samples) < 8 {
|
||||
report.Samples = append(report.Samples, Sample{Path: p, Kind: kind, Size: info.Size()})
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
report.Error = err.Error()
|
||||
}
|
||||
report.Accessible = report.FilesScanned > 0 && report.Error == ""
|
||||
switch {
|
||||
case report.SQLiteFiles > 0:
|
||||
report.Store = "sqlite"
|
||||
case report.TDesktopFiles > 0:
|
||||
report.Store = "tdesktop-binary"
|
||||
report.Note = "Telegram Desktop tdata is readable, but messages are in TDesktop binary/encrypted storage, not SQLite"
|
||||
case report.FilesScanned > 0:
|
||||
report.Store = "unknown"
|
||||
default:
|
||||
report.Store = "empty"
|
||||
}
|
||||
return report
|
||||
}
|
||||
|
||||
func sniffFile(path string) (string, bool) {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return "", false
|
||||
}
|
||||
defer f.Close()
|
||||
var header [maxProbeBytes]byte
|
||||
n, err := io.ReadFull(f, header[:])
|
||||
if err != nil && !errorsIsEOF(err) {
|
||||
return "", false
|
||||
}
|
||||
buf := header[:n]
|
||||
switch {
|
||||
case bytes.HasPrefix(buf, []byte("SQLite format 3")):
|
||||
return "sqlite", true
|
||||
case bytes.HasPrefix(buf, []byte("TDF$")), bytes.HasPrefix(buf, []byte("TDDF")):
|
||||
return "tdesktop", true
|
||||
default:
|
||||
return "other", true
|
||||
}
|
||||
}
|
||||
|
||||
func errorsIsEOF(err error) bool {
|
||||
return err == io.EOF || err == io.ErrUnexpectedEOF
|
||||
}
|
||||
|
||||
func isLikelyAccountDir(name string) bool {
|
||||
if len(name) != 16 {
|
||||
return false
|
||||
}
|
||||
for _, r := range name {
|
||||
if !((r >= '0' && r <= '9') || (r >= 'A' && r <= 'F')) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func minInt64(a, b int64) int64 {
|
||||
if a < b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
39
internal/telegramdesktop/probe_test.go
Normal file
39
internal/telegramdesktop/probe_test.go
Normal file
@ -0,0 +1,39 @@
|
||||
package telegramdesktop
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestProbeDetectsTDesktopStore(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
if err := os.WriteFile(filepath.Join(dir, "key_datas"), []byte("TDF$hello"), 0o600); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
report := Probe(context.Background(), Options{Path: dir})
|
||||
if !report.Accessible {
|
||||
t.Fatalf("expected accessible report: %+v", report)
|
||||
}
|
||||
if report.Store != "tdesktop-binary" {
|
||||
t.Fatalf("store = %q, want tdesktop-binary", report.Store)
|
||||
}
|
||||
if report.TDesktopFiles != 1 {
|
||||
t.Fatalf("tdesktop_files = %d, want 1", report.TDesktopFiles)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProbeDetectsSQLiteStore(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
if err := os.WriteFile(filepath.Join(dir, "messages.sqlite"), []byte("SQLite format 3\x00"), 0o600); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
report := Probe(context.Background(), Options{Path: dir})
|
||||
if report.Store != "sqlite" {
|
||||
t.Fatalf("store = %q, want sqlite", report.Store)
|
||||
}
|
||||
if report.SQLiteFiles != 1 {
|
||||
t.Fatalf("sqlite_files = %d, want 1", report.SQLiteFiles)
|
||||
}
|
||||
}
|
||||
139
internal/telegramdesktop/scripts/import_tdata.py
Normal file
139
internal/telegramdesktop/scripts/import_tdata.py
Normal file
@ -0,0 +1,139 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import asyncio
|
||||
import hashlib
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from opentele2.api import UseCurrentSession
|
||||
from opentele2.td import TDesktop
|
||||
|
||||
|
||||
def iso(dt):
|
||||
if not dt:
|
||||
return ""
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
return dt.astimezone(timezone.utc).isoformat()
|
||||
|
||||
|
||||
def stable_pk(chat_id, message_id):
|
||||
digest = hashlib.blake2b(f"{chat_id}:{message_id}".encode(), digest_size=8).digest()
|
||||
value = int.from_bytes(digest, "big", signed=False) & ((1 << 63) - 1)
|
||||
return value or 1
|
||||
|
||||
|
||||
def entity_kind(entity):
|
||||
name = type(entity).__name__.lower()
|
||||
if "user" in name:
|
||||
return "user"
|
||||
if "channel" in name:
|
||||
return "channel"
|
||||
if "chat" in name:
|
||||
return "group"
|
||||
return name or "unknown"
|
||||
|
||||
|
||||
def display_name(entity, fallback):
|
||||
for attr in ("title", "first_name", "last_name", "username"):
|
||||
value = getattr(entity, attr, None)
|
||||
if value:
|
||||
if attr == "first_name":
|
||||
last = getattr(entity, "last_name", None)
|
||||
return f"{value} {last}".strip() if last else value
|
||||
return value
|
||||
return fallback or str(getattr(entity, "id", ""))
|
||||
|
||||
|
||||
def media_type(message):
|
||||
media = getattr(message, "media", None)
|
||||
if not media:
|
||||
return ""
|
||||
name = type(media).__name__
|
||||
return name.replace("MessageMedia", "").lower() or name.lower()
|
||||
|
||||
|
||||
async def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--tdata", required=True)
|
||||
parser.add_argument("--session", required=True)
|
||||
parser.add_argument("--dialogs-limit", type=int, default=200)
|
||||
parser.add_argument("--messages-limit", type=int, default=500)
|
||||
args = parser.parse_args()
|
||||
|
||||
started = datetime.now(timezone.utc)
|
||||
td = TDesktop(args.tdata)
|
||||
if not td.isLoaded():
|
||||
raise SystemExit("tdata did not load")
|
||||
client = await td.ToTelethon(session=args.session, flag=UseCurrentSession)
|
||||
await client.connect()
|
||||
if not await client.is_user_authorized():
|
||||
raise SystemExit("Telegram session is not authorized")
|
||||
|
||||
dialogs = await client.get_dialogs(limit=None if args.dialogs_limit <= 0 else args.dialogs_limit)
|
||||
out_chats = []
|
||||
out_messages = []
|
||||
for dialog in dialogs:
|
||||
entity = dialog.entity
|
||||
chat_id = str(dialog.id)
|
||||
chat_name = display_name(entity, getattr(dialog, "name", ""))
|
||||
limit = None if args.messages_limit <= 0 else args.messages_limit
|
||||
messages = await client.get_messages(entity, limit=limit)
|
||||
last_message_at = None
|
||||
for msg in messages:
|
||||
if not getattr(msg, "id", None):
|
||||
continue
|
||||
if getattr(msg, "date", None) and (last_message_at is None or msg.date > last_message_at):
|
||||
last_message_at = msg.date
|
||||
sender_id = ""
|
||||
sender = getattr(msg, "sender", None)
|
||||
if sender is not None:
|
||||
sender_id = str(getattr(sender, "id", "") or "")
|
||||
elif getattr(msg, "sender_id", None):
|
||||
sender_id = str(msg.sender_id)
|
||||
sender_name = display_name(sender, "") if sender else ""
|
||||
text = getattr(msg, "message", "") or ""
|
||||
out_messages.append(
|
||||
{
|
||||
"source_pk": stable_pk(chat_id, msg.id),
|
||||
"chat_id": chat_id,
|
||||
"chat_name": chat_name,
|
||||
"message_id": str(msg.id),
|
||||
"sender_id": sender_id,
|
||||
"sender_name": sender_name,
|
||||
"timestamp": iso(getattr(msg, "date", None)),
|
||||
"from_me": bool(getattr(msg, "out", False)),
|
||||
"text": text,
|
||||
"message_type": type(msg).__name__,
|
||||
"media_type": media_type(msg),
|
||||
"media_title": "",
|
||||
}
|
||||
)
|
||||
out_chats.append(
|
||||
{
|
||||
"id": chat_id,
|
||||
"kind": entity_kind(entity),
|
||||
"name": chat_name,
|
||||
"username": getattr(entity, "username", "") or "",
|
||||
"last_message_at": iso(last_message_at),
|
||||
"unread_count": int(getattr(dialog, "unread_count", 0) or 0),
|
||||
"message_count": len(messages),
|
||||
}
|
||||
)
|
||||
|
||||
await client.disconnect()
|
||||
print(
|
||||
json.dumps(
|
||||
{
|
||||
"source_path": args.tdata,
|
||||
"started_at": iso(started),
|
||||
"finished_at": iso(datetime.now(timezone.utc)),
|
||||
"chats": out_chats,
|
||||
"messages": out_messages,
|
||||
},
|
||||
ensure_ascii=False,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
asyncio.run(main())
|
||||
Loading…
Reference in New Issue
Block a user