1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#![forbid(unsafe_code)]
use crate::health::ValidatorEvent;
use diem_infallible::duration_since_epoch;
use diem_logger::*;
use std::{
sync::{
atomic::{AtomicI64, Ordering},
mpsc, Arc,
},
thread,
time::{Duration, Instant},
};
pub struct LogTail {
pub event_receiver: mpsc::Receiver<ValidatorEvent>,
pub pending_messages: Arc<AtomicI64>,
}
impl LogTail {
pub fn recv_all_until_deadline(&self, deadline: Instant) -> Vec<ValidatorEvent> {
let mut events = vec![];
while Instant::now() < deadline {
match self.event_receiver.try_recv() {
Ok(event) => events.push(event),
Err(..) => thread::sleep(Duration::from_millis(1)),
}
}
let events_count = events.len() as i64;
let prev = self
.pending_messages
.fetch_sub(events_count, Ordering::Relaxed);
let pending = prev - events_count;
let now = duration_since_epoch();
if let Some(last) = events.last() {
let delay = now - last.received_timestamp;
if delay > Duration::from_secs(1) {
warn!(
"{} Last event delay: {}, pending {}",
now.as_millis(),
delay.as_millis(),
pending
);
}
} else {
debug!("{} No events", now.as_millis());
}
events
}
pub fn recv_all(&self) -> Vec<ValidatorEvent> {
let mut events = vec![];
while let Ok(event) = self.event_receiver.try_recv() {
self.pending_messages.fetch_sub(1, Ordering::Relaxed);
events.push(event);
}
events
}
}