feat(backup): checkpoint long gmail fetches
This commit is contained in:
parent
e6b6046bb1
commit
9d7ca4ec5f
@ -8,6 +8,7 @@
|
||||
- Backup: extend `--services all` with Drive permissions/comments/revisions, Calendar ACL/settings/colors, contact groups, Cloud Identity groups, Workspace Admin Directory users/groups/members, Keep notes, and local Gmail message caching for resumable full-mailbox fetches.
|
||||
- Backup: bound individual Drive content exports with `--drive-content-timeout` so one stuck Google export records an encrypted error row instead of blocking the full backup.
|
||||
- Backup: add Gmail message-list checkpoints, streaming shard construction, and stderr progress counters so full-mailbox backups can resume cleanly after interruption without keeping every raw message in RAM.
|
||||
- Backup: push encrypted incomplete Gmail checkpoint commits during long cached fetches so day-scale mailbox backups have offsite progress before the final manifest is committed.
|
||||
- Calendar: add `--start-timezone` / `--end-timezone` to `calendar create` and `calendar update` for preserving named IANA event timezones when RFC3339 inputs only carry numeric offsets. (#422)
|
||||
- Drive: add `drive search --drive` and `--parent` for scoping search to a shared drive or folder. (#525) — thanks @LeanSheng.
|
||||
- Gmail: add `gmail messages search --body-format html` for returning HTML message bodies when `--include-body` is used. (#520) — thanks @alexknowshtml.
|
||||
|
||||
@ -756,7 +756,11 @@ per-file export into an encrypted error row instead of wedging the run. Gmail
|
||||
raw-message fetches and message-list pages use a local cache by default so
|
||||
interrupted full-mailbox backups can resume. Full Gmail runs build encrypted
|
||||
message shards from cached messages instead of keeping the whole mailbox in
|
||||
memory; progress is written to stderr while stdout stays parseable. Use
|
||||
memory; progress is written to stderr while stdout stays parseable. Cached
|
||||
Gmail runs also push incomplete encrypted checkpoint commits during long fetches
|
||||
by default (`--gmail-checkpoint-rows`, `--gmail-checkpoint-interval`,
|
||||
`--no-gmail-checkpoints`). Checkpoints live under `checkpoints/` and do not
|
||||
replace the authoritative `manifest.json` until the final backup completes. Use
|
||||
`--gmail-refresh-cache` to force a refetch. Workspace inventories
|
||||
Docs/Sheets/Slides and backs up Forms/responses discovered through Drive; add
|
||||
`--workspace-native` for full native Docs/Sheets/Slides API JSON.
|
||||
|
||||
@ -73,8 +73,9 @@ Supported services:
|
||||
|
||||
- `gmail`: labels and raw MIME messages. Fetched raw messages are cached under
|
||||
the local user cache by default so interrupted full-mailbox runs can resume
|
||||
the expensive message download phase; use `--no-gmail-cache` or
|
||||
`--gmail-refresh-cache` to bypass it.
|
||||
the expensive message download phase. Cached runs also write encrypted
|
||||
incomplete checkpoint commits during long fetches; use `--no-gmail-cache`,
|
||||
`--gmail-refresh-cache`, or `--no-gmail-checkpoints` to bypass those layers.
|
||||
- `gmail-settings`: filters, forwarding addresses, auto-forwarding, send-as
|
||||
aliases, vacation responder, delegate visibility, POP, IMAP, and language
|
||||
settings.
|
||||
@ -271,6 +272,22 @@ counters to stderr while stdout stays parseable. `--gmail-refresh-cache` forces
|
||||
a refetch. The cache is plaintext local data; clear it if the machine should not
|
||||
retain local mail copies outside the encrypted backup/export locations.
|
||||
|
||||
Cached Gmail runs also push incomplete encrypted checkpoint snapshots to the
|
||||
backup Git repo. Checkpoint shards and manifests live under
|
||||
`checkpoints/gmail/<account-hash>/<run-id>/`, are encrypted with the same age
|
||||
recipients as normal backup shards, and are committed with messages like
|
||||
`checkpoint: gmail backup 20000/359635`. The checkpoint manifest has
|
||||
`"incomplete": true`; `gog backup status`, `verify`, `cat`, and `export` continue
|
||||
to use the root `manifest.json` as the authoritative completed backup. This
|
||||
keeps long runs crash-tolerant without pretending partial data is a finished
|
||||
snapshot. A checkpoint commit can cover many messages, but its encrypted files
|
||||
are split into smaller shard files to stay below normal GitHub blob limits. Tune
|
||||
the commit cadence with `--gmail-checkpoint-rows` / `--gmail-checkpoint-interval`
|
||||
on `gog backup push`, or `--checkpoint-rows` / `--checkpoint-interval` on
|
||||
`gog backup gmail push`; set the interval or rows to `0` to disable that
|
||||
trigger, or use `--no-gmail-checkpoints` / `--no-checkpoints` to disable
|
||||
checkpoint pushes entirely.
|
||||
|
||||
`--include-spam-trash` defaults to true. Use `--query` and `--max` for bounded
|
||||
test exports; omit them for a full mailbox scan.
|
||||
|
||||
|
||||
@ -33,6 +33,33 @@ type Manifest struct {
|
||||
Shards []ShardEntry `json:"shards"`
|
||||
}
|
||||
|
||||
// Checkpoint carries the identity and progress counters of an in-flight
// backup run. writeCheckpoint copies every field into the incomplete
// checkpoint manifest.
type Checkpoint struct {
	// RunID, Service and Account are sanitized by writeCheckpoint and joined
	// into the checkpoint directory checkpoints/<service>/<account>/<run-id>.
	// Service and RunID are required; Account may be empty.
	RunID   string
	Service string
	Account string

	// Done and Total appear in the checkpoint commit message as "Done/Total".
	Done, Total int
	// Fetched and CacheHits are recorded in the manifest alongside Done/Total.
	Fetched, CacheHits int
}
|
||||
|
||||
type CheckpointManifest struct {
|
||||
Format int `json:"format"`
|
||||
App string `json:"app"`
|
||||
Encrypted bool `json:"encrypted"`
|
||||
Incomplete bool `json:"incomplete"`
|
||||
Exported time.Time `json:"exported"`
|
||||
Recipients []string `json:"recipients,omitempty"`
|
||||
RunID string `json:"run_id"`
|
||||
Service string `json:"service"`
|
||||
Account string `json:"account,omitempty"`
|
||||
Done int `json:"done"`
|
||||
Total int `json:"total"`
|
||||
Fetched int `json:"fetched,omitempty"`
|
||||
CacheHits int `json:"cache_hits,omitempty"`
|
||||
Shards []ShardEntry `json:"shards"`
|
||||
}
|
||||
|
||||
type ShardEntry struct {
|
||||
Service string `json:"service"`
|
||||
Kind string `json:"kind"`
|
||||
@ -124,6 +151,44 @@ func PushSnapshot(ctx context.Context, snapshot Snapshot, opts Options) (Result,
|
||||
return Result{Repo: cfg.Repo, Changed: changed, Encrypted: true, Shards: len(manifest.Shards), Counts: manifest.Counts}, nil
|
||||
}
|
||||
|
||||
func PushCheckpoint(ctx context.Context, snapshot Snapshot, checkpoint Checkpoint, opts Options) (Result, error) {
|
||||
defer cleanupPlainShardFiles(snapshot)
|
||||
cfg, err := ResolveOptions(opts)
|
||||
if err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
if len(cfg.Recipients) == 0 {
|
||||
recipient, err := RecipientFromIdentity(cfg.Identity)
|
||||
if err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
cfg.Recipients = []string{recipient}
|
||||
}
|
||||
if err := ensureRepo(ctx, cfg); err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
if err := writeBackupReadme(cfg.Repo); err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
manifest, err := writeCheckpoint(ctx, cfg, snapshot, checkpoint)
|
||||
if err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
changed, err := commitAndPush(ctx, cfg, fmt.Sprintf("checkpoint: %s backup %d/%d", manifest.Service, manifest.Done, manifest.Total), opts.Push)
|
||||
if err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
counts := map[string]int{}
|
||||
for _, shard := range manifest.Shards {
|
||||
key := shard.Service
|
||||
if shard.Kind != "" {
|
||||
key += "." + shard.Kind
|
||||
}
|
||||
counts[key] += shard.Rows
|
||||
}
|
||||
return Result{Repo: cfg.Repo, Changed: changed, Encrypted: true, Shards: len(manifest.Shards), Counts: counts}, nil
|
||||
}
|
||||
|
||||
func Verify(ctx context.Context, opts Options) (Result, error) {
|
||||
cfg, err := ResolveOptions(opts)
|
||||
if err != nil {
|
||||
@ -199,6 +264,70 @@ func NewJSONLShard(service, kind, account, rel string, rows any) (PlainShard, er
|
||||
}, nil
|
||||
}
|
||||
|
||||
func writeCheckpoint(ctx context.Context, cfg Config, snapshot Snapshot, checkpoint Checkpoint) (CheckpointManifest, error) {
|
||||
checkpoint.Service = safePathPart(checkpoint.Service)
|
||||
checkpoint.Account = safePathPart(checkpoint.Account)
|
||||
checkpoint.RunID = safePathPart(checkpoint.RunID)
|
||||
if checkpoint.Service == "" || checkpoint.RunID == "" {
|
||||
return CheckpointManifest{}, fmt.Errorf("backup checkpoint service and run id are required")
|
||||
}
|
||||
dir := path.Join("checkpoints", checkpoint.Service, checkpoint.Account, checkpoint.RunID)
|
||||
manifestRel := path.Join(dir, "manifest.json")
|
||||
old, _ := readCheckpointManifest(cfg.Repo, manifestRel)
|
||||
recipients := normalizedStrings(cfg.Recipients)
|
||||
reuseEncrypted := sameStrings(old.Recipients, recipients)
|
||||
replace := map[string]struct{}{}
|
||||
for _, shard := range snapshot.Shards {
|
||||
clean := path.Clean(strings.TrimSpace(shard.Path))
|
||||
if !strings.HasPrefix(clean, dir+"/") {
|
||||
return CheckpointManifest{}, fmt.Errorf("backup checkpoint shard path %q is outside %s", shard.Path, dir)
|
||||
}
|
||||
replace[clean] = struct{}{}
|
||||
}
|
||||
shards := make([]ShardEntry, 0, len(old.Shards)+len(snapshot.Shards))
|
||||
if reuseEncrypted {
|
||||
for _, shard := range old.Shards {
|
||||
if _, ok := replace[shard.Path]; !ok {
|
||||
shards = append(shards, shard)
|
||||
}
|
||||
}
|
||||
}
|
||||
oldManifest := Manifest{Recipients: old.Recipients, Shards: old.Shards}
|
||||
for _, shard := range snapshot.Shards {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return CheckpointManifest{}, ctx.Err()
|
||||
default:
|
||||
}
|
||||
entry, err := writeShard(cfg, oldManifest, shard, reuseEncrypted)
|
||||
if err != nil {
|
||||
return CheckpointManifest{}, err
|
||||
}
|
||||
shards = append(shards, entry)
|
||||
}
|
||||
sort.Slice(shards, func(i, j int) bool { return shards[i].Path < shards[j].Path })
|
||||
manifest := CheckpointManifest{
|
||||
Format: formatVersion,
|
||||
App: "gog",
|
||||
Encrypted: true,
|
||||
Incomplete: true,
|
||||
Exported: time.Now().UTC(),
|
||||
Recipients: recipients,
|
||||
RunID: checkpoint.RunID,
|
||||
Service: checkpoint.Service,
|
||||
Account: checkpoint.Account,
|
||||
Done: checkpoint.Done,
|
||||
Total: checkpoint.Total,
|
||||
Fetched: checkpoint.Fetched,
|
||||
CacheHits: checkpoint.CacheHits,
|
||||
Shards: shards,
|
||||
}
|
||||
if err := writeCheckpointManifest(cfg.Repo, manifestRel, manifest); err != nil {
|
||||
return CheckpointManifest{}, err
|
||||
}
|
||||
return manifest, nil
|
||||
}
|
||||
|
||||
func writeSnapshot(ctx context.Context, cfg Config, snapshot Snapshot, old Manifest) (Manifest, error) {
|
||||
recipients := normalizedStrings(cfg.Recipients)
|
||||
reuseEncrypted := sameStrings(old.Recipients, recipients)
|
||||
@ -389,11 +518,15 @@ func resolveShardPath(repo, rel string) (string, error) {
|
||||
if clean == "." || clean == ".." || strings.HasPrefix(clean, "../") || path.IsAbs(clean) {
|
||||
return "", fmt.Errorf("backup shard path escapes backup root: %s", rel)
|
||||
}
|
||||
if !strings.HasPrefix(clean, "data/") || !strings.HasSuffix(clean, ".age") {
|
||||
if (!strings.HasPrefix(clean, "data/") && !strings.HasPrefix(clean, "checkpoints/")) || !strings.HasSuffix(clean, ".age") {
|
||||
return "", fmt.Errorf("invalid backup shard path: %s", rel)
|
||||
}
|
||||
full := filepath.Join(repo, filepath.FromSlash(clean))
|
||||
root := filepath.Clean(filepath.Join(repo, "data"))
|
||||
rootName := "data"
|
||||
if strings.HasPrefix(clean, "checkpoints/") {
|
||||
rootName = "checkpoints"
|
||||
}
|
||||
root := filepath.Clean(filepath.Join(repo, rootName))
|
||||
parent := filepath.Clean(filepath.Dir(full))
|
||||
if parent != root && !strings.HasPrefix(parent, root+string(filepath.Separator)) {
|
||||
return "", fmt.Errorf("backup shard path escapes backup root: %s", rel)
|
||||
@ -453,6 +586,55 @@ func readManifest(repo string) (Manifest, error) {
|
||||
return manifest, nil
|
||||
}
|
||||
|
||||
func readCheckpointManifest(repo, rel string) (CheckpointManifest, error) {
|
||||
full, err := resolveCheckpointManifestPath(repo, rel)
|
||||
if err != nil {
|
||||
return CheckpointManifest{}, err
|
||||
}
|
||||
data, err := os.ReadFile(full) // #nosec G304 -- checkpoint manifest path is confined below checkpoints/.
|
||||
if err != nil {
|
||||
return CheckpointManifest{}, err
|
||||
}
|
||||
var manifest CheckpointManifest
|
||||
if err := json.Unmarshal(data, &manifest); err != nil {
|
||||
return CheckpointManifest{}, err
|
||||
}
|
||||
return manifest, nil
|
||||
}
|
||||
|
||||
func writeCheckpointManifest(repo, rel string, manifest CheckpointManifest) error {
|
||||
full, err := resolveCheckpointManifestPath(repo, rel)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Dir(full), 0o700); err != nil {
|
||||
return err
|
||||
}
|
||||
data, err := json.MarshalIndent(manifest, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data = append(data, '\n')
|
||||
return os.WriteFile(full, data, 0o600)
|
||||
}
|
||||
|
||||
// resolveCheckpointManifestPath maps a slash-separated, repo-relative
// checkpoint manifest path to a filesystem path under repo.
//
// rel is trimmed and cleaned first. It is rejected when it is absolute or
// climbs out of the repo, when it is not below "checkpoints/", or when its
// final element is not "manifest.json".
func resolveCheckpointManifestPath(repo, rel string) (string, error) {
	cleaned := path.Clean(strings.TrimSpace(rel))

	switch {
	case cleaned == ".", cleaned == "..", strings.HasPrefix(cleaned, "../"), path.IsAbs(cleaned):
		return "", fmt.Errorf("backup checkpoint path escapes backup root: %s", rel)
	case !strings.HasPrefix(cleaned, "checkpoints/"), !strings.HasSuffix(cleaned, "/manifest.json"):
		return "", fmt.Errorf("invalid backup checkpoint path: %s", rel)
	}

	resolved := filepath.Join(repo, filepath.FromSlash(cleaned))
	checkpointsRoot := filepath.Clean(filepath.Join(repo, "checkpoints"))
	containing := filepath.Clean(filepath.Dir(resolved))

	// Second line of defence after the textual checks: the manifest's
	// directory must be the checkpoints root itself or a descendant of it.
	withinRoot := containing == checkpointsRoot ||
		strings.HasPrefix(containing, checkpointsRoot+string(filepath.Separator))
	if !withinRoot {
		return "", fmt.Errorf("backup checkpoint path escapes backup root: %s", rel)
	}
	return resolved, nil
}
|
||||
|
||||
func writeManifest(repo string, manifest Manifest) error {
|
||||
data, err := json.MarshalIndent(manifest, "", " ")
|
||||
if err != nil {
|
||||
@ -523,6 +705,24 @@ func sameStrings(a, b []string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// safePathPart reduces value to a string that is safe to use as a single path
// element: only ASCII letters, digits, '-', '_' and '.' are kept, every other
// rune (including whitespace and path separators) is dropped.
//
// A result of "." or ".." is replaced by "". Both consist solely of allowed
// runes, but as path elements they would make path.Join collapse or climb the
// directory being built, so they are treated as "no usable value".
func safePathPart(value string) string {
	kept := strings.Map(func(r rune) rune {
		switch {
		case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9':
			return r
		case r == '-', r == '_', r == '.':
			return r
		}
		// A negative return value tells strings.Map to drop the rune.
		return -1
	}, value)

	if kept == "." || kept == ".." {
		return ""
	}
	return kept
}
|
||||
|
||||
func removeStaleShards(repo string, shards []ShardEntry) error {
|
||||
keep := map[string]struct{}{}
|
||||
for _, shard := range shards {
|
||||
|
||||
@ -92,6 +92,55 @@ func TestPushSnapshotEncryptsAndCleansPlaintextPath(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestPushCheckpointWritesIncompleteManifestOutsideMainSnapshot(t *testing.T) {
|
||||
ctx, repo, config, _ := initTestBackup(t)
|
||||
shard, err := NewJSONLShard("gmail", "messages", "acct", "checkpoints/gmail/acct/run-one/messages/part-000001.jsonl.gz.age", []map[string]string{
|
||||
{"id": "m1", "raw": "private checkpoint body"},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("NewJSONLShard: %v", err)
|
||||
}
|
||||
result, err := PushCheckpoint(ctx, Snapshot{
|
||||
Services: []string{"gmail"},
|
||||
Accounts: []string{"acct"},
|
||||
Counts: map[string]int{"gmail.messages": 1},
|
||||
Shards: []PlainShard{shard},
|
||||
}, Checkpoint{
|
||||
RunID: "run-one",
|
||||
Service: "gmail",
|
||||
Account: "acct",
|
||||
Done: 1,
|
||||
Total: 2,
|
||||
Fetched: 1,
|
||||
CacheHits: 0,
|
||||
}, Options{ConfigPath: config, Push: false})
|
||||
if err != nil {
|
||||
t.Fatalf("PushCheckpoint: %v", err)
|
||||
}
|
||||
if !result.Changed || result.Shards != 1 || result.Counts["gmail.messages"] != 1 {
|
||||
t.Fatalf("unexpected checkpoint result: %+v", result)
|
||||
}
|
||||
if _, statErr := os.Stat(filepath.Join(repo, "manifest.json")); !os.IsNotExist(statErr) {
|
||||
t.Fatalf("main manifest should not be created by checkpoint: %v", statErr)
|
||||
}
|
||||
manifest, err := readCheckpointManifest(repo, "checkpoints/gmail/acct/run-one/manifest.json")
|
||||
if err != nil {
|
||||
t.Fatalf("readCheckpointManifest: %v", err)
|
||||
}
|
||||
if !manifest.Incomplete || manifest.Done != 1 || manifest.Total != 2 || manifest.RunID != "run-one" {
|
||||
t.Fatalf("unexpected checkpoint manifest: %+v", manifest)
|
||||
}
|
||||
ciphertext := readFile(t, filepath.Join(repo, "checkpoints", "gmail", "acct", "run-one", "messages", "part-000001.jsonl.gz.age"))
|
||||
if strings.Contains(string(ciphertext), "private checkpoint body") {
|
||||
t.Fatal("checkpoint shard contains plaintext")
|
||||
}
|
||||
|
||||
pushSingleShard(t, ctx, config, mustGmailMessageShard(t, "data/gmail/acct/messages/2026/04/part-0001.jsonl.gz.age", []map[string]string{{"id": "m1", "raw": "final"}}))
|
||||
if _, err := os.Stat(filepath.Join(repo, "checkpoints", "gmail", "acct", "run-one", "messages", "part-000001.jsonl.gz.age")); err != nil {
|
||||
t.Fatalf("final snapshot removed checkpoint shard: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCatAndDecryptSnapshotVerifyPlaintext(t *testing.T) {
|
||||
ctx, repo, config, _ := initTestBackup(t)
|
||||
shardPath := "data/gmail/acct/messages/2026/04/part-0001.jsonl.gz.age"
|
||||
|
||||
@ -124,6 +124,9 @@ type BackupPushCmd struct {
|
||||
WorkspaceMaxFiles int `name:"workspace-max-files" help:"Max Docs/Sheets/Slides files per type for native Workspace metadata; 0 means all" default:"0"`
|
||||
GmailCache bool `name:"gmail-cache" help:"Cache fetched Gmail raw messages locally so interrupted full backups can resume" default:"true" negatable:""`
|
||||
GmailRefreshCache bool `name:"gmail-refresh-cache" help:"Refetch Gmail messages even when a local backup cache entry exists"`
|
||||
GmailCheckpoints bool `name:"gmail-checkpoints" help:"Commit and push incomplete encrypted Gmail checkpoints during long cached fetches" default:"true" negatable:""`
|
||||
GmailCheckpointRows int `name:"gmail-checkpoint-rows" help:"Gmail messages per encrypted checkpoint chunk; 0 disables row-triggered checkpoints" default:"10000"`
|
||||
GmailCheckpointEvery time.Duration `name:"gmail-checkpoint-interval" help:"Max time between Gmail checkpoint pushes during fetch; 0 disables time-triggered checkpoints" default:"30m"`
|
||||
BestEffort bool `name:"best-effort" help:"Record optional service errors as backup rows and continue" default:"true" negatable:""`
|
||||
}
|
||||
|
||||
@ -192,6 +195,10 @@ func (c *BackupPushCmd) Run(ctx context.Context, flags *RootFlags) error {
|
||||
ShardMaxRows: c.ShardMaxRows,
|
||||
CacheMessages: c.GmailCache,
|
||||
RefreshCache: c.GmailRefreshCache,
|
||||
Checkpoints: c.GmailCheckpoints,
|
||||
CheckpointRows: c.GmailCheckpointRows,
|
||||
CheckpointEvery: c.GmailCheckpointEvery,
|
||||
BackupOptions: c.options(),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
@ -270,12 +277,15 @@ func (c *BackupPushCmd) buildOptionalSnapshot(flags *RootFlags, service string,
|
||||
|
||||
type BackupGmailPushCmd struct {
|
||||
backupFlags
|
||||
Query string `name:"query" help:"Gmail query for bounded/test backups"`
|
||||
Max int64 `name:"max" aliases:"limit" help:"Max Gmail messages to export; 0 means all" default:"0"`
|
||||
IncludeSpamTrash bool `name:"include-spam-trash" help:"Include spam and trash" default:"true"`
|
||||
ShardMaxRows int `name:"shard-max-rows" help:"Max messages per encrypted shard" default:"1000"`
|
||||
CacheMessages bool `name:"gmail-cache" help:"Cache fetched raw messages locally so interrupted full backups can resume" default:"true" negatable:""`
|
||||
RefreshCache bool `name:"gmail-refresh-cache" help:"Refetch messages even when a local backup cache entry exists"`
|
||||
Query string `name:"query" help:"Gmail query for bounded/test backups"`
|
||||
Max int64 `name:"max" aliases:"limit" help:"Max Gmail messages to export; 0 means all" default:"0"`
|
||||
IncludeSpamTrash bool `name:"include-spam-trash" help:"Include spam and trash" default:"true"`
|
||||
ShardMaxRows int `name:"shard-max-rows" help:"Max messages per encrypted shard" default:"1000"`
|
||||
CacheMessages bool `name:"gmail-cache" help:"Cache fetched raw messages locally so interrupted full backups can resume" default:"true" negatable:""`
|
||||
RefreshCache bool `name:"gmail-refresh-cache" help:"Refetch messages even when a local backup cache entry exists"`
|
||||
Checkpoints bool `name:"checkpoints" help:"Commit and push incomplete encrypted checkpoints during long cached fetches" default:"true" negatable:""`
|
||||
CheckpointRows int `name:"checkpoint-rows" help:"Gmail messages per encrypted checkpoint chunk; 0 disables row-triggered checkpoints" default:"10000"`
|
||||
CheckpointEvery time.Duration `name:"checkpoint-interval" help:"Max time between checkpoint pushes during fetch; 0 disables time-triggered checkpoints" default:"30m"`
|
||||
}
|
||||
|
||||
func (c *BackupGmailPushCmd) Run(ctx context.Context, flags *RootFlags) error {
|
||||
@ -286,6 +296,10 @@ func (c *BackupGmailPushCmd) Run(ctx context.Context, flags *RootFlags) error {
|
||||
ShardMaxRows: c.ShardMaxRows,
|
||||
CacheMessages: c.CacheMessages,
|
||||
RefreshCache: c.RefreshCache,
|
||||
Checkpoints: c.Checkpoints,
|
||||
CheckpointRows: c.CheckpointRows,
|
||||
CheckpointEvery: c.CheckpointEvery,
|
||||
BackupOptions: c.options(),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@ -28,6 +28,11 @@ type gmailBackupOptions struct {
|
||||
AccountHash string
|
||||
CacheMessages bool
|
||||
RefreshCache bool
|
||||
Checkpoints bool
|
||||
CheckpointRows int
|
||||
CheckpointEvery time.Duration
|
||||
CheckpointRunID string
|
||||
BackupOptions backup.Options
|
||||
}
|
||||
|
||||
type gmailBackupMessage struct {
|
||||
@ -65,6 +70,7 @@ type gmailBackupListState struct {
|
||||
}
|
||||
|
||||
// gmailBackupFetchResult is the outcome of ensuring a single Gmail message is
// present in the local backup cache.
type gmailBackupFetchResult struct {
	// id is the Gmail message ID the result belongs to; it is set on both
	// success and failure so the consumer can attribute the outcome.
	id string
	// cache is true when the message was already in the local cache and no
	// API request was made.
	cache bool
	// err is non-nil when the message could not be read, fetched or cached.
	err error
}
|
||||
@ -99,6 +105,7 @@ func buildGmailBackupSnapshot(ctx context.Context, flags *RootFlags, opts gmailB
|
||||
if listErr != nil {
|
||||
return backup.Snapshot{}, listErr
|
||||
}
|
||||
opts.CheckpointRunID = gmailBackupCheckpointRunID(opts, ids)
|
||||
if cacheErr := ensureGmailBackupMessageCache(ctx, svc, opts, ids); cacheErr != nil {
|
||||
return backup.Snapshot{}, cacheErr
|
||||
}
|
||||
@ -216,6 +223,7 @@ func fetchGmailBackupMessagesDirect(ctx context.Context, svc *gmail.Service, ids
|
||||
|
||||
func ensureGmailBackupMessageCache(ctx context.Context, svc *gmail.Service, opts gmailBackupOptions, ids []string) error {
|
||||
gmailBackupProgressf(ctx, "backup gmail fetch\tqueued=%d", len(ids))
|
||||
checkpointer := newGmailBackupCheckpointer(ctx, opts, len(ids))
|
||||
const maxConcurrency = 2
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
@ -272,6 +280,11 @@ func ensureGmailBackupMessageCache(ctx context.Context, svc *gmail.Service, opts
|
||||
} else if res.err == nil {
|
||||
fetched++
|
||||
}
|
||||
if err := checkpointer.record(ctx, res.id, done, fetched, cacheHits); err != nil {
|
||||
firstErr = err
|
||||
cancel()
|
||||
continue
|
||||
}
|
||||
if done == len(ids) || done%100 == 0 {
|
||||
gmailBackupProgressf(ctx, "backup gmail fetch\t%d/%d\tfetched=%d\tcache=%d", done, len(ids), fetched, cacheHits)
|
||||
}
|
||||
@ -288,17 +301,17 @@ func ensureGmailBackupMessageCache(ctx context.Context, svc *gmail.Service, opts
|
||||
}
|
||||
return fmt.Errorf("gmail backup fetch stopped after %d/%d messages", done, len(ids))
|
||||
}
|
||||
return nil
|
||||
return checkpointer.flush(ctx, done, fetched, cacheHits)
|
||||
}
|
||||
|
||||
func fetchGmailBackupMessageCacheResult(ctx context.Context, svc *gmail.Service, opts gmailBackupOptions, messageID string) gmailBackupFetchResult {
|
||||
if opts.CacheMessages && !opts.RefreshCache {
|
||||
_, ok, err := readGmailBackupMessageCache(opts.AccountHash, messageID)
|
||||
if err != nil {
|
||||
return gmailBackupFetchResult{err: err}
|
||||
return gmailBackupFetchResult{id: messageID, err: err}
|
||||
}
|
||||
if ok {
|
||||
return gmailBackupFetchResult{cache: true}
|
||||
return gmailBackupFetchResult{id: messageID, cache: true}
|
||||
}
|
||||
}
|
||||
msg, err := svc.Users.Messages.Get("me", messageID).
|
||||
@ -307,10 +320,10 @@ func fetchGmailBackupMessageCacheResult(ctx context.Context, svc *gmail.Service,
|
||||
Context(ctx).
|
||||
Do()
|
||||
if err != nil {
|
||||
return gmailBackupFetchResult{err: fmt.Errorf("gmail message %s: %w", messageID, err)}
|
||||
return gmailBackupFetchResult{id: messageID, err: fmt.Errorf("gmail message %s: %w", messageID, err)}
|
||||
}
|
||||
if strings.TrimSpace(msg.Raw) == "" {
|
||||
return gmailBackupFetchResult{err: fmt.Errorf("gmail message %s returned empty raw payload", messageID)}
|
||||
return gmailBackupFetchResult{id: messageID, err: fmt.Errorf("gmail message %s returned empty raw payload", messageID)}
|
||||
}
|
||||
backupMsg := gmailBackupMessage{
|
||||
ID: msg.Id,
|
||||
@ -323,10 +336,98 @@ func fetchGmailBackupMessageCacheResult(ctx context.Context, svc *gmail.Service,
|
||||
}
|
||||
if opts.CacheMessages {
|
||||
if err := writeGmailBackupMessageCache(opts.AccountHash, backupMsg); err != nil {
|
||||
return gmailBackupFetchResult{err: err}
|
||||
return gmailBackupFetchResult{id: messageID, err: err}
|
||||
}
|
||||
}
|
||||
return gmailBackupFetchResult{}
|
||||
return gmailBackupFetchResult{id: messageID}
|
||||
}
|
||||
|
||||
type gmailBackupCheckpointer struct {
|
||||
enabled bool
|
||||
opts gmailBackupOptions
|
||||
total int
|
||||
part int
|
||||
last time.Time
|
||||
pending []string
|
||||
}
|
||||
|
||||
const gmailCheckpointShardMaxRows = 1000
|
||||
|
||||
func newGmailBackupCheckpointer(ctx context.Context, opts gmailBackupOptions, total int) *gmailBackupCheckpointer {
|
||||
enabled := opts.Checkpoints &&
|
||||
opts.CacheMessages &&
|
||||
strings.TrimSpace(opts.AccountHash) != "" &&
|
||||
strings.TrimSpace(opts.CheckpointRunID) != "" &&
|
||||
(opts.CheckpointRows > 0 || opts.CheckpointEvery > 0)
|
||||
cp := &gmailBackupCheckpointer{
|
||||
enabled: enabled,
|
||||
opts: opts,
|
||||
total: total,
|
||||
last: time.Now(),
|
||||
}
|
||||
if enabled {
|
||||
gmailBackupProgressf(ctx, "backup gmail checkpoint\trun=%s\trows=%d\tinterval=%s", opts.CheckpointRunID, opts.CheckpointRows, opts.CheckpointEvery)
|
||||
}
|
||||
return cp
|
||||
}
|
||||
|
||||
func (c *gmailBackupCheckpointer) record(ctx context.Context, messageID string, done, fetched, cacheHits int) error {
|
||||
if c == nil || !c.enabled || strings.TrimSpace(messageID) == "" {
|
||||
return nil
|
||||
}
|
||||
c.pending = append(c.pending, messageID)
|
||||
if c.shouldFlush(done) {
|
||||
return c.flush(ctx, done, fetched, cacheHits)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *gmailBackupCheckpointer) shouldFlush(done int) bool {
|
||||
if len(c.pending) == 0 {
|
||||
return false
|
||||
}
|
||||
if c.opts.CheckpointRows > 0 && len(c.pending) >= c.opts.CheckpointRows {
|
||||
return true
|
||||
}
|
||||
if c.opts.CheckpointEvery > 0 && time.Since(c.last) >= c.opts.CheckpointEvery {
|
||||
return true
|
||||
}
|
||||
return done == c.total
|
||||
}
|
||||
|
||||
func (c *gmailBackupCheckpointer) flush(ctx context.Context, done, fetched, cacheHits int) error {
|
||||
if c == nil || !c.enabled || len(c.pending) == 0 {
|
||||
return nil
|
||||
}
|
||||
c.part++
|
||||
ids := append([]string(nil), c.pending...)
|
||||
c.pending = c.pending[:0]
|
||||
shards, err := buildGmailCheckpointShardsFromCache(c.opts.AccountHash, c.opts.CheckpointRunID, c.part, ids)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.part += len(shards) - 1
|
||||
snapshot := backup.Snapshot{
|
||||
Services: []string{backupServiceGmail},
|
||||
Accounts: []string{c.opts.AccountHash},
|
||||
Counts: map[string]int{"gmail.messages": len(ids)},
|
||||
Shards: shards,
|
||||
}
|
||||
result, err := backup.PushCheckpoint(ctx, snapshot, backup.Checkpoint{
|
||||
RunID: c.opts.CheckpointRunID,
|
||||
Service: backupServiceGmail,
|
||||
Account: c.opts.AccountHash,
|
||||
Done: done,
|
||||
Total: c.total,
|
||||
Fetched: fetched,
|
||||
CacheHits: cacheHits,
|
||||
}, c.opts.BackupOptions)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.last = time.Now()
|
||||
gmailBackupProgressf(ctx, "backup gmail checkpoint\t%d/%d\tparts=%d\trows=%d\tchanged=%t", done, c.total, len(shards), len(ids), result.Changed)
|
||||
return nil
|
||||
}
|
||||
|
||||
func readGmailBackupMessageCache(accountHash, messageID string) (gmailBackupMessage, bool, error) {
|
||||
@ -713,6 +814,119 @@ func gmailBackupTempShardDir(accountHash string) (string, bool) {
|
||||
return filepath.Join(dir, "gogcli", "backup", "gmail", accountHash, "tmp-shards"), true
|
||||
}
|
||||
|
||||
func buildGmailCheckpointShardFromCache(accountHash, runID string, part int, ids []string) (backup.PlainShard, error) {
|
||||
if part <= 0 {
|
||||
return backup.PlainShard{}, fmt.Errorf("gmail checkpoint part must be positive")
|
||||
}
|
||||
tempDir, ok := gmailBackupCheckpointTempShardDir(accountHash, runID)
|
||||
if !ok {
|
||||
return backup.PlainShard{}, fmt.Errorf("gmail backup checkpoint temp shard directory unavailable")
|
||||
}
|
||||
if err := os.MkdirAll(tempDir, 0o700); err != nil {
|
||||
return backup.PlainShard{}, fmt.Errorf("create gmail backup checkpoint temp shard dir: %w", err)
|
||||
}
|
||||
rel := fmt.Sprintf("checkpoints/gmail/%s/%s/messages/part-%06d.jsonl.gz.age", accountHash, runID, part)
|
||||
sum := sha256.Sum256([]byte(rel))
|
||||
path := filepath.Join(tempDir, hex.EncodeToString(sum[:])+".jsonl")
|
||||
f, err := os.Create(path) //nolint:gosec // path is derived from the OS cache dir and a hash of the checkpoint path.
|
||||
if err != nil {
|
||||
return backup.PlainShard{}, fmt.Errorf("create gmail backup checkpoint temp shard: %w", err)
|
||||
}
|
||||
enc := json.NewEncoder(f)
|
||||
count := 0
|
||||
for _, id := range ids {
|
||||
msg, ok, err := readGmailBackupMessageCache(accountHash, id)
|
||||
if err != nil {
|
||||
_ = f.Close()
|
||||
_ = os.Remove(path)
|
||||
return backup.PlainShard{}, err
|
||||
}
|
||||
if !ok {
|
||||
_ = f.Close()
|
||||
_ = os.Remove(path)
|
||||
return backup.PlainShard{}, fmt.Errorf("gmail message %s missing from backup cache", id)
|
||||
}
|
||||
if err := enc.Encode(msg); err != nil {
|
||||
_ = f.Close()
|
||||
_ = os.Remove(path)
|
||||
return backup.PlainShard{}, fmt.Errorf("encode gmail backup checkpoint shard: %w", err)
|
||||
}
|
||||
count++
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
_ = os.Remove(path)
|
||||
return backup.PlainShard{}, fmt.Errorf("close gmail backup checkpoint shard: %w", err)
|
||||
}
|
||||
if err := os.Chmod(path, 0o600); err != nil {
|
||||
_ = os.Remove(path)
|
||||
return backup.PlainShard{}, fmt.Errorf("chmod gmail backup checkpoint shard: %w", err)
|
||||
}
|
||||
return backup.PlainShard{
|
||||
Service: backupServiceGmail,
|
||||
Kind: "messages",
|
||||
Account: accountHash,
|
||||
Path: filepath.ToSlash(rel),
|
||||
Rows: count,
|
||||
PlaintextPath: path,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func buildGmailCheckpointShardsFromCache(accountHash, runID string, firstPart int, ids []string) ([]backup.PlainShard, error) {
|
||||
if len(ids) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
shards := make([]backup.PlainShard, 0, (len(ids)+gmailCheckpointShardMaxRows-1)/gmailCheckpointShardMaxRows)
|
||||
for start := 0; start < len(ids); start += gmailCheckpointShardMaxRows {
|
||||
end := start + gmailCheckpointShardMaxRows
|
||||
if end > len(ids) {
|
||||
end = len(ids)
|
||||
}
|
||||
shard, err := buildGmailCheckpointShardFromCache(accountHash, runID, firstPart+len(shards), ids[start:end])
|
||||
if err != nil {
|
||||
for _, shard := range shards {
|
||||
if strings.TrimSpace(shard.PlaintextPath) != "" {
|
||||
_ = os.Remove(shard.PlaintextPath)
|
||||
}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
shards = append(shards, shard)
|
||||
}
|
||||
return shards, nil
|
||||
}
|
||||
|
||||
// gmailBackupCheckpointTempShardDir returns the per-run directory for
// temporary checkpoint shard files:
//
//	<user cache dir>/gogcli/backup/gmail/<accountHash>/checkpoint-shards/<runID>
//
// Both identifiers are whitespace-trimmed first. The boolean is false (and the
// path empty) when either identifier is blank or the user cache directory
// cannot be determined. The directory itself is not created here.
func gmailBackupCheckpointTempShardDir(accountHash, runID string) (string, bool) {
	account, run := strings.TrimSpace(accountHash), strings.TrimSpace(runID)
	if account == "" || run == "" {
		return "", false
	}

	cacheRoot, err := os.UserCacheDir()
	if err == nil && strings.TrimSpace(cacheRoot) != "" {
		return filepath.Join(cacheRoot, "gogcli", "backup", "gmail", account, "checkpoint-shards", run), true
	}
	return "", false
}
|
||||
|
||||
func gmailBackupCheckpointRunID(opts gmailBackupOptions, ids []string) string {
|
||||
key := struct {
|
||||
AccountHash string `json:"accountHash"`
|
||||
Query string `json:"query,omitempty"`
|
||||
Max int64 `json:"max,omitempty"`
|
||||
IncludeSpamTrash bool `json:"includeSpamTrash"`
|
||||
IDs int `json:"ids"`
|
||||
}{
|
||||
AccountHash: opts.AccountHash,
|
||||
Query: strings.TrimSpace(opts.Query),
|
||||
Max: opts.Max,
|
||||
IncludeSpamTrash: opts.IncludeSpamTrash,
|
||||
IDs: len(ids),
|
||||
}
|
||||
data, _ := json.Marshal(key)
|
||||
sum := sha256.Sum256(data)
|
||||
return time.Now().UTC().Format("20060102T150405Z") + "-" + hex.EncodeToString(sum[:6])
|
||||
}
|
||||
|
||||
func gmailBackupMessageMonthKey(internalDate int64) string {
|
||||
t := time.UnixMilli(internalDate).UTC()
|
||||
if internalDate <= 0 {
|
||||
|
||||
@ -329,6 +329,131 @@ func TestEnsureGmailBackupMessageCacheStopsOnFirstFetchError(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnsureGmailBackupMessageCacheWritesEncryptedCheckpoints(t *testing.T) {
|
||||
t.Setenv("HOME", t.TempDir())
|
||||
dir := t.TempDir()
|
||||
repo := filepath.Join(dir, "repo")
|
||||
identity := filepath.Join(dir, "age.key")
|
||||
config := filepath.Join(dir, "backup.json")
|
||||
recipient, err := backup.EnsureIdentity(identity)
|
||||
if err != nil {
|
||||
t.Fatalf("EnsureIdentity: %v", err)
|
||||
}
|
||||
if saveErr := backup.SaveConfig(config, backup.Config{
|
||||
Repo: repo,
|
||||
Identity: identity,
|
||||
Recipients: []string{recipient},
|
||||
}); saveErr != nil {
|
||||
t.Fatalf("SaveConfig: %v", saveErr)
|
||||
}
|
||||
svc, cleanup := newGmailServiceForTest(t, func(w http.ResponseWriter, r *http.Request) {
|
||||
id := filepath.Base(r.URL.Path)
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"id": id,
|
||||
"threadId": "thread-" + id,
|
||||
"historyId": "100",
|
||||
"internalDate": fmt.Sprint(mustUnixMilli(t, "2026-04-02T10:00:00Z")),
|
||||
"labelIds": []string{"INBOX"},
|
||||
"sizeEstimate": 42,
|
||||
"raw": base64.RawURLEncoding.EncodeToString([]byte("Subject: " + id + "\r\n\r\nBody")),
|
||||
})
|
||||
})
|
||||
defer cleanup()
|
||||
|
||||
ids := []string{"m1", "m2", "m3", "m4", "m5"}
|
||||
err = ensureGmailBackupMessageCache(context.Background(), svc, gmailBackupOptions{
|
||||
AccountHash: "accthash",
|
||||
CacheMessages: true,
|
||||
IncludeSpamTrash: true,
|
||||
Checkpoints: true,
|
||||
CheckpointRows: 2,
|
||||
CheckpointRunID: "run-test",
|
||||
BackupOptions: backup.Options{ConfigPath: config, Push: false},
|
||||
}, ids)
|
||||
if err != nil {
|
||||
t.Fatalf("ensureGmailBackupMessageCache: %v", err)
|
||||
}
|
||||
manifestPath := filepath.Join(repo, "checkpoints", "gmail", "accthash", "run-test", "manifest.json")
|
||||
data, err := os.ReadFile(manifestPath)
|
||||
if err != nil {
|
||||
t.Fatalf("read checkpoint manifest: %v", err)
|
||||
}
|
||||
var manifest backup.CheckpointManifest
|
||||
if unmarshalErr := json.Unmarshal(data, &manifest); unmarshalErr != nil {
|
||||
t.Fatalf("unmarshal checkpoint manifest: %v", unmarshalErr)
|
||||
}
|
||||
if !manifest.Incomplete || manifest.Done != 5 || manifest.Total != 5 || len(manifest.Shards) != 3 {
|
||||
t.Fatalf("unexpected checkpoint manifest: %+v", manifest)
|
||||
}
|
||||
ciphertext, err := os.ReadFile(filepath.Join(repo, filepath.FromSlash(manifest.Shards[0].Path)))
|
||||
if err != nil {
|
||||
t.Fatalf("read checkpoint shard: %v", err)
|
||||
}
|
||||
if strings.Contains(string(ciphertext), "Subject:") {
|
||||
t.Fatal("checkpoint shard contains plaintext")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildGmailCheckpointShardFromCacheWritesPlaintextPath(t *testing.T) {
|
||||
t.Setenv("HOME", t.TempDir())
|
||||
accountHash := "accthash"
|
||||
for _, message := range []gmailBackupMessage{
|
||||
{ID: "m1", InternalDate: mustUnixMilli(t, "2026-04-01T10:00:00Z"), Raw: "raw-1"},
|
||||
{ID: "m2", InternalDate: mustUnixMilli(t, "2026-04-02T10:00:00Z"), Raw: "raw-2"},
|
||||
} {
|
||||
if err := writeGmailBackupMessageCache(accountHash, message); err != nil {
|
||||
t.Fatalf("writeGmailBackupMessageCache: %v", err)
|
||||
}
|
||||
}
|
||||
shard, err := buildGmailCheckpointShardFromCache(accountHash, "run-test", 3, []string{"m1", "m2"})
|
||||
if err != nil {
|
||||
t.Fatalf("buildGmailCheckpointShardFromCache: %v", err)
|
||||
}
|
||||
if shard.Path != "checkpoints/gmail/accthash/run-test/messages/part-000003.jsonl.gz.age" {
|
||||
t.Fatalf("checkpoint shard path = %q", shard.Path)
|
||||
}
|
||||
if shard.Rows != 2 || shard.PlaintextPath == "" {
|
||||
t.Fatalf("unexpected shard: %+v", shard)
|
||||
}
|
||||
data, err := os.ReadFile(shard.PlaintextPath)
|
||||
if err != nil {
|
||||
t.Fatalf("read checkpoint plaintext: %v", err)
|
||||
}
|
||||
var rows []gmailBackupMessage
|
||||
if err := backup.DecodeJSONL(data, &rows); err != nil {
|
||||
t.Fatalf("DecodeJSONL: %v", err)
|
||||
}
|
||||
if len(rows) != 2 || rows[0].ID != "m1" || rows[1].ID != "m2" {
|
||||
t.Fatalf("rows = %+v", rows)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildGmailCheckpointShardsFromCacheSplitsLargeChunks(t *testing.T) {
|
||||
t.Setenv("HOME", t.TempDir())
|
||||
accountHash := "accthash"
|
||||
ids := make([]string, gmailCheckpointShardMaxRows+1)
|
||||
for i := range ids {
|
||||
id := fmt.Sprintf("m-%04d", i)
|
||||
ids[i] = id
|
||||
if err := writeGmailBackupMessageCache(accountHash, gmailBackupMessage{ID: id, Raw: "raw-" + id}); err != nil {
|
||||
t.Fatalf("writeGmailBackupMessageCache: %v", err)
|
||||
}
|
||||
}
|
||||
shards, err := buildGmailCheckpointShardsFromCache(accountHash, "run-test", 7, ids)
|
||||
if err != nil {
|
||||
t.Fatalf("buildGmailCheckpointShardsFromCache: %v", err)
|
||||
}
|
||||
if len(shards) != 2 {
|
||||
t.Fatalf("len(shards) = %d, want 2", len(shards))
|
||||
}
|
||||
if shards[0].Rows != gmailCheckpointShardMaxRows || shards[1].Rows != 1 {
|
||||
t.Fatalf("rows = %d,%d", shards[0].Rows, shards[1].Rows)
|
||||
}
|
||||
if !strings.HasSuffix(shards[0].Path, "part-000007.jsonl.gz.age") || !strings.HasSuffix(shards[1].Path, "part-000008.jsonl.gz.age") {
|
||||
t.Fatalf("paths = %q %q", shards[0].Path, shards[1].Path)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildGmailMessageShardsFromCacheWritesPlaintextPaths(t *testing.T) {
|
||||
t.Setenv("HOME", t.TempDir())
|
||||
accountHash := "accthash"
|
||||
|
||||
Loading…
Reference in New Issue
Block a user