cfxcore/sync/message/
capability.rs1use crate::{
6 message::Message,
7 sync::{
8 message::{Context, Handleable},
9 Error, SynchronizationState,
10 },
11};
12use malloc_size_of_derive::MallocSizeOf as DeriveMallocSizeOf;
13use network::{node_table::NodeId, NetworkContext};
14use rlp::{Decodable, DecoderError, Encodable, Rlp, RlpStream};
15
16#[derive(Debug, Eq, PartialEq, Clone, Copy, DeriveMallocSizeOf)]
17pub enum DynamicCapability {
18 NormalPhase(bool), ServeHeaders(bool), }
21
22impl DynamicCapability {
23 fn code(&self) -> u8 {
24 match self {
25 DynamicCapability::NormalPhase(_) => 0,
26 DynamicCapability::ServeHeaders(_) => 1,
27 }
28 }
29
30 pub fn broadcast_with_peers(
31 self, io: &dyn NetworkContext, peers: Vec<NodeId>,
32 ) {
33 let msg = DynamicCapabilityChange { changed: self };
34
35 for peer in peers {
36 if let Err(e) = msg.send(io, &peer) {
37 debug!("Failed to send capability change message, peer = {}, message = {:?}, err = {:?}", peer, msg, e);
38 }
39 }
40 }
41
42 pub fn broadcast(
43 self, io: &dyn NetworkContext, syn: &SynchronizationState,
44 ) {
45 let peers = syn.peers.read().keys().cloned().collect();
46 self.broadcast_with_peers(io, peers);
47 }
48}
49
50impl Encodable for DynamicCapability {
51 fn rlp_append(&self, s: &mut RlpStream) {
52 s.begin_list(2).append(&self.code());
53
54 match self {
55 DynamicCapability::NormalPhase(enabled) => s.append(enabled),
56 DynamicCapability::ServeHeaders(enabled) => s.append(enabled),
57 };
58 }
59}
60
61impl Decodable for DynamicCapability {
62 fn decode(rlp: &Rlp) -> Result<Self, DecoderError> {
63 if rlp.item_count()? != 2 {
64 return Err(DecoderError::RlpIncorrectListLen);
65 }
66
67 match rlp.val_at::<u8>(0)? {
68 0 => Ok(DynamicCapability::NormalPhase(rlp.val_at(1)?)),
69 1 => Ok(DynamicCapability::ServeHeaders(rlp.val_at(1)?)),
70 _ => Err(DecoderError::Custom("invalid capability code")),
71 }
72 }
73}
74
75#[derive(Debug, Default, DeriveMallocSizeOf)]
76pub struct DynamicCapabilitySet {
77 caps: [Option<DynamicCapability>; 3],
78}
79
80impl DynamicCapabilitySet {
81 pub fn insert(&mut self, cap: DynamicCapability) {
82 self.caps[cap.code() as usize] = Some(cap);
83 }
84
85 pub fn contains(&self, cap: DynamicCapability) -> bool {
86 match self.caps[cap.code() as usize].as_ref() {
87 Some(cur_cap) => cur_cap == &cap,
88 None => return false,
89 }
90 }
91}
92
93#[derive(Debug)]
94pub struct DynamicCapabilityChange {
95 pub changed: DynamicCapability,
96}
97
98impl Encodable for DynamicCapabilityChange {
99 fn rlp_append(&self, s: &mut RlpStream) {
100 s.append_internal(&self.changed);
101 }
102}
103
104impl Decodable for DynamicCapabilityChange {
105 fn decode(d: &Rlp) -> Result<Self, DecoderError> {
106 let changed = d.as_val()?;
107 Ok(DynamicCapabilityChange { changed })
108 }
109}
110
111impl Handleable for DynamicCapabilityChange {
112 fn handle(self, ctx: &Context) -> Result<(), Error> {
113 debug!(
114 "handle dynamic_capability_change: msg={:?}, peer={}",
115 self, ctx.node_id
116 );
117 let peer = ctx.manager.syn.get_peer_info(&ctx.node_id)?;
118 peer.write().capabilities.insert(self.changed);
119 Ok(())
120 }
121}