From a90dec4e5344fd4299254c3781492e1218ed7af5 Mon Sep 17 00:00:00 2001 From: Katherine Date: Thu, 12 Mar 2026 12:26:20 -0400 Subject: [PATCH] Fix closure capture bug in reading from streams --- cmd/kt-server/main.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/cmd/kt-server/main.go b/cmd/kt-server/main.go index 7e7692e..0b54395 100644 --- a/cmd/kt-server/main.go +++ b/cmd/kt-server/main.go @@ -186,12 +186,12 @@ func main() { s := &Streamer{config: config.APIConfig, tx: cachedTransparencyStore.Clone()} go func() { - var streamStartTimestamp *time.Time + var backfillStreamStartTimestamp *time.Time if first && config.StreamConfig.TableName != "" { // start the stream from when the backfill started, minus some padding for clock drift start := time.Now().Add(-time.Minute * 15) - streamStartTimestamp = &start + backfillStreamStartTimestamp = &start util.Log().Infof("Backfilling from DynamoDB table %q", config.StreamConfig.TableName) if err := backfill(ctx, config.StreamConfig.TableName.String(), updateHandler); err != nil { @@ -205,10 +205,14 @@ func main() { config.StreamConfig.E164StreamName.String(): updateFromE164Stream, config.StreamConfig.UsernameStreamName.String(): updateFromUsernameStream, } { - streamStartTimestamp = nil - if slices.Contains(config.StreamConfig.NewStreams, streamName) { - start := time.Now().Add(-time.Minute * 15) - streamStartTimestamp = &start + var streamStartTimestamp *time.Time + if backfillStreamStartTimestamp != nil { + streamStartTimestamp = backfillStreamStartTimestamp + } else { + if slices.Contains(config.StreamConfig.NewStreams, streamName) { + start := time.Now().Add(-time.Minute * 15) + streamStartTimestamp = &start + } } util.Log().Infof("Starting stream processing from Kinesis stream: %s", streamName)