From fd4aa751dd6574da9ea3c85acbdb49bec76b5f03 Mon Sep 17 00:00:00 2001 From: gram-signal <84339875+gram-signal@users.noreply.github.com> Date: Thu, 25 Jan 2024 19:53:00 -0700 Subject: [PATCH] Add regular liveness checks to service. --- host/config/config.go | 11 +++++--- host/health/health.go | 15 ++++++++--- host/health/health_test.go | 4 +-- host/service/service.go | 53 ++++++++++++++++++++++++++++++++++---- 4 files changed, 69 insertions(+), 14 deletions(-) diff --git a/host/config/config.go b/host/config/config.go index e08d1dc..8eda50b 100644 --- a/host/config/config.go +++ b/host/config/config.go @@ -43,6 +43,9 @@ type Config struct { RecurringRedisPeerDBTTL time.Duration `yaml:"recurringRedisPeerDBTTL"` // Configuration for the client websocket handler Request RequestConfig `yaml:"request"` + // Periodicity/timeout for local liveness checks + LocalLivenessCheckPeriod time.Duration `yaml:"localLivenessCheckPeriod"` + LocalLivenessCheckTimeout time.Duration `yaml:"localLivenessCheckTimeout"` } // validate returns a list of validation errors, or empty if there are no errors. @@ -133,8 +136,10 @@ func Default() *Config { WebsocketHandshakeTimeout: time.Second * 30, SocketTimeout: time.Second * 30, }, - EnclaveID: "enclave", - InitialRedisPeerDBTTL: time.Minute * 120, - RecurringRedisPeerDBTTL: time.Minute * 5, + EnclaveID: "enclave", + InitialRedisPeerDBTTL: time.Minute * 120, + RecurringRedisPeerDBTTL: time.Minute * 5, + LocalLivenessCheckPeriod: time.Minute, + LocalLivenessCheckTimeout: time.Minute, } } diff --git a/host/health/health.go b/host/health/health.go index 00c9625..e11d29b 100644 --- a/host/health/health.go +++ b/host/health/health.go @@ -7,26 +7,33 @@ import ( "fmt" "net/http" "sync" + + "github.com/signalapp/svr2/logger" ) // Health wraps an error (nil means "healthy"), and provides HTTP handling // logic to serve that error. type Health struct { - mu sync.Mutex - err error + mu sync.Mutex + name string + err error } // New creates a new health object, with initial health set based on the // 'initial' error (nil==healthy). -func New(initial error) *Health { - return &Health{err: initial} +func New(name string, initial error) *Health { + return &Health{name: name, err: initial} } // Set sets the underlying error for this Health object; err=nil means "OK" func (h *Health) Set(err error) { h.mu.Lock() + oldErr := h.err h.err = err h.mu.Unlock() + if (err == nil) != (oldErr == nil) { + logger.Infof("Setting health %q from [%v] -> [%v]", h.name, oldErr, err) + } } // ServeHTTP implements http.Handler. diff --git a/host/health/health_test.go b/host/health/health_test.go index 9445dbd..434415e 100644 --- a/host/health/health_test.go +++ b/host/health/health_test.go @@ -11,7 +11,7 @@ import ( ) func TestServingFromHealthy(t *testing.T) { - h := New(nil) + h := New("test", nil) ts := httptest.NewServer(h) defer ts.Close() res, err := http.Get(ts.URL) @@ -32,7 +32,7 @@ func TestServingFromHealthy(t *testing.T) { } func TestServingFromUnhealthy(t *testing.T) { - h := New(errors.New("FUBAR")) + h := New("test", errors.New("FUBAR")) ts := httptest.NewServer(h) defer ts.Close() res, err := http.Get(ts.URL) diff --git a/host/service/service.go b/host/service/service.go index 01cac65..f1d5093 100644 --- a/host/service/service.go +++ b/host/service/service.go @@ -44,7 +44,7 @@ func Start(ctx context.Context, hconfig *config.Config, authenticator auth.Auth, // Use DefaultServeMux, since it's got PProf stuff already attached by net/http/pprof. controlMux := http.DefaultServeMux healthErr := errors.New("joining raft") - live, ready := health.New(healthErr), health.New(healthErr) + live, ready := health.New("live", healthErr), health.New("ready", healthErr) controlMux.Handle("/health/live", middleware.Instrument(live)) controlMux.Handle("/health/ready", middleware.Instrument(ready)) g.Go(func() error { @@ -52,14 +52,14 @@ func Start(ctx context.Context, hconfig *config.Config, authenticator auth.Auth, return http.ListenAndServe(hconfig.ControlListenAddr, controlMux) }) - c, nodeID := enc.OutputMessages(), enc.PID() + enclaveMessages, nodeID := enc.OutputMessages(), enc.PID() logger.WithGlobal(zap.String("me", nodeID.String())) logger.Infow("created enclave") txGen := &util.TxGenerator{} - dispatcher := dispatch.New(hconfig.Raft, txGen, enc, c) + dispatcher := dispatch.New(hconfig.Raft, txGen, enc, enclaveMessages) // listen for peer network requests ln, err := net.Listen("tcp", hconfig.PeerAddr) @@ -110,8 +110,8 @@ func Start(ctx context.Context, hconfig *config.Config, authenticator auth.Auth, return http.ListenAndServe(hconfig.ClientListenAddr, clientMux) }) - // The enclave is up and the servers are serving, mark live. - live.Set(nil) + // The enclave is up and the servers are serving, start checking liveness. + g.Go(func() error { return livenessChecks(ctx, dispatcher, live, hconfig) }) // wait until we successfully create a raft group or join an existing one raftManager := raftmanager.New(nodeID, dispatcher, peerDB, hconfig) @@ -157,3 +157,46 @@ func Start(ctx context.Context, hconfig *config.Config, authenticator auth.Auth, return g.Wait() } + +func livenessChecks(ctx context.Context, dispatcher *dispatch.Dispatcher, live *health.Health, cfg *config.Config) error { + ticker := time.NewTicker(cfg.LocalLivenessCheckPeriod) + defer ticker.Stop() + live.Set(livenessCheck(ctx, dispatcher, cfg)) + for { + select { + case <-ctx.Done(): + live.Set(fmt.Errorf("livenessChecks context: %w", ctx.Err())) + return ctx.Err() + case <-ticker.C: + live.Set(livenessCheck(ctx, dispatcher, cfg)) + } + } +} + +func livenessCheck(ctx context.Context, dispatcher *dispatch.Dispatcher, cfg *config.Config) error { + // dispatcher.SendTransaction could block forever if something's wrong with the + // enclave... make sure to use a timeout! + dispatcherErr := make(chan error, 1) + ctx, cancel := context.WithTimeout(ctx, cfg.LocalLivenessCheckTimeout) + defer cancel() + go func() { + if resp, err := dispatcher.SendTransaction(&pb.HostToEnclaveRequest{ + Inner: &pb.HostToEnclaveRequest_GetEnclaveStatus{GetEnclaveStatus: true}, + }); err != nil { + // We were unable to talk to the enclave + dispatcherErr <- fmt.Errorf("dispatcher.SendTransaction: %w", err) + } else if s, ok := resp.Inner.(*pb.HostToEnclaveResponse_Status); ok { + // We were able to talk to the enclave, but it couldn't give us a status + dispatcherErr <- fmt.Errorf("HostToEnclaveResponse_Status: %w", s.Status) + } else { + // We could get a status from the enclave + dispatcherErr <- nil + } + }() + select { + case err := <-dispatcherErr: + return fmt.Errorf("dispatcherErr: %w", err) + case <-ctx.Done(): + return fmt.Errorf("livenessCheck context: %w", ctx.Err()) + } +}