1pub struct KvdbSqliteSharded<ValueType> {
6 shards_connections: Option<Box<[SqliteConnection]>>,
9 statements: Arc<KvdbSqliteStatements>,
10 __marker_value: PhantomData<ValueType>,
11}
12
13impl<ValueType> MallocSizeOf for KvdbSqliteSharded<ValueType> {
14 fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
15 let mut size = 0;
16 if let Some(connections) = &self.shards_connections {
17 for connection in &**connections {
18 size += connection.size_of(ops);
19 }
20 }
21 size
23 }
24}
25
26impl<ValueType> KvdbSqliteSharded<ValueType> {
27 pub fn new(
28 shard_connections: Option<Box<[SqliteConnection]>>,
29 statements: Arc<KvdbSqliteStatements>,
30 ) -> Self {
31 if let Some(connections) = shard_connections.as_ref() {
32 assert_valid_num_shards(connections.len() as u16);
33 }
34 Self {
35 shards_connections: shard_connections,
36 statements,
37 __marker_value: Default::default(),
38 }
39 }
40
41 pub fn into_connections(self) -> Option<Box<[SqliteConnection]>> {
42 self.shards_connections
43 }
44}
45
46pub struct KvdbSqliteShardedBorrowMut<'db, ValueType> {
47 shards_connections: Option<*mut [SqliteConnection]>,
48 statements: &'db KvdbSqliteStatements,
49 __marker_lifetime: PhantomData<&'db mut SqliteConnection>,
50 __marker_value: PhantomData<ValueType>,
51}
52
53impl<'db, ValueType> KvdbSqliteShardedBorrowMut<'db, ValueType> {
54 pub fn new(
55 shard_connections: Option<&'db mut [SqliteConnection]>,
56 statements: &'db KvdbSqliteStatements,
57 ) -> Self {
58 if let Some(connections) = shard_connections.as_ref() {
59 assert_valid_num_shards(connections.len() as u16);
60 }
61 Self {
62 shards_connections: shard_connections
63 .map(|x| x as *mut [SqliteConnection]),
64 statements,
65 __marker_lifetime: Default::default(),
66 __marker_value: Default::default(),
67 }
68 }
69}
70
71pub struct KvdbSqliteShardedBorrowShared<'db, ValueType> {
72 shards_connections: Option<*const [SqliteConnection]>,
73 statements: &'db KvdbSqliteStatements,
74 __marker_lifetime: PhantomData<&'db SqliteConnection>,
75 __marker_value: PhantomData<ValueType>,
76}
77
78impl<'db, ValueType> KvdbSqliteShardedBorrowShared<'db, ValueType> {
79 pub fn new(
80 shard_connections: Option<&'db [SqliteConnection]>,
81 statements: &'db KvdbSqliteStatements,
82 ) -> Self {
83 if let Some(connections) = shard_connections.as_ref() {
84 assert_valid_num_shards(connections.len() as u16);
85 }
86 Self {
87 shards_connections: shard_connections
88 .map(|x| x as *const [SqliteConnection]),
89 statements,
90 __marker_lifetime: Default::default(),
91 __marker_value: Default::default(),
92 }
93 }
94}
95
96pub trait KvdbSqliteShardedRefDestructureTrait {
97 fn destructure(
98 &self,
99 ) -> (Option<&[SqliteConnection]>, &KvdbSqliteStatements);
100}
101
102pub trait KvdbSqliteShardedDestructureTrait {
103 fn destructure_mut(
104 &mut self,
105 ) -> (Option<&mut [SqliteConnection]>, &KvdbSqliteStatements);
106}
107
108impl<ValueType> KvdbSqliteSharded<ValueType> {
109 pub fn db_path<P: AsRef<Path>>(path: P, shard_id: u16) -> PathBuf {
110 path.as_ref().join(format!("shard_{:02x}", shard_id))
111 }
112
113 pub fn try_clone(&self) -> Result<Self> {
114 Ok(Self {
115 shards_connections: match &self.shards_connections {
116 None => None,
117 Some(connections) => {
118 let mut cloned_connections =
119 Vec::with_capacity(connections.len());
120 for conn in connections.iter() {
121 cloned_connections.push(conn.try_clone()?)
122 }
123 Some(cloned_connections.into_boxed_slice())
124 }
125 },
126 statements: self.statements.clone(),
127 __marker_value: Default::default(),
128 })
129 }
130
131 pub fn open<P: AsRef<Path>>(
132 num_shards: u16, dir: P, readonly: bool,
133 statements: Arc<KvdbSqliteStatements>,
134 ) -> Result<Self> {
135 assert_valid_num_shards(num_shards);
136 let mut shards_connections = Vec::with_capacity(num_shards as usize);
137 for i in 0..num_shards {
138 shards_connections.push(SqliteConnection::open(
139 Self::db_path(&dir, i),
140 readonly,
141 SqliteConnection::default_open_flags(),
142 )?)
143 }
144 Ok(Self {
145 shards_connections: Some(shards_connections.into_boxed_slice()),
146 statements,
147 __marker_value: Default::default(),
148 })
149 }
150
151 pub fn open_or_create<P: AsRef<Path>>(
152 num_shards: u16, dir: P, statements: Arc<KvdbSqliteStatements>,
153 unsafe_mode: bool,
154 ) -> Result<(bool, Self)> {
155 if dir.as_ref().exists() {
156 Ok((
157 true,
158 Self::open(
159 num_shards, dir, false, statements,
160 )?,
161 ))
162 } else {
163 Ok((
164 false,
165 Self::create_and_open(
166 num_shards,
167 dir,
168 statements,
169 true,
170 unsafe_mode,
171 )?,
172 ))
173 }
174 }
175
176 pub fn create_and_open<P: AsRef<Path>>(
177 num_shards: u16, dir: P, statements: Arc<KvdbSqliteStatements>,
178 create_table: bool, unsafe_mode: bool,
179 ) -> Result<Self> {
180 assert_valid_num_shards(num_shards);
181 let mut shards_connections = Vec::with_capacity(num_shards as usize);
182 for i in 0..num_shards {
183 let connection = KvdbSqlite::<ValueType>::create_and_open(
184 Self::db_path(&dir, i),
185 statements.clone(),
186 create_table,
187 unsafe_mode,
188 )?
189 .into_connection()
190 .unwrap();
192 shards_connections.push(connection);
193 }
194 Ok(Self {
195 shards_connections: Some(shards_connections.into_boxed_slice()),
196 statements,
197 __marker_value: Default::default(),
198 })
199 }
200
201 pub fn create_table(
204 connections: &mut Box<[SqliteConnection]>,
205 statements: &KvdbSqliteStatements,
206 ) -> Result<()> {
207 for connection in connections.iter_mut() {
208 KvdbSqlite::<ValueType>::create_table(connection, statements)?
209 }
210 Ok(())
211 }
212
213 pub fn check_if_table_exist(
214 connections: &mut Box<[SqliteConnection]>,
215 statements: &KvdbSqliteStatements,
216 ) -> Result<bool> {
217 for connection in connections.iter_mut() {
218 if KvdbSqlite::<ValueType>::check_if_table_exist(
219 connection, statements,
220 )? {
221 return Ok(true);
222 }
223 }
224
225 Ok(false)
226 }
227
228 pub fn drop_table(
229 connections: &mut Box<[SqliteConnection]>,
230 statements: &KvdbSqliteStatements,
231 ) -> Result<()> {
232 for connection in connections.iter_mut() {
233 KvdbSqlite::<ValueType>::drop_table(connection, statements)?;
234 }
235 Ok(())
236 }
237
238 pub fn vacumm_db(
239 connections: &mut Box<[SqliteConnection]>,
240 statements: &KvdbSqliteStatements,
241 ) -> Result<()> {
242 for connection in connections.iter_mut() {
243 KvdbSqlite::<ValueType>::vacuum_db(connection, statements)?
244 }
245 Ok(())
246 }
247}
248
/// Asserts that `num_shards` is a usable shard count.
///
/// # Panics
///
/// Panics if `num_shards` is zero or does not divide 256 evenly.
fn assert_valid_num_shards(num_shards: u16) {
    // Reject zero explicitly; otherwise the remainder below would fail with
    // an unrelated "divisor of zero" panic.
    assert!(num_shards != 0, "number of shards must be non-zero");
    assert_eq!(
        0,
        256 % num_shards,
        "number of shards ({}) must divide 256",
        num_shards
    );
}
250
/// Maps a byte key to a shard index in `0..num_shards`.
///
/// Only the first four bytes take part; missing bytes count as `0`. The
/// mapped value is `((b0 + b1) << 4) + b2 + b3`, reduced modulo `num_shards`.
///
/// # Panics
///
/// Panics if `num_shards` is `0` (remainder by zero).
pub fn key_to_shard_id(key: &[u8], num_shards: usize) -> usize {
    // Checked access replaces the former `unsafe get_unchecked` calls; the
    // result is the same, absent bytes read as zero.
    let byte_at =
        |index: usize| key.get(index).map_or(0, |&byte| usize::from(byte));
    let mapped_key =
        ((byte_at(0) + byte_at(1)) << 4) + byte_at(2) + byte_at(3);
    mapped_key % num_shards
}
276
/// Maps a numeric key to a shard index in `0..num_shards`.
///
/// Uses the four most significant bytes of `key` (big-endian order) as
/// `((b0 + b1) << 4) + b2 + b3`, reduced modulo `num_shards`.
///
/// # Panics
///
/// Panics if `num_shards` is `0` (remainder by zero).
pub fn number_key_to_shard_id(key: i64, num_shards: usize) -> usize {
    let bytes = key.to_be_bytes();
    let high = (usize::from(bytes[0]) + usize::from(bytes[1])) << 4;
    let low = usize::from(bytes[2]) + usize::from(bytes[3]);
    (high + low) % num_shards
}
284
/// Maps a key to a `u32` derived from its leading part.
///
/// `ShardedIterMerger` compares these values to decide which shard's entry
/// comes next.
pub trait KeyPrefixToMap {
    /// Returns the `u32` ordering value for this key.
    fn key_prefix_to_map(&self) -> u32;
}

impl KeyPrefixToMap for Vec<u8> {
    /// Reads the first four bytes as a big-endian `u32`; keys shorter than
    /// four bytes are padded with trailing zero bytes.
    fn key_prefix_to_map(&self) -> u32 {
        // Safe copy replaces the former `unsafe get_unchecked` reads.
        let mut prefix = [0u8; 4];
        let len = self.len().min(prefix.len());
        prefix[..len].copy_from_slice(&self[..len]);
        u32::from_be_bytes(prefix)
    }
}

impl KeyPrefixToMap for i64 {
    /// Takes the upper 32 bits and flips the sign bit, so that the unsigned
    /// result orders the same way as the signed key.
    fn key_prefix_to_map(&self) -> u32 {
        (*self >> 32) as u32 ^ 0x80000000
    }
}
324
325impl<
326 T: ReadImplFamily
327 + KvdbSqliteShardedRefDestructureTrait
328 + KeyValueDbTypes<ValueType = ValueType>,
329 ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
330 > ReadImplByFamily<KvdbSqliteSharded<ValueType>> for T
331{
332 fn get_impl(&self, key: &[u8]) -> Result<Option<Self::ValueType>> {
333 let (maybe_connections, statements) = self.destructure();
334 match maybe_connections {
335 None => Ok(None),
336 Some(connections) => KvdbSqliteBorrowShared::new((
337 connections.get(key_to_shard_id(key, connections.len())),
338 statements,
339 ))
340 .get_impl(key),
341 }
342 }
343
344 fn get_with_number_key_impl(
345 &self, key: i64,
346 ) -> Result<Option<Self::ValueType>> {
347 let (maybe_connections, statements) = self.destructure();
348 match maybe_connections {
349 None => Ok(None),
350 Some(connections) => KvdbSqliteBorrowShared::new((
351 connections.get(number_key_to_shard_id(key, connections.len())),
352 statements,
353 ))
354 .get_with_number_key_impl(key),
355 }
356 }
357}
358
359impl<
360 T: OwnedReadImplFamily
361 + KvdbSqliteShardedDestructureTrait
362 + KeyValueDbTypes<ValueType = ValueType>,
363 ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
364 > OwnedReadImplByFamily<KvdbSqliteSharded<ValueType>> for T
365{
366 fn get_mut_impl(&mut self, key: &[u8]) -> Result<Option<Self::ValueType>> {
367 let (maybe_connections, statements) = self.destructure_mut();
368 match maybe_connections {
369 None => Ok(None),
370 Some(connections) => KvdbSqliteBorrowMut::new((
371 connections.get_mut(key_to_shard_id(key, connections.len())),
372 statements,
373 ))
374 .get_mut_impl(key),
375 }
376 }
377
378 fn get_mut_with_number_key_impl(
379 &mut self, key: i64,
380 ) -> Result<Option<Self::ValueType>> {
381 let (maybe_connections, statements) = self.destructure_mut();
382 match maybe_connections {
383 None => Ok(None),
384 Some(connections) => KvdbSqliteBorrowMut::new((
385 connections
386 .get_mut(number_key_to_shard_id(key, connections.len())),
387 statements,
388 ))
389 .get_mut_with_number_key_impl(key),
390 }
391 }
392}
393
394impl<
395 T: SingleWriterImplFamily
396 + KvdbSqliteShardedDestructureTrait
397 + KeyValueDbTypes<ValueType = ValueType>,
398 ValueType: DbValueType,
399 > SingleWriterImplByFamily<KvdbSqliteSharded<ValueType>> for T
400where ValueType::Type: SqlBindableValue
401 + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
402{
403 fn delete_impl(
404 &mut self, key: &[u8],
405 ) -> Result<Option<Option<Self::ValueType>>> {
406 let (maybe_connections, statements) = self.destructure_mut();
407 match maybe_connections {
408 None => Err(Error::from(Error::DbNotExist)),
409 Some(connections) => KvdbSqliteBorrowMut::<ValueType>::new((
410 connections.get_mut(key_to_shard_id(key, connections.len())),
411 statements,
412 ))
413 .delete_impl(key),
414 }
415 }
416
417 fn delete_with_number_key_impl(
418 &mut self, key: i64,
419 ) -> Result<Option<Option<Self::ValueType>>> {
420 let (maybe_connections, statements) = self.destructure_mut();
421 match maybe_connections {
422 None => Err(Error::from(Error::DbNotExist)),
423 Some(connections) => KvdbSqliteBorrowMut::new((
424 connections
425 .get_mut(number_key_to_shard_id(key, connections.len())),
426 statements,
427 ))
428 .delete_with_number_key_impl(key),
429 }
430 }
431
432 fn put_impl(
433 &mut self, key: &[u8], value: &<Self::ValueType as DbValueType>::Type,
434 ) -> Result<Option<Option<Self::ValueType>>> {
435 let (maybe_connections, statements) = self.destructure_mut();
436 match maybe_connections {
437 None => Err(Error::from(Error::DbNotExist)),
438 Some(connections) => KvdbSqliteBorrowMut::<ValueType>::new((
439 connections.get_mut(key_to_shard_id(key, connections.len())),
440 statements,
441 ))
442 .put_impl(key, value),
443 }
444 }
445
446 fn put_with_number_key_impl(
447 &mut self, key: i64, value: &<Self::ValueType as DbValueType>::Type,
448 ) -> Result<Option<Option<Self::ValueType>>> {
449 let (maybe_connections, statements) = self.destructure_mut();
450 match maybe_connections {
451 None => Err(Error::from(Error::DbNotExist)),
452 Some(connections) => KvdbSqliteBorrowMut::new((
453 connections
454 .get_mut(number_key_to_shard_id(key, connections.len())),
455 statements,
456 ))
457 .put_with_number_key_impl(key, value),
458 }
459 }
460}
461
462pub struct ShardedIterMerger<
463 Key: KeyPrefixToMap,
464 Value,
465 ShardIter: FallibleIterator<Item = (Key, Value)>,
466> {
467 shard_iters: Vec<ShardIter>,
468 ordered_iter_ids: Vec<u8>,
469 ordered_key_prefixes_to_map: Vec<u32>,
470 maybe_next_key_values: Vec<Option<(Key, Value)>>,
471}
472
473impl<
474 Key: KeyPrefixToMap,
475 Value,
476 ShardIter: FallibleIterator<Item = (Key, Value)>,
477 > ShardedIterMerger<Key, Value, ShardIter>
478{
479 pub fn new() -> Self {
480 Self {
481 shard_iters: vec![],
482 ordered_iter_ids: vec![],
483 ordered_key_prefixes_to_map: vec![],
484 maybe_next_key_values: vec![None],
486 }
487 }
488
489 pub fn push_shard_iter(
490 &mut self, mut iter: ShardIter,
491 ) -> std::result::Result<(), ShardIter::Error> {
492 let maybe_first_key_value = iter.next()?;
493 let iter_id = self.shard_iters.len() as u8;
494 self.shard_iters.push(iter);
495 self.kv_push(iter_id, maybe_first_key_value);
496 Ok(())
497 }
498
499 fn kv_push(&mut self, iter_id: u8, kv: Option<(Key, Value)>) {
500 if kv.is_some() {
501 let mut i = 0;
502 let this_key_prefix_to_map =
503 kv.as_ref().unwrap().0.key_prefix_to_map();
504 while self.maybe_next_key_values[i].is_some() {
505 if this_key_prefix_to_map < self.ordered_key_prefixes_to_map[i]
506 {
507 break;
508 }
509 i += 1;
510 }
511 self.ordered_iter_ids.insert(i, iter_id);
512 self.ordered_key_prefixes_to_map
513 .insert(i, this_key_prefix_to_map);
514 self.maybe_next_key_values.insert(i, kv);
515 }
516 }
517
518 fn peek(&self) -> Option<u8> { self.ordered_iter_ids.get(0).cloned() }
519
520 fn kv_pop_and_push(
521 &mut self, iter_id: u8, kv: Option<(Key, Value)>,
522 ) -> std::result::Result<Option<(Key, Value)>, ShardIter::Error> {
523 let mut i;
524 if kv.is_some() {
525 i = 0;
526 let this_key_prefix_to_map =
527 kv.as_ref().unwrap().0.key_prefix_to_map();
528 while self.maybe_next_key_values[i].is_some() {
529 if this_key_prefix_to_map < self.ordered_key_prefixes_to_map[i]
530 {
531 break;
532 }
533 i += 1;
534 }
535 debug_assert!(i > 0);
539 unsafe {
540 let pos = i - 1;
541 if pos > 0 {
542 std::ptr::copy(
543 self.ordered_iter_ids.get_unchecked(1),
544 self.ordered_iter_ids.get_unchecked_mut(0),
545 pos,
546 );
547 }
548 std::ptr::write(
549 self.ordered_iter_ids.get_unchecked_mut(pos),
550 iter_id,
551 );
552 if pos > 0 {
553 std::ptr::copy(
554 self.ordered_key_prefixes_to_map.get_unchecked(1),
555 self.ordered_key_prefixes_to_map.get_unchecked_mut(0),
556 pos,
557 );
558 }
559 std::ptr::write(
560 self.ordered_key_prefixes_to_map.get_unchecked_mut(pos),
561 this_key_prefix_to_map,
562 );
563 let popped = Ok(std::ptr::read(
564 self.maybe_next_key_values.get_unchecked(0),
565 ));
566 if pos > 0 {
567 std::ptr::copy(
568 self.maybe_next_key_values.get_unchecked(1),
569 self.maybe_next_key_values.get_unchecked_mut(0),
570 pos,
571 );
572 }
573 std::ptr::write(
574 self.maybe_next_key_values.get_unchecked_mut(pos),
575 kv,
576 );
577
578 popped
579 }
580 } else {
581 self.ordered_iter_ids.remove(0);
582 self.ordered_key_prefixes_to_map.remove(0);
583 Ok(self.maybe_next_key_values.remove(0))
584 }
585 }
586}
587
588impl<
589 Key: KeyPrefixToMap,
590 Value,
591 ShardIter: FallibleIterator<Item = (Key, Value)>,
592 > FallibleIterator for ShardedIterMerger<Key, Value, ShardIter>
593{
594 type Error = ShardIter::Error;
595 type Item = (Key, Value);
596
597 fn next(&mut self) -> std::result::Result<Option<Self::Item>, Self::Error> {
598 match self.peek() {
599 None => Ok(None),
600 Some(iter_id) => {
601 let next_kv = self.shard_iters[iter_id as usize].next()?;
604 self.kv_pop_and_push(iter_id, next_kv)
605 }
606 }
607 }
608}
609
610pub trait KvdbSqliteShardedDestructureTraitWithValueType:
611 KvdbSqliteShardedDestructureTrait + KeyValueDbTypes
612{
613}
614impl<T: KvdbSqliteShardedDestructureTrait + KeyValueDbTypes>
615 KvdbSqliteShardedDestructureTraitWithValueType for T
616{
617}
618
619pub fn kvdb_sqlite_sharded_iter_range_impl<
621 'db,
622 Key: 'db + KeyPrefixToMap,
623 Value: 'db,
624 F: Clone + FnMut(&Statement<'db>) -> Result<(Key, Value)>,
625>(
626 maybe_shards_connections: Option<&'db mut [SqliteConnection]>,
627 statements: &KvdbSqliteStatements, lower_bound_incl: &[u8],
628 upper_bound_excl: Option<&[u8]>, f: F,
629) -> Result<ShardedIterMerger<Key, Value, MappedRows<'db, F>>> {
630 let mut shards_iter_merger_result = Ok(ShardedIterMerger::new());
631 if let Some(shards_connections) = maybe_shards_connections {
632 let shards_iter_merger = shards_iter_merger_result.as_mut().unwrap();
633 for connection in shards_connections.iter_mut() {
634 let shard_iter = kvdb_sqlite_iter_range_impl(
635 Some(connection),
636 statements,
637 lower_bound_incl,
638 upper_bound_excl,
639 f.clone(),
640 )?;
641 shards_iter_merger.push_shard_iter(shard_iter)?;
642 }
643 }
644
645 shards_iter_merger_result
646}
647
648pub fn kvdb_sqlite_sharded_iter_range_excl_impl<
649 'db,
650 Key: 'db + KeyPrefixToMap,
651 Value: 'db,
652 F: Clone + FnMut(&Statement<'db>) -> Result<(Key, Value)>,
653>(
654 maybe_shards_connections: Option<&'db mut [SqliteConnection]>,
655 statements: &KvdbSqliteStatements, lower_bound_excl: &[u8],
656 upper_bound_excl: &[u8], f: F,
657) -> Result<ShardedIterMerger<Key, Value, MappedRows<'db, F>>> {
658 let mut shards_iter_merger_result = Ok(ShardedIterMerger::new());
659 if let Some(shards_connections) = maybe_shards_connections {
660 let shards_iter_merger = shards_iter_merger_result.as_mut().unwrap();
661 for connection in shards_connections.iter_mut() {
662 let shard_iter = kvdb_sqlite_iter_range_excl_impl(
663 Some(connection),
664 statements,
665 lower_bound_excl,
666 upper_bound_excl,
667 f.clone(),
668 )?;
669 shards_iter_merger.push_shard_iter(shard_iter)?;
670 }
671 }
672
673 shards_iter_merger_result
674}
675
// Implements `WrappedLifetimeFamily` and `WrappedTrait` for the
// `KvdbIterIterator` marker of one (item key, item value, key) combination,
// naming `ShardedIterMerger` over `MappedRows` as the concrete iterator.
macro_rules! enable_KvdbIterIterator_for_KvdbSqliteSharded_families {
    ($ItemKeyType:ty, $ItemValueType:ty, $KeyType:ty) => {
        impl<'a>
            WrappedLifetimeFamily<
                'a,
                dyn FallibleIterator<
                    Item = ($ItemKeyType, $ItemValueType),
                    Error = Error,
                >,
            >
            for KvdbIterIterator<
                ($ItemKeyType, $ItemValueType),
                $KeyType,
                KvdbSqliteShardedIteratorTag,
            >
        {
            // Concrete iterator type produced for lifetime `'a`.
            type Out = ShardedIterMerger<
                $ItemKeyType,
                $ItemValueType,
                MappedRows<
                    'a,
                    for<'r, 's> fn(
                        &'r Statement<'s>,
                    ) -> Result<(
                        $ItemKeyType,
                        $ItemValueType,
                    )>,
                >,
            >;
        }

        impl
            WrappedTrait<
                dyn FallibleIterator<
                    Item = ($ItemKeyType, $ItemValueType),
                    Error = Error,
                >,
            >
            for KvdbIterIterator<
                ($ItemKeyType, $ItemValueType),
                $KeyType,
                KvdbSqliteShardedIteratorTag,
            >
        {
        }
    };
}
720
/// Tag type that selects the sharded-SQLite iterator implementations.
pub struct KvdbSqliteShardedIteratorTag();
722
723enable_KvdbIterIterator_for_KvdbSqliteSharded_families!(
724 Vec<u8>,
725 Box<[u8]>,
726 [u8]
727);
728enable_KvdbIterIterator_for_KvdbSqliteSharded_families!(Vec<u8>, (), [u8]);
729
// For one (item type, db type) pair, implements `ElementSatisfy`,
// `WrappedLifetimeFamily` and `WrappedTrait` with respect to the sharded
// `KeyValueDbIterableTrait` object type.
macro_rules! make_wrap_of_KeyValueDbIterableTrait_of_KvdbSqliteSharded_families {
    ($ItemType:ty, $Ttype:ty) => {
        impl
            ElementSatisfy<
                dyn KeyValueDbIterableTrait<
                    $ItemType,
                    [u8],
                    KvdbSqliteShardedIteratorTag,
                >,
            > for $Ttype
        {
            // Views `self` as the iterable trait object.
            fn to_constrain_object(
                &self,
            ) -> &(dyn KeyValueDbIterableTrait<
                $ItemType,
                [u8],
                KvdbSqliteShardedIteratorTag,
            > + 'static) {
                self
            }

            // Mutable counterpart of `to_constrain_object`.
            fn to_constrain_object_mut(
                &mut self,
            ) -> &mut (dyn KeyValueDbIterableTrait<
                $ItemType,
                [u8],
                KvdbSqliteShardedIteratorTag,
            > + 'static) {
                self
            }
        }

        impl
            WrappedLifetimeFamily<
                '_,
                dyn KeyValueDbIterableTrait<
                    $ItemType,
                    [u8],
                    KvdbSqliteShardedIteratorTag,
                >,
            > for $Ttype
        {
            // The db type wraps to itself for every lifetime.
            type Out = Self;
        }

        impl
            WrappedTrait<
                dyn KeyValueDbIterableTrait<
                    $ItemType,
                    [u8],
                    KvdbSqliteShardedIteratorTag,
                >,
            > for $Ttype
        {
        }
    };
}
786
787make_wrap_of_KeyValueDbIterableTrait_of_KvdbSqliteSharded_families!(
788 MptKeyValue,
789 KvdbSqliteSharded<Box<[u8]>>
790);
791make_wrap_of_KeyValueDbIterableTrait_of_KvdbSqliteSharded_families!(
792 (Vec<u8>, ()),
793 KvdbSqliteSharded<()>
794);
795make_wrap_of_KeyValueDbIterableTrait_of_KvdbSqliteSharded_families!(
796 MptKeyValue,
797 KvdbSqliteShardedBorrowMut<'static, Box<[u8]>>
798);
799
800impl<
803 ValueType: 'static + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
804 T: DerefMutPlusImplOrBorrowMutSelf<
805 dyn KvdbSqliteShardedDestructureTraitWithValueType<
806 ValueType = ValueType,
807 >,
808 >,
809 >
810 KeyValueDbIterableTrait<
811 (Vec<u8>, ValueType),
812 [u8],
813 KvdbSqliteShardedIteratorTag,
814 > for T
815where KvdbIterIterator<(Vec<u8>, ValueType), [u8], KvdbSqliteShardedIteratorTag>:
816 WrappedTrait<
817 dyn FallibleIterator<
818 Item = (Vec<u8>, ValueType),
819 Error = Error,
820 >,
821 > + for<'a> WrappedLifetimeFamily<
822 'a,
823 dyn FallibleIterator<
824 Item = (Vec<u8>, ValueType),
825 Error = Error,
826 >,
827 Out = ShardedIterMerger<
828 Vec<u8>,
829 ValueType,
830 MappedRows<
831 'a,
832 for<'r, 's> fn(
833 &'r Statement<'s>,
834 )
835 -> Result<(Vec<u8>, ValueType)>,
836 >,
837 >,
838 >
839{
840 fn iter_range(
841 &mut self, lower_bound_incl: &[u8], upper_bound_excl: Option<&[u8]>,
842 ) -> Result<
843 Wrap<
844 '_,
845 KvdbIterIterator<
846 (Vec<u8>, ValueType),
847 [u8],
848 KvdbSqliteShardedIteratorTag,
849 >,
850 dyn FallibleIterator<Item = (Vec<u8>, ValueType), Error = Error>,
851 >,
852 > {
853 let (maybe_shards_connections, statements) =
854 self.borrow_mut().destructure_mut();
855 Ok(Wrap(kvdb_sqlite_sharded_iter_range_impl(
856 maybe_shards_connections,
857 statements,
858 lower_bound_incl,
859 upper_bound_excl,
860 KvdbSqlite::<ValueType>::kv_from_iter_row::<Vec<u8>>
861 as for<'r, 's> fn(
862 &'r Statement<'s>,
863 )
864 -> Result<(Vec<u8>, ValueType)>,
865 )?))
866 }
867
868 fn iter_range_excl(
869 &mut self, lower_bound_excl: &[u8], upper_bound_excl: &[u8],
870 ) -> Result<
871 Wrap<
872 '_,
873 KvdbIterIterator<
874 (Vec<u8>, ValueType),
875 [u8],
876 KvdbSqliteShardedIteratorTag,
877 >,
878 dyn FallibleIterator<Item = (Vec<u8>, ValueType), Error = Error>,
879 >,
880 > {
881 let (maybe_shards_connections, statements) =
882 self.borrow_mut().destructure_mut();
883 Ok(Wrap(kvdb_sqlite_sharded_iter_range_excl_impl(
884 maybe_shards_connections,
885 statements,
886 lower_bound_excl,
887 upper_bound_excl,
888 KvdbSqlite::<ValueType>::kv_from_iter_row::<Vec<u8>>
889 as for<'r, 's> fn(
890 &'r Statement<'s>,
891 )
892 -> Result<(Vec<u8>, ValueType)>,
893 )?))
894 }
895}
896
897impl<
899 ValueType: 'static
900 + DbValueType
901 + ValueRead
902 + ValueReadImpl<<ValueType as ValueRead>::Kind>,
903 > KeyValueDbTraitTransactional for KvdbSqliteSharded<ValueType>
904where ValueType::Type: SqlBindableValue
905 + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
906{
907 type TransactionType = KvdbSqliteShardedTransaction<ValueType>;
908
909 fn start_transaction(
910 &self, immediate_write: bool,
911 ) -> Result<KvdbSqliteShardedTransaction<ValueType>> {
912 if self.shards_connections.is_none() {
913 bail!(Error::DbNotExist);
914 }
915
916 KvdbSqliteShardedTransaction::new(self.try_clone()?, immediate_write)
917 }
918}
919
920pub struct KvdbSqliteShardedTransaction<
921 ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
922> where ValueType::Type: SqlBindableValue
923 + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
924{
925 db: KvdbSqliteSharded<ValueType>,
926 committed: bool,
927}
928
929impl<
930 ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
931 > Drop for KvdbSqliteShardedTransaction<ValueType>
932where ValueType::Type: SqlBindableValue
933 + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
934{
935 fn drop(&mut self) {
936 if !self.committed {
937 self.revert().ok();
938 }
939 }
940}
941
942impl<
943 ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
944 > KvdbSqliteShardedTransaction<ValueType>
945where ValueType::Type: SqlBindableValue
946 + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
947{
948 fn new(
949 mut db: KvdbSqliteSharded<ValueType>, immediate_write: bool,
950 ) -> Result<Self> {
951 Self::start_transaction(
952 db.shards_connections.as_mut(),
953 immediate_write,
954 )?;
955 Ok(Self {
956 db,
957 committed: false,
958 })
959 }
960
961 fn start_transaction(
962 maybe_shard_connections: Option<&mut Box<[SqliteConnection]>>,
963 immediate_write: bool,
964 ) -> Result<()> {
965 if let Some(connections) = maybe_shard_connections {
966 for conn in connections.iter_mut() {
967 KvdbSqliteTransaction::<ValueType>::start_transaction(
968 conn.get_db_mut(),
969 immediate_write,
970 )?
971 }
972 }
973 Ok(())
974 }
975}
976
977impl<
978 ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
979 > KeyValueDbTransactionTrait for KvdbSqliteShardedTransaction<ValueType>
980where ValueType::Type: SqlBindableValue
981 + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
982{
983 fn commit(&mut self, _db: &dyn Any) -> Result<()> {
984 self.committed = true;
985 if let Some(connections) = self.db.shards_connections.as_mut() {
986 for conn in connections.iter_mut() {
987 conn.get_db_mut().execute("COMMIT")?;
988 }
989 }
990 Ok(())
991 }
992
993 fn revert(&mut self) -> Result<()> {
994 self.committed = true;
995 if let Some(connections) = self.db.shards_connections.as_mut() {
996 for conn in connections.iter_mut() {
997 conn.get_db_mut().execute("ROLLBACK")?;
998 }
999 }
1000 Ok(())
1001 }
1002
1003 fn restart(
1004 &mut self, immediate_write: bool, no_revert: bool,
1005 ) -> Result<()> {
1006 if !no_revert {
1007 self.revert()?;
1008 }
1009 Self::start_transaction(
1010 self.db.shards_connections.as_mut(),
1011 immediate_write,
1012 )
1013 }
1014}
1015
1016impl<
1017 ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
1018 > KeyValueDbTypes for KvdbSqliteShardedTransaction<ValueType>
1019where ValueType::Type: SqlBindableValue
1020 + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
1021{
1022 type ValueType = ValueType;
1023}
1024
1025impl KeyValueDbTraitMultiReader for KvdbSqliteSharded<Box<[u8]>> {}
1026impl OwnedReadImplFamily for &KvdbSqliteSharded<Box<[u8]>> {
1027 type FamilyRepresentative =
1028 dyn KeyValueDbTraitMultiReader<ValueType = Box<[u8]>>;
1029}
1030impl DeltaDbTrait for KvdbSqliteSharded<Box<[u8]>> {}
1031
1032impl<ValueType: DbValueType> KeyValueDbTypes for KvdbSqliteSharded<ValueType> {
1033 type ValueType = ValueType;
1034}
1035
1036impl<ValueType: DbValueType> KeyValueDbTypes
1037 for KvdbSqliteShardedBorrowMut<'_, ValueType>
1038{
1039 type ValueType = ValueType;
1040}
1041
1042impl<ValueType: DbValueType> KeyValueDbTypes
1043 for KvdbSqliteShardedBorrowShared<'_, ValueType>
1044{
1045 type ValueType = ValueType;
1046}
1047
1048impl<ValueType: DbValueType> KvdbSqliteShardedRefDestructureTrait
1049 for KvdbSqliteSharded<ValueType>
1050{
1051 fn destructure(
1052 &self,
1053 ) -> (Option<&[SqliteConnection]>, &KvdbSqliteStatements) {
1054 (
1055 self.shards_connections.as_ref().map(|r| &**r),
1056 &self.statements,
1057 )
1058 }
1059}
1060
1061impl<ValueType: DbValueType> KvdbSqliteShardedDestructureTrait
1062 for KvdbSqliteSharded<ValueType>
1063{
1064 fn destructure_mut(
1065 &mut self,
1066 ) -> (Option<&mut [SqliteConnection]>, &KvdbSqliteStatements) {
1067 (
1068 self.shards_connections.as_mut().map(|r| &mut **r),
1069 &self.statements,
1070 )
1071 }
1072}
1073
1074impl<
1075 ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
1076 > KvdbSqliteShardedDestructureTrait
1077 for KvdbSqliteShardedTransaction<ValueType>
1078where ValueType::Type: SqlBindableValue
1079 + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
1080{
1081 fn destructure_mut(
1082 &mut self,
1083 ) -> (Option<&mut [SqliteConnection]>, &KvdbSqliteStatements) {
1084 self.db.destructure_mut()
1085 }
1086}
1087
1088impl<ValueType: DbValueType> KvdbSqliteShardedRefDestructureTrait
1089 for KvdbSqliteShardedBorrowMut<'_, ValueType>
1090{
1091 fn destructure(
1092 &self,
1093 ) -> (Option<&[SqliteConnection]>, &KvdbSqliteStatements) {
1094 (
1095 self.shards_connections.clone().map(|r| unsafe { &*r }),
1096 &self.statements,
1097 )
1098 }
1099}
1100
1101impl<ValueType: DbValueType> KvdbSqliteShardedDestructureTrait
1102 for KvdbSqliteShardedBorrowMut<'_, ValueType>
1103{
1104 fn destructure_mut(
1105 &mut self,
1106 ) -> (Option<&mut [SqliteConnection]>, &KvdbSqliteStatements) {
1107 (
1108 self.shards_connections.clone().map(|r| unsafe { &mut *r }),
1109 &self.statements,
1110 )
1111 }
1112}
1113
1114impl<ValueType: DbValueType> KvdbSqliteShardedRefDestructureTrait
1115 for KvdbSqliteShardedBorrowShared<'_, ValueType>
1116{
1117 fn destructure(
1118 &self,
1119 ) -> (Option<&[SqliteConnection]>, &KvdbSqliteStatements) {
1120 (
1121 self.shards_connections.clone().map(|r| unsafe { &*r }),
1122 &self.statements,
1123 )
1124 }
1125}
1126
1127impl<ValueType> ReadImplFamily for KvdbSqliteSharded<ValueType> {
1128 type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1129}
1130
1131impl<ValueType> OwnedReadImplFamily for KvdbSqliteSharded<ValueType> {
1132 type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1133}
1134
1135impl<ValueType> SingleWriterImplFamily for KvdbSqliteSharded<ValueType> {
1136 type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1137}
1138
1139impl<ValueType> ReadImplFamily for KvdbSqliteShardedBorrowMut<'_, ValueType> {
1140 type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1141}
1142
1143impl<
1144 ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
1145 > OwnedReadImplFamily for KvdbSqliteShardedTransaction<ValueType>
1146where ValueType::Type: SqlBindableValue
1147 + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
1148{
1149 type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1150}
1151
1152impl<
1153 ValueType: DbValueType + ValueRead + ValueReadImpl<<ValueType as ValueRead>::Kind>,
1154 > SingleWriterImplFamily for KvdbSqliteShardedTransaction<ValueType>
1155where ValueType::Type: SqlBindableValue
1156 + BindValueAppendImpl<<ValueType::Type as SqlBindableValue>::Kind>
1157{
1158 type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1159}
1160
1161impl<ValueType> OwnedReadImplFamily
1162 for KvdbSqliteShardedBorrowMut<'_, ValueType>
1163{
1164 type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1165}
1166
1167impl<ValueType> SingleWriterImplFamily
1168 for KvdbSqliteShardedBorrowMut<'_, ValueType>
1169{
1170 type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1171}
1172
1173impl<ValueType> ReadImplFamily
1174 for KvdbSqliteShardedBorrowShared<'_, ValueType>
1175{
1176 type FamilyRepresentative = KvdbSqliteSharded<ValueType>;
1177}
1178
1179enable_deref_for_self! {KvdbSqliteSharded<Box<[u8]>>}
1180enable_deref_for_self! {KvdbSqliteSharded<()>}
1181enable_deref_for_self! {KvdbSqliteShardedBorrowShared<'_, Box<[u8]>>}
1182enable_deref_for_self! {KvdbSqliteShardedBorrowMut<'_, Box<[u8]>>}
1183enable_deref_mut_plus_impl_or_borrow_mut_self!(
1184 KvdbSqliteShardedDestructureTraitWithValueType<ValueType = Box<[u8]>>
1185);
1186enable_deref_mut_plus_impl_or_borrow_mut_self!(
1187 KvdbSqliteShardedDestructureTraitWithValueType<ValueType = ()>
1188);
1189
1190use crate::{
1191 impls::{
1192 errors::*,
1193 storage_db::{
1194 kvdb_sqlite::{
1195 kvdb_sqlite_iter_range_excl_impl, kvdb_sqlite_iter_range_impl,
1196 KvdbSqliteBorrowMut, KvdbSqliteBorrowShared,
1197 KvdbSqliteTransaction,
1198 },
1199 sqlite::{
1200 BindValueAppendImpl, MappedRows, SqlBindableValue, ValueRead,
1201 ValueReadImpl,
1202 },
1203 },
1204 },
1205 storage_db::{
1206 DbValueType, DeltaDbTrait, KeyValueDbIterableTrait,
1207 KeyValueDbTraitMultiReader, KeyValueDbTraitTransactional,
1208 KeyValueDbTransactionTrait, KeyValueDbTypes, KvdbIterIterator,
1209 OwnedReadImplByFamily, OwnedReadImplFamily, ReadImplByFamily,
1210 ReadImplFamily, SingleWriterImplByFamily, SingleWriterImplFamily,
1211 },
1212 utils::{
1213 deref_plus_impl_or_borrow_self::*,
1214 tuple::ElementSatisfy,
1215 wrap::{Wrap, WrappedLifetimeFamily, WrappedTrait},
1216 },
1217 KvdbSqlite, KvdbSqliteStatements, MptKeyValue, SqliteConnection,
1218};
1219use fallible_iterator::FallibleIterator;
1220use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
1221use sqlite::Statement;
1222use std::{
1223 any::Any,
1224 marker::PhantomData,
1225 path::{Path, PathBuf},
1226 sync::Arc,
1227};