1use crate::{
2 impls::errors::*, state::StateTrait, MptKeyValue, NodeMerkleProof,
3 StateProof, StorageStateTraitExt,
4};
5use cfx_internal_common::StateRootWithAuxInfo;
6use cfx_types::Space;
7use parking_lot::Mutex;
8use primitives::{
9 EpochId, NodeMerkleTriplet, StaticBool, StorageKey, StorageKeyWithSpace,
10};
11use std::{
12 sync::mpsc::{channel, Sender},
13 thread::{self, JoinHandle},
14};
15
16pub struct ReplicatedState<Main> {
17 state: Main,
18 replication_handler: ReplicationHandler,
19}
20
21pub trait StateFilter: Sync + Send {
22 fn keep_key(&self, _key: &StorageKeyWithSpace) -> bool;
23}
24
25impl StateFilter for Space {
26 fn keep_key(&self, key: &StorageKeyWithSpace) -> bool { key.space == *self }
27}
28
29impl<Main: StateTrait> ReplicatedState<Main> {
30 pub fn new<Replicate: StateTrait + Send + 'static>(
31 main_state: Main, replicated_state: Replicate,
32 filter: Option<Box<dyn StateFilter>>,
33 ) -> ReplicatedState<Main> {
34 let replication_handler =
35 ReplicationHandler::new(replicated_state, filter);
36 Self {
37 state: main_state,
38 replication_handler,
39 }
40 }
41}
42
43struct ReplicationHandler {
44 filter: Option<Box<dyn StateFilter>>,
45 op_sender: Mutex<Sender<StateOperation>>,
47 thread_handle: Option<JoinHandle<Result<()>>>,
48}
49
50impl ReplicationHandler {
51 fn new<Replicate: StateTrait + Send + 'static>(
52 mut replicated_state: Replicate, filter: Option<Box<dyn StateFilter>>,
53 ) -> ReplicationHandler {
54 let (op_tx, op_rx) = channel();
55 let thread_handle = thread::Builder::new()
56 .name("state_replication".into())
57 .spawn(move || {
58 for op in op_rx {
59 trace!("replicated_state: op={:?}", op);
60 let err = match op {
61 StateOperation::Set { access_key, value } => {
62 replicated_state
63 .set(access_key.as_storage_key(), value)
64 .err()
65 }
66 StateOperation::Delete { access_key } => {
67 replicated_state
68 .delete(access_key.as_storage_key())
69 .err()
70 }
71 StateOperation::DeleteAll { access_key_prefix } => {
72 replicated_state
73 .delete_all(access_key_prefix.as_storage_key())
74 .err()
75 }
76 StateOperation::ComputeStateRoot => {
77 replicated_state.compute_state_root().err()
78 }
79 StateOperation::Commit { epoch_id } => {
80 return replicated_state
81 .commit(epoch_id)
82 .map(|_| ());
83 }
84 };
85 if let Some(e) = err {
86 error!("StateReplication Error: err={:?}", e);
87 return Err(e);
88 }
89 }
90 Ok(())
91 })
92 .expect("spawn error");
93 Self {
94 filter,
95 op_sender: Mutex::new(op_tx),
96 thread_handle: Some(thread_handle),
97 }
98 }
99
100 fn send_op(&self, op: StateOperation) {
101 if let Some(filter) = &self.filter {
102 if let Some(key) = op.get_key() {
103 if !filter.keep_key(&key) {
104 return;
106 }
107 }
108 }
109 if let Err(e) = self.op_sender.lock().send(op) {
110 error!("send_op: err={:?}", e);
111 }
112 }
113}
114
115#[derive(Debug)]
116enum StateOperation {
117 Set {
118 access_key: OwnedStorageKeyWithSpace,
119 value: Box<[u8]>,
120 },
121 Delete {
122 access_key: OwnedStorageKeyWithSpace,
123 },
124 DeleteAll {
125 access_key_prefix: OwnedStorageKeyWithSpace,
126 },
127 ComputeStateRoot,
128 Commit {
129 epoch_id: EpochId,
130 },
131}
132
133impl StateOperation {
134 fn get_key(&self) -> Option<StorageKeyWithSpace<'_>> {
135 match self {
136 StateOperation::Set { access_key, .. }
137 | StateOperation::Delete { access_key, .. }
138 | StateOperation::DeleteAll {
139 access_key_prefix: access_key,
140 ..
141 } => Some(access_key.as_storage_key()),
142 StateOperation::ComputeStateRoot
143 | StateOperation::Commit { .. } => None,
144 }
145 }
146}
147
/// Owned counterpart of `StorageKey`: the same variants, with every byte
/// slice stored as a `Vec<u8>`.
#[derive(Debug)]
enum OwnedStorageKey {
    AccountKey(Vec<u8>),
    StorageRootKey(Vec<u8>),
    StorageKey {
        address_bytes: Vec<u8>,
        storage_key: Vec<u8>,
    },
    CodeRootKey(Vec<u8>),
    CodeKey {
        address_bytes: Vec<u8>,
        code_hash_bytes: Vec<u8>,
    },
    DepositListKey(Vec<u8>),
    VoteListKey(Vec<u8>),
    EmptyKey,
    AddressPrefixKey(Vec<u8>),
}
166
167impl OwnedStorageKey {
168 fn as_storage_key(&self) -> StorageKey<'_> {
169 match &self {
170 OwnedStorageKey::AccountKey(k) => {
171 StorageKey::AccountKey(k.as_slice())
172 }
173 OwnedStorageKey::StorageRootKey(k) => {
174 StorageKey::StorageRootKey(k.as_slice())
175 }
176 OwnedStorageKey::StorageKey {
177 address_bytes,
178 storage_key,
179 } => StorageKey::StorageKey {
180 address_bytes: address_bytes.as_slice(),
181 storage_key: &storage_key,
182 },
183 OwnedStorageKey::CodeRootKey(k) => {
184 StorageKey::CodeRootKey(k.as_slice())
185 }
186 OwnedStorageKey::CodeKey {
187 address_bytes,
188 code_hash_bytes,
189 } => StorageKey::CodeKey {
190 address_bytes: &address_bytes,
191 code_hash_bytes: &code_hash_bytes,
192 },
193 OwnedStorageKey::DepositListKey(k) => {
194 StorageKey::DepositListKey(k.as_slice())
195 }
196 OwnedStorageKey::VoteListKey(k) => {
197 StorageKey::VoteListKey(k.as_slice())
198 }
199 OwnedStorageKey::EmptyKey => StorageKey::EmptyKey,
200 OwnedStorageKey::AddressPrefixKey(k) => {
201 StorageKey::AddressPrefixKey(k.as_slice())
202 }
203 }
204 }
205}
206
207#[derive(Debug)]
208struct OwnedStorageKeyWithSpace {
209 pub key: OwnedStorageKey,
210 pub space: Space,
211}
212
213impl OwnedStorageKeyWithSpace {
214 fn as_storage_key(&self) -> StorageKeyWithSpace<'_> {
215 StorageKeyWithSpace {
216 key: self.key.as_storage_key(),
217 space: self.space,
218 }
219 }
220}
221
222impl<'a> From<StorageKey<'a>> for OwnedStorageKey {
223 fn from(ref_key: StorageKey<'a>) -> Self {
224 match ref_key {
225 StorageKey::AccountKey(k) => {
226 OwnedStorageKey::AccountKey(k.to_vec())
227 }
228 StorageKey::StorageRootKey(k) => {
229 OwnedStorageKey::StorageRootKey(k.to_vec())
230 }
231 StorageKey::StorageKey {
232 address_bytes,
233 storage_key,
234 } => OwnedStorageKey::StorageKey {
235 address_bytes: address_bytes.to_vec(),
236 storage_key: storage_key.to_vec(),
237 },
238 StorageKey::CodeRootKey(k) => {
239 OwnedStorageKey::CodeRootKey(k.to_vec())
240 }
241 StorageKey::CodeKey {
242 address_bytes,
243 code_hash_bytes,
244 } => OwnedStorageKey::CodeKey {
245 address_bytes: address_bytes.to_vec(),
246 code_hash_bytes: code_hash_bytes.to_vec(),
247 },
248 StorageKey::DepositListKey(k) => {
249 OwnedStorageKey::DepositListKey(k.to_vec())
250 }
251 StorageKey::VoteListKey(k) => {
252 OwnedStorageKey::VoteListKey(k.to_vec())
253 }
254 StorageKey::EmptyKey => OwnedStorageKey::EmptyKey,
255 StorageKey::AddressPrefixKey(k) => {
256 OwnedStorageKey::AddressPrefixKey(k.to_vec())
257 }
258 }
259 }
260}
261
262impl<'a> From<StorageKeyWithSpace<'a>> for OwnedStorageKeyWithSpace {
263 fn from(ref_key: StorageKeyWithSpace<'a>) -> Self {
264 Self {
265 key: ref_key.key.into(),
266 space: ref_key.space,
267 }
268 }
269}
270
271impl<Main: StateTrait> StateTrait for ReplicatedState<Main> {
272 fn get(
273 &self, access_key: StorageKeyWithSpace,
274 ) -> Result<Option<Box<[u8]>>> {
275 self.state.get(access_key)
276 }
277
278 fn set(
279 &mut self, access_key: StorageKeyWithSpace, value: Box<[u8]>,
280 ) -> Result<()> {
281 self.replication_handler.send_op(StateOperation::Set {
282 access_key: access_key.into(),
283 value: value.clone(),
284 });
285 self.state.set(access_key, value)
286 }
287
288 fn delete(&mut self, access_key: StorageKeyWithSpace) -> Result<()> {
289 self.replication_handler.send_op(StateOperation::Delete {
290 access_key: access_key.into(),
291 });
292 self.state.delete(access_key)
293 }
294
295 fn delete_test_only(
296 &mut self, _access_key: StorageKeyWithSpace,
297 ) -> Result<Option<Box<[u8]>>> {
298 todo!()
299 }
300
301 fn delete_all(
302 &mut self, access_key_prefix: StorageKeyWithSpace,
303 ) -> Result<Option<Vec<MptKeyValue>>> {
304 self.replication_handler.send_op(StateOperation::DeleteAll {
305 access_key_prefix: access_key_prefix.into(),
306 });
307 self.state.delete_all(access_key_prefix)
308 }
309
310 fn read_all(
311 &mut self, access_key_prefix: StorageKeyWithSpace,
312 ) -> Result<Option<Vec<MptKeyValue>>> {
313 self.state.read_all(access_key_prefix)
314 }
315
316 fn read_all_with_callback(
317 &mut self, access_key_prefix: StorageKeyWithSpace,
318 callback: &mut dyn FnMut(MptKeyValue), only_account_key: bool,
319 ) -> Result<()> {
320 self.state.read_all_with_callback(
321 access_key_prefix,
322 callback,
323 only_account_key,
324 )
325 }
326
327 fn compute_state_root(&mut self) -> Result<StateRootWithAuxInfo> {
328 self.replication_handler
329 .send_op(StateOperation::ComputeStateRoot);
330 self.state.compute_state_root()
331 }
332
333 fn get_state_root(&self) -> Result<StateRootWithAuxInfo> {
334 self.state.get_state_root()
335 }
336
337 fn commit(&mut self, epoch_id: EpochId) -> Result<StateRootWithAuxInfo> {
338 let r = self.state.commit(epoch_id);
339 self.replication_handler
340 .send_op(StateOperation::Commit { epoch_id });
341 self.replication_handler
343 .thread_handle
344 .take()
345 .expect("only commit once")
346 .join()
347 .expect("ReplicationHandler thread join error")?;
348 r
349 }
350}
351
352impl<Main: StorageStateTraitExt> StorageStateTraitExt
353 for ReplicatedState<Main>
354{
355 fn get_with_proof(
356 &self, access_key: StorageKeyWithSpace,
357 ) -> Result<(Option<Box<[u8]>>, StateProof)> {
358 self.state.get_with_proof(access_key)
359 }
360
361 fn get_node_merkle_all_versions<WithProof: StaticBool>(
362 &self, access_key: StorageKeyWithSpace,
363 ) -> Result<(NodeMerkleTriplet, NodeMerkleProof)> {
364 self.state
365 .get_node_merkle_all_versions::<WithProof>(access_key)
366 }
367}