cfxcore/pos/state_sync/
client.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use crate::pos::{
9    mempool::CommitResponse,
10    state_sync::{counters, error::Error, shared_components::SyncState},
11};
12use diem_logger::prelude::*;
13use diem_types::{
14    contract_event::ContractEvent, ledger_info::LedgerInfoWithSignatures,
15    transaction::Transaction,
16};
17use futures::{
18    channel::{mpsc, oneshot},
19    future::Future,
20    SinkExt,
21};
22use std::time::{Duration, SystemTime};
23use tokio::time::timeout;
24
/// A sync request for a specified target ledger info.
///
/// Built by `StateSyncClient::sync_to` and delivered to the coordinator
/// wrapped in `CoordinatorMessage::SyncRequest`.
pub struct SyncRequest {
    /// Sending half of a oneshot channel; `sync_to` awaits the receiving half
    /// and returns the `Result` delivered through it.
    pub callback: oneshot::Sender<Result<(), Error>>,
    /// Set to `SystemTime::now()` when `sync_to` builds the request.
    // NOTE(review): presumably the receiver uses this to measure sync
    // progress; its consumer is not visible in this file -- confirm.
    pub last_commit_timestamp: SystemTime,
    /// The ledger info (with signatures) the node should sync to.
    pub target: LedgerInfoWithSignatures,
}
31
/// A commit notification to notify state sync of new commits.
///
/// Built by `StateSyncClient::commit` and delivered to the coordinator
/// wrapped in `CoordinatorMessage::CommitNotification`.
pub struct CommitNotification {
    /// Sending half of a oneshot channel; `commit` awaits the receiving half
    /// (with a timeout) and inspects the delivered `CommitResponse`.
    pub callback: oneshot::Sender<Result<CommitResponse, Error>>,
    /// The transactions reported as committed.
    pub committed_transactions: Vec<Transaction>,
    /// The reconfiguration events passed along with the commit.
    pub reconfiguration_events: Vec<ContractEvent>,
}
38
/// Messages used by the StateSyncClient for communication with the
/// StateSyncCoordinator.
pub enum CoordinatorMessage {
    /// Initiate a new sync request for a given target.
    SyncRequest(Box<SyncRequest>),
    /// Notify state sync about committed transactions.
    CommitNotification(Box<CommitNotification>),
    /// Return the local sync state.
    GetSyncState(oneshot::Sender<SyncState>),
    /// Wait until state sync is initialized to the waypoint.
    WaitForInitialization(oneshot::Sender<Result<(), Error>>),
}
49
/// A client used for communicating with a StateSyncCoordinator.
///
/// Every request is sent as a `CoordinatorMessage` over an unbounded mpsc
/// channel; replies come back on a per-request oneshot channel.
pub struct StateSyncClient {
    /// Channel on which `CoordinatorMessage`s are sent. It is cloned for each
    /// request so the returned futures do not borrow the client.
    coordinator_sender: mpsc::UnboundedSender<CoordinatorMessage>,

    /// Timeout, in milliseconds, for the StateSyncClient to receive an ack
    /// when executing commit().
    commit_timeout_ms: u64,
}
58
59impl StateSyncClient {
60    pub fn new(
61        coordinator_sender: mpsc::UnboundedSender<CoordinatorMessage>,
62        commit_timeout_ms: u64,
63    ) -> Self {
64        Self {
65            coordinator_sender,
66            commit_timeout_ms,
67        }
68    }
69
70    /// Sync node's state to target ledger info (LI).
71    /// In case of success (`Result::Ok`) the LI of storage is at the given
72    /// target.
73    pub fn sync_to(
74        &self, target: LedgerInfoWithSignatures,
75    ) -> impl Future<Output = Result<(), Error>> {
76        let mut sender = self.coordinator_sender.clone();
77        let (cb_sender, cb_receiver) = oneshot::channel();
78        let request = SyncRequest {
79            callback: cb_sender,
80            target,
81            last_commit_timestamp: SystemTime::now(),
82        };
83
84        async move {
85            sender
86                .send(CoordinatorMessage::SyncRequest(Box::new(request)))
87                .await?;
88            cb_receiver.await?
89        }
90    }
91
92    /// Notifies state sync about newly committed transactions.
93    pub fn commit(
94        &self, committed_txns: Vec<Transaction>,
95        reconfig_events: Vec<ContractEvent>,
96    ) -> impl Future<Output = Result<(), Error>> {
97        let mut sender = self.coordinator_sender.clone();
98        let (cb_sender, cb_receiver) = oneshot::channel();
99
100        let commit_timeout_ms = self.commit_timeout_ms;
101        let notification = CommitNotification {
102            callback: cb_sender,
103            committed_transactions: committed_txns,
104            reconfiguration_events: reconfig_events,
105        };
106        diem_debug!(
107            "state_sync::commit: {} reconfig events",
108            notification.reconfiguration_events.len()
109        );
110
111        async move {
112            sender
113                .send(CoordinatorMessage::CommitNotification(Box::new(
114                    notification,
115                )))
116                .await?;
117
118            match timeout(Duration::from_millis(commit_timeout_ms), cb_receiver)
119                .await
120            {
121                Err(_) => {
122                    counters::COMMIT_FLOW_FAIL
123                        .with_label_values(&[counters::STATE_SYNC_LABEL])
124                        .inc();
125                    Err(Error::UnexpectedError(
126                        "State sync client timeout: failed to receive commit() ack in time!".into(),
127                    ))
128                }
129                Ok(response) => {
130                    let response = response??; // Unwrap the futures result to get the body
131                    if response.success {
132                        Ok(())
133                    } else {
134                        Err(Error::UnexpectedError(format!(
135                            "State sync client failed: commit() returned an error: {:?}",
136                            response.error_message
137                        )))
138                    }
139                }
140            }
141        }
142    }
143
144    /// Returns information about the state sync internal state. This should
145    /// only be used by tests.
146    // TODO(joshlind): remove this once unit tests are added!
147    pub fn get_state(&self) -> impl Future<Output = Result<SyncState, Error>> {
148        let mut sender = self.coordinator_sender.clone();
149        let (cb_sender, cb_receiver) = oneshot::channel();
150
151        async move {
152            sender
153                .send(CoordinatorMessage::GetSyncState(cb_sender))
154                .await?;
155            cb_receiver.await.map_err(|error| error.into())
156        }
157    }
158
159    /// Waits until state sync is caught up with the waypoint specified in the
160    /// local config.
161    pub fn wait_until_initialized(
162        &self,
163    ) -> impl Future<Output = Result<(), Error>> {
164        let mut sender = self.coordinator_sender.clone();
165        let (cb_sender, cb_receiver) = oneshot::channel();
166
167        async move {
168            sender
169                .send(CoordinatorMessage::WaitForInitialization(cb_sender))
170                .await?;
171            cb_receiver.await?
172        }
173    }
174}