cfxcore/pos/state_sync/
coordinator.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8#![allow(unused)]
9use crate::pos::{
10    mempool::{CommitNotification, CommitResponse, CommittedTransaction},
11    state_sync::{
12        chunk_request::{GetChunkRequest, TargetType},
13        chunk_response::ResponseLedgerInfo,
14        client::{CoordinatorMessage, SyncRequest},
15        counters,
16        error::Error,
17        executor_proxy::ExecutorProxyTrait,
18        logging::{LogEntry, LogEvent, LogSchema},
19        shared_components::SyncState,
20    },
21};
22use diem_config::config::{
23    NodeConfig, PeerNetworkId, RoleType, StateSyncConfig,
24};
25use diem_logger::prelude::*;
26use diem_types::{
27    contract_event::ContractEvent,
28    ledger_info::LedgerInfoWithSignatures,
29    transaction::{Transaction, TransactionListWithProof, Version},
30    waypoint::Waypoint,
31};
32use futures::{
33    channel::{mpsc, oneshot},
34    StreamExt,
35};
36use std::{
37    cmp,
38    collections::HashMap,
39    time::{Duration, SystemTime},
40};
41use tokio::time::{interval, timeout};
42use tokio_stream::wrappers::IntervalStream;
43
44#[derive(Clone, Debug, PartialEq, Eq)]
45struct PendingRequestInfo {
46    expiration_time: SystemTime,
47    known_version: u64,
48    request_epoch: u64,
49    target_li: Option<LedgerInfoWithSignatures>,
50    chunk_limit: u64,
51}
52
53/// Coordination of the state sync process is driven by StateSyncCoordinator.
54/// The `start()` function runs an infinite event loop and triggers actions
55/// based on external and internal (local) requests. The coordinator works in
56/// two modes (depending on the role):
57/// * FullNode: infinite stream of ChunkRequests is sent to the predefined
58///   static peers
59/// (the parent is going to reply with a ChunkResponse if its committed version
60/// becomes higher within the timeout interval).
61/// * Validator: the ChunkRequests are generated on demand for a specific target
62///   LedgerInfo to
63/// synchronize to.
64pub(crate) struct StateSyncCoordinator<T> {
65    // used to process client requests
66    client_events: mpsc::UnboundedReceiver<CoordinatorMessage>,
67    // used to send messages (e.g. notifications about newly committed txns) to
68    // mempool
69    state_sync_to_mempool_sender: mpsc::Sender<CommitNotification>,
70    // Current state of the storage, which includes both the latest committed
71    // transaction and the latest transaction covered by the LedgerInfo
72    // (see `SynchronizerState` documentation). The state is updated via
73    // syncing with the local storage.
74    local_state: SyncState,
75    // config
76    config: StateSyncConfig,
77    // role of node
78    role: RoleType,
79    // An initial waypoint: for as long as the local version is less than a
80    // version determined by waypoint a node is not going to be abl
81    waypoint: Waypoint,
82    // Actor for sending chunk requests
83    // Manages to whom and how to send chunk requests
84    //request_manager: RequestManager,
85    // Optional sync request to be called when the target sync is reached
86    sync_request: Option<SyncRequest>,
87    // If we're a full node syncing to the latest state, this holds the highest
88    // ledger info we know about and are currently syncing to. This allows
89    // us to incrementally sync to ledger infos in storage. Higher ledger
90    // infos will only be considered once we sync to this.
91    target_ledger_info: Option<LedgerInfoWithSignatures>,
92    // Option initialization listener to be called when the coordinator is
93    // caught up with its waypoint.
94    initialization_listener: Option<oneshot::Sender<Result<(), Error>>>,
95    // queue of incoming long polling requests
96    // peer will be notified about new chunk of transactions if it's available
97    // before expiry time
98    subscriptions: HashMap<PeerNetworkId, PendingRequestInfo>,
99    executor_proxy: T,
100}
101
102impl<T: ExecutorProxyTrait> StateSyncCoordinator<T> {
103    pub fn new(
104        client_events: mpsc::UnboundedReceiver<CoordinatorMessage>,
105        state_sync_to_mempool_sender: mpsc::Sender<CommitNotification>,
106        /* network_senders: HashMap<NodeNetworkId, StateSyncSender>, */
107        node_config: &NodeConfig, waypoint: Waypoint, executor_proxy: T,
108        initial_state: SyncState,
109    ) -> Result<Self, Error> {
110        diem_info!(LogSchema::event_log(
111            LogEntry::Waypoint,
112            LogEvent::Initialize
113        )
114        .waypoint(waypoint));
115
116        // Create a new request manager.
117        let role = node_config.base.role;
118        let tick_interval_ms = node_config.state_sync.tick_interval_ms;
119        let retry_timeout_val = match role {
120            RoleType::FullNode => tick_interval_ms
121                .checked_add(node_config.state_sync.long_poll_timeout_ms)
122                .ok_or_else(|| {
123                    Error::IntegerOverflow(
124                        "Fullnode retry timeout has overflown!".into(),
125                    )
126                })?,
127            RoleType::Validator => {
128                tick_interval_ms.checked_mul(2).ok_or_else(|| {
129                    Error::IntegerOverflow(
130                        "Validator retry timeout has overflown!".into(),
131                    )
132                })?
133            }
134        };
135        /*
136        let request_manager = RequestManager::new(
137            Duration::from_millis(retry_timeout_val),
138            Duration::from_millis(node_config.state_sync.multicast_timeout_ms),
139            network_senders,
140        );*/
141
142        Ok(Self {
143            client_events,
144            state_sync_to_mempool_sender,
145            local_state: initial_state,
146            config: node_config.state_sync.clone(),
147            role,
148            waypoint,
149            //request_manager,
150            subscriptions: HashMap::new(),
151            sync_request: None,
152            target_ledger_info: None,
153            initialization_listener: None,
154            executor_proxy,
155        })
156    }
157
158    /// main routine. starts sync coordinator that listens for CoordinatorMsg
159    pub async fn start(
160        mut self,
161        /* network_handles: Vec<(NodeNetworkId, StateSyncSender,
162         * StateSyncEvents)>, */
163    ) {
164        diem_info!(LogSchema::new(LogEntry::RuntimeStart));
165        let mut interval = IntervalStream::new(interval(
166            Duration::from_millis(self.config.tick_interval_ms),
167        ))
168        .fuse();
169
170        /*let events: Vec<_> = network_handles
171            .into_iter()
172            .map(|(network_id, _sender, events)| {
173                events.map(move |e| (network_id.clone(), e))
174            })
175            .collect();
176        let mut network_events = select_all(events).fuse();*/
177
178        loop {
179            let _timer = counters::MAIN_LOOP.start_timer();
180            ::futures::select! {
181                msg = self.client_events.select_next_some() => {
182                    match msg {
183                        CoordinatorMessage::SyncRequest(request) => {
184                            let _timer = counters::PROCESS_COORDINATOR_MSG_LATENCY
185                                .with_label_values(&[counters::SYNC_MSG_LABEL])
186                                .start_timer();
187                            /*if let Err(e) = self.process_sync_request(*request) {
188                                diem_error!(LogSchema::new(LogEntry::SyncRequest).error(&e));
189                                counters::SYNC_REQUEST_RESULT.with_label_values(&[counters::FAIL_LABEL]).inc();
190                            }*/
191                        }
192                        CoordinatorMessage::CommitNotification(notification) => {
193                            let _timer = counters::PROCESS_COORDINATOR_MSG_LATENCY
194                                .with_label_values(&[counters::COMMIT_MSG_LABEL])
195                                .start_timer();
196                            if let Err(e) = self.process_commit_notification(notification.committed_transactions, Some(notification.callback), notification.reconfiguration_events, None).await {
197                                counters::CONSENSUS_COMMIT_FAIL_COUNT.inc();
198                                diem_error!(LogSchema::event_log(LogEntry::ConsensusCommit, LogEvent::PostCommitFail).error(&e));
199                            }
200                        }
201                        CoordinatorMessage::GetSyncState(callback) => {
202                            let _ = self.get_sync_state(callback);
203                        }
204                        CoordinatorMessage::WaitForInitialization(cb_sender) => {
205                            if let Err(e) = self.wait_for_initialization(cb_sender) {
206                                diem_error!(LogSchema::new(LogEntry::Waypoint).error(&e));
207                            }
208                        }
209                    };
210                },
211                /*(network_id, event) = network_events.select_next_some() => {
212                    match event {
213                        Event::NewPeer(metadata) => {
214                            if let Err(e) = self.process_new_peer(network_id, metadata) {
215                                diem_error!(LogSchema::new(LogEntry::NewPeer).error(&e));
216                            }
217                        }
218                        Event::LostPeer(metadata) => {
219                            if let Err(e) = self.process_lost_peer(network_id, metadata.remote_peer_id) {
220                                diem_error!(LogSchema::new(LogEntry::LostPeer).error(&e));
221                            }
222                        }
223                        Event::Message(peer_id, message) => {
224                            if let Err(e) = self.process_chunk_message(network_id.clone(), peer_id, message).await {
225                                diem_error!(LogSchema::new(LogEntry::ProcessChunkMessage).error(&e));
226                            }
227                        }
228                        unexpected_event => {
229                            counters::NETWORK_ERROR_COUNT.inc();
230                            diem_warn!(LogSchema::new(LogEntry::NetworkError),
231                            "received unexpected network event: {:?}", unexpected_event);
232                        },
233
234                    }
235                },*/
236                _ = interval.select_next_some() => {
237                    if let Err(e) = self.check_progress() {
238                        diem_error!(LogSchema::event_log(LogEntry::ProgressCheck, LogEvent::Fail).error(&e));
239                    }
240                }
241            }
242        }
243        // diem_error!("Coordinator stops");
244    }
245
246    /*fn process_new_peer(
247        &mut self, network_id: NodeNetworkId, metadata: ConnectionMetadata,
248    ) -> Result<(), Error> {
249        let peer = PeerNetworkId(network_id, metadata.remote_peer_id);
250        self.request_manager.enable_peer(peer, metadata)?;
251        self.check_progress()
252    }*/
253
254    /*fn process_lost_peer(
255        &mut self, network_id: NodeNetworkId, peer_id: PeerId,
256    ) -> Result<(), Error> {
257        let peer = PeerNetworkId(network_id, peer_id);
258        self.request_manager.disable_peer(&peer)
259    }*/
260
261    /*pub(crate) async fn process_chunk_message(
262        &mut self, network_id: NodeNetworkId, peer_id: PeerId,
263        msg: StateSyncMessage,
264    ) -> Result<(), Error>
265    {
266        let peer = PeerNetworkId(network_id, peer_id);
267        match msg {
268            StateSyncMessage::GetChunkRequest(request) => {
269                // Time request handling
270                let _timer = counters::PROCESS_MSG_LATENCY
271                    .with_label_values(&[
272                        &peer.raw_network_id().to_string(),
273                        &peer.peer_id().to_string(),
274                        counters::CHUNK_REQUEST_MSG_LABEL,
275                    ])
276                    .start_timer();
277
278                // Process chunk request
279                let process_result =
280                    self.process_chunk_request(peer.clone(), *request.clone());
281                if let Err(ref error) = process_result {
282                    diem_error!(LogSchema::event_log(
283                        LogEntry::ProcessChunkRequest,
284                        LogEvent::Fail
285                    )
286                    .peer(&peer)
287                    .error(&error.clone())
288                    .local_li_version(self.local_state.committed_version())
289                    .chunk_request(*request));
290                    counters::PROCESS_CHUNK_REQUEST_COUNT
291                        .with_label_values(&[
292                            &peer.raw_network_id().to_string(),
293                            &peer.peer_id().to_string(),
294                            counters::FAIL_LABEL,
295                        ])
296                        .inc();
297                } else {
298                    counters::PROCESS_CHUNK_REQUEST_COUNT
299                        .with_label_values(&[
300                            &peer.raw_network_id().to_string(),
301                            &peer.peer_id().to_string(),
302                            counters::SUCCESS_LABEL,
303                        ])
304                        .inc();
305                }
306                process_result
307            }
308            StateSyncMessage::GetChunkResponse(response) => {
309                // Time response handling
310                let _timer = counters::PROCESS_MSG_LATENCY
311                    .with_label_values(&[
312                        &peer.raw_network_id().to_string(),
313                        &peer.peer_id().to_string(),
314                        counters::CHUNK_RESPONSE_MSG_LABEL,
315                    ])
316                    .start_timer();
317
318                // Process chunk response
319                self.process_chunk_response(&peer, *response).await
320            }
321        }
322    }*/
323
324    /// Sync up coordinator state with the local storage
325    /// and updates the pending ledger info accordingly
326    fn sync_state_with_local_storage(&mut self) -> Result<(), Error> {
327        let new_state =
328            self.executor_proxy.get_local_storage_state().map_err(|e| {
329                counters::STORAGE_READ_FAIL_COUNT.inc();
330                e
331            })?;
332        if new_state.trusted_epoch() > self.local_state.trusted_epoch() {
333            diem_info!(LogSchema::new(LogEntry::EpochChange)
334                .old_epoch(self.local_state.trusted_epoch())
335                .new_epoch(new_state.trusted_epoch()));
336        }
337        self.local_state = new_state;
338        Ok(())
339    }
340
341    /// Verify that the local state's latest LI version (i.e. committed version)
342    /// has reached the waypoint version.
343    fn is_initialized(&self) -> bool {
344        self.waypoint.version() <= self.local_state.committed_version()
345    }
346
347    fn wait_for_initialization(
348        &mut self, cb_sender: oneshot::Sender<Result<(), Error>>,
349    ) -> Result<(), Error> {
350        if self.is_initialized() {
351            Self::send_initialization_callback(cb_sender)?;
352        } else {
353            self.initialization_listener = Some(cb_sender);
354        }
355
356        Ok(())
357    }
358
359    /*/// This method requests state sync to sync to the target specified by the
360    /// SyncRequest. If there is an existing sync request it will be
361    /// overridden. Note: when processing a sync request, state sync assumes
362    /// that it's the only one modifying storage, i.e., consensus is not
363    /// trying to commit transactions concurrently.
364    fn process_sync_request(
365        &mut self, request: SyncRequest,
366    ) -> Result<(), Error> {
367        fail_point!("state_sync::process_sync_request_message", |_| {
368            Err(crate::error::Error::UnexpectedError(
369                "Injected error in process_sync_request_message".into(),
370            ))
371        });
372
373        // Full nodes don't support sync requests
374        if self.role == RoleType::FullNode {
375            return Err(Error::FullNodeSyncRequest);
376        }
377
378        let local_li_version = self.local_state.committed_version();
379        let target_version = request.target.ledger_info().version();
380        diem_info!(LogSchema::event_log(
381            LogEntry::SyncRequest,
382            LogEvent::Received
383        )
384        .target_version(target_version)
385        .local_li_version(local_li_version));
386
387        self.sync_state_with_local_storage()?;
388        if !self.is_initialized() {
389            return Err(Error::UninitializedError(
390                "Unable to process sync request message!".into(),
391            ));
392        }
393
394        if target_version == local_li_version {
395            return Self::send_sync_req_callback(request, Ok(()));
396        }
397        if target_version < local_li_version {
398            Self::send_sync_req_callback(
399                request,
400                Err(Error::UnexpectedError(
401                    "Sync request to old version".into(),
402                )),
403            )?;
404            return Err(Error::OldSyncRequestVersion(
405                target_version,
406                local_li_version,
407            ));
408        }
409
410        // Save the new sync request
411        self.sync_request = Some(request);
412
413        // Send a chunk request for the sync target
414        let known_version = self.local_state.synced_version();
415        self.send_chunk_request_with_target(
416            known_version,
417            self.local_state.trusted_epoch(),
418            self.create_sync_request_chunk_target(known_version)?,
419        )
420    }*/
421
422    /// Notifies consensus of the given commit response.
423    /// Note: if a callback is not specified, the response isn't sent anywhere.
424    fn notify_consensus_of_commit_response(
425        &self, commit_response: CommitResponse,
426        callback: Option<oneshot::Sender<Result<CommitResponse, Error>>>,
427    ) -> Result<(), Error> {
428        if let Some(callback) = callback {
429            if let Err(error) = callback.send(Ok(commit_response)) {
430                counters::COMMIT_FLOW_FAIL
431                    .with_label_values(&[counters::CONSENSUS_LABEL])
432                    .inc();
433                return Err(Error::CallbackSendFailed(format!(
434                    "Failed to send commit ACK to consensus!: {:?}",
435                    error
436                )));
437            }
438        }
439        Ok(())
440    }
441
442    /// This method updates state sync to process new transactions that have
443    /// been committed to storage (e.g., through consensus or through a
444    /// chunk response). When notified about a new commit we should: (i)
445    /// respond to relevant long poll requests; (ii) update local sync and
446    /// initialization requests (where appropriate); and (iii) publish
447    /// on chain config updates.
448    async fn process_commit_notification(
449        &mut self, committed_transactions: Vec<Transaction>,
450        commit_callback: Option<oneshot::Sender<Result<CommitResponse, Error>>>,
451        reconfiguration_events: Vec<ContractEvent>,
452        chunk_sender: Option<&PeerNetworkId>,
453    ) -> Result<(), Error> {
454        diem_debug!(
455            "process_commit_notification: {} events",
456            reconfiguration_events.len()
457        );
458        // We choose to re-sync the state with the storage as it's the simplest
459        // approach: in case the performance implications of re-syncing
460        // upon every commit are high, it's possible to manage some of
461        // the highest known versions in memory.
462        self.sync_state_with_local_storage()?;
463        self.update_sync_state_metrics_and_logs()?;
464
465        // Notify mempool of commit
466        let commit_response = match self
467            .notify_mempool_of_committed_transactions(committed_transactions)
468            .await
469        {
470            Ok(()) => CommitResponse::success(),
471            Err(error) => {
472                diem_error!(LogSchema::new(LogEntry::CommitFlow).error(&error));
473                CommitResponse::error(error.to_string())
474            }
475        };
476
477        // Notify consensus of the commit response
478        if let Err(error) = self.notify_consensus_of_commit_response(
479            commit_response,
480            commit_callback,
481        ) {
482            diem_error!(LogSchema::new(LogEntry::CommitFlow).error(&error),);
483        }
484
485        // Check long poll subscriptions, update peer requests and sync request
486        // last progress timestamp.
487        self.check_subscriptions();
488        let synced_version = self.local_state.synced_version();
489        /*self.request_manager.remove_requests(synced_version);
490        if let Some(peer) = chunk_sender {
491            self.request_manager.process_success_response(peer);
492        }*/
493        if let Some(mut req) = self.sync_request.as_mut() {
494            req.last_commit_timestamp = SystemTime::now();
495        }
496
497        // Check if we're now initialized or if we hit the sync request target
498        self.check_initialized_or_sync_request_completed(synced_version)?;
499
500        // Publish the on chain config updates
501        if let Err(error) = self
502            .executor_proxy
503            .publish_on_chain_config_updates(reconfiguration_events)
504        {
505            counters::RECONFIG_PUBLISH_COUNT
506                .with_label_values(&[counters::FAIL_LABEL])
507                .inc();
508            diem_error!(LogSchema::event_log(
509                LogEntry::Reconfig,
510                LogEvent::Fail
511            )
512            .error(&error));
513        }
514
515        Ok(())
516    }
517
518    /// Checks if we are now at the initialization point (i.e., the waypoint),
519    /// or at the version specified by a sync request made by consensus.
520    fn check_initialized_or_sync_request_completed(
521        &mut self, synced_version: u64,
522    ) -> Result<(), Error> {
523        let committed_version = self.local_state.committed_version();
524        let local_epoch = self.local_state.trusted_epoch();
525
526        // Check if we're now initialized
527        if self.is_initialized() {
528            if let Some(initialization_listener) =
529                self.initialization_listener.take()
530            {
531                diem_info!(LogSchema::event_log(
532                    LogEntry::Waypoint,
533                    LogEvent::Complete
534                )
535                .local_li_version(committed_version)
536                .local_synced_version(synced_version)
537                .local_epoch(local_epoch));
538                Self::send_initialization_callback(initialization_listener)?;
539            }
540        }
541
542        // Check if we're now at the sync request target
543        if let Some(sync_request) = self.sync_request.as_ref() {
544            let sync_target_version =
545                sync_request.target.ledger_info().version();
546            if synced_version > sync_target_version {
547                return Err(Error::SyncedBeyondTarget(
548                    synced_version,
549                    sync_target_version,
550                ));
551            }
552            if synced_version == sync_target_version {
553                diem_info!(LogSchema::event_log(
554                    LogEntry::SyncRequest,
555                    LogEvent::Complete
556                )
557                .local_li_version(committed_version)
558                .local_synced_version(synced_version)
559                .local_epoch(local_epoch));
560                counters::SYNC_REQUEST_RESULT
561                    .with_label_values(&[counters::COMPLETE_LABEL])
562                    .inc();
563                if let Some(sync_request) = self.sync_request.take() {
564                    Self::send_sync_req_callback(sync_request, Ok(()))?;
565                }
566            }
567        }
568
569        Ok(())
570    }
571
572    /// Notifies mempool that transactions have been committed.
573    async fn notify_mempool_of_committed_transactions(
574        &mut self, committed_transactions: Vec<Transaction>,
575    ) -> Result<(), Error> {
576        // Get all user transactions from committed transactions
577        let user_transactions = committed_transactions
578            .iter()
579            .filter_map(|transaction| match transaction {
580                Transaction::UserTransaction(signed_txn) => {
581                    Some(CommittedTransaction {
582                        sender: signed_txn.sender(),
583                        hash: signed_txn.hash(),
584                    })
585                }
586                _ => None,
587            })
588            .collect();
589
590        // Create commit notification of user transactions for mempool
591        let (callback_sender, callback_receiver) = oneshot::channel();
592        let req = CommitNotification {
593            transactions: user_transactions,
594            block_timestamp_usecs: self
595                .local_state
596                .committed_ledger_info()
597                .ledger_info()
598                .timestamp_usecs(),
599            callback: callback_sender,
600        };
601
602        // Notify mempool of committed transactions
603        if let Err(error) = self.state_sync_to_mempool_sender.try_send(req) {
604            counters::COMMIT_FLOW_FAIL
605                .with_label_values(&[counters::TO_MEMPOOL_LABEL])
606                .inc();
607            Err(Error::CallbackSendFailed(format!(
608                "Failed to notify mempool of committed transactions! Error: {:?}",
609                error
610            )))
611        } else if let Err(error) = timeout(
612            Duration::from_millis(self.config.mempool_commit_timeout_ms),
613            callback_receiver,
614        )
615        .await
616        {
617            counters::COMMIT_FLOW_FAIL
618                .with_label_values(&[counters::FROM_MEMPOOL_LABEL])
619                .inc();
620            Err(Error::CallbackSendFailed(format!(
621                "Did not receive ACK for commit notification from mempool! Error: {:?}",
622                error
623            )))
624        } else {
625            Ok(())
626        }
627    }
628
629    /// Updates the metrics and logs based on the current (local) sync state.
630    fn update_sync_state_metrics_and_logs(&mut self) -> Result<(), Error> {
631        // Get data from local sync state
632        let synced_version = self.local_state.synced_version();
633        let committed_version = self.local_state.committed_version();
634        let local_epoch = self.local_state.trusted_epoch();
635
636        // Update versions
637        counters::set_version(counters::VersionType::Synced, synced_version);
638        counters::set_version(
639            counters::VersionType::Committed,
640            committed_version,
641        );
642        counters::EPOCH.set(local_epoch as i64);
643
644        // Update timestamps
645        counters::set_timestamp(
646            counters::TimestampType::Synced,
647            self.executor_proxy.get_version_timestamp(synced_version)?,
648        );
649        counters::set_timestamp(
650            counters::TimestampType::Committed,
651            self.executor_proxy
652                .get_version_timestamp(committed_version)?,
653        );
654        counters::set_timestamp(
655            counters::TimestampType::Real,
656            diem_infallible::duration_since_epoch().as_micros() as u64,
657        );
658
659        diem_debug!(LogSchema::new(LogEntry::LocalState)
660            .local_li_version(committed_version)
661            .local_synced_version(synced_version)
662            .local_epoch(local_epoch));
663        Ok(())
664    }
665
666    /// Returns the current SyncState of state sync.
667    /// Note: this is only used for testing and should be removed once
668    /// integration/e2e tests are updated to not rely on this.
669    fn get_sync_state(
670        &mut self, callback: oneshot::Sender<SyncState>,
671    ) -> Result<(), Error> {
672        self.sync_state_with_local_storage()?;
673        match callback.send(self.local_state.clone()) {
674            Err(error) => Err(Error::CallbackSendFailed(format!(
675                "Failed to get sync state! Error: {:?}",
676                error
677            ))),
678            _ => Ok(()),
679        }
680    }
681
682    /*/// There are two types of ChunkRequests:
683    /// 1) Validator chunk requests are for a specific target LI and don't ask
684    /// for long polling. 2) FullNode chunk requests don't specify a target
685    /// LI and can allow long polling.
686    fn process_chunk_request(
687        &mut self, peer: PeerNetworkId, request: GetChunkRequest,
688    ) -> Result<(), Error> {
689        diem_debug!(LogSchema::event_log(
690            LogEntry::ProcessChunkRequest,
691            LogEvent::Received
692        )
693        .peer(&peer)
694        .chunk_request(request.clone())
695        .local_li_version(self.local_state.committed_version()));
696        fail_point!("state_sync::process_chunk_request", |_| {
697            Err(crate::error::Error::UnexpectedError(
698                "Injected error in process_chunk_request".into(),
699            ))
700        });
701        self.sync_state_with_local_storage()?;
702
703        // Verify the chunk request is valid before trying to process it. If
704        // it's invalid, penalize the peer's score.
705        if let Err(error) = self.verify_chunk_request_is_valid(&request) {
706            self.request_manager.process_invalid_chunk_request(&peer);
707            return Err(error);
708        }
709
710        match request.target.clone() {
711            TargetType::TargetLedgerInfo(li) => self
712                .process_request_for_target_and_highest(
713                    peer,
714                    request,
715                    Some(li),
716                    None,
717                ),
718            TargetType::HighestAvailable {
719                target_li,
720                timeout_ms,
721            } => self.process_request_for_target_and_highest(
722                peer,
723                request,
724                target_li,
725                Some(timeout_ms),
726            ),
727            TargetType::Waypoint(waypoint_version) => self
728                .process_request_for_waypoint(peer, request, waypoint_version),
729        }
730    }*/
731
732    /*fn verify_chunk_request_is_valid(
733        &mut self, request: &GetChunkRequest,
734    ) -> Result<(), Error> {
735        // Ensure request versions are correctly formed
736        if let Some(target_version) = request.target.version() {
737            if target_version < request.known_version {
738                return Err(Error::InvalidChunkRequest(
739                    "Target version is less than known version! Discarding request.".into(),
740                ));
741            }
742        }
743
744        // Ensure request epochs are correctly formed
745        if let Some(target_epoch) = request.target.epoch() {
746            if target_epoch < request.current_epoch {
747                return Err(Error::InvalidChunkRequest(
748                    "Target epoch is less than current epoch! Discarding request.".into(),
749                ));
750            }
751        }
752
753        // Ensure the chunk limit is not zero
754        if request.limit == 0 {
755            return Err(Error::InvalidChunkRequest(
756                "Chunk request limit is 0. Discarding request.".into(),
757            ));
758        }
759
760        // Ensure the timeout is not zero
761        if let TargetType::HighestAvailable {
762            target_li: _,
763            timeout_ms,
764        } = request.target.clone()
765        {
766            if timeout_ms == 0 {
767                return Err(Error::InvalidChunkRequest(
768                    "Long poll timeout is 0. Discarding request.".into(),
769                ));
770            }
771        }
772
773        Ok(())
774    }*/
775
776    /*/// Processing requests with no target LedgerInfo (highest available) and
777    /// potentially long polling.
778    /// Assumes that the local state is uptodate with storage.
779    fn process_request_for_target_and_highest(
780        &mut self, peer: PeerNetworkId, request: GetChunkRequest,
781        target_li: Option<LedgerInfoWithSignatures>, timeout_ms: Option<u64>,
782    ) -> Result<(), Error>
783    {
784        let chunk_limit =
785            std::cmp::min(request.limit, self.config.max_chunk_limit);
786        let timeout = if let Some(timeout_ms) = timeout_ms {
787            std::cmp::min(timeout_ms, self.config.max_timeout_ms)
788        } else {
789            self.config.max_timeout_ms
790        };
791
792        // If the node cannot respond to the request now (i.e., it's not
793        // up-to-date with the requestor) add the request to the
794        // subscriptions to be handled when this node catches up.
795        let local_version = self.local_state.committed_version();
796        if local_version <= request.known_version {
797            let expiration_time =
798                SystemTime::now().checked_add(Duration::from_millis(timeout));
799            if let Some(time) = expiration_time {
800                let request_info = PendingRequestInfo {
801                    expiration_time: time,
802                    known_version: request.known_version,
803                    request_epoch: request.current_epoch,
804                    target_li,
805                    chunk_limit,
806                };
807                self.subscriptions.insert(peer, request_info);
808            }
809            return Ok(());
810        }
811
812        let (target_li, highest_li) = self.calculate_target_and_highest_li(
813            request.current_epoch,
814            target_li,
815            local_version,
816        )?;
817
818        self.deliver_chunk(
819            peer,
820            request.known_version,
821            ResponseLedgerInfo::ProgressiveLedgerInfo {
822                target_li,
823                highest_li,
824            },
825            chunk_limit,
826        )
827    }*/
828
829    fn calculate_target_and_highest_li(
830        &mut self, request_epoch: u64,
831        request_target_li: Option<LedgerInfoWithSignatures>,
832        local_version: u64,
833    ) -> Result<
834        (LedgerInfoWithSignatures, Option<LedgerInfoWithSignatures>),
835        Error,
836    > {
837        // If the request's epoch is in the past, `target_li` will be set to the
838        // end-of-epoch LI for that epoch
839        let target_li =
840            self.choose_response_li(request_epoch, request_target_li)?;
841
842        let highest_li = if target_li.ledger_info().version() < local_version
843            && target_li.ledger_info().epoch()
844                == self.local_state.trusted_epoch()
845        {
846            // Only populate highest_li field if it's in the past, and the same
847            // epoch. Recipient won't be able to verify ledger info
848            // if it's in a different epoch.
849            Some(self.local_state.committed_ledger_info())
850        } else {
851            None
852        };
853
854        Ok((target_li, highest_li))
855    }
856
857    fn process_request_for_waypoint(
858        &mut self, peer: PeerNetworkId, request: GetChunkRequest,
859        waypoint_version: Version,
860    ) -> Result<(), Error> {
861        let mut limit =
862            std::cmp::min(request.limit, self.config.max_chunk_limit);
863        if self.local_state.committed_version() < waypoint_version {
864            return Err(Error::UnexpectedError(format!(
865                "Local version {} < requested waypoint version {}.",
866                self.local_state.committed_version(),
867                waypoint_version
868            )));
869        }
870        if request.known_version >= waypoint_version {
871            return Err(Error::UnexpectedError(format!(
872                "Waypoint request version {} is not smaller than waypoint {}",
873                request.known_version, waypoint_version
874            )));
875        }
876
877        // Retrieve the waypoint LI.
878        let waypoint_li = self
879            .executor_proxy
880            .get_epoch_ending_ledger_info(waypoint_version)?;
881
882        // Txns are up to the end of request epoch with the proofs relative to
883        // the waypoint LI.
884        let end_of_epoch_li = if waypoint_li.ledger_info().epoch()
885            > request.current_epoch
886        {
887            let end_of_epoch_li = self
888                .executor_proxy
889                .get_epoch_change_ledger_info(request.current_epoch)?;
890            if end_of_epoch_li.ledger_info().version() < request.known_version {
891                return Err(Error::UnexpectedError(format!("Waypoint request's current_epoch (epoch {}, version {}) < waypoint request's known_version {}",
892                                                          end_of_epoch_li.ledger_info().epoch(),
893                                                          end_of_epoch_li.ledger_info().version(),
894                                                          request.known_version,)));
895            }
896            let num_txns_until_end_of_epoch =
897                end_of_epoch_li.ledger_info().version() - request.known_version;
898            limit = std::cmp::min(limit, num_txns_until_end_of_epoch);
899            Some(end_of_epoch_li)
900        } else {
901            None
902        };
903
904        self.deliver_chunk(
905            peer,
906            request.known_version,
907            ResponseLedgerInfo::LedgerInfoForWaypoint {
908                waypoint_li,
909                end_of_epoch_li,
910            },
911            limit,
912        )
913    }
914
915    /// Generate and send the ChunkResponse to the given peer.
916    /// The chunk response contains transactions from the local storage with the
917    /// proofs relative to the given target ledger info.
918    /// In case target is None, the ledger info is set to the local highest
919    /// ledger info.
920    fn deliver_chunk(
921        &mut self, peer: PeerNetworkId, known_version: u64,
922        response_li: ResponseLedgerInfo, limit: u64,
923    ) -> Result<(), Error> {
924        /*
925        let txns = self.executor_proxy.get_chunk(
926            known_version,
927            limit,
928            response_li.version(),
929        )?;
930        let chunk_response = GetChunkResponse::new(response_li, txns);
931        let log = LogSchema::event_log(
932            LogEntry::ProcessChunkRequest,
933            LogEvent::DeliverChunk,
934        )
935        .chunk_response(chunk_response.clone())
936        .peer(&peer);
937        let msg = StateSyncMessage::GetChunkResponse(Box::new(chunk_response));
938        let send_result = self.request_manager.send_chunk_response(&peer, msg);
939        let send_result_label = if send_result.is_err() {
940            counters::SEND_FAIL_LABEL
941        } else {
942            diem_debug!(log);
943            counters::SEND_SUCCESS_LABEL
944        };
945        counters::RESPONSES_SENT
946            .with_label_values(&[
947                &peer.raw_network_id().to_string(),
948                &peer.peer_id().to_string(),
949                send_result_label,
950            ])
951            .inc();
952
953        send_result.map_err(|e| {
954            diem_error!(log.error(&e));
955            Error::UnexpectedError(format!(
956                "Network error in sending chunk response to {}",
957                peer
958            ))
959        })*/
960        Ok(())
961    }
962
963    /// The choice of the LedgerInfo in the response follows the following
964    /// logic:
965    /// * response LI is either the requested target or the highest local LI if
966    ///   target is None.
967    /// * if the response LI would not belong to `request_epoch`, change
968    /// the response LI to the LI that is terminating `request_epoch`.
969    fn choose_response_li(
970        &self, request_epoch: u64, target: Option<LedgerInfoWithSignatures>,
971    ) -> Result<LedgerInfoWithSignatures, Error> {
972        let mut target_li =
973            target.unwrap_or_else(|| self.local_state.committed_ledger_info());
974        let target_epoch = target_li.ledger_info().epoch();
975        if target_epoch > request_epoch {
976            let end_of_epoch_li = self
977                .executor_proxy
978                .get_epoch_change_ledger_info(request_epoch)?;
979            diem_debug!(LogSchema::event_log(
980                LogEntry::ProcessChunkRequest,
981                LogEvent::PastEpochRequested
982            )
983            .old_epoch(request_epoch)
984            .new_epoch(target_epoch));
985            target_li = end_of_epoch_li;
986        }
987        Ok(target_li)
988    }
989
990    /*/// Applies (i.e., executes and stores) the chunk to storage iff `response`
991    /// is valid.
992    fn apply_chunk(
993        &mut self, peer: &PeerNetworkId, response: GetChunkResponse,
994    ) -> Result<(), Error> {
995        diem_debug!(LogSchema::event_log(
996            LogEntry::ProcessChunkResponse,
997            LogEvent::Received
998        )
999        .chunk_response(response.clone())
1000        .peer(peer));
1001        fail_point!("state_sync::apply_chunk", |_| {
1002            Err(crate::error::Error::UnexpectedError(
1003                "Injected error in apply_chunk".into(),
1004            ))
1005        });
1006
1007        // Process the chunk based on the response type
1008        let txn_list_with_proof = response.txn_list_with_proof.clone();
1009        let chunk_size = response.txn_list_with_proof.len() as u64;
1010        let known_version = self.local_state.synced_version();
1011        match response.response_li {
1012            ResponseLedgerInfo::VerifiableLedgerInfo(li) => self
1013                .process_response_with_target_and_highest(
1014                    txn_list_with_proof,
1015                    li,
1016                    None,
1017                ),
1018            ResponseLedgerInfo::ProgressiveLedgerInfo {
1019                target_li,
1020                highest_li,
1021            } => {
1022                let highest_li =
1023                    highest_li.unwrap_or_else(|| target_li.clone());
1024                self.process_response_with_target_and_highest(
1025                    txn_list_with_proof,
1026                    target_li,
1027                    Some(highest_li),
1028                )
1029            }
1030            ResponseLedgerInfo::LedgerInfoForWaypoint {
1031                waypoint_li,
1032                end_of_epoch_li,
1033            } => self.process_response_with_waypoint_li(
1034                txn_list_with_proof,
1035                waypoint_li,
1036                end_of_epoch_li,
1037            ),
1038        }
1039        .map_err(|error| {
1040            //self.request_manager.process_invalid_chunk(&peer);
1041            Error::ProcessInvalidChunk(error.to_string())
1042        })?;
1043
1044        // Update counters and logs with processed chunk information
1045        counters::STATE_SYNC_CHUNK_SIZE
1046            .with_label_values(&[
1047                &peer.raw_network_id().to_string(),
1048                &peer.peer_id().to_string(),
1049            ])
1050            .observe(chunk_size as f64);
1051        let new_version =
1052            known_version.checked_add(chunk_size).ok_or_else(|| {
1053                Error::IntegerOverflow("New version has overflown!".into())
1054            })?;
1055        diem_debug!(
1056            LogSchema::event_log(
1057                LogEntry::ProcessChunkResponse,
1058                LogEvent::ApplyChunkSuccess
1059            ),
1060            "Applied chunk of size {}. Previous version: {}, new version {}",
1061            chunk_size,
1062            known_version,
1063            new_version
1064        );
1065
1066        // Log the request processing time (time from first requested until
1067        // now).
1068        match self.request_manager.get_first_request_time(known_version) {
1069            None => {
1070                diem_info!(
1071                    LogSchema::event_log(LogEntry::ProcessChunkResponse, LogEvent::ReceivedChunkWithoutRequest),
1072                    "Received a chunk of size {}, without making a request! Previous version: {}, new version {}",
1073                    chunk_size,
1074                    known_version,
1075                    new_version
1076                );
1077            }
1078            Some(first_request_time) => {
1079                if let Ok(duration) =
1080                    SystemTime::now().duration_since(first_request_time)
1081                {
1082                    counters::SYNC_PROGRESS_DURATION.observe_duration(duration);
1083                }
1084            }
1085        }
1086    }*/
1087
1088    /*/// * Verifies, processes and stores the chunk in the given response.
1089    /// * Triggers post-commit actions based on new local state (after
1090    ///   successfully processing a chunk).
1091    async fn process_chunk_response(
1092        &mut self, peer: &PeerNetworkId, response: GetChunkResponse,
1093    ) -> Result<(), Error> {
1094        // Ensure consensus isn't running, otherwise we might get a race with
1095        // storage writes.
1096        if self.is_consensus_executing() {
1097            let error = Error::ConsensusIsExecuting;
1098            diem_error!(LogSchema::new(LogEntry::ProcessChunkResponse,)
1099                .peer(peer)
1100                .error(&error));
1101            return Err(error);
1102        }
1103
1104        // Verify the chunk response is well formed before trying to process it.
1105        self.verify_chunk_response_is_valid(&peer, &response)?;
1106
1107        // Validate the response and store the chunk if possible.
1108        // Any errors thrown here should be for detecting bad chunks.
1109        match self.apply_chunk(peer, response.clone()) {
1110            Ok(()) => {
1111                counters::APPLY_CHUNK_COUNT
1112                    .with_label_values(&[
1113                        &peer.raw_network_id().to_string(),
1114                        &peer.peer_id().to_string(),
1115                        counters::SUCCESS_LABEL,
1116                    ])
1117                    .inc();
1118            }
1119            Err(error) => {
1120                diem_error!(LogSchema::event_log(
1121                    LogEntry::ProcessChunkResponse,
1122                    LogEvent::ApplyChunkFail
1123                )
1124                .peer(peer)
1125                .error(&error));
1126                counters::APPLY_CHUNK_COUNT
1127                    .with_label_values(&[
1128                        &peer.raw_network_id().to_string(),
1129                        &peer.peer_id().to_string(),
1130                        counters::FAIL_LABEL,
1131                    ])
1132                    .inc();
1133                return Err(error);
1134            }
1135        }
1136
1137        // Process the newly committed chunk
1138        self.process_commit_notification(
1139            response.txn_list_with_proof.transactions.clone(),
1140            None,
1141            vec![],
1142            Some(peer),
1143        )
1144        .await
1145        .map_err(|error| {
1146            diem_error!(LogSchema::event_log(
1147                LogEntry::ProcessChunkResponse,
1148                LogEvent::PostCommitFail
1149            )
1150            .peer(peer)
1151            .error(&error));
1152            error
1153        })
1154    }*/
1155
1156    /*fn verify_chunk_response_is_valid(
1157        &mut self, peer: &PeerNetworkId, response: &GetChunkResponse,
1158    ) -> Result<(), Error> {
1159        // Verify response comes from known peer
1160        if !self.request_manager.is_known_state_sync_peer(peer) {
1161            counters::RESPONSE_FROM_DOWNSTREAM_COUNT
1162                .with_label_values(&[
1163                    &peer.raw_network_id().to_string(),
1164                    &peer.peer_id().to_string(),
1165                ])
1166                .inc();
1167            self.request_manager.process_chunk_from_downstream(&peer);
1168            return Err(Error::ReceivedChunkFromDownstream(peer.to_string()));
1169        }
1170
1171        // Verify the chunk is not empty and that it starts at the correct
1172        // version
1173        if let Some(first_chunk_version) =
1174            response.txn_list_with_proof.first_transaction_version
1175        {
1176            let known_version = self.local_state.synced_version();
1177            let expected_version =
1178                known_version.checked_add(1).ok_or_else(|| {
1179                    Error::IntegerOverflow(
1180                        "Expected version has overflown!".into(),
1181                    )
1182                })?;
1183
1184            if first_chunk_version != expected_version {
1185                self.request_manager.process_chunk_version_mismatch(
1186                    &peer,
1187                    first_chunk_version,
1188                    known_version,
1189                )?;
1190            }
1191        } else {
1192            // The chunk is empty
1193            self.request_manager.process_empty_chunk(&peer);
1194            return Err(Error::ReceivedEmptyChunk(peer.to_string()));
1195        }
1196
1197        // Verify the chunk has the expected type for the current syncing mode
1198        match &response.response_li {
1199            ResponseLedgerInfo::LedgerInfoForWaypoint {
1200                waypoint_li,
1201                end_of_epoch_li,
1202            } => self
1203                .verify_response_with_waypoint_li(waypoint_li, end_of_epoch_li),
1204            ResponseLedgerInfo::VerifiableLedgerInfo(response_li) => {
1205                self.verify_response_with_target_and_highest(response_li, &None)
1206            }
1207            ResponseLedgerInfo::ProgressiveLedgerInfo {
1208                target_li,
1209                highest_li,
1210            } => self
1211                .verify_response_with_target_and_highest(target_li, highest_li),
1212        }
1213    }*/
1214
1215    fn verify_response_with_target_and_highest(
1216        &mut self, target_li: &LedgerInfoWithSignatures,
1217        highest_li: &Option<LedgerInfoWithSignatures>,
1218    ) -> Result<(), Error> {
1219        if !self.is_initialized() {
1220            return Err(Error::ReceivedWrongChunkType(
1221                "Received a progressive ledger info, but we're not initialized!".into(),
1222            ));
1223        }
1224
1225        // If we're syncing to a specific target for consensus, valid responses
1226        // should not exceed the ledger info version of the sync request.
1227        if let Some(sync_request) = self.sync_request.as_ref() {
1228            let sync_request_version =
1229                sync_request.target.ledger_info().version();
1230            let response_version = target_li.ledger_info().version();
1231            if sync_request_version < response_version {
1232                let error_message = format!("Verifiable ledger info version is higher than the sync target. Received: {}, requested: {}.",
1233                                            response_version,
1234                                            sync_request_version);
1235                return Err(Error::ProcessInvalidChunk(error_message));
1236            }
1237        }
1238
1239        // Valid responses should not have a highest ledger info less than
1240        // target
1241        if let Some(highest_li) = highest_li {
1242            let target_version = target_li.ledger_info().version();
1243            let highest_version = highest_li.ledger_info().version();
1244            if target_version > highest_version {
1245                let error_message = format!("Progressive ledger info has target version > highest version. Target: {}, highest: {}.",
1246                                            target_version,
1247                                            highest_version);
1248                return Err(Error::ProcessInvalidChunk(error_message));
1249            }
1250        }
1251
1252        Ok(())
1253    }
1254
1255    fn verify_response_with_waypoint_li(
1256        &mut self, waypoint_li: &LedgerInfoWithSignatures,
1257        end_of_epoch_li: &Option<LedgerInfoWithSignatures>,
1258    ) -> Result<(), Error> {
1259        if self.is_initialized() || self.sync_request.is_some() {
1260            return Err(Error::ReceivedWrongChunkType(
1261                "Received a waypoint ledger info, but we're already initialized!".into(),
1262            ));
1263        }
1264
1265        // Valid waypoint responses should not have an end_of_epoch_li version >
1266        // waypoint_li
1267        if let Some(end_of_epoch_li) = end_of_epoch_li {
1268            let end_of_epoch_version = end_of_epoch_li.ledger_info().version();
1269            let waypoint_version = waypoint_li.ledger_info().version();
1270            if end_of_epoch_version > waypoint_version {
1271                let error_message = format!("Waypoint ledger info version is less than the end_of_epoch_li version. Waypoint: {}, end_of_epoch_li: {}.",
1272                                            waypoint_version,
1273                                            end_of_epoch_version);
1274                return Err(Error::ProcessInvalidChunk(error_message));
1275            }
1276        }
1277
1278        Ok(())
1279    }
1280
1281    /// Logs the highest seen ledger info version based on the current syncing
1282    /// mode.
1283    fn log_highest_seen_version(
1284        &self, new_highest_li: Option<LedgerInfoWithSignatures>,
1285    ) {
1286        let current_highest_version = if !self.is_initialized() {
1287            self.waypoint.version()
1288        } else if let Some(sync_request) = self.sync_request.as_ref() {
1289            sync_request.target.ledger_info().version()
1290        } else if let Some(new_highest_li) = new_highest_li.as_ref() {
1291            new_highest_li.ledger_info().version()
1292        } else if let Some(target_ledger_info) =
1293            self.target_ledger_info.as_ref()
1294        {
1295            target_ledger_info.ledger_info().version()
1296        } else {
1297            self.local_state.synced_version()
1298        };
1299
1300        let highest_seen_version =
1301            counters::get_version(counters::VersionType::Highest);
1302        let highest_version =
1303            cmp::max(current_highest_version, highest_seen_version);
1304        counters::set_version(counters::VersionType::Highest, highest_version);
1305    }
1306
1307    /// Calculates the next version and epoch to request (assuming the given
1308    /// transaction list and ledger info will be applied successfully).
1309    /// Note: if no ledger info is specified, we assume the next chunk will
1310    /// be for our current epoch.
1311    fn calculate_new_known_version_and_epoch(
1312        &mut self, txn_list_with_proof: TransactionListWithProof,
1313        ledger_info: Option<LedgerInfoWithSignatures>,
1314    ) -> Result<(u64, u64), Error> {
1315        let new_version = self
1316            .local_state
1317            .synced_version()
1318            .checked_add(txn_list_with_proof.len() as u64)
1319            .ok_or_else(|| {
1320                Error::IntegerOverflow(
1321                    "Potential state sync version has overflown".into(),
1322                )
1323            })?;
1324
1325        let mut new_epoch = self.local_state.trusted_epoch();
1326        if let Some(ledger_info) = ledger_info {
1327            if ledger_info.ledger_info().version() == new_version
1328                && ledger_info.ledger_info().ends_epoch()
1329            {
1330                // This chunk is going to finish the current epoch. Choose the
1331                // next one.
1332                new_epoch = new_epoch.checked_add(1).ok_or_else(|| {
1333                    Error::IntegerOverflow(
1334                        "Potential state sync epoch has overflown".into(),
1335                    )
1336                })?;
1337            }
1338        }
1339
1340        Ok((new_version, new_epoch))
1341    }
1342
1343    /// Returns a chunk target for the highest available synchronization.
1344    fn create_highest_available_chunk_target(
1345        &self, target_ledger_info: Option<LedgerInfoWithSignatures>,
1346    ) -> TargetType {
1347        TargetType::HighestAvailable {
1348            target_li: target_ledger_info,
1349            timeout_ms: self.config.long_poll_timeout_ms,
1350        }
1351    }
1352
1353    /// Returns a chunk target for consensus request synchronization.
1354    fn create_sync_request_chunk_target(
1355        &self, known_version: u64,
1356    ) -> Result<TargetType, Error> {
1357        if let Some(sync_request) = &self.sync_request {
1358            let target_version = sync_request.target.ledger_info().version();
1359            if target_version <= known_version {
1360                Err(Error::SyncedBeyondTarget(known_version, target_version))
1361            } else {
1362                let chunk_target = self.create_highest_available_chunk_target(
1363                    Some(sync_request.target.clone()),
1364                );
1365                Ok(chunk_target)
1366            }
1367        } else {
1368            Err(Error::NoSyncRequestFound(
1369                "Unable to create a sync request chunk target".into(),
1370            ))
1371        }
1372    }
1373
1374    /// Returns a chunk target for waypoint synchronization.
1375    fn create_waypoint_chunk_target(&self) -> TargetType {
1376        let waypoint_version = self.waypoint.version();
1377        TargetType::Waypoint(waypoint_version)
1378    }
1379
1380    /*/// Processing chunk responses that carry a LedgerInfo that should be
1381    /// verified using the current local trusted validator set.
1382    fn process_response_with_target_and_highest(
1383        &mut self, txn_list_with_proof: TransactionListWithProof,
1384        response_li: LedgerInfoWithSignatures,
1385        new_highest_li: Option<LedgerInfoWithSignatures>,
1386    ) -> Result<(), Error>
1387    {
1388        // Optimistically calculate the new known version and epoch (assume the
1389        // current chunk is applied successfully).
1390        let (known_version, known_epoch) = self
1391            .calculate_new_known_version_and_epoch(
1392                txn_list_with_proof.clone(),
1393                Some(response_li.clone()),
1394            )?;
1395
1396        // Send the next chunk request based on the sync mode (sync request or
1397        // highest available).
1398        if self.sync_request.is_some() {
1399            match self.create_sync_request_chunk_target(known_version) {
1400                Ok(chunk_target) => {
1401                    // Send the chunk request and log any errors. If errors are
1402                    // logged continue processing the chunk.
1403                    let _ = self.send_chunk_request_and_log_error(
1404                        known_version,
1405                        known_epoch,
1406                        chunk_target,
1407                        LogEntry::ProcessChunkResponse,
1408                    );
1409                }
1410                Err(error) => {
1411                    diem_error!(LogSchema::new(LogEntry::SendChunkRequest)
1412                        .error(&error));
1413                }
1414            }
1415        } else {
1416            let mut new_target_ledger_info = None;
1417            if let Some(target_ledger_info) = self.target_ledger_info.clone() {
1418                if known_version < target_ledger_info.ledger_info().version() {
1419                    new_target_ledger_info = Some(target_ledger_info);
1420                }
1421            }
1422            // Send the chunk request and log any errors. If errors are logged
1423            // continue processing the chunk.
1424            let _ = self.send_chunk_request_and_log_error(
1425                known_version,
1426                known_epoch,
1427                self.create_highest_available_chunk_target(
1428                    new_target_ledger_info,
1429                ),
1430                LogEntry::ProcessChunkResponse,
1431            );
1432        }
1433
1434        // Validate chunk ledger infos
1435        self.local_state.verify_ledger_info(&response_li)?;
1436        if let Some(new_highest_li) = new_highest_li.clone() {
1437            if new_highest_li != response_li {
1438                self.local_state.verify_ledger_info(&new_highest_li)?;
1439            }
1440        }
1441
1442        // Validate and store the chunk
1443        self.log_highest_seen_version(new_highest_li.clone());
1444        self.validate_and_store_chunk(txn_list_with_proof, response_li, None)?;
1445
1446        // Need to sync with local storage to update synced version
1447        self.sync_state_with_local_storage()?;
1448        let synced_version = self.local_state.synced_version();
1449
1450        // Check if we've synced beyond our current target ledger info
1451        if let Some(target_ledger_info) = &self.target_ledger_info {
1452            if synced_version >= target_ledger_info.ledger_info().version() {
1453                self.target_ledger_info = None;
1454            }
1455        }
1456
1457        // If we don't have a target ledger info, check if the new highest
1458        // is appropriate for us.
1459        if self.target_ledger_info.is_none() {
1460            if let Some(new_highest_li) = new_highest_li {
1461                if synced_version < new_highest_li.ledger_info().version() {
1462                    self.target_ledger_info = Some(new_highest_li);
1463                }
1464            }
1465        }
1466
1467        Ok(())
1468    }*/
1469
1470    /*/// Processing chunk responses that carry a LedgerInfo corresponding to the
1471    /// waypoint.
1472    fn process_response_with_waypoint_li(
1473        &mut self, txn_list_with_proof: TransactionListWithProof,
1474        waypoint_li: LedgerInfoWithSignatures,
1475        end_of_epoch_li: Option<LedgerInfoWithSignatures>,
1476    ) -> Result<(), Error>
1477    {
1478        // Optimistically calculate the new known version and epoch (assume the
1479        // current chunk is applied successfully).
1480        let (known_version, known_epoch) = self
1481            .calculate_new_known_version_and_epoch(
1482                txn_list_with_proof.clone(),
1483                end_of_epoch_li.clone(),
1484            )?;
1485        if known_version < self.waypoint.version() {
1486            // Send the chunk request and log any errors. If errors are logged
1487            // continue processing the chunk.
1488            let _ = self.send_chunk_request_and_log_error(
1489                known_version,
1490                known_epoch,
1491                self.create_waypoint_chunk_target(),
1492                LogEntry::ProcessChunkResponse,
1493            );
1494        }
1495
1496        // Verify the end_of_epoch_li against local state and ensure the version
1497        // corresponds to the version at the end of the chunk.
1498        // The executor expects that when it is passed an end_of_epoch_li to
1499        // commit, it is going to execute/commit transactions leading up
1500        // to that li, so we also verify that the end_of_epoch_li
1501        // actually ends the epoch.
1502        let end_of_epoch_li_to_commit = if let Some(end_of_epoch_li) =
1503            end_of_epoch_li
1504        {
1505            self.local_state.verify_ledger_info(&end_of_epoch_li)?;
1506
1507            let ledger_info = end_of_epoch_li.ledger_info();
1508            if !ledger_info.ends_epoch() {
1509                return Err(Error::ProcessInvalidChunk(
1510                    "Received waypoint ledger info with an end_of_epoch_li that does not end the epoch!".into(),
1511                ));
1512            }
1513
1514            // If we're now at the end of epoch version (i.e., known_version is
1515            // the same as the end_of_epoch_li version), the
1516            // end_of_epoch_li should be passed to storage so that we
1517            // can commit the end_of_epoch_li. If not, storage should only sync
1518            // the given chunk.
1519            if ledger_info.version() == known_version {
1520                Some(end_of_epoch_li)
1521            } else {
1522                None
1523            }
1524        } else {
1525            None
1526        };
1527        self.waypoint
1528            .verify(waypoint_li.ledger_info())
1529            .map_err(|error| {
1530                Error::UnexpectedError(format!(
1531                    "Waypoint verification failed: {}",
1532                    error
1533                ))
1534            })?;
1535
1536        self.validate_and_store_chunk(
1537            txn_list_with_proof,
1538            waypoint_li,
1539            end_of_epoch_li_to_commit,
1540        )?;
1541        self.log_highest_seen_version(None);
1542
1543        Ok(())
1544    }*/
1545
1546    // Assumes that the target LI has been already verified by the caller.
1547    fn validate_and_store_chunk(
1548        &mut self, txn_list_with_proof: TransactionListWithProof,
1549        target: LedgerInfoWithSignatures,
1550        intermediate_end_of_epoch_li: Option<LedgerInfoWithSignatures>,
1551    ) -> Result<(), Error> {
1552        let target_epoch = target.ledger_info().epoch();
1553        let target_version = target.ledger_info().version();
1554        let local_epoch = self.local_state.committed_epoch();
1555        let local_version = self.local_state.committed_version();
1556        if (target_epoch, target_version) <= (local_epoch, local_version) {
1557            diem_warn!(
1558                LogSchema::event_log(
1559                    LogEntry::ProcessChunkResponse,
1560                    LogEvent::OldResponseLI
1561                )
1562                .local_li_version(local_version)
1563                .local_epoch(local_epoch),
1564                response_li_version = target_version,
1565                response_li_epoch = target_epoch
1566            );
1567            return Ok(());
1568        }
1569
1570        self.executor_proxy.execute_chunk(
1571            txn_list_with_proof,
1572            target,
1573            intermediate_end_of_epoch_li,
1574        )
1575    }
1576
1577    /// Returns true if consensus is currently executing and state sync should
1578    /// therefore not write to storage. Reads are still permitted (e.g., to
1579    /// handle chunk requests).
1580    fn is_consensus_executing(&mut self) -> bool {
1581        self.is_initialized()
1582            && self.role == RoleType::Validator
1583            && self.sync_request.is_none()
1584    }
1585
1586    /// Ensures that state sync is making progress:
1587    /// * Kick starts the initial sync process (e.g., syncing to a waypoint or
1588    ///   target).
1589    /// * Issues a new request if too much time has passed since the last
1590    ///   request was sent.
1591    fn check_progress(&mut self) -> Result<(), Error> {
1592        if self.is_consensus_executing() {
1593            return Ok(()); // No need to check progress or issue any requests
1594                           // (consensus is running).
1595        }
1596
1597        // Check if the sync request has timed out (i.e., if we aren't
1598        // committing fast enough)
1599        if let Some(sync_request) = self.sync_request.as_ref() {
1600            let timeout_between_commits =
1601                Duration::from_millis(self.config.sync_request_timeout_ms);
1602            let commit_deadline = sync_request
1603                .last_commit_timestamp
1604                .checked_add(timeout_between_commits)
1605                .ok_or_else(|| {
1606                    Error::IntegerOverflow(
1607                        "The commit deadline timestamp has overflown!".into(),
1608                    )
1609                })?;
1610
1611            // Check if the commit deadline has been exceeded.
1612            if SystemTime::now().duration_since(commit_deadline).is_ok() {
1613                counters::SYNC_REQUEST_RESULT
1614                    .with_label_values(&[counters::TIMEOUT_LABEL])
1615                    .inc();
1616                diem_warn!(LogSchema::event_log(
1617                    LogEntry::SyncRequest,
1618                    LogEvent::Timeout
1619                ));
1620
1621                // Remove the sync request and notify consensus that the request
1622                // timed out!
1623                if let Some(sync_request) = self.sync_request.take() {
1624                    if let Err(e) = Self::send_sync_req_callback(
1625                        sync_request,
1626                        Err(Error::UnexpectedError(
1627                            "Sync request timed out!".into(),
1628                        )),
1629                    ) {
1630                        diem_error!(LogSchema::event_log(
1631                            LogEntry::SyncRequest,
1632                            LogEvent::CallbackFail
1633                        )
1634                        .error(&e));
1635                    }
1636                }
1637            }
1638        }
1639
1640        // If the coordinator didn't make progress by the expected time or did
1641        // not send a request for the current local synced version,
1642        // issue a new request.
1643        /*let known_version = self.local_state.synced_version();
1644        if self.request_manager.has_request_timed_out(known_version)? {
1645            counters::TIMEOUT.inc();
1646            diem_warn!(LogSchema::new(LogEntry::Timeout).version(known_version));
1647
1648            let trusted_epoch = self.local_state.trusted_epoch();
1649            let chunk_target = if !self.is_initialized() {
1650                self.create_waypoint_chunk_target()
1651            } else if self.sync_request.is_some() {
1652                self.create_sync_request_chunk_target(known_version)?
1653            } else {
1654                self.create_highest_available_chunk_target(
1655                    self.target_ledger_info.clone(),
1656                )
1657            };
1658            self.send_chunk_request_and_log_error(
1659                known_version,
1660                trusted_epoch,
1661                chunk_target,
1662                LogEntry::Timeout,
1663            )
1664        } else {
1665            Ok(())
1666        }*/
1667        Ok(())
1668    }
1669
1670    /*/// Sends a chunk request with a given `known_version`, `known_epoch` and
1671    /// `chunk_target`. Immediately logs any errors returned by the
1672    /// operation using the given log entry.
1673    fn send_chunk_request_and_log_error(
1674        &mut self, known_version: u64, known_epoch: u64,
1675        chunk_target: TargetType, log_entry: LogEntry,
1676    ) -> Result<(), Error>
1677    {
1678        if let Err(error) = self.send_chunk_request_with_target(
1679            known_version,
1680            known_epoch,
1681            chunk_target,
1682        ) {
1683            diem_error!(LogSchema::event_log(
1684                log_entry,
1685                LogEvent::SendChunkRequestFail
1686            )
1687            .version(known_version)
1688            .local_epoch(known_epoch)
1689            .error(&error));
1690            Err(error)
1691        } else {
1692            Ok(())
1693        }
1694    }*/
1695
1696    /*/// Sends a chunk request with a given `known_version`, `known_epoch` and
1697    /// `target`.
1698    fn send_chunk_request_with_target(
1699        &mut self, known_version: u64, known_epoch: u64, target: TargetType,
1700    ) -> Result<(), Error> {
1701        if self.request_manager.no_available_peers() {
1702            diem_warn!(LogSchema::event_log(
1703                LogEntry::SendChunkRequest,
1704                LogEvent::MissingPeers
1705            ));
1706            return Err(Error::NoAvailablePeers(
1707                "No peers to send chunk request to!".into(),
1708            ));
1709        }
1710
1711        let target_version = target
1712            .version()
1713            .unwrap_or_else(|| known_version.wrapping_add(1));
1714        counters::set_version(counters::VersionType::Target, target_version);
1715
1716        let req = GetChunkRequest::new(
1717            known_version,
1718            known_epoch,
1719            self.config.chunk_limit,
1720            target,
1721        );
1722        self.request_manager.send_chunk_request(req)
1723    }*/
1724
1725    fn deliver_subscription(
1726        &mut self, peer: PeerNetworkId, request_info: PendingRequestInfo,
1727        local_version: u64,
1728    ) -> Result<(), Error> {
1729        let (target_li, highest_li) = self.calculate_target_and_highest_li(
1730            request_info.request_epoch,
1731            request_info.target_li,
1732            local_version,
1733        )?;
1734
1735        self.deliver_chunk(
1736            peer,
1737            request_info.known_version,
1738            ResponseLedgerInfo::ProgressiveLedgerInfo {
1739                target_li,
1740                highest_li,
1741            },
1742            request_info.chunk_limit,
1743        )
1744    }
1745
1746    /// The function is called after the local storage is updated with new
1747    /// transactions: it might deliver chunks for the subscribers that have
1748    /// been waiting with the long polls.
1749    ///
1750    /// Note that it is possible to help the subscribers only with the
1751    /// transactions that match the highest ledger info in the local storage
1752    /// (some committed transactions are ahead of the latest ledger info and
1753    /// are not going to be used for helping the remote subscribers).
1754    /// The function assumes that the local state has been synced with storage.
1755    fn check_subscriptions(&mut self) {
1756        let highest_li_version = self.local_state.committed_version();
1757
1758        let mut ready = vec![];
1759        self.subscriptions.retain(|peer, request_info| {
1760            // filter out expired peer requests
1761            if SystemTime::now()
1762                .duration_since(request_info.expiration_time)
1763                .is_ok()
1764            {
1765                return false;
1766            }
1767            if request_info.known_version < highest_li_version {
1768                ready.push((peer.clone(), request_info.clone()));
1769                false
1770            } else {
1771                true
1772            }
1773        });
1774
1775        ready.into_iter().for_each(|(peer, request_info)| {
1776            let result_label = if let Err(err) = self.deliver_subscription(
1777                peer.clone(),
1778                request_info,
1779                highest_li_version,
1780            ) {
1781                diem_error!(LogSchema::new(LogEntry::SubscriptionDeliveryFail)
1782                    .peer(&peer)
1783                    .error(&err));
1784                counters::FAIL_LABEL
1785            } else {
1786                counters::SUCCESS_LABEL
1787            };
1788            counters::SUBSCRIPTION_DELIVERY_COUNT
1789                .with_label_values(&[
1790                    &peer.raw_network_id().to_string(),
1791                    &peer.peer_id().to_string(),
1792                    result_label,
1793                ])
1794                .inc();
1795        });
1796    }
1797
1798    fn send_sync_req_callback(
1799        sync_req: SyncRequest, msg: Result<(), Error>,
1800    ) -> Result<(), Error> {
1801        sync_req.callback.send(msg).map_err(|failed_msg| {
1802            counters::FAILED_CHANNEL_SEND
1803                .with_label_values(&[counters::CONSENSUS_SYNC_REQ_CALLBACK])
1804                .inc();
1805            Error::UnexpectedError(format!(
1806                "Consensus sync request callback error - failed to send the following message: {:?}",
1807                failed_msg
1808            ))
1809        })
1810    }
1811
1812    fn send_initialization_callback(
1813        callback: oneshot::Sender<Result<(), Error>>,
1814    ) -> Result<(), Error> {
1815        match callback.send(Ok(())) {
1816            Err(error) => {
1817                counters::FAILED_CHANNEL_SEND
1818                    .with_label_values(&[counters::WAYPOINT_INIT_CALLBACK])
1819                    .inc();
1820                Err(Error::CallbackSendFailed(format!(
1821                    "Waypoint initialization callback error - failed to send following msg: {:?}",
1822                    error
1823                )))
1824            }
1825            _ => Ok(()),
1826        }
1827    }
1828}
1829
1830/*
1831#[cfg(test)]
1832mod tests {
1833    use crate::pos::{
1834        mempool::CommitResponse,
1835        state_sync::{
1836            chunk_request::{GetChunkRequest, TargetType},
1837            chunk_response::{GetChunkResponse, ResponseLedgerInfo},
1838            client::SyncRequest,
1839            coordinator::StateSyncCoordinator,
1840            error::Error,
1841            executor_proxy::ExecutorProxy,
1842            network::StateSyncMessage,
1843            shared_components::test_utils::{
1844                self, create_coordinator_with_config_and_waypoint,
1845            },
1846        },
1847    };
1848    use diem_config::{
1849        config::{NodeConfig, PeerNetworkId, PeerRole, RoleType},
1850        network_id::{NetworkId, NodeNetworkId},
1851    };
1852    use diem_crypto::{
1853        ed25519::{Ed25519PrivateKey, Ed25519Signature},
1854        HashValue, PrivateKey, Uniform,
1855    };
1856    use diem_types::{
1857        account_address::AccountAddress,
1858        block_info::BlockInfo,
1859        chain_id::ChainId,
1860        ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
1861        proof::TransactionListProof,
1862        transaction::{
1863            RawTransaction, Script, SignedTransaction, Transaction,
1864            TransactionListWithProof, TransactionPayload, Version,
1865        },
1866        waypoint::Waypoint,
1867        PeerId,
1868    };
1869    use futures::{channel::oneshot, executor::block_on};
1870    use netcore::transport::ConnectionOrigin;
1871    use network::transport::ConnectionMetadata;
1872    use std::{collections::BTreeMap, time::SystemTime};
1873
1874    #[test]
1875    fn test_process_sync_request() {
1876        // Create a coordinator for a full node
1877        let mut full_node_coordinator =
1878            test_utils::create_full_node_coordinator();
1879
1880        // Verify that fullnodes can't process sync requests
1881        let (sync_request, _) = create_sync_request_at_version(0);
1882        let process_result =
1883            full_node_coordinator.process_sync_request(sync_request);
1884        if !matches!(process_result, Err(Error::FullNodeSyncRequest)) {
1885            panic!(
1886                "Expected an full node sync request error, but got: {:?}",
1887                process_result
1888            );
1889        }
1890
1891        // Create a coordinator for a validator node
1892        let mut validator_coordinator =
1893            test_utils::create_validator_coordinator();
1894
1895        // Perform sync request for version that matches initial waypoint
1896        // version
1897        let (sync_request, mut callback_receiver) =
1898            create_sync_request_at_version(0);
1899        validator_coordinator
1900            .process_sync_request(sync_request)
1901            .unwrap();
1902        match callback_receiver.try_recv() {
1903            Ok(Some(result)) => assert!(result.is_ok()),
1904            result => panic!("Expected okay but got: {:?}", result),
1905        };
1906
1907        // Create validator coordinator with waypoint higher than 0
1908        let waypoint_version = 10;
1909        let waypoint_ledger_info =
1910            create_ledger_info_at_version(waypoint_version);
1911        let waypoint = Waypoint::new_any(&waypoint_ledger_info.ledger_info());
1912        let mut validator_coordinator =
1913            create_coordinator_with_config_and_waypoint(
1914                NodeConfig::default(),
1915                waypoint,
1916            );
1917
1918        // Verify coordinator won't process sync requests as it's not yet
1919        // initialized
1920        let (sync_request, mut callback_receiver) =
1921            create_sync_request_at_version(10);
1922        let process_result =
1923            validator_coordinator.process_sync_request(sync_request);
1924        if !matches!(process_result, Err(Error::UninitializedError(..))) {
1925            panic!(
1926                "Expected an uninitialized error, but got: {:?}",
1927                process_result
1928            );
1929        }
1930        let callback_result = callback_receiver.try_recv();
1931        if !matches!(callback_result, Err(_)) {
1932            panic!("Expected error but got: {:?}", callback_result);
1933        }
1934
1935        // TODO(joshlind): add a check for syncing to old versions once we
1936        // support storage modifications in unit tests.
1937    }
1938
1939    #[test]
1940    fn test_get_sync_state() {
1941        // Create a coordinator for a validator node
1942        let mut validator_coordinator =
1943            test_utils::create_validator_coordinator();
1944
1945        // Get the sync state from state sync
1946        let (callback_sender, mut callback_receiver) = oneshot::channel();
1947        validator_coordinator
1948            .get_sync_state(callback_sender)
1949            .unwrap();
1950        match callback_receiver.try_recv() {
1951            Ok(Some(sync_state)) => {
1952                assert_eq!(sync_state.committed_version(), 0);
1953            }
1954            result => panic!("Expected okay but got: {:?}", result),
1955        };
1956
1957        // Drop the callback receiver and verify error
1958        let (callback_sender, _) = oneshot::channel();
1959        let sync_state_result =
1960            validator_coordinator.get_sync_state(callback_sender);
1961        if !matches!(sync_state_result, Err(Error::CallbackSendFailed(..))) {
1962            panic!("Expected error but got: {:?}", sync_state_result);
1963        }
1964    }
1965
1966    #[test]
1967    fn test_wait_for_initialization() {
1968        // Create a coordinator for a validator node
1969        let mut validator_coordinator =
1970            test_utils::create_validator_coordinator();
1971
1972        // Check already initialized returns immediately
1973        let (callback_sender, mut callback_receiver) = oneshot::channel();
1974        validator_coordinator
1975            .wait_for_initialization(callback_sender)
1976            .unwrap();
1977        match callback_receiver.try_recv() {
1978            Ok(Some(result)) => assert!(result.is_ok()),
1979            result => panic!("Expected okay but got: {:?}", result),
1980        };
1981
1982        // Drop the callback receiver and verify error
1983        let (callback_sender, _) = oneshot::channel();
1984        let initialization_result =
1985            validator_coordinator.wait_for_initialization(callback_sender);
1986        if !matches!(initialization_result, Err(Error::CallbackSendFailed(..)))
1987        {
1988            panic!("Expected error but got: {:?}", initialization_result);
1989        }
1990
1991        // Create a coordinator with the waypoint version higher than 0
1992        let waypoint_version = 10;
1993        let waypoint_ledger_info =
1994            create_ledger_info_at_version(waypoint_version);
1995        let waypoint = Waypoint::new_any(&waypoint_ledger_info.ledger_info());
1996        let mut validator_coordinator =
1997            create_coordinator_with_config_and_waypoint(
1998                NodeConfig::default(),
1999                waypoint,
2000            );
2001
2002        // Verify callback is not executed as state sync is not yet initialized
2003        let (callback_sender, mut callback_receiver) = oneshot::channel();
2004        validator_coordinator
2005            .wait_for_initialization(callback_sender)
2006            .unwrap();
2007        let callback_result = callback_receiver.try_recv();
2008        if !matches!(callback_result, Ok(None)) {
2009            panic!("Expected none but got: {:?}", callback_result);
2010        }
2011
2012        // TODO(joshlind): add a check that verifies the callback is executed
2013        // once we can update storage in the unit tests.
2014    }
2015
2016    #[test]
2017    fn test_process_commit_notification() {
2018        // Create a coordinator for a validator node
2019        let mut validator_coordinator =
2020            test_utils::create_validator_coordinator();
2021
2022        // Verify that a commit notification with no transactions doesn't
2023        // diem_error!
2024        block_on(validator_coordinator.process_commit_notification(
2025            vec![],
2026            None,
2027            vec![],
2028            None,
2029        ))
2030        .unwrap();
2031
2032        // Verify that consensus is sent a commit ack when everything works
2033        let (callback_sender, mut callback_receiver) =
2034            oneshot::channel::<Result<CommitResponse, Error>>();
2035        block_on(validator_coordinator.process_commit_notification(
2036            vec![],
2037            Some(callback_sender),
2038            vec![],
2039            None,
2040        ))
2041        .unwrap();
2042        let callback_result = callback_receiver.try_recv();
2043        if !matches!(callback_result, Ok(Some(Ok(..)))) {
2044            panic!("Expected an okay result but got: {:?}", callback_result);
2045        }
2046
2047        // TODO(joshlind): verify that mempool is sent the correct transactions!
2048        let (callback_sender, _callback_receiver) =
2049            oneshot::channel::<Result<CommitResponse, Error>>();
2050        let committed_transactions = vec![create_test_transaction()];
2051        block_on(validator_coordinator.process_commit_notification(
2052            committed_transactions,
2053            Some(callback_sender),
2054            vec![],
2055            None,
2056        ))
2057        .unwrap();
2058
2059        // TODO(joshlind): check initialized is fired when unit tests support
2060        // storage modifications.
2061
2062        // TODO(joshlind): check sync request is called when unit tests support
2063        // storage modifications.
2064
2065        // TODO(joshlind): test that long poll requests are handled
2066        // appropriately when new unit tests support this.
2067
2068        // TODO(joshlind): test that reconfiguration events are handled
2069        // appropriately and listeners are notified.
2070    }
2071
2072    #[test]
2073    fn test_check_progress() {
2074        // Create a coordinator for a validator node
2075        let mut validator_coordinator =
2076            test_utils::create_validator_coordinator();
2077
2078        // Verify no error is returned when consensus is running
2079        validator_coordinator.check_progress().unwrap();
2080
2081        // Send a sync request to state sync (to mark that consensus is no
2082        // longer running)
2083        let (sync_request, _) = create_sync_request_at_version(1);
2084        let _ = validator_coordinator.process_sync_request(sync_request);
2085
2086        // Verify the no available peers error is returned
2087        let progress_result = validator_coordinator.check_progress();
2088        if !matches!(progress_result, Err(Error::NoAvailablePeers(..))) {
2089            panic!("Expected an err result but got: {:?}", progress_result);
2090        }
2091
2092        // Create validator coordinator with tiny state sync timeout
2093        let mut node_config = NodeConfig::default();
2094        node_config.base.role = RoleType::Validator;
2095        node_config.state_sync.sync_request_timeout_ms = 0;
2096        let mut validator_coordinator =
2097            create_coordinator_with_config_and_waypoint(
2098                node_config,
2099                Waypoint::default(),
2100            );
2101
2102        // Set a new sync request
2103        let (sync_request, mut callback_receiver) =
2104            create_sync_request_at_version(1);
2105        let _ = validator_coordinator.process_sync_request(sync_request);
2106
2107        // Verify sync request timeout notifies the callback
2108        validator_coordinator.check_progress().unwrap_err();
2109        let callback_result = callback_receiver.try_recv();
2110        if !matches!(callback_result, Ok(Some(Err(..)))) {
2111            panic!("Expected an err result but got: {:?}", callback_result);
2112        }
2113
2114        // TODO(joshlind): check request resend after timeout.
2115
2116        // TODO(joshlind): check overflow error returns.
2117
2118        // TODO(joshlind): test that check progress passes when there are valid
2119        // peers.
2120    }
2121
2122    #[test]
2123    fn test_new_and_lost_peers() {
2124        // Create a coordinator for a validator node
2125        let mut validator_coordinator =
2126            test_utils::create_validator_coordinator();
2127
2128        // Create a public peer
2129        let node_network_id = NodeNetworkId::new(NetworkId::Public, 0);
2130        let peer_id = PeerId::random();
2131        let connection_metadata = ConnectionMetadata::mock_with_role_and_origin(
2132            peer_id,
2133            PeerRole::Validator,
2134            ConnectionOrigin::Inbound,
2135        );
2136
2137        // Verify error is returned when adding peer that is not a valid peer
2138        let new_peer_result = validator_coordinator
2139            .process_new_peer(node_network_id, connection_metadata.clone());
2140        if !matches!(new_peer_result, Err(Error::InvalidStateSyncPeer(..))) {
2141            panic!(
2142                "Expected an invalid peer error but got: {:?}",
2143                new_peer_result
2144            );
2145        }
2146
2147        // Verify the same error is not returned when adding a validator node
2148        let node_network_id = NodeNetworkId::new(NetworkId::Validator, 0);
2149        let new_peer_result = validator_coordinator
2150            .process_new_peer(node_network_id.clone(), connection_metadata);
2151        if matches!(new_peer_result, Err(Error::InvalidStateSyncPeer(..))) {
2152            panic!(
2153                "Expected not to receive an invalid peer error but got: {:?}",
2154                new_peer_result
2155            );
2156        }
2157
2158        // Verify no error is returned when removing the node
2159        validator_coordinator
2160            .process_lost_peer(node_network_id, peer_id)
2161            .unwrap();
2162    }
2163
2164    #[test]
2165    fn test_invalid_chunk_request_messages() {
2166        // Create a coordinator for a validator node
2167        let mut validator_coordinator =
2168            test_utils::create_validator_coordinator();
2169
2170        // Constants for the chunk requests
2171        let peer_network_id = PeerNetworkId::random();
2172        let current_epoch = 0;
2173        let chunk_limit = 250;
2174        let timeout_ms = 1000;
2175
2176        // Create chunk requests with a known version higher than the target
2177        let known_version = 100;
2178        let target_version = 10;
2179        let chunk_requests = create_chunk_requests(
2180            known_version,
2181            current_epoch,
2182            chunk_limit,
2183            target_version,
2184            timeout_ms,
2185        );
2186
2187        // Verify invalid request errors are thrown
2188        verify_all_chunk_requests_are_invalid(
2189            &mut validator_coordinator,
2190            &peer_network_id,
2191            &chunk_requests,
2192        );
2193
2194        // Create chunk requests with a current epoch higher than the target
2195        // epoch
2196        let known_version = 0;
2197        let current_epoch = 100;
2198        let chunk_requests = create_chunk_requests(
2199            known_version,
2200            current_epoch,
2201            chunk_limit,
2202            target_version,
2203            timeout_ms,
2204        );
2205
2206        // Verify invalid request errors are thrown
2207        verify_all_chunk_requests_are_invalid(
2208            &mut validator_coordinator,
2209            &peer_network_id,
2210            &chunk_requests[1..2], // Ignore waypoint request
2211        );
2212
2213        // Create chunk requests with a chunk limit size of 0 (which is a
2214        // pointless request)
2215        let chunk_limit = 0;
2216        let chunk_requests = create_chunk_requests(
2217            known_version,
2218            current_epoch,
2219            chunk_limit,
2220            target_version,
2221            timeout_ms,
2222        );
2223
2224        // Verify invalid request errors are thrown
2225        verify_all_chunk_requests_are_invalid(
2226            &mut validator_coordinator,
2227            &peer_network_id,
2228            &chunk_requests,
2229        );
2230
2231        // Create chunk requests with a long poll timeout of 0 (which is a
2232        // pointless request)
2233        let chunk_limit = 0;
2234        let chunk_requests = create_chunk_requests(
2235            known_version,
2236            current_epoch,
2237            chunk_limit,
2238            target_version,
2239            timeout_ms,
2240        );
2241
2242        // Verify invalid request errors are thrown
2243        verify_all_chunk_requests_are_invalid(
2244            &mut validator_coordinator,
2245            &peer_network_id,
2246            &chunk_requests,
2247        );
2248    }
2249
2250    #[test]
2251    fn test_process_chunk_response_messages() {
2252        // Create a coordinator for a validator node
2253        let mut validator_coordinator =
2254            test_utils::create_validator_coordinator();
2255
2256        // Create a peer and empty chunk responses
2257        let peer_network_id = PeerNetworkId::random_validator();
2258        let empty_chunk_responses = create_empty_chunk_responses(10);
2259
2260        // Verify a consensus error is returned when processing each chunk
2261        for chunk_response in &empty_chunk_responses {
2262            let result = block_on(validator_coordinator.process_chunk_message(
2263                peer_network_id.network_id(),
2264                peer_network_id.peer_id(),
2265                chunk_response.clone(),
2266            ));
2267            if !matches!(result, Err(Error::ConsensusIsExecuting)) {
2268                panic!("Expected consensus executing error, got: {:?}", result);
2269            }
2270        }
2271
2272        // Make a sync request (to force consensus to yield)
2273        let (sync_request, _) = create_sync_request_at_version(10);
2274        let _ = validator_coordinator.process_sync_request(sync_request);
2275
2276        // Verify we now get a downstream error (as the peer is downstream to
2277        // us)
2278        for chunk_response in &empty_chunk_responses {
2279            let result = block_on(validator_coordinator.process_chunk_message(
2280                peer_network_id.network_id(),
2281                peer_network_id.peer_id(),
2282                chunk_response.clone(),
2283            ));
2284            if !matches!(result, Err(Error::ReceivedChunkFromDownstream(..))) {
2285                panic!("Expected a downstream error, but got: {:?}", result);
2286            }
2287        }
2288
2289        // Add the peer to our known peers
2290        process_new_peer_event(&mut validator_coordinator, &peer_network_id);
2291
2292        // Verify we now get an empty chunk error
2293        for chunk_response in &empty_chunk_responses {
2294            let result = block_on(validator_coordinator.process_chunk_message(
2295                peer_network_id.network_id(),
2296                peer_network_id.peer_id(),
2297                chunk_response.clone(),
2298            ));
2299            if !matches!(result, Err(Error::ReceivedEmptyChunk(..))) {
2300                panic!("Expected an empty chunk error, got: {:?}", result);
2301            }
2302        }
2303
2304        // Send a non-empty chunk with a version mismatch and verify a mismatch
2305        // error is returned
2306        let chunk_responses = create_non_empty_chunk_responses(10);
2307        for chunk_response in &chunk_responses {
2308            let result = block_on(validator_coordinator.process_chunk_message(
2309                peer_network_id.network_id(),
2310                peer_network_id.peer_id(),
2311                chunk_response.clone(),
2312            ));
2313            if !matches!(result, Err(Error::ReceivedNonSequentialChunk(..))) {
2314                panic!(
2315                    "Expected a non-sequential error, but got: {:?}",
2316                    result
2317                );
2318            }
2319        }
2320    }
2321
2322    #[test]
2323    fn test_process_chunk_response_highest() {
2324        // Create a coordinator for a full node
2325        let mut full_node_coordinator =
2326            test_utils::create_full_node_coordinator();
2327
2328        // Create a peer for the node and add the peer as a known peer
2329        let peer_network_id = PeerNetworkId::random_validator();
2330        process_new_peer_event(&mut full_node_coordinator, &peer_network_id);
2331
2332        // Verify wrong chunk type for non-highest messages
2333        let chunk_responses = create_non_empty_chunk_responses(1);
2334        verify_all_chunk_responses_are_the_wrong_type(
2335            &mut full_node_coordinator,
2336            &peer_network_id,
2337            &chunk_responses[0..1], /* Ignore the target and highest chunk
2338                                     * responses */
2339        );
2340
2341        // Verify highest known version must be greater than target version
2342        let response_ledger_info = ResponseLedgerInfo::ProgressiveLedgerInfo {
2343            target_li: create_ledger_info_at_version(100),
2344            highest_li: Some(create_ledger_info_at_version(10)),
2345        };
2346        let highest_response = create_chunk_response_message(
2347            response_ledger_info,
2348            create_dummy_transaction_list_with_proof(1),
2349        );
2350        verify_all_chunk_responses_are_invalid(
2351            &mut full_node_coordinator,
2352            &peer_network_id,
2353            &[highest_response],
2354        );
2355
2356        // Verify invalid ledger infos are rejected
2357        let response_ledger_info = ResponseLedgerInfo::ProgressiveLedgerInfo {
2358            target_li: create_ledger_info_at_version(100),
2359            highest_li: None,
2360        };
2361        let highest_response = create_chunk_response_message(
2362            response_ledger_info,
2363            create_dummy_transaction_list_with_proof(1),
2364        );
2365        verify_all_chunk_responses_are_invalid(
2366            &mut full_node_coordinator,
2367            &peer_network_id,
2368            &[highest_response],
2369        );
2370    }
2371
2372    #[test]
2373    fn test_process_chunk_response_target() {
2374        // Create a coordinator for a validator
2375        let mut validator_coordinator =
2376            test_utils::create_validator_coordinator();
2377
2378        // Create a peer for the node and add the peer as a known peer
2379        let peer_network_id = PeerNetworkId::random_validator();
2380        process_new_peer_event(&mut validator_coordinator, &peer_network_id);
2381
2382        // Make a sync request (to force consensus to yield)
2383        let (sync_request, _) = create_sync_request_at_version(10);
2384        let _ = validator_coordinator.process_sync_request(sync_request);
2385
2386        // Verify wrong chunk type for waypoint message
2387        let chunk_responses = create_non_empty_chunk_responses(1);
2388        verify_all_chunk_responses_are_the_wrong_type(
2389            &mut validator_coordinator,
2390            &peer_network_id,
2391            &chunk_responses[0..1], /* Ignore the target and highest chunk
2392                                     * responses */
2393        );
2394
2395        // Verify ledger info version doesn't exceed sync request version
2396        let ledger_info = create_ledger_info_at_version(100);
2397        let response_ledger_info =
2398            ResponseLedgerInfo::VerifiableLedgerInfo(ledger_info);
2399        let target_response = create_chunk_response_message(
2400            response_ledger_info,
2401            create_dummy_transaction_list_with_proof(1),
2402        );
2403        verify_all_chunk_responses_are_invalid(
2404            &mut validator_coordinator,
2405            &peer_network_id,
2406            &[target_response],
2407        );
2408
2409        // Verify invalid ledger infos are rejected
2410        let ledger_info = create_ledger_info_at_version(5);
2411        let response_ledger_info =
2412            ResponseLedgerInfo::VerifiableLedgerInfo(ledger_info);
2413        let target_response = create_chunk_response_message(
2414            response_ledger_info,
2415            create_dummy_transaction_list_with_proof(1),
2416        );
2417        verify_all_chunk_responses_are_invalid(
2418            &mut validator_coordinator,
2419            &peer_network_id,
2420            &[target_response],
2421        );
2422    }
2423
2424    #[test]
2425    fn test_process_chunk_response_waypoint() {
2426        // Create a coordinator for a validator node with waypoint version of 10
2427        let waypoint_ledger_info = create_ledger_info_at_version(10);
2428        let waypoint = Waypoint::new_any(&waypoint_ledger_info.ledger_info());
2429        let mut validator_coordinator =
2430            create_coordinator_with_config_and_waypoint(
2431                NodeConfig::default(),
2432                waypoint,
2433            );
2434
2435        // Create a peer for the node and add the peer as a known peer
2436        let peer_network_id = PeerNetworkId::random_validator();
2437        process_new_peer_event(&mut validator_coordinator, &peer_network_id);
2438
2439        // Verify wrong chunk type for non-waypoint messages
2440        let chunk_responses = create_non_empty_chunk_responses(1);
2441        verify_all_chunk_responses_are_the_wrong_type(
2442            &mut validator_coordinator,
2443            &peer_network_id,
2444            &chunk_responses[1..=2], // Ignore the waypoint chunk response
2445        );
2446
2447        // Verify end of epoch version is less than waypoint version
2448        let response_ledger_info = ResponseLedgerInfo::LedgerInfoForWaypoint {
2449            waypoint_li: create_ledger_info_at_version(10),
2450            end_of_epoch_li: Some(create_ledger_info_at_version(100)),
2451        };
2452        let waypoint_response = create_chunk_response_message(
2453            response_ledger_info,
2454            create_dummy_transaction_list_with_proof(1),
2455        );
2456        verify_all_chunk_responses_are_invalid(
2457            &mut validator_coordinator,
2458            &peer_network_id,
2459            &[waypoint_response],
2460        );
2461
2462        // Verify that invalid waypoint ledger infos are rejected
2463        let response_ledger_info = ResponseLedgerInfo::LedgerInfoForWaypoint {
2464            waypoint_li: create_ledger_info_at_version(10),
2465            end_of_epoch_li: Some(create_ledger_info_at_version(10)),
2466        };
2467        let waypoint_response = create_chunk_response_message(
2468            response_ledger_info,
2469            create_dummy_transaction_list_with_proof(1),
2470        );
2471        verify_all_chunk_responses_are_invalid(
2472            &mut validator_coordinator,
2473            &peer_network_id,
2474            &[waypoint_response],
2475        );
2476    }
2477
2478    fn create_test_transaction() -> Transaction {
2479        let private_key = Ed25519PrivateKey::generate_for_testing();
2480        let public_key = private_key.public_key();
2481
2482        let transaction_payload =
2483            TransactionPayload::Script(Script::new(vec![], vec![], vec![]));
2484        let raw_transaction = RawTransaction::new(
2485            AccountAddress::random(),
2486            0,
2487            transaction_payload,
2488            0,
2489            0,
2490            "".into(),
2491            0,
2492            ChainId::new(10),
2493        );
2494        let signed_transaction = SignedTransaction::new(
2495            raw_transaction,
2496            public_key,
2497            Ed25519Signature::dummy_signature(),
2498        );
2499
2500        Transaction::UserTransaction(signed_transaction)
2501    }
2502
2503    fn create_ledger_info_at_version(
2504        version: Version,
2505    ) -> LedgerInfoWithSignatures {
2506        let block_info = BlockInfo::new(
2507            0,
2508            0,
2509            HashValue::zero(),
2510            HashValue::zero(),
2511            version,
2512            0,
2513            None,
2514        );
2515        let ledger_info = LedgerInfo::new(block_info, HashValue::random());
2516        LedgerInfoWithSignatures::new(ledger_info, BTreeMap::new())
2517    }
2518
2519    fn create_sync_request_at_version(
2520        version: Version,
2521    ) -> (SyncRequest, oneshot::Receiver<Result<(), Error>>) {
2522        // Create ledger info with signatures at given version
2523        let ledger_info = create_ledger_info_at_version(version);
2524
2525        // Create sync request with target version and callback
2526        let (callback_sender, callback_receiver) = oneshot::channel();
2527        let sync_request = SyncRequest {
2528            callback: callback_sender,
2529            target: ledger_info,
2530            last_commit_timestamp: SystemTime::now(),
2531        };
2532
2533        (sync_request, callback_receiver)
2534    }
2535
2536    /// Creates a set of chunk requests (one for each type of possible request).
2537    /// The returned request types are: [waypoint, target, highest].
2538    fn create_chunk_requests(
2539        known_version: Version, current_epoch: u64, chunk_limit: u64,
2540        target_version: u64, timeout_ms: u64,
2541    ) -> Vec<StateSyncMessage>
2542    {
2543        // Create a waypoint chunk request
2544        let target = TargetType::Waypoint(target_version);
2545        let waypoint_request = create_chunk_request_message(
2546            known_version,
2547            current_epoch,
2548            chunk_limit,
2549            target,
2550        );
2551
2552        // Create a highest chunk request
2553        let target_li = Some(create_ledger_info_at_version(target_version));
2554        let target = TargetType::HighestAvailable {
2555            target_li,
2556            timeout_ms,
2557        };
2558        let highest_request = create_chunk_request_message(
2559            known_version,
2560            current_epoch,
2561            chunk_limit,
2562            target,
2563        );
2564
2565        // Create a target chunk request
2566        let target_ledger_info = create_ledger_info_at_version(target_version);
2567        let target = TargetType::TargetLedgerInfo(target_ledger_info);
2568        let target_request = create_chunk_request_message(
2569            known_version,
2570            current_epoch,
2571            chunk_limit,
2572            target,
2573        );
2574
2575        vec![waypoint_request, target_request, highest_request]
2576    }
2577
2578    fn create_chunk_request_message(
2579        known_version: Version, current_epoch: u64, chunk_limit: u64,
2580        target: TargetType,
2581    ) -> StateSyncMessage
2582    {
2583        let chunk_request = GetChunkRequest::new(
2584            known_version,
2585            current_epoch,
2586            chunk_limit,
2587            target,
2588        );
2589        StateSyncMessage::GetChunkRequest(Box::new(chunk_request))
2590    }
2591
2592    fn create_dummy_transaction_list_with_proof(
2593        version: Version,
2594    ) -> TransactionListWithProof {
2595        TransactionListWithProof::new(
2596            vec![create_test_transaction()],
2597            None,
2598            Some(version),
2599            TransactionListProof::new_empty(),
2600        )
2601    }
2602
2603    fn create_chunk_response_message(
2604        response_ledger_info: ResponseLedgerInfo,
2605        transaction_list_with_proof: TransactionListWithProof,
2606    ) -> StateSyncMessage
2607    {
2608        let chunk_response = GetChunkResponse::new(
2609            response_ledger_info,
2610            transaction_list_with_proof,
2611        );
2612        StateSyncMessage::GetChunkResponse(Box::new(chunk_response))
2613    }
2614
2615    fn create_empty_chunk_responses(version: Version) -> Vec<StateSyncMessage> {
2616        create_chunk_responses(version, TransactionListWithProof::new_empty())
2617    }
2618
2619    fn create_non_empty_chunk_responses(
2620        version: Version,
2621    ) -> Vec<StateSyncMessage> {
2622        let transaction_list_with_proof =
2623            create_dummy_transaction_list_with_proof(version);
2624        create_chunk_responses(version, transaction_list_with_proof)
2625    }
2626
2627    /// Creates a set of chunk responses (one for each type of possible
2628    /// response). The returned response types are: [waypoint, target,
2629    /// highest].
2630    fn create_chunk_responses(
2631        version: Version, transaction_list_with_proof: TransactionListWithProof,
2632    ) -> Vec<StateSyncMessage> {
2633        let ledger_info_at_version = create_ledger_info_at_version(version);
2634
2635        // Create a waypoint chunk response
2636        let response_ledger_info = ResponseLedgerInfo::LedgerInfoForWaypoint {
2637            waypoint_li: ledger_info_at_version.clone(),
2638            end_of_epoch_li: None,
2639        };
2640        let waypoint_response = create_chunk_response_message(
2641            response_ledger_info,
2642            transaction_list_with_proof.clone(),
2643        );
2644
2645        // Create a highest chunk response
2646        let response_ledger_info = ResponseLedgerInfo::ProgressiveLedgerInfo {
2647            target_li: ledger_info_at_version.clone(),
2648            highest_li: None,
2649        };
2650        let highest_response = create_chunk_response_message(
2651            response_ledger_info,
2652            transaction_list_with_proof.clone(),
2653        );
2654
2655        // Create a target chunk response
2656        let response_ledger_info =
2657            ResponseLedgerInfo::VerifiableLedgerInfo(ledger_info_at_version);
2658        let target_response = create_chunk_response_message(
2659            response_ledger_info,
2660            transaction_list_with_proof,
2661        );
2662
2663        vec![waypoint_response, target_response, highest_response]
2664    }
2665
2666    fn verify_all_chunk_requests_are_invalid(
2667        coordinator: &mut StateSyncCoordinator<ExecutorProxy>,
2668        peer_network_id: &PeerNetworkId, requests: &[StateSyncMessage],
2669    )
2670    {
2671        for request in requests {
2672            let result = block_on(coordinator.process_chunk_message(
2673                peer_network_id.network_id(),
2674                peer_network_id.peer_id(),
2675                request.clone(),
2676            ));
2677            if !matches!(result, Err(Error::InvalidChunkRequest(..))) {
2678                panic!(
2679                    "Expected an invalid chunk request, but got: {:?}",
2680                    result
2681                );
2682            }
2683        }
2684    }
2685
2686    fn verify_all_chunk_responses_are_invalid(
2687        coordinator: &mut StateSyncCoordinator<ExecutorProxy>,
2688        peer_network_id: &PeerNetworkId, responses: &[StateSyncMessage],
2689    )
2690    {
2691        for response in responses {
2692            let result = block_on(coordinator.process_chunk_message(
2693                peer_network_id.network_id(),
2694                peer_network_id.peer_id(),
2695                response.clone(),
2696            ));
2697            if !matches!(result, Err(Error::ProcessInvalidChunk(..))) {
2698                panic!("Expected invalid chunk error, but got: {:?}", result);
2699            }
2700        }
2701    }
2702
2703    fn verify_all_chunk_responses_are_the_wrong_type(
2704        coordinator: &mut StateSyncCoordinator<ExecutorProxy>,
2705        peer_network_id: &PeerNetworkId, responses: &[StateSyncMessage],
2706    )
2707    {
2708        for response in responses {
2709            let result = block_on(coordinator.process_chunk_message(
2710                peer_network_id.network_id(),
2711                peer_network_id.peer_id(),
2712                response.clone(),
2713            ));
2714            if !matches!(result, Err(Error::ReceivedWrongChunkType(..))) {
2715                panic!("Expected wrong type error, but got: {:?}", result);
2716            }
2717        }
2718    }
2719
2720    fn process_new_peer_event(
2721        coordinator: &mut StateSyncCoordinator<ExecutorProxy>,
2722        peer: &PeerNetworkId,
2723    )
2724    {
2725        let connection_metadata = ConnectionMetadata::mock_with_role_and_origin(
2726            peer.peer_id(),
2727            PeerRole::Validator,
2728            ConnectionOrigin::Outbound,
2729        );
2730        let _ = coordinator
2731            .process_new_peer(peer.network_id(), connection_metadata);
2732    }
2733}
2734*/