use crate::pos::{
mempool::CommitResponse,
state_sync::{counters, error::Error, shared_components::SyncState},
};
use diem_logger::prelude::*;
use diem_types::{
contract_event::ContractEvent, ledger_info::LedgerInfoWithSignatures,
transaction::Transaction,
};
use futures::{
channel::{mpsc, oneshot},
future::Future,
SinkExt,
};
use std::time::{Duration, SystemTime};
use tokio::time::timeout;
pub struct SyncRequest {
pub callback: oneshot::Sender<Result<(), Error>>,
pub last_commit_timestamp: SystemTime,
pub target: LedgerInfoWithSignatures,
}
pub struct CommitNotification {
pub callback: oneshot::Sender<Result<CommitResponse, Error>>,
pub committed_transactions: Vec<Transaction>,
pub reconfiguration_events: Vec<ContractEvent>,
}
pub enum CoordinatorMessage {
SyncRequest(Box<SyncRequest>), CommitNotification(Box<CommitNotification>), GetSyncState(oneshot::Sender<SyncState>), WaitForInitialization(oneshot::Sender<Result<(), Error>>), }
pub struct StateSyncClient {
coordinator_sender: mpsc::UnboundedSender<CoordinatorMessage>,
commit_timeout_ms: u64,
}
impl StateSyncClient {
pub fn new(
coordinator_sender: mpsc::UnboundedSender<CoordinatorMessage>,
commit_timeout_ms: u64,
) -> Self {
Self {
coordinator_sender,
commit_timeout_ms,
}
}
pub fn sync_to(
&self, target: LedgerInfoWithSignatures,
) -> impl Future<Output = Result<(), Error>> {
let mut sender = self.coordinator_sender.clone();
let (cb_sender, cb_receiver) = oneshot::channel();
let request = SyncRequest {
callback: cb_sender,
target,
last_commit_timestamp: SystemTime::now(),
};
async move {
sender
.send(CoordinatorMessage::SyncRequest(Box::new(request)))
.await?;
cb_receiver.await?
}
}
pub fn commit(
&self, committed_txns: Vec<Transaction>,
reconfig_events: Vec<ContractEvent>,
) -> impl Future<Output = Result<(), Error>> {
let mut sender = self.coordinator_sender.clone();
let (cb_sender, cb_receiver) = oneshot::channel();
let commit_timeout_ms = self.commit_timeout_ms;
let notification = CommitNotification {
callback: cb_sender,
committed_transactions: committed_txns,
reconfiguration_events: reconfig_events,
};
diem_debug!(
"state_sync::commit: {} reconfig events",
notification.reconfiguration_events.len()
);
async move {
sender
.send(CoordinatorMessage::CommitNotification(Box::new(
notification,
)))
.await?;
match timeout(Duration::from_millis(commit_timeout_ms), cb_receiver)
.await
{
Err(_) => {
counters::COMMIT_FLOW_FAIL
.with_label_values(&[counters::STATE_SYNC_LABEL])
.inc();
Err(Error::UnexpectedError(
"State sync client timeout: failed to receive commit() ack in time!".into(),
))
}
Ok(response) => {
let response = response??; if response.success {
Ok(())
} else {
Err(Error::UnexpectedError(format!(
"State sync client failed: commit() returned an error: {:?}",
response.error_message
)))
}
}
}
}
}
pub fn get_state(&self) -> impl Future<Output = Result<SyncState, Error>> {
let mut sender = self.coordinator_sender.clone();
let (cb_sender, cb_receiver) = oneshot::channel();
async move {
sender
.send(CoordinatorMessage::GetSyncState(cb_sender))
.await?;
cb_receiver.await.map_err(|error| error.into())
}
}
pub fn wait_until_initialized(
&self,
) -> impl Future<Output = Result<(), Error>> {
let mut sender = self.coordinator_sender.clone();
let (cb_sender, cb_receiver) = oneshot::channel();
async move {
sender
.send(CoordinatorMessage::WaitForInitialization(cb_sender))
.await?;
cb_receiver.await?
}
}
}