fix: stabilize wiretap cache checkpointing

This commit is contained in:
Peter Steinberger 2026-05-05 01:22:48 +01:00
parent 68b49c90a5
commit 78fcca8204
No known key found for this signature in database
9 changed files with 1043 additions and 76 deletions

View File

@ -2,6 +2,12 @@
All notable changes to `discrawl` will be documented in this file.
## Unreleased
### Fixes
- `wiretap` now uses a fast default path for Discord Chromium cache imports: it scans cheap context files plus route-bearing HTTP cache entries, checkpoints file progress in batches, and leaves exhaustive historical cache archaeology behind `--full-cache` / `desktop.full_cache`.
## 0.6.5 - 2026-05-03
### Fixes

View File

@ -253,6 +253,7 @@ This is the path for searchable DMs because bot tokens cannot read personal dire
discrawl wiretap
discrawl wiretap --path "$HOME/Library/Application Support/discord"
discrawl wiretap --dry-run
discrawl wiretap --full-cache
discrawl wiretap --watch-every 2m
```
@ -264,7 +265,8 @@ Notes:
- preserves existing local `@me` guilds, channels, messages, and attachments when importing a Git snapshot, so a shared guild mirror refresh does not wipe local wiretap DM search
- drops message payloads whose channel cannot be classified from cached channel metadata or Discord route URLs; dropped rows are counted as `skipped_messages`
- imports what Discord Desktop has cached locally, not complete live DM history
- scans local `.ldb`, `.log`, `.json`, and `.txt` artifacts for Discord message JSON
- scans local `.ldb`, `.log`, `.json`, and `.txt` artifacts for Discord message JSON, plus route-bearing Chromium HTTP cache entries by default
- use `--full-cache` or `desktop.full_cache = true` for exhaustive Chromium cache import when you want slower historical guild-cache archaeology
- does not extract, store, or print Discord auth tokens
- `--max-file-bytes` skips unusually large files; default is 64 MiB
@ -573,6 +575,7 @@ attachment_text = true
[desktop]
path = "~/.config/discord" # macOS default: "~/Library/Application Support/discord"
max_file_bytes = 67108864
full_cache = false
[search]
default_mode = "fts"

View File

@ -465,12 +465,14 @@ Expected flags:
- `--dry-run`
- `--watch-every <duration>`
- `--max-file-bytes <bytes>`
- `--full-cache`
Requirements:
- never use Discord user tokens
- never extract or persist auth tokens from desktop cache
- scan bounded local files only
- default to route-bearing HTTP cache entries; exhaustive Chromium cache scans require explicit full-cache mode
- store sanitized raw metadata, not full arbitrary cache blobs
### `search`

View File

@ -181,6 +181,7 @@ func (r *runtime) runSyncLocked(sources syncSources, opts syncer.SyncOptions) er
stats, err := discorddesktop.Import(r.ctx, r.store, discorddesktop.Options{
Path: r.cfg.Desktop.Path,
MaxFileBytes: r.cfg.Desktop.MaxFileBytes,
FullCache: r.cfg.Desktop.FullCache,
Now: r.now,
})
if err != nil {
@ -262,6 +263,7 @@ func (r *runtime) runWiretap(args []string) error {
fs.SetOutput(io.Discard)
path := fs.String("path", r.cfg.Desktop.Path, "")
maxFileBytes := fs.Int64("max-file-bytes", r.cfg.Desktop.MaxFileBytes, "")
fullCache := fs.Bool("full-cache", r.cfg.Desktop.FullCache, "")
dryRun := fs.Bool("dry-run", false, "")
watchEvery := fs.Duration("watch-every", 0, "")
if err := fs.Parse(args); err != nil {
@ -277,6 +279,7 @@ func (r *runtime) runWiretap(args []string) error {
stats, err := discorddesktop.Import(ctx, r.store, discorddesktop.Options{
Path: *path,
MaxFileBytes: *maxFileBytes,
FullCache: *fullCache,
DryRun: *dryRun,
Now: r.now,
})

View File

@ -142,8 +142,8 @@ func printHuman(w io.Writer, value any) error {
}
}
if v.Wiretap != nil {
if _, err := fmt.Fprintf(w, "wiretap_files=%d\nwiretap_unchanged=%d\nwiretap_messages=%d\nwiretap_dm_messages=%d\nwiretap_dm_channels=%d\nwiretap_guild_messages=%d\nwiretap_skipped_messages=%d\nwiretap_skipped_channels=%d\n",
v.Wiretap.FilesScanned, v.Wiretap.FilesUnchanged, v.Wiretap.Messages, v.Wiretap.DMMessages, v.Wiretap.DMChannels, v.Wiretap.GuildMessages, v.Wiretap.SkippedMessages, v.Wiretap.SkippedChannels); err != nil {
if _, err := fmt.Fprintf(w, "wiretap_visited=%d\nwiretap_files=%d\nwiretap_unchanged=%d\nwiretap_fast_skipped=%d\nwiretap_messages=%d\nwiretap_dm_messages=%d\nwiretap_dm_channels=%d\nwiretap_guild_messages=%d\nwiretap_skipped_messages=%d\nwiretap_skipped_channels=%d\nwiretap_checkpoints=%d\n",
v.Wiretap.FilesVisited, v.Wiretap.FilesScanned, v.Wiretap.FilesUnchanged, v.Wiretap.CacheFilesFastSkipped, v.Wiretap.Messages, v.Wiretap.DMMessages, v.Wiretap.DMChannels, v.Wiretap.GuildMessages, v.Wiretap.SkippedMessages, v.Wiretap.SkippedChannels, v.Wiretap.Checkpoints); err != nil {
return err
}
}
@ -152,8 +152,8 @@ func printHuman(w io.Writer, value any) error {
_, err := fmt.Fprintf(w, "guilds=%d channels=%d threads=%d members=%d messages=%d\n", v.Guilds, v.Channels, v.Threads, v.Members, v.Messages)
return err
case discorddesktop.Stats:
_, err := fmt.Fprintf(w, "path=%s\nfiles=%d\nskipped=%d\nunchanged=%d\nobjects=%d\nguilds=%d\nchannels=%d\nmessages=%d\ndm_messages=%d\ndm_channels=%d\nguild_messages=%d\nskipped_messages=%d\nskipped_channels=%d\ndry_run=%t\n",
v.Path, v.FilesScanned, v.FilesSkipped, v.FilesUnchanged, v.JSONObjects, v.Guilds, v.Channels, v.Messages, v.DMMessages, v.DMChannels, v.GuildMessages, v.SkippedMessages, v.SkippedChannels, v.DryRun)
_, err := fmt.Fprintf(w, "path=%s\nvisited=%d\nfiles=%d\nskipped=%d\nunchanged=%d\nfast_skipped=%d\nobjects=%d\nguilds=%d\nchannels=%d\nmessages=%d\ndm_messages=%d\ndm_channels=%d\nguild_messages=%d\nskipped_messages=%d\nskipped_channels=%d\ncheckpoints=%d\nfull_cache=%t\ndry_run=%t\n",
v.Path, v.FilesVisited, v.FilesScanned, v.FilesSkipped, v.FilesUnchanged, v.CacheFilesFastSkipped, v.JSONObjects, v.Guilds, v.Channels, v.Messages, v.DMMessages, v.DMChannels, v.GuildMessages, v.SkippedMessages, v.SkippedChannels, v.Checkpoints, v.FullCache, v.DryRun)
return err
case store.Status:
_, err := fmt.Fprintf(w, "db=%s\nguilds=%d\nchannels=%d\nthreads=%d\nmessages=%d\nmembers=%d\nembedding_backlog=%d\nlast_sync=%s\nlast_tail_event=%s\n",

View File

@ -44,6 +44,7 @@ type DiscordConfig struct {
// DesktopConfig holds the `[desktop]` TOML table that configures the local
// Discord Desktop cache import.
type DesktopConfig struct {
// Path is the Discord Desktop data directory to scan.
Path string `toml:"path"`
// MaxFileBytes is the size limit above which files are skipped.
MaxFileBytes int64 `toml:"max_file_bytes"`
// FullCache opts in to the exhaustive Chromium cache import instead of the
// default route-bearing fast path.
FullCache bool `toml:"full_cache"`
}
type SyncConfig struct {

View File

@ -27,22 +27,28 @@ const (
DirectMessageGuildName = "Discord Direct Messages"
defaultMaxFileBytes = 64 << 20
maxObjectBytes = 4 << 20
cacheSniffBytes = 1 << 20
checkpointEveryFiles = 256
)
// channelRouteRE matches client route URLs of the form
// /channels/<@me or 12-24 digit ID>/<12-24 digit ID>, capturing both segments.
var channelRouteRE = regexp.MustCompile(`/channels/(@me|[0-9]{12,24})/([0-9]{12,24})`)
// apiMessagesRouteRE matches versioned REST message-list paths of the form
// /api/v<N>/channels/<12-24 digit ID>/messages.
var apiMessagesRouteRE = regexp.MustCompile(`/api/v[0-9]+/channels/[0-9]{12,24}/messages`)
// Options configures a single desktop cache import run.
type Options struct {
// Path is the directory to scan; blank falls back to DefaultPath().
Path string
// MaxFileBytes caps the size of files considered; values <= 0 fall back to
// defaultMaxFileBytes.
MaxFileBytes int64
// DryRun scans without writing to the store or persisting the file index.
DryRun bool
// FullCache selects the exhaustive cache scan rather than the default
// route-filtered fast path.
FullCache bool
// Now supplies the clock used for Stats timestamps; nil falls back to time.Now.
Now func() time.Time
}
type Stats struct {
Path string `json:"path"`
FilesVisited int `json:"files_visited"`
FilesScanned int `json:"files_scanned"`
FilesSkipped int `json:"files_skipped"`
FilesUnchanged int `json:"files_unchanged"`
CacheFilesFastSkipped int `json:"cache_files_fast_skipped"`
BytesScanned int64 `json:"bytes_scanned"`
JSONObjects int `json:"json_objects"`
Guilds int `json:"guilds"`
@ -53,7 +59,9 @@ type Stats struct {
GuildMessages int `json:"guild_messages"`
SkippedMessages int `json:"skipped_messages"`
SkippedChannels int `json:"skipped_channels"`
Checkpoints int `json:"checkpoints"`
DryRun bool `json:"dry_run,omitempty"`
FullCache bool `json:"full_cache,omitempty"`
StartedAt time.Time `json:"started_at"`
FinishedAt time.Time `json:"finished_at"`
}
@ -69,6 +77,7 @@ type snapshot struct {
// fileFingerprint is the per-file entry stored in the persisted file index and
// used to detect files that have not changed since a previous run.
type fileFingerprint struct {
// Size is the file size in bytes at scan time.
Size int64 `json:"size"`
// ModUnixNS is the file modification time in Unix nanoseconds.
ModUnixNS int64 `json:"mod_unix_ns"`
// Status records whether the file was imported or skipped; an empty value is
// treated as imported by isImportedFingerprint.
Status string `json:"status,omitempty"`
}
type scanState struct {
@ -77,8 +86,42 @@ type scanState struct {
channels map[string]store.ChannelRecord
}
// fileSource classifies a candidate file by where it lives under the scan root.
type fileSource int
const (
// fileSourceContext is any candidate that is not under a Chromium cache path.
fileSourceContext fileSource = iota
// fileSourceCacheData is a candidate under a Cache_Data or CacheStorage path;
// these are subject to route-hint filtering in the default mode.
fileSourceCacheData
)
// fileCandidate describes one file selected during discovery for scanning.
type fileCandidate struct {
// absPath is the path as reported by the directory walk.
absPath string
// relPath is absPath relative to the scan root, in OS-native form.
relPath string
// relKey is relPath with forward slashes; it is the key used in the file index.
relKey string
// source says whether the file is a context file or Chromium cache data.
source fileSource
// info is the file metadata captured during discovery.
info fs.FileInfo
// fingerprint is the size/mtime pair derived from info (Status unset).
fingerprint fileFingerprint
}
// scanTotals accumulates sets of IDs seen during a run so that the reported
// Stats counters can be derived from set sizes rather than incremented per
// occurrence.
type scanTotals struct {
// guilds, channels and messages hold every ID of that kind seen so far.
guilds map[string]struct{}
channels map[string]struct{}
messages map[string]struct{}
// dmMessages and guildMessages partition message IDs by guild kind.
dmMessages map[string]struct{}
guildMessages map[string]struct{}
// dmChannels holds channel IDs that carried direct messages.
dmChannels map[string]struct{}
// skippedMessages and skippedChannels hold IDs of messages that could not be
// resolved and the channels they referenced.
skippedMessages map[string]struct{}
skippedChannels map[string]struct{}
}
// unresolvedMessages maps a message ID to the channel ID it referenced.
type unresolvedMessages map[string]string
// wiretapFileIndexScope is the sync-state key under which the JSON-encoded
// file index is stored.
const wiretapFileIndexScope = "wiretap:file_index:v1"
// Values written to fileFingerprint.Status.
const (
fileStatusImported = "imported"
fileStatusSkipped = "skipped"
)
func DefaultPath() string {
home, _ := os.UserHomeDir()
switch runtime.GOOS {
@ -105,7 +148,8 @@ func Import(ctx context.Context, st *store.Store, opts Options) (Stats, error) {
if err != nil {
return Stats{}, err
}
stats, snap, err := scan(ctx, opts, state)
if opts.FullCache {
stats, snap, err := scanFullCache(ctx, opts, state)
if err != nil {
return stats, err
}
@ -113,17 +157,20 @@ func Import(ctx context.Context, st *store.Store, opts Options) (Stats, error) {
if opts.DryRun {
return stats, nil
}
fullScan := len(state.previous) == 0
if snapshotHasChanges(snap) || fullScan {
if err := writeSnapshot(ctx, st, snap, fullScan); err != nil {
if err := writeSnapshot(ctx, st, snap, len(state.previous) == 0); err != nil {
return stats, err
}
} else if err := st.SetSyncState(ctx, "wiretap:last_import", time.Now().UTC().Format(time.RFC3339Nano)); err != nil {
if err := saveFileIndex(ctx, st, opts, state.current); err != nil {
return stats, err
}
if err := saveFileIndex(ctx, st, state.current); err != nil {
stats.Checkpoints = 1
return stats, nil
}
stats, err := scanAndImport(ctx, st, opts, state)
if err != nil {
return stats, err
}
stats.DryRun = opts.DryRun
return stats, nil
}
@ -136,7 +183,7 @@ func loadScanState(ctx context.Context, st *store.Store, opts Options) (scanStat
if st == nil || opts.DryRun {
return state, nil
}
raw, err := st.GetSyncState(ctx, wiretapFileIndexScope)
raw, err := st.GetSyncState(ctx, fileIndexScope(opts))
if err != nil {
return state, err
}
@ -160,19 +207,107 @@ func loadScanState(ctx context.Context, st *store.Store, opts Options) (scanStat
return state, nil
}
func saveFileIndex(ctx context.Context, st *store.Store, index map[string]fileFingerprint) error {
// fileIndexScope returns the sync-state key that holds the persisted file
// index. The options argument is currently ignored: every mode shares one key.
func fileIndexScope(_ Options) string {
	return wiretapFileIndexScope
}
func saveFileIndex(ctx context.Context, st *store.Store, opts Options, index map[string]fileFingerprint) error {
body, err := json.Marshal(index)
if err != nil {
return err
}
return st.SetSyncState(ctx, wiretapFileIndexScope, string(body))
return st.SetSyncState(ctx, fileIndexScope(opts), string(body))
}
// sameFileFingerprint reports whether two fingerprints describe the same file
// contents as far as size and modification time can tell. Status is ignored.
func sameFileFingerprint(a, b fileFingerprint) bool {
	if a.Size != b.Size {
		return false
	}
	return a.ModUnixNS == b.ModUnixNS
}
// isImportedFingerprint reports whether the index entry marks a file whose
// contents were imported. An empty status counts as imported.
func isImportedFingerprint(fingerprint fileFingerprint) bool {
	switch fingerprint.Status {
	case "", fileStatusImported:
		return true
	default:
		return false
	}
}
// importedFingerprint returns a copy of fingerprint marked as imported.
func importedFingerprint(fingerprint fileFingerprint) fileFingerprint {
	stamped := fingerprint
	stamped.Status = fileStatusImported
	return stamped
}
// skippedFingerprint returns a copy of fingerprint marked as skipped.
func skippedFingerprint(fingerprint fileFingerprint) fileFingerprint {
	stamped := fingerprint
	stamped.Status = fileStatusSkipped
	return stamped
}
// snapshotHasChanges reports whether the snapshot carries at least one guild,
// channel, or message.
func snapshotHasChanges(snap snapshot) bool {
	if len(snap.guilds) > 0 {
		return true
	}
	if len(snap.channels) > 0 {
		return true
	}
	return len(snap.messages) > 0
}
func scan(ctx context.Context, opts Options, state scanState) (Stats, snapshot, error) {
// scanAndImport runs the default (non-exhaustive) import: it discovers
// candidate files under the scan root, scans context files, gathers route
// hints from cache files, scans the cache files in batches, retries pending
// work, and finally removes orphaned direct-message channels.
//
// The returned Stats always has FinishedAt set, including on error. In dry-run
// mode no store method is called from this function.
func scanAndImport(ctx context.Context, st *store.Store, opts Options, state scanState) (Stats, error) {
	clock := opts.Now
	if clock == nil {
		clock = time.Now
	}
	root := strings.TrimSpace(opts.Path)
	if root == "" {
		root = DefaultPath()
	}
	stats := Stats{Path: root, FullCache: opts.FullCache, StartedAt: clock().UTC()}

	// finish stamps the end time and builds the return pair for every exit.
	finish := func(err error) (Stats, error) {
		stats.FinishedAt = clock().UTC()
		return stats, err
	}

	rootFS, err := os.OpenRoot(root)
	if err != nil {
		return finish(ignoreCacheFileError(err))
	}
	defer func() { _ = rootFS.Close() }()

	contextFiles, cacheFiles, err := discoverCandidates(ctx, root, rootFS, opts, state, &stats)
	if err != nil {
		return finish(err)
	}

	// With no previous file index this is a first/full scan: clear the
	// "@unknown" guild before importing.
	if len(state.previous) == 0 && !opts.DryRun {
		if err := st.DeleteGuildData(ctx, "@unknown"); err != nil {
			return finish(err)
		}
	}

	run := newImportRun(ctx, st, opts, state, rootFS, &stats)
	stages := []func() error{
		func() error { return run.scanContext(contextFiles) },
		func() error { return collectCacheRouteHints(ctx, rootFS, cacheFiles, run.base) },
		func() error { return run.scanCacheBatches(cacheFiles) },
		func() error { return run.retryPending() },
	}
	for _, stage := range stages {
		if err := stage(); err != nil {
			return finish(err)
		}
	}

	if opts.DryRun {
		return finish(nil)
	}

	// Nothing needed scanning: still record the import time and persist the
	// index so unchanged/skipped entries recorded during discovery are kept.
	if len(contextFiles) == 0 && len(cacheFiles) == 0 {
		if err := st.SetSyncState(ctx, "wiretap:last_import", time.Now().UTC().Format(time.RFC3339Nano)); err != nil {
			return finish(err)
		}
		if err := saveFileIndex(ctx, st, opts, state.current); err != nil {
			return finish(err)
		}
		stats.Checkpoints++
	}
	if err := st.DeleteOrphanChannels(ctx, DirectMessageGuildID); err != nil {
		return finish(err)
	}
	return finish(nil)
}
func scanFullCache(ctx context.Context, opts Options, state scanState) (Stats, snapshot, error) {
now := opts.Now
if now == nil {
now = time.Now
@ -185,14 +320,8 @@ func scan(ctx context.Context, opts Options, state scanState) (Stats, snapshot,
if maxBytes <= 0 {
maxBytes = defaultMaxFileBytes
}
stats := Stats{Path: root, StartedAt: now().UTC()}
snap := snapshot{
guilds: map[string]store.GuildRecord{},
channels: map[string]store.ChannelRecord{},
messages: map[string]store.MessageMutation{},
routes: map[string]string{},
userLabels: map[string]userLabel{},
}
stats := Stats{Path: root, FullCache: true, StartedAt: now().UTC()}
snap := newSnapshot()
rootFS, err := os.OpenRoot(root)
if err != nil {
stats.FinishedAt = now().UTC()
@ -212,6 +341,7 @@ func scan(ctx context.Context, opts Options, state scanState) (Stats, snapshot,
}
return nil
}
stats.FilesVisited++
info, err := entry.Info()
if err != nil {
stats.FilesSkipped++
@ -231,8 +361,8 @@ func scan(ctx context.Context, opts Options, state scanState) (Stats, snapshot,
Size: info.Size(),
ModUnixNS: info.ModTime().UnixNano(),
}
state.current[relKey] = fingerprint
if previous, ok := state.previous[relKey]; ok && previous == fingerprint {
state.current[relKey] = importedFingerprint(fingerprint)
if previous, ok := state.previous[relKey]; ok && sameFileFingerprint(previous, fingerprint) && isImportedFingerprint(previous) {
stats.FilesUnchanged++
return nil
}
@ -267,15 +397,181 @@ func scan(ctx context.Context, opts Options, state scanState) (Stats, snapshot,
}); err != nil {
return stats, snap, err
}
reconcileMessages(snap, state.channels)
totals := newScanTotals()
finalizeSnapshot(snap, state.channels, totals, &stats, true)
stats.FinishedAt = now().UTC()
return stats, snap, nil
}
// discoverCandidates walks root and splits the files worth scanning into
// context files and Chromium cache files (first and second return values).
//
// While walking it updates stats (visited, skipped, unchanged and fast-skipped
// counters) and writes entries into state.current for files that are unchanged
// or fast-skipped. Files returned as candidates are NOT added to state.current
// here. Files whose size is <= 0 or above the configured limit, and files
// rejected by isCandidateFile, are counted as skipped.
func discoverCandidates(ctx context.Context, root string, rootFS *os.Root, opts Options, state scanState, stats *Stats) ([]fileCandidate, []fileCandidate, error) {
var contextFiles []fileCandidate
var cacheFiles []fileCandidate
maxBytes := opts.MaxFileBytes
if maxBytes <= 0 {
maxBytes = defaultMaxFileBytes
}
err := filepath.WalkDir(root, func(path string, entry fs.DirEntry, err error) error {
if err != nil {
return ignoreCacheFileError(err)
}
// Stop the walk as soon as the context is cancelled.
if ctx.Err() != nil {
return ctx.Err()
}
if entry.IsDir() {
// The root itself is never skipped, even if its name matches.
if shouldSkipDir(entry.Name()) && path != root {
return filepath.SkipDir
}
return nil
}
stats.FilesVisited++
info, err := entry.Info()
if err != nil {
stats.FilesSkipped++
return ignoreCacheFileError(err)
}
if !isCandidateFile(path) || info.Size() <= 0 || info.Size() > maxBytes {
stats.FilesSkipped++
return nil
}
relPath, err := filepath.Rel(root, path)
if err != nil {
stats.FilesSkipped++
return ignoreCacheFileError(err)
}
// The index key always uses forward slashes, regardless of OS.
relKey := filepath.ToSlash(relPath)
fingerprint := fileFingerprint{
Size: info.Size(),
ModUnixNS: info.ModTime().UnixNano(),
}
candidate := fileCandidate{
absPath: path,
relPath: relPath,
relKey: relKey,
source: sourceForPath(root, path, relPath),
info: info,
fingerprint: fingerprint,
}
if candidate.source == fileSourceCacheData {
if previous, ok := state.previous[relKey]; ok && sameFileFingerprint(previous, fingerprint) {
// In the default mode any unchanged entry (imported or skipped) is reused;
// in full-cache mode only entries that were actually imported are.
if !opts.FullCache || isImportedFingerprint(previous) {
state.current[relKey] = previous
stats.FilesUnchanged++
return nil
}
}
if !opts.FullCache {
// Default mode: only keep cache files whose prefix contains a route hint.
ok, err := cacheFileHasRouteHint(rootFS, relPath)
if err != nil {
stats.FilesSkipped++
return ignoreCacheFileError(err)
}
if !ok {
// Remember the file as skipped so the next default run treats it as unchanged.
state.current[relKey] = skippedFingerprint(fingerprint)
stats.FilesSkipped++
stats.CacheFilesFastSkipped++
return nil
}
}
cacheFiles = append(cacheFiles, candidate)
return nil
}
// Context files: reuse the previous index entry when size and mtime match.
if previous, ok := state.previous[relKey]; ok && sameFileFingerprint(previous, fingerprint) {
state.current[relKey] = previous
stats.FilesUnchanged++
return nil
}
contextFiles = append(contextFiles, candidate)
return nil
})
return contextFiles, cacheFiles, err
}
// scanCandidates reads every candidate file through rootFS and feeds its
// contents into snap.
//
// For each file it collects channel routes and JSON values from the raw bytes
// and from every gzip payload found inside the file, then passes each JSON
// value that decodes successfully to collectValue together with the file's
// modification time (UTC). Values that fail to decode are ignored.
//
// stats is updated in place: FilesSkipped for unreadable files, FilesScanned,
// BytesScanned and JSONObjects for readable ones. A read error is returned
// only if ignoreCacheFileError does not suppress it. The function returns
// ctx.Err() as soon as cancellation is observed.
func scanCandidates(ctx context.Context, rootFS *os.Root, opts Options, candidates []fileCandidate, snap snapshot, channelLookup map[string]store.ChannelRecord, stats *Stats) error {
	maxBytes := opts.MaxFileBytes
	if maxBytes <= 0 {
		maxBytes = defaultMaxFileBytes
	}
	for i := range candidates {
		candidate := &candidates[i]
		if err := ctx.Err(); err != nil {
			return err
		}
		data, err := rootFS.ReadFile(candidate.relPath)
		if err != nil {
			stats.FilesSkipped++
			if err := ignoreCacheFileError(err); err != nil {
				return err
			}
			continue
		}
		stats.FilesScanned++
		stats.BytesScanned += int64(len(data))

		// Sanitize once and share the result: ToValidUTF8 copies the whole
		// buffer, which can be as large as the file-size limit.
		// NOTE(review): assumes collectChannelRoutes and extractJSONValues
		// treat their input as read-only — confirm.
		text := bytes.ToValidUTF8(data, nil)
		collectChannelRoutes(snap, text)
		objects := extractJSONValues(text)
		for _, payload := range extractGzipPayloads(data, maxBytes) {
			if err := ctx.Err(); err != nil {
				return err
			}
			payloadText := bytes.ToValidUTF8(payload, nil)
			collectChannelRoutes(snap, payloadText)
			objects = append(objects, extractJSONValues(payloadText)...)
		}
		stats.JSONObjects += len(objects)

		// The observation time is the same for every object in this file.
		observedAt := candidate.info.ModTime().UTC()
		for _, raw := range objects {
			if err := ctx.Err(); err != nil {
				return err
			}
			var value any
			if err := json.Unmarshal(raw, &value); err != nil {
				continue
			}
			collectValue(snap, channelLookup, value, observedAt)
		}
	}
	return nil
}
// collectCacheRouteHints reads the leading bytes of every candidate and records
// any channel routes found there into snap. Read failures that
// ignoreCacheFileError suppresses are skipped silently; cancellation of ctx
// aborts the loop with ctx.Err().
func collectCacheRouteHints(ctx context.Context, rootFS *os.Root, candidates []fileCandidate, snap snapshot) error {
	for i := range candidates {
		if cancelled := ctx.Err(); cancelled != nil {
			return cancelled
		}
		prefix, readErr := readFilePrefix(rootFS, candidates[i].relPath)
		if readErr == nil {
			collectChannelRoutes(snap, bytes.ToValidUTF8(prefix, nil))
			continue
		}
		if fatal := ignoreCacheFileError(readErr); fatal != nil {
			return fatal
		}
	}
	return nil
}
// newScanTotals returns a scanTotals whose ID sets are all allocated and empty.
func newScanTotals() scanTotals {
	idSet := func() map[string]struct{} { return make(map[string]struct{}) }
	var totals scanTotals
	totals.guilds = idSet()
	totals.channels = idSet()
	totals.messages = idSet()
	totals.dmMessages = idSet()
	totals.guildMessages = idSet()
	totals.dmChannels = idSet()
	totals.skippedMessages = idSet()
	totals.skippedChannels = idSet()
	return totals
}
func finalizeSnapshot(snap snapshot, channelLookup map[string]store.ChannelRecord, totals scanTotals, stats *Stats, recordSkipped bool) unresolvedMessages {
reconcileMessages(snap, channelLookup)
inferDirectMessageNames(snap)
reconcileMessages(snap, state.channels)
skippedChannels := map[string]struct{}{}
reconcileMessages(snap, channelLookup)
unresolved := unresolvedMessages{}
for id, msg := range snap.messages {
guildID := msg.Record.GuildID
if guildID == "" {
stats.SkippedMessages++
skippedChannels[msg.Record.ChannelID] = struct{}{}
unresolved[id] = msg.Record.ChannelID
if recordSkipped {
totals.skippedMessages[id] = struct{}{}
totals.skippedChannels[msg.Record.ChannelID] = struct{}{}
}
delete(snap.messages, id)
continue
}
@ -283,34 +579,195 @@ func scan(ctx context.Context, opts Options, state scanState) (Stats, snapshot,
snap.guilds[guildID] = syntheticGuild(guildID, guildName(guildID))
}
if _, ok := snap.channels[msg.Record.ChannelID]; !ok {
if channel, ok := channelLookup[msg.Record.ChannelID]; ok && channel.GuildID != "" {
snap.channels[msg.Record.ChannelID] = channel
} else {
snap.channels[msg.Record.ChannelID] = syntheticChannel(msg.Record.ChannelID, guildID, msg.Record.ChannelName)
}
}
snap.messages[id] = msg
}
messageChannels := map[string]struct{}{}
dmChannels := map[string]struct{}{}
for _, msg := range snap.messages {
messageChannels[msg.Record.ChannelID] = struct{}{}
totals.messages[msg.Record.ID] = struct{}{}
switch msg.Record.GuildID {
case DirectMessageGuildID:
stats.DMMessages++
dmChannels[msg.Record.ChannelID] = struct{}{}
totals.dmMessages[msg.Record.ID] = struct{}{}
totals.dmChannels[msg.Record.ChannelID] = struct{}{}
default:
stats.GuildMessages++
totals.guildMessages[msg.Record.ID] = struct{}{}
}
}
for id := range snap.channels {
if _, ok := messageChannels[id]; !ok {
delete(snap.channels, id)
for id, channel := range snap.channels {
channelLookup[id] = channel
totals.channels[id] = struct{}{}
}
for id := range snap.guilds {
totals.guilds[id] = struct{}{}
}
stats.DMChannels = len(dmChannels)
stats.SkippedChannels = len(skippedChannels)
stats.Guilds = len(snap.guilds)
stats.Channels = len(snap.channels)
stats.Messages = len(snap.messages)
stats.FinishedAt = now().UTC()
return stats, snap, nil
stats.DMChannels = len(totals.dmChannels)
stats.SkippedChannels = len(totals.skippedChannels)
stats.Guilds = len(totals.guilds)
stats.Channels = len(totals.channels)
stats.Messages = len(totals.messages)
stats.DMMessages = len(totals.dmMessages)
stats.GuildMessages = len(totals.guildMessages)
stats.SkippedMessages = len(totals.skippedMessages)
return unresolved
}
// mergeUnresolved copies every message-to-channel entry of src into dst,
// overwriting entries that already exist in dst.
func mergeUnresolved(dst, src unresolvedMessages) {
	for msgID := range src {
		dst[msgID] = src[msgID]
	}
}
// recordUnresolved adds the unresolved messages and their channels to the
// skipped sets in totals and refreshes the matching counters on stats.
func recordUnresolved(unresolved unresolvedMessages, totals scanTotals, stats *Stats) {
	present := struct{}{}
	for msgID := range unresolved {
		totals.skippedMessages[msgID] = present
		totals.skippedChannels[unresolved[msgID]] = present
	}
	stats.SkippedMessages = len(totals.skippedMessages)
	stats.SkippedChannels = len(totals.skippedChannels)
}
// commitSnapshot persists snap and, when checkpoint is true, also marks the
// candidates as imported in the file index and saves the index.
//
// Dry runs do nothing. Without a checkpoint, a non-empty snapshot is written
// with message events suppressed and the file index is left alone. With a
// checkpoint, a non-empty snapshot is written as-is; an empty one only updates
// the "wiretap:last_import" sync state. stats.Checkpoints is incremented once
// the index has been saved.
func commitSnapshot(ctx context.Context, st *store.Store, opts Options, state scanState, candidates []fileCandidate, snap snapshot, checkpoint bool, stats *Stats) error {
	if opts.DryRun {
		return nil
	}
	changed := snapshotHasChanges(snap)
	if !checkpoint {
		if !changed {
			return nil
		}
		return writeSnapshot(ctx, st, snapshotWithoutMessageEvents(snap), false)
	}

	var err error
	if changed {
		err = writeSnapshot(ctx, st, snap, false)
	} else {
		err = st.SetSyncState(ctx, "wiretap:last_import", time.Now().UTC().Format(time.RFC3339Nano))
	}
	if err != nil {
		return err
	}

	for i := range candidates {
		state.current[candidates[i].relKey] = importedFingerprint(candidates[i].fingerprint)
	}
	if err := saveFileIndex(ctx, st, opts, state.current); err != nil {
		return err
	}
	stats.Checkpoints++
	return nil
}
// checkpointScannedCandidates records the candidates as imported without
// writing any snapshot: it updates the "wiretap:last_import" sync state, marks
// each candidate in state.current, saves the file index, and bumps
// stats.Checkpoints. Dry runs do nothing.
func checkpointScannedCandidates(ctx context.Context, st *store.Store, opts Options, state scanState, candidates []fileCandidate, stats *Stats) error {
	if opts.DryRun {
		return nil
	}
	stamp := time.Now().UTC().Format(time.RFC3339Nano)
	if err := st.SetSyncState(ctx, "wiretap:last_import", stamp); err != nil {
		return err
	}
	for i := range candidates {
		state.current[candidates[i].relKey] = importedFingerprint(candidates[i].fingerprint)
	}
	err := saveFileIndex(ctx, st, opts, state.current)
	if err == nil {
		stats.Checkpoints++
	}
	return err
}
// snapshotWithoutMessageEvents returns a snapshot that shares the guild,
// channel, route and user-label maps of snap but carries a fresh message map in
// which every mutation has Options.AppendEvent turned off. snap itself is not
// modified.
func snapshotWithoutMessageEvents(snap snapshot) snapshot {
	quiet := make(map[string]store.MessageMutation, len(snap.messages))
	for id := range snap.messages {
		mutation := snap.messages[id]
		mutation.Options.AppendEvent = false
		quiet[id] = mutation
	}
	return snapshot{
		guilds:     snap.guilds,
		channels:   snap.channels,
		messages:   quiet,
		routes:     snap.routes,
		userLabels: snap.userLabels,
	}
}
// newSnapshot returns a snapshot with all of its maps allocated and empty.
func newSnapshot() snapshot {
	var snap snapshot
	snap.guilds = make(map[string]store.GuildRecord)
	snap.channels = make(map[string]store.ChannelRecord)
	snap.messages = make(map[string]store.MessageMutation)
	snap.routes = make(map[string]string)
	snap.userLabels = make(map[string]userLabel)
	return snap
}
// newSnapshotWithContext returns a fresh snapshot pre-seeded with copies of the
// routes and user labels from base. Guilds, channels and messages start empty.
func newSnapshotWithContext(base snapshot) snapshot {
	seeded := newSnapshot()
	for channelID := range base.routes {
		seeded.routes[channelID] = base.routes[channelID]
	}
	for userID := range base.userLabels {
		seeded.userLabels[userID] = base.userLabels[userID]
	}
	return seeded
}
// mergeSnapshotContext folds the contextual data of next into base: routes go
// through collectChannelRoute, while user labels and channels are copied over
// directly, replacing any existing entries with the same ID.
func mergeSnapshotContext(base snapshot, next snapshot) {
	for id, guild := range next.routes {
		collectChannelRoute(base, id, guild)
	}
	for id := range next.userLabels {
		base.userLabels[id] = next.userLabels[id]
	}
	for id := range next.channels {
		base.channels[id] = next.channels[id]
	}
}
// copyChannelLookup returns a new map holding the same entries as in. The
// result is always non-nil, even when in is nil.
func copyChannelLookup(in map[string]store.ChannelRecord) map[string]store.ChannelRecord {
	clone := make(map[string]store.ChannelRecord, len(in))
	for id := range in {
		clone[id] = in[id]
	}
	return clone
}
// sourceForPath classifies a file as Chromium cache data when its location
// matches isRouteFilteredCachePath, and as a context file otherwise.
func sourceForPath(root, path, relPath string) fileSource {
	source := fileSourceContext
	if isRouteFilteredCachePath(root, path, relPath) {
		source = fileSourceCacheData
	}
	return source
}
// isRouteFilteredCachePath reports whether a file belongs to a Chromium HTTP
// cache location. It is true when any of the following holds (all comparisons
// are made on slash-separated forms):
//   - the scan root's last element is "Cache_Data" or "CacheStorage";
//   - path contains "/Cache/Cache_Data/" or "/Service Worker/CacheStorage/";
//   - relPath starts with "Cache_Data/" or "Service Worker/CacheStorage/".
func isRouteFilteredCachePath(root, path, relPath string) bool {
	switch filepath.Base(filepath.ToSlash(root)) {
	case "Cache_Data", "CacheStorage":
		return true
	}

	slashPath := filepath.ToSlash(path)
	for _, segment := range []string{"/Cache/Cache_Data/", "/Service Worker/CacheStorage/"} {
		if strings.Contains(slashPath, segment) {
			return true
		}
	}

	slashRel := filepath.ToSlash(relPath)
	for _, prefix := range []string{"Cache_Data/", "Service Worker/CacheStorage/"} {
		if strings.HasPrefix(slashRel, prefix) {
			return true
		}
	}
	return false
}
// cacheFileHasRouteHint reports whether the leading bytes of the file contain
// either a client channel route or an API messages route. Only the prefix
// returned by readFilePrefix is inspected.
func cacheFileHasRouteHint(rootFS *os.Root, relPath string) (bool, error) {
	prefix, err := readFilePrefix(rootFS, relPath)
	if err != nil {
		return false, err
	}
	if channelRouteRE.Match(prefix) {
		return true, nil
	}
	return apiMessagesRouteRE.Match(prefix), nil
}
// readFilePrefix opens relPath inside rootFS and returns at most
// cacheSniffBytes bytes from the start of the file. On any open or read error
// it returns a nil slice together with the error.
func readFilePrefix(rootFS *os.Root, relPath string) ([]byte, error) {
	handle, openErr := rootFS.Open(relPath)
	if openErr != nil {
		return nil, openErr
	}
	// Close errors are deliberately ignored: the file is only read.
	defer func() { _ = handle.Close() }()

	limited := io.LimitReader(handle, cacheSniffBytes)
	prefix, readErr := io.ReadAll(limited)
	if readErr != nil {
		return nil, readErr
	}
	return prefix, nil
}
func ignoreCacheFileError(error) error {

View File

@ -0,0 +1,382 @@
package discorddesktop
import (
"context"
"fmt"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/require"
"github.com/steipete/discrawl/internal/store"
)
// TestImportFastCacheSkipsUnroutedCacheDataUnlessFullCache checks that a cache
// entry containing no route URL is fast-skipped by the default import, is
// reported as unchanged on the next default run, and is imported once
// FullCache is requested.
func TestImportFastCacheSkipsUnroutedCacheDataUnlessFullCache(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
cachePath := filepath.Join(dir, "Cache", "Cache_Data")
require.NoError(t, os.MkdirAll(cachePath, 0o755))
// The entry holds channel and message JSON but no /channels/ URL.
require.NoError(t, os.WriteFile(filepath.Join(cachePath, "entry_0"), []byte(`
{"id":"111111111111111121","guild_id":"999999999999999996","type":0,"name":"slow-cache"}
{"id":"333333333333333346","channel_id":"111111111111111121","content":"unrouted historical cache","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}}
`), 0o600))
fastStore, err := store.Open(ctx, filepath.Join(dir, "fast.db"))
require.NoError(t, err)
defer func() { _ = fastStore.Close() }()
// First default run: nothing scanned, one file fast-skipped.
stats, err := Import(ctx, fastStore, Options{Path: dir})
require.NoError(t, err)
require.Equal(t, 0, stats.FilesScanned)
require.Equal(t, 1, stats.CacheFilesFastSkipped)
require.Equal(t, 0, stats.Messages)
results, err := fastStore.SearchMessages(ctx, store.SearchOptions{Query: "unrouted historical", Limit: 10})
require.NoError(t, err)
require.Empty(t, results)
// Second default run: the skipped file is now counted as unchanged.
stats, err = Import(ctx, fastStore, Options{Path: dir})
require.NoError(t, err)
require.Equal(t, 0, stats.FilesScanned)
require.Equal(t, 0, stats.CacheFilesFastSkipped)
require.Equal(t, 1, stats.FilesUnchanged)
// A full-cache run on the same store must still scan the previously skipped file.
stats, err = Import(ctx, fastStore, Options{Path: dir, FullCache: true})
require.NoError(t, err)
require.Equal(t, 1, stats.FilesScanned)
require.Equal(t, 1, stats.Messages)
// A full-cache run on a fresh store imports the message with its channel name.
fullStore, err := store.Open(ctx, filepath.Join(dir, "full.db"))
require.NoError(t, err)
defer func() { _ = fullStore.Close() }()
stats, err = Import(ctx, fullStore, Options{Path: dir, FullCache: true})
require.NoError(t, err)
require.Equal(t, 1, stats.FilesScanned)
require.Equal(t, 0, stats.CacheFilesFastSkipped)
require.Equal(t, 1, stats.Messages)
results, err = fullStore.SearchMessages(ctx, store.SearchOptions{Query: "unrouted historical", Limit: 10})
require.NoError(t, err)
require.Len(t, results, 1)
require.Equal(t, "slow-cache", results[0].ChannelName)
}
// TestImportCheckpointsCacheBatches writes one more route-bearing cache entry
// than checkpointEveryFiles and checks that the import scans them all, reports
// at least two checkpoints, and treats every file as unchanged on a rerun.
func TestImportCheckpointsCacheBatches(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
cachePath := filepath.Join(dir, "Cache", "Cache_Data")
require.NoError(t, os.MkdirAll(cachePath, 0o755))
// Each entry carries a channel route URL plus one message with a unique ID.
for i := range checkpointEveryFiles + 1 {
channelID := "111111111111111121"
messageID := 333333333333333346 + i
body := []byte(fmt.Sprintf(`https://discord.com/channels/999999999999999996/%s
{"id":"%d","channel_id":"%s","content":"checkpoint cache %d","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}}
`, channelID, messageID, channelID, i))
require.NoError(t, os.WriteFile(filepath.Join(cachePath, fmt.Sprintf("entry_%03d", i)), body, 0o600))
}
st, err := store.Open(ctx, filepath.Join(dir, "discrawl.db"))
require.NoError(t, err)
defer func() { _ = st.Close() }()
stats, err := Import(ctx, st, Options{Path: dir})
require.NoError(t, err)
require.Equal(t, checkpointEveryFiles+1, stats.FilesScanned)
require.Equal(t, checkpointEveryFiles+1, stats.Messages)
require.GreaterOrEqual(t, stats.Checkpoints, 2)
// Rerun: the saved file index must cover every file from the first run.
stats, err = Import(ctx, st, Options{Path: dir})
require.NoError(t, err)
require.Equal(t, 0, stats.FilesScanned)
require.Equal(t, checkpointEveryFiles+1, stats.FilesUnchanged)
}
// TestImportUsesLaterCacheMetadataBeforeCheckpointingEarlierBatch checks that a
// message found in the first cache file is still imported with the correct
// guild and channel name when the channel metadata only appears in a file
// numbered checkpointEveryFiles, and that exactly one message event is recorded.
func TestImportUsesLaterCacheMetadataBeforeCheckpointingEarlierBatch(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
cachePath := filepath.Join(dir, "Cache", "Cache_Data")
require.NoError(t, os.MkdirAll(cachePath, 0o755))
channelID := "111111111111111121"
guildID := "999999999999999996"
// entry_000: the message, with an API route but no guild information.
require.NoError(t, os.WriteFile(filepath.Join(cachePath, "entry_000"), []byte(fmt.Sprintf(`https://discord.com/api/v9/channels/%s/messages?limit=50
{"id":"333333333333333346","channel_id":"%s","content":"needs later channel metadata","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}}
`, channelID, channelID)), 0o600))
// Filler entries that only contain the API route.
for i := 1; i < checkpointEveryFiles; i++ {
require.NoError(t, os.WriteFile(filepath.Join(cachePath, fmt.Sprintf("entry_%03d", i)), []byte(fmt.Sprintf(
"https://discord.com/api/v9/channels/%s/messages?limit=50\n",
channelID,
)), 0o600))
}
// Final entry: the channel record that names the guild and channel.
require.NoError(t, os.WriteFile(filepath.Join(cachePath, fmt.Sprintf("entry_%03d", checkpointEveryFiles)), []byte(fmt.Sprintf(`https://discord.com/api/v9/channels/%s/messages?limit=50
{"id":"%s","guild_id":"%s","type":0,"name":"later-metadata"}
`, channelID, channelID, guildID)), 0o600))
st, err := store.Open(ctx, filepath.Join(dir, "discrawl.db"))
require.NoError(t, err)
defer func() { _ = st.Close() }()
stats, err := Import(ctx, st, Options{Path: dir})
require.NoError(t, err)
// NOTE(review): the expected count exceeds the number of files by
// checkpointEveryFiles — presumably the first batch is scanned again by the
// retry pass; confirm against the import run implementation.
require.Equal(t, checkpointEveryFiles+1+checkpointEveryFiles, stats.FilesScanned)
require.Equal(t, 1, stats.Messages)
require.GreaterOrEqual(t, stats.Checkpoints, 2)
results, err := st.SearchMessages(ctx, store.SearchOptions{Query: "needs later channel metadata", Limit: 10})
require.NoError(t, err)
require.Len(t, results, 1)
require.Equal(t, guildID, results[0].GuildID)
require.Equal(t, "later-metadata", results[0].ChannelName)
requireMessageCount(t, ctx, st, "message_events", 1)
// Rerun: nothing is rescanned and no additional message event is appended.
stats, err = Import(ctx, st, Options{Path: dir})
require.NoError(t, err)
require.Equal(t, 0, stats.FilesScanned)
require.Equal(t, checkpointEveryFiles+1, stats.FilesUnchanged)
requireMessageCount(t, ctx, st, "message_events", 1)
}
// TestImportCheckpointsPartiallyResolvedRetryBatch covers a cache directory
// whose first file carries two messages: one for a channel whose metadata only
// appears in a later file, and one for a channel that never gets metadata.
// The first must be imported under the late channel name, the second must be
// counted as skipped, and a second Import must find every file unchanged.
func TestImportCheckpointsPartiallyResolvedRetryBatch(t *testing.T) {
	const (
		resolvedChannelID   = "111111111111111121"
		unresolvedChannelID = "111111111111111122"
		guildID             = "999999999999999996"
	)
	ctx := context.Background()
	dir := t.TempDir()
	cachePath := filepath.Join(dir, "Cache", "Cache_Data")
	require.NoError(t, os.MkdirAll(cachePath, 0o755))
	writeEntry := func(name, body string) {
		t.Helper()
		require.NoError(t, os.WriteFile(filepath.Join(cachePath, name), []byte(body), 0o600))
	}

	// entry_000: API routes for both channels plus one message in each.
	writeEntry("entry_000", fmt.Sprintf(`https://discord.com/api/v9/channels/%s/messages?limit=50
https://discord.com/api/v9/channels/%s/messages?limit=50
{"id":"333333333333333346","channel_id":"%s","content":"partially resolved retry message","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}}
{"id":"333333333333333347","channel_id":"%s","content":"still unresolved retry message","timestamp":"2026-04-23T18:20:44Z","author":{"id":"222222222222222232","username":"alice"}}
`, resolvedChannelID, unresolvedChannelID, resolvedChannelID, unresolvedChannelID))

	// Route-only filler files so that checkpointEveryFiles files precede the
	// one that finally carries the channel metadata.
	filler := fmt.Sprintf(
		"https://discord.com/api/v9/channels/%s/messages?limit=50\n",
		resolvedChannelID,
	)
	for n := 1; n < checkpointEveryFiles; n++ {
		writeEntry(fmt.Sprintf("entry_%03d", n), filler)
	}

	// The metadata for the resolved channel only shows up here.
	writeEntry(fmt.Sprintf("entry_%03d", checkpointEveryFiles), fmt.Sprintf(`https://discord.com/api/v9/channels/%s/messages?limit=50
{"id":"%s","guild_id":"%s","type":0,"name":"partially-resolved"}
`, resolvedChannelID, resolvedChannelID, guildID))

	db, err := store.Open(ctx, filepath.Join(dir, "discrawl.db"))
	require.NoError(t, err)
	defer func() { _ = db.Close() }()

	first, err := Import(ctx, db, Options{Path: dir})
	require.NoError(t, err)
	// Every file once, plus the first checkpointEveryFiles files a second time.
	require.Equal(t, checkpointEveryFiles+1+checkpointEveryFiles, first.FilesScanned)
	require.Equal(t, 1, first.Messages)
	require.Equal(t, 1, first.SkippedMessages)
	require.GreaterOrEqual(t, first.Checkpoints, 2)

	hits, err := db.SearchMessages(ctx, store.SearchOptions{Query: "partially resolved retry", Limit: 10})
	require.NoError(t, err)
	require.Len(t, hits, 1)
	require.Equal(t, "partially-resolved", hits[0].ChannelName)

	misses, err := db.SearchMessages(ctx, store.SearchOptions{Query: "still unresolved retry", Limit: 10})
	require.NoError(t, err)
	require.Empty(t, misses)
	requireMessageCount(t, ctx, db, "message_events", 1)

	// A second run must treat every file as already checkpointed.
	second, err := Import(ctx, db, Options{Path: dir})
	require.NoError(t, err)
	require.Equal(t, 0, second.FilesScanned)
	require.Equal(t, checkpointEveryFiles+1, second.FilesUnchanged)
	requireMessageCount(t, ctx, db, "message_events", 1)
}
// TestImportCheckpointsUnresolvableRouteBearingCacheMisses imports a single
// cache entry whose message references a channel that never receives metadata.
// The message must be skipped, yet the file must still be checkpointed so the
// second Import reports it as unchanged instead of scanning it again.
func TestImportCheckpointsUnresolvableRouteBearingCacheMisses(t *testing.T) {
	const channelID = "111111111111111121"

	ctx := context.Background()
	dir := t.TempDir()
	cachePath := filepath.Join(dir, "Cache", "Cache_Data")
	require.NoError(t, os.MkdirAll(cachePath, 0o755))

	entry := fmt.Sprintf(`https://discord.com/api/v9/channels/%s/messages?limit=50
{"id":"333333333333333346","channel_id":"%s","content":"permanent unresolved cache miss","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}}
`, channelID, channelID)
	require.NoError(t, os.WriteFile(filepath.Join(cachePath, "entry_000"), []byte(entry), 0o600))

	db, err := store.Open(ctx, filepath.Join(dir, "discrawl.db"))
	require.NoError(t, err)
	defer func() { _ = db.Close() }()

	first, err := Import(ctx, db, Options{Path: dir})
	require.NoError(t, err)
	require.Equal(t, 1, first.FilesScanned)
	require.Equal(t, 1, first.SkippedMessages)
	require.Equal(t, 1, first.Checkpoints)

	found, err := db.SearchMessages(ctx, store.SearchOptions{Query: "permanent unresolved", Limit: 10})
	require.NoError(t, err)
	require.Empty(t, found)

	// The skipped file must not be scanned again on the next run.
	second, err := Import(ctx, db, Options{Path: dir})
	require.NoError(t, err)
	require.Equal(t, 0, second.FilesScanned)
	require.Equal(t, 1, second.FilesUnchanged)
}
// TestImportDoesNotAppendEventsForSkippedMixedBatch imports one cache entry
// that mixes a message for a channel identified by a guild/channel route with
// a message for a channel known only through an API route. The first message
// must become searchable and the second must not, while the message_events
// table stays empty on both the initial and the repeated Import.
func TestImportDoesNotAppendEventsForSkippedMixedBatch(t *testing.T) {
	const (
		guildID             = "999999999999999996"
		resolvedChannelID   = "111111111111111121"
		unresolvedChannelID = "111111111111111122"
	)
	ctx := context.Background()
	dir := t.TempDir()
	cachePath := filepath.Join(dir, "Cache", "Cache_Data")
	require.NoError(t, os.MkdirAll(cachePath, 0o755))

	entry := fmt.Sprintf(`https://discord.com/channels/%s/%s
https://discord.com/api/v9/channels/%s/messages?limit=50
{"id":"333333333333333346","channel_id":"%s","content":"mixed resolved message","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}}
{"id":"333333333333333347","channel_id":"%s","content":"mixed unresolved message","timestamp":"2026-04-23T18:20:44Z","author":{"id":"222222222222222232","username":"alice"}}
`, guildID, resolvedChannelID, unresolvedChannelID, resolvedChannelID, unresolvedChannelID)
	require.NoError(t, os.WriteFile(filepath.Join(cachePath, "entry_000"), []byte(entry), 0o600))

	db, err := store.Open(ctx, filepath.Join(dir, "discrawl.db"))
	require.NoError(t, err)
	defer func() { _ = db.Close() }()

	first, err := Import(ctx, db, Options{Path: dir})
	require.NoError(t, err)
	require.Equal(t, 1, first.FilesScanned)
	require.Equal(t, 1, first.Checkpoints)
	requireMessageCount(t, ctx, db, "message_events", 0)

	resolvedHits, err := db.SearchMessages(ctx, store.SearchOptions{Query: "mixed resolved", Limit: 10})
	require.NoError(t, err)
	require.Len(t, resolvedHits, 1)

	unresolvedHits, err := db.SearchMessages(ctx, store.SearchOptions{Query: "mixed unresolved", Limit: 10})
	require.NoError(t, err)
	require.Empty(t, unresolvedHits)

	// Re-running must neither rescan the file nor add events.
	second, err := Import(ctx, db, Options{Path: dir})
	require.NoError(t, err)
	require.Equal(t, 0, second.FilesScanned)
	require.Equal(t, 1, second.FilesUnchanged)
	requireMessageCount(t, ctx, db, "message_events", 0)
}
// TestImportDoesNotDuplicateEventsWhenSwitchingFullCacheModes imports the same
// cache entry twice into a fresh database, flipping Options.FullCache between
// the two runs in both directions. The second run must scan nothing, report
// the file as unchanged, and leave exactly one row in message_events.
func TestImportDoesNotDuplicateEventsWhenSwitchingFullCacheModes(t *testing.T) {
	const (
		channelID = "111111111111111121"
		guildID   = "999999999999999996"
	)
	ctx := context.Background()
	dir := t.TempDir()
	cachePath := filepath.Join(dir, "Cache", "Cache_Data")
	require.NoError(t, os.MkdirAll(cachePath, 0o755))

	entry := fmt.Sprintf(`https://discord.com/channels/%s/%s
{"id":"%s","guild_id":"%s","type":0,"name":"mode-switch"}
{"id":"333333333333333346","channel_id":"%s","content":"mode switch event once","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}}
`, guildID, channelID, channelID, guildID, channelID)
	require.NoError(t, os.WriteFile(filepath.Join(cachePath, "entry_000"), []byte(entry), 0o600))

	// Each case uses its own database file so the two orderings are independent.
	cases := []struct {
		name   string
		dbFile string
		first  Options
		second Options
	}{
		{
			name:   "full then default",
			dbFile: "full-first.db",
			first:  Options{Path: dir, FullCache: true},
			second: Options{Path: dir},
		},
		{
			name:   "default then full",
			dbFile: "default-first.db",
			first:  Options{Path: dir},
			second: Options{Path: dir, FullCache: true},
		},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			db, err := store.Open(ctx, filepath.Join(dir, tc.dbFile))
			require.NoError(t, err)
			defer func() { _ = db.Close() }()

			initial, err := Import(ctx, db, tc.first)
			require.NoError(t, err)
			require.Equal(t, 1, initial.FilesScanned)
			require.Equal(t, 1, initial.Messages)
			requireMessageCount(t, ctx, db, "message_events", 1)

			// Switching modes must not cause a rescan or a duplicate event.
			repeat, err := Import(ctx, db, tc.second)
			require.NoError(t, err)
			require.Equal(t, 0, repeat.FilesScanned)
			require.Equal(t, 1, repeat.FilesUnchanged)
			requireMessageCount(t, ctx, db, "message_events", 1)
		})
	}
}
// TestImportFastCachePreservesKnownChannelMetadataAcrossBatches seeds thread
// metadata (type 11) for a channel in a Local Storage leveldb log and a message
// for the same channel in a Chromium cache entry. After Import the channel must
// still carry the thread name, the "thread_public" kind, and the original
// "type":11 in its stored raw JSON.
func TestImportFastCachePreservesKnownChannelMetadataAcrossBatches(t *testing.T) {
	ctx := context.Background()
	dir := t.TempDir()
	leveldbPath := filepath.Join(dir, "Local Storage", "leveldb")
	cachePath := filepath.Join(dir, "Cache", "Cache_Data")
	require.NoError(t, os.MkdirAll(leveldbPath, 0o755))
	require.NoError(t, os.MkdirAll(cachePath, 0o755))
	channelID := "111111111111111121"
	guildID := "999999999999999996"

	// Channel metadata lives only in the leveldb log.
	require.NoError(t, os.WriteFile(filepath.Join(leveldbPath, "000001.log"), []byte(fmt.Sprintf(
		`{"id":"%s","guild_id":"%s","type":11,"name":"known-thread","thread_metadata":{"archived":false}}`,
		channelID,
		guildID,
	)), 0o600))
	// The cache entry has a route and a message but no channel metadata.
	require.NoError(t, os.WriteFile(filepath.Join(cachePath, "entry_0"), []byte(fmt.Sprintf(`https://discord.com/channels/%s/%s
{"id":"333333333333333346","channel_id":"%s","content":"thread metadata cache","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}}
`, guildID, channelID, channelID)), 0o600))

	st, err := store.Open(ctx, filepath.Join(dir, "discrawl.db"))
	require.NoError(t, err)
	defer func() { _ = st.Close() }()

	stats, err := Import(ctx, st, Options{Path: dir})
	require.NoError(t, err)
	require.Equal(t, 1, stats.Messages)

	channels, err := st.Channels(ctx, guildID)
	require.NoError(t, err)
	require.Len(t, channels, 1)
	require.Equal(t, "known-thread", channels[0].Name)
	require.Equal(t, "thread_public", channels[0].Kind)

	// Build the query from channelID so the assertion cannot drift from the
	// fixture if the ID is ever changed.
	query := fmt.Sprintf("select raw_json from channels where id = '%s'", channelID)
	_, rows, err := st.ReadOnlyQuery(ctx, query)
	require.NoError(t, err)
	require.Len(t, rows, 1)
	require.Contains(t, rows[0][0], `"type":11`)
}
// TestImportFastCacheRouteFiltersServiceWorkerCacheStorage places a file with
// channel metadata and a message, but no Discord route URL, under
// Service Worker/CacheStorage. With default options the file must be counted
// in CacheFilesFastSkipped, not scanned, and its message must not be searchable.
func TestImportFastCacheRouteFiltersServiceWorkerCacheStorage(t *testing.T) {
	ctx := context.Background()
	dir := t.TempDir()
	cachePath := filepath.Join(dir, "Service Worker", "CacheStorage", "cache-id")
	require.NoError(t, os.MkdirAll(cachePath, 0o755))

	unrouted := []byte(`
{"id":"111111111111111121","guild_id":"999999999999999996","type":0,"name":"service-worker-cache"}
{"id":"333333333333333346","channel_id":"111111111111111121","content":"service worker historical cache","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}}
`)
	require.NoError(t, os.WriteFile(filepath.Join(cachePath, "unrouted"), unrouted, 0o600))

	db, err := store.Open(ctx, filepath.Join(dir, "discrawl.db"))
	require.NoError(t, err)
	defer func() { _ = db.Close() }()

	got, err := Import(ctx, db, Options{Path: dir})
	require.NoError(t, err)
	require.Equal(t, 0, got.FilesScanned)
	require.Equal(t, 1, got.CacheFilesFastSkipped)

	found, err := db.SearchMessages(ctx, store.SearchOptions{Query: "service worker historical", Limit: 10})
	require.NoError(t, err)
	require.Empty(t, found)
}
// requireMessageCount fails the test unless `select count(*)` on table returns
// a single row with a single column equal to expected. The table name is
// interpolated directly into the SQL text.
func requireMessageCount(t *testing.T, ctx context.Context, st *store.Store, table string, expected int) {
	t.Helper()
	query := fmt.Sprintf("select count(*) from %s", table)
	_, rows, err := st.ReadOnlyQuery(ctx, query)
	require.NoError(t, err)
	require.Len(t, rows, 1)
	countRow := rows[0]
	require.Len(t, countRow, 1)
	// The query result is compared against the decimal text of expected.
	want := fmt.Sprint(expected)
	require.Equal(t, want, countRow[0])
}

View File

@ -0,0 +1,113 @@
package discorddesktop
import (
"context"
"os"
"github.com/steipete/discrawl/internal/store"
)
// importRun bundles the inputs and the mutable bookkeeping shared by the
// methods that scan context files, scan cache files in batches, and retry
// deferred files during one import.
type importRun struct {
	ctx context.Context
	st *store.Store
	opts Options
	state scanState
	rootFS *os.Root
	// channelLookup is initialized by newImportRun from
	// copyChannelLookup(state.channels) and handed to every scan and finalize call.
	channelLookup map[string]store.ChannelRecord
	totals scanTotals
	stats *Stats
	// base is the snapshot filled by scanContext; cache batches and the retry
	// pass are created from it and merged back via mergeSnapshotContext.
	base snapshot
	// pending collects the candidates of every batch that finalized with
	// unresolved messages; retryPending processes them.
	pending []fileCandidate
	// pendingUnresolved accumulates the unresolved messages of those batches.
	pendingUnresolved unresolvedMessages
	// pendingLookupSize and pendingRouteSize hold len(channelLookup) and
	// len(base.routes) as of the first deferral; both are -1 until then.
	pendingLookupSize int
	pendingRouteSize int
}
// newImportRun builds the per-import state. The channel lookup is a copy of
// state.channels, totals and the base snapshot start fresh, and the pending
// size markers start at -1 to signal that nothing has been deferred yet.
func newImportRun(ctx context.Context, st *store.Store, opts Options, state scanState, rootFS *os.Root, stats *Stats) *importRun {
	run := &importRun{
		ctx:    ctx,
		st:     st,
		opts:   opts,
		state:  state,
		rootFS: rootFS,
		stats:  stats,
	}
	run.channelLookup = copyChannelLookup(state.channels)
	run.totals = newScanTotals()
	run.base = newSnapshot()
	run.pendingUnresolved = unresolvedMessages{}
	run.pendingLookupSize, run.pendingRouteSize = -1, -1
	return run
}
// scanContext scans candidates directly into the base snapshot, then finalizes
// and commits that snapshot with recordSkipped set to false.
func (r *importRun) scanContext(candidates []fileCandidate) error {
	err := scanCandidates(r.ctx, r.rootFS, r.opts, candidates, r.base, r.channelLookup, r.stats)
	if err != nil {
		return err
	}
	return r.finalizeAndCommit(candidates, r.base, false)
}
// scanCacheBatches processes candidates in consecutive groups of at most
// checkpointEveryFiles. Each group is scanned into a snapshot derived from the
// base snapshot, finalized and committed, and only then merged back into base.
// The first error aborts the loop and is returned.
func (r *importRun) scanCacheBatches(candidates []fileCandidate) error {
	for remaining := candidates; len(remaining) > 0; {
		size := min(checkpointEveryFiles, len(remaining))
		chunk := remaining[:size]
		remaining = remaining[size:]

		scratch := newSnapshotWithContext(r.base)
		err := scanCandidates(r.ctx, r.rootFS, r.opts, chunk, scratch, r.channelLookup, r.stats)
		if err != nil {
			return err
		}
		err = r.finalizeAndCommit(chunk, scratch, false)
		if err != nil {
			return err
		}
		// Merge only after a successful commit so base never holds context
		// from a batch that failed.
		mergeSnapshotContext(r.base, scratch)
	}
	return nil
}
// finalizeAndCommit finalizes snap and commits it together with candidates.
// If finalization leaves unresolved messages, the candidates are deferred and
// the commit is made with its checkpoint argument set to false. Nothing is
// committed when there are no candidates and the snapshot has no changes.
func (r *importRun) finalizeAndCommit(candidates []fileCandidate, snap snapshot, recordSkipped bool) error {
	leftover := finalizeSnapshot(snap, r.channelLookup, r.totals, r.stats, recordSkipped)
	fullyResolved := len(leftover) == 0
	if len(leftover) > 0 {
		r.deferCandidates(candidates, leftover)
	}
	if len(candidates) > 0 || snapshotHasChanges(snap) {
		return commitSnapshot(r.ctx, r.st, r.opts, r.state, candidates, snap, fullyResolved, r.stats)
	}
	return nil
}
// deferCandidates appends candidates to the pending list and merges unresolved
// into the pending set. On the first deferral only, it also captures the
// current sizes of the channel lookup and of the base snapshot's routes.
func (r *importRun) deferCandidates(candidates []fileCandidate, unresolved unresolvedMessages) {
	r.pending = append(r.pending, candidates...)
	mergeUnresolved(r.pendingUnresolved, unresolved)
	// A negative marker means no baseline has been captured yet.
	if r.pendingLookupSize < 0 {
		r.pendingLookupSize = len(r.channelLookup)
		r.pendingRouteSize = len(r.base.routes)
	}
}
// retryPending handles the files deferred by finalizeAndCommit. With nothing
// pending it returns nil. If pendingCanResolve reports false, the pending
// unresolved messages are recorded and the pending files are checkpointed
// without another scan. Otherwise the pending files are scanned again into a
// snapshot derived from base, finalized with recordSkipped set to true,
// committed with its checkpoint argument set to true, and merged into base.
func (r *importRun) retryPending() error {
	switch {
	case len(r.pending) == 0:
		return nil
	case !r.pendingCanResolve():
		recordUnresolved(r.pendingUnresolved, r.totals, r.stats)
		return checkpointScannedCandidates(r.ctx, r.st, r.opts, r.state, r.pending, r.stats)
	}

	second := newSnapshotWithContext(r.base)
	err := scanCandidates(r.ctx, r.rootFS, r.opts, r.pending, second, r.channelLookup, r.stats)
	if err != nil {
		return err
	}
	// The returned unresolved set is intentionally ignored on this pass.
	finalizeSnapshot(second, r.channelLookup, r.totals, r.stats, true)
	err = commitSnapshot(r.ctx, r.st, r.opts, r.state, r.pending, second, true, r.stats)
	if err != nil {
		return err
	}
	mergeSnapshotContext(r.base, second)
	return nil
}
// pendingCanResolve reports whether the channel lookup or the base snapshot's
// route set is larger now than the sizes captured at the first deferral.
func (r *importRun) pendingCanResolve() bool {
	if len(r.channelLookup) > r.pendingLookupSize {
		return true
	}
	return len(r.base.routes) > r.pendingRouteSize
}