1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
use crate::{
client::{CoordinatorMessage, StateSyncClient},
coordinator::StateSyncCoordinator,
executor_proxy::{ExecutorProxy, ExecutorProxyTrait},
network::{StateSyncEvents, StateSyncSender},
};
use consensus_notifications::ConsensusNotificationListener;
use diem_config::{config::NodeConfig, network_id::NodeNetworkId};
use diem_types::waypoint::Waypoint;
use executor_types::ChunkExecutor;
use futures::channel::mpsc;
use mempool_notifications::MempoolNotificationSender;
use std::{boxed::Box, collections::HashMap, sync::Arc};
use storage_interface::DbReader;
use subscription_service::ReconfigSubscription;
use tokio::runtime::{Builder, Runtime};
/// Handle returned by the `bootstrap*` constructors once the state sync
/// coordinator has been spawned.
///
/// It keeps ownership of the runtime the coordinator was spawned on and of
/// the sending half of the coordinator's message channel, from which
/// `create_client` produces `StateSyncClient`s.
pub struct StateSyncBootstrapper {
/// Runtime onto which the coordinator task is spawned. The field is never
/// read in this file (hence the leading underscore); it is stored so the
/// runtime is owned by the bootstrapper rather than dropped at the end of
/// the constructor.
_runtime: Runtime,
/// Sending half of the unbounded channel whose receiving half is handed to
/// the `StateSyncCoordinator`. A clone of it is given to every client
/// created through `create_client`.
coordinator_sender: mpsc::UnboundedSender<CoordinatorMessage>,
}
impl StateSyncBootstrapper {
pub fn bootstrap<M: MempoolNotificationSender + 'static>(
network: Vec<(NodeNetworkId, StateSyncSender, StateSyncEvents)>,
mempool_notifier: M,
consensus_listener: ConsensusNotificationListener,
storage: Arc<dyn DbReader>,
executor: Box<dyn ChunkExecutor>,
node_config: &NodeConfig,
waypoint: Waypoint,
reconfig_event_subscriptions: Vec<ReconfigSubscription>,
) -> Self {
let runtime = Builder::new_multi_thread()
.thread_name("state-sync")
.enable_all()
.build()
.expect("[State Sync] Failed to create runtime!");
let executor_proxy = ExecutorProxy::new(storage, executor, reconfig_event_subscriptions);
Self::bootstrap_with_executor_proxy(
runtime,
network,
mempool_notifier,
consensus_listener,
node_config,
waypoint,
executor_proxy,
)
}
pub fn bootstrap_with_executor_proxy<
E: ExecutorProxyTrait + 'static,
M: MempoolNotificationSender + 'static,
>(
runtime: Runtime,
network: Vec<(NodeNetworkId, StateSyncSender, StateSyncEvents)>,
mempool_notifier: M,
consensus_listener: ConsensusNotificationListener,
node_config: &NodeConfig,
waypoint: Waypoint,
executor_proxy: E,
) -> Self {
let (coordinator_sender, coordinator_receiver) = mpsc::unbounded();
let initial_state = executor_proxy
.get_local_storage_state()
.expect("[State Sync] Starting failure: cannot sync with storage!");
let network_senders: HashMap<_, _> = network
.iter()
.map(|(network_id, sender, _events)| (network_id.clone(), sender.clone()))
.collect();
let coordinator = StateSyncCoordinator::new(
coordinator_receiver,
mempool_notifier,
consensus_listener,
network_senders,
node_config,
waypoint,
executor_proxy,
initial_state,
)
.expect("[State Sync] Unable to create state sync coordinator!");
runtime.spawn(coordinator.start(network));
Self {
_runtime: runtime,
coordinator_sender,
}
}
pub fn create_client(&self) -> StateSyncClient {
StateSyncClient::new(self.coordinator_sender.clone())
}
}