Add Control mode to print out status of all peers.
This commit is contained in:
parent
637998c8f8
commit
9b515b97ba
@ -6,8 +6,12 @@ package main
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/signalapp/svr2/peerid"
|
||||
"github.com/signalapp/svr2/web/client"
|
||||
"google.golang.org/protobuf/encoding/protojson"
|
||||
"google.golang.org/protobuf/proto"
|
||||
@ -18,6 +22,7 @@ import (
|
||||
var (
|
||||
addr = flag.String("addr", "localhost:8081", "Address (hostname:port) where control server is listening")
|
||||
binary = flag.Bool("bin", false, "If true, assume a binary formatted proto file. Otherwise, protojson")
|
||||
mode = flag.String("mode", "command", "One of 'command' or 'status'")
|
||||
)
|
||||
|
||||
func main() {
|
||||
@ -27,6 +32,19 @@ func main() {
|
||||
flag.PrintDefaults()
|
||||
}
|
||||
flag.Parse()
|
||||
cc := &client.ControlClient{Addr: *addr}
|
||||
switch *mode {
|
||||
case "command":
|
||||
sendCommand(cc)
|
||||
case "status":
|
||||
if err := getStatus(cc); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
log.Println("success")
|
||||
}
|
||||
}
|
||||
|
||||
func sendCommand(cc *client.ControlClient) {
|
||||
if flag.NArg() == 0 {
|
||||
flag.Usage()
|
||||
os.Exit(1)
|
||||
@ -38,7 +56,6 @@ func main() {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
cc := client.ControlClient{Addr: *addr}
|
||||
resp, err := cc.DoJSON(bs)
|
||||
if err != nil {
|
||||
fmt.Fprint(os.Stderr, err)
|
||||
@ -71,3 +88,64 @@ func requestBody(filename string) ([]byte, error) {
|
||||
}
|
||||
return bs, nil
|
||||
}
|
||||
|
||||
func getStatus(cc *client.ControlClient) error {
|
||||
peers := map[peerid.PeerID]*pb.PeerEntry{}
|
||||
peerResp, err := cc.Peers()
|
||||
if err != nil {
|
||||
log.Printf("Unable to get peers: %v", err)
|
||||
} else {
|
||||
for _, p := range peerResp.Entries {
|
||||
pid, err := peerid.Make(p.Id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Invalid peer ID %x: %v", p.Id, err)
|
||||
}
|
||||
peers[pid] = p.Entry
|
||||
}
|
||||
}
|
||||
resp, err := cc.Do(&pb.HostToEnclaveRequest{
|
||||
Inner: &pb.HostToEnclaveRequest_GetEnclaveStatus{GetEnclaveStatus: true},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting status: %w", err)
|
||||
}
|
||||
status := resp.Inner.(*pb.HostToEnclaveResponse_GetEnclaveStatusReply).GetEnclaveStatusReply
|
||||
log.Printf("Status: %v\n", status)
|
||||
fmt.Printf("Raft state: %v\n", status.RaftState)
|
||||
fmt.Printf("\n")
|
||||
fmt.Printf("PeerID,Addr,Role,ConnState,Hostname\n")
|
||||
for _, peer := range status.Peers {
|
||||
pid, err := peerid.Make(peer.PeerId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid status peer ID %x: %v", peer.PeerId, err)
|
||||
}
|
||||
role := "unknown"
|
||||
switch {
|
||||
case peer.IsLeader:
|
||||
role = "LEADER"
|
||||
case peer.IsVoting:
|
||||
role = "VOTER"
|
||||
case peer.InRaft:
|
||||
role = "non-voter"
|
||||
default:
|
||||
role = "none"
|
||||
}
|
||||
connectionState := "unknown"
|
||||
switch {
|
||||
case peer.Me:
|
||||
connectionState = "ME"
|
||||
case peer.ConnectionStatus != nil:
|
||||
connectionState = peer.ConnectionStatus.State.String()
|
||||
}
|
||||
addr := "unknown"
|
||||
hostname := "unknown"
|
||||
if entry := peers[pid]; entry != nil {
|
||||
addr = entry.Addr
|
||||
if name, err := net.LookupAddr(strings.Split(addr, ":")[0]); err != nil && len(name) > 0 {
|
||||
hostname = name[0]
|
||||
}
|
||||
}
|
||||
fmt.Printf("%s,%s,%s,%s,%s\n", pid, addr, role, connectionState, hostname)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -134,7 +134,7 @@ func betterThan(e1 *pb.PeerEntry, e2 *pb.PeerEntry) bool {
|
||||
func (p *PeerDB) FindRaftMember(ctx context.Context, me peerid.PeerID, localPeerAddr string) (peerid.PeerID, error) {
|
||||
// retry until we find an eligible peer or we acquire the exclusive creation lock
|
||||
return util.RetrySupplierWithBackoff(ctx, func() (peerid.PeerID, error) {
|
||||
peers, err := p.list(ctx)
|
||||
peers, err := p.List(ctx)
|
||||
if err != nil {
|
||||
logger.Infow("failed to fetch raft members", "err", err)
|
||||
return peerid.PeerID{}, err
|
||||
@ -184,8 +184,8 @@ func (p *PeerDB) acquireCreationLock(ctx context.Context, me peerid.PeerID) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
// list fetches all the peers in the database
|
||||
func (p *PeerDB) list(ctx context.Context) (map[peerid.PeerID]*pb.PeerEntry, error) {
|
||||
// List fetches all the peers in the database
|
||||
func (p *PeerDB) List(ctx context.Context) (map[peerid.PeerID]*pb.PeerEntry, error) {
|
||||
var mu sync.Mutex
|
||||
var shardResults [][]string
|
||||
|
||||
|
||||
@ -9,9 +9,17 @@ option optimize_for = LITE_RUNTIME;
|
||||
|
||||
|
||||
message PeerEntry {
|
||||
int64 join_ts = 1;
|
||||
int64 last_update_ts = 2;
|
||||
string addr = 3; // hostname:port that other nodes can use to access this peer
|
||||
bool raft_member = 4; // if true, this peer is a raft member
|
||||
bool leader = 5; // if true, the peer thinks it is the leader
|
||||
int64 join_ts = 1;
|
||||
int64 last_update_ts = 2;
|
||||
string addr = 3; // hostname:port that other nodes can use to access this peer
|
||||
bool raft_member = 4; // if true, this peer is a raft member
|
||||
bool leader = 5; // if true, the peer thinks it is the leader
|
||||
}
|
||||
|
||||
message PeerMap {
|
||||
message Entry {
|
||||
bytes id = 1;
|
||||
PeerEntry entry = 2;
|
||||
}
|
||||
repeated Entry entries = 1;
|
||||
}
|
||||
|
||||
@ -104,6 +104,7 @@ func Start(ctx context.Context, hconfig *config.Config, authenticator auth.Auth,
|
||||
// control endpoints
|
||||
controlMux.Handle("/control/loglevel", middleware.Instrument(handlers.NewSetLogLevel(hconfig, dispatcher)))
|
||||
controlMux.Handle("/control", middleware.Instrument(handlers.NewControl(dispatcher)))
|
||||
controlMux.Handle("/control/peers", middleware.Instrument(handlers.NewPeers(peerDB)))
|
||||
|
||||
g.Go(func() error {
|
||||
logger.Infof("Starting client http server on %v", hconfig.ClientListenAddr)
|
||||
|
||||
@ -26,6 +26,27 @@ func (cc *ControlClient) Do(request *pb.HostToEnclaveRequest) (*pb.HostToEnclave
|
||||
return cc.DoJSON(bs)
|
||||
}
|
||||
|
||||
func (cc *ControlClient) Peers() (*pb.PeerMap, error) {
|
||||
url := fmt.Sprintf("http://%v/control/peers", cc.Addr)
|
||||
resp, err := http.Get(url)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("requesting peers via GET: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("requesting peers via GET: status=%v", resp.StatusCode)
|
||||
}
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("reading body: %v", err)
|
||||
}
|
||||
pbResponse := pb.PeerMap{}
|
||||
if err := protojson.Unmarshal(body, &pbResponse); err != nil {
|
||||
return nil, fmt.Errorf("could not parse server response, body=%s : %w", body, err)
|
||||
}
|
||||
return &pbResponse, nil
|
||||
}
|
||||
|
||||
func (cc *ControlClient) DoJSON(request []byte) (*pb.HostToEnclaveResponse, error) {
|
||||
url := fmt.Sprintf("http://%v/control", cc.Addr)
|
||||
req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(request))
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
"net/http"
|
||||
|
||||
"github.com/signalapp/svr2/logger"
|
||||
"github.com/signalapp/svr2/peer/peerdb"
|
||||
"google.golang.org/protobuf/encoding/protojson"
|
||||
|
||||
pb "github.com/signalapp/svr2/proto"
|
||||
@ -77,3 +78,26 @@ func (c *controlHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
logger.Warnw("error writing control response", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
func NewPeers(peerDB *peerdb.PeerDB) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
peers, err := peerDB.List(r.Context())
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("failed to list peers: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var resp pb.PeerMap
|
||||
for id, entry := range peers {
|
||||
resp.Entries = append(resp.Entries, &pb.PeerMap_Entry{Id: id[:], Entry: entry})
|
||||
}
|
||||
out, err := protojson.Marshal(&resp)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("marshaling JSON: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
if _, err := w.Write(out); err != nil {
|
||||
logger.Warnw("error writing control response", "err", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user