cfx_rpc_eth_impl/pubsub.rs

use crate::helpers::EpochQueue;
use cfx_parameters::{
    consensus::DEFERRED_STATE_EPOCH_COUNT,
    consensus_internal::REWARD_EPOCH_COUNT,
};
use cfx_rpc_cfx_impl::helpers::subscribers::pipe_from_stream;
use cfx_rpc_cfx_types::{traits::BlockProvider, PhantomBlock};
use cfx_rpc_eth_api::EthPubSubApiServer;
use cfx_rpc_eth_types::{
    eth_pubsub::{Kind as SubscriptionKind, Params, Result as PubSubResult},
    Header, Log,
};
use cfx_tasks::TaskExecutor;
use cfx_types::{Space, H256};
use cfxcore::{
    BlockDataManager, ConsensusGraph, Notifications, SharedConsensusGraph,
};
use futures::StreamExt;
use jsonrpsee::{core::SubscriptionResult, PendingSubscriptionSink};
use log::{debug, error, info, trace, warn};
use parking_lot::RwLock;
use primitives::{
    filter::LogFilter, log_entry::LocalizedLogEntry, BlockReceipts, EpochNumber,
};
use std::{
    collections::{HashMap, VecDeque},
    iter::zip,
    sync::Arc,
    time::Duration,
};
use tokio::{sync::broadcast, time::sleep};
use tokio_stream::{wrappers::BroadcastStream, Stream};

const BROADCAST_CHANNEL_SIZE: usize = 1000;

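// Ethereum-space pub/sub implementation. A single background loop feeds
// `newHeads` subscribers through `head_sender`; one loop per `LogFilter`
// feeds `logs` subscribers through the matching entry in `log_senders`.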
#[derive(Clone)]
pub struct PubSubApi {
    executor: TaskExecutor,
    chain_data_provider: Arc<ChainDataProvider>,
    notifications: Arc<Notifications>,
    heads_loop_started: Arc<RwLock<bool>>,
    head_sender: Arc<broadcast::Sender<Header>>,
    log_loop_started: Arc<RwLock<HashMap<LogFilter, bool>>>,
    log_senders: Arc<RwLock<HashMap<LogFilter, broadcast::Sender<Log>>>>,
}

impl PubSubApi {
    pub fn new(
        consensus: SharedConsensusGraph, notifications: Arc<Notifications>,
        executor: TaskExecutor,
    ) -> PubSubApi {
        let (head_sender, _) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
        let log_senders = Arc::new(RwLock::new(HashMap::new()));
        let chain_data_provider =
            Arc::new(ChainDataProvider::new(consensus.clone()));

        PubSubApi {
            executor,
            notifications,
            heads_loop_started: Arc::new(RwLock::new(false)),
            head_sender: Arc::new(head_sender),
            log_senders,
            chain_data_provider,
            log_loop_started: Arc::new(RwLock::new(HashMap::new())),
        }
    }

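    // build a per-subscriber stream of new pivot headers from the shared
    // broadcast channel fed by `start_heads_loop`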
    fn new_headers_stream(&self) -> impl Stream<Item = Header> {
        let receiver = self.head_sender.subscribe();
        BroadcastStream::new(receiver)
            .filter(|item| {
                // a broadcast receiver error is either `Closed` or (more
                // commonly) `Lagged`; drop such items and keep only headers
                futures::future::ready(item.is_ok())
            })
            .map(|item| item.expect("should not be an error"))
    }

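    // build a per-subscriber stream of logs matching `filter`, creating the
    // broadcast channel for this filter on first use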
    fn new_logs_stream(&self, filter: LogFilter) -> impl Stream<Item = Log> {
        let receiver;
        let senders = self.log_senders.read();
        if !senders.contains_key(&filter) {
            drop(senders);
            // re-check under the write lock: another caller may have created
            // the sender for this filter while no lock was held
            let mut senders = self.log_senders.write();
            receiver = senders
                .entry(filter)
                .or_insert_with(|| {
                    broadcast::channel(BROADCAST_CHANNEL_SIZE).0
                })
                .subscribe();
        } else {
            receiver = senders.get(&filter).unwrap().subscribe();
        }

        BroadcastStream::new(receiver)
            .filter(|item| {
                // drop closed/lagged broadcast errors, keep only logs
                futures::future::ready(item.is_ok())
            })
            .map(|item| item.expect("should not be an error"))
    }

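    // spawn the background task feeding `head_sender`, unless it is already
    // running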
    fn start_heads_loop(&self) {
        let mut loop_started = self.heads_loop_started.write();
        if *loop_started {
            return;
        }
        *loop_started = true;

        debug!("async start_headers_loop");

        // subscribe to the `epochs_ordered` channel
        let mut receiver = self.notifications.epochs_ordered.subscribe();
        let head_sender = self.head_sender.clone();
        // clone everything we use in our async loop
        let chain_data_provider = self.chain_data_provider.clone();
        let heads_loop_started = self.heads_loop_started.clone();

        // loop asynchronously
        let fut = async move {
            // use a queue to make sure we only process epochs once they have
            // been executed
            let mut queue = EpochQueue::<Vec<H256>>::with_capacity(
                (DEFERRED_STATE_EPOCH_COUNT - 1) as usize,
            );

            while let Some((epoch, hashes)) = receiver.recv().await {
                debug!("epoch_loop: {:?}", (epoch, &hashes));
                let (epoch, hashes) = match queue.push((epoch, hashes)) {
                    None => continue,
                    Some(e) => e,
                };

                // wait for the epoch to be executed
                let pivot = hashes.last().expect("empty epoch in pubsub");
                chain_data_provider.wait_for_epoch(&pivot).await;

                // publish the pivot header of the epoch
                let header = chain_data_provider.get_pivot_block_header(epoch);
                if let Some(header) = header {
                    let send_res = head_sender.send(header);
                    if send_res.is_err() {
                        // all receivers have been dropped; stop the loop so
                        // the next subscription can restart it
                        let mut loop_started = heads_loop_started.write();
                        *loop_started = false;
                        return;
                    }
                }
            }
        };

        self.executor.spawn(fut);
    }

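    // spawn the background task feeding the broadcast channel for `filter`,
    // unless one is already running; on pivot chain reorgs the task re-emits
    // the affected logs with `removed` set to true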
    fn start_logs_loop(&self, filter: LogFilter) {
        let mut loop_started = self.log_loop_started.write();
        if loop_started.contains_key(&filter) {
            return;
        }
        loop_started.insert(filter.clone(), true);

        // subscribe to the `epochs_ordered` channel
        let mut receiver = self.notifications.epochs_ordered.subscribe();
        let senders = self.log_senders.read();
        let tx = senders.get(&filter).unwrap().clone();

        // clone everything we use in our async loop
        let chain_data_provider = self.chain_data_provider.clone();
        let loop_started = self.log_loop_started.clone();

        // loop asynchronously
        let fut = async move {
            let mut last_epoch = 0;
            let mut epochs: VecDeque<(u64, Vec<H256>, Vec<Log>)> =
                VecDeque::new();
            // use a queue to make sure we only process an epoch once it has
            // been executed
            let mut queue = EpochQueue::<Vec<H256>>::with_capacity(
                (DEFERRED_STATE_EPOCH_COUNT - 1) as usize,
            );

            while let Some(epoch) = receiver.recv().await {
                let epoch = match queue.push(epoch) {
                    None => continue,
                    Some(e) => e,
                };

                // publish a pivot chain reorg if necessary
                if epoch.0 <= last_epoch {
                    debug!("pivot chain reorg: {} -> {}", last_epoch, epoch.0);
                    assert!(epoch.0 > 0, "Unexpected epoch number received.");

                    let mut reverted = vec![];
                    while let Some(e) = epochs.back() {
                        if e.0 >= epoch.0 {
                            reverted.push(epochs.pop_back().unwrap());
                        } else {
                            break;
                        }
                    }

                    for (_, _, logs) in reverted.into_iter() {
                        for mut log in logs.into_iter() {
                            log.removed = true;
                            // re-send the logs of reverted epochs, marked as
                            // removed
                            let send_res = tx.send(log);
                            if send_res.is_err() {
                                let mut loop_started = loop_started.write();
                                loop_started.remove(&filter);
                                return;
                            }
                        }
                    }
                }

                last_epoch = epoch.0;

                // drop cached epochs that can no longer be reverted
                let latest_finalized_epoch_number =
                    chain_data_provider.latest_finalized_epoch_number();
                while let Some(e) = epochs.front() {
                    if e.0 < latest_finalized_epoch_number {
                        epochs.pop_front();
                    } else {
                        break;
                    }
                }

                let logs = chain_data_provider
                    .get_epoch_logs(&filter, epoch.clone(), false)
                    .await;
                for log in logs.iter() {
                    let send_res = tx.send(log.clone());
                    // a send error means all receivers have been dropped, so
                    // stop the loop
                    if send_res.is_err() {
                        let mut loop_started = loop_started.write();
                        loop_started.remove(&filter);
                        return;
                    }
                }
                epochs.push_back((epoch.0, epoch.1, logs));
            }
        };

        self.executor.spawn(fut);
    }
}

#[async_trait::async_trait]
impl EthPubSubApiServer for PubSubApi {
    async fn subscribe(
        &self, pending: PendingSubscriptionSink, kind: SubscriptionKind,
        params: Option<Params>,
    ) -> SubscriptionResult {
        match (kind, params) {
            (SubscriptionKind::NewHeads, None) => {
                let sink = pending.accept().await?;
                let stream = self
                    .new_headers_stream()
                    .map(|header| PubSubResult::Header(header));
                self.executor.spawn(async move {
                    let _ = pipe_from_stream(sink, stream).await;
                });

                // start the heads loop
                self.start_heads_loop();
                Ok(())
            }
            (SubscriptionKind::NewHeads, _) => {
                // reject
                Err("Params should be empty".into())
            }
            (SubscriptionKind::Logs, None) => {
                let mut filter = LogFilter::default();
                filter.space = Space::Ethereum;

                let sink = pending.accept().await?;
                let stream = self
                    .new_logs_stream(filter.clone())
                    .map(|log| PubSubResult::Log(log));
                self.executor.spawn(async {
                    let _ = pipe_from_stream(sink, stream).await;
                });

                // start the logs loop
                self.start_logs_loop(filter);
                Ok(())
            }
            (SubscriptionKind::Logs, Some(Params::Logs(filter))) => {
                let filter = match filter
                    .into_primitive(self.chain_data_provider.as_ref())
                {
                    Err(_e) => return Err("Invalid filter params".into()),
                    Ok(filter) => filter,
                };
                let stream = self
                    .new_logs_stream(filter.clone())
                    .map(|log| PubSubResult::Log(log));
                let sink = pending.accept().await?;
                self.executor.spawn(async {
                    let _ = pipe_from_stream(sink, stream).await;
                });

                // start the logs loop
                self.start_logs_loop(filter);
                Ok(())
            }
            (_, _) => {
                // reject
                Err("Not supported".into())
            }
        }
    }
}

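// read-only helper around the consensus graph and block data manager, used by
// the pub/sub loops to look up headers, receipts, and logs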
pub struct ChainDataProvider {
    consensus: SharedConsensusGraph,
    data_man: Arc<BlockDataManager>,
}

impl ChainDataProvider {
    pub fn new(consensus: SharedConsensusGraph) -> ChainDataProvider {
        let data_man = consensus.data_manager().clone();
        ChainDataProvider {
            consensus,
            data_man,
        }
    }

    fn latest_finalized_epoch_number(&self) -> u64 {
        self.consensus.latest_finalized_epoch_number()
    }

    fn consensus_graph(&self) -> &ConsensusGraph { &self.consensus }

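    // retrieve the logs of an executed epoch and return the ones matching
    // `filter`, converted to RPC logs with the given `removed` flag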
    async fn get_epoch_logs(
        &self, filter: &LogFilter, epoch: (u64, Vec<H256>), removed: bool,
    ) -> Vec<Log> {
        let logs = match self.retrieve_epoch_logs(epoch).await {
            Some(logs) => logs,
            None => return vec![],
        };

        // apply the filter and convert matching entries into RPC logs,
        // skipping entries that fail conversion
        logs.iter()
            .filter(|l| filter.matches(&l.entry))
            .cloned()
            .filter_map(|l| Log::try_from_localized(l, self, removed).ok())
            .collect()
    }

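    // wait until the epoch with the given pivot hash has been executed by
    // polling for the pivot block's execution results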
    async fn wait_for_epoch(&self, pivot: &H256) -> Option<Arc<BlockReceipts>> {
        self.retrieve_block_receipts(&pivot, &pivot).await
    }

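    // construct the RPC header from the epoch's phantom block, if available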
    fn get_pivot_block_header(&self, epoch: u64) -> Option<Header> {
        let phantom_block = {
            // keep the read lock to ensure a consistent view
            let _inner = self.consensus_graph().inner.read();
            let block = self.consensus_graph().get_phantom_block_by_number(
                EpochNumber::Number(epoch),
                None,
                false, /* include_traces */
            );

            match block {
                Err(e) => {
                    debug!("Invalid params {:?}", e);
                    None
                }
                Ok(pb) => pb,
            }
        };

        phantom_block.map(|b| Header::from_phantom(&b))
    }

    // attempt to retrieve block receipts from BlockDataManager;
    // on failure, wait and retry a few times, then fail.
    // NOTE: we do this because we might get epoch notifications
    // before the corresponding execution results are computed
    async fn retrieve_block_receipts(
        &self, block: &H256, pivot: &H256,
    ) -> Option<Arc<BlockReceipts>> {
        info!("eth pubsub retrieve_block_receipts");
        const POLL_INTERVAL_MS: Duration = Duration::from_millis(100);
        let epoch = self.data_man.block_height_by_hash(pivot)?;

        // We assume that all epochs we receive (with a distance of at least
        // `DEFERRED_STATE_EPOCH_COUNT` from the tip of the pivot chain) are
        // eventually executed, i.e. epochs are not dropped from the execution
        // queue on pivot chain reorgs. Moreover, multiple execution results
        // might be stored for the same block for all epochs it was executed
        // in. If these assumptions hold, we will eventually successfully read
        // these execution results, even if they are outdated.
        for ii in 0.. {
            let latest = self.consensus.best_epoch_number();
            match self.data_man.block_execution_result_by_hash_with_epoch(
                &block, &pivot, false, /* update_pivot_assumption */
                false, /* update_cache */
            ) {
                Some(res) => return Some(res.block_receipts.clone()),
                None => {
                    trace!("Cannot find receipts with {:?}/{:?}", block, pivot);
                    let _ = sleep(POLL_INTERVAL_MS).await;
                }
            }

            // we assume that an epoch gets executed within 100 seconds
            // (1000 polls at 100 ms each)
            if ii > 1000 {
                error!("Cannot find receipts with {:?}/{:?}", block, pivot);
                return None;
            } else if latest
                > epoch + DEFERRED_STATE_EPOCH_COUNT + REWARD_EPOCH_COUNT
            {
                // Even if the epoch was executed, the receipts on the fork
                // should have been deleted and cannot be retrieved.
                warn!(
                    "Cannot find receipts with {:?}/{:?}, latest_epoch={}",
                    block, pivot, latest
                );
                return None;
            }
        }

        unreachable!()
    }

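    // attempt to construct the phantom block for the given epoch and pivot
    // hash, retrying while the corresponding epoch is not executed yet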
    async fn get_phantom_block(
        &self, epoch: u64, pivot: H256,
    ) -> Option<PhantomBlock> {
        debug!("eth pubsub get_phantom_block");
        const POLL_INTERVAL_MS: Duration = Duration::from_millis(100);

        for ii in 0.. {
            let latest = self.consensus.best_epoch_number();
            match self.consensus_graph().get_phantom_block_by_number(
                EpochNumber::Number(epoch),
                Some(pivot),
                false, /* include_traces */
            ) {
                Ok(Some(b)) => return Some(b),
                Ok(None) => {
                    error!("Block not executed yet {:?}", pivot);
                    let _ = sleep(POLL_INTERVAL_MS).await;
                }
                Err(e) => {
                    error!("get_phantom_block_by_number failed {}", e);
                    return None;
                }
            };

            // we assume that an epoch gets executed within 100 seconds
            // (1000 polls at 100 ms each)
            if ii > 1000 {
                error!("Cannot construct phantom block for {:?}", pivot);
                return None;
            } else if latest
                > epoch + DEFERRED_STATE_EPOCH_COUNT + REWARD_EPOCH_COUNT
            {
                // Even if the epoch was executed, the phantom block on the
                // fork can no longer be constructed.
                warn!(
                    "Cannot construct phantom block for {:?}, latest_epoch={}",
                    pivot, latest
                );
                return None;
            }
        }

        unreachable!()
    }

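    // collect the Ethereum-space logs of an executed epoch from its phantom
    // block, assigning block, transaction, and log indices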
    async fn retrieve_epoch_logs(
        &self, epoch: (u64, Vec<H256>),
    ) -> Option<Vec<LocalizedLogEntry>> {
        info!("eth pubsub retrieve_epoch_logs");
        let (epoch_number, hashes) = epoch;
        let pivot = hashes.last().cloned().expect("epoch should not be empty");

        let pb = self.get_phantom_block(epoch_number, pivot).await?;

        let mut logs = vec![];
        let mut log_index = 0;

        let txs = &pb.transactions;
        assert_eq!(pb.receipts.len(), txs.len());

        // construct logs
        for (txid, (receipt, tx)) in zip(&pb.receipts, txs).enumerate() {
            let eth_logs: Vec<_> = receipt
                .logs
                .iter()
                .cloned()
                .filter(|l| l.space == Space::Ethereum)
                .collect();

            for (logid, entry) in eth_logs.into_iter().enumerate() {
                logs.push(LocalizedLogEntry {
                    entry,
                    block_hash: pivot,
                    epoch_number,
                    block_timestamp: Some(pb.pivot_header.timestamp()),
                    transaction_hash: tx.hash,
                    transaction_index: txid,
                    log_index,
                    transaction_log_index: logid,
                });

                log_index += 1;
            }
        }

        Some(logs)
    }
}

impl BlockProvider for &ChainDataProvider {
    fn get_block_epoch_number(&self, hash: &H256) -> Option<u64> {
        self.consensus.get_block_epoch_number(hash)
    }

    fn get_block_hashes_by_epoch(
        &self, epoch_number: EpochNumber,
    ) -> Result<Vec<H256>, String> {
        self.consensus
            .get_block_hashes_by_epoch(epoch_number)
            .map_err(|e| e.to_string())
    }
}