cfxcore/light_protocol/handler/sync/
state_entries.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5use super::{
6    common::{FutureItem, PendingItem, SyncManager, TimeOrdered},
7    state_roots::StateRoots,
8};
9use crate::{
10    light_protocol::{
11        common::{FullPeerState, Peers},
12        error::*,
13        message::{
14            msgid, GetStateEntries, StateEntryProof, StateEntryWithKey,
15            StateKey,
16        },
17    },
18    message::{Message, RequestId},
19    UniqueId,
20};
21use cfx_parameters::light::{
22    CACHE_TIMEOUT, MAX_STATE_ENTRIES_IN_FLIGHT, STATE_ENTRY_REQUEST_BATCH_SIZE,
23    STATE_ENTRY_REQUEST_TIMEOUT,
24};
25use futures::future::FutureExt;
26use lru_time_cache::LruCache;
27use network::{node_table::NodeId, NetworkContext};
28use parking_lot::RwLock;
29use primitives::StorageKeyWithSpace;
30use std::{future::Future, sync::Arc};
31
/// Raw value of a single state entry as handed to callers of this module.
///
/// NOTE(review): `None` presumably means that the key has no value in the
/// requested epoch -- confirm against the full node's response handling.
pub type StateEntry = Option<Vec<u8>>;
33
/// Snapshot of the sync counters that `StateEntries::print_stats` logs.
#[derive(Debug)]
// The fields are only read through the derived `Debug` output in this file,
// which is presumably why the dead-code lint is silenced here.
#[allow(dead_code)]
struct Statistics {
    /// Number of items currently held in the `verified` cache.
    cached: usize,

    /// Number of in-flight requests as reported by the sync manager.
    in_flight: usize,

    /// Number of waiting items as reported by the sync manager.
    waiting: usize,
}
41
42type MissingStateEntry = TimeOrdered<StateKey>;
43
44type PendingStateEntry = PendingItem<StateEntry, ClonableError>;
45
46pub struct StateEntries {
47    // series of unique request ids
48    request_id_allocator: Arc<UniqueId>,
49
50    // state_root sync manager
51    state_roots: Arc<StateRoots>,
52
53    // sync and request manager
54    sync_manager: SyncManager<StateKey, MissingStateEntry>,
55
56    // state entries received from full node
57    verified: Arc<RwLock<LruCache<StateKey, PendingStateEntry>>>,
58}
59
60impl StateEntries {
61    pub fn new(
62        peers: Arc<Peers<FullPeerState>>, state_roots: Arc<StateRoots>,
63        request_id_allocator: Arc<UniqueId>,
64    ) -> Self {
65        let sync_manager =
66            SyncManager::new(peers.clone(), msgid::GET_STATE_ENTRIES);
67
68        let cache = LruCache::with_expiry_duration(*CACHE_TIMEOUT);
69        let verified = Arc::new(RwLock::new(cache));
70
71        StateEntries {
72            request_id_allocator,
73            sync_manager,
74            verified,
75            state_roots,
76        }
77    }
78
79    #[inline]
80    pub fn print_stats(&self) {
81        debug!(
82            "state entry sync statistics: {:?}",
83            Statistics {
84                cached: self.verified.read().len(),
85                in_flight: self.sync_manager.num_in_flight(),
86                waiting: self.sync_manager.num_waiting(),
87            }
88        );
89    }
90
91    #[inline]
92    pub fn request_now(
93        &self, io: &dyn NetworkContext, epoch: u64, key: Vec<u8>,
94    ) -> impl Future<Output = Result<StateEntry>> {
95        let mut verified = self.verified.write();
96        let key = StateKey { epoch, key };
97
98        if !verified.contains_key(&key) {
99            let missing = std::iter::once(MissingStateEntry::new(key.clone()));
100
101            self.sync_manager.request_now(missing, |peer, keys| {
102                self.send_request(io, peer, keys)
103            });
104        }
105
106        verified
107            .entry(key.clone())
108            .or_insert(PendingItem::pending())
109            .clear_error();
110
111        FutureItem::new(key, self.verified.clone())
112            .map(|res| res.map_err(|e| e.into()))
113    }
114
115    #[inline]
116    pub fn receive(
117        &self, peer: &NodeId, id: RequestId,
118        entries: impl Iterator<Item = StateEntryWithKey>,
119    ) -> Result<()> {
120        for StateEntryWithKey { key, entry, proof } in entries {
121            trace!(
122                "Validating state entry {:?} with key {:?} and proof {:?}",
123                entry,
124                key,
125                proof
126            );
127
128            match self.sync_manager.check_if_requested(peer, id, &key)? {
129                None => continue,
130                Some(_) => self.validate_and_store(key, entry, proof)?,
131            };
132        }
133
134        Ok(())
135    }
136
137    #[inline]
138    pub fn validate_and_store(
139        &self, key: StateKey, entry: Option<Vec<u8>>, proof: StateEntryProof,
140    ) -> Result<()> {
141        // validate state entry
142        if let Err(e) =
143            self.validate_state_entry(key.epoch, &key.key, &entry, proof)
144        {
145            // forward error to both rpc caller(s) and sync handler
146            // so we need to make it clonable
147            let e = ClonableError::from(e);
148
149            self.verified
150                .write()
151                .entry(key.clone())
152                .or_insert(PendingItem::pending())
153                .set_error(e.clone());
154
155            bail!(e);
156        }
157
158        // store state entry by state key
159        self.verified
160            .write()
161            .entry(key.clone())
162            .or_insert(PendingItem::pending())
163            .set(entry);
164
165        self.sync_manager.remove_in_flight(&key);
166
167        Ok(())
168    }
169
170    #[inline]
171    pub fn clean_up(&self) {
172        // remove timeout in-flight requests
173        let timeout = *STATE_ENTRY_REQUEST_TIMEOUT;
174        let entries = self.sync_manager.remove_timeout_requests(timeout);
175        trace!("Timeout state-entries ({}): {:?}", entries.len(), entries);
176        self.sync_manager.insert_waiting(entries.into_iter());
177
178        // trigger cache cleanup
179        self.verified.write().get(&Default::default());
180    }
181
182    #[inline]
183    fn send_request(
184        &self, io: &dyn NetworkContext, peer: &NodeId, keys: Vec<StateKey>,
185    ) -> Result<Option<RequestId>> {
186        if keys.is_empty() {
187            return Ok(None);
188        }
189
190        let request_id = self.request_id_allocator.next();
191
192        trace!(
193            "send_request GetStateEntries peer={:?} id={:?} keys={:?}",
194            peer,
195            request_id,
196            keys
197        );
198
199        let msg: Box<dyn Message> =
200            Box::new(GetStateEntries { request_id, keys });
201
202        msg.send(io, peer)?;
203        Ok(Some(request_id))
204    }
205
206    #[inline]
207    pub fn sync(&self, io: &dyn NetworkContext) {
208        self.sync_manager.sync(
209            MAX_STATE_ENTRIES_IN_FLIGHT,
210            STATE_ENTRY_REQUEST_BATCH_SIZE,
211            |peer, keys| self.send_request(io, peer, keys),
212        );
213    }
214
215    #[inline]
216    fn validate_state_entry(
217        &self, epoch: u64, key: &Vec<u8>, value: &Option<Vec<u8>>,
218        proof: StateEntryProof,
219    ) -> Result<()> {
220        // validate state root
221        let state_root = proof.state_root;
222
223        self.state_roots
224            .validate_state_root(epoch, &state_root)
225            .map_err(|e| Error::InvalidStateProof {
226                epoch,
227                key: key.clone(),
228                value: value.clone(),
229                reason: "Validation of current state root failed",
230                source: Some(Box::new(e)),
231            })?;
232
233        // validate previous state root
234        let maybe_prev_root = proof.prev_snapshot_state_root;
235
236        self.state_roots
237            .validate_prev_snapshot_state_root(epoch, &maybe_prev_root)
238            .map_err(|e| Error::InvalidStateProof {
239                epoch,
240                key: key.clone(),
241                value: value.clone(),
242                reason: "Validation of previous state root failed",
243                source: Some(Box::new(e)),
244            })?;
245
246        // construct padding
247        let maybe_intermediate_padding = maybe_prev_root.map(|root| {
248            StorageKeyWithSpace::delta_mpt_padding(
249                &root.snapshot_root,
250                &root.intermediate_delta_root,
251            )
252        });
253
254        // validate state entry
255        if !proof.state_proof.is_valid_kv(
256            key,
257            value.as_ref().map(|v| &**v),
258            state_root,
259            maybe_intermediate_padding,
260        ) {
261            bail!(Error::InvalidStateProof {
262                epoch,
263                key: key.clone(),
264                value: value.clone(),
265                reason: "Validation of merkle proof failed",
266                source: None
267            });
268        }
269
270        Ok(())
271    }
272}