1use crate::pos::{
9 mempool::{CommitNotification, CommitResponse, CommittedTransaction},
10 state_sync::{
11 client::{CoordinatorMessage, SyncRequest},
12 executor_proxy::{ExecutorProxyTrait, SyncState},
13 logging::{LogEntry, LogEvent, LogSchema},
14 },
15};
16use diem_config::config::{NodeConfig, RoleType, StateSyncConfig};
17use diem_logger::prelude::*;
18use diem_types::{contract_event::ContractEvent, transaction::Transaction};
19use futures::{
20 channel::{mpsc, oneshot},
21 StreamExt,
22};
23use std::time::{Duration, SystemTime};
24use tokio::time::{interval, timeout};
25use tokio_stream::wrappers::IntervalStream;
26
27pub(crate) struct StateSyncCoordinator<T> {
31 client_events: mpsc::UnboundedReceiver<CoordinatorMessage>,
32 state_sync_to_mempool_sender: mpsc::Sender<CommitNotification>,
33 local_state: SyncState,
34 config: StateSyncConfig,
35 role: RoleType,
36 sync_request: Option<SyncRequest>,
37 executor_proxy: T,
38}
39
40impl<T: ExecutorProxyTrait> StateSyncCoordinator<T> {
41 pub fn new(
42 client_events: mpsc::UnboundedReceiver<CoordinatorMessage>,
43 state_sync_to_mempool_sender: mpsc::Sender<CommitNotification>,
44 node_config: &NodeConfig, executor_proxy: T, initial_state: SyncState,
45 ) -> Result<Self, anyhow::Error> {
46 let role = node_config.base.role;
47
48 Ok(Self {
49 client_events,
50 state_sync_to_mempool_sender,
51 local_state: initial_state,
52 config: node_config.state_sync.clone(),
53 role,
54 sync_request: None,
55 executor_proxy,
56 })
57 }
58
59 pub async fn start(mut self) {
61 diem_info!(LogSchema::new(LogEntry::RuntimeStart));
62 let mut interval = IntervalStream::new(interval(
63 Duration::from_millis(self.config.tick_interval_ms),
64 ))
65 .fuse();
66
67 loop {
68 ::futures::select! {
69 msg = self.client_events.select_next_some() => {
70 match msg {
71 CoordinatorMessage::SyncRequest(_request) => {
72 }
75 CoordinatorMessage::CommitNotification(notification) => {
76 if let Err(e) = self.process_commit_notification(
77 notification.committed_transactions,
78 Some(notification.callback),
79 notification.reconfiguration_events,
80 ).await {
81 diem_error!(
82 LogSchema::event_log(
83 LogEntry::ConsensusCommit,
84 LogEvent::PostCommitFail
85 ),
86 "Failed to process commit notification: {:?}",
87 e
88 );
89 }
90 }
91 CoordinatorMessage::WaitForInitialization(cb_sender) => {
92 if let Err(e) = Self::send_initialization_callback(cb_sender) {
94 diem_error!(
95 "Failed to send initialization callback: {:?}",
96 e
97 );
98 }
99 }
100 };
101 },
102 _ = interval.select_next_some() => {
103 if let Err(e) = self.check_progress() {
104 diem_error!(
105 LogSchema::event_log(
106 LogEntry::ProgressCheck,
107 LogEvent::Fail
108 ),
109 "Progress check failed: {:?}",
110 e
111 );
112 }
113 }
114 }
115 }
116 }
117
118 fn sync_state_with_local_storage(&mut self) -> Result<(), anyhow::Error> {
119 let new_state = self.executor_proxy.get_local_storage_state()?;
120 if new_state.trusted_epoch() > self.local_state.trusted_epoch() {
121 diem_info!(LogSchema::new(LogEntry::EpochChange)
122 .old_epoch(self.local_state.trusted_epoch())
123 .new_epoch(new_state.trusted_epoch()));
124 }
125 self.local_state = new_state;
126 Ok(())
127 }
128
129 async fn process_commit_notification(
132 &mut self, committed_transactions: Vec<Transaction>,
133 commit_callback: Option<
134 oneshot::Sender<Result<CommitResponse, anyhow::Error>>,
135 >,
136 reconfiguration_events: Vec<ContractEvent>,
137 ) -> Result<(), anyhow::Error> {
138 diem_debug!(
139 "process_commit_notification: {} events",
140 reconfiguration_events.len()
141 );
142 self.sync_state_with_local_storage()?;
143
144 let commit_response = match self
146 .notify_mempool_of_committed_transactions(committed_transactions)
147 .await
148 {
149 Ok(()) => CommitResponse::success(),
150 Err(error) => {
151 diem_error!(
152 LogSchema::new(LogEntry::CommitFlow),
153 "Failed to notify mempool: {:?}",
154 error
155 );
156 CommitResponse::error(error.to_string())
157 }
158 };
159
160 if let Err(error) = self.notify_consensus_of_commit_response(
162 commit_response,
163 commit_callback,
164 ) {
165 diem_error!(
166 LogSchema::new(LogEntry::CommitFlow),
167 "Failed to notify consensus: {:?}",
168 error
169 );
170 }
171
172 if let Some(req) = self.sync_request.as_mut() {
173 req.last_commit_timestamp = SystemTime::now();
174 }
175
176 let synced_version = self.local_state.synced_version();
178 self.check_sync_request_completed(synced_version)?;
179
180 if let Err(error) = self
182 .executor_proxy
183 .publish_on_chain_config_updates(reconfiguration_events)
184 {
185 diem_error!(
186 LogSchema::event_log(LogEntry::Reconfig, LogEvent::Fail),
187 "Failed to publish reconfig updates: {:?}",
188 error
189 );
190 }
191
192 Ok(())
193 }
194
195 fn check_sync_request_completed(
196 &mut self, synced_version: u64,
197 ) -> Result<(), anyhow::Error> {
198 if let Some(sync_request) = self.sync_request.as_ref() {
199 let sync_target_version =
200 sync_request.target.ledger_info().version();
201 if synced_version > sync_target_version {
202 return Err(anyhow::anyhow!(
203 "Synced beyond the target version. Synced version: {}, target version: {}",
204 synced_version,
205 sync_target_version,
206 ));
207 }
208 if synced_version == sync_target_version {
209 let committed_version = self.local_state.committed_version();
210 let local_epoch = self.local_state.trusted_epoch();
211 diem_info!(LogSchema::event_log(
212 LogEntry::SyncRequest,
213 LogEvent::Complete
214 )
215 .local_li_version(committed_version)
216 .local_synced_version(synced_version)
217 .local_epoch(local_epoch));
218 if let Some(sync_request) = self.sync_request.take() {
219 Self::send_sync_req_callback(sync_request, Ok(()))?;
220 }
221 }
222 }
223
224 Ok(())
225 }
226
227 fn notify_consensus_of_commit_response(
228 &self, commit_response: CommitResponse,
229 callback: Option<
230 oneshot::Sender<Result<CommitResponse, anyhow::Error>>,
231 >,
232 ) -> Result<(), anyhow::Error> {
233 if let Some(callback) = callback {
234 if let Err(error) = callback.send(Ok(commit_response)) {
235 return Err(anyhow::anyhow!(
236 "Failed to send commit ACK to consensus!: {:?}",
237 error
238 ));
239 }
240 }
241 Ok(())
242 }
243
244 async fn notify_mempool_of_committed_transactions(
245 &mut self, committed_transactions: Vec<Transaction>,
246 ) -> Result<(), anyhow::Error> {
247 let user_transactions = committed_transactions
248 .iter()
249 .filter_map(|transaction| match transaction {
250 Transaction::UserTransaction(signed_txn) => {
251 Some(CommittedTransaction {
252 sender: signed_txn.sender(),
253 hash: signed_txn.hash(),
254 })
255 }
256 _ => None,
257 })
258 .collect();
259
260 let (callback_sender, callback_receiver) = oneshot::channel();
261 let req = CommitNotification {
262 transactions: user_transactions,
263 block_timestamp_usecs: self
264 .local_state
265 .committed_ledger_info()
266 .ledger_info()
267 .timestamp_usecs(),
268 callback: callback_sender,
269 };
270
271 if let Err(error) = self.state_sync_to_mempool_sender.try_send(req) {
272 Err(anyhow::anyhow!(
273 "Failed to notify mempool of committed transactions! Error: {:?}",
274 error
275 ))
276 } else if let Err(error) = timeout(
277 Duration::from_millis(self.config.mempool_commit_timeout_ms),
278 callback_receiver,
279 )
280 .await
281 {
282 Err(anyhow::anyhow!(
283 "Did not receive ACK for commit notification from mempool! Error: {:?}",
284 error
285 ))
286 } else {
287 Ok(())
288 }
289 }
290
291 fn check_progress(&mut self) -> Result<(), anyhow::Error> {
292 if self.role == RoleType::Validator && self.sync_request.is_none() {
293 return Ok(());
294 }
295
296 if let Some(sync_request) = self.sync_request.as_ref() {
298 let timeout_between_commits =
299 Duration::from_millis(self.config.sync_request_timeout_ms);
300 let commit_deadline = sync_request
301 .last_commit_timestamp
302 .checked_add(timeout_between_commits)
303 .ok_or_else(|| {
304 anyhow::anyhow!(
305 "The commit deadline timestamp has overflown!"
306 )
307 })?;
308
309 if SystemTime::now().duration_since(commit_deadline).is_ok() {
310 diem_warn!(LogSchema::event_log(
311 LogEntry::SyncRequest,
312 LogEvent::Timeout
313 ));
314
315 if let Some(sync_request) = self.sync_request.take() {
316 if let Err(e) = Self::send_sync_req_callback(
317 sync_request,
318 Err(anyhow::anyhow!("Sync request timed out!")),
319 ) {
320 diem_error!(
321 LogSchema::event_log(
322 LogEntry::SyncRequest,
323 LogEvent::CallbackFail
324 ),
325 "Failed to send sync request callback: {:?}",
326 e
327 );
328 }
329 }
330 }
331 }
332
333 Ok(())
334 }
335
336 fn send_sync_req_callback(
337 sync_req: SyncRequest, msg: Result<(), anyhow::Error>,
338 ) -> Result<(), anyhow::Error> {
339 sync_req.callback.send(msg).map_err(|failed_msg| {
340 anyhow::anyhow!(
341 "Consensus sync request callback error - failed to send: {:?}",
342 failed_msg
343 )
344 })
345 }
346
347 fn send_initialization_callback(
348 callback: oneshot::Sender<Result<(), anyhow::Error>>,
349 ) -> Result<(), anyhow::Error> {
350 callback.send(Ok(())).map_err(|error| {
351 anyhow::anyhow!(
352 "Initialization callback error - failed to send: {:?}",
353 error
354 )
355 })
356 }
357}