From 692cd4bc4f6787dc14ed93ee24f1f996e30e4b07 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 19 Jan 2023 06:37:57 +0800 Subject: [PATCH] lnutils+lntemp: move `SyncMap` to `lnutils` This commit moves the `SyncMap` from `lntemp/node` into `lnutils` so it can be used by other packages. --- lntemp/node/state.go | 37 ++-- lntemp/node/watcher.go | 13 +- {lntemp/node => lnutils}/sync_map.go | 2 +- lnutils/sync_map_bench_test.go | 255 +++++++++++++++++++++++++++ 4 files changed, 286 insertions(+), 21 deletions(-) rename {lntemp/node => lnutils}/sync_map.go (98%) create mode 100644 lnutils/sync_map_bench_test.go diff --git a/lntemp/node/state.go b/lntemp/node/state.go index e852bf9ab..2c3c45f02 100644 --- a/lntemp/node/state.go +++ b/lntemp/node/state.go @@ -10,6 +10,7 @@ import ( "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/lntemp/rpc" + "github.com/lightningnetwork/lnd/lnutils" ) type ( @@ -153,18 +154,18 @@ type State struct { // openChans records each opened channel and how many times it has // heard the announcements from its graph subscription. - openChans *SyncMap[wire.OutPoint, []*OpenChannelUpdate] + openChans *lnutils.SyncMap[wire.OutPoint, []*OpenChannelUpdate] // closedChans records each closed channel and its close channel update // message received from its graph subscription. - closedChans *SyncMap[wire.OutPoint, *lnrpc.ClosedChannelUpdate] + closedChans *lnutils.SyncMap[wire.OutPoint, *lnrpc.ClosedChannelUpdate] // numChanUpdates records the number of channel updates seen by each // channel. - numChanUpdates *SyncMap[wire.OutPoint, int] + numChanUpdates *lnutils.SyncMap[wire.OutPoint, int] // nodeUpdates records the node announcements seen by each node. - nodeUpdates *SyncMap[string, []*lnrpc.NodeUpdate] + nodeUpdates *lnutils.SyncMap[string, []*lnrpc.NodeUpdate] // policyUpdates defines a type to store channel policy updates. It has // the format, @@ -179,21 +180,23 @@ type State struct { // }, // "chanPoint2": ... // } - policyUpdates *SyncMap[wire.OutPoint, PolicyUpdate] + policyUpdates *lnutils.SyncMap[wire.OutPoint, PolicyUpdate] } // newState initialize a new state with every field being set to its zero // value. func newState(rpc *rpc.HarnessRPC) *State { return &State{ - rpc: rpc, - openChans: &SyncMap[wire.OutPoint, []*OpenChannelUpdate]{}, - closedChans: &SyncMap[ + rpc: rpc, + openChans: &lnutils.SyncMap[ + wire.OutPoint, []*OpenChannelUpdate, + ]{}, + closedChans: &lnutils.SyncMap[ wire.OutPoint, *lnrpc.ClosedChannelUpdate, ]{}, - numChanUpdates: &SyncMap[wire.OutPoint, int]{}, - nodeUpdates: &SyncMap[string, []*lnrpc.NodeUpdate]{}, - policyUpdates: &SyncMap[wire.OutPoint, PolicyUpdate]{}, + numChanUpdates: &lnutils.SyncMap[wire.OutPoint, int]{}, + nodeUpdates: &lnutils.SyncMap[string, []*lnrpc.NodeUpdate]{}, + policyUpdates: &lnutils.SyncMap[wire.OutPoint, PolicyUpdate]{}, } } @@ -352,9 +355,11 @@ func (s *State) resetEphermalStates(rpc *rpc.HarnessRPC) { // Reset ephermal states which are used to record info from finished // tests. - s.openChans = &SyncMap[wire.OutPoint, []*OpenChannelUpdate]{} - s.closedChans = &SyncMap[wire.OutPoint, *lnrpc.ClosedChannelUpdate]{} - s.numChanUpdates = &SyncMap[wire.OutPoint, int]{} - s.nodeUpdates = &SyncMap[string, []*lnrpc.NodeUpdate]{} - s.policyUpdates = &SyncMap[wire.OutPoint, PolicyUpdate]{} + s.openChans = &lnutils.SyncMap[wire.OutPoint, []*OpenChannelUpdate]{} + s.closedChans = &lnutils.SyncMap[ + wire.OutPoint, *lnrpc.ClosedChannelUpdate, + ]{} + s.numChanUpdates = &lnutils.SyncMap[wire.OutPoint, int]{} + s.nodeUpdates = &lnutils.SyncMap[string, []*lnrpc.NodeUpdate]{} + s.policyUpdates = &lnutils.SyncMap[wire.OutPoint, PolicyUpdate]{} } diff --git a/lntemp/node/watcher.go b/lntemp/node/watcher.go index c081daaad..af0bf66cb 100644 --- a/lntemp/node/watcher.go +++ b/lntemp/node/watcher.go @@ -14,6 +14,7 @@ import ( "github.com/lightningnetwork/lnd/lntemp/rpc" "github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lntest/wait" + "github.com/lightningnetwork/lnd/lnutils" ) type chanWatchType uint8 @@ -68,8 +69,8 @@ type nodeWatcher struct { // of edges seen for that channel within the network. When this number // reaches 2, then it means that both edge advertisements has // propagated through the network. - openChanWatchers *SyncMap[wire.OutPoint, []chan struct{}] - closeChanWatchers *SyncMap[wire.OutPoint, []chan struct{}] + openChanWatchers *lnutils.SyncMap[wire.OutPoint, []chan struct{}] + closeChanWatchers *lnutils.SyncMap[wire.OutPoint, []chan struct{}] wg sync.WaitGroup } @@ -79,8 +80,12 @@ func newNodeWatcher(rpc *rpc.HarnessRPC, state *State) *nodeWatcher { rpc: rpc, state: state, chanWatchRequests: make(chan *chanWatchRequest, 100), - openChanWatchers: &SyncMap[wire.OutPoint, []chan struct{}]{}, - closeChanWatchers: &SyncMap[wire.OutPoint, []chan struct{}]{}, + openChanWatchers: &lnutils.SyncMap[ + wire.OutPoint, []chan struct{}, + ]{}, + closeChanWatchers: &lnutils.SyncMap[ + wire.OutPoint, []chan struct{}, + ]{}, } } diff --git a/lntemp/node/sync_map.go b/lnutils/sync_map.go similarity index 98% rename from lntemp/node/sync_map.go rename to lnutils/sync_map.go index 1ef157331..83789d7b5 100644 --- a/lntemp/node/sync_map.go +++ b/lnutils/sync_map.go @@ -1,4 +1,4 @@ -package node +package lnutils import "sync" diff --git a/lnutils/sync_map_bench_test.go b/lnutils/sync_map_bench_test.go new file mode 100644 index 000000000..e1e0cbc8f --- /dev/null +++ b/lnutils/sync_map_bench_test.go @@ -0,0 +1,255 @@ +package lnutils_test + +import ( + "sync" + "sync/atomic" + "testing" + + "github.com/lightningnetwork/lnd/lnutils" +) + +func BenchmarkReadMutexMap(b *testing.B) { + // Create a map with a mutex. + m := make(map[int64]struct{}) + + // k is the unique key for each goroutine. + k := int64(0) + + // Create a general mutex. + var mu sync.Mutex + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Perform a lock read. + mu.Lock() + _ = m[k] + mu.Unlock() + } + }) +} + +func BenchmarkReadRWMutexMap(b *testing.B) { + // Create a map with a mutex. + m := make(map[int64]struct{}) + + // k is the unique key for each goroutine. + k := int64(0) + + // Create a read write mutex. + var mu sync.RWMutex + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Perform a lock read. + mu.RLock() + _ = m[k] + mu.RUnlock() + } + }) +} + +func BenchmarkReadSyncMap(b *testing.B) { + // Create a sync.Map. + syncMap := &sync.Map{} + + // k is the unique key for each goroutine. + k := int64(0) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Read the value. + syncMap.Load(k) + } + }) +} + +func BenchmarkReadLndSyncMap(b *testing.B) { + // Create a sync.Map. + syncMap := &lnutils.SyncMap[int64, struct{}]{} + + // k is the unique key for each goroutine. + k := int64(0) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Read the value. + syncMap.Load(k) + } + }) +} + +func BenchmarkWriteMutexMap(b *testing.B) { + // Create a map with a mutex. + m := make(map[int64]struct{}) + + // k is the unique key for each goroutine. + k := int64(0) + + // Create a general mutex. + var mu sync.Mutex + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Perform a lock write. + mu.Lock() + m[k] = struct{}{} + mu.Unlock() + } + }) +} + +func BenchmarkWriteRWMutexMap(b *testing.B) { + // Create a map with a mutex. + m := make(map[int64]struct{}) + + // k is the unique key for each goroutine. + k := int64(0) + + // Create a read write mutex. + var mu sync.RWMutex + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Perform a lock write. + mu.Lock() + m[k] = struct{}{} + mu.Unlock() + } + }) +} + +func BenchmarkWriteSyncMap(b *testing.B) { + // Create a sync.Map. + syncMap := &sync.Map{} + + // k is the unique key for each goroutine. + k := int64(0) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Write the value. + syncMap.Store(k, struct{}{}) + } + }) +} + +func BenchmarkWriteLndSyncMap(b *testing.B) { + // Create a sync.Map. + syncMap := &lnutils.SyncMap[int64, struct{}]{} + + // k is the unique key for each goroutine. + k := int64(0) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Write the value. + syncMap.Store(k, struct{}{}) + } + }) +} + +func BenchmarkDeleteMutexMap(b *testing.B) { + // Create a map with a mutex. + m := make(map[int64]struct{}) + + // k is the unique key for each goroutine. + k := int64(0) + + // Create a general mutex. + var mu sync.Mutex + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Perform a lock delete. + mu.Lock() + delete(m, k) + mu.Unlock() + } + }) +} + +func BenchmarkDeleteRWMutexMap(b *testing.B) { + // Create a map with a mutex. + m := make(map[int64]struct{}) + + // k is the unique key for each goroutine. + k := int64(0) + + // Create a read write mutex. + var mu sync.RWMutex + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Perform a lock delete. + mu.Lock() + delete(m, k) + mu.Unlock() + } + }) +} + +func BenchmarkDeleteSyncMap(b *testing.B) { + // Create a sync.Map. + syncMap := &sync.Map{} + + // k is the unique key for each goroutine. + k := int64(0) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Delete the value. + syncMap.Delete(k) + } + }) +} + +func BenchmarkDeleteLndSyncMap(b *testing.B) { + // Create a sync.Map. + syncMap := &lnutils.SyncMap[int64, struct{}]{} + + // k is the unique key for each goroutine. + k := int64(0) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Increment k. + atomic.AddInt64(&k, 1) + + // Delete the value. + syncMap.Delete(k) + } + }) +}