1#![allow(deprecated)]
24
25mod mio_util;
26mod service_mio;
27mod worker;
28
29pub use crate::service_mio::{
30 IoChannel, IoContext, IoManager, IoMessage, IoService, StreamToken,
31 TimerToken, TOKENS_PER_HANDLER,
32};
33
34use mio::{Registry, Token};
35pub use mio_util::{would_block, MapNonBlock, NotifyError};
36use std::{cell::Cell, env, error, fmt, io};
37
thread_local! {
    /// Per-thread cell holding a stack size in bytes.
    ///
    /// On first access in a thread the value is taken from the
    /// `RUST_MIN_STACK` environment variable when it is set and parses as a
    /// `usize`; otherwise it falls back to 2 MiB. Because the value lives in a
    /// `Cell`, each thread may overwrite its own copy afterwards.
    // NOTE(review): presumably read when sizing worker thread stacks — the
    // consumer is not visible in this file; confirm against callers.
    pub static LOCAL_STACK_SIZE: Cell<usize> = {
        const FALLBACK_STACK_BYTES: usize = 2 * 1024 * 1024;

        let from_env = match env::var("RUST_MIN_STACK") {
            Ok(raw) => raw.parse::<usize>().ok(),
            Err(_) => None,
        };
        Cell::new(from_env.unwrap_or(FALLBACK_STACK_BYTES))
    };
}
44
/// Error type of this crate's IO layer.
///
/// Both variants carry a plain `std::io::Error`; the variant only records
/// where the failure originated.
#[derive(Debug)]
pub enum IoError {
    /// Failure attributed to the event-loop layer. In this file it is
    /// produced when a `NotifyError` is converted into an `IoError`.
    Mio(io::Error),
    /// Failure coming from the standard library's IO; produced by the
    /// `From<io::Error>` conversion.
    StdIo(io::Error),
}
52
53impl fmt::Display for IoError {
54 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
55 match *self {
58 IoError::Mio(ref std_err) => std_err.fmt(f),
59 IoError::StdIo(ref std_err) => std_err.fmt(f),
60 }
61 }
62}
63
64impl error::Error for IoError {
65 fn description(&self) -> &str { "IO error" }
66}
67
68impl From<io::Error> for IoError {
69 fn from(err: std::io::Error) -> IoError { IoError::StdIo(err) }
70}
71
72impl<Message: Send> From<NotifyError<IoMessage<Message>>> for IoError {
73 fn from(_err: NotifyError<IoMessage<Message>>) -> IoError {
74 IoError::Mio(io::Error::new(
75 io::ErrorKind::ConnectionAborted,
76 "Network IO notification error",
77 ))
78 }
79}
80
81pub trait IoHandler<Message>: Send + Sync
85where Message: Send + Sync + 'static
86{
87 fn initialize(&self, _io: &IoContext<Message>) {}
89 fn timeout(&self, _io: &IoContext<Message>, _timer: TimerToken) {}
91 fn message(&self, _io: &IoContext<Message>, _message: &Message) {}
94 fn stream_hup(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
96 fn stream_readable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
98 fn stream_writable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}
100 fn register_stream(
102 &self, _stream: StreamToken, _reg: Token, _registry: &Registry,
103 ) {
104 }
105 fn update_stream(
107 &self, _stream: StreamToken, _reg: Token, _registry: &Registry,
108 ) {
109 }
110 fn deregister_stream(&self, _stream: StreamToken, _registry: &Registry) {}
112}
113
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    /// A message sent through the service must reach the registered
    /// handler's `message` callback.
    ///
    /// Marked `#[ignore]`, so it only runs when ignored tests are explicitly
    /// requested.
    #[test]
    #[ignore]
    fn send_message_to_handler() {
        #[derive(Clone)]
        struct Payload {
            data: u32,
        }

        /// Flips its flag once a message has been observed.
        struct RecordingHandler(AtomicBool);

        impl IoHandler<Payload> for RecordingHandler {
            fn message(&self, _io: &IoContext<Payload>, message: &Payload) {
                assert_eq!(message.data, 5);
                self.0.store(true, Ordering::SeqCst);
            }
        }

        let recorder = Arc::new(RecordingHandler(AtomicBool::new(false)));

        let service = IoService::<Payload>::start(123).expect("Error creating network service");
        service.register_handler(recorder.clone()).unwrap();

        service.send_message(Payload { data: 5 }).unwrap();

        // Give the service time to deliver the message before checking.
        thread::sleep(Duration::from_secs(1));
        assert!(recorder.0.load(Ordering::SeqCst));
    }

    /// A one-shot timer registered in `initialize` must fire exactly once
    /// with the token it was registered under.
    #[test]
    fn timeout_working() {
        #[derive(Clone)]
        #[allow(dead_code)]
        struct Payload {
            data: u32,
        }

        /// Remembers whether the timer has fired.
        struct OneShotHandler(AtomicBool);

        impl IoHandler<Payload> for OneShotHandler {
            fn initialize(&self, io: &IoContext<Payload>) {
                io.register_timer_once(1234, Duration::from_millis(500))
                    .unwrap();
            }

            fn timeout(&self, _io: &IoContext<Payload>, timer: TimerToken) {
                assert_eq!(timer, 1234);
                // `swap` returns the previous value: a second firing would
                // see `true` here and fail the assertion.
                assert!(!self.0.swap(true, Ordering::SeqCst));
            }
        }

        let one_shot = Arc::new(OneShotHandler(AtomicBool::new(false)));

        let service = IoService::<Payload>::start(123).expect("Error creating network service");
        service.register_handler(one_shot.clone()).unwrap();

        // Wait well past the 500 ms timer before checking.
        thread::sleep(Duration::from_secs(2));
        assert!(one_shot.0.load(Ordering::SeqCst));
    }

    /// A timer registered with `register_timer` must fire at least twice
    /// within two seconds when set to 500 ms.
    #[test]
    fn multi_timeout_working() {
        #[derive(Clone)]
        #[allow(dead_code)]
        struct Payload {
            data: u32,
        }

        /// Counts how many times the timer has fired.
        struct CountingHandler(AtomicUsize);

        impl IoHandler<Payload> for CountingHandler {
            fn initialize(&self, io: &IoContext<Payload>) {
                io.register_timer(1234, Duration::from_millis(500)).unwrap();
            }

            fn timeout(&self, _io: &IoContext<Payload>, timer: TimerToken) {
                assert_eq!(timer, 1234);
                self.0.fetch_add(1, Ordering::SeqCst);
            }
        }

        let counter = Arc::new(CountingHandler(AtomicUsize::new(0)));

        let service = IoService::<Payload>::start(123).expect("Error creating network service");
        service.register_handler(counter.clone()).unwrap();

        thread::sleep(Duration::from_secs(2));
        assert!(counter.0.load(Ordering::SeqCst) >= 2);
    }
}