cfx_storage/impls/delta_mpt/
delta_mpt_open_db_manager.rs

1impl CacheIndexTrait for DeltaMptId {}
2
3struct CacheUtil {
4    cache_data: HashMap<
5        DeltaMptId,
6        (Arc<dyn DeltaDbTrait + Send + Sync>, LRUHandle<u32>),
7    >,
8}
9
10impl CacheStoreUtil for CacheUtil {
11    type CacheAlgoData = LRUHandle<u32>;
12    type ElementIndex = DeltaMptId;
13
14    fn get(&self, element_index: DeltaMptId) -> LRUHandle<u32> {
15        match self.cache_data.get(&element_index) {
16            Some(tuple) => tuple.1,
17            None => {
18                unreachable!();
19            }
20        }
21    }
22
23    fn set(&mut self, element_index: DeltaMptId, algo_data: &LRUHandle<u32>) {
24        match self.cache_data.get_mut(&element_index) {
25            Some(tuple) => tuple.1 = *algo_data,
26            None => {
27                unreachable!();
28            }
29        }
30    }
31}
32
33#[derive(Clone)]
34pub struct ArcDeltaDbWrapper {
35    // inner will always be Some() before drop
36    pub inner: Option<Arc<dyn DeltaDbTrait>>,
37    pub lru: Option<Weak<Mutex<dyn OnDemandOpenDeltaDbInnerTrait>>>,
38    pub mpt_id: DeltaMptId,
39}
40
41impl ArcDeltaDbWrapper {
42    pub fn db_ref(&self) -> &dyn DeltaDbTrait {
43        self.inner.as_ref().unwrap().as_ref()
44    }
45}
46
47impl Deref for ArcDeltaDbWrapper {
48    type Target = dyn DeltaDbTrait;
49
50    fn deref(&self) -> &Self::Target { self.inner.as_ref().unwrap().as_ref() }
51}
52
53impl Drop for ArcDeltaDbWrapper {
54    fn drop(&mut self) {
55        if self.lru.is_none() {
56            // TODO: This is for SingleMptState.
57            return;
58        }
59        Weak::upgrade(self.lru.as_ref().unwrap()).map(|lru| {
60            let mut lru_lock = lru.lock();
61            let maybe_arc_db = self.inner.take();
62            let need_release =
63                Arc::strong_count(maybe_arc_db.as_ref().unwrap()) == 2;
64            drop(maybe_arc_db);
65            if need_release {
66                lru_lock.release(self.mpt_id, false);
67            }
68        });
69    }
70}
71
72impl KeyValueDbTypes for ArcDeltaDbWrapper {
73    type ValueType = Box<[u8]>;
74}
75
76impl KeyValueDbTraitRead for ArcDeltaDbWrapper {
77    fn get(&self, key: &[u8]) -> Result<Option<Self::ValueType>> {
78        (**self).get(key)
79    }
80}
81
82mark_kvdb_multi_reader!(ArcDeltaDbWrapper);
83
84pub trait OnDemandOpenDeltaDbInnerTrait: Send + Sync {
85    fn open(&mut self, mpt_id: DeltaMptId) -> Result<ArcDeltaDbWrapper>;
86    fn create(
87        &mut self, snapshot_epoch_id: &EpochId, mpt_id: DeltaMptId,
88        opened_db: Option<Arc<dyn DeltaDbTrait + Send + Sync>>,
89    ) -> Result<ArcDeltaDbWrapper>;
90    fn release(&mut self, mpt_id: DeltaMptId, destroy: bool);
91}
92
93// TODO: Allow pinning the DeltaDb for the latest state.
94pub trait OpenableOnDemandOpenDeltaDbTrait: Send + Sync {
95    fn open(&self, mpt_id: DeltaMptId) -> Result<ArcDeltaDbWrapper>;
96}
97
98pub struct OpenDeltaDbLru<DeltaDbManager: DeltaDbManagerTrait> {
99    inner: Arc<Mutex<dyn OnDemandOpenDeltaDbInnerTrait>>,
100    phantom: PhantomData<DeltaDbManager>,
101}
102
103impl<T: 'static + DeltaDbManagerTrait + Send + Sync> OpenDeltaDbLru<T>
104where T::DeltaDb: 'static + Send + Sync + DeltaDbTrait
105{
106    pub fn new(delta_db_manager: Arc<T>, capacity: u32) -> Result<Self> {
107        Ok(Self {
108            inner: Arc::new(Mutex::new(OpenDeltaDbLruInner::new(
109                delta_db_manager,
110                capacity,
111            )?)),
112            phantom: PhantomData,
113        })
114    }
115
116    pub fn create(
117        &self, snapshot_epoch_id: &EpochId, mpt_id: DeltaMptId,
118    ) -> Result<ArcDeltaDbWrapper> {
119        let mut arc_db = self
120            .inner
121            .lock()
122            .create(snapshot_epoch_id, mpt_id, None)
123            .unwrap();
124        arc_db.lru = Some(Arc::downgrade(&self.inner));
125        Ok(arc_db)
126    }
127
128    pub fn import(
129        &self, snapshot_epoch_id: &EpochId, mpt_id: DeltaMptId,
130        opened_db: T::DeltaDb,
131    ) -> Result<ArcDeltaDbWrapper> {
132        let mut arc_db = self
133            .inner
134            .lock()
135            .create(snapshot_epoch_id, mpt_id, Some(Arc::new(opened_db)))
136            .unwrap();
137        arc_db.lru = Some(Arc::downgrade(&self.inner));
138        Ok(arc_db)
139    }
140
141    pub fn release(&self, mpt_id: DeltaMptId, destroy: bool) {
142        self.inner.lock().release(mpt_id, destroy);
143    }
144}
145
146impl<T: 'static + DeltaDbManagerTrait + Send + Sync>
147    OpenableOnDemandOpenDeltaDbTrait for OpenDeltaDbLru<T>
148where T::DeltaDb: 'static + Send + Sync + DeltaDbTrait
149{
150    fn open(&self, mpt_id: DeltaMptId) -> Result<ArcDeltaDbWrapper> {
151        let mut arc_db = self.inner.lock().open(mpt_id).unwrap();
152        arc_db.lru = Some(Arc::downgrade(&self.inner));
153        Ok(arc_db)
154    }
155}
156
157pub struct OpenDeltaDbLruInner<DeltaDbManager: DeltaDbManagerTrait> {
158    delta_db_manager: Arc<DeltaDbManager>,
159    mpt_id_to_snapshot_epoch_id: HashMap<DeltaMptId, EpochId>,
160    cache_util: CacheUtil,
161    lru: LRU<u32, DeltaMptId>,
162}
163
164impl<T: DeltaDbManagerTrait + Send + Sync> OpenDeltaDbLruInner<T>
165where T::DeltaDb: 'static + Send + Sync + DeltaDbTrait
166{
167    pub fn new(delta_db_manager: Arc<T>, capacity: u32) -> Result<Self> {
168        Ok(Self {
169            delta_db_manager,
170            mpt_id_to_snapshot_epoch_id: HashMap::new(),
171            cache_util: CacheUtil {
172                cache_data: HashMap::new(),
173            },
174            lru: LRU::<u32, DeltaMptId>::new(capacity),
175        })
176    }
177
178    fn lru_access(&mut self, mpt_id: DeltaMptId) {
179        match self.lru.access(mpt_id, &mut self.cache_util) {
180            CacheAccessResult::MissReplaced {
181                evicted: lru_evicted_keys,
182                evicted_keep_cache_algo_data: _,
183            } => {
184                // It's known to contain exactly one item.
185                let lru_evicted = unsafe { lru_evicted_keys.get_unchecked(0) };
186                self.release(*lru_evicted, false);
187            }
188            _ => {}
189        }
190    }
191}
192
193impl<T: DeltaDbManagerTrait + Send + Sync> OnDemandOpenDeltaDbInnerTrait
194    for OpenDeltaDbLruInner<T>
195where T::DeltaDb: 'static + Send + Sync + DeltaDbTrait
196{
197    fn open(&mut self, mpt_id: DeltaMptId) -> Result<ArcDeltaDbWrapper> {
198        match self.cache_util.cache_data.get(&mpt_id) {
199            Some(tuple) => {
200                let arc_db = tuple.0.clone();
201                self.lru_access(mpt_id);
202                Ok(ArcDeltaDbWrapper {
203                    inner: Some(arc_db),
204                    lru: None,
205                    mpt_id,
206                })
207            }
208            None => {
209                let snapshot_epoch_id =
210                    self.mpt_id_to_snapshot_epoch_id.get(&mpt_id).unwrap();
211                let arc_db = Arc::new(
212                    self.delta_db_manager
213                        .get_delta_db(
214                            &self
215                                .delta_db_manager
216                                .get_delta_db_name(snapshot_epoch_id),
217                        )?
218                        .unwrap(),
219                );
220                self.cache_util.cache_data.insert(
221                    mpt_id,
222                    (arc_db.clone(), LRUHandle::<u32>::default()),
223                );
224                self.lru_access(mpt_id);
225                Ok(ArcDeltaDbWrapper {
226                    inner: Some(arc_db),
227                    lru: None,
228                    mpt_id,
229                })
230            }
231        }
232    }
233
234    fn create(
235        &mut self, snapshot_epoch_id: &EpochId, mpt_id: DeltaMptId,
236        opened_db: Option<Arc<dyn DeltaDbTrait + Send + Sync>>,
237    ) -> Result<ArcDeltaDbWrapper> {
238        match self.mpt_id_to_snapshot_epoch_id.get(&mpt_id) {
239            Some(epoch_id) => {
240                debug_assert!(snapshot_epoch_id == epoch_id);
241                match opened_db {
242                    Some(_arc) => unreachable!(),
243                    None => self.open(mpt_id),
244                }
245            }
246            None => {
247                let arc_db = match opened_db {
248                    Some(arc) => arc,
249                    None => Arc::new(
250                        self.delta_db_manager.new_empty_delta_db(
251                            &self
252                                .delta_db_manager
253                                .get_delta_db_name(snapshot_epoch_id),
254                        )?,
255                    ),
256                };
257                self.mpt_id_to_snapshot_epoch_id
258                    .insert(mpt_id, snapshot_epoch_id.clone());
259                self.cache_util.cache_data.insert(
260                    mpt_id,
261                    (arc_db.clone(), LRUHandle::<u32>::default()),
262                );
263                self.lru_access(mpt_id);
264                Ok(ArcDeltaDbWrapper {
265                    inner: Some(arc_db),
266                    lru: None,
267                    mpt_id,
268                })
269            }
270        }
271    }
272
273    // Release function is to close opened dbs which are not in lru and
274    // are not using. With destroy = true, it will delete db in disk.
275
276    // Lru will hold arc db which is_hit() == true, so if no one holds
277    // related ArcDeltaDbWrapper, ref count of arc is always 1. And
278    // for evicted arc db, lru will immediately drop it only if ref count
279    // == 1, to avoid double open db error. Otherwise, lru will still
280    // hold evicted arc db until last drop of related ArcDeltaDbWrapper.
281    fn release(&mut self, mpt_id: DeltaMptId, destroy: bool) {
282        match self.cache_util.cache_data.get(&mpt_id) {
283            Some(tuple) => {
284                let strong_count = Arc::strong_count(&tuple.0);
285                if destroy {
286                    debug_assert!(strong_count == 1);
287                }
288                if destroy || (strong_count == 1 && !tuple.1.is_hit()) {
289                    // If is_hit() == false, lru.delete will do nothing
290                    self.lru.delete(mpt_id, &mut self.cache_util);
291                    self.cache_util.cache_data.remove(&mpt_id);
292                }
293            }
294            None => {}
295        }
296        if destroy {
297            self.mpt_id_to_snapshot_epoch_id.remove(&mpt_id);
298        }
299    }
300}
301
302use crate::{
303    impls::{
304        delta_mpt::{
305            cache::algorithm::{
306                lru::{LRUHandle, LRU},
307                CacheAccessResult, CacheAlgorithm, CacheIndexTrait,
308                CacheStoreUtil,
309            },
310            node_ref_map::DeltaMptId,
311        },
312        errors::*,
313    },
314    storage_db::{key_value_db::*, DeltaDbManagerTrait, DeltaDbTrait},
315};
316use parking_lot::Mutex;
317use primitives::EpochId;
318use std::{
319    collections::HashMap,
320    marker::PhantomData,
321    ops::Deref,
322    sync::{Arc, Weak},
323};