io/
lib.rs

1// Copyright 2015-2018 Parity Technologies (UK) Ltd.
2// This file is part of Parity.
3
4// Parity is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
16
17// Copyright 2019 Conflux Foundation. All rights reserved.
18// Conflux is free software and distributed under GNU General Public License.
19// See http://www.gnu.org/licenses/
20
21//! General IO module.
22
23#![allow(deprecated)]
24
25mod mio_util;
26mod service_mio;
27mod worker;
28
29pub use crate::service_mio::{
30    IoChannel, IoContext, IoManager, IoMessage, IoService, StreamToken,
31    TimerToken, TOKENS_PER_HANDLER,
32};
33
34use mio::{Registry, Token};
35pub use mio_util::{would_block, MapNonBlock, NotifyError};
36use std::{cell::Cell, env, error, fmt, io};
37
38thread_local! {
39    /// Stack size
40    /// Should be modified if it is changed in Rust since it is no way
41    /// to know or get it
42    pub static LOCAL_STACK_SIZE: Cell<usize> = Cell::new(env::var("RUST_MIN_STACK").ok().and_then(|s| s.parse().ok()).unwrap_or(2 * 1024 * 1024));
43}
44
45#[derive(Debug)]
46/// IO Error
47pub enum IoError {
48    Mio(io::Error),
49    /// Error concerning the Rust standard library's IO subsystem.
50    StdIo(io::Error),
51}
52
53impl fmt::Display for IoError {
54    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
55        // just defer to the std implementation for now.
56        // we can refine the formatting when more variants are added.
57        match *self {
58            IoError::Mio(ref std_err) => std_err.fmt(f),
59            IoError::StdIo(ref std_err) => std_err.fmt(f),
60        }
61    }
62}
63
64impl error::Error for IoError {
65    fn description(&self) -> &str { "IO error" }
66}
67
68impl From<io::Error> for IoError {
69    fn from(err: std::io::Error) -> IoError { IoError::StdIo(err) }
70}
71
72impl<Message: Send> From<NotifyError<IoMessage<Message>>> for IoError {
73    fn from(_err: NotifyError<IoMessage<Message>>) -> IoError {
74        IoError::Mio(io::Error::new(
75            io::ErrorKind::ConnectionAborted,
76            "Network IO notification error",
77        ))
78    }
79}
80
81/// Generic IO handler.
82/// All the handler function are called from within IO event loop.
83/// `Message` type is used as notification data
84pub trait IoHandler<Message>: Send + Sync
85where Message: Send + Sync + 'static
86{
87    /// Initialize the handler
88    fn initialize(&self, _io: &IoContext<Message>) {}
89    /// Timer function called after a timeout created with `HandlerIo::timeout`.
90    fn timeout(&self, _io: &IoContext<Message>, _timer: TimerToken) {}
91    /// Called when a broadcasted message is received. The message can only be
92    /// sent from a different IO handler.
93    fn message(&self, _io: &IoContext<Message>, _message: &Message) {}
94    /// Called when an IO stream gets closed
95    fn stream_hup(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
96    /// Called when an IO stream can be read from
97    fn stream_readable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
98    /// Called when an IO stream can be written to
99    fn stream_writable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
100    /// Register a new stream with the event loop
101    fn register_stream(
102        &self, _stream: StreamToken, _reg: Token, _registry: &Registry,
103    ) {
104    }
105    /// Re-register a stream with the event loop
106    fn update_stream(
107        &self, _stream: StreamToken, _reg: Token, _registry: &Registry,
108    ) {
109    }
110    /// Deregister a stream. Called when stream is removed from event loop
111    fn deregister_stream(&self, _stream: StreamToken, _registry: &Registry) {}
112}
113
114#[cfg(test)]
115mod tests {
116    use super::*;
117    use std::{
118        sync::{atomic, Arc},
119        thread,
120        time::Duration,
121    };
122
123    // Mio's behaviour is too unstable for this test. Sometimes we have to wait
124    // a few milliseconds, sometimes more than 5 seconds for the message to
125    // arrive. Therefore we ignore this test in order to not have spurious
126    // failure when running continuous integration.
127    #[test]
128    #[ignore]
129    fn send_message_to_handler() {
130        struct MyHandler(atomic::AtomicBool);
131
132        #[derive(Clone)]
133        struct MyMessage {
134            data: u32,
135        }
136
137        impl IoHandler<MyMessage> for MyHandler {
138            fn message(&self, _io: &IoContext<MyMessage>, message: &MyMessage) {
139                assert_eq!(message.data, 5);
140                self.0.store(true, atomic::Ordering::SeqCst);
141            }
142        }
143
144        let handler = Arc::new(MyHandler(atomic::AtomicBool::new(false)));
145
146        let service = IoService::<MyMessage>::start(123)
147            .expect("Error creating network service");
148        service.register_handler(handler.clone()).unwrap();
149
150        service.send_message(MyMessage { data: 5 }).unwrap();
151
152        thread::sleep(Duration::from_secs(1));
153        assert!(handler.0.load(atomic::Ordering::SeqCst));
154    }
155
156    #[test]
157    fn timeout_working() {
158        struct MyHandler(atomic::AtomicBool);
159
160        #[derive(Clone)]
161        #[allow(dead_code)]
162        struct MyMessage {
163            data: u32,
164        }
165
166        impl IoHandler<MyMessage> for MyHandler {
167            fn initialize(&self, io: &IoContext<MyMessage>) {
168                io.register_timer_once(1234, Duration::from_millis(500))
169                    .unwrap();
170            }
171
172            fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {
173                assert_eq!(timer, 1234);
174                assert!(!self.0.swap(true, atomic::Ordering::SeqCst));
175            }
176        }
177
178        let handler = Arc::new(MyHandler(atomic::AtomicBool::new(false)));
179
180        let service = IoService::<MyMessage>::start(123)
181            .expect("Error creating network service");
182        service.register_handler(handler.clone()).unwrap();
183
184        thread::sleep(Duration::from_secs(2));
185        assert!(handler.0.load(atomic::Ordering::SeqCst));
186    }
187
188    #[test]
189    fn multi_timeout_working() {
190        struct MyHandler(atomic::AtomicUsize);
191
192        #[derive(Clone)]
193        #[allow(dead_code)]
194        struct MyMessage {
195            data: u32,
196        }
197
198        impl IoHandler<MyMessage> for MyHandler {
199            fn initialize(&self, io: &IoContext<MyMessage>) {
200                io.register_timer(1234, Duration::from_millis(500)).unwrap();
201            }
202
203            fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {
204                assert_eq!(timer, 1234);
205                self.0.fetch_add(1, atomic::Ordering::SeqCst);
206            }
207        }
208
209        let handler = Arc::new(MyHandler(atomic::AtomicUsize::new(0)));
210
211        let service = IoService::<MyMessage>::start(123)
212            .expect("Error creating network service");
213        service.register_handler(handler.clone()).unwrap();
214
215        thread::sleep(Duration::from_secs(2));
216        assert!(handler.0.load(atomic::Ordering::SeqCst) >= 2);
217    }
218}