1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::{
    data::{get_events, get_transactions},
    errors::JsonRpcError,
    stream_rpc::subscription_types::{Subscription, SubscriptionHelper},
    views::{EventView, TransactionView},
};
use diem_json_rpc_types::stream::request::{
    SubscribeToEventsParams, SubscribeToTransactionsParams,
};
use diem_logger::warn;
use std::borrow::Borrow;

#[derive(Clone, Copy, Debug, Default)]
pub struct TransactionsSubscription {
    pub(crate) latest_version: u64,
}

impl Subscription<SubscribeToTransactionsParams, TransactionView> for TransactionsSubscription {
    fn init(
        &mut self,
        _helper: &SubscriptionHelper,
        params: &SubscribeToTransactionsParams,
    ) -> Result<(), JsonRpcError> {
        self.latest_version = params.starting_version;
        Ok(())
    }

    fn next(
        &self,
        helper: &SubscriptionHelper,
        params: &SubscribeToTransactionsParams,
    ) -> Vec<TransactionView> {
        match get_transactions(
            helper.db.borrow(),
            helper.db.get_latest_version().unwrap_or(0),
            self.latest_version,
            helper.client.config.fetch_size,
            params.include_events.unwrap_or(false),
        ) {
            Ok(transactions) => transactions.0,
            Err(e) => {
                warn!(
                    "Client#{} Could not fetch transactions: {}",
                    helper.client.id, e
                );
                vec![]
            }
        }
    }

    fn on_send(&mut self, tx: Option<&TransactionView>) {
        if tx.is_some() {
            self.latest_version += 1;
        }
    }
}

#[derive(Clone, Copy, Debug, Default)]
pub struct EventsSubscription {
    pub(crate) latest_event: u64,
}

impl Subscription<SubscribeToEventsParams, EventView> for EventsSubscription {
    fn init(
        &mut self,
        _helper: &SubscriptionHelper,
        params: &SubscribeToEventsParams,
    ) -> Result<(), JsonRpcError> {
        self.latest_event = params.event_seq_num;
        Ok(())
    }

    fn next(
        &self,
        helper: &SubscriptionHelper,
        params: &SubscribeToEventsParams,
    ) -> Vec<EventView> {
        match get_events(
            helper.db.borrow(),
            helper.db.get_latest_version().unwrap_or(0),
            params.event_key,
            self.latest_event,
            helper.client.config.fetch_size,
        ) {
            Ok(events) => events,
            Err(e) => {
                warn!("Client#{} Could not fetch events: {}", helper.client.id, e);
                vec![]
            }
        }
    }

    fn on_send(&mut self, event: Option<&EventView>) {
        if let Some(event) = event {
            self.latest_event = event.sequence_number + 1
        }
    }
}