Gate high-value operations on attestation from quorum timestamp
This commit is contained in:
parent
b6b8b459ac
commit
80829647c7
@ -98,6 +98,7 @@ static bool ContainsMe(const peerid::PeerID& me, const raft::ReplicaGroup& group
|
||||
|
||||
Core::Core(const enclaveconfig::RaftGroupConfig& group_config)
|
||||
: raft_config_template_(group_config),
|
||||
peers_attested_with_raft_quorum_timestamp_(false),
|
||||
db_version_(group_config.db_version()),
|
||||
db_protocol_(db::DB::P(group_config.db_version())),
|
||||
e2e_txn_id_(0) {
|
||||
@ -479,6 +480,7 @@ void Core::HandleCreateNewRaftGroupRequest(context::Context* ctx, internal::Tran
|
||||
raft_.loaded.UpdateLastAppliedLog(ctx, 0);
|
||||
GAUGE(core, last_index_applied_to_db)->Set(0);
|
||||
RaftStep(ctx);
|
||||
MaybeUpdateGroupTimeLocked(ctx);
|
||||
ReplyWithError(ctx, tx, error::OK);
|
||||
}
|
||||
|
||||
@ -495,6 +497,7 @@ void Core::HandleJoinRaft(context::Context* ctx, const JoinRaftRequest& msg, int
|
||||
return;
|
||||
}
|
||||
raft_.ClearState();
|
||||
peers_attested_with_raft_quorum_timestamp_ = false;
|
||||
raft_.state = svr2::RAFTSTATE_WAITING_FOR_FIRST_CONNECTION;
|
||||
raft_.waiting_for_first_connection = {
|
||||
.peer = peer,
|
||||
@ -633,6 +636,7 @@ void Core::RequestRaftReplication(context::Context* ctx) {
|
||||
}
|
||||
ACQUIRE_LOCK(raft_.mu, ctx, lock_core_raft);
|
||||
IDLOG(INFO) << "finished replicating database, fully loaded " << raft_.loading.db->row_count() << " rows";
|
||||
MaybeUpdateGroupTimeLocked(ctx);
|
||||
PromoteRaftToLoaded(ctx);
|
||||
});
|
||||
}
|
||||
@ -904,9 +908,9 @@ void Core::HandleTimerTick(context::Context* ctx, const TimerTick& tick) {
|
||||
auto time = tick.new_timestamp_unix_secs();
|
||||
clock_.SetLocalTime(time);
|
||||
GAUGE(core, current_local_time)->Set(time);
|
||||
MaybeUpdateGroupTime(ctx);
|
||||
timeout_.TimerTick(ctx);
|
||||
ACQUIRE_LOCK(raft_.mu, ctx, lock_core_raft);
|
||||
MaybeUpdateGroupTimeLocked(ctx);
|
||||
if (raft_.state == svr2::RAFTSTATE_LOADED_PART_OF_GROUP) {
|
||||
ConnectToRaftMembers(ctx);
|
||||
{
|
||||
@ -929,29 +933,51 @@ void Core::HandleTimerTick(context::Context* ctx, const TimerTick& tick) {
|
||||
}
|
||||
|
||||
void Core::MaybeUpdateGroupTime(context::Context* ctx) {
|
||||
MEASURE_CPU(ctx, cpu_core_updating_group_time);
|
||||
std::set<peerid::PeerID> peers = GroupTimeParticipants(ctx);
|
||||
auto ts = clock_.GetTime(ctx, peers);
|
||||
GAUGE(core, current_groupclock_time)->Set(ts);
|
||||
peer_manager_->SetPeerAttestationTimestamp(ctx, ts, raft_config_template_.attestation_timeout());
|
||||
ACQUIRE_LOCK(raft_.mu, ctx, lock_core_raft);
|
||||
MaybeUpdateGroupTimeLocked(ctx);
|
||||
}
|
||||
|
||||
std::set<peerid::PeerID> Core::GroupTimeParticipants(context::Context* ctx) {
|
||||
// Recomputes the group clock and refreshes
// peers_attested_with_raft_quorum_timestamp_. The caller must already hold
// raft_.mu.
//
// Each pass:
//   1. asks GroupTimeParticipants() which peers take part and how many
//      remote timestamps a voting quorum needs,
//   2. asks clock_ for the group time over those peers, learning how many
//      remote timestamps were actually used,
//   3. publishes the time to the current_groupclock_time gauge and records
//      whether enough remote timestamps were available,
//   4. hands the time to the peer manager.
//
// The participant set is derived from connected peers, and handing a new
// time to the peer manager may disconnect peers. When it reports that it
// disconnected at least one, the participant set may be stale, so the whole
// pass is repeated. Peers can only be disconnected (never connected) by that
// call and there is a small, finite number of them, so the repetition ends.
void Core::MaybeUpdateGroupTimeLocked(context::Context* ctx) {
  MEASURE_CPU(ctx, cpu_core_updating_group_time);
  for (;;) {
    // SIZE_MAX means "no quorum is reachable" until GroupTimeParticipants
    // says otherwise.
    size_t quorum_remotes_needed = SIZE_MAX;
    const std::set<peerid::PeerID> participants = GroupTimeParticipants(ctx, &quorum_remotes_needed);

    size_t remotes_counted = 0;
    const util::UnixSecs group_ts = clock_.GetTime(ctx, participants, &remotes_counted);
    GAUGE(core, current_groupclock_time)->Set(group_ts);

    peers_attested_with_raft_quorum_timestamp_ = (remotes_counted >= quorum_remotes_needed);
    LOG(DEBUG) << "MaybeUpdateGroupTime updating with time:" << group_ts << " remote_peers_used:" << remotes_counted << " remote_peers_required_for_voting_quorum:" << quorum_remotes_needed;

    const bool dropped_a_peer = peer_manager_->SetPeerAttestationTimestamp(
        ctx, group_ts, raft_config_template_.attestation_timeout());
    if (!dropped_a_peer) {
      break;
    }
  }
}
|
||||
|
||||
std::set<peerid::PeerID> Core::GroupTimeParticipants(context::Context* ctx, size_t* remote_peers_required_for_voting_quorum) {
|
||||
std::set<peerid::PeerID> connected_peers = peer_manager_->ConnectedPeers(ctx);
|
||||
std::set<peerid::PeerID> voting_peers;
|
||||
{
|
||||
ACQUIRE_LOCK(raft_.mu, ctx, lock_core_raft);
|
||||
switch (raft_.state) {
|
||||
case RAFTSTATE_LOADED_PART_OF_GROUP:
|
||||
case RAFTSTATE_LOADED_REQUESTING_MEMBERSHIP:
|
||||
voting_peers = raft_.loaded.raft->membership().voting_replicas();
|
||||
break;
|
||||
case RAFTSTATE_LOADING:
|
||||
voting_peers = raft_.loading.mem->voting_replicas();
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
*remote_peers_required_for_voting_quorum = SIZE_MAX;
|
||||
switch (raft_.state) {
|
||||
case RAFTSTATE_LOADED_PART_OF_GROUP:
|
||||
case RAFTSTATE_LOADED_REQUESTING_MEMBERSHIP:
|
||||
voting_peers = raft_.loaded.raft->membership().voting_replicas();
|
||||
*remote_peers_required_for_voting_quorum = voting_peers.size();
|
||||
if (raft_.loaded.raft->voting()) {
|
||||
// We're one of the voting members, and we will use our own timestamp.
|
||||
*remote_peers_required_for_voting_quorum -= 1;
|
||||
}
|
||||
break;
|
||||
case RAFTSTATE_LOADING:
|
||||
voting_peers = raft_.loading.mem->voting_replicas();
|
||||
*remote_peers_required_for_voting_quorum = voting_peers.size();
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
// If we have no voting replicas, we always just use our own clock.
|
||||
@ -1203,9 +1229,10 @@ error::Error Core::HandleReplicateStateRequest(context::Context* ctx, const peer
|
||||
ACQUIRE_LOCK(raft_.mu, ctx, lock_core_raft);
|
||||
if (raft_.state != svr2::RAFTSTATE_LOADED_PART_OF_GROUP) {
|
||||
return SendE2EError(ctx, target, req.request_id(), COUNTED_ERROR(Replicate_RaftState));
|
||||
}
|
||||
if (msg.group_id() != raft_.loaded.raft->group_id()) {
|
||||
} else if (msg.group_id() != raft_.loaded.raft->group_id()) {
|
||||
return SendE2EError(ctx, target, req.request_id(), COUNTED_ERROR(Replicate_GroupMismatch));
|
||||
} else if (!peers_attested_with_raft_quorum_timestamp_) {
|
||||
return SendE2EError(ctx, target, req.request_id(), COUNTED_ERROR(Core_RefusingWithoutQuorumTimestamp));
|
||||
}
|
||||
// push_state will live for the duration of this replication.
|
||||
auto push_state = std::make_shared<Core::ReplicationPushState>(
|
||||
@ -1411,6 +1438,8 @@ error::Error Core::HandleRequestRaftMembership(context::Context* ctx, const peer
|
||||
ACQUIRE_LOCK(raft_.mu, ctx, lock_core_raft);
|
||||
if (raft_.state != svr2::RAFTSTATE_LOADED_PART_OF_GROUP) {
|
||||
return COUNTED_ERROR(Core_RaftState);
|
||||
} else if (!peers_attested_with_raft_quorum_timestamp_) {
|
||||
return COUNTED_ERROR(Core_RefusingWithoutQuorumTimestamp);
|
||||
}
|
||||
std::string peer_string = from.AsString();
|
||||
raft::ReplicaGroup g = raft_.loaded.raft->membership().AsProto();
|
||||
@ -1435,11 +1464,12 @@ error::Error Core::HandleRequestRaftVoting(context::Context* ctx, const peerid::
|
||||
ACQUIRE_LOCK(raft_.mu, ctx, lock_core_raft);
|
||||
if (raft_.state != svr2::RAFTSTATE_LOADED_PART_OF_GROUP) {
|
||||
return COUNTED_ERROR(Core_RaftState);
|
||||
}
|
||||
if (raft_.loaded.raft->membership().all_replicas().count(from) != 1) {
|
||||
} else if (raft_.loaded.raft->membership().all_replicas().count(from) != 1) {
|
||||
return COUNTED_ERROR(Core_VotingRequestedForNonMember);
|
||||
} else if (raft_.loaded.raft->membership().voting_replicas().count(from) != 0) {
|
||||
return COUNTED_ERROR(Core_VotingRequestedForVotingMember);
|
||||
} else if (!peers_attested_with_raft_quorum_timestamp_) {
|
||||
return COUNTED_ERROR(Core_RefusingWithoutQuorumTimestamp);
|
||||
}
|
||||
|
||||
// This does not respect the max_voting attribute of the RaftConfig. That's
|
||||
@ -1467,9 +1497,10 @@ error::Error Core::HandleRaftWrite(context::Context* ctx, const std::string& dat
|
||||
ACQUIRE_LOCK(raft_.mu, ctx, lock_core_raft);
|
||||
if (raft_.state != svr2::RAFTSTATE_LOADED_PART_OF_GROUP) {
|
||||
return COUNTED_ERROR(Core_RaftState);
|
||||
}
|
||||
if (raft_.loaded.raft->membership().voting_replicas().size() < raft_.loaded.group_config.min_voting_replicas()) {
|
||||
} else if (raft_.loaded.raft->membership().voting_replicas().size() < raft_.loaded.group_config.min_voting_replicas()) {
|
||||
return COUNTED_ERROR(Core_NotEnoughVotingReplicas);
|
||||
} else if (!peers_attested_with_raft_quorum_timestamp_) {
|
||||
return COUNTED_ERROR(Core_RefusingWithoutQuorumTimestamp);
|
||||
}
|
||||
auto log_entry = ctx->Protobuf<raft::LogEntry>();
|
||||
if (!log_entry->ParseFromString(data)) {
|
||||
@ -1809,6 +1840,9 @@ void Core::HandleRaftMembershipChange(
|
||||
CHECK(nullptr == "in HandleRaftMembershipChange but not part of group or requesting membership");
|
||||
break;
|
||||
}
|
||||
// A potential change in leadership may have affected the quorum of
|
||||
// voting replicas, which we use for time calculation.
|
||||
MaybeUpdateGroupTimeLocked(ctx);
|
||||
}
|
||||
|
||||
void Core::HandleRaftMinimumsChange(
|
||||
|
||||
@ -148,9 +148,10 @@ class Core {
|
||||
void HandleTimerTick(context::Context* ctx, const TimerTick& tick);
|
||||
// Update our group-based concept of time.
|
||||
void MaybeUpdateGroupTime(context::Context* ctx) EXCLUDES(raft_.mu);
|
||||
void MaybeUpdateGroupTimeLocked(context::Context* ctx) REQUIRES(raft_.mu);
|
||||
public_for_test:
|
||||
// Return the set of peers that should participate in a group time calculation
|
||||
std::set<peerid::PeerID> GroupTimeParticipants(context::Context* ctx) EXCLUDES(raft_.mu);
|
||||
// Return the set of peers that should participate in a group time calculation
|
||||
std::set<peerid::PeerID> GroupTimeParticipants(context::Context* ctx, size_t* remotes_required_for_voting_quorum) REQUIRES(raft_.mu);
|
||||
private:
|
||||
// If we're in Raft with some other replicas but don't yet have peer connections
|
||||
// to them, try to establish them.
|
||||
@ -254,6 +255,7 @@ class Core {
|
||||
merkle::Tree merkle_tree_ GUARDED_BY(raft_.mu);
|
||||
|
||||
internal::Raft raft_;
|
||||
bool peers_attested_with_raft_quorum_timestamp_ GUARDED_BY(raft_.mu);
|
||||
const enclaveconfig::DatabaseVersion db_version_;
|
||||
const db::DB::Protocol* const db_protocol_;
|
||||
groupclock::Clock clock_;
|
||||
|
||||
@ -2981,7 +2981,7 @@ TEST_F(CoreTest, DB4NewNodeReplication) {
|
||||
EXPECT_EQ(h1.commit_idx(), h2.commit_idx());
|
||||
}
|
||||
|
||||
TEST_F(CoreTest, GroupTimeParticipants) {
|
||||
TEST_F(CoreTest, GroupTimeParticipants) NO_THREAD_SAFETY_ANALYSIS {
|
||||
auto [core1, err1] = Core::Create(ctx, valid_init_config);
|
||||
ASSERT_EQ(err1, error::OK);
|
||||
auto [core2, err2] = Core::Create(ctx, valid_init_config);
|
||||
@ -2996,6 +2996,14 @@ TEST_F(CoreTest, GroupTimeParticipants) {
|
||||
cores[core2->ID()] = core2.get();
|
||||
cores[core3->ID()] = core3.get();
|
||||
|
||||
size_t remotes = 0;
|
||||
{
|
||||
LOG(INFO) << "\n\nPre-raft";
|
||||
context::Context ctx;
|
||||
std::set<peerid::PeerID> want{};
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx, &remotes));
|
||||
ASSERT_EQ(remotes, SIZE_MAX);
|
||||
}
|
||||
{
|
||||
LOG(INFO) << "\n\nSet up as one-replica Raft on core 1";
|
||||
UntrustedMessage msg;
|
||||
@ -3013,7 +3021,7 @@ TEST_F(CoreTest, GroupTimeParticipants) {
|
||||
ASSERT_EQ(resp.status(), error::OK);
|
||||
|
||||
std::set<peerid::PeerID> want{};
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx));
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx, &remotes));
|
||||
}
|
||||
|
||||
{
|
||||
@ -3034,7 +3042,7 @@ TEST_F(CoreTest, GroupTimeParticipants) {
|
||||
ASSERT_EQ(resp.status(), error::OK);
|
||||
|
||||
std::set<peerid::PeerID> want{}; // core2 isn't voting yet
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx));
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx, &remotes));
|
||||
}
|
||||
|
||||
{
|
||||
@ -3054,7 +3062,7 @@ TEST_F(CoreTest, GroupTimeParticipants) {
|
||||
ASSERT_EQ(resp.status(), error::OK);
|
||||
|
||||
std::set<peerid::PeerID> want{ core2->ID() };
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx));
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx, &remotes));
|
||||
}
|
||||
|
||||
{
|
||||
@ -3076,7 +3084,7 @@ TEST_F(CoreTest, GroupTimeParticipants) {
|
||||
|
||||
// core3 has joined but is not voting, use only core2.
|
||||
std::set<peerid::PeerID> want{ core2->ID() };
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx));
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx, &remotes));
|
||||
}
|
||||
|
||||
{
|
||||
@ -3099,7 +3107,7 @@ TEST_F(CoreTest, GroupTimeParticipants) {
|
||||
// It should fall back to using all connected peers, which in this
|
||||
// case is core3.
|
||||
std::set<peerid::PeerID> want{ }; // core3 is not voting
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx));
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx, &remotes));
|
||||
}
|
||||
|
||||
{
|
||||
@ -3121,7 +3129,7 @@ TEST_F(CoreTest, GroupTimeParticipants) {
|
||||
// core1 has disconnected from all cores, it should just use
|
||||
// its own local time.
|
||||
std::set<peerid::PeerID> want{};
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx));
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx, &remotes));
|
||||
}
|
||||
|
||||
{
|
||||
@ -3143,7 +3151,7 @@ TEST_F(CoreTest, GroupTimeParticipants) {
|
||||
// core1 has disconnected from all cores, it should just use
|
||||
// its own local time.
|
||||
std::set<peerid::PeerID> want{ }; // core3 is not voting
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx));
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx, &remotes));
|
||||
}
|
||||
|
||||
{
|
||||
@ -3165,7 +3173,7 @@ TEST_F(CoreTest, GroupTimeParticipants) {
|
||||
// core1 has disconnected from all cores, it should just use
|
||||
// its own local time.
|
||||
std::set<peerid::PeerID> want{ core2->ID() };
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx));
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx, &remotes));
|
||||
}
|
||||
|
||||
{
|
||||
@ -3185,7 +3193,7 @@ TEST_F(CoreTest, GroupTimeParticipants) {
|
||||
ASSERT_EQ(resp.status(), error::OK);
|
||||
|
||||
std::set<peerid::PeerID> want{ core2->ID(), core3->ID() };
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx));
|
||||
ASSERT_EQ(want, core1->GroupTimeParticipants(&ctx, &remotes));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -18,7 +18,7 @@ void Clock::SetRemoteTime(context::Context* ctx, const peerid::PeerID& peer, uti
|
||||
remotes_[peer] = secs;
|
||||
}
|
||||
|
||||
util::UnixSecs Clock::GetTime(context::Context* ctx, const std::set<peerid::PeerID>& remotes) const {
|
||||
util::UnixSecs Clock::GetTime(context::Context* ctx, const std::set<peerid::PeerID>& remotes, size_t* remotes_used) const {
|
||||
std::vector<util::UnixSecs> secs(1 /* local_ */ + remotes.size());
|
||||
ACQUIRE_LOCK(mu_, ctx, lock_groupclock);
|
||||
auto set_iter = remotes.begin();
|
||||
@ -39,6 +39,7 @@ util::UnixSecs Clock::GetTime(context::Context* ctx, const std::set<peerid::Peer
|
||||
}
|
||||
}
|
||||
secs.resize(secs_size);
|
||||
*remotes_used = secs_size - 1;
|
||||
// `secs` now contains a list of my timestamp and the timestamps of all
|
||||
// peers in `remotes` that we've received a timestamp from. Get the median.
|
||||
std::sort(secs.begin(), secs.end());
|
||||
|
||||
@ -23,7 +23,7 @@ class Clock {
|
||||
Clock() : local_(0) {};
|
||||
void SetLocalTime(util::UnixSecs secs);
|
||||
void SetRemoteTime(context::Context* ctx, const peerid::PeerID& peer, util::UnixSecs secs) EXCLUDES(mu_);
|
||||
util::UnixSecs GetTime(context::Context* ctx, const std::set<peerid::PeerID>& remotes) const EXCLUDES(mu_);
|
||||
util::UnixSecs GetTime(context::Context* ctx, const std::set<peerid::PeerID>& remotes, size_t* remotes_used) const EXCLUDES(mu_);
|
||||
util::UnixSecs GetLocalTime() const;
|
||||
|
||||
private:
|
||||
|
||||
@ -31,25 +31,36 @@ class ClockTest : public ::testing::Test {
|
||||
|
||||
// Exercises Clock::GetTime() together with its `remotes_used` out-parameter:
// the returned time is the median of the local timestamp and the timestamps
// of those requested peers that have reported one, and `remotes_used` is how
// many requested peers contributed.
TEST_F(ClockTest, BasicUsage) {
  Clock c;
  size_t remotes_used = 0;

  // No peers requested: only the local time (initially 0) is used.
  EXPECT_EQ(0, c.GetTime(&ctx, std::set<peerid::PeerID>{}, &remotes_used));
  EXPECT_EQ(0, remotes_used);
  c.SetLocalTime(1000);
  EXPECT_EQ(1000, c.GetTime(&ctx, std::set<peerid::PeerID>{}, &remotes_used));
  EXPECT_EQ(0, remotes_used);

  peerid::PeerID p1((uint8_t[32]){1});
  peerid::PeerID p2((uint8_t[32]){2});
  peerid::PeerID p3((uint8_t[32]){3});
  peerid::PeerID p4((uint8_t[32]){4});

  // A requested peer that has not reported a timestamp yet is not counted.
  EXPECT_EQ(1000, c.GetTime(&ctx, std::set<peerid::PeerID>{p1}, &remotes_used));
  EXPECT_EQ(0, remotes_used);

  c.SetRemoteTime(&ctx, p1, 1001);
  c.SetRemoteTime(&ctx, p2, 1002);
  c.SetRemoteTime(&ctx, p3, 1003);
  c.SetRemoteTime(&ctx, p4, 1004);

  // Every requested peer now has a timestamp, so each one is counted.
  EXPECT_EQ(1001, c.GetTime(&ctx, std::set<peerid::PeerID>{p1}, &remotes_used));
  EXPECT_EQ(1, remotes_used);
  EXPECT_EQ(1001, c.GetTime(&ctx, std::set<peerid::PeerID>{p1, p2}, &remotes_used));
  EXPECT_EQ(2, remotes_used);
  EXPECT_EQ(1002, c.GetTime(&ctx, std::set<peerid::PeerID>{p1, p2, p3}, &remotes_used));
  EXPECT_EQ(3, remotes_used);
  EXPECT_EQ(1002, c.GetTime(&ctx, std::set<peerid::PeerID>{p1, p2, p3, p4}, &remotes_used));
  EXPECT_EQ(4, remotes_used);

  // Moving the local clock forward shifts the median.
  c.SetLocalTime(1005);
  EXPECT_EQ(1003, c.GetTime(&ctx, std::set<peerid::PeerID>{p1, p2, p3, p4}, &remotes_used));
  EXPECT_EQ(4, remotes_used);

  // Updating one remote's timestamp shifts it again.
  c.SetRemoteTime(&ctx, p1, 1004);
  EXPECT_EQ(1004, c.GetTime(&ctx, std::set<peerid::PeerID>{p1, p2, p3, p4}, &remotes_used));
  EXPECT_EQ(4, remotes_used);
}
|
||||
|
||||
} // namespace svr2::groupclock
|
||||
|
||||
@ -83,7 +83,9 @@ void Gauge::Clear() {
|
||||
namespace internal {
|
||||
// Records one occurrence of error `e` raised at `file`:`line` and returns `e`
// unchanged so the call can be used inline in a return statement.
//
// Every occurrence is logged at VERBOSE with its source location. The first
// occurrence of a given error value is additionally logged at INFO.
error::Error RecordError(error::Error e, const char* file, int line) {
  LOG(VERBOSE) << e << " @ " << file << ":" << line;
  // Increment exactly once per call. fetch_add yields the value held before
  // the increment (assuming recorded_errors holds std::atomic counters), so a
  // result of 0 identifies the first time this error has been recorded. A
  // second, separate increment would double-count and make this branch
  // unreachable.
  if (0 == recorded_errors[e].fetch_add(1)) {
    LOG(INFO) << "First time seeing error: " << e;
  }
  return e;
}
|
||||
|
||||
|
||||
@ -451,7 +451,7 @@ error::Error Peer::CheckNextAttestation(context::Context* ctx, const e2e::Attest
|
||||
return error::OK;
|
||||
}
|
||||
|
||||
void Peer::MaybeDisconnectIfAttestationTooOld(context::Context* ctx, util::UnixSecs now, util::UnixSecs attestation_timeout) {
|
||||
bool Peer::MaybeDisconnectIfAttestationTooOld(context::Context* ctx, util::UnixSecs now, util::UnixSecs attestation_timeout) {
|
||||
ACQUIRE_LOCK(mu_, ctx, lock_peer);
|
||||
auto state = InternalCurrentState();
|
||||
if (// If we're already disconnected ...
|
||||
@ -461,11 +461,12 @@ void Peer::MaybeDisconnectIfAttestationTooOld(context::Context* ctx, util::UnixS
|
||||
// ... or we're connecting and we haven't yet received a synack with an attestation ...
|
||||
(state == PEER_CONNECTING && last_attestation_ == 0)) {
|
||||
// ... then there's no need for us to disconnect due to attestation timestamp.
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
LOG(WARNING) << "Attestation for " << id_ << " too old (ts=" << last_attestation_ << ", now=" << now << "), disconnecting";
|
||||
InternalDisconnect();
|
||||
SendRst(ctx, id_);
|
||||
return true;
|
||||
}
|
||||
|
||||
void Peer::PopulateConnectionStatus(context::Context* ctx, ConnectionStatus* status) const {
|
||||
@ -716,17 +717,21 @@ std::set<peerid::PeerID> PeerManager::AllPeers(context::Context* ctx) const {
|
||||
return out;
|
||||
}
|
||||
|
||||
void PeerManager::SetPeerAttestationTimestamp(context::Context* ctx, util::UnixSecs secs, util::UnixSecs attestation_timeout) {
|
||||
bool PeerManager::SetPeerAttestationTimestamp(context::Context* ctx, util::UnixSecs secs, util::UnixSecs attestation_timeout) {
|
||||
auto old_secs = time_.exchange(secs);
|
||||
if (old_secs == secs) {
|
||||
return;
|
||||
return false;
|
||||
} else if (old_secs > secs) {
|
||||
LOG(WARNING) << "PeerManager timestamp went backwards: " << old_secs << " -> " << secs;
|
||||
}
|
||||
ACQUIRE_LOCK(mu_, ctx, lock_peermanager);
|
||||
bool any_peers_disconnected = false;
|
||||
for (auto iter = peers_.begin(); iter != peers_.end(); ++iter) {
|
||||
iter->second->MaybeDisconnectIfAttestationTooOld(ctx, secs, attestation_timeout);
|
||||
if (iter->second->MaybeDisconnectIfAttestationTooOld(ctx, secs, attestation_timeout)) {
|
||||
any_peers_disconnected = true;
|
||||
}
|
||||
}
|
||||
return any_peers_disconnected;
|
||||
}
|
||||
|
||||
void PeerManager::MinimumsUpdated(context::Context* ctx) {
|
||||
|
||||
@ -102,7 +102,7 @@ class Peer {
|
||||
// Disconnect the peer.
|
||||
void Disconnect(context::Context* ctx) EXCLUDES(mu_);
|
||||
// Disconnect the peer if its attestation timestamp is out of date.
|
||||
void MaybeDisconnectIfAttestationTooOld(context::Context* ctx, util::UnixSecs now, util::UnixSecs attestation_timeout) EXCLUDES(mu_);
|
||||
bool MaybeDisconnectIfAttestationTooOld(context::Context* ctx, util::UnixSecs now, util::UnixSecs attestation_timeout) EXCLUDES(mu_);
|
||||
|
||||
PeerState CurrentState(context::Context* ctx) const EXCLUDES(mu_);
|
||||
void PopulateConnectionStatus(context::Context* ctx, ConnectionStatus* status) const EXCLUDES(mu_);
|
||||
@ -236,7 +236,8 @@ class PeerManager {
|
||||
std::set<peerid::PeerID> AllPeers(context::Context* ctx) const;
|
||||
void PeerStatus(context::Context* ctx, const peerid::PeerID& id, ConnectionStatus* status) const;
|
||||
|
||||
void SetPeerAttestationTimestamp(context::Context* ctx, util::UnixSecs secs, util::UnixSecs attestation_timeout) EXCLUDES(mu_);
|
||||
// Returns true if any peers were disconnected due to the attestation timestamp change.
|
||||
bool SetPeerAttestationTimestamp(context::Context* ctx, util::UnixSecs secs, util::UnixSecs attestation_timeout) EXCLUDES(mu_);
|
||||
util::UnixSecs CurrentTime() const { return time_.load(); }
|
||||
|
||||
void MinimumsUpdated(context::Context* ctx) EXCLUDES(mu_);
|
||||
|
||||
@ -59,6 +59,7 @@ enum Error {
|
||||
Core_SerializeRaftWrite = 241;
|
||||
Core_DeserializeRaftWrite = 242;
|
||||
Core_E2ETransactionReset = 243;
|
||||
Core_RefusingWithoutQuorumTimestamp = 244;
|
||||
|
||||
Env_NS = 300;
|
||||
Env_SerializeCustomClaims = 301;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user