1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::{
    cluster::Cluster,
    experiments::{Context, Experiment, ExperimentParam},
    instance::Instance,
    tx_emitter::EmitJobRequest,
};
use anyhow::Result;
use async_trait::async_trait;
use core::fmt;
use futures::FutureExt;
use std::{collections::HashSet, time::Duration};
use structopt::StructOpt;

#[derive(StructOpt, Debug)]
pub struct AccurateMeasurementParams {
    #[structopt(
    long,
    default_value = Box::leak(format!("{}", DEFAULT_BENCH_DURATION).into_boxed_str()),
    help = "Duration of an experiment in seconds"
    )]
    pub duration: u64,
    #[structopt(long, help = "Set fixed tps as the base tps number of experiment")]
    pub base_tps: u64,
    #[structopt(long, help = "Step numbers to change tps")]
    pub step_num: u64,
    #[structopt(long, help = "How may tps change for each step")]
    pub step_length: u64,
}

pub struct AccurateMeasurement {
    validators: Vec<Instance>,
    fullnodes: Vec<Instance>,
    duration: Duration,
    base_tps: u64,
    step_num: u64,
    step_length: u64,
}

pub const DEFAULT_BENCH_DURATION: u64 = 600;

impl ExperimentParam for AccurateMeasurementParams {
    type E = AccurateMeasurement;
    fn build(self, cluster: &Cluster) -> Self::E {
        let validators = cluster.validator_instances().to_vec();
        let fullnodes = cluster.fullnode_instances().to_vec();
        Self::E {
            validators,
            fullnodes,
            duration: Duration::from_secs(self.duration),
            base_tps: self.base_tps,
            step_num: self.step_num,
            step_length: self.step_length,
        }
    }
}

#[async_trait]
impl Experiment for AccurateMeasurement {
    fn affected_validators(&self) -> HashSet<String> {
        HashSet::new()
    }

    async fn run(&mut self, context: &mut Context<'_>) -> Result<()> {
        let instances = if context.emit_to_validator {
            self.validators.clone()
        } else {
            self.fullnodes.clone()
        };
        for i in 0..self.step_num {
            let tps = self.base_tps + self.step_length * i;
            let window = self.duration / self.step_num as u32;
            let emit_job_request = EmitJobRequest::fixed_tps(instances.clone(), tps, 0, 0);
            let emit_txn = context
                .tx_emitter
                .emit_txn_for(window, emit_job_request)
                .boxed();
            let stats = emit_txn.await?;
            // Report
            let test_step = format!("Step {}", i);
            context
                .report
                .report_txn_stats(test_step, stats, window, "");
        }

        Ok(())
    }

    fn deadline(&self) -> Duration {
        let buffer = Duration::from_secs(60);
        self.duration + buffer
    }
}

impl fmt::Display for AccurateMeasurement {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "Perf Measurement: start tps {}, steps number {}, step length = {}",
            self.base_tps, self.step_num, self.step_length
        )?;
        Ok(())
    }
}