network/
service.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use std::{
6    cmp::{min, Ordering},
7    collections::{BinaryHeap, HashMap, HashSet, VecDeque},
8    fmt::Formatter,
9    fs,
10    io::{self, Read, Write},
11    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
12    path::{Path, PathBuf},
13    str::FromStr,
14    sync::{atomic::Ordering as AtomicOrdering, Arc},
15    time::{Duration, Instant},
16};
17
18use keccak_hash::keccak;
19use mio::{
20    net::{TcpListener, TcpStream, UdpSocket},
21    Interest, Registry, Token,
22};
23use parity_path::restrict_permissions_owner;
24use parking_lot::{Mutex, RwLock};
25use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
26
27use crate::keylib::{sign, Generator, KeyPair, Random, Secret};
28use cfx_addr::Network;
29use cfx_bytes::Bytes;
30use cfx_util_macros::bail;
31use diem_crypto::ValidCryptoMaterialStringExt;
32use diem_types::{
33    account_address::from_consensus_public_key,
34    validator_config::{
35        ConsensusPrivateKey, ConsensusPublicKey, ConsensusVRFPrivateKey,
36        ConsensusVRFPublicKey,
37    },
38};
39use log::{debug, info, trace, warn};
40use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
41use priority_send_queue::SendQueuePriority;
42use serde::{Deserialize, Serialize};
43
44use crate::{
45    discovery::Discovery,
46    handshake::BYPASS_CRYPTOGRAPHY,
47    iolib::{
48        IoContext, IoHandler, IoService, MapNonBlock, StreamToken, TimerToken,
49    },
50    ip_utils::{map_external_address, select_public_address},
51    node_database::NodeDatabase,
52    node_table::*,
53    parse_msg_id_leb128_2_bytes_at_most,
54    session::{self, Session, SessionData, SessionDetails},
55    session_manager::SessionManager,
56    Error, HandlerWorkType, IpFilter, NatType, NetworkConfiguration,
57    NetworkContext as NetworkContextTrait, NetworkIoMessage,
58    NetworkProtocolHandler, PeerInfo, ProtocolId, ProtocolInfo,
59    UpdateNodeOperation, NODE_TAG_ARCHIVE, NODE_TAG_NODE_TYPE,
60};
61
62use super::DisconnectReason;
63
64const MAX_SESSIONS: usize = 2048;
65
66const DEFAULT_PORT: u16 = 32323;
67
68const FIRST_SESSION: StreamToken = 0;
69const LAST_SESSION: StreamToken = FIRST_SESSION + MAX_SESSIONS - 1;
70const SYS_TIMER: TimerToken = LAST_SESSION + 1;
71const TCP_ACCEPT: StreamToken = SYS_TIMER + 1;
72const HOUSEKEEPING: TimerToken = SYS_TIMER + 2;
73const UDP_MESSAGE: StreamToken = SYS_TIMER + 3;
74const DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 4;
75const FAST_DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 5;
76const DISCOVERY_ROUND: TimerToken = SYS_TIMER + 6;
77const NODE_TABLE: TimerToken = SYS_TIMER + 7;
78const SEND_DELAYED_MESSAGES: TimerToken = SYS_TIMER + 8;
79const CHECK_SESSIONS: TimerToken = SYS_TIMER + 9;
80const HANDLER_TIMER: TimerToken = LAST_SESSION + 256;
81const STOP_NET_POLL: TimerToken = HANDLER_TIMER + 1;
82
/// Timeout associated with the HOUSEKEEPING timer token.
pub const DEFAULT_HOUSEKEEPING_TIMEOUT: Duration = Duration::from_secs(2);
/// Default period for the DISCOVERY_REFRESH timer token (two minutes).
pub const DEFAULT_DISCOVERY_REFRESH_TIMEOUT: Duration =
    Duration::from_secs(2 * 60);
/// Default period for the FAST_DISCOVERY_REFRESH timer token.
pub const DEFAULT_FAST_DISCOVERY_REFRESH_TIMEOUT: Duration =
    Duration::from_secs(10);
/// Default period for the DISCOVERY_ROUND timer token.
pub const DEFAULT_DISCOVERY_ROUND_TIMEOUT: Duration =
    Duration::from_millis(500);
/// Default ticker interval for the NODE_TABLE timer token, i.e. how often the
/// node table is refreshed (five minutes).
pub const DEFAULT_NODE_TABLE_TIMEOUT: Duration = Duration::from_secs(5 * 60);
/// Default lifetime a connection must reach before its peer is promoted from
/// untrusted to trusted (three days).
pub const DEFAULT_CONNECTION_LIFETIME_FOR_PROMOTION: Duration =
    Duration::from_secs(3 * 24 * 60 * 60);
/// Period of the CHECK_SESSIONS timer.
const DEFAULT_CHECK_SESSIONS_TIMEOUT: Duration = Duration::from_secs(10);
101
102#[derive(
103    Clone,
104    Copy,
105    Debug,
106    Default,
107    Eq,
108    PartialOrd,
109    PartialEq,
110    Serialize,
111    Deserialize,
112)]
113pub struct ProtocolVersion(pub u8);
114
115impl MallocSizeOf for ProtocolVersion {
116    fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize { 0 }
117}
118
119pub const MAX_DATAGRAM_SIZE: usize = 1280;
120pub const UDP_PROTOCOL_DISCOVERY: u8 = 1;
121
122pub struct Datagram {
123    pub payload: Bytes,
124    pub address: SocketAddr,
125}
126
127pub struct UdpChannel {
128    pub send_queue: VecDeque<Datagram>,
129}
130
131impl Default for UdpChannel {
132    fn default() -> Self { Self::new() }
133}
134
135impl UdpChannel {
136    pub fn new() -> UdpChannel {
137        UdpChannel {
138            send_queue: VecDeque::new(),
139        }
140    }
141
142    pub fn any_sends_queued(&self) -> bool { !self.send_queue.is_empty() }
143
144    pub fn dequeue_send(&mut self) -> Option<Datagram> {
145        self.send_queue.pop_front()
146    }
147
148    pub fn requeue_send(&mut self, datagram: Datagram) {
149        self.send_queue.push_front(datagram)
150    }
151}
152
153pub struct UdpIoContext<'a> {
154    pub channel: &'a RwLock<UdpChannel>,
155    pub node_db: &'a RwLock<NodeDatabase>,
156}
157
158impl<'a> UdpIoContext<'a> {
159    pub fn new(
160        channel: &'a RwLock<UdpChannel>, node_db: &'a RwLock<NodeDatabase>,
161    ) -> UdpIoContext<'a> {
162        UdpIoContext { channel, node_db }
163    }
164
165    pub fn send(&self, payload: Bytes, address: SocketAddr) {
166        self.channel
167            .write()
168            .send_queue
169            .push_back(Datagram { payload, address });
170    }
171}
172
173/// NetworkService implements the P2P communication between different nodes. It
174/// manages connections between peers, including accepting new peers or dropping
175/// existing peers. Inside NetworkService, it has an IoService event loop with a
176/// thread pool.
177pub struct NetworkService {
178    pub io_service: Option<IoService<NetworkIoMessage>>,
179    pub inner: Option<Arc<NetworkServiceInner>>,
180    config: NetworkConfiguration,
181}
182
183impl NetworkService {
184    pub fn new(config: NetworkConfiguration) -> NetworkService {
185        NetworkService {
186            io_service: None,
187            inner: None,
188            config,
189        }
190    }
191
192    pub fn is_consortium(&self) -> bool { self.config.is_consortium }
193
194    pub fn get_network_type(&self) -> &Network {
195        self.config.get_network_type()
196    }
197
198    pub fn network_id(&self) -> u64 { self.config.id }
199
200    pub fn is_test_mode(&self) -> bool { self.config.test_mode }
201
202    /// Create and start the event loop inside the NetworkService
203    pub fn initialize(
204        &mut self, pos_pub_keys: (ConsensusPublicKey, ConsensusVRFPublicKey),
205    ) -> Result<(), Error> {
206        let raw_io_service =
207            IoService::<NetworkIoMessage>::start(STOP_NET_POLL)?;
208        self.io_service = Some(raw_io_service);
209
210        if self.inner.is_none() {
211            if self.config.test_mode {
212                BYPASS_CRYPTOGRAPHY.store(true, AtomicOrdering::Relaxed);
213            }
214
215            let inner = Arc::new(match self.config.test_mode {
216                true => NetworkServiceInner::new_with_latency(
217                    &self.config,
218                    pos_pub_keys,
219                )?,
220                false => NetworkServiceInner::new(&self.config, pos_pub_keys)?,
221            });
222            self.io_service
223                .as_ref()
224                .unwrap()
225                .register_handler(inner.clone())?;
226            self.inner = Some(inner);
227        }
228        Ok(())
229    }
230
231    pub fn start(&self) {
232        let handler = self.inner.as_ref().unwrap().clone();
233        self.io_service
234            .as_ref()
235            .expect("Already set")
236            .start_network_poll(handler, MAX_SESSIONS);
237    }
238
239    /// Add a P2P peer to the client as a trusted node
240    pub fn add_peer(&self, node: NodeEntry) -> Result<(), Error> {
241        if let Some(ref x) = self.inner {
242            x.node_db.write().insert_trusted(node);
243            Ok(())
244        } else {
245            Err("Network service not started yet!".into())
246        }
247    }
248
249    /// Drop a P2P peer from the client
250    pub fn drop_peer(&self, node: NodeEntry) -> Result<(), Error> {
251        if let Some(ref x) = self.inner {
252            x.drop_node(node.id)
253        } else {
254            Err("Network service not started yet!".into())
255        }
256    }
257
258    /// Get the local address of the client
259    pub fn local_addr(&self) -> Option<SocketAddr> {
260        self.inner.as_ref().map(|inner_ref| inner_ref.local_addr())
261    }
262
263    /// Register a new protocol handler
264    pub fn register_protocol(
265        &self, handler: Arc<dyn NetworkProtocolHandler + Sync>,
266        protocol: ProtocolId, version: ProtocolVersion,
267    ) -> Result<(), Error> {
268        let (tx, rx) = std::sync::mpsc::sync_channel(0);
269        self.io_service.as_ref().unwrap().send_message(
270            NetworkIoMessage::AddHandler {
271                handler,
272                protocol,
273                version,
274                callback: tx,
275            },
276        )?;
277        // Only error if channel closed.
278        rx.recv().expect("protocol register error");
279        Ok(())
280    }
281
282    /// Executes action in the network context
283    pub fn with_context<F, R>(
284        &self, handler: Arc<dyn NetworkProtocolHandler + Sync>,
285        protocol: ProtocolId, action: F,
286    ) -> Result<R, String>
287    where
288        F: FnOnce(&NetworkContext) -> R,
289    {
290        let io = IoContext::new(self.io_service.as_ref().unwrap().channel(), 0);
291        match self.inner {
292            Some(ref inner) => {
293                Ok(inner.with_context(handler, protocol, &io, action))
294            }
295            None => Err("Network service not started yet!".to_owned()),
296        }
297    }
298
299    /// Return the current connected peers
300    pub fn get_peer_info(&self) -> Option<Vec<PeerInfo>> {
301        self.inner.as_ref().map(|inner| inner.get_peer_info())
302    }
303
304    /// Sign a challenge to provide self NodeId
305    pub fn sign_challenge(&self, challenge: Vec<u8>) -> Result<Vec<u8>, Error> {
306        let hash = keccak(challenge);
307        if let Some(ref inner) = self.inner {
308            let signature = match sign(inner.metadata.keys.secret(), &hash) {
309                Ok(s) => s,
310                Err(e) => {
311                    warn!("Error signing hello packet");
312                    return Err(Error::from(e));
313                }
314            };
315            Ok(signature[..].to_owned())
316        } else {
317            Err("Network service not started yet!".into())
318        }
319    }
320
321    pub fn net_key_pair(&self) -> Result<KeyPair, Error> {
322        if let Some(ref inner) = self.inner {
323            Ok(inner.metadata.keys.clone())
324        } else {
325            Err("Network service not started yet!".into())
326        }
327    }
328
329    pub fn pos_public_key(&self) -> Option<ConsensusPublicKey> {
330        if let Some(ref inner) = self.inner {
331            inner.sessions.self_pos_public_key.clone().map(|k| k.0)
332        } else {
333            None
334        }
335    }
336
337    pub fn add_latency(
338        &self, id: NodeId, latency_ms: f64,
339    ) -> Result<(), Error> {
340        if let Some(ref inner) = self.inner {
341            inner.add_latency(id, latency_ms)
342        } else {
343            Err("Network service not started yet!".into())
344        }
345    }
346
347    pub fn get_node(&self, id: &NodeId) -> Option<(bool, Node)> {
348        let inner = self.inner.as_ref()?;
349        let node_db = inner.node_db.read();
350        let (trusted, node) = node_db.get_with_trusty(id)?;
351        Some((trusted, node.clone()))
352    }
353
354    pub fn get_detailed_sessions(
355        &self, node_id: Option<NodeId>,
356    ) -> Option<Vec<SessionDetails>> {
357        let inner = self.inner.as_ref()?;
358        match node_id {
359            None => Some(
360                inner
361                    .sessions
362                    .all()
363                    .iter()
364                    .map(|s| s.read().details())
365                    .collect(),
366            ),
367            Some(id) => {
368                let session = inner.sessions.get_by_id(&id)?;
369                let details = session.read().details();
370                Some(vec![details])
371            }
372        }
373    }
374
375    pub fn disconnect_node(
376        &self, id: &NodeId, op: Option<UpdateNodeOperation>,
377    ) -> bool {
378        if self.inner.is_none() || self.io_service.is_none() {
379            return false;
380        }
381        let inner = self.inner.as_ref().unwrap();
382        let io = IoContext::new(self.io_service.as_ref().unwrap().channel(), 0);
383        inner.kill_connection(
384            id,
385            &io,
386            true,
387            op,
388            "disconnect requested", // reason
389        );
390        true
391    }
392
393    pub fn save_node_db(&self) {
394        if let Some(inner) = &self.inner {
395            inner.node_db.write().save();
396        }
397    }
398}
399
400type SharedSession = Arc<RwLock<Session>>;
401
402pub struct HostMetadata {
403    pub network_id: u64,
404    /// Our private and public keys.
405    pub keys: KeyPair,
406    pub protocols: RwLock<Vec<ProtocolInfo>>,
407    pub minimum_peer_protocol_version: RwLock<Vec<ProtocolInfo>>,
408    pub local_address: SocketAddr,
409    /// Local address + discovery port
410    pub local_endpoint: NodeEndpoint,
411    /// Public address + discovery port
412    pub public_endpoint: NodeEndpoint,
413}
414
415impl HostMetadata {
416    pub(crate) fn secret(&self) -> &Secret { self.keys.secret() }
417
418    pub(crate) fn id(&self) -> &NodeId { self.keys.public() }
419}
420
421#[derive(Copy, Clone)]
422struct ProtocolTimer {
423    pub protocol: ProtocolId,
424    pub token: TimerToken, // Handler level token
425}
426
427/// The inner implementation of NetworkService. Note that all accesses to the
428/// RWLocks of the fields have to follow the defined order to avoid race
429pub struct NetworkServiceInner {
430    pub sessions: SessionManager,
431    pub metadata: HostMetadata,
432    pub config: NetworkConfiguration,
433    udp_socket: Mutex<UdpSocket>,
434    tcp_listener: Mutex<TcpListener>,
435    udp_channel: RwLock<UdpChannel>,
436    discovery: Mutex<Option<Discovery>>,
437    handlers:
438        RwLock<HashMap<ProtocolId, Arc<dyn NetworkProtocolHandler + Sync>>>,
439    timers: RwLock<HashMap<TimerToken, ProtocolTimer>>,
440    timer_counter: RwLock<usize>,
441    pub node_db: RwLock<NodeDatabase>,
442    reserved_nodes: RwLock<HashSet<NodeId>>,
443    dropped_nodes: RwLock<HashSet<NodeId>>,
444
445    is_consortium: bool,
446
447    /// Delayed message queue and corresponding latency
448    delayed_queue: Option<DelayedQueue>,
449}
450
451struct DelayedQueue {
452    queue: Mutex<BinaryHeap<DelayMessageContext>>,
453    latencies: RwLock<HashMap<NodeId, Duration>>,
454}
455
456impl DelayedQueue {
457    fn new() -> Self {
458        DelayedQueue {
459            queue: Mutex::new(BinaryHeap::new()),
460            latencies: RwLock::new(HashMap::new()),
461        }
462    }
463
464    fn send_delayed_messages(&self, network_service: &NetworkServiceInner) {
465        let context = self.queue.lock().pop().unwrap();
466        let r = context.session.write().send_packet(
467            &context.io,
468            Some(context.protocol),
469            context.min_protocol_version,
470            session::PACKET_USER,
471            context.msg,
472            context.priority,
473        );
474        match r {
475            Ok(_) => {}
476            Err(Error::Expired) => {
477                // If a connection is set expired, it should have been killed
478                // before, and the stored `context.peer` may have been reused by
479                // another connection, so we cannot kill it again
480                info!(
481                    "Error sending delayed message to expired connection {:?}",
482                    context.peer
483                );
484            }
485            Err(e) => {
486                info!(
487                    "Error sending delayed message: peer={:?} err={:?}",
488                    context.peer, e
489                );
490                network_service.kill_connection(
491                    &context.peer,
492                    &context.io,
493                    true,
494                    Some(UpdateNodeOperation::Failure),
495                    "failed to send delayed message", // reason
496                );
497            }
498        };
499    }
500}
501
502impl NetworkServiceInner {
503    pub fn new(
504        config: &NetworkConfiguration,
505        pos_pub_keys: (ConsensusPublicKey, ConsensusVRFPublicKey),
506    ) -> Result<NetworkServiceInner, Error> {
507        let mut listen_address = match config.listen_address {
508            None => SocketAddr::V4(SocketAddrV4::new(
509                Ipv4Addr::new(0, 0, 0, 0),
510                DEFAULT_PORT,
511            )),
512            Some(addr) => addr,
513        };
514
515        let keys = if let Some(ref secret) = config.use_secret {
516            KeyPair::from_secret(secret.clone())?
517        } else {
518            config
519                .config_path
520                .clone()
521                .and_then(|ref p| load_key(Path::new(&p)))
522                .map_or_else(
523                    || {
524                        let key = Random
525                            .generate()
526                            .expect("Error generating random key pair");
527                        if let Some(path) = config.config_path.clone() {
528                            save_key(Path::new(&path), key.secret());
529                        }
530                        key
531                    },
532                    |s| {
533                        KeyPair::from_secret(s)
534                            .expect("Error creating node secret key")
535                    },
536                )
537        };
538        info!("Self pos public key: {:?}", pos_pub_keys);
539
540        info!("Self node id: {:?}", *keys.public());
541
542        let tcp_listener = TcpListener::bind(listen_address)?;
543        listen_address = SocketAddr::new(
544            listen_address.ip(),
545            tcp_listener.local_addr()?.port(),
546        );
547        debug!("Listening at {:?}", listen_address);
548        let udp_port = config.udp_port.unwrap_or_else(|| listen_address.port());
549        let local_endpoint = NodeEndpoint {
550            address: listen_address,
551            udp_port,
552        };
553        let mut udp_addr = local_endpoint.address;
554        udp_addr.set_port(local_endpoint.udp_port);
555        let udp_socket =
556            UdpSocket::bind(udp_addr).expect("Error binding UDP socket");
557
558        let public_address = config.public_address;
559        let public_endpoint = match public_address {
560            None => {
561                let public_address =
562                    select_public_address(local_endpoint.address.port());
563                let public_endpoint = NodeEndpoint {
564                    address: public_address,
565                    udp_port: local_endpoint.udp_port,
566                };
567                if config.nat_enabled {
568                    match map_external_address(&local_endpoint, &NatType::Any) {
569                        Some(endpoint) => {
570                            info!(
571                                "NAT mapped to external address {}",
572                                endpoint.address
573                            );
574                            endpoint
575                        }
576                        None => public_endpoint,
577                    }
578                } else {
579                    public_endpoint
580                }
581            }
582            Some(addr) => NodeEndpoint {
583                address: addr,
584                udp_port: local_endpoint.udp_port,
585            },
586        };
587
588        let allow_ips = config.ip_filter.clone();
589        let discovery = {
590            if config.discovery_enabled {
591                Some(Discovery::new(
592                    &keys,
593                    public_endpoint.clone(),
594                    allow_ips,
595                    config.discovery_config.clone(),
596                ))
597            } else {
598                None
599            }
600        };
601
602        let nodes_path = config.config_path.clone();
603        let own_node_id = *keys.public();
604
605        let mut inner = NetworkServiceInner {
606            metadata: HostMetadata {
607                network_id: config.id,
608                keys,
609                protocols: RwLock::new(Vec::new()),
610                minimum_peer_protocol_version: Default::default(),
611                local_address: listen_address,
612                local_endpoint,
613                public_endpoint,
614            },
615            config: config.clone(),
616            udp_channel: RwLock::new(UdpChannel::new()),
617            discovery: Mutex::new(discovery),
618            udp_socket: Mutex::new(udp_socket),
619            tcp_listener: Mutex::new(tcp_listener),
620            sessions: SessionManager::new(
621                FIRST_SESSION,
622                MAX_SESSIONS,
623                config.max_incoming_peers,
624                own_node_id,
625                &config.session_ip_limit_config,
626                Some(pos_pub_keys),
627            ),
628            handlers: RwLock::new(HashMap::new()),
629            timers: RwLock::new(HashMap::new()),
630            timer_counter: RwLock::new(HANDLER_TIMER),
631            node_db: RwLock::new(NodeDatabase::new(
632                nodes_path,
633                config.subnet_quota,
634            )),
635            reserved_nodes: RwLock::new(HashSet::new()),
636            dropped_nodes: RwLock::new(HashSet::new()),
637            is_consortium: config.is_consortium,
638            delayed_queue: None,
639        };
640
641        for n in &config.boot_nodes {
642            inner.add_boot_node(n);
643        }
644
645        let reserved_nodes = config.reserved_nodes.clone();
646        for n in reserved_nodes {
647            if let Err(e) = inner.add_reserved_node(&n) {
648                debug!("Error parsing node id: {}: {:?}", n, e);
649            }
650        }
651
652        Ok(inner)
653    }
654
655    pub fn new_with_latency(
656        config: &NetworkConfiguration,
657        pos_pub_keys: (ConsensusPublicKey, ConsensusVRFPublicKey),
658    ) -> Result<NetworkServiceInner, Error> {
659        let mut inner = NetworkServiceInner::new(config, pos_pub_keys)?;
660        inner.delayed_queue = Some(DelayedQueue::new());
661        Ok(inner)
662    }
663
664    pub fn add_latency(
665        &self, peer: NodeId, latency_ms: f64,
666    ) -> Result<(), Error> {
667        match self.delayed_queue {
668            Some(ref queue) => {
669                let mut latencies = queue.latencies.write();
670                latencies
671                    .insert(peer, Duration::from_millis(latency_ms as u64));
672                Ok(())
673            }
674            None => Err(
675                "conflux not in test mode, and does not support add_latency"
676                    .into(),
677            ),
678        }
679    }
680
681    pub fn get_ip_filter(&self) -> &IpFilter { &self.config.ip_filter }
682
683    fn add_boot_node(&self, id: &str) {
684        match Node::from_str(id) {
685            Err(e) => {
686                debug!("Could not add node {}: {:?}", id, e);
687            }
688            Ok(n) => {
689                self.node_db.write().insert_trusted(NodeEntry {
690                    id: n.id,
691                    endpoint: n.endpoint,
692                });
693            }
694        }
695    }
696
697    fn add_reserved_node(&mut self, id: &str) -> Result<(), Error> {
698        let n = Node::from_str(id)?;
699        self.node_db.write().insert_trusted(NodeEntry {
700            id: n.id,
701            endpoint: n.endpoint.clone(),
702        });
703        self.reserved_nodes.write().insert(n.id);
704        Ok(())
705    }
706
707    fn initialize_udp_protocols(
708        &self, io: &IoContext<NetworkIoMessage>,
709    ) -> Result<(), Error> {
710        // Initialize discovery
711        if let Some(discovery) = self.discovery.lock().as_mut() {
712            let allow_ips = self.config.ip_filter.clone();
713            let nodes = self.node_db.read().sample_trusted_nodes(
714                self.config.discovery_config.discover_node_count,
715                &allow_ips,
716            );
717            discovery.try_ping_nodes(
718                &UdpIoContext::new(&self.udp_channel, &self.node_db),
719                nodes,
720            );
721            io.register_timer(
722                FAST_DISCOVERY_REFRESH,
723                self.config.fast_discovery_refresh_timeout,
724            )?;
725            io.register_timer(
726                DISCOVERY_REFRESH,
727                self.config.discovery_refresh_timeout,
728            )?;
729            io.register_timer(
730                DISCOVERY_ROUND,
731                self.config.discovery_round_timeout,
732            )?;
733        }
734        io.register_timer(NODE_TABLE, self.config.node_table_timeout)?;
735        io.register_timer(CHECK_SESSIONS, DEFAULT_CHECK_SESSIONS_TIMEOUT)?;
736
737        Ok(())
738    }
739
740    fn try_promote_untrusted(&self) {
741        // Get NodeIds from incoming connections
742        let mut incoming_ids: Vec<NodeId> = Vec::new();
743        for s in self.sessions.all() {
744            if let Some(s) = s.try_read() {
745                if s.is_ready() && !s.metadata.originated && !s.expired() {
746                    // is live incoming connection
747                    if let Some(id) = s.metadata.id {
748                        incoming_ids.push(id);
749                    }
750                }
751            }
752        }
753
754        // Check each live connection for its lifetime.
755        // Promote the peers with live connection for a threshold period
756        self.node_db.write().promote(
757            incoming_ids,
758            self.config.connection_lifetime_for_promotion,
759        );
760    }
761
762    pub fn local_addr(&self) -> SocketAddr { self.metadata.local_address }
763
764    fn drop_node(&self, local_id: NodeId) -> Result<(), Error> {
765        let removed_node = self.node_db.write().remove(&local_id);
766
767        if let Some(node) = removed_node {
768            assert_eq!(node.id, local_id);
769            if let Some(_stream_token) = node.stream_token {
770                let mut wd = self.dropped_nodes.write();
771                wd.insert(node.id);
772            }
773        }
774
775        Ok(())
776    }
777
778    fn has_enough_outgoing_peers(
779        &self, tag: Option<(&str, &str)>, max: usize,
780    ) -> bool {
781        let count = match tag {
782            Some((k, v)) => self.sessions.count_with_tag(&k.into(), &v.into()),
783            None => self.sessions.stat().1, // egress count
784        };
785
786        count >= max
787    }
788
789    fn on_housekeeping(&self, io: &IoContext<NetworkIoMessage>) {
790        if self.is_consortium {
791            unimplemented!();
792        } else {
793            self.connect_peers(io);
794        }
795        self.drop_peers(io);
796    }
797
798    // Connect to all reserved and trusted peers if not yet
799    fn connect_peers(&self, io: &IoContext<NetworkIoMessage>) {
800        if self
801            .metadata
802            .minimum_peer_protocol_version
803            .read()
804            .is_empty()
805        {
806            // The protocol handler has not been registered, we just wait for
807            // the next time.
808            return;
809        }
810
811        let self_id = *self.metadata.id();
812
813        let sampled_archive_nodes = self.sample_archive_nodes();
814
815        let (handshake_count, egress_count, ingress_count) =
816            self.sessions.stat();
817        let samples;
818        {
819            let egress_attempt_count = if self.config.max_outgoing_peers
820                > egress_count + sampled_archive_nodes.len()
821            {
822                self.config.max_outgoing_peers
823                    - egress_count
824                    - sampled_archive_nodes.len()
825            } else {
826                0
827            };
828            samples = self.node_db.read().sample_trusted_node_ids(
829                egress_attempt_count as u32,
830                &self.config.ip_filter,
831            );
832        }
833
834        let reserved_nodes = self.reserved_nodes.read();
835        // Try to connect all reserved peers and trusted peers
836        let nodes = reserved_nodes
837            .iter()
838            .cloned()
839            .chain(sampled_archive_nodes)
840            .chain(samples);
841
842        let max_handshakes_per_round = self.config.max_handshakes / 2;
843        let mut started: usize = 0;
844        for id in nodes
845            .filter(|id| !self.sessions.contains_node(id) && *id != self_id)
846            .take(min(
847                max_handshakes_per_round,
848                self.config.max_handshakes.saturating_sub(handshake_count),
849            ))
850        {
851            self.connect_peer(&id, io);
852            started += 1;
853        }
854        debug!(
855            "Connecting peers: {} sessions, {} pending + {} started",
856            egress_count + ingress_count,
857            handshake_count,
858            started
859        );
860        if egress_count + ingress_count == 0 {
861            warn!(
862                "No peers connected at this moment, {} pending + {} started",
863                handshake_count, started
864            );
865        }
866    }
867
868    /// Sample archive nodes for outgoing connections if not enough.
869    fn sample_archive_nodes(&self) -> HashSet<NodeId> {
870        if self.config.max_outgoing_peers_archive == 0 {
871            return HashSet::new();
872        }
873
874        let key: String = NODE_TAG_NODE_TYPE.into();
875        let value: String = NODE_TAG_ARCHIVE.into();
876        let archive_sessions = self.sessions.count_with_tag(&key, &value);
877
878        if archive_sessions >= self.config.max_outgoing_peers_archive {
879            return HashSet::new();
880        }
881
882        self.node_db.read().sample_trusted_node_ids_with_tag(
883            (self.config.max_outgoing_peers_archive - archive_sessions) as u32,
884            &key,
885            &value,
886        )
887    }
888
889    // Kill connections of all dropped peers
890    fn drop_peers(&self, io: &IoContext<NetworkIoMessage>) {
891        {
892            if self.dropped_nodes.read().is_empty() {
893                return;
894            }
895        }
896
897        let mut killed_nodes = HashSet::new();
898        let mut w = self.dropped_nodes.write();
899        for node_id in w.iter() {
900            if self.kill_connection(
901                node_id,
902                io,
903                true,
904                Some(UpdateNodeOperation::Failure),
905                "peer dropped in manual", // reason
906            ) {
907                killed_nodes.insert(*node_id);
908            }
909        }
910
911        for node_id in killed_nodes.iter() {
912            w.remove(node_id);
913        }
914    }
915
916    fn connect_peer(&self, id: &NodeId, io: &IoContext<NetworkIoMessage>) {
917        if self.sessions.contains_node(id) {
918            trace!("Abort connect. Node already connected");
919            return;
920        }
921
922        let (socket, address) = {
923            let address = {
924                // outgoing connection must pick node from trusted node table
925                if let Some(node) = self.node_db.read().get(id, true) {
926                    node.endpoint.address
927                } else {
928                    debug!("Abort connect. Node expired");
929                    return;
930                }
931            };
932
933            if !self.sessions.is_ip_allowed(&address.ip()) {
934                debug!("cannot create outgoing connection to node, id = {:?}, address = {:?}", id, address);
935                return;
936            }
937
938            match TcpStream::connect(address) {
939                Ok(socket) => {
940                    trace!("{}: connecting to {:?}", id, address);
941                    (socket, address)
942                }
943                Err(e) => {
944                    self.node_db.write().note_failure(
945                        id, true, /* by_connection */
946                        true, /* trusted_only */
947                    );
948                    debug!(
949                        "{}: can't connect o address {:?} {:?}",
950                        id, address, e
951                    );
952                    return;
953                }
954            }
955        };
956
957        if let Err(e) = self.create_connection(socket, address, Some(id), io) {
958            self.node_db.write().note_failure(
959                id, true, /* by_connection */
960                true, /* trusted_only */
961            );
962            debug!("Can't create connection: {:?}", e);
963        }
964    }
965
966    pub fn get_peer_info(&self) -> Vec<PeerInfo> {
967        debug!("get_peer_info: enter");
968
969        let mut peers = Vec::with_capacity(self.sessions.count());
970        debug!("get_peer_info: {} sessions in total", peers.capacity());
971
972        for session in self.sessions.all() {
973            let sess = session.read();
974            if !sess.expired() {
975                peers.push(PeerInfo {
976                    id: sess.token(),
977                    nodeid: *sess.id().unwrap_or(&NodeId::default()),
978                    addr: sess.address(),
979                    protocols: sess.metadata.peer_protocols.clone(),
980                });
981            }
982        }
983
984        debug!("get_peer_info: leave, {} peers retrieved", peers.len());
985
986        peers
987    }
988
989    pub fn get_peer_connection_origin(&self, node_id: &NodeId) -> Option<bool> {
990        match self.sessions.get_by_id(node_id) {
991            Some(session) => {
992                let sess = session.read();
993                Some(sess.originated())
994            }
995            None => None,
996        }
997    }
998
999    fn start(&self, io: &IoContext<NetworkIoMessage>) -> Result<(), Error> {
1000        self.initialize_udp_protocols(io)?;
1001        io.register_stream(UDP_MESSAGE)?;
1002        io.register_stream(TCP_ACCEPT)?;
1003        Ok(())
1004    }
1005
1006    // This function can be invoked in either of 2 cases:
1007    // 1. proactively connect to a peer;
1008    // 2. passively connected by a peer;
1009    fn create_connection(
1010        &self, socket: TcpStream, address: SocketAddr, id: Option<&NodeId>,
1011        io: &IoContext<NetworkIoMessage>,
1012    ) -> Result<(), Error> {
1013        match self.sessions.create(socket, address, id, io, self) {
1014            Ok(token) => {
1015                debug!("new session created, token = {}, address = {:?}, id = {:?}", token, address, id);
1016                if let Some(id) = id {
1017                    // This is an outgoing connection.
1018                    // Outgoing connection must pick node from trusted node
1019                    // table
1020                    self.node_db.write().note_success(id, Some(token), true);
1021                }
1022                io.register_stream(token).map(|_| ()).map_err(Into::into)
1023            }
1024            Err(reason) => {
1025                debug!("failed to create session, reason = {}, address = {:?}, id = {:?}", reason, address, id);
1026                Ok(())
1027            }
1028        }
1029    }
1030
1031    fn connection_closed(
1032        &self, stream: StreamToken, io: &IoContext<NetworkIoMessage>,
1033    ) {
1034        trace!("Connection closed: {}", stream);
1035        self.kill_connection_by_token(
1036            stream,
1037            io,
1038            true,
1039            Some(UpdateNodeOperation::Failure),
1040            "connection closed", // reason
1041        );
1042    }
1043
    /// Reads and dispatches everything currently available on the session
    /// behind `stream`.
    ///
    /// Order of operations:
    /// 1. If the session already has a node id, `drop_peers` is run. If that
    ///    id was in `dropped_nodes`, the function returns without reading.
    /// 2. `Session::readable` is called repeatedly, each time under the
    ///    session write lock, until it returns `SessionData::None` or an
    ///    error. Along the way it records three things: a completed
    ///    handshake (`SessionData::Ready`), protocol messages that have a
    ///    registered handler, and an optional other session to disconnect.
    /// 3. That other session, if any, is killed with `op = None`. Then, on a
    ///    read error, this session is killed with a `Failure` update and the
    ///    function returns.
    /// 4. After a completed handshake, `on_peer_connected` is invoked for
    ///    every peer protocol that has a registered handler.
    /// 5. Each collected message is forwarded to the IO handler (handler id
    ///    0) as `NetworkIoMessage::HandleProtocolMessage`.
    fn session_readable(
        &self, stream: StreamToken, io: &IoContext<NetworkIoMessage>,
    ) {
        let mut handshake_done = false;

        // Messages are buffered here and dispatched after the read loop,
        // when no session lock is held.
        let mut messages: Vec<(ProtocolId, Vec<u8>)> = Vec::new();
        let mut kill = false;
        let mut token_to_disconnect = None;

        let session = if let Some(session) = self.sessions.get(stream) {
            session
        } else {
            return;
        };

        // We check dropped_nodes first to make sure we stop processing
        // communications from any dropped peers
        let mut session_node_id = session.read().id().copied();
        // Filled from the `Ready` handshake outcome in the loop below.
        let mut pos_public_key_opt = None;
        if let Some(node_id) = session_node_id {
            let to_drop = self.dropped_nodes.read().contains(&node_id);
            self.drop_peers(io);
            if to_drop {
                return;
            }
        }

        loop {
            // The session write lock is re-acquired on every iteration and
            // released at the end of it (or earlier, via `drop` in the
            // `Message` arm).
            let mut sess = session.write();
            let data = sess.readable(io, self);
            let session_data = match data {
                Ok(session_data) => session_data,
                Err(e) => {
                    debug!("Failed to read session data, error = {:?}, session = {:?}", e, *sess);
                    kill = true;
                    break;
                }
            };

            if session_data.token_to_disconnect.is_some() {
                debug!(
                    "session_readable: set token_to_disconnect to {:?}",
                    session_data.token_to_disconnect
                );
                // If several reads request a disconnect, only the last one is
                // kept.
                token_to_disconnect = session_data.token_to_disconnect;
            }

            match session_data.session_data {
                SessionData::Ready { pos_public_key } => {
                    debug!(
                        "receive Ready with pos_public_key={:?} account={:?}",
                        pos_public_key,
                        pos_public_key
                            .as_ref()
                            .map(|k| from_consensus_public_key(&k.0, &k.1)),
                    );
                    handshake_done = true;
                    // NOTE(review): the unwrap assumes a session that reports
                    // `Ready` always has a node id — confirm in `Session`.
                    session_node_id = Some(*sess.id().unwrap());
                    pos_public_key_opt = pos_public_key;
                }
                SessionData::Message { data, protocol } => {
                    // The session lock is released here, before `handlers`
                    // is read.
                    drop(sess);
                    match self.handlers.read().get(&protocol) {
                        None => warn!(
                            "No handler found for protocol: {:?}",
                            protocol
                        ),
                        Some(_) => {
                            messages.push((protocol, data));
                        }
                    }
                }
                // Nothing more to read for now.
                SessionData::None => break,
                SessionData::Continue => {}
            }
        }

        if let Some(token_to_disconnect) = token_to_disconnect {
            // Sim-dial dedup, not a peer failure — `op = None`, no
            // note_failure.
            self.kill_connection_by_token(
                token_to_disconnect.0,
                io,
                true,
                None,
                token_to_disconnect.1.as_str(), // reason
            );
        }

        if kill {
            self.kill_connection_by_token(
                stream,
                io,
                true,
                Some(UpdateNodeOperation::Failure),
                "session readable error", // reason
            );
            return;
        }

        // Handshake is just finished, first process the outcome from the
        // handshake.
        if handshake_done {
            // NOTE(review): this `handlers` read guard stays alive across the
            // `on_peer_connected` callbacks below. If a callback closes the
            // connection, `kill_connection*` reads `handlers` again on this
            // thread. parking_lot documents that recursive read locking may
            // deadlock while a writer is queued (`handlers` is written when a
            // handler is added). Confirm that cannot overlap, or clone the
            // handler out before calling it.
            let handlers = self.handlers.read();
            // Clone the data to prevent deadlock, because handler may
            // close the connection within the on_peer_connected
            // callback.
            let session_metadata = session.read().metadata.clone();
            for protocol in &session_metadata.peer_protocols {
                if let Some(handler) = handlers.get(&protocol.protocol).cloned()
                {
                    debug!("session handshaked, token = {}", stream);
                    let network_context = NetworkContext::new(
                        io,
                        handler,
                        protocol.protocol,
                        self,
                    );
                    network_context.protocol_handler().on_peer_connected(
                        &network_context,
                        session_node_id.as_ref().unwrap(),
                        protocol.version,
                        pos_public_key_opt.clone(),
                    );
                }
            }
        }

        // NOTE(review): the `unwrap` below assumes messages only arrive on
        // sessions whose node id is known (i.e. after the handshake) —
        // confirm in `Session::readable`.
        for (protocol, data) in messages {
            if let Err(e) = io.handle(
                stream,
                0, /* We only have one handler for the execution
                    * event_loop,
                    * so the handler_id is always 0 */
                NetworkIoMessage::HandleProtocolMessage {
                    protocol,
                    peer: stream,
                    node_id: *session_node_id.as_ref().unwrap(),
                    data,
                },
            ) {
                warn!("Error occurs, discard protocol message: err={}", e);
            }
        }
    }
1189
1190    fn session_writable(
1191        &self, stream: StreamToken, io: &IoContext<NetworkIoMessage>,
1192    ) {
1193        if let Some(session) = self.sessions.get(stream) {
1194            {
1195                // We check dropped_nodes first to make sure we stop processing
1196                // communications from any dropped peers
1197                let sess_id = session.read().id().copied();
1198                if let Some(node_id) = sess_id {
1199                    let to_drop = self.dropped_nodes.read().contains(&node_id);
1200                    self.drop_peers(io);
1201                    if to_drop {
1202                        return;
1203                    }
1204                }
1205            }
1206            let mut sess = session.write();
1207            if let Err(e) = sess.writable(io) {
1208                trace!("{}: Session write error: {:?}", stream, e);
1209            }
1210            if sess.done() {
1211                io.deregister_stream(stream).unwrap_or_else(|e| {
1212                    debug!("Error deregistering stream: {:?}", e)
1213                });
1214            }
1215        }
1216    }
1217
1218    fn accept(&self, io: &IoContext<NetworkIoMessage>) {
1219        trace!("Accepting incoming connection");
1220        loop {
1221            let (socket, address) = match self.tcp_listener.lock().accept() {
1222                Ok((sock, addr)) => (sock, addr),
1223                Err(e) => {
1224                    if e.kind() != io::ErrorKind::WouldBlock {
1225                        debug!("Error accepting connection: {:?}", e);
1226                    }
1227                    break;
1228                }
1229            };
1230
1231            if let Err(e) = self.create_connection(socket, address, None, io) {
1232                debug!("Can't accept connection: {:?}", e);
1233            }
1234        }
1235    }
1236
1237    fn kill_connection_by_token(
1238        &self, token: StreamToken, io: &IoContext<NetworkIoMessage>,
1239        remote: bool, op: Option<UpdateNodeOperation>, reason: &str,
1240    ) {
1241        let mut to_disconnect: Vec<ProtocolId> = Vec::new();
1242        let mut failure_id = None;
1243        let mut deregister = false;
1244
1245        if let FIRST_SESSION..=LAST_SESSION = token {
1246            if let Some(session) = self.sessions.get(token) {
1247                let mut sess = session.write();
1248                if !sess.expired() {
1249                    // Removed before the disconnect packets (not just before
1250                    // `set_expired`) to minimize the stale-entry window.
1251                    if let Some(id) = sess.id() {
1252                        self.sessions.remove_node_id_entry(id, token);
1253                    }
1254                    if sess.is_ready() {
1255                        for (p, _) in self.handlers.read().iter() {
1256                            if sess.have_capability(*p) {
1257                                to_disconnect.push(*p);
1258                                sess.send_disconnect(DisconnectReason::Custom(
1259                                    reason.into(),
1260                                ));
1261                            }
1262                        }
1263                    }
1264                    sess.set_expired();
1265                }
1266                deregister = remote || sess.done();
1267                failure_id = sess.id().cloned();
1268                debug!(
1269                    "kill connection by token, deregister = {}, reason = {:?}, session = {:?}, op = {:?}",
1270                    deregister, reason, *sess, op
1271                );
1272            }
1273        }
1274
1275        if let Some(id) = failure_id {
1276            if remote {
1277                if let Some(op) = op {
1278                    match op {
1279                        UpdateNodeOperation::Failure => {
1280                            self.node_db.write().note_failure(
1281                                &id, true,  /* by_connection */
1282                                false, /* trusted_only */
1283                            );
1284                        }
1285                        UpdateNodeOperation::Demotion => {
1286                            let mut node_db = self.node_db.write();
1287                            node_db.demote(&id);
1288                            node_db.note_demoted(
1289                                &id, true, /* by_connection */
1290                            );
1291                        }
1292                        UpdateNodeOperation::Remove => {
1293                            self.node_db.write().set_blacklisted(&id);
1294                        }
1295                    }
1296                }
1297            }
1298
1299            for p in to_disconnect {
1300                if let Some(h) = self.handlers.read().get(&p).cloned() {
1301                    let network_context = NetworkContext::new(io, h, p, self);
1302                    network_context
1303                        .protocol_handler()
1304                        .on_peer_disconnected(&network_context, &id);
1305                }
1306            }
1307        }
1308
1309        if deregister {
1310            io.deregister_stream(token).unwrap_or_else(|e| {
1311                debug!("Error deregistering stream {:?}", e);
1312            });
1313        }
1314    }
1315
1316    fn kill_connection(
1317        &self, node_id: &NodeId, io: &IoContext<NetworkIoMessage>,
1318        remote: bool, op: Option<UpdateNodeOperation>, reason: &str,
1319    ) -> bool {
1320        let mut to_disconnect: Vec<ProtocolId> = Vec::new();
1321        let mut deregister = false;
1322        let mut token = 0;
1323
1324        if let Some(session) = self.sessions.get_by_id(node_id) {
1325            let mut sess = session.write();
1326            if !sess.expired() {
1327                // Removed before the disconnect packets (not just before
1328                // `set_expired`) to minimize the stale-entry window.
1329                self.sessions.remove_node_id_entry(node_id, sess.token());
1330                if sess.is_ready() {
1331                    for (p, _) in self.handlers.read().iter() {
1332                        if sess.have_capability(*p) {
1333                            to_disconnect.push(*p);
1334                            sess.send_disconnect(DisconnectReason::Custom(
1335                                reason.into(),
1336                            ));
1337                        }
1338                    }
1339                }
1340                sess.set_expired();
1341            }
1342            deregister = remote || sess.done();
1343            token = sess.token();
1344            assert_eq!(sess.id().unwrap().clone(), node_id.clone());
1345            debug!(
1346                "kill connection, deregister = {}, reason = {:?}, session = {:?}, op = {:?}",
1347                deregister, reason, *sess, op
1348            );
1349
1350            if remote {
1351                if let Some(op) = op {
1352                    match op {
1353                        UpdateNodeOperation::Failure => {
1354                            self.node_db.write().note_failure(
1355                                node_id, true,  /* by_connection */
1356                                false, /* trusted_only */
1357                            );
1358                        }
1359                        UpdateNodeOperation::Demotion => {
1360                            let mut node_db = self.node_db.write();
1361                            node_db.demote(node_id);
1362                            node_db.note_demoted(
1363                                node_id, true, /* by_connection */
1364                            );
1365                        }
1366                        UpdateNodeOperation::Remove => {
1367                            self.node_db.write().set_blacklisted(node_id);
1368                        }
1369                    }
1370                }
1371            }
1372        }
1373
1374        for p in to_disconnect {
1375            if let Some(h) = self.handlers.read().get(&p).cloned() {
1376                let network_context = NetworkContext::new(io, h, p, self);
1377                network_context
1378                    .protocol_handler()
1379                    .on_peer_disconnected(&network_context, node_id);
1380            }
1381        }
1382
1383        if deregister {
1384            io.deregister_stream(token).unwrap_or_else(|e| {
1385                debug!("Error deregistering stream {:?}", e);
1386            });
1387        }
1388
1389        deregister
1390    }
1391
1392    pub fn with_context<F, R>(
1393        &self, handler: Arc<dyn NetworkProtocolHandler + Sync>,
1394        protocol: ProtocolId, io: &IoContext<NetworkIoMessage>, action: F,
1395    ) -> R
1396    where
1397        F: FnOnce(&NetworkContext) -> R,
1398    {
1399        action(&NetworkContext::new(io, handler, protocol, self))
1400    }
1401
1402    fn udp_readable(&self, io: &IoContext<NetworkIoMessage>) {
1403        let udp_socket = self.udp_socket.lock();
1404        let writable;
1405        {
1406            let udp_channel = self.udp_channel.read();
1407            writable = udp_channel.any_sends_queued();
1408        }
1409
1410        let mut buf = [0u8; MAX_DATAGRAM_SIZE];
1411        match udp_socket.recv_from(&mut buf).map_non_block() {
1412            Ok(Some((len, address))) => self
1413                .on_udp_packet(&buf[0..len], address)
1414                .unwrap_or_else(|e| {
1415                    debug!("Error processing UDP packet: {:?}", e);
1416                }),
1417            Ok(_) => {}
1418            Err(e) => {
1419                debug!("Error reading UDP socket: {:?}", e);
1420            }
1421        };
1422
1423        let new_writable;
1424        {
1425            let udp_channel = self.udp_channel.read();
1426            new_writable = udp_channel.any_sends_queued();
1427        }
1428
1429        // Check whether on_udp_packet produces new to-be-sent messages.
1430        // If it does, we might need to change monitor interest to All if
1431        // it is only Readable.
1432        if writable != new_writable {
1433            io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1434                debug!("Error updating UDP registration: {:?}", e)
1435            });
1436        }
1437    }
1438
1439    fn udp_writable(&self, io: &IoContext<NetworkIoMessage>) {
1440        let udp_socket = self.udp_socket.lock();
1441        let mut udp_channel = self.udp_channel.write();
1442        while let Some(data) = udp_channel.dequeue_send() {
1443            match udp_socket
1444                .send_to(&data.payload, data.address)
1445                .map_non_block()
1446            {
1447                Ok(Some(size)) if size == data.payload.len() => {}
1448                Ok(Some(_)) => {
1449                    warn!("UDP sent incomplete datagram");
1450                }
1451                Ok(None) => {
1452                    udp_channel.requeue_send(data);
1453                    return;
1454                }
1455                Err(e) => {
1456                    debug!(
1457                        "UDP send error: {:?}, address: {:?}",
1458                        e, &data.address
1459                    );
1460                    return;
1461                }
1462            }
1463        }
1464        // look at whether the monitor interest can be set as Readable.
1465        io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1466            debug!("Error updating UDP registration: {:?}", e)
1467        });
1468    }
1469
1470    fn on_udp_packet(
1471        &self, packet: &[u8], from: SocketAddr,
1472    ) -> Result<(), Error> {
1473        if packet.is_empty() {
1474            return Ok(());
1475        }
1476
1477        let res = match packet[0] {
1478            UDP_PROTOCOL_DISCOVERY => {
1479                if let Some(discovery) = self.discovery.lock().as_mut() {
1480                    discovery.on_packet(
1481                        &UdpIoContext::new(&self.udp_channel, &self.node_db),
1482                        &packet[1..],
1483                        from,
1484                    )?;
1485                    Ok(())
1486                } else {
1487                    warn!("Discovery is not ready. Drop the message!");
1488                    Ok(())
1489                }
1490            }
1491            _ => {
1492                warn!("Unknown UDP protocol. Simply drops the message!");
1493                Ok(())
1494            }
1495        };
1496        res
1497    }
1498
1499    fn on_check_sessions(&self, io: &IoContext<NetworkIoMessage>) {
1500        let mut disconnect_peers = Vec::new();
1501
1502        for session in self.sessions.all() {
1503            if let Some(sess) = session.try_read() {
1504                if let (true, op) = sess.check_timeout() {
1505                    disconnect_peers.push((sess.token(), op));
1506                }
1507            }
1508        }
1509
1510        for (token, op) in disconnect_peers {
1511            self.kill_connection_by_token(
1512                token,
1513                io,
1514                true,
1515                op,
1516                "session timeout", // reason
1517            );
1518        }
1519    }
1520}
1521
1522impl IoHandler<NetworkIoMessage> for NetworkServiceInner {
1523    fn initialize(&self, io: &IoContext<NetworkIoMessage>) {
1524        io.register_timer(HOUSEKEEPING, self.config.housekeeping_timeout)
1525            .expect("Error registering housekeeping timer");
1526        io.message(NetworkIoMessage::Start).unwrap_or_else(|e| {
1527            warn!("Error sending IO notification: {:?}", e)
1528        });
1529    }
1530
1531    fn stream_hup(
1532        &self, io: &IoContext<NetworkIoMessage>, stream: StreamToken,
1533    ) {
1534        trace!("Hup: {}", stream);
1535        match stream {
1536            FIRST_SESSION..=LAST_SESSION => self.connection_closed(stream, io),
1537            _ => warn!("Unexpected hup"),
1538        }
1539    }
1540
1541    fn stream_readable(
1542        &self, io: &IoContext<NetworkIoMessage>, stream: StreamToken,
1543    ) {
1544        match stream {
1545            FIRST_SESSION..=LAST_SESSION => self.session_readable(stream, io),
1546            TCP_ACCEPT => self.accept(io),
1547            UDP_MESSAGE => self.udp_readable(io),
1548            _ => panic!("Received unknown readable token"),
1549        }
1550    }
1551
1552    fn stream_writable(
1553        &self, io: &IoContext<NetworkIoMessage>, stream: StreamToken,
1554    ) {
1555        match stream {
1556            FIRST_SESSION..=LAST_SESSION => self.session_writable(stream, io),
1557            UDP_MESSAGE => self.udp_writable(io),
1558            _ => panic!("Received unknown writable token"),
1559        }
1560    }
1561
1562    fn timeout(&self, io: &IoContext<NetworkIoMessage>, token: TimerToken) {
1563        match token {
1564            FIRST_SESSION..=LAST_SESSION => {
1565                debug!("Connection timeout: {}", token);
1566                self.kill_connection_by_token(
1567                    token,
1568                    io,
1569                    true,
1570                    Some(UpdateNodeOperation::Failure),
1571                    "handshake timeout", // reason
1572                );
1573            }
1574            HOUSEKEEPING => self.on_housekeeping(io),
1575            DISCOVERY_REFRESH => {
1576                // Run the _slow_ discovery if enough peers are connected
1577                let disc_general = self.has_enough_outgoing_peers(
1578                    None,
1579                    self.config.max_outgoing_peers,
1580                );
1581                let disc_archive = self.has_enough_outgoing_peers(
1582                    Some((NODE_TAG_NODE_TYPE, NODE_TAG_ARCHIVE)),
1583                    self.config.max_outgoing_peers_archive,
1584                );
1585                if disc_general || disc_archive {
1586                    if let Some(d) = self.discovery.lock().as_mut() {
1587                        d.disc_option.general = disc_general;
1588                        d.disc_option.archive = disc_archive;
1589                        d.refresh();
1590                    }
1591                    io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1592                        debug!("Error updating discovery registration: {:?}", e)
1593                    });
1594                }
1595            }
1596            FAST_DISCOVERY_REFRESH => {
1597                // Run the fast discovery if not enough peers are connected
1598                let disc_general = !self.has_enough_outgoing_peers(
1599                    None,
1600                    self.config.max_outgoing_peers,
1601                );
1602                let disc_archive = !self.has_enough_outgoing_peers(
1603                    Some((NODE_TAG_NODE_TYPE, NODE_TAG_ARCHIVE)),
1604                    self.config.max_outgoing_peers_archive,
1605                );
1606                if disc_general || disc_archive {
1607                    if let Some(d) = self.discovery.lock().as_mut() {
1608                        d.disc_option.general = disc_general;
1609                        d.disc_option.archive = disc_archive;
1610                        d.refresh();
1611                    }
1612                    io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1613                        debug!("Error updating discovery registration: {:?}", e)
1614                    });
1615                }
1616            }
1617            DISCOVERY_ROUND => {
1618                if let Some(d) = self.discovery.lock().as_mut() {
1619                    d.round(&UdpIoContext::new(
1620                        &self.udp_channel,
1621                        &self.node_db,
1622                    ))
1623                }
1624                io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1625                    debug!("Error updating discovery registration: {:?}", e)
1626                });
1627            }
1628            NODE_TABLE => {
1629                trace!("Refreshing node table");
1630                self.try_promote_untrusted();
1631                self.node_db.write().save();
1632            }
1633            CHECK_SESSIONS => self.on_check_sessions(io),
1634            SEND_DELAYED_MESSAGES => {
1635                if let Some(ref queue) = self.delayed_queue {
1636                    queue.send_delayed_messages(self);
1637                }
1638            }
1639            _ => match self.timers.read().get(&token).cloned() {
1640                Some(timer) => {
1641                    match self.handlers.read().get(&timer.protocol).cloned() {
1642                        None => warn!(
1643                            "No handler found for protocol: {:?}",
1644                            timer.protocol
1645                        ),
1646                        Some(h) => {
1647                            let network_context = NetworkContext::new(
1648                                io,
1649                                h,
1650                                timer.protocol,
1651                                self,
1652                            );
1653                            network_context
1654                                .protocol_handler()
1655                                .on_timeout(&network_context, timer.token);
1656                        }
1657                    }
1658                }
1659                None => {
1660                    warn!("Unknown timer token: {}", token);
1661                } // timer is not registered through us
1662            },
1663        }
1664    }
1665
1666    fn message(
1667        &self, io: &IoContext<NetworkIoMessage>, message: &NetworkIoMessage,
1668    ) {
1669        match message {
1670            NetworkIoMessage::Start => self.start(io).unwrap_or_else(|e| {
1671                warn!("Error starting network service: {:?}", e)
1672            }),
1673            NetworkIoMessage::AddHandler {
1674                handler,
1675                protocol,
1676                version,
1677                callback,
1678            } => {
1679                let h = handler.clone();
1680                let network_context =
1681                    NetworkContext::new(io, h, *protocol, self);
1682                network_context
1683                    .protocol_handler()
1684                    .initialize(&network_context);
1685                self.handlers.write().insert(*protocol, handler.clone());
1686                {
1687                    let protocols = &mut *self.metadata.protocols.write();
1688                    for protocol_info in protocols.iter() {
1689                        assert_ne!(
1690                            protocol, &protocol_info.protocol,
1691                            "Do not register same protocol twice"
1692                        );
1693                    }
1694                    protocols.push(ProtocolInfo {
1695                        protocol: *protocol,
1696                        version: *version,
1697                    });
1698                    self.metadata.minimum_peer_protocol_version.write().push(
1699                        ProtocolInfo {
1700                            protocol: *protocol,
1701                            version: handler.minimum_supported_version(),
1702                        },
1703                    );
1704                }
1705                info!(
1706                    "Protocol {:?} version {:?} registered.",
1707                    protocol, version
1708                );
1709                callback.send(()).expect("protocol register error");
1710            }
1711            NetworkIoMessage::AddTimer {
1712                ref protocol,
1713                ref delay,
1714                ref token,
1715            } => {
1716                let handler_token = {
1717                    let mut timer_counter = self.timer_counter.write();
1718                    let counter = &mut *timer_counter;
1719                    let handler_token = *counter;
1720                    *counter += 1;
1721                    handler_token
1722                };
1723                self.timers.write().insert(
1724                    handler_token,
1725                    ProtocolTimer {
1726                        protocol: *protocol,
1727                        token: *token,
1728                    },
1729                );
1730                io.register_timer(handler_token, *delay)
1731                    .unwrap_or_else(|e| {
1732                        debug!("Error registering timer {}: {:?}", token, e)
1733                    });
1734            }
1735            NetworkIoMessage::DispatchWork {
1736                ref protocol,
1737                ref work_type,
1738            } => {
1739                if let Some(handler) =
1740                    self.handlers.read().get(protocol).cloned()
1741                {
1742                    let network_context =
1743                        NetworkContext::new(io, handler, *protocol, self);
1744                    network_context
1745                        .protocol_handler()
1746                        .on_work_dispatch(&network_context, *work_type);
1747                } else {
1748                    warn!("Work is dispatched to unknown handler");
1749                }
1750            }
1751            NetworkIoMessage::HandleProtocolMessage {
1752                ref protocol,
1753                peer: _,
1754                ref node_id,
1755                ref data,
1756            } => {
1757                debug!("Receive ProtocolMsg {:?}", protocol);
1758                if let Some(handler) =
1759                    self.handlers.read().get(protocol).cloned()
1760                {
1761                    let network_context =
1762                        NetworkContext::new(io, handler, *protocol, self);
1763                    network_context.protocol_handler().on_message(
1764                        &network_context,
1765                        node_id,
1766                        data,
1767                    );
1768                } else {
1769                    warn!("Work is handled by unknown handler");
1770                }
1771            }
1772        }
1773    }
1774
1775    fn register_stream(
1776        &self, stream: StreamToken, reg: Token, poll_registry: &Registry,
1777    ) {
1778        match stream {
1779            FIRST_SESSION..=LAST_SESSION => {
1780                if let Some(session) = self.sessions.get(stream) {
1781                    session
1782                        .write()
1783                        .register_socket(reg, poll_registry)
1784                        .expect("Error registering socket");
1785                }
1786            }
1787            TCP_ACCEPT => {
1788                poll_registry
1789                    .register(
1790                        &mut *self.tcp_listener.lock(),
1791                        Token(TCP_ACCEPT),
1792                        Interest::READABLE | Interest::WRITABLE,
1793                    )
1794                    .expect("Error registering stream");
1795            }
1796            UDP_MESSAGE => {
1797                poll_registry
1798                    .register(
1799                        &mut *self.udp_socket.lock(),
1800                        reg,
1801                        Interest::READABLE | Interest::WRITABLE,
1802                    )
1803                    .expect("Error registering UDP socket");
1804            }
1805            _ => warn!("Unexpected stream registeration"),
1806        }
1807    }
1808
1809    fn deregister_stream(&self, stream: StreamToken, poll_registry: &Registry) {
1810        match stream {
1811            FIRST_SESSION..=LAST_SESSION => {
1812                if let Some(session) = self.sessions.get(stream) {
1813                    let mut sess = session.write();
1814                    if sess.expired() {
1815                        sess.deregister_socket(poll_registry)
1816                            .expect("Error deregistering socket");
1817                        if let Some(node_id) = sess.id() {
1818                            self.node_db.write().note_failure(
1819                                node_id, true,  /* by_connection */
1820                                false, /* trusted_only */
1821                            );
1822                        }
1823                        self.sessions.remove(&sess);
1824                        debug!("Removed session: {:?}", *sess);
1825                    }
1826                }
1827            }
1828            _ => warn!("Unexpected stream deregistration"),
1829        }
1830    }
1831
1832    fn update_stream(
1833        &self, stream: StreamToken, reg: Token, poll_registry: &Registry,
1834    ) {
1835        match stream {
1836            FIRST_SESSION..=LAST_SESSION => {
1837                if let Some(session) = self.sessions.get(stream) {
1838                    session
1839                        .write()
1840                        .update_socket(reg, poll_registry)
1841                        .expect("Error updating socket");
1842                }
1843            }
1844            TCP_ACCEPT => poll_registry
1845                .reregister(
1846                    &mut *self.tcp_listener.lock(),
1847                    Token(TCP_ACCEPT),
1848                    Interest::READABLE | Interest::WRITABLE,
1849                )
1850                .expect("Error reregistering stream"),
1851            UDP_MESSAGE => {
1852                let mut udp_socket = self.udp_socket.lock();
1853                let udp_channel = self.udp_channel.read();
1854
1855                let registration = if udp_channel.any_sends_queued() {
1856                    Interest::READABLE | Interest::WRITABLE
1857                } else {
1858                    Interest::READABLE
1859                };
1860                poll_registry
1861                    .reregister(&mut *udp_socket, reg, registration)
1862                    .expect("Error reregistering UDP socket");
1863            }
1864            _ => warn!("Unexpected stream update"),
1865        }
1866    }
1867}
1868
1869struct DelayMessageContext {
1870    ts: Instant,
1871    io: IoContext<NetworkIoMessage>,
1872    protocol: ProtocolId,
1873    session: SharedSession,
1874    peer: NodeId,
1875    msg: Vec<u8>,
1876    /// The minimum peer protocol version since which the message is supported.
1877    min_protocol_version: ProtocolVersion,
1878    priority: SendQueuePriority,
1879}
1880
1881impl DelayMessageContext {
1882    #[allow(clippy::too_many_arguments)]
1883    pub fn new(
1884        ts: Instant, io: IoContext<NetworkIoMessage>, protocol: ProtocolId,
1885        session: SharedSession, peer: NodeId, msg: Vec<u8>,
1886        min_protocol_version: ProtocolVersion, priority: SendQueuePriority,
1887    ) -> Self {
1888        DelayMessageContext {
1889            ts,
1890            io,
1891            protocol,
1892            session,
1893            peer,
1894            msg,
1895            min_protocol_version,
1896            priority,
1897        }
1898    }
1899}
1900
1901impl Ord for DelayMessageContext {
1902    fn cmp(&self, other: &Self) -> Ordering { other.ts.cmp(&self.ts) }
1903}
1904
1905impl PartialOrd for DelayMessageContext {
1906    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1907        Some(self.cmp(other))
1908    }
1909}
1910
1911impl Eq for DelayMessageContext {}
1912
1913impl PartialEq for DelayMessageContext {
1914    fn eq(&self, other: &Self) -> bool { self.ts == other.ts }
1915}
1916
1917/// Wrapper around network service.
1918pub struct NetworkContext<'a> {
1919    io: &'a IoContext<NetworkIoMessage>,
1920    handler: Arc<dyn NetworkProtocolHandler + Sync>,
1921    protocol: ProtocolId,
1922    min_supported_version: ProtocolVersion,
1923    network_service: &'a NetworkServiceInner,
1924}
1925
1926impl<'a> NetworkContext<'a> {
1927    pub fn new(
1928        io: &'a IoContext<NetworkIoMessage>,
1929        handler: Arc<dyn NetworkProtocolHandler + Sync>, protocol: ProtocolId,
1930        network_service: &'a NetworkServiceInner,
1931    ) -> NetworkContext<'a> {
1932        let min_supported_version = handler.minimum_supported_version();
1933        NetworkContext {
1934            io,
1935            handler,
1936            protocol,
1937            min_supported_version,
1938            network_service,
1939        }
1940    }
1941
1942    pub fn protocol_handler(&self) -> &dyn NetworkProtocolHandler {
1943        &*self.handler
1944    }
1945}
1946
1947impl<'a> NetworkContextTrait for NetworkContext<'a> {
1948    fn get_protocol(&self) -> ProtocolId { self.protocol }
1949
1950    fn get_peer_connection_origin(&self, node_id: &NodeId) -> Option<bool> {
1951        self.network_service.get_peer_connection_origin(node_id)
1952    }
1953
1954    fn is_peer_self(&self, node_id: &NodeId) -> bool {
1955        *node_id == *self.network_service.metadata.id()
1956    }
1957
1958    fn self_node_id(&self) -> NodeId { *self.network_service.metadata.id() }
1959
1960    /// Message is sent through this method.
1961    fn send(
1962        &self, node_id: &NodeId, msg: Vec<u8>,
1963        min_protocol_version: ProtocolVersion,
1964        version_valid_till: ProtocolVersion, priority: SendQueuePriority,
1965    ) -> Result<(), Error> {
1966        if version_valid_till < self.min_supported_version {
1967            bail!(Error::SendUnsupportedMessage {
1968                protocol: self.protocol,
1969                msg_id: parse_msg_id_leb128_2_bytes_at_most(&mut &*msg)
1970                    .expect("locally encoded message must contain a valid leb128 msg id"),
1971                peer_protocol_version: None,
1972                min_supported_version: Some(self.min_supported_version),
1973            });
1974        }
1975
1976        if *node_id == *self.network_service.metadata.id() {
1977            self.handler.send_local_message(self, msg);
1978            return Ok(());
1979        }
1980
1981        let session = self.network_service.sessions.get_by_id(node_id);
1982        trace!("Sending {} bytes to {}", msg.len(), node_id);
1983        if let Some(session) = session {
1984            let latency =
1985                self.network_service.delayed_queue.as_ref().and_then(|q| {
1986                    session
1987                        .write()
1988                        .metadata
1989                        .id
1990                        .and_then(|id| q.latencies.read().get(&id).copied())
1991                });
1992            match latency {
1993                Some(latency) => {
1994                    let q =
1995                        self.network_service.delayed_queue.as_ref().unwrap();
1996                    let mut queue = q.queue.lock();
1997                    let ts_to_send = Instant::now() + latency;
1998                    queue.push(DelayMessageContext::new(
1999                        ts_to_send,
2000                        self.io.clone(),
2001                        self.protocol,
2002                        session,
2003                        *node_id,
2004                        msg,
2005                        min_protocol_version,
2006                        priority,
2007                    ));
2008                    self.io.register_timer_once_nocancel(
2009                        SEND_DELAYED_MESSAGES,
2010                        latency,
2011                    )?;
2012                    trace!("register delayed timer delay:{:?} ts_to_send:{:?} length:{}", latency, ts_to_send, queue.len());
2013                }
2014                None => {
2015                    session.write().send_packet(
2016                        self.io,
2017                        Some(self.protocol),
2018                        min_protocol_version,
2019                        session::PACKET_USER,
2020                        msg,
2021                        priority,
2022                    )?;
2023                }
2024            }
2025            // TODO: Handle result from send_packet()
2026        }
2027        Ok(())
2028    }
2029
2030    fn disconnect_peer(
2031        &self, node_id: &NodeId, op: Option<UpdateNodeOperation>, reason: &str,
2032    ) {
2033        self.network_service
2034            .kill_connection(node_id, self.io, true, op, reason);
2035    }
2036
2037    fn register_timer(
2038        &self, token: TimerToken, delay: Duration,
2039    ) -> Result<(), Error> {
2040        self.io
2041            .message(NetworkIoMessage::AddTimer {
2042                token,
2043                delay,
2044                protocol: self.protocol,
2045            })
2046            .unwrap_or_else(|e| {
2047                warn!("Error sending network IO message: {:?}", e)
2048            });
2049        Ok(())
2050    }
2051
2052    fn dispatch_work(&self, work_type: HandlerWorkType) {
2053        self.io
2054            .message(NetworkIoMessage::DispatchWork {
2055                protocol: self.protocol,
2056                work_type,
2057            })
2058            .expect("Error sending network IO message");
2059    }
2060
2061    fn insert_peer_node_tag(&self, peer: NodeId, key: &str, value: &str) {
2062        self.network_service
2063            .node_db
2064            .write()
2065            .set_tag(peer, key, value);
2066    }
2067}
2068
2069fn save_key(path: &Path, key: &Secret) {
2070    let mut path_buf = PathBuf::from(path);
2071    if let Err(e) = fs::create_dir_all(path_buf.as_path()) {
2072        warn!("Error creating key directory: {:?}", e);
2073        return;
2074    };
2075    path_buf.push("key");
2076    let path = path_buf.as_path();
2077    let mut file = match fs::File::create(path) {
2078        Ok(file) => file,
2079        Err(e) => {
2080            warn!("Error creating key file: {:?}", e);
2081            return;
2082        }
2083    };
2084    if let Err(e) = restrict_permissions_owner(path, true, false) {
2085        warn!("Failed to modify permissions of the file ({})", e);
2086    }
2087    if let Err(e) = file.write(&key.to_hex().into_bytes()) {
2088        warn!("Error writing key file: {:?}", e);
2089    }
2090}
2091
2092fn load_key(path: &Path) -> Option<Secret> {
2093    let mut path_buf = PathBuf::from(path);
2094    path_buf.push("key");
2095    let mut file = match fs::File::open(path_buf.as_path()) {
2096        Ok(file) => file,
2097        Err(e) => {
2098            debug!("failed to open key file: {:?}", e);
2099            return None;
2100        }
2101    };
2102    let mut buf = String::new();
2103    match file.read_to_string(&mut buf) {
2104        Ok(_) => {}
2105        Err(e) => {
2106            warn!("Error reading key file: {:?}", e);
2107            return None;
2108        }
2109    }
2110    match Secret::from_str(&buf) {
2111        Ok(key) => Some(key),
2112        Err(e) => {
2113            warn!("Error parsing key file: {:?}", e);
2114            None
2115        }
2116    }
2117}
2118
2119pub fn load_pos_private_key(
2120    path: &Path,
2121) -> Option<(ConsensusPrivateKey, Option<ConsensusVRFPrivateKey>)> {
2122    let mut path_buf = PathBuf::from(path);
2123    path_buf.push("pos_key");
2124    let mut file = match fs::File::open(path_buf.as_path()) {
2125        Ok(file) => file,
2126        Err(e) => {
2127            debug!("failed to open key file: {:?}", e);
2128            return None;
2129        }
2130    };
2131    let mut buf = String::new();
2132    match file.read_to_string(&mut buf) {
2133        Ok(_) => {}
2134        Err(e) => {
2135            warn!("Error reading key file: {:?}", e);
2136            return None;
2137        }
2138    }
2139    let key_str: Vec<_> = buf.split(",").collect();
2140    let private_key = match ConsensusPrivateKey::from_encoded_string(key_str[0])
2141    {
2142        Ok(key) => Some(key),
2143        Err(e) => {
2144            warn!("Error parsing key file: {:?}", e);
2145            None
2146        }
2147    }?;
2148    if key_str.len() <= 1 {
2149        return Some((private_key, None));
2150    }
2151    let vrf_private_key =
2152        match ConsensusVRFPrivateKey::from_encoded_string(key_str[1]) {
2153            Ok(key) => Some(key),
2154            Err(e) => {
2155                warn!("Error parsing key file: {:?}", e);
2156                None
2157            }
2158        }?;
2159    Some((private_key, Some(vrf_private_key)))
2160}
2161
2162impl std::fmt::Display for ProtocolVersion {
2163    fn fmt(&self, f: &mut Formatter) -> Result<(), std::fmt::Error> {
2164        write!(f, "{}", self.0)
2165    }
2166}
2167
2168impl std::ops::Deref for ProtocolVersion {
2169    type Target = u8;
2170
2171    fn deref(&self) -> &Self::Target { &self.0 }
2172}
2173
2174impl Encodable for ProtocolVersion {
2175    fn rlp_append(&self, s: &mut RlpStream) { s.append_internal(&self.0); }
2176}
2177
2178impl Decodable for ProtocolVersion {
2179    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
2180        Ok(Self(rlp.as_val()?))
2181    }
2182}