use crate::pos::{
mempool as diem_mempool,
state_sync::{
client::{CoordinatorMessage, StateSyncClient},
coordinator::StateSyncCoordinator,
executor_proxy::{ExecutorProxy, ExecutorProxyTrait},
},
};
use diem_config::config::NodeConfig;
use diem_types::waypoint::Waypoint;
use executor_types::ChunkExecutor;
use futures::channel::mpsc;
use std::{boxed::Box, sync::Arc};
use storage_interface::DbReader;
use subscription_service::ReconfigSubscription;
use tokio::runtime::{Builder, Runtime};
pub struct StateSyncBootstrapper {
_runtime: Runtime,
coordinator_sender: mpsc::UnboundedSender<CoordinatorMessage>,
}
impl StateSyncBootstrapper {
pub fn bootstrap(
state_sync_to_mempool_sender: mpsc::Sender<
diem_mempool::CommitNotification,
>,
storage: Arc<dyn DbReader>, executor: Box<dyn ChunkExecutor>,
node_config: &NodeConfig, waypoint: Waypoint,
reconfig_event_subscriptions: Vec<ReconfigSubscription>,
) -> Self {
let runtime = Builder::new_multi_thread()
.thread_name("state-sync")
.enable_all()
.build()
.expect("[State Sync] Failed to create runtime!");
let executor_proxy =
ExecutorProxy::new(storage, executor, reconfig_event_subscriptions);
Self::bootstrap_with_executor_proxy(
runtime,
state_sync_to_mempool_sender,
node_config,
waypoint,
executor_proxy,
)
}
pub fn bootstrap_with_executor_proxy<E: ExecutorProxyTrait + 'static>(
runtime: Runtime,
state_sync_to_mempool_sender: mpsc::Sender<
diem_mempool::CommitNotification,
>,
node_config: &NodeConfig, waypoint: Waypoint, executor_proxy: E,
) -> Self {
let (coordinator_sender, coordinator_receiver) = mpsc::unbounded();
let initial_state = executor_proxy
.get_local_storage_state()
.expect("[State Sync] Starting failure: cannot sync with storage!");
let coordinator = StateSyncCoordinator::new(
coordinator_receiver,
state_sync_to_mempool_sender,
node_config,
waypoint,
executor_proxy,
initial_state,
)
.expect("[State Sync] Unable to create state sync coordinator!");
runtime.spawn(coordinator.start());
Self {
_runtime: runtime,
coordinator_sender,
}
}
pub fn create_client(&self, commit_timeout_secs: u64) -> StateSyncClient {
StateSyncClient::new(
self.coordinator_sender.clone(),
commit_timeout_secs,
)
}
}