1use crate::helpers::EpochQueue;
2use cfx_parameters::{
3 consensus::DEFERRED_STATE_EPOCH_COUNT,
4 consensus_internal::REWARD_EPOCH_COUNT,
5};
6use cfx_rpc_cfx_impl::helpers::subscribers::pipe_from_stream;
7use cfx_rpc_cfx_types::{traits::BlockProvider, PhantomBlock};
8use cfx_rpc_eth_api::EthPubSubApiServer;
9use cfx_rpc_eth_types::{
10 eth_pubsub::{Kind as SubscriptionKind, Params, Result as PubSubResult},
11 Header, Log,
12};
13use cfx_tasks::TaskExecutor;
14use cfx_types::{Space, H256};
15use cfxcore::{
16 BlockDataManager, ConsensusGraph, Notifications, SharedConsensusGraph,
17};
18use futures::StreamExt;
19use jsonrpsee::{core::SubscriptionResult, PendingSubscriptionSink};
20use log::{debug, error, info, trace, warn};
21use parking_lot::RwLock;
22use primitives::{
23 filter::LogFilter, log_entry::LocalizedLogEntry, BlockReceipts, EpochNumber,
24};
25use std::{
26 collections::{HashMap, VecDeque},
27 iter::zip,
28 sync::Arc,
29 time::Duration,
30};
31use tokio::{sync::broadcast, time::sleep};
32use tokio_stream::{wrappers::BroadcastStream, Stream};
33
/// Capacity handed to every `broadcast::channel` created in this file: the
/// single new-heads channel and each per-filter log channel.
const BROADCAST_CHANNEL_SIZE: usize = 1_000;
35
36#[derive(Clone)]
37pub struct PubSubApi {
38 executor: TaskExecutor,
39 chain_data_provider: Arc<ChainDataProvider>,
40 notifications: Arc<Notifications>,
41 heads_loop_started: Arc<RwLock<bool>>,
42 head_sender: Arc<broadcast::Sender<Header>>,
43 log_loop_started: Arc<RwLock<HashMap<LogFilter, bool>>>,
44 log_senders: Arc<RwLock<HashMap<LogFilter, broadcast::Sender<Log>>>>,
45}
46
47impl PubSubApi {
48 pub fn new(
49 consensus: SharedConsensusGraph, notifications: Arc<Notifications>,
50 executor: TaskExecutor,
51 ) -> PubSubApi {
52 let (head_sender, _) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
53 let log_senders = Arc::new(RwLock::new(HashMap::new()));
54 let chain_data_provider =
55 Arc::new(ChainDataProvider::new(consensus.clone()));
56
57 PubSubApi {
58 executor,
59 notifications,
60 heads_loop_started: Arc::new(RwLock::new(false)),
61 head_sender: Arc::new(head_sender),
62 log_senders,
63 chain_data_provider,
64 log_loop_started: Arc::new(RwLock::new(HashMap::new())),
65 }
66 }
67
68 fn new_headers_stream(&self) -> impl Stream<Item = Header> {
69 let receiver = self.head_sender.subscribe();
70 BroadcastStream::new(receiver)
71 .filter(|item| {
72 let res = match item {
73 Ok(_) => true,
74 Err(_) => false, };
77 futures::future::ready(res)
78 })
79 .map(|item| item.expect("should not be an error"))
80 }
81
82 fn new_logs_stream(&self, filter: LogFilter) -> impl Stream<Item = Log> {
83 let receiver;
84 let senders = self.log_senders.read();
85 if !senders.contains_key(&filter) {
86 drop(senders);
87 let mut senders = self.log_senders.write();
88 let (tx, rx) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
89 senders.insert(filter, tx);
90 receiver = rx;
91 } else {
92 receiver = senders.get(&filter).unwrap().subscribe();
93 }
94
95 BroadcastStream::new(receiver)
96 .filter(|item| {
97 let res = match item {
98 Ok(_) => true,
99 Err(_) => false,
100 };
101 futures::future::ready(res)
102 })
103 .map(|item| item.expect("should not be an error"))
104 }
105
106 fn start_heads_loop(&self) {
107 let mut loop_started = self.heads_loop_started.write();
108 if *loop_started {
109 return;
110 }
111 *loop_started = true;
112
113 debug!("async start_headers_loop");
114
115 let mut receiver = self.notifications.epochs_ordered.subscribe();
117 let head_sender = self.head_sender.clone();
118 let chain_data_provider = self.chain_data_provider.clone();
120 let heads_loop_started = self.heads_loop_started.clone();
121
122 let fut = async move {
124 let mut queue = EpochQueue::<Vec<H256>>::with_capacity(
127 (DEFERRED_STATE_EPOCH_COUNT - 1) as usize,
128 );
129
130 while let Some((epoch, hashes)) = receiver.recv().await {
131 debug!("epoch_loop: {:?}", (epoch, &hashes));
132 let (epoch, hashes) = match queue.push((epoch, hashes)) {
133 None => continue,
134 Some(e) => e,
135 };
136
137 let pivot = hashes.last().expect("empty epoch in pubsub");
139 chain_data_provider.wait_for_epoch(&pivot).await;
140
141 let header = chain_data_provider.get_pivot_block_header(epoch);
143 if let Some(header) = header {
144 let send_res = head_sender.send(header);
145 if send_res.is_err() {
146 let mut loop_started = heads_loop_started.write();
148 *loop_started = false;
149 return;
150 }
151 }
152 }
153 };
154
155 self.executor.spawn(fut);
156 }
157
158 fn start_logs_loop(&self, filter: LogFilter) {
159 let mut loop_started = self.log_loop_started.write();
160 if loop_started.contains_key(&filter) {
161 return;
162 }
163 loop_started.insert(filter.clone(), true);
164
165 let mut receiver = self.notifications.epochs_ordered.subscribe();
167 let senders = self.log_senders.read();
168 let tx = senders.get(&filter).unwrap().clone();
169
170 let chain_data_provider = self.chain_data_provider.clone();
172 let loop_started = self.log_loop_started.clone();
173
174 let fut = async move {
176 let mut last_epoch = 0;
177 let mut epochs: VecDeque<(u64, Vec<H256>, Vec<Log>)> =
178 VecDeque::new();
179 let mut queue = EpochQueue::<Vec<H256>>::with_capacity(
182 (DEFERRED_STATE_EPOCH_COUNT - 1) as usize,
183 );
184
185 while let Some(epoch) = receiver.recv().await {
186 let epoch = match queue.push(epoch) {
187 None => continue,
188 Some(e) => e,
189 };
190
191 if epoch.0 <= last_epoch {
193 debug!("pivot chain reorg: {} -> {}", last_epoch, epoch.0);
194 assert!(epoch.0 > 0, "Unexpected epoch number received.");
195
196 let mut reverted = vec![];
197 while let Some(e) = epochs.back() {
198 if e.0 >= epoch.0 {
199 reverted.push(epochs.pop_back().unwrap());
200 } else {
201 break;
202 }
203 }
204
205 for (_, _, logs) in reverted.into_iter() {
206 for mut log in logs.into_iter() {
207 log.removed = true;
208 let send_res = tx.send(log);
210 if send_res.is_err() {
211 let mut loop_started = loop_started.write();
212 loop_started.remove(&filter);
213 return;
214 }
215 }
216 }
217 }
218
219 last_epoch = epoch.0;
220
221 let latest_finalized_epoch_number =
222 chain_data_provider.latest_finalized_epoch_number();
223 while let Some(e) = epochs.front() {
224 if e.0 < latest_finalized_epoch_number {
225 epochs.pop_front();
226 } else {
227 break;
228 }
229 }
230
231 let logs = chain_data_provider
232 .get_epoch_logs(&filter, epoch.clone(), false)
233 .await;
234 for log in logs.iter() {
235 let send_res = tx.send(log.clone());
236 if send_res.is_err() {
240 let mut loop_started = loop_started.write();
241 loop_started.remove(&filter);
242 return;
243 }
244 }
245 epochs.push_back((epoch.0, epoch.1, logs));
246 }
247 };
248
249 self.executor.spawn(fut);
250 }
251}
252
253#[async_trait::async_trait]
254impl EthPubSubApiServer for PubSubApi {
255 async fn subscribe(
256 &self, pending: PendingSubscriptionSink, kind: SubscriptionKind,
257 params: Option<Params>,
258 ) -> SubscriptionResult {
259 match (kind, params) {
260 (SubscriptionKind::NewHeads, None) => {
261 let sink = pending.accept().await?;
262 let stream = self
263 .new_headers_stream()
264 .map(|header| PubSubResult::Header(header));
265 self.executor.spawn(async move {
266 let _ = pipe_from_stream(sink, stream).await;
267 });
268
269 self.start_heads_loop();
271 Ok(())
272 }
273 (SubscriptionKind::NewHeads, _) => {
274 Err("Params should be empty".into())
276 }
277 (SubscriptionKind::Logs, None) => {
278 let mut filter = LogFilter::default();
279 filter.space = Space::Ethereum;
280
281 let sink = pending.accept().await?;
282 let stream = self
283 .new_logs_stream(filter.clone())
284 .map(|log| PubSubResult::Log(log));
285 self.executor.spawn(async {
286 let _ = pipe_from_stream(sink, stream).await;
287 });
288
289 self.start_logs_loop(filter);
291 Ok(())
292 }
293 (SubscriptionKind::Logs, Some(Params::Logs(filter))) => {
294 let filter = match filter
295 .into_primitive(self.chain_data_provider.as_ref())
296 {
297 Err(_e) => return Err("Invalid filter params".into()),
298 Ok(filter) => filter,
299 };
300 let stream = self
301 .new_logs_stream(filter.clone())
302 .map(|log| PubSubResult::Log(log));
303 let sink = pending.accept().await?;
304 self.executor.spawn(async {
305 let _ = pipe_from_stream(sink, stream).await;
306 });
307
308 self.start_logs_loop(filter);
310 Ok(())
311 }
312 (_, _) => {
313 Err("Not supported".into())
315 }
316 }
317 }
318}
319
320pub struct ChainDataProvider {
321 consensus: SharedConsensusGraph,
322 data_man: Arc<BlockDataManager>,
323}
324
325impl ChainDataProvider {
326 pub fn new(consensus: SharedConsensusGraph) -> ChainDataProvider {
327 let data_man = consensus.data_manager().clone();
328 ChainDataProvider {
329 consensus,
330 data_man,
331 }
332 }
333
334 fn latest_finalized_epoch_number(&self) -> u64 {
335 self.consensus.latest_finalized_epoch_number()
336 }
337
338 fn consensus_graph(&self) -> &ConsensusGraph { &self.consensus }
339
340 async fn get_epoch_logs(
341 &self, filter: &LogFilter, epoch: (u64, Vec<H256>), removed: bool,
342 ) -> Vec<Log> {
343 let logs = match self.retrieve_epoch_logs(epoch).await {
344 Some(logs) => logs,
345 None => return vec![],
346 };
347
348 let logs = logs
350 .iter()
351 .filter(|l| filter.matches(&l.entry))
352 .cloned()
353 .map(|l| Log::try_from_localized(l, self, removed))
354 .filter(|l| l.is_ok())
355 .map(|l| l.unwrap())
356 .collect();
357
358 return logs;
359 }
360
361 async fn wait_for_epoch(&self, pivot: &H256) -> Option<Arc<BlockReceipts>> {
362 self.retrieve_block_receipts(&pivot, &pivot).await
363 }
364
365 fn get_pivot_block_header(&self, epoch: u64) -> Option<Header> {
366 let phantom_block = {
367 let _inner = self.consensus_graph().inner.read();
369 let block = self.consensus_graph().get_phantom_block_by_number(
370 EpochNumber::Number(epoch),
371 None,
372 false,
373 );
374
375 let pb = match block {
376 Err(e) => {
377 debug!("Invalid params {:?}", e);
378 None
379 }
380 Ok(pb) => pb,
381 };
382
383 pb
384 };
385
386 phantom_block.map(|b| Header::from_phantom(&b))
387 }
388
389 async fn retrieve_block_receipts(
394 &self, block: &H256, pivot: &H256,
395 ) -> Option<Arc<BlockReceipts>> {
396 info!("eth pubsub retrieve_block_receipts");
397 const POLL_INTERVAL_MS: Duration = Duration::from_millis(100);
398 let epoch = self.data_man.block_height_by_hash(pivot)?;
399
400 for ii in 0.. {
408 let latest = self.consensus.best_epoch_number();
409 match self.data_man.block_execution_result_by_hash_with_epoch(
410 &block, &pivot, false, false, ) {
413 Some(res) => return Some(res.block_receipts.clone()),
414 None => {
415 trace!("Cannot find receipts with {:?}/{:?}", block, pivot);
416 let _ = sleep(POLL_INTERVAL_MS).await;
417 }
418 }
419
420 if ii > 1000 {
422 error!("Cannot find receipts with {:?}/{:?}", block, pivot);
423 return None;
424 } else {
425 if latest
426 > epoch + DEFERRED_STATE_EPOCH_COUNT + REWARD_EPOCH_COUNT
427 {
428 warn!(
432 "Cannot find receipts with {:?}/{:?}, latest_epoch={}",
433 block, pivot, latest
434 );
435 return None;
436 }
437 }
438 }
439
440 unreachable!()
441 }
442
443 async fn get_phantom_block(
444 &self, epoch: u64, pivot: H256,
445 ) -> Option<PhantomBlock> {
446 debug!("eth pubsub get_phantom_block");
447 const POLL_INTERVAL_MS: Duration = Duration::from_millis(100);
448
449 for ii in 0.. {
450 let latest = self.consensus.best_epoch_number();
451 match self.consensus_graph().get_phantom_block_by_number(
452 EpochNumber::Number(epoch),
453 Some(pivot),
454 false, ) {
456 Ok(Some(b)) => return Some(b),
457 Ok(None) => {
458 error!("Block not executed yet {:?}", pivot);
459 let _ = sleep(POLL_INTERVAL_MS).await;
460 }
461 Err(e) => {
462 error!("get_phantom_block_by_number failed {}", e);
463 return None;
464 }
465 };
466
467 if ii > 1000 {
469 error!("Cannot construct phantom block for {:?}", pivot);
470 return None;
471 } else {
472 if latest
473 > epoch + DEFERRED_STATE_EPOCH_COUNT + REWARD_EPOCH_COUNT
474 {
475 warn!(
478 "Cannot onstruct phantom block for {:?}, latest_epoch={}",
479 pivot, latest
480 );
481 return None;
482 }
483 }
484 }
485
486 unreachable!()
487 }
488
489 async fn retrieve_epoch_logs(
490 &self, epoch: (u64, Vec<H256>),
491 ) -> Option<Vec<LocalizedLogEntry>> {
492 info!("eth pubsub retrieve_epoch_logs");
493 let (epoch_number, hashes) = epoch;
494 let pivot = hashes.last().cloned().expect("epoch should not be empty");
495
496 let pb = self.get_phantom_block(epoch_number, pivot).await?;
497
498 let mut logs = vec![];
499 let mut log_index = 0;
500
501 let txs = &pb.transactions;
502 assert_eq!(pb.receipts.len(), txs.len());
503
504 for (txid, (receipt, tx)) in zip(&pb.receipts, txs).enumerate() {
506 let eth_logs: Vec<_> = receipt
507 .logs
508 .iter()
509 .cloned()
510 .filter(|l| l.space == Space::Ethereum)
511 .collect();
512
513 for (logid, entry) in eth_logs.into_iter().enumerate() {
514 logs.push(LocalizedLogEntry {
515 entry,
516 block_hash: pivot,
517 epoch_number,
518 block_timestamp: Some(pb.pivot_header.timestamp()),
519 transaction_hash: tx.hash,
520 transaction_index: txid,
521 log_index,
522 transaction_log_index: logid,
523 });
524
525 log_index += 1;
526 }
527 }
528
529 Some(logs)
530 }
531}
532
533impl BlockProvider for &ChainDataProvider {
534 fn get_block_epoch_number(&self, hash: &H256) -> Option<u64> {
535 self.consensus.get_block_epoch_number(hash)
536 }
537
538 fn get_block_hashes_by_epoch(
539 &self, epoch_number: EpochNumber,
540 ) -> Result<Vec<H256>, String> {
541 self.consensus
542 .get_block_hashes_by_epoch(epoch_number)
543 .map_err(|e| e.to_string())
544 }
545}