1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use crate::transport::Transport;
use diem_types::{network_address::NetworkAddress, PeerId};
use futures::{
future::{Future, FutureExt},
stream::{Stream, StreamExt},
};
use std::pin::Pin;
pub type Listener<O, E> =
Pin<Box<dyn Stream<Item = Result<(Inbound<O, E>, NetworkAddress), E>> + Send>>;
pub type Inbound<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
pub type Outbound<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
trait AbstractBoxedTransport<O, E> {
fn listen_on(&self, addr: NetworkAddress) -> Result<(Listener<O, E>, NetworkAddress), E>;
fn dial(&self, peer_id: PeerId, addr: NetworkAddress) -> Result<Outbound<O, E>, E>;
}
impl<T, O, E> AbstractBoxedTransport<O, E> for T
where
T: Transport<Output = O, Error = E> + Send + 'static,
T::Listener: Send + 'static,
T::Inbound: Send + 'static,
T::Outbound: Send + 'static,
E: ::std::error::Error + Send + Sync + 'static,
{
fn listen_on(&self, addr: NetworkAddress) -> Result<(Listener<O, E>, NetworkAddress), E> {
let (listener, addr) = self.listen_on(addr)?;
let listener = listener
.map(|result| result.map(|(incoming, addr)| (incoming.boxed() as Inbound<O, E>, addr)));
Ok((listener.boxed() as Listener<O, E>, addr))
}
fn dial(&self, peer_id: PeerId, addr: NetworkAddress) -> Result<Outbound<O, E>, E> {
let outgoing = self.dial(peer_id, addr)?;
Ok(outgoing.boxed() as Outbound<O, E>)
}
}
pub struct BoxedTransport<O, E> {
inner: Box<dyn AbstractBoxedTransport<O, E> + Send + 'static>,
}
impl<O, E> BoxedTransport<O, E>
where
E: ::std::error::Error + Send + Sync + 'static,
{
pub(crate) fn new<T>(transport: T) -> Self
where
T: Transport<Output = O, Error = E> + Send + 'static,
T::Listener: Send + 'static,
T::Inbound: Send + 'static,
T::Outbound: Send + 'static,
{
Self {
inner: Box::new(transport) as Box<_>,
}
}
}
impl<O, E> Transport for BoxedTransport<O, E>
where
E: ::std::error::Error + Send + Sync + 'static,
{
type Output = O;
type Error = E;
type Listener = Listener<O, E>;
type Inbound = Inbound<O, E>;
type Outbound = Outbound<O, E>;
fn listen_on(
&self,
addr: NetworkAddress,
) -> Result<(Self::Listener, NetworkAddress), Self::Error> {
self.inner.listen_on(addr)
}
fn dial(&self, peer_id: PeerId, addr: NetworkAddress) -> Result<Self::Outbound, Self::Error> {
self.inner.dial(peer_id, addr)
}
}