1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
use crate::serializer::{
ExecutionCorrectnessInput, SerializerClient, SerializerService, TSerializerClient,
};
use diem_crypto::ed25519::Ed25519PrivateKey;
use diem_infallible::Mutex;
use diem_logger::warn;
use diem_secure_net::{NetworkClient, NetworkServer};
use diem_vm::DiemVM;
use executor::Executor;
use executor_types::Error;
use std::net::SocketAddr;
use storage_client::StorageClient;
/// A handle to an execution service that is reached over the network.
///
/// Implementors only describe where the service lives and which timeout to
/// use; the provided [`RemoteService::client`] method turns that description
/// into a ready-to-use `SerializerClient`.
pub trait RemoteService {
    /// Builds a `SerializerClient` whose requests are sent through a
    /// `NetworkClient` aimed at [`RemoteService::server_address`].
    ///
    /// The network client is wrapped in a `Mutex` and handed to a
    /// `RemoteClient`, which is then boxed and passed to
    /// `SerializerClient::new_client`.
    fn client(&self) -> SerializerClient {
        let address = self.server_address();
        let timeout = self.network_timeout();
        let connection = NetworkClient::new("execution", address, timeout);
        SerializerClient::new_client(Box::new(RemoteClient::new(Mutex::new(connection))))
    }

    /// Socket address of the remote service that `client` should talk to.
    fn server_address(&self) -> SocketAddr;

    /// Timeout value forwarded to `NetworkClient::new` (units are defined by
    /// `NetworkClient` and are not visible from this file).
    fn network_timeout(&self) -> u64;
}
/// Runs the execution service: serves requests arriving on `listen_addr`
/// until the process is terminated.
///
/// An `Executor<DiemVM>` is created on top of a `StorageClient` connected to
/// `storage_addr`, wrapped in a `SerializerService` together with `prikey`,
/// and then driven by a `NetworkServer` named `"execution"`.
///
/// * `storage_addr` - address handed to `StorageClient::new`.
/// * `listen_addr` - address the `NetworkServer` is created with.
/// * `prikey` - optional Ed25519 private key forwarded to
///   `SerializerService::new`.
/// * `network_timeout` - timeout forwarded to both the storage client and the
///   network server.
///
/// This function never returns. A failure while handling a single message is
/// logged as a warning and the loop moves on to the next message.
pub fn execute(
    storage_addr: SocketAddr,
    listen_addr: SocketAddr,
    prikey: Option<Ed25519PrivateKey>,
    network_timeout: u64,
) {
    let storage = StorageClient::new(&storage_addr, network_timeout);
    let block_executor = Box::new(Executor::<DiemVM>::new(storage.into()));
    let serializer_service = SerializerService::new(block_executor, prikey);
    let mut network_server = NetworkServer::new("execution", listen_addr, network_timeout);

    loop {
        match process_one_message(&mut network_server, &serializer_service) {
            Ok(()) => {}
            // A bad message must not bring the server down; report it and keep serving.
            Err(e) => {
                warn!("Warning: Failed to process message: {}", e);
            }
        }
    }
}
/// Serves exactly one request/response exchange on `network_server`.
///
/// Reads one message from the server, passes it to
/// `serializer_service.handle_message`, and writes the resulting bytes back.
///
/// # Errors
///
/// Returns the first error produced by the read, the handler, or the write,
/// converted into `Error`. If the read or the handler fails, nothing is
/// written back.
fn process_one_message(
    network_server: &mut NetworkServer,
    serializer_service: &SerializerService,
) -> Result<(), Error> {
    let reply = serializer_service.handle_message(network_server.read()?)?;
    network_server.write(&reply)?;
    Ok(())
}
/// Client side of the remote execution service: owns the network connection
/// used to exchange serialized messages with the server.
struct RemoteClient {
    /// Connection to the server, behind a mutex so that a full write-then-read
    /// exchange can be performed through `&self`.
    network_client: Mutex<NetworkClient>,
}

impl RemoteClient {
    /// Wraps an already-constructed, mutex-protected `NetworkClient`.
    pub fn new(network_client: Mutex<NetworkClient>) -> Self {
        Self { network_client }
    }

    /// Sends `input` to the server and returns the bytes it answers with.
    ///
    /// The lock on the network client is held for the whole exchange, so the
    /// write and the following read are not interleaved with another caller
    /// going through the same `RemoteClient`.
    ///
    /// # Errors
    ///
    /// Returns the write or read failure converted into `Error`; if the write
    /// fails, no read is attempted.
    fn process_one_message(&self, input: &[u8]) -> Result<Vec<u8>, Error> {
        let mut connection = self.network_client.lock();
        connection.write(input)?;
        let reply = connection.read()?;
        Ok(reply)
    }
}
impl TSerializerClient for RemoteClient {
    /// Serializes `input` with BCS and sends it to the remote service,
    /// returning the raw response bytes.
    ///
    /// # Errors
    ///
    /// Only a serialization failure is returned to the caller. Communication
    /// failures are logged as warnings and the same serialized message is sent
    /// again, with no retry limit and no delay between attempts, so this call
    /// does not return until an exchange succeeds.
    fn request(&self, input: ExecutionCorrectnessInput) -> Result<Vec<u8>, Error> {
        // Serialize once up front; every retry reuses the same bytes.
        let payload = bcs::to_bytes(&input)?;
        loop {
            match self.process_one_message(&payload) {
                Ok(reply) => break Ok(reply),
                Err(err) => warn!("Failed to communicate with LEC service: {}", err),
            }
        }
    }
}