1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::serializer::{
    ExecutionCorrectnessInput, SerializerClient, SerializerService, TSerializerClient,
};
use diem_crypto::ed25519::Ed25519PrivateKey;
use diem_infallible::Mutex;
use diem_logger::warn;
use diem_secure_net::{NetworkClient, NetworkServer};
use diem_vm::DiemVM;
use executor::Executor;
use executor_types::Error;
use std::net::SocketAddr;
use storage_client::StorageClient;

pub trait RemoteService {
    fn client(&self) -> SerializerClient {
        let network_client =
            NetworkClient::new("execution", self.server_address(), self.network_timeout());
        let service = Box::new(RemoteClient::new(Mutex::new(network_client)));
        SerializerClient::new_client(service)
    }

    fn server_address(&self) -> SocketAddr;
    fn network_timeout(&self) -> u64;
}

pub fn execute(
    storage_addr: SocketAddr,
    listen_addr: SocketAddr,
    prikey: Option<Ed25519PrivateKey>,
    network_timeout: u64,
) {
    let block_executor = Box::new(Executor::<DiemVM>::new(
        StorageClient::new(&storage_addr, network_timeout).into(),
    ));
    let serializer_service = SerializerService::new(block_executor, prikey);
    let mut network_server = NetworkServer::new("execution", listen_addr, network_timeout);

    loop {
        if let Err(e) = process_one_message(&mut network_server, &serializer_service) {
            warn!("Warning: Failed to process message: {}", e);
        }
    }
}

fn process_one_message(
    network_server: &mut NetworkServer,
    serializer_service: &SerializerService,
) -> Result<(), Error> {
    let request = network_server.read()?;
    let response = serializer_service.handle_message(request)?;
    network_server.write(&response)?;
    Ok(())
}

struct RemoteClient {
    network_client: Mutex<NetworkClient>,
}

impl RemoteClient {
    pub fn new(network_client: Mutex<NetworkClient>) -> Self {
        Self { network_client }
    }

    fn process_one_message(&self, input: &[u8]) -> Result<Vec<u8>, Error> {
        let mut client = self.network_client.lock();
        client.write(input)?;
        client.read().map_err(|e| e.into())
    }
}

impl TSerializerClient for RemoteClient {
    fn request(&self, input: ExecutionCorrectnessInput) -> Result<Vec<u8>, Error> {
        let input_message = bcs::to_bytes(&input)?;
        loop {
            match self.process_one_message(&input_message) {
                Err(err) => warn!("Failed to communicate with LEC service: {}", err),
                Ok(value) => return Ok(value),
            }
        }
    }
}