cfx_storage/impls/delta_mpt/
delta_mpt_open_db_manager.rs1impl CacheIndexTrait for DeltaMptId {}
2
3struct CacheUtil {
4 cache_data: HashMap<
5 DeltaMptId,
6 (Arc<dyn DeltaDbTrait + Send + Sync>, LRUHandle<u32>),
7 >,
8}
9
10impl CacheStoreUtil for CacheUtil {
11 type CacheAlgoData = LRUHandle<u32>;
12 type ElementIndex = DeltaMptId;
13
14 fn get(&self, element_index: DeltaMptId) -> LRUHandle<u32> {
15 match self.cache_data.get(&element_index) {
16 Some(tuple) => tuple.1,
17 None => {
18 unreachable!();
19 }
20 }
21 }
22
23 fn set(&mut self, element_index: DeltaMptId, algo_data: &LRUHandle<u32>) {
24 match self.cache_data.get_mut(&element_index) {
25 Some(tuple) => tuple.1 = *algo_data,
26 None => {
27 unreachable!();
28 }
29 }
30 }
31}
32
33#[derive(Clone)]
34pub struct ArcDeltaDbWrapper {
35 pub inner: Option<Arc<dyn DeltaDbTrait>>,
37 pub lru: Option<Weak<Mutex<dyn OnDemandOpenDeltaDbInnerTrait>>>,
38 pub mpt_id: DeltaMptId,
39}
40
41impl ArcDeltaDbWrapper {
42 pub fn db_ref(&self) -> &dyn DeltaDbTrait {
43 self.inner.as_ref().unwrap().as_ref()
44 }
45}
46
47impl Deref for ArcDeltaDbWrapper {
48 type Target = dyn DeltaDbTrait;
49
50 fn deref(&self) -> &Self::Target { self.inner.as_ref().unwrap().as_ref() }
51}
52
53impl Drop for ArcDeltaDbWrapper {
54 fn drop(&mut self) {
55 if self.lru.is_none() {
56 return;
58 }
59 Weak::upgrade(self.lru.as_ref().unwrap()).map(|lru| {
60 let mut lru_lock = lru.lock();
61 let maybe_arc_db = self.inner.take();
62 let need_release =
63 Arc::strong_count(maybe_arc_db.as_ref().unwrap()) == 2;
64 drop(maybe_arc_db);
65 if need_release {
66 lru_lock.release(self.mpt_id, false);
67 }
68 });
69 }
70}
71
72impl KeyValueDbTypes for ArcDeltaDbWrapper {
73 type ValueType = Box<[u8]>;
74}
75
76impl KeyValueDbTraitRead for ArcDeltaDbWrapper {
77 fn get(&self, key: &[u8]) -> Result<Option<Self::ValueType>> {
78 (**self).get(key)
79 }
80}
81
82mark_kvdb_multi_reader!(ArcDeltaDbWrapper);
83
84pub trait OnDemandOpenDeltaDbInnerTrait: Send + Sync {
85 fn open(&mut self, mpt_id: DeltaMptId) -> Result<ArcDeltaDbWrapper>;
86 fn create(
87 &mut self, snapshot_epoch_id: &EpochId, mpt_id: DeltaMptId,
88 opened_db: Option<Arc<dyn DeltaDbTrait + Send + Sync>>,
89 ) -> Result<ArcDeltaDbWrapper>;
90 fn release(&mut self, mpt_id: DeltaMptId, destroy: bool);
91}
92
93pub trait OpenableOnDemandOpenDeltaDbTrait: Send + Sync {
95 fn open(&self, mpt_id: DeltaMptId) -> Result<ArcDeltaDbWrapper>;
96}
97
98pub struct OpenDeltaDbLru<DeltaDbManager: DeltaDbManagerTrait> {
99 inner: Arc<Mutex<dyn OnDemandOpenDeltaDbInnerTrait>>,
100 phantom: PhantomData<DeltaDbManager>,
101}
102
103impl<T: 'static + DeltaDbManagerTrait + Send + Sync> OpenDeltaDbLru<T>
104where T::DeltaDb: 'static + Send + Sync + DeltaDbTrait
105{
106 pub fn new(delta_db_manager: Arc<T>, capacity: u32) -> Result<Self> {
107 Ok(Self {
108 inner: Arc::new(Mutex::new(OpenDeltaDbLruInner::new(
109 delta_db_manager,
110 capacity,
111 )?)),
112 phantom: PhantomData,
113 })
114 }
115
116 pub fn create(
117 &self, snapshot_epoch_id: &EpochId, mpt_id: DeltaMptId,
118 ) -> Result<ArcDeltaDbWrapper> {
119 let mut arc_db = self
120 .inner
121 .lock()
122 .create(snapshot_epoch_id, mpt_id, None)
123 .unwrap();
124 arc_db.lru = Some(Arc::downgrade(&self.inner));
125 Ok(arc_db)
126 }
127
128 pub fn import(
129 &self, snapshot_epoch_id: &EpochId, mpt_id: DeltaMptId,
130 opened_db: T::DeltaDb,
131 ) -> Result<ArcDeltaDbWrapper> {
132 let mut arc_db = self
133 .inner
134 .lock()
135 .create(snapshot_epoch_id, mpt_id, Some(Arc::new(opened_db)))
136 .unwrap();
137 arc_db.lru = Some(Arc::downgrade(&self.inner));
138 Ok(arc_db)
139 }
140
141 pub fn release(&self, mpt_id: DeltaMptId, destroy: bool) {
142 self.inner.lock().release(mpt_id, destroy);
143 }
144}
145
146impl<T: 'static + DeltaDbManagerTrait + Send + Sync>
147 OpenableOnDemandOpenDeltaDbTrait for OpenDeltaDbLru<T>
148where T::DeltaDb: 'static + Send + Sync + DeltaDbTrait
149{
150 fn open(&self, mpt_id: DeltaMptId) -> Result<ArcDeltaDbWrapper> {
151 let mut arc_db = self.inner.lock().open(mpt_id).unwrap();
152 arc_db.lru = Some(Arc::downgrade(&self.inner));
153 Ok(arc_db)
154 }
155}
156
157pub struct OpenDeltaDbLruInner<DeltaDbManager: DeltaDbManagerTrait> {
158 delta_db_manager: Arc<DeltaDbManager>,
159 mpt_id_to_snapshot_epoch_id: HashMap<DeltaMptId, EpochId>,
160 cache_util: CacheUtil,
161 lru: LRU<u32, DeltaMptId>,
162}
163
164impl<T: DeltaDbManagerTrait + Send + Sync> OpenDeltaDbLruInner<T>
165where T::DeltaDb: 'static + Send + Sync + DeltaDbTrait
166{
167 pub fn new(delta_db_manager: Arc<T>, capacity: u32) -> Result<Self> {
168 Ok(Self {
169 delta_db_manager,
170 mpt_id_to_snapshot_epoch_id: HashMap::new(),
171 cache_util: CacheUtil {
172 cache_data: HashMap::new(),
173 },
174 lru: LRU::<u32, DeltaMptId>::new(capacity),
175 })
176 }
177
178 fn lru_access(&mut self, mpt_id: DeltaMptId) {
179 match self.lru.access(mpt_id, &mut self.cache_util) {
180 CacheAccessResult::MissReplaced {
181 evicted: lru_evicted_keys,
182 evicted_keep_cache_algo_data: _,
183 } => {
184 let lru_evicted = unsafe { lru_evicted_keys.get_unchecked(0) };
186 self.release(*lru_evicted, false);
187 }
188 _ => {}
189 }
190 }
191}
192
193impl<T: DeltaDbManagerTrait + Send + Sync> OnDemandOpenDeltaDbInnerTrait
194 for OpenDeltaDbLruInner<T>
195where T::DeltaDb: 'static + Send + Sync + DeltaDbTrait
196{
197 fn open(&mut self, mpt_id: DeltaMptId) -> Result<ArcDeltaDbWrapper> {
198 match self.cache_util.cache_data.get(&mpt_id) {
199 Some(tuple) => {
200 let arc_db = tuple.0.clone();
201 self.lru_access(mpt_id);
202 Ok(ArcDeltaDbWrapper {
203 inner: Some(arc_db),
204 lru: None,
205 mpt_id,
206 })
207 }
208 None => {
209 let snapshot_epoch_id =
210 self.mpt_id_to_snapshot_epoch_id.get(&mpt_id).unwrap();
211 let arc_db = Arc::new(
212 self.delta_db_manager
213 .get_delta_db(
214 &self
215 .delta_db_manager
216 .get_delta_db_name(snapshot_epoch_id),
217 )?
218 .unwrap(),
219 );
220 self.cache_util.cache_data.insert(
221 mpt_id,
222 (arc_db.clone(), LRUHandle::<u32>::default()),
223 );
224 self.lru_access(mpt_id);
225 Ok(ArcDeltaDbWrapper {
226 inner: Some(arc_db),
227 lru: None,
228 mpt_id,
229 })
230 }
231 }
232 }
233
234 fn create(
235 &mut self, snapshot_epoch_id: &EpochId, mpt_id: DeltaMptId,
236 opened_db: Option<Arc<dyn DeltaDbTrait + Send + Sync>>,
237 ) -> Result<ArcDeltaDbWrapper> {
238 match self.mpt_id_to_snapshot_epoch_id.get(&mpt_id) {
239 Some(epoch_id) => {
240 debug_assert!(snapshot_epoch_id == epoch_id);
241 match opened_db {
242 Some(_arc) => unreachable!(),
243 None => self.open(mpt_id),
244 }
245 }
246 None => {
247 let arc_db = match opened_db {
248 Some(arc) => arc,
249 None => Arc::new(
250 self.delta_db_manager.new_empty_delta_db(
251 &self
252 .delta_db_manager
253 .get_delta_db_name(snapshot_epoch_id),
254 )?,
255 ),
256 };
257 self.mpt_id_to_snapshot_epoch_id
258 .insert(mpt_id, snapshot_epoch_id.clone());
259 self.cache_util.cache_data.insert(
260 mpt_id,
261 (arc_db.clone(), LRUHandle::<u32>::default()),
262 );
263 self.lru_access(mpt_id);
264 Ok(ArcDeltaDbWrapper {
265 inner: Some(arc_db),
266 lru: None,
267 mpt_id,
268 })
269 }
270 }
271 }
272
273 fn release(&mut self, mpt_id: DeltaMptId, destroy: bool) {
282 match self.cache_util.cache_data.get(&mpt_id) {
283 Some(tuple) => {
284 let strong_count = Arc::strong_count(&tuple.0);
285 if destroy {
286 debug_assert!(strong_count == 1);
287 }
288 if destroy || (strong_count == 1 && !tuple.1.is_hit()) {
289 self.lru.delete(mpt_id, &mut self.cache_util);
291 self.cache_util.cache_data.remove(&mpt_id);
292 }
293 }
294 None => {}
295 }
296 if destroy {
297 self.mpt_id_to_snapshot_epoch_id.remove(&mpt_id);
298 }
299 }
300}
301
302use crate::{
303 impls::{
304 delta_mpt::{
305 cache::algorithm::{
306 lru::{LRUHandle, LRU},
307 CacheAccessResult, CacheAlgorithm, CacheIndexTrait,
308 CacheStoreUtil,
309 },
310 node_ref_map::DeltaMptId,
311 },
312 errors::*,
313 },
314 storage_db::{key_value_db::*, DeltaDbManagerTrait, DeltaDbTrait},
315};
316use parking_lot::Mutex;
317use primitives::EpochId;
318use std::{
319 collections::HashMap,
320 marker::PhantomData,
321 ops::Deref,
322 sync::{Arc, Weak},
323};