diff --git a/CHANGELOG.md b/CHANGELOG.md index 45c607b..8edddb1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ All notable changes to `discrawl` will be documented in this file. +## Unreleased + +### Fixes + +- `wiretap` now uses a fast default path for Discord Chromium cache imports: it scans cheap context files plus route-bearing HTTP cache entries, checkpoints file progress in batches, and leaves exhaustive historical cache archaeology behind `--full-cache` / `desktop.full_cache`. + ## 0.6.5 - 2026-05-03 ### Fixes diff --git a/README.md b/README.md index 58bc1b8..61d38a1 100644 --- a/README.md +++ b/README.md @@ -253,6 +253,7 @@ This is the path for searchable DMs because bot tokens cannot read personal dire discrawl wiretap discrawl wiretap --path "$HOME/Library/Application Support/discord" discrawl wiretap --dry-run +discrawl wiretap --full-cache discrawl wiretap --watch-every 2m ``` @@ -264,7 +265,8 @@ Notes: - preserves existing local `@me` guilds, channels, messages, and attachments when importing a Git snapshot, so a shared guild mirror refresh does not wipe local wiretap DM search - drops message payloads whose channel cannot be classified from cached channel metadata or Discord route URLs; dropped rows are counted as `skipped_messages` - imports what Discord Desktop has cached locally, not complete live DM history -- scans local `.ldb`, `.log`, `.json`, and `.txt` artifacts for Discord message JSON +- scans local `.ldb`, `.log`, `.json`, and `.txt` artifacts for Discord message JSON, plus route-bearing Chromium HTTP cache entries by default +- use `--full-cache` or `desktop.full_cache = true` for exhaustive Chromium cache import when you want slower historical guild-cache archaeology - does not extract, store, or print Discord auth tokens - `--max-file-bytes` skips unusually large files; default is 64 MiB @@ -573,6 +575,7 @@ attachment_text = true [desktop] path = "~/.config/discord" # macOS default: "~/Library/Application Support/discord" max_file_bytes = 67108864 +full_cache = false [search] default_mode = "fts" diff --git a/SPEC.md b/SPEC.md index 8c5c07f..257baa4 100644 --- a/SPEC.md +++ b/SPEC.md @@ -465,12 +465,14 @@ Expected flags: - `--dry-run` - `--watch-every ` - `--max-file-bytes ` +- `--full-cache` Requirements: - never use Discord user tokens - never extract or persist auth tokens from desktop cache - scan bounded local files only +- default to route-bearing HTTP cache entries; exhaustive Chromium cache scans require explicit full-cache mode - store sanitized raw metadata, not full arbitrary cache blobs ### `search` diff --git a/internal/cli/admin_commands.go b/internal/cli/admin_commands.go index ddbb82f..3cf04b6 100644 --- a/internal/cli/admin_commands.go +++ b/internal/cli/admin_commands.go @@ -181,6 +181,7 @@ func (r *runtime) runSyncLocked(sources syncSources, opts syncer.SyncOptions) er stats, err := discorddesktop.Import(r.ctx, r.store, discorddesktop.Options{ Path: r.cfg.Desktop.Path, MaxFileBytes: r.cfg.Desktop.MaxFileBytes, + FullCache: r.cfg.Desktop.FullCache, Now: r.now, }) if err != nil { @@ -262,6 +263,7 @@ func (r *runtime) runWiretap(args []string) error { fs.SetOutput(io.Discard) path := fs.String("path", r.cfg.Desktop.Path, "") maxFileBytes := fs.Int64("max-file-bytes", r.cfg.Desktop.MaxFileBytes, "") + fullCache := fs.Bool("full-cache", r.cfg.Desktop.FullCache, "") dryRun := fs.Bool("dry-run", false, "") watchEvery := fs.Duration("watch-every", 0, "") if err := fs.Parse(args); err != nil { @@ -277,6 +279,7 @@ func (r *runtime) runWiretap(args []string) error { stats, err := discorddesktop.Import(ctx, r.store, discorddesktop.Options{ Path: *path, MaxFileBytes: *maxFileBytes, + FullCache: *fullCache, DryRun: *dryRun, Now: r.now, }) diff --git a/internal/cli/output.go b/internal/cli/output.go index 328a415..22e07ab 100644 --- a/internal/cli/output.go +++ b/internal/cli/output.go @@ -142,8 +142,8 @@ func printHuman(w io.Writer, value any) error { } } if v.Wiretap != nil { - if _, err := fmt.Fprintf(w, "wiretap_files=%d\nwiretap_unchanged=%d\nwiretap_messages=%d\nwiretap_dm_messages=%d\nwiretap_dm_channels=%d\nwiretap_guild_messages=%d\nwiretap_skipped_messages=%d\nwiretap_skipped_channels=%d\n", - v.Wiretap.FilesScanned, v.Wiretap.FilesUnchanged, v.Wiretap.Messages, v.Wiretap.DMMessages, v.Wiretap.DMChannels, v.Wiretap.GuildMessages, v.Wiretap.SkippedMessages, v.Wiretap.SkippedChannels); err != nil { + if _, err := fmt.Fprintf(w, "wiretap_visited=%d\nwiretap_files=%d\nwiretap_unchanged=%d\nwiretap_fast_skipped=%d\nwiretap_messages=%d\nwiretap_dm_messages=%d\nwiretap_dm_channels=%d\nwiretap_guild_messages=%d\nwiretap_skipped_messages=%d\nwiretap_skipped_channels=%d\nwiretap_checkpoints=%d\n", + v.Wiretap.FilesVisited, v.Wiretap.FilesScanned, v.Wiretap.FilesUnchanged, v.Wiretap.CacheFilesFastSkipped, v.Wiretap.Messages, v.Wiretap.DMMessages, v.Wiretap.DMChannels, v.Wiretap.GuildMessages, v.Wiretap.SkippedMessages, v.Wiretap.SkippedChannels, v.Wiretap.Checkpoints); err != nil { return err } } @@ -152,8 +152,8 @@ func printHuman(w io.Writer, value any) error { _, err := fmt.Fprintf(w, "guilds=%d channels=%d threads=%d members=%d messages=%d\n", v.Guilds, v.Channels, v.Threads, v.Members, v.Messages) return err case discorddesktop.Stats: - _, err := fmt.Fprintf(w, "path=%s\nfiles=%d\nskipped=%d\nunchanged=%d\nobjects=%d\nguilds=%d\nchannels=%d\nmessages=%d\ndm_messages=%d\ndm_channels=%d\nguild_messages=%d\nskipped_messages=%d\nskipped_channels=%d\ndry_run=%t\n", - v.Path, v.FilesScanned, v.FilesSkipped, v.FilesUnchanged, v.JSONObjects, v.Guilds, v.Channels, v.Messages, v.DMMessages, v.DMChannels, v.GuildMessages, v.SkippedMessages, v.SkippedChannels, v.DryRun) + _, err := fmt.Fprintf(w, "path=%s\nvisited=%d\nfiles=%d\nskipped=%d\nunchanged=%d\nfast_skipped=%d\nobjects=%d\nguilds=%d\nchannels=%d\nmessages=%d\ndm_messages=%d\ndm_channels=%d\nguild_messages=%d\nskipped_messages=%d\nskipped_channels=%d\ncheckpoints=%d\nfull_cache=%t\ndry_run=%t\n", + v.Path, v.FilesVisited, v.FilesScanned, v.FilesSkipped, v.FilesUnchanged, v.CacheFilesFastSkipped, v.JSONObjects, v.Guilds, v.Channels, v.Messages, v.DMMessages, v.DMChannels, v.GuildMessages, v.SkippedMessages, v.SkippedChannels, v.Checkpoints, v.FullCache, v.DryRun) return err case store.Status: _, err := fmt.Fprintf(w, "db=%s\nguilds=%d\nchannels=%d\nthreads=%d\nmessages=%d\nmembers=%d\nembedding_backlog=%d\nlast_sync=%s\nlast_tail_event=%s\n", diff --git a/internal/config/config.go b/internal/config/config.go index f6234b8..8d767f1 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -44,6 +44,7 @@ type DiscordConfig struct { type DesktopConfig struct { Path string `toml:"path"` MaxFileBytes int64 `toml:"max_file_bytes"` + FullCache bool `toml:"full_cache"` } type SyncConfig struct { diff --git a/internal/discorddesktop/import.go b/internal/discorddesktop/import.go index 7f3ec96..4c9b761 100644 --- a/internal/discorddesktop/import.go +++ b/internal/discorddesktop/import.go @@ -27,35 +27,43 @@ const ( DirectMessageGuildName = "Discord Direct Messages" defaultMaxFileBytes = 64 << 20 maxObjectBytes = 4 << 20 + cacheSniffBytes = 1 << 20 + checkpointEveryFiles = 256 ) var channelRouteRE = regexp.MustCompile(`/channels/(@me|[0-9]{12,24})/([0-9]{12,24})`) +var apiMessagesRouteRE = regexp.MustCompile(`/api/v[0-9]+/channels/[0-9]{12,24}/messages`) type Options struct { Path string MaxFileBytes int64 DryRun bool + FullCache bool Now func() time.Time } type Stats struct { - Path string `json:"path"` - FilesScanned int `json:"files_scanned"` - FilesSkipped int `json:"files_skipped"` - FilesUnchanged int `json:"files_unchanged"` - BytesScanned int64 `json:"bytes_scanned"` - JSONObjects int `json:"json_objects"` - Guilds int `json:"guilds"` - Channels int `json:"channels"` - Messages int `json:"messages"` - DMMessages int `json:"dm_messages"` - DMChannels int `json:"dm_channels"` - GuildMessages int `json:"guild_messages"` - SkippedMessages int `json:"skipped_messages"` - SkippedChannels int `json:"skipped_channels"` - DryRun bool `json:"dry_run,omitempty"` - StartedAt time.Time `json:"started_at"` - FinishedAt time.Time `json:"finished_at"` + Path string `json:"path"` + FilesVisited int `json:"files_visited"` + FilesScanned int `json:"files_scanned"` + FilesSkipped int `json:"files_skipped"` + FilesUnchanged int `json:"files_unchanged"` + CacheFilesFastSkipped int `json:"cache_files_fast_skipped"` + BytesScanned int64 `json:"bytes_scanned"` + JSONObjects int `json:"json_objects"` + Guilds int `json:"guilds"` + Channels int `json:"channels"` + Messages int `json:"messages"` + DMMessages int `json:"dm_messages"` + DMChannels int `json:"dm_channels"` + GuildMessages int `json:"guild_messages"` + SkippedMessages int `json:"skipped_messages"` + SkippedChannels int `json:"skipped_channels"` + Checkpoints int `json:"checkpoints"` + DryRun bool `json:"dry_run,omitempty"` + FullCache bool `json:"full_cache,omitempty"` + StartedAt time.Time `json:"started_at"` + FinishedAt time.Time `json:"finished_at"` } type snapshot struct { @@ -67,8 +75,9 @@ type snapshot struct { } type fileFingerprint struct { - Size int64 `json:"size"` - ModUnixNS int64 `json:"mod_unix_ns"` + Size int64 `json:"size"` + ModUnixNS int64 `json:"mod_unix_ns"` + Status string `json:"status,omitempty"` } type scanState struct { @@ -77,8 +86,42 @@ type scanState struct { channels map[string]store.ChannelRecord } +type fileSource int + +const ( + fileSourceContext fileSource = iota + fileSourceCacheData +) + +type fileCandidate struct { + absPath string + relPath string + relKey string + source fileSource + info fs.FileInfo + fingerprint fileFingerprint +} + +type scanTotals struct { + guilds map[string]struct{} + channels map[string]struct{} + messages map[string]struct{} + dmMessages map[string]struct{} + guildMessages map[string]struct{} + dmChannels map[string]struct{} + skippedMessages map[string]struct{} + skippedChannels map[string]struct{} +} + +type unresolvedMessages map[string]string + const wiretapFileIndexScope = "wiretap:file_index:v1" +const ( + fileStatusImported = "imported" + fileStatusSkipped = "skipped" +) + func DefaultPath() string { home, _ := os.UserHomeDir() switch runtime.GOOS { @@ -105,25 +148,29 @@ func Import(ctx context.Context, st *store.Store, opts Options) (Stats, error) { if err != nil { return Stats{}, err } - stats, snap, err := scan(ctx, opts, state) + if opts.FullCache { + stats, snap, err := scanFullCache(ctx, opts, state) + if err != nil { + return stats, err + } + stats.DryRun = opts.DryRun + if opts.DryRun { + return stats, nil + } + if err := writeSnapshot(ctx, st, snap, len(state.previous) == 0); err != nil { + return stats, err + } + if err := saveFileIndex(ctx, st, opts, state.current); err != nil { + return stats, err + } + stats.Checkpoints = 1 + return stats, nil + } + stats, err := scanAndImport(ctx, st, opts, state) if err != nil { return stats, err } stats.DryRun = opts.DryRun - if opts.DryRun { - return stats, nil - } - fullScan := len(state.previous) == 0 - if snapshotHasChanges(snap) || fullScan { - if err := writeSnapshot(ctx, st, snap, fullScan); err != nil { - return stats, err - } - } else if err := st.SetSyncState(ctx, "wiretap:last_import", time.Now().UTC().Format(time.RFC3339Nano)); err != nil { - return stats, err - } - if err := saveFileIndex(ctx, st, state.current); err != nil { - return stats, err - } return stats, nil } @@ -136,7 +183,7 @@ func loadScanState(ctx context.Context, st *store.Store, opts Options) (scanStat if st == nil || opts.DryRun { return state, nil } - raw, err := st.GetSyncState(ctx, wiretapFileIndexScope) + raw, err := st.GetSyncState(ctx, fileIndexScope(opts)) if err != nil { return state, err } @@ -160,19 +207,107 @@ func loadScanState(ctx context.Context, st *store.Store, opts Options) (scanStat return state, nil } -func saveFileIndex(ctx context.Context, st *store.Store, index map[string]fileFingerprint) error { +func fileIndexScope(Options) string { + return wiretapFileIndexScope +} + +func saveFileIndex(ctx context.Context, st *store.Store, opts Options, index map[string]fileFingerprint) error { body, err := json.Marshal(index) if err != nil { return err } - return st.SetSyncState(ctx, wiretapFileIndexScope, string(body)) + return st.SetSyncState(ctx, fileIndexScope(opts), string(body)) +} + +func sameFileFingerprint(a, b fileFingerprint) bool { + return a.Size == b.Size && a.ModUnixNS == b.ModUnixNS +} + +func isImportedFingerprint(fingerprint fileFingerprint) bool { + return fingerprint.Status == "" || fingerprint.Status == fileStatusImported +} + +func importedFingerprint(fingerprint fileFingerprint) fileFingerprint { + fingerprint.Status = fileStatusImported + return fingerprint +} + +func skippedFingerprint(fingerprint fileFingerprint) fileFingerprint { + fingerprint.Status = fileStatusSkipped + return fingerprint } func snapshotHasChanges(snap snapshot) bool { return len(snap.guilds) > 0 || len(snap.channels) > 0 || len(snap.messages) > 0 } -func scan(ctx context.Context, opts Options, state scanState) (Stats, snapshot, error) { +func scanAndImport(ctx context.Context, st *store.Store, opts Options, state scanState) (Stats, error) { + now := opts.Now + if now == nil { + now = time.Now + } + root := strings.TrimSpace(opts.Path) + if root == "" { + root = DefaultPath() + } + stats := Stats{Path: root, FullCache: opts.FullCache, StartedAt: now().UTC()} + rootFS, err := os.OpenRoot(root) + if err != nil { + stats.FinishedAt = now().UTC() + return stats, ignoreCacheFileError(err) + } + defer func() { _ = rootFS.Close() }() + contextFiles, cacheFiles, err := discoverCandidates(ctx, root, rootFS, opts, state, &stats) + if err != nil { + stats.FinishedAt = now().UTC() + return stats, err + } + fullScan := len(state.previous) == 0 + if fullScan && !opts.DryRun { + if err := st.DeleteGuildData(ctx, "@unknown"); err != nil { + stats.FinishedAt = now().UTC() + return stats, err + } + } + run := newImportRun(ctx, st, opts, state, rootFS, &stats) + if err := run.scanContext(contextFiles); err != nil { + stats.FinishedAt = now().UTC() + return stats, err + } + if err := collectCacheRouteHints(ctx, rootFS, cacheFiles, run.base); err != nil { + stats.FinishedAt = now().UTC() + return stats, err + } + if err := run.scanCacheBatches(cacheFiles); err != nil { + stats.FinishedAt = now().UTC() + return stats, err + } + if err := run.retryPending(); err != nil { + stats.FinishedAt = now().UTC() + return stats, err + } + if !opts.DryRun { + if len(contextFiles) == 0 && len(cacheFiles) == 0 { + if err := st.SetSyncState(ctx, "wiretap:last_import", time.Now().UTC().Format(time.RFC3339Nano)); err != nil { + stats.FinishedAt = now().UTC() + return stats, err + } + if err := saveFileIndex(ctx, st, opts, state.current); err != nil { + stats.FinishedAt = now().UTC() + return stats, err + } + stats.Checkpoints++ + } + if err := st.DeleteOrphanChannels(ctx, DirectMessageGuildID); err != nil { + stats.FinishedAt = now().UTC() + return stats, err + } + } + stats.FinishedAt = now().UTC() + return stats, nil +} + +func scanFullCache(ctx context.Context, opts Options, state scanState) (Stats, snapshot, error) { now := opts.Now if now == nil { now = time.Now @@ -185,14 +320,8 @@ func scan(ctx context.Context, opts Options, state scanState) (Stats, snapshot, if maxBytes <= 0 { maxBytes = defaultMaxFileBytes } - stats := Stats{Path: root, StartedAt: now().UTC()} - snap := snapshot{ - guilds: map[string]store.GuildRecord{}, - channels: map[string]store.ChannelRecord{}, - messages: map[string]store.MessageMutation{}, - routes: map[string]string{}, - userLabels: map[string]userLabel{}, - } + stats := Stats{Path: root, FullCache: true, StartedAt: now().UTC()} + snap := newSnapshot() rootFS, err := os.OpenRoot(root) if err != nil { stats.FinishedAt = now().UTC() @@ -212,6 +341,7 @@ func scan(ctx context.Context, opts Options, state scanState) (Stats, snapshot, } return nil } + stats.FilesVisited++ info, err := entry.Info() if err != nil { stats.FilesSkipped++ @@ -231,8 +361,8 @@ func scan(ctx context.Context, opts Options, state scanState) (Stats, snapshot, Size: info.Size(), ModUnixNS: info.ModTime().UnixNano(), } - state.current[relKey] = fingerprint - if previous, ok := state.previous[relKey]; ok && previous == fingerprint { + state.current[relKey] = importedFingerprint(fingerprint) + if previous, ok := state.previous[relKey]; ok && sameFileFingerprint(previous, fingerprint) && isImportedFingerprint(previous) { stats.FilesUnchanged++ return nil } @@ -267,15 +397,181 @@ func scan(ctx context.Context, opts Options, state scanState) (Stats, snapshot, }); err != nil { return stats, snap, err } - reconcileMessages(snap, state.channels) + totals := newScanTotals() + finalizeSnapshot(snap, state.channels, totals, &stats, true) + stats.FinishedAt = now().UTC() + return stats, snap, nil +} + +func discoverCandidates(ctx context.Context, root string, rootFS *os.Root, opts Options, state scanState, stats *Stats) ([]fileCandidate, []fileCandidate, error) { + var contextFiles []fileCandidate + var cacheFiles []fileCandidate + maxBytes := opts.MaxFileBytes + if maxBytes <= 0 { + maxBytes = defaultMaxFileBytes + } + err := filepath.WalkDir(root, func(path string, entry fs.DirEntry, err error) error { + if err != nil { + return ignoreCacheFileError(err) + } + if ctx.Err() != nil { + return ctx.Err() + } + if entry.IsDir() { + if shouldSkipDir(entry.Name()) && path != root { + return filepath.SkipDir + } + return nil + } + stats.FilesVisited++ + info, err := entry.Info() + if err != nil { + stats.FilesSkipped++ + return ignoreCacheFileError(err) + } + if !isCandidateFile(path) || info.Size() <= 0 || info.Size() > maxBytes { + stats.FilesSkipped++ + return nil + } + relPath, err := filepath.Rel(root, path) + if err != nil { + stats.FilesSkipped++ + return ignoreCacheFileError(err) + } + relKey := filepath.ToSlash(relPath) + fingerprint := fileFingerprint{ + Size: info.Size(), + ModUnixNS: info.ModTime().UnixNano(), + } + candidate := fileCandidate{ + absPath: path, + relPath: relPath, + relKey: relKey, + source: sourceForPath(root, path, relPath), + info: info, + fingerprint: fingerprint, + } + if candidate.source == fileSourceCacheData { + if previous, ok := state.previous[relKey]; ok && sameFileFingerprint(previous, fingerprint) { + if !opts.FullCache || isImportedFingerprint(previous) { + state.current[relKey] = previous + stats.FilesUnchanged++ + return nil + } + } + if !opts.FullCache { + ok, err := cacheFileHasRouteHint(rootFS, relPath) + if err != nil { + stats.FilesSkipped++ + return ignoreCacheFileError(err) + } + if !ok { + state.current[relKey] = skippedFingerprint(fingerprint) + stats.FilesSkipped++ + stats.CacheFilesFastSkipped++ + return nil + } + } + cacheFiles = append(cacheFiles, candidate) + return nil + } + if previous, ok := state.previous[relKey]; ok && sameFileFingerprint(previous, fingerprint) { + state.current[relKey] = previous + stats.FilesUnchanged++ + return nil + } + contextFiles = append(contextFiles, candidate) + return nil + }) + return contextFiles, cacheFiles, err +} + +func scanCandidates(ctx context.Context, rootFS *os.Root, opts Options, candidates []fileCandidate, snap snapshot, channelLookup map[string]store.ChannelRecord, stats *Stats) error { + maxBytes := opts.MaxFileBytes + if maxBytes <= 0 { + maxBytes = defaultMaxFileBytes + } + for _, candidate := range candidates { + if err := ctx.Err(); err != nil { + return err + } + data, err := rootFS.ReadFile(candidate.relPath) + if err != nil { + stats.FilesSkipped++ + if err := ignoreCacheFileError(err); err != nil { + return err + } + continue + } + stats.FilesScanned++ + stats.BytesScanned += int64(len(data)) + collectChannelRoutes(snap, bytes.ToValidUTF8(data, nil)) + objects := extractJSONValues(bytes.ToValidUTF8(data, nil)) + for _, payload := range extractGzipPayloads(data, maxBytes) { + if err := ctx.Err(); err != nil { + return err + } + collectChannelRoutes(snap, bytes.ToValidUTF8(payload, nil)) + objects = append(objects, extractJSONValues(bytes.ToValidUTF8(payload, nil))...) + } + stats.JSONObjects += len(objects) + for _, raw := range objects { + if err := ctx.Err(); err != nil { + return err + } + var value any + if err := json.Unmarshal(raw, &value); err != nil { + continue + } + collectValue(snap, channelLookup, value, candidate.info.ModTime().UTC()) + } + } + return nil +} + +func collectCacheRouteHints(ctx context.Context, rootFS *os.Root, candidates []fileCandidate, snap snapshot) error { + for _, candidate := range candidates { + if err := ctx.Err(); err != nil { + return err + } + data, err := readFilePrefix(rootFS, candidate.relPath) + if err != nil { + if err := ignoreCacheFileError(err); err != nil { + return err + } + continue + } + collectChannelRoutes(snap, bytes.ToValidUTF8(data, nil)) + } + return nil +} + +func newScanTotals() scanTotals { + return scanTotals{ + guilds: map[string]struct{}{}, + channels: map[string]struct{}{}, + messages: map[string]struct{}{}, + dmMessages: map[string]struct{}{}, + guildMessages: map[string]struct{}{}, + dmChannels: map[string]struct{}{}, + skippedMessages: map[string]struct{}{}, + skippedChannels: map[string]struct{}{}, + } +} + +func finalizeSnapshot(snap snapshot, channelLookup map[string]store.ChannelRecord, totals scanTotals, stats *Stats, recordSkipped bool) unresolvedMessages { + reconcileMessages(snap, channelLookup) inferDirectMessageNames(snap) - reconcileMessages(snap, state.channels) - skippedChannels := map[string]struct{}{} + reconcileMessages(snap, channelLookup) + unresolved := unresolvedMessages{} for id, msg := range snap.messages { guildID := msg.Record.GuildID if guildID == "" { - stats.SkippedMessages++ - skippedChannels[msg.Record.ChannelID] = struct{}{} + unresolved[id] = msg.Record.ChannelID + if recordSkipped { + totals.skippedMessages[id] = struct{}{} + totals.skippedChannels[msg.Record.ChannelID] = struct{}{} + } delete(snap.messages, id) continue } @@ -283,34 +579,195 @@ func scan(ctx context.Context, opts Options, state scanState) (Stats, snapshot, snap.guilds[guildID] = syntheticGuild(guildID, guildName(guildID)) } if _, ok := snap.channels[msg.Record.ChannelID]; !ok { - snap.channels[msg.Record.ChannelID] = syntheticChannel(msg.Record.ChannelID, guildID, msg.Record.ChannelName) + if channel, ok := channelLookup[msg.Record.ChannelID]; ok && channel.GuildID != "" { + snap.channels[msg.Record.ChannelID] = channel + } else { + snap.channels[msg.Record.ChannelID] = syntheticChannel(msg.Record.ChannelID, guildID, msg.Record.ChannelName) + } } snap.messages[id] = msg } - messageChannels := map[string]struct{}{} - dmChannels := map[string]struct{}{} for _, msg := range snap.messages { - messageChannels[msg.Record.ChannelID] = struct{}{} + totals.messages[msg.Record.ID] = struct{}{} switch msg.Record.GuildID { case DirectMessageGuildID: - stats.DMMessages++ - dmChannels[msg.Record.ChannelID] = struct{}{} + totals.dmMessages[msg.Record.ID] = struct{}{} + totals.dmChannels[msg.Record.ChannelID] = struct{}{} default: - stats.GuildMessages++ + totals.guildMessages[msg.Record.ID] = struct{}{} } } - for id := range snap.channels { - if _, ok := messageChannels[id]; !ok { - delete(snap.channels, id) - } + for id, channel := range snap.channels { + channelLookup[id] = channel + totals.channels[id] = struct{}{} } - stats.DMChannels = len(dmChannels) - stats.SkippedChannels = len(skippedChannels) - stats.Guilds = len(snap.guilds) - stats.Channels = len(snap.channels) - stats.Messages = len(snap.messages) - stats.FinishedAt = now().UTC() - return stats, snap, nil + for id := range snap.guilds { + totals.guilds[id] = struct{}{} + } + stats.DMChannels = len(totals.dmChannels) + stats.SkippedChannels = len(totals.skippedChannels) + stats.Guilds = len(totals.guilds) + stats.Channels = len(totals.channels) + stats.Messages = len(totals.messages) + stats.DMMessages = len(totals.dmMessages) + stats.GuildMessages = len(totals.guildMessages) + stats.SkippedMessages = len(totals.skippedMessages) + return unresolved +} + +func mergeUnresolved(dst, src unresolvedMessages) { + for messageID, channelID := range src { + dst[messageID] = channelID + } +} + +func recordUnresolved(unresolved unresolvedMessages, totals scanTotals, stats *Stats) { + for messageID, channelID := range unresolved { + totals.skippedMessages[messageID] = struct{}{} + totals.skippedChannels[channelID] = struct{}{} + } + stats.SkippedChannels = len(totals.skippedChannels) + stats.SkippedMessages = len(totals.skippedMessages) +} + +func commitSnapshot(ctx context.Context, st *store.Store, opts Options, state scanState, candidates []fileCandidate, snap snapshot, checkpoint bool, stats *Stats) error { + if opts.DryRun { + return nil + } + if !checkpoint { + if snapshotHasChanges(snap) { + return writeSnapshot(ctx, st, snapshotWithoutMessageEvents(snap), false) + } + return nil + } + if snapshotHasChanges(snap) { + if err := writeSnapshot(ctx, st, snap, false); err != nil { + return err + } + } else if err := st.SetSyncState(ctx, "wiretap:last_import", time.Now().UTC().Format(time.RFC3339Nano)); err != nil { + return err + } + for _, candidate := range candidates { + state.current[candidate.relKey] = importedFingerprint(candidate.fingerprint) + } + if err := saveFileIndex(ctx, st, opts, state.current); err != nil { + return err + } + stats.Checkpoints++ + return nil +} + +func checkpointScannedCandidates(ctx context.Context, st *store.Store, opts Options, state scanState, candidates []fileCandidate, stats *Stats) error { + if opts.DryRun { + return nil + } + if err := st.SetSyncState(ctx, "wiretap:last_import", time.Now().UTC().Format(time.RFC3339Nano)); err != nil { + return err + } + for _, candidate := range candidates { + state.current[candidate.relKey] = importedFingerprint(candidate.fingerprint) + } + if err := saveFileIndex(ctx, st, opts, state.current); err != nil { + return err + } + stats.Checkpoints++ + return nil +} + +func snapshotWithoutMessageEvents(snap snapshot) snapshot { + out := snapshot{ + guilds: snap.guilds, + channels: snap.channels, + messages: make(map[string]store.MessageMutation, len(snap.messages)), + routes: snap.routes, + userLabels: snap.userLabels, + } + for id, message := range snap.messages { + message.Options.AppendEvent = false + out.messages[id] = message + } + return out +} + +func newSnapshot() snapshot { + return snapshot{ + guilds: map[string]store.GuildRecord{}, + channels: map[string]store.ChannelRecord{}, + messages: map[string]store.MessageMutation{}, + routes: map[string]string{}, + userLabels: map[string]userLabel{}, + } +} + +func newSnapshotWithContext(base snapshot) snapshot { + snap := newSnapshot() + for channelID, guildID := range base.routes { + snap.routes[channelID] = guildID + } + for userID, label := range base.userLabels { + snap.userLabels[userID] = label + } + return snap +} + +func mergeSnapshotContext(base snapshot, next snapshot) { + for channelID, guildID := range next.routes { + collectChannelRoute(base, channelID, guildID) + } + for userID, label := range next.userLabels { + base.userLabels[userID] = label + } + for channelID, channel := range next.channels { + base.channels[channelID] = channel + } +} + +func copyChannelLookup(in map[string]store.ChannelRecord) map[string]store.ChannelRecord { + out := make(map[string]store.ChannelRecord, len(in)) + for id, channel := range in { + out[id] = channel + } + return out +} + +func sourceForPath(root, path, relPath string) fileSource { + if isRouteFilteredCachePath(root, path, relPath) { + return fileSourceCacheData + } + return fileSourceContext +} + +func isRouteFilteredCachePath(root, path, relPath string) bool { + cleanRoot := filepath.ToSlash(root) + cleanPath := filepath.ToSlash(path) + cleanRel := filepath.ToSlash(relPath) + return filepath.Base(cleanRoot) == "Cache_Data" || + filepath.Base(cleanRoot) == "CacheStorage" || + strings.Contains(cleanPath, "/Cache/Cache_Data/") || + strings.Contains(cleanPath, "/Service Worker/CacheStorage/") || + strings.HasPrefix(cleanRel, "Cache_Data/") || + strings.HasPrefix(cleanRel, "Service Worker/CacheStorage/") +} + +func cacheFileHasRouteHint(rootFS *os.Root, relPath string) (bool, error) { + data, err := readFilePrefix(rootFS, relPath) + if err != nil { + return false, err + } + return channelRouteRE.Match(data) || apiMessagesRouteRE.Match(data), nil +} + +func readFilePrefix(rootFS *os.Root, relPath string) ([]byte, error) { + file, err := rootFS.Open(relPath) + if err != nil { + return nil, err + } + defer func() { _ = file.Close() }() + data, err := io.ReadAll(io.LimitReader(file, cacheSniffBytes)) + if err != nil { + return nil, err + } + return data, nil } func ignoreCacheFileError(error) error { diff --git a/internal/discorddesktop/import_pipeline_test.go b/internal/discorddesktop/import_pipeline_test.go new file mode 100644 index 0000000..c8bf974 --- /dev/null +++ b/internal/discorddesktop/import_pipeline_test.go @@ -0,0 +1,382 @@ +package discorddesktop + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/steipete/discrawl/internal/store" +) + +func TestImportFastCacheSkipsUnroutedCacheDataUnlessFullCache(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + cachePath := filepath.Join(dir, "Cache", "Cache_Data") + require.NoError(t, os.MkdirAll(cachePath, 0o755)) + require.NoError(t, os.WriteFile(filepath.Join(cachePath, "entry_0"), []byte(` +{"id":"111111111111111121","guild_id":"999999999999999996","type":0,"name":"slow-cache"} +{"id":"333333333333333346","channel_id":"111111111111111121","content":"unrouted historical cache","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}} +`), 0o600)) + + fastStore, err := store.Open(ctx, filepath.Join(dir, "fast.db")) + require.NoError(t, err) + defer func() { _ = fastStore.Close() }() + + stats, err := Import(ctx, fastStore, Options{Path: dir}) + require.NoError(t, err) + require.Equal(t, 0, stats.FilesScanned) + require.Equal(t, 1, stats.CacheFilesFastSkipped) + require.Equal(t, 0, stats.Messages) + + results, err := fastStore.SearchMessages(ctx, store.SearchOptions{Query: "unrouted historical", Limit: 10}) + require.NoError(t, err) + require.Empty(t, results) + + stats, err = Import(ctx, fastStore, Options{Path: dir}) + require.NoError(t, err) + require.Equal(t, 0, stats.FilesScanned) + require.Equal(t, 0, stats.CacheFilesFastSkipped) + require.Equal(t, 1, stats.FilesUnchanged) + + stats, err = Import(ctx, fastStore, Options{Path: dir, FullCache: true}) + require.NoError(t, err) + require.Equal(t, 1, stats.FilesScanned) + require.Equal(t, 1, stats.Messages) + + fullStore, err := store.Open(ctx, filepath.Join(dir, "full.db")) + require.NoError(t, err) + defer func() { _ = fullStore.Close() }() + + stats, err = Import(ctx, fullStore, Options{Path: dir, FullCache: true}) + require.NoError(t, err) + require.Equal(t, 1, stats.FilesScanned) + require.Equal(t, 0, stats.CacheFilesFastSkipped) + require.Equal(t, 1, stats.Messages) + + results, err = fullStore.SearchMessages(ctx, store.SearchOptions{Query: "unrouted historical", Limit: 10}) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, "slow-cache", results[0].ChannelName) +} + +func TestImportCheckpointsCacheBatches(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + cachePath := filepath.Join(dir, "Cache", "Cache_Data") + require.NoError(t, os.MkdirAll(cachePath, 0o755)) + for i := range checkpointEveryFiles + 1 { + channelID := "111111111111111121" + messageID := 333333333333333346 + i + body := []byte(fmt.Sprintf(`https://discord.com/channels/999999999999999996/%s +{"id":"%d","channel_id":"%s","content":"checkpoint cache %d","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}} +`, channelID, messageID, channelID, i)) + require.NoError(t, os.WriteFile(filepath.Join(cachePath, fmt.Sprintf("entry_%03d", i)), body, 0o600)) + } + + st, err := store.Open(ctx, filepath.Join(dir, "discrawl.db")) + require.NoError(t, err) + defer func() { _ = st.Close() }() + + stats, err := Import(ctx, st, Options{Path: dir}) + require.NoError(t, err) + require.Equal(t, checkpointEveryFiles+1, stats.FilesScanned) + require.Equal(t, checkpointEveryFiles+1, stats.Messages) + require.GreaterOrEqual(t, stats.Checkpoints, 2) + + stats, err = Import(ctx, st, Options{Path: dir}) + require.NoError(t, err) + require.Equal(t, 0, stats.FilesScanned) + require.Equal(t, checkpointEveryFiles+1, stats.FilesUnchanged) +} + +func TestImportUsesLaterCacheMetadataBeforeCheckpointingEarlierBatch(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + cachePath := filepath.Join(dir, "Cache", "Cache_Data") + require.NoError(t, os.MkdirAll(cachePath, 0o755)) + + channelID := "111111111111111121" + guildID := "999999999999999996" + require.NoError(t, os.WriteFile(filepath.Join(cachePath, "entry_000"), []byte(fmt.Sprintf(`https://discord.com/api/v9/channels/%s/messages?limit=50 +{"id":"333333333333333346","channel_id":"%s","content":"needs later channel metadata","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}} +`, channelID, channelID)), 0o600)) + for i := 1; i < checkpointEveryFiles; i++ { + require.NoError(t, os.WriteFile(filepath.Join(cachePath, fmt.Sprintf("entry_%03d", i)), []byte(fmt.Sprintf( + "https://discord.com/api/v9/channels/%s/messages?limit=50\n", + channelID, + )), 0o600)) + } + require.NoError(t, os.WriteFile(filepath.Join(cachePath, fmt.Sprintf("entry_%03d", checkpointEveryFiles)), []byte(fmt.Sprintf(`https://discord.com/api/v9/channels/%s/messages?limit=50 +{"id":"%s","guild_id":"%s","type":0,"name":"later-metadata"} +`, channelID, channelID, guildID)), 0o600)) + + st, err := store.Open(ctx, filepath.Join(dir, "discrawl.db")) + require.NoError(t, err) + defer func() { _ = st.Close() }() + + stats, err := Import(ctx, st, Options{Path: dir}) + require.NoError(t, err) + require.Equal(t, checkpointEveryFiles+1+checkpointEveryFiles, stats.FilesScanned) + require.Equal(t, 1, stats.Messages) + require.GreaterOrEqual(t, stats.Checkpoints, 2) + + results, err := st.SearchMessages(ctx, store.SearchOptions{Query: "needs later channel metadata", Limit: 10}) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, guildID, results[0].GuildID) + require.Equal(t, "later-metadata", results[0].ChannelName) + requireMessageCount(t, ctx, st, "message_events", 1) + + stats, err = Import(ctx, st, Options{Path: dir}) + require.NoError(t, err) + require.Equal(t, 0, stats.FilesScanned) + require.Equal(t, checkpointEveryFiles+1, stats.FilesUnchanged) + requireMessageCount(t, ctx, st, "message_events", 1) +} + +func TestImportCheckpointsPartiallyResolvedRetryBatch(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + cachePath := filepath.Join(dir, "Cache", "Cache_Data") + require.NoError(t, os.MkdirAll(cachePath, 0o755)) + + resolvedChannelID := "111111111111111121" + unresolvedChannelID := "111111111111111122" + guildID := "999999999999999996" + require.NoError(t, os.WriteFile(filepath.Join(cachePath, "entry_000"), []byte(fmt.Sprintf(`https://discord.com/api/v9/channels/%s/messages?limit=50 +https://discord.com/api/v9/channels/%s/messages?limit=50 +{"id":"333333333333333346","channel_id":"%s","content":"partially resolved retry message","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}} +{"id":"333333333333333347","channel_id":"%s","content":"still unresolved retry message","timestamp":"2026-04-23T18:20:44Z","author":{"id":"222222222222222232","username":"alice"}} +`, resolvedChannelID, unresolvedChannelID, resolvedChannelID, unresolvedChannelID)), 0o600)) + for i := 1; i < checkpointEveryFiles; i++ { + require.NoError(t, os.WriteFile(filepath.Join(cachePath, fmt.Sprintf("entry_%03d", i)), []byte(fmt.Sprintf( + "https://discord.com/api/v9/channels/%s/messages?limit=50\n", + resolvedChannelID, + )), 0o600)) + } + require.NoError(t, os.WriteFile(filepath.Join(cachePath, fmt.Sprintf("entry_%03d", checkpointEveryFiles)), []byte(fmt.Sprintf(`https://discord.com/api/v9/channels/%s/messages?limit=50 +{"id":"%s","guild_id":"%s","type":0,"name":"partially-resolved"} +`, resolvedChannelID, resolvedChannelID, guildID)), 0o600)) + + st, err := store.Open(ctx, filepath.Join(dir, "discrawl.db")) + require.NoError(t, err) + defer func() { _ = st.Close() }() + + stats, err := Import(ctx, st, Options{Path: dir}) + require.NoError(t, err) + require.Equal(t, checkpointEveryFiles+1+checkpointEveryFiles, stats.FilesScanned) + require.Equal(t, 1, stats.Messages) + require.Equal(t, 1, stats.SkippedMessages) + require.GreaterOrEqual(t, stats.Checkpoints, 2) + + results, err := st.SearchMessages(ctx, store.SearchOptions{Query: "partially resolved retry", Limit: 10}) + require.NoError(t, err) + require.Len(t, results, 1) + require.Equal(t, "partially-resolved", results[0].ChannelName) + results, err = st.SearchMessages(ctx, store.SearchOptions{Query: "still unresolved retry", Limit: 10}) + require.NoError(t, err) + require.Empty(t, results) + requireMessageCount(t, ctx, st, "message_events", 1) + + stats, err = Import(ctx, st, Options{Path: dir}) + require.NoError(t, err) + require.Equal(t, 0, stats.FilesScanned) + require.Equal(t, checkpointEveryFiles+1, stats.FilesUnchanged) + requireMessageCount(t, ctx, st, "message_events", 1) +} + +func TestImportCheckpointsUnresolvableRouteBearingCacheMisses(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + cachePath := filepath.Join(dir, "Cache", "Cache_Data") + require.NoError(t, os.MkdirAll(cachePath, 0o755)) + + channelID := "111111111111111121" + require.NoError(t, os.WriteFile(filepath.Join(cachePath, "entry_000"), []byte(fmt.Sprintf(`https://discord.com/api/v9/channels/%s/messages?limit=50 +{"id":"333333333333333346","channel_id":"%s","content":"permanent unresolved cache miss","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}} +`, channelID, channelID)), 0o600)) + + st, err := store.Open(ctx, filepath.Join(dir, "discrawl.db")) + require.NoError(t, err) + defer func() { _ = st.Close() }() + + stats, err := Import(ctx, st, Options{Path: dir}) + require.NoError(t, err) + require.Equal(t, 1, stats.FilesScanned) + require.Equal(t, 1, stats.SkippedMessages) + require.Equal(t, 1, stats.Checkpoints) + + results, err := st.SearchMessages(ctx, store.SearchOptions{Query: "permanent unresolved", Limit: 10}) + require.NoError(t, err) + require.Empty(t, results) + + stats, err = Import(ctx, st, Options{Path: dir}) + require.NoError(t, err) + require.Equal(t, 0, stats.FilesScanned) + require.Equal(t, 1, stats.FilesUnchanged) +} + +func TestImportDoesNotAppendEventsForSkippedMixedBatch(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + cachePath := filepath.Join(dir, "Cache", "Cache_Data") + require.NoError(t, os.MkdirAll(cachePath, 0o755)) + + guildID := "999999999999999996" + resolvedChannelID := "111111111111111121" + unresolvedChannelID := "111111111111111122" + require.NoError(t, os.WriteFile(filepath.Join(cachePath, "entry_000"), []byte(fmt.Sprintf(`https://discord.com/channels/%s/%s +https://discord.com/api/v9/channels/%s/messages?limit=50 +{"id":"333333333333333346","channel_id":"%s","content":"mixed resolved message","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}} +{"id":"333333333333333347","channel_id":"%s","content":"mixed unresolved message","timestamp":"2026-04-23T18:20:44Z","author":{"id":"222222222222222232","username":"alice"}} +`, guildID, resolvedChannelID, unresolvedChannelID, resolvedChannelID, unresolvedChannelID)), 0o600)) + + st, err := store.Open(ctx, filepath.Join(dir, "discrawl.db")) + require.NoError(t, err) + defer func() { _ = st.Close() }() + + stats, err := Import(ctx, st, Options{Path: dir}) + require.NoError(t, err) + require.Equal(t, 1, stats.FilesScanned) + require.Equal(t, 1, stats.Checkpoints) + requireMessageCount(t, ctx, st, "message_events", 0) + + results, err := st.SearchMessages(ctx, store.SearchOptions{Query: "mixed resolved", Limit: 10}) + require.NoError(t, err) + require.Len(t, results, 1) + results, err = st.SearchMessages(ctx, store.SearchOptions{Query: "mixed unresolved", Limit: 10}) + require.NoError(t, err) + require.Empty(t, results) + + stats, err = Import(ctx, st, Options{Path: dir}) + require.NoError(t, err) + require.Equal(t, 0, stats.FilesScanned) + require.Equal(t, 1, stats.FilesUnchanged) + requireMessageCount(t, ctx, st, "message_events", 0) +} + +func TestImportDoesNotDuplicateEventsWhenSwitchingFullCacheModes(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + cachePath := filepath.Join(dir, "Cache", "Cache_Data") + require.NoError(t, os.MkdirAll(cachePath, 0o755)) + + channelID := "111111111111111121" + guildID := "999999999999999996" + require.NoError(t, os.WriteFile(filepath.Join(cachePath, "entry_000"), []byte(fmt.Sprintf(`https://discord.com/channels/%s/%s +{"id":"%s","guild_id":"%s","type":0,"name":"mode-switch"} +{"id":"333333333333333346","channel_id":"%s","content":"mode switch event once","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}} +`, guildID, channelID, channelID, guildID, channelID)), 0o600)) + + t.Run("full then default", func(t *testing.T) { + st, err := store.Open(ctx, filepath.Join(dir, "full-first.db")) + require.NoError(t, err) + defer func() { _ = st.Close() }() + + stats, err := Import(ctx, st, Options{Path: dir, FullCache: true}) + require.NoError(t, err) + require.Equal(t, 1, stats.FilesScanned) + require.Equal(t, 1, stats.Messages) + requireMessageCount(t, ctx, st, "message_events", 1) + + stats, err = Import(ctx, st, Options{Path: dir}) + require.NoError(t, err) + require.Equal(t, 0, stats.FilesScanned) + require.Equal(t, 1, stats.FilesUnchanged) + requireMessageCount(t, ctx, st, "message_events", 1) + }) + + t.Run("default then full", func(t *testing.T) { + st, err := store.Open(ctx, filepath.Join(dir, "default-first.db")) + require.NoError(t, err) + defer func() { _ = st.Close() }() + + stats, err := Import(ctx, st, Options{Path: dir}) + require.NoError(t, err) + require.Equal(t, 1, stats.FilesScanned) + require.Equal(t, 1, stats.Messages) + requireMessageCount(t, ctx, st, "message_events", 1) + + stats, err = Import(ctx, st, Options{Path: dir, FullCache: true}) + require.NoError(t, err) + require.Equal(t, 0, stats.FilesScanned) + require.Equal(t, 1, stats.FilesUnchanged) + requireMessageCount(t, ctx, st, "message_events", 1) + }) +} + +func TestImportFastCachePreservesKnownChannelMetadataAcrossBatches(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + leveldbPath := filepath.Join(dir, "Local Storage", "leveldb") + cachePath := filepath.Join(dir, "Cache", "Cache_Data") + require.NoError(t, os.MkdirAll(leveldbPath, 0o755)) + require.NoError(t, os.MkdirAll(cachePath, 0o755)) + + channelID := "111111111111111121" + guildID := "999999999999999996" + require.NoError(t, os.WriteFile(filepath.Join(leveldbPath, "000001.log"), []byte(fmt.Sprintf( + `{"id":"%s","guild_id":"%s","type":11,"name":"known-thread","thread_metadata":{"archived":false}}`, + channelID, + guildID, + )), 0o600)) + require.NoError(t, os.WriteFile(filepath.Join(cachePath, "entry_0"), []byte(fmt.Sprintf(`https://discord.com/channels/%s/%s +{"id":"333333333333333346","channel_id":"%s","content":"thread metadata cache","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}} +`, guildID, channelID, channelID)), 0o600)) + + st, err := store.Open(ctx, filepath.Join(dir, "discrawl.db")) + require.NoError(t, err) + defer func() { _ = st.Close() }() + + stats, err := Import(ctx, st, Options{Path: dir}) + require.NoError(t, err) + require.Equal(t, 1, stats.Messages) + + channels, err := st.Channels(ctx, guildID) + require.NoError(t, err) + require.Len(t, channels, 1) + require.Equal(t, "known-thread", channels[0].Name) + require.Equal(t, "thread_public", channels[0].Kind) + + _, rows, err := st.ReadOnlyQuery(ctx, "select raw_json from channels where id = '111111111111111121'") + require.NoError(t, err) + require.Len(t, rows, 1) + require.Contains(t, rows[0][0], `"type":11`) +} + +func TestImportFastCacheRouteFiltersServiceWorkerCacheStorage(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + cachePath := filepath.Join(dir, "Service Worker", "CacheStorage", "cache-id") + require.NoError(t, os.MkdirAll(cachePath, 0o755)) + require.NoError(t, os.WriteFile(filepath.Join(cachePath, "unrouted"), []byte(` +{"id":"111111111111111121","guild_id":"999999999999999996","type":0,"name":"service-worker-cache"} +{"id":"333333333333333346","channel_id":"111111111111111121","content":"service worker historical cache","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}} +`), 0o600)) + + st, err := store.Open(ctx, filepath.Join(dir, "discrawl.db")) + require.NoError(t, err) + defer func() { _ = st.Close() }() + + stats, err := Import(ctx, st, Options{Path: dir}) + require.NoError(t, err) + require.Equal(t, 0, stats.FilesScanned) + require.Equal(t, 1, stats.CacheFilesFastSkipped) + + results, err := st.SearchMessages(ctx, store.SearchOptions{Query: "service worker historical", Limit: 10}) + require.NoError(t, err) + require.Empty(t, results) +} + +func requireMessageCount(t *testing.T, ctx context.Context, st *store.Store, table string, expected int) { + t.Helper() + _, rows, err := st.ReadOnlyQuery(ctx, fmt.Sprintf("select count(*) from %s", table)) + require.NoError(t, err) + require.Len(t, rows, 1) + require.Len(t, rows[0], 1) + require.Equal(t, fmt.Sprint(expected), rows[0][0]) +} diff --git a/internal/discorddesktop/import_run.go b/internal/discorddesktop/import_run.go new file mode 100644 index 0000000..0142836 --- /dev/null +++ b/internal/discorddesktop/import_run.go @@ -0,0 +1,113 @@ +package discorddesktop + +import ( + "context" + "os" + + "github.com/steipete/discrawl/internal/store" +) + +type importRun struct { + ctx context.Context + st *store.Store + opts Options + state scanState + rootFS *os.Root + channelLookup map[string]store.ChannelRecord + totals scanTotals + stats *Stats + base snapshot + pending []fileCandidate + pendingUnresolved unresolvedMessages + pendingLookupSize int + pendingRouteSize int +} + +func newImportRun(ctx context.Context, st *store.Store, opts Options, state scanState, rootFS *os.Root, stats *Stats) *importRun { + return &importRun{ + ctx: ctx, + st: st, + opts: opts, + state: state, + rootFS: rootFS, + channelLookup: copyChannelLookup(state.channels), + totals: newScanTotals(), + stats: stats, + base: newSnapshot(), + pendingUnresolved: unresolvedMessages{}, + pendingLookupSize: -1, + pendingRouteSize: -1, + } +} + +func (r *importRun) scanContext(candidates []fileCandidate) error { + if err := scanCandidates(r.ctx, r.rootFS, r.opts, candidates, r.base, r.channelLookup, r.stats); err != nil { + return err + } + return r.finalizeAndCommit(candidates, r.base, false) +} + +func (r *importRun) scanCacheBatches(candidates []fileCandidate) error { + for start := 0; start < len(candidates); start += checkpointEveryFiles { + end := start + checkpointEveryFiles + if end > len(candidates) { + end = len(candidates) + } + batchCandidates := candidates[start:end] + batch := newSnapshotWithContext(r.base) + if err := scanCandidates(r.ctx, r.rootFS, r.opts, batchCandidates, batch, r.channelLookup, r.stats); err != nil { + return err + } + if err := r.finalizeAndCommit(batchCandidates, batch, false); err != nil { + return err + } + mergeSnapshotContext(r.base, batch) + } + return nil +} + +func (r *importRun) finalizeAndCommit(candidates []fileCandidate, snap snapshot, recordSkipped bool) error { + unresolved := finalizeSnapshot(snap, r.channelLookup, r.totals, r.stats, recordSkipped) + checkpoint := len(unresolved) == 0 + if !checkpoint { + r.deferCandidates(candidates, unresolved) + } + if len(candidates) == 0 && !snapshotHasChanges(snap) { + return nil + } + return commitSnapshot(r.ctx, r.st, r.opts, r.state, candidates, snap, checkpoint, r.stats) +} + +func (r *importRun) deferCandidates(candidates []fileCandidate, unresolved unresolvedMessages) { + r.pending = append(r.pending, candidates...) + mergeUnresolved(r.pendingUnresolved, unresolved) + if r.pendingLookupSize >= 0 { + return + } + r.pendingLookupSize = len(r.channelLookup) + r.pendingRouteSize = len(r.base.routes) +} + +func (r *importRun) retryPending() error { + if len(r.pending) == 0 { + return nil + } + if !r.pendingCanResolve() { + recordUnresolved(r.pendingUnresolved, r.totals, r.stats) + return checkpointScannedCandidates(r.ctx, r.st, r.opts, r.state, r.pending, r.stats) + } + retry := newSnapshotWithContext(r.base) + if err := scanCandidates(r.ctx, r.rootFS, r.opts, r.pending, retry, r.channelLookup, r.stats); err != nil { + return err + } + finalizeSnapshot(retry, r.channelLookup, r.totals, r.stats, true) + if err := commitSnapshot(r.ctx, r.st, r.opts, r.state, r.pending, retry, true, r.stats); err != nil { + return err + } + mergeSnapshotContext(r.base, retry) + return nil +} + +func (r *importRun) pendingCanResolve() bool { + return len(r.channelLookup) > r.pendingLookupSize || len(r.base.routes) > r.pendingRouteSize +}