1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use crate::{
persistent_safety_storage::PersistentSafetyStorage,
serializer::{SafetyRulesInput, SerializerClient, SerializerService, TSerializerClient},
Error, SafetyRules, TSafetyRules,
};
use diem_logger::warn;
use diem_secure_net::{NetworkClient, NetworkServer};
use std::net::SocketAddr;
/// Describes a SafetyRules service reachable over the network and knows how
/// to build a client for talking to it.
pub trait RemoteService {
    /// Socket address the SafetyRules server is reached at.
    fn server_address(&self) -> SocketAddr;

    /// Timeout value (named as milliseconds) handed to the network client.
    fn network_timeout_ms(&self) -> u64;

    /// Builds a [`SerializerClient`] backed by a [`RemoteClient`] that talks
    /// to the address returned by [`RemoteService::server_address`].
    fn client(&self) -> SerializerClient {
        // Bound in the same order the arguments were originally evaluated.
        let address = self.server_address();
        let timeout_ms = self.network_timeout_ms();
        let transport = NetworkClient::new("safety-rules", address, timeout_ms);
        SerializerClient::new_client(Box::new(RemoteClient::new(transport)))
    }
}
pub fn execute(
storage: PersistentSafetyStorage,
listen_addr: SocketAddr,
verify_vote_proposal_signature: bool,
export_consensus_key: bool,
network_timeout_ms: u64,
decoupled_execution: bool,
) {
let mut safety_rules = SafetyRules::new(
storage,
verify_vote_proposal_signature,
export_consensus_key,
decoupled_execution,
);
if let Err(e) = safety_rules.consensus_state() {
warn!("Unable to print consensus state: {}", e);
}
let mut serializer_service = SerializerService::new(safety_rules);
let mut network_server = NetworkServer::new("safety-rules", listen_addr, network_timeout_ms);
loop {
if let Err(e) = process_one_message(&mut network_server, &mut serializer_service) {
warn!("Failed to process message: {}", e);
}
}
}
/// Handles exactly one request/response exchange on the server side.
///
/// Reads a request from `network_server`, passes it to
/// `serializer_service.handle_message`, and writes the resulting response
/// back. The first failing step aborts the exchange and its error is
/// converted into [`Error`] and returned.
fn process_one_message(
    network_server: &mut NetworkServer,
    serializer_service: &mut SerializerService,
) -> Result<(), Error> {
    let incoming = network_server.read()?;
    let outgoing = serializer_service.handle_message(incoming)?;
    Ok(network_server.write(&outgoing)?)
}
/// Client-side handle for a remote SafetyRules service; it wraps the
/// `NetworkClient` that requests are written to and responses are read from.
struct RemoteClient {
/// Network connection handle used for every request/response exchange.
network_client: NetworkClient,
}
impl RemoteClient {
    /// Wraps an already constructed `NetworkClient`.
    pub fn new(network_client: NetworkClient) -> Self {
        Self { network_client }
    }

    /// Performs one round trip: writes `input` to the network client, then
    /// reads a single response from it.
    ///
    /// A failure in either step is converted into [`Error`] and returned.
    fn process_one_message(&mut self, input: &[u8]) -> Result<Vec<u8>, Error> {
        self.network_client.write(input)?;
        let reply = self.network_client.read()?;
        Ok(reply)
    }
}
impl TSerializerClient for RemoteClient {
    /// Sends `input` to the remote service and returns the raw response bytes.
    ///
    /// The input is serialized to JSON once. The exchange is then retried
    /// without limit: each communication failure is logged with `warn!` and
    /// the same payload is sent again.
    ///
    /// # Errors
    ///
    /// Returns an error only if serializing `input` fails.
    fn request(&mut self, input: SafetyRulesInput) -> Result<Vec<u8>, Error> {
        let payload = serde_json::to_vec(&input)?;
        loop {
            let failure = match self.process_one_message(&payload) {
                Ok(reply) => return Ok(reply),
                Err(failure) => failure,
            };
            warn!("Failed to communicate with SafetyRules service: {}", failure);
        }
    }
}