network/
service.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use std::{
6    cmp::{min, Ordering},
7    collections::{BinaryHeap, HashMap, HashSet, VecDeque},
8    fmt::Formatter,
9    fs,
10    io::{self, Read, Write},
11    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
12    path::{Path, PathBuf},
13    str::FromStr,
14    sync::{atomic::Ordering as AtomicOrdering, Arc},
15    time::{Duration, Instant},
16};
17
18use keccak_hash::keccak;
19use mio::{
20    net::{TcpListener, TcpStream, UdpSocket},
21    Interest, Registry, Token,
22};
23use parity_path::restrict_permissions_owner;
24use parking_lot::{Mutex, RwLock};
25use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
26
27use crate::keylib::{sign, Generator, KeyPair, Random, Secret};
28use cfx_addr::Network;
29use cfx_bytes::Bytes;
30use cfx_util_macros::bail;
31use diem_crypto::ValidCryptoMaterialStringExt;
32use diem_types::{
33    account_address::from_consensus_public_key,
34    validator_config::{
35        ConsensusPrivateKey, ConsensusPublicKey, ConsensusVRFPrivateKey,
36        ConsensusVRFPublicKey,
37    },
38};
39use log::{debug, info, trace, warn};
40use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
41use priority_send_queue::SendQueuePriority;
42use serde::{Deserialize, Serialize};
43
44use crate::{
45    discovery::Discovery,
46    handshake::BYPASS_CRYPTOGRAPHY,
47    iolib::{
48        IoContext, IoHandler, IoService, MapNonBlock, StreamToken, TimerToken,
49    },
50    ip_utils::{map_external_address, select_public_address},
51    node_database::NodeDatabase,
52    node_table::*,
53    parse_msg_id_leb128_2_bytes_at_most,
54    session::{self, Session, SessionData, SessionDetails},
55    session_manager::SessionManager,
56    Error, HandlerWorkType, IpFilter, NatType, NetworkConfiguration,
57    NetworkContext as NetworkContextTrait, NetworkIoMessage,
58    NetworkProtocolHandler, PeerInfo, ProtocolId, ProtocolInfo,
59    UpdateNodeOperation, NODE_TAG_ARCHIVE, NODE_TAG_NODE_TYPE,
60};
61
62use super::DisconnectReason;
63
/// Number of session slots; session stream tokens occupy
/// `FIRST_SESSION..=LAST_SESSION`.
const MAX_SESSIONS: usize = 2048;

/// TCP listen port used when `listen_address` is not configured.
const DEFAULT_PORT: u16 = 32323;

// IO token layout. Stream tokens and timer tokens are added together below,
// so they share a single numeric space:
// - [FIRST_SESSION, LAST_SESSION]: one token per session slot;
// - SYS_TIMER + k: service-level streams (TCP accept, UDP) and timers;
// - HANDLER_TIMER and above: presumably protocol-handler timers
//   (`timer_counter` is initialised to HANDLER_TIMER).
const FIRST_SESSION: StreamToken = 0;
const LAST_SESSION: StreamToken = FIRST_SESSION + MAX_SESSIONS - 1;
const SYS_TIMER: TimerToken = LAST_SESSION + 1;
const TCP_ACCEPT: StreamToken = SYS_TIMER + 1;
const HOUSEKEEPING: TimerToken = SYS_TIMER + 2;
const UDP_MESSAGE: StreamToken = SYS_TIMER + 3;
const DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 4;
const FAST_DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 5;
const DISCOVERY_ROUND: TimerToken = SYS_TIMER + 6;
const NODE_TABLE: TimerToken = SYS_TIMER + 7;
const SEND_DELAYED_MESSAGES: TimerToken = SYS_TIMER + 8;
const CHECK_SESSIONS: TimerToken = SYS_TIMER + 9;
const HANDLER_TIMER: TimerToken = LAST_SESSION + 256;
// Token handed to `IoService::start` in `NetworkService::initialize`.
// NOTE(review): this equals HANDLER_TIMER + 1, i.e. it lies in the range that
// `timer_counter` (initialised to HANDLER_TIMER) may hand out to handler
// timers — confirm the two can never collide.
const STOP_NET_POLL: TimerToken = HANDLER_TIMER + 1;

// Presumably the default interval of the HOUSEKEEPING timer — confirm where
// it is registered.
pub const DEFAULT_HOUSEKEEPING_TIMEOUT: Duration = Duration::from_secs(2);
// for DISCOVERY_REFRESH TimerToken
pub const DEFAULT_DISCOVERY_REFRESH_TIMEOUT: Duration =
    Duration::from_secs(120);
// for FAST_DISCOVERY_REFRESH TimerToken
pub const DEFAULT_FAST_DISCOVERY_REFRESH_TIMEOUT: Duration =
    Duration::from_secs(10);
// for DISCOVERY_ROUND TimerToken
pub const DEFAULT_DISCOVERY_ROUND_TIMEOUT: Duration =
    Duration::from_millis(500);
// The ticker interval for NODE_TABLE, i.e., how often the program will refresh
// the NODE_TABLE.
pub const DEFAULT_NODE_TABLE_TIMEOUT: Duration = Duration::from_secs(300);
// The lifetime threshold of the connection for promoting a peer from untrusted
// to trusted (3 days).
pub const DEFAULT_CONNECTION_LIFETIME_FOR_PROMOTION: Duration =
    Duration::from_secs(3 * 24 * 3600);
// Interval of the CHECK_SESSIONS timer registered in
// `initialize_udp_protocols`.
const DEFAULT_CHECK_SESSIONS_TIMEOUT: Duration = Duration::from_secs(10);
101
102#[derive(
103    Clone,
104    Copy,
105    Debug,
106    Default,
107    Eq,
108    PartialOrd,
109    PartialEq,
110    Serialize,
111    Deserialize,
112)]
113pub struct ProtocolVersion(pub u8);
114
115impl MallocSizeOf for ProtocolVersion {
116    fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize { 0 }
117}
118
119pub const MAX_DATAGRAM_SIZE: usize = 1280;
120pub const UDP_PROTOCOL_DISCOVERY: u8 = 1;
121
122pub struct Datagram {
123    pub payload: Bytes,
124    pub address: SocketAddr,
125}
126
127pub struct UdpChannel {
128    pub send_queue: VecDeque<Datagram>,
129}
130
131impl UdpChannel {
132    pub fn new() -> UdpChannel {
133        UdpChannel {
134            send_queue: VecDeque::new(),
135        }
136    }
137
138    pub fn any_sends_queued(&self) -> bool { !self.send_queue.is_empty() }
139
140    pub fn dequeue_send(&mut self) -> Option<Datagram> {
141        self.send_queue.pop_front()
142    }
143
144    pub fn requeue_send(&mut self, datagram: Datagram) {
145        self.send_queue.push_front(datagram)
146    }
147}
148
149pub struct UdpIoContext<'a> {
150    pub channel: &'a RwLock<UdpChannel>,
151    pub node_db: &'a RwLock<NodeDatabase>,
152}
153
154impl<'a> UdpIoContext<'a> {
155    pub fn new(
156        channel: &'a RwLock<UdpChannel>, node_db: &'a RwLock<NodeDatabase>,
157    ) -> UdpIoContext<'a> {
158        UdpIoContext { channel, node_db }
159    }
160
161    pub fn send(&self, payload: Bytes, address: SocketAddr) {
162        self.channel
163            .write()
164            .send_queue
165            .push_back(Datagram { payload, address });
166    }
167}
168
/// NetworkService implements the P2P communication between different nodes. It
/// manages connections between peers, including accepting new peers or dropping
/// existing peers. Inside NetworkService, it has an IoService event loop with a
/// thread pool.
///
/// A freshly constructed service has neither `io_service` nor `inner`; both are
/// filled in by `initialize`. Fields drop in declaration order, so
/// `io_service` is dropped before `inner`.
pub struct NetworkService {
    /// Event loop for network IO; (re)started by every `initialize` call.
    pub io_service: Option<IoService<NetworkIoMessage>>,
    /// Shared network state; created by the first `initialize` call that
    /// manages to build it.
    pub inner: Option<Arc<NetworkServiceInner>>,
    config: NetworkConfiguration,
}
178
179impl NetworkService {
180    pub fn new(config: NetworkConfiguration) -> NetworkService {
181        NetworkService {
182            io_service: None,
183            inner: None,
184            config,
185        }
186    }
187
188    pub fn is_consortium(&self) -> bool { self.config.is_consortium }
189
190    pub fn get_network_type(&self) -> &Network {
191        self.config.get_network_type()
192    }
193
194    pub fn network_id(&self) -> u64 { self.config.id }
195
196    pub fn is_test_mode(&self) -> bool { self.config.test_mode }
197
198    /// Create and start the event loop inside the NetworkService
199    pub fn initialize(
200        &mut self, pos_pub_keys: (ConsensusPublicKey, ConsensusVRFPublicKey),
201    ) -> Result<(), Error> {
202        let raw_io_service =
203            IoService::<NetworkIoMessage>::start(STOP_NET_POLL)?;
204        self.io_service = Some(raw_io_service);
205
206        if self.inner.is_none() {
207            if self.config.test_mode {
208                BYPASS_CRYPTOGRAPHY.store(true, AtomicOrdering::Relaxed);
209            }
210
211            let inner = Arc::new(match self.config.test_mode {
212                true => NetworkServiceInner::new_with_latency(
213                    &self.config,
214                    pos_pub_keys,
215                )?,
216                false => NetworkServiceInner::new(&self.config, pos_pub_keys)?,
217            });
218            self.io_service
219                .as_ref()
220                .unwrap()
221                .register_handler(inner.clone())?;
222            self.inner = Some(inner);
223        }
224        Ok(())
225    }
226
227    pub fn start(&self) {
228        let handler = self.inner.as_ref().unwrap().clone();
229        self.io_service
230            .as_ref()
231            .expect("Already set")
232            .start_network_poll(handler, MAX_SESSIONS);
233    }
234
235    /// Add a P2P peer to the client as a trusted node
236    pub fn add_peer(&self, node: NodeEntry) -> Result<(), Error> {
237        if let Some(ref x) = self.inner {
238            x.node_db.write().insert_trusted(node);
239            Ok(())
240        } else {
241            Err("Network service not started yet!".into())
242        }
243    }
244
245    /// Drop a P2P peer from the client
246    pub fn drop_peer(&self, node: NodeEntry) -> Result<(), Error> {
247        if let Some(ref x) = self.inner {
248            x.drop_node(node.id)
249        } else {
250            Err("Network service not started yet!".into())
251        }
252    }
253
254    /// Get the local address of the client
255    pub fn local_addr(&self) -> Option<SocketAddr> {
256        self.inner.as_ref().map(|inner_ref| inner_ref.local_addr())
257    }
258
259    /// Register a new protocol handler
260    pub fn register_protocol(
261        &self, handler: Arc<dyn NetworkProtocolHandler + Sync>,
262        protocol: ProtocolId, version: ProtocolVersion,
263    ) -> Result<(), Error> {
264        let (tx, rx) = std::sync::mpsc::sync_channel(0);
265        self.io_service.as_ref().unwrap().send_message(
266            NetworkIoMessage::AddHandler {
267                handler,
268                protocol,
269                version,
270                callback: tx,
271            },
272        )?;
273        // Only error if channel closed.
274        rx.recv().expect("protocol register error");
275        Ok(())
276    }
277
278    /// Executes action in the network context
279    pub fn with_context<F, R>(
280        &self, handler: Arc<dyn NetworkProtocolHandler + Sync>,
281        protocol: ProtocolId, action: F,
282    ) -> Result<R, String>
283    where
284        F: FnOnce(&NetworkContext) -> R,
285    {
286        let io = IoContext::new(self.io_service.as_ref().unwrap().channel(), 0);
287        match self.inner {
288            Some(ref inner) => {
289                Ok(inner.with_context(handler, protocol, &io, action))
290            }
291            None => Err("Network service not started yet!".to_owned().into()),
292        }
293    }
294
295    /// Return the current connected peers
296    pub fn get_peer_info(&self) -> Option<Vec<PeerInfo>> {
297        self.inner.as_ref().map(|inner| inner.get_peer_info())
298    }
299
300    /// Sign a challenge to provide self NodeId
301    pub fn sign_challenge(&self, challenge: Vec<u8>) -> Result<Vec<u8>, Error> {
302        let hash = keccak(challenge);
303        if let Some(ref inner) = self.inner {
304            let signature = match sign(inner.metadata.keys.secret(), &hash) {
305                Ok(s) => s,
306                Err(e) => {
307                    warn!("Error signing hello packet");
308                    return Err(Error::from(e));
309                }
310            };
311            Ok(signature[..].to_owned())
312        } else {
313            Err("Network service not started yet!".into())
314        }
315    }
316
317    pub fn net_key_pair(&self) -> Result<KeyPair, Error> {
318        if let Some(ref inner) = self.inner {
319            Ok(inner.metadata.keys.clone())
320        } else {
321            Err("Network service not started yet!".into())
322        }
323    }
324
325    pub fn pos_public_key(&self) -> Option<ConsensusPublicKey> {
326        if let Some(ref inner) = self.inner {
327            inner.sessions.self_pos_public_key.clone().map(|k| k.0)
328        } else {
329            None
330        }
331    }
332
333    pub fn add_latency(
334        &self, id: NodeId, latency_ms: f64,
335    ) -> Result<(), Error> {
336        if let Some(ref inner) = self.inner {
337            inner.add_latency(id, latency_ms)
338        } else {
339            Err("Network service not started yet!".into())
340        }
341    }
342
343    pub fn get_node(&self, id: &NodeId) -> Option<(bool, Node)> {
344        let inner = self.inner.as_ref()?;
345        let node_db = inner.node_db.read();
346        let (trusted, node) = node_db.get_with_trusty(id)?;
347        Some((trusted, node.clone()))
348    }
349
350    pub fn get_detailed_sessions(
351        &self, node_id: Option<NodeId>,
352    ) -> Option<Vec<SessionDetails>> {
353        let inner = self.inner.as_ref()?;
354        match node_id {
355            None => Some(
356                inner
357                    .sessions
358                    .all()
359                    .iter()
360                    .map(|s| s.read().details())
361                    .collect(),
362            ),
363            Some(id) => {
364                let session = inner.sessions.get_by_id(&id)?;
365                let details = session.read().details();
366                Some(vec![details])
367            }
368        }
369    }
370
371    pub fn disconnect_node(
372        &self, id: &NodeId, op: Option<UpdateNodeOperation>,
373    ) -> bool {
374        if self.inner.is_none() || self.io_service.is_none() {
375            return false;
376        }
377        let inner = self.inner.as_ref().unwrap();
378        let io = IoContext::new(self.io_service.as_ref().unwrap().channel(), 0);
379        inner.kill_connection(
380            id,
381            &io,
382            true,
383            op,
384            "disconnect requested", // reason
385        );
386        true
387    }
388
389    pub fn save_node_db(&self) {
390        if let Some(inner) = &self.inner {
391            inner.node_db.write().save();
392        }
393    }
394}
395
/// Reference-counted, lock-protected peer session.
type SharedSession = Arc<RwLock<Session>>;

/// Identity and addressing of the local node.
pub struct HostMetadata {
    /// Network id copied from `NetworkConfiguration::id`.
    pub network_id: u64,
    /// Our private and public keys.
    pub keys: KeyPair,
    /// Registered protocols; starts empty (populated outside this section).
    pub protocols: RwLock<Vec<ProtocolInfo>>,
    /// Minimum peer protocol versions; `connect_peers` treats an empty list
    /// as "no protocol handler registered yet".
    pub minimum_peer_protocol_version: RwLock<Vec<ProtocolInfo>>,
    /// Address the TCP listener is bound to, including the actual bound port.
    pub local_address: SocketAddr,
    /// Local address + discovery port
    pub local_endpoint: NodeEndpoint,
    /// Public address + discovery port
    pub public_endpoint: NodeEndpoint,
}

impl HostMetadata {
    /// Secret half of the node key pair.
    pub(crate) fn secret(&self) -> &Secret { self.keys.secret() }

    /// Node id, i.e. the public half of the node key pair.
    pub(crate) fn id(&self) -> &NodeId { self.keys.public() }
}
416
/// A timer owned by a protocol handler. Stored in
/// `NetworkServiceInner::timers`, keyed by the network-level timer token.
#[derive(Copy, Clone)]
struct ProtocolTimer {
    /// Protocol whose handler owns the timer.
    pub protocol: ProtocolId,
    pub token: TimerToken, // Handler level token
}
422
/// The inner implementation of NetworkService, shared behind an `Arc` with the
/// IO event loop (it is registered as the IO handler in
/// `NetworkService::initialize`).
///
/// Note that all accesses to the RwLocks/Mutexes of the fields have to follow a
/// consistent acquisition order to avoid deadlocks.
/// NOTE(review): the required order is not written down in this section —
/// confirm it and document it here.
pub struct NetworkServiceInner {
    /// Peer sessions; created with `FIRST_SESSION` and `MAX_SESSIONS`.
    pub sessions: SessionManager,
    pub metadata: HostMetadata,
    pub config: NetworkConfiguration,
    /// UDP socket bound in `new`.
    udp_socket: Mutex<UdpSocket>,
    /// TCP listener bound in `new`.
    tcp_listener: Mutex<TcpListener>,
    /// Outgoing datagram queue, filled through `UdpIoContext::send`.
    udp_channel: RwLock<UdpChannel>,
    /// `None` when discovery is disabled in the configuration.
    discovery: Mutex<Option<Discovery>>,
    /// Registered protocol handlers, keyed by protocol id.
    handlers:
        RwLock<HashMap<ProtocolId, Arc<dyn NetworkProtocolHandler + Sync>>>,
    /// Handler timers, keyed by network-level timer token.
    timers: RwLock<HashMap<TimerToken, ProtocolTimer>>,
    /// Initialised to `HANDLER_TIMER`; presumably the source of new handler
    /// timer tokens.
    timer_counter: RwLock<usize>,
    pub node_db: RwLock<NodeDatabase>,
    /// Reserved nodes; `connect_peers` tries them before sampled nodes.
    reserved_nodes: RwLock<HashSet<NodeId>>,
    /// Nodes removed by `drop_node` whose connection `drop_peers` still has
    /// to kill.
    dropped_nodes: RwLock<HashSet<NodeId>>,

    /// Copied from the configuration; housekeeping is unimplemented in
    /// consortium mode.
    is_consortium: bool,

    /// Delayed message queue and corresponding latency
    /// (`Some` only when built with `new_with_latency`, i.e. test mode).
    delayed_queue: Option<DelayedQueue>,
}
446
447struct DelayedQueue {
448    queue: Mutex<BinaryHeap<DelayMessageContext>>,
449    latencies: RwLock<HashMap<NodeId, Duration>>,
450}
451
452impl DelayedQueue {
453    fn new() -> Self {
454        DelayedQueue {
455            queue: Mutex::new(BinaryHeap::new()),
456            latencies: RwLock::new(HashMap::new()),
457        }
458    }
459
460    fn send_delayed_messages(&self, network_service: &NetworkServiceInner) {
461        let context = self.queue.lock().pop().unwrap();
462        let r = context.session.write().send_packet(
463            &context.io,
464            Some(context.protocol),
465            context.min_protocol_version,
466            session::PACKET_USER,
467            context.msg,
468            context.priority,
469        );
470        match r {
471            Ok(_) => {}
472            Err(Error::Expired) => {
473                // If a connection is set expired, it should have been killed
474                // before, and the stored `context.peer` may have been reused by
475                // another connection, so we cannot kill it again
476                info!(
477                    "Error sending delayed message to expired connection {:?}",
478                    context.peer
479                );
480            }
481            Err(e) => {
482                info!(
483                    "Error sending delayed message: peer={:?} err={:?}",
484                    context.peer, e
485                );
486                network_service.kill_connection(
487                    &context.peer,
488                    &context.io,
489                    true,
490                    Some(UpdateNodeOperation::Failure),
491                    "failed to send delayed message", // reason
492                );
493            }
494        };
495    }
496}
497
498impl NetworkServiceInner {
499    pub fn new(
500        config: &NetworkConfiguration,
501        pos_pub_keys: (ConsensusPublicKey, ConsensusVRFPublicKey),
502    ) -> Result<NetworkServiceInner, Error> {
503        let mut listen_address = match config.listen_address {
504            None => SocketAddr::V4(SocketAddrV4::new(
505                Ipv4Addr::new(0, 0, 0, 0),
506                DEFAULT_PORT,
507            )),
508            Some(addr) => addr,
509        };
510
511        let keys = if let Some(ref secret) = config.use_secret {
512            KeyPair::from_secret(secret.clone())?
513        } else {
514            config
515                .config_path
516                .clone()
517                .and_then(|ref p| load_key(Path::new(&p)))
518                .map_or_else(
519                    || {
520                        let key = Random
521                            .generate()
522                            .expect("Error generating random key pair");
523                        if let Some(path) = config.config_path.clone() {
524                            save_key(Path::new(&path), key.secret());
525                        }
526                        key
527                    },
528                    |s| {
529                        KeyPair::from_secret(s)
530                            .expect("Error creating node secret key")
531                    },
532                )
533        };
534        info!("Self pos public key: {:?}", pos_pub_keys);
535
536        info!("Self node id: {:?}", *keys.public());
537
538        let tcp_listener = TcpListener::bind(listen_address)?;
539        listen_address = SocketAddr::new(
540            listen_address.ip(),
541            tcp_listener.local_addr()?.port(),
542        );
543        debug!("Listening at {:?}", listen_address);
544        let udp_port = config.udp_port.unwrap_or_else(|| listen_address.port());
545        let local_endpoint = NodeEndpoint {
546            address: listen_address,
547            udp_port,
548        };
549        let mut udp_addr = local_endpoint.address;
550        udp_addr.set_port(local_endpoint.udp_port);
551        let udp_socket =
552            UdpSocket::bind(udp_addr).expect("Error binding UDP socket");
553
554        let public_address = config.public_address;
555        let public_endpoint = match public_address {
556            None => {
557                let public_address =
558                    select_public_address(local_endpoint.address.port());
559                let public_endpoint = NodeEndpoint {
560                    address: public_address,
561                    udp_port: local_endpoint.udp_port,
562                };
563                if config.nat_enabled {
564                    match map_external_address(&local_endpoint, &NatType::Any) {
565                        Some(endpoint) => {
566                            info!(
567                                "NAT mapped to external address {}",
568                                endpoint.address
569                            );
570                            endpoint
571                        }
572                        None => public_endpoint,
573                    }
574                } else {
575                    public_endpoint
576                }
577            }
578            Some(addr) => NodeEndpoint {
579                address: addr,
580                udp_port: local_endpoint.udp_port,
581            },
582        };
583
584        let allow_ips = config.ip_filter.clone();
585        let discovery = {
586            if config.discovery_enabled {
587                Some(Discovery::new(
588                    &keys,
589                    public_endpoint.clone(),
590                    allow_ips,
591                    config.discovery_config.clone(),
592                ))
593            } else {
594                None
595            }
596        };
597
598        let nodes_path = config.config_path.clone();
599
600        let mut inner = NetworkServiceInner {
601            metadata: HostMetadata {
602                network_id: config.id,
603                keys,
604                protocols: RwLock::new(Vec::new()),
605                minimum_peer_protocol_version: Default::default(),
606                local_address: listen_address,
607                local_endpoint,
608                public_endpoint,
609            },
610            config: config.clone(),
611            udp_channel: RwLock::new(UdpChannel::new()),
612            discovery: Mutex::new(discovery),
613            udp_socket: Mutex::new(udp_socket),
614            tcp_listener: Mutex::new(tcp_listener),
615            sessions: SessionManager::new(
616                FIRST_SESSION,
617                MAX_SESSIONS,
618                config.max_incoming_peers,
619                &config.session_ip_limit_config,
620                Some(pos_pub_keys),
621            ),
622            handlers: RwLock::new(HashMap::new()),
623            timers: RwLock::new(HashMap::new()),
624            timer_counter: RwLock::new(HANDLER_TIMER),
625            node_db: RwLock::new(NodeDatabase::new(
626                nodes_path,
627                config.subnet_quota,
628            )),
629            reserved_nodes: RwLock::new(HashSet::new()),
630            dropped_nodes: RwLock::new(HashSet::new()),
631            is_consortium: config.is_consortium,
632            delayed_queue: None,
633        };
634
635        for n in &config.boot_nodes {
636            inner.add_boot_node(n);
637        }
638
639        let reserved_nodes = config.reserved_nodes.clone();
640        for n in reserved_nodes {
641            if let Err(e) = inner.add_reserved_node(&n) {
642                debug!("Error parsing node id: {}: {:?}", n, e);
643            }
644        }
645
646        Ok(inner)
647    }
648
649    pub fn new_with_latency(
650        config: &NetworkConfiguration,
651        pos_pub_keys: (ConsensusPublicKey, ConsensusVRFPublicKey),
652    ) -> Result<NetworkServiceInner, Error> {
653        let r = NetworkServiceInner::new(config, pos_pub_keys);
654        if r.is_err() {
655            return r;
656        }
657        let mut inner = r.unwrap();
658        inner.delayed_queue = Some(DelayedQueue::new());
659        Ok(inner)
660    }
661
662    pub fn add_latency(
663        &self, peer: NodeId, latency_ms: f64,
664    ) -> Result<(), Error> {
665        match self.delayed_queue {
666            Some(ref queue) => {
667                let mut latencies = queue.latencies.write();
668                latencies
669                    .insert(peer, Duration::from_millis(latency_ms as u64));
670                Ok(())
671            }
672            None => Err(
673                "conflux not in test mode, and does not support add_latency"
674                    .into(),
675            ),
676        }
677    }
678
679    pub fn get_ip_filter(&self) -> &IpFilter { &self.config.ip_filter }
680
681    fn add_boot_node(&self, id: &str) {
682        match Node::from_str(id) {
683            Err(e) => {
684                debug!("Could not add node {}: {:?}", id, e);
685            }
686            Ok(n) => {
687                self.node_db.write().insert_trusted(NodeEntry {
688                    id: n.id,
689                    endpoint: n.endpoint,
690                });
691            }
692        }
693    }
694
695    fn add_reserved_node(&mut self, id: &str) -> Result<(), Error> {
696        let n = Node::from_str(id)?;
697        self.node_db.write().insert_trusted(NodeEntry {
698            id: n.id.clone(),
699            endpoint: n.endpoint.clone(),
700        });
701        self.reserved_nodes.write().insert(n.id);
702        Ok(())
703    }
704
705    fn initialize_udp_protocols(
706        &self, io: &IoContext<NetworkIoMessage>,
707    ) -> Result<(), Error> {
708        // Initialize discovery
709        if let Some(discovery) = self.discovery.lock().as_mut() {
710            let allow_ips = self.config.ip_filter.clone();
711            let nodes = self.node_db.read().sample_trusted_nodes(
712                self.config.discovery_config.discover_node_count,
713                &allow_ips,
714            );
715            discovery.try_ping_nodes(
716                &UdpIoContext::new(&self.udp_channel, &self.node_db),
717                nodes,
718            );
719            io.register_timer(
720                FAST_DISCOVERY_REFRESH,
721                self.config.fast_discovery_refresh_timeout,
722            )?;
723            io.register_timer(
724                DISCOVERY_REFRESH,
725                self.config.discovery_refresh_timeout,
726            )?;
727            io.register_timer(
728                DISCOVERY_ROUND,
729                self.config.discovery_round_timeout,
730            )?;
731        }
732        io.register_timer(NODE_TABLE, self.config.node_table_timeout)?;
733        io.register_timer(CHECK_SESSIONS, DEFAULT_CHECK_SESSIONS_TIMEOUT)?;
734
735        Ok(())
736    }
737
738    fn try_promote_untrusted(&self) {
739        // Get NodeIds from incoming connections
740        let mut incoming_ids: Vec<NodeId> = Vec::new();
741        for s in self.sessions.all() {
742            if let Some(s) = s.try_read() {
743                if s.is_ready() && !s.metadata.originated && !s.expired() {
744                    // is live incoming connection
745                    if let Some(id) = s.metadata.id {
746                        incoming_ids.push(id);
747                    }
748                }
749            }
750        }
751
752        // Check each live connection for its lifetime.
753        // Promote the peers with live connection for a threshold period
754        self.node_db.write().promote(
755            incoming_ids,
756            self.config.connection_lifetime_for_promotion,
757        );
758    }
759
760    pub fn local_addr(&self) -> SocketAddr { self.metadata.local_address }
761
762    fn drop_node(&self, local_id: NodeId) -> Result<(), Error> {
763        let removed_node = self.node_db.write().remove(&local_id);
764
765        if let Some(node) = removed_node {
766            assert_eq!(node.id, local_id);
767            if let Some(_stream_token) = node.stream_token {
768                let mut wd = self.dropped_nodes.write();
769                wd.insert(node.id);
770            }
771        }
772
773        Ok(())
774    }
775
776    fn has_enough_outgoing_peers(
777        &self, tag: Option<(&str, &str)>, max: usize,
778    ) -> bool {
779        let count = match tag {
780            Some((k, v)) => self.sessions.count_with_tag(&k.into(), &v.into()),
781            None => self.sessions.stat().1, // egress count
782        };
783
784        count >= max
785    }
786
787    fn on_housekeeping(&self, io: &IoContext<NetworkIoMessage>) {
788        if self.is_consortium {
789            unimplemented!();
790        } else {
791            self.connect_peers(io);
792        }
793        self.drop_peers(io);
794    }
795
796    // Connect to all reserved and trusted peers if not yet
797    fn connect_peers(&self, io: &IoContext<NetworkIoMessage>) {
798        if self.metadata.minimum_peer_protocol_version.read().len() == 0 {
799            // The protocol handler has not been registered, we just wait for
800            // the next time.
801            return;
802        }
803
804        let self_id = self.metadata.id().clone();
805
806        let sampled_archive_nodes = self.sample_archive_nodes();
807
808        let (handshake_count, egress_count, ingress_count) =
809            self.sessions.stat();
810        let samples;
811        {
812            let egress_attempt_count = if self.config.max_outgoing_peers
813                > egress_count + sampled_archive_nodes.len()
814            {
815                self.config.max_outgoing_peers
816                    - egress_count
817                    - sampled_archive_nodes.len()
818            } else {
819                0
820            };
821            samples = self.node_db.read().sample_trusted_node_ids(
822                egress_attempt_count as u32,
823                &self.config.ip_filter,
824            );
825        }
826
827        let reserved_nodes = self.reserved_nodes.read();
828        // Try to connect all reserved peers and trusted peers
829        let nodes = reserved_nodes
830            .iter()
831            .cloned()
832            .chain(sampled_archive_nodes)
833            .chain(samples);
834
835        let max_handshakes_per_round = self.config.max_handshakes / 2;
836        let mut started: usize = 0;
837        for id in nodes
838            .filter(|id| !self.sessions.contains_node(id) && *id != self_id)
839            .take(min(
840                max_handshakes_per_round,
841                self.config.max_handshakes.saturating_sub(handshake_count),
842            ))
843        {
844            self.connect_peer(&id, io);
845            started += 1;
846        }
847        debug!(
848            "Connecting peers: {} sessions, {} pending + {} started",
849            egress_count + ingress_count,
850            handshake_count,
851            started
852        );
853        if egress_count + ingress_count == 0 {
854            warn!(
855                "No peers connected at this moment, {} pending + {} started",
856                handshake_count, started
857            );
858        }
859    }
860
861    /// Sample archive nodes for outgoing connections if not enough.
862    fn sample_archive_nodes(&self) -> HashSet<NodeId> {
863        if self.config.max_outgoing_peers_archive == 0 {
864            return HashSet::new();
865        }
866
867        let key: String = NODE_TAG_NODE_TYPE.into();
868        let value: String = NODE_TAG_ARCHIVE.into();
869        let archive_sessions = self.sessions.count_with_tag(&key, &value);
870
871        if archive_sessions >= self.config.max_outgoing_peers_archive {
872            return HashSet::new();
873        }
874
875        self.node_db.read().sample_trusted_node_ids_with_tag(
876            (self.config.max_outgoing_peers_archive - archive_sessions) as u32,
877            &key,
878            &value,
879        )
880    }
881
882    // Kill connections of all dropped peers
883    fn drop_peers(&self, io: &IoContext<NetworkIoMessage>) {
884        {
885            if self.dropped_nodes.read().len() == 0 {
886                return;
887            }
888        }
889
890        let mut killed_nodes = HashSet::new();
891        let mut w = self.dropped_nodes.write();
892        for node_id in w.iter() {
893            if self.kill_connection(
894                node_id,
895                io,
896                true,
897                Some(UpdateNodeOperation::Failure),
898                "peer dropped in manual", // reason
899            ) {
900                killed_nodes.insert(*node_id);
901            }
902        }
903
904        for node_id in killed_nodes.iter() {
905            w.remove(node_id);
906        }
907    }
908
909    fn connect_peer(&self, id: &NodeId, io: &IoContext<NetworkIoMessage>) {
910        if self.sessions.contains_node(id) {
911            trace!("Abort connect. Node already connected");
912            return;
913        }
914
915        let (socket, address) = {
916            let address = {
917                // outgoing connection must pick node from trusted node table
918                if let Some(node) = self.node_db.read().get(id, true) {
919                    node.endpoint.address
920                } else {
921                    debug!("Abort connect. Node expired");
922                    return;
923                }
924            };
925
926            if !self.sessions.is_ip_allowed(&address.ip()) {
927                debug!("cannot create outgoing connection to node, id = {:?}, address = {:?}", id, address);
928                return;
929            }
930
931            match TcpStream::connect(address) {
932                Ok(socket) => {
933                    trace!("{}: connecting to {:?}", id, address);
934                    (socket, address)
935                }
936                Err(e) => {
937                    self.node_db.write().note_failure(
938                        id, true, /* by_connection */
939                        true, /* trusted_only */
940                    );
941                    debug!(
942                        "{}: can't connect o address {:?} {:?}",
943                        id, address, e
944                    );
945                    return;
946                }
947            }
948        };
949
950        if let Err(e) = self.create_connection(socket, address, Some(id), io) {
951            self.node_db.write().note_failure(
952                id, true, /* by_connection */
953                true, /* trusted_only */
954            );
955            debug!("Can't create connection: {:?}", e);
956        }
957    }
958
959    pub fn get_peer_info(&self) -> Vec<PeerInfo> {
960        debug!("get_peer_info: enter");
961
962        let mut peers = Vec::with_capacity(self.sessions.count());
963        debug!("get_peer_info: {} sessions in total", peers.capacity());
964
965        for session in self.sessions.all() {
966            let sess = session.read();
967            if !sess.expired() {
968                peers.push(PeerInfo {
969                    id: sess.token(),
970                    nodeid: sess.id().unwrap_or(&NodeId::default()).clone(),
971                    addr: sess.address(),
972                    protocols: sess.metadata.peer_protocols.clone(),
973                });
974            }
975        }
976
977        debug!("get_peer_info: leave, {} peers retrieved", peers.len());
978
979        peers
980    }
981
982    pub fn get_peer_connection_origin(&self, node_id: &NodeId) -> Option<bool> {
983        match self.sessions.get_by_id(node_id) {
984            Some(session) => {
985                let sess = session.read();
986                Some(sess.originated())
987            }
988            None => None,
989        }
990    }
991
992    fn start(&self, io: &IoContext<NetworkIoMessage>) -> Result<(), Error> {
993        self.initialize_udp_protocols(io)?;
994        io.register_stream(UDP_MESSAGE)?;
995        io.register_stream(TCP_ACCEPT)?;
996        Ok(())
997    }
998
999    // This function can be invoked in either of 2 cases:
1000    // 1. proactively connect to a peer;
1001    // 2. passively connected by a peer;
1002    fn create_connection(
1003        &self, socket: TcpStream, address: SocketAddr, id: Option<&NodeId>,
1004        io: &IoContext<NetworkIoMessage>,
1005    ) -> Result<(), Error> {
1006        match self.sessions.create(socket, address, id, io, self) {
1007            Ok(token) => {
1008                debug!("new session created, token = {}, address = {:?}, id = {:?}", token, address, id);
1009                if let Some(id) = id {
1010                    // This is an outgoing connection.
1011                    // Outgoing connection must pick node from trusted node
1012                    // table
1013                    self.node_db.write().note_success(id, Some(token), true);
1014                }
1015                io.register_stream(token).map(|_| ()).map_err(Into::into)
1016            }
1017            Err(reason) => {
1018                debug!("failed to create session, reason = {}, address = {:?}, id = {:?}", reason, address, id);
1019                Ok(())
1020            }
1021        }
1022    }
1023
1024    fn connection_closed(
1025        &self, stream: StreamToken, io: &IoContext<NetworkIoMessage>,
1026    ) {
1027        trace!("Connection closed: {}", stream);
1028        self.kill_connection_by_token(
1029            stream,
1030            io,
1031            true,
1032            Some(UpdateNodeOperation::Failure),
1033            "connection closed", // reason
1034        );
1035    }
1036
1037    fn session_readable(
1038        &self, stream: StreamToken, io: &IoContext<NetworkIoMessage>,
1039    ) {
1040        let mut handshake_done = false;
1041
1042        let mut messages: Vec<(ProtocolId, Vec<u8>)> = Vec::new();
1043        let mut kill = false;
1044        let mut token_to_disconnect = None;
1045
1046        let session = if let Some(session) = self.sessions.get(stream) {
1047            session
1048        } else {
1049            return;
1050        };
1051
1052        // We check dropped_nodes first to make sure we stop processing
1053        // communications from any dropped peers
1054        let mut session_node_id = session.read().id().map(|id| *id);
1055        let mut pos_public_key_opt = None;
1056        if let Some(node_id) = session_node_id {
1057            let to_drop = self.dropped_nodes.read().contains(&node_id);
1058            self.drop_peers(io);
1059            if to_drop {
1060                return;
1061            }
1062        }
1063
1064        loop {
1065            let mut sess = session.write();
1066            let data = sess.readable(io, self);
1067            let session_data = match data {
1068                Ok(session_data) => session_data,
1069                Err(e) => {
1070                    debug!("Failed to read session data, error = {:?}, session = {:?}", e, *sess);
1071                    kill = true;
1072                    break;
1073                }
1074            };
1075
1076            if session_data.token_to_disconnect.is_some() {
1077                debug!(
1078                    "session_readable: set token_to_disconnect to {:?}",
1079                    session_data.token_to_disconnect
1080                );
1081                token_to_disconnect = session_data.token_to_disconnect;
1082            }
1083
1084            match session_data.session_data {
1085                SessionData::Ready { pos_public_key } => {
1086                    debug!(
1087                        "receive Ready with pos_public_key={:?} account={:?}",
1088                        pos_public_key,
1089                        pos_public_key
1090                            .as_ref()
1091                            .map(|k| from_consensus_public_key(&k.0, &k.1)),
1092                    );
1093                    handshake_done = true;
1094                    session_node_id = Some(*sess.id().unwrap());
1095                    pos_public_key_opt = pos_public_key;
1096                }
1097                SessionData::Message { data, protocol } => {
1098                    drop(sess);
1099                    match self.handlers.read().get(&protocol) {
1100                        None => warn!(
1101                            "No handler found for protocol: {:?}",
1102                            protocol
1103                        ),
1104                        Some(_) => {
1105                            messages.push((protocol, data));
1106                        }
1107                    }
1108                }
1109                SessionData::None => break,
1110                SessionData::Continue => {}
1111            }
1112        }
1113
1114        if let Some(token_to_disconnect) = token_to_disconnect {
1115            self.kill_connection_by_token(
1116                token_to_disconnect.0,
1117                io,
1118                true,
1119                Some(UpdateNodeOperation::Failure),
1120                token_to_disconnect.1.as_str(), // reason
1121            );
1122        }
1123
1124        if kill {
1125            self.kill_connection_by_token(
1126                stream,
1127                io,
1128                true,
1129                Some(UpdateNodeOperation::Failure),
1130                "session readable error", // reason
1131            );
1132            return;
1133        }
1134
1135        // Handshake is just finished, first process the outcome from the
1136        // handshake.
1137        if handshake_done {
1138            let handlers = self.handlers.read();
1139            // Clone the data to prevent deadlock, because handler may
1140            // close the connection within the on_peer_connected
1141            // callback.
1142            let session_metadata = session.read().metadata.clone();
1143            for protocol in &session_metadata.peer_protocols {
1144                if let Some(handler) = handlers.get(&protocol.protocol).cloned()
1145                {
1146                    debug!("session handshaked, token = {}", stream);
1147                    let network_context = NetworkContext::new(
1148                        io,
1149                        handler,
1150                        protocol.protocol,
1151                        self,
1152                    );
1153                    network_context.protocol_handler().on_peer_connected(
1154                        &network_context,
1155                        session_node_id.as_ref().unwrap(),
1156                        protocol.version,
1157                        pos_public_key_opt.clone(),
1158                    );
1159                }
1160            }
1161        }
1162
1163        for (protocol, data) in messages {
1164            if let Err(e) = io.handle(
1165                stream,
1166                0, /* We only have one handler for the execution
1167                    * event_loop,
1168                    * so the handler_id is always 0 */
1169                NetworkIoMessage::HandleProtocolMessage {
1170                    protocol,
1171                    peer: stream,
1172                    node_id: session_node_id.as_ref().unwrap().clone(),
1173                    data,
1174                },
1175            ) {
1176                warn!("Error occurs, discard protocol message: err={}", e);
1177            }
1178        }
1179    }
1180
1181    fn session_writable(
1182        &self, stream: StreamToken, io: &IoContext<NetworkIoMessage>,
1183    ) {
1184        if let Some(session) = self.sessions.get(stream) {
1185            {
1186                // We check dropped_nodes first to make sure we stop processing
1187                // communications from any dropped peers
1188                let sess_id = session.read().id().map(|id| *id);
1189                if let Some(node_id) = sess_id {
1190                    let to_drop = self.dropped_nodes.read().contains(&node_id);
1191                    self.drop_peers(io);
1192                    if to_drop {
1193                        return;
1194                    }
1195                }
1196            }
1197            let mut sess = session.write();
1198            if let Err(e) = sess.writable(io) {
1199                trace!("{}: Session write error: {:?}", stream, e);
1200            }
1201            if sess.done() {
1202                io.deregister_stream(stream).unwrap_or_else(|e| {
1203                    debug!("Error deregistering stream: {:?}", e)
1204                });
1205            }
1206        }
1207    }
1208
1209    fn accept(&self, io: &IoContext<NetworkIoMessage>) {
1210        trace!("Accepting incoming connection");
1211        loop {
1212            let (socket, address) = match self.tcp_listener.lock().accept() {
1213                Ok((sock, addr)) => (sock, addr),
1214                Err(e) => {
1215                    if e.kind() != io::ErrorKind::WouldBlock {
1216                        debug!("Error accepting connection: {:?}", e);
1217                    }
1218                    break;
1219                }
1220            };
1221
1222            if let Err(e) = self.create_connection(socket, address, None, io) {
1223                debug!("Can't accept connection: {:?}", e);
1224            }
1225        }
1226    }
1227
1228    fn kill_connection_by_token(
1229        &self, token: StreamToken, io: &IoContext<NetworkIoMessage>,
1230        remote: bool, op: Option<UpdateNodeOperation>, reason: &str,
1231    ) {
1232        let mut to_disconnect: Vec<ProtocolId> = Vec::new();
1233        let mut failure_id = None;
1234        let mut deregister = false;
1235
1236        if let FIRST_SESSION..=LAST_SESSION = token {
1237            if let Some(session) = self.sessions.get(token) {
1238                let mut sess = session.write();
1239                if !sess.expired() {
1240                    if sess.is_ready() {
1241                        for (p, _) in self.handlers.read().iter() {
1242                            if sess.have_capability(*p) {
1243                                to_disconnect.push(*p);
1244                                sess.send_disconnect(DisconnectReason::Custom(
1245                                    reason.into(),
1246                                ));
1247                            }
1248                        }
1249                    }
1250                    sess.set_expired();
1251                }
1252                deregister = remote || sess.done();
1253                failure_id = sess.id().cloned();
1254                debug!(
1255                    "kill connection by token, deregister = {}, reason = {:?}, session = {:?}, op = {:?}",
1256                    deregister, reason, *sess, op
1257                );
1258            }
1259        }
1260
1261        if let Some(id) = failure_id {
1262            if remote {
1263                if let Some(op) = op {
1264                    match op {
1265                        UpdateNodeOperation::Failure => {
1266                            self.node_db.write().note_failure(
1267                                &id, true,  /* by_connection */
1268                                false, /* trusted_only */
1269                            );
1270                        }
1271                        UpdateNodeOperation::Demotion => {
1272                            let mut node_db = self.node_db.write();
1273                            node_db.demote(&id);
1274                            node_db.note_demoted(
1275                                &id, true, /* by_connection */
1276                            );
1277                        }
1278                        UpdateNodeOperation::Remove => {
1279                            self.node_db.write().set_blacklisted(&id);
1280                        }
1281                    }
1282                }
1283            }
1284
1285            for p in to_disconnect {
1286                if let Some(h) = self.handlers.read().get(&p).cloned() {
1287                    let network_context = NetworkContext::new(io, h, p, self);
1288                    network_context
1289                        .protocol_handler()
1290                        .on_peer_disconnected(&network_context, &id);
1291                }
1292            }
1293        }
1294
1295        if deregister {
1296            io.deregister_stream(token).unwrap_or_else(|e| {
1297                debug!("Error deregistering stream {:?}", e);
1298            });
1299        }
1300    }
1301
1302    fn kill_connection(
1303        &self, node_id: &NodeId, io: &IoContext<NetworkIoMessage>,
1304        remote: bool, op: Option<UpdateNodeOperation>, reason: &str,
1305    ) -> bool {
1306        let mut to_disconnect: Vec<ProtocolId> = Vec::new();
1307        let mut deregister = false;
1308        let mut token = 0;
1309
1310        if let Some(session) = self.sessions.get_by_id(node_id) {
1311            let mut sess = session.write();
1312            if !sess.expired() {
1313                if sess.is_ready() {
1314                    for (p, _) in self.handlers.read().iter() {
1315                        if sess.have_capability(*p) {
1316                            to_disconnect.push(*p);
1317                            sess.send_disconnect(DisconnectReason::Custom(
1318                                reason.into(),
1319                            ));
1320                        }
1321                    }
1322                }
1323                sess.set_expired();
1324            }
1325            deregister = remote || sess.done();
1326            token = sess.token();
1327            assert_eq!(sess.id().unwrap().clone(), node_id.clone());
1328            debug!(
1329                "kill connection, deregister = {}, reason = {:?}, session = {:?}, op = {:?}",
1330                deregister, reason, *sess, op
1331            );
1332
1333            if remote {
1334                if let Some(op) = op {
1335                    match op {
1336                        UpdateNodeOperation::Failure => {
1337                            self.node_db.write().note_failure(
1338                                node_id, true,  /* by_connection */
1339                                false, /* trusted_only */
1340                            );
1341                        }
1342                        UpdateNodeOperation::Demotion => {
1343                            let mut node_db = self.node_db.write();
1344                            node_db.demote(node_id);
1345                            node_db.note_demoted(
1346                                node_id, true, /* by_connection */
1347                            );
1348                        }
1349                        UpdateNodeOperation::Remove => {
1350                            self.node_db.write().set_blacklisted(node_id);
1351                        }
1352                    }
1353                }
1354            }
1355        }
1356
1357        for p in to_disconnect {
1358            if let Some(h) = self.handlers.read().get(&p).cloned() {
1359                let network_context = NetworkContext::new(io, h, p, self);
1360                network_context
1361                    .protocol_handler()
1362                    .on_peer_disconnected(&network_context, node_id);
1363            }
1364        }
1365
1366        if deregister {
1367            io.deregister_stream(token).unwrap_or_else(|e| {
1368                debug!("Error deregistering stream {:?}", e);
1369            });
1370        }
1371
1372        deregister
1373    }
1374
1375    pub fn with_context<F, R>(
1376        &self, handler: Arc<dyn NetworkProtocolHandler + Sync>,
1377        protocol: ProtocolId, io: &IoContext<NetworkIoMessage>, action: F,
1378    ) -> R
1379    where
1380        F: FnOnce(&NetworkContext) -> R,
1381    {
1382        action(&NetworkContext::new(io, handler, protocol, self))
1383    }
1384
1385    fn udp_readable(&self, io: &IoContext<NetworkIoMessage>) {
1386        let udp_socket = self.udp_socket.lock();
1387        let writable;
1388        {
1389            let udp_channel = self.udp_channel.read();
1390            writable = udp_channel.any_sends_queued();
1391        }
1392
1393        let mut buf = [0u8; MAX_DATAGRAM_SIZE];
1394        match udp_socket.recv_from(&mut buf).map_non_block() {
1395            Ok(Some((len, address))) => self
1396                .on_udp_packet(&buf[0..len], address)
1397                .unwrap_or_else(|e| {
1398                    debug!("Error processing UDP packet: {:?}", e);
1399                }),
1400            Ok(_) => {}
1401            Err(e) => {
1402                debug!("Error reading UDP socket: {:?}", e);
1403            }
1404        };
1405
1406        let new_writable;
1407        {
1408            let udp_channel = self.udp_channel.read();
1409            new_writable = udp_channel.any_sends_queued();
1410        }
1411
1412        // Check whether on_udp_packet produces new to-be-sent messages.
1413        // If it does, we might need to change monitor interest to All if
1414        // it is only Readable.
1415        if writable != new_writable {
1416            io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1417                debug!("Error updating UDP registration: {:?}", e)
1418            });
1419        }
1420    }
1421
1422    fn udp_writable(&self, io: &IoContext<NetworkIoMessage>) {
1423        let udp_socket = self.udp_socket.lock();
1424        let mut udp_channel = self.udp_channel.write();
1425        while let Some(data) = udp_channel.dequeue_send() {
1426            match udp_socket
1427                .send_to(&data.payload, data.address)
1428                .map_non_block()
1429            {
1430                Ok(Some(size)) if size == data.payload.len() => {}
1431                Ok(Some(_)) => {
1432                    warn!("UDP sent incomplete datagram");
1433                }
1434                Ok(None) => {
1435                    udp_channel.requeue_send(data);
1436                    return;
1437                }
1438                Err(e) => {
1439                    debug!(
1440                        "UDP send error: {:?}, address: {:?}",
1441                        e, &data.address
1442                    );
1443                    return;
1444                }
1445            }
1446        }
1447        // look at whether the monitor interest can be set as Readable.
1448        io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1449            debug!("Error updating UDP registration: {:?}", e)
1450        });
1451    }
1452
1453    fn on_udp_packet(
1454        &self, packet: &[u8], from: SocketAddr,
1455    ) -> Result<(), Error> {
1456        if packet.is_empty() {
1457            return Ok(());
1458        }
1459
1460        let res = match packet[0] {
1461            UDP_PROTOCOL_DISCOVERY => {
1462                if let Some(discovery) = self.discovery.lock().as_mut() {
1463                    discovery.on_packet(
1464                        &UdpIoContext::new(&self.udp_channel, &self.node_db),
1465                        &packet[1..],
1466                        from,
1467                    )?;
1468                    Ok(())
1469                } else {
1470                    warn!("Discovery is not ready. Drop the message!");
1471                    Ok(())
1472                }
1473            }
1474            _ => {
1475                warn!("Unknown UDP protocol. Simply drops the message!");
1476                Ok(())
1477            }
1478        };
1479        res
1480    }
1481
1482    fn on_check_sessions(&self, io: &IoContext<NetworkIoMessage>) {
1483        let mut disconnect_peers = Vec::new();
1484
1485        for session in self.sessions.all() {
1486            if let Some(sess) = session.try_read() {
1487                if let (true, op) = sess.check_timeout() {
1488                    disconnect_peers.push((sess.token(), op));
1489                }
1490            }
1491        }
1492
1493        for (token, op) in disconnect_peers {
1494            self.kill_connection_by_token(
1495                token,
1496                io,
1497                true,
1498                op,
1499                "session timeout", // reason
1500            );
1501        }
1502    }
1503}
1504
1505impl IoHandler<NetworkIoMessage> for NetworkServiceInner {
1506    fn initialize(&self, io: &IoContext<NetworkIoMessage>) {
1507        io.register_timer(HOUSEKEEPING, self.config.housekeeping_timeout)
1508            .expect("Error registering housekeeping timer");
1509        io.message(NetworkIoMessage::Start).unwrap_or_else(|e| {
1510            warn!("Error sending IO notification: {:?}", e)
1511        });
1512    }
1513
1514    fn stream_hup(
1515        &self, io: &IoContext<NetworkIoMessage>, stream: StreamToken,
1516    ) {
1517        trace!("Hup: {}", stream);
1518        match stream {
1519            FIRST_SESSION..=LAST_SESSION => self.connection_closed(stream, io),
1520            _ => warn!("Unexpected hup"),
1521        }
1522    }
1523
1524    fn stream_readable(
1525        &self, io: &IoContext<NetworkIoMessage>, stream: StreamToken,
1526    ) {
1527        match stream {
1528            FIRST_SESSION..=LAST_SESSION => self.session_readable(stream, io),
1529            TCP_ACCEPT => self.accept(io),
1530            UDP_MESSAGE => self.udp_readable(io),
1531            _ => panic!("Received unknown readable token"),
1532        }
1533    }
1534
1535    fn stream_writable(
1536        &self, io: &IoContext<NetworkIoMessage>, stream: StreamToken,
1537    ) {
1538        match stream {
1539            FIRST_SESSION..=LAST_SESSION => self.session_writable(stream, io),
1540            UDP_MESSAGE => self.udp_writable(io),
1541            _ => panic!("Received unknown writable token"),
1542        }
1543    }
1544
1545    fn timeout(&self, io: &IoContext<NetworkIoMessage>, token: TimerToken) {
1546        match token {
1547            FIRST_SESSION..=LAST_SESSION => {
1548                debug!("Connection timeout: {}", token);
1549                self.kill_connection_by_token(
1550                    token,
1551                    io,
1552                    true,
1553                    Some(UpdateNodeOperation::Failure),
1554                    "handshake timeout", // reason
1555                );
1556            }
1557            HOUSEKEEPING => self.on_housekeeping(io),
1558            DISCOVERY_REFRESH => {
1559                // Run the _slow_ discovery if enough peers are connected
1560                let disc_general = self.has_enough_outgoing_peers(
1561                    None,
1562                    self.config.max_outgoing_peers,
1563                );
1564                let disc_archive = self.has_enough_outgoing_peers(
1565                    Some((NODE_TAG_NODE_TYPE, NODE_TAG_ARCHIVE)),
1566                    self.config.max_outgoing_peers_archive,
1567                );
1568                if disc_general || disc_archive {
1569                    self.discovery.lock().as_mut().map(|d| {
1570                        d.disc_option.general = disc_general;
1571                        d.disc_option.archive = disc_archive;
1572                        d.refresh();
1573                    });
1574                    io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1575                        debug!("Error updating discovery registration: {:?}", e)
1576                    });
1577                }
1578            }
1579            FAST_DISCOVERY_REFRESH => {
1580                // Run the fast discovery if not enough peers are connected
1581                let disc_general = !self.has_enough_outgoing_peers(
1582                    None,
1583                    self.config.max_outgoing_peers,
1584                );
1585                let disc_archive = !self.has_enough_outgoing_peers(
1586                    Some((NODE_TAG_NODE_TYPE, NODE_TAG_ARCHIVE)),
1587                    self.config.max_outgoing_peers_archive,
1588                );
1589                if disc_general || disc_archive {
1590                    self.discovery.lock().as_mut().map(|d| {
1591                        d.disc_option.general = disc_general;
1592                        d.disc_option.archive = disc_archive;
1593                        d.refresh();
1594                    });
1595                    io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1596                        debug!("Error updating discovery registration: {:?}", e)
1597                    });
1598                }
1599            }
1600            DISCOVERY_ROUND => {
1601                if let Some(d) = self.discovery.lock().as_mut() {
1602                    d.round(&UdpIoContext::new(
1603                        &self.udp_channel,
1604                        &self.node_db,
1605                    ))
1606                }
1607                io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1608                    debug!("Error updating discovery registration: {:?}", e)
1609                });
1610            }
1611            NODE_TABLE => {
1612                trace!("Refreshing node table");
1613                self.try_promote_untrusted();
1614                self.node_db.write().save();
1615            }
1616            CHECK_SESSIONS => self.on_check_sessions(io),
1617            SEND_DELAYED_MESSAGES => {
1618                if let Some(ref queue) = self.delayed_queue {
1619                    queue.send_delayed_messages(self);
1620                }
1621            }
1622            _ => match self.timers.read().get(&token).cloned() {
1623                Some(timer) => {
1624                    match self.handlers.read().get(&timer.protocol).cloned() {
1625                        None => warn!(
1626                            "No handler found for protocol: {:?}",
1627                            timer.protocol
1628                        ),
1629                        Some(h) => {
1630                            let network_context = NetworkContext::new(
1631                                io,
1632                                h,
1633                                timer.protocol,
1634                                self,
1635                            );
1636                            network_context
1637                                .protocol_handler()
1638                                .on_timeout(&network_context, timer.token);
1639                        }
1640                    }
1641                }
1642                None => {
1643                    warn!("Unknown timer token: {}", token);
1644                } // timer is not registered through us
1645            },
1646        }
1647    }
1648
1649    fn message(
1650        &self, io: &IoContext<NetworkIoMessage>, message: &NetworkIoMessage,
1651    ) {
1652        match message {
1653            NetworkIoMessage::Start => self.start(io).unwrap_or_else(|e| {
1654                warn!("Error starting network service: {:?}", e)
1655            }),
1656            NetworkIoMessage::AddHandler {
1657                handler,
1658                protocol,
1659                version,
1660                callback,
1661            } => {
1662                let h = handler.clone();
1663                let network_context =
1664                    NetworkContext::new(io, h, *protocol, self);
1665                network_context
1666                    .protocol_handler()
1667                    .initialize(&network_context);
1668                self.handlers.write().insert(*protocol, handler.clone());
1669                {
1670                    let protocols = &mut *self.metadata.protocols.write();
1671                    for protocol_info in protocols.iter() {
1672                        assert_ne!(
1673                            protocol, &protocol_info.protocol,
1674                            "Do not register same protocol twice"
1675                        );
1676                    }
1677                    protocols.push(ProtocolInfo {
1678                        protocol: *protocol,
1679                        version: *version,
1680                    });
1681                    self.metadata.minimum_peer_protocol_version.write().push(
1682                        ProtocolInfo {
1683                            protocol: *protocol,
1684                            version: handler.minimum_supported_version(),
1685                        },
1686                    );
1687                }
1688                info!(
1689                    "Protocol {:?} version {:?} registered.",
1690                    protocol, version
1691                );
1692                callback.send(()).expect("protocol register error");
1693            }
1694            NetworkIoMessage::AddTimer {
1695                ref protocol,
1696                ref delay,
1697                ref token,
1698            } => {
1699                let handler_token = {
1700                    let mut timer_counter = self.timer_counter.write();
1701                    let counter = &mut *timer_counter;
1702                    let handler_token = *counter;
1703                    *counter += 1;
1704                    handler_token
1705                };
1706                self.timers.write().insert(
1707                    handler_token,
1708                    ProtocolTimer {
1709                        protocol: *protocol,
1710                        token: *token,
1711                    },
1712                );
1713                io.register_timer(handler_token, *delay)
1714                    .unwrap_or_else(|e| {
1715                        debug!("Error registering timer {}: {:?}", token, e)
1716                    });
1717            }
1718            NetworkIoMessage::DispatchWork {
1719                ref protocol,
1720                ref work_type,
1721            } => {
1722                if let Some(handler) =
1723                    self.handlers.read().get(protocol).cloned()
1724                {
1725                    let network_context =
1726                        NetworkContext::new(io, handler, *protocol, self);
1727                    network_context
1728                        .protocol_handler()
1729                        .on_work_dispatch(&network_context, *work_type);
1730                } else {
1731                    warn!("Work is dispatched to unknown handler");
1732                }
1733            }
1734            NetworkIoMessage::HandleProtocolMessage {
1735                ref protocol,
1736                peer: _,
1737                ref node_id,
1738                ref data,
1739            } => {
1740                debug!("Receive ProtocolMsg {:?}", protocol);
1741                if let Some(handler) =
1742                    self.handlers.read().get(protocol).cloned()
1743                {
1744                    let network_context =
1745                        NetworkContext::new(io, handler, *protocol, self);
1746                    network_context.protocol_handler().on_message(
1747                        &network_context,
1748                        node_id,
1749                        data,
1750                    );
1751                } else {
1752                    warn!("Work is handled by unknown handler");
1753                }
1754            }
1755        }
1756    }
1757
1758    fn register_stream(
1759        &self, stream: StreamToken, reg: Token, poll_registry: &Registry,
1760    ) {
1761        match stream {
1762            FIRST_SESSION..=LAST_SESSION => {
1763                if let Some(session) = self.sessions.get(stream) {
1764                    session
1765                        .write()
1766                        .register_socket(reg, poll_registry)
1767                        .expect("Error registering socket");
1768                }
1769            }
1770            TCP_ACCEPT => {
1771                poll_registry
1772                    .register(
1773                        &mut *self.tcp_listener.lock(),
1774                        Token(TCP_ACCEPT),
1775                        Interest::READABLE | Interest::WRITABLE,
1776                    )
1777                    .expect("Error registering stream");
1778            }
1779            UDP_MESSAGE => {
1780                poll_registry
1781                    .register(
1782                        &mut *self.udp_socket.lock(),
1783                        reg,
1784                        Interest::READABLE | Interest::WRITABLE,
1785                    )
1786                    .expect("Error registering UDP socket");
1787            }
1788            _ => warn!("Unexpected stream registeration"),
1789        }
1790    }
1791
    /// Deregisters the socket of an expired session and forgets the session.
    ///
    /// Only session tokens are handled. If the session still exists and has
    /// expired, its socket is removed from `poll_registry`, a connection
    /// failure is recorded in the node database for its node id (when the id
    /// is known), and the session is removed from the session manager.
    /// Sessions that have not expired are left untouched. Any other token only
    /// logs a warning.
    ///
    /// # Panics
    ///
    /// Panics if deregistering the session's socket fails.
    fn deregister_stream(&self, stream: StreamToken, poll_registry: &Registry) {
        match stream {
            FIRST_SESSION..=LAST_SESSION => {
                if let Some(session) = self.sessions.get(stream) {
                    // The write lock is held for the whole cleanup below.
                    let mut sess = session.write();
                    if sess.expired() {
                        sess.deregister_socket(poll_registry)
                            .expect("Error deregistering socket");
                        // Penalize the peer in the node database as a failed
                        // connection (not restricted to trusted nodes).
                        if let Some(node_id) = sess.id() {
                            self.node_db.write().note_failure(
                                node_id, true,  /* by_connection */
                                false, /* trusted_only */
                            );
                        }
                        self.sessions.remove(&*sess);
                        debug!("Removed session: {:?}", *sess);
                    }
                }
            }
            _ => warn!("Unexpected stream deregistration"),
        }
    }
1814
1815    fn update_stream(
1816        &self, stream: StreamToken, reg: Token, poll_registry: &Registry,
1817    ) {
1818        match stream {
1819            FIRST_SESSION..=LAST_SESSION => {
1820                if let Some(session) = self.sessions.get(stream) {
1821                    session
1822                        .write()
1823                        .update_socket(reg, poll_registry)
1824                        .expect("Error updating socket");
1825                }
1826            }
1827            TCP_ACCEPT => poll_registry
1828                .reregister(
1829                    &mut *self.tcp_listener.lock(),
1830                    Token(TCP_ACCEPT),
1831                    Interest::READABLE | Interest::WRITABLE,
1832                )
1833                .expect("Error reregistering stream"),
1834            UDP_MESSAGE => {
1835                let mut udp_socket = self.udp_socket.lock();
1836                let udp_channel = self.udp_channel.read();
1837
1838                let registration = if udp_channel.any_sends_queued() {
1839                    Interest::READABLE | Interest::WRITABLE
1840                } else {
1841                    Interest::READABLE
1842                };
1843                poll_registry
1844                    .reregister(&mut *udp_socket, reg, registration)
1845                    .expect("Error reregistering UDP socket");
1846            }
1847            _ => warn!("Unexpected stream update"),
1848        }
1849    }
1850}
1851
/// A user message whose sending is postponed by a per-peer latency.
///
/// Built by `NetworkContext::send` when the delayed queue has a latency
/// configured for the destination peer; it is pushed onto that queue and a
/// `SEND_DELAYED_MESSAGES` timer is registered for it.
struct DelayMessageContext {
    /// Instant at which the message becomes due (`now + latency`).
    ts: Instant,
    /// Clone of the sending context's IO context.
    io: IoContext<NetworkIoMessage>,
    /// Protocol the message belongs to.
    protocol: ProtocolId,
    /// Session of the destination peer.
    session: SharedSession,
    /// Destination peer.
    peer: NodeId,
    /// Encoded message payload.
    msg: Vec<u8>,
    /// The minimum peer protocol version since which the message is supported.
    min_protocol_version: ProtocolVersion,
    /// Send-queue priority requested by the sender.
    priority: SendQueuePriority,
}
1863
1864impl DelayMessageContext {
1865    pub fn new(
1866        ts: Instant, io: IoContext<NetworkIoMessage>, protocol: ProtocolId,
1867        session: SharedSession, peer: NodeId, msg: Vec<u8>,
1868        min_protocol_version: ProtocolVersion, priority: SendQueuePriority,
1869    ) -> Self {
1870        DelayMessageContext {
1871            ts,
1872            io,
1873            protocol,
1874            session,
1875            peer,
1876            msg,
1877            min_protocol_version,
1878            priority,
1879        }
1880    }
1881}
1882
1883impl Ord for DelayMessageContext {
1884    fn cmp(&self, other: &Self) -> Ordering { other.ts.cmp(&self.ts) }
1885}
1886
1887impl PartialOrd for DelayMessageContext {
1888    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1889        Some(self.cmp(other))
1890    }
1891}
1892
1893impl Eq for DelayMessageContext {}
1894
1895impl PartialEq for DelayMessageContext {
1896    fn eq(&self, other: &Self) -> bool { self.ts == other.ts }
1897}
1898
/// Wrapper around network service.
///
/// Pairs the IO context with the protocol handler (and protocol id) it acts
/// for. A new one is built for each handler callback (`initialize`,
/// `on_work_dispatch`, `on_message`), so the handler can send messages,
/// register timers and manage peers through the `NetworkContext` trait.
pub struct NetworkContext<'a> {
    /// IO context used to post messages and register timers.
    io: &'a IoContext<NetworkIoMessage>,
    /// Protocol handler this context acts on behalf of.
    handler: Arc<dyn NetworkProtocolHandler + Sync>,
    /// Protocol id of `handler`.
    protocol: ProtocolId,
    /// Cached `handler.minimum_supported_version()`; `send` rejects messages
    /// whose `version_valid_till` is below it.
    min_supported_version: ProtocolVersion,
    /// Service state (sessions, node database, metadata, delayed queue).
    network_service: &'a NetworkServiceInner,
}
1907
1908impl<'a> NetworkContext<'a> {
1909    pub fn new(
1910        io: &'a IoContext<NetworkIoMessage>,
1911        handler: Arc<dyn NetworkProtocolHandler + Sync>, protocol: ProtocolId,
1912        network_service: &'a NetworkServiceInner,
1913    ) -> NetworkContext<'a> {
1914        let min_supported_version = handler.minimum_supported_version();
1915        NetworkContext {
1916            io,
1917            handler,
1918            protocol,
1919            min_supported_version,
1920            network_service,
1921        }
1922    }
1923
1924    pub fn protocol_handler(&self) -> &dyn NetworkProtocolHandler {
1925        &*self.handler
1926    }
1927}
1928
1929impl<'a> NetworkContextTrait for NetworkContext<'a> {
1930    fn get_protocol(&self) -> ProtocolId { self.protocol }
1931
1932    fn get_peer_connection_origin(&self, node_id: &NodeId) -> Option<bool> {
1933        self.network_service.get_peer_connection_origin(node_id)
1934    }
1935
1936    fn is_peer_self(&self, node_id: &NodeId) -> bool {
1937        *node_id == *self.network_service.metadata.id()
1938    }
1939
1940    fn self_node_id(&self) -> NodeId { *self.network_service.metadata.id() }
1941
1942    /// Message is sent through this method.
1943    fn send(
1944        &self, node_id: &NodeId, msg: Vec<u8>,
1945        min_protocol_version: ProtocolVersion,
1946        version_valid_till: ProtocolVersion, priority: SendQueuePriority,
1947    ) -> Result<(), Error> {
1948        if version_valid_till < self.min_supported_version {
1949            bail!(Error::SendUnsupportedMessage {
1950                protocol: self.protocol,
1951                msg_id: parse_msg_id_leb128_2_bytes_at_most(&mut &*msg),
1952                peer_protocol_version: None,
1953                min_supported_version: Some(self.min_supported_version),
1954            });
1955        }
1956
1957        if *node_id == *self.network_service.metadata.id() {
1958            self.handler.send_local_message(self, msg);
1959            return Ok(());
1960        }
1961
1962        let session = self.network_service.sessions.get_by_id(node_id);
1963        trace!("Sending {} bytes to {}", msg.len(), node_id);
1964        if let Some(session) = session {
1965            let latency =
1966                self.network_service.delayed_queue.as_ref().and_then(|q| {
1967                    session
1968                        .write()
1969                        .metadata
1970                        .id
1971                        .and_then(|id| q.latencies.read().get(&id).copied())
1972                });
1973            match latency {
1974                Some(latency) => {
1975                    let q =
1976                        self.network_service.delayed_queue.as_ref().unwrap();
1977                    let mut queue = q.queue.lock();
1978                    let ts_to_send = Instant::now() + latency;
1979                    queue.push(DelayMessageContext::new(
1980                        ts_to_send,
1981                        self.io.clone(),
1982                        self.protocol,
1983                        session,
1984                        *node_id,
1985                        msg,
1986                        min_protocol_version,
1987                        priority,
1988                    ));
1989                    self.io.register_timer_once_nocancel(
1990                        SEND_DELAYED_MESSAGES,
1991                        latency,
1992                    )?;
1993                    trace!("register delayed timer delay:{:?} ts_to_send:{:?} length:{}", latency, ts_to_send, queue.len());
1994                }
1995                None => {
1996                    session.write().send_packet(
1997                        self.io,
1998                        Some(self.protocol),
1999                        min_protocol_version,
2000                        session::PACKET_USER,
2001                        msg,
2002                        priority,
2003                    )?;
2004                }
2005            }
2006            // TODO: Handle result from send_packet()
2007        }
2008        Ok(())
2009    }
2010
2011    fn disconnect_peer(
2012        &self, node_id: &NodeId, op: Option<UpdateNodeOperation>, reason: &str,
2013    ) {
2014        self.network_service
2015            .kill_connection(node_id, self.io, true, op, reason);
2016    }
2017
2018    fn register_timer(
2019        &self, token: TimerToken, delay: Duration,
2020    ) -> Result<(), Error> {
2021        self.io
2022            .message(NetworkIoMessage::AddTimer {
2023                token,
2024                delay,
2025                protocol: self.protocol,
2026            })
2027            .unwrap_or_else(|e| {
2028                warn!("Error sending network IO message: {:?}", e)
2029            });
2030        Ok(())
2031    }
2032
2033    fn dispatch_work(&self, work_type: HandlerWorkType) {
2034        self.io
2035            .message(NetworkIoMessage::DispatchWork {
2036                protocol: self.protocol,
2037                work_type,
2038            })
2039            .expect("Error sending network IO message");
2040    }
2041
2042    fn insert_peer_node_tag(&self, peer: NodeId, key: &str, value: &str) {
2043        self.network_service
2044            .node_db
2045            .write()
2046            .set_tag(peer, key, value);
2047    }
2048}
2049
2050fn save_key(path: &Path, key: &Secret) {
2051    let mut path_buf = PathBuf::from(path);
2052    if let Err(e) = fs::create_dir_all(path_buf.as_path()) {
2053        warn!("Error creating key directory: {:?}", e);
2054        return;
2055    };
2056    path_buf.push("key");
2057    let path = path_buf.as_path();
2058    let mut file = match fs::File::create(&path) {
2059        Ok(file) => file,
2060        Err(e) => {
2061            warn!("Error creating key file: {:?}", e);
2062            return;
2063        }
2064    };
2065    if let Err(e) = restrict_permissions_owner(path, true, false) {
2066        warn!("Failed to modify permissions of the file ({})", e);
2067    }
2068    if let Err(e) = file.write(&key.to_hex().into_bytes()) {
2069        warn!("Error writing key file: {:?}", e);
2070    }
2071}
2072
2073fn load_key(path: &Path) -> Option<Secret> {
2074    let mut path_buf = PathBuf::from(path);
2075    path_buf.push("key");
2076    let mut file = match fs::File::open(path_buf.as_path()) {
2077        Ok(file) => file,
2078        Err(e) => {
2079            debug!("failed to open key file: {:?}", e);
2080            return None;
2081        }
2082    };
2083    let mut buf = String::new();
2084    match file.read_to_string(&mut buf) {
2085        Ok(_) => {}
2086        Err(e) => {
2087            warn!("Error reading key file: {:?}", e);
2088            return None;
2089        }
2090    }
2091    match Secret::from_str(&buf) {
2092        Ok(key) => Some(key),
2093        Err(e) => {
2094            warn!("Error parsing key file: {:?}", e);
2095            None
2096        }
2097    }
2098}
2099
2100pub fn load_pos_private_key(
2101    path: &Path,
2102) -> Option<(ConsensusPrivateKey, Option<ConsensusVRFPrivateKey>)> {
2103    let mut path_buf = PathBuf::from(path);
2104    path_buf.push("pos_key");
2105    let mut file = match fs::File::open(path_buf.as_path()) {
2106        Ok(file) => file,
2107        Err(e) => {
2108            debug!("failed to open key file: {:?}", e);
2109            return None;
2110        }
2111    };
2112    let mut buf = String::new();
2113    match file.read_to_string(&mut buf) {
2114        Ok(_) => {}
2115        Err(e) => {
2116            warn!("Error reading key file: {:?}", e);
2117            return None;
2118        }
2119    }
2120    let key_str: Vec<_> = buf.split(",").collect();
2121    let private_key =
2122        match ConsensusPrivateKey::from_encoded_string(&key_str[0]) {
2123            Ok(key) => Some(key),
2124            Err(e) => {
2125                warn!("Error parsing key file: {:?}", e);
2126                None
2127            }
2128        }?;
2129    if key_str.len() <= 1 {
2130        return Some((private_key, None));
2131    }
2132    let vrf_private_key =
2133        match ConsensusVRFPrivateKey::from_encoded_string(&key_str[1]) {
2134            Ok(key) => Some(key),
2135            Err(e) => {
2136                warn!("Error parsing key file: {:?}", e);
2137                None
2138            }
2139        }?;
2140    Some((private_key, Some(vrf_private_key)))
2141}
2142
2143impl std::fmt::Display for ProtocolVersion {
2144    fn fmt(&self, f: &mut Formatter) -> Result<(), std::fmt::Error> {
2145        write!(f, "{}", self.0)
2146    }
2147}
2148
2149impl std::ops::Deref for ProtocolVersion {
2150    type Target = u8;
2151
2152    fn deref(&self) -> &Self::Target { &self.0 }
2153}
2154
2155impl Encodable for ProtocolVersion {
2156    fn rlp_append(&self, s: &mut RlpStream) { s.append_internal(&self.0); }
2157}
2158
2159impl Decodable for ProtocolVersion {
2160    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
2161        Ok(Self(rlp.as_val()?))
2162    }
2163}