cfx_storage/impls/storage_db/
kvdb_sqlite_sharded.rs

1// Copyright 2019 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
/// A key-value database whose data is split over several SQLite connections,
/// one per shard.
///
/// `ValueType` is only carried at the type level; no value of that type is
/// stored in the struct.
pub struct KvdbSqliteSharded<ValueType> {
    // The main table is the number_key_table_name, if with_number_key_table
    // was true when statements are created.
    /// One connection per shard. `None` is reported as `DbNotExist` by the
    /// write and transaction paths in this file.
    shards_connections: Option<Box<[SqliteConnection]>>,
    /// SQL statements shared by all shards.
    statements: Arc<KvdbSqliteStatements>,
    /// Zero-sized marker binding the struct to `ValueType`.
    __marker_value: PhantomData<ValueType>,
}
12
13impl<ValueType> MallocSizeOf for KvdbSqliteSharded<ValueType> {
14    fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
15        let mut size = 0;
16        if let Some(connections) = &self.shards_connections {
17            for connection in &**connections {
18                size += connection.size_of(ops);
19            }
20        }
21        // statements is small so can be ignored
22        size
23    }
24}
25
26impl<ValueType> KvdbSqliteSharded<ValueType> {
27    pub fn new(
28        shard_connections: Option<Box<[SqliteConnection]>>,
29        statements: Arc<KvdbSqliteStatements>,
30    ) -> Self {
31        if let Some(connections) = shard_connections.as_ref() {
32            assert_valid_num_shards(connections.len() as u16);
33        }
34        Self {
35            shards_connections: shard_connections,
36            statements,
37            __marker_value: Default::default(),
38        }
39    }
40
41    pub fn into_connections(self) -> Option<Box<[SqliteConnection]>> {
42        self.shards_connections
43    }
44}
45
/// Mutable-borrow view over the shard connections of a sharded database.
///
/// The constructor stores the borrowed slice as a raw pointer; the `'db`
/// mutable borrow it originates from is recorded only through
/// `__marker_lifetime`.
pub struct KvdbSqliteShardedBorrowMut<'db, ValueType> {
    // Raw pointer obtained from an `Option<&'db mut [SqliteConnection]>`.
    shards_connections: Option<*mut [SqliteConnection]>,
    statements: &'db KvdbSqliteStatements,
    // Keeps the `'db` mutable borrow visible to the type system.
    __marker_lifetime: PhantomData<&'db mut SqliteConnection>,
    __marker_value: PhantomData<ValueType>,
}
52
53impl<'db, ValueType> KvdbSqliteShardedBorrowMut<'db, ValueType> {
54    pub fn new(
55        shard_connections: Option<&'db mut [SqliteConnection]>,
56        statements: &'db KvdbSqliteStatements,
57    ) -> Self {
58        if let Some(connections) = shard_connections.as_ref() {
59            assert_valid_num_shards(connections.len() as u16);
60        }
61        Self {
62            shards_connections: shard_connections
63                .map(|x| x as *mut [SqliteConnection]),
64            statements,
65            __marker_lifetime: Default::default(),
66            __marker_value: Default::default(),
67        }
68    }
69}
70
/// Shared-borrow view over the shard connections of a sharded database.
///
/// The constructor stores the borrowed slice as a raw const pointer; the
/// `'db` shared borrow it originates from is recorded only through
/// `__marker_lifetime`.
pub struct KvdbSqliteShardedBorrowShared<'db, ValueType> {
    // Raw pointer obtained from an `Option<&'db [SqliteConnection]>`.
    shards_connections: Option<*const [SqliteConnection]>,
    statements: &'db KvdbSqliteStatements,
    // Keeps the `'db` shared borrow visible to the type system.
    __marker_lifetime: PhantomData<&'db SqliteConnection>,
    __marker_value: PhantomData<ValueType>,
}
77
78impl<'db, ValueType> KvdbSqliteShardedBorrowShared<'db, ValueType> {
79    pub fn new(
80        shard_connections: Option<&'db [SqliteConnection]>,
81        statements: &'db KvdbSqliteStatements,
82    ) -> Self {
83        if let Some(connections) = shard_connections.as_ref() {
84            assert_valid_num_shards(connections.len() as u16);
85        }
86        Self {
87            shards_connections: shard_connections
88                .map(|x| x as *const [SqliteConnection]),
89            statements,
90            __marker_lifetime: Default::default(),
91            __marker_value: Default::default(),
92        }
93    }
94}
95
/// Read-only access to the parts of a sharded database: the optional shard
/// connections and the statements used with them.
pub trait KvdbSqliteShardedRefDestructureTrait {
    /// Returns the shard connections (if any) and the statements, both
    /// borrowed from `self`.
    fn destructure(
        &self,
    ) -> (Option<&[SqliteConnection]>, &KvdbSqliteStatements);
}
101
/// Mutable access to the parts of a sharded database: the optional shard
/// connections (mutably) and the statements used with them.
pub trait KvdbSqliteShardedDestructureTrait {
    /// Returns the shard connections (if any) as a mutable slice together
    /// with the statements, both borrowed from `self`.
    fn destructure_mut(
        &mut self,
    ) -> (Option<&mut [SqliteConnection]>, &KvdbSqliteStatements);
}
107
108impl<ValueType> KvdbSqliteSharded<ValueType> {
109    pub fn db_path<P: AsRef<Path>>(path: P, shard_id: u16) -> PathBuf {
110        path.as_ref().join(format!("shard_{:02x}", shard_id))
111    }
112
113    pub fn try_clone(&self) -> Result<Self> {
114        Ok(Self {
115            shards_connections: match &self.shards_connections {
116                None => None,
117                Some(connections) => {
118                    let mut cloned_connections =
119                        Vec::with_capacity(connections.len());
120                    for conn in connections.iter() {
121                        cloned_connections.push(conn.try_clone()?)
122                    }
123                    Some(cloned_connections.into_boxed_slice())
124                }
125            },
126            statements: self.statements.clone(),
127            __marker_value: Default::default(),
128        })
129    }
130
131    pub fn open<P: AsRef<Path>>(
132        num_shards: u16, dir: P, readonly: bool,
133        statements: Arc<KvdbSqliteStatements>,
134    ) -> Result<Self> {
135        assert_valid_num_shards(num_shards);
136        let mut shards_connections = Vec::with_capacity(num_shards as usize);
137        for i in 0..num_shards {
138            shards_connections.push(SqliteConnection::open(
139                Self::db_path(&dir, i),
140                readonly,
141                SqliteConnection::default_open_flags(),
142            )?)
143        }
144        Ok(Self {
145            shards_connections: Some(shards_connections.into_boxed_slice()),
146            statements,
147            __marker_value: Default::default(),
148        })
149    }
150
151    pub fn open_or_create<P: AsRef<Path>>(
152        num_shards: u16, dir: P, statements: Arc<KvdbSqliteStatements>,
153        unsafe_mode: bool,
154    ) -> Result<(bool, Self)> {
155        if dir.as_ref().exists() {
156            Ok((
157                true,
158                Self::open(
159                    num_shards, dir, /* readonly = */ false, statements,
160                )?,
161            ))
162        } else {
163            Ok((
164                false,
165                Self::create_and_open(
166                    num_shards,
167                    dir,
168                    statements,
169                    /* create_table = */ true,
170                    unsafe_mode,
171                )?,
172            ))
173        }
174    }
175
176    pub fn create_and_open<P: AsRef<Path>>(
177        num_shards: u16, dir: P, statements: Arc<KvdbSqliteStatements>,
178        create_table: bool, unsafe_mode: bool,
179    ) -> Result<Self> {
180        assert_valid_num_shards(num_shards);
181        let mut shards_connections = Vec::with_capacity(num_shards as usize);
182        for i in 0..num_shards {
183            let connection = KvdbSqlite::<ValueType>::create_and_open(
184                Self::db_path(&dir, i),
185                statements.clone(),
186                /* create_table = */ create_table,
187                unsafe_mode,
188            )?
189            .into_connection()
190            // Safe to unwrap since the connection is newly created.
191            .unwrap();
192            shards_connections.push(connection);
193        }
194        Ok(Self {
195            shards_connections: Some(shards_connections.into_boxed_slice()),
196            statements,
197            __marker_value: Default::default(),
198        })
199    }
200
201    /// Call initialize databases separately. Typical usage is to create a new
202    /// table from the existing db.
203    pub fn create_table(
204        connections: &mut Box<[SqliteConnection]>,
205        statements: &KvdbSqliteStatements,
206    ) -> Result<()> {
207        for connection in connections.iter_mut() {
208            KvdbSqlite::<ValueType>::create_table(connection, statements)?
209        }
210        Ok(())
211    }
212
213    pub fn check_if_table_exist(
214        connections: &mut Box<[SqliteConnection]>,
215        statements: &KvdbSqliteStatements,
216    ) -> Result<bool> {
217        for connection in connections.iter_mut() {
218            if KvdbSqlite::<ValueType>::check_if_table_exist(
219                connection, statements,
220            )? {
221                return Ok(true);
222            }
223        }
224
225        Ok(false)
226    }
227
228    pub fn drop_table(
229        connections: &mut Box<[SqliteConnection]>,
230        statements: &KvdbSqliteStatements,
231    ) -> Result<()> {
232        for connection in connections.iter_mut() {
233            KvdbSqlite::<ValueType>::drop_table(connection, statements)?;
234        }
235        Ok(())
236    }
237
238    pub fn vacumm_db(
239        connections: &mut Box<[SqliteConnection]>,
240        statements: &KvdbSqliteStatements,
241    ) -> Result<()> {
242        for connection in connections.iter_mut() {
243            KvdbSqlite::<ValueType>::vacuum_db(connection, statements)?
244        }
245        Ok(())
246    }
247}
248
/// Panics unless `num_shards` is a non-zero divisor of 256.
///
/// Zero is rejected explicitly so that the failure carries a descriptive
/// message instead of a "remainder with a divisor of zero" panic.
fn assert_valid_num_shards(num_shards: u16) {
    assert!(
        num_shards != 0 && 256 % num_shards == 0,
        "invalid number of shards {}: it must be a non-zero divisor of 256",
        num_shards,
    );
}
250
/// Maps a byte key to the index of the shard that stores it.
///
/// Only the first four bytes of `key` take part; missing bytes count as 0.
/// The result is `(((b0 + b1) << 4) + b2 + b3) % num_shards`.
///
/// # Panics
///
/// Panics when `num_shards` is 0 (remainder by zero).
pub fn key_to_shard_id(key: &[u8], num_shards: usize) -> usize {
    // Checked access replaces the former `unsafe { get_unchecked }` reads;
    // absent bytes still contribute 0.
    let byte_at =
        |index: usize| key.get(index).map_or(0, |byte| usize::from(*byte));

    let mapped_key =
        ((byte_at(0) + byte_at(1)) << 4) + byte_at(2) + byte_at(3);
    mapped_key % num_shards
}
276
/// Maps a numeric key to the index of the shard that stores it.
///
/// The four most significant bytes of `key` (b0 is the top byte) are
/// combined as `(((b0 + b1) << 4) + b2 + b3) % num_shards`.
///
/// Panics when `num_shards` is 0 (remainder by zero).
pub fn number_key_to_shard_id(key: i64, num_shards: usize) -> usize {
    // Truncating to `u8` after the shift isolates exactly one byte, which
    // also discards the sign extension of negative keys.
    let byte_at = |shift: u32| usize::from((key >> shift) as u8);

    let high = (byte_at(56) + byte_at(48)) << 4;
    let low = byte_at(40) + byte_at(32);
    (high + low) % num_shards
}
284
/// Map as in Map-Reduce.
///
/// Reduces a key to a `u32` derived from its leading part; the sharded
/// iterator merger orders pending entries by this value.
pub trait KeyPrefixToMap {
    fn key_prefix_to_map(&self) -> u32;
}

impl KeyPrefixToMap for Vec<u8> {
    /// Packs the first four bytes big-endian into a `u32`; bytes missing
    /// from keys shorter than four bytes count as 0.
    fn key_prefix_to_map(&self) -> u32 {
        // Checked access replaces the former `unsafe { get_unchecked }`
        // reads.
        (0..4).fold(0u32, |prefix, index| {
            let byte = self.get(index).map_or(0, |byte| u32::from(*byte));
            (prefix << 8) | byte
        })
    }
}

impl KeyPrefixToMap for i64 {
    /// Takes the upper 32 bits of the key with the sign bit flipped.
    fn key_prefix_to_map(&self) -> u32 {
        // We have to xor the first bit to order negative keys before positive
        // keys.
        ((*self >> 32) as u32) ^ 0x8000_0000
    }
}
324
325impl<
326        T: ReadImplFamily
327            + KvdbSqliteShardedRefDestructureTrait
328            + KeyValueDbTypes<ValueType = ValueType>,
329        ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
330    > ReadImplByFamily<KvdbSqliteSharded<ValueType>> for T
331{
332    fn get_impl(&self, key: &[u8]) -> Result<Option<Self::ValueType>> {
333        let (maybe_connections, statements) = self.destructure();
334        match maybe_connections {
335            None => Ok(None),
336            Some(connections) => KvdbSqliteBorrowShared::new((
337                connections.get(key_to_shard_id(key, connections.len())),
338                statements,
339            ))
340            .get_impl(key),
341        }
342    }
343
344    fn get_with_number_key_impl(
345        &self, key: i64,
346    ) -> Result<Option<Self::ValueType>> {
347        let (maybe_connections, statements) = self.destructure();
348        match maybe_connections {
349            None => Ok(None),
350            Some(connections) => KvdbSqliteBorrowShared::new((
351                connections.get(number_key_to_shard_id(key, connections.len())),
352                statements,
353            ))
354            .get_with_number_key_impl(key),
355        }
356    }
357}
358
359impl<
360        T: OwnedReadImplFamily
361            + KvdbSqliteShardedDestructureTrait
362            + KeyValueDbTypes<ValueType = ValueType>,
363        ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
364    > OwnedReadImplByFamily<KvdbSqliteSharded<ValueType>> for T
365{
366    fn get_mut_impl(&mut self, key: &[u8]) -> Result<Option<Self::ValueType>> {
367        let (maybe_connections, statements) = self.destructure_mut();
368        match maybe_connections {
369            None => Ok(None),
370            Some(connections) => KvdbSqliteBorrowMut::new((
371                connections.get_mut(key_to_shard_id(key, connections.len())),
372                statements,
373            ))
374            .get_mut_impl(key),
375        }
376    }
377
378    fn get_mut_with_number_key_impl(
379        &mut self, key: i64,
380    ) -> Result<Option<Self::ValueType>> {
381        let (maybe_connections, statements) = self.destructure_mut();
382        match maybe_connections {
383            None => Ok(None),
384            Some(connections) => KvdbSqliteBorrowMut::new((
385                connections
386                    .get_mut(number_key_to_shard_id(key, connections.len())),
387                statements,
388            ))
389            .get_mut_with_number_key_impl(key),
390        }
391    }
392}
393
394impl<
395        T: SingleWriterImplFamily
396            + KvdbSqliteShardedDestructureTrait
397            + KeyValueDbTypes<ValueType = ValueType>,
398        ValueType: DbValueType,
399    > SingleWriterImplByFamily<KvdbSqliteSharded<ValueType>> for T
400where ValueType::Type: SqlBindableValue
401        + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
402{
403    fn delete_impl(
404        &mut self, key: &[u8],
405    ) -> Result<Option<Option<Self::ValueType>>> {
406        let (maybe_connections, statements) = self.destructure_mut();
407        match maybe_connections {
408            None => Err(Error::from(Error::DbNotExist)),
409            Some(connections) => KvdbSqliteBorrowMut::<ValueType>::new((
410                connections.get_mut(key_to_shard_id(key, connections.len())),
411                statements,
412            ))
413            .delete_impl(key),
414        }
415    }
416
417    fn delete_with_number_key_impl(
418        &mut self, key: i64,
419    ) -> Result<Option<Option<Self::ValueType>>> {
420        let (maybe_connections, statements) = self.destructure_mut();
421        match maybe_connections {
422            None => Err(Error::from(Error::DbNotExist)),
423            Some(connections) => KvdbSqliteBorrowMut::new((
424                connections
425                    .get_mut(number_key_to_shard_id(key, connections.len())),
426                statements,
427            ))
428            .delete_with_number_key_impl(key),
429        }
430    }
431
432    fn put_impl(
433        &mut self, key: &[u8], value: &<Self::ValueType as DbValueType>::Type,
434    ) -> Result<Option<Option<Self::ValueType>>> {
435        let (maybe_connections, statements) = self.destructure_mut();
436        match maybe_connections {
437            None => Err(Error::from(Error::DbNotExist)),
438            Some(connections) => KvdbSqliteBorrowMut::<ValueType>::new((
439                connections.get_mut(key_to_shard_id(key, connections.len())),
440                statements,
441            ))
442            .put_impl(key, value),
443        }
444    }
445
446    fn put_with_number_key_impl(
447        &mut self, key: i64, value: &<Self::ValueType as DbValueType>::Type,
448    ) -> Result<Option<Option<Self::ValueType>>> {
449        let (maybe_connections, statements) = self.destructure_mut();
450        match maybe_connections {
451            None => Err(Error::from(Error::DbNotExist)),
452            Some(connections) => KvdbSqliteBorrowMut::new((
453                connections
454                    .get_mut(number_key_to_shard_id(key, connections.len())),
455                statements,
456            ))
457            .put_with_number_key_impl(key, value),
458        }
459    }
460}
461
462pub struct ShardedIterMerger<
463    Key: KeyPrefixToMap,
464    Value,
465    ShardIter: FallibleIterator<Item = (Key, Value)>,
466> {
467    shard_iters: Vec<ShardIter>,
468    ordered_iter_ids: Vec<u8>,
469    ordered_key_prefixes_to_map: Vec<u32>,
470    maybe_next_key_values: Vec<Option<(Key, Value)>>,
471}
472
473impl<
474        Key: KeyPrefixToMap,
475        Value,
476        ShardIter: FallibleIterator<Item = (Key, Value)>,
477    > ShardedIterMerger<Key, Value, ShardIter>
478{
479    pub fn new() -> Self {
480        Self {
481            shard_iters: vec![],
482            ordered_iter_ids: vec![],
483            ordered_key_prefixes_to_map: vec![],
484            // Keep the rear element to None to help the while loops.
485            maybe_next_key_values: vec![None],
486        }
487    }
488
489    pub fn push_shard_iter(
490        &mut self, mut iter: ShardIter,
491    ) -> std::result::Result<(), ShardIter::Error> {
492        let maybe_first_key_value = iter.next()?;
493        let iter_id = self.shard_iters.len() as u8;
494        self.shard_iters.push(iter);
495        self.kv_push(iter_id, maybe_first_key_value);
496        Ok(())
497    }
498
499    fn kv_push(&mut self, iter_id: u8, kv: Option<(Key, Value)>) {
500        if kv.is_some() {
501            let mut i = 0;
502            let this_key_prefix_to_map =
503                kv.as_ref().unwrap().0.key_prefix_to_map();
504            while self.maybe_next_key_values[i].is_some() {
505                if this_key_prefix_to_map < self.ordered_key_prefixes_to_map[i]
506                {
507                    break;
508                }
509                i += 1;
510            }
511            self.ordered_iter_ids.insert(i, iter_id);
512            self.ordered_key_prefixes_to_map
513                .insert(i, this_key_prefix_to_map);
514            self.maybe_next_key_values.insert(i, kv);
515        }
516    }
517
518    fn peek(&self) -> Option<u8> { self.ordered_iter_ids.get(0).cloned() }
519
520    fn kv_pop_and_push(
521        &mut self, iter_id: u8, kv: Option<(Key, Value)>,
522    ) -> std::result::Result<Option<(Key, Value)>, ShardIter::Error> {
523        let mut i;
524        if kv.is_some() {
525            i = 0;
526            let this_key_prefix_to_map =
527                kv.as_ref().unwrap().0.key_prefix_to_map();
528            while self.maybe_next_key_values[i].is_some() {
529                if this_key_prefix_to_map < self.ordered_key_prefixes_to_map[i]
530                {
531                    break;
532                }
533                i += 1;
534            }
535            // i is at least 1, because key_prefix_to_map[0] is the smallest,
536            // and kv is the next element, which has to be >=
537            // key_prefix_to_map[0]. Have to use memmove, etc.
538            debug_assert!(i > 0);
539            unsafe {
540                let pos = i - 1;
541                if pos > 0 {
542                    std::ptr::copy(
543                        self.ordered_iter_ids.get_unchecked(1),
544                        self.ordered_iter_ids.get_unchecked_mut(0),
545                        pos,
546                    );
547                }
548                std::ptr::write(
549                    self.ordered_iter_ids.get_unchecked_mut(pos),
550                    iter_id,
551                );
552                if pos > 0 {
553                    std::ptr::copy(
554                        self.ordered_key_prefixes_to_map.get_unchecked(1),
555                        self.ordered_key_prefixes_to_map.get_unchecked_mut(0),
556                        pos,
557                    );
558                }
559                std::ptr::write(
560                    self.ordered_key_prefixes_to_map.get_unchecked_mut(pos),
561                    this_key_prefix_to_map,
562                );
563                let popped = Ok(std::ptr::read(
564                    self.maybe_next_key_values.get_unchecked(0),
565                ));
566                if pos > 0 {
567                    std::ptr::copy(
568                        self.maybe_next_key_values.get_unchecked(1),
569                        self.maybe_next_key_values.get_unchecked_mut(0),
570                        pos,
571                    );
572                }
573                std::ptr::write(
574                    self.maybe_next_key_values.get_unchecked_mut(pos),
575                    kv,
576                );
577
578                popped
579            }
580        } else {
581            self.ordered_iter_ids.remove(0);
582            self.ordered_key_prefixes_to_map.remove(0);
583            Ok(self.maybe_next_key_values.remove(0))
584        }
585    }
586}
587
588impl<
589        Key: KeyPrefixToMap,
590        Value,
591        ShardIter: FallibleIterator<Item = (Key, Value)>,
592    > FallibleIterator for ShardedIterMerger<Key, Value, ShardIter>
593{
594    type Error = ShardIter::Error;
595    type Item = (Key, Value);
596
597    fn next(&mut self) -> std::result::Result<Option<Self::Item>, Self::Error> {
598        match self.peek() {
599            None => Ok(None),
600            Some(iter_id) => {
601                // TODO: iter executions are not in parallel. Current
602                // performance seems OK.
603                let next_kv = self.shard_iters[iter_id as usize].next()?;
604                self.kv_pop_and_push(iter_id, next_kv)
605            }
606        }
607    }
608}
609
610pub trait KvdbSqliteShardedDestructureTraitWithValueType:
611    KvdbSqliteShardedDestructureTrait + KeyValueDbTypes
612{
613}
614impl<T: KvdbSqliteShardedDestructureTrait + KeyValueDbTypes>
615    KvdbSqliteShardedDestructureTraitWithValueType for T
616{
617}
618
619// TODO: iter executions are not in parallel. Current performance seems OK.
620pub fn kvdb_sqlite_sharded_iter_range_impl<
621    'db,
622    Key: 'db + KeyPrefixToMap,
623    Value: 'db,
624    F: Clone + FnMut(&Statement<'db>) -> Result<(Key, Value)>,
625>(
626    maybe_shards_connections: Option<&'db mut [SqliteConnection]>,
627    statements: &KvdbSqliteStatements, lower_bound_incl: &[u8],
628    upper_bound_excl: Option<&[u8]>, f: F,
629) -> Result<ShardedIterMerger<Key, Value, MappedRows<'db, F>>> {
630    let mut shards_iter_merger_result = Ok(ShardedIterMerger::new());
631    if let Some(shards_connections) = maybe_shards_connections {
632        let shards_iter_merger = shards_iter_merger_result.as_mut().unwrap();
633        for connection in shards_connections.iter_mut() {
634            let shard_iter = kvdb_sqlite_iter_range_impl(
635                Some(connection),
636                statements,
637                lower_bound_incl,
638                upper_bound_excl,
639                f.clone(),
640            )?;
641            shards_iter_merger.push_shard_iter(shard_iter)?;
642        }
643    }
644
645    shards_iter_merger_result
646}
647
648pub fn kvdb_sqlite_sharded_iter_range_excl_impl<
649    'db,
650    Key: 'db + KeyPrefixToMap,
651    Value: 'db,
652    F: Clone + FnMut(&Statement<'db>) -> Result<(Key, Value)>,
653>(
654    maybe_shards_connections: Option<&'db mut [SqliteConnection]>,
655    statements: &KvdbSqliteStatements, lower_bound_excl: &[u8],
656    upper_bound_excl: &[u8], f: F,
657) -> Result<ShardedIterMerger<Key, Value, MappedRows<'db, F>>> {
658    let mut shards_iter_merger_result = Ok(ShardedIterMerger::new());
659    if let Some(shards_connections) = maybe_shards_connections {
660        let shards_iter_merger = shards_iter_merger_result.as_mut().unwrap();
661        for connection in shards_connections.iter_mut() {
662            let shard_iter = kvdb_sqlite_iter_range_excl_impl(
663                Some(connection),
664                statements,
665                lower_bound_excl,
666                upper_bound_excl,
667                f.clone(),
668            )?;
669            shards_iter_merger.push_shard_iter(shard_iter)?;
670        }
671    }
672
673    shards_iter_merger_result
674}
675
// Registers the concrete iterator type behind
// `KvdbIterIterator<($ItemKeyType, $ItemValueType), $KeyType,
// KvdbSqliteShardedIteratorTag>`.
//
// For the given item key/value types and key type, the expansion provides:
// * a `WrappedLifetimeFamily<'a, dyn FallibleIterator<..>>` impl whose `Out`
//   is a `ShardedIterMerger` over `MappedRows<'a, fn(&Statement) -> Result<..>>`;
// * an empty `WrappedTrait<dyn FallibleIterator<..>>` impl.
macro_rules! enable_KvdbIterIterator_for_KvdbSqliteSharded_families {
    ($ItemKeyType:ty, $ItemValueType:ty, $KeyType:ty) => {
        impl<'a>
            WrappedLifetimeFamily<
                'a,
                dyn FallibleIterator<
                    Item = ($ItemKeyType, $ItemValueType),
                    Error = Error,
                >,
            >
            for KvdbIterIterator<
                ($ItemKeyType, $ItemValueType),
                $KeyType,
                KvdbSqliteShardedIteratorTag,
            >
        {
            // Concrete iterator: a merger over one row iterator per shard.
            type Out = ShardedIterMerger<
                $ItemKeyType,
                $ItemValueType,
                MappedRows<
                    'a,
                    for<'r, 's> fn(
                        &'r Statement<'s>,
                    )
                        -> Result<($ItemKeyType, $ItemValueType)>,
                >,
            >;
        }
        impl
            WrappedTrait<
                dyn FallibleIterator<
                    Item = ($ItemKeyType, $ItemValueType),
                    Error = Error,
                >,
            >
            for KvdbIterIterator<
                ($ItemKeyType, $ItemValueType),
                $KeyType,
                KvdbSqliteShardedIteratorTag,
            >
        {
        }
    };
}
720
/// Marker type used as the tag parameter of `KvdbIterIterator` and
/// `KeyValueDbIterableTrait` for the sharded SQLite implementations in this
/// file.
pub struct KvdbSqliteShardedIteratorTag();
722
// Iterator over `(Vec<u8>, Box<[u8]>)` items with `[u8]` keys.
enable_KvdbIterIterator_for_KvdbSqliteSharded_families!(
    Vec<u8>,
    Box<[u8]>,
    [u8]
);
// Iterator over `(Vec<u8>, ())` items with `[u8]` keys.
enable_KvdbIterIterator_for_KvdbSqliteSharded_families!(Vec<u8>, (), [u8]);
729
// Makes `$Ttype` usable as a
// `dyn KeyValueDbIterableTrait<$ItemType, [u8], KvdbSqliteShardedIteratorTag>`.
//
// The expansion provides, for `$Ttype`:
// * `ElementSatisfy<dyn KeyValueDbIterableTrait<..>>`, whose two conversion
//   methods return `self` as the trait object;
// * `WrappedLifetimeFamily<'_, dyn KeyValueDbIterableTrait<..>>` with
//   `Out = Self`;
// * an empty `WrappedTrait<dyn KeyValueDbIterableTrait<..>>` impl.
macro_rules! make_wrap_of_KeyValueDbIterableTrait_of_KvdbSqliteSharded_families {
    ($ItemType:ty, $Ttype:ty) => {
        impl
            ElementSatisfy<
                dyn KeyValueDbIterableTrait<
                    $ItemType,
                    [u8],
                    KvdbSqliteShardedIteratorTag,
                >,
            > for $Ttype
        {
            // Shared-reference view of `self` as the trait object.
            fn to_constrain_object(
                &self,
            ) -> &(dyn KeyValueDbIterableTrait<
                $ItemType,
                [u8],
                KvdbSqliteShardedIteratorTag,
            > + 'static) {
                self
            }

            // Mutable-reference view of `self` as the trait object.
            fn to_constrain_object_mut(
                &mut self,
            ) -> &mut (dyn KeyValueDbIterableTrait<
                $ItemType,
                [u8],
                KvdbSqliteShardedIteratorTag,
            > + 'static) {
                self
            }
        }

        impl
            WrappedLifetimeFamily<
                '_,
                dyn KeyValueDbIterableTrait<
                    $ItemType,
                    [u8],
                    KvdbSqliteShardedIteratorTag,
                >,
            > for $Ttype
        {
            type Out = Self;
        }
        impl
            WrappedTrait<
                dyn KeyValueDbIterableTrait<
                    $ItemType,
                    [u8],
                    KvdbSqliteShardedIteratorTag,
                >,
            > for $Ttype
        {
        }
    };
}
786
// Owned sharded database storing `Box<[u8]>` values, iterated as
// `MptKeyValue` items.
make_wrap_of_KeyValueDbIterableTrait_of_KvdbSqliteSharded_families!(
    MptKeyValue,
    KvdbSqliteSharded<Box<[u8]>>
);
// Owned sharded database with `()` values, iterated as `(Vec<u8>, ())`
// items.
make_wrap_of_KeyValueDbIterableTrait_of_KvdbSqliteSharded_families!(
    (Vec<u8>, ()),
    KvdbSqliteSharded<()>
);
// Mutable-borrow view (with a `'static` borrow) storing `Box<[u8]>` values,
// iterated as `MptKeyValue` items.
make_wrap_of_KeyValueDbIterableTrait_of_KvdbSqliteSharded_families!(
    MptKeyValue,
    KvdbSqliteShardedBorrowMut<'static, Box<[u8]>>
);
799
// FIXME: implement for more general types
// (DerefOr...<KvdbSqliteShardedBorrow...>)
//
// Range iteration for every `T` that can be mutably borrowed as a
// `dyn KvdbSqliteShardedDestructureTraitWithValueType<ValueType = ValueType>`.
// The `where` clause requires that the tagged `KvdbIterIterator` has been
// registered with a `ShardedIterMerger` over `MappedRows` as its concrete
// type, which is what the iterator-registration macro in this file generates.
impl<
        ValueType: 'static + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
        T: DerefMutPlusImplOrBorrowMutSelf<
            dyn KvdbSqliteShardedDestructureTraitWithValueType<
                ValueType = ValueType,
            >,
        >,
    >
    KeyValueDbIterableTrait<
        (Vec<u8>, ValueType),
        [u8],
        KvdbSqliteShardedIteratorTag,
    > for T
where KvdbIterIterator<(Vec<u8>, ValueType), [u8], KvdbSqliteShardedIteratorTag>:
        WrappedTrait<
                dyn FallibleIterator<
                    Item = (Vec<u8>, ValueType),
                    Error = Error,
                >,
            > + for<'a> WrappedLifetimeFamily<
                'a,
                dyn FallibleIterator<
                    Item = (Vec<u8>, ValueType),
                    Error = Error,
                >,
                Out = ShardedIterMerger<
                    Vec<u8>,
                    ValueType,
                    MappedRows<
                        'a,
                        for<'r, 's> fn(
                            &'r Statement<'s>,
                        )
                            -> Result<(Vec<u8>, ValueType)>,
                    >,
                >,
            >
{
    /// Iterates over the keys in `[lower_bound_incl, upper_bound_excl)`
    /// across all shards, merged by `ShardedIterMerger`.
    fn iter_range(
        &mut self, lower_bound_incl: &[u8], upper_bound_excl: Option<&[u8]>,
    ) -> Result<
        Wrap<
            '_,
            KvdbIterIterator<
                (Vec<u8>, ValueType),
                [u8],
                KvdbSqliteShardedIteratorTag,
            >,
            dyn FallibleIterator<Item = (Vec<u8>, ValueType), Error = Error>,
        >,
    > {
        let (maybe_shards_connections, statements) =
            self.borrow_mut().destructure_mut();
        // The row mapper is cast to a plain fn pointer so that the merger's
        // type matches the `Out` type required by the `where` clause.
        Ok(Wrap(kvdb_sqlite_sharded_iter_range_impl(
            maybe_shards_connections,
            statements,
            lower_bound_incl,
            upper_bound_excl,
            KvdbSqlite::<ValueType>::kv_from_iter_row::<Vec<u8>>
                as for<'r, 's> fn(
                    &'r Statement<'s>,
                )
                    -> Result<(Vec<u8>, ValueType)>,
        )?))
    }

    /// Iterates over the keys strictly between `lower_bound_excl` and
    /// `upper_bound_excl` across all shards, merged by `ShardedIterMerger`.
    fn iter_range_excl(
        &mut self, lower_bound_excl: &[u8], upper_bound_excl: &[u8],
    ) -> Result<
        Wrap<
            '_,
            KvdbIterIterator<
                (Vec<u8>, ValueType),
                [u8],
                KvdbSqliteShardedIteratorTag,
            >,
            dyn FallibleIterator<Item = (Vec<u8>, ValueType), Error = Error>,
        >,
    > {
        let (maybe_shards_connections, statements) =
            self.borrow_mut().destructure_mut();
        // Same fn-pointer cast as in `iter_range`.
        Ok(Wrap(kvdb_sqlite_sharded_iter_range_excl_impl(
            maybe_shards_connections,
            statements,
            lower_bound_excl,
            upper_bound_excl,
            KvdbSqlite::<ValueType>::kv_from_iter_row::<Vec<u8>>
                as for<'r, 's> fn(
                    &'r Statement<'s>,
                )
                    -> Result<(Vec<u8>, ValueType)>,
        )?))
    }
}
896
897// 'static because Any is static.
898impl<
899        ValueType: 'static
900            + DbValueType
901            + ValueRead
902            + ValueReadImpl<<ValueType as ValueRead>::Kind>,
903    > KeyValueDbTraitTransactional for KvdbSqliteSharded<ValueType>
904where ValueType::Type: SqlBindableValue
905        + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
906{
907    type TransactionType = KvdbSqliteShardedTransaction<ValueType>;
908
909    fn start_transaction(
910        &self, immediate_write: bool,
911    ) -> Result<KvdbSqliteShardedTransaction<ValueType>> {
912        if self.shards_connections.is_none() {
913            bail!(Error::DbNotExist);
914        }
915
916        KvdbSqliteShardedTransaction::new(self.try_clone()?, immediate_write)
917    }
918}
919
920pub struct KvdbSqliteShardedTransaction<
921    ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
922> where ValueType::Type: SqlBindableValue
923        + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
924{
925    db: KvdbSqliteSharded<ValueType>,
926    committed: bool,
927}
928
929impl<
930        ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
931    > Drop for KvdbSqliteShardedTransaction<ValueType>
932where ValueType::Type: SqlBindableValue
933        + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
934{
935    fn drop(&mut self) {
936        if !self.committed {
937            self.revert().ok();
938        }
939    }
940}
941
942impl<
943        ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
944    > KvdbSqliteShardedTransaction<ValueType>
945where ValueType::Type: SqlBindableValue
946        + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
947{
948    fn new(
949        mut db: KvdbSqliteSharded<ValueType>, immediate_write: bool,
950    ) -> Result<Self> {
951        Self::start_transaction(
952            db.shards_connections.as_mut(),
953            immediate_write,
954        )?;
955        Ok(Self {
956            db,
957            committed: false,
958        })
959    }
960
961    fn start_transaction(
962        maybe_shard_connections: Option<&mut Box<[SqliteConnection]>>,
963        immediate_write: bool,
964    ) -> Result<()> {
965        if let Some(connections) = maybe_shard_connections {
966            for conn in connections.iter_mut() {
967                KvdbSqliteTransaction::<ValueType>::start_transaction(
968                    conn.get_db_mut(),
969                    immediate_write,
970                )?
971            }
972        }
973        Ok(())
974    }
975}
976
977impl<
978        ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
979    > KeyValueDbTransactionTrait for KvdbSqliteShardedTransaction<ValueType>
980where ValueType::Type: SqlBindableValue
981        + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
982{
983    fn commit(&mut self, _db: &dyn Any) -> Result<()> {
984        self.committed = true;
985        if let Some(connections) = self.db.shards_connections.as_mut() {
986            for conn in connections.iter_mut() {
987                conn.get_db_mut().execute("COMMIT")?;
988            }
989        }
990        Ok(())
991    }
992
993    fn revert(&mut self) -> Result<()> {
994        self.committed = true;
995        if let Some(connections) = self.db.shards_connections.as_mut() {
996            for conn in connections.iter_mut() {
997                conn.get_db_mut().execute("ROLLBACK")?;
998            }
999        }
1000        Ok(())
1001    }
1002
1003    fn restart(
1004        &mut self, immediate_write: bool, no_revert: bool,
1005    ) -> Result<()> {
1006        if !no_revert {
1007            self.revert()?;
1008        }
1009        Self::start_transaction(
1010            self.db.shards_connections.as_mut(),
1011            immediate_write,
1012        )
1013    }
1014}
1015
1016impl<
1017        ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
1018    > KeyValueDbTypes for KvdbSqliteShardedTransaction<ValueType>
1019where ValueType::Type: SqlBindableValue
1020        + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
1021{
1022    type ValueType = ValueType;
1023}
1024
1025impl KeyValueDbTraitMultiReader for KvdbSqliteSharded<Box<[u8]>> {}
1026impl OwnedReadImplFamily for &KvdbSqliteSharded<Box<[u8]>> {
1027    type FamilyRepresentative =
1028        dyn KeyValueDbTraitMultiReader<ValueType = Box<[u8]>>;
1029}
1030impl DeltaDbTrait for KvdbSqliteSharded<Box<[u8]>> {}
1031
1032impl<ValueType: DbValueType> KeyValueDbTypes for KvdbSqliteSharded<ValueType> {
1033    type ValueType = ValueType;
1034}
1035
1036impl<ValueType: DbValueType> KeyValueDbTypes
1037    for KvdbSqliteShardedBorrowMut<'_, ValueType>
1038{
1039    type ValueType = ValueType;
1040}
1041
1042impl<ValueType: DbValueType> KeyValueDbTypes
1043    for KvdbSqliteShardedBorrowShared<'_, ValueType>
1044{
1045    type ValueType = ValueType;
1046}
1047
1048impl<ValueType: DbValueType> KvdbSqliteShardedRefDestructureTrait
1049    for KvdbSqliteSharded<ValueType>
1050{
1051    fn destructure(
1052        &self,
1053    ) -> (Option<&[SqliteConnection]>, &KvdbSqliteStatements) {
1054        (
1055            self.shards_connections.as_ref().map(|r| &**r),
1056            &self.statements,
1057        )
1058    }
1059}
1060
1061impl<ValueType: DbValueType> KvdbSqliteShardedDestructureTrait
1062    for KvdbSqliteSharded<ValueType>
1063{
1064    fn destructure_mut(
1065        &mut self,
1066    ) -> (Option<&mut [SqliteConnection]>, &KvdbSqliteStatements) {
1067        (
1068            self.shards_connections.as_mut().map(|r| &mut **r),
1069            &self.statements,
1070        )
1071    }
1072}
1073
1074impl<
1075        ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
1076    > KvdbSqliteShardedDestructureTrait
1077    for KvdbSqliteShardedTransaction<ValueType>
1078where ValueType::Type: SqlBindableValue
1079        + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
1080{
1081    fn destructure_mut(
1082        &mut self,
1083    ) -> (Option<&mut [SqliteConnection]>, &KvdbSqliteStatements) {
1084        self.db.destructure_mut()
1085    }
1086}
1087
1088impl<ValueType: DbValueType> KvdbSqliteShardedRefDestructureTrait
1089    for KvdbSqliteShardedBorrowMut<'_, ValueType>
1090{
1091    fn destructure(
1092        &self,
1093    ) -> (Option<&[SqliteConnection]>, &KvdbSqliteStatements) {
1094        (
1095            self.shards_connections.clone().map(|r| unsafe { &*r }),
1096            &self.statements,
1097        )
1098    }
1099}
1100
1101impl<ValueType: DbValueType> KvdbSqliteShardedDestructureTrait
1102    for KvdbSqliteShardedBorrowMut<'_, ValueType>
1103{
1104    fn destructure_mut(
1105        &mut self,
1106    ) -> (Option<&mut [SqliteConnection]>, &KvdbSqliteStatements) {
1107        (
1108            self.shards_connections.clone().map(|r| unsafe { &mut *r }),
1109            &self.statements,
1110        )
1111    }
1112}
1113
1114impl<ValueType: DbValueType> KvdbSqliteShardedRefDestructureTrait
1115    for KvdbSqliteShardedBorrowShared<'_, ValueType>
1116{
1117    fn destructure(
1118        &self,
1119    ) -> (Option<&[SqliteConnection]>, &KvdbSqliteStatements) {
1120        (
1121            self.shards_connections.clone().map(|r| unsafe { &*r }),
1122            &self.statements,
1123        )
1124    }
1125}
1126
1127impl<ValueType> ReadImplFamily for KvdbSqliteSharded<ValueType> {
1128    type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1129}
1130
1131impl<ValueType> OwnedReadImplFamily for KvdbSqliteSharded<ValueType> {
1132    type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1133}
1134
1135impl<ValueType> SingleWriterImplFamily for KvdbSqliteSharded<ValueType> {
1136    type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1137}
1138
1139impl<ValueType> ReadImplFamily for KvdbSqliteShardedBorrowMut<'_, ValueType> {
1140    type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1141}
1142
1143impl<
1144        ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
1145    > OwnedReadImplFamily for KvdbSqliteShardedTransaction<ValueType>
1146where ValueType::Type: SqlBindableValue
1147        + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
1148{
1149    type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1150}
1151
1152impl<
1153        ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
1154    > SingleWriterImplFamily for KvdbSqliteShardedTransaction<ValueType>
1155where ValueType::Type: SqlBindableValue
1156        + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
1157{
1158    type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1159}
1160
1161impl<ValueType> OwnedReadImplFamily
1162    for KvdbSqliteShardedBorrowMut<'_, ValueType>
1163{
1164    type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1165}
1166
1167impl<ValueType> SingleWriterImplFamily
1168    for KvdbSqliteShardedBorrowMut<'_, ValueType>
1169{
1170    type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1171}
1172
1173impl<ValueType> ReadImplFamily
1174    for KvdbSqliteShardedBorrowShared<'_, ValueType>
1175{
1176    type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1177}
1178
1179enable_deref_for_self! {KvdbSqliteSharded<Box<[u8]>>}
1180enable_deref_for_self! {KvdbSqliteSharded<()>}
1181enable_deref_for_self! {KvdbSqliteShardedBorrowShared<'_, Box<[u8]>>}
1182enable_deref_for_self! {KvdbSqliteShardedBorrowMut<'_, Box<[u8]>>}
1183enable_deref_mut_plus_impl_or_borrow_mut_self!(
1184    KvdbSqliteShardedDestructureTraitWithValueType<ValueType = Box<[u8]>>
1185);
1186enable_deref_mut_plus_impl_or_borrow_mut_self!(
1187    KvdbSqliteShardedDestructureTraitWithValueType<ValueType = ()>
1188);
1189
1190use crate::{
1191    impls::{
1192        errors::*,
1193        storage_db::{
1194            kvdb_sqlite::{
1195                kvdb_sqlite_iter_range_excl_impl, kvdb_sqlite_iter_range_impl,
1196                KvdbSqliteBorrowMut, KvdbSqliteBorrowShared,
1197                KvdbSqliteTransaction,
1198            },
1199            sqlite::{
1200                BindValueAppendImpl, MappedRows, SqlBindableValue, ValueRead,
1201                ValueReadImpl,
1202            },
1203        },
1204    },
1205    storage_db::{
1206        DbValueType, DeltaDbTrait, KeyValueDbIterableTrait,
1207        KeyValueDbTraitMultiReader, KeyValueDbTraitTransactional,
1208        KeyValueDbTransactionTrait, KeyValueDbTypes, KvdbIterIterator,
1209        OwnedReadImplByFamily, OwnedReadImplFamily, ReadImplByFamily,
1210        ReadImplFamily, SingleWriterImplByFamily, SingleWriterImplFamily,
1211    },
1212    utils::{
1213        deref_plus_impl_or_borrow_self::*,
1214        tuple::ElementSatisfy,
1215        wrap::{Wrap, WrappedLifetimeFamily, WrappedTrait},
1216    },
1217    KvdbSqlite, KvdbSqliteStatements, MptKeyValue, SqliteConnection,
1218};
1219use fallible_iterator::FallibleIterator;
1220use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
1221use sqlite::Statement;
1222use std::{
1223    any::Any,
1224    marker::PhantomData,
1225    path::{Path, PathBuf},
1226    sync::Arc,
1227};