client/rpc/impls/cfx/
pubsub.rs

// Copyright 2020 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/
4
5use crate::rpc::{
6    errors,
7    helpers::{build_header, EpochQueue, SubscriberId, Subscribers},
8    metadata::Metadata,
9    traits::pubsub::PubSub,
10    types::{
11        pubsub::{self, SubscriptionEpoch},
12        Header as RpcHeader, Log as RpcLog,
13    },
14};
15use cfx_addr::Network;
16use cfx_parameters::{
17    consensus::DEFERRED_STATE_EPOCH_COUNT,
18    consensus_internal::REWARD_EPOCH_COUNT,
19};
20use cfx_types::{Space, H256};
21use cfxcore::{
22    channel::Channel, BlockDataManager, Notifications, SharedConsensusGraph,
23};
24use futures::future::join_all;
25use itertools::zip;
26use jsonrpc_core::Result as RpcResult;
27use jsonrpc_pubsub::{
28    typed::{Sink, Subscriber},
29    SinkResult, SubscriptionId,
30};
31use log::{debug, error, trace, warn};
32use parking_lot::RwLock;
33use primitives::{
34    filter::LogFilter, log_entry::LocalizedLogEntry, BlockReceipts,
35};
36use std::{
37    sync::{Arc, Weak},
38    time::Duration,
39};
40use tokio::{runtime::Runtime, time::sleep};
41
/// Sink through which notification payloads are pushed to one subscriber.
type Client = Sink<pubsub::Result>;
43
/// Cfx PubSub implementation.
///
/// Tracks subscribers of the `newHeads`, `epochs` and `logs` subscription
/// kinds and owns the runtime on which the notification loops are spawned.
#[derive(Clone)]
pub struct PubSubClient {
    // converts chain data into RPC notification payloads
    handler: Arc<ChainNotificationHandler>,
    // `newHeads` subscribers; all served by one shared loop
    heads_subscribers: Arc<RwLock<Subscribers<Client>>>,
    // `epochs` subscribers; one loop is started per subscriber
    epochs_subscribers: Arc<RwLock<Subscribers<Client>>>,
    // `logs` subscribers, each paired with its own log filter
    logs_subscribers: Arc<RwLock<Subscribers<(Client, LogFilter)>>>,
    // true while the shared `newHeads` loop is running; prevents
    // spawning it more than once
    heads_loop_started: Arc<RwLock<bool>>,
    // core-node notification channels (new blocks, ordered epochs)
    notifications: Arc<Notifications>,
    // tokio runtime used to drive the async notification loops
    pub executor: Arc<Runtime>,
}
55
impl PubSubClient {
    /// Creates new `PubSubClient`.
    ///
    /// `executor` is the runtime the notification loops run on; `consensus`
    /// and `notifications` come from the core node; `network` is forwarded
    /// to the RPC type conversions performed by the handler.
    pub fn new(
        executor: Arc<Runtime>, consensus: SharedConsensusGraph,
        notifications: Arc<Notifications>, network: Network,
    ) -> Self {
        let heads_subscribers = Arc::new(RwLock::new(Subscribers::default()));
        let epochs_subscribers = Arc::new(RwLock::new(Subscribers::default()));
        let logs_subscribers = Arc::new(RwLock::new(Subscribers::default()));

        let handler = Arc::new(ChainNotificationHandler {
            consensus: consensus.clone(),
            data_man: consensus.data_manager().clone(),
            network,
        });

        PubSubClient {
            handler,
            heads_subscribers,
            epochs_subscribers,
            logs_subscribers,
            // no `newHeads` loop is running until the first subscriber
            heads_loop_started: Arc::new(RwLock::new(false)),
            notifications,
            executor,
        }
    }

    /// Returns the channel delivering epochs in their confirmed order.
    pub fn epochs_ordered(&self) -> Arc<Channel<(u64, Vec<H256>)>> {
        self.notifications.epochs_ordered.clone()
    }

    /// Returns a chain notification handler.
    pub fn handler(&self) -> Weak<ChainNotificationHandler> {
        Arc::downgrade(&self.handler)
    }

    // Start the single shared async loop serving all `newHeads` subscribers.
    // The loop shuts itself down (and clears `heads_loop_started`) once the
    // subscriber set becomes empty.
    fn start_head_loop(&self) {
        // the write guard is held for the whole check-and-spawn, so two
        // concurrent subscribers cannot both spawn the loop
        let mut loop_started = self.heads_loop_started.write();
        if *loop_started {
            return;
        }

        debug!("start_headers_loop");
        *loop_started = true;

        // --------- newHeads ---------
        // subscribe to the `new_block_hashes` channel
        let new_block_hashes = self.notifications.new_block_hashes.clone();
        let mut receiver = new_block_hashes.subscribe();

        // loop asynchronously
        let handler_clone = self.handler.clone();
        let this = self.clone();

        let fut = async move {
            while let Some(hash) = receiver.recv().await {
                // handler_clone.notify_header(&hash);
                let subscribers = this.heads_subscribers.read();

                // do not retrieve anything unnecessarily
                if subscribers.is_empty() {
                    // NOTE(review): a subscriber that registers between this
                    // emptiness check and clearing the flag below sees
                    // `loop_started == true` in `start_head_loop` and is not
                    // served until another subscriber arrives — consider
                    // re-checking emptiness under the flag's write lock.
                    new_block_hashes.unsubscribe(receiver.id);
                    let mut loop_started = this.heads_loop_started.write();
                    *loop_started = false;
                    break;
                }

                let header = match handler_clone.get_header_by_hash(&hash) {
                    Ok(h) => h,
                    Err(e) => {
                        error!(
                            "Unexpected error while constructing RpcHeader: {:?}",
                            e
                        );
                        continue;
                    }
                };

                // broadcast the header; remember subscribers whose sink has
                // disconnected so they can be pruned afterwards
                let mut ids_to_remove = vec![];
                for (id, subscriber) in subscribers.iter() {
                    let send_res = notify(
                        subscriber,
                        pubsub::Result::Header(header.clone()),
                    );
                    if let Err(err) = send_res {
                        if err.is_disconnected() {
                            ids_to_remove.push(id.clone());
                        }
                    }
                }

                // release the read guard before taking the write lock below
                drop(subscribers);
                for id in ids_to_remove {
                    this.heads_subscribers
                        .write()
                        .remove(&SubscriptionId::String(id.as_string()));
                }
            }
        };

        self.executor.spawn(fut);
    }

    // Start an async loop that continuously receives epoch notifications and
    // publishes the corresponding epochs to subscriber `id`, keeping their
    // original order. The loop terminates when subscriber `id` unsubscribes.
    fn start_epoch_loop(&self, id: SubscriberId, sub_epoch: SubscriptionEpoch) {
        trace!("start_epoch_loop({:?})", id);

        // clone everything we use in our async loop
        let subscribers = self.epochs_subscribers.clone();
        let epochs_ordered = self.notifications.epochs_ordered.clone();
        let handler = self.handler.clone();

        // subscribe to the `epochs_ordered` channel
        let mut receiver = epochs_ordered.subscribe();

        // when subscribing to "latest_state", use a queue to make sure
        // we only process epochs once they have been executed
        let mut queue = EpochQueue::<Vec<H256>>::with_capacity(
            if sub_epoch == SubscriptionEpoch::LatestState {
                (DEFERRED_STATE_EPOCH_COUNT - 1) as usize
            } else {
                0
            },
        );

        // loop asynchronously
        let fut = async move {
            while let Some((epoch, hashes)) = receiver.recv().await {
                trace!("epoch_loop({:?}): {:?}", id, (epoch, &hashes));

                // retrieve subscriber
                let sub = match subscribers.read().get(&id) {
                    Some(sub) => sub.clone(),
                    None => {
                        // unsubscribed, terminate loop
                        epochs_ordered.unsubscribe(receiver.id);
                        return;
                    }
                };

                // the queue delays epochs by its capacity; `None` means the
                // queue is still filling up and nothing is ready yet
                let (epoch, hashes) = match queue.push((epoch, hashes)) {
                    None => continue,
                    Some(e) => e,
                };

                // wait for epoch to be executed
                if sub_epoch == SubscriptionEpoch::LatestState {
                    let pivot = hashes.last().expect("empty epoch in pubsub");
                    handler.wait_for_epoch(&pivot).await;
                }

                // publish epochs
                let send_res = handler.notify_epoch(sub, (epoch, hashes)).await;
                if let Err(err) = send_res {
                    if err.is_disconnected() {
                        // the client went away: drop both the channel
                        // subscription and the subscriber entry
                        epochs_ordered.unsubscribe(receiver.id);
                        subscribers
                            .write()
                            .remove(&SubscriptionId::String(id.as_string()));
                        return;
                    }
                }
            }
        };

        self.executor.spawn(fut);
    }

    // Start an async loop that continuously receives epoch notifications and
    // publishes the corresponding logs to subscriber `id`, keeping their
    // original order. The loop terminates when subscriber `id` unsubscribes.
    fn start_logs_loop(&self, id: SubscriberId) {
        trace!("start_logs_loop({:?})", id);

        // clone everything we use in our async loop
        let subscribers = self.logs_subscribers.clone();
        let epochs_ordered = self.notifications.epochs_ordered.clone();
        let handler = self.handler.clone();

        // subscribe to the `epochs_ordered` channel
        let mut receiver = epochs_ordered.subscribe();

        // use a queue to make sure we only process an epoch once it has been
        // executed for sure
        let mut queue = EpochQueue::<Vec<H256>>::with_capacity(
            (DEFERRED_STATE_EPOCH_COUNT - 1) as usize,
        );

        // loop asynchronously
        let fut = async move {
            // highest epoch number published so far; used to detect reorgs
            let mut last_epoch = 0;

            while let Some(epoch) = receiver.recv().await {
                trace!("logs_loop({:?}): {:?}", id, epoch);

                // retrieve subscriber
                let (sub, filter) = match subscribers.read().get(&id) {
                    Some(sub) => sub.clone(),
                    None => {
                        // unsubscribed, terminate loop
                        epochs_ordered.unsubscribe(receiver.id);
                        return;
                    }
                };

                let epoch = match queue.push(epoch) {
                    None => continue,
                    Some(e) => e,
                };

                // publish pivot chain reorg if necessary
                // (a non-increasing epoch number means the pivot chain
                // switched to a fork)
                if epoch.0 <= last_epoch {
                    debug!("pivot chain reorg: {} -> {}", last_epoch, epoch.0);
                    assert!(epoch.0 > 0, "Unexpected epoch number received.");
                    handler.notify_revert(&sub, epoch.0 - 1).await;
                }

                last_epoch = epoch.0;

                // publish matching logs
                let send_res = handler.notify_logs(&sub, filter, epoch).await;
                if let Err(err) = send_res {
                    if err.is_disconnected() {
                        // the client went away: drop both the channel
                        // subscription and the subscriber entry
                        epochs_ordered.unsubscribe(receiver.id);
                        subscribers
                            .write()
                            .remove(&SubscriptionId::String(id.as_string()));
                        return;
                    }
                }
            }
        };

        self.executor.spawn(fut);
    }
}
294
/// PubSub notification handler.
///
/// Translates raw chain data (headers, receipts, logs) into RPC-level
/// notification payloads for the loops in `PubSubClient`.
pub struct ChainNotificationHandler {
    // consensus graph; used to query the best epoch and build RPC headers
    consensus: SharedConsensusGraph,
    // block data manager; used to look up headers, blocks and receipts
    data_man: Arc<BlockDataManager>,
    // network id forwarded to RPC type conversions (header/log rendering)
    pub network: Network,
}
301
302impl ChainNotificationHandler {
303    fn get_header_by_hash(&self, hash: &H256) -> Result<RpcHeader, String> {
304        let header = match self.data_man.block_header_by_hash(hash) {
305            Some(h) => build_header(&*h, self.network, self.consensus.clone()),
306            None => return Err("Header not found".to_string()),
307        };
308
309        header
310    }
311
312    async fn notify_epoch(
313        &self, subscriber: Client, epoch: (u64, Vec<H256>),
314    ) -> SinkResult {
315        trace!("notify_epoch({:?})", epoch);
316
317        let (epoch, hashes) = epoch;
318        let hashes = hashes.into_iter().map(H256::from).collect();
319
320        notify(
321            &subscriber,
322            pubsub::Result::Epoch {
323                epoch_number: epoch.into(),
324                epoch_hashes_ordered: hashes,
325            },
326        )
327    }
328
329    async fn notify_revert(&self, subscriber: &Client, epoch: u64) {
330        trace!("notify_revert({:?})", epoch);
331
332        let _ = notify(
333            subscriber,
334            pubsub::Result::ChainReorg {
335                revert_to: epoch.into(),
336            },
337        );
338    }
339
340    async fn notify_logs(
341        &self, subscriber: &Client, filter: LogFilter, epoch: (u64, Vec<H256>),
342    ) -> SinkResult {
343        trace!("notify_logs({:?})", epoch);
344
345        // NOTE: calls to DbManager are supposed to be cached
346        // FIXME(thegaram): what is the perf impact of calling this for each
347        // subscriber? would it be better to do this once for each epoch?
348        let logs = match self.retrieve_epoch_logs(epoch).await {
349            Some(logs) => logs,
350            None => return Ok(()),
351        };
352
353        // apply filter to logs
354        let logs = logs
355            .iter()
356            .filter(|l| filter.matches(&l.entry))
357            .cloned()
358            .map(|l| RpcLog::try_from_localized(l, self.network));
359
360        // send logs in order
361        // FIXME(thegaram): Sink::notify flushes after each item.
362        // consider sending them in a batch.
363        for log in logs {
364            match log {
365                Ok(l) => {
366                    let send_res = notify(subscriber, pubsub::Result::Log(l));
367                    if send_res.is_err() {
368                        return send_res;
369                    }
370                }
371                Err(e) => {
372                    error!(
373                        "Unexpected error while constructing RpcLog: {:?}",
374                        e
375                    );
376                }
377            }
378        }
379        Ok(())
380    }
381
382    // attempt to retrieve block receipts from BlockDataManager
383    // on failure, wait and retry a few times, then fail
384    // NOTE: we do this because we might get epoch notifications
385    // before the corresponding execution results are computed
386    async fn retrieve_block_receipts(
387        &self, block: &H256, pivot: &H256,
388    ) -> Option<Arc<BlockReceipts>> {
389        const POLL_INTERVAL_MS: Duration = Duration::from_millis(100);
390        let epoch = self.data_man.block_height_by_hash(pivot)?;
391
392        // we assume that all epochs we receive (with a distance of at least
393        // `DEFERRED_STATE_EPOCH_COUNT` from the tip of the pivot chain) are
394        // eventually executed, i.e. epochs are not dropped from the execution
395        // queue on pivot chain reorgs. moreover, multiple execution results
396        // might be stored for the same block for all epochs it was executed in.
397        // if these assumptions hold, we will eventually successfully read these
398        // execution results, even if they are outdated.
399        for ii in 0.. {
400            let latest = self.consensus.best_epoch_number();
401            match self.data_man.block_execution_result_by_hash_with_epoch(
402                &block, &pivot, false, /* update_pivot_assumption */
403                false, /* update_cache */
404            ) {
405                Some(res) => return Some(res.block_receipts.clone()),
406                None => {
407                    trace!("Cannot find receipts with {:?}/{:?}", block, pivot);
408                    let _ = sleep(POLL_INTERVAL_MS).await;
409                }
410            }
411
412            // we assume that an epoch gets executed within 100 seconds
413            if ii > 1000 {
414                error!("Cannot find receipts with {:?}/{:?}", block, pivot);
415                return None;
416            } else {
417                if latest
418                    > epoch + DEFERRED_STATE_EPOCH_COUNT + REWARD_EPOCH_COUNT
419                {
420                    // Even if the epoch was executed, the receipts on the fork
421                    // should have been deleted and cannot
422                    // be retrieved.
423                    warn!(
424                        "Cannot find receipts with {:?}/{:?}, latest_epoch={}",
425                        block, pivot, latest
426                    );
427                    return None;
428                }
429            }
430        }
431
432        unreachable!()
433    }
434
435    // wait until the execution results corresponding to `pivot` become
436    // available in the database.
437    async fn wait_for_epoch(&self, pivot: &H256) -> () {
438        let _ = self.retrieve_block_receipts(&pivot, &pivot).await;
439    }
440
441    async fn retrieve_epoch_logs(
442        &self, epoch: (u64, Vec<H256>),
443    ) -> Option<Vec<LocalizedLogEntry>> {
444        let (epoch_number, hashes) = epoch;
445        let pivot = hashes.last().cloned().expect("epoch should not be empty");
446
447        // retrieve epoch receipts
448        let fut = hashes
449            .iter()
450            .map(|h| self.retrieve_block_receipts(&h, &pivot));
451
452        let receipts = join_all(fut)
453            .await
454            .into_iter()
455            .collect::<Option<Vec<_>>>()?;
456
457        let mut logs = vec![];
458        let mut log_index = 0;
459
460        for (block_hash, block_receipts) in zip(hashes, receipts) {
461            // retrieve block transactions
462            let block = match self
463                .data_man
464                .block_by_hash(&block_hash, true /* update_cache */)
465            {
466                Some(b) => b,
467                None => {
468                    warn!("Unable to retrieve block {:?}", block_hash);
469                    return None;
470                }
471            };
472
473            let txs = &block.transactions;
474            assert_eq!(block_receipts.receipts.len(), txs.len());
475
476            // construct logs
477            for (txid, (receipt, tx)) in
478                zip(&block_receipts.receipts, txs).enumerate()
479            {
480                let native_logs: Vec<_> = receipt
481                    .logs
482                    .iter()
483                    .cloned()
484                    .filter(|l| l.space == Space::Native)
485                    .collect();
486
487                for (logid, entry) in native_logs.into_iter().enumerate() {
488                    logs.push(LocalizedLogEntry {
489                        entry,
490                        block_hash,
491                        epoch_number,
492                        block_timestamp: Some(block.block_header.timestamp()),
493                        transaction_hash: tx.hash,
494                        transaction_index: txid,
495                        log_index,
496                        transaction_log_index: logid,
497                    });
498
499                    log_index += 1;
500                }
501            }
502        }
503
504        Some(logs)
505    }
506}
507
508impl PubSub for PubSubClient {
509    type Metadata = Metadata;
510
511    fn subscribe(
512        &self, _meta: Metadata, subscriber: Subscriber<pubsub::Result>,
513        kind: pubsub::Kind, params: Option<pubsub::Params>,
514    ) {
515        let error = match (kind, params) {
516            // --------- newHeads ---------
517            (pubsub::Kind::NewHeads, None) => {
518                self.heads_subscribers.write().push(subscriber);
519                self.start_head_loop();
520                return;
521            }
522            (pubsub::Kind::NewHeads, _) => {
523                errors::invalid_params("newHeads", "Expected no parameters.")
524            }
525            // --------- epochs ---------
526            (pubsub::Kind::Epochs, None) => {
527                let id = self.epochs_subscribers.write().push(subscriber);
528                self.start_epoch_loop(id, SubscriptionEpoch::LatestMined);
529                return;
530            }
531            (pubsub::Kind::Epochs, Some(pubsub::Params::Epochs(epoch))) => {
532                let id = self.epochs_subscribers.write().push(subscriber);
533                self.start_epoch_loop(id, epoch);
534                return;
535            }
536            (pubsub::Kind::Epochs, _) => {
537                errors::invalid_params("epochs", "Expected epoch parameter.")
538            }
539            // --------- logs ---------
540            (pubsub::Kind::Logs, None) => {
541                let id = self
542                    .logs_subscribers
543                    .write()
544                    .push(subscriber, LogFilter::default());
545
546                self.start_logs_loop(id);
547                return;
548            }
549            (pubsub::Kind::Logs, Some(pubsub::Params::Logs(filter))) => {
550                match filter.into_primitive() {
551                    Err(e) => e,
552                    Ok(filter) => {
553                        let id = self
554                            .logs_subscribers
555                            .write()
556                            .push(subscriber, filter);
557
558                        self.start_logs_loop(id);
559                        return;
560                    }
561                }
562            }
563            (pubsub::Kind::Logs, _) => {
564                errors::invalid_params("logs", "Expected filter parameter.")
565            }
566            _ => errors::unimplemented(None),
567        };
568
569        let _ = subscriber.reject(error);
570    }
571
572    fn unsubscribe(
573        &self, _: Option<Self::Metadata>, id: SubscriptionId,
574    ) -> RpcResult<bool> {
575        let res0 = self.heads_subscribers.write().remove(&id).is_some();
576        let res1 = self.epochs_subscribers.write().remove(&id).is_some();
577        let res2 = self.logs_subscribers.write().remove(&id).is_some();
578
579        Ok(res0 || res1 || res2)
580    }
581}
582
/// Push `result` into the subscriber's sink, returning the send outcome so
/// callers can detect disconnected clients.
fn notify(subscriber: &Client, result: pubsub::Result) -> SinkResult {
    subscriber.notify(Ok(result))
}