Add regular liveness checks to service.
This commit is contained in:
parent
db55ad3dbf
commit
fd4aa751dd
@ -43,6 +43,9 @@ type Config struct {
|
||||
RecurringRedisPeerDBTTL time.Duration `yaml:"recurringRedisPeerDBTTL"`
|
||||
// Configuration for the client websocket handler
|
||||
Request RequestConfig `yaml:"request"`
|
||||
// Periodicity/timeout for local liveness checks
|
||||
LocalLivenessCheckPeriod time.Duration `yaml:"localLivenessCheckPeriod"`
|
||||
LocalLivenessCheckTimeout time.Duration `yaml:"localLivenessCheckTimeout"`
|
||||
}
|
||||
|
||||
// validate returns a list of validation errors, or empty if there are no errors.
|
||||
@ -133,8 +136,10 @@ func Default() *Config {
|
||||
WebsocketHandshakeTimeout: time.Second * 30,
|
||||
SocketTimeout: time.Second * 30,
|
||||
},
|
||||
EnclaveID: "enclave",
|
||||
InitialRedisPeerDBTTL: time.Minute * 120,
|
||||
RecurringRedisPeerDBTTL: time.Minute * 5,
|
||||
EnclaveID: "enclave",
|
||||
InitialRedisPeerDBTTL: time.Minute * 120,
|
||||
RecurringRedisPeerDBTTL: time.Minute * 5,
|
||||
LocalLivenessCheckPeriod: time.Minute,
|
||||
LocalLivenessCheckTimeout: time.Minute,
|
||||
}
|
||||
}
|
||||
|
||||
@ -7,26 +7,33 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/signalapp/svr2/logger"
|
||||
)
|
||||
|
||||
// Health wraps an error (nil means "healthy"), and provides HTTP handling
|
||||
// logic to serve that error.
|
||||
type Health struct {
|
||||
mu sync.Mutex
|
||||
err error
|
||||
mu sync.Mutex
|
||||
name string
|
||||
err error
|
||||
}
|
||||
|
||||
// New creates a new health object, with initial health set based on the
|
||||
// 'initial' error (nil==healthy).
|
||||
func New(initial error) *Health {
|
||||
return &Health{err: initial}
|
||||
func New(name string, initial error) *Health {
|
||||
return &Health{name: name, err: initial}
|
||||
}
|
||||
|
||||
// Set sets the underlying error for this Health object; err=nil means "OK"
|
||||
func (h *Health) Set(err error) {
|
||||
h.mu.Lock()
|
||||
oldErr := h.err
|
||||
h.err = err
|
||||
h.mu.Unlock()
|
||||
if (err == nil) != (oldErr == nil) {
|
||||
logger.Infof("Setting health %q from [%v] -> [%v]", h.name, oldErr, err)
|
||||
}
|
||||
}
|
||||
|
||||
// ServeHTTP implements http.Handler.
|
||||
|
||||
@ -11,7 +11,7 @@ import (
|
||||
)
|
||||
|
||||
func TestServingFromHealthy(t *testing.T) {
|
||||
h := New(nil)
|
||||
h := New("test", nil)
|
||||
ts := httptest.NewServer(h)
|
||||
defer ts.Close()
|
||||
res, err := http.Get(ts.URL)
|
||||
@ -32,7 +32,7 @@ func TestServingFromHealthy(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestServingFromUnhealthy(t *testing.T) {
|
||||
h := New(errors.New("FUBAR"))
|
||||
h := New("test", errors.New("FUBAR"))
|
||||
ts := httptest.NewServer(h)
|
||||
defer ts.Close()
|
||||
res, err := http.Get(ts.URL)
|
||||
|
||||
@ -44,7 +44,7 @@ func Start(ctx context.Context, hconfig *config.Config, authenticator auth.Auth,
|
||||
// Use DefaultServeMux, since it's got PProf stuff already attached by net/http/pprof.
|
||||
controlMux := http.DefaultServeMux
|
||||
healthErr := errors.New("joining raft")
|
||||
live, ready := health.New(healthErr), health.New(healthErr)
|
||||
live, ready := health.New("live", healthErr), health.New("ready", healthErr)
|
||||
controlMux.Handle("/health/live", middleware.Instrument(live))
|
||||
controlMux.Handle("/health/ready", middleware.Instrument(ready))
|
||||
g.Go(func() error {
|
||||
@ -52,14 +52,14 @@ func Start(ctx context.Context, hconfig *config.Config, authenticator auth.Auth,
|
||||
return http.ListenAndServe(hconfig.ControlListenAddr, controlMux)
|
||||
})
|
||||
|
||||
c, nodeID := enc.OutputMessages(), enc.PID()
|
||||
enclaveMessages, nodeID := enc.OutputMessages(), enc.PID()
|
||||
|
||||
logger.WithGlobal(zap.String("me", nodeID.String()))
|
||||
logger.Infow("created enclave")
|
||||
|
||||
txGen := &util.TxGenerator{}
|
||||
|
||||
dispatcher := dispatch.New(hconfig.Raft, txGen, enc, c)
|
||||
dispatcher := dispatch.New(hconfig.Raft, txGen, enc, enclaveMessages)
|
||||
|
||||
// listen for peer network requests
|
||||
ln, err := net.Listen("tcp", hconfig.PeerAddr)
|
||||
@ -110,8 +110,8 @@ func Start(ctx context.Context, hconfig *config.Config, authenticator auth.Auth,
|
||||
return http.ListenAndServe(hconfig.ClientListenAddr, clientMux)
|
||||
})
|
||||
|
||||
// The enclave is up and the servers are serving, mark live.
|
||||
live.Set(nil)
|
||||
// The enclave is up and the servers are serving, start checking liveness.
|
||||
g.Go(func() error { return livenessChecks(ctx, dispatcher, live, hconfig) })
|
||||
|
||||
// wait until we successfully create a raft group or join an existing one
|
||||
raftManager := raftmanager.New(nodeID, dispatcher, peerDB, hconfig)
|
||||
@ -157,3 +157,46 @@ func Start(ctx context.Context, hconfig *config.Config, authenticator auth.Auth,
|
||||
|
||||
return g.Wait()
|
||||
}
|
||||
|
||||
func livenessChecks(ctx context.Context, dispatcher *dispatch.Dispatcher, live *health.Health, cfg *config.Config) error {
|
||||
ticker := time.NewTicker(cfg.LocalLivenessCheckPeriod)
|
||||
defer ticker.Stop()
|
||||
live.Set(livenessCheck(ctx, dispatcher, cfg))
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
live.Set(fmt.Errorf("livenessChecks context: %w", ctx.Err()))
|
||||
return ctx.Err()
|
||||
case <-ticker.C:
|
||||
live.Set(livenessCheck(ctx, dispatcher, cfg))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func livenessCheck(ctx context.Context, dispatcher *dispatch.Dispatcher, cfg *config.Config) error {
|
||||
// dispatcher.SendTransaction could block forever if something's wrong with the
|
||||
// enclave... make sure to use a timeout!
|
||||
dispatcherErr := make(chan error, 1)
|
||||
ctx, cancel := context.WithTimeout(ctx, cfg.LocalLivenessCheckTimeout)
|
||||
defer cancel()
|
||||
go func() {
|
||||
if resp, err := dispatcher.SendTransaction(&pb.HostToEnclaveRequest{
|
||||
Inner: &pb.HostToEnclaveRequest_GetEnclaveStatus{GetEnclaveStatus: true},
|
||||
}); err != nil {
|
||||
// We were unable to talk to the enclave
|
||||
dispatcherErr <- fmt.Errorf("dispatcher.SendTransaction: %w", err)
|
||||
} else if s, ok := resp.Inner.(*pb.HostToEnclaveResponse_Status); ok {
|
||||
// We were able to talk to the enclave, but it couldn't give us a status
|
||||
dispatcherErr <- fmt.Errorf("HostToEnclaveResponse_Status: %w", s.Status)
|
||||
} else {
|
||||
// We could get a status from the enclave
|
||||
dispatcherErr <- nil
|
||||
}
|
||||
}()
|
||||
select {
|
||||
case err := <-dispatcherErr:
|
||||
return fmt.Errorf("dispatcherErr: %w", err)
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("livenessCheck context: %w", ctx.Err())
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user