cfx_storage/impls/
replicated_state.rs

1use crate::{
2    impls::errors::*, state::StateTrait, MptKeyValue, NodeMerkleProof,
3    StateProof, StorageStateTraitExt,
4};
5use cfx_internal_common::StateRootWithAuxInfo;
6use cfx_types::Space;
7use parking_lot::Mutex;
8use primitives::{
9    EpochId, NodeMerkleTriplet, StaticBool, StorageKey, StorageKeyWithSpace,
10};
11use std::{
12    sync::mpsc::{channel, Sender},
13    thread::{self, JoinHandle},
14};
15
16pub struct ReplicatedState<Main> {
17    state: Main,
18    replication_handler: ReplicationHandler,
19}
20
21pub trait StateFilter: Sync + Send {
22    fn keep_key(&self, _key: &StorageKeyWithSpace) -> bool;
23}
24
25impl StateFilter for Space {
26    fn keep_key(&self, key: &StorageKeyWithSpace) -> bool { key.space == *self }
27}
28
29impl<Main: StateTrait> ReplicatedState<Main> {
30    pub fn new<Replicate: StateTrait + Send + 'static>(
31        main_state: Main, replicated_state: Replicate,
32        filter: Option<Box<dyn StateFilter>>,
33    ) -> ReplicatedState<Main> {
34        let replication_handler =
35            ReplicationHandler::new(replicated_state, filter);
36        Self {
37            state: main_state,
38            replication_handler,
39        }
40    }
41}
42
43struct ReplicationHandler {
44    filter: Option<Box<dyn StateFilter>>,
45    // We need `Mutex` to make the struct `Sync`.
46    op_sender: Mutex<Sender<StateOperation>>,
47    thread_handle: Option<JoinHandle<Result<()>>>,
48}
49
50impl ReplicationHandler {
51    fn new<Replicate: StateTrait + Send + 'static>(
52        mut replicated_state: Replicate, filter: Option<Box<dyn StateFilter>>,
53    ) -> ReplicationHandler {
54        let (op_tx, op_rx) = channel();
55        let thread_handle = thread::Builder::new()
56            .name("state_replication".into())
57            .spawn(move || {
58                for op in op_rx {
59                    trace!("replicated_state: op={:?}", op);
60                    let err = match op {
61                        StateOperation::Set { access_key, value } => {
62                            replicated_state
63                                .set(access_key.as_storage_key(), value)
64                                .err()
65                        }
66                        StateOperation::Delete { access_key } => {
67                            replicated_state
68                                .delete(access_key.as_storage_key())
69                                .err()
70                        }
71                        StateOperation::DeleteAll { access_key_prefix } => {
72                            replicated_state
73                                .delete_all(access_key_prefix.as_storage_key())
74                                .err()
75                        }
76                        StateOperation::ComputeStateRoot => {
77                            replicated_state.compute_state_root().err()
78                        }
79                        StateOperation::Commit { epoch_id } => {
80                            return replicated_state
81                                .commit(epoch_id)
82                                .map(|_| ());
83                        }
84                    };
85                    if let Some(e) = err {
86                        error!("StateReplication Error: err={:?}", e);
87                        return Err(e);
88                    }
89                }
90                Ok(())
91            })
92            .expect("spawn error");
93        Self {
94            filter,
95            op_sender: Mutex::new(op_tx),
96            thread_handle: Some(thread_handle),
97        }
98    }
99
100    fn send_op(&self, op: StateOperation) {
101        if let Some(filter) = &self.filter {
102            if let Some(key) = op.get_key() {
103                if !filter.keep_key(&key) {
104                    // This key should not be stored in the replicated state.
105                    return;
106                }
107            }
108        }
109        if let Err(e) = self.op_sender.lock().send(op) {
110            error!("send_op: err={:?}", e);
111        }
112    }
113}
114
115#[derive(Debug)]
116enum StateOperation {
117    Set {
118        access_key: OwnedStorageKeyWithSpace,
119        value: Box<[u8]>,
120    },
121    Delete {
122        access_key: OwnedStorageKeyWithSpace,
123    },
124    DeleteAll {
125        access_key_prefix: OwnedStorageKeyWithSpace,
126    },
127    ComputeStateRoot,
128    Commit {
129        epoch_id: EpochId,
130    },
131}
132
133impl StateOperation {
134    fn get_key(&self) -> Option<StorageKeyWithSpace<'_>> {
135        match self {
136            StateOperation::Set { access_key, .. }
137            | StateOperation::Delete { access_key, .. }
138            | StateOperation::DeleteAll {
139                access_key_prefix: access_key,
140                ..
141            } => Some(access_key.as_storage_key()),
142            StateOperation::ComputeStateRoot
143            | StateOperation::Commit { .. } => None,
144        }
145    }
146}
147
/// Owned counterpart of `StorageKey`: each variant mirrors the `StorageKey`
/// variant of the same name, holding `Vec<u8>` copies of the borrowed bytes.
#[derive(Debug)]
enum OwnedStorageKey {
    /// Mirrors `StorageKey::AccountKey`.
    AccountKey(Vec<u8>),
    /// Mirrors `StorageKey::StorageRootKey`.
    StorageRootKey(Vec<u8>),
    /// Mirrors `StorageKey::StorageKey`.
    StorageKey {
        address_bytes: Vec<u8>,
        storage_key: Vec<u8>,
    },
    /// Mirrors `StorageKey::CodeRootKey`.
    CodeRootKey(Vec<u8>),
    /// Mirrors `StorageKey::CodeKey`.
    CodeKey {
        address_bytes: Vec<u8>,
        code_hash_bytes: Vec<u8>,
    },
    /// Mirrors `StorageKey::DepositListKey`.
    DepositListKey(Vec<u8>),
    /// Mirrors `StorageKey::VoteListKey`.
    VoteListKey(Vec<u8>),
    /// Mirrors `StorageKey::EmptyKey`.
    EmptyKey,
    /// Mirrors `StorageKey::AddressPrefixKey`.
    AddressPrefixKey(Vec<u8>),
}
166
167impl OwnedStorageKey {
168    fn as_storage_key(&self) -> StorageKey<'_> {
169        match &self {
170            OwnedStorageKey::AccountKey(k) => {
171                StorageKey::AccountKey(k.as_slice())
172            }
173            OwnedStorageKey::StorageRootKey(k) => {
174                StorageKey::StorageRootKey(k.as_slice())
175            }
176            OwnedStorageKey::StorageKey {
177                address_bytes,
178                storage_key,
179            } => StorageKey::StorageKey {
180                address_bytes: address_bytes.as_slice(),
181                storage_key: &storage_key,
182            },
183            OwnedStorageKey::CodeRootKey(k) => {
184                StorageKey::CodeRootKey(k.as_slice())
185            }
186            OwnedStorageKey::CodeKey {
187                address_bytes,
188                code_hash_bytes,
189            } => StorageKey::CodeKey {
190                address_bytes: &address_bytes,
191                code_hash_bytes: &code_hash_bytes,
192            },
193            OwnedStorageKey::DepositListKey(k) => {
194                StorageKey::DepositListKey(k.as_slice())
195            }
196            OwnedStorageKey::VoteListKey(k) => {
197                StorageKey::VoteListKey(k.as_slice())
198            }
199            OwnedStorageKey::EmptyKey => StorageKey::EmptyKey,
200            OwnedStorageKey::AddressPrefixKey(k) => {
201                StorageKey::AddressPrefixKey(k.as_slice())
202            }
203        }
204    }
205}
206
207#[derive(Debug)]
208struct OwnedStorageKeyWithSpace {
209    pub key: OwnedStorageKey,
210    pub space: Space,
211}
212
213impl OwnedStorageKeyWithSpace {
214    fn as_storage_key(&self) -> StorageKeyWithSpace<'_> {
215        StorageKeyWithSpace {
216            key: self.key.as_storage_key(),
217            space: self.space,
218        }
219    }
220}
221
222impl<'a> From<StorageKey<'a>> for OwnedStorageKey {
223    fn from(ref_key: StorageKey<'a>) -> Self {
224        match ref_key {
225            StorageKey::AccountKey(k) => {
226                OwnedStorageKey::AccountKey(k.to_vec())
227            }
228            StorageKey::StorageRootKey(k) => {
229                OwnedStorageKey::StorageRootKey(k.to_vec())
230            }
231            StorageKey::StorageKey {
232                address_bytes,
233                storage_key,
234            } => OwnedStorageKey::StorageKey {
235                address_bytes: address_bytes.to_vec(),
236                storage_key: storage_key.to_vec(),
237            },
238            StorageKey::CodeRootKey(k) => {
239                OwnedStorageKey::CodeRootKey(k.to_vec())
240            }
241            StorageKey::CodeKey {
242                address_bytes,
243                code_hash_bytes,
244            } => OwnedStorageKey::CodeKey {
245                address_bytes: address_bytes.to_vec(),
246                code_hash_bytes: code_hash_bytes.to_vec(),
247            },
248            StorageKey::DepositListKey(k) => {
249                OwnedStorageKey::DepositListKey(k.to_vec())
250            }
251            StorageKey::VoteListKey(k) => {
252                OwnedStorageKey::VoteListKey(k.to_vec())
253            }
254            StorageKey::EmptyKey => OwnedStorageKey::EmptyKey,
255            StorageKey::AddressPrefixKey(k) => {
256                OwnedStorageKey::AddressPrefixKey(k.to_vec())
257            }
258        }
259    }
260}
261
262impl<'a> From<StorageKeyWithSpace<'a>> for OwnedStorageKeyWithSpace {
263    fn from(ref_key: StorageKeyWithSpace<'a>) -> Self {
264        Self {
265            key: ref_key.key.into(),
266            space: ref_key.space,
267        }
268    }
269}
270
271impl<Main: StateTrait> StateTrait for ReplicatedState<Main> {
272    fn get(
273        &self, access_key: StorageKeyWithSpace,
274    ) -> Result<Option<Box<[u8]>>> {
275        self.state.get(access_key)
276    }
277
278    fn set(
279        &mut self, access_key: StorageKeyWithSpace, value: Box<[u8]>,
280    ) -> Result<()> {
281        self.replication_handler.send_op(StateOperation::Set {
282            access_key: access_key.into(),
283            value: value.clone(),
284        });
285        self.state.set(access_key, value)
286    }
287
288    fn delete(&mut self, access_key: StorageKeyWithSpace) -> Result<()> {
289        self.replication_handler.send_op(StateOperation::Delete {
290            access_key: access_key.into(),
291        });
292        self.state.delete(access_key)
293    }
294
295    fn delete_test_only(
296        &mut self, _access_key: StorageKeyWithSpace,
297    ) -> Result<Option<Box<[u8]>>> {
298        todo!()
299    }
300
301    fn delete_all(
302        &mut self, access_key_prefix: StorageKeyWithSpace,
303    ) -> Result<Option<Vec<MptKeyValue>>> {
304        self.replication_handler.send_op(StateOperation::DeleteAll {
305            access_key_prefix: access_key_prefix.into(),
306        });
307        self.state.delete_all(access_key_prefix)
308    }
309
310    fn read_all(
311        &mut self, access_key_prefix: StorageKeyWithSpace,
312    ) -> Result<Option<Vec<MptKeyValue>>> {
313        self.state.read_all(access_key_prefix)
314    }
315
316    fn read_all_with_callback(
317        &mut self, access_key_prefix: StorageKeyWithSpace,
318        callback: &mut dyn FnMut(MptKeyValue), only_account_key: bool,
319    ) -> Result<()> {
320        self.state.read_all_with_callback(
321            access_key_prefix,
322            callback,
323            only_account_key,
324        )
325    }
326
327    fn compute_state_root(&mut self) -> Result<StateRootWithAuxInfo> {
328        self.replication_handler
329            .send_op(StateOperation::ComputeStateRoot);
330        self.state.compute_state_root()
331    }
332
333    fn get_state_root(&self) -> Result<StateRootWithAuxInfo> {
334        self.state.get_state_root()
335    }
336
337    fn commit(&mut self, epoch_id: EpochId) -> Result<StateRootWithAuxInfo> {
338        let r = self.state.commit(epoch_id);
339        self.replication_handler
340            .send_op(StateOperation::Commit { epoch_id });
341        // TODO(lpl): This can be probably delayed.
342        self.replication_handler
343            .thread_handle
344            .take()
345            .expect("only commit once")
346            .join()
347            .expect("ReplicationHandler thread join error")?;
348        r
349    }
350}
351
352impl<Main: StorageStateTraitExt> StorageStateTraitExt
353    for ReplicatedState<Main>
354{
355    fn get_with_proof(
356        &self, access_key: StorageKeyWithSpace,
357    ) -> Result<(Option<Box<[u8]>>, StateProof)> {
358        self.state.get_with_proof(access_key)
359    }
360
361    fn get_node_merkle_all_versions<WithProof: StaticBool>(
362        &self, access_key: StorageKeyWithSpace,
363    ) -> Result<(NodeMerkleTriplet, NodeMerkleProof)> {
364        self.state
365            .get_node_merkle_all_versions::<WithProof>(access_key)
366    }
367}