// cfxcore/pos/state_sync/coordinator.rs
1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8#![allow(unused)]
9use crate::pos::{
10 mempool::{CommitNotification, CommitResponse, CommittedTransaction},
11 state_sync::{
12 chunk_request::{GetChunkRequest, TargetType},
13 chunk_response::ResponseLedgerInfo,
14 client::{CoordinatorMessage, SyncRequest},
15 counters,
16 error::Error,
17 executor_proxy::ExecutorProxyTrait,
18 logging::{LogEntry, LogEvent, LogSchema},
19 shared_components::SyncState,
20 },
21};
22use diem_config::config::{
23 NodeConfig, PeerNetworkId, RoleType, StateSyncConfig,
24};
25use diem_logger::prelude::*;
26use diem_types::{
27 contract_event::ContractEvent,
28 ledger_info::LedgerInfoWithSignatures,
29 transaction::{Transaction, TransactionListWithProof, Version},
30 waypoint::Waypoint,
31};
32use futures::{
33 channel::{mpsc, oneshot},
34 StreamExt,
35};
36use std::{
37 cmp,
38 collections::HashMap,
39 time::{Duration, SystemTime},
40};
41use tokio::time::{interval, timeout};
42use tokio_stream::wrappers::IntervalStream;
43
/// Bookkeeping for a peer's long-poll chunk request that could not be served
/// immediately. The request is parked in
/// `StateSyncCoordinator::subscriptions` and the peer is notified when new
/// transactions become available, or the entry lapses after
/// `expiration_time`.
#[derive(Clone, Debug, PartialEq, Eq)]
struct PendingRequestInfo {
    // Wall-clock deadline after which this subscription is abandoned.
    expiration_time: SystemTime,
    // Highest version the requesting peer already knows about.
    known_version: u64,
    // Epoch the peer reported in its request.
    request_epoch: u64,
    // Optional target ledger info carried by the original request.
    target_li: Option<LedgerInfoWithSignatures>,
    // Maximum number of transactions to include in the response chunk.
    chunk_limit: u64,
}
52
/// Coordination of the state sync process is driven by StateSyncCoordinator.
/// The `start()` function runs an infinite event loop and triggers actions
/// based on external and internal (local) requests. The coordinator works in
/// two modes (depending on the role):
/// * FullNode: infinite stream of ChunkRequests is sent to the predefined
/// static peers
/// (the parent is going to reply with a ChunkResponse if its committed version
/// becomes higher within the timeout interval).
/// * Validator: the ChunkRequests are generated on demand for a specific target
/// LedgerInfo to
/// synchronize to.
pub(crate) struct StateSyncCoordinator<T> {
    // used to process client requests
    client_events: mpsc::UnboundedReceiver<CoordinatorMessage>,
    // used to send messages (e.g. notifications about newly committed txns) to
    // mempool
    state_sync_to_mempool_sender: mpsc::Sender<CommitNotification>,
    // Current state of the storage, which includes both the latest committed
    // transaction and the latest transaction covered by the LedgerInfo
    // (see `SynchronizerState` documentation). The state is updated via
    // syncing with the local storage.
    local_state: SyncState,
    // State-sync section of the node configuration (tick interval, chunk
    // limits, timeouts).
    config: StateSyncConfig,
    // role of node (Validator or FullNode)
    role: RoleType,
    // An initial waypoint: for as long as the local version is less than a
    // version determined by waypoint a node is not going to be considered
    // initialized (see `is_initialized`).
    waypoint: Waypoint,
    // Actor for sending chunk requests
    // Manages to whom and how to send chunk requests
    //request_manager: RequestManager,
    // Optional sync request to be called when the target sync is reached
    sync_request: Option<SyncRequest>,
    // If we're a full node syncing to the latest state, this holds the highest
    // ledger info we know about and are currently syncing to. This allows
    // us to incrementally sync to ledger infos in storage. Higher ledger
    // infos will only be considered once we sync to this.
    target_ledger_info: Option<LedgerInfoWithSignatures>,
    // Option initialization listener to be called when the coordinator is
    // caught up with its waypoint.
    initialization_listener: Option<oneshot::Sender<Result<(), Error>>>,
    // queue of incoming long polling requests
    // peer will be notified about new chunk of transactions if it's available
    // before expiry time
    subscriptions: HashMap<PeerNetworkId, PendingRequestInfo>,
    // Abstraction over the executor/storage used to read state and apply
    // chunks.
    executor_proxy: T,
}
101
102impl<T: ExecutorProxyTrait> StateSyncCoordinator<T> {
    /// Creates a new `StateSyncCoordinator` from the node configuration.
    ///
    /// The retry timeout is derived from the role: fullnodes add the long
    /// poll timeout to the tick interval, validators double the tick
    /// interval.
    ///
    /// # Errors
    /// Returns `Error::IntegerOverflow` if the retry timeout computation
    /// overflows `u64`.
    ///
    /// NOTE(review): `retry_timeout_val` feeds the `RequestManager`, which is
    /// currently commented out, so the computed value is presently unused
    /// (only its overflow check is observable).
    pub fn new(
        client_events: mpsc::UnboundedReceiver<CoordinatorMessage>,
        state_sync_to_mempool_sender: mpsc::Sender<CommitNotification>,
        /* network_senders: HashMap<NodeNetworkId, StateSyncSender>, */
        node_config: &NodeConfig, waypoint: Waypoint, executor_proxy: T,
        initial_state: SyncState,
    ) -> Result<Self, Error> {
        diem_info!(LogSchema::event_log(
            LogEntry::Waypoint,
            LogEvent::Initialize
        )
        .waypoint(waypoint));

        // Create a new request manager.
        let role = node_config.base.role;
        let tick_interval_ms = node_config.state_sync.tick_interval_ms;
        let retry_timeout_val = match role {
            RoleType::FullNode => tick_interval_ms
                .checked_add(node_config.state_sync.long_poll_timeout_ms)
                .ok_or_else(|| {
                    Error::IntegerOverflow(
                        "Fullnode retry timeout has overflown!".into(),
                    )
                })?,
            RoleType::Validator => {
                tick_interval_ms.checked_mul(2).ok_or_else(|| {
                    Error::IntegerOverflow(
                        "Validator retry timeout has overflown!".into(),
                    )
                })?
            }
        };
        /*
        let request_manager = RequestManager::new(
            Duration::from_millis(retry_timeout_val),
            Duration::from_millis(node_config.state_sync.multicast_timeout_ms),
            network_senders,
        );*/

        Ok(Self {
            client_events,
            state_sync_to_mempool_sender,
            local_state: initial_state,
            config: node_config.state_sync.clone(),
            role,
            waypoint,
            //request_manager,
            subscriptions: HashMap::new(),
            sync_request: None,
            target_ledger_info: None,
            initialization_listener: None,
            executor_proxy,
        })
    }
157
    /// main routine. starts sync coordinator that listens for CoordinatorMsg
    ///
    /// The loop is driven by two event sources:
    /// * `client_events`: messages from consensus / clients (sync requests,
    ///   commit notifications, state queries, initialization waits);
    /// * a periodic tick that triggers `check_progress`.
    ///
    /// This future never resolves (infinite loop); network-event handling is
    /// currently commented out.
    pub async fn start(
        mut self,
        /* network_handles: Vec<(NodeNetworkId, StateSyncSender,
         * StateSyncEvents)>, */
    ) {
        diem_info!(LogSchema::new(LogEntry::RuntimeStart));
        // Periodic tick used to drive progress checks below.
        let mut interval = IntervalStream::new(interval(
            Duration::from_millis(self.config.tick_interval_ms),
        ))
        .fuse();

        /*let events: Vec<_> = network_handles
            .into_iter()
            .map(|(network_id, _sender, events)| {
                events.map(move |e| (network_id.clone(), e))
            })
            .collect();
        let mut network_events = select_all(events).fuse();*/

        loop {
            let _timer = counters::MAIN_LOOP.start_timer();
            ::futures::select! {
                msg = self.client_events.select_next_some() => {
                    match msg {
                        CoordinatorMessage::SyncRequest(request) => {
                            let _timer = counters::PROCESS_COORDINATOR_MSG_LATENCY
                                .with_label_values(&[counters::SYNC_MSG_LABEL])
                                .start_timer();
                            // NOTE(review): sync-request handling is disabled
                            // (commented out); requests are currently dropped
                            // after being timed.
                            /*if let Err(e) = self.process_sync_request(*request) {
                                diem_error!(LogSchema::new(LogEntry::SyncRequest).error(&e));
                                counters::SYNC_REQUEST_RESULT.with_label_values(&[counters::FAIL_LABEL]).inc();
                            }*/
                        }
                        CoordinatorMessage::CommitNotification(notification) => {
                            let _timer = counters::PROCESS_COORDINATOR_MSG_LATENCY
                                .with_label_values(&[counters::COMMIT_MSG_LABEL])
                                .start_timer();
                            // Post-commit processing; failures are logged and
                            // counted but do not stop the loop.
                            if let Err(e) = self.process_commit_notification(notification.committed_transactions, Some(notification.callback), notification.reconfiguration_events, None).await {
                                counters::CONSENSUS_COMMIT_FAIL_COUNT.inc();
                                diem_error!(LogSchema::event_log(LogEntry::ConsensusCommit, LogEvent::PostCommitFail).error(&e));
                            }
                        }
                        CoordinatorMessage::GetSyncState(callback) => {
                            let _ = self.get_sync_state(callback);
                        }
                        CoordinatorMessage::WaitForInitialization(cb_sender) => {
                            if let Err(e) = self.wait_for_initialization(cb_sender) {
                                diem_error!(LogSchema::new(LogEntry::Waypoint).error(&e));
                            }
                        }
                    };
                },
                /*(network_id, event) = network_events.select_next_some() => {
                    match event {
                        Event::NewPeer(metadata) => {
                            if let Err(e) = self.process_new_peer(network_id, metadata) {
                                diem_error!(LogSchema::new(LogEntry::NewPeer).error(&e));
                            }
                        }
                        Event::LostPeer(metadata) => {
                            if let Err(e) = self.process_lost_peer(network_id, metadata.remote_peer_id) {
                                diem_error!(LogSchema::new(LogEntry::LostPeer).error(&e));
                            }
                        }
                        Event::Message(peer_id, message) => {
                            if let Err(e) = self.process_chunk_message(network_id.clone(), peer_id, message).await {
                                diem_error!(LogSchema::new(LogEntry::ProcessChunkMessage).error(&e));
                            }
                        }
                        unexpected_event => {
                            counters::NETWORK_ERROR_COUNT.inc();
                            diem_warn!(LogSchema::new(LogEntry::NetworkError),
                            "received unexpected network event: {:?}", unexpected_event);
                        },

                    }
                },*/
                _ = interval.select_next_some() => {
                    if let Err(e) = self.check_progress() {
                        diem_error!(LogSchema::event_log(LogEntry::ProgressCheck, LogEvent::Fail).error(&e));
                    }
                }
            }
        }
        // diem_error!("Coordinator stops");
    }
245
246 /*fn process_new_peer(
247 &mut self, network_id: NodeNetworkId, metadata: ConnectionMetadata,
248 ) -> Result<(), Error> {
249 let peer = PeerNetworkId(network_id, metadata.remote_peer_id);
250 self.request_manager.enable_peer(peer, metadata)?;
251 self.check_progress()
252 }*/
253
254 /*fn process_lost_peer(
255 &mut self, network_id: NodeNetworkId, peer_id: PeerId,
256 ) -> Result<(), Error> {
257 let peer = PeerNetworkId(network_id, peer_id);
258 self.request_manager.disable_peer(&peer)
259 }*/
260
261 /*pub(crate) async fn process_chunk_message(
262 &mut self, network_id: NodeNetworkId, peer_id: PeerId,
263 msg: StateSyncMessage,
264 ) -> Result<(), Error>
265 {
266 let peer = PeerNetworkId(network_id, peer_id);
267 match msg {
268 StateSyncMessage::GetChunkRequest(request) => {
269 // Time request handling
270 let _timer = counters::PROCESS_MSG_LATENCY
271 .with_label_values(&[
272 &peer.raw_network_id().to_string(),
273 &peer.peer_id().to_string(),
274 counters::CHUNK_REQUEST_MSG_LABEL,
275 ])
276 .start_timer();
277
278 // Process chunk request
279 let process_result =
280 self.process_chunk_request(peer.clone(), *request.clone());
281 if let Err(ref error) = process_result {
282 diem_error!(LogSchema::event_log(
283 LogEntry::ProcessChunkRequest,
284 LogEvent::Fail
285 )
286 .peer(&peer)
287 .error(&error.clone())
288 .local_li_version(self.local_state.committed_version())
289 .chunk_request(*request));
290 counters::PROCESS_CHUNK_REQUEST_COUNT
291 .with_label_values(&[
292 &peer.raw_network_id().to_string(),
293 &peer.peer_id().to_string(),
294 counters::FAIL_LABEL,
295 ])
296 .inc();
297 } else {
298 counters::PROCESS_CHUNK_REQUEST_COUNT
299 .with_label_values(&[
300 &peer.raw_network_id().to_string(),
301 &peer.peer_id().to_string(),
302 counters::SUCCESS_LABEL,
303 ])
304 .inc();
305 }
306 process_result
307 }
308 StateSyncMessage::GetChunkResponse(response) => {
309 // Time response handling
310 let _timer = counters::PROCESS_MSG_LATENCY
311 .with_label_values(&[
312 &peer.raw_network_id().to_string(),
313 &peer.peer_id().to_string(),
314 counters::CHUNK_RESPONSE_MSG_LABEL,
315 ])
316 .start_timer();
317
318 // Process chunk response
319 self.process_chunk_response(&peer, *response).await
320 }
321 }
322 }*/
323
324 /// Sync up coordinator state with the local storage
325 /// and updates the pending ledger info accordingly
326 fn sync_state_with_local_storage(&mut self) -> Result<(), Error> {
327 let new_state =
328 self.executor_proxy.get_local_storage_state().map_err(|e| {
329 counters::STORAGE_READ_FAIL_COUNT.inc();
330 e
331 })?;
332 if new_state.trusted_epoch() > self.local_state.trusted_epoch() {
333 diem_info!(LogSchema::new(LogEntry::EpochChange)
334 .old_epoch(self.local_state.trusted_epoch())
335 .new_epoch(new_state.trusted_epoch()));
336 }
337 self.local_state = new_state;
338 Ok(())
339 }
340
341 /// Verify that the local state's latest LI version (i.e. committed version)
342 /// has reached the waypoint version.
343 fn is_initialized(&self) -> bool {
344 self.waypoint.version() <= self.local_state.committed_version()
345 }
346
347 fn wait_for_initialization(
348 &mut self, cb_sender: oneshot::Sender<Result<(), Error>>,
349 ) -> Result<(), Error> {
350 if self.is_initialized() {
351 Self::send_initialization_callback(cb_sender)?;
352 } else {
353 self.initialization_listener = Some(cb_sender);
354 }
355
356 Ok(())
357 }
358
359 /*/// This method requests state sync to sync to the target specified by the
360 /// SyncRequest. If there is an existing sync request it will be
361 /// overridden. Note: when processing a sync request, state sync assumes
362 /// that it's the only one modifying storage, i.e., consensus is not
363 /// trying to commit transactions concurrently.
364 fn process_sync_request(
365 &mut self, request: SyncRequest,
366 ) -> Result<(), Error> {
367 fail_point!("state_sync::process_sync_request_message", |_| {
368 Err(crate::error::Error::UnexpectedError(
369 "Injected error in process_sync_request_message".into(),
370 ))
371 });
372
373 // Full nodes don't support sync requests
374 if self.role == RoleType::FullNode {
375 return Err(Error::FullNodeSyncRequest);
376 }
377
378 let local_li_version = self.local_state.committed_version();
379 let target_version = request.target.ledger_info().version();
380 diem_info!(LogSchema::event_log(
381 LogEntry::SyncRequest,
382 LogEvent::Received
383 )
384 .target_version(target_version)
385 .local_li_version(local_li_version));
386
387 self.sync_state_with_local_storage()?;
388 if !self.is_initialized() {
389 return Err(Error::UninitializedError(
390 "Unable to process sync request message!".into(),
391 ));
392 }
393
394 if target_version == local_li_version {
395 return Self::send_sync_req_callback(request, Ok(()));
396 }
397 if target_version < local_li_version {
398 Self::send_sync_req_callback(
399 request,
400 Err(Error::UnexpectedError(
401 "Sync request to old version".into(),
402 )),
403 )?;
404 return Err(Error::OldSyncRequestVersion(
405 target_version,
406 local_li_version,
407 ));
408 }
409
410 // Save the new sync request
411 self.sync_request = Some(request);
412
413 // Send a chunk request for the sync target
414 let known_version = self.local_state.synced_version();
415 self.send_chunk_request_with_target(
416 known_version,
417 self.local_state.trusted_epoch(),
418 self.create_sync_request_chunk_target(known_version)?,
419 )
420 }*/
421
422 /// Notifies consensus of the given commit response.
423 /// Note: if a callback is not specified, the response isn't sent anywhere.
424 fn notify_consensus_of_commit_response(
425 &self, commit_response: CommitResponse,
426 callback: Option<oneshot::Sender<Result<CommitResponse, Error>>>,
427 ) -> Result<(), Error> {
428 if let Some(callback) = callback {
429 if let Err(error) = callback.send(Ok(commit_response)) {
430 counters::COMMIT_FLOW_FAIL
431 .with_label_values(&[counters::CONSENSUS_LABEL])
432 .inc();
433 return Err(Error::CallbackSendFailed(format!(
434 "Failed to send commit ACK to consensus!: {:?}",
435 error
436 )));
437 }
438 }
439 Ok(())
440 }
441
442 /// This method updates state sync to process new transactions that have
443 /// been committed to storage (e.g., through consensus or through a
444 /// chunk response). When notified about a new commit we should: (i)
445 /// respond to relevant long poll requests; (ii) update local sync and
446 /// initialization requests (where appropriate); and (iii) publish
447 /// on chain config updates.
448 async fn process_commit_notification(
449 &mut self, committed_transactions: Vec<Transaction>,
450 commit_callback: Option<oneshot::Sender<Result<CommitResponse, Error>>>,
451 reconfiguration_events: Vec<ContractEvent>,
452 chunk_sender: Option<&PeerNetworkId>,
453 ) -> Result<(), Error> {
454 diem_debug!(
455 "process_commit_notification: {} events",
456 reconfiguration_events.len()
457 );
458 // We choose to re-sync the state with the storage as it's the simplest
459 // approach: in case the performance implications of re-syncing
460 // upon every commit are high, it's possible to manage some of
461 // the highest known versions in memory.
462 self.sync_state_with_local_storage()?;
463 self.update_sync_state_metrics_and_logs()?;
464
465 // Notify mempool of commit
466 let commit_response = match self
467 .notify_mempool_of_committed_transactions(committed_transactions)
468 .await
469 {
470 Ok(()) => CommitResponse::success(),
471 Err(error) => {
472 diem_error!(LogSchema::new(LogEntry::CommitFlow).error(&error));
473 CommitResponse::error(error.to_string())
474 }
475 };
476
477 // Notify consensus of the commit response
478 if let Err(error) = self.notify_consensus_of_commit_response(
479 commit_response,
480 commit_callback,
481 ) {
482 diem_error!(LogSchema::new(LogEntry::CommitFlow).error(&error),);
483 }
484
485 // Check long poll subscriptions, update peer requests and sync request
486 // last progress timestamp.
487 self.check_subscriptions();
488 let synced_version = self.local_state.synced_version();
489 /*self.request_manager.remove_requests(synced_version);
490 if let Some(peer) = chunk_sender {
491 self.request_manager.process_success_response(peer);
492 }*/
493 if let Some(mut req) = self.sync_request.as_mut() {
494 req.last_commit_timestamp = SystemTime::now();
495 }
496
497 // Check if we're now initialized or if we hit the sync request target
498 self.check_initialized_or_sync_request_completed(synced_version)?;
499
500 // Publish the on chain config updates
501 if let Err(error) = self
502 .executor_proxy
503 .publish_on_chain_config_updates(reconfiguration_events)
504 {
505 counters::RECONFIG_PUBLISH_COUNT
506 .with_label_values(&[counters::FAIL_LABEL])
507 .inc();
508 diem_error!(LogSchema::event_log(
509 LogEntry::Reconfig,
510 LogEvent::Fail
511 )
512 .error(&error));
513 }
514
515 Ok(())
516 }
517
518 /// Checks if we are now at the initialization point (i.e., the waypoint),
519 /// or at the version specified by a sync request made by consensus.
520 fn check_initialized_or_sync_request_completed(
521 &mut self, synced_version: u64,
522 ) -> Result<(), Error> {
523 let committed_version = self.local_state.committed_version();
524 let local_epoch = self.local_state.trusted_epoch();
525
526 // Check if we're now initialized
527 if self.is_initialized() {
528 if let Some(initialization_listener) =
529 self.initialization_listener.take()
530 {
531 diem_info!(LogSchema::event_log(
532 LogEntry::Waypoint,
533 LogEvent::Complete
534 )
535 .local_li_version(committed_version)
536 .local_synced_version(synced_version)
537 .local_epoch(local_epoch));
538 Self::send_initialization_callback(initialization_listener)?;
539 }
540 }
541
542 // Check if we're now at the sync request target
543 if let Some(sync_request) = self.sync_request.as_ref() {
544 let sync_target_version =
545 sync_request.target.ledger_info().version();
546 if synced_version > sync_target_version {
547 return Err(Error::SyncedBeyondTarget(
548 synced_version,
549 sync_target_version,
550 ));
551 }
552 if synced_version == sync_target_version {
553 diem_info!(LogSchema::event_log(
554 LogEntry::SyncRequest,
555 LogEvent::Complete
556 )
557 .local_li_version(committed_version)
558 .local_synced_version(synced_version)
559 .local_epoch(local_epoch));
560 counters::SYNC_REQUEST_RESULT
561 .with_label_values(&[counters::COMPLETE_LABEL])
562 .inc();
563 if let Some(sync_request) = self.sync_request.take() {
564 Self::send_sync_req_callback(sync_request, Ok(()))?;
565 }
566 }
567 }
568
569 Ok(())
570 }
571
    /// Notifies mempool that transactions have been committed.
    ///
    /// Filters the committed transactions down to user transactions, sends a
    /// `CommitNotification` to mempool, and waits (bounded by
    /// `mempool_commit_timeout_ms`) for mempool's ACK on the oneshot channel.
    ///
    /// # Errors
    /// `Error::CallbackSendFailed` if the notification cannot be queued to
    /// mempool or the ACK does not arrive before the timeout.
    async fn notify_mempool_of_committed_transactions(
        &mut self, committed_transactions: Vec<Transaction>,
    ) -> Result<(), Error> {
        // Get all user transactions from committed transactions
        let user_transactions = committed_transactions
            .iter()
            .filter_map(|transaction| match transaction {
                Transaction::UserTransaction(signed_txn) => {
                    Some(CommittedTransaction {
                        sender: signed_txn.sender(),
                        hash: signed_txn.hash(),
                    })
                }
                _ => None,
            })
            .collect();

        // Create commit notification of user transactions for mempool
        let (callback_sender, callback_receiver) = oneshot::channel();
        let req = CommitNotification {
            transactions: user_transactions,
            // Timestamp of the latest committed ledger info, forwarded so
            // mempool can expire transactions by block time.
            block_timestamp_usecs: self
                .local_state
                .committed_ledger_info()
                .ledger_info()
                .timestamp_usecs(),
            callback: callback_sender,
        };

        // Notify mempool of committed transactions
        if let Err(error) = self.state_sync_to_mempool_sender.try_send(req) {
            counters::COMMIT_FLOW_FAIL
                .with_label_values(&[counters::TO_MEMPOOL_LABEL])
                .inc();
            Err(Error::CallbackSendFailed(format!(
                "Failed to notify mempool of committed transactions! Error: {:?}",
                error
            )))
        // NOTE(review): only the outer `timeout` error is handled here — the
        // inner value of the oneshot (mempool's actual response, or a
        // channel-cancelled error if mempool dropped the sender) is
        // discarded in the success branch. Confirm this is intended.
        } else if let Err(error) = timeout(
            Duration::from_millis(self.config.mempool_commit_timeout_ms),
            callback_receiver,
        )
        .await
        {
            counters::COMMIT_FLOW_FAIL
                .with_label_values(&[counters::FROM_MEMPOOL_LABEL])
                .inc();
            Err(Error::CallbackSendFailed(format!(
                "Did not receive ACK for commit notification from mempool! Error: {:?}",
                error
            )))
        } else {
            Ok(())
        }
    }
628
629 /// Updates the metrics and logs based on the current (local) sync state.
630 fn update_sync_state_metrics_and_logs(&mut self) -> Result<(), Error> {
631 // Get data from local sync state
632 let synced_version = self.local_state.synced_version();
633 let committed_version = self.local_state.committed_version();
634 let local_epoch = self.local_state.trusted_epoch();
635
636 // Update versions
637 counters::set_version(counters::VersionType::Synced, synced_version);
638 counters::set_version(
639 counters::VersionType::Committed,
640 committed_version,
641 );
642 counters::EPOCH.set(local_epoch as i64);
643
644 // Update timestamps
645 counters::set_timestamp(
646 counters::TimestampType::Synced,
647 self.executor_proxy.get_version_timestamp(synced_version)?,
648 );
649 counters::set_timestamp(
650 counters::TimestampType::Committed,
651 self.executor_proxy
652 .get_version_timestamp(committed_version)?,
653 );
654 counters::set_timestamp(
655 counters::TimestampType::Real,
656 diem_infallible::duration_since_epoch().as_micros() as u64,
657 );
658
659 diem_debug!(LogSchema::new(LogEntry::LocalState)
660 .local_li_version(committed_version)
661 .local_synced_version(synced_version)
662 .local_epoch(local_epoch));
663 Ok(())
664 }
665
666 /// Returns the current SyncState of state sync.
667 /// Note: this is only used for testing and should be removed once
668 /// integration/e2e tests are updated to not rely on this.
669 fn get_sync_state(
670 &mut self, callback: oneshot::Sender<SyncState>,
671 ) -> Result<(), Error> {
672 self.sync_state_with_local_storage()?;
673 match callback.send(self.local_state.clone()) {
674 Err(error) => Err(Error::CallbackSendFailed(format!(
675 "Failed to get sync state! Error: {:?}",
676 error
677 ))),
678 _ => Ok(()),
679 }
680 }
681
682 /*/// There are two types of ChunkRequests:
683 /// 1) Validator chunk requests are for a specific target LI and don't ask
684 /// for long polling. 2) FullNode chunk requests don't specify a target
685 /// LI and can allow long polling.
686 fn process_chunk_request(
687 &mut self, peer: PeerNetworkId, request: GetChunkRequest,
688 ) -> Result<(), Error> {
689 diem_debug!(LogSchema::event_log(
690 LogEntry::ProcessChunkRequest,
691 LogEvent::Received
692 )
693 .peer(&peer)
694 .chunk_request(request.clone())
695 .local_li_version(self.local_state.committed_version()));
696 fail_point!("state_sync::process_chunk_request", |_| {
697 Err(crate::error::Error::UnexpectedError(
698 "Injected error in process_chunk_request".into(),
699 ))
700 });
701 self.sync_state_with_local_storage()?;
702
703 // Verify the chunk request is valid before trying to process it. If
704 // it's invalid, penalize the peer's score.
705 if let Err(error) = self.verify_chunk_request_is_valid(&request) {
706 self.request_manager.process_invalid_chunk_request(&peer);
707 return Err(error);
708 }
709
710 match request.target.clone() {
711 TargetType::TargetLedgerInfo(li) => self
712 .process_request_for_target_and_highest(
713 peer,
714 request,
715 Some(li),
716 None,
717 ),
718 TargetType::HighestAvailable {
719 target_li,
720 timeout_ms,
721 } => self.process_request_for_target_and_highest(
722 peer,
723 request,
724 target_li,
725 Some(timeout_ms),
726 ),
727 TargetType::Waypoint(waypoint_version) => self
728 .process_request_for_waypoint(peer, request, waypoint_version),
729 }
730 }*/
731
732 /*fn verify_chunk_request_is_valid(
733 &mut self, request: &GetChunkRequest,
734 ) -> Result<(), Error> {
735 // Ensure request versions are correctly formed
736 if let Some(target_version) = request.target.version() {
737 if target_version < request.known_version {
738 return Err(Error::InvalidChunkRequest(
739 "Target version is less than known version! Discarding request.".into(),
740 ));
741 }
742 }
743
744 // Ensure request epochs are correctly formed
745 if let Some(target_epoch) = request.target.epoch() {
746 if target_epoch < request.current_epoch {
747 return Err(Error::InvalidChunkRequest(
748 "Target epoch is less than current epoch! Discarding request.".into(),
749 ));
750 }
751 }
752
753 // Ensure the chunk limit is not zero
754 if request.limit == 0 {
755 return Err(Error::InvalidChunkRequest(
756 "Chunk request limit is 0. Discarding request.".into(),
757 ));
758 }
759
760 // Ensure the timeout is not zero
761 if let TargetType::HighestAvailable {
762 target_li: _,
763 timeout_ms,
764 } = request.target.clone()
765 {
766 if timeout_ms == 0 {
767 return Err(Error::InvalidChunkRequest(
768 "Long poll timeout is 0. Discarding request.".into(),
769 ));
770 }
771 }
772
773 Ok(())
774 }*/
775
776 /*/// Processing requests with no target LedgerInfo (highest available) and
777 /// potentially long polling.
778 /// Assumes that the local state is uptodate with storage.
779 fn process_request_for_target_and_highest(
780 &mut self, peer: PeerNetworkId, request: GetChunkRequest,
781 target_li: Option<LedgerInfoWithSignatures>, timeout_ms: Option<u64>,
782 ) -> Result<(), Error>
783 {
784 let chunk_limit =
785 std::cmp::min(request.limit, self.config.max_chunk_limit);
786 let timeout = if let Some(timeout_ms) = timeout_ms {
787 std::cmp::min(timeout_ms, self.config.max_timeout_ms)
788 } else {
789 self.config.max_timeout_ms
790 };
791
792 // If the node cannot respond to the request now (i.e., it's not
793 // up-to-date with the requestor) add the request to the
794 // subscriptions to be handled when this node catches up.
795 let local_version = self.local_state.committed_version();
796 if local_version <= request.known_version {
797 let expiration_time =
798 SystemTime::now().checked_add(Duration::from_millis(timeout));
799 if let Some(time) = expiration_time {
800 let request_info = PendingRequestInfo {
801 expiration_time: time,
802 known_version: request.known_version,
803 request_epoch: request.current_epoch,
804 target_li,
805 chunk_limit,
806 };
807 self.subscriptions.insert(peer, request_info);
808 }
809 return Ok(());
810 }
811
812 let (target_li, highest_li) = self.calculate_target_and_highest_li(
813 request.current_epoch,
814 target_li,
815 local_version,
816 )?;
817
818 self.deliver_chunk(
819 peer,
820 request.known_version,
821 ResponseLedgerInfo::ProgressiveLedgerInfo {
822 target_li,
823 highest_li,
824 },
825 chunk_limit,
826 )
827 }*/
828
829 fn calculate_target_and_highest_li(
830 &mut self, request_epoch: u64,
831 request_target_li: Option<LedgerInfoWithSignatures>,
832 local_version: u64,
833 ) -> Result<
834 (LedgerInfoWithSignatures, Option<LedgerInfoWithSignatures>),
835 Error,
836 > {
837 // If the request's epoch is in the past, `target_li` will be set to the
838 // end-of-epoch LI for that epoch
839 let target_li =
840 self.choose_response_li(request_epoch, request_target_li)?;
841
842 let highest_li = if target_li.ledger_info().version() < local_version
843 && target_li.ledger_info().epoch()
844 == self.local_state.trusted_epoch()
845 {
846 // Only populate highest_li field if it's in the past, and the same
847 // epoch. Recipient won't be able to verify ledger info
848 // if it's in a different epoch.
849 Some(self.local_state.committed_ledger_info())
850 } else {
851 None
852 };
853
854 Ok((target_li, highest_li))
855 }
856
857 fn process_request_for_waypoint(
858 &mut self, peer: PeerNetworkId, request: GetChunkRequest,
859 waypoint_version: Version,
860 ) -> Result<(), Error> {
861 let mut limit =
862 std::cmp::min(request.limit, self.config.max_chunk_limit);
863 if self.local_state.committed_version() < waypoint_version {
864 return Err(Error::UnexpectedError(format!(
865 "Local version {} < requested waypoint version {}.",
866 self.local_state.committed_version(),
867 waypoint_version
868 )));
869 }
870 if request.known_version >= waypoint_version {
871 return Err(Error::UnexpectedError(format!(
872 "Waypoint request version {} is not smaller than waypoint {}",
873 request.known_version, waypoint_version
874 )));
875 }
876
877 // Retrieve the waypoint LI.
878 let waypoint_li = self
879 .executor_proxy
880 .get_epoch_ending_ledger_info(waypoint_version)?;
881
882 // Txns are up to the end of request epoch with the proofs relative to
883 // the waypoint LI.
884 let end_of_epoch_li = if waypoint_li.ledger_info().epoch()
885 > request.current_epoch
886 {
887 let end_of_epoch_li = self
888 .executor_proxy
889 .get_epoch_change_ledger_info(request.current_epoch)?;
890 if end_of_epoch_li.ledger_info().version() < request.known_version {
891 return Err(Error::UnexpectedError(format!("Waypoint request's current_epoch (epoch {}, version {}) < waypoint request's known_version {}",
892 end_of_epoch_li.ledger_info().epoch(),
893 end_of_epoch_li.ledger_info().version(),
894 request.known_version,)));
895 }
896 let num_txns_until_end_of_epoch =
897 end_of_epoch_li.ledger_info().version() - request.known_version;
898 limit = std::cmp::min(limit, num_txns_until_end_of_epoch);
899 Some(end_of_epoch_li)
900 } else {
901 None
902 };
903
904 self.deliver_chunk(
905 peer,
906 request.known_version,
907 ResponseLedgerInfo::LedgerInfoForWaypoint {
908 waypoint_li,
909 end_of_epoch_li,
910 },
911 limit,
912 )
913 }
914
    /// Generate and send the ChunkResponse to the given peer.
    /// The chunk response contains transactions from the local storage with the
    /// proofs relative to the given target ledger info.
    /// In case target is None, the ledger info is set to the local highest
    /// ledger info.
    ///
    /// NOTE(review): the actual delivery path (request manager / network send)
    /// is commented out below, so this method is currently a stub that always
    /// returns `Ok(())` and sends nothing.
    fn deliver_chunk(
        &mut self, peer: PeerNetworkId, known_version: u64,
        response_li: ResponseLedgerInfo, limit: u64,
    ) -> Result<(), Error> {
        /*
        let txns = self.executor_proxy.get_chunk(
            known_version,
            limit,
            response_li.version(),
        )?;
        let chunk_response = GetChunkResponse::new(response_li, txns);
        let log = LogSchema::event_log(
            LogEntry::ProcessChunkRequest,
            LogEvent::DeliverChunk,
        )
        .chunk_response(chunk_response.clone())
        .peer(&peer);
        let msg = StateSyncMessage::GetChunkResponse(Box::new(chunk_response));
        let send_result = self.request_manager.send_chunk_response(&peer, msg);
        let send_result_label = if send_result.is_err() {
            counters::SEND_FAIL_LABEL
        } else {
            diem_debug!(log);
            counters::SEND_SUCCESS_LABEL
        };
        counters::RESPONSES_SENT
            .with_label_values(&[
                &peer.raw_network_id().to_string(),
                &peer.peer_id().to_string(),
                send_result_label,
            ])
            .inc();

        send_result.map_err(|e| {
            diem_error!(log.error(&e));
            Error::UnexpectedError(format!(
                "Network error in sending chunk response to {}",
                peer
            ))
        })*/
        Ok(())
    }
962
963 /// The choice of the LedgerInfo in the response follows the following
964 /// logic:
965 /// * response LI is either the requested target or the highest local LI if
966 /// target is None.
967 /// * if the response LI would not belong to `request_epoch`, change
968 /// the response LI to the LI that is terminating `request_epoch`.
969 fn choose_response_li(
970 &self, request_epoch: u64, target: Option<LedgerInfoWithSignatures>,
971 ) -> Result<LedgerInfoWithSignatures, Error> {
972 let mut target_li =
973 target.unwrap_or_else(|| self.local_state.committed_ledger_info());
974 let target_epoch = target_li.ledger_info().epoch();
975 if target_epoch > request_epoch {
976 let end_of_epoch_li = self
977 .executor_proxy
978 .get_epoch_change_ledger_info(request_epoch)?;
979 diem_debug!(LogSchema::event_log(
980 LogEntry::ProcessChunkRequest,
981 LogEvent::PastEpochRequested
982 )
983 .old_epoch(request_epoch)
984 .new_epoch(target_epoch));
985 target_li = end_of_epoch_li;
986 }
987 Ok(target_li)
988 }
989
990 /*/// Applies (i.e., executes and stores) the chunk to storage iff `response`
991 /// is valid.
992 fn apply_chunk(
993 &mut self, peer: &PeerNetworkId, response: GetChunkResponse,
994 ) -> Result<(), Error> {
995 diem_debug!(LogSchema::event_log(
996 LogEntry::ProcessChunkResponse,
997 LogEvent::Received
998 )
999 .chunk_response(response.clone())
1000 .peer(peer));
1001 fail_point!("state_sync::apply_chunk", |_| {
1002 Err(crate::error::Error::UnexpectedError(
1003 "Injected error in apply_chunk".into(),
1004 ))
1005 });
1006
1007 // Process the chunk based on the response type
1008 let txn_list_with_proof = response.txn_list_with_proof.clone();
1009 let chunk_size = response.txn_list_with_proof.len() as u64;
1010 let known_version = self.local_state.synced_version();
1011 match response.response_li {
1012 ResponseLedgerInfo::VerifiableLedgerInfo(li) => self
1013 .process_response_with_target_and_highest(
1014 txn_list_with_proof,
1015 li,
1016 None,
1017 ),
1018 ResponseLedgerInfo::ProgressiveLedgerInfo {
1019 target_li,
1020 highest_li,
1021 } => {
1022 let highest_li =
1023 highest_li.unwrap_or_else(|| target_li.clone());
1024 self.process_response_with_target_and_highest(
1025 txn_list_with_proof,
1026 target_li,
1027 Some(highest_li),
1028 )
1029 }
1030 ResponseLedgerInfo::LedgerInfoForWaypoint {
1031 waypoint_li,
1032 end_of_epoch_li,
1033 } => self.process_response_with_waypoint_li(
1034 txn_list_with_proof,
1035 waypoint_li,
1036 end_of_epoch_li,
1037 ),
1038 }
1039 .map_err(|error| {
1040 //self.request_manager.process_invalid_chunk(&peer);
1041 Error::ProcessInvalidChunk(error.to_string())
1042 })?;
1043
1044 // Update counters and logs with processed chunk information
1045 counters::STATE_SYNC_CHUNK_SIZE
1046 .with_label_values(&[
1047 &peer.raw_network_id().to_string(),
1048 &peer.peer_id().to_string(),
1049 ])
1050 .observe(chunk_size as f64);
1051 let new_version =
1052 known_version.checked_add(chunk_size).ok_or_else(|| {
1053 Error::IntegerOverflow("New version has overflown!".into())
1054 })?;
1055 diem_debug!(
1056 LogSchema::event_log(
1057 LogEntry::ProcessChunkResponse,
1058 LogEvent::ApplyChunkSuccess
1059 ),
1060 "Applied chunk of size {}. Previous version: {}, new version {}",
1061 chunk_size,
1062 known_version,
1063 new_version
1064 );
1065
1066 // Log the request processing time (time from first requested until
1067 // now).
1068 match self.request_manager.get_first_request_time(known_version) {
1069 None => {
1070 diem_info!(
1071 LogSchema::event_log(LogEntry::ProcessChunkResponse, LogEvent::ReceivedChunkWithoutRequest),
1072 "Received a chunk of size {}, without making a request! Previous version: {}, new version {}",
1073 chunk_size,
1074 known_version,
1075 new_version
1076 );
1077 }
1078 Some(first_request_time) => {
1079 if let Ok(duration) =
1080 SystemTime::now().duration_since(first_request_time)
1081 {
1082 counters::SYNC_PROGRESS_DURATION.observe_duration(duration);
1083 }
1084 }
1085 }
1086 }*/
1087
1088 /*/// * Verifies, processes and stores the chunk in the given response.
1089 /// * Triggers post-commit actions based on new local state (after
1090 /// successfully processing a chunk).
1091 async fn process_chunk_response(
1092 &mut self, peer: &PeerNetworkId, response: GetChunkResponse,
1093 ) -> Result<(), Error> {
1094 // Ensure consensus isn't running, otherwise we might get a race with
1095 // storage writes.
1096 if self.is_consensus_executing() {
1097 let error = Error::ConsensusIsExecuting;
1098 diem_error!(LogSchema::new(LogEntry::ProcessChunkResponse,)
1099 .peer(peer)
1100 .error(&error));
1101 return Err(error);
1102 }
1103
1104 // Verify the chunk response is well formed before trying to process it.
1105 self.verify_chunk_response_is_valid(&peer, &response)?;
1106
1107 // Validate the response and store the chunk if possible.
1108 // Any errors thrown here should be for detecting bad chunks.
1109 match self.apply_chunk(peer, response.clone()) {
1110 Ok(()) => {
1111 counters::APPLY_CHUNK_COUNT
1112 .with_label_values(&[
1113 &peer.raw_network_id().to_string(),
1114 &peer.peer_id().to_string(),
1115 counters::SUCCESS_LABEL,
1116 ])
1117 .inc();
1118 }
1119 Err(error) => {
1120 diem_error!(LogSchema::event_log(
1121 LogEntry::ProcessChunkResponse,
1122 LogEvent::ApplyChunkFail
1123 )
1124 .peer(peer)
1125 .error(&error));
1126 counters::APPLY_CHUNK_COUNT
1127 .with_label_values(&[
1128 &peer.raw_network_id().to_string(),
1129 &peer.peer_id().to_string(),
1130 counters::FAIL_LABEL,
1131 ])
1132 .inc();
1133 return Err(error);
1134 }
1135 }
1136
1137 // Process the newly committed chunk
1138 self.process_commit_notification(
1139 response.txn_list_with_proof.transactions.clone(),
1140 None,
1141 vec![],
1142 Some(peer),
1143 )
1144 .await
1145 .map_err(|error| {
1146 diem_error!(LogSchema::event_log(
1147 LogEntry::ProcessChunkResponse,
1148 LogEvent::PostCommitFail
1149 )
1150 .peer(peer)
1151 .error(&error));
1152 error
1153 })
1154 }*/
1155
1156 /*fn verify_chunk_response_is_valid(
1157 &mut self, peer: &PeerNetworkId, response: &GetChunkResponse,
1158 ) -> Result<(), Error> {
1159 // Verify response comes from known peer
1160 if !self.request_manager.is_known_state_sync_peer(peer) {
1161 counters::RESPONSE_FROM_DOWNSTREAM_COUNT
1162 .with_label_values(&[
1163 &peer.raw_network_id().to_string(),
1164 &peer.peer_id().to_string(),
1165 ])
1166 .inc();
1167 self.request_manager.process_chunk_from_downstream(&peer);
1168 return Err(Error::ReceivedChunkFromDownstream(peer.to_string()));
1169 }
1170
1171 // Verify the chunk is not empty and that it starts at the correct
1172 // version
1173 if let Some(first_chunk_version) =
1174 response.txn_list_with_proof.first_transaction_version
1175 {
1176 let known_version = self.local_state.synced_version();
1177 let expected_version =
1178 known_version.checked_add(1).ok_or_else(|| {
1179 Error::IntegerOverflow(
1180 "Expected version has overflown!".into(),
1181 )
1182 })?;
1183
1184 if first_chunk_version != expected_version {
1185 self.request_manager.process_chunk_version_mismatch(
1186 &peer,
1187 first_chunk_version,
1188 known_version,
1189 )?;
1190 }
1191 } else {
1192 // The chunk is empty
1193 self.request_manager.process_empty_chunk(&peer);
1194 return Err(Error::ReceivedEmptyChunk(peer.to_string()));
1195 }
1196
1197 // Verify the chunk has the expected type for the current syncing mode
1198 match &response.response_li {
1199 ResponseLedgerInfo::LedgerInfoForWaypoint {
1200 waypoint_li,
1201 end_of_epoch_li,
1202 } => self
1203 .verify_response_with_waypoint_li(waypoint_li, end_of_epoch_li),
1204 ResponseLedgerInfo::VerifiableLedgerInfo(response_li) => {
1205 self.verify_response_with_target_and_highest(response_li, &None)
1206 }
1207 ResponseLedgerInfo::ProgressiveLedgerInfo {
1208 target_li,
1209 highest_li,
1210 } => self
1211 .verify_response_with_target_and_highest(target_li, highest_li),
1212 }
1213 }*/
1214
1215 fn verify_response_with_target_and_highest(
1216 &mut self, target_li: &LedgerInfoWithSignatures,
1217 highest_li: &Option<LedgerInfoWithSignatures>,
1218 ) -> Result<(), Error> {
1219 if !self.is_initialized() {
1220 return Err(Error::ReceivedWrongChunkType(
1221 "Received a progressive ledger info, but we're not initialized!".into(),
1222 ));
1223 }
1224
1225 // If we're syncing to a specific target for consensus, valid responses
1226 // should not exceed the ledger info version of the sync request.
1227 if let Some(sync_request) = self.sync_request.as_ref() {
1228 let sync_request_version =
1229 sync_request.target.ledger_info().version();
1230 let response_version = target_li.ledger_info().version();
1231 if sync_request_version < response_version {
1232 let error_message = format!("Verifiable ledger info version is higher than the sync target. Received: {}, requested: {}.",
1233 response_version,
1234 sync_request_version);
1235 return Err(Error::ProcessInvalidChunk(error_message));
1236 }
1237 }
1238
1239 // Valid responses should not have a highest ledger info less than
1240 // target
1241 if let Some(highest_li) = highest_li {
1242 let target_version = target_li.ledger_info().version();
1243 let highest_version = highest_li.ledger_info().version();
1244 if target_version > highest_version {
1245 let error_message = format!("Progressive ledger info has target version > highest version. Target: {}, highest: {}.",
1246 target_version,
1247 highest_version);
1248 return Err(Error::ProcessInvalidChunk(error_message));
1249 }
1250 }
1251
1252 Ok(())
1253 }
1254
1255 fn verify_response_with_waypoint_li(
1256 &mut self, waypoint_li: &LedgerInfoWithSignatures,
1257 end_of_epoch_li: &Option<LedgerInfoWithSignatures>,
1258 ) -> Result<(), Error> {
1259 if self.is_initialized() || self.sync_request.is_some() {
1260 return Err(Error::ReceivedWrongChunkType(
1261 "Received a waypoint ledger info, but we're already initialized!".into(),
1262 ));
1263 }
1264
1265 // Valid waypoint responses should not have an end_of_epoch_li version >
1266 // waypoint_li
1267 if let Some(end_of_epoch_li) = end_of_epoch_li {
1268 let end_of_epoch_version = end_of_epoch_li.ledger_info().version();
1269 let waypoint_version = waypoint_li.ledger_info().version();
1270 if end_of_epoch_version > waypoint_version {
1271 let error_message = format!("Waypoint ledger info version is less than the end_of_epoch_li version. Waypoint: {}, end_of_epoch_li: {}.",
1272 waypoint_version,
1273 end_of_epoch_version);
1274 return Err(Error::ProcessInvalidChunk(error_message));
1275 }
1276 }
1277
1278 Ok(())
1279 }
1280
1281 /// Logs the highest seen ledger info version based on the current syncing
1282 /// mode.
1283 fn log_highest_seen_version(
1284 &self, new_highest_li: Option<LedgerInfoWithSignatures>,
1285 ) {
1286 let current_highest_version = if !self.is_initialized() {
1287 self.waypoint.version()
1288 } else if let Some(sync_request) = self.sync_request.as_ref() {
1289 sync_request.target.ledger_info().version()
1290 } else if let Some(new_highest_li) = new_highest_li.as_ref() {
1291 new_highest_li.ledger_info().version()
1292 } else if let Some(target_ledger_info) =
1293 self.target_ledger_info.as_ref()
1294 {
1295 target_ledger_info.ledger_info().version()
1296 } else {
1297 self.local_state.synced_version()
1298 };
1299
1300 let highest_seen_version =
1301 counters::get_version(counters::VersionType::Highest);
1302 let highest_version =
1303 cmp::max(current_highest_version, highest_seen_version);
1304 counters::set_version(counters::VersionType::Highest, highest_version);
1305 }
1306
1307 /// Calculates the next version and epoch to request (assuming the given
1308 /// transaction list and ledger info will be applied successfully).
1309 /// Note: if no ledger info is specified, we assume the next chunk will
1310 /// be for our current epoch.
1311 fn calculate_new_known_version_and_epoch(
1312 &mut self, txn_list_with_proof: TransactionListWithProof,
1313 ledger_info: Option<LedgerInfoWithSignatures>,
1314 ) -> Result<(u64, u64), Error> {
1315 let new_version = self
1316 .local_state
1317 .synced_version()
1318 .checked_add(txn_list_with_proof.len() as u64)
1319 .ok_or_else(|| {
1320 Error::IntegerOverflow(
1321 "Potential state sync version has overflown".into(),
1322 )
1323 })?;
1324
1325 let mut new_epoch = self.local_state.trusted_epoch();
1326 if let Some(ledger_info) = ledger_info {
1327 if ledger_info.ledger_info().version() == new_version
1328 && ledger_info.ledger_info().ends_epoch()
1329 {
1330 // This chunk is going to finish the current epoch. Choose the
1331 // next one.
1332 new_epoch = new_epoch.checked_add(1).ok_or_else(|| {
1333 Error::IntegerOverflow(
1334 "Potential state sync epoch has overflown".into(),
1335 )
1336 })?;
1337 }
1338 }
1339
1340 Ok((new_version, new_epoch))
1341 }
1342
1343 /// Returns a chunk target for the highest available synchronization.
1344 fn create_highest_available_chunk_target(
1345 &self, target_ledger_info: Option<LedgerInfoWithSignatures>,
1346 ) -> TargetType {
1347 TargetType::HighestAvailable {
1348 target_li: target_ledger_info,
1349 timeout_ms: self.config.long_poll_timeout_ms,
1350 }
1351 }
1352
1353 /// Returns a chunk target for consensus request synchronization.
1354 fn create_sync_request_chunk_target(
1355 &self, known_version: u64,
1356 ) -> Result<TargetType, Error> {
1357 if let Some(sync_request) = &self.sync_request {
1358 let target_version = sync_request.target.ledger_info().version();
1359 if target_version <= known_version {
1360 Err(Error::SyncedBeyondTarget(known_version, target_version))
1361 } else {
1362 let chunk_target = self.create_highest_available_chunk_target(
1363 Some(sync_request.target.clone()),
1364 );
1365 Ok(chunk_target)
1366 }
1367 } else {
1368 Err(Error::NoSyncRequestFound(
1369 "Unable to create a sync request chunk target".into(),
1370 ))
1371 }
1372 }
1373
1374 /// Returns a chunk target for waypoint synchronization.
1375 fn create_waypoint_chunk_target(&self) -> TargetType {
1376 let waypoint_version = self.waypoint.version();
1377 TargetType::Waypoint(waypoint_version)
1378 }
1379
1380 /*/// Processing chunk responses that carry a LedgerInfo that should be
1381 /// verified using the current local trusted validator set.
1382 fn process_response_with_target_and_highest(
1383 &mut self, txn_list_with_proof: TransactionListWithProof,
1384 response_li: LedgerInfoWithSignatures,
1385 new_highest_li: Option<LedgerInfoWithSignatures>,
1386 ) -> Result<(), Error>
1387 {
1388 // Optimistically calculate the new known version and epoch (assume the
1389 // current chunk is applied successfully).
1390 let (known_version, known_epoch) = self
1391 .calculate_new_known_version_and_epoch(
1392 txn_list_with_proof.clone(),
1393 Some(response_li.clone()),
1394 )?;
1395
1396 // Send the next chunk request based on the sync mode (sync request or
1397 // highest available).
1398 if self.sync_request.is_some() {
1399 match self.create_sync_request_chunk_target(known_version) {
1400 Ok(chunk_target) => {
1401 // Send the chunk request and log any errors. If errors are
1402 // logged continue processing the chunk.
1403 let _ = self.send_chunk_request_and_log_error(
1404 known_version,
1405 known_epoch,
1406 chunk_target,
1407 LogEntry::ProcessChunkResponse,
1408 );
1409 }
1410 Err(error) => {
1411 diem_error!(LogSchema::new(LogEntry::SendChunkRequest)
1412 .error(&error));
1413 }
1414 }
1415 } else {
1416 let mut new_target_ledger_info = None;
1417 if let Some(target_ledger_info) = self.target_ledger_info.clone() {
1418 if known_version < target_ledger_info.ledger_info().version() {
1419 new_target_ledger_info = Some(target_ledger_info);
1420 }
1421 }
1422 // Send the chunk request and log any errors. If errors are logged
1423 // continue processing the chunk.
1424 let _ = self.send_chunk_request_and_log_error(
1425 known_version,
1426 known_epoch,
1427 self.create_highest_available_chunk_target(
1428 new_target_ledger_info,
1429 ),
1430 LogEntry::ProcessChunkResponse,
1431 );
1432 }
1433
1434 // Validate chunk ledger infos
1435 self.local_state.verify_ledger_info(&response_li)?;
1436 if let Some(new_highest_li) = new_highest_li.clone() {
1437 if new_highest_li != response_li {
1438 self.local_state.verify_ledger_info(&new_highest_li)?;
1439 }
1440 }
1441
1442 // Validate and store the chunk
1443 self.log_highest_seen_version(new_highest_li.clone());
1444 self.validate_and_store_chunk(txn_list_with_proof, response_li, None)?;
1445
1446 // Need to sync with local storage to update synced version
1447 self.sync_state_with_local_storage()?;
1448 let synced_version = self.local_state.synced_version();
1449
1450 // Check if we've synced beyond our current target ledger info
1451 if let Some(target_ledger_info) = &self.target_ledger_info {
1452 if synced_version >= target_ledger_info.ledger_info().version() {
1453 self.target_ledger_info = None;
1454 }
1455 }
1456
1457 // If we don't have a target ledger info, check if the new highest
1458 // is appropriate for us.
1459 if self.target_ledger_info.is_none() {
1460 if let Some(new_highest_li) = new_highest_li {
1461 if synced_version < new_highest_li.ledger_info().version() {
1462 self.target_ledger_info = Some(new_highest_li);
1463 }
1464 }
1465 }
1466
1467 Ok(())
1468 }*/
1469
1470 /*/// Processing chunk responses that carry a LedgerInfo corresponding to the
1471 /// waypoint.
1472 fn process_response_with_waypoint_li(
1473 &mut self, txn_list_with_proof: TransactionListWithProof,
1474 waypoint_li: LedgerInfoWithSignatures,
1475 end_of_epoch_li: Option<LedgerInfoWithSignatures>,
1476 ) -> Result<(), Error>
1477 {
1478 // Optimistically calculate the new known version and epoch (assume the
1479 // current chunk is applied successfully).
1480 let (known_version, known_epoch) = self
1481 .calculate_new_known_version_and_epoch(
1482 txn_list_with_proof.clone(),
1483 end_of_epoch_li.clone(),
1484 )?;
1485 if known_version < self.waypoint.version() {
1486 // Send the chunk request and log any errors. If errors are logged
1487 // continue processing the chunk.
1488 let _ = self.send_chunk_request_and_log_error(
1489 known_version,
1490 known_epoch,
1491 self.create_waypoint_chunk_target(),
1492 LogEntry::ProcessChunkResponse,
1493 );
1494 }
1495
1496 // Verify the end_of_epoch_li against local state and ensure the version
1497 // corresponds to the version at the end of the chunk.
1498 // The executor expects that when it is passed an end_of_epoch_li to
1499 // commit, it is going to execute/commit transactions leading up
1500 // to that li, so we also verify that the end_of_epoch_li
1501 // actually ends the epoch.
1502 let end_of_epoch_li_to_commit = if let Some(end_of_epoch_li) =
1503 end_of_epoch_li
1504 {
1505 self.local_state.verify_ledger_info(&end_of_epoch_li)?;
1506
1507 let ledger_info = end_of_epoch_li.ledger_info();
1508 if !ledger_info.ends_epoch() {
1509 return Err(Error::ProcessInvalidChunk(
1510 "Received waypoint ledger info with an end_of_epoch_li that does not end the epoch!".into(),
1511 ));
1512 }
1513
1514 // If we're now at the end of epoch version (i.e., known_version is
1515 // the same as the end_of_epoch_li version), the
1516 // end_of_epoch_li should be passed to storage so that we
1517 // can commit the end_of_epoch_li. If not, storage should only sync
1518 // the given chunk.
1519 if ledger_info.version() == known_version {
1520 Some(end_of_epoch_li)
1521 } else {
1522 None
1523 }
1524 } else {
1525 None
1526 };
1527 self.waypoint
1528 .verify(waypoint_li.ledger_info())
1529 .map_err(|error| {
1530 Error::UnexpectedError(format!(
1531 "Waypoint verification failed: {}",
1532 error
1533 ))
1534 })?;
1535
1536 self.validate_and_store_chunk(
1537 txn_list_with_proof,
1538 waypoint_li,
1539 end_of_epoch_li_to_commit,
1540 )?;
1541 self.log_highest_seen_version(None);
1542
1543 Ok(())
1544 }*/
1545
1546 // Assumes that the target LI has been already verified by the caller.
1547 fn validate_and_store_chunk(
1548 &mut self, txn_list_with_proof: TransactionListWithProof,
1549 target: LedgerInfoWithSignatures,
1550 intermediate_end_of_epoch_li: Option<LedgerInfoWithSignatures>,
1551 ) -> Result<(), Error> {
1552 let target_epoch = target.ledger_info().epoch();
1553 let target_version = target.ledger_info().version();
1554 let local_epoch = self.local_state.committed_epoch();
1555 let local_version = self.local_state.committed_version();
1556 if (target_epoch, target_version) <= (local_epoch, local_version) {
1557 diem_warn!(
1558 LogSchema::event_log(
1559 LogEntry::ProcessChunkResponse,
1560 LogEvent::OldResponseLI
1561 )
1562 .local_li_version(local_version)
1563 .local_epoch(local_epoch),
1564 response_li_version = target_version,
1565 response_li_epoch = target_epoch
1566 );
1567 return Ok(());
1568 }
1569
1570 self.executor_proxy.execute_chunk(
1571 txn_list_with_proof,
1572 target,
1573 intermediate_end_of_epoch_li,
1574 )
1575 }
1576
1577 /// Returns true if consensus is currently executing and state sync should
1578 /// therefore not write to storage. Reads are still permitted (e.g., to
1579 /// handle chunk requests).
1580 fn is_consensus_executing(&mut self) -> bool {
1581 self.is_initialized()
1582 && self.role == RoleType::Validator
1583 && self.sync_request.is_none()
1584 }
1585
    /// Ensures that state sync is making progress:
    /// * Kick starts the initial sync process (e.g., syncing to a waypoint or
    /// target).
    /// * Issues a new request if too much time has passed since the last
    /// request was sent.
    ///
    /// Returns an error only on internal failures (e.g., timestamp
    /// overflow); a timed-out sync request is reported to consensus via its
    /// callback, not via this return value.
    fn check_progress(&mut self) -> Result<(), Error> {
        if self.is_consensus_executing() {
            return Ok(()); // No need to check progress or issue any requests
            // (consensus is running).
        }

        // Check if the sync request has timed out (i.e., if we aren't
        // committing fast enough)
        if let Some(sync_request) = self.sync_request.as_ref() {
            let timeout_between_commits =
                Duration::from_millis(self.config.sync_request_timeout_ms);
            let commit_deadline = sync_request
                .last_commit_timestamp
                .checked_add(timeout_between_commits)
                .ok_or_else(|| {
                    Error::IntegerOverflow(
                        "The commit deadline timestamp has overflown!".into(),
                    )
                })?;

            // Check if the commit deadline has been exceeded.
            // (`duration_since` returns Ok exactly when now >= deadline.)
            if SystemTime::now().duration_since(commit_deadline).is_ok() {
                counters::SYNC_REQUEST_RESULT
                    .with_label_values(&[counters::TIMEOUT_LABEL])
                    .inc();
                diem_warn!(LogSchema::event_log(
                    LogEntry::SyncRequest,
                    LogEvent::Timeout
                ));

                // Remove the sync request and notify consensus that the request
                // timed out! A callback failure is logged but does not abort
                // progress checking.
                if let Some(sync_request) = self.sync_request.take() {
                    if let Err(e) = Self::send_sync_req_callback(
                        sync_request,
                        Err(Error::UnexpectedError(
                            "Sync request timed out!".into(),
                        )),
                    ) {
                        diem_error!(LogSchema::event_log(
                            LogEntry::SyncRequest,
                            LogEvent::CallbackFail
                        )
                        .error(&e));
                    }
                }
            }
        }

        // If the coordinator didn't make progress by the expected time or did
        // not send a request for the current local synced version,
        // issue a new request.
        // NOTE(review): the re-request logic below is currently disabled, so
        // this method only handles sync-request timeouts and never issues a
        // new chunk request itself.
        /*let known_version = self.local_state.synced_version();
        if self.request_manager.has_request_timed_out(known_version)? {
            counters::TIMEOUT.inc();
            diem_warn!(LogSchema::new(LogEntry::Timeout).version(known_version));

            let trusted_epoch = self.local_state.trusted_epoch();
            let chunk_target = if !self.is_initialized() {
                self.create_waypoint_chunk_target()
            } else if self.sync_request.is_some() {
                self.create_sync_request_chunk_target(known_version)?
            } else {
                self.create_highest_available_chunk_target(
                    self.target_ledger_info.clone(),
                )
            };
            self.send_chunk_request_and_log_error(
                known_version,
                trusted_epoch,
                chunk_target,
                LogEntry::Timeout,
            )
        } else {
            Ok(())
        }*/
        Ok(())
    }
1669
1670 /*/// Sends a chunk request with a given `known_version`, `known_epoch` and
1671 /// `chunk_target`. Immediately logs any errors returned by the
1672 /// operation using the given log entry.
1673 fn send_chunk_request_and_log_error(
1674 &mut self, known_version: u64, known_epoch: u64,
1675 chunk_target: TargetType, log_entry: LogEntry,
1676 ) -> Result<(), Error>
1677 {
1678 if let Err(error) = self.send_chunk_request_with_target(
1679 known_version,
1680 known_epoch,
1681 chunk_target,
1682 ) {
1683 diem_error!(LogSchema::event_log(
1684 log_entry,
1685 LogEvent::SendChunkRequestFail
1686 )
1687 .version(known_version)
1688 .local_epoch(known_epoch)
1689 .error(&error));
1690 Err(error)
1691 } else {
1692 Ok(())
1693 }
1694 }*/
1695
1696 /*/// Sends a chunk request with a given `known_version`, `known_epoch` and
1697 /// `target`.
1698 fn send_chunk_request_with_target(
1699 &mut self, known_version: u64, known_epoch: u64, target: TargetType,
1700 ) -> Result<(), Error> {
1701 if self.request_manager.no_available_peers() {
1702 diem_warn!(LogSchema::event_log(
1703 LogEntry::SendChunkRequest,
1704 LogEvent::MissingPeers
1705 ));
1706 return Err(Error::NoAvailablePeers(
1707 "No peers to send chunk request to!".into(),
1708 ));
1709 }
1710
1711 let target_version = target
1712 .version()
1713 .unwrap_or_else(|| known_version.wrapping_add(1));
1714 counters::set_version(counters::VersionType::Target, target_version);
1715
1716 let req = GetChunkRequest::new(
1717 known_version,
1718 known_epoch,
1719 self.config.chunk_limit,
1720 target,
1721 );
1722 self.request_manager.send_chunk_request(req)
1723 }*/
1724
1725 fn deliver_subscription(
1726 &mut self, peer: PeerNetworkId, request_info: PendingRequestInfo,
1727 local_version: u64,
1728 ) -> Result<(), Error> {
1729 let (target_li, highest_li) = self.calculate_target_and_highest_li(
1730 request_info.request_epoch,
1731 request_info.target_li,
1732 local_version,
1733 )?;
1734
1735 self.deliver_chunk(
1736 peer,
1737 request_info.known_version,
1738 ResponseLedgerInfo::ProgressiveLedgerInfo {
1739 target_li,
1740 highest_li,
1741 },
1742 request_info.chunk_limit,
1743 )
1744 }
1745
1746 /// The function is called after the local storage is updated with new
1747 /// transactions: it might deliver chunks for the subscribers that have
1748 /// been waiting with the long polls.
1749 ///
1750 /// Note that it is possible to help the subscribers only with the
1751 /// transactions that match the highest ledger info in the local storage
1752 /// (some committed transactions are ahead of the latest ledger info and
1753 /// are not going to be used for helping the remote subscribers).
1754 /// The function assumes that the local state has been synced with storage.
1755 fn check_subscriptions(&mut self) {
1756 let highest_li_version = self.local_state.committed_version();
1757
1758 let mut ready = vec![];
1759 self.subscriptions.retain(|peer, request_info| {
1760 // filter out expired peer requests
1761 if SystemTime::now()
1762 .duration_since(request_info.expiration_time)
1763 .is_ok()
1764 {
1765 return false;
1766 }
1767 if request_info.known_version < highest_li_version {
1768 ready.push((peer.clone(), request_info.clone()));
1769 false
1770 } else {
1771 true
1772 }
1773 });
1774
1775 ready.into_iter().for_each(|(peer, request_info)| {
1776 let result_label = if let Err(err) = self.deliver_subscription(
1777 peer.clone(),
1778 request_info,
1779 highest_li_version,
1780 ) {
1781 diem_error!(LogSchema::new(LogEntry::SubscriptionDeliveryFail)
1782 .peer(&peer)
1783 .error(&err));
1784 counters::FAIL_LABEL
1785 } else {
1786 counters::SUCCESS_LABEL
1787 };
1788 counters::SUBSCRIPTION_DELIVERY_COUNT
1789 .with_label_values(&[
1790 &peer.raw_network_id().to_string(),
1791 &peer.peer_id().to_string(),
1792 result_label,
1793 ])
1794 .inc();
1795 });
1796 }
1797
1798 fn send_sync_req_callback(
1799 sync_req: SyncRequest, msg: Result<(), Error>,
1800 ) -> Result<(), Error> {
1801 sync_req.callback.send(msg).map_err(|failed_msg| {
1802 counters::FAILED_CHANNEL_SEND
1803 .with_label_values(&[counters::CONSENSUS_SYNC_REQ_CALLBACK])
1804 .inc();
1805 Error::UnexpectedError(format!(
1806 "Consensus sync request callback error - failed to send the following message: {:?}",
1807 failed_msg
1808 ))
1809 })
1810 }
1811
1812 fn send_initialization_callback(
1813 callback: oneshot::Sender<Result<(), Error>>,
1814 ) -> Result<(), Error> {
1815 match callback.send(Ok(())) {
1816 Err(error) => {
1817 counters::FAILED_CHANNEL_SEND
1818 .with_label_values(&[counters::WAYPOINT_INIT_CALLBACK])
1819 .inc();
1820 Err(Error::CallbackSendFailed(format!(
1821 "Waypoint initialization callback error - failed to send following msg: {:?}",
1822 error
1823 )))
1824 }
1825 _ => Ok(()),
1826 }
1827 }
1828}
1829
1830/*
1831#[cfg(test)]
1832mod tests {
1833 use crate::pos::{
1834 mempool::CommitResponse,
1835 state_sync::{
1836 chunk_request::{GetChunkRequest, TargetType},
1837 chunk_response::{GetChunkResponse, ResponseLedgerInfo},
1838 client::SyncRequest,
1839 coordinator::StateSyncCoordinator,
1840 error::Error,
1841 executor_proxy::ExecutorProxy,
1842 network::StateSyncMessage,
1843 shared_components::test_utils::{
1844 self, create_coordinator_with_config_and_waypoint,
1845 },
1846 },
1847 };
1848 use diem_config::{
1849 config::{NodeConfig, PeerNetworkId, PeerRole, RoleType},
1850 network_id::{NetworkId, NodeNetworkId},
1851 };
1852 use diem_crypto::{
1853 ed25519::{Ed25519PrivateKey, Ed25519Signature},
1854 HashValue, PrivateKey, Uniform,
1855 };
1856 use diem_types::{
1857 account_address::AccountAddress,
1858 block_info::BlockInfo,
1859 chain_id::ChainId,
1860 ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
1861 proof::TransactionListProof,
1862 transaction::{
1863 RawTransaction, Script, SignedTransaction, Transaction,
1864 TransactionListWithProof, TransactionPayload, Version,
1865 },
1866 waypoint::Waypoint,
1867 PeerId,
1868 };
1869 use futures::{channel::oneshot, executor::block_on};
1870 use netcore::transport::ConnectionOrigin;
1871 use network::transport::ConnectionMetadata;
1872 use std::{collections::BTreeMap, time::SystemTime};
1873
1874 #[test]
1875 fn test_process_sync_request() {
1876 // Create a coordinator for a full node
1877 let mut full_node_coordinator =
1878 test_utils::create_full_node_coordinator();
1879
1880 // Verify that fullnodes can't process sync requests
1881 let (sync_request, _) = create_sync_request_at_version(0);
1882 let process_result =
1883 full_node_coordinator.process_sync_request(sync_request);
1884 if !matches!(process_result, Err(Error::FullNodeSyncRequest)) {
1885 panic!(
1886 "Expected an full node sync request error, but got: {:?}",
1887 process_result
1888 );
1889 }
1890
1891 // Create a coordinator for a validator node
1892 let mut validator_coordinator =
1893 test_utils::create_validator_coordinator();
1894
1895 // Perform sync request for version that matches initial waypoint
1896 // version
1897 let (sync_request, mut callback_receiver) =
1898 create_sync_request_at_version(0);
1899 validator_coordinator
1900 .process_sync_request(sync_request)
1901 .unwrap();
1902 match callback_receiver.try_recv() {
1903 Ok(Some(result)) => assert!(result.is_ok()),
1904 result => panic!("Expected okay but got: {:?}", result),
1905 };
1906
1907 // Create validator coordinator with waypoint higher than 0
1908 let waypoint_version = 10;
1909 let waypoint_ledger_info =
1910 create_ledger_info_at_version(waypoint_version);
1911 let waypoint = Waypoint::new_any(&waypoint_ledger_info.ledger_info());
1912 let mut validator_coordinator =
1913 create_coordinator_with_config_and_waypoint(
1914 NodeConfig::default(),
1915 waypoint,
1916 );
1917
1918 // Verify coordinator won't process sync requests as it's not yet
1919 // initialized
1920 let (sync_request, mut callback_receiver) =
1921 create_sync_request_at_version(10);
1922 let process_result =
1923 validator_coordinator.process_sync_request(sync_request);
1924 if !matches!(process_result, Err(Error::UninitializedError(..))) {
1925 panic!(
1926 "Expected an uninitialized error, but got: {:?}",
1927 process_result
1928 );
1929 }
1930 let callback_result = callback_receiver.try_recv();
1931 if !matches!(callback_result, Err(_)) {
1932 panic!("Expected error but got: {:?}", callback_result);
1933 }
1934
1935 // TODO(joshlind): add a check for syncing to old versions once we
1936 // support storage modifications in unit tests.
1937 }
1938
    /// Verifies that `get_sync_state` reports the coordinator's state via
    /// the supplied callback, and that a dropped callback receiver surfaces
    /// as `Error::CallbackSendFailed` rather than a panic.
    #[test]
    fn test_get_sync_state() {
        // Create a coordinator for a validator node
        let mut validator_coordinator =
            test_utils::create_validator_coordinator();

        // Get the sync state from state sync (a fresh coordinator, so the
        // committed version is expected to still be 0)
        let (callback_sender, mut callback_receiver) = oneshot::channel();
        validator_coordinator
            .get_sync_state(callback_sender)
            .unwrap();
        match callback_receiver.try_recv() {
            Ok(Some(sync_state)) => {
                assert_eq!(sync_state.committed_version(), 0);
            }
            result => panic!("Expected okay but got: {:?}", result),
        };

        // Drop the callback receiver and verify that attempting to send the
        // state back fails with a callback error
        let (callback_sender, _) = oneshot::channel();
        let sync_state_result =
            validator_coordinator.get_sync_state(callback_sender);
        if !matches!(sync_state_result, Err(Error::CallbackSendFailed(..))) {
            panic!("Expected error but got: {:?}", sync_state_result);
        }
    }
1965
    /// Verifies `wait_for_initialization` behavior: an already-initialized
    /// coordinator fires the callback immediately, a dropped receiver yields
    /// `Error::CallbackSendFailed`, and a coordinator that still needs to
    /// sync to its waypoint holds the callback (no notification yet).
    #[test]
    fn test_wait_for_initialization() {
        // Create a coordinator for a validator node
        let mut validator_coordinator =
            test_utils::create_validator_coordinator();

        // Check already initialized returns immediately
        let (callback_sender, mut callback_receiver) = oneshot::channel();
        validator_coordinator
            .wait_for_initialization(callback_sender)
            .unwrap();
        match callback_receiver.try_recv() {
            Ok(Some(result)) => assert!(result.is_ok()),
            result => panic!("Expected okay but got: {:?}", result),
        };

        // Drop the callback receiver and verify the send failure is reported
        let (callback_sender, _) = oneshot::channel();
        let initialization_result =
            validator_coordinator.wait_for_initialization(callback_sender);
        if !matches!(initialization_result, Err(Error::CallbackSendFailed(..)))
        {
            panic!("Expected error but got: {:?}", initialization_result);
        }

        // Create a coordinator with the waypoint version higher than 0
        // (so the node is not initialized until it syncs to the waypoint)
        let waypoint_version = 10;
        let waypoint_ledger_info =
            create_ledger_info_at_version(waypoint_version);
        let waypoint = Waypoint::new_any(&waypoint_ledger_info.ledger_info());
        let mut validator_coordinator =
            create_coordinator_with_config_and_waypoint(
                NodeConfig::default(),
                waypoint,
            );

        // Verify callback is not executed as state sync is not yet initialized
        let (callback_sender, mut callback_receiver) = oneshot::channel();
        validator_coordinator
            .wait_for_initialization(callback_sender)
            .unwrap();
        let callback_result = callback_receiver.try_recv();
        if !matches!(callback_result, Ok(None)) {
            panic!("Expected none but got: {:?}", callback_result);
        }

        // TODO(joshlind): add a check that verifies the callback is executed
        // once we can update storage in the unit tests.
    }
2015
    /// Verifies that commit notifications are processed without error for
    /// both empty and non-empty transaction lists, and that consensus
    /// receives a commit ack through the callback when one is provided.
    #[test]
    fn test_process_commit_notification() {
        // Create a coordinator for a validator node
        let mut validator_coordinator =
            test_utils::create_validator_coordinator();

        // Verify that a commit notification with no transactions (and no
        // callback) doesn't return an error
        block_on(validator_coordinator.process_commit_notification(
            vec![],
            None,
            vec![],
            None,
        ))
        .unwrap();

        // Verify that consensus is sent a commit ack when everything works
        let (callback_sender, mut callback_receiver) =
            oneshot::channel::<Result<CommitResponse, Error>>();
        block_on(validator_coordinator.process_commit_notification(
            vec![],
            Some(callback_sender),
            vec![],
            None,
        ))
        .unwrap();
        let callback_result = callback_receiver.try_recv();
        if !matches!(callback_result, Ok(Some(Ok(..)))) {
            panic!("Expected an okay result but got: {:?}", callback_result);
        }

        // Notify the coordinator of an actual committed transaction
        // TODO(joshlind): verify that mempool is sent the correct transactions!
        let (callback_sender, _callback_receiver) =
            oneshot::channel::<Result<CommitResponse, Error>>();
        let committed_transactions = vec![create_test_transaction()];
        block_on(validator_coordinator.process_commit_notification(
            committed_transactions,
            Some(callback_sender),
            vec![],
            None,
        ))
        .unwrap();

        // TODO(joshlind): check initialized is fired when unit tests support
        // storage modifications.

        // TODO(joshlind): check sync request is called when unit tests support
        // storage modifications.

        // TODO(joshlind): test that long poll requests are handled
        // appropriately when new unit tests support this.

        // TODO(joshlind): test that reconfiguration events are handled
        // appropriately and listeners are notified.
    }
2071
    /// Verifies `check_progress` behavior around sync requests: it is a
    /// no-op while consensus is running, it reports `NoAvailablePeers` when
    /// a sync is pending with no peers, and a timed-out sync request (via a
    /// zero timeout config) notifies the requester's callback with an error.
    #[test]
    fn test_check_progress() {
        // Create a coordinator for a validator node
        let mut validator_coordinator =
            test_utils::create_validator_coordinator();

        // Verify no error is returned when consensus is running
        validator_coordinator.check_progress().unwrap();

        // Send a sync request to state sync (to mark that consensus is no
        // longer running)
        let (sync_request, _) = create_sync_request_at_version(1);
        let _ = validator_coordinator.process_sync_request(sync_request);

        // Verify the no available peers error is returned
        let progress_result = validator_coordinator.check_progress();
        if !matches!(progress_result, Err(Error::NoAvailablePeers(..))) {
            panic!("Expected an err result but got: {:?}", progress_result);
        }

        // Create validator coordinator with tiny state sync timeout so that
        // any pending sync request expires immediately
        let mut node_config = NodeConfig::default();
        node_config.base.role = RoleType::Validator;
        node_config.state_sync.sync_request_timeout_ms = 0;
        let mut validator_coordinator =
            create_coordinator_with_config_and_waypoint(
                node_config,
                Waypoint::default(),
            );

        // Set a new sync request
        let (sync_request, mut callback_receiver) =
            create_sync_request_at_version(1);
        let _ = validator_coordinator.process_sync_request(sync_request);

        // Verify sync request timeout notifies the callback with an error
        validator_coordinator.check_progress().unwrap_err();
        let callback_result = callback_receiver.try_recv();
        if !matches!(callback_result, Ok(Some(Err(..)))) {
            panic!("Expected an err result but got: {:?}", callback_result);
        }

        // TODO(joshlind): check request resend after timeout.

        // TODO(joshlind): check overflow error returns.

        // TODO(joshlind): test that check progress passes when there are valid
        // peers.
    }
2121
    /// Verifies peer tracking on a validator coordinator: a public-network
    /// peer is rejected as invalid, a validator-network peer is accepted,
    /// and removing a known peer succeeds.
    #[test]
    fn test_new_and_lost_peers() {
        // Create a coordinator for a validator node
        let mut validator_coordinator =
            test_utils::create_validator_coordinator();

        // Create a public peer (inbound, on the public network)
        let node_network_id = NodeNetworkId::new(NetworkId::Public, 0);
        let peer_id = PeerId::random();
        let connection_metadata = ConnectionMetadata::mock_with_role_and_origin(
            peer_id,
            PeerRole::Validator,
            ConnectionOrigin::Inbound,
        );

        // Verify error is returned when adding peer that is not a valid peer
        // (validators should not sync from the public network)
        let new_peer_result = validator_coordinator
            .process_new_peer(node_network_id, connection_metadata.clone());
        if !matches!(new_peer_result, Err(Error::InvalidStateSyncPeer(..))) {
            panic!(
                "Expected an invalid peer error but got: {:?}",
                new_peer_result
            );
        }

        // Verify the same error is not returned when adding a validator node
        let node_network_id = NodeNetworkId::new(NetworkId::Validator, 0);
        let new_peer_result = validator_coordinator
            .process_new_peer(node_network_id.clone(), connection_metadata);
        if matches!(new_peer_result, Err(Error::InvalidStateSyncPeer(..))) {
            panic!(
                "Expected not to receive an invalid peer error but got: {:?}",
                new_peer_result
            );
        }

        // Verify no error is returned when removing the node
        validator_coordinator
            .process_lost_peer(node_network_id, peer_id)
            .unwrap();
    }
2163
2164 #[test]
2165 fn test_invalid_chunk_request_messages() {
2166 // Create a coordinator for a validator node
2167 let mut validator_coordinator =
2168 test_utils::create_validator_coordinator();
2169
2170 // Constants for the chunk requests
2171 let peer_network_id = PeerNetworkId::random();
2172 let current_epoch = 0;
2173 let chunk_limit = 250;
2174 let timeout_ms = 1000;
2175
2176 // Create chunk requests with a known version higher than the target
2177 let known_version = 100;
2178 let target_version = 10;
2179 let chunk_requests = create_chunk_requests(
2180 known_version,
2181 current_epoch,
2182 chunk_limit,
2183 target_version,
2184 timeout_ms,
2185 );
2186
2187 // Verify invalid request errors are thrown
2188 verify_all_chunk_requests_are_invalid(
2189 &mut validator_coordinator,
2190 &peer_network_id,
2191 &chunk_requests,
2192 );
2193
2194 // Create chunk requests with a current epoch higher than the target
2195 // epoch
2196 let known_version = 0;
2197 let current_epoch = 100;
2198 let chunk_requests = create_chunk_requests(
2199 known_version,
2200 current_epoch,
2201 chunk_limit,
2202 target_version,
2203 timeout_ms,
2204 );
2205
2206 // Verify invalid request errors are thrown
2207 verify_all_chunk_requests_are_invalid(
2208 &mut validator_coordinator,
2209 &peer_network_id,
2210 &chunk_requests[1..2], // Ignore waypoint request
2211 );
2212
2213 // Create chunk requests with a chunk limit size of 0 (which is a
2214 // pointless request)
2215 let chunk_limit = 0;
2216 let chunk_requests = create_chunk_requests(
2217 known_version,
2218 current_epoch,
2219 chunk_limit,
2220 target_version,
2221 timeout_ms,
2222 );
2223
2224 // Verify invalid request errors are thrown
2225 verify_all_chunk_requests_are_invalid(
2226 &mut validator_coordinator,
2227 &peer_network_id,
2228 &chunk_requests,
2229 );
2230
2231 // Create chunk requests with a long poll timeout of 0 (which is a
2232 // pointless request)
2233 let chunk_limit = 0;
2234 let chunk_requests = create_chunk_requests(
2235 known_version,
2236 current_epoch,
2237 chunk_limit,
2238 target_version,
2239 timeout_ms,
2240 );
2241
2242 // Verify invalid request errors are thrown
2243 verify_all_chunk_requests_are_invalid(
2244 &mut validator_coordinator,
2245 &peer_network_id,
2246 &chunk_requests,
2247 );
2248 }
2249
    /// Walks a chunk response through each coordinator rejection path in
    /// order: consensus still executing, peer not upstream, empty chunk,
    /// and a non-sequential (version-mismatched) chunk.
    #[test]
    fn test_process_chunk_response_messages() {
        // Create a coordinator for a validator node
        let mut validator_coordinator =
            test_utils::create_validator_coordinator();

        // Create a peer and empty chunk responses
        let peer_network_id = PeerNetworkId::random_validator();
        let empty_chunk_responses = create_empty_chunk_responses(10);

        // Verify a consensus error is returned when processing each chunk
        // (no sync request has been made, so consensus is still running)
        for chunk_response in &empty_chunk_responses {
            let result = block_on(validator_coordinator.process_chunk_message(
                peer_network_id.network_id(),
                peer_network_id.peer_id(),
                chunk_response.clone(),
            ));
            if !matches!(result, Err(Error::ConsensusIsExecuting)) {
                panic!("Expected consensus executing error, got: {:?}", result);
            }
        }

        // Make a sync request (to force consensus to yield)
        let (sync_request, _) = create_sync_request_at_version(10);
        let _ = validator_coordinator.process_sync_request(sync_request);

        // Verify we now get a downstream error (as the peer is downstream to
        // us)
        for chunk_response in &empty_chunk_responses {
            let result = block_on(validator_coordinator.process_chunk_message(
                peer_network_id.network_id(),
                peer_network_id.peer_id(),
                chunk_response.clone(),
            ));
            if !matches!(result, Err(Error::ReceivedChunkFromDownstream(..))) {
                panic!("Expected a downstream error, but got: {:?}", result);
            }
        }

        // Add the peer to our known peers
        process_new_peer_event(&mut validator_coordinator, &peer_network_id);

        // Verify we now get an empty chunk error (the responses carry no
        // transactions)
        for chunk_response in &empty_chunk_responses {
            let result = block_on(validator_coordinator.process_chunk_message(
                peer_network_id.network_id(),
                peer_network_id.peer_id(),
                chunk_response.clone(),
            ));
            if !matches!(result, Err(Error::ReceivedEmptyChunk(..))) {
                panic!("Expected an empty chunk error, got: {:?}", result);
            }
        }

        // Send a non-empty chunk with a version mismatch and verify a mismatch
        // error is returned (the chunk does not start at our next version)
        let chunk_responses = create_non_empty_chunk_responses(10);
        for chunk_response in &chunk_responses {
            let result = block_on(validator_coordinator.process_chunk_message(
                peer_network_id.network_id(),
                peer_network_id.peer_id(),
                chunk_response.clone(),
            ));
            if !matches!(result, Err(Error::ReceivedNonSequentialChunk(..))) {
                panic!(
                    "Expected a non-sequential error, but got: {:?}",
                    result
                );
            }
        }
    }
2321
    /// Verifies full-node handling of highest-available chunk responses:
    /// non-highest response types are rejected as the wrong chunk type, a
    /// highest ledger info below the target is rejected, and a response with
    /// an unverifiable ledger info is rejected.
    #[test]
    fn test_process_chunk_response_highest() {
        // Create a coordinator for a full node
        let mut full_node_coordinator =
            test_utils::create_full_node_coordinator();

        // Create a peer for the node and add the peer as a known peer
        let peer_network_id = PeerNetworkId::random_validator();
        process_new_peer_event(&mut full_node_coordinator, &peer_network_id);

        // Verify wrong chunk type for non-highest messages
        let chunk_responses = create_non_empty_chunk_responses(1);
        verify_all_chunk_responses_are_the_wrong_type(
            &mut full_node_coordinator,
            &peer_network_id,
            &chunk_responses[0..1], /* Ignore the target and highest chunk
                                     * responses */
        );

        // Verify highest known version must be greater than target version
        let response_ledger_info = ResponseLedgerInfo::ProgressiveLedgerInfo {
            target_li: create_ledger_info_at_version(100),
            highest_li: Some(create_ledger_info_at_version(10)),
        };
        let highest_response = create_chunk_response_message(
            response_ledger_info,
            create_dummy_transaction_list_with_proof(1),
        );
        verify_all_chunk_responses_are_invalid(
            &mut full_node_coordinator,
            &peer_network_id,
            &[highest_response],
        );

        // Verify invalid ledger infos are rejected (the test ledger info
        // carries no verifiable signatures)
        let response_ledger_info = ResponseLedgerInfo::ProgressiveLedgerInfo {
            target_li: create_ledger_info_at_version(100),
            highest_li: None,
        };
        let highest_response = create_chunk_response_message(
            response_ledger_info,
            create_dummy_transaction_list_with_proof(1),
        );
        verify_all_chunk_responses_are_invalid(
            &mut full_node_coordinator,
            &peer_network_id,
            &[highest_response],
        );
    }
2371
    /// Verifies validator handling of target (verifiable) chunk responses
    /// while a sync request is pending: waypoint responses are the wrong
    /// chunk type, a ledger info beyond the sync request version is
    /// rejected, and an unverifiable ledger info is rejected.
    #[test]
    fn test_process_chunk_response_target() {
        // Create a coordinator for a validator
        let mut validator_coordinator =
            test_utils::create_validator_coordinator();

        // Create a peer for the node and add the peer as a known peer
        let peer_network_id = PeerNetworkId::random_validator();
        process_new_peer_event(&mut validator_coordinator, &peer_network_id);

        // Make a sync request (to force consensus to yield)
        let (sync_request, _) = create_sync_request_at_version(10);
        let _ = validator_coordinator.process_sync_request(sync_request);

        // Verify wrong chunk type for waypoint message
        let chunk_responses = create_non_empty_chunk_responses(1);
        verify_all_chunk_responses_are_the_wrong_type(
            &mut validator_coordinator,
            &peer_network_id,
            &chunk_responses[0..1], /* Ignore the target and highest chunk
                                     * responses */
        );

        // Verify ledger info version doesn't exceed sync request version
        // (sync target is version 10, response claims version 100)
        let ledger_info = create_ledger_info_at_version(100);
        let response_ledger_info =
            ResponseLedgerInfo::VerifiableLedgerInfo(ledger_info);
        let target_response = create_chunk_response_message(
            response_ledger_info,
            create_dummy_transaction_list_with_proof(1),
        );
        verify_all_chunk_responses_are_invalid(
            &mut validator_coordinator,
            &peer_network_id,
            &[target_response],
        );

        // Verify invalid ledger infos are rejected (the test ledger info
        // carries no verifiable signatures)
        let ledger_info = create_ledger_info_at_version(5);
        let response_ledger_info =
            ResponseLedgerInfo::VerifiableLedgerInfo(ledger_info);
        let target_response = create_chunk_response_message(
            response_ledger_info,
            create_dummy_transaction_list_with_proof(1),
        );
        verify_all_chunk_responses_are_invalid(
            &mut validator_coordinator,
            &peer_network_id,
            &[target_response],
        );
    }
2423
    /// Verifies waypoint-mode chunk response handling for a validator that
    /// must first sync to a waypoint at version 10: non-waypoint responses
    /// are the wrong chunk type, an end-of-epoch ledger info beyond the
    /// waypoint is rejected, and an unverifiable waypoint ledger info is
    /// rejected.
    #[test]
    fn test_process_chunk_response_waypoint() {
        // Create a coordinator for a validator node with waypoint version of 10
        let waypoint_ledger_info = create_ledger_info_at_version(10);
        let waypoint = Waypoint::new_any(&waypoint_ledger_info.ledger_info());
        let mut validator_coordinator =
            create_coordinator_with_config_and_waypoint(
                NodeConfig::default(),
                waypoint,
            );

        // Create a peer for the node and add the peer as a known peer
        let peer_network_id = PeerNetworkId::random_validator();
        process_new_peer_event(&mut validator_coordinator, &peer_network_id);

        // Verify wrong chunk type for non-waypoint messages
        let chunk_responses = create_non_empty_chunk_responses(1);
        verify_all_chunk_responses_are_the_wrong_type(
            &mut validator_coordinator,
            &peer_network_id,
            &chunk_responses[1..=2], // Ignore the waypoint chunk response
        );

        // Verify end of epoch version is less than waypoint version
        // (end-of-epoch at version 100 > waypoint at version 10)
        let response_ledger_info = ResponseLedgerInfo::LedgerInfoForWaypoint {
            waypoint_li: create_ledger_info_at_version(10),
            end_of_epoch_li: Some(create_ledger_info_at_version(100)),
        };
        let waypoint_response = create_chunk_response_message(
            response_ledger_info,
            create_dummy_transaction_list_with_proof(1),
        );
        verify_all_chunk_responses_are_invalid(
            &mut validator_coordinator,
            &peer_network_id,
            &[waypoint_response],
        );

        // Verify that invalid waypoint ledger infos are rejected (the test
        // ledger info does not match the waypoint's expected hash)
        let response_ledger_info = ResponseLedgerInfo::LedgerInfoForWaypoint {
            waypoint_li: create_ledger_info_at_version(10),
            end_of_epoch_li: Some(create_ledger_info_at_version(10)),
        };
        let waypoint_response = create_chunk_response_message(
            response_ledger_info,
            create_dummy_transaction_list_with_proof(1),
        );
        verify_all_chunk_responses_are_invalid(
            &mut validator_coordinator,
            &peer_network_id,
            &[waypoint_response],
        );
    }
2477
2478 fn create_test_transaction() -> Transaction {
2479 let private_key = Ed25519PrivateKey::generate_for_testing();
2480 let public_key = private_key.public_key();
2481
2482 let transaction_payload =
2483 TransactionPayload::Script(Script::new(vec![], vec![], vec![]));
2484 let raw_transaction = RawTransaction::new(
2485 AccountAddress::random(),
2486 0,
2487 transaction_payload,
2488 0,
2489 0,
2490 "".into(),
2491 0,
2492 ChainId::new(10),
2493 );
2494 let signed_transaction = SignedTransaction::new(
2495 raw_transaction,
2496 public_key,
2497 Ed25519Signature::dummy_signature(),
2498 );
2499
2500 Transaction::UserTransaction(signed_transaction)
2501 }
2502
    /// Creates a ledger info (with an empty signature map) whose block info
    /// is zeroed except for the given `version`. Only the version matters
    /// for these tests; the consensus data hash is random and there are no
    /// signatures, so the result is not verifiable.
    fn create_ledger_info_at_version(
        version: Version,
    ) -> LedgerInfoWithSignatures {
        let block_info = BlockInfo::new(
            0,
            0,
            HashValue::zero(),
            HashValue::zero(),
            version,
            0,
            None,
        );
        let ledger_info = LedgerInfo::new(block_info, HashValue::random());
        // No signatures attached (empty map).
        LedgerInfoWithSignatures::new(ledger_info, BTreeMap::new())
    }
2518
2519 fn create_sync_request_at_version(
2520 version: Version,
2521 ) -> (SyncRequest, oneshot::Receiver<Result<(), Error>>) {
2522 // Create ledger info with signatures at given version
2523 let ledger_info = create_ledger_info_at_version(version);
2524
2525 // Create sync request with target version and callback
2526 let (callback_sender, callback_receiver) = oneshot::channel();
2527 let sync_request = SyncRequest {
2528 callback: callback_sender,
2529 target: ledger_info,
2530 last_commit_timestamp: SystemTime::now(),
2531 };
2532
2533 (sync_request, callback_receiver)
2534 }
2535
2536 /// Creates a set of chunk requests (one for each type of possible request).
2537 /// The returned request types are: [waypoint, target, highest].
2538 fn create_chunk_requests(
2539 known_version: Version, current_epoch: u64, chunk_limit: u64,
2540 target_version: u64, timeout_ms: u64,
2541 ) -> Vec<StateSyncMessage>
2542 {
2543 // Create a waypoint chunk request
2544 let target = TargetType::Waypoint(target_version);
2545 let waypoint_request = create_chunk_request_message(
2546 known_version,
2547 current_epoch,
2548 chunk_limit,
2549 target,
2550 );
2551
2552 // Create a highest chunk request
2553 let target_li = Some(create_ledger_info_at_version(target_version));
2554 let target = TargetType::HighestAvailable {
2555 target_li,
2556 timeout_ms,
2557 };
2558 let highest_request = create_chunk_request_message(
2559 known_version,
2560 current_epoch,
2561 chunk_limit,
2562 target,
2563 );
2564
2565 // Create a target chunk request
2566 let target_ledger_info = create_ledger_info_at_version(target_version);
2567 let target = TargetType::TargetLedgerInfo(target_ledger_info);
2568 let target_request = create_chunk_request_message(
2569 known_version,
2570 current_epoch,
2571 chunk_limit,
2572 target,
2573 );
2574
2575 vec![waypoint_request, target_request, highest_request]
2576 }
2577
2578 fn create_chunk_request_message(
2579 known_version: Version, current_epoch: u64, chunk_limit: u64,
2580 target: TargetType,
2581 ) -> StateSyncMessage
2582 {
2583 let chunk_request = GetChunkRequest::new(
2584 known_version,
2585 current_epoch,
2586 chunk_limit,
2587 target,
2588 );
2589 StateSyncMessage::GetChunkRequest(Box::new(chunk_request))
2590 }
2591
2592 fn create_dummy_transaction_list_with_proof(
2593 version: Version,
2594 ) -> TransactionListWithProof {
2595 TransactionListWithProof::new(
2596 vec![create_test_transaction()],
2597 None,
2598 Some(version),
2599 TransactionListProof::new_empty(),
2600 )
2601 }
2602
2603 fn create_chunk_response_message(
2604 response_ledger_info: ResponseLedgerInfo,
2605 transaction_list_with_proof: TransactionListWithProof,
2606 ) -> StateSyncMessage
2607 {
2608 let chunk_response = GetChunkResponse::new(
2609 response_ledger_info,
2610 transaction_list_with_proof,
2611 );
2612 StateSyncMessage::GetChunkResponse(Box::new(chunk_response))
2613 }
2614
2615 fn create_empty_chunk_responses(version: Version) -> Vec<StateSyncMessage> {
2616 create_chunk_responses(version, TransactionListWithProof::new_empty())
2617 }
2618
2619 fn create_non_empty_chunk_responses(
2620 version: Version,
2621 ) -> Vec<StateSyncMessage> {
2622 let transaction_list_with_proof =
2623 create_dummy_transaction_list_with_proof(version);
2624 create_chunk_responses(version, transaction_list_with_proof)
2625 }
2626
2627 /// Creates a set of chunk responses (one for each type of possible
2628 /// response). The returned response types are: [waypoint, target,
2629 /// highest].
2630 fn create_chunk_responses(
2631 version: Version, transaction_list_with_proof: TransactionListWithProof,
2632 ) -> Vec<StateSyncMessage> {
2633 let ledger_info_at_version = create_ledger_info_at_version(version);
2634
2635 // Create a waypoint chunk response
2636 let response_ledger_info = ResponseLedgerInfo::LedgerInfoForWaypoint {
2637 waypoint_li: ledger_info_at_version.clone(),
2638 end_of_epoch_li: None,
2639 };
2640 let waypoint_response = create_chunk_response_message(
2641 response_ledger_info,
2642 transaction_list_with_proof.clone(),
2643 );
2644
2645 // Create a highest chunk response
2646 let response_ledger_info = ResponseLedgerInfo::ProgressiveLedgerInfo {
2647 target_li: ledger_info_at_version.clone(),
2648 highest_li: None,
2649 };
2650 let highest_response = create_chunk_response_message(
2651 response_ledger_info,
2652 transaction_list_with_proof.clone(),
2653 );
2654
2655 // Create a target chunk response
2656 let response_ledger_info =
2657 ResponseLedgerInfo::VerifiableLedgerInfo(ledger_info_at_version);
2658 let target_response = create_chunk_response_message(
2659 response_ledger_info,
2660 transaction_list_with_proof,
2661 );
2662
2663 vec![waypoint_response, target_response, highest_response]
2664 }
2665
2666 fn verify_all_chunk_requests_are_invalid(
2667 coordinator: &mut StateSyncCoordinator<ExecutorProxy>,
2668 peer_network_id: &PeerNetworkId, requests: &[StateSyncMessage],
2669 )
2670 {
2671 for request in requests {
2672 let result = block_on(coordinator.process_chunk_message(
2673 peer_network_id.network_id(),
2674 peer_network_id.peer_id(),
2675 request.clone(),
2676 ));
2677 if !matches!(result, Err(Error::InvalidChunkRequest(..))) {
2678 panic!(
2679 "Expected an invalid chunk request, but got: {:?}",
2680 result
2681 );
2682 }
2683 }
2684 }
2685
2686 fn verify_all_chunk_responses_are_invalid(
2687 coordinator: &mut StateSyncCoordinator<ExecutorProxy>,
2688 peer_network_id: &PeerNetworkId, responses: &[StateSyncMessage],
2689 )
2690 {
2691 for response in responses {
2692 let result = block_on(coordinator.process_chunk_message(
2693 peer_network_id.network_id(),
2694 peer_network_id.peer_id(),
2695 response.clone(),
2696 ));
2697 if !matches!(result, Err(Error::ProcessInvalidChunk(..))) {
2698 panic!("Expected invalid chunk error, but got: {:?}", result);
2699 }
2700 }
2701 }
2702
2703 fn verify_all_chunk_responses_are_the_wrong_type(
2704 coordinator: &mut StateSyncCoordinator<ExecutorProxy>,
2705 peer_network_id: &PeerNetworkId, responses: &[StateSyncMessage],
2706 )
2707 {
2708 for response in responses {
2709 let result = block_on(coordinator.process_chunk_message(
2710 peer_network_id.network_id(),
2711 peer_network_id.peer_id(),
2712 response.clone(),
2713 ));
2714 if !matches!(result, Err(Error::ReceivedWrongChunkType(..))) {
2715 panic!("Expected wrong type error, but got: {:?}", result);
2716 }
2717 }
2718 }
2719
2720 fn process_new_peer_event(
2721 coordinator: &mut StateSyncCoordinator<ExecutorProxy>,
2722 peer: &PeerNetworkId,
2723 )
2724 {
2725 let connection_metadata = ConnectionMetadata::mock_with_role_and_origin(
2726 peer.peer_id(),
2727 PeerRole::Validator,
2728 ConnectionOrigin::Outbound,
2729 );
2730 let _ = coordinator
2731 .process_new_peer(peer.network_id(), connection_metadata);
2732 }
2733}
2734*/