1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use diem_logger::prelude::*;
use futures::{Future, FutureExt, SinkExt};
use std::{pin::Pin, thread, time::Duration};
use crate::counters;
use tokio::{runtime::Handle, time::sleep};
pub trait TimeService: Send + Sync {
fn run_after(&self, timeout: Duration, task: Box<dyn ScheduledTask>);
fn get_current_timestamp(&self) -> Duration;
fn sleep(&self, t: Duration);
fn wait_until(&self, t: Duration) {
while let Some(mut wait_duration) = t.checked_sub(self.get_current_timestamp()) {
wait_duration += Duration::from_millis(1);
if wait_duration > Duration::from_secs(10) {
error!(
"[TimeService] long wait time {} seconds required",
wait_duration.as_secs()
);
}
counters::WAIT_DURATION_S.observe_duration(wait_duration);
self.sleep(wait_duration);
}
}
}
pub trait ScheduledTask: Send {
fn run(&mut self) -> Pin<Box<dyn Future<Output = ()> + Send>>;
}
pub struct SendTask<T>
where
T: Send + 'static,
{
sender: Option<channel::Sender<T>>,
message: Option<T>,
}
impl<T> SendTask<T>
where
T: Send + 'static,
{
pub fn make(sender: channel::Sender<T>, message: T) -> Box<dyn ScheduledTask> {
Box::new(SendTask {
sender: Some(sender),
message: Some(message),
})
}
}
impl<T> ScheduledTask for SendTask<T>
where
T: Send + 'static,
{
fn run(&mut self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
let mut sender = self.sender.take().unwrap();
let message = self.message.take().unwrap();
let r = async move {
if let Err(e) = sender.send(message).await {
error!("Error on send: {:?}", e);
};
};
r.boxed()
}
}
pub struct ClockTimeService {
executor: Handle,
}
impl ClockTimeService {
pub fn new(executor: Handle) -> ClockTimeService {
ClockTimeService { executor }
}
}
impl TimeService for ClockTimeService {
fn run_after(&self, timeout: Duration, mut t: Box<dyn ScheduledTask>) {
let task = async move {
sleep(timeout).await;
t.run().await;
};
self.executor.spawn(task);
}
fn get_current_timestamp(&self) -> Duration {
diem_infallible::duration_since_epoch()
}
fn sleep(&self, t: Duration) {
thread::sleep(t)
}
}