cfxcore/light_protocol/handler/sync/
state_entries.rs1use super::{
6 common::{FutureItem, PendingItem, SyncManager, TimeOrdered},
7 state_roots::StateRoots,
8};
9use crate::{
10 light_protocol::{
11 common::{FullPeerState, Peers},
12 error::*,
13 message::{
14 msgid, GetStateEntries, StateEntryProof, StateEntryWithKey,
15 StateKey,
16 },
17 },
18 message::{Message, RequestId},
19 UniqueId,
20};
21use cfx_parameters::light::{
22 CACHE_TIMEOUT, MAX_STATE_ENTRIES_IN_FLIGHT, STATE_ENTRY_REQUEST_BATCH_SIZE,
23 STATE_ENTRY_REQUEST_TIMEOUT,
24};
25use futures::future::FutureExt;
26use lru_time_cache::LruCache;
27use network::{node_table::NodeId, NetworkContext};
28use parking_lot::RwLock;
29use primitives::StorageKeyWithSpace;
30use std::{future::Future, sync::Arc};
31
/// Raw value of a state entry; `None` is used when no value is present for
/// the requested key.
pub type StateEntry = Option<Vec<u8>>;
33
/// Snapshot of the sync state that `StateEntries::print_stats` logs.
#[derive(Debug)]
// The fields are only ever read through the derived `Debug` output, which
// the `dead_code` lint presumably does not count as a use.
#[allow(dead_code)]
struct Statistics {
    /// Number of items currently held in the `verified` cache.
    cached: usize,
    /// Number of requests the sync manager reports as in flight.
    in_flight: usize,
    /// Number of requests the sync manager reports as waiting.
    waiting: usize,
}
41
42type MissingStateEntry = TimeOrdered<StateKey>;
43
44type PendingStateEntry = PendingItem<StateEntry, ClonableError>;
45
46pub struct StateEntries {
47 request_id_allocator: Arc<UniqueId>,
49
50 state_roots: Arc<StateRoots>,
52
53 sync_manager: SyncManager<StateKey, MissingStateEntry>,
55
56 verified: Arc<RwLock<LruCache<StateKey, PendingStateEntry>>>,
58}
59
60impl StateEntries {
61 pub fn new(
62 peers: Arc<Peers<FullPeerState>>, state_roots: Arc<StateRoots>,
63 request_id_allocator: Arc<UniqueId>,
64 ) -> Self {
65 let sync_manager =
66 SyncManager::new(peers.clone(), msgid::GET_STATE_ENTRIES);
67
68 let cache = LruCache::with_expiry_duration(*CACHE_TIMEOUT);
69 let verified = Arc::new(RwLock::new(cache));
70
71 StateEntries {
72 request_id_allocator,
73 sync_manager,
74 verified,
75 state_roots,
76 }
77 }
78
79 #[inline]
80 pub fn print_stats(&self) {
81 debug!(
82 "state entry sync statistics: {:?}",
83 Statistics {
84 cached: self.verified.read().len(),
85 in_flight: self.sync_manager.num_in_flight(),
86 waiting: self.sync_manager.num_waiting(),
87 }
88 );
89 }
90
91 #[inline]
92 pub fn request_now(
93 &self, io: &dyn NetworkContext, epoch: u64, key: Vec<u8>,
94 ) -> impl Future<Output = Result<StateEntry>> {
95 let mut verified = self.verified.write();
96 let key = StateKey { epoch, key };
97
98 if !verified.contains_key(&key) {
99 let missing = std::iter::once(MissingStateEntry::new(key.clone()));
100
101 self.sync_manager.request_now(missing, |peer, keys| {
102 self.send_request(io, peer, keys)
103 });
104 }
105
106 verified
107 .entry(key.clone())
108 .or_insert(PendingItem::pending())
109 .clear_error();
110
111 FutureItem::new(key, self.verified.clone())
112 .map(|res| res.map_err(|e| e.into()))
113 }
114
115 #[inline]
116 pub fn receive(
117 &self, peer: &NodeId, id: RequestId,
118 entries: impl Iterator<Item = StateEntryWithKey>,
119 ) -> Result<()> {
120 for StateEntryWithKey { key, entry, proof } in entries {
121 trace!(
122 "Validating state entry {:?} with key {:?} and proof {:?}",
123 entry,
124 key,
125 proof
126 );
127
128 match self.sync_manager.check_if_requested(peer, id, &key)? {
129 None => continue,
130 Some(_) => self.validate_and_store(key, entry, proof)?,
131 };
132 }
133
134 Ok(())
135 }
136
137 #[inline]
138 pub fn validate_and_store(
139 &self, key: StateKey, entry: Option<Vec<u8>>, proof: StateEntryProof,
140 ) -> Result<()> {
141 if let Err(e) =
143 self.validate_state_entry(key.epoch, &key.key, &entry, proof)
144 {
145 let e = ClonableError::from(e);
148
149 self.verified
150 .write()
151 .entry(key.clone())
152 .or_insert(PendingItem::pending())
153 .set_error(e.clone());
154
155 bail!(e);
156 }
157
158 self.verified
160 .write()
161 .entry(key.clone())
162 .or_insert(PendingItem::pending())
163 .set(entry);
164
165 self.sync_manager.remove_in_flight(&key);
166
167 Ok(())
168 }
169
170 #[inline]
171 pub fn clean_up(&self) {
172 let timeout = *STATE_ENTRY_REQUEST_TIMEOUT;
174 let entries = self.sync_manager.remove_timeout_requests(timeout);
175 trace!("Timeout state-entries ({}): {:?}", entries.len(), entries);
176 self.sync_manager.insert_waiting(entries.into_iter());
177
178 self.verified.write().get(&Default::default());
180 }
181
182 #[inline]
183 fn send_request(
184 &self, io: &dyn NetworkContext, peer: &NodeId, keys: Vec<StateKey>,
185 ) -> Result<Option<RequestId>> {
186 if keys.is_empty() {
187 return Ok(None);
188 }
189
190 let request_id = self.request_id_allocator.next();
191
192 trace!(
193 "send_request GetStateEntries peer={:?} id={:?} keys={:?}",
194 peer,
195 request_id,
196 keys
197 );
198
199 let msg: Box<dyn Message> =
200 Box::new(GetStateEntries { request_id, keys });
201
202 msg.send(io, peer)?;
203 Ok(Some(request_id))
204 }
205
206 #[inline]
207 pub fn sync(&self, io: &dyn NetworkContext) {
208 self.sync_manager.sync(
209 MAX_STATE_ENTRIES_IN_FLIGHT,
210 STATE_ENTRY_REQUEST_BATCH_SIZE,
211 |peer, keys| self.send_request(io, peer, keys),
212 );
213 }
214
215 #[inline]
216 fn validate_state_entry(
217 &self, epoch: u64, key: &Vec<u8>, value: &Option<Vec<u8>>,
218 proof: StateEntryProof,
219 ) -> Result<()> {
220 let state_root = proof.state_root;
222
223 self.state_roots
224 .validate_state_root(epoch, &state_root)
225 .map_err(|e| Error::InvalidStateProof {
226 epoch,
227 key: key.clone(),
228 value: value.clone(),
229 reason: "Validation of current state root failed",
230 source: Some(Box::new(e)),
231 })?;
232
233 let maybe_prev_root = proof.prev_snapshot_state_root;
235
236 self.state_roots
237 .validate_prev_snapshot_state_root(epoch, &maybe_prev_root)
238 .map_err(|e| Error::InvalidStateProof {
239 epoch,
240 key: key.clone(),
241 value: value.clone(),
242 reason: "Validation of previous state root failed",
243 source: Some(Box::new(e)),
244 })?;
245
246 let maybe_intermediate_padding = maybe_prev_root.map(|root| {
248 StorageKeyWithSpace::delta_mpt_padding(
249 &root.snapshot_root,
250 &root.intermediate_delta_root,
251 )
252 });
253
254 if !proof.state_proof.is_valid_kv(
256 key,
257 value.as_ref().map(|v| &**v),
258 state_root,
259 maybe_intermediate_padding,
260 ) {
261 bail!(Error::InvalidStateProof {
262 epoch,
263 key: key.clone(),
264 value: value.clone(),
265 reason: "Validation of merkle proof failed",
266 source: None
267 });
268 }
269
270 Ok(())
271 }
272}