cfxcore/pos/state_sync/
bootstrapper.rs1use crate::pos::{
8 mempool as diem_mempool,
9 state_sync::{
10 client::{CoordinatorMessage, StateSyncClient},
11 coordinator::StateSyncCoordinator,
12 executor_proxy::{ExecutorProxy, ExecutorProxyTrait},
13 },
14};
15use diem_config::config::NodeConfig;
16use diem_types::waypoint::Waypoint;
17use executor_types::ChunkExecutor;
18use futures::channel::mpsc;
19use std::{boxed::Box, sync::Arc};
20use storage_interface::DbReader;
21use subscription_service::ReconfigSubscription;
22use tokio::runtime::{Builder, Runtime};
23
24pub struct StateSyncBootstrapper {
27 _runtime: Runtime,
28 coordinator_sender: mpsc::UnboundedSender<CoordinatorMessage>,
29}
30
31impl StateSyncBootstrapper {
32 pub fn bootstrap(
33 state_sync_to_mempool_sender: mpsc::Sender<
35 diem_mempool::CommitNotification,
36 >,
37 storage: Arc<dyn DbReader>, executor: Box<dyn ChunkExecutor>,
38 node_config: &NodeConfig, waypoint: Waypoint,
39 reconfig_event_subscriptions: Vec<ReconfigSubscription>,
40 ) -> Self {
41 let runtime = Builder::new_multi_thread()
42 .thread_name("state-sync")
43 .enable_all()
44 .build()
45 .expect("[State Sync] Failed to create runtime!");
46
47 let executor_proxy =
48 ExecutorProxy::new(storage, executor, reconfig_event_subscriptions);
49 Self::bootstrap_with_executor_proxy(
50 runtime,
51 state_sync_to_mempool_sender,
53 node_config,
54 waypoint,
55 executor_proxy,
56 )
57 }
58
59 pub fn bootstrap_with_executor_proxy<E: ExecutorProxyTrait + 'static>(
60 runtime: Runtime,
61 state_sync_to_mempool_sender: mpsc::Sender<
63 diem_mempool::CommitNotification,
64 >,
65 node_config: &NodeConfig, waypoint: Waypoint, executor_proxy: E,
66 ) -> Self {
67 let (coordinator_sender, coordinator_receiver) = mpsc::unbounded();
68 let initial_state = executor_proxy
69 .get_local_storage_state()
70 .expect("[State Sync] Starting failure: cannot sync with storage!");
71 let coordinator = StateSyncCoordinator::new(
79 coordinator_receiver,
80 state_sync_to_mempool_sender,
81 node_config,
83 waypoint,
84 executor_proxy,
85 initial_state,
86 )
87 .expect("[State Sync] Unable to create state sync coordinator!");
88 runtime.spawn(coordinator.start());
89
90 Self {
91 _runtime: runtime,
92 coordinator_sender,
93 }
94 }
95
96 pub fn create_client(&self, commit_timeout_secs: u64) -> StateSyncClient {
97 StateSyncClient::new(
98 self.coordinator_sender.clone(),
99 commit_timeout_secs,
100 )
101 }
102}