1use crate::rpc::{
6 errors,
7 helpers::{build_header, EpochQueue, SubscriberId, Subscribers},
8 metadata::Metadata,
9 traits::pubsub::PubSub,
10 types::{
11 pubsub::{self, SubscriptionEpoch},
12 Header as RpcHeader, Log as RpcLog,
13 },
14};
15use cfx_addr::Network;
16use cfx_parameters::{
17 consensus::DEFERRED_STATE_EPOCH_COUNT,
18 consensus_internal::REWARD_EPOCH_COUNT,
19};
20use cfx_rpc_utils::error::jsonrpc_error_helpers::error_object_owned_to_jsonrpc_error;
21use cfx_types::{Space, H256};
22use cfxcore::{
23 channel::Channel, BlockDataManager, Notifications, SharedConsensusGraph,
24};
25use futures::future::join_all;
26use itertools::zip;
27use jsonrpc_core::Result as RpcResult;
28use jsonrpc_pubsub::{
29 typed::{Sink, Subscriber},
30 SinkResult, SubscriptionId,
31};
32use log::{debug, error, trace, warn};
33use parking_lot::RwLock;
34use primitives::{
35 filter::LogFilter, log_entry::LocalizedLogEntry, BlockReceipts,
36};
37use std::{
38 sync::{Arc, Weak},
39 time::Duration,
40};
41use tokio::{runtime::Runtime, time::sleep};
42
43type Client = Sink<pubsub::Result>;
44
45#[derive(Clone)]
47pub struct PubSubClient {
48 handler: Arc<ChainNotificationHandler>,
49 heads_subscribers: Arc<RwLock<Subscribers<Client>>>,
50 epochs_subscribers: Arc<RwLock<Subscribers<Client>>>,
51 logs_subscribers: Arc<RwLock<Subscribers<(Client, LogFilter)>>>,
52 heads_loop_started: Arc<RwLock<bool>>,
53 notifications: Arc<Notifications>,
54 pub executor: Arc<Runtime>,
55}
56
57impl PubSubClient {
58 pub fn new(
60 executor: Arc<Runtime>, consensus: SharedConsensusGraph,
61 notifications: Arc<Notifications>, network: Network,
62 ) -> Self {
63 let heads_subscribers = Arc::new(RwLock::new(Subscribers::default()));
64 let epochs_subscribers = Arc::new(RwLock::new(Subscribers::default()));
65 let logs_subscribers = Arc::new(RwLock::new(Subscribers::default()));
66
67 let handler = Arc::new(ChainNotificationHandler {
68 consensus: consensus.clone(),
69 data_man: consensus.data_manager().clone(),
70 network,
71 });
72
73 PubSubClient {
74 handler,
75 heads_subscribers,
76 epochs_subscribers,
77 logs_subscribers,
78 heads_loop_started: Arc::new(RwLock::new(false)),
79 notifications,
80 executor,
81 }
82 }
83
84 pub fn epochs_ordered(&self) -> Arc<Channel<(u64, Vec<H256>)>> {
85 self.notifications.epochs_ordered.clone()
86 }
87
88 pub fn handler(&self) -> Weak<ChainNotificationHandler> {
90 Arc::downgrade(&self.handler)
91 }
92
93 fn start_head_loop(&self) {
94 let mut loop_started = self.heads_loop_started.write();
95 if *loop_started {
96 return;
97 }
98
99 debug!("start_headers_loop");
100 *loop_started = true;
101
102 let new_block_hashes = self.notifications.new_block_hashes.clone();
105 let mut receiver = new_block_hashes.subscribe();
106
107 let handler_clone = self.handler.clone();
109 let this = self.clone();
110
111 let fut = async move {
112 while let Some(hash) = receiver.recv().await {
113 let subscribers = this.heads_subscribers.read();
115
116 if subscribers.is_empty() {
118 new_block_hashes.unsubscribe(receiver.id);
119 let mut loop_started = this.heads_loop_started.write();
120 *loop_started = false;
121 break;
122 }
123
124 let header = match handler_clone.get_header_by_hash(&hash) {
125 Ok(h) => h,
126 Err(e) => {
127 error!(
128 "Unexpected error while constructing RpcHeader: {:?}",
129 e
130 );
131 continue;
132 }
133 };
134
135 let mut ids_to_remove = vec![];
136 for (id, subscriber) in subscribers.iter() {
137 let send_res = notify(
138 subscriber,
139 pubsub::Result::Header(header.clone()),
140 );
141 if let Err(err) = send_res {
142 if err.is_disconnected() {
143 ids_to_remove.push(id.clone());
144 }
145 }
146 }
147
148 drop(subscribers);
149 for id in ids_to_remove {
150 this.heads_subscribers
151 .write()
152 .remove(&SubscriptionId::String(id.as_string()));
153 }
154 }
155 };
156
157 self.executor.spawn(fut);
158 }
159
160 fn start_epoch_loop(&self, id: SubscriberId, sub_epoch: SubscriptionEpoch) {
164 trace!("start_epoch_loop({:?})", id);
165
166 let subscribers = self.epochs_subscribers.clone();
168 let epochs_ordered = self.notifications.epochs_ordered.clone();
169 let handler = self.handler.clone();
170
171 let mut receiver = epochs_ordered.subscribe();
173
174 let mut queue = EpochQueue::<Vec<H256>>::with_capacity(
177 if sub_epoch == SubscriptionEpoch::LatestState {
178 (DEFERRED_STATE_EPOCH_COUNT - 1) as usize
179 } else {
180 0
181 },
182 );
183
184 let fut = async move {
186 while let Some((epoch, hashes)) = receiver.recv().await {
187 trace!("epoch_loop({:?}): {:?}", id, (epoch, &hashes));
188
189 let sub = match subscribers.read().get(&id) {
191 Some(sub) => sub.clone(),
192 None => {
193 epochs_ordered.unsubscribe(receiver.id);
195 return;
196 }
197 };
198
199 let (epoch, hashes) = match queue.push((epoch, hashes)) {
200 None => continue,
201 Some(e) => e,
202 };
203
204 if sub_epoch == SubscriptionEpoch::LatestState {
206 let pivot = hashes.last().expect("empty epoch in pubsub");
207 handler.wait_for_epoch(&pivot).await;
208 }
209
210 let send_res = handler.notify_epoch(sub, (epoch, hashes)).await;
212 if let Err(err) = send_res {
213 if err.is_disconnected() {
214 epochs_ordered.unsubscribe(receiver.id);
215 subscribers
216 .write()
217 .remove(&SubscriptionId::String(id.as_string()));
218 return;
219 }
220 }
221 }
222 };
223
224 self.executor.spawn(fut);
225 }
226
227 fn start_logs_loop(&self, id: SubscriberId) {
231 trace!("start_logs_loop({:?})", id);
232
233 let subscribers = self.logs_subscribers.clone();
235 let epochs_ordered = self.notifications.epochs_ordered.clone();
236 let handler = self.handler.clone();
237
238 let mut receiver = epochs_ordered.subscribe();
240
241 let mut queue = EpochQueue::<Vec<H256>>::with_capacity(
244 (DEFERRED_STATE_EPOCH_COUNT - 1) as usize,
245 );
246
247 let fut = async move {
249 let mut last_epoch = 0;
250
251 while let Some(epoch) = receiver.recv().await {
252 trace!("logs_loop({:?}): {:?}", id, epoch);
253
254 let (sub, filter) = match subscribers.read().get(&id) {
256 Some(sub) => sub.clone(),
257 None => {
258 epochs_ordered.unsubscribe(receiver.id);
260 return;
261 }
262 };
263
264 let epoch = match queue.push(epoch) {
265 None => continue,
266 Some(e) => e,
267 };
268
269 if epoch.0 <= last_epoch {
271 debug!("pivot chain reorg: {} -> {}", last_epoch, epoch.0);
272 assert!(epoch.0 > 0, "Unexpected epoch number received.");
273 handler.notify_revert(&sub, epoch.0 - 1).await;
274 }
275
276 last_epoch = epoch.0;
277
278 let send_res = handler.notify_logs(&sub, filter, epoch).await;
280 if let Err(err) = send_res {
281 if err.is_disconnected() {
282 epochs_ordered.unsubscribe(receiver.id);
283 subscribers
284 .write()
285 .remove(&SubscriptionId::String(id.as_string()));
286 return;
287 }
288 }
289 }
290 };
291
292 self.executor.spawn(fut);
293 }
294}
295
296pub struct ChainNotificationHandler {
298 consensus: SharedConsensusGraph,
299 data_man: Arc<BlockDataManager>,
300 pub network: Network,
301}
302
303impl ChainNotificationHandler {
304 fn get_header_by_hash(&self, hash: &H256) -> Result<RpcHeader, String> {
305 let header = match self.data_man.block_header_by_hash(hash) {
306 Some(h) => build_header(&*h, self.network, self.consensus.clone()),
307 None => return Err("Header not found".to_string()),
308 };
309
310 header
311 }
312
313 async fn notify_epoch(
314 &self, subscriber: Client, epoch: (u64, Vec<H256>),
315 ) -> SinkResult {
316 trace!("notify_epoch({:?})", epoch);
317
318 let (epoch, hashes) = epoch;
319
320 notify(
321 &subscriber,
322 pubsub::Result::Epoch {
323 epoch_number: epoch.into(),
324 epoch_hashes_ordered: hashes,
325 },
326 )
327 }
328
329 async fn notify_revert(&self, subscriber: &Client, epoch: u64) {
330 trace!("notify_revert({:?})", epoch);
331
332 let _ = notify(
333 subscriber,
334 pubsub::Result::ChainReorg {
335 revert_to: epoch.into(),
336 },
337 );
338 }
339
340 async fn notify_logs(
341 &self, subscriber: &Client, filter: LogFilter, epoch: (u64, Vec<H256>),
342 ) -> SinkResult {
343 trace!("notify_logs({:?})", epoch);
344
345 let logs = match self.retrieve_epoch_logs(epoch).await {
349 Some(logs) => logs,
350 None => return Ok(()),
351 };
352
353 let logs = logs
355 .iter()
356 .filter(|l| filter.matches(&l.entry))
357 .cloned()
358 .map(|l| RpcLog::try_from_localized(l, self.network));
359
360 for log in logs {
364 match log {
365 Ok(l) => {
366 let send_res = notify(subscriber, pubsub::Result::Log(l));
367 if send_res.is_err() {
368 return send_res;
369 }
370 }
371 Err(e) => {
372 error!(
373 "Unexpected error while constructing RpcLog: {:?}",
374 e
375 );
376 }
377 }
378 }
379 Ok(())
380 }
381
382 async fn retrieve_block_receipts(
387 &self, block: &H256, pivot: &H256,
388 ) -> Option<Arc<BlockReceipts>> {
389 const POLL_INTERVAL_MS: Duration = Duration::from_millis(100);
390 let epoch = self.data_man.block_height_by_hash(pivot)?;
391
392 for ii in 0.. {
400 let latest = self.consensus.best_epoch_number();
401 match self.data_man.block_execution_result_by_hash_with_epoch(
402 &block, &pivot, false, false, ) {
405 Some(res) => return Some(res.block_receipts.clone()),
406 None => {
407 trace!("Cannot find receipts with {:?}/{:?}", block, pivot);
408 let _ = sleep(POLL_INTERVAL_MS).await;
409 }
410 }
411
412 if ii > 1000 {
414 error!("Cannot find receipts with {:?}/{:?}", block, pivot);
415 return None;
416 } else {
417 if latest
418 > epoch + DEFERRED_STATE_EPOCH_COUNT + REWARD_EPOCH_COUNT
419 {
420 warn!(
424 "Cannot find receipts with {:?}/{:?}, latest_epoch={}",
425 block, pivot, latest
426 );
427 return None;
428 }
429 }
430 }
431
432 unreachable!()
433 }
434
435 async fn wait_for_epoch(&self, pivot: &H256) -> () {
438 let _ = self.retrieve_block_receipts(&pivot, &pivot).await;
439 }
440
441 async fn retrieve_epoch_logs(
442 &self, epoch: (u64, Vec<H256>),
443 ) -> Option<Vec<LocalizedLogEntry>> {
444 let (epoch_number, hashes) = epoch;
445 let pivot = hashes.last().cloned().expect("epoch should not be empty");
446
447 let fut = hashes
449 .iter()
450 .map(|h| self.retrieve_block_receipts(&h, &pivot));
451
452 let receipts = join_all(fut)
453 .await
454 .into_iter()
455 .collect::<Option<Vec<_>>>()?;
456
457 let mut logs = vec![];
458 let mut log_index = 0;
459
460 for (block_hash, block_receipts) in zip(hashes, receipts) {
461 let block = match self
463 .data_man
464 .block_by_hash(&block_hash, true )
465 {
466 Some(b) => b,
467 None => {
468 warn!("Unable to retrieve block {:?}", block_hash);
469 return None;
470 }
471 };
472
473 let txs = &block.transactions;
474 assert_eq!(block_receipts.receipts.len(), txs.len());
475
476 for (txid, (receipt, tx)) in
478 zip(&block_receipts.receipts, txs).enumerate()
479 {
480 let native_logs: Vec<_> = receipt
481 .logs
482 .iter()
483 .cloned()
484 .filter(|l| l.space == Space::Native)
485 .collect();
486
487 for (logid, entry) in native_logs.into_iter().enumerate() {
488 logs.push(LocalizedLogEntry {
489 entry,
490 block_hash,
491 epoch_number,
492 block_timestamp: Some(block.block_header.timestamp()),
493 transaction_hash: tx.hash,
494 transaction_index: txid,
495 log_index,
496 transaction_log_index: logid,
497 });
498
499 log_index += 1;
500 }
501 }
502 }
503
504 Some(logs)
505 }
506}
507
508impl PubSub for PubSubClient {
509 type Metadata = Metadata;
510
511 fn subscribe(
512 &self, _meta: Metadata, subscriber: Subscriber<pubsub::Result>,
513 kind: pubsub::Kind, params: Option<pubsub::Params>,
514 ) {
515 let error = match (kind, params) {
516 (pubsub::Kind::NewHeads, None) => {
518 self.heads_subscribers.write().push(subscriber);
519 self.start_head_loop();
520 return;
521 }
522 (pubsub::Kind::NewHeads, _) => {
523 errors::invalid_params("newHeads", "Expected no parameters.")
524 }
525 (pubsub::Kind::Epochs, None) => {
527 let id = self.epochs_subscribers.write().push(subscriber);
528 self.start_epoch_loop(id, SubscriptionEpoch::LatestMined);
529 return;
530 }
531 (pubsub::Kind::Epochs, Some(pubsub::Params::Epochs(epoch))) => {
532 let id = self.epochs_subscribers.write().push(subscriber);
533 self.start_epoch_loop(id, epoch);
534 return;
535 }
536 (pubsub::Kind::Epochs, _) => {
537 errors::invalid_params("epochs", "Expected epoch parameter.")
538 }
539 (pubsub::Kind::Logs, None) => {
541 let id = self
542 .logs_subscribers
543 .write()
544 .push(subscriber, LogFilter::default());
545
546 self.start_logs_loop(id);
547 return;
548 }
549 (pubsub::Kind::Logs, Some(pubsub::Params::Logs(filter))) => {
550 match filter.into_primitive() {
551 Err(e) => error_object_owned_to_jsonrpc_error(e),
552 Ok(filter) => {
553 let id = self
554 .logs_subscribers
555 .write()
556 .push(subscriber, filter);
557
558 self.start_logs_loop(id);
559 return;
560 }
561 }
562 }
563 (pubsub::Kind::Logs, _) => {
564 errors::invalid_params("logs", "Expected filter parameter.")
565 }
566 _ => errors::unimplemented(None),
567 };
568
569 let _ = subscriber.reject(error);
570 }
571
572 fn unsubscribe(
573 &self, _: Option<Self::Metadata>, id: SubscriptionId,
574 ) -> RpcResult<bool> {
575 let res0 = self.heads_subscribers.write().remove(&id).is_some();
576 let res1 = self.epochs_subscribers.write().remove(&id).is_some();
577 let res2 = self.logs_subscribers.write().remove(&id).is_some();
578
579 Ok(res0 || res1 || res2)
580 }
581}
582
583fn notify(subscriber: &Client, result: pubsub::Result) -> SinkResult {
584 subscriber.notify(Ok(result))
585}