From 199057a0e4c096edea9baf2d139e91a0aef08807 Mon Sep 17 00:00:00 2001 From: Katherine Date: Fri, 6 Mar 2026 13:37:49 -0500 Subject: [PATCH] Guarantee linearity for all search keys --- cmd/internal/config/config.go | 17 +- cmd/kt-server/main.go | 14 +- cmd/kt-server/stream.go | 247 +++++++++++++------------- cmd/kt-server/stream_test.go | 324 +++++++++++++++++++++++----------- 4 files changed, 368 insertions(+), 234 deletions(-) diff --git a/cmd/internal/config/config.go b/cmd/internal/config/config.go index ef9466d..251fc9e 100644 --- a/cmd/internal/config/config.go +++ b/cmd/internal/config/config.go @@ -125,8 +125,11 @@ type FakeUpdates struct { } type StreamConfig struct { - Name envstr `yaml:"name"` - // If Name is provided but TableName is not, backfill will not be attempted. + AciStreamName envstr `yaml:"aci-stream-name"` + E164StreamName envstr `yaml:"e164-stream-name"` + UsernameStreamName envstr `yaml:"username-stream-name"` + + // If TableName is not provided, backfill will not be attempted. TableName envstr `yaml:"table"` } @@ -260,8 +263,14 @@ func Read(filename string) (*Config, error) { } if parsed.StreamConfig != nil { - if parsed.StreamConfig.Name == "" { - return nil, fmt.Errorf("field not provided: stream.name") + if parsed.StreamConfig.AciStreamName == "" { + return nil, fmt.Errorf("field not provided: stream.aci-stream-name") + } + if parsed.StreamConfig.E164StreamName == "" { + return nil, fmt.Errorf("field not provided: stream.e164-stream-name") + } + if parsed.StreamConfig.UsernameStreamName == "" { + return nil, fmt.Errorf("field not provided: stream.username-stream-name") } } diff --git a/cmd/kt-server/main.go b/cmd/kt-server/main.go index 243ae3d..4cd0517 100644 --- a/cmd/kt-server/main.go +++ b/cmd/kt-server/main.go @@ -198,8 +198,18 @@ func main() { util.Log().Fatalf("stream backfill failed: %v", err) } } - util.Log().Infof("Starting stream processing from Kinesis stream %q", config.StreamConfig.Name.String()) - s.run(ctx, config.StreamConfig.Name.String(), streamStartTimestamp, updateHandler) + + for streamName, updateFromStreamFunc := range map[string]updateFunc{ + config.StreamConfig.AciStreamName.String(): updateFromAciStream, + config.StreamConfig.E164StreamName.String(): updateFromE164Stream, + config.StreamConfig.UsernameStreamName.String(): updateFromUsernameStream, + } { + util.Log().Infof("Starting stream processing from Kinesis stream: %s", streamName) + go func() { + s.run(ctx, streamName, streamStartTimestamp, updateHandler, updateFromStreamFunc) + }() + } + }() } diff --git a/cmd/kt-server/stream.go b/cmd/kt-server/stream.go index 3809e43..e67006e 100644 --- a/cmd/kt-server/stream.go +++ b/cmd/kt-server/stream.go @@ -117,9 +117,9 @@ type shardState struct { sinceLast int // waitGroup tracks when all pending updates have been processed. waitGroup sync.WaitGroup - // aciLocks is a map from ACI to chan struct{}. It prevents multiple updates - // from being processed for the same ACI simultaneously. - aciLocks sync.Map + // searchKeyLocks is a map from a search key to chan struct{}. It prevents multiple updates + // from being processed for the same search key simultaneously. + searchKeyLocks sync.Map // didFail is set to true if any updates failed to process. didFail atomic.Bool } @@ -137,20 +137,20 @@ func (ss *shardState) wait() bool { return ss.didFail.Load() } -// lockACI blocks until it is able to get an exclusive lock on the given ACI. No +// lockSearchKey blocks until it is able to get an exclusive lock on the given search key. No // other goroutines are able to obtain a lock until `unlock` is called. -func (ss *shardState) lockACI(aci []byte) (unlock func()) { - aciString := fmt.Sprintf("%x", aci) +func (ss *shardState) lockSearchKey(searchKey []byte) (unlock func()) { + searchKeyString := fmt.Sprintf("%x", searchKey) ch := make(chan struct{}) for { - existing, locked := ss.aciLocks.LoadOrStore(aciString, ch) + existing, locked := ss.searchKeyLocks.LoadOrStore(searchKeyString, ch) if locked { - // This ACI is already locked. Wait for it to be unlocked and retry. + // This search key is already locked. Wait for it to be unlocked and retry. <-existing.(chan struct{}) continue } return func() { - ss.aciLocks.CompareAndDelete(aciString, ch) + ss.searchKeyLocks.CompareAndDelete(searchKeyString, ch) close(ch) } } @@ -165,7 +165,7 @@ type Streamer struct { } // run runs the streamer, blocking forever. -func (s *Streamer) run(ctx context.Context, name string, startAtTimestamp *time.Time, updateHandler *KtUpdateHandler) { +func (s *Streamer) run(ctx context.Context, name string, startAtTimestamp *time.Time, updateHandler *KtUpdateHandler, updateFunc updateFunc) { i := 0 for { // Create a new context and shard map for each run. @@ -216,7 +216,7 @@ func (s *Streamer) run(ctx context.Context, name string, startAtTimestamp *time. state.failed() return default: - err := updateFromStream(ctx, data, state, updateHandler, logUpdater) + err := updateFunc(ctx, data, state, updateHandler, logUpdater) if err != nil { util.Log().Infof("failed to update entry from stream: %v", err) metrics.IncrCounter([]string{withinStream, "errors"}, 1) @@ -312,19 +312,6 @@ func backfillScanOutput(ctx context.Context, scan *dynamodb.ScanOutput, updateHa return nil } -type accountPair struct { - Prev *account `json:"prev"` - Next *account `json:"next"` -} - -type account struct { - ACI []byte `json:"aci"` - ACIIdentityKey []byte `json:"aciIdentityKey"` - - Number string `json:"number"` - UsernameHash []byte `json:"usernameHash"` -} - func updateFromBackfill(ctx context.Context, item map[string]types.AttributeValue, updateHandler *KtUpdateHandler, updater Updater) error { type backfillAccount struct { Number string `json:"number"` @@ -384,114 +371,122 @@ func updateFromBackfill(ctx context.Context, item map[string]types.AttributeValu return nil } -func updateFromStream(ctx context.Context, data []byte, state *shardState, updateHandler *KtUpdateHandler, updater Updater) error { - pair := &accountPair{} +type mappingPair struct { + PrevKey []byte + PrevVal []byte + NextKey []byte + NextVal []byte + Type string +} + +func update(ctx context.Context, state *shardState, updateHandler *KtUpdateHandler, updater Updater, pair *mappingPair) error { + if pair.PrevKey == nil && pair.NextKey == nil { + // This should never happen, but we want to know about it if it does + metrics.IncrCounterWithLabels([]string{"stream_empty_pair"}, 1, []metrics.Label{{Name: "search_key_type", Value: pair.Type}}) + return nil + } else if pair.NextKey == nil { + defer state.lockSearchKey(pair.PrevKey)() + if err := updater.update(ctx, withinStream, + pair.PrevKey, tombstoneBytes, updateHandler, marshalValue(pair.PrevVal)); err != nil { + return fmt.Errorf("updating %s: %w", pair.Type, err) + } + } else { + defer state.lockSearchKey(pair.NextKey)() + if !bytes.Equal(pair.PrevVal, pair.NextVal) { + if err := updater.update(ctx, withinStream, + pair.NextKey, marshalValue(pair.NextVal), updateHandler, nil); err != nil { + return fmt.Errorf("updating %s: %w", pair.Type, err) + } + } else { + // This should also never happen, but we want to know about it if it does + metrics.IncrCounterWithLabels([]string{"stream_duplicate_pair"}, 1, []metrics.Label{{Name: "search_key_type", Value: pair.Type}}) + } + } + return nil +} + +type SearchKey interface { + e164 | usernameHash | aci +} + +type streamPair[T SearchKey] struct { + Prev *T `json:"prev"` + Next *T `json:"next"` +} + +func updateFromStream[T SearchKey]( + ctx context.Context, + data []byte, + state *shardState, + updateHandler *KtUpdateHandler, + updater Updater, + streamType string, + extractKeyVal func(*T) (key []byte, value []byte), +) error { + pair := &streamPair[T]{} if err := json.Unmarshal(data, pair); err != nil { // Note: This is not a temporary error and will intentionally cause the // scanner to get stuck until new code is deployed that can handle // whatever is in the stream. - return fmt.Errorf("unmarshaling from stream: %w", err) - } else if pair.Prev == nil && pair.Next == nil { - // This should never happen, but we want to know about it if it does - metrics.IncrCounter([]string{"stream_empty_pair"}, 1) - return nil - } else if pair.Prev == nil { - defer state.lockACI(pair.Next.ACI)() - - // New registration. ACI and number should always be present on these updates. - if err := updater.update(ctx, withinStream, - append([]byte{shared.AciPrefix}, pair.Next.ACI...), - marshalValue(pair.Next.ACIIdentityKey), updateHandler, nil); err != nil { - return fmt.Errorf("updating ACI: %w", err) - } - - if err := updater.update(ctx, withinStream, - append([]byte{shared.NumberPrefix}, []byte(pair.Next.Number)...), - marshalValue(pair.Next.ACI), updateHandler, nil); err != nil { - return fmt.Errorf("updating number: %w", err) - } - - if len(pair.Next.UsernameHash) > 0 { - if err := updater.update(ctx, withinStream, - append([]byte{shared.UsernameHashPrefix}, pair.Next.UsernameHash...), - marshalValue(pair.Next.ACI), updateHandler, nil); err != nil { - return fmt.Errorf("updating username hash: %w", err) - } - } - } else if pair.Next == nil { - defer state.lockACI(pair.Prev.ACI)() - - // Account deletion. Overwrite all associated mappings to point to a tombstone value. - if err := updater.update(ctx, withinStream, - append([]byte{shared.AciPrefix}, pair.Prev.ACI...), - tombstoneBytes, updateHandler, marshalValue(pair.Prev.ACIIdentityKey)); err != nil { - return fmt.Errorf("updating ACI: %w", err) - } - - if err := updater.update(ctx, withinStream, - append([]byte{shared.NumberPrefix}, []byte(pair.Prev.Number)...), - tombstoneBytes, updateHandler, marshalValue(pair.Prev.ACI)); err != nil { - return fmt.Errorf("updating number: %w", err) - } - - if len(pair.Prev.UsernameHash) > 0 { - if err := updater.update(ctx, withinStream, - append([]byte{shared.UsernameHashPrefix}, pair.Prev.UsernameHash...), - tombstoneBytes, updateHandler, marshalValue(pair.Prev.ACI)); err != nil { - return fmt.Errorf("updating username hash: %w", err) - } - } - } else { - defer state.lockACI(pair.Next.ACI)() - - if !bytes.Equal(pair.Prev.ACIIdentityKey, pair.Next.ACIIdentityKey) { - if err := updater.update(ctx, withinStream, - append([]byte{shared.AciPrefix}, pair.Next.ACI...), - marshalValue(pair.Next.ACIIdentityKey), updateHandler, nil); err != nil { - return fmt.Errorf("updating ACI: %w", err) - } - } - - if !bytes.Equal(pair.Prev.UsernameHash, pair.Next.UsernameHash) { - if len(pair.Prev.UsernameHash) > 0 { - // Tombstone the old username hash - if err := updater.update(ctx, withinStream, - append([]byte{shared.UsernameHashPrefix}, pair.Prev.UsernameHash...), - tombstoneBytes, updateHandler, marshalValue(pair.Prev.ACI)); err != nil { - return fmt.Errorf("updating username hash: %w", err) - } - } - - if len(pair.Next.UsernameHash) > 0 { - if err := updater.update(ctx, withinStream, - append([]byte{shared.UsernameHashPrefix}, pair.Next.UsernameHash...), - marshalValue(pair.Next.ACI), updateHandler, nil); err != nil { - return fmt.Errorf("updating username hash: %w", err) - } - } - } - - if pair.Prev.Number != pair.Next.Number { - if len(pair.Prev.Number) > 0 { - // Tombstone the old phone number - if err := updater.update(ctx, withinStream, - append([]byte{shared.NumberPrefix}, pair.Prev.Number...), - tombstoneBytes, updateHandler, marshalValue(pair.Prev.ACI)); err != nil { - return fmt.Errorf("updating number: %w", err) - } - } - - if len(pair.Next.Number) > 0 { - if err := updater.update(ctx, withinStream, - append([]byte{shared.NumberPrefix}, []byte(pair.Next.Number)...), - marshalValue(pair.Next.ACI), updateHandler, nil); err != nil { - return fmt.Errorf("updating number: %w", err) - } - } - } + return fmt.Errorf("unmarshaling from %s stream: %w", streamType, err) } - return nil + var prevKey, prevVal, nextKey, nextVal []byte + if pair.Prev != nil { + prevKey, prevVal = extractKeyVal(pair.Prev) + } + if pair.Next != nil { + nextKey, nextVal = extractKeyVal(pair.Next) + } + + return update(ctx, state, updateHandler, updater, &mappingPair{ + PrevKey: prevKey, + PrevVal: prevVal, + NextKey: nextKey, + NextVal: nextVal, + Type: streamType, + }) +} + +type e164 struct { + Number string `json:"e164"` + ACI []byte `json:"aci"` +} + +type updateFunc func(context.Context, []byte, *shardState, *KtUpdateHandler, Updater) error + +func updateFromE164Stream(ctx context.Context, data []byte, state *shardState, updateHandler *KtUpdateHandler, updater Updater) error { + return updateFromStream(ctx, data, state, updateHandler, updater, "number", + func(e *e164) (key []byte, value []byte) { + return append([]byte{shared.NumberPrefix}, []byte(e.Number)...), e.ACI + }, + ) +} + +type usernameHash struct { + UsernameHash []byte `json:"usernameHash"` + ACI []byte `json:"aci"` +} + +func updateFromUsernameStream(ctx context.Context, data []byte, state *shardState, updateHandler *KtUpdateHandler, updater Updater) error { + return updateFromStream(ctx, data, state, updateHandler, updater, "usernameHash", + func(u *usernameHash) (key []byte, value []byte) { + return append([]byte{shared.UsernameHashPrefix}, u.UsernameHash...), u.ACI + }, + ) +} + +type aci struct { + ACI []byte `json:"aci"` + ACIIdentityKey []byte `json:"aciIdentityKey"` +} + +func updateFromAciStream(ctx context.Context, data []byte, state *shardState, updateHandler *KtUpdateHandler, updater Updater) error { + return updateFromStream(ctx, data, state, updateHandler, updater, "aci", + func(a *aci) (key []byte, value []byte) { + return append([]byte{shared.AciPrefix}, a.ACI...), a.ACIIdentityKey + }, + ) } // Updater interface supports mocking in tests diff --git a/cmd/kt-server/stream_test.go b/cmd/kt-server/stream_test.go index c4df0b5..d0522da 100644 --- a/cmd/kt-server/stream_test.go +++ b/cmd/kt-server/stream_test.go @@ -20,9 +20,8 @@ import ( ) var ( + validAci2 = random(16) validAciIdentityKey2 = createDistinctValue(validAciIdentityKey1) - validUsernameHash2 = createDistinctValue(validUsernameHash1) - validPhoneNumber2 = "+14155550102" ) type mockLogUpdater struct { @@ -40,136 +39,84 @@ type expectedUpdateInputs struct { preUpdateValue []byte } -var testUpdateAccountPairs = []struct { - prev *account - next *account - expectedNumUpdates int - expectedUpdateInputs []expectedUpdateInputs -}{ +var testUpdateAciPairs = []testCase[aci]{ // No change { - &account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber1}, - &account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber1}, + &streamPair[aci]{ + Prev: &aci{ + ACI: validAci1, + ACIIdentityKey: validAciIdentityKey1, + }, + Next: &aci{ + ACI: validAci1, + ACIIdentityKey: validAciIdentityKey1, + }, + }, 0, []expectedUpdateInputs{}, }, - // New registration + // Add ACI { - nil, - &account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber1}, - 3, + &streamPair[aci]{ + Prev: nil, + Next: &aci{ + ACI: validAci1, + ACIIdentityKey: validAciIdentityKey1, + }, + }, + 1, []expectedUpdateInputs{ {key: append([]byte{shared.AciPrefix}, validAci1...), value: marshalValue(validAciIdentityKey1), preUpdateValue: nil}, - {key: append([]byte{shared.NumberPrefix}, []byte(validPhoneNumber1)...), value: marshalValue(validAci1), preUpdateValue: nil}, - {key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: marshalValue(validAci1), preUpdateValue: nil}}, + }, }, - // Re-registration - the server sets the old username to null but keeps it reserved for the client to reclaim + // Update ACI mapping { - &account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber1}, - &account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey2, Number: validPhoneNumber1}, - 2, + &streamPair[aci]{ + Prev: &aci{ + ACI: validAci1, + ACIIdentityKey: validAciIdentityKey1, + }, + Next: &aci{ + ACI: validAci1, + ACIIdentityKey: validAciIdentityKey2, + }, + }, + 1, []expectedUpdateInputs{ {key: append([]byte{shared.AciPrefix}, validAci1...), value: marshalValue(validAciIdentityKey2), preUpdateValue: nil}, - {key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)}}, + }, }, - // Re-registration - client reclaims username + // Delete ACI { - &account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey2, Number: validPhoneNumber1}, - &account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey2, UsernameHash: validUsernameHash1, Number: validPhoneNumber1}, + &streamPair[aci]{ + Prev: &aci{ + ACI: validAci1, + ACIIdentityKey: validAciIdentityKey1, + }, + Next: nil, + }, 1, - []expectedUpdateInputs{ - {key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: marshalValue(validAci1), preUpdateValue: nil}}, - }, - // Some re-registrations do not change the identity key - { - &account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber1}, - &account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, Number: validPhoneNumber1}, - 1, - []expectedUpdateInputs{ - {key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)}}, - }, - // Account deletion with username - { - &account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber1}, - nil, - 3, []expectedUpdateInputs{ {key: append([]byte{shared.AciPrefix}, validAci1...), value: tombstoneBytes, preUpdateValue: marshalValue(validAciIdentityKey1)}, - {key: append([]byte{shared.NumberPrefix}, []byte(validPhoneNumber1)...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)}, - {key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)}}, - }, - // Account deletion with no username - { - &account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, Number: validPhoneNumber1}, - nil, - 2, - []expectedUpdateInputs{ - {key: append([]byte{shared.AciPrefix}, validAci1...), value: tombstoneBytes, preUpdateValue: marshalValue(validAciIdentityKey1)}, - {key: append([]byte{shared.NumberPrefix}, []byte(validPhoneNumber1)...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)}}, - }, - // Username change - { - &account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber1}, - &account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash2, Number: validPhoneNumber1}, - 2, - []expectedUpdateInputs{ - {key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash2...), value: marshalValue(validAci1), preUpdateValue: nil}, - {key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)}}, - }, - // Phone number change - { - &account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber1}, - &account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber2}, - 2, - []expectedUpdateInputs{ - {key: append([]byte{shared.NumberPrefix}, validPhoneNumber2...), value: marshalValue(validAci1), preUpdateValue: nil}, - {key: append([]byte{shared.NumberPrefix}, validPhoneNumber1...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)}}, + }, }, } -func TestUpdateFromStream(t *testing.T) { - mockConfig, _ := config.Read(mockConfigFile) - mockTransparencyStore := db.NewMemoryTransparencyStore() - updateRequestChannel := make(chan updateRequest) - mockUpdateHandler := &KtUpdateHandler{ - config: mockConfig.APIConfig, - tx: mockTransparencyStore, - ch: updateRequestChannel, - } - state := &shardState{} - - for _, p := range testUpdateAccountPairs { - mockUpdater := new(mockLogUpdater) - - accounts := &accountPair{Prev: p.prev, Next: p.next} - marshaledData, err := json.Marshal(accounts) - if err != nil { - t.Fatalf("Unexpected error marshaling acocunt pair") - } - - for _, pair := range p.expectedUpdateInputs { - mockUpdater.On("update", mock.Anything, mock.Anything, pair.key, pair.value, mock.Anything, pair.preUpdateValue).Return(nil) - } - - err = updateFromStream(context.Background(), marshaledData, state, mockUpdateHandler, mockUpdater) - - assert.NoError(t, err) - mockUpdater.AssertNumberOfCalls(t, "update", p.expectedNumUpdates) - mockUpdater.AssertExpectations(t) - } +func TestUpdateFromAciStream(t *testing.T) { + testStreamUpdate[aci](t, testUpdateAciPairs, updateFromAciStream) } -func TestLockACI(t *testing.T) { +func TestLockSearchKey(t *testing.T) { const parallel = 5 state := &shardState{} - defer state.lockACI([]byte("other"))() + defer state.lockSearchKey([]byte("other"))() counter := 0 output := make(chan int) for range parallel { go func() { - defer state.lockACI([]byte("label"))() + defer state.lockSearchKey([]byte("label"))() output <- counter time.Sleep(1 * time.Millisecond) counter++ @@ -182,3 +129,176 @@ func TestLockACI(t *testing.T) { } } } + +type testCase[T SearchKey] struct { + pair *streamPair[T] + expectedNumUpdates int + expectedUpdateInputs []expectedUpdateInputs +} + +func testStreamUpdate[T SearchKey](t *testing.T, + pairs []testCase[T], + updaterFunc func(context.Context, []byte, *shardState, *KtUpdateHandler, Updater) error) { + mockConfig, _ := config.Read(mockConfigFile) + mockTransparencyStore := db.NewMemoryTransparencyStore() + updateRequestChannel := make(chan updateRequest) + mockUpdateHandler := &KtUpdateHandler{ + config: mockConfig.APIConfig, + tx: mockTransparencyStore, + ch: updateRequestChannel, + } + state := &shardState{} + + for _, p := range pairs { + mockUpdater := new(mockLogUpdater) + + marshaledData, err := json.Marshal(p.pair) + if err != nil { + t.Fatalf("Unexpected error marshaling e164 streamPair") + } + + for _, pair := range p.expectedUpdateInputs { + mockUpdater.On("update", mock.Anything, mock.Anything, pair.key, pair.value, mock.Anything, pair.preUpdateValue).Return(nil) + } + + err = updaterFunc(context.Background(), marshaledData, state, mockUpdateHandler, mockUpdater) + + assert.NoError(t, err) + mockUpdater.AssertNumberOfCalls(t, "update", p.expectedNumUpdates) + mockUpdater.AssertExpectations(t) + } +} + +var testUpdateE164Pairs = []testCase[e164]{ + // No change + { + &streamPair[e164]{ + Prev: &e164{ + Number: validPhoneNumber1, + ACI: validAci1, + }, + Next: &e164{ + Number: validPhoneNumber1, + ACI: validAci1, + }, + }, + 0, + []expectedUpdateInputs{}, + }, + // Add E164 + { + &streamPair[e164]{ + Prev: nil, + Next: &e164{ + Number: validPhoneNumber1, + ACI: validAci1, + }, + }, + 1, + []expectedUpdateInputs{ + {key: append([]byte{shared.NumberPrefix}, []byte(validPhoneNumber1)...), value: marshalValue(validAci1), preUpdateValue: nil}, + }, + }, + // Delete E164 + { + &streamPair[e164]{ + Prev: &e164{ + Number: validPhoneNumber1, + ACI: validAci1, + }, + Next: nil, + }, + 1, + []expectedUpdateInputs{ + {key: append([]byte{shared.NumberPrefix}, []byte(validPhoneNumber1)...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)}, + }, + }, + // Update E164 + { + &streamPair[e164]{ + Prev: &e164{ + Number: validPhoneNumber1, + ACI: validAci1, + }, + Next: &e164{ + Number: validPhoneNumber1, + ACI: validAci2, + }, + }, + 1, + []expectedUpdateInputs{ + {key: append([]byte{shared.NumberPrefix}, []byte(validPhoneNumber1)...), value: marshalValue(validAci2), preUpdateValue: nil}, + }, + }, +} + +func TestUpdateFromE164Stream(t *testing.T) { + testStreamUpdate[e164](t, testUpdateE164Pairs, updateFromE164Stream) +} + +var testUpdateUsernameHashPairs = []testCase[usernameHash]{ + // No change + { + &streamPair[usernameHash]{ + Prev: &usernameHash{ + UsernameHash: validUsernameHash1, + ACI: validAci1, + }, + Next: &usernameHash{ + UsernameHash: validUsernameHash1, + ACI: validAci1, + }, + }, + 0, + []expectedUpdateInputs{}, + }, + // Add username hash + { + &streamPair[usernameHash]{ + Prev: nil, + Next: &usernameHash{ + UsernameHash: validUsernameHash1, + ACI: validAci1, + }, + }, + 1, + []expectedUpdateInputs{ + {key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: marshalValue(validAci1), preUpdateValue: nil}, + }, + }, + // Delete username hash + { + &streamPair[usernameHash]{ + Prev: &usernameHash{ + UsernameHash: validUsernameHash1, + ACI: validAci1, + }, + Next: nil, + }, + 1, + []expectedUpdateInputs{ + {key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)}, + }, + }, + // Update username hash + { + &streamPair[usernameHash]{ + Prev: &usernameHash{ + UsernameHash: validUsernameHash1, + ACI: validAci1, + }, + Next: &usernameHash{ + UsernameHash: validUsernameHash1, + ACI: validAci2, + }, + }, + 1, + []expectedUpdateInputs{ + {key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: marshalValue(validAci2), preUpdateValue: nil}, + }, + }, +} + +func TestUpdateFromUsernameHashStream(t *testing.T) { + testStreamUpdate[usernameHash](t, testUpdateUsernameHashPairs, updateFromUsernameStream) +}