1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

// Copyright 2021 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/
use crate::pos::{
    mempool as diem_mempool,
    state_sync::{
        client::{CoordinatorMessage, StateSyncClient},
        coordinator::StateSyncCoordinator,
        executor_proxy::{ExecutorProxy, ExecutorProxyTrait},
    },
};
use diem_config::config::NodeConfig;
use diem_types::waypoint::Waypoint;
use executor_types::ChunkExecutor;
use futures::channel::mpsc;
use std::{boxed::Box, sync::Arc};
use storage_interface::DbReader;
use subscription_service::ReconfigSubscription;
use tokio::runtime::{Builder, Runtime};

/// Creates and bootstraps new state syncs and creates clients for
/// communicating with those state syncs.
///
/// An instance is produced by [`StateSyncBootstrapper::bootstrap`] or
/// [`StateSyncBootstrapper::bootstrap_with_executor_proxy`], both of which
/// spawn a `StateSyncCoordinator` task on the runtime stored here. Clients
/// that talk to that coordinator are obtained through
/// [`StateSyncBootstrapper::create_client`].
pub struct StateSyncBootstrapper {
    // Runtime on which the coordinator task was spawned. It is stored but
    // never read in this file.
    // NOTE(review): presumably held only so the runtime (and the spawned
    // coordinator) stays alive as long as the bootstrapper does -- confirm.
    _runtime: Runtime,
    // Sending half of the unbounded channel whose receiving half was handed
    // to the coordinator; cloned for every client created.
    coordinator_sender: mpsc::UnboundedSender<CoordinatorMessage>,
}

impl StateSyncBootstrapper {
    /// Starts state sync on a freshly built runtime.
    ///
    /// Builds a multi-threaded tokio runtime whose threads are named
    /// `state-sync`, wraps `storage`, `executor` and
    /// `reconfig_event_subscriptions` in an [`ExecutorProxy`], and delegates
    /// to [`Self::bootstrap_with_executor_proxy`].
    ///
    /// The `network: Vec<(NodeNetworkId, StateSyncSender, StateSyncEvents)>`
    /// parameter is disabled (commented out) in this version, so no network
    /// handles are accepted or forwarded.
    ///
    /// # Panics
    ///
    /// Panics if the runtime cannot be built, and in every situation listed
    /// for [`Self::bootstrap_with_executor_proxy`].
    pub fn bootstrap(
        state_sync_to_mempool_sender: mpsc::Sender<
            diem_mempool::CommitNotification,
        >,
        storage: Arc<dyn DbReader>, executor: Box<dyn ChunkExecutor>,
        node_config: &NodeConfig, waypoint: Waypoint,
        reconfig_event_subscriptions: Vec<ReconfigSubscription>,
    ) -> Self {
        // Configure the builder step by step, then build the runtime.
        let mut runtime_builder = Builder::new_multi_thread();
        runtime_builder.thread_name("state-sync").enable_all();
        let state_sync_runtime = runtime_builder
            .build()
            .expect("[State Sync] Failed to create runtime!");

        // The proxy is constructed only after the runtime exists; as the
        // last argument it is evaluated after `state_sync_runtime`.
        Self::bootstrap_with_executor_proxy(
            state_sync_runtime,
            state_sync_to_mempool_sender,
            node_config,
            waypoint,
            ExecutorProxy::new(storage, executor, reconfig_event_subscriptions),
        )
    }

    /// Starts state sync on the supplied `runtime` using a caller-provided
    /// executor proxy.
    ///
    /// Creates the unbounded coordinator channel, reads the local storage
    /// state from `executor_proxy`, constructs a [`StateSyncCoordinator`]
    /// from those pieces, and spawns `coordinator.start()` on `runtime`.
    /// The returned bootstrapper owns `runtime` and the sending half of the
    /// channel.
    ///
    /// As in [`Self::bootstrap`], the `network` parameter is disabled, so no
    /// per-network sender map is built or passed to the coordinator.
    ///
    /// # Panics
    ///
    /// Panics if `executor_proxy.get_local_storage_state()` fails or if
    /// `StateSyncCoordinator::new` returns an error.
    pub fn bootstrap_with_executor_proxy<E: ExecutorProxyTrait + 'static>(
        runtime: Runtime,
        state_sync_to_mempool_sender: mpsc::Sender<
            diem_mempool::CommitNotification,
        >,
        node_config: &NodeConfig, waypoint: Waypoint, executor_proxy: E,
    ) -> Self {
        // Clients send on `request_tx`; the coordinator drains `request_rx`.
        let (request_tx, request_rx) = mpsc::unbounded();

        let storage_state_result = executor_proxy.get_local_storage_state();
        let storage_state = storage_state_result
            .expect("[State Sync] Starting failure: cannot sync with storage!");

        let coordinator_result = StateSyncCoordinator::new(
            request_rx,
            state_sync_to_mempool_sender,
            node_config,
            waypoint,
            executor_proxy,
            storage_state,
        );
        let coordinator = coordinator_result
            .expect("[State Sync] Unable to create state sync coordinator!");

        // The join handle is intentionally dropped; the task keeps running
        // on `runtime`.
        let coordinator_task = coordinator.start();
        runtime.spawn(coordinator_task);

        Self {
            _runtime: runtime,
            coordinator_sender: request_tx,
        }
    }

    /// Returns a new [`StateSyncClient`] built from a clone of the
    /// coordinator sender, with `commit_timeout_secs` passed through
    /// unchanged.
    pub fn create_client(&self, commit_timeout_secs: u64) -> StateSyncClient {
        let sender = self.coordinator_sender.clone();
        StateSyncClient::new(sender, commit_timeout_secs)
    }
}