1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

#![forbid(unsafe_code)]

//! Abstract time service

use enum_dispatch::enum_dispatch;
#[cfg(any(test, feature = "async"))]
use pin_project::pin_project;
use std::{
    fmt::Debug,
    time::{Duration, Instant},
};
#[cfg(any(test, feature = "async"))]
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

#[cfg(any(test, feature = "async"))]
pub mod interval;
#[cfg(any(test, feature = "testing"))]
pub mod mock;
pub mod real;
#[cfg(any(test, feature = "async"))]
pub mod timeout;

#[cfg(any(test, feature = "testing"))]
pub use crate::mock::{MockSleep, MockTimeService};
pub use crate::real::RealTimeService;
#[cfg(any(test, feature = "async"))]
pub use crate::{interval::Interval, real::RealSleep, timeout::Timeout};

// TODO(philiphayes): use Duration constants when those stabilize.
#[cfg(any(test, feature = "async"))]
const ZERO_DURATION: Duration = Duration::from_nanos(0);

/// `TimeService` abstracts all time-related operations in one place that can be
/// easily mocked-out and controlled in tests or delegated to the actual
/// underlying runtime (usually tokio). It's provided as an enum so we don't have
/// to infect everything with a generic tag.
///
/// `TimeService` is async-focused: the `sleep`, `interval`, and `timeout` methods
/// all return `Future`s and `Stream`s. That said, `TimeService` supports non-async
/// clients as well; simply use the `sleep_blocking` method instead of `sleep`.
/// Note that the blocking call will actually block the current thread until the
/// sleep time has elapsed.
///
/// `TimeService` tries to mirror the API provided by [`tokio::time`] to an
/// extent. The primary difference is that all time is expressed in relative
/// [`Duration`]s. In other words, "sleep for 5s" vs "sleep until unix time
/// 1607734460". Absolute time is provided by [`TimeService::now`] which returns
/// the current unix time.
///
/// Note: you must also include the [`TimeServiceTrait`] to use the actual
/// time-related functionality.
///
/// Note: we have to provide our own [`Timeout`] and [`Interval`] types that
/// use the [`Sleep`] future, since tokio's implementations are coupled to its
/// internal Sleep future.
///
/// Note: `TimeService`'s should be free (or very cheap) to clone and send around
/// between threads. In production (without test features), this enum is a
/// zero-sized type.
#[enum_dispatch(TimeServiceTrait)]
#[derive(Clone, Debug)]
pub enum TimeService {
    RealTimeService(RealTimeService),

    #[cfg(any(test, feature = "testing"))]
    MockTimeService(MockTimeService),
}

impl TimeService {
    /// Create a new real, production time service that actually uses the systemtime.
    ///
    /// See [`RealTimeService`].
    pub fn real() -> Self {
        RealTimeService::new().into()
    }

    /// Create a mock, simulated time service that does not query the system time
    /// and allows fine-grained control over advancing time and waking sleeping
    /// tasks.
    ///
    /// See [`MockTimeService`].
    #[cfg(any(test, feature = "testing"))]
    pub fn mock() -> Self {
        MockTimeService::new().into()
    }

    #[cfg(any(test, feature = "testing"))]
    pub fn into_mock(self) -> MockTimeService {
        match self {
            TimeService::MockTimeService(inner) => inner,
            ts => panic!("Unexpected TimeService, expected MockTimeService: {:?}", ts),
        }
    }
}

impl Default for TimeService {
    fn default() -> Self {
        Self::real()
    }
}

#[enum_dispatch]
pub trait TimeServiceTrait: Send + Sync + Clone + Debug {
    /// Query a monotonically nondecreasing clock. Returns an opaque type that
    /// can only be compared to other [`Instant`]s, i.e., this is a monotonic
    /// relative time whereas [`now_unix_time`](#method.now_unix_time) is a
    /// non-monotonic absolute time.
    ///
    /// On Linux, this is equivalent to
    /// [`clock_gettime(CLOCK_MONOTONIC, _)`](https://linux.die.net/man/3/clock_gettime)
    ///
    /// See [`Instant`] for more details.
    fn now(&self) -> Instant;

    /// Query the current unix timestamp as a [`Duration`].
    ///
    /// When used on a `TimeService::real()`, this is equivalent to
    /// `SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)`.
    ///
    /// Note: the [`Duration`] returned from this function is _NOT_ guaranteed to
    /// be monotonic. Use [`now`](#method.now) if you need monotonicity.
    ///
    /// From the [`SystemTime`] docs:
    ///
    /// > Distinct from the [`Instant`] type, this time measurement is
    /// > not monotonic. This means that you can save a file to the file system,
    /// > then save another file to the file system, and the second file has a
    /// > [`SystemTime`] measurement earlier than the first. In other words, an
    /// > operation that happens after another operation in real time may have
    /// > an earlier SystemTime!
    ///
    /// For example, the system administrator could [`clock_settime`] into the
    /// past, breaking clock time monotonicity.
    ///
    /// On Linux, this is equivalent to
    /// [`clock_gettime(CLOCK_REALTIME, _)`](https://linux.die.net/man/3/clock_gettime).
    ///
    /// [`Duration`]: std::time::Duration
    /// [`Instant`]: std::time::Instant
    /// [`SystemTime`]: std::time::SystemTime
    /// [`clock_settime`]: https://linux.die.net/man/3/clock_settime
    fn now_unix_time(&self) -> Duration;

    /// Query the current unix timestamp in seconds.
    ///
    /// Equivalent to `self.now_unix_time().as_secs()`.
    /// See [`now_unix_time`](#method.now_unix_time).
    fn now_secs(&self) -> u64 {
        self.now_unix_time().as_secs()
    }

    /// Return a [`Future`] that waits until `duration` has passed.
    ///
    /// No work is performed while awaiting on the sleep future to complete. `Sleep`
    /// operates at millisecond granularity and should not be used for tasks that
    /// require high-resolution timers.
    ///
    /// # Cancelation
    ///
    /// Canceling a sleep instance is done by dropping the returned future. No
    /// additional cleanup work is required.
    #[cfg(any(test, feature = "async"))]
    fn sleep(&self, duration: Duration) -> Sleep;

    /// Return a [`Future`] that waits until the `deadline`.
    ///
    /// If the deadline is in the past, the Sleep will trigger as soons as it's
    /// polled.
    ///
    /// See [`sleep`](#method.sleep) for more details.
    #[cfg(any(test, feature = "async"))]
    fn sleep_until(&self, deadline: Instant) -> Sleep {
        let duration = deadline.saturating_duration_since(self.now());
        self.sleep(duration)
    }

    /// Blocks the current thread until `duration` time has passed.
    fn sleep_blocking(&self, duration: Duration);

    /// Creates a new [`Interval`] that yields with interval of `period`. The
    /// first tick completes immediately. An interval will tick indefinitely.
    ///
    /// # Cancelation
    ///
    /// At any time, the [`Interval`] value can be dropped. This cancels the interval.
    ///
    /// # Panics
    ///
    /// This function panics if `period` is zero.
    #[cfg(any(test, feature = "async"))]
    fn interval(&self, period: Duration) -> Interval {
        let delay = self.sleep(ZERO_DURATION);
        Interval::new(delay, period)
    }

    /// Creates a new [`Interval`] that yields with interval of `period`. The
    /// first tick completes after the `start` deadline. An interval will tick
    /// indefinitely.
    ///
    /// See [`interval`](#method.interval) for more details.
    #[cfg(any(test, feature = "async"))]
    fn interval_at(&self, start: Instant, period: Duration) -> Interval {
        let delay = self.sleep_until(start);
        Interval::new(delay, period)
    }

    /// Require a [`Future`] to complete before the specified duration has elapsed.
    ///
    /// If the future completes before the duration has elapsed, then the completed
    /// value is returned. Otherwise, `Err(Elapsed)` is returned and the future is
    /// canceled.
    ///
    /// # Cancelation
    ///
    /// Cancelling a timeout is done by dropping the future. No additional cleanup
    /// or other work is required.
    ///
    /// The original future may be obtained by calling [`Timeout::into_inner`]. This
    /// consumes the [`Timeout`].
    #[cfg(any(test, feature = "async"))]
    fn timeout<F: Future>(&self, duration: Duration, future: F) -> Timeout<F> {
        let delay = self.sleep(duration);
        Timeout::new(future, delay)
    }

    /// Require a [`Future`] to complete before the `deadline`.
    ///
    /// If the future completes before the duration has elapsed, then the completed
    /// value is returned. Otherwise, `Err(Elapsed)` is returned and the future is
    /// canceled.
    ///
    /// See [`timeout`](#method.timeout) for more details.
    #[cfg(any(test, feature = "async"))]
    fn timeout_at<F: Future>(&self, deadline: Instant, future: F) -> Timeout<F> {
        let delay = self.sleep_until(deadline);
        Timeout::new(future, delay)
    }
}

/// A [`Future`] that resolves after some time has elapsed (either real or
/// simulated, depending on the parent [`TimeService`]).
///
/// `Sleep` is modeled after [`tokio::time::Sleep`].
#[pin_project(project = SleepProject)]
#[derive(Debug)]
#[cfg(any(test, feature = "async"))]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[allow(clippy::large_enum_variant)]
pub enum Sleep {
    RealSleep(#[pin] RealSleep),

    #[cfg(any(test, feature = "testing"))]
    MockSleep(MockSleep),
}

#[cfg(any(test, feature = "async"))]
impl From<RealSleep> for Sleep {
    fn from(sleep: RealSleep) -> Self {
        Sleep::RealSleep(sleep)
    }
}

#[cfg(any(test, feature = "fuzzing", feature = "testing"))]
impl From<MockSleep> for Sleep {
    fn from(sleep: MockSleep) -> Self {
        Sleep::MockSleep(sleep)
    }
}

#[cfg(any(test, feature = "async"))]
impl Future for Sleep {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.project() {
            SleepProject::RealSleep(inner) => inner.poll(cx),
            #[cfg(any(test, feature = "testing"))]
            SleepProject::MockSleep(inner) => Pin::new(inner).poll(cx),
        }
    }
}

#[cfg(any(test, feature = "async"))]
pub trait SleepTrait: Future<Output = ()> + Send + Sync + Debug {
    /// Returns `true` if this `Sleep`'s requested wait duration has elapsed.
    fn is_elapsed(&self) -> bool;

    /// Resets this `Sleep` to wait again for `duration`.
    fn reset(self: Pin<&mut Self>, duration: Duration);

    /// Reset this `Sleep` to wait again until the `deadline`.
    fn reset_until(self: Pin<&mut Self>, deadline: Instant);
}

#[cfg(any(test, feature = "async"))]
impl SleepTrait for Sleep {
    fn is_elapsed(&self) -> bool {
        match self {
            Sleep::RealSleep(inner) => SleepTrait::is_elapsed(inner),
            #[cfg(any(test, feature = "fuzzing", feature = "testing"))]
            Sleep::MockSleep(inner) => SleepTrait::is_elapsed(inner),
        }
    }

    fn reset(self: Pin<&mut Self>, duration: Duration) {
        match self.project() {
            SleepProject::RealSleep(inner) => SleepTrait::reset(inner, duration),
            #[cfg(any(test, feature = "fuzzing", feature = "testing"))]
            SleepProject::MockSleep(inner) => SleepTrait::reset(Pin::new(inner), duration),
        }
    }

    fn reset_until(self: Pin<&mut Self>, deadline: Instant) {
        match self.project() {
            SleepProject::RealSleep(inner) => SleepTrait::reset_until(inner, deadline),
            #[cfg(any(test, feature = "fuzzing", feature = "testing"))]
            SleepProject::MockSleep(inner) => SleepTrait::reset_until(Pin::new(inner), deadline),
        }
    }
}