1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use crate::util::time_service::{ScheduledTask, TimeService};
use diem_infallible::Mutex;
use diem_logger::prelude::*;
use std::{sync::Arc, time::Duration};
pub struct SimulatedTimeService {
inner: Arc<Mutex<SimulatedTimeServiceInner>>,
}
struct SimulatedTimeServiceInner {
now: Duration,
pending: Vec<(Duration, Box<dyn ScheduledTask>)>,
time_limit: Duration,
max: Duration,
}
impl TimeService for SimulatedTimeService {
fn run_after(&self, timeout: Duration, mut t: Box<dyn ScheduledTask>) {
let mut inner = self.inner.lock();
let now = inner.now;
let deadline = now + timeout;
if deadline > inner.time_limit {
debug!(
"sched for deadline: {}, now: {}, limit: {}",
deadline.as_millis(),
now.as_millis(),
inner.time_limit.as_millis()
);
inner.pending.push((deadline, t));
} else {
debug!(
"exec deadline: {}, now: {}",
deadline.as_millis(),
now.as_millis()
);
inner.now = deadline;
if inner.now > inner.max {
inner.now = inner.max;
}
futures::executor::block_on(t.run());
}
}
fn get_current_timestamp(&self) -> Duration {
self.inner.lock().now
}
fn sleep(&self, t: Duration) {
let inner = self.inner.clone();
let mut inner = inner.lock();
inner.now += t;
if inner.now > inner.max {
inner.now = inner.max;
}
}
}
impl SimulatedTimeService {
pub fn new() -> SimulatedTimeService {
SimulatedTimeService {
inner: Arc::new(Mutex::new(SimulatedTimeServiceInner {
now: Duration::from_secs(0),
pending: vec![],
time_limit: Duration::from_secs(0),
max: Duration::from_secs(std::u64::MAX),
})),
}
}
pub fn auto_advance_until(time_limit: Duration) -> SimulatedTimeService {
SimulatedTimeService {
inner: Arc::new(Mutex::new(SimulatedTimeServiceInner {
now: Duration::from_secs(0),
pending: vec![],
time_limit,
max: Duration::from_secs(std::u64::MAX),
})),
}
}
#[allow(dead_code)]
pub fn update_auto_advance_limit(&mut self, time: Duration) {
let mut inner = self.inner.lock();
inner.time_limit += time;
let time_limit = inner.time_limit;
let mut i = 0;
let mut drain = vec![];
while i != inner.pending.len() {
let deadline = inner.pending[i].0;
if deadline <= time_limit {
drain.push(inner.pending.remove(i));
} else {
i += 1;
}
}
for (_, mut t) in drain {
futures::executor::block_on(t.run());
}
}
}
impl Clone for SimulatedTimeService {
fn clone(&self) -> SimulatedTimeService {
SimulatedTimeService {
inner: self.inner.clone(),
}
}
}