cfx_rpc/
pubsub.rs

1use crate::helpers::EpochQueue;
2use cfx_parameters::{
3    consensus::DEFERRED_STATE_EPOCH_COUNT,
4    consensus_internal::REWARD_EPOCH_COUNT,
5};
6use cfx_rpc_cfx_types::{traits::BlockProvider, PhantomBlock};
7use cfx_rpc_eth_api::EthPubSubApiServer;
8use cfx_rpc_eth_types::{
9    eth_pubsub::{Kind as SubscriptionKind, Params, Result as PubSubResult},
10    Header, Log,
11};
12use cfx_rpc_utils::error::jsonrpsee_error_helpers::internal_rpc_err;
13use cfx_tasks::TaskExecutor;
14use cfx_types::{Space, H256};
15use cfxcore::{
16    BlockDataManager, ConsensusGraph, Notifications, SharedConsensusGraph,
17};
18use futures::StreamExt;
19use jsonrpsee::{
20    core::SubscriptionResult, server::SubscriptionMessage, types::ErrorObject,
21    PendingSubscriptionSink, SubscriptionSink,
22};
23use log::{debug, error, info, trace, warn};
24use parking_lot::RwLock;
25use primitives::{
26    filter::LogFilter, log_entry::LocalizedLogEntry, BlockReceipts, EpochNumber,
27};
28use serde::Serialize;
29use std::{
30    collections::{HashMap, VecDeque},
31    iter::zip,
32    sync::Arc,
33    time::Duration,
34};
35use tokio::{sync::broadcast, time::sleep};
36use tokio_stream::{wrappers::BroadcastStream, Stream};
37
38const BROADCAST_CHANNEL_SIZE: usize = 1000;
39
/// Ethereum-namespace pub/sub RPC implementation (`eth_subscribe`).
///
/// Fans chain notifications out to subscribers through broadcast channels:
/// a single shared channel for new heads and one channel per distinct log
/// filter. The background loops feeding these channels are started lazily
/// on first subscription.
#[derive(Clone)]
pub struct PubSubApi {
    // executor used to spawn the notification loops and per-subscriber pipes
    executor: TaskExecutor,
    // read-only access to chain data (headers, receipts, logs)
    chain_data_provider: Arc<ChainDataProvider>,
    // source of `epochs_ordered` notifications
    notifications: Arc<Notifications>,
    // whether the single heads loop is currently running
    heads_loop_started: Arc<RwLock<bool>>,
    // broadcast channel fanning new headers out to all heads subscribers
    head_sender: Arc<broadcast::Sender<Header>>,
    // per-filter flag: is a logs loop running for this filter?
    log_loop_started: Arc<RwLock<HashMap<LogFilter, bool>>>,
    // per-filter broadcast channels for log subscribers
    log_senders: Arc<RwLock<HashMap<LogFilter, broadcast::Sender<Log>>>>,
}
50
51impl PubSubApi {
52    pub fn new(
53        consensus: SharedConsensusGraph, notifications: Arc<Notifications>,
54        executor: TaskExecutor,
55    ) -> PubSubApi {
56        let (head_sender, _) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
57        let log_senders = Arc::new(RwLock::new(HashMap::new()));
58        let chain_data_provider =
59            Arc::new(ChainDataProvider::new(consensus.clone()));
60
61        PubSubApi {
62            executor,
63            notifications,
64            heads_loop_started: Arc::new(RwLock::new(false)),
65            head_sender: Arc::new(head_sender),
66            log_senders,
67            chain_data_provider,
68            log_loop_started: Arc::new(RwLock::new(HashMap::new())),
69        }
70    }
71
72    fn new_headers_stream(&self) -> impl Stream<Item = Header> {
73        let receiver = self.head_sender.subscribe();
74        BroadcastStream::new(receiver)
75            .filter(|item| {
76                let res = match item {
77                    Ok(_) => true,
78                    Err(_) => false, /* there are two types of errors: closed
79                                      * and lagged, mainly lagged */
80                };
81                futures::future::ready(res)
82            })
83            .map(|item| item.expect("should not be an error"))
84    }
85
86    fn new_logs_stream(&self, filter: LogFilter) -> impl Stream<Item = Log> {
87        let receiver;
88        let senders = self.log_senders.read();
89        if !senders.contains_key(&filter) {
90            drop(senders);
91            let mut senders = self.log_senders.write();
92            let (tx, rx) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
93            senders.insert(filter, tx);
94            receiver = rx;
95        } else {
96            receiver = senders.get(&filter).unwrap().subscribe();
97        }
98
99        BroadcastStream::new(receiver)
100            .filter(|item| {
101                let res = match item {
102                    Ok(_) => true,
103                    Err(_) => false,
104                };
105                futures::future::ready(res)
106            })
107            .map(|item| item.expect("should not be an error"))
108    }
109
    /// Starts the background loop feeding `head_sender` with new pivot
    /// headers, unless one is already running (guarded by
    /// `heads_loop_started`).
    ///
    /// The loop delays each `epochs_ordered` notification through an
    /// `EpochQueue` of depth `DEFERRED_STATE_EPOCH_COUNT - 1`, waits for
    /// the pivot block's execution results, then broadcasts the pivot
    /// header. It stops (and clears the started flag) once every
    /// subscriber is gone, i.e. when a send fails.
    fn start_heads_loop(&self) {
        let mut loop_started = self.heads_loop_started.write();
        if *loop_started {
            return;
        }
        *loop_started = true;

        debug!("async start_headers_loop");

        // subscribe to the `epochs_ordered` channel
        let mut receiver = self.notifications.epochs_ordered.subscribe();
        let head_sender = self.head_sender.clone();
        // clone everything we use in our async loop
        let chain_data_provider = self.chain_data_provider.clone();
        let heads_loop_started = self.heads_loop_started.clone();

        // loop asynchronously
        let fut = async move {
            // use queue to make sure we only process epochs once they have been
            // executed
            let mut queue = EpochQueue::<Vec<H256>>::with_capacity(
                (DEFERRED_STATE_EPOCH_COUNT - 1) as usize,
            );

            while let Some((epoch, hashes)) = receiver.recv().await {
                debug!("epoch_loop: {:?}", (epoch, &hashes));
                // `None` while the queue is still filling up to capacity
                let (epoch, hashes) = match queue.push((epoch, hashes)) {
                    None => continue,
                    Some(e) => e,
                };

                // wait for epoch to be executed
                let pivot = hashes.last().expect("empty epoch in pubsub");
                chain_data_provider.wait_for_epoch(&pivot).await;

                // publish epochs
                let header = chain_data_provider.get_pivot_block_header(epoch);
                if let Some(header) = header {
                    let send_res = head_sender.send(header);
                    if send_res.is_err() {
                        // send fails only when all receivers are gone:
                        // stop the loop and clear the flag so a future
                        // subscriber restarts it.
                        // NOTE(review): a subscriber arriving between the
                        // failed send and the flag reset below could see
                        // the flag still set and skip restarting the loop —
                        // confirm this window is acceptable
                        let mut loop_started = heads_loop_started.write();
                        *loop_started = false;
                        return;
                    }
                }
            }
        };

        self.executor.spawn(fut);
    }
161
    /// Starts the background loop feeding the broadcast channel for
    /// `filter` with matching logs, unless one is already running
    /// (guarded by the per-filter entry in `log_loop_started`).
    ///
    /// Besides publishing new logs, the loop remembers recently published
    /// epochs so that on a pivot chain reorg it can re-send their logs
    /// with `removed = true` before publishing the replacement epoch.
    fn start_logs_loop(&self, filter: LogFilter) {
        let mut loop_started = self.log_loop_started.write();
        if loop_started.contains_key(&filter) {
            return;
        }
        loop_started.insert(filter.clone(), true);

        // subscribe to the `epochs_ordered` channel
        let mut receiver = self.notifications.epochs_ordered.subscribe();
        let senders = self.log_senders.read();
        // NOTE(review): assumes `new_logs_stream` has already registered a
        // sender for this filter; the unwrap panics otherwise — verify all
        // call sites create the stream first
        let tx = senders.get(&filter).unwrap().clone();

        // clone everything we use in our async loop
        let chain_data_provider = self.chain_data_provider.clone();
        let loop_started = self.log_loop_started.clone();

        // loop asynchronously
        let fut = async move {
            // number of the last epoch published, used to detect reorgs
            let mut last_epoch = 0;
            // recently published epochs (number, hashes, logs); kept so
            // their logs can be re-sent as `removed` on a reorg
            let mut epochs: VecDeque<(u64, Vec<H256>, Vec<Log>)> =
                VecDeque::new();
            // use a queue to make sure we only process an epoch once it has
            // been executed for sure
            let mut queue = EpochQueue::<Vec<H256>>::with_capacity(
                (DEFERRED_STATE_EPOCH_COUNT - 1) as usize,
            );

            while let Some(epoch) = receiver.recv().await {
                let epoch = match queue.push(epoch) {
                    None => continue,
                    Some(e) => e,
                };

                // publish pivot chain reorg if necessary
                if epoch.0 <= last_epoch {
                    debug!("pivot chain reorg: {} -> {}", last_epoch, epoch.0);
                    assert!(epoch.0 > 0, "Unexpected epoch number received.");

                    // pop every remembered epoch at or above the new one;
                    // their logs are now on a fork
                    let mut reverted = vec![];
                    while let Some(e) = epochs.back() {
                        if e.0 >= epoch.0 {
                            reverted.push(epochs.pop_back().unwrap());
                        } else {
                            break;
                        }
                    }

                    for (_, _, logs) in reverted.into_iter() {
                        for mut log in logs.into_iter() {
                            log.removed = true;
                            // send removed logs
                            let send_res = tx.send(log);
                            if send_res.is_err() {
                                // all receivers gone: drop the flag so a
                                // future subscriber restarts the loop
                                let mut loop_started = loop_started.write();
                                loop_started.remove(&filter);
                                return;
                            }
                        }
                    }
                }

                last_epoch = epoch.0;

                // prune remembered epochs that are finalized: they can no
                // longer be reverted, so their logs need not be kept
                let latest_finalized_epoch_number =
                    chain_data_provider.latest_finalized_epoch_number();
                while let Some(e) = epochs.front() {
                    if e.0 < latest_finalized_epoch_number {
                        epochs.pop_front();
                    } else {
                        break;
                    }
                }

                let logs = chain_data_provider
                    .get_epoch_logs(&filter, epoch.clone(), false)
                    .await;
                for log in logs.iter() {
                    let send_res = tx.send(log.clone());
                    // when send_res is an error, it means all the receiver has
                    // been dropped and we should stop the
                    // loop
                    if send_res.is_err() {
                        let mut loop_started = loop_started.write();
                        loop_started.remove(&filter);
                        return;
                    }
                }
                epochs.push_back((epoch.0, epoch.1, logs));
            }
        };

        self.executor.spawn(fut);
    }
255}
256
257#[async_trait::async_trait]
258impl EthPubSubApiServer for PubSubApi {
259    async fn subscribe(
260        &self, pending: PendingSubscriptionSink, kind: SubscriptionKind,
261        params: Option<Params>,
262    ) -> SubscriptionResult {
263        match (kind, params) {
264            (SubscriptionKind::NewHeads, None) => {
265                let sink = pending.accept().await?;
266                let stream = self
267                    .new_headers_stream()
268                    .map(|header| PubSubResult::Header(header));
269                self.executor.spawn(async move {
270                    let _ = pipe_from_stream(sink, stream).await;
271                });
272
273                // start the head stream
274                self.start_heads_loop();
275                Ok(())
276            }
277            (SubscriptionKind::NewHeads, _) => {
278                // reject
279                Err("Params should be empty".into())
280            }
281            (SubscriptionKind::Logs, None) => {
282                let mut filter = LogFilter::default();
283                filter.space = Space::Ethereum;
284
285                let sink = pending.accept().await?;
286                let stream = self
287                    .new_logs_stream(filter.clone())
288                    .map(|log| PubSubResult::Log(log));
289                self.executor.spawn(async {
290                    let _ = pipe_from_stream(sink, stream).await;
291                });
292
293                // start the log loop
294                self.start_logs_loop(filter);
295                Ok(())
296            }
297            (SubscriptionKind::Logs, Some(Params::Logs(filter))) => {
298                let filter = match filter
299                    .into_primitive(self.chain_data_provider.as_ref())
300                {
301                    Err(_e) => return Err("Invalid filter params".into()),
302                    Ok(filter) => filter,
303                };
304                let stream = self
305                    .new_logs_stream(filter.clone())
306                    .map(|log| PubSubResult::Log(log));
307                let sink = pending.accept().await?;
308                self.executor.spawn(async {
309                    let _ = pipe_from_stream(sink, stream).await;
310                });
311
312                // start the log loop
313                self.start_logs_loop(filter);
314                Ok(())
315            }
316            (_, _) => {
317                // reject
318                Err("Not supported".into())
319            }
320        }
321    }
322}
323
/// Read-only access to chain data (headers, receipts, logs) used by the
/// pub/sub notification loops.
pub struct ChainDataProvider {
    // consensus graph handle; source of epoch and pivot information
    consensus: SharedConsensusGraph,
    // block data manager extracted from `consensus` for direct access
    data_man: Arc<BlockDataManager>,
}
328
329impl ChainDataProvider {
330    pub fn new(consensus: SharedConsensusGraph) -> ChainDataProvider {
331        let data_man = consensus.data_manager().clone();
332        ChainDataProvider {
333            consensus,
334            data_man,
335        }
336    }
337
    /// Number of the latest finalized epoch, as reported by consensus.
    fn latest_finalized_epoch_number(&self) -> u64 {
        self.consensus.latest_finalized_epoch_number()
    }

    /// Borrow the underlying consensus graph.
    fn consensus_graph(&self) -> &ConsensusGraph { &self.consensus }
343
344    async fn get_epoch_logs(
345        &self, filter: &LogFilter, epoch: (u64, Vec<H256>), removed: bool,
346    ) -> Vec<Log> {
347        let logs = match self.retrieve_epoch_logs(epoch).await {
348            Some(logs) => logs,
349            None => return vec![],
350        };
351
352        // apply filter to logs
353        let logs = logs
354            .iter()
355            .filter(|l| filter.matches(&l.entry))
356            .cloned()
357            .map(|l| Log::try_from_localized(l, self, removed))
358            .filter(|l| l.is_ok())
359            .map(|l| l.unwrap())
360            .collect();
361
362        return logs;
363    }
364
365    async fn wait_for_epoch(&self, pivot: &H256) -> Option<Arc<BlockReceipts>> {
366        self.retrieve_block_receipts(&pivot, &pivot).await
367    }
368
369    fn get_pivot_block_header(&self, epoch: u64) -> Option<Header> {
370        let phantom_block = {
371            // keep read lock to ensure consistent view
372            let _inner = self.consensus_graph().inner.read();
373            let block = self.consensus_graph().get_phantom_block_by_number(
374                EpochNumber::Number(epoch),
375                None,
376                false,
377            );
378
379            let pb = match block {
380                Err(e) => {
381                    debug!("Invalid params {:?}", e);
382                    None
383                }
384                Ok(pb) => pb,
385            };
386
387            pb
388        };
389
390        phantom_block.map(|b| Header::from_phantom(&b))
391    }
392
393    // attempt to retrieve block receipts from BlockDataManager
394    // on failure, wait and retry a few times, then fail
395    // NOTE: we do this because we might get epoch notifications
396    // before the corresponding execution results are computed
397    async fn retrieve_block_receipts(
398        &self, block: &H256, pivot: &H256,
399    ) -> Option<Arc<BlockReceipts>> {
400        info!("eth pubsub retrieve_block_receipts");
401        const POLL_INTERVAL_MS: Duration = Duration::from_millis(100);
402        let epoch = self.data_man.block_height_by_hash(pivot)?;
403
404        // we assume that all epochs we receive (with a distance of at least
405        // `DEFERRED_STATE_EPOCH_COUNT` from the tip of the pivot chain) are
406        // eventually executed, i.e. epochs are not dropped from the execution
407        // queue on pivot chain reorgs. moreover, multiple execution results
408        // might be stored for the same block for all epochs it was executed in.
409        // if these assumptions hold, we will eventually successfully read these
410        // execution results, even if they are outdated.
411        for ii in 0.. {
412            let latest = self.consensus.best_epoch_number();
413            match self.data_man.block_execution_result_by_hash_with_epoch(
414                &block, &pivot, false, /* update_pivot_assumption */
415                false, /* update_cache */
416            ) {
417                Some(res) => return Some(res.block_receipts.clone()),
418                None => {
419                    trace!("Cannot find receipts with {:?}/{:?}", block, pivot);
420                    let _ = sleep(POLL_INTERVAL_MS).await;
421                }
422            }
423
424            // we assume that an epoch gets executed within 100 seconds
425            if ii > 1000 {
426                error!("Cannot find receipts with {:?}/{:?}", block, pivot);
427                return None;
428            } else {
429                if latest
430                    > epoch + DEFERRED_STATE_EPOCH_COUNT + REWARD_EPOCH_COUNT
431                {
432                    // Even if the epoch was executed, the receipts on the fork
433                    // should have been deleted and cannot
434                    // be retrieved.
435                    warn!(
436                        "Cannot find receipts with {:?}/{:?}, latest_epoch={}",
437                        block, pivot, latest
438                    );
439                    return None;
440                }
441            }
442        }
443
444        unreachable!()
445    }
446
447    async fn get_phantom_block(
448        &self, epoch: u64, pivot: H256,
449    ) -> Option<PhantomBlock> {
450        debug!("eth pubsub get_phantom_block");
451        const POLL_INTERVAL_MS: Duration = Duration::from_millis(100);
452
453        for ii in 0.. {
454            let latest = self.consensus.best_epoch_number();
455            match self.consensus_graph().get_phantom_block_by_number(
456                EpochNumber::Number(epoch),
457                Some(pivot),
458                false, /* include_traces */
459            ) {
460                Ok(Some(b)) => return Some(b),
461                Ok(None) => {
462                    error!("Block not executed yet {:?}", pivot);
463                    let _ = sleep(POLL_INTERVAL_MS).await;
464                }
465                Err(e) => {
466                    error!("get_phantom_block_by_number failed {}", e);
467                    return None;
468                }
469            };
470
471            // we assume that an epoch gets executed within 100 seconds
472            if ii > 1000 {
473                error!("Cannot construct phantom block for {:?}", pivot);
474                return None;
475            } else {
476                if latest
477                    > epoch + DEFERRED_STATE_EPOCH_COUNT + REWARD_EPOCH_COUNT
478                {
479                    // Even if the epoch was executed, the phantom block on the
480                    // fork should be unable to constructed.
481                    warn!(
482                        "Cannot onstruct phantom block for {:?}, latest_epoch={}",
483                        pivot, latest
484                    );
485                    return None;
486                }
487            }
488        }
489
490        unreachable!()
491    }
492
493    async fn retrieve_epoch_logs(
494        &self, epoch: (u64, Vec<H256>),
495    ) -> Option<Vec<LocalizedLogEntry>> {
496        info!("eth pubsub retrieve_epoch_logs");
497        let (epoch_number, hashes) = epoch;
498        let pivot = hashes.last().cloned().expect("epoch should not be empty");
499
500        let pb = self.get_phantom_block(epoch_number, pivot).await?;
501
502        let mut logs = vec![];
503        let mut log_index = 0;
504
505        let txs = &pb.transactions;
506        assert_eq!(pb.receipts.len(), txs.len());
507
508        // construct logs
509        for (txid, (receipt, tx)) in zip(&pb.receipts, txs).enumerate() {
510            let eth_logs: Vec<_> = receipt
511                .logs
512                .iter()
513                .cloned()
514                .filter(|l| l.space == Space::Ethereum)
515                .collect();
516
517            for (logid, entry) in eth_logs.into_iter().enumerate() {
518                logs.push(LocalizedLogEntry {
519                    entry,
520                    block_hash: pivot,
521                    epoch_number,
522                    block_timestamp: Some(pb.pivot_header.timestamp()),
523                    transaction_hash: tx.hash,
524                    transaction_index: txid,
525                    log_index,
526                    transaction_log_index: logid,
527                });
528
529                log_index += 1;
530            }
531        }
532
533        Some(logs)
534    }
535}
536
/// `BlockProvider` is implemented for `&ChainDataProvider` (rather than
/// the owned type) so a borrowed provider can be passed where the trait
/// is expected, e.g. when converting RPC filters and log entries.
impl BlockProvider for &ChainDataProvider {
    /// Epoch number the block with `hash` belongs to, if known.
    fn get_block_epoch_number(&self, hash: &H256) -> Option<u64> {
        self.consensus.get_block_epoch_number(hash)
    }

    /// Hashes of all blocks in the given epoch; consensus errors are
    /// stringified for the caller.
    fn get_block_hashes_by_epoch(
        &self, epoch_number: EpochNumber,
    ) -> Result<Vec<H256>, String> {
        self.consensus
            .get_block_hashes_by_epoch(epoch_number)
            .map_err(|e| e.to_string())
    }
}
550
/// Error raised when a subscription item fails to serialize to JSON;
/// convertible into an [`ErrorObject`] for the RPC response.
#[derive(Debug, thiserror::Error)]
#[error("Failed to serialize subscription item: {0}")]
pub struct SubscriptionSerializeError(#[from] serde_json::Error);

impl SubscriptionSerializeError {
    /// Wraps a raw `serde_json::Error`.
    const fn new(err: serde_json::Error) -> Self { Self(err) }
}

impl From<SubscriptionSerializeError> for ErrorObject<'static> {
    // Surface the serialization failure as an internal RPC error.
    fn from(value: SubscriptionSerializeError) -> Self {
        internal_rpc_err(value.to_string())
    }
}
565
566/// Pipes all stream items to the subscription sink.
567/// when the stream ends or the sink is closed, the function returns.
568async fn pipe_from_stream<T, St>(
569    sink: SubscriptionSink, mut stream: St,
570) -> Result<(), ErrorObject<'static>>
571where
572    St: Stream<Item = T> + Unpin,
573    T: Serialize,
574{
575    loop {
576        tokio::select! {
577            _ = sink.closed() => {
578                // connection dropped: when user unsubscribes or network closed
579                break Ok(())
580            },
581            maybe_item = stream.next() => {
582                let item = match maybe_item {
583                    Some(item) => item,
584                    None => {
585                        // stream ended
586                        break  Ok(())
587                    },
588                };
589                let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item).map_err(SubscriptionSerializeError::new)?;
590                if sink.send(msg).await.is_err() {
591                    break Ok(());
592                }
593            }
594        }
595    }
596}