1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#![forbid(unsafe_code)]
use crate::health::{Event, HealthCheck, HealthCheckContext, ValidatorEvent};
use async_trait::async_trait;
use std::collections::{hash_map::Entry, HashMap, HashSet};
type EpochAndRound = (u64, u64);
#[derive(Default)]
pub struct CommitHistoryHealthCheck {
epoch_round_to_commit: HashMap<EpochAndRound, CommitAndValidators>,
latest_committed_epoch_round: HashMap<String, EpochAndRound>,
}
struct CommitAndValidators {
pub hash: String,
pub validators: HashSet<String>,
}
impl CommitHistoryHealthCheck {
pub fn new() -> Self {
Default::default()
}
}
#[async_trait]
impl HealthCheck for CommitHistoryHealthCheck {
fn on_event(&mut self, ve: &ValidatorEvent, ctx: &mut HealthCheckContext) {
let commit = if let Event::Commit(ref commit) = ve.event {
commit
} else {
return;
};
let round_to_commit = self.epoch_round_to_commit.entry(commit.epoch_and_round());
match round_to_commit {
Entry::Occupied(mut oe) => {
let commit_and_validators = oe.get_mut();
if commit_and_validators.hash != commit.commit {
ctx.report_failure(
ve.validator.clone(),
format!(
"produced contradicting commit {} at epoch_round {:?}, expected: {}",
commit.commit,
commit.epoch_and_round(),
commit_and_validators.hash
),
);
} else {
commit_and_validators
.validators
.insert(ve.validator.clone());
}
}
Entry::Vacant(va) => {
let mut validators = HashSet::new();
validators.insert(ve.validator.clone());
va.insert(CommitAndValidators {
hash: commit.commit.clone(),
validators,
});
}
}
let latest_committed_round = self
.latest_committed_epoch_round
.entry(ve.validator.clone());
match latest_committed_round {
Entry::Occupied(mut oe) => {
let previous_epoch_and_round = *oe.get();
if previous_epoch_and_round > commit.epoch_and_round() {
ctx.report_failure(
ve.validator.clone(),
format!(
"committed epoch and round {:?} after committing {:?}",
commit.epoch_and_round(),
previous_epoch_and_round
),
);
}
oe.insert(commit.epoch_and_round());
}
Entry::Vacant(va) => {
va.insert(commit.epoch_and_round());
}
}
if let Some(min_round) = self.latest_committed_epoch_round.values().min() {
self.epoch_round_to_commit.retain(|k, _v| *k >= *min_round);
}
}
async fn verify(&mut self, _ctx: &mut HealthCheckContext) {}
fn clear(&mut self) {
self.epoch_round_to_commit.clear();
self.latest_committed_epoch_round.clear();
}
fn name(&self) -> &'static str {
"commit_check"
}
}