1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
use anyhow::Result;
use bytes::Bytes;
use diem_logger::prelude::*;
use diem_metrics::{register_histogram_vec, register_int_counter_vec, HistogramVec, IntCounterVec};
use diemdb::backup::backup_handler::BackupHandler;
use hyper::Body;
use once_cell::sync::Lazy;
use serde::Serialize;
use std::{convert::Infallible, future::Future};
use warp::{reply::Response, Rejection, Reply};
pub(super) static LATENCY_HISTOGRAM: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"diem_backup_service_latency_s",
"Backup service endpoint latency.",
&["endpoint", "status"]
)
.unwrap()
});
pub(super) static THROUGHPUT_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"diem_backup_service_sent_bytes",
"Backup service throughput in bytes.",
&["endpoint"]
)
.unwrap()
});
pub(super) fn reply_with_bcs_bytes<R: Serialize>(
endpoint: &str,
record: &R,
) -> Result<Box<dyn Reply>> {
let bytes = bcs::to_bytes(record)?;
THROUGHPUT_COUNTER
.with_label_values(&[endpoint])
.inc_by(bytes.len() as u64);
Ok(Box::new(bytes))
}
pub(super) struct BytesSender {
endpoint: &'static str,
inner: hyper::body::Sender,
}
impl BytesSender {
fn new(endpoint: &'static str, inner: hyper::body::Sender) -> Self {
Self { endpoint, inner }
}
async fn send_data(&mut self, chunk: Bytes) -> Result<()> {
let n_bytes = chunk.len();
self.inner.send_data(chunk).await?;
THROUGHPUT_COUNTER
.with_label_values(&[self.endpoint])
.inc_by(n_bytes as u64);
Ok(())
}
fn abort(self) {
self.inner.abort()
}
}
pub(super) fn reply_with_async_channel_writer<G, F>(
backup_handler: &BackupHandler,
endpoint: &'static str,
get_channel_writer: G,
) -> Box<dyn Reply>
where
G: FnOnce(BackupHandler, BytesSender) -> F,
F: Future<Output = ()> + Send + 'static,
{
let (sender, body) = Body::channel();
let sender = BytesSender::new(endpoint, sender);
let bh = backup_handler.clone();
tokio::spawn(get_channel_writer(bh, sender));
Box::new(Response::new(body))
}
pub(super) async fn send_size_prefixed_bcs_bytes<I, R>(iter_res: Result<I>, mut sender: BytesSender)
where
I: Iterator<Item = Result<R>>,
R: Serialize,
{
send_size_prefixed_bcs_bytes_impl(iter_res, &mut sender)
.await
.unwrap_or_else(|e| {
warn!("Failed writing to output http body: {:?}", e);
sender.abort()
});
}
async fn send_size_prefixed_bcs_bytes_impl<I, R>(
iter_res: Result<I>,
sender: &mut BytesSender,
) -> Result<()>
where
I: Iterator<Item = Result<R>>,
R: Serialize,
{
for record_res in iter_res? {
let record = record_res?;
let record_bytes = bcs::to_bytes(&record)?;
let size_bytes = (record_bytes.len() as u32).to_be_bytes();
sender.send_data(Bytes::from(size_bytes.to_vec())).await?;
sender.send_data(Bytes::from(record_bytes)).await?;
}
Ok(())
}
pub(super) fn unwrap_or_500(result: Result<Box<dyn Reply>>) -> Box<dyn Reply> {
match result {
Ok(resp) => resp,
Err(e) => {
warn!("Request handler exception: {:#}", e);
Box::new(warp::http::StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
pub(super) async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> {
warn!("bad request: {:?}", err);
Ok(warp::http::StatusCode::BAD_REQUEST)
}