cfxcore/pos/state_sync/
client.rs1use crate::pos::{
9 mempool::CommitResponse,
10 state_sync::{counters, error::Error, shared_components::SyncState},
11};
12use diem_logger::prelude::*;
13use diem_types::{
14 contract_event::ContractEvent, ledger_info::LedgerInfoWithSignatures,
15 transaction::Transaction,
16};
17use futures::{
18 channel::{mpsc, oneshot},
19 future::Future,
20 SinkExt,
21};
22use std::time::{Duration, SystemTime};
23use tokio::time::timeout;
24
25pub struct SyncRequest {
27 pub callback: oneshot::Sender<Result<(), Error>>,
28 pub last_commit_timestamp: SystemTime,
29 pub target: LedgerInfoWithSignatures,
30}
31
32pub struct CommitNotification {
34 pub callback: oneshot::Sender<Result<CommitResponse, Error>>,
35 pub committed_transactions: Vec<Transaction>,
36 pub reconfiguration_events: Vec<ContractEvent>,
37}
38
39pub enum CoordinatorMessage {
42 SyncRequest(Box<SyncRequest>), CommitNotification(Box<CommitNotification>), GetSyncState(oneshot::Sender<SyncState>), WaitForInitialization(oneshot::Sender<Result<(), Error>>), }
49
50pub struct StateSyncClient {
52 coordinator_sender: mpsc::UnboundedSender<CoordinatorMessage>,
53
54 commit_timeout_ms: u64,
57}
58
59impl StateSyncClient {
60 pub fn new(
61 coordinator_sender: mpsc::UnboundedSender<CoordinatorMessage>,
62 commit_timeout_ms: u64,
63 ) -> Self {
64 Self {
65 coordinator_sender,
66 commit_timeout_ms,
67 }
68 }
69
70 pub fn sync_to(
74 &self, target: LedgerInfoWithSignatures,
75 ) -> impl Future<Output = Result<(), Error>> {
76 let mut sender = self.coordinator_sender.clone();
77 let (cb_sender, cb_receiver) = oneshot::channel();
78 let request = SyncRequest {
79 callback: cb_sender,
80 target,
81 last_commit_timestamp: SystemTime::now(),
82 };
83
84 async move {
85 sender
86 .send(CoordinatorMessage::SyncRequest(Box::new(request)))
87 .await?;
88 cb_receiver.await?
89 }
90 }
91
92 pub fn commit(
94 &self, committed_txns: Vec<Transaction>,
95 reconfig_events: Vec<ContractEvent>,
96 ) -> impl Future<Output = Result<(), Error>> {
97 let mut sender = self.coordinator_sender.clone();
98 let (cb_sender, cb_receiver) = oneshot::channel();
99
100 let commit_timeout_ms = self.commit_timeout_ms;
101 let notification = CommitNotification {
102 callback: cb_sender,
103 committed_transactions: committed_txns,
104 reconfiguration_events: reconfig_events,
105 };
106 diem_debug!(
107 "state_sync::commit: {} reconfig events",
108 notification.reconfiguration_events.len()
109 );
110
111 async move {
112 sender
113 .send(CoordinatorMessage::CommitNotification(Box::new(
114 notification,
115 )))
116 .await?;
117
118 match timeout(Duration::from_millis(commit_timeout_ms), cb_receiver)
119 .await
120 {
121 Err(_) => {
122 counters::COMMIT_FLOW_FAIL
123 .with_label_values(&[counters::STATE_SYNC_LABEL])
124 .inc();
125 Err(Error::UnexpectedError(
126 "State sync client timeout: failed to receive commit() ack in time!".into(),
127 ))
128 }
129 Ok(response) => {
130 let response = response??; if response.success {
132 Ok(())
133 } else {
134 Err(Error::UnexpectedError(format!(
135 "State sync client failed: commit() returned an error: {:?}",
136 response.error_message
137 )))
138 }
139 }
140 }
141 }
142 }
143
144 pub fn get_state(&self) -> impl Future<Output = Result<SyncState, Error>> {
148 let mut sender = self.coordinator_sender.clone();
149 let (cb_sender, cb_receiver) = oneshot::channel();
150
151 async move {
152 sender
153 .send(CoordinatorMessage::GetSyncState(cb_sender))
154 .await?;
155 cb_receiver.await.map_err(|error| error.into())
156 }
157 }
158
159 pub fn wait_until_initialized(
162 &self,
163 ) -> impl Future<Output = Result<(), Error>> {
164 let mut sender = self.coordinator_sender.clone();
165 let (cb_sender, cb_receiver) = oneshot::channel();
166
167 async move {
168 sender
169 .send(CoordinatorMessage::WaitForInitialization(cb_sender))
170 .await?;
171 cb_receiver.await?
172 }
173 }
174}