Set an initial timestamp for new Kinesis streams
Some checks failed
CI / test (push) Has been cancelled
Some checks failed
CI / test (push) Has been cancelled
This commit is contained in:
parent
199057a0e4
commit
59cbdd1fe0
@ -129,6 +129,8 @@ type StreamConfig struct {
|
||||
E164StreamName envstr `yaml:"e164-stream-name"`
|
||||
UsernameStreamName envstr `yaml:"username-stream-name"`
|
||||
|
||||
NewStreams []string `yaml:"new-streams"`
|
||||
|
||||
// If TableName is not provided, backfill will not be attempted.
|
||||
TableName envstr `yaml:"table"`
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@ import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"slices"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
@ -204,7 +205,16 @@ func main() {
|
||||
config.StreamConfig.E164StreamName.String(): updateFromE164Stream,
|
||||
config.StreamConfig.UsernameStreamName.String(): updateFromUsernameStream,
|
||||
} {
|
||||
streamStartTimestamp = nil
|
||||
if slices.Contains(config.StreamConfig.NewStreams, streamName) {
|
||||
start := time.Now().Add(-time.Minute * 15)
|
||||
streamStartTimestamp = &start
|
||||
}
|
||||
|
||||
util.Log().Infof("Starting stream processing from Kinesis stream: %s", streamName)
|
||||
if streamStartTimestamp != nil {
|
||||
util.Log().Infof("%s stream start timestamp: %s", streamName, streamStartTimestamp.Format(time.RFC3339))
|
||||
}
|
||||
go func() {
|
||||
s.run(ctx, streamName, streamStartTimestamp, updateHandler, updateFromStreamFunc)
|
||||
}()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user