// cfxcore/pos/state_sync/bootstrapper.rs

// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

// Copyright 2021 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/
7use crate::pos::{
8    mempool as diem_mempool,
9    state_sync::{
10        client::{CoordinatorMessage, StateSyncClient},
11        coordinator::StateSyncCoordinator,
12        executor_proxy::{ExecutorProxy, ExecutorProxyTrait},
13    },
14};
15use diem_config::config::NodeConfig;
16use diem_types::waypoint::Waypoint;
17use executor_types::ChunkExecutor;
18use futures::channel::mpsc;
19use std::{boxed::Box, sync::Arc};
20use storage_interface::DbReader;
21use subscription_service::ReconfigSubscription;
22use tokio::runtime::{Builder, Runtime};
23
/// Creates and bootstraps new state syncs and creates clients for
/// communicating with those state syncs.
///
/// A value of this type owns the tokio runtime on which the state sync
/// coordinator task is spawned, together with the sending half of the
/// channel used to deliver [`CoordinatorMessage`]s to that coordinator.
pub struct StateSyncBootstrapper {
    /// Runtime the coordinator task is spawned onto. It is never read after
    /// construction (hence the leading underscore); it is stored so that it
    /// is owned by, and dropped together with, this bootstrapper.
    _runtime: Runtime,
    /// Sending half of the unbounded coordinator channel. It is cloned for
    /// every client handed out by `create_client`.
    coordinator_sender: mpsc::UnboundedSender<CoordinatorMessage>,
}
30
31impl StateSyncBootstrapper {
32    pub fn bootstrap(
33        /* network: Vec<(NodeNetworkId, StateSyncSender, StateSyncEvents)>, */
34        state_sync_to_mempool_sender: mpsc::Sender<
35            diem_mempool::CommitNotification,
36        >,
37        storage: Arc<dyn DbReader>, executor: Box<dyn ChunkExecutor>,
38        node_config: &NodeConfig, waypoint: Waypoint,
39        reconfig_event_subscriptions: Vec<ReconfigSubscription>,
40    ) -> Self {
41        let runtime = Builder::new_multi_thread()
42            .thread_name("state-sync")
43            .enable_all()
44            .build()
45            .expect("[State Sync] Failed to create runtime!");
46
47        let executor_proxy =
48            ExecutorProxy::new(storage, executor, reconfig_event_subscriptions);
49        Self::bootstrap_with_executor_proxy(
50            runtime,
51            //network,
52            state_sync_to_mempool_sender,
53            node_config,
54            waypoint,
55            executor_proxy,
56        )
57    }
58
59    pub fn bootstrap_with_executor_proxy<E: ExecutorProxyTrait + 'static>(
60        runtime: Runtime,
61        /* network: Vec<(NodeNetworkId, StateSyncSender, StateSyncEvents)>, */
62        state_sync_to_mempool_sender: mpsc::Sender<
63            diem_mempool::CommitNotification,
64        >,
65        node_config: &NodeConfig, waypoint: Waypoint, executor_proxy: E,
66    ) -> Self {
67        let (coordinator_sender, coordinator_receiver) = mpsc::unbounded();
68        let initial_state = executor_proxy
69            .get_local_storage_state()
70            .expect("[State Sync] Starting failure: cannot sync with storage!");
71        /*let network_senders: HashMap<_, _> = network
72        .iter()
73        .map(|(network_id, sender, _events)| {
74            (network_id.clone(), sender.clone())
75        })
76        .collect();*/
77
78        let coordinator = StateSyncCoordinator::new(
79            coordinator_receiver,
80            state_sync_to_mempool_sender,
81            //network_senders,
82            node_config,
83            waypoint,
84            executor_proxy,
85            initial_state,
86        )
87        .expect("[State Sync] Unable to create state sync coordinator!");
88        runtime.spawn(coordinator.start(/*network*/));
89
90        Self {
91            _runtime: runtime,
92            coordinator_sender,
93        }
94    }
95
96    pub fn create_client(&self, commit_timeout_secs: u64) -> StateSyncClient {
97        StateSyncClient::new(
98            self.coordinator_sender.clone(),
99            commit_timeout_secs,
100        )
101    }
102}