1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#![forbid(unsafe_code)]
use crate::{
cluster::Cluster,
health::{Event, HealthCheck, HealthCheckContext, ValidatorEvent},
};
use async_trait::async_trait;
use std::{collections::HashMap, time::Duration};
/// Health check that remembers the most recent commit event seen for each
/// validator and reports validators whose last commit is too old.
pub struct LivenessHealthCheck {
    /// Latest recorded commit per validator, keyed by the validator's peer name.
    last_committed: HashMap<String, LastCommitInfo>,
}
/// How far behind `ctx.now` a validator's last commit timestamp may be before
/// `verify` reports it as a failure (two minutes).
const MAX_BEHIND: Duration = Duration::from_secs(2 * 60);
/// Snapshot of the latest commit observed for a single validator.
///
/// The `Default` value (no event, zero timestamp) stands for "no commit
/// recorded yet"; it is what `new` and `invalidate` store.
#[derive(Default)]
struct LastCommitInfo {
    /// The commit event itself, kept so it can be printed in failure reports.
    /// `None` until a commit has been recorded.
    ve: Option<ValidatorEvent>,
    /// Timestamp copied from the recorded event; zero when nothing is recorded.
    // NOTE(review): presumably measured on the same clock as `ctx.now` — confirm.
    timestamp: Duration,
}
impl LivenessHealthCheck {
    /// Builds a check that tracks every validator instance of `cluster`.
    ///
    /// Each validator starts with a default (empty) entry, keyed by its peer
    /// name, so it is tracked even before its first commit event arrives.
    pub fn new(cluster: &Cluster) -> Self {
        let last_committed: HashMap<_, _> = cluster
            .validator_instances()
            .into_iter()
            .map(|instance| (instance.peer_name().clone(), LastCommitInfo::default()))
            .collect();
        Self { last_committed }
    }
}
#[async_trait]
impl HealthCheck for LivenessHealthCheck {
    /// Consumes one validator event.
    ///
    /// * `Event::ConsensusStarted` is reported straight away as a failure
    ///   ("validator restarted") for the emitting validator.
    /// * `Event::Commit` becomes the validator's latest commit, unless the
    ///   entry already stored for that validator has a strictly newer
    ///   timestamp, in which case the event is ignored.
    fn on_event(&mut self, ve: &ValidatorEvent, ctx: &mut HealthCheckContext) {
        match ve.event {
            Event::ConsensusStarted => {
                ctx.report_failure(ve.validator.clone(), "validator restarted".into());
            }
            Event::Commit(..) => {
                // An event is stale only when something strictly newer is
                // already stored; equal timestamps still replace the entry.
                let is_stale = match self.last_committed.get(&ve.validator) {
                    Some(prev) => prev.timestamp > ve.timestamp,
                    None => false,
                };
                if is_stale {
                    return;
                }
                let latest = LastCommitInfo {
                    ve: Some(ve.clone()),
                    timestamp: ve.timestamp,
                };
                self.last_committed.insert(ve.validator.clone(), latest);
            }
        }
    }

    /// Reports a failure for every tracked validator whose last recorded
    /// commit timestamp is earlier than `ctx.now - MAX_BEHIND`.
    ///
    /// The millisecond figure in the message is the distance between that
    /// cutoff and the recorded timestamp, not the distance from `ctx.now`.
    /// Entries still holding the default (zero) timestamp are reported too
    /// whenever the cutoff is greater than zero.
    async fn verify(&mut self, ctx: &mut HealthCheckContext) {
        // NOTE(review): if `ctx.now` is a `Duration` smaller than `MAX_BEHIND`,
        // this subtraction panics — confirm `ctx.now` is always large enough.
        let min_timestamp = ctx.now - MAX_BEHIND;
        for (validator, lci) in &self.last_committed {
            if lci.timestamp >= min_timestamp {
                continue;
            }
            let behind_ms = (min_timestamp - lci.timestamp).as_millis();
            let message = format!("Last commit is {} ms behind: {:?}", behind_ms, lci.ve);
            ctx.report_failure(validator.clone(), message);
        }
    }

    /// Forgets the last commit recorded for `validator` by replacing its entry
    /// with the default one (inserting it if the validator was not tracked).
    fn invalidate(&mut self, validator: &str) {
        let cleared = LastCommitInfo::default();
        self.last_committed.insert(validator.into(), cleared);
    }

    /// Name under which this health check identifies itself.
    fn name(&self) -> &'static str {
        "liveness_check"
    }
}