1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use crate::{
backup_types::{
epoch_ending::restore::EpochHistoryRestoreController,
state_snapshot::restore::{StateSnapshotRestoreController, StateSnapshotRestoreOpt},
transaction::restore::TransactionRestoreBatchController,
},
metadata,
metadata::cache::MetadataCacheOpt,
metrics::verify::{
VERIFY_COORDINATOR_FAIL_TS, VERIFY_COORDINATOR_START_TS, VERIFY_COORDINATOR_SUCC_TS,
},
storage::BackupStorage,
utils::{unix_timestamp_sec, GlobalRestoreOptions, RestoreRunMode, TrustedWaypointOpt},
};
use anyhow::Result;
use diem_logger::prelude::*;
use diem_types::transaction::Version;
use std::sync::Arc;
/// Coordinates a verification pass over the backups found in a [`BackupStorage`].
///
/// When run, it syncs backup metadata and then drives the epoch-ending, state-snapshot and
/// transaction restore controllers with [`RestoreRunMode::Verify`] as the run mode.
pub struct VerifyCoordinator {
    /// Backup storage that metadata and backup manifests are read from; shared (via `Arc`)
    /// with the metadata sync and with every restore controller.
    storage: Arc<dyn BackupStorage>,
    /// Options for the metadata cache, passed to `metadata::cache::sync_and_load`.
    metadata_cache_opt: MetadataCacheOpt,
    /// Trusted-waypoint configuration; converted via `verify()` before any backup is checked.
    trusted_waypoints_opt: TrustedWaypointOpt,
    /// Number of concurrent downloads (presumably a parallelism limit — confirm), forwarded to
    /// the metadata sync and to the restore controllers through `GlobalRestoreOptions`.
    concurrent_downloads: usize,
}
impl VerifyCoordinator {
pub fn new(
storage: Arc<dyn BackupStorage>,
metadata_cache_opt: MetadataCacheOpt,
trusted_waypoints_opt: TrustedWaypointOpt,
concurrent_downloads: usize,
) -> Result<Self> {
Ok(Self {
storage,
metadata_cache_opt,
trusted_waypoints_opt,
concurrent_downloads,
})
}
pub async fn run(self) -> Result<()> {
info!("Verify coordinator started.");
VERIFY_COORDINATOR_START_TS.set(unix_timestamp_sec());
let ret = self.run_impl().await;
if let Err(e) = &ret {
error!(
error = ?e,
"Verify coordinator failed."
);
VERIFY_COORDINATOR_FAIL_TS.set(unix_timestamp_sec());
} else {
info!("Verify coordinator exiting with success.");
VERIFY_COORDINATOR_SUCC_TS.set(unix_timestamp_sec());
}
ret
}
async fn run_impl(self) -> Result<()> {
let metadata_view = metadata::cache::sync_and_load(
&self.metadata_cache_opt,
Arc::clone(&self.storage),
self.concurrent_downloads,
)
.await?;
let ver_max = Version::max_value();
let state_snapshot = metadata_view.select_state_snapshot(ver_max)?;
let transactions = metadata_view.select_transaction_backups(0, ver_max)?;
let epoch_endings = metadata_view.select_epoch_ending_backups(ver_max)?;
let global_opt = GlobalRestoreOptions {
target_version: ver_max,
trusted_waypoints: Arc::new(self.trusted_waypoints_opt.verify()?),
run_mode: Arc::new(RestoreRunMode::Verify),
concurrent_downloads: self.concurrent_downloads,
};
let epoch_history = Arc::new(
EpochHistoryRestoreController::new(
epoch_endings
.into_iter()
.map(|backup| backup.manifest)
.collect(),
global_opt.clone(),
self.storage.clone(),
)
.run()
.await?,
);
if let Some(backup) = state_snapshot {
StateSnapshotRestoreController::new(
StateSnapshotRestoreOpt {
manifest_handle: backup.manifest,
version: backup.version,
},
global_opt.clone(),
Arc::clone(&self.storage),
Some(Arc::clone(&epoch_history)),
)
.run()
.await?;
}
let txn_manifests = transactions.into_iter().map(|b| b.manifest).collect();
TransactionRestoreBatchController::new(
global_opt,
self.storage,
txn_manifests,
None, Some(epoch_history),
)
.run()
.await?;
Ok(())
}
}