This commit is contained in:
parent
aadb00136b
commit
199057a0e4
@ -125,8 +125,11 @@ type FakeUpdates struct {
|
||||
}
|
||||
|
||||
type StreamConfig struct {
|
||||
Name envstr `yaml:"name"`
|
||||
// If Name is provided but TableName is not, backfill will not be attempted.
|
||||
AciStreamName envstr `yaml:"aci-stream-name"`
|
||||
E164StreamName envstr `yaml:"e164-stream-name"`
|
||||
UsernameStreamName envstr `yaml:"username-stream-name"`
|
||||
|
||||
// If TableName is not provided, backfill will not be attempted.
|
||||
TableName envstr `yaml:"table"`
|
||||
}
|
||||
|
||||
@ -260,8 +263,14 @@ func Read(filename string) (*Config, error) {
|
||||
}
|
||||
|
||||
if parsed.StreamConfig != nil {
|
||||
if parsed.StreamConfig.Name == "" {
|
||||
return nil, fmt.Errorf("field not provided: stream.name")
|
||||
if parsed.StreamConfig.AciStreamName == "" {
|
||||
return nil, fmt.Errorf("field not provided: stream.aci-stream-name")
|
||||
}
|
||||
if parsed.StreamConfig.E164StreamName == "" {
|
||||
return nil, fmt.Errorf("field not provided: stream.e164-stream-name")
|
||||
}
|
||||
if parsed.StreamConfig.UsernameStreamName == "" {
|
||||
return nil, fmt.Errorf("field not provided: stream.username-stream-name")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -198,8 +198,18 @@ func main() {
|
||||
util.Log().Fatalf("stream backfill failed: %v", err)
|
||||
}
|
||||
}
|
||||
util.Log().Infof("Starting stream processing from Kinesis stream %q", config.StreamConfig.Name.String())
|
||||
s.run(ctx, config.StreamConfig.Name.String(), streamStartTimestamp, updateHandler)
|
||||
|
||||
for streamName, updateFromStreamFunc := range map[string]updateFunc{
|
||||
config.StreamConfig.AciStreamName.String(): updateFromAciStream,
|
||||
config.StreamConfig.E164StreamName.String(): updateFromE164Stream,
|
||||
config.StreamConfig.UsernameStreamName.String(): updateFromUsernameStream,
|
||||
} {
|
||||
util.Log().Infof("Starting stream processing from Kinesis stream: %s", streamName)
|
||||
go func() {
|
||||
s.run(ctx, streamName, streamStartTimestamp, updateHandler, updateFromStreamFunc)
|
||||
}()
|
||||
}
|
||||
|
||||
}()
|
||||
}
|
||||
|
||||
|
||||
@ -117,9 +117,9 @@ type shardState struct {
|
||||
sinceLast int
|
||||
// waitGroup tracks when all pending updates have been processed.
|
||||
waitGroup sync.WaitGroup
|
||||
// aciLocks is a map from ACI to chan struct{}. It prevents multiple updates
|
||||
// from being processed for the same ACI simultaneously.
|
||||
aciLocks sync.Map
|
||||
// searchKeyLocks is a map from a search key to chan struct{}. It prevents multiple updates
|
||||
// from being processed for the same search key simultaneously.
|
||||
searchKeyLocks sync.Map
|
||||
// didFail is set to true if any updates failed to process.
|
||||
didFail atomic.Bool
|
||||
}
|
||||
@ -137,20 +137,20 @@ func (ss *shardState) wait() bool {
|
||||
return ss.didFail.Load()
|
||||
}
|
||||
|
||||
// lockACI blocks until it is able to get an exclusive lock on the given ACI. No
|
||||
// lockSearchKey blocks until it is able to get an exclusive lock on the given search key. No
|
||||
// other goroutines are able to obtain a lock until `unlock` is called.
|
||||
func (ss *shardState) lockACI(aci []byte) (unlock func()) {
|
||||
aciString := fmt.Sprintf("%x", aci)
|
||||
func (ss *shardState) lockSearchKey(searchKey []byte) (unlock func()) {
|
||||
searchKeyString := fmt.Sprintf("%x", searchKey)
|
||||
ch := make(chan struct{})
|
||||
for {
|
||||
existing, locked := ss.aciLocks.LoadOrStore(aciString, ch)
|
||||
existing, locked := ss.searchKeyLocks.LoadOrStore(searchKeyString, ch)
|
||||
if locked {
|
||||
// This ACI is already locked. Wait for it to be unlocked and retry.
|
||||
// This search key is already locked. Wait for it to be unlocked and retry.
|
||||
<-existing.(chan struct{})
|
||||
continue
|
||||
}
|
||||
return func() {
|
||||
ss.aciLocks.CompareAndDelete(aciString, ch)
|
||||
ss.searchKeyLocks.CompareAndDelete(searchKeyString, ch)
|
||||
close(ch)
|
||||
}
|
||||
}
|
||||
@ -165,7 +165,7 @@ type Streamer struct {
|
||||
}
|
||||
|
||||
// run runs the streamer, blocking forever.
|
||||
func (s *Streamer) run(ctx context.Context, name string, startAtTimestamp *time.Time, updateHandler *KtUpdateHandler) {
|
||||
func (s *Streamer) run(ctx context.Context, name string, startAtTimestamp *time.Time, updateHandler *KtUpdateHandler, updateFunc updateFunc) {
|
||||
i := 0
|
||||
for {
|
||||
// Create a new context and shard map for each run.
|
||||
@ -216,7 +216,7 @@ func (s *Streamer) run(ctx context.Context, name string, startAtTimestamp *time.
|
||||
state.failed()
|
||||
return
|
||||
default:
|
||||
err := updateFromStream(ctx, data, state, updateHandler, logUpdater)
|
||||
err := updateFunc(ctx, data, state, updateHandler, logUpdater)
|
||||
if err != nil {
|
||||
util.Log().Infof("failed to update entry from stream: %v", err)
|
||||
metrics.IncrCounter([]string{withinStream, "errors"}, 1)
|
||||
@ -312,19 +312,6 @@ func backfillScanOutput(ctx context.Context, scan *dynamodb.ScanOutput, updateHa
|
||||
return nil
|
||||
}
|
||||
|
||||
type accountPair struct {
|
||||
Prev *account `json:"prev"`
|
||||
Next *account `json:"next"`
|
||||
}
|
||||
|
||||
type account struct {
|
||||
ACI []byte `json:"aci"`
|
||||
ACIIdentityKey []byte `json:"aciIdentityKey"`
|
||||
|
||||
Number string `json:"number"`
|
||||
UsernameHash []byte `json:"usernameHash"`
|
||||
}
|
||||
|
||||
func updateFromBackfill(ctx context.Context, item map[string]types.AttributeValue, updateHandler *KtUpdateHandler, updater Updater) error {
|
||||
type backfillAccount struct {
|
||||
Number string `json:"number"`
|
||||
@ -384,114 +371,122 @@ func updateFromBackfill(ctx context.Context, item map[string]types.AttributeValu
|
||||
return nil
|
||||
}
|
||||
|
||||
func updateFromStream(ctx context.Context, data []byte, state *shardState, updateHandler *KtUpdateHandler, updater Updater) error {
|
||||
pair := &accountPair{}
|
||||
type mappingPair struct {
|
||||
PrevKey []byte
|
||||
PrevVal []byte
|
||||
NextKey []byte
|
||||
NextVal []byte
|
||||
Type string
|
||||
}
|
||||
|
||||
func update(ctx context.Context, state *shardState, updateHandler *KtUpdateHandler, updater Updater, pair *mappingPair) error {
|
||||
if pair.PrevKey == nil && pair.NextKey == nil {
|
||||
// This should never happen, but we want to know about it if it does
|
||||
metrics.IncrCounterWithLabels([]string{"stream_empty_pair"}, 1, []metrics.Label{{Name: "search_key_type", Value: pair.Type}})
|
||||
return nil
|
||||
} else if pair.NextKey == nil {
|
||||
defer state.lockSearchKey(pair.PrevKey)()
|
||||
if err := updater.update(ctx, withinStream,
|
||||
pair.PrevKey, tombstoneBytes, updateHandler, marshalValue(pair.PrevVal)); err != nil {
|
||||
return fmt.Errorf("updating %s: %w", pair.Type, err)
|
||||
}
|
||||
} else {
|
||||
defer state.lockSearchKey(pair.NextKey)()
|
||||
if !bytes.Equal(pair.PrevVal, pair.NextVal) {
|
||||
if err := updater.update(ctx, withinStream,
|
||||
pair.NextKey, marshalValue(pair.NextVal), updateHandler, nil); err != nil {
|
||||
return fmt.Errorf("updating %s: %w", pair.Type, err)
|
||||
}
|
||||
} else {
|
||||
// This should also never happen, but we want to know about it if it does
|
||||
metrics.IncrCounterWithLabels([]string{"stream_duplicate_pair"}, 1, []metrics.Label{{Name: "search_key_type", Value: pair.Type}})
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type SearchKey interface {
|
||||
e164 | usernameHash | aci
|
||||
}
|
||||
|
||||
type streamPair[T SearchKey] struct {
|
||||
Prev *T `json:"prev"`
|
||||
Next *T `json:"next"`
|
||||
}
|
||||
|
||||
func updateFromStream[T SearchKey](
|
||||
ctx context.Context,
|
||||
data []byte,
|
||||
state *shardState,
|
||||
updateHandler *KtUpdateHandler,
|
||||
updater Updater,
|
||||
streamType string,
|
||||
extractKeyVal func(*T) (key []byte, value []byte),
|
||||
) error {
|
||||
pair := &streamPair[T]{}
|
||||
if err := json.Unmarshal(data, pair); err != nil {
|
||||
// Note: This is not a temporary error and will intentionally cause the
|
||||
// scanner to get stuck until new code is deployed that can handle
|
||||
// whatever is in the stream.
|
||||
return fmt.Errorf("unmarshaling from stream: %w", err)
|
||||
} else if pair.Prev == nil && pair.Next == nil {
|
||||
// This should never happen, but we want to know about it if it does
|
||||
metrics.IncrCounter([]string{"stream_empty_pair"}, 1)
|
||||
return nil
|
||||
} else if pair.Prev == nil {
|
||||
defer state.lockACI(pair.Next.ACI)()
|
||||
|
||||
// New registration. ACI and number should always be present on these updates.
|
||||
if err := updater.update(ctx, withinStream,
|
||||
append([]byte{shared.AciPrefix}, pair.Next.ACI...),
|
||||
marshalValue(pair.Next.ACIIdentityKey), updateHandler, nil); err != nil {
|
||||
return fmt.Errorf("updating ACI: %w", err)
|
||||
}
|
||||
|
||||
if err := updater.update(ctx, withinStream,
|
||||
append([]byte{shared.NumberPrefix}, []byte(pair.Next.Number)...),
|
||||
marshalValue(pair.Next.ACI), updateHandler, nil); err != nil {
|
||||
return fmt.Errorf("updating number: %w", err)
|
||||
}
|
||||
|
||||
if len(pair.Next.UsernameHash) > 0 {
|
||||
if err := updater.update(ctx, withinStream,
|
||||
append([]byte{shared.UsernameHashPrefix}, pair.Next.UsernameHash...),
|
||||
marshalValue(pair.Next.ACI), updateHandler, nil); err != nil {
|
||||
return fmt.Errorf("updating username hash: %w", err)
|
||||
}
|
||||
}
|
||||
} else if pair.Next == nil {
|
||||
defer state.lockACI(pair.Prev.ACI)()
|
||||
|
||||
// Account deletion. Overwrite all associated mappings to point to a tombstone value.
|
||||
if err := updater.update(ctx, withinStream,
|
||||
append([]byte{shared.AciPrefix}, pair.Prev.ACI...),
|
||||
tombstoneBytes, updateHandler, marshalValue(pair.Prev.ACIIdentityKey)); err != nil {
|
||||
return fmt.Errorf("updating ACI: %w", err)
|
||||
}
|
||||
|
||||
if err := updater.update(ctx, withinStream,
|
||||
append([]byte{shared.NumberPrefix}, []byte(pair.Prev.Number)...),
|
||||
tombstoneBytes, updateHandler, marshalValue(pair.Prev.ACI)); err != nil {
|
||||
return fmt.Errorf("updating number: %w", err)
|
||||
}
|
||||
|
||||
if len(pair.Prev.UsernameHash) > 0 {
|
||||
if err := updater.update(ctx, withinStream,
|
||||
append([]byte{shared.UsernameHashPrefix}, pair.Prev.UsernameHash...),
|
||||
tombstoneBytes, updateHandler, marshalValue(pair.Prev.ACI)); err != nil {
|
||||
return fmt.Errorf("updating username hash: %w", err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
defer state.lockACI(pair.Next.ACI)()
|
||||
|
||||
if !bytes.Equal(pair.Prev.ACIIdentityKey, pair.Next.ACIIdentityKey) {
|
||||
if err := updater.update(ctx, withinStream,
|
||||
append([]byte{shared.AciPrefix}, pair.Next.ACI...),
|
||||
marshalValue(pair.Next.ACIIdentityKey), updateHandler, nil); err != nil {
|
||||
return fmt.Errorf("updating ACI: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if !bytes.Equal(pair.Prev.UsernameHash, pair.Next.UsernameHash) {
|
||||
if len(pair.Prev.UsernameHash) > 0 {
|
||||
// Tombstone the old username hash
|
||||
if err := updater.update(ctx, withinStream,
|
||||
append([]byte{shared.UsernameHashPrefix}, pair.Prev.UsernameHash...),
|
||||
tombstoneBytes, updateHandler, marshalValue(pair.Prev.ACI)); err != nil {
|
||||
return fmt.Errorf("updating username hash: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(pair.Next.UsernameHash) > 0 {
|
||||
if err := updater.update(ctx, withinStream,
|
||||
append([]byte{shared.UsernameHashPrefix}, pair.Next.UsernameHash...),
|
||||
marshalValue(pair.Next.ACI), updateHandler, nil); err != nil {
|
||||
return fmt.Errorf("updating username hash: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if pair.Prev.Number != pair.Next.Number {
|
||||
if len(pair.Prev.Number) > 0 {
|
||||
// Tombstone the old phone number
|
||||
if err := updater.update(ctx, withinStream,
|
||||
append([]byte{shared.NumberPrefix}, pair.Prev.Number...),
|
||||
tombstoneBytes, updateHandler, marshalValue(pair.Prev.ACI)); err != nil {
|
||||
return fmt.Errorf("updating number: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(pair.Next.Number) > 0 {
|
||||
if err := updater.update(ctx, withinStream,
|
||||
append([]byte{shared.NumberPrefix}, []byte(pair.Next.Number)...),
|
||||
marshalValue(pair.Next.ACI), updateHandler, nil); err != nil {
|
||||
return fmt.Errorf("updating number: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("unmarshaling from %s stream: %w", streamType, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
var prevKey, prevVal, nextKey, nextVal []byte
|
||||
if pair.Prev != nil {
|
||||
prevKey, prevVal = extractKeyVal(pair.Prev)
|
||||
}
|
||||
if pair.Next != nil {
|
||||
nextKey, nextVal = extractKeyVal(pair.Next)
|
||||
}
|
||||
|
||||
return update(ctx, state, updateHandler, updater, &mappingPair{
|
||||
PrevKey: prevKey,
|
||||
PrevVal: prevVal,
|
||||
NextKey: nextKey,
|
||||
NextVal: nextVal,
|
||||
Type: streamType,
|
||||
})
|
||||
}
|
||||
|
||||
type e164 struct {
|
||||
Number string `json:"e164"`
|
||||
ACI []byte `json:"aci"`
|
||||
}
|
||||
|
||||
type updateFunc func(context.Context, []byte, *shardState, *KtUpdateHandler, Updater) error
|
||||
|
||||
func updateFromE164Stream(ctx context.Context, data []byte, state *shardState, updateHandler *KtUpdateHandler, updater Updater) error {
|
||||
return updateFromStream(ctx, data, state, updateHandler, updater, "number",
|
||||
func(e *e164) (key []byte, value []byte) {
|
||||
return append([]byte{shared.NumberPrefix}, []byte(e.Number)...), e.ACI
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
type usernameHash struct {
|
||||
UsernameHash []byte `json:"usernameHash"`
|
||||
ACI []byte `json:"aci"`
|
||||
}
|
||||
|
||||
func updateFromUsernameStream(ctx context.Context, data []byte, state *shardState, updateHandler *KtUpdateHandler, updater Updater) error {
|
||||
return updateFromStream(ctx, data, state, updateHandler, updater, "usernameHash",
|
||||
func(u *usernameHash) (key []byte, value []byte) {
|
||||
return append([]byte{shared.UsernameHashPrefix}, u.UsernameHash...), u.ACI
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
type aci struct {
|
||||
ACI []byte `json:"aci"`
|
||||
ACIIdentityKey []byte `json:"aciIdentityKey"`
|
||||
}
|
||||
|
||||
func updateFromAciStream(ctx context.Context, data []byte, state *shardState, updateHandler *KtUpdateHandler, updater Updater) error {
|
||||
return updateFromStream(ctx, data, state, updateHandler, updater, "aci",
|
||||
func(a *aci) (key []byte, value []byte) {
|
||||
return append([]byte{shared.AciPrefix}, a.ACI...), a.ACIIdentityKey
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// Updater interface supports mocking in tests
|
||||
|
||||
@ -20,9 +20,8 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
validAci2 = random(16)
|
||||
validAciIdentityKey2 = createDistinctValue(validAciIdentityKey1)
|
||||
validUsernameHash2 = createDistinctValue(validUsernameHash1)
|
||||
validPhoneNumber2 = "+14155550102"
|
||||
)
|
||||
|
||||
type mockLogUpdater struct {
|
||||
@ -40,136 +39,84 @@ type expectedUpdateInputs struct {
|
||||
preUpdateValue []byte
|
||||
}
|
||||
|
||||
var testUpdateAccountPairs = []struct {
|
||||
prev *account
|
||||
next *account
|
||||
expectedNumUpdates int
|
||||
expectedUpdateInputs []expectedUpdateInputs
|
||||
}{
|
||||
var testUpdateAciPairs = []testCase[aci]{
|
||||
// No change
|
||||
{
|
||||
&account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber1},
|
||||
&account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber1},
|
||||
&streamPair[aci]{
|
||||
Prev: &aci{
|
||||
ACI: validAci1,
|
||||
ACIIdentityKey: validAciIdentityKey1,
|
||||
},
|
||||
Next: &aci{
|
||||
ACI: validAci1,
|
||||
ACIIdentityKey: validAciIdentityKey1,
|
||||
},
|
||||
},
|
||||
0,
|
||||
[]expectedUpdateInputs{},
|
||||
},
|
||||
// New registration
|
||||
// Add ACI
|
||||
{
|
||||
nil,
|
||||
&account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber1},
|
||||
3,
|
||||
&streamPair[aci]{
|
||||
Prev: nil,
|
||||
Next: &aci{
|
||||
ACI: validAci1,
|
||||
ACIIdentityKey: validAciIdentityKey1,
|
||||
},
|
||||
},
|
||||
1,
|
||||
[]expectedUpdateInputs{
|
||||
{key: append([]byte{shared.AciPrefix}, validAci1...), value: marshalValue(validAciIdentityKey1), preUpdateValue: nil},
|
||||
{key: append([]byte{shared.NumberPrefix}, []byte(validPhoneNumber1)...), value: marshalValue(validAci1), preUpdateValue: nil},
|
||||
{key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: marshalValue(validAci1), preUpdateValue: nil}},
|
||||
},
|
||||
},
|
||||
// Re-registration - the server sets the old username to null but keeps it reserved for the client to reclaim
|
||||
// Update ACI mapping
|
||||
{
|
||||
&account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber1},
|
||||
&account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey2, Number: validPhoneNumber1},
|
||||
2,
|
||||
&streamPair[aci]{
|
||||
Prev: &aci{
|
||||
ACI: validAci1,
|
||||
ACIIdentityKey: validAciIdentityKey1,
|
||||
},
|
||||
Next: &aci{
|
||||
ACI: validAci1,
|
||||
ACIIdentityKey: validAciIdentityKey2,
|
||||
},
|
||||
},
|
||||
1,
|
||||
[]expectedUpdateInputs{
|
||||
{key: append([]byte{shared.AciPrefix}, validAci1...), value: marshalValue(validAciIdentityKey2), preUpdateValue: nil},
|
||||
{key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)}},
|
||||
},
|
||||
},
|
||||
// Re-registration - client reclaims username
|
||||
// Delete ACI
|
||||
{
|
||||
&account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey2, Number: validPhoneNumber1},
|
||||
&account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey2, UsernameHash: validUsernameHash1, Number: validPhoneNumber1},
|
||||
&streamPair[aci]{
|
||||
Prev: &aci{
|
||||
ACI: validAci1,
|
||||
ACIIdentityKey: validAciIdentityKey1,
|
||||
},
|
||||
Next: nil,
|
||||
},
|
||||
1,
|
||||
[]expectedUpdateInputs{
|
||||
{key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: marshalValue(validAci1), preUpdateValue: nil}},
|
||||
},
|
||||
// Some re-registrations do not change the identity key
|
||||
{
|
||||
&account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber1},
|
||||
&account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, Number: validPhoneNumber1},
|
||||
1,
|
||||
[]expectedUpdateInputs{
|
||||
{key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)}},
|
||||
},
|
||||
// Account deletion with username
|
||||
{
|
||||
&account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber1},
|
||||
nil,
|
||||
3,
|
||||
[]expectedUpdateInputs{
|
||||
{key: append([]byte{shared.AciPrefix}, validAci1...), value: tombstoneBytes, preUpdateValue: marshalValue(validAciIdentityKey1)},
|
||||
{key: append([]byte{shared.NumberPrefix}, []byte(validPhoneNumber1)...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)},
|
||||
{key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)}},
|
||||
},
|
||||
// Account deletion with no username
|
||||
{
|
||||
&account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, Number: validPhoneNumber1},
|
||||
nil,
|
||||
2,
|
||||
[]expectedUpdateInputs{
|
||||
{key: append([]byte{shared.AciPrefix}, validAci1...), value: tombstoneBytes, preUpdateValue: marshalValue(validAciIdentityKey1)},
|
||||
{key: append([]byte{shared.NumberPrefix}, []byte(validPhoneNumber1)...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)}},
|
||||
},
|
||||
// Username change
|
||||
{
|
||||
&account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber1},
|
||||
&account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash2, Number: validPhoneNumber1},
|
||||
2,
|
||||
[]expectedUpdateInputs{
|
||||
{key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash2...), value: marshalValue(validAci1), preUpdateValue: nil},
|
||||
{key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)}},
|
||||
},
|
||||
// Phone number change
|
||||
{
|
||||
&account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber1},
|
||||
&account{ACI: validAci1, ACIIdentityKey: validAciIdentityKey1, UsernameHash: validUsernameHash1, Number: validPhoneNumber2},
|
||||
2,
|
||||
[]expectedUpdateInputs{
|
||||
{key: append([]byte{shared.NumberPrefix}, validPhoneNumber2...), value: marshalValue(validAci1), preUpdateValue: nil},
|
||||
{key: append([]byte{shared.NumberPrefix}, validPhoneNumber1...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func TestUpdateFromStream(t *testing.T) {
|
||||
mockConfig, _ := config.Read(mockConfigFile)
|
||||
mockTransparencyStore := db.NewMemoryTransparencyStore()
|
||||
updateRequestChannel := make(chan updateRequest)
|
||||
mockUpdateHandler := &KtUpdateHandler{
|
||||
config: mockConfig.APIConfig,
|
||||
tx: mockTransparencyStore,
|
||||
ch: updateRequestChannel,
|
||||
}
|
||||
state := &shardState{}
|
||||
|
||||
for _, p := range testUpdateAccountPairs {
|
||||
mockUpdater := new(mockLogUpdater)
|
||||
|
||||
accounts := &accountPair{Prev: p.prev, Next: p.next}
|
||||
marshaledData, err := json.Marshal(accounts)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error marshaling acocunt pair")
|
||||
}
|
||||
|
||||
for _, pair := range p.expectedUpdateInputs {
|
||||
mockUpdater.On("update", mock.Anything, mock.Anything, pair.key, pair.value, mock.Anything, pair.preUpdateValue).Return(nil)
|
||||
}
|
||||
|
||||
err = updateFromStream(context.Background(), marshaledData, state, mockUpdateHandler, mockUpdater)
|
||||
|
||||
assert.NoError(t, err)
|
||||
mockUpdater.AssertNumberOfCalls(t, "update", p.expectedNumUpdates)
|
||||
mockUpdater.AssertExpectations(t)
|
||||
}
|
||||
func TestUpdateFromAciStream(t *testing.T) {
|
||||
testStreamUpdate[aci](t, testUpdateAciPairs, updateFromAciStream)
|
||||
}
|
||||
|
||||
func TestLockACI(t *testing.T) {
|
||||
func TestLockSearchKey(t *testing.T) {
|
||||
const parallel = 5
|
||||
|
||||
state := &shardState{}
|
||||
defer state.lockACI([]byte("other"))()
|
||||
defer state.lockSearchKey([]byte("other"))()
|
||||
|
||||
counter := 0
|
||||
output := make(chan int)
|
||||
for range parallel {
|
||||
go func() {
|
||||
defer state.lockACI([]byte("label"))()
|
||||
defer state.lockSearchKey([]byte("label"))()
|
||||
output <- counter
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
counter++
|
||||
@ -182,3 +129,176 @@ func TestLockACI(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type testCase[T SearchKey] struct {
|
||||
pair *streamPair[T]
|
||||
expectedNumUpdates int
|
||||
expectedUpdateInputs []expectedUpdateInputs
|
||||
}
|
||||
|
||||
func testStreamUpdate[T SearchKey](t *testing.T,
|
||||
pairs []testCase[T],
|
||||
updaterFunc func(context.Context, []byte, *shardState, *KtUpdateHandler, Updater) error) {
|
||||
mockConfig, _ := config.Read(mockConfigFile)
|
||||
mockTransparencyStore := db.NewMemoryTransparencyStore()
|
||||
updateRequestChannel := make(chan updateRequest)
|
||||
mockUpdateHandler := &KtUpdateHandler{
|
||||
config: mockConfig.APIConfig,
|
||||
tx: mockTransparencyStore,
|
||||
ch: updateRequestChannel,
|
||||
}
|
||||
state := &shardState{}
|
||||
|
||||
for _, p := range pairs {
|
||||
mockUpdater := new(mockLogUpdater)
|
||||
|
||||
marshaledData, err := json.Marshal(p.pair)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error marshaling e164 streamPair")
|
||||
}
|
||||
|
||||
for _, pair := range p.expectedUpdateInputs {
|
||||
mockUpdater.On("update", mock.Anything, mock.Anything, pair.key, pair.value, mock.Anything, pair.preUpdateValue).Return(nil)
|
||||
}
|
||||
|
||||
err = updaterFunc(context.Background(), marshaledData, state, mockUpdateHandler, mockUpdater)
|
||||
|
||||
assert.NoError(t, err)
|
||||
mockUpdater.AssertNumberOfCalls(t, "update", p.expectedNumUpdates)
|
||||
mockUpdater.AssertExpectations(t)
|
||||
}
|
||||
}
|
||||
|
||||
var testUpdateE164Pairs = []testCase[e164]{
|
||||
// No change
|
||||
{
|
||||
&streamPair[e164]{
|
||||
Prev: &e164{
|
||||
Number: validPhoneNumber1,
|
||||
ACI: validAci1,
|
||||
},
|
||||
Next: &e164{
|
||||
Number: validPhoneNumber1,
|
||||
ACI: validAci1,
|
||||
},
|
||||
},
|
||||
0,
|
||||
[]expectedUpdateInputs{},
|
||||
},
|
||||
// Add E164
|
||||
{
|
||||
&streamPair[e164]{
|
||||
Prev: nil,
|
||||
Next: &e164{
|
||||
Number: validPhoneNumber1,
|
||||
ACI: validAci1,
|
||||
},
|
||||
},
|
||||
1,
|
||||
[]expectedUpdateInputs{
|
||||
{key: append([]byte{shared.NumberPrefix}, []byte(validPhoneNumber1)...), value: marshalValue(validAci1), preUpdateValue: nil},
|
||||
},
|
||||
},
|
||||
// Delete E164
|
||||
{
|
||||
&streamPair[e164]{
|
||||
Prev: &e164{
|
||||
Number: validPhoneNumber1,
|
||||
ACI: validAci1,
|
||||
},
|
||||
Next: nil,
|
||||
},
|
||||
1,
|
||||
[]expectedUpdateInputs{
|
||||
{key: append([]byte{shared.NumberPrefix}, []byte(validPhoneNumber1)...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)},
|
||||
},
|
||||
},
|
||||
// Update E164
|
||||
{
|
||||
&streamPair[e164]{
|
||||
Prev: &e164{
|
||||
Number: validPhoneNumber1,
|
||||
ACI: validAci1,
|
||||
},
|
||||
Next: &e164{
|
||||
Number: validPhoneNumber1,
|
||||
ACI: validAci2,
|
||||
},
|
||||
},
|
||||
1,
|
||||
[]expectedUpdateInputs{
|
||||
{key: append([]byte{shared.NumberPrefix}, []byte(validPhoneNumber1)...), value: marshalValue(validAci2), preUpdateValue: nil},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func TestUpdateFromE164Stream(t *testing.T) {
|
||||
testStreamUpdate[e164](t, testUpdateE164Pairs, updateFromE164Stream)
|
||||
}
|
||||
|
||||
var testUpdateUsernameHashPairs = []testCase[usernameHash]{
|
||||
// No change
|
||||
{
|
||||
&streamPair[usernameHash]{
|
||||
Prev: &usernameHash{
|
||||
UsernameHash: validUsernameHash1,
|
||||
ACI: validAci1,
|
||||
},
|
||||
Next: &usernameHash{
|
||||
UsernameHash: validUsernameHash1,
|
||||
ACI: validAci1,
|
||||
},
|
||||
},
|
||||
0,
|
||||
[]expectedUpdateInputs{},
|
||||
},
|
||||
// Add username hash
|
||||
{
|
||||
&streamPair[usernameHash]{
|
||||
Prev: nil,
|
||||
Next: &usernameHash{
|
||||
UsernameHash: validUsernameHash1,
|
||||
ACI: validAci1,
|
||||
},
|
||||
},
|
||||
1,
|
||||
[]expectedUpdateInputs{
|
||||
{key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: marshalValue(validAci1), preUpdateValue: nil},
|
||||
},
|
||||
},
|
||||
// Delete username hash
|
||||
{
|
||||
&streamPair[usernameHash]{
|
||||
Prev: &usernameHash{
|
||||
UsernameHash: validUsernameHash1,
|
||||
ACI: validAci1,
|
||||
},
|
||||
Next: nil,
|
||||
},
|
||||
1,
|
||||
[]expectedUpdateInputs{
|
||||
{key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: tombstoneBytes, preUpdateValue: marshalValue(validAci1)},
|
||||
},
|
||||
},
|
||||
// Update username hash
|
||||
{
|
||||
&streamPair[usernameHash]{
|
||||
Prev: &usernameHash{
|
||||
UsernameHash: validUsernameHash1,
|
||||
ACI: validAci1,
|
||||
},
|
||||
Next: &usernameHash{
|
||||
UsernameHash: validUsernameHash1,
|
||||
ACI: validAci2,
|
||||
},
|
||||
},
|
||||
1,
|
||||
[]expectedUpdateInputs{
|
||||
{key: append([]byte{shared.UsernameHashPrefix}, validUsernameHash1...), value: marshalValue(validAci2), preUpdateValue: nil},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func TestUpdateFromUsernameHashStream(t *testing.T) {
|
||||
testStreamUpdate[usernameHash](t, testUpdateUsernameHashPairs, updateFromUsernameStream)
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user