1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
use crate::{
data::{get_events, get_transactions},
errors::JsonRpcError,
stream_rpc::subscription_types::{Subscription, SubscriptionHelper},
views::{EventView, TransactionView},
};
use diem_json_rpc_types::stream::request::{
SubscribeToEventsParams, SubscribeToTransactionsParams,
};
use diem_logger::warn;
use std::borrow::Borrow;
/// Per-subscription state for a client streaming transactions.
///
/// The only state kept is a version cursor; see the `Subscription` impl for
/// how it is initialised, read and advanced.
#[derive(Clone, Copy, Debug, Default)]
pub struct TransactionsSubscription {
/// Version handed to `get_transactions` as the start point of the next
/// fetch. Set from `params.starting_version` in `init` and incremented by
/// one each time `on_send` is called with `Some(_)`.
pub(crate) latest_version: u64,
}
impl Subscription<SubscribeToTransactionsParams, TransactionView> for TransactionsSubscription {
    /// Seeds the version cursor from the client's `starting_version`.
    ///
    /// The helper is not consulted and this implementation always returns
    /// `Ok(())`.
    fn init(
        &mut self,
        _helper: &SubscriptionHelper,
        params: &SubscribeToTransactionsParams,
    ) -> Result<(), JsonRpcError> {
        self.latest_version = params.starting_version;
        Ok(())
    }

    /// Fetches the next batch of transactions starting at the current cursor.
    ///
    /// The ledger version passed to `get_transactions` is the database's
    /// latest version, or `0` when that lookup fails. Events are included only
    /// if the client set `include_events` to `true`. A fetch error is logged
    /// as a warning and an empty vector is returned instead.
    fn next(
        &self,
        helper: &SubscriptionHelper,
        params: &SubscribeToTransactionsParams,
    ) -> Vec<TransactionView> {
        let ledger_version = helper.db.get_latest_version().unwrap_or(0);
        let with_events = params.include_events.unwrap_or(false);

        get_transactions(
            helper.db.borrow(),
            ledger_version,
            self.latest_version,
            helper.client.config.fetch_size,
            with_events,
        )
        .map(|page| page.0)
        .unwrap_or_else(|e| {
            warn!(
                "Client#{} Could not fetch transactions: {}",
                helper.client.id, e
            );
            Vec::new()
        })
    }

    /// Advances the cursor by one when a transaction was passed in.
    ///
    /// A `None` argument leaves the cursor unchanged.
    fn on_send(&mut self, tx: Option<&TransactionView>) {
        if tx.is_none() {
            return;
        }
        self.latest_version += 1;
    }
}
/// Per-subscription state for a client streaming events.
///
/// The only state kept is an event sequence-number cursor; see the
/// `Subscription` impl for how it is initialised, read and advanced.
#[derive(Clone, Copy, Debug, Default)]
pub struct EventsSubscription {
/// Sequence number handed to `get_events` as the start point of the next
/// fetch. Set from `params.event_seq_num` in `init`, then set to
/// `event.sequence_number + 1` each time `on_send` is called with an event.
pub(crate) latest_event: u64,
}
impl Subscription<SubscribeToEventsParams, EventView> for EventsSubscription {
    /// Seeds the sequence-number cursor from the client's `event_seq_num`.
    ///
    /// The helper is not consulted and this implementation always returns
    /// `Ok(())`.
    fn init(
        &mut self,
        _helper: &SubscriptionHelper,
        params: &SubscribeToEventsParams,
    ) -> Result<(), JsonRpcError> {
        self.latest_event = params.event_seq_num;
        Ok(())
    }

    /// Fetches the next batch of events for `params.event_key`, starting at
    /// the current cursor.
    ///
    /// The ledger version passed to `get_events` is the database's latest
    /// version, or `0` when that lookup fails. A fetch error is logged as a
    /// warning and an empty vector is returned instead.
    fn next(
        &self,
        helper: &SubscriptionHelper,
        params: &SubscribeToEventsParams,
    ) -> Vec<EventView> {
        let ledger_version = helper.db.get_latest_version().unwrap_or(0);
        let batch_size = helper.client.config.fetch_size;

        get_events(
            helper.db.borrow(),
            ledger_version,
            params.event_key,
            self.latest_event,
            batch_size,
        )
        .unwrap_or_else(|e| {
            warn!("Client#{} Could not fetch events: {}", helper.client.id, e);
            Vec::new()
        })
    }

    /// Moves the cursor to one past the sequence number of the given event.
    ///
    /// A `None` argument leaves the cursor unchanged.
    fn on_send(&mut self, event: Option<&EventView>) {
        if let Some(next_seq) = event.map(|sent| sent.sequence_number + 1) {
            self.latest_event = next_seq;
        }
    }
}