1use crate::helpers::EpochQueue;
2use cfx_parameters::{
3 consensus::DEFERRED_STATE_EPOCH_COUNT,
4 consensus_internal::REWARD_EPOCH_COUNT,
5};
6use cfx_rpc_cfx_types::{traits::BlockProvider, PhantomBlock};
7use cfx_rpc_eth_api::EthPubSubApiServer;
8use cfx_rpc_eth_types::{
9 eth_pubsub::{Kind as SubscriptionKind, Params, Result as PubSubResult},
10 Header, Log,
11};
12use cfx_rpc_utils::error::jsonrpsee_error_helpers::internal_rpc_err;
13use cfx_tasks::TaskExecutor;
14use cfx_types::{Space, H256};
15use cfxcore::{
16 BlockDataManager, ConsensusGraph, Notifications, SharedConsensusGraph,
17};
18use futures::StreamExt;
19use jsonrpsee::{
20 core::SubscriptionResult, server::SubscriptionMessage, types::ErrorObject,
21 PendingSubscriptionSink, SubscriptionSink,
22};
23use log::{debug, error, info, trace, warn};
24use parking_lot::RwLock;
25use primitives::{
26 filter::LogFilter, log_entry::LocalizedLogEntry, BlockReceipts, EpochNumber,
27};
28use serde::Serialize;
29use std::{
30 collections::{HashMap, VecDeque},
31 iter::zip,
32 sync::Arc,
33 time::Duration,
34};
35use tokio::{sync::broadcast, time::sleep};
36use tokio_stream::{wrappers::BroadcastStream, Stream};
37
38const BROADCAST_CHANNEL_SIZE: usize = 1000;
39
40#[derive(Clone)]
41pub struct PubSubApi {
42 executor: TaskExecutor,
43 chain_data_provider: Arc<ChainDataProvider>,
44 notifications: Arc<Notifications>,
45 heads_loop_started: Arc<RwLock<bool>>,
46 head_sender: Arc<broadcast::Sender<Header>>,
47 log_loop_started: Arc<RwLock<HashMap<LogFilter, bool>>>,
48 log_senders: Arc<RwLock<HashMap<LogFilter, broadcast::Sender<Log>>>>,
49}
50
51impl PubSubApi {
52 pub fn new(
53 consensus: SharedConsensusGraph, notifications: Arc<Notifications>,
54 executor: TaskExecutor,
55 ) -> PubSubApi {
56 let (head_sender, _) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
57 let log_senders = Arc::new(RwLock::new(HashMap::new()));
58 let chain_data_provider =
59 Arc::new(ChainDataProvider::new(consensus.clone()));
60
61 PubSubApi {
62 executor,
63 notifications,
64 heads_loop_started: Arc::new(RwLock::new(false)),
65 head_sender: Arc::new(head_sender),
66 log_senders,
67 chain_data_provider,
68 log_loop_started: Arc::new(RwLock::new(HashMap::new())),
69 }
70 }
71
72 fn new_headers_stream(&self) -> impl Stream<Item = Header> {
73 let receiver = self.head_sender.subscribe();
74 BroadcastStream::new(receiver)
75 .filter(|item| {
76 let res = match item {
77 Ok(_) => true,
78 Err(_) => false, };
81 futures::future::ready(res)
82 })
83 .map(|item| item.expect("should not be an error"))
84 }
85
86 fn new_logs_stream(&self, filter: LogFilter) -> impl Stream<Item = Log> {
87 let receiver;
88 let senders = self.log_senders.read();
89 if !senders.contains_key(&filter) {
90 drop(senders);
91 let mut senders = self.log_senders.write();
92 let (tx, rx) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
93 senders.insert(filter, tx);
94 receiver = rx;
95 } else {
96 receiver = senders.get(&filter).unwrap().subscribe();
97 }
98
99 BroadcastStream::new(receiver)
100 .filter(|item| {
101 let res = match item {
102 Ok(_) => true,
103 Err(_) => false,
104 };
105 futures::future::ready(res)
106 })
107 .map(|item| item.expect("should not be an error"))
108 }
109
110 fn start_heads_loop(&self) {
111 let mut loop_started = self.heads_loop_started.write();
112 if *loop_started {
113 return;
114 }
115 *loop_started = true;
116
117 debug!("async start_headers_loop");
118
119 let mut receiver = self.notifications.epochs_ordered.subscribe();
121 let head_sender = self.head_sender.clone();
122 let chain_data_provider = self.chain_data_provider.clone();
124 let heads_loop_started = self.heads_loop_started.clone();
125
126 let fut = async move {
128 let mut queue = EpochQueue::<Vec<H256>>::with_capacity(
131 (DEFERRED_STATE_EPOCH_COUNT - 1) as usize,
132 );
133
134 while let Some((epoch, hashes)) = receiver.recv().await {
135 debug!("epoch_loop: {:?}", (epoch, &hashes));
136 let (epoch, hashes) = match queue.push((epoch, hashes)) {
137 None => continue,
138 Some(e) => e,
139 };
140
141 let pivot = hashes.last().expect("empty epoch in pubsub");
143 chain_data_provider.wait_for_epoch(&pivot).await;
144
145 let header = chain_data_provider.get_pivot_block_header(epoch);
147 if let Some(header) = header {
148 let send_res = head_sender.send(header);
149 if send_res.is_err() {
150 let mut loop_started = heads_loop_started.write();
152 *loop_started = false;
153 return;
154 }
155 }
156 }
157 };
158
159 self.executor.spawn(fut);
160 }
161
162 fn start_logs_loop(&self, filter: LogFilter) {
163 let mut loop_started = self.log_loop_started.write();
164 if loop_started.contains_key(&filter) {
165 return;
166 }
167 loop_started.insert(filter.clone(), true);
168
169 let mut receiver = self.notifications.epochs_ordered.subscribe();
171 let senders = self.log_senders.read();
172 let tx = senders.get(&filter).unwrap().clone();
173
174 let chain_data_provider = self.chain_data_provider.clone();
176 let loop_started = self.log_loop_started.clone();
177
178 let fut = async move {
180 let mut last_epoch = 0;
181 let mut epochs: VecDeque<(u64, Vec<H256>, Vec<Log>)> =
182 VecDeque::new();
183 let mut queue = EpochQueue::<Vec<H256>>::with_capacity(
186 (DEFERRED_STATE_EPOCH_COUNT - 1) as usize,
187 );
188
189 while let Some(epoch) = receiver.recv().await {
190 let epoch = match queue.push(epoch) {
191 None => continue,
192 Some(e) => e,
193 };
194
195 if epoch.0 <= last_epoch {
197 debug!("pivot chain reorg: {} -> {}", last_epoch, epoch.0);
198 assert!(epoch.0 > 0, "Unexpected epoch number received.");
199
200 let mut reverted = vec![];
201 while let Some(e) = epochs.back() {
202 if e.0 >= epoch.0 {
203 reverted.push(epochs.pop_back().unwrap());
204 } else {
205 break;
206 }
207 }
208
209 for (_, _, logs) in reverted.into_iter() {
210 for mut log in logs.into_iter() {
211 log.removed = true;
212 let send_res = tx.send(log);
214 if send_res.is_err() {
215 let mut loop_started = loop_started.write();
216 loop_started.remove(&filter);
217 return;
218 }
219 }
220 }
221 }
222
223 last_epoch = epoch.0;
224
225 let latest_finalized_epoch_number =
226 chain_data_provider.latest_finalized_epoch_number();
227 while let Some(e) = epochs.front() {
228 if e.0 < latest_finalized_epoch_number {
229 epochs.pop_front();
230 } else {
231 break;
232 }
233 }
234
235 let logs = chain_data_provider
236 .get_epoch_logs(&filter, epoch.clone(), false)
237 .await;
238 for log in logs.iter() {
239 let send_res = tx.send(log.clone());
240 if send_res.is_err() {
244 let mut loop_started = loop_started.write();
245 loop_started.remove(&filter);
246 return;
247 }
248 }
249 epochs.push_back((epoch.0, epoch.1, logs));
250 }
251 };
252
253 self.executor.spawn(fut);
254 }
255}
256
257#[async_trait::async_trait]
258impl EthPubSubApiServer for PubSubApi {
259 async fn subscribe(
260 &self, pending: PendingSubscriptionSink, kind: SubscriptionKind,
261 params: Option<Params>,
262 ) -> SubscriptionResult {
263 match (kind, params) {
264 (SubscriptionKind::NewHeads, None) => {
265 let sink = pending.accept().await?;
266 let stream = self
267 .new_headers_stream()
268 .map(|header| PubSubResult::Header(header));
269 self.executor.spawn(async move {
270 let _ = pipe_from_stream(sink, stream).await;
271 });
272
273 self.start_heads_loop();
275 Ok(())
276 }
277 (SubscriptionKind::NewHeads, _) => {
278 Err("Params should be empty".into())
280 }
281 (SubscriptionKind::Logs, None) => {
282 let mut filter = LogFilter::default();
283 filter.space = Space::Ethereum;
284
285 let sink = pending.accept().await?;
286 let stream = self
287 .new_logs_stream(filter.clone())
288 .map(|log| PubSubResult::Log(log));
289 self.executor.spawn(async {
290 let _ = pipe_from_stream(sink, stream).await;
291 });
292
293 self.start_logs_loop(filter);
295 Ok(())
296 }
297 (SubscriptionKind::Logs, Some(Params::Logs(filter))) => {
298 let filter = match filter
299 .into_primitive(self.chain_data_provider.as_ref())
300 {
301 Err(_e) => return Err("Invalid filter params".into()),
302 Ok(filter) => filter,
303 };
304 let stream = self
305 .new_logs_stream(filter.clone())
306 .map(|log| PubSubResult::Log(log));
307 let sink = pending.accept().await?;
308 self.executor.spawn(async {
309 let _ = pipe_from_stream(sink, stream).await;
310 });
311
312 self.start_logs_loop(filter);
314 Ok(())
315 }
316 (_, _) => {
317 Err("Not supported".into())
319 }
320 }
321 }
322}
323
324pub struct ChainDataProvider {
325 consensus: SharedConsensusGraph,
326 data_man: Arc<BlockDataManager>,
327}
328
329impl ChainDataProvider {
330 pub fn new(consensus: SharedConsensusGraph) -> ChainDataProvider {
331 let data_man = consensus.data_manager().clone();
332 ChainDataProvider {
333 consensus,
334 data_man,
335 }
336 }
337
338 fn latest_finalized_epoch_number(&self) -> u64 {
339 self.consensus.latest_finalized_epoch_number()
340 }
341
342 fn consensus_graph(&self) -> &ConsensusGraph { &self.consensus }
343
344 async fn get_epoch_logs(
345 &self, filter: &LogFilter, epoch: (u64, Vec<H256>), removed: bool,
346 ) -> Vec<Log> {
347 let logs = match self.retrieve_epoch_logs(epoch).await {
348 Some(logs) => logs,
349 None => return vec![],
350 };
351
352 let logs = logs
354 .iter()
355 .filter(|l| filter.matches(&l.entry))
356 .cloned()
357 .map(|l| Log::try_from_localized(l, self, removed))
358 .filter(|l| l.is_ok())
359 .map(|l| l.unwrap())
360 .collect();
361
362 return logs;
363 }
364
365 async fn wait_for_epoch(&self, pivot: &H256) -> Option<Arc<BlockReceipts>> {
366 self.retrieve_block_receipts(&pivot, &pivot).await
367 }
368
369 fn get_pivot_block_header(&self, epoch: u64) -> Option<Header> {
370 let phantom_block = {
371 let _inner = self.consensus_graph().inner.read();
373 let block = self.consensus_graph().get_phantom_block_by_number(
374 EpochNumber::Number(epoch),
375 None,
376 false,
377 );
378
379 let pb = match block {
380 Err(e) => {
381 debug!("Invalid params {:?}", e);
382 None
383 }
384 Ok(pb) => pb,
385 };
386
387 pb
388 };
389
390 phantom_block.map(|b| Header::from_phantom(&b))
391 }
392
393 async fn retrieve_block_receipts(
398 &self, block: &H256, pivot: &H256,
399 ) -> Option<Arc<BlockReceipts>> {
400 info!("eth pubsub retrieve_block_receipts");
401 const POLL_INTERVAL_MS: Duration = Duration::from_millis(100);
402 let epoch = self.data_man.block_height_by_hash(pivot)?;
403
404 for ii in 0.. {
412 let latest = self.consensus.best_epoch_number();
413 match self.data_man.block_execution_result_by_hash_with_epoch(
414 &block, &pivot, false, false, ) {
417 Some(res) => return Some(res.block_receipts.clone()),
418 None => {
419 trace!("Cannot find receipts with {:?}/{:?}", block, pivot);
420 let _ = sleep(POLL_INTERVAL_MS).await;
421 }
422 }
423
424 if ii > 1000 {
426 error!("Cannot find receipts with {:?}/{:?}", block, pivot);
427 return None;
428 } else {
429 if latest
430 > epoch + DEFERRED_STATE_EPOCH_COUNT + REWARD_EPOCH_COUNT
431 {
432 warn!(
436 "Cannot find receipts with {:?}/{:?}, latest_epoch={}",
437 block, pivot, latest
438 );
439 return None;
440 }
441 }
442 }
443
444 unreachable!()
445 }
446
447 async fn get_phantom_block(
448 &self, epoch: u64, pivot: H256,
449 ) -> Option<PhantomBlock> {
450 debug!("eth pubsub get_phantom_block");
451 const POLL_INTERVAL_MS: Duration = Duration::from_millis(100);
452
453 for ii in 0.. {
454 let latest = self.consensus.best_epoch_number();
455 match self.consensus_graph().get_phantom_block_by_number(
456 EpochNumber::Number(epoch),
457 Some(pivot),
458 false, ) {
460 Ok(Some(b)) => return Some(b),
461 Ok(None) => {
462 error!("Block not executed yet {:?}", pivot);
463 let _ = sleep(POLL_INTERVAL_MS).await;
464 }
465 Err(e) => {
466 error!("get_phantom_block_by_number failed {}", e);
467 return None;
468 }
469 };
470
471 if ii > 1000 {
473 error!("Cannot construct phantom block for {:?}", pivot);
474 return None;
475 } else {
476 if latest
477 > epoch + DEFERRED_STATE_EPOCH_COUNT + REWARD_EPOCH_COUNT
478 {
479 warn!(
482 "Cannot onstruct phantom block for {:?}, latest_epoch={}",
483 pivot, latest
484 );
485 return None;
486 }
487 }
488 }
489
490 unreachable!()
491 }
492
493 async fn retrieve_epoch_logs(
494 &self, epoch: (u64, Vec<H256>),
495 ) -> Option<Vec<LocalizedLogEntry>> {
496 info!("eth pubsub retrieve_epoch_logs");
497 let (epoch_number, hashes) = epoch;
498 let pivot = hashes.last().cloned().expect("epoch should not be empty");
499
500 let pb = self.get_phantom_block(epoch_number, pivot).await?;
501
502 let mut logs = vec![];
503 let mut log_index = 0;
504
505 let txs = &pb.transactions;
506 assert_eq!(pb.receipts.len(), txs.len());
507
508 for (txid, (receipt, tx)) in zip(&pb.receipts, txs).enumerate() {
510 let eth_logs: Vec<_> = receipt
511 .logs
512 .iter()
513 .cloned()
514 .filter(|l| l.space == Space::Ethereum)
515 .collect();
516
517 for (logid, entry) in eth_logs.into_iter().enumerate() {
518 logs.push(LocalizedLogEntry {
519 entry,
520 block_hash: pivot,
521 epoch_number,
522 block_timestamp: Some(pb.pivot_header.timestamp()),
523 transaction_hash: tx.hash,
524 transaction_index: txid,
525 log_index,
526 transaction_log_index: logid,
527 });
528
529 log_index += 1;
530 }
531 }
532
533 Some(logs)
534 }
535}
536
537impl BlockProvider for &ChainDataProvider {
538 fn get_block_epoch_number(&self, hash: &H256) -> Option<u64> {
539 self.consensus.get_block_epoch_number(hash)
540 }
541
542 fn get_block_hashes_by_epoch(
543 &self, epoch_number: EpochNumber,
544 ) -> Result<Vec<H256>, String> {
545 self.consensus
546 .get_block_hashes_by_epoch(epoch_number)
547 .map_err(|e| e.to_string())
548 }
549}
550
551#[derive(Debug, thiserror::Error)]
553#[error("Failed to serialize subscription item: {0}")]
554pub struct SubscriptionSerializeError(#[from] serde_json::Error);
555
556impl SubscriptionSerializeError {
557 const fn new(err: serde_json::Error) -> Self { Self(err) }
558}
559
560impl From<SubscriptionSerializeError> for ErrorObject<'static> {
561 fn from(value: SubscriptionSerializeError) -> Self {
562 internal_rpc_err(value.to_string())
563 }
564}
565
566async fn pipe_from_stream<T, St>(
569 sink: SubscriptionSink, mut stream: St,
570) -> Result<(), ErrorObject<'static>>
571where
572 St: Stream<Item = T> + Unpin,
573 T: Serialize,
574{
575 loop {
576 tokio::select! {
577 _ = sink.closed() => {
578 break Ok(())
580 },
581 maybe_item = stream.next() => {
582 let item = match maybe_item {
583 Some(item) => item,
584 None => {
585 break Ok(())
587 },
588 };
589 let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item).map_err(SubscriptionSerializeError::new)?;
590 if sink.send(msg).await.is_err() {
591 break Ok(());
592 }
593 }
594 }
595 }
596}