This commit is contained in:
parent
59cbdd1fe0
commit
3407212661
@ -128,6 +128,7 @@ type StreamConfig struct {
|
||||
AciStreamName envstr `yaml:"aci-stream-name"`
|
||||
E164StreamName envstr `yaml:"e164-stream-name"`
|
||||
UsernameStreamName envstr `yaml:"username-stream-name"`
|
||||
CheckpointSize uint `yaml:"checkpoint-size"`
|
||||
|
||||
NewStreams []string `yaml:"new-streams"`
|
||||
|
||||
@ -274,6 +275,9 @@ func Read(filename string) (*Config, error) {
|
||||
if parsed.StreamConfig.UsernameStreamName == "" {
|
||||
return nil, fmt.Errorf("field not provided: stream.username-stream-name")
|
||||
}
|
||||
if parsed.StreamConfig.CheckpointSize == 0 {
|
||||
return nil, fmt.Errorf("stream.checkpoint-size cannot be 0")
|
||||
}
|
||||
}
|
||||
|
||||
if parsed.APIConfig.JitterPercent < 0 || parsed.APIConfig.JitterPercent > 100 {
|
||||
|
||||
@ -216,7 +216,7 @@ func main() {
|
||||
util.Log().Infof("%s stream start timestamp: %s", streamName, streamStartTimestamp.Format(time.RFC3339))
|
||||
}
|
||||
go func() {
|
||||
s.run(ctx, streamName, streamStartTimestamp, updateHandler, updateFromStreamFunc)
|
||||
s.run(ctx, streamName, streamStartTimestamp, updateHandler, updateFromStreamFunc, config.StreamConfig.CheckpointSize)
|
||||
}()
|
||||
}
|
||||
|
||||
|
||||
@ -165,7 +165,8 @@ type Streamer struct {
|
||||
}
|
||||
|
||||
// run runs the streamer, blocking forever.
|
||||
func (s *Streamer) run(ctx context.Context, name string, startAtTimestamp *time.Time, updateHandler *KtUpdateHandler, updateFunc updateFunc) {
|
||||
func (s *Streamer) run(ctx context.Context, name string, startAtTimestamp *time.Time, updateHandler *KtUpdateHandler,
|
||||
updateFunc updateFunc, checkpointSize uint) {
|
||||
i := 0
|
||||
for {
|
||||
// Create a new context and shard map for each run.
|
||||
@ -229,7 +230,7 @@ func (s *Streamer) run(ctx context.Context, name string, startAtTimestamp *time.
|
||||
}(runCtx, dup(r.Data), state)
|
||||
|
||||
// If only a few entries have been sequenced from this shard, move on.
|
||||
if state.sinceLast < 100 {
|
||||
if uint(state.sinceLast) < checkpointSize {
|
||||
return consumer.ErrSkipCheckpoint
|
||||
}
|
||||
// If many entries have been sequenced, we need to checkpoint. First
|
||||
|
||||
Loading…
Reference in New Issue
Block a user