1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

//! Low-level module for establishing connections with peers
//!
//! The main component of this module is the [`Transport`] trait, which provides an interface for
//! establishing both inbound and outbound connections with remote peers. The [`TransportExt`]
//! trait contains a variety of combinators for modifying a transport, which allows transports to
//! be composed and additional transports or protocols to be layered on top of one another.
//!
//! [`Transport`]: crate::transport::Transport
//! [`TransportExt`]: crate::transport::TransportExt

use diem_types::{network_address::NetworkAddress, PeerId};
use futures::{future::Future, stream::Stream};
use serde::Serialize;
use std::fmt;

pub mod and_then;
pub mod boxed;
#[cfg(any(test, feature = "testing", feature = "fuzzing"))]
pub mod memory;
pub mod proxy_protocol;
pub mod tcp;

/// Describes how a connection was established, from the local node's point of view.
///
/// The value is a plain, `Copy`-able tag with two states. `Debug` is intentionally not
/// derived here: this file provides a hand-written `fmt::Debug` implementation that prints
/// the same lowercase name as [`ConnectionOrigin::as_str`].
#[derive(Clone, Copy, Hash, PartialEq, Eq, Serialize)]
pub enum ConnectionOrigin {
    /// `Inbound` indicates that we are the listener for this connection, i.e. the remote
    /// peer dialed us.
    Inbound,
    /// `Outbound` indicates that we are the dialer for this connection, i.e. we initiated
    /// it towards the remote peer.
    Outbound,
}

impl ConnectionOrigin {
    pub fn as_str(self) -> &'static str {
        match self {
            ConnectionOrigin::Inbound => "inbound",
            ConnectionOrigin::Outbound => "outbound",
        }
    }
}

impl fmt::Debug for ConnectionOrigin {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}

impl fmt::Display for ConnectionOrigin {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}

/// A Transport is responsible for establishing connections with remote Peers.
///
/// Connections are established either by [listening](Transport::listen_on)
/// or [dialing](Transport::dial) on a [`Transport`]. A peer that
/// obtains a connection by listening is often referred to as the *listener* and the
/// peer that initiated the connection through dialing as the *dialer*.
///
/// Additional protocols can be layered on top of the connections established
/// by a [`Transport`] through utilizing the combinators in the [`TransportExt`] trait.
pub trait Transport {
    /// The result of establishing a connection.
    ///
    /// Generally this would include a socket-like stream which allows for sending and
    /// receiving data through the connection.
    type Output;

    /// The type of errors which can happen while establishing a connection.
    ///
    /// The same error type is used by [`listen_on`](Transport::listen_on),
    /// [`dial`](Transport::dial), the [`Listener`](Transport::Listener) stream and the
    /// [`Inbound`](Transport::Inbound) / [`Outbound`](Transport::Outbound) futures.
    type Error: ::std::error::Error + Send + Sync + 'static;

    /// A stream of [`Inbound`](Transport::Inbound) connections and the address of the dialer.
    ///
    /// An item should be produced whenever a connection is received at the lowest level of the
    /// transport stack. Each item is an [`Inbound`](Transport::Inbound) future
    /// that resolves to an [`Output`](Transport::Output) value once all protocol upgrades
    /// have been applied. Items are `Result`s, so the stream itself may also yield an
    /// [`Error`](Transport::Error) instead of a pending connection.
    type Listener: Stream<Item = Result<(Self::Inbound, NetworkAddress), Self::Error>>
        + Send
        + Unpin;

    /// A pending [`Output`](Transport::Output) for an inbound connection,
    /// obtained from the [`Listener`](Transport::Listener) stream.
    ///
    /// After a connection has been accepted by the transport, it may need to go through
    /// asynchronous post-processing (e.g. protocol upgrade negotiations). Such
    /// post-processing should not block the `Listener` from producing the next
    /// connection, hence further connection setup proceeds asynchronously.
    /// Once an `Inbound` future resolves it yields the [`Output`](Transport::Output)
    /// of the connection setup process.
    type Inbound: Future<Output = Result<Self::Output, Self::Error>> + Send;

    /// A pending [`Output`](Transport::Output) for an outbound connection,
    /// obtained from [dialing](Transport::dial).
    type Outbound: Future<Output = Result<Self::Output, Self::Error>> + Send;

    /// Listens on the given [`NetworkAddress`], returning a stream of incoming connections.
    ///
    /// The returned [`NetworkAddress`] is the actual listening address; this is done to take
    /// into account OS-assigned port numbers (e.g. listening on port 0).
    ///
    /// The method is bounded by `Self: Sized`, so it cannot be called through a trait object.
    ///
    /// # Errors
    ///
    /// Returns the transport's [`Error`](Transport::Error) on failure; which conditions
    /// trigger it is up to the implementation.
    fn listen_on(
        &self,
        addr: NetworkAddress,
    ) -> Result<(Self::Listener, NetworkAddress), Self::Error>
    where
        Self: Sized;

    /// Dials the given [`NetworkAddress`], returning a future for a pending outbound connection.
    ///
    /// `peer_id` is passed alongside the address; presumably it identifies the peer that is
    /// expected at `addr` -- how (or whether) it is used is up to the implementation.
    ///
    /// The method is bounded by `Self: Sized`, so it cannot be called through a trait object.
    ///
    /// # Errors
    ///
    /// Returns the transport's [`Error`](Transport::Error) if the dial attempt cannot even be
    /// started; failures during connection setup surface when the returned
    /// [`Outbound`](Transport::Outbound) future resolves.
    fn dial(&self, peer_id: PeerId, addr: NetworkAddress) -> Result<Self::Outbound, Self::Error>
    where
        Self: Sized;
}

// Blanket implementation: every `Transport` (sized or not) automatically gains the
// `TransportExt` combinators.
impl<T> TransportExt for T where T: ?Sized + Transport {}

/// An extension trait for [`Transport`]s that provides a variety of convenient
/// combinators.
///
/// Additional protocols or functionality can be layered on top of an existing
/// [`Transport`] by using this extension trait. For example, one might want to
/// take a raw connection and upgrade it to a secure transport followed by a
/// version handshake by chaining calls to [`and_then`](TransportExt::and_then).
/// Each method yields a new [`Transport`] whose connection setup incorporates
/// all earlier upgrades followed by the new upgrade, i.e. the order of the
/// upgrades is significant.
pub trait TransportExt: Transport {
    /// Turns a [`Transport`] into an abstract boxed transport.
    ///
    /// The concrete transport type is erased; only its
    /// [`Output`](Transport::Output) and [`Error`](Transport::Error) types remain visible
    /// in the returned [`BoxedTransport`](boxed::BoxedTransport). The transport and its
    /// listener and connection futures must be `Send + 'static` for this to be available.
    fn boxed(self) -> boxed::BoxedTransport<Self::Output, Self::Error>
    where
        Self: Sized + Send + 'static,
        Self::Listener: Send + 'static,
        Self::Inbound: Send + 'static,
        Self::Outbound: Send + 'static,
    {
        boxed::BoxedTransport::new(self)
    }

    /// Applies a function producing an asynchronous result to every connection
    /// created by this transport.
    ///
    /// This function can be used for ad-hoc protocol upgrades on a transport
    /// or for processing or adapting the output of an earlier upgrade.  The
    /// provided function must take three inputs: the output from the existing
    /// transport, a [`NetworkAddress`] (presumably the address associated with the
    /// connection -- see [`and_then::AndThen`] for which address is passed), and a
    /// [`ConnectionOrigin`] which can be used to identify the origin of the
    /// connection (inbound vs outbound).
    ///
    /// The function must be `Clone`, and the future it returns must resolve to
    /// `Result<O, Self::Error>`: the upgrade has to report failures using the
    /// same error type as the underlying transport.
    fn and_then<F, Fut, O>(self, f: F) -> and_then::AndThen<Self, F>
    where
        Self: Sized,
        F: FnOnce(Self::Output, NetworkAddress, ConnectionOrigin) -> Fut + Clone,
        // Pin the error types to be the same for now
        // TODO don't require the error types to be the same
        Fut: Future<Output = Result<O, Self::Error>>,
    {
        and_then::AndThen::new(self, f)
    }
}