cfxcore/sync/message/
capability.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    message::Message,
7    sync::{
8        message::{Context, Handleable},
9        Error, SynchronizationState,
10    },
11};
12use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
13use network::{node_table::NodeId, NetworkContext};
14use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
15
16#[derive(Debug, Eq, PartialEq, Clone, Copy, DeriveMallocSizeOf)]
17pub enum DynamicCapability {
18    NormalPhase(bool),  // provide tx relay
19    ServeHeaders(bool), // provide block header downloads
20}
21
22impl DynamicCapability {
23    fn code(&self) -> u8 {
24        match self {
25            DynamicCapability::NormalPhase(_) => 0,
26            DynamicCapability::ServeHeaders(_) => 1,
27        }
28    }
29
30    pub fn broadcast_with_peers(
31        self, io: &dyn NetworkContext, peers: Vec<NodeId>,
32    ) {
33        let msg = DynamicCapabilityChange { changed: self };
34
35        for peer in peers {
36            if let Err(e) = msg.send(io, &peer) {
37                debug!("Failed to send capability change message, peer = {}, message = {:?}, err = {:?}", peer, msg, e);
38            }
39        }
40    }
41
42    pub fn broadcast(
43        self, io: &dyn NetworkContext, syn: &SynchronizationState,
44    ) {
45        let peers = syn.peers.read().keys().cloned().collect();
46        self.broadcast_with_peers(io, peers);
47    }
48}
49
50impl Encodable for DynamicCapability {
51    fn rlp_append(&self, s: &mut RlpStream) {
52        s.begin_list(2).append(&self.code());
53
54        match self {
55            DynamicCapability::NormalPhase(enabled) => s.append(enabled),
56            DynamicCapability::ServeHeaders(enabled) => s.append(enabled),
57        };
58    }
59}
60
61impl Decodable for DynamicCapability {
62    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
63        if rlp.item_count()? != 2 {
64            return Err(DecoderError::RlpIncorrectListLen);
65        }
66
67        match rlp.val_at::<u8>(0)? {
68            0 => Ok(DynamicCapability::NormalPhase(rlp.val_at(1)?)),
69            1 => Ok(DynamicCapability::ServeHeaders(rlp.val_at(1)?)),
70            _ => Err(DecoderError::Custom("invalid capability code")),
71        }
72    }
73}
74
75#[derive(Debug, Default, DeriveMallocSizeOf)]
76pub struct DynamicCapabilitySet {
77    caps: [Option<DynamicCapability>; 3],
78}
79
80impl DynamicCapabilitySet {
81    pub fn insert(&mut self, cap: DynamicCapability) {
82        self.caps[cap.code() as usize] = Some(cap);
83    }
84
85    pub fn contains(&self, cap: DynamicCapability) -> bool {
86        match self.caps[cap.code() as usize].as_ref() {
87            Some(cur_cap) => cur_cap == &cap,
88            None => return false,
89        }
90    }
91}
92
93#[derive(Debug)]
94pub struct DynamicCapabilityChange {
95    pub changed: DynamicCapability,
96}
97
98impl Encodable for DynamicCapabilityChange {
99    fn rlp_append(&self, s: &mut RlpStream) {
100        s.append_internal(&self.changed);
101    }
102}
103
104impl Decodable for DynamicCapabilityChange {
105    fn decode(d: &Rlp) -> Result<Self, DecoderError> {
106        let changed = d.as_val()?;
107        Ok(DynamicCapabilityChange { changed })
108    }
109}
110
111impl Handleable for DynamicCapabilityChange {
112    fn handle(self, ctx: &Context) -> Result<(), Error> {
113        debug!(
114            "handle dynamic_capability_change: msg={:?}, peer={}",
115            self, ctx.node_id
116        );
117        let peer = ctx.manager.syn.get_peer_info(&ctx.node_id)?;
118        peer.write().capabilities.insert(self.changed);
119        Ok(())
120    }
121}