1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use crate::peer_manager::ConnectionNotification;
use channel::{diem_channel, message_queues::QueueStyle};
use diem_types::PeerId;
pub type Sender = diem_channel::Sender<PeerId, ConnectionNotification>;
pub type Receiver = diem_channel::Receiver<PeerId, ConnectionNotification>;
pub fn new() -> (Sender, Receiver) {
diem_channel::new(QueueStyle::LIFO, 1, None)
}
#[cfg(test)]
mod test {
use super::*;
use crate::{peer::DisconnectReason, transport::ConnectionMetadata};
use diem_config::network_id::NetworkContext;
use futures::{executor::block_on, future::FutureExt, stream::StreamExt};
fn send_new_peer(sender: &mut Sender, connection: ConnectionMetadata) {
let peer_id = connection.remote_peer_id;
let notif =
ConnectionNotification::NewPeer(connection, NetworkContext::mock_with_peer_id(peer_id));
sender.push(peer_id, notif).unwrap()
}
fn send_lost_peer(
sender: &mut Sender,
connection: ConnectionMetadata,
reason: DisconnectReason,
) {
let peer_id = connection.remote_peer_id;
let notif = ConnectionNotification::LostPeer(
connection,
NetworkContext::mock_with_peer_id(peer_id),
reason,
);
sender.push(peer_id, notif).unwrap()
}
#[test]
fn send_n_get_1() {
let (mut sender, mut receiver) = super::new();
let peer_id_a = PeerId::random();
let peer_id_b = PeerId::random();
let task = async move {
let conn_a = ConnectionMetadata::mock(peer_id_a);
let conn_b = ConnectionMetadata::mock(peer_id_b);
send_new_peer(&mut sender, conn_a.clone());
send_lost_peer(
&mut sender,
conn_a.clone(),
DisconnectReason::ConnectionLost,
);
send_new_peer(&mut sender, conn_a.clone());
send_lost_peer(&mut sender, conn_a.clone(), DisconnectReason::Requested);
let notif = ConnectionNotification::LostPeer(
conn_a.clone(),
NetworkContext::mock_with_peer_id(peer_id_a),
DisconnectReason::Requested,
);
assert_eq!(receiver.select_next_some().await, notif,);
assert_eq!(receiver.select_next_some().now_or_never(), None);
send_new_peer(&mut sender, conn_a);
send_new_peer(&mut sender, conn_b);
let _ = receiver.select_next_some().await;
let _ = receiver.select_next_some().await;
assert_eq!(receiver.select_next_some().now_or_never(), None);
};
block_on(task);
}
}