cfxcore/sync/message/
capability.rs1use crate::{
6 message::Message,
7 sync::{
8 message::{Context, Handleable},
9 Error, SynchronizationState,
10 },
11};
12use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
13use network::{node_table::NodeId, NetworkContext};
14use primitives::CompatBool;
15use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
16
17#[derive(Debug, Eq, PartialEq, Clone, Copy, DeriveMallocSizeOf)]
18pub enum DynamicCapability {
19 NormalPhase(bool), ServeHeaders(bool), }
22
23impl DynamicCapability {
24 fn code(&self) -> u8 {
25 match self {
26 DynamicCapability::NormalPhase(_) => 0,
27 DynamicCapability::ServeHeaders(_) => 1,
28 }
29 }
30
31 pub fn broadcast_with_peers(
32 self, io: &dyn NetworkContext, peers: Vec<NodeId>,
33 ) {
34 let msg = DynamicCapabilityChange { changed: self };
35
36 for peer in peers {
37 if let Err(e) = msg.send(io, &peer) {
38 debug!("Failed to send capability change message, peer = {}, message = {:?}, err = {:?}", peer, msg, e);
39 }
40 }
41 }
42
43 pub fn broadcast(
44 self, io: &dyn NetworkContext, syn: &SynchronizationState,
45 ) {
46 let peers = syn.peers.read().keys().cloned().collect();
47 self.broadcast_with_peers(io, peers);
48 }
49}
50
51impl Encodable for DynamicCapability {
52 fn rlp_append(&self, s: &mut RlpStream) {
53 s.begin_list(2).append(&self.code());
54
55 match self {
56 DynamicCapability::NormalPhase(enabled) => {
57 s.append(&CompatBool(*enabled))
58 }
59 DynamicCapability::ServeHeaders(enabled) => {
60 s.append(&CompatBool(*enabled))
61 }
62 };
63 }
64}
65
66impl Decodable for DynamicCapability {
67 fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
68 if rlp.item_count()? != 2 {
69 return Err(DecoderError::RlpIncorrectListLen);
70 }
71
72 match rlp.val_at::<u8>(0)? {
73 0 => Ok(DynamicCapability::NormalPhase(
74 rlp.val_at::<CompatBool>(1)?.0,
75 )),
76 1 => Ok(DynamicCapability::ServeHeaders(
77 rlp.val_at::<CompatBool>(1)?.0,
78 )),
79 _ => Err(DecoderError::Custom("invalid capability code")),
80 }
81 }
82}
83
84#[derive(Debug, Default, DeriveMallocSizeOf)]
85pub struct DynamicCapabilitySet {
86 caps: [Option<DynamicCapability>; 3],
87}
88
89impl DynamicCapabilitySet {
90 pub fn insert(&mut self, cap: DynamicCapability) {
91 self.caps[cap.code() as usize] = Some(cap);
92 }
93
94 pub fn contains(&self, cap: DynamicCapability) -> bool {
95 match self.caps[cap.code() as usize].as_ref() {
96 Some(cur_cap) => cur_cap == &cap,
97 None => return false,
98 }
99 }
100}
101
102#[derive(Debug)]
103pub struct DynamicCapabilityChange {
104 pub changed: DynamicCapability,
105}
106
107impl Encodable for DynamicCapabilityChange {
108 fn rlp_append(&self, s: &mut RlpStream) {
109 s.append_internal(&self.changed);
110 }
111}
112
113impl Decodable for DynamicCapabilityChange {
114 fn decode(d: &Rlp) -> Result<Self, DecoderError> {
115 let changed = d.as_val()?;
116 Ok(DynamicCapabilityChange { changed })
117 }
118}
119
120impl Handleable for DynamicCapabilityChange {
121 fn handle(self, ctx: &Context) -> Result<(), Error> {
122 debug!(
123 "handle dynamic_capability_change: msg={:?}, peer={}",
124 self, ctx.node_id
125 );
126 let peer = ctx.manager.syn.get_peer_info(&ctx.node_id)?;
127 peer.write().capabilities.insert(self.changed);
128 Ok(())
129 }
130}