1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
use crate::{
cluster::Cluster,
experiments::{Context, Experiment, ExperimentParam},
instance::Instance,
tx_emitter::EmitJobRequest,
};
use anyhow::Result;
use async_trait::async_trait;
use core::fmt;
use futures::FutureExt;
use std::{collections::HashSet, time::Duration};
use structopt::StructOpt;
#[derive(StructOpt, Debug)]
pub struct AccurateMeasurementParams {
#[structopt(
long,
default_value = Box::leak(format!("{}", DEFAULT_BENCH_DURATION).into_boxed_str()),
help = "Duration of an experiment in seconds"
)]
pub duration: u64,
#[structopt(long, help = "Set fixed tps as the base tps number of experiment")]
pub base_tps: u64,
#[structopt(long, help = "Step numbers to change tps")]
pub step_num: u64,
#[structopt(long, help = "How may tps change for each step")]
pub step_length: u64,
}
pub struct AccurateMeasurement {
validators: Vec<Instance>,
fullnodes: Vec<Instance>,
duration: Duration,
base_tps: u64,
step_num: u64,
step_length: u64,
}
pub const DEFAULT_BENCH_DURATION: u64 = 600;
impl ExperimentParam for AccurateMeasurementParams {
type E = AccurateMeasurement;
fn build(self, cluster: &Cluster) -> Self::E {
let validators = cluster.validator_instances().to_vec();
let fullnodes = cluster.fullnode_instances().to_vec();
Self::E {
validators,
fullnodes,
duration: Duration::from_secs(self.duration),
base_tps: self.base_tps,
step_num: self.step_num,
step_length: self.step_length,
}
}
}
#[async_trait]
impl Experiment for AccurateMeasurement {
fn affected_validators(&self) -> HashSet<String> {
HashSet::new()
}
async fn run(&mut self, context: &mut Context<'_>) -> Result<()> {
let instances = if context.emit_to_validator {
self.validators.clone()
} else {
self.fullnodes.clone()
};
for i in 0..self.step_num {
let tps = self.base_tps + self.step_length * i;
let window = self.duration / self.step_num as u32;
let emit_job_request = EmitJobRequest::fixed_tps(instances.clone(), tps, 0, 0);
let emit_txn = context
.tx_emitter
.emit_txn_for(window, emit_job_request)
.boxed();
let stats = emit_txn.await?;
let test_step = format!("Step {}", i);
context
.report
.report_txn_stats(test_step, stats, window, "");
}
Ok(())
}
fn deadline(&self) -> Duration {
let buffer = Duration::from_secs(60);
self.duration + buffer
}
}
impl fmt::Display for AccurateMeasurement {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Perf Measurement: start tps {}, steps number {}, step length = {}",
self.base_tps, self.step_num, self.step_length
)?;
Ok(())
}
}