#![allow(deprecated)]
mod service_mio;
mod worker;
use mio::{deprecated::NotifyError, Poll, Token};
use std::{cell::Cell, error, fmt};
thread_local! {
    /// Per-thread stack size setting, in bytes.
    ///
    /// Lazily initialised on first access in each thread: the value of the
    /// `RUST_MIN_STACK` environment variable is used when it is set and
    /// parses as a `usize`; otherwise the value falls back to 2 MiB.
    /// Because it is a `Cell`, a thread may overwrite its own copy later.
    // NOTE(review): presumably consumed when spawning worker threads —
    // confirm against the users of this static.
    pub static LOCAL_STACK_SIZE: Cell<usize> = {
        let configured = ::std::env::var("RUST_MIN_STACK")
            .ok()
            .and_then(|raw| raw.parse::<usize>().ok());
        Cell::new(configured.unwrap_or(2 * 1024 * 1024))
    }
}
/// Error type used by the IO service.
///
/// Both variants carry a standard `std::io::Error`; the variant only records
/// where the error was produced.
#[derive(Debug)]
pub enum IoError {
    /// Error attributed to the mio layer (for example, the one built when a
    /// mio notification cannot be delivered).
    Mio(::std::io::Error),
    /// Error converted from a plain `std::io::Error`.
    StdIo(::std::io::Error),
}

impl fmt::Display for IoError {
    /// Formats the error exactly as the wrapped `std::io::Error` would.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            IoError::Mio(inner) | IoError::StdIo(inner) => {
                fmt::Display::fmt(inner, f)
            }
        }
    }
}

impl error::Error for IoError {
    /// Fixed legacy description, kept for callers of the deprecated API.
    fn description(&self) -> &str { "IO error" }

    /// Returns the cause of the wrapped `std::io::Error`, if it has one.
    ///
    /// `Display` already prints the wrapped error's own message, so this
    /// wrapper is transparent: it forwards the wrapped error's source rather
    /// than returning the wrapped error itself, which would make chain
    /// reporters print the same message twice.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match self {
            IoError::Mio(inner) | IoError::StdIo(inner) => {
                error::Error::source(inner)
            }
        }
    }
}

impl From<::std::io::Error> for IoError {
    /// Wraps a standard IO error in the `StdIo` variant.
    fn from(err: ::std::io::Error) -> IoError { IoError::StdIo(err) }
}
/// Converts a failed mio notification into an [`IoError`].
///
/// The original notification error (and the message it may carry) is
/// discarded; the result is always `IoError::Mio` wrapping a
/// `ConnectionAborted` IO error with a fixed message.
impl<Message> From<NotifyError<service_mio::IoMessage<Message>>> for IoError
where
    Message: Send,
{
    fn from(_err: NotifyError<service_mio::IoMessage<Message>>) -> IoError {
        let aborted = ::std::io::Error::new(
            ::std::io::ErrorKind::ConnectionAborted,
            "Network IO notification error",
        );
        IoError::Mio(aborted)
    }
}
/// Callback interface for handlers registered with the IO service.
///
/// Every method has an empty default body, so an implementor overrides only
/// the callbacks it needs. Implementors must be `Send + Sync`, and the
/// message type must be `Send + Sync + 'static`.
pub trait IoHandler<Message: Send + Sync + 'static>: Send + Sync {
    /// Set-up hook that receives the handler's IO context; the tests in this
    /// file use it to register timers.
    fn initialize(&self, _io: &IoContext<Message>) {}

    /// Timer callback; `_timer` is the token the timer was registered with.
    fn timeout(&self, _io: &IoContext<Message>, _timer: TimerToken) {}

    /// Message callback; receives a message sent through the service.
    fn message(&self, _io: &IoContext<Message>, _message: &Message) {}

    /// Stream hang-up callback.
    // NOTE(review): presumably called when the stream is closed — confirm
    // against the event loop implementation.
    fn stream_hup(&self, _io: &IoContext<Message>, _stream: StreamToken) {}

    /// Stream readable callback.
    // NOTE(review): presumably called when the stream has data to read —
    // confirm against the event loop implementation.
    fn stream_readable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}

    /// Stream writable callback.
    // NOTE(review): presumably called when the stream can accept writes —
    // confirm against the event loop implementation.
    fn stream_writable(&self, _io: &IoContext<Message>, _stream: StreamToken) {}

    /// Hook for registering `_stream` with the poller under token `_reg`.
    fn register_stream(
        &self,
        _stream: StreamToken,
        _reg: Token,
        _event_loop: &Poll,
    ) {
    }

    /// Hook for updating the registration of `_stream` (token `_reg`) with
    /// the poller.
    fn update_stream(
        &self,
        _stream: StreamToken,
        _reg: Token,
        _event_loop: &Poll,
    ) {
    }

    /// Hook for removing `_stream` from the poller.
    fn deregister_stream(&self, _stream: StreamToken, _event_loop: &Poll) {}
}
pub use crate::service_mio::{
IoChannel, IoContext, IoManager, IoService, StreamToken, TimerToken,
TOKENS_PER_HANDLER,
};
#[cfg(test)]
mod tests {
use super::*;
use std::{
sync::{atomic, Arc},
thread,
time::Duration,
};
/// A message sent through the service must reach the registered handler.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[test]
#[ignore]
fn send_message_to_handler() {
    #[derive(Clone)]
    struct MyMessage {
        data: u32,
    }

    // Remembers whether `message` has been invoked.
    struct MyHandler {
        received: atomic::AtomicBool,
    }

    impl IoHandler<MyMessage> for MyHandler {
        fn message(&self, _io: &IoContext<MyMessage>, message: &MyMessage) {
            assert_eq!(message.data, 5);
            self.received.store(true, atomic::Ordering::SeqCst);
        }
    }

    let handler = Arc::new(MyHandler {
        received: atomic::AtomicBool::new(false),
    });
    let event_poll = Arc::new(Poll::new().unwrap());
    let io_service = IoService::<MyMessage>::start(event_poll)
        .expect("Error creating network service");
    io_service.register_handler(handler.clone()).unwrap();
    io_service.send_message(MyMessage { data: 5 }).unwrap();

    // Give the service a moment to deliver the message.
    thread::sleep(Duration::from_secs(1));
    let was_received = handler.received.load(atomic::Ordering::SeqCst);
    assert!(was_received);
}
/// A one-shot timer registered in `initialize` must fire, with the token it
/// was registered under, and must not fire a second time while the test
/// waits.
#[test]
fn timeout_working() {
    #[derive(Clone)]
    #[allow(dead_code)]
    struct MyMessage {
        data: u32,
    }

    // Remembers whether the timer callback has run.
    struct MyHandler {
        fired: atomic::AtomicBool,
    }

    impl IoHandler<MyMessage> for MyHandler {
        fn initialize(&self, io: &IoContext<MyMessage>) {
            let period = Duration::from_millis(500);
            io.register_timer_once(1234, period).unwrap();
        }

        fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {
            assert_eq!(timer, 1234);
            // `swap` returns the previous value: it must still be `false`,
            // otherwise the one-shot timer has fired more than once.
            let fired_before = self.fired.swap(true, atomic::Ordering::SeqCst);
            assert!(!fired_before);
        }
    }

    let handler = Arc::new(MyHandler {
        fired: atomic::AtomicBool::new(false),
    });
    let event_poll = Arc::new(Poll::new().unwrap());
    let io_service = IoService::<MyMessage>::start(event_poll)
        .expect("Error creating network service");
    io_service.register_handler(handler.clone()).unwrap();

    // Wait well past the 500 ms timer period.
    thread::sleep(Duration::from_secs(2));
    let has_fired = handler.fired.load(atomic::Ordering::SeqCst);
    assert!(has_fired);
}
/// A timer registered with `register_timer` in `initialize` must fire at
/// least twice within the two seconds the test waits.
#[test]
fn multi_timeout_working() {
    #[derive(Clone)]
    #[allow(dead_code)]
    struct MyMessage {
        data: u32,
    }

    // Counts how many times the timer callback has run.
    struct MyHandler {
        ticks: atomic::AtomicUsize,
    }

    impl IoHandler<MyMessage> for MyHandler {
        fn initialize(&self, io: &IoContext<MyMessage>) {
            let period = Duration::from_millis(500);
            io.register_timer(1234, period).unwrap();
        }

        fn timeout(&self, _io: &IoContext<MyMessage>, timer: TimerToken) {
            assert_eq!(timer, 1234);
            self.ticks.fetch_add(1, atomic::Ordering::SeqCst);
        }
    }

    let handler = Arc::new(MyHandler {
        ticks: atomic::AtomicUsize::new(0),
    });
    let event_poll = Arc::new(Poll::new().unwrap());
    let io_service = IoService::<MyMessage>::start(event_poll)
        .expect("Error creating network service");
    io_service.register_handler(handler.clone()).unwrap();

    // Two seconds leaves room for several 500 ms periods.
    thread::sleep(Duration::from_secs(2));
    let observed_ticks = handler.ticks.load(atomic::Ordering::SeqCst);
    assert!(observed_ticks >= 2);
}
}