Fix closure capture bug in reading from streams
Some checks failed
CI / test (push) Has been cancelled

This commit is contained in:
Katherine 2026-03-12 12:26:20 -04:00 committed by GitHub
parent 5fb822474e
commit a90dec4e53
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -186,12 +186,12 @@ func main() {
s := &Streamer{config: config.APIConfig, tx: cachedTransparencyStore.Clone()}
go func() {
var streamStartTimestamp *time.Time
var backfillStreamStartTimestamp *time.Time
if first && config.StreamConfig.TableName != "" {
// start the stream from when the backfill started, minus some padding for clock drift
start := time.Now().Add(-time.Minute * 15)
streamStartTimestamp = &start
backfillStreamStartTimestamp = &start
util.Log().Infof("Backfilling from DynamoDB table %q", config.StreamConfig.TableName)
if err := backfill(ctx, config.StreamConfig.TableName.String(), updateHandler); err != nil {
@ -205,10 +205,14 @@ func main() {
config.StreamConfig.E164StreamName.String(): updateFromE164Stream,
config.StreamConfig.UsernameStreamName.String(): updateFromUsernameStream,
} {
streamStartTimestamp = nil
if slices.Contains(config.StreamConfig.NewStreams, streamName) {
start := time.Now().Add(-time.Minute * 15)
streamStartTimestamp = &start
var streamStartTimestamp *time.Time
if backfillStreamStartTimestamp != nil {
streamStartTimestamp = backfillStreamStartTimestamp
} else {
if slices.Contains(config.StreamConfig.NewStreams, streamName) {
start := time.Now().Add(-time.Minute * 15)
streamStartTimestamp = &start
}
}
util.Log().Infof("Starting stream processing from Kinesis stream: %s", streamName)