1use std::{
6 cmp::{min, Ordering},
7 collections::{BinaryHeap, HashMap, HashSet, VecDeque},
8 fmt::Formatter,
9 fs,
10 io::{self, Read, Write},
11 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
12 path::{Path, PathBuf},
13 str::FromStr,
14 sync::{atomic::Ordering as AtomicOrdering, Arc},
15 time::{Duration, Instant},
16};
17
18use keccak_hash::keccak;
19use mio::{
20 net::{TcpListener, TcpStream, UdpSocket},
21 Interest, Registry, Token,
22};
23use parity_path::restrict_permissions_owner;
24use parking_lot::{Mutex, RwLock};
25use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
26
27use crate::keylib::{sign, Generator, KeyPair, Random, Secret};
28use cfx_addr::Network;
29use cfx_bytes::Bytes;
30use cfx_util_macros::bail;
31use diem_crypto::ValidCryptoMaterialStringExt;
32use diem_types::{
33 account_address::from_consensus_public_key,
34 validator_config::{
35 ConsensusPrivateKey, ConsensusPublicKey, ConsensusVRFPrivateKey,
36 ConsensusVRFPublicKey,
37 },
38};
39use log::{debug, info, trace, warn};
40use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
41use priority_send_queue::SendQueuePriority;
42use serde::{Deserialize, Serialize};
43
44use crate::{
45 discovery::Discovery,
46 handshake::BYPASS_CRYPTOGRAPHY,
47 iolib::{
48 IoContext, IoHandler, IoService, MapNonBlock, StreamToken, TimerToken,
49 },
50 ip_utils::{map_external_address, select_public_address},
51 node_database::NodeDatabase,
52 node_table::*,
53 parse_msg_id_leb128_2_bytes_at_most,
54 session::{self, Session, SessionData, SessionDetails},
55 session_manager::SessionManager,
56 Error, HandlerWorkType, IpFilter, NatType, NetworkConfiguration,
57 NetworkContext as NetworkContextTrait, NetworkIoMessage,
58 NetworkProtocolHandler, PeerInfo, ProtocolId, ProtocolInfo,
59 UpdateNodeOperation, NODE_TAG_ARCHIVE, NODE_TAG_NODE_TYPE,
60};
61
62use super::DisconnectReason;
63
/// Upper bound on concurrent sessions; it also fixes the width of the
/// session token range defined below.
const MAX_SESSIONS: usize = 2048;

/// Port used for the listen address when the configuration supplies none.
const DEFAULT_PORT: u16 = 32323;

// Token layout: session stream tokens occupy `FIRST_SESSION..=LAST_SESSION`;
// the service's own stream and timer tokens follow, offset from `SYS_TIMER`.
const FIRST_SESSION: StreamToken = 0;
const LAST_SESSION: StreamToken = FIRST_SESSION + MAX_SESSIONS - 1;
const SYS_TIMER: TimerToken = LAST_SESSION + 1;
const TCP_ACCEPT: StreamToken = SYS_TIMER + 1;
const HOUSEKEEPING: TimerToken = SYS_TIMER + 2;
const UDP_MESSAGE: StreamToken = SYS_TIMER + 3;
const DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 4;
const FAST_DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 5;
const DISCOVERY_ROUND: TimerToken = SYS_TIMER + 6;
const NODE_TABLE: TimerToken = SYS_TIMER + 7;
const SEND_DELAYED_MESSAGES: TimerToken = SYS_TIMER + 8;
const CHECK_SESSIONS: TimerToken = SYS_TIMER + 9;
/// Initial value of the per-service timer counter (`timer_counter`).
const HANDLER_TIMER: TimerToken = LAST_SESSION + 256;
/// Token passed to `IoService::start` when the IO service is created.
const STOP_NET_POLL: TimerToken = HANDLER_TIMER + 1;
82
// Default timer periods. NOTE(review): apart from
// `DEFAULT_CHECK_SESSIONS_TIMEOUT`, these are not referenced in the visible
// part of this file; presumably they seed `NetworkConfiguration` — confirm.
pub const DEFAULT_HOUSEKEEPING_TIMEOUT: Duration = Duration::from_secs(2);
pub const DEFAULT_DISCOVERY_REFRESH_TIMEOUT: Duration =
    Duration::from_secs(120);
pub const DEFAULT_FAST_DISCOVERY_REFRESH_TIMEOUT: Duration =
    Duration::from_secs(10);
pub const DEFAULT_DISCOVERY_ROUND_TIMEOUT: Duration =
    Duration::from_millis(500);
pub const DEFAULT_NODE_TABLE_TIMEOUT: Duration = Duration::from_secs(300);
/// Three days, expressed in seconds.
pub const DEFAULT_CONNECTION_LIFETIME_FOR_PROMOTION: Duration =
    Duration::from_secs(3 * 24 * 3600);
/// Period of the `CHECK_SESSIONS` timer registered at start-up.
const DEFAULT_CHECK_SESSIONS_TIMEOUT: Duration = Duration::from_secs(10);
101
/// Newtype around a single-byte protocol version number.
#[derive(
    Clone,
    Copy,
    Debug,
    Default,
    Eq,
    PartialOrd,
    PartialEq,
    Serialize,
    Deserialize,
)]
pub struct ProtocolVersion(pub u8);

impl MallocSizeOf for ProtocolVersion {
    /// Always reports 0 bytes.
    fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize { 0 }
}
118
// NOTE(review): neither constant is used in the visible part of this file.
// Presumably the first caps the size of a UDP datagram in bytes and the
// second tags discovery packets — confirm against the UDP read/write code.
pub const MAX_DATAGRAM_SIZE: usize = 1280;
pub const UDP_PROTOCOL_DISCOVERY: u8 = 1;
121
/// A UDP payload paired with the socket address it is queued for.
pub struct Datagram {
    pub payload: Bytes,
    pub address: SocketAddr,
}
126
127pub struct UdpChannel {
128 pub send_queue: VecDeque<Datagram>,
129}
130
131impl Default for UdpChannel {
132 fn default() -> Self { Self::new() }
133}
134
135impl UdpChannel {
136 pub fn new() -> UdpChannel {
137 UdpChannel {
138 send_queue: VecDeque::new(),
139 }
140 }
141
142 pub fn any_sends_queued(&self) -> bool { !self.send_queue.is_empty() }
143
144 pub fn dequeue_send(&mut self) -> Option<Datagram> {
145 self.send_queue.pop_front()
146 }
147
148 pub fn requeue_send(&mut self, datagram: Datagram) {
149 self.send_queue.push_front(datagram)
150 }
151}
152
153pub struct UdpIoContext<'a> {
154 pub channel: &'a RwLock<UdpChannel>,
155 pub node_db: &'a RwLock<NodeDatabase>,
156}
157
158impl<'a> UdpIoContext<'a> {
159 pub fn new(
160 channel: &'a RwLock<UdpChannel>, node_db: &'a RwLock<NodeDatabase>,
161 ) -> UdpIoContext<'a> {
162 UdpIoContext { channel, node_db }
163 }
164
165 pub fn send(&self, payload: Bytes, address: SocketAddr) {
166 self.channel
167 .write()
168 .send_queue
169 .push_back(Datagram { payload, address });
170 }
171}
172
/// Public facade of the network layer. Both optional fields are `None` until
/// `initialize` populates them.
pub struct NetworkService {
    /// IO service created in `initialize`.
    pub io_service: Option<IoService<NetworkIoMessage>>,
    /// Shared service state, registered as a handler on `io_service`.
    pub inner: Option<Arc<NetworkServiceInner>>,
    /// Configuration this service was constructed with.
    config: NetworkConfiguration,
}
182
183impl NetworkService {
184 pub fn new(config: NetworkConfiguration) -> NetworkService {
185 NetworkService {
186 io_service: None,
187 inner: None,
188 config,
189 }
190 }
191
    /// Returns the `is_consortium` flag from the configuration.
    pub fn is_consortium(&self) -> bool { self.config.is_consortium }
193
    /// Returns the network type reported by the configuration.
    pub fn get_network_type(&self) -> &Network {
        self.config.get_network_type()
    }
197
    /// Returns the configured network id (`config.id`).
    pub fn network_id(&self) -> u64 { self.config.id }
199
    /// Returns the `test_mode` flag from the configuration.
    pub fn is_test_mode(&self) -> bool { self.config.test_mode }
201
202 pub fn initialize(
204 &mut self, pos_pub_keys: (ConsensusPublicKey, ConsensusVRFPublicKey),
205 ) -> Result<(), Error> {
206 let raw_io_service =
207 IoService::<NetworkIoMessage>::start(STOP_NET_POLL)?;
208 self.io_service = Some(raw_io_service);
209
210 if self.inner.is_none() {
211 if self.config.test_mode {
212 BYPASS_CRYPTOGRAPHY.store(true, AtomicOrdering::Relaxed);
213 }
214
215 let inner = Arc::new(match self.config.test_mode {
216 true => NetworkServiceInner::new_with_latency(
217 &self.config,
218 pos_pub_keys,
219 )?,
220 false => NetworkServiceInner::new(&self.config, pos_pub_keys)?,
221 });
222 self.io_service
223 .as_ref()
224 .unwrap()
225 .register_handler(inner.clone())?;
226 self.inner = Some(inner);
227 }
228 Ok(())
229 }
230
231 pub fn start(&self) {
232 let handler = self.inner.as_ref().unwrap().clone();
233 self.io_service
234 .as_ref()
235 .expect("Already set")
236 .start_network_poll(handler, MAX_SESSIONS);
237 }
238
239 pub fn add_peer(&self, node: NodeEntry) -> Result<(), Error> {
241 if let Some(ref x) = self.inner {
242 x.node_db.write().insert_trusted(node);
243 Ok(())
244 } else {
245 Err("Network service not started yet!".into())
246 }
247 }
248
249 pub fn drop_peer(&self, node: NodeEntry) -> Result<(), Error> {
251 if let Some(ref x) = self.inner {
252 x.drop_node(node.id)
253 } else {
254 Err("Network service not started yet!".into())
255 }
256 }
257
258 pub fn local_addr(&self) -> Option<SocketAddr> {
260 self.inner.as_ref().map(|inner_ref| inner_ref.local_addr())
261 }
262
263 pub fn register_protocol(
265 &self, handler: Arc<dyn NetworkProtocolHandler + Sync>,
266 protocol: ProtocolId, version: ProtocolVersion,
267 ) -> Result<(), Error> {
268 let (tx, rx) = std::sync::mpsc::sync_channel(0);
269 self.io_service.as_ref().unwrap().send_message(
270 NetworkIoMessage::AddHandler {
271 handler,
272 protocol,
273 version,
274 callback: tx,
275 },
276 )?;
277 rx.recv().expect("protocol register error");
279 Ok(())
280 }
281
282 pub fn with_context<F, R>(
284 &self, handler: Arc<dyn NetworkProtocolHandler + Sync>,
285 protocol: ProtocolId, action: F,
286 ) -> Result<R, String>
287 where
288 F: FnOnce(&NetworkContext) -> R,
289 {
290 let io = IoContext::new(self.io_service.as_ref().unwrap().channel(), 0);
291 match self.inner {
292 Some(ref inner) => {
293 Ok(inner.with_context(handler, protocol, &io, action))
294 }
295 None => Err("Network service not started yet!".to_owned()),
296 }
297 }
298
299 pub fn get_peer_info(&self) -> Option<Vec<PeerInfo>> {
301 self.inner.as_ref().map(|inner| inner.get_peer_info())
302 }
303
304 pub fn sign_challenge(&self, challenge: Vec<u8>) -> Result<Vec<u8>, Error> {
306 let hash = keccak(challenge);
307 if let Some(ref inner) = self.inner {
308 let signature = match sign(inner.metadata.keys.secret(), &hash) {
309 Ok(s) => s,
310 Err(e) => {
311 warn!("Error signing hello packet");
312 return Err(Error::from(e));
313 }
314 };
315 Ok(signature[..].to_owned())
316 } else {
317 Err("Network service not started yet!".into())
318 }
319 }
320
321 pub fn net_key_pair(&self) -> Result<KeyPair, Error> {
322 if let Some(ref inner) = self.inner {
323 Ok(inner.metadata.keys.clone())
324 } else {
325 Err("Network service not started yet!".into())
326 }
327 }
328
329 pub fn pos_public_key(&self) -> Option<ConsensusPublicKey> {
330 if let Some(ref inner) = self.inner {
331 inner.sessions.self_pos_public_key.clone().map(|k| k.0)
332 } else {
333 None
334 }
335 }
336
337 pub fn add_latency(
338 &self, id: NodeId, latency_ms: f64,
339 ) -> Result<(), Error> {
340 if let Some(ref inner) = self.inner {
341 inner.add_latency(id, latency_ms)
342 } else {
343 Err("Network service not started yet!".into())
344 }
345 }
346
347 pub fn get_node(&self, id: &NodeId) -> Option<(bool, Node)> {
348 let inner = self.inner.as_ref()?;
349 let node_db = inner.node_db.read();
350 let (trusted, node) = node_db.get_with_trusty(id)?;
351 Some((trusted, node.clone()))
352 }
353
354 pub fn get_detailed_sessions(
355 &self, node_id: Option<NodeId>,
356 ) -> Option<Vec<SessionDetails>> {
357 let inner = self.inner.as_ref()?;
358 match node_id {
359 None => Some(
360 inner
361 .sessions
362 .all()
363 .iter()
364 .map(|s| s.read().details())
365 .collect(),
366 ),
367 Some(id) => {
368 let session = inner.sessions.get_by_id(&id)?;
369 let details = session.read().details();
370 Some(vec![details])
371 }
372 }
373 }
374
375 pub fn disconnect_node(
376 &self, id: &NodeId, op: Option<UpdateNodeOperation>,
377 ) -> bool {
378 if self.inner.is_none() || self.io_service.is_none() {
379 return false;
380 }
381 let inner = self.inner.as_ref().unwrap();
382 let io = IoContext::new(self.io_service.as_ref().unwrap().channel(), 0);
383 inner.kill_connection(
384 id,
385 &io,
386 true,
387 op,
388 "disconnect requested", );
390 true
391 }
392
    /// Calls `save` on the node database under its write lock; does nothing
    /// when the service has not been initialized.
    pub fn save_node_db(&self) {
        if let Some(inner) = &self.inner {
            inner.node_db.write().save();
        }
    }
398}
399
/// A session shared by reference count behind a read-write lock.
type SharedSession = Arc<RwLock<Session>>;
401
/// Identity and addressing information of the local node.
pub struct HostMetadata {
    /// Network id copied from the configuration.
    pub network_id: u64,
    /// Node key pair; its public half is the node id.
    pub keys: KeyPair,
    /// Starts empty. NOTE(review): presumably filled as protocol handlers
    /// are registered — not shown in this part of the file.
    pub protocols: RwLock<Vec<ProtocolInfo>>,
    /// While this list is empty, `connect_peers` does not start outgoing
    /// connections.
    pub minimum_peer_protocol_version: RwLock<Vec<ProtocolInfo>>,
    /// Listen address with the port actually bound by the TCP listener.
    pub local_address: SocketAddr,
    /// Local listen address plus UDP port.
    pub local_endpoint: NodeEndpoint,
    /// Endpoint advertised to others: configured, NAT-mapped, or selected
    /// automatically.
    pub public_endpoint: NodeEndpoint,
}

impl HostMetadata {
    /// Secret half of the node key pair.
    pub(crate) fn secret(&self) -> &Secret { self.keys.secret() }

    /// Node id, i.e. the public half of the node key pair.
    pub(crate) fn id(&self) -> &NodeId { self.keys.public() }
}
420
/// Pairs a protocol id with a timer token; stored as the value type of the
/// service's `timers` map.
// NOTE(review): presumably `token` is the handler-local timer id — confirm
// where timers are registered.
#[derive(Copy, Clone)]
struct ProtocolTimer {
    pub protocol: ProtocolId,
    pub token: TimerToken,
}
426
/// Shared state of the network service; registered as the IO handler.
pub struct NetworkServiceInner {
    /// Session table covering tokens from `FIRST_SESSION`, at most
    /// `MAX_SESSIONS` entries.
    pub sessions: SessionManager,
    /// Local node identity and addresses.
    pub metadata: HostMetadata,
    /// Private copy of the configuration passed to `new`.
    pub config: NetworkConfiguration,
    /// UDP socket bound in `new` to the listen IP and the UDP port.
    udp_socket: Mutex<UdpSocket>,
    /// TCP listener bound in `new`; drained by `accept`.
    tcp_listener: Mutex<TcpListener>,
    /// Queue of datagrams waiting to be sent.
    udp_channel: RwLock<UdpChannel>,
    /// `Some` only when discovery is enabled in the configuration.
    discovery: Mutex<Option<Discovery>>,
    /// Registered protocol handlers, keyed by protocol id.
    handlers:
        RwLock<HashMap<ProtocolId, Arc<dyn NetworkProtocolHandler + Sync>>>,
    /// Timer-token lookup table; starts empty.
    timers: RwLock<HashMap<TimerToken, ProtocolTimer>>,
    /// Starts at `HANDLER_TIMER`.
    timer_counter: RwLock<usize>,
    /// Database of known nodes.
    pub node_db: RwLock<NodeDatabase>,
    /// Node ids added via `add_reserved_node`; always considered first by
    /// `connect_peers`.
    reserved_nodes: RwLock<HashSet<NodeId>>,
    /// Node ids queued by `drop_node` and processed by `drop_peers`.
    dropped_nodes: RwLock<HashSet<NodeId>>,

    /// Copied from the configuration; housekeeping is unimplemented when set.
    is_consortium: bool,

    /// `Some` only when built through `new_with_latency` (test mode).
    delayed_queue: Option<DelayedQueue>,
}
450
451struct DelayedQueue {
452 queue: Mutex<BinaryHeap<DelayMessageContext>>,
453 latencies: RwLock<HashMap<NodeId, Duration>>,
454}
455
456impl DelayedQueue {
457 fn new() -> Self {
458 DelayedQueue {
459 queue: Mutex::new(BinaryHeap::new()),
460 latencies: RwLock::new(HashMap::new()),
461 }
462 }
463
464 fn send_delayed_messages(&self, network_service: &NetworkServiceInner) {
465 let context = self.queue.lock().pop().unwrap();
466 let r = context.session.write().send_packet(
467 &context.io,
468 Some(context.protocol),
469 context.min_protocol_version,
470 session::PACKET_USER,
471 context.msg,
472 context.priority,
473 );
474 match r {
475 Ok(_) => {}
476 Err(Error::Expired) => {
477 info!(
481 "Error sending delayed message to expired connection {:?}",
482 context.peer
483 );
484 }
485 Err(e) => {
486 info!(
487 "Error sending delayed message: peer={:?} err={:?}",
488 context.peer, e
489 );
490 network_service.kill_connection(
491 &context.peer,
492 &context.io,
493 true,
494 Some(UpdateNodeOperation::Failure),
495 "failed to send delayed message", );
497 }
498 };
499 }
500}
501
502impl NetworkServiceInner {
503 pub fn new(
504 config: &NetworkConfiguration,
505 pos_pub_keys: (ConsensusPublicKey, ConsensusVRFPublicKey),
506 ) -> Result<NetworkServiceInner, Error> {
507 let mut listen_address = match config.listen_address {
508 None => SocketAddr::V4(SocketAddrV4::new(
509 Ipv4Addr::new(0, 0, 0, 0),
510 DEFAULT_PORT,
511 )),
512 Some(addr) => addr,
513 };
514
515 let keys = if let Some(ref secret) = config.use_secret {
516 KeyPair::from_secret(secret.clone())?
517 } else {
518 config
519 .config_path
520 .clone()
521 .and_then(|ref p| load_key(Path::new(&p)))
522 .map_or_else(
523 || {
524 let key = Random
525 .generate()
526 .expect("Error generating random key pair");
527 if let Some(path) = config.config_path.clone() {
528 save_key(Path::new(&path), key.secret());
529 }
530 key
531 },
532 |s| {
533 KeyPair::from_secret(s)
534 .expect("Error creating node secret key")
535 },
536 )
537 };
538 info!("Self pos public key: {:?}", pos_pub_keys);
539
540 info!("Self node id: {:?}", *keys.public());
541
542 let tcp_listener = TcpListener::bind(listen_address)?;
543 listen_address = SocketAddr::new(
544 listen_address.ip(),
545 tcp_listener.local_addr()?.port(),
546 );
547 debug!("Listening at {:?}", listen_address);
548 let udp_port = config.udp_port.unwrap_or_else(|| listen_address.port());
549 let local_endpoint = NodeEndpoint {
550 address: listen_address,
551 udp_port,
552 };
553 let mut udp_addr = local_endpoint.address;
554 udp_addr.set_port(local_endpoint.udp_port);
555 let udp_socket =
556 UdpSocket::bind(udp_addr).expect("Error binding UDP socket");
557
558 let public_address = config.public_address;
559 let public_endpoint = match public_address {
560 None => {
561 let public_address =
562 select_public_address(local_endpoint.address.port());
563 let public_endpoint = NodeEndpoint {
564 address: public_address,
565 udp_port: local_endpoint.udp_port,
566 };
567 if config.nat_enabled {
568 match map_external_address(&local_endpoint, &NatType::Any) {
569 Some(endpoint) => {
570 info!(
571 "NAT mapped to external address {}",
572 endpoint.address
573 );
574 endpoint
575 }
576 None => public_endpoint,
577 }
578 } else {
579 public_endpoint
580 }
581 }
582 Some(addr) => NodeEndpoint {
583 address: addr,
584 udp_port: local_endpoint.udp_port,
585 },
586 };
587
588 let allow_ips = config.ip_filter.clone();
589 let discovery = {
590 if config.discovery_enabled {
591 Some(Discovery::new(
592 &keys,
593 public_endpoint.clone(),
594 allow_ips,
595 config.discovery_config.clone(),
596 ))
597 } else {
598 None
599 }
600 };
601
602 let nodes_path = config.config_path.clone();
603 let own_node_id = *keys.public();
604
605 let mut inner = NetworkServiceInner {
606 metadata: HostMetadata {
607 network_id: config.id,
608 keys,
609 protocols: RwLock::new(Vec::new()),
610 minimum_peer_protocol_version: Default::default(),
611 local_address: listen_address,
612 local_endpoint,
613 public_endpoint,
614 },
615 config: config.clone(),
616 udp_channel: RwLock::new(UdpChannel::new()),
617 discovery: Mutex::new(discovery),
618 udp_socket: Mutex::new(udp_socket),
619 tcp_listener: Mutex::new(tcp_listener),
620 sessions: SessionManager::new(
621 FIRST_SESSION,
622 MAX_SESSIONS,
623 config.max_incoming_peers,
624 own_node_id,
625 &config.session_ip_limit_config,
626 Some(pos_pub_keys),
627 ),
628 handlers: RwLock::new(HashMap::new()),
629 timers: RwLock::new(HashMap::new()),
630 timer_counter: RwLock::new(HANDLER_TIMER),
631 node_db: RwLock::new(NodeDatabase::new(
632 nodes_path,
633 config.subnet_quota,
634 )),
635 reserved_nodes: RwLock::new(HashSet::new()),
636 dropped_nodes: RwLock::new(HashSet::new()),
637 is_consortium: config.is_consortium,
638 delayed_queue: None,
639 };
640
641 for n in &config.boot_nodes {
642 inner.add_boot_node(n);
643 }
644
645 let reserved_nodes = config.reserved_nodes.clone();
646 for n in reserved_nodes {
647 if let Err(e) = inner.add_reserved_node(&n) {
648 debug!("Error parsing node id: {}: {:?}", n, e);
649 }
650 }
651
652 Ok(inner)
653 }
654
655 pub fn new_with_latency(
656 config: &NetworkConfiguration,
657 pos_pub_keys: (ConsensusPublicKey, ConsensusVRFPublicKey),
658 ) -> Result<NetworkServiceInner, Error> {
659 let mut inner = NetworkServiceInner::new(config, pos_pub_keys)?;
660 inner.delayed_queue = Some(DelayedQueue::new());
661 Ok(inner)
662 }
663
664 pub fn add_latency(
665 &self, peer: NodeId, latency_ms: f64,
666 ) -> Result<(), Error> {
667 match self.delayed_queue {
668 Some(ref queue) => {
669 let mut latencies = queue.latencies.write();
670 latencies
671 .insert(peer, Duration::from_millis(latency_ms as u64));
672 Ok(())
673 }
674 None => Err(
675 "conflux not in test mode, and does not support add_latency"
676 .into(),
677 ),
678 }
679 }
680
    /// Returns the IP filter from the stored configuration.
    pub fn get_ip_filter(&self) -> &IpFilter { &self.config.ip_filter }
682
683 fn add_boot_node(&self, id: &str) {
684 match Node::from_str(id) {
685 Err(e) => {
686 debug!("Could not add node {}: {:?}", id, e);
687 }
688 Ok(n) => {
689 self.node_db.write().insert_trusted(NodeEntry {
690 id: n.id,
691 endpoint: n.endpoint,
692 });
693 }
694 }
695 }
696
697 fn add_reserved_node(&mut self, id: &str) -> Result<(), Error> {
698 let n = Node::from_str(id)?;
699 self.node_db.write().insert_trusted(NodeEntry {
700 id: n.id,
701 endpoint: n.endpoint.clone(),
702 });
703 self.reserved_nodes.write().insert(n.id);
704 Ok(())
705 }
706
707 fn initialize_udp_protocols(
708 &self, io: &IoContext<NetworkIoMessage>,
709 ) -> Result<(), Error> {
710 if let Some(discovery) = self.discovery.lock().as_mut() {
712 let allow_ips = self.config.ip_filter.clone();
713 let nodes = self.node_db.read().sample_trusted_nodes(
714 self.config.discovery_config.discover_node_count,
715 &allow_ips,
716 );
717 discovery.try_ping_nodes(
718 &UdpIoContext::new(&self.udp_channel, &self.node_db),
719 nodes,
720 );
721 io.register_timer(
722 FAST_DISCOVERY_REFRESH,
723 self.config.fast_discovery_refresh_timeout,
724 )?;
725 io.register_timer(
726 DISCOVERY_REFRESH,
727 self.config.discovery_refresh_timeout,
728 )?;
729 io.register_timer(
730 DISCOVERY_ROUND,
731 self.config.discovery_round_timeout,
732 )?;
733 }
734 io.register_timer(NODE_TABLE, self.config.node_table_timeout)?;
735 io.register_timer(CHECK_SESSIONS, DEFAULT_CHECK_SESSIONS_TIMEOUT)?;
736
737 Ok(())
738 }
739
740 fn try_promote_untrusted(&self) {
741 let mut incoming_ids: Vec<NodeId> = Vec::new();
743 for s in self.sessions.all() {
744 if let Some(s) = s.try_read() {
745 if s.is_ready() && !s.metadata.originated && !s.expired() {
746 if let Some(id) = s.metadata.id {
748 incoming_ids.push(id);
749 }
750 }
751 }
752 }
753
754 self.node_db.write().promote(
757 incoming_ids,
758 self.config.connection_lifetime_for_promotion,
759 );
760 }
761
    /// Returns the listen address recorded in the host metadata.
    pub fn local_addr(&self) -> SocketAddr { self.metadata.local_address }
763
764 fn drop_node(&self, local_id: NodeId) -> Result<(), Error> {
765 let removed_node = self.node_db.write().remove(&local_id);
766
767 if let Some(node) = removed_node {
768 assert_eq!(node.id, local_id);
769 if let Some(_stream_token) = node.stream_token {
770 let mut wd = self.dropped_nodes.write();
771 wd.insert(node.id);
772 }
773 }
774
775 Ok(())
776 }
777
778 fn has_enough_outgoing_peers(
779 &self, tag: Option<(&str, &str)>, max: usize,
780 ) -> bool {
781 let count = match tag {
782 Some((k, v)) => self.sessions.count_with_tag(&k.into(), &v.into()),
783 None => self.sessions.stat().1, };
785
786 count >= max
787 }
788
789 fn on_housekeeping(&self, io: &IoContext<NetworkIoMessage>) {
790 if self.is_consortium {
791 unimplemented!();
792 } else {
793 self.connect_peers(io);
794 }
795 self.drop_peers(io);
796 }
797
798 fn connect_peers(&self, io: &IoContext<NetworkIoMessage>) {
800 if self
801 .metadata
802 .minimum_peer_protocol_version
803 .read()
804 .is_empty()
805 {
806 return;
809 }
810
811 let self_id = *self.metadata.id();
812
813 let sampled_archive_nodes = self.sample_archive_nodes();
814
815 let (handshake_count, egress_count, ingress_count) =
816 self.sessions.stat();
817 let samples;
818 {
819 let egress_attempt_count = if self.config.max_outgoing_peers
820 > egress_count + sampled_archive_nodes.len()
821 {
822 self.config.max_outgoing_peers
823 - egress_count
824 - sampled_archive_nodes.len()
825 } else {
826 0
827 };
828 samples = self.node_db.read().sample_trusted_node_ids(
829 egress_attempt_count as u32,
830 &self.config.ip_filter,
831 );
832 }
833
834 let reserved_nodes = self.reserved_nodes.read();
835 let nodes = reserved_nodes
837 .iter()
838 .cloned()
839 .chain(sampled_archive_nodes)
840 .chain(samples);
841
842 let max_handshakes_per_round = self.config.max_handshakes / 2;
843 let mut started: usize = 0;
844 for id in nodes
845 .filter(|id| !self.sessions.contains_node(id) && *id != self_id)
846 .take(min(
847 max_handshakes_per_round,
848 self.config.max_handshakes.saturating_sub(handshake_count),
849 ))
850 {
851 self.connect_peer(&id, io);
852 started += 1;
853 }
854 debug!(
855 "Connecting peers: {} sessions, {} pending + {} started",
856 egress_count + ingress_count,
857 handshake_count,
858 started
859 );
860 if egress_count + ingress_count == 0 {
861 warn!(
862 "No peers connected at this moment, {} pending + {} started",
863 handshake_count, started
864 );
865 }
866 }
867
868 fn sample_archive_nodes(&self) -> HashSet<NodeId> {
870 if self.config.max_outgoing_peers_archive == 0 {
871 return HashSet::new();
872 }
873
874 let key: String = NODE_TAG_NODE_TYPE.into();
875 let value: String = NODE_TAG_ARCHIVE.into();
876 let archive_sessions = self.sessions.count_with_tag(&key, &value);
877
878 if archive_sessions >= self.config.max_outgoing_peers_archive {
879 return HashSet::new();
880 }
881
882 self.node_db.read().sample_trusted_node_ids_with_tag(
883 (self.config.max_outgoing_peers_archive - archive_sessions) as u32,
884 &key,
885 &value,
886 )
887 }
888
889 fn drop_peers(&self, io: &IoContext<NetworkIoMessage>) {
891 {
892 if self.dropped_nodes.read().is_empty() {
893 return;
894 }
895 }
896
897 let mut killed_nodes = HashSet::new();
898 let mut w = self.dropped_nodes.write();
899 for node_id in w.iter() {
900 if self.kill_connection(
901 node_id,
902 io,
903 true,
904 Some(UpdateNodeOperation::Failure),
905 "peer dropped in manual", ) {
907 killed_nodes.insert(*node_id);
908 }
909 }
910
911 for node_id in killed_nodes.iter() {
912 w.remove(node_id);
913 }
914 }
915
916 fn connect_peer(&self, id: &NodeId, io: &IoContext<NetworkIoMessage>) {
917 if self.sessions.contains_node(id) {
918 trace!("Abort connect. Node already connected");
919 return;
920 }
921
922 let (socket, address) = {
923 let address = {
924 if let Some(node) = self.node_db.read().get(id, true) {
926 node.endpoint.address
927 } else {
928 debug!("Abort connect. Node expired");
929 return;
930 }
931 };
932
933 if !self.sessions.is_ip_allowed(&address.ip()) {
934 debug!("cannot create outgoing connection to node, id = {:?}, address = {:?}", id, address);
935 return;
936 }
937
938 match TcpStream::connect(address) {
939 Ok(socket) => {
940 trace!("{}: connecting to {:?}", id, address);
941 (socket, address)
942 }
943 Err(e) => {
944 self.node_db.write().note_failure(
945 id, true, true, );
948 debug!(
949 "{}: can't connect o address {:?} {:?}",
950 id, address, e
951 );
952 return;
953 }
954 }
955 };
956
957 if let Err(e) = self.create_connection(socket, address, Some(id), io) {
958 self.node_db.write().note_failure(
959 id, true, true, );
962 debug!("Can't create connection: {:?}", e);
963 }
964 }
965
966 pub fn get_peer_info(&self) -> Vec<PeerInfo> {
967 debug!("get_peer_info: enter");
968
969 let mut peers = Vec::with_capacity(self.sessions.count());
970 debug!("get_peer_info: {} sessions in total", peers.capacity());
971
972 for session in self.sessions.all() {
973 let sess = session.read();
974 if !sess.expired() {
975 peers.push(PeerInfo {
976 id: sess.token(),
977 nodeid: *sess.id().unwrap_or(&NodeId::default()),
978 addr: sess.address(),
979 protocols: sess.metadata.peer_protocols.clone(),
980 });
981 }
982 }
983
984 debug!("get_peer_info: leave, {} peers retrieved", peers.len());
985
986 peers
987 }
988
989 pub fn get_peer_connection_origin(&self, node_id: &NodeId) -> Option<bool> {
990 match self.sessions.get_by_id(node_id) {
991 Some(session) => {
992 let sess = session.read();
993 Some(sess.originated())
994 }
995 None => None,
996 }
997 }
998
    /// Sets up discovery and timers, then registers the UDP and TCP-accept
    /// streams with the IO context.
    ///
    /// # Errors
    /// Propagates any registration failure.
    fn start(&self, io: &IoContext<NetworkIoMessage>) -> Result<(), Error> {
        self.initialize_udp_protocols(io)?;
        io.register_stream(UDP_MESSAGE)?;
        io.register_stream(TCP_ACCEPT)?;
        Ok(())
    }
1005
1006 fn create_connection(
1010 &self, socket: TcpStream, address: SocketAddr, id: Option<&NodeId>,
1011 io: &IoContext<NetworkIoMessage>,
1012 ) -> Result<(), Error> {
1013 match self.sessions.create(socket, address, id, io, self) {
1014 Ok(token) => {
1015 debug!("new session created, token = {}, address = {:?}, id = {:?}", token, address, id);
1016 if let Some(id) = id {
1017 self.node_db.write().note_success(id, Some(token), true);
1021 }
1022 io.register_stream(token).map(|_| ()).map_err(Into::into)
1023 }
1024 Err(reason) => {
1025 debug!("failed to create session, reason = {}, address = {:?}, id = {:?}", reason, address, id);
1026 Ok(())
1027 }
1028 }
1029 }
1030
    /// Handles a closed stream by killing the session behind `stream` with a
    /// `Failure` node update and the reason "connection closed".
    fn connection_closed(
        &self, stream: StreamToken, io: &IoContext<NetworkIoMessage>,
    ) {
        trace!("Connection closed: {}", stream);
        self.kill_connection_by_token(
            stream,
            io,
            true,
            Some(UpdateNodeOperation::Failure),
            "connection closed",
        );
    }
1043
    /// Drains readable data from the session behind `stream`.
    ///
    /// Steps, in order:
    /// 1. If the session's node is queued for dropping, process the drop
    ///    queue and stop.
    /// 2. Read session data in a loop until the session reports `None` or a
    ///    read fails, collecting protocol messages and noting a completed
    ///    handshake.
    /// 3. Kill any other session the reads asked to disconnect.
    /// 4. On a read error, kill this session and stop.
    /// 5. After a completed handshake, notify every matching protocol
    ///    handler that the peer connected.
    /// 6. Dispatch the collected messages through `io.handle`.
    fn session_readable(
        &self, stream: StreamToken, io: &IoContext<NetworkIoMessage>,
    ) {
        let mut handshake_done = false;

        let mut messages: Vec<(ProtocolId, Vec<u8>)> = Vec::new();
        let mut kill = false;
        let mut token_to_disconnect = None;

        let session = if let Some(session) = self.sessions.get(stream) {
            session
        } else {
            return;
        };

        // The read lock is released at the end of this statement.
        let mut session_node_id = session.read().id().copied();
        let mut pos_public_key_opt = None;
        if let Some(node_id) = session_node_id {
            // Sample membership before `drop_peers`, which may remove the id
            // from the queue.
            let to_drop = self.dropped_nodes.read().contains(&node_id);
            self.drop_peers(io);
            if to_drop {
                return;
            }
        }

        loop {
            // The write lock is re-acquired on every iteration.
            let mut sess = session.write();
            let data = sess.readable(io, self);
            let session_data = match data {
                Ok(session_data) => session_data,
                Err(e) => {
                    debug!("Failed to read session data, error = {:?}, session = {:?}", e, *sess);
                    kill = true;
                    break;
                }
            };

            if session_data.token_to_disconnect.is_some() {
                debug!(
                    "session_readable: set token_to_disconnect to {:?}",
                    session_data.token_to_disconnect
                );
                // A later read overwrites an earlier request.
                token_to_disconnect = session_data.token_to_disconnect;
            }

            match session_data.session_data {
                SessionData::Ready { pos_public_key } => {
                    debug!(
                        "receive Ready with pos_public_key={:?} account={:?}",
                        pos_public_key,
                        pos_public_key
                            .as_ref()
                            .map(|k| from_consensus_public_key(&k.0, &k.1)),
                    );
                    handshake_done = true;
                    // NOTE(review): assumes a ready session always has a
                    // node id; this unwrap panics otherwise.
                    session_node_id = Some(*sess.id().unwrap());
                    pos_public_key_opt = pos_public_key;
                }
                SessionData::Message { data, protocol } => {
                    // Release the session lock before touching `handlers`.
                    drop(sess);
                    match self.handlers.read().get(&protocol) {
                        None => warn!(
                            "No handler found for protocol: {:?}",
                            protocol
                        ),
                        Some(_) => {
                            messages.push((protocol, data));
                        }
                    }
                }
                SessionData::None => break,
                SessionData::Continue => {}
            }
        }

        if let Some(token_to_disconnect) = token_to_disconnect {
            self.kill_connection_by_token(
                token_to_disconnect.0,
                io,
                true,
                None,
                token_to_disconnect.1.as_str(),
            );
        }

        if kill {
            self.kill_connection_by_token(
                stream,
                io,
                true,
                Some(UpdateNodeOperation::Failure),
                "session readable error",
            );
            return;
        }

        if handshake_done {
            let handlers = self.handlers.read();
            // Clone the metadata so the session lock is not held while
            // handlers run.
            let session_metadata = session.read().metadata.clone();
            for protocol in &session_metadata.peer_protocols {
                if let Some(handler) = handlers.get(&protocol.protocol).cloned()
                {
                    debug!("session handshaked, token = {}", stream);
                    let network_context = NetworkContext::new(
                        io,
                        handler,
                        protocol.protocol,
                        self,
                    );
                    network_context.protocol_handler().on_peer_connected(
                        &network_context,
                        session_node_id.as_ref().unwrap(),
                        protocol.version,
                        pos_public_key_opt.clone(),
                    );
                }
            }
        }

        for (protocol, data) in messages {
            if let Err(e) = io.handle(
                stream,
                0,
                NetworkIoMessage::HandleProtocolMessage {
                    protocol,
                    peer: stream,
                    // NOTE(review): panics if a message was collected while
                    // the session has no node id — presumably impossible
                    // before the handshake completes; confirm.
                    node_id: *session_node_id.as_ref().unwrap(),
                    data,
                },
            ) {
                warn!("Error occurs, discard protocol message: err={}", e);
            }
        }
    }
1189
1190 fn session_writable(
1191 &self, stream: StreamToken, io: &IoContext<NetworkIoMessage>,
1192 ) {
1193 if let Some(session) = self.sessions.get(stream) {
1194 {
1195 let sess_id = session.read().id().copied();
1198 if let Some(node_id) = sess_id {
1199 let to_drop = self.dropped_nodes.read().contains(&node_id);
1200 self.drop_peers(io);
1201 if to_drop {
1202 return;
1203 }
1204 }
1205 }
1206 let mut sess = session.write();
1207 if let Err(e) = sess.writable(io) {
1208 trace!("{}: Session write error: {:?}", stream, e);
1209 }
1210 if sess.done() {
1211 io.deregister_stream(stream).unwrap_or_else(|e| {
1212 debug!("Error deregistering stream: {:?}", e)
1213 });
1214 }
1215 }
1216 }
1217
1218 fn accept(&self, io: &IoContext<NetworkIoMessage>) {
1219 trace!("Accepting incoming connection");
1220 loop {
1221 let (socket, address) = match self.tcp_listener.lock().accept() {
1222 Ok((sock, addr)) => (sock, addr),
1223 Err(e) => {
1224 if e.kind() != io::ErrorKind::WouldBlock {
1225 debug!("Error accepting connection: {:?}", e);
1226 }
1227 break;
1228 }
1229 };
1230
1231 if let Err(e) = self.create_connection(socket, address, None, io) {
1232 debug!("Can't accept connection: {:?}", e);
1233 }
1234 }
1235 }
1236
    /// Expires the session identified by `token` and performs the follow-up
    /// bookkeeping.
    ///
    /// * `remote` — when `true`, the stream is deregistered and `op` is
    ///   applied to the node database.
    /// * `op` — optional node database update (failure, demotion or
    ///   blacklisting).
    /// * `reason` — text sent to the peer in the disconnect packet.
    ///
    /// Tokens outside `FIRST_SESSION..=LAST_SESSION` and unknown tokens are
    /// ignored. Protocol handlers the peer shared with us are notified via
    /// `on_peer_disconnected` after the session lock has been released.
    fn kill_connection_by_token(
        &self, token: StreamToken, io: &IoContext<NetworkIoMessage>,
        remote: bool, op: Option<UpdateNodeOperation>, reason: &str,
    ) {
        let mut to_disconnect: Vec<ProtocolId> = Vec::new();
        let mut failure_id = None;
        let mut deregister = false;

        if let FIRST_SESSION..=LAST_SESSION = token {
            if let Some(session) = self.sessions.get(token) {
                let mut sess = session.write();
                if !sess.expired() {
                    if let Some(id) = sess.id() {
                        self.sessions.remove_node_id_entry(id, token);
                    }
                    if sess.is_ready() {
                        for (p, _) in self.handlers.read().iter() {
                            if sess.have_capability(*p) {
                                to_disconnect.push(*p);
                                // NOTE(review): a disconnect is sent once per
                                // matching protocol, not once per session —
                                // confirm this is intended.
                                sess.send_disconnect(DisconnectReason::Custom(
                                    reason.into(),
                                ));
                            }
                        }
                    }
                    sess.set_expired();
                }
                deregister = remote || sess.done();
                failure_id = sess.id().cloned();
                debug!(
                    "kill connection by token, deregister = {}, reason = {:?}, session = {:?}, op = {:?}",
                    deregister, reason, *sess, op
                );
            }
        }

        // From here on the session lock is no longer held.
        if let Some(id) = failure_id {
            if remote {
                if let Some(op) = op {
                    match op {
                        UpdateNodeOperation::Failure => {
                            self.node_db.write().note_failure(
                                &id, true, false,
                            );
                        }
                        UpdateNodeOperation::Demotion => {
                            let mut node_db = self.node_db.write();
                            node_db.demote(&id);
                            node_db.note_demoted(&id, true);
                        }
                        UpdateNodeOperation::Remove => {
                            self.node_db.write().set_blacklisted(&id);
                        }
                    }
                }
            }

            for p in to_disconnect {
                // The handler is cloned so the `handlers` guard is only a
                // temporary of this lookup.
                if let Some(h) = self.handlers.read().get(&p).cloned() {
                    let network_context = NetworkContext::new(io, h, p, self);
                    network_context
                        .protocol_handler()
                        .on_peer_disconnected(&network_context, &id);
                }
            }
        }

        if deregister {
            io.deregister_stream(token).unwrap_or_else(|e| {
                debug!("Error deregistering stream {:?}", e);
            });
        }
    }
1315
1316 fn kill_connection(
1317 &self, node_id: &NodeId, io: &IoContext<NetworkIoMessage>,
1318 remote: bool, op: Option<UpdateNodeOperation>, reason: &str,
1319 ) -> bool {
1320 let mut to_disconnect: Vec<ProtocolId> = Vec::new();
1321 let mut deregister = false;
1322 let mut token = 0;
1323
1324 if let Some(session) = self.sessions.get_by_id(node_id) {
1325 let mut sess = session.write();
1326 if !sess.expired() {
1327 self.sessions.remove_node_id_entry(node_id, sess.token());
1330 if sess.is_ready() {
1331 for (p, _) in self.handlers.read().iter() {
1332 if sess.have_capability(*p) {
1333 to_disconnect.push(*p);
1334 sess.send_disconnect(DisconnectReason::Custom(
1335 reason.into(),
1336 ));
1337 }
1338 }
1339 }
1340 sess.set_expired();
1341 }
1342 deregister = remote || sess.done();
1343 token = sess.token();
1344 assert_eq!(sess.id().unwrap().clone(), node_id.clone());
1345 debug!(
1346 "kill connection, deregister = {}, reason = {:?}, session = {:?}, op = {:?}",
1347 deregister, reason, *sess, op
1348 );
1349
1350 if remote {
1351 if let Some(op) = op {
1352 match op {
1353 UpdateNodeOperation::Failure => {
1354 self.node_db.write().note_failure(
1355 node_id, true, false, );
1358 }
1359 UpdateNodeOperation::Demotion => {
1360 let mut node_db = self.node_db.write();
1361 node_db.demote(node_id);
1362 node_db.note_demoted(
1363 node_id, true, );
1365 }
1366 UpdateNodeOperation::Remove => {
1367 self.node_db.write().set_blacklisted(node_id);
1368 }
1369 }
1370 }
1371 }
1372 }
1373
1374 for p in to_disconnect {
1375 if let Some(h) = self.handlers.read().get(&p).cloned() {
1376 let network_context = NetworkContext::new(io, h, p, self);
1377 network_context
1378 .protocol_handler()
1379 .on_peer_disconnected(&network_context, node_id);
1380 }
1381 }
1382
1383 if deregister {
1384 io.deregister_stream(token).unwrap_or_else(|e| {
1385 debug!("Error deregistering stream {:?}", e);
1386 });
1387 }
1388
1389 deregister
1390 }
1391
1392 pub fn with_context<F, R>(
1393 &self, handler: Arc<dyn NetworkProtocolHandler + Sync>,
1394 protocol: ProtocolId, io: &IoContext<NetworkIoMessage>, action: F,
1395 ) -> R
1396 where
1397 F: FnOnce(&NetworkContext) -> R,
1398 {
1399 action(&NetworkContext::new(io, handler, protocol, self))
1400 }
1401
1402 fn udp_readable(&self, io: &IoContext<NetworkIoMessage>) {
1403 let udp_socket = self.udp_socket.lock();
1404 let writable;
1405 {
1406 let udp_channel = self.udp_channel.read();
1407 writable = udp_channel.any_sends_queued();
1408 }
1409
1410 let mut buf = [0u8; MAX_DATAGRAM_SIZE];
1411 match udp_socket.recv_from(&mut buf).map_non_block() {
1412 Ok(Some((len, address))) => self
1413 .on_udp_packet(&buf[0..len], address)
1414 .unwrap_or_else(|e| {
1415 debug!("Error processing UDP packet: {:?}", e);
1416 }),
1417 Ok(_) => {}
1418 Err(e) => {
1419 debug!("Error reading UDP socket: {:?}", e);
1420 }
1421 };
1422
1423 let new_writable;
1424 {
1425 let udp_channel = self.udp_channel.read();
1426 new_writable = udp_channel.any_sends_queued();
1427 }
1428
1429 if writable != new_writable {
1433 io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1434 debug!("Error updating UDP registration: {:?}", e)
1435 });
1436 }
1437 }
1438
1439 fn udp_writable(&self, io: &IoContext<NetworkIoMessage>) {
1440 let udp_socket = self.udp_socket.lock();
1441 let mut udp_channel = self.udp_channel.write();
1442 while let Some(data) = udp_channel.dequeue_send() {
1443 match udp_socket
1444 .send_to(&data.payload, data.address)
1445 .map_non_block()
1446 {
1447 Ok(Some(size)) if size == data.payload.len() => {}
1448 Ok(Some(_)) => {
1449 warn!("UDP sent incomplete datagram");
1450 }
1451 Ok(None) => {
1452 udp_channel.requeue_send(data);
1453 return;
1454 }
1455 Err(e) => {
1456 debug!(
1457 "UDP send error: {:?}, address: {:?}",
1458 e, &data.address
1459 );
1460 return;
1461 }
1462 }
1463 }
1464 io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1466 debug!("Error updating UDP registration: {:?}", e)
1467 });
1468 }
1469
1470 fn on_udp_packet(
1471 &self, packet: &[u8], from: SocketAddr,
1472 ) -> Result<(), Error> {
1473 if packet.is_empty() {
1474 return Ok(());
1475 }
1476
1477 let res = match packet[0] {
1478 UDP_PROTOCOL_DISCOVERY => {
1479 if let Some(discovery) = self.discovery.lock().as_mut() {
1480 discovery.on_packet(
1481 &UdpIoContext::new(&self.udp_channel, &self.node_db),
1482 &packet[1..],
1483 from,
1484 )?;
1485 Ok(())
1486 } else {
1487 warn!("Discovery is not ready. Drop the message!");
1488 Ok(())
1489 }
1490 }
1491 _ => {
1492 warn!("Unknown UDP protocol. Simply drops the message!");
1493 Ok(())
1494 }
1495 };
1496 res
1497 }
1498
1499 fn on_check_sessions(&self, io: &IoContext<NetworkIoMessage>) {
1500 let mut disconnect_peers = Vec::new();
1501
1502 for session in self.sessions.all() {
1503 if let Some(sess) = session.try_read() {
1504 if let (true, op) = sess.check_timeout() {
1505 disconnect_peers.push((sess.token(), op));
1506 }
1507 }
1508 }
1509
1510 for (token, op) in disconnect_peers {
1511 self.kill_connection_by_token(
1512 token,
1513 io,
1514 true,
1515 op,
1516 "session timeout", );
1518 }
1519 }
1520}
1521
1522impl IoHandler<NetworkIoMessage> for NetworkServiceInner {
1523 fn initialize(&self, io: &IoContext<NetworkIoMessage>) {
1524 io.register_timer(HOUSEKEEPING, self.config.housekeeping_timeout)
1525 .expect("Error registering housekeeping timer");
1526 io.message(NetworkIoMessage::Start).unwrap_or_else(|e| {
1527 warn!("Error sending IO notification: {:?}", e)
1528 });
1529 }
1530
1531 fn stream_hup(
1532 &self, io: &IoContext<NetworkIoMessage>, stream: StreamToken,
1533 ) {
1534 trace!("Hup: {}", stream);
1535 match stream {
1536 FIRST_SESSION..=LAST_SESSION => self.connection_closed(stream, io),
1537 _ => warn!("Unexpected hup"),
1538 }
1539 }
1540
1541 fn stream_readable(
1542 &self, io: &IoContext<NetworkIoMessage>, stream: StreamToken,
1543 ) {
1544 match stream {
1545 FIRST_SESSION..=LAST_SESSION => self.session_readable(stream, io),
1546 TCP_ACCEPT => self.accept(io),
1547 UDP_MESSAGE => self.udp_readable(io),
1548 _ => panic!("Received unknown readable token"),
1549 }
1550 }
1551
1552 fn stream_writable(
1553 &self, io: &IoContext<NetworkIoMessage>, stream: StreamToken,
1554 ) {
1555 match stream {
1556 FIRST_SESSION..=LAST_SESSION => self.session_writable(stream, io),
1557 UDP_MESSAGE => self.udp_writable(io),
1558 _ => panic!("Received unknown writable token"),
1559 }
1560 }
1561
1562 fn timeout(&self, io: &IoContext<NetworkIoMessage>, token: TimerToken) {
1563 match token {
1564 FIRST_SESSION..=LAST_SESSION => {
1565 debug!("Connection timeout: {}", token);
1566 self.kill_connection_by_token(
1567 token,
1568 io,
1569 true,
1570 Some(UpdateNodeOperation::Failure),
1571 "handshake timeout", );
1573 }
1574 HOUSEKEEPING => self.on_housekeeping(io),
1575 DISCOVERY_REFRESH => {
1576 let disc_general = self.has_enough_outgoing_peers(
1578 None,
1579 self.config.max_outgoing_peers,
1580 );
1581 let disc_archive = self.has_enough_outgoing_peers(
1582 Some((NODE_TAG_NODE_TYPE, NODE_TAG_ARCHIVE)),
1583 self.config.max_outgoing_peers_archive,
1584 );
1585 if disc_general || disc_archive {
1586 if let Some(d) = self.discovery.lock().as_mut() {
1587 d.disc_option.general = disc_general;
1588 d.disc_option.archive = disc_archive;
1589 d.refresh();
1590 }
1591 io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1592 debug!("Error updating discovery registration: {:?}", e)
1593 });
1594 }
1595 }
1596 FAST_DISCOVERY_REFRESH => {
1597 let disc_general = !self.has_enough_outgoing_peers(
1599 None,
1600 self.config.max_outgoing_peers,
1601 );
1602 let disc_archive = !self.has_enough_outgoing_peers(
1603 Some((NODE_TAG_NODE_TYPE, NODE_TAG_ARCHIVE)),
1604 self.config.max_outgoing_peers_archive,
1605 );
1606 if disc_general || disc_archive {
1607 if let Some(d) = self.discovery.lock().as_mut() {
1608 d.disc_option.general = disc_general;
1609 d.disc_option.archive = disc_archive;
1610 d.refresh();
1611 }
1612 io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1613 debug!("Error updating discovery registration: {:?}", e)
1614 });
1615 }
1616 }
1617 DISCOVERY_ROUND => {
1618 if let Some(d) = self.discovery.lock().as_mut() {
1619 d.round(&UdpIoContext::new(
1620 &self.udp_channel,
1621 &self.node_db,
1622 ))
1623 }
1624 io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1625 debug!("Error updating discovery registration: {:?}", e)
1626 });
1627 }
1628 NODE_TABLE => {
1629 trace!("Refreshing node table");
1630 self.try_promote_untrusted();
1631 self.node_db.write().save();
1632 }
1633 CHECK_SESSIONS => self.on_check_sessions(io),
1634 SEND_DELAYED_MESSAGES => {
1635 if let Some(ref queue) = self.delayed_queue {
1636 queue.send_delayed_messages(self);
1637 }
1638 }
1639 _ => match self.timers.read().get(&token).cloned() {
1640 Some(timer) => {
1641 match self.handlers.read().get(&timer.protocol).cloned() {
1642 None => warn!(
1643 "No handler found for protocol: {:?}",
1644 timer.protocol
1645 ),
1646 Some(h) => {
1647 let network_context = NetworkContext::new(
1648 io,
1649 h,
1650 timer.protocol,
1651 self,
1652 );
1653 network_context
1654 .protocol_handler()
1655 .on_timeout(&network_context, timer.token);
1656 }
1657 }
1658 }
1659 None => {
1660 warn!("Unknown timer token: {}", token);
1661 } },
1663 }
1664 }
1665
1666 fn message(
1667 &self, io: &IoContext<NetworkIoMessage>, message: &NetworkIoMessage,
1668 ) {
1669 match message {
1670 NetworkIoMessage::Start => self.start(io).unwrap_or_else(|e| {
1671 warn!("Error starting network service: {:?}", e)
1672 }),
1673 NetworkIoMessage::AddHandler {
1674 handler,
1675 protocol,
1676 version,
1677 callback,
1678 } => {
1679 let h = handler.clone();
1680 let network_context =
1681 NetworkContext::new(io, h, *protocol, self);
1682 network_context
1683 .protocol_handler()
1684 .initialize(&network_context);
1685 self.handlers.write().insert(*protocol, handler.clone());
1686 {
1687 let protocols = &mut *self.metadata.protocols.write();
1688 for protocol_info in protocols.iter() {
1689 assert_ne!(
1690 protocol, &protocol_info.protocol,
1691 "Do not register same protocol twice"
1692 );
1693 }
1694 protocols.push(ProtocolInfo {
1695 protocol: *protocol,
1696 version: *version,
1697 });
1698 self.metadata.minimum_peer_protocol_version.write().push(
1699 ProtocolInfo {
1700 protocol: *protocol,
1701 version: handler.minimum_supported_version(),
1702 },
1703 );
1704 }
1705 info!(
1706 "Protocol {:?} version {:?} registered.",
1707 protocol, version
1708 );
1709 callback.send(()).expect("protocol register error");
1710 }
1711 NetworkIoMessage::AddTimer {
1712 ref protocol,
1713 ref delay,
1714 ref token,
1715 } => {
1716 let handler_token = {
1717 let mut timer_counter = self.timer_counter.write();
1718 let counter = &mut *timer_counter;
1719 let handler_token = *counter;
1720 *counter += 1;
1721 handler_token
1722 };
1723 self.timers.write().insert(
1724 handler_token,
1725 ProtocolTimer {
1726 protocol: *protocol,
1727 token: *token,
1728 },
1729 );
1730 io.register_timer(handler_token, *delay)
1731 .unwrap_or_else(|e| {
1732 debug!("Error registering timer {}: {:?}", token, e)
1733 });
1734 }
1735 NetworkIoMessage::DispatchWork {
1736 ref protocol,
1737 ref work_type,
1738 } => {
1739 if let Some(handler) =
1740 self.handlers.read().get(protocol).cloned()
1741 {
1742 let network_context =
1743 NetworkContext::new(io, handler, *protocol, self);
1744 network_context
1745 .protocol_handler()
1746 .on_work_dispatch(&network_context, *work_type);
1747 } else {
1748 warn!("Work is dispatched to unknown handler");
1749 }
1750 }
1751 NetworkIoMessage::HandleProtocolMessage {
1752 ref protocol,
1753 peer: _,
1754 ref node_id,
1755 ref data,
1756 } => {
1757 debug!("Receive ProtocolMsg {:?}", protocol);
1758 if let Some(handler) =
1759 self.handlers.read().get(protocol).cloned()
1760 {
1761 let network_context =
1762 NetworkContext::new(io, handler, *protocol, self);
1763 network_context.protocol_handler().on_message(
1764 &network_context,
1765 node_id,
1766 data,
1767 );
1768 } else {
1769 warn!("Work is handled by unknown handler");
1770 }
1771 }
1772 }
1773 }
1774
1775 fn register_stream(
1776 &self, stream: StreamToken, reg: Token, poll_registry: &Registry,
1777 ) {
1778 match stream {
1779 FIRST_SESSION..=LAST_SESSION => {
1780 if let Some(session) = self.sessions.get(stream) {
1781 session
1782 .write()
1783 .register_socket(reg, poll_registry)
1784 .expect("Error registering socket");
1785 }
1786 }
1787 TCP_ACCEPT => {
1788 poll_registry
1789 .register(
1790 &mut *self.tcp_listener.lock(),
1791 Token(TCP_ACCEPT),
1792 Interest::READABLE | Interest::WRITABLE,
1793 )
1794 .expect("Error registering stream");
1795 }
1796 UDP_MESSAGE => {
1797 poll_registry
1798 .register(
1799 &mut *self.udp_socket.lock(),
1800 reg,
1801 Interest::READABLE | Interest::WRITABLE,
1802 )
1803 .expect("Error registering UDP socket");
1804 }
1805 _ => warn!("Unexpected stream registeration"),
1806 }
1807 }
1808
1809 fn deregister_stream(&self, stream: StreamToken, poll_registry: &Registry) {
1810 match stream {
1811 FIRST_SESSION..=LAST_SESSION => {
1812 if let Some(session) = self.sessions.get(stream) {
1813 let mut sess = session.write();
1814 if sess.expired() {
1815 sess.deregister_socket(poll_registry)
1816 .expect("Error deregistering socket");
1817 if let Some(node_id) = sess.id() {
1818 self.node_db.write().note_failure(
1819 node_id, true, false, );
1822 }
1823 self.sessions.remove(&sess);
1824 debug!("Removed session: {:?}", *sess);
1825 }
1826 }
1827 }
1828 _ => warn!("Unexpected stream deregistration"),
1829 }
1830 }
1831
1832 fn update_stream(
1833 &self, stream: StreamToken, reg: Token, poll_registry: &Registry,
1834 ) {
1835 match stream {
1836 FIRST_SESSION..=LAST_SESSION => {
1837 if let Some(session) = self.sessions.get(stream) {
1838 session
1839 .write()
1840 .update_socket(reg, poll_registry)
1841 .expect("Error updating socket");
1842 }
1843 }
1844 TCP_ACCEPT => poll_registry
1845 .reregister(
1846 &mut *self.tcp_listener.lock(),
1847 Token(TCP_ACCEPT),
1848 Interest::READABLE | Interest::WRITABLE,
1849 )
1850 .expect("Error reregistering stream"),
1851 UDP_MESSAGE => {
1852 let mut udp_socket = self.udp_socket.lock();
1853 let udp_channel = self.udp_channel.read();
1854
1855 let registration = if udp_channel.any_sends_queued() {
1856 Interest::READABLE | Interest::WRITABLE
1857 } else {
1858 Interest::READABLE
1859 };
1860 poll_registry
1861 .reregister(&mut *udp_socket, reg, registration)
1862 .expect("Error reregistering UDP socket");
1863 }
1864 _ => warn!("Unexpected stream update"),
1865 }
1866 }
1867}
1868
/// A message whose delivery has been postponed, together with everything
/// needed to send it later.
///
/// Instances are created in `NetworkContext::send` when a latency is
/// configured for the destination peer. Ordering and equality of this type
/// only look at `ts`.
struct DelayMessageContext {
    /// Instant at which the message is due; `send` sets it to
    /// `Instant::now() + latency`.
    ts: Instant,
    /// Clone of the IO context the message was submitted through.
    io: IoContext<NetworkIoMessage>,
    /// Protocol of the `NetworkContext` that queued the message.
    protocol: ProtocolId,
    /// Session of the destination peer.
    session: SharedSession,
    /// Node id of the destination peer.
    peer: NodeId,
    /// Message bytes as passed to `send`.
    msg: Vec<u8>,
    /// `min_protocol_version` argument passed to `send`.
    min_protocol_version: ProtocolVersion,
    /// Send-queue priority passed to `send`.
    priority: SendQueuePriority,
}
1880
1881impl DelayMessageContext {
1882 #[allow(clippy::too_many_arguments)]
1883 pub fn new(
1884 ts: Instant, io: IoContext<NetworkIoMessage>, protocol: ProtocolId,
1885 session: SharedSession, peer: NodeId, msg: Vec<u8>,
1886 min_protocol_version: ProtocolVersion, priority: SendQueuePriority,
1887 ) -> Self {
1888 DelayMessageContext {
1889 ts,
1890 io,
1891 protocol,
1892 session,
1893 peer,
1894 msg,
1895 min_protocol_version,
1896 priority,
1897 }
1898 }
1899}
1900
1901impl Ord for DelayMessageContext {
1902 fn cmp(&self, other: &Self) -> Ordering { other.ts.cmp(&self.ts) }
1903}
1904
1905impl PartialOrd for DelayMessageContext {
1906 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1907 Some(self.cmp(other))
1908 }
1909}
1910
// Marker impl: equality itself is defined by the `PartialEq` impl, which
// compares only the `ts` field.
impl Eq for DelayMessageContext {}
1912
1913impl PartialEq for DelayMessageContext {
1914 fn eq(&self, other: &Self) -> bool { self.ts == other.ts }
1915}
1916
/// Per-call view of the network service handed to a protocol handler.
///
/// It ties together the IO context, the handler and its protocol id, and
/// a reference to the service that created it.
pub struct NetworkContext<'a> {
    /// IO context used for sending messages and registering timers.
    io: &'a IoContext<NetworkIoMessage>,
    /// Handler of `protocol`.
    handler: Arc<dyn NetworkProtocolHandler + Sync>,
    /// Protocol this context acts on behalf of.
    protocol: ProtocolId,
    /// Value of `handler.minimum_supported_version()` captured in `new`.
    min_supported_version: ProtocolVersion,
    /// The service that owns sessions, node database and handlers.
    network_service: &'a NetworkServiceInner,
}
1925
1926impl<'a> NetworkContext<'a> {
1927 pub fn new(
1928 io: &'a IoContext<NetworkIoMessage>,
1929 handler: Arc<dyn NetworkProtocolHandler + Sync>, protocol: ProtocolId,
1930 network_service: &'a NetworkServiceInner,
1931 ) -> NetworkContext<'a> {
1932 let min_supported_version = handler.minimum_supported_version();
1933 NetworkContext {
1934 io,
1935 handler,
1936 protocol,
1937 min_supported_version,
1938 network_service,
1939 }
1940 }
1941
1942 pub fn protocol_handler(&self) -> &dyn NetworkProtocolHandler {
1943 &*self.handler
1944 }
1945}
1946
1947impl<'a> NetworkContextTrait for NetworkContext<'a> {
/// Returns the protocol id this context was created for.
fn get_protocol(&self) -> ProtocolId { self.protocol }
1949
/// Forwards to `NetworkServiceInner::get_peer_connection_origin` for
/// `node_id` and returns its answer unchanged.
// NOTE(review): the meaning of the `bool` (and of `None`) is defined by
// the service method, which is not visible here.
fn get_peer_connection_origin(&self, node_id: &NodeId) -> Option<bool> {
    self.network_service.get_peer_connection_origin(node_id)
}
1953
1954 fn is_peer_self(&self, node_id: &NodeId) -> bool {
1955 *node_id == *self.network_service.metadata.id()
1956 }
1957
/// Returns a copy of the node id stored in the service metadata.
fn self_node_id(&self) -> NodeId { *self.network_service.metadata.id() }
1959
1960 fn send(
1962 &self, node_id: &NodeId, msg: Vec<u8>,
1963 min_protocol_version: ProtocolVersion,
1964 version_valid_till: ProtocolVersion, priority: SendQueuePriority,
1965 ) -> Result<(), Error> {
1966 if version_valid_till < self.min_supported_version {
1967 bail!(Error::SendUnsupportedMessage {
1968 protocol: self.protocol,
1969 msg_id: parse_msg_id_leb128_2_bytes_at_most(&mut &*msg)
1970 .expect("locally encoded message must contain a valid leb128 msg id"),
1971 peer_protocol_version: None,
1972 min_supported_version: Some(self.min_supported_version),
1973 });
1974 }
1975
1976 if *node_id == *self.network_service.metadata.id() {
1977 self.handler.send_local_message(self, msg);
1978 return Ok(());
1979 }
1980
1981 let session = self.network_service.sessions.get_by_id(node_id);
1982 trace!("Sending {} bytes to {}", msg.len(), node_id);
1983 if let Some(session) = session {
1984 let latency =
1985 self.network_service.delayed_queue.as_ref().and_then(|q| {
1986 session
1987 .write()
1988 .metadata
1989 .id
1990 .and_then(|id| q.latencies.read().get(&id).copied())
1991 });
1992 match latency {
1993 Some(latency) => {
1994 let q =
1995 self.network_service.delayed_queue.as_ref().unwrap();
1996 let mut queue = q.queue.lock();
1997 let ts_to_send = Instant::now() + latency;
1998 queue.push(DelayMessageContext::new(
1999 ts_to_send,
2000 self.io.clone(),
2001 self.protocol,
2002 session,
2003 *node_id,
2004 msg,
2005 min_protocol_version,
2006 priority,
2007 ));
2008 self.io.register_timer_once_nocancel(
2009 SEND_DELAYED_MESSAGES,
2010 latency,
2011 )?;
2012 trace!("register delayed timer delay:{:?} ts_to_send:{:?} length:{}", latency, ts_to_send, queue.len());
2013 }
2014 None => {
2015 session.write().send_packet(
2016 self.io,
2017 Some(self.protocol),
2018 min_protocol_version,
2019 session::PACKET_USER,
2020 msg,
2021 priority,
2022 )?;
2023 }
2024 }
2025 }
2027 Ok(())
2028 }
2029
2030 fn disconnect_peer(
2031 &self, node_id: &NodeId, op: Option<UpdateNodeOperation>, reason: &str,
2032 ) {
2033 self.network_service
2034 .kill_connection(node_id, self.io, true, op, reason);
2035 }
2036
2037 fn register_timer(
2038 &self, token: TimerToken, delay: Duration,
2039 ) -> Result<(), Error> {
2040 self.io
2041 .message(NetworkIoMessage::AddTimer {
2042 token,
2043 delay,
2044 protocol: self.protocol,
2045 })
2046 .unwrap_or_else(|e| {
2047 warn!("Error sending network IO message: {:?}", e)
2048 });
2049 Ok(())
2050 }
2051
2052 fn dispatch_work(&self, work_type: HandlerWorkType) {
2053 self.io
2054 .message(NetworkIoMessage::DispatchWork {
2055 protocol: self.protocol,
2056 work_type,
2057 })
2058 .expect("Error sending network IO message");
2059 }
2060
2061 fn insert_peer_node_tag(&self, peer: NodeId, key: &str, value: &str) {
2062 self.network_service
2063 .node_db
2064 .write()
2065 .set_tag(peer, key, value);
2066 }
2067}
2068
2069fn save_key(path: &Path, key: &Secret) {
2070 let mut path_buf = PathBuf::from(path);
2071 if let Err(e) = fs::create_dir_all(path_buf.as_path()) {
2072 warn!("Error creating key directory: {:?}", e);
2073 return;
2074 };
2075 path_buf.push("key");
2076 let path = path_buf.as_path();
2077 let mut file = match fs::File::create(path) {
2078 Ok(file) => file,
2079 Err(e) => {
2080 warn!("Error creating key file: {:?}", e);
2081 return;
2082 }
2083 };
2084 if let Err(e) = restrict_permissions_owner(path, true, false) {
2085 warn!("Failed to modify permissions of the file ({})", e);
2086 }
2087 if let Err(e) = file.write(&key.to_hex().into_bytes()) {
2088 warn!("Error writing key file: {:?}", e);
2089 }
2090}
2091
2092fn load_key(path: &Path) -> Option<Secret> {
2093 let mut path_buf = PathBuf::from(path);
2094 path_buf.push("key");
2095 let mut file = match fs::File::open(path_buf.as_path()) {
2096 Ok(file) => file,
2097 Err(e) => {
2098 debug!("failed to open key file: {:?}", e);
2099 return None;
2100 }
2101 };
2102 let mut buf = String::new();
2103 match file.read_to_string(&mut buf) {
2104 Ok(_) => {}
2105 Err(e) => {
2106 warn!("Error reading key file: {:?}", e);
2107 return None;
2108 }
2109 }
2110 match Secret::from_str(&buf) {
2111 Ok(key) => Some(key),
2112 Err(e) => {
2113 warn!("Error parsing key file: {:?}", e);
2114 None
2115 }
2116 }
2117}
2118
2119pub fn load_pos_private_key(
2120 path: &Path,
2121) -> Option<(ConsensusPrivateKey, Option<ConsensusVRFPrivateKey>)> {
2122 let mut path_buf = PathBuf::from(path);
2123 path_buf.push("pos_key");
2124 let mut file = match fs::File::open(path_buf.as_path()) {
2125 Ok(file) => file,
2126 Err(e) => {
2127 debug!("failed to open key file: {:?}", e);
2128 return None;
2129 }
2130 };
2131 let mut buf = String::new();
2132 match file.read_to_string(&mut buf) {
2133 Ok(_) => {}
2134 Err(e) => {
2135 warn!("Error reading key file: {:?}", e);
2136 return None;
2137 }
2138 }
2139 let key_str: Vec<_> = buf.split(",").collect();
2140 let private_key = match ConsensusPrivateKey::from_encoded_string(key_str[0])
2141 {
2142 Ok(key) => Some(key),
2143 Err(e) => {
2144 warn!("Error parsing key file: {:?}", e);
2145 None
2146 }
2147 }?;
2148 if key_str.len() <= 1 {
2149 return Some((private_key, None));
2150 }
2151 let vrf_private_key =
2152 match ConsensusVRFPrivateKey::from_encoded_string(key_str[1]) {
2153 Ok(key) => Some(key),
2154 Err(e) => {
2155 warn!("Error parsing key file: {:?}", e);
2156 None
2157 }
2158 }?;
2159 Some((private_key, Some(vrf_private_key)))
2160}
2161
2162impl std::fmt::Display for ProtocolVersion {
2163 fn fmt(&self, f: &mut Formatter) -> Result<(), std::fmt::Error> {
2164 write!(f, "{}", self.0)
2165 }
2166}
2167
2168impl std::ops::Deref for ProtocolVersion {
2169 type Target = u8;
2170
2171 fn deref(&self) -> &Self::Target { &self.0 }
2172}
2173
impl Encodable for ProtocolVersion {
    /// Appends the wrapped value (`self.0`) to the RLP stream.
    // NOTE(review): uses `append_internal` rather than `append`; the reason
    // is not visible here, confirm before changing.
    fn rlp_append(&self, s: &mut RlpStream) { s.append_internal(&self.0); }
}
2177
2178impl Decodable for ProtocolVersion {
2179 fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
2180 Ok(Self(rlp.as_val()?))
2181 }
2182}