1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

#![forbid(unsafe_code)]

use std::{collections::HashSet, fmt, time::Duration};

use structopt::StructOpt;
use tokio::time;

use crate::{
    cluster::Cluster,
    experiments::{Context, Experiment, ExperimentParam},
    instance::Instance,
    tx_emitter::EmitJobRequest,
};
use async_trait::async_trait;
use diem_logger::info;
use std::time::Instant;

// Command-line parameters accepted by the `RecoveryTime` experiment.
//
// NOTE: plain `//` comments are used here on purpose. structopt derives CLI
// about/help text from `///` doc comments, so adding those would alter the
// generated command-line output.
#[derive(Debug, StructOpt)]
pub struct RecoveryTimeParams {
    // Passed to the transaction emitter's `mint_accounts` call at the start of
    // the run, and used as the numerator of the reported recovery rate.
    #[structopt(
        long,
        help = "Number of accounts to mint before starting the experiment",
        default_value = "100"
    )]
    pub num_accounts_to_mint: u64,
}

/// Experiment that stops a single validator instance, cleans its data, restarts it,
/// and reports how quickly it comes back (see `run` in the `Experiment` impl).
pub struct RecoveryTime {
    /// Parameters this experiment was built from.
    params: RecoveryTimeParams,
    /// The instance that gets stopped, cleaned, and restarted; chosen via
    /// `Cluster::random_validator_instance` when the experiment is built.
    instance: Instance,
}

impl ExperimentParam for RecoveryTimeParams {
    type E = RecoveryTime;
    fn build(self, cluster: &Cluster) -> Self::E {
        let instance = cluster.random_validator_instance();
        Self::E {
            params: self,
            instance,
        }
    }
}

#[async_trait]
impl Experiment for RecoveryTime {
    /// Returns a set containing only the peer name of the instance this
    /// experiment stops and restarts.
    fn affected_validators(&self) -> HashSet<String> {
        let mut result = HashSet::new();
        result.insert(self.instance.peer_name().clone());
        result
    }

    /// Runs the experiment:
    ///
    /// 1. Mints `num_accounts_to_mint` accounts through the transaction emitter,
    ///    targeting all validator instances of the cluster.
    /// 2. Stops the chosen instance, cleans its data, and starts it again.
    /// 3. Waits (up to 120 seconds) for the instance's JSON-RPC endpoint.
    /// 4. Polls the `diem_consensus_last_committed_round` counter once per second
    ///    until reading it succeeds.
    /// 5. Reports `num_accounts_to_mint / elapsed seconds` as `recovery_rate`,
    ///    where the elapsed time is measured from the moment JSON-RPC came up.
    ///
    /// # Errors
    ///
    /// Propagates any error from minting, stopping, cleaning, starting, or
    /// waiting for JSON-RPC on the instance.
    async fn run(&mut self, context: &mut Context<'_>) -> anyhow::Result<()> {
        context
            .tx_emitter
            .mint_accounts(
                &EmitJobRequest::for_instances(
                    context.cluster.validator_instances().to_vec(),
                    context.global_emit_job_request,
                    0,
                    0,
                ),
                self.params.num_accounts_to_mint as usize,
            )
            .await?;
        info!("Stopping {}", self.instance);
        self.instance.stop().await?;
        info!("Deleting db and restarting node for {}", self.instance);
        self.instance.clean_data().await?;
        self.instance.start().await?;
        info!("Waiting for instance to be up: {}", self.instance);
        self.instance
            .wait_json_rpc(Instant::now() + Duration::from_secs(120))
            .await?;
        // The recovery clock starts only once the JSON-RPC endpoint responds.
        let start_instant = Instant::now();
        info!(
            "Instance {} is up. Waiting for it to start committing.",
            self.instance
        );
        // This loop has no timeout of its own.
        // NOTE(review): presumably the experiment runner enforces `deadline()` — confirm.
        while self
            .instance
            .counter("diem_consensus_last_committed_round")
            .is_err()
        {
            time::sleep(Duration::from_secs(1)).await;
        }
        let time_to_recover = start_instant.elapsed();
        // Use fractional seconds: `as_secs()` truncates to whole seconds, which
        // turned any sub-second recovery into a division by zero (reported as
        // `inf`) and skewed every other measurement by up to one second.
        let recovery_rate =
            self.params.num_accounts_to_mint as f64 / time_to_recover.as_secs_f64();
        let result = format!("Recovery rate : {:.1} txn/sec", recovery_rate,);
        info!("{}", result);
        context.report.report_text(result);
        context
            .report
            .report_metric(self, "recovery_rate", recovery_rate);
        Ok(())
    }

    /// Time budget declared for this experiment: 20 minutes.
    fn deadline(&self) -> Duration {
        Duration::from_secs(20 * 60)
    }
}

impl fmt::Display for RecoveryTime {
    /// Writes the fixed experiment name, `RecoveryTime`, to the formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("RecoveryTime")
    }
}