cfx_rpc_utils/helpers/
blocking_tasks.rs

1//! Spawns a blocking task. CPU heavy tasks are executed with the `rayon`
2//! library. IO heavy tasks are executed on the `tokio` runtime.
3
4use crate::error::EthApiError;
5use cfx_tasks::TaskSpawner;
6use futures::Future;
7use jsonrpsee::types::ErrorObjectOwned;
8use tokio::sync::oneshot;
9
10/// Executes code on a blocking thread.
11pub trait SpawnBlocking: Clone + Send + Sync + 'static {
12    /// Returns a handle for spawning IO heavy blocking tasks.
13    ///
14    /// Runtime access in default trait method implementations.
15    fn io_task_spawner(&self) -> impl TaskSpawner;
16
17    /// Executes the future on a new blocking task.
18    ///
19    /// Note: This is expected for futures that are dominated by blocking IO
20    /// operations, for tracing or CPU bound operations in general use
21    /// [`spawn_tracing`](Self::spawn_tracing).
22    fn spawn_blocking_io<F, R>(
23        &self, f: F,
24    ) -> impl Future<Output = Result<R, ErrorObjectOwned>> + Send
25    where
26        F: FnOnce(Self) -> Result<R, ErrorObjectOwned> + Send + 'static,
27        R: Send + 'static,
28    {
29        let (tx, rx) = oneshot::channel();
30        let this = self.clone();
31        self.io_task_spawner().spawn_blocking(Box::pin(async move {
32            let res = f(this);
33            let _ = tx.send(res);
34        }));
35
36        async move {
37            rx.await.map_err(|_| {
38                ErrorObjectOwned::from(EthApiError::InternalEthError)
39            })?
40        }
41    }
42}