1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#![forbid(unsafe_code)]
pub use diem_metrics_core::{
register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec,
register_int_gauge, register_int_gauge_vec, Histogram, HistogramTimer, HistogramVec,
IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
};
use diem_logger::{error, info};
use diem_metrics_core::{Encoder, TextEncoder};
use std::{env, sync::mpsc, thread, thread::JoinHandle, time::Duration};
/// Interval, in seconds, between metric pushes; used as the fallback when the
/// `PUSH_METRICS_FREQUENCY_SECS` environment variable is not set.
const DEFAULT_PUSH_FREQUENCY_SECS: u64 = 15;
/// Handle to the background thread that periodically pushes metrics to the
/// endpoint named by the `PUSH_METRICS_ENDPOINT` environment variable.
///
/// Dropping the handle signals the worker to quit and joins it, so the value
/// must be kept alive for as long as metrics should be pushed.
#[must_use = "Assign the constructed pusher to a variable, \
              otherwise the worker thread is joined immediately."]
pub struct MetricsPusher {
    /// Join handle of the worker thread; `None` when no worker was started
    /// or after the worker has already been joined.
    worker_thread: Option<JoinHandle<()>>,
    /// Sending half of the channel used to tell the worker thread to quit.
    quit_sender: mpsc::Sender<()>,
}
impl MetricsPusher {
fn push(push_metrics_endpoint: &str) {
let mut buffer = Vec::new();
if let Err(e) = TextEncoder::new().encode(&diem_metrics_core::gather(), &mut buffer) {
error!("Failed to encode push metrics: {}.", e.to_string());
} else {
let response = ureq::post(push_metrics_endpoint)
.timeout_connect(10_000)
.send_bytes(&buffer);
if let Some(error) = response.synthetic_error() {
error!(
"Failed to push metrics to {}. Error: {}",
push_metrics_endpoint, error
);
}
}
}
fn worker(
quit_receiver: mpsc::Receiver<()>,
push_metrics_endpoint: String,
push_metrics_frequency_secs: u64,
) {
while quit_receiver
.recv_timeout(Duration::from_secs(push_metrics_frequency_secs))
.is_err()
{
Self::push(&push_metrics_endpoint);
}
Self::push(&push_metrics_endpoint);
}
fn start_worker_thread(quit_receiver: mpsc::Receiver<()>) -> Option<JoinHandle<()>> {
let push_metrics_endpoint = match env::var("PUSH_METRICS_ENDPOINT") {
Ok(s) => s,
Err(_) => {
info!("PUSH_METRICS_ENDPOINT env var is not set. Skipping sending metrics.");
return None;
}
};
let push_metrics_frequency_secs = match env::var("PUSH_METRICS_FREQUENCY_SECS") {
Ok(s) => match s.parse::<u64>() {
Ok(i) => i,
Err(_) => {
error!("Invalid value for PUSH_METRICS_FREQUENCY_SECS: {}", s);
return None;
}
},
Err(_) => DEFAULT_PUSH_FREQUENCY_SECS,
};
info!(
"Starting push metrics loop. Sending metrics to {} with a frequency of {} seconds",
push_metrics_endpoint, push_metrics_frequency_secs
);
Some(thread::spawn(move || {
Self::worker(
quit_receiver,
push_metrics_endpoint,
push_metrics_frequency_secs,
)
}))
}
pub fn start() -> Self {
let (tx, rx) = mpsc::channel();
let worker_thread = Self::start_worker_thread(rx);
Self {
worker_thread,
quit_sender: tx,
}
}
pub fn join(&mut self) {
if let Some(worker_thread) = self.worker_thread.take() {
if let Err(e) = self.quit_sender.send(()) {
error!(
"Failed to send quit signal to metric pushing worker thread: {:?}",
e
);
}
if let Err(e) = worker_thread.join() {
error!("Failed to join metric pushing worker thread: {:?}", e);
}
}
}
}
impl Drop for MetricsPusher {
fn drop(&mut self) {
self.join()
}
}