1use crate::rpc::{
6 errors,
7 helpers::{build_header, EpochQueue, SubscriberId, Subscribers},
8 metadata::Metadata,
9 traits::pubsub::PubSub,
10 types::{
11 pubsub::{self, SubscriptionEpoch},
12 Header as RpcHeader, Log as RpcLog,
13 },
14};
15use cfx_addr::Network;
16use cfx_parameters::{
17 consensus::DEFERRED_STATE_EPOCH_COUNT,
18 consensus_internal::REWARD_EPOCH_COUNT,
19};
20use cfx_types::{Space, H256};
21use cfxcore::{
22 channel::Channel, BlockDataManager, Notifications, SharedConsensusGraph,
23};
24use futures::future::join_all;
25use itertools::zip;
26use jsonrpc_core::Result as RpcResult;
27use jsonrpc_pubsub::{
28 typed::{Sink, Subscriber},
29 SinkResult, SubscriptionId,
30};
31use log::{debug, error, trace, warn};
32use parking_lot::RwLock;
33use primitives::{
34 filter::LogFilter, log_entry::LocalizedLogEntry, BlockReceipts,
35};
36use std::{
37 sync::{Arc, Weak},
38 time::Duration,
39};
40use tokio::{runtime::Runtime, time::sleep};
41
/// Sink through which `pubsub::Result` notifications are pushed to a single
/// subscriber.
type Client = Sink<pubsub::Result>;
43
/// Pub-sub RPC client: keeps the subscriber lists for each notification kind
/// and spawns the loops that push notifications to them.
///
/// Cloning is cheap: every field is behind an `Arc`, so clones share the
/// same subscriber lists and loop state.
#[derive(Clone)]
pub struct PubSubClient {
    // Builds headers/logs and pushes notifications to subscriber sinks.
    handler: Arc<ChainNotificationHandler>,
    // Sinks registered for `newHeads` notifications.
    heads_subscribers: Arc<RwLock<Subscribers<Client>>>,
    // Sinks registered for `epochs` notifications.
    epochs_subscribers: Arc<RwLock<Subscribers<Client>>>,
    // Sinks registered for `logs` notifications, each with its log filter.
    logs_subscribers: Arc<RwLock<Subscribers<(Client, LogFilter)>>>,
    // `true` while the single shared `newHeads` loop is running.
    heads_loop_started: Arc<RwLock<bool>>,
    // Provides the `new_block_hashes` and `epochs_ordered` channels.
    notifications: Arc<Notifications>,
    /// Runtime on which the notification loops are spawned.
    pub executor: Arc<Runtime>,
}
55
56impl PubSubClient {
57 pub fn new(
59 executor: Arc<Runtime>, consensus: SharedConsensusGraph,
60 notifications: Arc<Notifications>, network: Network,
61 ) -> Self {
62 let heads_subscribers = Arc::new(RwLock::new(Subscribers::default()));
63 let epochs_subscribers = Arc::new(RwLock::new(Subscribers::default()));
64 let logs_subscribers = Arc::new(RwLock::new(Subscribers::default()));
65
66 let handler = Arc::new(ChainNotificationHandler {
67 consensus: consensus.clone(),
68 data_man: consensus.data_manager().clone(),
69 network,
70 });
71
72 PubSubClient {
73 handler,
74 heads_subscribers,
75 epochs_subscribers,
76 logs_subscribers,
77 heads_loop_started: Arc::new(RwLock::new(false)),
78 notifications,
79 executor,
80 }
81 }
82
83 pub fn epochs_ordered(&self) -> Arc<Channel<(u64, Vec<H256>)>> {
84 self.notifications.epochs_ordered.clone()
85 }
86
87 pub fn handler(&self) -> Weak<ChainNotificationHandler> {
89 Arc::downgrade(&self.handler)
90 }
91
92 fn start_head_loop(&self) {
93 let mut loop_started = self.heads_loop_started.write();
94 if *loop_started {
95 return;
96 }
97
98 debug!("start_headers_loop");
99 *loop_started = true;
100
101 let new_block_hashes = self.notifications.new_block_hashes.clone();
104 let mut receiver = new_block_hashes.subscribe();
105
106 let handler_clone = self.handler.clone();
108 let this = self.clone();
109
110 let fut = async move {
111 while let Some(hash) = receiver.recv().await {
112 let subscribers = this.heads_subscribers.read();
114
115 if subscribers.is_empty() {
117 new_block_hashes.unsubscribe(receiver.id);
118 let mut loop_started = this.heads_loop_started.write();
119 *loop_started = false;
120 break;
121 }
122
123 let header = match handler_clone.get_header_by_hash(&hash) {
124 Ok(h) => h,
125 Err(e) => {
126 error!(
127 "Unexpected error while constructing RpcHeader: {:?}",
128 e
129 );
130 continue;
131 }
132 };
133
134 let mut ids_to_remove = vec![];
135 for (id, subscriber) in subscribers.iter() {
136 let send_res = notify(
137 subscriber,
138 pubsub::Result::Header(header.clone()),
139 );
140 if let Err(err) = send_res {
141 if err.is_disconnected() {
142 ids_to_remove.push(id.clone());
143 }
144 }
145 }
146
147 drop(subscribers);
148 for id in ids_to_remove {
149 this.heads_subscribers
150 .write()
151 .remove(&SubscriptionId::String(id.as_string()));
152 }
153 }
154 };
155
156 self.executor.spawn(fut);
157 }
158
159 fn start_epoch_loop(&self, id: SubscriberId, sub_epoch: SubscriptionEpoch) {
163 trace!("start_epoch_loop({:?})", id);
164
165 let subscribers = self.epochs_subscribers.clone();
167 let epochs_ordered = self.notifications.epochs_ordered.clone();
168 let handler = self.handler.clone();
169
170 let mut receiver = epochs_ordered.subscribe();
172
173 let mut queue = EpochQueue::<Vec<H256>>::with_capacity(
176 if sub_epoch == SubscriptionEpoch::LatestState {
177 (DEFERRED_STATE_EPOCH_COUNT - 1) as usize
178 } else {
179 0
180 },
181 );
182
183 let fut = async move {
185 while let Some((epoch, hashes)) = receiver.recv().await {
186 trace!("epoch_loop({:?}): {:?}", id, (epoch, &hashes));
187
188 let sub = match subscribers.read().get(&id) {
190 Some(sub) => sub.clone(),
191 None => {
192 epochs_ordered.unsubscribe(receiver.id);
194 return;
195 }
196 };
197
198 let (epoch, hashes) = match queue.push((epoch, hashes)) {
199 None => continue,
200 Some(e) => e,
201 };
202
203 if sub_epoch == SubscriptionEpoch::LatestState {
205 let pivot = hashes.last().expect("empty epoch in pubsub");
206 handler.wait_for_epoch(&pivot).await;
207 }
208
209 let send_res = handler.notify_epoch(sub, (epoch, hashes)).await;
211 if let Err(err) = send_res {
212 if err.is_disconnected() {
213 epochs_ordered.unsubscribe(receiver.id);
214 subscribers
215 .write()
216 .remove(&SubscriptionId::String(id.as_string()));
217 return;
218 }
219 }
220 }
221 };
222
223 self.executor.spawn(fut);
224 }
225
226 fn start_logs_loop(&self, id: SubscriberId) {
230 trace!("start_logs_loop({:?})", id);
231
232 let subscribers = self.logs_subscribers.clone();
234 let epochs_ordered = self.notifications.epochs_ordered.clone();
235 let handler = self.handler.clone();
236
237 let mut receiver = epochs_ordered.subscribe();
239
240 let mut queue = EpochQueue::<Vec<H256>>::with_capacity(
243 (DEFERRED_STATE_EPOCH_COUNT - 1) as usize,
244 );
245
246 let fut = async move {
248 let mut last_epoch = 0;
249
250 while let Some(epoch) = receiver.recv().await {
251 trace!("logs_loop({:?}): {:?}", id, epoch);
252
253 let (sub, filter) = match subscribers.read().get(&id) {
255 Some(sub) => sub.clone(),
256 None => {
257 epochs_ordered.unsubscribe(receiver.id);
259 return;
260 }
261 };
262
263 let epoch = match queue.push(epoch) {
264 None => continue,
265 Some(e) => e,
266 };
267
268 if epoch.0 <= last_epoch {
270 debug!("pivot chain reorg: {} -> {}", last_epoch, epoch.0);
271 assert!(epoch.0 > 0, "Unexpected epoch number received.");
272 handler.notify_revert(&sub, epoch.0 - 1).await;
273 }
274
275 last_epoch = epoch.0;
276
277 let send_res = handler.notify_logs(&sub, filter, epoch).await;
279 if let Err(err) = send_res {
280 if err.is_disconnected() {
281 epochs_ordered.unsubscribe(receiver.id);
282 subscribers
283 .write()
284 .remove(&SubscriptionId::String(id.as_string()));
285 return;
286 }
287 }
288 }
289 };
290
291 self.executor.spawn(fut);
292 }
293}
294
/// Turns chain data (headers, epochs, receipts) into pub-sub notifications
/// and pushes them to subscriber sinks.
pub struct ChainNotificationHandler {
    // Consensus graph: queried for the best epoch number and passed on to
    // `build_header`.
    consensus: SharedConsensusGraph,
    // Source of block headers, blocks and execution results.
    data_man: Arc<BlockDataManager>,
    /// Network passed on when building RPC headers and logs.
    pub network: Network,
}
301
302impl ChainNotificationHandler {
303 fn get_header_by_hash(&self, hash: &H256) -> Result<RpcHeader, String> {
304 let header = match self.data_man.block_header_by_hash(hash) {
305 Some(h) => build_header(&*h, self.network, self.consensus.clone()),
306 None => return Err("Header not found".to_string()),
307 };
308
309 header
310 }
311
312 async fn notify_epoch(
313 &self, subscriber: Client, epoch: (u64, Vec<H256>),
314 ) -> SinkResult {
315 trace!("notify_epoch({:?})", epoch);
316
317 let (epoch, hashes) = epoch;
318 let hashes = hashes.into_iter().map(H256::from).collect();
319
320 notify(
321 &subscriber,
322 pubsub::Result::Epoch {
323 epoch_number: epoch.into(),
324 epoch_hashes_ordered: hashes,
325 },
326 )
327 }
328
329 async fn notify_revert(&self, subscriber: &Client, epoch: u64) {
330 trace!("notify_revert({:?})", epoch);
331
332 let _ = notify(
333 subscriber,
334 pubsub::Result::ChainReorg {
335 revert_to: epoch.into(),
336 },
337 );
338 }
339
340 async fn notify_logs(
341 &self, subscriber: &Client, filter: LogFilter, epoch: (u64, Vec<H256>),
342 ) -> SinkResult {
343 trace!("notify_logs({:?})", epoch);
344
345 let logs = match self.retrieve_epoch_logs(epoch).await {
349 Some(logs) => logs,
350 None => return Ok(()),
351 };
352
353 let logs = logs
355 .iter()
356 .filter(|l| filter.matches(&l.entry))
357 .cloned()
358 .map(|l| RpcLog::try_from_localized(l, self.network));
359
360 for log in logs {
364 match log {
365 Ok(l) => {
366 let send_res = notify(subscriber, pubsub::Result::Log(l));
367 if send_res.is_err() {
368 return send_res;
369 }
370 }
371 Err(e) => {
372 error!(
373 "Unexpected error while constructing RpcLog: {:?}",
374 e
375 );
376 }
377 }
378 }
379 Ok(())
380 }
381
382 async fn retrieve_block_receipts(
387 &self, block: &H256, pivot: &H256,
388 ) -> Option<Arc<BlockReceipts>> {
389 const POLL_INTERVAL_MS: Duration = Duration::from_millis(100);
390 let epoch = self.data_man.block_height_by_hash(pivot)?;
391
392 for ii in 0.. {
400 let latest = self.consensus.best_epoch_number();
401 match self.data_man.block_execution_result_by_hash_with_epoch(
402 &block, &pivot, false, false, ) {
405 Some(res) => return Some(res.block_receipts.clone()),
406 None => {
407 trace!("Cannot find receipts with {:?}/{:?}", block, pivot);
408 let _ = sleep(POLL_INTERVAL_MS).await;
409 }
410 }
411
412 if ii > 1000 {
414 error!("Cannot find receipts with {:?}/{:?}", block, pivot);
415 return None;
416 } else {
417 if latest
418 > epoch + DEFERRED_STATE_EPOCH_COUNT + REWARD_EPOCH_COUNT
419 {
420 warn!(
424 "Cannot find receipts with {:?}/{:?}, latest_epoch={}",
425 block, pivot, latest
426 );
427 return None;
428 }
429 }
430 }
431
432 unreachable!()
433 }
434
435 async fn wait_for_epoch(&self, pivot: &H256) -> () {
438 let _ = self.retrieve_block_receipts(&pivot, &pivot).await;
439 }
440
441 async fn retrieve_epoch_logs(
442 &self, epoch: (u64, Vec<H256>),
443 ) -> Option<Vec<LocalizedLogEntry>> {
444 let (epoch_number, hashes) = epoch;
445 let pivot = hashes.last().cloned().expect("epoch should not be empty");
446
447 let fut = hashes
449 .iter()
450 .map(|h| self.retrieve_block_receipts(&h, &pivot));
451
452 let receipts = join_all(fut)
453 .await
454 .into_iter()
455 .collect::<Option<Vec<_>>>()?;
456
457 let mut logs = vec![];
458 let mut log_index = 0;
459
460 for (block_hash, block_receipts) in zip(hashes, receipts) {
461 let block = match self
463 .data_man
464 .block_by_hash(&block_hash, true )
465 {
466 Some(b) => b,
467 None => {
468 warn!("Unable to retrieve block {:?}", block_hash);
469 return None;
470 }
471 };
472
473 let txs = &block.transactions;
474 assert_eq!(block_receipts.receipts.len(), txs.len());
475
476 for (txid, (receipt, tx)) in
478 zip(&block_receipts.receipts, txs).enumerate()
479 {
480 let native_logs: Vec<_> = receipt
481 .logs
482 .iter()
483 .cloned()
484 .filter(|l| l.space == Space::Native)
485 .collect();
486
487 for (logid, entry) in native_logs.into_iter().enumerate() {
488 logs.push(LocalizedLogEntry {
489 entry,
490 block_hash,
491 epoch_number,
492 block_timestamp: Some(block.block_header.timestamp()),
493 transaction_hash: tx.hash,
494 transaction_index: txid,
495 log_index,
496 transaction_log_index: logid,
497 });
498
499 log_index += 1;
500 }
501 }
502 }
503
504 Some(logs)
505 }
506}
507
508impl PubSub for PubSubClient {
509 type Metadata = Metadata;
510
511 fn subscribe(
512 &self, _meta: Metadata, subscriber: Subscriber<pubsub::Result>,
513 kind: pubsub::Kind, params: Option<pubsub::Params>,
514 ) {
515 let error = match (kind, params) {
516 (pubsub::Kind::NewHeads, None) => {
518 self.heads_subscribers.write().push(subscriber);
519 self.start_head_loop();
520 return;
521 }
522 (pubsub::Kind::NewHeads, _) => {
523 errors::invalid_params("newHeads", "Expected no parameters.")
524 }
525 (pubsub::Kind::Epochs, None) => {
527 let id = self.epochs_subscribers.write().push(subscriber);
528 self.start_epoch_loop(id, SubscriptionEpoch::LatestMined);
529 return;
530 }
531 (pubsub::Kind::Epochs, Some(pubsub::Params::Epochs(epoch))) => {
532 let id = self.epochs_subscribers.write().push(subscriber);
533 self.start_epoch_loop(id, epoch);
534 return;
535 }
536 (pubsub::Kind::Epochs, _) => {
537 errors::invalid_params("epochs", "Expected epoch parameter.")
538 }
539 (pubsub::Kind::Logs, None) => {
541 let id = self
542 .logs_subscribers
543 .write()
544 .push(subscriber, LogFilter::default());
545
546 self.start_logs_loop(id);
547 return;
548 }
549 (pubsub::Kind::Logs, Some(pubsub::Params::Logs(filter))) => {
550 match filter.into_primitive() {
551 Err(e) => e,
552 Ok(filter) => {
553 let id = self
554 .logs_subscribers
555 .write()
556 .push(subscriber, filter);
557
558 self.start_logs_loop(id);
559 return;
560 }
561 }
562 }
563 (pubsub::Kind::Logs, _) => {
564 errors::invalid_params("logs", "Expected filter parameter.")
565 }
566 _ => errors::unimplemented(None),
567 };
568
569 let _ = subscriber.reject(error);
570 }
571
572 fn unsubscribe(
573 &self, _: Option<Self::Metadata>, id: SubscriptionId,
574 ) -> RpcResult<bool> {
575 let res0 = self.heads_subscribers.write().remove(&id).is_some();
576 let res1 = self.epochs_subscribers.write().remove(&id).is_some();
577 let res2 = self.logs_subscribers.write().remove(&id).is_some();
578
579 Ok(res0 || res1 || res2)
580 }
581}
582
583fn notify(subscriber: &Client, result: pubsub::Result) -> SinkResult {
584 subscriber.notify(Ok(result))
585}