perf: skip unchanged wiretap cache files
This commit is contained in:
parent
5a0ca81644
commit
a2ed8c6e8b
@ -6,6 +6,7 @@ All notable changes to `discrawl` will be documented in this file.
|
||||
|
||||
### Fixes
|
||||
|
||||
- Repeated `sync --source wiretap` runs now skip unchanged Discord Desktop cache files and report unchanged file counts, making steady-state local-cache refreshes much faster.
|
||||
- `sync --full --skip-members` now also skips member crawls when resuming incomplete stored channels, so backfills do not unexpectedly refresh the full guild member list.
|
||||
|
||||
### Maintenance
|
||||
|
||||
@ -121,8 +121,8 @@ func printHuman(w io.Writer, value any) error {
|
||||
}
|
||||
}
|
||||
if v.Wiretap != nil {
|
||||
if _, err := fmt.Fprintf(w, "wiretap_messages=%d\nwiretap_dm_messages=%d\nwiretap_dm_channels=%d\nwiretap_guild_messages=%d\nwiretap_skipped_messages=%d\nwiretap_skipped_channels=%d\n",
|
||||
v.Wiretap.Messages, v.Wiretap.DMMessages, v.Wiretap.DMChannels, v.Wiretap.GuildMessages, v.Wiretap.SkippedMessages, v.Wiretap.SkippedChannels); err != nil {
|
||||
if _, err := fmt.Fprintf(w, "wiretap_files=%d\nwiretap_unchanged=%d\nwiretap_messages=%d\nwiretap_dm_messages=%d\nwiretap_dm_channels=%d\nwiretap_guild_messages=%d\nwiretap_skipped_messages=%d\nwiretap_skipped_channels=%d\n",
|
||||
v.Wiretap.FilesScanned, v.Wiretap.FilesUnchanged, v.Wiretap.Messages, v.Wiretap.DMMessages, v.Wiretap.DMChannels, v.Wiretap.GuildMessages, v.Wiretap.SkippedMessages, v.Wiretap.SkippedChannels); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -131,8 +131,8 @@ func printHuman(w io.Writer, value any) error {
|
||||
_, err := fmt.Fprintf(w, "guilds=%d channels=%d threads=%d members=%d messages=%d\n", v.Guilds, v.Channels, v.Threads, v.Members, v.Messages)
|
||||
return err
|
||||
case discorddesktop.Stats:
|
||||
_, err := fmt.Fprintf(w, "path=%s\nfiles=%d\nskipped=%d\nobjects=%d\nguilds=%d\nchannels=%d\nmessages=%d\ndm_messages=%d\ndm_channels=%d\nguild_messages=%d\nskipped_messages=%d\nskipped_channels=%d\ndry_run=%t\n",
|
||||
v.Path, v.FilesScanned, v.FilesSkipped, v.JSONObjects, v.Guilds, v.Channels, v.Messages, v.DMMessages, v.DMChannels, v.GuildMessages, v.SkippedMessages, v.SkippedChannels, v.DryRun)
|
||||
_, err := fmt.Fprintf(w, "path=%s\nfiles=%d\nskipped=%d\nunchanged=%d\nobjects=%d\nguilds=%d\nchannels=%d\nmessages=%d\ndm_messages=%d\ndm_channels=%d\nguild_messages=%d\nskipped_messages=%d\nskipped_channels=%d\ndry_run=%t\n",
|
||||
v.Path, v.FilesScanned, v.FilesSkipped, v.FilesUnchanged, v.JSONObjects, v.Guilds, v.Channels, v.Messages, v.DMMessages, v.DMChannels, v.GuildMessages, v.SkippedMessages, v.SkippedChannels, v.DryRun)
|
||||
return err
|
||||
case store.Status:
|
||||
_, err := fmt.Fprintf(w, "db=%s\nguilds=%d\nchannels=%d\nthreads=%d\nmessages=%d\nmembers=%d\nembedding_backlog=%d\nlast_sync=%s\nlast_tail_event=%s\n",
|
||||
|
||||
@ -42,6 +42,7 @@ type Stats struct {
|
||||
Path string `json:"path"`
|
||||
FilesScanned int `json:"files_scanned"`
|
||||
FilesSkipped int `json:"files_skipped"`
|
||||
FilesUnchanged int `json:"files_unchanged"`
|
||||
BytesScanned int64 `json:"bytes_scanned"`
|
||||
JSONObjects int `json:"json_objects"`
|
||||
Guilds int `json:"guilds"`
|
||||
@ -65,6 +66,19 @@ type snapshot struct {
|
||||
userLabels map[string]userLabel
|
||||
}
|
||||
|
||||
type fileFingerprint struct {
|
||||
Size int64 `json:"size"`
|
||||
ModUnixNS int64 `json:"mod_unix_ns"`
|
||||
}
|
||||
|
||||
type scanState struct {
|
||||
previous map[string]fileFingerprint
|
||||
current map[string]fileFingerprint
|
||||
channels map[string]store.ChannelRecord
|
||||
}
|
||||
|
||||
const wiretapFileIndexScope = "wiretap:file_index:v1"
|
||||
|
||||
func DefaultPath() string {
|
||||
home, _ := os.UserHomeDir()
|
||||
switch runtime.GOOS {
|
||||
@ -87,7 +101,11 @@ func Import(ctx context.Context, st *store.Store, opts Options) (Stats, error) {
|
||||
if st == nil && !opts.DryRun {
|
||||
return Stats{}, errors.New("store is required")
|
||||
}
|
||||
stats, snap, err := scan(ctx, opts)
|
||||
state, err := loadScanState(ctx, st, opts)
|
||||
if err != nil {
|
||||
return Stats{}, err
|
||||
}
|
||||
stats, snap, err := scan(ctx, opts, state)
|
||||
if err != nil {
|
||||
return stats, err
|
||||
}
|
||||
@ -95,13 +113,66 @@ func Import(ctx context.Context, st *store.Store, opts Options) (Stats, error) {
|
||||
if opts.DryRun {
|
||||
return stats, nil
|
||||
}
|
||||
if err := writeSnapshot(ctx, st, snap); err != nil {
|
||||
fullScan := len(state.previous) == 0
|
||||
if snapshotHasChanges(snap) || fullScan {
|
||||
if err := writeSnapshot(ctx, st, snap, fullScan); err != nil {
|
||||
return stats, err
|
||||
}
|
||||
} else if err := st.SetSyncState(ctx, "wiretap:last_import", time.Now().UTC().Format(time.RFC3339Nano)); err != nil {
|
||||
return stats, err
|
||||
}
|
||||
if err := saveFileIndex(ctx, st, state.current); err != nil {
|
||||
return stats, err
|
||||
}
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
func scan(ctx context.Context, opts Options) (Stats, snapshot, error) {
|
||||
func loadScanState(ctx context.Context, st *store.Store, opts Options) (scanState, error) {
|
||||
state := scanState{
|
||||
previous: map[string]fileFingerprint{},
|
||||
current: map[string]fileFingerprint{},
|
||||
channels: map[string]store.ChannelRecord{},
|
||||
}
|
||||
if st == nil || opts.DryRun {
|
||||
return state, nil
|
||||
}
|
||||
raw, err := st.GetSyncState(ctx, wiretapFileIndexScope)
|
||||
if err != nil {
|
||||
return state, err
|
||||
}
|
||||
if strings.TrimSpace(raw) != "" {
|
||||
if err := json.Unmarshal([]byte(raw), &state.previous); err != nil {
|
||||
state.previous = map[string]fileFingerprint{}
|
||||
}
|
||||
}
|
||||
channels, err := st.Channels(ctx, "")
|
||||
if err != nil {
|
||||
return state, err
|
||||
}
|
||||
for _, channel := range channels {
|
||||
state.channels[channel.ID] = store.ChannelRecord{
|
||||
ID: channel.ID,
|
||||
GuildID: channel.GuildID,
|
||||
Kind: channel.Kind,
|
||||
Name: channel.Name,
|
||||
}
|
||||
}
|
||||
return state, nil
|
||||
}
|
||||
|
||||
func saveFileIndex(ctx context.Context, st *store.Store, index map[string]fileFingerprint) error {
|
||||
body, err := json.Marshal(index)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return st.SetSyncState(ctx, wiretapFileIndexScope, string(body))
|
||||
}
|
||||
|
||||
func snapshotHasChanges(snap snapshot) bool {
|
||||
return len(snap.guilds) > 0 || len(snap.channels) > 0 || len(snap.messages) > 0
|
||||
}
|
||||
|
||||
func scan(ctx context.Context, opts Options, state scanState) (Stats, snapshot, error) {
|
||||
now := opts.Now
|
||||
if now == nil {
|
||||
now = time.Now
|
||||
@ -155,6 +226,16 @@ func scan(ctx context.Context, opts Options) (Stats, snapshot, error) {
|
||||
stats.FilesSkipped++
|
||||
return ignoreCacheFileError(err)
|
||||
}
|
||||
relKey := filepath.ToSlash(relPath)
|
||||
fingerprint := fileFingerprint{
|
||||
Size: info.Size(),
|
||||
ModUnixNS: info.ModTime().UnixNano(),
|
||||
}
|
||||
state.current[relKey] = fingerprint
|
||||
if previous, ok := state.previous[relKey]; ok && previous == fingerprint {
|
||||
stats.FilesUnchanged++
|
||||
return nil
|
||||
}
|
||||
data, err := rootFS.ReadFile(relPath)
|
||||
if err != nil {
|
||||
stats.FilesSkipped++
|
||||
@ -174,15 +255,15 @@ func scan(ctx context.Context, opts Options) (Stats, snapshot, error) {
|
||||
if err := json.Unmarshal(raw, &value); err != nil {
|
||||
continue
|
||||
}
|
||||
collectValue(snap, value, info.ModTime().UTC())
|
||||
collectValue(snap, state.channels, value, info.ModTime().UTC())
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return stats, snap, err
|
||||
}
|
||||
reconcileMessages(snap)
|
||||
reconcileMessages(snap, state.channels)
|
||||
inferDirectMessageNames(snap)
|
||||
reconcileMessages(snap)
|
||||
reconcileMessages(snap, state.channels)
|
||||
skippedChannels := map[string]struct{}{}
|
||||
for id, msg := range snap.messages {
|
||||
guildID := msg.Record.GuildID
|
||||
@ -230,9 +311,11 @@ func ignoreCacheFileError(error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeSnapshot(ctx context.Context, st *store.Store, snap snapshot) error {
|
||||
if err := st.DeleteGuildData(ctx, "@unknown"); err != nil {
|
||||
return err
|
||||
func writeSnapshot(ctx context.Context, st *store.Store, snap snapshot, prune bool) error {
|
||||
if prune {
|
||||
if err := st.DeleteGuildData(ctx, "@unknown"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
guilds := mapValues(snap.guilds)
|
||||
sort.Slice(guilds, func(i, j int) bool { return guilds[i].ID < guilds[j].ID })
|
||||
@ -253,33 +336,36 @@ func writeSnapshot(ctx context.Context, st *store.Store, snap snapshot) error {
|
||||
if err := st.UpsertMessages(ctx, messages); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := st.DeleteOrphanChannels(ctx, DirectMessageGuildID); err != nil {
|
||||
return err
|
||||
if prune {
|
||||
if err := st.DeleteOrphanChannels(ctx, DirectMessageGuildID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return st.SetSyncState(ctx, "wiretap:last_import", time.Now().UTC().Format(time.RFC3339Nano))
|
||||
}
|
||||
|
||||
func collectValue(snap snapshot, value any, fallbackTime time.Time) {
|
||||
func collectValue(snap snapshot, channelLookup map[string]store.ChannelRecord, value any, fallbackTime time.Time) {
|
||||
switch typed := value.(type) {
|
||||
case map[string]any:
|
||||
collectUserLabel(snap, typed)
|
||||
if channel, ok := parseChannel(typed); ok {
|
||||
snap.channels[channel.ID] = channel
|
||||
channelLookup[channel.ID] = channel
|
||||
if channel.GuildID == DirectMessageGuildID {
|
||||
if _, ok := snap.guilds[channel.GuildID]; !ok {
|
||||
snap.guilds[channel.GuildID] = syntheticGuild(channel.GuildID, guildName(channel.GuildID))
|
||||
}
|
||||
}
|
||||
}
|
||||
if message, ok := parseMessage(typed, fallbackTime, snap.channels); ok {
|
||||
if message, ok := parseMessage(typed, fallbackTime, channelLookup); ok {
|
||||
snap.messages[message.Record.ID] = message
|
||||
}
|
||||
for _, child := range typed {
|
||||
collectValue(snap, child, fallbackTime)
|
||||
collectValue(snap, channelLookup, child, fallbackTime)
|
||||
}
|
||||
case []any:
|
||||
for _, child := range typed {
|
||||
collectValue(snap, child, fallbackTime)
|
||||
collectValue(snap, channelLookup, child, fallbackTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -415,15 +501,16 @@ func parseMessage(raw map[string]any, fallbackTime time.Time, channels map[strin
|
||||
}, true
|
||||
}
|
||||
|
||||
func reconcileMessages(snap snapshot) {
|
||||
func reconcileMessages(snap snapshot, channelLookup map[string]store.ChannelRecord) {
|
||||
for id, msg := range snap.messages {
|
||||
channel, ok := snap.channels[msg.Record.ChannelID]
|
||||
channel, ok := channelLookup[msg.Record.ChannelID]
|
||||
if !ok {
|
||||
if guildID := snap.routes[msg.Record.ChannelID]; guildID != "" {
|
||||
msg.Record.GuildID = guildID
|
||||
if guildID == DirectMessageGuildID {
|
||||
channel = syntheticChannel(msg.Record.ChannelID, guildID, "")
|
||||
snap.channels[msg.Record.ChannelID] = channel
|
||||
channelLookup[msg.Record.ChannelID] = channel
|
||||
ok = true
|
||||
}
|
||||
}
|
||||
|
||||
@ -115,6 +115,48 @@ binary-ish {"t":"MESSAGE_CREATE","token":"do-not-store","d":{"id":"3333333333333
|
||||
require.NotContains(t, rows[0][0], "do-not-store")
|
||||
}
|
||||
|
||||
func TestImportSkipsUnchangedDesktopCacheFiles(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
dir := t.TempDir()
|
||||
cachePath := filepath.Join(dir, "Local Storage", "leveldb")
|
||||
require.NoError(t, os.MkdirAll(cachePath, 0o755))
|
||||
channelPath := filepath.Join(cachePath, "000001.log")
|
||||
messagePath := filepath.Join(cachePath, "000002.log")
|
||||
require.NoError(t, os.WriteFile(channelPath, []byte(`{"id":"111111111111111121","guild_id":"999999999999999996","type":0,"name":"wiretap-fast"}`), 0o600))
|
||||
require.NoError(t, os.WriteFile(messagePath, []byte(`{"id":"333333333333333346","channel_id":"111111111111111121","content":"first incremental message","timestamp":"2026-04-23T18:20:43Z","author":{"id":"222222222222222232","username":"alice"}}`), 0o600))
|
||||
|
||||
dbPath := filepath.Join(dir, "discrawl.db")
|
||||
st, err := store.Open(ctx, dbPath)
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = st.Close() }()
|
||||
|
||||
stats, err := Import(ctx, st, Options{Path: dir})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, stats.FilesScanned)
|
||||
require.Equal(t, 1, stats.Messages)
|
||||
|
||||
stats, err = Import(ctx, st, Options{Path: dir})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, stats.FilesScanned)
|
||||
require.Equal(t, 2, stats.FilesUnchanged)
|
||||
require.Equal(t, 0, stats.Messages)
|
||||
|
||||
require.NoError(t, os.WriteFile(messagePath, []byte(`{"id":"333333333333333347","channel_id":"111111111111111121","content":"second incremental message","timestamp":"2026-04-23T18:20:44Z","author":{"id":"222222222222222233","username":"bob"}}`), 0o600))
|
||||
require.NoError(t, os.Chtimes(messagePath, time.Date(2026, 4, 23, 18, 21, 0, 0, time.UTC), time.Date(2026, 4, 23, 18, 21, 0, 0, time.UTC)))
|
||||
|
||||
stats, err = Import(ctx, st, Options{Path: dir})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, stats.FilesScanned)
|
||||
require.Equal(t, 1, stats.FilesUnchanged)
|
||||
require.Equal(t, 1, stats.Messages)
|
||||
|
||||
results, err := st.SearchMessages(ctx, store.SearchOptions{Query: "second incremental", Limit: 10})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, results, 1)
|
||||
require.Equal(t, "999999999999999996", results[0].GuildID)
|
||||
require.Equal(t, "wiretap-fast", results[0].ChannelName)
|
||||
}
|
||||
|
||||
func TestImportDryRunDoesNotWrite(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
dir := t.TempDir()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user