cfxcore/sync/message/
capability.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use crate::{
6    message::Message,
7    sync::{
8        message::{Context, Handleable},
9        Error, SynchronizationState,
10    },
11};
12use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
13use network::{node_table::NodeId, NetworkContext};
14use primitives::CompatBool;
15use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
16
17#[derive(Debug, Eq, PartialEq, Clone, Copy, DeriveMallocSizeOf)]
18pub enum DynamicCapability {
19    NormalPhase(bool),  // provide tx relay
20    ServeHeaders(bool), // provide block header downloads
21}
22
23impl DynamicCapability {
24    fn code(&self) -> u8 {
25        match self {
26            DynamicCapability::NormalPhase(_) => 0,
27            DynamicCapability::ServeHeaders(_) => 1,
28        }
29    }
30
31    pub fn broadcast_with_peers(
32        self, io: &dyn NetworkContext, peers: Vec<NodeId>,
33    ) {
34        let msg = DynamicCapabilityChange { changed: self };
35
36        for peer in peers {
37            if let Err(e) = msg.send(io, &peer) {
38                debug!("Failed to send capability change message, peer = {}, message = {:?}, err = {:?}", peer, msg, e);
39            }
40        }
41    }
42
43    pub fn broadcast(
44        self, io: &dyn NetworkContext, syn: &SynchronizationState,
45    ) {
46        let peers = syn.peers.read().keys().cloned().collect();
47        self.broadcast_with_peers(io, peers);
48    }
49}
50
51impl Encodable for DynamicCapability {
52    fn rlp_append(&self, s: &mut RlpStream) {
53        s.begin_list(2).append(&self.code());
54
55        match self {
56            DynamicCapability::NormalPhase(enabled) => {
57                s.append(&CompatBool(*enabled))
58            }
59            DynamicCapability::ServeHeaders(enabled) => {
60                s.append(&CompatBool(*enabled))
61            }
62        };
63    }
64}
65
66impl Decodable for DynamicCapability {
67    fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
68        if rlp.item_count()? != 2 {
69            return Err(DecoderError::RlpIncorrectListLen);
70        }
71
72        match rlp.val_at::<u8>(0)? {
73            0 => Ok(DynamicCapability::NormalPhase(
74                rlp.val_at::<CompatBool>(1)?.0,
75            )),
76            1 => Ok(DynamicCapability::ServeHeaders(
77                rlp.val_at::<CompatBool>(1)?.0,
78            )),
79            _ => Err(DecoderError::Custom("invalid capability code")),
80        }
81    }
82}
83
84#[derive(Debug, Default, DeriveMallocSizeOf)]
85pub struct DynamicCapabilitySet {
86    caps: [Option<DynamicCapability>; 3],
87}
88
89impl DynamicCapabilitySet {
90    pub fn insert(&mut self, cap: DynamicCapability) {
91        self.caps[cap.code() as usize] = Some(cap);
92    }
93
94    pub fn contains(&self, cap: DynamicCapability) -> bool {
95        match self.caps[cap.code() as usize].as_ref() {
96            Some(cur_cap) => cur_cap == &cap,
97            None => return false,
98        }
99    }
100}
101
102#[derive(Debug)]
103pub struct DynamicCapabilityChange {
104    pub changed: DynamicCapability,
105}
106
107impl Encodable for DynamicCapabilityChange {
108    fn rlp_append(&self, s: &mut RlpStream) {
109        s.append_internal(&self.changed);
110    }
111}
112
113impl Decodable for DynamicCapabilityChange {
114    fn decode(d: &Rlp) -> Result<Self, DecoderError> {
115        let changed = d.as_val()?;
116        Ok(DynamicCapabilityChange { changed })
117    }
118}
119
120impl Handleable for DynamicCapabilityChange {
121    fn handle(self, ctx: &Context) -> Result<(), Error> {
122        debug!(
123            "handle dynamic_capability_change: msg={:?}, peer={}",
124            self, ctx.node_id
125        );
126        let peer = ctx.manager.syn.get_peer_info(&ctx.node_id)?;
127        peer.write().capabilities.insert(self.changed);
128        Ok(())
129    }
130}