1use std::{
6 cmp::{min, Ordering},
7 collections::{BinaryHeap, HashMap, HashSet, VecDeque},
8 fmt::Formatter,
9 fs,
10 io::{self, Read, Write},
11 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
12 path::{Path, PathBuf},
13 str::FromStr,
14 sync::{atomic::Ordering as AtomicOrdering, Arc},
15 time::{Duration, Instant},
16};
17
18use keccak_hash::keccak;
19use mio::{
20 net::{TcpListener, TcpStream, UdpSocket},
21 Interest, Registry, Token,
22};
23use parity_path::restrict_permissions_owner;
24use parking_lot::{Mutex, RwLock};
25use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
26
27use crate::keylib::{sign, Generator, KeyPair, Random, Secret};
28use cfx_addr::Network;
29use cfx_bytes::Bytes;
30use cfx_util_macros::bail;
31use diem_crypto::ValidCryptoMaterialStringExt;
32use diem_types::{
33 account_address::from_consensus_public_key,
34 validator_config::{
35 ConsensusPrivateKey, ConsensusPublicKey, ConsensusVRFPrivateKey,
36 ConsensusVRFPublicKey,
37 },
38};
39use log::{debug, info, trace, warn};
40use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
41use priority_send_queue::SendQueuePriority;
42use serde::{Deserialize, Serialize};
43
44use crate::{
45 discovery::Discovery,
46 handshake::BYPASS_CRYPTOGRAPHY,
47 iolib::{
48 IoContext, IoHandler, IoService, MapNonBlock, StreamToken, TimerToken,
49 },
50 ip_utils::{map_external_address, select_public_address},
51 node_database::NodeDatabase,
52 node_table::*,
53 parse_msg_id_leb128_2_bytes_at_most,
54 session::{self, Session, SessionData, SessionDetails},
55 session_manager::SessionManager,
56 Error, HandlerWorkType, IpFilter, NatType, NetworkConfiguration,
57 NetworkContext as NetworkContextTrait, NetworkIoMessage,
58 NetworkProtocolHandler, PeerInfo, ProtocolId, ProtocolInfo,
59 UpdateNodeOperation, NODE_TAG_ARCHIVE, NODE_TAG_NODE_TYPE,
60};
61
62use super::DisconnectReason;
63
/// Size of the session token range; also passed to `SessionManager::new` and
/// `start_network_poll` in this file.
const MAX_SESSIONS: usize = 2048;

/// TCP port used when the configuration supplies no listen address.
const DEFAULT_PORT: u16 = 32323;

// Token layout: session stream tokens occupy FIRST_SESSION..=LAST_SESSION and
// the service-level stream/timer tokens are allocated right after that range.
const FIRST_SESSION: StreamToken = 0;
const LAST_SESSION: StreamToken = FIRST_SESSION + MAX_SESSIONS - 1;
const SYS_TIMER: TimerToken = LAST_SESSION + 1;
const TCP_ACCEPT: StreamToken = SYS_TIMER + 1;
const HOUSEKEEPING: TimerToken = SYS_TIMER + 2;
const UDP_MESSAGE: StreamToken = SYS_TIMER + 3;
const DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 4;
const FAST_DISCOVERY_REFRESH: TimerToken = SYS_TIMER + 5;
const DISCOVERY_ROUND: TimerToken = SYS_TIMER + 6;
const NODE_TABLE: TimerToken = SYS_TIMER + 7;
const SEND_DELAYED_MESSAGES: TimerToken = SYS_TIMER + 8;
const CHECK_SESSIONS: TimerToken = SYS_TIMER + 9;
// Initial value of `NetworkServiceInner::timer_counter`; placed well past the
// system tokens above.
const HANDLER_TIMER: TimerToken = LAST_SESSION + 256;
// Token handed to `IoService::start` in `NetworkService::initialize`.
const STOP_NET_POLL: TimerToken = HANDLER_TIMER + 1;

// Default timeout values. NOTE(review): presumably the defaults for the
// corresponding `NetworkConfiguration` fields -- confirm at the use sites.
pub const DEFAULT_HOUSEKEEPING_TIMEOUT: Duration = Duration::from_secs(2);
pub const DEFAULT_DISCOVERY_REFRESH_TIMEOUT: Duration =
    Duration::from_secs(120);
pub const DEFAULT_FAST_DISCOVERY_REFRESH_TIMEOUT: Duration =
    Duration::from_secs(10);
pub const DEFAULT_DISCOVERY_ROUND_TIMEOUT: Duration =
    Duration::from_millis(500);
pub const DEFAULT_NODE_TABLE_TIMEOUT: Duration = Duration::from_secs(300);
/// Three days, expressed in seconds.
pub const DEFAULT_CONNECTION_LIFETIME_FOR_PROMOTION: Duration =
    Duration::from_secs(3 * 24 * 3600);
/// Period of the `CHECK_SESSIONS` timer registered in
/// `initialize_udp_protocols`.
const DEFAULT_CHECK_SESSIONS_TIMEOUT: Duration = Duration::from_secs(10);
101
/// Newtype around a `u8` protocol version number.
///
/// Versions are comparable (`PartialOrd`) and serializable with serde.
#[derive(
    Clone,
    Copy,
    Debug,
    Default,
    Eq,
    PartialOrd,
    PartialEq,
    Serialize,
    Deserialize,
)]
pub struct ProtocolVersion(pub u8);
114
impl MallocSizeOf for ProtocolVersion {
    /// Always reports zero: the wrapped `u8` is stored inline.
    fn size_of(&self, _ops: &mut MallocSizeOfOps) -> usize { 0 }
}
118
/// Upper bound for a UDP datagram. NOTE(review): presumably in bytes --
/// confirm at the use sites.
pub const MAX_DATAGRAM_SIZE: usize = 1280;
/// Identifier of the discovery protocol on the UDP channel.
/// NOTE(review): presumably a tag carried in each datagram -- confirm.
pub const UDP_PROTOCOL_DISCOVERY: u8 = 1;
121
/// A UDP payload paired with a socket address.
///
/// `UdpIoContext::send` builds one of these from the payload and address it
/// is given and appends it to the channel's send queue.
pub struct Datagram {
    /// Raw payload bytes.
    pub payload: Bytes,
    /// Socket address associated with the payload.
    pub address: SocketAddr,
}
126
127pub struct UdpChannel {
128 pub send_queue: VecDeque<Datagram>,
129}
130
131impl UdpChannel {
132 pub fn new() -> UdpChannel {
133 UdpChannel {
134 send_queue: VecDeque::new(),
135 }
136 }
137
138 pub fn any_sends_queued(&self) -> bool { !self.send_queue.is_empty() }
139
140 pub fn dequeue_send(&mut self) -> Option<Datagram> {
141 self.send_queue.pop_front()
142 }
143
144 pub fn requeue_send(&mut self, datagram: Datagram) {
145 self.send_queue.push_front(datagram)
146 }
147}
148
149pub struct UdpIoContext<'a> {
150 pub channel: &'a RwLock<UdpChannel>,
151 pub node_db: &'a RwLock<NodeDatabase>,
152}
153
154impl<'a> UdpIoContext<'a> {
155 pub fn new(
156 channel: &'a RwLock<UdpChannel>, node_db: &'a RwLock<NodeDatabase>,
157 ) -> UdpIoContext<'a> {
158 UdpIoContext { channel, node_db }
159 }
160
161 pub fn send(&self, payload: Bytes, address: SocketAddr) {
162 self.channel
163 .write()
164 .send_queue
165 .push_back(Datagram { payload, address });
166 }
167}
168
/// Public entry point of the networking layer.
///
/// Both `io_service` and `inner` start as `None` and are populated by
/// `initialize`; most methods report an error or return `None`/`false` until
/// then.
pub struct NetworkService {
    /// IO service created by `initialize`.
    pub io_service: Option<IoService<NetworkIoMessage>>,
    /// Shared service state, registered as a handler on `io_service`.
    pub inner: Option<Arc<NetworkServiceInner>>,
    /// Configuration supplied at construction.
    config: NetworkConfiguration,
}
178
179impl NetworkService {
180 pub fn new(config: NetworkConfiguration) -> NetworkService {
181 NetworkService {
182 io_service: None,
183 inner: None,
184 config,
185 }
186 }
187
    /// Returns the `is_consortium` flag from the configuration.
    pub fn is_consortium(&self) -> bool { self.config.is_consortium }
189
    /// Returns the network type reported by the configuration.
    pub fn get_network_type(&self) -> &Network {
        self.config.get_network_type()
    }
193
    /// Returns the configured network id (`config.id`).
    pub fn network_id(&self) -> u64 { self.config.id }
195
    /// Returns the `test_mode` flag from the configuration.
    pub fn is_test_mode(&self) -> bool { self.config.test_mode }
197
198 pub fn initialize(
200 &mut self, pos_pub_keys: (ConsensusPublicKey, ConsensusVRFPublicKey),
201 ) -> Result<(), Error> {
202 let raw_io_service =
203 IoService::<NetworkIoMessage>::start(STOP_NET_POLL)?;
204 self.io_service = Some(raw_io_service);
205
206 if self.inner.is_none() {
207 if self.config.test_mode {
208 BYPASS_CRYPTOGRAPHY.store(true, AtomicOrdering::Relaxed);
209 }
210
211 let inner = Arc::new(match self.config.test_mode {
212 true => NetworkServiceInner::new_with_latency(
213 &self.config,
214 pos_pub_keys,
215 )?,
216 false => NetworkServiceInner::new(&self.config, pos_pub_keys)?,
217 });
218 self.io_service
219 .as_ref()
220 .unwrap()
221 .register_handler(inner.clone())?;
222 self.inner = Some(inner);
223 }
224 Ok(())
225 }
226
227 pub fn start(&self) {
228 let handler = self.inner.as_ref().unwrap().clone();
229 self.io_service
230 .as_ref()
231 .expect("Already set")
232 .start_network_poll(handler, MAX_SESSIONS);
233 }
234
235 pub fn add_peer(&self, node: NodeEntry) -> Result<(), Error> {
237 if let Some(ref x) = self.inner {
238 x.node_db.write().insert_trusted(node);
239 Ok(())
240 } else {
241 Err("Network service not started yet!".into())
242 }
243 }
244
245 pub fn drop_peer(&self, node: NodeEntry) -> Result<(), Error> {
247 if let Some(ref x) = self.inner {
248 x.drop_node(node.id)
249 } else {
250 Err("Network service not started yet!".into())
251 }
252 }
253
    /// Returns the local address recorded by the inner service, or `None`
    /// when the service has not been initialized.
    pub fn local_addr(&self) -> Option<SocketAddr> {
        self.inner.as_ref().map(|inner_ref| inner_ref.local_addr())
    }
258
259 pub fn register_protocol(
261 &self, handler: Arc<dyn NetworkProtocolHandler + Sync>,
262 protocol: ProtocolId, version: ProtocolVersion,
263 ) -> Result<(), Error> {
264 let (tx, rx) = std::sync::mpsc::sync_channel(0);
265 self.io_service.as_ref().unwrap().send_message(
266 NetworkIoMessage::AddHandler {
267 handler,
268 protocol,
269 version,
270 callback: tx,
271 },
272 )?;
273 rx.recv().expect("protocol register error");
275 Ok(())
276 }
277
278 pub fn with_context<F, R>(
280 &self, handler: Arc<dyn NetworkProtocolHandler + Sync>,
281 protocol: ProtocolId, action: F,
282 ) -> Result<R, String>
283 where
284 F: FnOnce(&NetworkContext) -> R,
285 {
286 let io = IoContext::new(self.io_service.as_ref().unwrap().channel(), 0);
287 match self.inner {
288 Some(ref inner) => {
289 Ok(inner.with_context(handler, protocol, &io, action))
290 }
291 None => Err("Network service not started yet!".to_owned().into()),
292 }
293 }
294
    /// Returns the peer list from the inner service, or `None` when the
    /// service has not been initialized.
    pub fn get_peer_info(&self) -> Option<Vec<PeerInfo>> {
        self.inner.as_ref().map(|inner| inner.get_peer_info())
    }
299
300 pub fn sign_challenge(&self, challenge: Vec<u8>) -> Result<Vec<u8>, Error> {
302 let hash = keccak(challenge);
303 if let Some(ref inner) = self.inner {
304 let signature = match sign(inner.metadata.keys.secret(), &hash) {
305 Ok(s) => s,
306 Err(e) => {
307 warn!("Error signing hello packet");
308 return Err(Error::from(e));
309 }
310 };
311 Ok(signature[..].to_owned())
312 } else {
313 Err("Network service not started yet!".into())
314 }
315 }
316
317 pub fn net_key_pair(&self) -> Result<KeyPair, Error> {
318 if let Some(ref inner) = self.inner {
319 Ok(inner.metadata.keys.clone())
320 } else {
321 Err("Network service not started yet!".into())
322 }
323 }
324
325 pub fn pos_public_key(&self) -> Option<ConsensusPublicKey> {
326 if let Some(ref inner) = self.inner {
327 inner.sessions.self_pos_public_key.clone().map(|k| k.0)
328 } else {
329 None
330 }
331 }
332
333 pub fn add_latency(
334 &self, id: NodeId, latency_ms: f64,
335 ) -> Result<(), Error> {
336 if let Some(ref inner) = self.inner {
337 inner.add_latency(id, latency_ms)
338 } else {
339 Err("Network service not started yet!".into())
340 }
341 }
342
343 pub fn get_node(&self, id: &NodeId) -> Option<(bool, Node)> {
344 let inner = self.inner.as_ref()?;
345 let node_db = inner.node_db.read();
346 let (trusted, node) = node_db.get_with_trusty(id)?;
347 Some((trusted, node.clone()))
348 }
349
350 pub fn get_detailed_sessions(
351 &self, node_id: Option<NodeId>,
352 ) -> Option<Vec<SessionDetails>> {
353 let inner = self.inner.as_ref()?;
354 match node_id {
355 None => Some(
356 inner
357 .sessions
358 .all()
359 .iter()
360 .map(|s| s.read().details())
361 .collect(),
362 ),
363 Some(id) => {
364 let session = inner.sessions.get_by_id(&id)?;
365 let details = session.read().details();
366 Some(vec![details])
367 }
368 }
369 }
370
371 pub fn disconnect_node(
372 &self, id: &NodeId, op: Option<UpdateNodeOperation>,
373 ) -> bool {
374 if self.inner.is_none() || self.io_service.is_none() {
375 return false;
376 }
377 let inner = self.inner.as_ref().unwrap();
378 let io = IoContext::new(self.io_service.as_ref().unwrap().channel(), 0);
379 inner.kill_connection(
380 id,
381 &io,
382 true,
383 op,
384 "disconnect requested", );
386 true
387 }
388
    /// Calls `save` on the node database under its write lock; does nothing
    /// when the service has not been initialized.
    pub fn save_node_db(&self) {
        if let Some(inner) = &self.inner {
            inner.node_db.write().save();
        }
    }
394}
395
/// A session shared across threads behind a read/write lock.
type SharedSession = Arc<RwLock<Session>>;
397
/// Identity and addressing information of the local node.
pub struct HostMetadata {
    /// Network id taken from the configuration.
    pub network_id: u64,
    /// Node key pair; the public key serves as the node id.
    pub keys: KeyPair,
    /// Protocol descriptors; starts empty in `NetworkServiceInner::new`.
    pub protocols: RwLock<Vec<ProtocolInfo>>,
    /// Minimum peer protocol versions; `connect_peers` does nothing while
    /// this list is empty.
    pub minimum_peer_protocol_version: RwLock<Vec<ProtocolInfo>>,
    /// Address the TCP listener is bound to, with the port actually
    /// assigned by the OS.
    pub local_address: SocketAddr,
    /// Local TCP address together with the UDP port.
    pub local_endpoint: NodeEndpoint,
    /// Endpoint derived from the configured public address, the NAT mapping,
    /// or `select_public_address`.
    pub public_endpoint: NodeEndpoint,
}

impl HostMetadata {
    /// Returns the secret half of the node key pair.
    pub(crate) fn secret(&self) -> &Secret { self.keys.secret() }

    /// Returns the node id, i.e. the public half of the node key pair.
    pub(crate) fn id(&self) -> &NodeId { self.keys.public() }
}
416
/// Pairs a protocol id with a timer token; stored in
/// `NetworkServiceInner::timers` keyed by a `TimerToken`.
#[derive(Copy, Clone)]
struct ProtocolTimer {
    /// Protocol associated with the timer.
    pub protocol: ProtocolId,
    /// NOTE(review): presumably the handler-local timer token, as opposed to
    /// the map key -- confirm at the registration site.
    pub token: TimerToken, }
422
/// Shared state of the network service; registered as the IO handler by
/// `NetworkService::initialize`.
pub struct NetworkServiceInner {
    /// Manager for all peer sessions.
    pub sessions: SessionManager,
    /// Identity and addresses of the local node.
    pub metadata: HostMetadata,
    /// Copy of the configuration the service was built from.
    pub config: NetworkConfiguration,
    /// UDP socket bound to the local endpoint's UDP port.
    udp_socket: Mutex<UdpSocket>,
    /// TCP listener accepting incoming connections.
    tcp_listener: Mutex<TcpListener>,
    /// Queue of outgoing UDP datagrams.
    udp_channel: RwLock<UdpChannel>,
    /// Discovery state; `None` when discovery is disabled in the config.
    discovery: Mutex<Option<Discovery>>,
    /// Registered protocol handlers keyed by protocol id.
    handlers:
        RwLock<HashMap<ProtocolId, Arc<dyn NetworkProtocolHandler + Sync>>>,
    /// Protocol timers keyed by timer token.
    timers: RwLock<HashMap<TimerToken, ProtocolTimer>>,
    /// Counter initialized to `HANDLER_TIMER`.
    timer_counter: RwLock<usize>,
    /// Database of known nodes.
    pub node_db: RwLock<NodeDatabase>,
    /// Ids of the reserved nodes listed in the configuration.
    reserved_nodes: RwLock<HashSet<NodeId>>,
    /// Ids of nodes scheduled to be disconnected by `drop_peers`.
    dropped_nodes: RwLock<HashSet<NodeId>>,

    /// Copy of `config.is_consortium`.
    is_consortium: bool,

    /// Latency simulation queue; only set by `new_with_latency`.
    delayed_queue: Option<DelayedQueue>,
}
446
447struct DelayedQueue {
448 queue: Mutex<BinaryHeap<DelayMessageContext>>,
449 latencies: RwLock<HashMap<NodeId, Duration>>,
450}
451
452impl DelayedQueue {
453 fn new() -> Self {
454 DelayedQueue {
455 queue: Mutex::new(BinaryHeap::new()),
456 latencies: RwLock::new(HashMap::new()),
457 }
458 }
459
460 fn send_delayed_messages(&self, network_service: &NetworkServiceInner) {
461 let context = self.queue.lock().pop().unwrap();
462 let r = context.session.write().send_packet(
463 &context.io,
464 Some(context.protocol),
465 context.min_protocol_version,
466 session::PACKET_USER,
467 context.msg,
468 context.priority,
469 );
470 match r {
471 Ok(_) => {}
472 Err(Error::Expired) => {
473 info!(
477 "Error sending delayed message to expired connection {:?}",
478 context.peer
479 );
480 }
481 Err(e) => {
482 info!(
483 "Error sending delayed message: peer={:?} err={:?}",
484 context.peer, e
485 );
486 network_service.kill_connection(
487 &context.peer,
488 &context.io,
489 true,
490 Some(UpdateNodeOperation::Failure),
491 "failed to send delayed message", );
493 }
494 };
495 }
496}
497
498impl NetworkServiceInner {
499 pub fn new(
500 config: &NetworkConfiguration,
501 pos_pub_keys: (ConsensusPublicKey, ConsensusVRFPublicKey),
502 ) -> Result<NetworkServiceInner, Error> {
503 let mut listen_address = match config.listen_address {
504 None => SocketAddr::V4(SocketAddrV4::new(
505 Ipv4Addr::new(0, 0, 0, 0),
506 DEFAULT_PORT,
507 )),
508 Some(addr) => addr,
509 };
510
511 let keys = if let Some(ref secret) = config.use_secret {
512 KeyPair::from_secret(secret.clone())?
513 } else {
514 config
515 .config_path
516 .clone()
517 .and_then(|ref p| load_key(Path::new(&p)))
518 .map_or_else(
519 || {
520 let key = Random
521 .generate()
522 .expect("Error generating random key pair");
523 if let Some(path) = config.config_path.clone() {
524 save_key(Path::new(&path), key.secret());
525 }
526 key
527 },
528 |s| {
529 KeyPair::from_secret(s)
530 .expect("Error creating node secret key")
531 },
532 )
533 };
534 info!("Self pos public key: {:?}", pos_pub_keys);
535
536 info!("Self node id: {:?}", *keys.public());
537
538 let tcp_listener = TcpListener::bind(listen_address)?;
539 listen_address = SocketAddr::new(
540 listen_address.ip(),
541 tcp_listener.local_addr()?.port(),
542 );
543 debug!("Listening at {:?}", listen_address);
544 let udp_port = config.udp_port.unwrap_or_else(|| listen_address.port());
545 let local_endpoint = NodeEndpoint {
546 address: listen_address,
547 udp_port,
548 };
549 let mut udp_addr = local_endpoint.address;
550 udp_addr.set_port(local_endpoint.udp_port);
551 let udp_socket =
552 UdpSocket::bind(udp_addr).expect("Error binding UDP socket");
553
554 let public_address = config.public_address;
555 let public_endpoint = match public_address {
556 None => {
557 let public_address =
558 select_public_address(local_endpoint.address.port());
559 let public_endpoint = NodeEndpoint {
560 address: public_address,
561 udp_port: local_endpoint.udp_port,
562 };
563 if config.nat_enabled {
564 match map_external_address(&local_endpoint, &NatType::Any) {
565 Some(endpoint) => {
566 info!(
567 "NAT mapped to external address {}",
568 endpoint.address
569 );
570 endpoint
571 }
572 None => public_endpoint,
573 }
574 } else {
575 public_endpoint
576 }
577 }
578 Some(addr) => NodeEndpoint {
579 address: addr,
580 udp_port: local_endpoint.udp_port,
581 },
582 };
583
584 let allow_ips = config.ip_filter.clone();
585 let discovery = {
586 if config.discovery_enabled {
587 Some(Discovery::new(
588 &keys,
589 public_endpoint.clone(),
590 allow_ips,
591 config.discovery_config.clone(),
592 ))
593 } else {
594 None
595 }
596 };
597
598 let nodes_path = config.config_path.clone();
599
600 let mut inner = NetworkServiceInner {
601 metadata: HostMetadata {
602 network_id: config.id,
603 keys,
604 protocols: RwLock::new(Vec::new()),
605 minimum_peer_protocol_version: Default::default(),
606 local_address: listen_address,
607 local_endpoint,
608 public_endpoint,
609 },
610 config: config.clone(),
611 udp_channel: RwLock::new(UdpChannel::new()),
612 discovery: Mutex::new(discovery),
613 udp_socket: Mutex::new(udp_socket),
614 tcp_listener: Mutex::new(tcp_listener),
615 sessions: SessionManager::new(
616 FIRST_SESSION,
617 MAX_SESSIONS,
618 config.max_incoming_peers,
619 &config.session_ip_limit_config,
620 Some(pos_pub_keys),
621 ),
622 handlers: RwLock::new(HashMap::new()),
623 timers: RwLock::new(HashMap::new()),
624 timer_counter: RwLock::new(HANDLER_TIMER),
625 node_db: RwLock::new(NodeDatabase::new(
626 nodes_path,
627 config.subnet_quota,
628 )),
629 reserved_nodes: RwLock::new(HashSet::new()),
630 dropped_nodes: RwLock::new(HashSet::new()),
631 is_consortium: config.is_consortium,
632 delayed_queue: None,
633 };
634
635 for n in &config.boot_nodes {
636 inner.add_boot_node(n);
637 }
638
639 let reserved_nodes = config.reserved_nodes.clone();
640 for n in reserved_nodes {
641 if let Err(e) = inner.add_reserved_node(&n) {
642 debug!("Error parsing node id: {}: {:?}", n, e);
643 }
644 }
645
646 Ok(inner)
647 }
648
649 pub fn new_with_latency(
650 config: &NetworkConfiguration,
651 pos_pub_keys: (ConsensusPublicKey, ConsensusVRFPublicKey),
652 ) -> Result<NetworkServiceInner, Error> {
653 let r = NetworkServiceInner::new(config, pos_pub_keys);
654 if r.is_err() {
655 return r;
656 }
657 let mut inner = r.unwrap();
658 inner.delayed_queue = Some(DelayedQueue::new());
659 Ok(inner)
660 }
661
662 pub fn add_latency(
663 &self, peer: NodeId, latency_ms: f64,
664 ) -> Result<(), Error> {
665 match self.delayed_queue {
666 Some(ref queue) => {
667 let mut latencies = queue.latencies.write();
668 latencies
669 .insert(peer, Duration::from_millis(latency_ms as u64));
670 Ok(())
671 }
672 None => Err(
673 "conflux not in test mode, and does not support add_latency"
674 .into(),
675 ),
676 }
677 }
678
    /// Returns the IP filter from the configuration.
    pub fn get_ip_filter(&self) -> &IpFilter { &self.config.ip_filter }
680
681 fn add_boot_node(&self, id: &str) {
682 match Node::from_str(id) {
683 Err(e) => {
684 debug!("Could not add node {}: {:?}", id, e);
685 }
686 Ok(n) => {
687 self.node_db.write().insert_trusted(NodeEntry {
688 id: n.id,
689 endpoint: n.endpoint,
690 });
691 }
692 }
693 }
694
695 fn add_reserved_node(&mut self, id: &str) -> Result<(), Error> {
696 let n = Node::from_str(id)?;
697 self.node_db.write().insert_trusted(NodeEntry {
698 id: n.id.clone(),
699 endpoint: n.endpoint.clone(),
700 });
701 self.reserved_nodes.write().insert(n.id);
702 Ok(())
703 }
704
705 fn initialize_udp_protocols(
706 &self, io: &IoContext<NetworkIoMessage>,
707 ) -> Result<(), Error> {
708 if let Some(discovery) = self.discovery.lock().as_mut() {
710 let allow_ips = self.config.ip_filter.clone();
711 let nodes = self.node_db.read().sample_trusted_nodes(
712 self.config.discovery_config.discover_node_count,
713 &allow_ips,
714 );
715 discovery.try_ping_nodes(
716 &UdpIoContext::new(&self.udp_channel, &self.node_db),
717 nodes,
718 );
719 io.register_timer(
720 FAST_DISCOVERY_REFRESH,
721 self.config.fast_discovery_refresh_timeout,
722 )?;
723 io.register_timer(
724 DISCOVERY_REFRESH,
725 self.config.discovery_refresh_timeout,
726 )?;
727 io.register_timer(
728 DISCOVERY_ROUND,
729 self.config.discovery_round_timeout,
730 )?;
731 }
732 io.register_timer(NODE_TABLE, self.config.node_table_timeout)?;
733 io.register_timer(CHECK_SESSIONS, DEFAULT_CHECK_SESSIONS_TIMEOUT)?;
734
735 Ok(())
736 }
737
738 fn try_promote_untrusted(&self) {
739 let mut incoming_ids: Vec<NodeId> = Vec::new();
741 for s in self.sessions.all() {
742 if let Some(s) = s.try_read() {
743 if s.is_ready() && !s.metadata.originated && !s.expired() {
744 if let Some(id) = s.metadata.id {
746 incoming_ids.push(id);
747 }
748 }
749 }
750 }
751
752 self.node_db.write().promote(
755 incoming_ids,
756 self.config.connection_lifetime_for_promotion,
757 );
758 }
759
    /// Returns the address the TCP listener was bound to.
    pub fn local_addr(&self) -> SocketAddr { self.metadata.local_address }
761
762 fn drop_node(&self, local_id: NodeId) -> Result<(), Error> {
763 let removed_node = self.node_db.write().remove(&local_id);
764
765 if let Some(node) = removed_node {
766 assert_eq!(node.id, local_id);
767 if let Some(_stream_token) = node.stream_token {
768 let mut wd = self.dropped_nodes.write();
769 wd.insert(node.id);
770 }
771 }
772
773 Ok(())
774 }
775
776 fn has_enough_outgoing_peers(
777 &self, tag: Option<(&str, &str)>, max: usize,
778 ) -> bool {
779 let count = match tag {
780 Some((k, v)) => self.sessions.count_with_tag(&k.into(), &v.into()),
781 None => self.sessions.stat().1, };
783
784 count >= max
785 }
786
    /// Housekeeping step: connects to new peers, then disconnects the peers
    /// scheduled for dropping.
    ///
    /// # Panics
    /// Panics via `unimplemented!()` when `is_consortium` is set.
    fn on_housekeeping(&self, io: &IoContext<NetworkIoMessage>) {
        if self.is_consortium {
            unimplemented!();
        } else {
            self.connect_peers(io);
        }
        self.drop_peers(io);
    }
795
796 fn connect_peers(&self, io: &IoContext<NetworkIoMessage>) {
798 if self.metadata.minimum_peer_protocol_version.read().len() == 0 {
799 return;
802 }
803
804 let self_id = self.metadata.id().clone();
805
806 let sampled_archive_nodes = self.sample_archive_nodes();
807
808 let (handshake_count, egress_count, ingress_count) =
809 self.sessions.stat();
810 let samples;
811 {
812 let egress_attempt_count = if self.config.max_outgoing_peers
813 > egress_count + sampled_archive_nodes.len()
814 {
815 self.config.max_outgoing_peers
816 - egress_count
817 - sampled_archive_nodes.len()
818 } else {
819 0
820 };
821 samples = self.node_db.read().sample_trusted_node_ids(
822 egress_attempt_count as u32,
823 &self.config.ip_filter,
824 );
825 }
826
827 let reserved_nodes = self.reserved_nodes.read();
828 let nodes = reserved_nodes
830 .iter()
831 .cloned()
832 .chain(sampled_archive_nodes)
833 .chain(samples);
834
835 let max_handshakes_per_round = self.config.max_handshakes / 2;
836 let mut started: usize = 0;
837 for id in nodes
838 .filter(|id| !self.sessions.contains_node(id) && *id != self_id)
839 .take(min(
840 max_handshakes_per_round,
841 self.config.max_handshakes.saturating_sub(handshake_count),
842 ))
843 {
844 self.connect_peer(&id, io);
845 started += 1;
846 }
847 debug!(
848 "Connecting peers: {} sessions, {} pending + {} started",
849 egress_count + ingress_count,
850 handshake_count,
851 started
852 );
853 if egress_count + ingress_count == 0 {
854 warn!(
855 "No peers connected at this moment, {} pending + {} started",
856 handshake_count, started
857 );
858 }
859 }
860
861 fn sample_archive_nodes(&self) -> HashSet<NodeId> {
863 if self.config.max_outgoing_peers_archive == 0 {
864 return HashSet::new();
865 }
866
867 let key: String = NODE_TAG_NODE_TYPE.into();
868 let value: String = NODE_TAG_ARCHIVE.into();
869 let archive_sessions = self.sessions.count_with_tag(&key, &value);
870
871 if archive_sessions >= self.config.max_outgoing_peers_archive {
872 return HashSet::new();
873 }
874
875 self.node_db.read().sample_trusted_node_ids_with_tag(
876 (self.config.max_outgoing_peers_archive - archive_sessions) as u32,
877 &key,
878 &value,
879 )
880 }
881
882 fn drop_peers(&self, io: &IoContext<NetworkIoMessage>) {
884 {
885 if self.dropped_nodes.read().len() == 0 {
886 return;
887 }
888 }
889
890 let mut killed_nodes = HashSet::new();
891 let mut w = self.dropped_nodes.write();
892 for node_id in w.iter() {
893 if self.kill_connection(
894 node_id,
895 io,
896 true,
897 Some(UpdateNodeOperation::Failure),
898 "peer dropped in manual", ) {
900 killed_nodes.insert(*node_id);
901 }
902 }
903
904 for node_id in killed_nodes.iter() {
905 w.remove(node_id);
906 }
907 }
908
909 fn connect_peer(&self, id: &NodeId, io: &IoContext<NetworkIoMessage>) {
910 if self.sessions.contains_node(id) {
911 trace!("Abort connect. Node already connected");
912 return;
913 }
914
915 let (socket, address) = {
916 let address = {
917 if let Some(node) = self.node_db.read().get(id, true) {
919 node.endpoint.address
920 } else {
921 debug!("Abort connect. Node expired");
922 return;
923 }
924 };
925
926 if !self.sessions.is_ip_allowed(&address.ip()) {
927 debug!("cannot create outgoing connection to node, id = {:?}, address = {:?}", id, address);
928 return;
929 }
930
931 match TcpStream::connect(address) {
932 Ok(socket) => {
933 trace!("{}: connecting to {:?}", id, address);
934 (socket, address)
935 }
936 Err(e) => {
937 self.node_db.write().note_failure(
938 id, true, true, );
941 debug!(
942 "{}: can't connect o address {:?} {:?}",
943 id, address, e
944 );
945 return;
946 }
947 }
948 };
949
950 if let Err(e) = self.create_connection(socket, address, Some(id), io) {
951 self.node_db.write().note_failure(
952 id, true, true, );
955 debug!("Can't create connection: {:?}", e);
956 }
957 }
958
959 pub fn get_peer_info(&self) -> Vec<PeerInfo> {
960 debug!("get_peer_info: enter");
961
962 let mut peers = Vec::with_capacity(self.sessions.count());
963 debug!("get_peer_info: {} sessions in total", peers.capacity());
964
965 for session in self.sessions.all() {
966 let sess = session.read();
967 if !sess.expired() {
968 peers.push(PeerInfo {
969 id: sess.token(),
970 nodeid: sess.id().unwrap_or(&NodeId::default()).clone(),
971 addr: sess.address(),
972 protocols: sess.metadata.peer_protocols.clone(),
973 });
974 }
975 }
976
977 debug!("get_peer_info: leave, {} peers retrieved", peers.len());
978
979 peers
980 }
981
982 pub fn get_peer_connection_origin(&self, node_id: &NodeId) -> Option<bool> {
983 match self.sessions.get_by_id(node_id) {
984 Some(session) => {
985 let sess = session.read();
986 Some(sess.originated())
987 }
988 None => None,
989 }
990 }
991
    /// Initializes discovery and timers, then registers the UDP and TCP
    /// accept streams with the IO layer.
    ///
    /// # Errors
    /// Propagates the first failure from timer or stream registration.
    fn start(&self, io: &IoContext<NetworkIoMessage>) -> Result<(), Error> {
        self.initialize_udp_protocols(io)?;
        io.register_stream(UDP_MESSAGE)?;
        io.register_stream(TCP_ACCEPT)?;
        Ok(())
    }
998
999 fn create_connection(
1003 &self, socket: TcpStream, address: SocketAddr, id: Option<&NodeId>,
1004 io: &IoContext<NetworkIoMessage>,
1005 ) -> Result<(), Error> {
1006 match self.sessions.create(socket, address, id, io, self) {
1007 Ok(token) => {
1008 debug!("new session created, token = {}, address = {:?}, id = {:?}", token, address, id);
1009 if let Some(id) = id {
1010 self.node_db.write().note_success(id, Some(token), true);
1014 }
1015 io.register_stream(token).map(|_| ()).map_err(Into::into)
1016 }
1017 Err(reason) => {
1018 debug!("failed to create session, reason = {}, address = {:?}, id = {:?}", reason, address, id);
1019 Ok(())
1020 }
1021 }
1022 }
1023
    /// Handles a closed stream by killing the connection with `remote` set
    /// and a `Failure` node operation.
    fn connection_closed(
        &self, stream: StreamToken, io: &IoContext<NetworkIoMessage>,
    ) {
        trace!("Connection closed: {}", stream);
        self.kill_connection_by_token(
            stream,
            io,
            true,
            Some(UpdateNodeOperation::Failure),
            "connection closed",
        );
    }
1036
    /// Drains readable data from the session behind `stream`.
    ///
    /// Flow:
    /// 1. If the session's node is scheduled for dropping, run `drop_peers`
    ///    and return without reading.
    /// 2. Read in a loop until the session reports `SessionData::None` or an
    ///    error, collecting protocol messages and noting a finished
    ///    handshake.
    /// 3. Kill any connection the session asked to disconnect, and kill this
    ///    connection if reading failed.
    /// 4. After a finished handshake, notify the handler of every protocol
    ///    the peer advertises via `on_peer_connected`.
    /// 5. Dispatch the collected messages through `io.handle`.
    ///
    /// # Panics
    /// Panics if a `Ready` event arrives for a session without a node id, or
    /// if messages were collected while the node id is still unknown.
    fn session_readable(
        &self, stream: StreamToken, io: &IoContext<NetworkIoMessage>,
    ) {
        let mut handshake_done = false;

        let mut messages: Vec<(ProtocolId, Vec<u8>)> = Vec::new();
        let mut kill = false;
        let mut token_to_disconnect = None;

        let session = if let Some(session) = self.sessions.get(stream) {
            session
        } else {
            return;
        };

        // Copy the id out so the session read lock is released right away.
        let mut session_node_id = session.read().id().map(|id| *id);
        let mut pos_public_key_opt = None;
        if let Some(node_id) = session_node_id {
            // Check membership before `drop_peers`, which may remove the id
            // from the set.
            let to_drop = self.dropped_nodes.read().contains(&node_id);
            self.drop_peers(io);
            if to_drop {
                return;
            }
        }

        loop {
            // The write lock is re-acquired on every iteration.
            let mut sess = session.write();
            let data = sess.readable(io, self);
            let session_data = match data {
                Ok(session_data) => session_data,
                Err(e) => {
                    debug!("Failed to read session data, error = {:?}, session = {:?}", e, *sess);
                    kill = true;
                    break;
                }
            };

            // Only the most recent disconnect request is kept.
            if session_data.token_to_disconnect.is_some() {
                debug!(
                    "session_readable: set token_to_disconnect to {:?}",
                    session_data.token_to_disconnect
                );
                token_to_disconnect = session_data.token_to_disconnect;
            }

            match session_data.session_data {
                SessionData::Ready { pos_public_key } => {
                    debug!(
                        "receive Ready with pos_public_key={:?} account={:?}",
                        pos_public_key,
                        pos_public_key
                            .as_ref()
                            .map(|k| from_consensus_public_key(&k.0, &k.1)),
                    );
                    handshake_done = true;
                    session_node_id = Some(*sess.id().unwrap());
                    pos_public_key_opt = pos_public_key;
                }
                SessionData::Message { data, protocol } => {
                    // Release the session lock before touching the handler
                    // table.
                    drop(sess);
                    match self.handlers.read().get(&protocol) {
                        None => warn!(
                            "No handler found for protocol: {:?}",
                            protocol
                        ),
                        Some(_) => {
                            messages.push((protocol, data));
                        }
                    }
                }
                SessionData::None => break,
                SessionData::Continue => {}
            }
        }

        if let Some(token_to_disconnect) = token_to_disconnect {
            self.kill_connection_by_token(
                token_to_disconnect.0,
                io,
                true,
                Some(UpdateNodeOperation::Failure),
                token_to_disconnect.1.as_str(),
            );
        }

        if kill {
            self.kill_connection_by_token(
                stream,
                io,
                true,
                Some(UpdateNodeOperation::Failure),
                "session readable error",
            );
            return;
        }

        if handshake_done {
            let handlers = self.handlers.read();
            // Clone the metadata so the session lock is not held while
            // handlers run.
            let session_metadata = session.read().metadata.clone();
            for protocol in &session_metadata.peer_protocols {
                if let Some(handler) = handlers.get(&protocol.protocol).cloned()
                {
                    debug!("session handshaked, token = {}", stream);
                    let network_context = NetworkContext::new(
                        io,
                        handler,
                        protocol.protocol,
                        self,
                    );
                    network_context.protocol_handler().on_peer_connected(
                        &network_context,
                        session_node_id.as_ref().unwrap(),
                        protocol.version,
                        pos_public_key_opt.clone(),
                    );
                }
            }
        }

        for (protocol, data) in messages {
            if let Err(e) = io.handle(
                stream,
                0,
                NetworkIoMessage::HandleProtocolMessage {
                    protocol,
                    peer: stream,
                    node_id: session_node_id.as_ref().unwrap().clone(),
                    data,
                },
            ) {
                warn!("Error occurs, discard protocol message: err={}", e);
            }
        }
    }
1180
1181 fn session_writable(
1182 &self, stream: StreamToken, io: &IoContext<NetworkIoMessage>,
1183 ) {
1184 if let Some(session) = self.sessions.get(stream) {
1185 {
1186 let sess_id = session.read().id().map(|id| *id);
1189 if let Some(node_id) = sess_id {
1190 let to_drop = self.dropped_nodes.read().contains(&node_id);
1191 self.drop_peers(io);
1192 if to_drop {
1193 return;
1194 }
1195 }
1196 }
1197 let mut sess = session.write();
1198 if let Err(e) = sess.writable(io) {
1199 trace!("{}: Session write error: {:?}", stream, e);
1200 }
1201 if sess.done() {
1202 io.deregister_stream(stream).unwrap_or_else(|e| {
1203 debug!("Error deregistering stream: {:?}", e)
1204 });
1205 }
1206 }
1207 }
1208
1209 fn accept(&self, io: &IoContext<NetworkIoMessage>) {
1210 trace!("Accepting incoming connection");
1211 loop {
1212 let (socket, address) = match self.tcp_listener.lock().accept() {
1213 Ok((sock, addr)) => (sock, addr),
1214 Err(e) => {
1215 if e.kind() != io::ErrorKind::WouldBlock {
1216 debug!("Error accepting connection: {:?}", e);
1217 }
1218 break;
1219 }
1220 };
1221
1222 if let Err(e) = self.create_connection(socket, address, None, io) {
1223 debug!("Can't accept connection: {:?}", e);
1224 }
1225 }
1226 }
1227
    /// Expires the session behind `token` and performs the follow-up
    /// bookkeeping.
    ///
    /// * `remote` - when `true`, the stream is always deregistered and `op`
    ///   is applied to the node database; when `false`, the stream is only
    ///   deregistered if the session reports `done()`.
    /// * `op` - optional node database update (failure, demotion, or
    ///   blacklisting) applied to the session's node id.
    /// * `reason` - text sent to the peer in the disconnect message.
    ///
    /// Handlers of every protocol the ready session was capable of receive
    /// `on_peer_disconnected`, provided the session has a node id. Tokens
    /// outside the session range, or without a session, result in no action.
    fn kill_connection_by_token(
        &self, token: StreamToken, io: &IoContext<NetworkIoMessage>,
        remote: bool, op: Option<UpdateNodeOperation>, reason: &str,
    ) {
        let mut to_disconnect: Vec<ProtocolId> = Vec::new();
        let mut failure_id = None;
        let mut deregister = false;

        if let FIRST_SESSION..=LAST_SESSION = token {
            if let Some(session) = self.sessions.get(token) {
                let mut sess = session.write();
                if !sess.expired() {
                    if sess.is_ready() {
                        // NOTE(review): `send_disconnect` runs once per
                        // matching protocol, so a peer sharing several
                        // protocols is sent several disconnects -- confirm
                        // this is intended.
                        for (p, _) in self.handlers.read().iter() {
                            if sess.have_capability(*p) {
                                to_disconnect.push(*p);
                                sess.send_disconnect(DisconnectReason::Custom(
                                    reason.into(),
                                ));
                            }
                        }
                    }
                    sess.set_expired();
                }
                deregister = remote || sess.done();
                failure_id = sess.id().cloned();
                debug!(
                    "kill connection by token, deregister = {}, reason = {:?}, session = {:?}, op = {:?}",
                    deregister, reason, *sess, op
                );
            }
        }

        // The session write lock has been released by this point.
        if let Some(id) = failure_id {
            if remote {
                if let Some(op) = op {
                    match op {
                        UpdateNodeOperation::Failure => {
                            self.node_db.write().note_failure(
                                &id, true, false,
                            );
                        }
                        UpdateNodeOperation::Demotion => {
                            let mut node_db = self.node_db.write();
                            node_db.demote(&id);
                            node_db.note_demoted(
                                &id, true,
                            );
                        }
                        UpdateNodeOperation::Remove => {
                            self.node_db.write().set_blacklisted(&id);
                        }
                    }
                }
            }

            for p in to_disconnect {
                // The handler is cloned out so the handler table lock is not
                // held during the callback.
                if let Some(h) = self.handlers.read().get(&p).cloned() {
                    let network_context = NetworkContext::new(io, h, p, self);
                    network_context
                        .protocol_handler()
                        .on_peer_disconnected(&network_context, &id);
                }
            }
        }

        if deregister {
            io.deregister_stream(token).unwrap_or_else(|e| {
                debug!("Error deregistering stream {:?}", e);
            });
        }
    }
1301
1302 fn kill_connection(
1303 &self, node_id: &NodeId, io: &IoContext<NetworkIoMessage>,
1304 remote: bool, op: Option<UpdateNodeOperation>, reason: &str,
1305 ) -> bool {
1306 let mut to_disconnect: Vec<ProtocolId> = Vec::new();
1307 let mut deregister = false;
1308 let mut token = 0;
1309
1310 if let Some(session) = self.sessions.get_by_id(node_id) {
1311 let mut sess = session.write();
1312 if !sess.expired() {
1313 if sess.is_ready() {
1314 for (p, _) in self.handlers.read().iter() {
1315 if sess.have_capability(*p) {
1316 to_disconnect.push(*p);
1317 sess.send_disconnect(DisconnectReason::Custom(
1318 reason.into(),
1319 ));
1320 }
1321 }
1322 }
1323 sess.set_expired();
1324 }
1325 deregister = remote || sess.done();
1326 token = sess.token();
1327 assert_eq!(sess.id().unwrap().clone(), node_id.clone());
1328 debug!(
1329 "kill connection, deregister = {}, reason = {:?}, session = {:?}, op = {:?}",
1330 deregister, reason, *sess, op
1331 );
1332
1333 if remote {
1334 if let Some(op) = op {
1335 match op {
1336 UpdateNodeOperation::Failure => {
1337 self.node_db.write().note_failure(
1338 node_id, true, false, );
1341 }
1342 UpdateNodeOperation::Demotion => {
1343 let mut node_db = self.node_db.write();
1344 node_db.demote(node_id);
1345 node_db.note_demoted(
1346 node_id, true, );
1348 }
1349 UpdateNodeOperation::Remove => {
1350 self.node_db.write().set_blacklisted(node_id);
1351 }
1352 }
1353 }
1354 }
1355 }
1356
1357 for p in to_disconnect {
1358 if let Some(h) = self.handlers.read().get(&p).cloned() {
1359 let network_context = NetworkContext::new(io, h, p, self);
1360 network_context
1361 .protocol_handler()
1362 .on_peer_disconnected(&network_context, node_id);
1363 }
1364 }
1365
1366 if deregister {
1367 io.deregister_stream(token).unwrap_or_else(|e| {
1368 debug!("Error deregistering stream {:?}", e);
1369 });
1370 }
1371
1372 deregister
1373 }
1374
1375 pub fn with_context<F, R>(
1376 &self, handler: Arc<dyn NetworkProtocolHandler + Sync>,
1377 protocol: ProtocolId, io: &IoContext<NetworkIoMessage>, action: F,
1378 ) -> R
1379 where
1380 F: FnOnce(&NetworkContext) -> R,
1381 {
1382 action(&NetworkContext::new(io, handler, protocol, self))
1383 }
1384
1385 fn udp_readable(&self, io: &IoContext<NetworkIoMessage>) {
1386 let udp_socket = self.udp_socket.lock();
1387 let writable;
1388 {
1389 let udp_channel = self.udp_channel.read();
1390 writable = udp_channel.any_sends_queued();
1391 }
1392
1393 let mut buf = [0u8; MAX_DATAGRAM_SIZE];
1394 match udp_socket.recv_from(&mut buf).map_non_block() {
1395 Ok(Some((len, address))) => self
1396 .on_udp_packet(&buf[0..len], address)
1397 .unwrap_or_else(|e| {
1398 debug!("Error processing UDP packet: {:?}", e);
1399 }),
1400 Ok(_) => {}
1401 Err(e) => {
1402 debug!("Error reading UDP socket: {:?}", e);
1403 }
1404 };
1405
1406 let new_writable;
1407 {
1408 let udp_channel = self.udp_channel.read();
1409 new_writable = udp_channel.any_sends_queued();
1410 }
1411
1412 if writable != new_writable {
1416 io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1417 debug!("Error updating UDP registration: {:?}", e)
1418 });
1419 }
1420 }
1421
1422 fn udp_writable(&self, io: &IoContext<NetworkIoMessage>) {
1423 let udp_socket = self.udp_socket.lock();
1424 let mut udp_channel = self.udp_channel.write();
1425 while let Some(data) = udp_channel.dequeue_send() {
1426 match udp_socket
1427 .send_to(&data.payload, data.address)
1428 .map_non_block()
1429 {
1430 Ok(Some(size)) if size == data.payload.len() => {}
1431 Ok(Some(_)) => {
1432 warn!("UDP sent incomplete datagram");
1433 }
1434 Ok(None) => {
1435 udp_channel.requeue_send(data);
1436 return;
1437 }
1438 Err(e) => {
1439 debug!(
1440 "UDP send error: {:?}, address: {:?}",
1441 e, &data.address
1442 );
1443 return;
1444 }
1445 }
1446 }
1447 io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1449 debug!("Error updating UDP registration: {:?}", e)
1450 });
1451 }
1452
1453 fn on_udp_packet(
1454 &self, packet: &[u8], from: SocketAddr,
1455 ) -> Result<(), Error> {
1456 if packet.is_empty() {
1457 return Ok(());
1458 }
1459
1460 let res = match packet[0] {
1461 UDP_PROTOCOL_DISCOVERY => {
1462 if let Some(discovery) = self.discovery.lock().as_mut() {
1463 discovery.on_packet(
1464 &UdpIoContext::new(&self.udp_channel, &self.node_db),
1465 &packet[1..],
1466 from,
1467 )?;
1468 Ok(())
1469 } else {
1470 warn!("Discovery is not ready. Drop the message!");
1471 Ok(())
1472 }
1473 }
1474 _ => {
1475 warn!("Unknown UDP protocol. Simply drops the message!");
1476 Ok(())
1477 }
1478 };
1479 res
1480 }
1481
1482 fn on_check_sessions(&self, io: &IoContext<NetworkIoMessage>) {
1483 let mut disconnect_peers = Vec::new();
1484
1485 for session in self.sessions.all() {
1486 if let Some(sess) = session.try_read() {
1487 if let (true, op) = sess.check_timeout() {
1488 disconnect_peers.push((sess.token(), op));
1489 }
1490 }
1491 }
1492
1493 for (token, op) in disconnect_peers {
1494 self.kill_connection_by_token(
1495 token,
1496 io,
1497 true,
1498 op,
1499 "session timeout", );
1501 }
1502 }
1503}
1504
1505impl IoHandler<NetworkIoMessage> for NetworkServiceInner {
1506 fn initialize(&self, io: &IoContext<NetworkIoMessage>) {
1507 io.register_timer(HOUSEKEEPING, self.config.housekeeping_timeout)
1508 .expect("Error registering housekeeping timer");
1509 io.message(NetworkIoMessage::Start).unwrap_or_else(|e| {
1510 warn!("Error sending IO notification: {:?}", e)
1511 });
1512 }
1513
1514 fn stream_hup(
1515 &self, io: &IoContext<NetworkIoMessage>, stream: StreamToken,
1516 ) {
1517 trace!("Hup: {}", stream);
1518 match stream {
1519 FIRST_SESSION..=LAST_SESSION => self.connection_closed(stream, io),
1520 _ => warn!("Unexpected hup"),
1521 }
1522 }
1523
1524 fn stream_readable(
1525 &self, io: &IoContext<NetworkIoMessage>, stream: StreamToken,
1526 ) {
1527 match stream {
1528 FIRST_SESSION..=LAST_SESSION => self.session_readable(stream, io),
1529 TCP_ACCEPT => self.accept(io),
1530 UDP_MESSAGE => self.udp_readable(io),
1531 _ => panic!("Received unknown readable token"),
1532 }
1533 }
1534
1535 fn stream_writable(
1536 &self, io: &IoContext<NetworkIoMessage>, stream: StreamToken,
1537 ) {
1538 match stream {
1539 FIRST_SESSION..=LAST_SESSION => self.session_writable(stream, io),
1540 UDP_MESSAGE => self.udp_writable(io),
1541 _ => panic!("Received unknown writable token"),
1542 }
1543 }
1544
1545 fn timeout(&self, io: &IoContext<NetworkIoMessage>, token: TimerToken) {
1546 match token {
1547 FIRST_SESSION..=LAST_SESSION => {
1548 debug!("Connection timeout: {}", token);
1549 self.kill_connection_by_token(
1550 token,
1551 io,
1552 true,
1553 Some(UpdateNodeOperation::Failure),
1554 "handshake timeout", );
1556 }
1557 HOUSEKEEPING => self.on_housekeeping(io),
1558 DISCOVERY_REFRESH => {
1559 let disc_general = self.has_enough_outgoing_peers(
1561 None,
1562 self.config.max_outgoing_peers,
1563 );
1564 let disc_archive = self.has_enough_outgoing_peers(
1565 Some((NODE_TAG_NODE_TYPE, NODE_TAG_ARCHIVE)),
1566 self.config.max_outgoing_peers_archive,
1567 );
1568 if disc_general || disc_archive {
1569 self.discovery.lock().as_mut().map(|d| {
1570 d.disc_option.general = disc_general;
1571 d.disc_option.archive = disc_archive;
1572 d.refresh();
1573 });
1574 io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1575 debug!("Error updating discovery registration: {:?}", e)
1576 });
1577 }
1578 }
1579 FAST_DISCOVERY_REFRESH => {
1580 let disc_general = !self.has_enough_outgoing_peers(
1582 None,
1583 self.config.max_outgoing_peers,
1584 );
1585 let disc_archive = !self.has_enough_outgoing_peers(
1586 Some((NODE_TAG_NODE_TYPE, NODE_TAG_ARCHIVE)),
1587 self.config.max_outgoing_peers_archive,
1588 );
1589 if disc_general || disc_archive {
1590 self.discovery.lock().as_mut().map(|d| {
1591 d.disc_option.general = disc_general;
1592 d.disc_option.archive = disc_archive;
1593 d.refresh();
1594 });
1595 io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1596 debug!("Error updating discovery registration: {:?}", e)
1597 });
1598 }
1599 }
1600 DISCOVERY_ROUND => {
1601 if let Some(d) = self.discovery.lock().as_mut() {
1602 d.round(&UdpIoContext::new(
1603 &self.udp_channel,
1604 &self.node_db,
1605 ))
1606 }
1607 io.update_registration(UDP_MESSAGE).unwrap_or_else(|e| {
1608 debug!("Error updating discovery registration: {:?}", e)
1609 });
1610 }
1611 NODE_TABLE => {
1612 trace!("Refreshing node table");
1613 self.try_promote_untrusted();
1614 self.node_db.write().save();
1615 }
1616 CHECK_SESSIONS => self.on_check_sessions(io),
1617 SEND_DELAYED_MESSAGES => {
1618 if let Some(ref queue) = self.delayed_queue {
1619 queue.send_delayed_messages(self);
1620 }
1621 }
1622 _ => match self.timers.read().get(&token).cloned() {
1623 Some(timer) => {
1624 match self.handlers.read().get(&timer.protocol).cloned() {
1625 None => warn!(
1626 "No handler found for protocol: {:?}",
1627 timer.protocol
1628 ),
1629 Some(h) => {
1630 let network_context = NetworkContext::new(
1631 io,
1632 h,
1633 timer.protocol,
1634 self,
1635 );
1636 network_context
1637 .protocol_handler()
1638 .on_timeout(&network_context, timer.token);
1639 }
1640 }
1641 }
1642 None => {
1643 warn!("Unknown timer token: {}", token);
1644 } },
1646 }
1647 }
1648
1649 fn message(
1650 &self, io: &IoContext<NetworkIoMessage>, message: &NetworkIoMessage,
1651 ) {
1652 match message {
1653 NetworkIoMessage::Start => self.start(io).unwrap_or_else(|e| {
1654 warn!("Error starting network service: {:?}", e)
1655 }),
1656 NetworkIoMessage::AddHandler {
1657 handler,
1658 protocol,
1659 version,
1660 callback,
1661 } => {
1662 let h = handler.clone();
1663 let network_context =
1664 NetworkContext::new(io, h, *protocol, self);
1665 network_context
1666 .protocol_handler()
1667 .initialize(&network_context);
1668 self.handlers.write().insert(*protocol, handler.clone());
1669 {
1670 let protocols = &mut *self.metadata.protocols.write();
1671 for protocol_info in protocols.iter() {
1672 assert_ne!(
1673 protocol, &protocol_info.protocol,
1674 "Do not register same protocol twice"
1675 );
1676 }
1677 protocols.push(ProtocolInfo {
1678 protocol: *protocol,
1679 version: *version,
1680 });
1681 self.metadata.minimum_peer_protocol_version.write().push(
1682 ProtocolInfo {
1683 protocol: *protocol,
1684 version: handler.minimum_supported_version(),
1685 },
1686 );
1687 }
1688 info!(
1689 "Protocol {:?} version {:?} registered.",
1690 protocol, version
1691 );
1692 callback.send(()).expect("protocol register error");
1693 }
1694 NetworkIoMessage::AddTimer {
1695 ref protocol,
1696 ref delay,
1697 ref token,
1698 } => {
1699 let handler_token = {
1700 let mut timer_counter = self.timer_counter.write();
1701 let counter = &mut *timer_counter;
1702 let handler_token = *counter;
1703 *counter += 1;
1704 handler_token
1705 };
1706 self.timers.write().insert(
1707 handler_token,
1708 ProtocolTimer {
1709 protocol: *protocol,
1710 token: *token,
1711 },
1712 );
1713 io.register_timer(handler_token, *delay)
1714 .unwrap_or_else(|e| {
1715 debug!("Error registering timer {}: {:?}", token, e)
1716 });
1717 }
1718 NetworkIoMessage::DispatchWork {
1719 ref protocol,
1720 ref work_type,
1721 } => {
1722 if let Some(handler) =
1723 self.handlers.read().get(protocol).cloned()
1724 {
1725 let network_context =
1726 NetworkContext::new(io, handler, *protocol, self);
1727 network_context
1728 .protocol_handler()
1729 .on_work_dispatch(&network_context, *work_type);
1730 } else {
1731 warn!("Work is dispatched to unknown handler");
1732 }
1733 }
1734 NetworkIoMessage::HandleProtocolMessage {
1735 ref protocol,
1736 peer: _,
1737 ref node_id,
1738 ref data,
1739 } => {
1740 debug!("Receive ProtocolMsg {:?}", protocol);
1741 if let Some(handler) =
1742 self.handlers.read().get(protocol).cloned()
1743 {
1744 let network_context =
1745 NetworkContext::new(io, handler, *protocol, self);
1746 network_context.protocol_handler().on_message(
1747 &network_context,
1748 node_id,
1749 data,
1750 );
1751 } else {
1752 warn!("Work is handled by unknown handler");
1753 }
1754 }
1755 }
1756 }
1757
1758 fn register_stream(
1759 &self, stream: StreamToken, reg: Token, poll_registry: &Registry,
1760 ) {
1761 match stream {
1762 FIRST_SESSION..=LAST_SESSION => {
1763 if let Some(session) = self.sessions.get(stream) {
1764 session
1765 .write()
1766 .register_socket(reg, poll_registry)
1767 .expect("Error registering socket");
1768 }
1769 }
1770 TCP_ACCEPT => {
1771 poll_registry
1772 .register(
1773 &mut *self.tcp_listener.lock(),
1774 Token(TCP_ACCEPT),
1775 Interest::READABLE | Interest::WRITABLE,
1776 )
1777 .expect("Error registering stream");
1778 }
1779 UDP_MESSAGE => {
1780 poll_registry
1781 .register(
1782 &mut *self.udp_socket.lock(),
1783 reg,
1784 Interest::READABLE | Interest::WRITABLE,
1785 )
1786 .expect("Error registering UDP socket");
1787 }
1788 _ => warn!("Unexpected stream registeration"),
1789 }
1790 }
1791
1792 fn deregister_stream(&self, stream: StreamToken, poll_registry: &Registry) {
1793 match stream {
1794 FIRST_SESSION..=LAST_SESSION => {
1795 if let Some(session) = self.sessions.get(stream) {
1796 let mut sess = session.write();
1797 if sess.expired() {
1798 sess.deregister_socket(poll_registry)
1799 .expect("Error deregistering socket");
1800 if let Some(node_id) = sess.id() {
1801 self.node_db.write().note_failure(
1802 node_id, true, false, );
1805 }
1806 self.sessions.remove(&*sess);
1807 debug!("Removed session: {:?}", *sess);
1808 }
1809 }
1810 }
1811 _ => warn!("Unexpected stream deregistration"),
1812 }
1813 }
1814
1815 fn update_stream(
1816 &self, stream: StreamToken, reg: Token, poll_registry: &Registry,
1817 ) {
1818 match stream {
1819 FIRST_SESSION..=LAST_SESSION => {
1820 if let Some(session) = self.sessions.get(stream) {
1821 session
1822 .write()
1823 .update_socket(reg, poll_registry)
1824 .expect("Error updating socket");
1825 }
1826 }
1827 TCP_ACCEPT => poll_registry
1828 .reregister(
1829 &mut *self.tcp_listener.lock(),
1830 Token(TCP_ACCEPT),
1831 Interest::READABLE | Interest::WRITABLE,
1832 )
1833 .expect("Error reregistering stream"),
1834 UDP_MESSAGE => {
1835 let mut udp_socket = self.udp_socket.lock();
1836 let udp_channel = self.udp_channel.read();
1837
1838 let registration = if udp_channel.any_sends_queued() {
1839 Interest::READABLE | Interest::WRITABLE
1840 } else {
1841 Interest::READABLE
1842 };
1843 poll_registry
1844 .reregister(&mut *udp_socket, reg, registration)
1845 .expect("Error reregistering UDP socket");
1846 }
1847 _ => warn!("Unexpected stream update"),
1848 }
1849 }
1850}
1851
/// A message whose delivery has been postponed, together with everything
/// needed to send it later.
///
/// Instances are created in `NetworkContext::send` when a latency is
/// configured for the destination peer, and are ordered by `ts` only.
struct DelayMessageContext {
    // Point in time the message is due; `send` sets it to "now + latency".
    ts: Instant,
    // Owned copy of the IO context that was current when the message was
    // queued.
    io: IoContext<NetworkIoMessage>,
    // Protocol the message belongs to.
    protocol: ProtocolId,
    // Session of the destination peer.
    session: SharedSession,
    // Node id of the destination peer.
    peer: NodeId,
    // Raw message bytes.
    msg: Vec<u8>,
    // Minimum protocol version passed through from `send`.
    min_protocol_version: ProtocolVersion,
    // Send-queue priority passed through from `send`.
    priority: SendQueuePriority,
}
1863
1864impl DelayMessageContext {
1865 pub fn new(
1866 ts: Instant, io: IoContext<NetworkIoMessage>, protocol: ProtocolId,
1867 session: SharedSession, peer: NodeId, msg: Vec<u8>,
1868 min_protocol_version: ProtocolVersion, priority: SendQueuePriority,
1869 ) -> Self {
1870 DelayMessageContext {
1871 ts,
1872 io,
1873 protocol,
1874 session,
1875 peer,
1876 msg,
1877 min_protocol_version,
1878 priority,
1879 }
1880 }
1881}
1882
1883impl Ord for DelayMessageContext {
1884 fn cmp(&self, other: &Self) -> Ordering { other.ts.cmp(&self.ts) }
1885}
1886
1887impl PartialOrd for DelayMessageContext {
1888 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1889 Some(self.cmp(other))
1890 }
1891}
1892
// Marker impl: equality (see `PartialEq`) compares the `ts` field only.
impl Eq for DelayMessageContext {}
1894
1895impl PartialEq for DelayMessageContext {
1896 fn eq(&self, other: &Self) -> bool { self.ts == other.ts }
1897}
1898
/// Per-call view of the network service handed to a protocol handler.
///
/// Borrows the IO context and the service for `'a` and pairs them with
/// the handler and protocol id the call is made for.
pub struct NetworkContext<'a> {
    // IO context of the current IO callback.
    io: &'a IoContext<NetworkIoMessage>,
    // Handler of `protocol`.
    handler: Arc<dyn NetworkProtocolHandler + Sync>,
    // Protocol this context acts for.
    protocol: ProtocolId,
    // Cached result of `handler.minimum_supported_version()`, taken when
    // the context is created.
    min_supported_version: ProtocolVersion,
    // The owning network service.
    network_service: &'a NetworkServiceInner,
}
1907
1908impl<'a> NetworkContext<'a> {
1909 pub fn new(
1910 io: &'a IoContext<NetworkIoMessage>,
1911 handler: Arc<dyn NetworkProtocolHandler + Sync>, protocol: ProtocolId,
1912 network_service: &'a NetworkServiceInner,
1913 ) -> NetworkContext<'a> {
1914 let min_supported_version = handler.minimum_supported_version();
1915 NetworkContext {
1916 io,
1917 handler,
1918 protocol,
1919 min_supported_version,
1920 network_service,
1921 }
1922 }
1923
1924 pub fn protocol_handler(&self) -> &dyn NetworkProtocolHandler {
1925 &*self.handler
1926 }
1927}
1928
1929impl<'a> NetworkContextTrait for NetworkContext<'a> {
1930 fn get_protocol(&self) -> ProtocolId { self.protocol }
1931
1932 fn get_peer_connection_origin(&self, node_id: &NodeId) -> Option<bool> {
1933 self.network_service.get_peer_connection_origin(node_id)
1934 }
1935
1936 fn is_peer_self(&self, node_id: &NodeId) -> bool {
1937 *node_id == *self.network_service.metadata.id()
1938 }
1939
1940 fn self_node_id(&self) -> NodeId { *self.network_service.metadata.id() }
1941
1942 fn send(
1944 &self, node_id: &NodeId, msg: Vec<u8>,
1945 min_protocol_version: ProtocolVersion,
1946 version_valid_till: ProtocolVersion, priority: SendQueuePriority,
1947 ) -> Result<(), Error> {
1948 if version_valid_till < self.min_supported_version {
1949 bail!(Error::SendUnsupportedMessage {
1950 protocol: self.protocol,
1951 msg_id: parse_msg_id_leb128_2_bytes_at_most(&mut &*msg),
1952 peer_protocol_version: None,
1953 min_supported_version: Some(self.min_supported_version),
1954 });
1955 }
1956
1957 if *node_id == *self.network_service.metadata.id() {
1958 self.handler.send_local_message(self, msg);
1959 return Ok(());
1960 }
1961
1962 let session = self.network_service.sessions.get_by_id(node_id);
1963 trace!("Sending {} bytes to {}", msg.len(), node_id);
1964 if let Some(session) = session {
1965 let latency =
1966 self.network_service.delayed_queue.as_ref().and_then(|q| {
1967 session
1968 .write()
1969 .metadata
1970 .id
1971 .and_then(|id| q.latencies.read().get(&id).copied())
1972 });
1973 match latency {
1974 Some(latency) => {
1975 let q =
1976 self.network_service.delayed_queue.as_ref().unwrap();
1977 let mut queue = q.queue.lock();
1978 let ts_to_send = Instant::now() + latency;
1979 queue.push(DelayMessageContext::new(
1980 ts_to_send,
1981 self.io.clone(),
1982 self.protocol,
1983 session,
1984 *node_id,
1985 msg,
1986 min_protocol_version,
1987 priority,
1988 ));
1989 self.io.register_timer_once_nocancel(
1990 SEND_DELAYED_MESSAGES,
1991 latency,
1992 )?;
1993 trace!("register delayed timer delay:{:?} ts_to_send:{:?} length:{}", latency, ts_to_send, queue.len());
1994 }
1995 None => {
1996 session.write().send_packet(
1997 self.io,
1998 Some(self.protocol),
1999 min_protocol_version,
2000 session::PACKET_USER,
2001 msg,
2002 priority,
2003 )?;
2004 }
2005 }
2006 }
2008 Ok(())
2009 }
2010
2011 fn disconnect_peer(
2012 &self, node_id: &NodeId, op: Option<UpdateNodeOperation>, reason: &str,
2013 ) {
2014 self.network_service
2015 .kill_connection(node_id, self.io, true, op, reason);
2016 }
2017
2018 fn register_timer(
2019 &self, token: TimerToken, delay: Duration,
2020 ) -> Result<(), Error> {
2021 self.io
2022 .message(NetworkIoMessage::AddTimer {
2023 token,
2024 delay,
2025 protocol: self.protocol,
2026 })
2027 .unwrap_or_else(|e| {
2028 warn!("Error sending network IO message: {:?}", e)
2029 });
2030 Ok(())
2031 }
2032
2033 fn dispatch_work(&self, work_type: HandlerWorkType) {
2034 self.io
2035 .message(NetworkIoMessage::DispatchWork {
2036 protocol: self.protocol,
2037 work_type,
2038 })
2039 .expect("Error sending network IO message");
2040 }
2041
2042 fn insert_peer_node_tag(&self, peer: NodeId, key: &str, value: &str) {
2043 self.network_service
2044 .node_db
2045 .write()
2046 .set_tag(peer, key, value);
2047 }
2048}
2049
2050fn save_key(path: &Path, key: &Secret) {
2051 let mut path_buf = PathBuf::from(path);
2052 if let Err(e) = fs::create_dir_all(path_buf.as_path()) {
2053 warn!("Error creating key directory: {:?}", e);
2054 return;
2055 };
2056 path_buf.push("key");
2057 let path = path_buf.as_path();
2058 let mut file = match fs::File::create(&path) {
2059 Ok(file) => file,
2060 Err(e) => {
2061 warn!("Error creating key file: {:?}", e);
2062 return;
2063 }
2064 };
2065 if let Err(e) = restrict_permissions_owner(path, true, false) {
2066 warn!("Failed to modify permissions of the file ({})", e);
2067 }
2068 if let Err(e) = file.write(&key.to_hex().into_bytes()) {
2069 warn!("Error writing key file: {:?}", e);
2070 }
2071}
2072
2073fn load_key(path: &Path) -> Option<Secret> {
2074 let mut path_buf = PathBuf::from(path);
2075 path_buf.push("key");
2076 let mut file = match fs::File::open(path_buf.as_path()) {
2077 Ok(file) => file,
2078 Err(e) => {
2079 debug!("failed to open key file: {:?}", e);
2080 return None;
2081 }
2082 };
2083 let mut buf = String::new();
2084 match file.read_to_string(&mut buf) {
2085 Ok(_) => {}
2086 Err(e) => {
2087 warn!("Error reading key file: {:?}", e);
2088 return None;
2089 }
2090 }
2091 match Secret::from_str(&buf) {
2092 Ok(key) => Some(key),
2093 Err(e) => {
2094 warn!("Error parsing key file: {:?}", e);
2095 None
2096 }
2097 }
2098}
2099
2100pub fn load_pos_private_key(
2101 path: &Path,
2102) -> Option<(ConsensusPrivateKey, Option<ConsensusVRFPrivateKey>)> {
2103 let mut path_buf = PathBuf::from(path);
2104 path_buf.push("pos_key");
2105 let mut file = match fs::File::open(path_buf.as_path()) {
2106 Ok(file) => file,
2107 Err(e) => {
2108 debug!("failed to open key file: {:?}", e);
2109 return None;
2110 }
2111 };
2112 let mut buf = String::new();
2113 match file.read_to_string(&mut buf) {
2114 Ok(_) => {}
2115 Err(e) => {
2116 warn!("Error reading key file: {:?}", e);
2117 return None;
2118 }
2119 }
2120 let key_str: Vec<_> = buf.split(",").collect();
2121 let private_key =
2122 match ConsensusPrivateKey::from_encoded_string(&key_str[0]) {
2123 Ok(key) => Some(key),
2124 Err(e) => {
2125 warn!("Error parsing key file: {:?}", e);
2126 None
2127 }
2128 }?;
2129 if key_str.len() <= 1 {
2130 return Some((private_key, None));
2131 }
2132 let vrf_private_key =
2133 match ConsensusVRFPrivateKey::from_encoded_string(&key_str[1]) {
2134 Ok(key) => Some(key),
2135 Err(e) => {
2136 warn!("Error parsing key file: {:?}", e);
2137 None
2138 }
2139 }?;
2140 Some((private_key, Some(vrf_private_key)))
2141}
2142
2143impl std::fmt::Display for ProtocolVersion {
2144 fn fmt(&self, f: &mut Formatter) -> Result<(), std::fmt::Error> {
2145 write!(f, "{}", self.0)
2146 }
2147}
2148
2149impl std::ops::Deref for ProtocolVersion {
2150 type Target = u8;
2151
2152 fn deref(&self) -> &Self::Target { &self.0 }
2153}
2154
2155impl Encodable for ProtocolVersion {
2156 fn rlp_append(&self, s: &mut RlpStream) { s.append_internal(&self.0); }
2157}
2158
2159impl Decodable for ProtocolVersion {
2160 fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
2161 Ok(Self(rlp.as_val()?))
2162 }
2163}