cfxcore/pos/state_sync/
coordinator.rs

// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

// Copyright 2021 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/
7
8use crate::pos::{
9    mempool::{CommitNotification, CommitResponse, CommittedTransaction},
10    state_sync::{
11        client::{CoordinatorMessage, SyncRequest},
12        executor_proxy::{ExecutorProxyTrait, SyncState},
13        logging::{LogEntry, LogEvent, LogSchema},
14    },
15};
16use diem_config::config::{NodeConfig, RoleType, StateSyncConfig};
17use diem_logger::prelude::*;
18use diem_types::{contract_event::ContractEvent, transaction::Transaction};
19use futures::{
20    channel::{mpsc, oneshot},
21    StreamExt,
22};
23use std::time::{Duration, SystemTime};
24use tokio::time::{interval, timeout};
25use tokio_stream::wrappers::IntervalStream;
26
/// Coordination of the state sync process is driven by StateSyncCoordinator.
/// The `start()` function runs an infinite event loop and triggers actions
/// based on external and internal (local) requests.
///
/// The type parameter `T` is the executor proxy; the `impl` block requires
/// it to implement `ExecutorProxyTrait`.
pub(crate) struct StateSyncCoordinator<T> {
    /// Incoming `CoordinatorMessage`s; drained by the event loop in
    /// `start()`.
    client_events: mpsc::UnboundedReceiver<CoordinatorMessage>,
    /// Channel used to hand a `CommitNotification` to mempool after a
    /// commit.
    state_sync_to_mempool_sender: mpsc::Sender<CommitNotification>,
    /// Most recent `SyncState` known to the coordinator. Seeded from the
    /// `initial_state` constructor argument and replaced with the value
    /// returned by `executor_proxy.get_local_storage_state()`.
    local_state: SyncState,
    /// Copy of the node's state sync configuration. This file reads
    /// `tick_interval_ms`, `mempool_commit_timeout_ms` and
    /// `sync_request_timeout_ms` from it.
    config: StateSyncConfig,
    /// Role of this node, taken from `node_config.base.role`; consulted in
    /// `check_progress()`.
    role: RoleType,
    /// The pending sync request, if any. Initialized to `None`.
    /// NOTE(review): nothing in the visible `impl` ever stores `Some` here
    /// (incoming `SyncRequest` messages are ignored in `start()`) -- confirm
    /// whether another module sets it.
    sync_request: Option<SyncRequest>,
    /// Used to read the local storage state and to publish on-chain config
    /// updates.
    executor_proxy: T,
}
39
40impl<T: ExecutorProxyTrait> StateSyncCoordinator<T> {
41    pub fn new(
42        client_events: mpsc::UnboundedReceiver<CoordinatorMessage>,
43        state_sync_to_mempool_sender: mpsc::Sender<CommitNotification>,
44        node_config: &NodeConfig, executor_proxy: T, initial_state: SyncState,
45    ) -> Result<Self, anyhow::Error> {
46        let role = node_config.base.role;
47
48        Ok(Self {
49            client_events,
50            state_sync_to_mempool_sender,
51            local_state: initial_state,
52            config: node_config.state_sync.clone(),
53            role,
54            sync_request: None,
55            executor_proxy,
56        })
57    }
58
59    /// main routine. starts sync coordinator that listens for CoordinatorMsg
60    pub async fn start(mut self) {
61        diem_info!(LogSchema::new(LogEntry::RuntimeStart));
62        let mut interval = IntervalStream::new(interval(
63            Duration::from_millis(self.config.tick_interval_ms),
64        ))
65        .fuse();
66
67        loop {
68            ::futures::select! {
69                msg = self.client_events.select_next_some() => {
70                    match msg {
71                        CoordinatorMessage::SyncRequest(_request) => {
72                            // Sync requests are no longer handled via
73                            // the network state sync layer.
74                        }
75                        CoordinatorMessage::CommitNotification(notification) => {
76                            if let Err(e) = self.process_commit_notification(
77                                notification.committed_transactions,
78                                Some(notification.callback),
79                                notification.reconfiguration_events,
80                            ).await {
81                                diem_error!(
82                                    LogSchema::event_log(
83                                        LogEntry::ConsensusCommit,
84                                        LogEvent::PostCommitFail
85                                    ),
86                                    "Failed to process commit notification: {:?}",
87                                    e
88                                );
89                            }
90                        }
91                        CoordinatorMessage::WaitForInitialization(cb_sender) => {
92                            // Always initialized (waypoint removed).
93                            if let Err(e) = Self::send_initialization_callback(cb_sender) {
94                                diem_error!(
95                                    "Failed to send initialization callback: {:?}",
96                                    e
97                                );
98                            }
99                        }
100                    };
101                },
102                _ = interval.select_next_some() => {
103                    if let Err(e) = self.check_progress() {
104                        diem_error!(
105                            LogSchema::event_log(
106                                LogEntry::ProgressCheck,
107                                LogEvent::Fail
108                            ),
109                            "Progress check failed: {:?}",
110                            e
111                        );
112                    }
113                }
114            }
115        }
116    }
117
118    fn sync_state_with_local_storage(&mut self) -> Result<(), anyhow::Error> {
119        let new_state = self.executor_proxy.get_local_storage_state()?;
120        if new_state.trusted_epoch() > self.local_state.trusted_epoch() {
121            diem_info!(LogSchema::new(LogEntry::EpochChange)
122                .old_epoch(self.local_state.trusted_epoch())
123                .new_epoch(new_state.trusted_epoch()));
124        }
125        self.local_state = new_state;
126        Ok(())
127    }
128
129    /// This method updates state sync to process new transactions that have
130    /// been committed to storage (e.g., through consensus).
131    async fn process_commit_notification(
132        &mut self, committed_transactions: Vec<Transaction>,
133        commit_callback: Option<
134            oneshot::Sender<Result<CommitResponse, anyhow::Error>>,
135        >,
136        reconfiguration_events: Vec<ContractEvent>,
137    ) -> Result<(), anyhow::Error> {
138        diem_debug!(
139            "process_commit_notification: {} events",
140            reconfiguration_events.len()
141        );
142        self.sync_state_with_local_storage()?;
143
144        // Notify mempool of commit
145        let commit_response = match self
146            .notify_mempool_of_committed_transactions(committed_transactions)
147            .await
148        {
149            Ok(()) => CommitResponse::success(),
150            Err(error) => {
151                diem_error!(
152                    LogSchema::new(LogEntry::CommitFlow),
153                    "Failed to notify mempool: {:?}",
154                    error
155                );
156                CommitResponse::error(error.to_string())
157            }
158        };
159
160        // Notify consensus of the commit response
161        if let Err(error) = self.notify_consensus_of_commit_response(
162            commit_response,
163            commit_callback,
164        ) {
165            diem_error!(
166                LogSchema::new(LogEntry::CommitFlow),
167                "Failed to notify consensus: {:?}",
168                error
169            );
170        }
171
172        if let Some(req) = self.sync_request.as_mut() {
173            req.last_commit_timestamp = SystemTime::now();
174        }
175
176        // Check if we hit the sync request target
177        let synced_version = self.local_state.synced_version();
178        self.check_sync_request_completed(synced_version)?;
179
180        // Publish the on chain config updates
181        if let Err(error) = self
182            .executor_proxy
183            .publish_on_chain_config_updates(reconfiguration_events)
184        {
185            diem_error!(
186                LogSchema::event_log(LogEntry::Reconfig, LogEvent::Fail),
187                "Failed to publish reconfig updates: {:?}",
188                error
189            );
190        }
191
192        Ok(())
193    }
194
195    fn check_sync_request_completed(
196        &mut self, synced_version: u64,
197    ) -> Result<(), anyhow::Error> {
198        if let Some(sync_request) = self.sync_request.as_ref() {
199            let sync_target_version =
200                sync_request.target.ledger_info().version();
201            if synced_version > sync_target_version {
202                return Err(anyhow::anyhow!(
203                    "Synced beyond the target version. Synced version: {}, target version: {}",
204                    synced_version,
205                    sync_target_version,
206                ));
207            }
208            if synced_version == sync_target_version {
209                let committed_version = self.local_state.committed_version();
210                let local_epoch = self.local_state.trusted_epoch();
211                diem_info!(LogSchema::event_log(
212                    LogEntry::SyncRequest,
213                    LogEvent::Complete
214                )
215                .local_li_version(committed_version)
216                .local_synced_version(synced_version)
217                .local_epoch(local_epoch));
218                if let Some(sync_request) = self.sync_request.take() {
219                    Self::send_sync_req_callback(sync_request, Ok(()))?;
220                }
221            }
222        }
223
224        Ok(())
225    }
226
227    fn notify_consensus_of_commit_response(
228        &self, commit_response: CommitResponse,
229        callback: Option<
230            oneshot::Sender<Result<CommitResponse, anyhow::Error>>,
231        >,
232    ) -> Result<(), anyhow::Error> {
233        if let Some(callback) = callback {
234            if let Err(error) = callback.send(Ok(commit_response)) {
235                return Err(anyhow::anyhow!(
236                    "Failed to send commit ACK to consensus!: {:?}",
237                    error
238                ));
239            }
240        }
241        Ok(())
242    }
243
244    async fn notify_mempool_of_committed_transactions(
245        &mut self, committed_transactions: Vec<Transaction>,
246    ) -> Result<(), anyhow::Error> {
247        let user_transactions = committed_transactions
248            .iter()
249            .filter_map(|transaction| match transaction {
250                Transaction::UserTransaction(signed_txn) => {
251                    Some(CommittedTransaction {
252                        sender: signed_txn.sender(),
253                        hash: signed_txn.hash(),
254                    })
255                }
256                _ => None,
257            })
258            .collect();
259
260        let (callback_sender, callback_receiver) = oneshot::channel();
261        let req = CommitNotification {
262            transactions: user_transactions,
263            block_timestamp_usecs: self
264                .local_state
265                .committed_ledger_info()
266                .ledger_info()
267                .timestamp_usecs(),
268            callback: callback_sender,
269        };
270
271        if let Err(error) = self.state_sync_to_mempool_sender.try_send(req) {
272            Err(anyhow::anyhow!(
273                "Failed to notify mempool of committed transactions! Error: {:?}",
274                error
275            ))
276        } else if let Err(error) = timeout(
277            Duration::from_millis(self.config.mempool_commit_timeout_ms),
278            callback_receiver,
279        )
280        .await
281        {
282            Err(anyhow::anyhow!(
283                "Did not receive ACK for commit notification from mempool! Error: {:?}",
284                error
285            ))
286        } else {
287            Ok(())
288        }
289    }
290
291    fn check_progress(&mut self) -> Result<(), anyhow::Error> {
292        if self.role == RoleType::Validator && self.sync_request.is_none() {
293            return Ok(());
294        }
295
296        // Check if the sync request has timed out
297        if let Some(sync_request) = self.sync_request.as_ref() {
298            let timeout_between_commits =
299                Duration::from_millis(self.config.sync_request_timeout_ms);
300            let commit_deadline = sync_request
301                .last_commit_timestamp
302                .checked_add(timeout_between_commits)
303                .ok_or_else(|| {
304                    anyhow::anyhow!(
305                        "The commit deadline timestamp has overflown!"
306                    )
307                })?;
308
309            if SystemTime::now().duration_since(commit_deadline).is_ok() {
310                diem_warn!(LogSchema::event_log(
311                    LogEntry::SyncRequest,
312                    LogEvent::Timeout
313                ));
314
315                if let Some(sync_request) = self.sync_request.take() {
316                    if let Err(e) = Self::send_sync_req_callback(
317                        sync_request,
318                        Err(anyhow::anyhow!("Sync request timed out!")),
319                    ) {
320                        diem_error!(
321                            LogSchema::event_log(
322                                LogEntry::SyncRequest,
323                                LogEvent::CallbackFail
324                            ),
325                            "Failed to send sync request callback: {:?}",
326                            e
327                        );
328                    }
329                }
330            }
331        }
332
333        Ok(())
334    }
335
336    fn send_sync_req_callback(
337        sync_req: SyncRequest, msg: Result<(), anyhow::Error>,
338    ) -> Result<(), anyhow::Error> {
339        sync_req.callback.send(msg).map_err(|failed_msg| {
340            anyhow::anyhow!(
341                "Consensus sync request callback error - failed to send: {:?}",
342                failed_msg
343            )
344        })
345    }
346
347    fn send_initialization_callback(
348        callback: oneshot::Sender<Result<(), anyhow::Error>>,
349    ) -> Result<(), anyhow::Error> {
350        callback.send(Ok(())).map_err(|error| {
351            anyhow::anyhow!(
352                "Initialization callback error - failed to send: {:?}",
353                error
354            )
355        })
356    }
357}