perf(backup): stream gmail backup shards
This commit is contained in:
parent
ff5e85b159
commit
b8f53df696
@ -7,7 +7,7 @@
|
||||
- Backup: expand `gog backup push --services all` with Drive content export/download, Gmail settings, native Workspace Docs/Sheets/Slides/Form data, Apps Script projects, Chat, Classroom, best-effort optional service error shards, and plaintext Drive file export.
|
||||
- Backup: extend `--services all` with Drive permissions/comments/revisions, Calendar ACL/settings/colors, contact groups, Cloud Identity groups, Workspace Admin Directory users/groups/members, Keep notes, and local Gmail message caching for resumable full-mailbox fetches.
|
||||
- Backup: bound individual Drive content exports with `--drive-content-timeout` so one stuck Google export records an encrypted error row instead of blocking the full backup.
|
||||
- Backup: add Gmail message-list checkpoints and stderr progress counters so full-mailbox backups can resume cleanly after interruption.
|
||||
- Backup: add Gmail message-list checkpoints, streaming shard construction, and stderr progress counters so full-mailbox backups can resume cleanly after interruption without keeping every raw message in RAM.
|
||||
|
||||
### Fixed
|
||||
- Gmail: auto-fill draft reply subjects from the original message when `gmail drafts create --reply-to-message-id` omits `--subject`. (#488) — thanks @jbowerbir.
|
||||
|
||||
@ -754,9 +754,10 @@ content by default. Non-Google binary Drive files are metadata-only unless
|
||||
`--drive-binary-contents` is set. `--drive-content-timeout` turns a stuck
|
||||
per-file export into an encrypted error row instead of wedging the run. Gmail
|
||||
raw-message fetches and message-list pages use a local cache by default so
|
||||
interrupted full-mailbox backups can resume; progress is written to stderr
|
||||
while stdout stays parseable. Use `--gmail-refresh-cache` to force a refetch.
|
||||
Workspace inventories
|
||||
interrupted full-mailbox backups can resume. Full Gmail runs build encrypted
|
||||
message shards from cached messages instead of keeping the whole mailbox in
|
||||
memory; progress is written to stderr while stdout stays parseable. Use
|
||||
`--gmail-refresh-cache` to force a refetch. Workspace inventories
|
||||
Docs/Sheets/Slides and backs up Forms/responses discovered through Drive; add
|
||||
`--workspace-native` for full native Docs/Sheets/Slides API JSON.
|
||||
Optional Workspace-only services use `--best-effort` by default, recording
|
||||
|
||||
@ -264,10 +264,12 @@ directory (`gogcli/backup/gmail/<account-hash>/`). Message-list page checkpoints
|
||||
live under `list-v1/`, and fetched raw messages live under `raw-v1/`. Raw-message
|
||||
cache files store the same row that will be encrypted into shards and are keyed
|
||||
by a SHA-256 of the Gmail message ID, so rerunning after an interruption can
|
||||
reuse already fetched messages. Long Gmail runs report list/fetch counters to
|
||||
stderr while stdout stays parseable. `--gmail-refresh-cache` forces a refetch.
|
||||
The cache is plaintext local data; clear it if the machine should not retain
|
||||
local mail copies outside the encrypted backup/export locations.
|
||||
reuse already fetched messages. The encrypted message shards are streamed from
|
||||
that cache in temporary per-shard files, so a full mailbox run does not retain
|
||||
every raw message in RAM. Long Gmail runs report list, fetch, and shard-build
|
||||
counters to stderr while stdout stays parseable. `--gmail-refresh-cache` forces
|
||||
a refetch. The cache is plaintext local data; clear it if the machine should not
|
||||
retain local mail copies outside the encrypted backup/export locations.
|
||||
|
||||
`--include-spam-trash` defaults to true. Use `--query` and `--max` for bounded
|
||||
test exports; omit them for a full mailbox scan.
|
||||
|
||||
@ -5,8 +5,11 @@ import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
@ -41,12 +44,13 @@ type ShardEntry struct {
|
||||
}
|
||||
|
||||
type PlainShard struct {
|
||||
Service string
|
||||
Kind string
|
||||
Account string
|
||||
Path string
|
||||
Rows int
|
||||
Plaintext []byte
|
||||
Service string
|
||||
Kind string
|
||||
Account string
|
||||
Path string
|
||||
Rows int
|
||||
Plaintext []byte
|
||||
PlaintextPath string
|
||||
}
|
||||
|
||||
type Snapshot struct {
|
||||
@ -90,6 +94,7 @@ func Init(ctx context.Context, opts Options) (Config, string, error) {
|
||||
}
|
||||
|
||||
func PushSnapshot(ctx context.Context, snapshot Snapshot, opts Options) (Result, error) {
|
||||
defer cleanupPlainShardFiles(snapshot)
|
||||
cfg, err := ResolveOptions(opts)
|
||||
if err != nil {
|
||||
return Result{}, err
|
||||
@ -300,7 +305,10 @@ func writeShard(cfg Config, old Manifest, shard PlainShard, reuseEncrypted bool)
|
||||
if strings.TrimSpace(shard.Service) == "" {
|
||||
return ShardEntry{}, fmt.Errorf("backup shard service is required")
|
||||
}
|
||||
hash := sha256Hex(shard.Plaintext)
|
||||
hash, err := shardPlaintextHash(shard)
|
||||
if err != nil {
|
||||
return ShardEntry{}, err
|
||||
}
|
||||
path, err := resolveShardPath(cfg.Repo, shard.Path)
|
||||
if err != nil {
|
||||
return ShardEntry{}, err
|
||||
@ -311,14 +319,11 @@ func writeShard(cfg Config, old Manifest, shard PlainShard, reuseEncrypted bool)
|
||||
return oldEntry, nil
|
||||
}
|
||||
}
|
||||
encrypted, _, err := encryptShard(shard.Plaintext, cfg.Recipients)
|
||||
if err != nil {
|
||||
return ShardEntry{}, err
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
|
||||
return ShardEntry{}, err
|
||||
}
|
||||
if err := os.WriteFile(path, encrypted, 0o600); err != nil {
|
||||
bytesWritten, err := encryptShardToFile(shardPlaintextReader(shard), path, cfg.Recipients)
|
||||
if err != nil {
|
||||
return ShardEntry{}, err
|
||||
}
|
||||
return ShardEntry{
|
||||
@ -328,10 +333,45 @@ func writeShard(cfg Config, old Manifest, shard PlainShard, reuseEncrypted bool)
|
||||
Path: shard.Path,
|
||||
Rows: shard.Rows,
|
||||
SHA256: hash,
|
||||
Bytes: int64(len(encrypted)),
|
||||
Bytes: bytesWritten,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func cleanupPlainShardFiles(snapshot Snapshot) {
|
||||
for _, shard := range snapshot.Shards {
|
||||
if strings.TrimSpace(shard.PlaintextPath) != "" {
|
||||
_ = os.Remove(shard.PlaintextPath)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func shardPlaintextHash(shard PlainShard) (string, error) {
|
||||
if strings.TrimSpace(shard.PlaintextPath) == "" {
|
||||
return sha256Hex(shard.Plaintext), nil
|
||||
}
|
||||
f, err := os.Open(shard.PlaintextPath) // #nosec G304 -- PlaintextPath is created by gog as a temporary backup shard file.
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer func() { _ = f.Close() }()
|
||||
h := sha256.New()
|
||||
if _, err := io.Copy(h, f); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return hex.EncodeToString(h.Sum(nil)), nil
|
||||
}
|
||||
|
||||
func shardPlaintextReader(shard PlainShard) func() (io.ReadCloser, error) {
|
||||
if strings.TrimSpace(shard.PlaintextPath) == "" {
|
||||
return func() (io.ReadCloser, error) {
|
||||
return io.NopCloser(bytes.NewReader(shard.Plaintext)), nil
|
||||
}
|
||||
}
|
||||
return func() (io.ReadCloser, error) {
|
||||
return os.Open(shard.PlaintextPath) // #nosec G304 -- PlaintextPath is created by gog as a temporary backup shard file.
|
||||
}
|
||||
}
|
||||
|
||||
func decryptShardFile(cfg Config, shard ShardEntry) ([]byte, error) {
|
||||
path, err := resolveShardPath(cfg.Repo, shard.Path)
|
||||
if err != nil {
|
||||
|
||||
@ -58,6 +58,40 @@ func TestPushSnapshotAndVerify(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestPushSnapshotEncryptsAndCleansPlaintextPath(t *testing.T) {
|
||||
ctx, _, config, _ := initTestBackup(t)
|
||||
tempPath := filepath.Join(t.TempDir(), "messages.jsonl")
|
||||
if err := os.WriteFile(tempPath, []byte("{\"id\":\"m1\",\"raw\":\"private\"}\n"), 0o600); err != nil {
|
||||
t.Fatalf("write plaintext path: %v", err)
|
||||
}
|
||||
shard := PlainShard{
|
||||
Service: "gmail",
|
||||
Kind: "messages",
|
||||
Account: "acct",
|
||||
Path: "data/gmail/acct/messages/2026/04/part-0001.jsonl.gz.age",
|
||||
Rows: 1,
|
||||
PlaintextPath: tempPath,
|
||||
}
|
||||
if _, err := PushSnapshot(ctx, Snapshot{
|
||||
Services: []string{"gmail"},
|
||||
Accounts: []string{"acct"},
|
||||
Counts: map[string]int{"gmail.messages": 1},
|
||||
Shards: []PlainShard{shard},
|
||||
}, Options{ConfigPath: config, Push: false}); err != nil {
|
||||
t.Fatalf("PushSnapshot: %v", err)
|
||||
}
|
||||
if _, err := os.Stat(tempPath); !os.IsNotExist(err) {
|
||||
t.Fatalf("plaintext temp file still exists or stat failed: %v", err)
|
||||
}
|
||||
verify, err := Verify(ctx, Options{ConfigPath: config})
|
||||
if err != nil {
|
||||
t.Fatalf("Verify: %v", err)
|
||||
}
|
||||
if verify.Counts["gmail.messages"] != 1 {
|
||||
t.Fatalf("unexpected verify counts: %+v", verify.Counts)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCatAndDecryptSnapshotVerifyPlaintext(t *testing.T) {
|
||||
ctx, repo, config, _ := initTestBackup(t)
|
||||
shardPath := "data/gmail/acct/messages/2026/04/part-0001.jsonl.gz.age"
|
||||
|
||||
@ -77,6 +77,74 @@ func encryptShard(plaintext []byte, recipientStrings []string) ([]byte, string,
|
||||
return encrypted.Bytes(), sha256Hex(plaintext), nil
|
||||
}
|
||||
|
||||
func encryptShardToFile(openPlaintext func() (io.ReadCloser, error), dst string, recipientStrings []string) (int64, error) {
|
||||
recipients, err := parseRecipients(recipientStrings)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
in, err := openPlaintext()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer func() { _ = in.Close() }()
|
||||
tmp, err := os.CreateTemp(filepath.Dir(dst), ".shard-*.age")
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
tmpPath := tmp.Name()
|
||||
cw := &countingWriter{w: tmp}
|
||||
ageWriter, err := age.Encrypt(cw, recipients...)
|
||||
if err != nil {
|
||||
_ = tmp.Close()
|
||||
_ = os.Remove(tmpPath)
|
||||
return 0, err
|
||||
}
|
||||
gz := gzip.NewWriter(ageWriter)
|
||||
gz.ModTime = time.Unix(0, 0).UTC()
|
||||
if _, err := io.Copy(gz, in); err != nil {
|
||||
_ = gz.Close()
|
||||
_ = ageWriter.Close()
|
||||
_ = tmp.Close()
|
||||
_ = os.Remove(tmpPath)
|
||||
return 0, err
|
||||
}
|
||||
if err := gz.Close(); err != nil {
|
||||
_ = ageWriter.Close()
|
||||
_ = tmp.Close()
|
||||
_ = os.Remove(tmpPath)
|
||||
return 0, err
|
||||
}
|
||||
if err := ageWriter.Close(); err != nil {
|
||||
_ = tmp.Close()
|
||||
_ = os.Remove(tmpPath)
|
||||
return 0, err
|
||||
}
|
||||
if err := tmp.Close(); err != nil {
|
||||
_ = os.Remove(tmpPath)
|
||||
return 0, err
|
||||
}
|
||||
if err := os.Chmod(tmpPath, 0o600); err != nil {
|
||||
_ = os.Remove(tmpPath)
|
||||
return 0, err
|
||||
}
|
||||
if err := os.Rename(tmpPath, dst); err != nil {
|
||||
_ = os.Remove(tmpPath)
|
||||
return 0, err
|
||||
}
|
||||
return cw.n, nil
|
||||
}
|
||||
|
||||
type countingWriter struct {
|
||||
w io.Writer
|
||||
n int64
|
||||
}
|
||||
|
||||
func (w *countingWriter) Write(p []byte) (int, error) {
|
||||
n, err := w.w.Write(p)
|
||||
w.n += int64(n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func decryptShard(ciphertext []byte, identityPath string) ([]byte, error) {
|
||||
data, err := os.ReadFile(expandHome(identityPath)) // #nosec G304 -- path is the configured local age identity file.
|
||||
if err != nil {
|
||||
|
||||
@ -81,27 +81,45 @@ func buildGmailBackupSnapshot(ctx context.Context, flags *RootFlags, opts gmailB
|
||||
if err != nil {
|
||||
return backup.Snapshot{}, err
|
||||
}
|
||||
messages, err := fetchGmailBackupMessages(ctx, svc, opts)
|
||||
if err != nil {
|
||||
return backup.Snapshot{}, err
|
||||
}
|
||||
shards := make([]backup.PlainShard, 0, 1)
|
||||
labelShard, err := backup.NewJSONLShard(backupServiceGmail, "labels", accountHash, fmt.Sprintf("data/gmail/%s/labels.jsonl.gz.age", accountHash), labels)
|
||||
if err != nil {
|
||||
return backup.Snapshot{}, err
|
||||
}
|
||||
shards = append(shards, labelShard)
|
||||
messageShards, err := buildGmailMessageShards(accountHash, messages, opts.ShardMaxRows)
|
||||
if err != nil {
|
||||
return backup.Snapshot{}, err
|
||||
var messageCount int
|
||||
if opts.CacheMessages {
|
||||
ids, listErr := listGmailBackupMessageIDs(ctx, svc, opts)
|
||||
if listErr != nil {
|
||||
return backup.Snapshot{}, listErr
|
||||
}
|
||||
if cacheErr := ensureGmailBackupMessageCache(ctx, svc, opts, ids); cacheErr != nil {
|
||||
return backup.Snapshot{}, cacheErr
|
||||
}
|
||||
messageShards, shardErr := buildGmailMessageShardsFromCache(ctx, opts, ids)
|
||||
if shardErr != nil {
|
||||
return backup.Snapshot{}, shardErr
|
||||
}
|
||||
shards = append(shards, messageShards...)
|
||||
messageCount = len(ids)
|
||||
} else {
|
||||
messages, err := fetchGmailBackupMessages(ctx, svc, opts)
|
||||
if err != nil {
|
||||
return backup.Snapshot{}, err
|
||||
}
|
||||
messageShards, err := buildGmailMessageShards(accountHash, messages, opts.ShardMaxRows)
|
||||
if err != nil {
|
||||
return backup.Snapshot{}, err
|
||||
}
|
||||
shards = append(shards, messageShards...)
|
||||
messageCount = len(messages)
|
||||
}
|
||||
shards = append(shards, messageShards...)
|
||||
return backup.Snapshot{
|
||||
Services: []string{backupServiceGmail},
|
||||
Accounts: []string{accountHash},
|
||||
Counts: map[string]int{
|
||||
"gmail.labels": len(labels),
|
||||
"gmail.messages": len(messages),
|
||||
"gmail.messages": messageCount,
|
||||
},
|
||||
Shards: shards,
|
||||
}, nil
|
||||
@ -138,36 +156,87 @@ func fetchGmailBackupMessages(ctx context.Context, svc *gmail.Service, opts gmai
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !opts.CacheMessages {
|
||||
return fetchGmailBackupMessagesDirect(ctx, svc, ids)
|
||||
}
|
||||
if err := ensureGmailBackupMessageCache(ctx, svc, opts, ids); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ordered := make([]gmailBackupMessage, 0, len(ids))
|
||||
for _, id := range ids {
|
||||
msg, ok, err := readGmailBackupMessageCache(opts.AccountHash, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("gmail message %s missing from backup cache", id)
|
||||
}
|
||||
ordered = append(ordered, msg)
|
||||
}
|
||||
return ordered, nil
|
||||
}
|
||||
|
||||
func fetchGmailBackupMessagesDirect(ctx context.Context, svc *gmail.Service, ids []string) ([]gmailBackupMessage, error) {
|
||||
gmailBackupProgressf(ctx, "backup gmail fetch\tqueued=%d", len(ids))
|
||||
out := make([]gmailBackupMessage, 0, len(ids))
|
||||
for i, id := range ids {
|
||||
msg, err := svc.Users.Messages.Get("me", id).
|
||||
Format(gmailFormatRaw).
|
||||
Fields("id,threadId,historyId,internalDate,labelIds,sizeEstimate,raw").
|
||||
Context(ctx).
|
||||
Do()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("gmail message %s: %w", id, err)
|
||||
}
|
||||
if strings.TrimSpace(msg.Raw) == "" {
|
||||
return nil, fmt.Errorf("gmail message %s returned empty raw payload", id)
|
||||
}
|
||||
out = append(out, gmailBackupMessage{
|
||||
ID: msg.Id,
|
||||
ThreadID: msg.ThreadId,
|
||||
HistoryID: formatHistoryID(msg.HistoryId),
|
||||
InternalDate: msg.InternalDate,
|
||||
LabelIDs: append([]string(nil), msg.LabelIds...),
|
||||
SizeEstimate: msg.SizeEstimate,
|
||||
Raw: msg.Raw,
|
||||
})
|
||||
done := i + 1
|
||||
if done == len(ids) || done%100 == 0 {
|
||||
gmailBackupProgressf(ctx, "backup gmail fetch\t%d/%d\tfetched=%d\tcache=0", done, len(ids), done)
|
||||
}
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func ensureGmailBackupMessageCache(ctx context.Context, svc *gmail.Service, opts gmailBackupOptions, ids []string) error {
|
||||
gmailBackupProgressf(ctx, "backup gmail fetch\tqueued=%d", len(ids))
|
||||
const maxConcurrency = 8
|
||||
sem := make(chan struct{}, maxConcurrency)
|
||||
type result struct {
|
||||
index int
|
||||
msg gmailBackupMessage
|
||||
cache bool
|
||||
err error
|
||||
}
|
||||
results := make(chan result, len(ids))
|
||||
var wg sync.WaitGroup
|
||||
for i, id := range ids {
|
||||
for _, id := range ids {
|
||||
wg.Add(1)
|
||||
go func(index int, messageID string) {
|
||||
go func(messageID string) {
|
||||
defer wg.Done()
|
||||
select {
|
||||
case sem <- struct{}{}:
|
||||
defer func() { <-sem }()
|
||||
case <-ctx.Done():
|
||||
results <- result{index: index, err: ctx.Err()}
|
||||
results <- result{err: ctx.Err()}
|
||||
return
|
||||
}
|
||||
if opts.CacheMessages && !opts.RefreshCache {
|
||||
msg, ok, err := readGmailBackupMessageCache(opts.AccountHash, messageID)
|
||||
_, ok, err := readGmailBackupMessageCache(opts.AccountHash, messageID)
|
||||
if err != nil {
|
||||
results <- result{index: index, err: err}
|
||||
results <- result{err: err}
|
||||
return
|
||||
}
|
||||
if ok {
|
||||
results <- result{index: index, msg: msg, cache: true}
|
||||
results <- result{cache: true}
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -177,11 +246,11 @@ func fetchGmailBackupMessages(ctx context.Context, svc *gmail.Service, opts gmai
|
||||
Context(ctx).
|
||||
Do()
|
||||
if err != nil {
|
||||
results <- result{index: index, err: fmt.Errorf("gmail message %s: %w", messageID, err)}
|
||||
results <- result{err: fmt.Errorf("gmail message %s: %w", messageID, err)}
|
||||
return
|
||||
}
|
||||
if strings.TrimSpace(msg.Raw) == "" {
|
||||
results <- result{index: index, err: fmt.Errorf("gmail message %s returned empty raw payload", messageID)}
|
||||
results <- result{err: fmt.Errorf("gmail message %s returned empty raw payload", messageID)}
|
||||
return
|
||||
}
|
||||
backupMsg := gmailBackupMessage{
|
||||
@ -195,18 +264,17 @@ func fetchGmailBackupMessages(ctx context.Context, svc *gmail.Service, opts gmai
|
||||
}
|
||||
if opts.CacheMessages {
|
||||
if err := writeGmailBackupMessageCache(opts.AccountHash, backupMsg); err != nil {
|
||||
results <- result{index: index, err: err}
|
||||
results <- result{err: err}
|
||||
return
|
||||
}
|
||||
}
|
||||
results <- result{index: index, msg: backupMsg}
|
||||
}(i, id)
|
||||
results <- result{}
|
||||
}(id)
|
||||
}
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(results)
|
||||
}()
|
||||
ordered := make([]gmailBackupMessage, len(ids))
|
||||
var firstErr error
|
||||
done := 0
|
||||
cacheHits := 0
|
||||
@ -215,7 +283,6 @@ func fetchGmailBackupMessages(ctx context.Context, svc *gmail.Service, opts gmai
|
||||
if res.err != nil && firstErr == nil {
|
||||
firstErr = res.err
|
||||
}
|
||||
ordered[res.index] = res.msg
|
||||
done++
|
||||
if res.cache {
|
||||
cacheHits++
|
||||
@ -227,9 +294,9 @@ func fetchGmailBackupMessages(ctx context.Context, svc *gmail.Service, opts gmai
|
||||
}
|
||||
}
|
||||
if firstErr != nil {
|
||||
return nil, firstErr
|
||||
return firstErr
|
||||
}
|
||||
return ordered, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func readGmailBackupMessageCache(accountHash, messageID string) (gmailBackupMessage, bool, error) {
|
||||
@ -480,17 +547,157 @@ func gmailBackupProgressf(ctx context.Context, format string, args ...any) {
|
||||
u.Err().Printf(format, args...)
|
||||
}
|
||||
|
||||
type gmailBackupMessageRef struct {
|
||||
ID string
|
||||
InternalDate int64
|
||||
}
|
||||
|
||||
func buildGmailMessageShardsFromCache(ctx context.Context, opts gmailBackupOptions, ids []string) ([]backup.PlainShard, error) {
|
||||
if opts.ShardMaxRows <= 0 {
|
||||
opts.ShardMaxRows = 1000
|
||||
}
|
||||
tempDir, ok := gmailBackupTempShardDir(opts.AccountHash)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("gmail backup temp shard directory unavailable")
|
||||
}
|
||||
if err := os.RemoveAll(tempDir); err != nil {
|
||||
return nil, fmt.Errorf("clear gmail backup temp shard dir: %w", err)
|
||||
}
|
||||
if err := os.MkdirAll(tempDir, 0o700); err != nil {
|
||||
return nil, fmt.Errorf("create gmail backup temp shard dir: %w", err)
|
||||
}
|
||||
buckets := map[string][]gmailBackupMessageRef{}
|
||||
for i, id := range ids {
|
||||
msg, ok, err := readGmailBackupMessageCache(opts.AccountHash, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("gmail message %s missing from backup cache", id)
|
||||
}
|
||||
key := gmailBackupMessageMonthKey(msg.InternalDate)
|
||||
buckets[key] = append(buckets[key], gmailBackupMessageRef{ID: msg.ID, InternalDate: msg.InternalDate})
|
||||
done := i + 1
|
||||
if done == len(ids) || done%5000 == 0 {
|
||||
gmailBackupProgressf(ctx, "backup gmail shard-index\t%d/%d", done, len(ids))
|
||||
}
|
||||
}
|
||||
keys := make([]string, 0, len(buckets))
|
||||
for key := range buckets {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
shards := make([]backup.PlainShard, 0, len(keys))
|
||||
shardCount := 0
|
||||
for _, key := range keys {
|
||||
refs := buckets[key]
|
||||
sort.Slice(refs, func(i, j int) bool {
|
||||
if refs[i].InternalDate == refs[j].InternalDate {
|
||||
return refs[i].ID < refs[j].ID
|
||||
}
|
||||
return refs[i].InternalDate < refs[j].InternalDate
|
||||
})
|
||||
for part, start := 1, 0; start < len(refs); part, start = part+1, start+opts.ShardMaxRows {
|
||||
end := start + opts.ShardMaxRows
|
||||
if end > len(refs) {
|
||||
end = len(refs)
|
||||
}
|
||||
rel := fmt.Sprintf("data/gmail/%s/messages/%s/part-%04d.jsonl.gz.age", opts.AccountHash, key, part)
|
||||
shard, err := buildGmailMessageShardFromCache(opts.AccountHash, rel, tempDir, refs[start:end])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
shards = append(shards, shard)
|
||||
shardCount++
|
||||
if shardCount%25 == 0 || end == len(refs) {
|
||||
gmailBackupProgressf(ctx, "backup gmail shard-build\tshards=%d\tmessages=%d/%d", shardCount, countGmailShardRows(shards), len(ids))
|
||||
}
|
||||
}
|
||||
}
|
||||
return shards, nil
|
||||
}
|
||||
|
||||
func buildGmailMessageShardFromCache(accountHash, rel, tempDir string, refs []gmailBackupMessageRef) (backup.PlainShard, error) {
|
||||
sum := sha256.Sum256([]byte(rel))
|
||||
path := filepath.Join(tempDir, hex.EncodeToString(sum[:])+".jsonl")
|
||||
f, err := os.Create(path) //nolint:gosec // path is derived from the OS cache dir and a hash of the shard path.
|
||||
if err != nil {
|
||||
return backup.PlainShard{}, fmt.Errorf("create gmail backup temp shard: %w", err)
|
||||
}
|
||||
enc := json.NewEncoder(f)
|
||||
count := 0
|
||||
for _, ref := range refs {
|
||||
msg, ok, err := readGmailBackupMessageCache(accountHash, ref.ID)
|
||||
if err != nil {
|
||||
_ = f.Close()
|
||||
_ = os.Remove(path)
|
||||
return backup.PlainShard{}, err
|
||||
}
|
||||
if !ok {
|
||||
_ = f.Close()
|
||||
_ = os.Remove(path)
|
||||
return backup.PlainShard{}, fmt.Errorf("gmail message %s missing from backup cache", ref.ID)
|
||||
}
|
||||
if err := enc.Encode(msg); err != nil {
|
||||
_ = f.Close()
|
||||
_ = os.Remove(path)
|
||||
return backup.PlainShard{}, fmt.Errorf("encode gmail backup temp shard: %w", err)
|
||||
}
|
||||
count++
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
_ = os.Remove(path)
|
||||
return backup.PlainShard{}, fmt.Errorf("close gmail backup temp shard: %w", err)
|
||||
}
|
||||
if err := os.Chmod(path, 0o600); err != nil {
|
||||
_ = os.Remove(path)
|
||||
return backup.PlainShard{}, fmt.Errorf("chmod gmail backup temp shard: %w", err)
|
||||
}
|
||||
return backup.PlainShard{
|
||||
Service: backupServiceGmail,
|
||||
Kind: "messages",
|
||||
Account: accountHash,
|
||||
Path: filepath.ToSlash(rel),
|
||||
Rows: count,
|
||||
PlaintextPath: path,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func countGmailShardRows(shards []backup.PlainShard) int {
|
||||
total := 0
|
||||
for _, shard := range shards {
|
||||
total += shard.Rows
|
||||
}
|
||||
return total
|
||||
}
|
||||
|
||||
func gmailBackupTempShardDir(accountHash string) (string, bool) {
|
||||
accountHash = strings.TrimSpace(accountHash)
|
||||
if accountHash == "" {
|
||||
return "", false
|
||||
}
|
||||
dir, err := os.UserCacheDir()
|
||||
if err != nil || strings.TrimSpace(dir) == "" {
|
||||
return "", false
|
||||
}
|
||||
return filepath.Join(dir, "gogcli", "backup", "gmail", accountHash, "tmp-shards"), true
|
||||
}
|
||||
|
||||
func gmailBackupMessageMonthKey(internalDate int64) string {
|
||||
t := time.UnixMilli(internalDate).UTC()
|
||||
if internalDate <= 0 {
|
||||
t = time.Unix(0, 0).UTC()
|
||||
}
|
||||
return fmt.Sprintf("%04d/%02d", t.Year(), int(t.Month()))
|
||||
}
|
||||
|
||||
func buildGmailMessageShards(accountHash string, messages []gmailBackupMessage, shardMaxRows int) ([]backup.PlainShard, error) {
|
||||
if shardMaxRows <= 0 {
|
||||
shardMaxRows = 1000
|
||||
}
|
||||
buckets := map[string][]gmailBackupMessage{}
|
||||
for _, message := range messages {
|
||||
t := time.UnixMilli(message.InternalDate).UTC()
|
||||
if message.InternalDate <= 0 {
|
||||
t = time.Unix(0, 0).UTC()
|
||||
}
|
||||
key := fmt.Sprintf("%04d/%02d", t.Year(), int(t.Month()))
|
||||
key := gmailBackupMessageMonthKey(message.InternalDate)
|
||||
buckets[key] = append(buckets[key], message)
|
||||
}
|
||||
keys := make([]string, 0, len(buckets))
|
||||
|
||||
@ -301,6 +301,55 @@ func TestListGmailBackupMessageIDsMarksMaxLimitedRunComplete(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildGmailMessageShardsFromCacheWritesPlaintextPaths(t *testing.T) {
|
||||
t.Setenv("HOME", t.TempDir())
|
||||
accountHash := "accthash"
|
||||
messages := []gmailBackupMessage{
|
||||
{ID: "april-b", InternalDate: mustUnixMilli(t, "2026-04-02T10:00:00Z"), Raw: "raw-b"},
|
||||
{ID: "april-a", InternalDate: mustUnixMilli(t, "2026-04-01T10:00:00Z"), Raw: "raw-a"},
|
||||
{ID: "march-a", InternalDate: mustUnixMilli(t, "2026-03-01T10:00:00Z"), Raw: "raw-m"},
|
||||
}
|
||||
for _, message := range messages {
|
||||
if err := writeGmailBackupMessageCache(accountHash, message); err != nil {
|
||||
t.Fatalf("writeGmailBackupMessageCache: %v", err)
|
||||
}
|
||||
}
|
||||
shards, err := buildGmailMessageShardsFromCache(context.Background(), gmailBackupOptions{
|
||||
AccountHash: accountHash,
|
||||
ShardMaxRows: 1,
|
||||
}, []string{"april-b", "april-a", "march-a"})
|
||||
if err != nil {
|
||||
t.Fatalf("buildGmailMessageShardsFromCache: %v", err)
|
||||
}
|
||||
if len(shards) != 3 {
|
||||
t.Fatalf("len(shards) = %d, want 3", len(shards))
|
||||
}
|
||||
wantPaths := []string{
|
||||
"data/gmail/accthash/messages/2026/03/part-0001.jsonl.gz.age",
|
||||
"data/gmail/accthash/messages/2026/04/part-0001.jsonl.gz.age",
|
||||
"data/gmail/accthash/messages/2026/04/part-0002.jsonl.gz.age",
|
||||
}
|
||||
for i, want := range wantPaths {
|
||||
if shards[i].Path != want {
|
||||
t.Fatalf("shards[%d].Path = %q, want %q", i, shards[i].Path, want)
|
||||
}
|
||||
if shards[i].PlaintextPath == "" {
|
||||
t.Fatalf("shards[%d] missing PlaintextPath", i)
|
||||
}
|
||||
data, err := os.ReadFile(shards[i].PlaintextPath)
|
||||
if err != nil {
|
||||
t.Fatalf("read plaintext shard: %v", err)
|
||||
}
|
||||
var rows []gmailBackupMessage
|
||||
if err := backup.DecodeJSONL(data, &rows); err != nil {
|
||||
t.Fatalf("DecodeJSONL: %v", err)
|
||||
}
|
||||
if len(rows) != 1 {
|
||||
t.Fatalf("rows len = %d, want 1", len(rows))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFetchBackupDriveCollaborationCollectsMetadataAndErrors(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user