multi: Consolidate waitgroup logic.

This switches the various subsystems over to use a new pattern that
consolidates the waitgroup logic in a single location.

This pattern is easier to reason about and less error prone since it's
trivial to see at a glance that the calls to Done are happening as
intended versus having to chase them down all over the code.
This commit is contained in:
Dave Collins 2023-10-24 15:27:57 -05:00
parent ae64e5164f
commit 4e3263c0d9
No known key found for this signature in database
GPG Key ID: B8904D9D9C93D1F2
8 changed files with 175 additions and 144 deletions

View File

@ -219,9 +219,7 @@ type ConnManager struct {
// with overall connection request count above.
assignIDMtx sync.Mutex
// The following fields are used for lifecycle management of the connection
// manager.
wg sync.WaitGroup
// quit is used for lifecycle management of the connection manager.
quit chan struct{}
// cfg specifies the configuration of the connection manager and is set at
@ -458,7 +456,6 @@ out:
}
}
cm.wg.Done()
log.Trace("Connection handler done")
}
@ -685,7 +682,6 @@ func (cm *ConnManager) listenHandler(ctx context.Context, listener net.Listener)
go cm.cfg.OnAccept(conn)
}
cm.wg.Done()
log.Tracef("Listener handler done for %s", listener.Addr())
}
@ -696,8 +692,12 @@ func (cm *ConnManager) Run(ctx context.Context) {
log.Trace("Starting connection manager")
// Start the connection handler goroutine.
cm.wg.Add(1)
go cm.connHandler(ctx)
var wg sync.WaitGroup
wg.Add(1)
go func() {
cm.connHandler(ctx)
wg.Done()
}()
// Start all the listeners so long as the caller requested them and provided
// a callback to be invoked when connections are accepted.
@ -706,8 +706,11 @@ func (cm *ConnManager) Run(ctx context.Context) {
listeners = cm.cfg.Listeners
}
for _, listener := range cm.cfg.Listeners {
cm.wg.Add(1)
go cm.listenHandler(ctx, listener)
wg.Add(1)
go func(listener net.Listener) {
cm.listenHandler(ctx, listener)
wg.Done()
}(listener)
}
// Start enough outbound connections to reach the target number when not
@ -729,8 +732,7 @@ func (cm *ConnManager) Run(ctx context.Context) {
// to recover anyways.
_ = listener.Close()
}
cm.wg.Wait()
wg.Wait()
log.Trace("Connection manager stopped")
}

View File

@ -128,7 +128,6 @@ type IndexSubscriber struct {
mtx sync.Mutex
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
quit chan struct{}
}
@ -345,7 +344,6 @@ func (s *IndexSubscriber) handleSyncSubscribers(ctx context.Context) {
for {
select {
case <-ctx.Done():
s.wg.Done()
return
case <-ticker.C:
@ -367,7 +365,6 @@ func (s *IndexSubscriber) handleIndexUpdates(ctx context.Context) {
for {
select {
case <-ctx.Done():
s.wg.Done()
return
case ntfn := <-s.c:
@ -394,9 +391,16 @@ func (s *IndexSubscriber) handleIndexUpdates(ctx context.Context) {
//
// This should be run as a goroutine.
func (s *IndexSubscriber) Run(ctx context.Context) {
s.wg.Add(2)
go s.handleIndexUpdates(ctx)
go s.handleSyncSubscribers(ctx)
var wg sync.WaitGroup
wg.Add(2)
go func() {
s.handleIndexUpdates(ctx)
wg.Done()
}()
go func() {
s.handleSyncSubscribers(ctx)
wg.Done()
}()
// Stop all the subscriptions and shutdown the subscriber when the context
// is cancelled.
@ -411,6 +415,6 @@ func (s *IndexSubscriber) Run(ctx context.Context) {
}
s.mtx.Unlock()
s.cancel()
s.wg.Wait()
wg.Wait()
log.Tracef("Index subscriber stopped")
}

View File

@ -223,7 +223,6 @@ func (wg *waitGroup) Wait() {
// - Block direct access while generating new templates that will make the
// current template stale (e.g. new parent or new votes)
type BgBlkTmplGenerator struct {
wg sync.WaitGroup
quit chan struct{}
// These fields are provided by the caller when the generator is created and
@ -484,7 +483,6 @@ func (g *BgBlkTmplGenerator) notifySubscribersHandler(ctx context.Context) {
g.subscriptionMtx.Unlock()
case <-ctx.Done():
g.wg.Done()
return
}
}
@ -559,7 +557,6 @@ func (g *BgBlkTmplGenerator) regenQueueHandler(ctx context.Context) {
}
case <-ctx.Done():
g.wg.Done()
return
}
}
@ -1402,7 +1399,6 @@ func (g *BgBlkTmplGenerator) regenHandler(ctx context.Context) {
g.genTemplateAsync(ctx, TURNewParent)
case <-ctx.Done():
g.wg.Done()
return
}
}
@ -1477,8 +1473,6 @@ func (g *BgBlkTmplGenerator) ForceRegen() {
//
// This must be run as a goroutine.
func (g *BgBlkTmplGenerator) initialStartupHandler(ctx context.Context) {
defer g.wg.Done()
// Wait until the chain is synced when unsynced mining is not allowed.
if !g.cfg.AllowUnsyncedMining && !g.cfg.IsCurrent() {
ticker := time.NewTicker(time.Second)
@ -1515,14 +1509,27 @@ func (g *BgBlkTmplGenerator) initialStartupHandler(ctx context.Context) {
// necessary for it to function properly and blocks until the provided context
// is cancelled.
func (g *BgBlkTmplGenerator) Run(ctx context.Context) {
g.wg.Add(4)
go g.regenQueueHandler(ctx)
go g.regenHandler(ctx)
go g.notifySubscribersHandler(ctx)
go g.initialStartupHandler(ctx)
var wg sync.WaitGroup
wg.Add(4)
go func() {
g.regenQueueHandler(ctx)
wg.Done()
}()
go func() {
g.regenHandler(ctx)
wg.Done()
}()
go func() {
g.notifySubscribersHandler(ctx)
wg.Done()
}()
go func() {
g.initialStartupHandler(ctx)
wg.Done()
}()
// Shutdown the generator when the context is cancelled.
<-ctx.Done()
close(g.quit)
g.wg.Wait()
wg.Wait()
}

View File

@ -127,8 +127,6 @@ type CPUMiner struct {
normalMining bool
discreteMining bool
submitBlockLock sync.Mutex
wg sync.WaitGroup
workerWg sync.WaitGroup
updateNumWorkers chan struct{}
queryHashesPerSec chan float64
speedStats map[uint64]*speedStats
@ -194,7 +192,6 @@ out:
}
}
m.wg.Done()
log.Trace("CPU miner speed monitor done")
}
@ -380,8 +377,6 @@ func (m *CPUMiner) solveBlock(ctx context.Context, header *wire.BlockHeader,
func (m *CPUMiner) solver(ctx context.Context, template *mining.BlockTemplate,
speedStats *speedStats, isBlake3PowActive bool) {
defer m.workerWg.Done()
for {
if ctx.Err() != nil {
return
@ -464,10 +459,12 @@ func (m *CPUMiner) solver(ctx context.Context, template *mining.BlockTemplate,
// It must be run as a goroutine.
func (m *CPUMiner) generateBlocks(ctx context.Context, workerID uint64) {
log.Trace("Starting generate blocks worker")
defer func() {
m.workerWg.Done()
log.Trace("Generate blocks worker done")
}()
defer log.Trace("Generate blocks worker done")
// Separate waitgroup for solvers to ensure they are stopped prior to
// terminating the goroutine.
var solverWg sync.WaitGroup
defer solverWg.Wait()
// Subscribe for block template updates and ensure the subscription is
// stopped along with the worker.
@ -488,7 +485,8 @@ func (m *CPUMiner) generateBlocks(ctx context.Context, workerID uint64) {
case templateNtfn := <-templateSub.C():
// Clean up the map that tracks the number of blocks mined on a
// given parent whenever a template is received due to a new parent.
prevHash := templateNtfn.Template.Block.Header.PrevBlock
template := templateNtfn.Template
prevHash := template.Block.Header.PrevBlock
if m.cfg.PermitConnectionlessMining {
if templateNtfn.Reason == mining.TURNewParent {
m.Lock()
@ -516,9 +514,11 @@ func (m *CPUMiner) generateBlocks(ctx context.Context, workerID uint64) {
// Start another goroutine for the new template.
solverCtx, solverCancel = context.WithCancel(ctx)
m.workerWg.Add(1)
go m.solver(solverCtx, templateNtfn.Template, &speedStats,
isBlake3PowActive)
solverWg.Add(1)
go func() {
m.solver(solverCtx, template, &speedStats, isBlake3PowActive)
solverWg.Done()
}()
case <-ctx.Done():
// Ensure resources associated with the solver goroutine context are
@ -541,6 +541,11 @@ func (m *CPUMiner) generateBlocks(ctx context.Context, workerID uint64) {
//
// It must be run as a goroutine.
func (m *CPUMiner) miningWorkerController(ctx context.Context) {
// Separate waitgroup for workers to ensure they are stopped prior to
// terminating the goroutine.
var workerWg sync.WaitGroup
defer workerWg.Wait()
// launchWorker groups common code to launch a worker for subscribing for
// template updates and solving blocks.
type workerState struct {
@ -554,8 +559,11 @@ func (m *CPUMiner) miningWorkerController(ctx context.Context) {
cancel: wCancel,
})
m.workerWg.Add(1)
go m.generateBlocks(wCtx, curWorkerID)
workerWg.Add(1)
go func() {
m.generateBlocks(wCtx, curWorkerID)
workerWg.Done()
}()
curWorkerID++
}
@ -603,10 +611,6 @@ out:
break out
}
}
// Wait until all workers shut down.
m.workerWg.Wait()
m.wg.Done()
}
// Run starts the CPU miner with zero workers which means it will be idle. It
@ -617,14 +621,21 @@ out:
func (m *CPUMiner) Run(ctx context.Context) {
log.Trace("Starting CPU miner in idle state")
m.wg.Add(2)
go m.speedMonitor(ctx)
go m.miningWorkerController(ctx)
var wg sync.WaitGroup
wg.Add(2)
go func() {
m.speedMonitor(ctx)
wg.Done()
}()
go func() {
m.miningWorkerController(ctx)
wg.Done()
}()
// Shutdown the miner when the context is cancelled.
<-ctx.Done()
close(m.quit)
m.wg.Wait()
wg.Wait()
log.Trace("CPU miner stopped")
}

View File

@ -264,9 +264,7 @@ func (state *headerSyncState) resetStallTimeout() {
// SyncManager provides a concurrency safe sync manager for handling all
// incoming blocks.
type SyncManager struct {
// The following fields are used for lifecycle management of the sync
// manager.
wg sync.WaitGroup
// quit is used for lifecycle management of the sync manager.
quit chan struct{}
// cfg specifies the configuration of the sync manager and is set at
@ -1527,7 +1525,6 @@ out:
}
}
m.wg.Done()
log.Trace("Sync manager event handler done")
}
@ -1792,13 +1789,17 @@ func (m *SyncManager) Run(ctx context.Context) {
log.Trace("Starting sync manager")
// Start the event handler goroutine.
m.wg.Add(1)
go m.eventHandler(ctx)
var wg sync.WaitGroup
wg.Add(1)
go func() {
m.eventHandler(ctx)
wg.Done()
}()
// Shutdown the sync manager when the context is cancelled.
<-ctx.Done()
close(m.quit)
m.wg.Wait()
wg.Wait()
log.Trace("Sync manager stopped")
}

View File

@ -5014,7 +5014,6 @@ type Server struct {
ntfnMgr NtfnManager
statusLines map[int]string
statusLock sync.RWMutex
wg sync.WaitGroup
workState *workState
helpCacher RPCHelpCacher
requestProcessShutdown chan struct{}
@ -5141,21 +5140,6 @@ func (s *Server) writeHTTPResponseHeaders(req *http.Request, headers http.Header
return err
}
// shutdown terminates the processes of the rpc server.
func (s *Server) shutdown() error {
log.Warnf("RPC server shutting down")
for _, listener := range s.cfg.Listeners {
err := listener.Close()
if err != nil {
log.Errorf("Problem shutting down rpc: %v", err)
return err
}
}
s.wg.Wait()
log.Infof("RPC server shutdown complete")
return nil
}
// RequestedProcessShutdown returns a channel that is sent to when an
// authorized RPC client requests the process to shutdown. If the request can
// not be read immediately, it is dropped.
@ -5831,20 +5815,21 @@ func (s *Server) route(ctx context.Context) *http.Server {
func (s *Server) Run(ctx context.Context) {
log.Trace("Starting RPC server")
server := s.route(ctx)
var wg sync.WaitGroup
for _, listener := range s.cfg.Listeners {
s.wg.Add(1)
wg.Add(1)
go func(listener net.Listener) {
log.Infof("RPC server listening on %s", listener.Addr())
server.Serve(listener)
log.Tracef("RPC listener done for %s", listener.Addr())
s.wg.Done()
wg.Done()
}(listener)
}
// Subscribe for async work notifications when background template
// generation is enabled.
if len(s.cfg.MiningAddrs) > 0 && s.cfg.BlockTemplater != nil {
s.wg.Add(1)
wg.Add(1)
go func(s *Server, ctx context.Context) {
templateSub := s.cfg.BlockTemplater.Subscribe()
defer templateSub.Stop()
@ -5855,19 +5840,30 @@ func (s *Server) Run(ctx context.Context) {
s.ntfnMgr.NotifyWork(templateNtfn)
case <-ctx.Done():
s.wg.Done()
wg.Done()
return
}
}
}(s, ctx)
}
// Run the notification manager and wait for it to terminate.
s.ntfnMgr.Run(ctx)
err := s.shutdown()
if err != nil {
log.Error(err)
return
// Close all listeners and wait for all goroutines to terminate.
log.Warnf("RPC server shutting down")
var hasCloseErr bool
for _, listener := range s.cfg.Listeners {
err := listener.Close()
if err != nil {
log.Errorf("Failed to close listener %s: %v", listener.Addr(), err)
hasCloseErr = true
}
}
if !hasCloseErr {
wg.Wait()
}
log.Info("RPC server shutdown complete")
}
// Config is a descriptor containing the RPC server configuration.

View File

@ -153,9 +153,7 @@ type wsNotificationManager struct {
// Access channel for current number of connected clients.
numClients chan int
// The following fields are used for lifecycle management of the
// notification manager.
wg sync.WaitGroup
// quit is used for lifecycle management of the notification manager.
quit chan struct{}
}
@ -172,7 +170,6 @@ func (m *wsNotificationManager) queueHandler(ctx context.Context) {
select {
case <-ctx.Done():
close(m.notificationMsgs)
m.wg.Done()
return
case n := <-m.queueNotification:
@ -567,7 +564,6 @@ out:
for _, c := range clients {
c.Disconnect()
}
m.wg.Done()
}
// NumClients returns the number of clients actively being served.
@ -1204,14 +1200,21 @@ func (m *wsNotificationManager) RemoveClient(wsc *wsClient) {
// websocket client notifications. It blocks until the provided context is
// cancelled.
func (m *wsNotificationManager) Run(ctx context.Context) {
m.wg.Add(2)
go m.queueHandler(ctx)
go m.notificationHandler(ctx)
var wg sync.WaitGroup
wg.Add(2)
go func() {
m.queueHandler(ctx)
wg.Done()
}()
go func() {
m.notificationHandler(ctx)
wg.Done()
}()
// Shutdown the notification manager when the context is cancelled.
<-ctx.Done()
close(m.quit)
m.wg.Wait()
wg.Wait()
}
// newWsNotificationManager returns a new notification manager ready for use.
@ -1283,7 +1286,6 @@ type wsClient struct {
ntfnChan chan []byte
sendChan chan wsResponse
quit chan struct{}
wg sync.WaitGroup
}
// shouldLogReadError returns whether or not the passed error, which is expected
@ -1720,7 +1722,6 @@ out:
// Ensure the connection is closed.
c.Disconnect()
c.wg.Done()
log.Tracef("Websocket client input handler done for %s", c.addr)
}
@ -1808,7 +1809,6 @@ out:
}
}
c.wg.Done()
log.Tracef("Websocket client notification queue handler done "+
"for %s", c.addr)
}
@ -1837,7 +1837,6 @@ out:
}
}
c.wg.Done()
log.Tracef("Websocket client output handler done for %s", c.addr)
}
@ -1920,10 +1919,20 @@ func (c *wsClient) Run(ctx context.Context) {
log.Tracef("Starting websocket client %s", c.addr)
// Start processing input and output.
c.wg.Add(3)
go c.inHandler(ctx)
go c.notificationQueueHandler()
go c.outHandler()
var wg sync.WaitGroup
wg.Add(3)
go func() {
c.inHandler(ctx)
wg.Done()
}()
go func() {
c.notificationQueueHandler()
wg.Done()
}()
go func() {
c.outHandler()
wg.Done()
}()
// Forcibly disconnect the websocket client when the context is cancelled
// which also closes the quit channel and thus ensures all of the above
@ -1937,7 +1946,7 @@ func (c *wsClient) Run(ctx context.Context) {
case <-c.quit:
}
c.wg.Wait()
wg.Wait()
}
// newWebsocketClient returns a new websocket client given the notification

View File

@ -496,7 +496,6 @@ type server struct {
query chan interface{}
relayInv chan relayMsg
broadcast chan broadcastMsg
wg sync.WaitGroup
nat *upnpNAT
db database.DB
timeSource blockchain.MedianTimeSource
@ -2354,7 +2353,6 @@ out:
}
s.addrManager.Stop()
s.wg.Done()
srvrLog.Tracef("Peer handler done")
}
@ -2980,7 +2978,6 @@ func (s *server) rebroadcastHandler(ctx context.Context) {
case <-ctx.Done():
timer.Stop()
s.wg.Done()
return
}
}
@ -3031,56 +3028,65 @@ func (s *server) Run(ctx context.Context) {
srvrLog.Trace("Starting server")
// Start the peer handler which in turn starts the address manager.
s.wg.Add(1)
go s.peerHandler(ctx)
var wg sync.WaitGroup
wg.Add(1)
go func() {
s.peerHandler(ctx)
wg.Done()
}()
// Start the sync manager.
s.wg.Add(1)
go func(ctx context.Context, s *server) {
wg.Add(1)
go func() {
s.syncManager.Run(ctx)
s.wg.Done()
}(ctx, s)
wg.Done()
}()
// Query the seeders and start the connection manager.
s.wg.Add(1)
go func(ctx context.Context, s *server) {
wg.Add(1)
go func() {
if !cfg.DisableSeeders {
s.querySeeders(ctx)
}
s.connManager.Run(ctx)
s.wg.Done()
}(ctx, s)
wg.Done()
}()
if s.nat != nil {
s.wg.Add(1)
go s.upnpUpdateThread(ctx)
wg.Add(1)
go func() {
s.upnpUpdateThread(ctx)
wg.Done()
}()
}
if !cfg.DisableRPC {
// Start the rebroadcastHandler, which ensures user tx received by
// the RPC server are rebroadcast until being included in a block.
s.wg.Add(1)
go s.rebroadcastHandler(ctx)
s.wg.Add(1)
go func(ctx context.Context, s *server) {
// Start the RPC server and rebroadcast handler which ensures
// transactions submitted to the RPC server are rebroadcast until being
// included in a block.
wg.Add(2)
go func() {
s.rebroadcastHandler(ctx)
wg.Done()
}()
go func() {
s.rpcServer.Run(ctx)
s.wg.Done()
}(ctx, s)
wg.Done()
}()
}
// Start the background block template generator and CPU miner if the config
// provides a mining address.
if len(cfg.miningAddrs) > 0 {
s.wg.Add(2)
go func(ctx context.Context, s *server) {
wg.Add(2)
go func() {
s.bg.Run(ctx)
s.wg.Done()
}(ctx, s)
go func(ctx context.Context, s *server) {
wg.Done()
}()
go func() {
s.cpuMiner.Run(ctx)
s.wg.Done()
}(ctx, s)
wg.Done()
}()
// The CPU miner is started without any workers which means it is idle.
// Start mining by setting the default number of workers when requested.
@ -3090,23 +3096,20 @@ func (s *server) Run(ctx context.Context) {
}
// Start the chain's index subscriber.
s.wg.Add(1)
go func(ctx context.Context, s *server) {
wg.Add(1)
go func() {
s.indexSubscriber.Run(ctx)
s.wg.Done()
}(ctx, s)
wg.Done()
}()
// Wait until the server is signalled to shutdown.
// Shutdown the server when the context is cancelled.
<-ctx.Done()
s.shutdown.Store(true)
srvrLog.Warnf("Server shutting down")
s.feeEstimator.Close()
s.chain.ShutdownUtxoCache()
s.wg.Wait()
wg.Wait()
srvrLog.Trace("Server stopped")
}
@ -3210,8 +3213,6 @@ out:
} else {
srvrLog.Debugf("successfully disestablished UPnP port mapping")
}
s.wg.Done()
}
// standardScriptVerifyFlags returns the script flags that should be used when