use crate::{
    service_mio::{HandlerId, IoChannel, IoContext},
    IoHandler, LOCAL_STACK_SIZE,
};
use crossbeam_channel::{Receiver, RecvTimeoutError};
use crossbeam_deque::{Steal, Stealer};
use log::{trace, warn};
use std::{
    sync::{
        atomic::{AtomicBool, Ordering as AtomicOrdering},
        Arc, Condvar as SCondvar, Mutex as SMutex,
    },
    thread::{self, JoinHandle},
    time::Duration,
};
// Stack size for all worker threads (16 MiB); also published to handlers
// via LOCAL_STACK_SIZE so they can size their own recursion limits.
const STACK_SIZE: usize = 16 * 1024 * 1024;
/// The kind of work a worker thread is asked to perform.
pub enum WorkType<Message> {
    /// A timer fired for the handler's registered token.
    Timeout,
    /// A user message to be delivered to the handler.
    Message(Arc<Message>),
}
/// A single unit of work queued for a worker thread.
pub struct Work<Message> {
    /// What to do: deliver a message or fire a timeout.
    pub work_type: WorkType<Message>,
    /// Token identifying the timer/stream this work refers to.
    pub token: usize,
    /// Id of the handler within the IO service's handler table.
    pub handler_id: HandlerId,
    /// The handler that will receive the callback.
    pub handler: Arc<dyn IoHandler<Message>>,
}
/// A worker that services socket-related `Message` work from a dedicated
/// crossbeam channel; shut down via the `deleting` flag on drop.
pub struct SocketWorker {
    /// Join handle of the spawned worker thread; `None` after join on drop.
    thread: Option<JoinHandle<()>>,
    /// Shared shutdown flag polled by the worker's receive loop.
    deleting: Arc<AtomicBool>,
}
impl SocketWorker {
    /// Spawns a dedicated OS thread that services `Work` items arriving on
    /// `rx` until this worker is dropped or all senders disconnect.
    ///
    /// `index` is used only to name the thread for diagnostics.
    pub fn new<Message>(
        index: usize, rx: Receiver<Work<Message>>, channel: IoChannel<Message>,
    ) -> SocketWorker
    where Message: Send + Sync + 'static {
        let deleting = Arc::new(AtomicBool::new(false));
        let mut worker = SocketWorker {
            thread: None,
            deleting: deleting.clone(),
        };
        worker.thread = Some(
            thread::Builder::new()
                .stack_size(STACK_SIZE)
                .name(format!("Socket IO Worker #{}", index))
                .spawn(move || {
                    LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE));
                    // `channel` is moved into this closure and used exactly
                    // once, so no clone is needed here.
                    SocketWorker::work_loop(rx, channel, deleting)
                })
                .expect("Error creating socket worker thread"),
        );
        worker
    }

    /// Receive loop: blocks on `rx` with a 500 ms timeout so the shutdown
    /// flag is re-checked periodically even when no work arrives. Exits
    /// when `deleting` is set or the sender side of the channel is gone.
    fn work_loop<Message>(
        rx: Receiver<Work<Message>>, channel: IoChannel<Message>,
        deleting: Arc<AtomicBool>,
    ) where
        Message: Send + Sync + 'static,
    {
        while !deleting.load(AtomicOrdering::Acquire) {
            match rx.recv_timeout(Duration::from_millis(500)) {
                Ok(work) => SocketWorker::do_work(work, channel.clone()),
                // Timed out: loop around and re-check the shutdown flag.
                Err(RecvTimeoutError::Timeout) => continue,
                // All senders dropped: no more work can ever arrive.
                Err(RecvTimeoutError::Disconnected) => break,
            }
        }
    }

    /// Dispatches one unit of work to its handler. Socket workers only
    /// expect `Message` work; anything else indicates a routing bug and is
    /// logged rather than executed.
    fn do_work<Message>(work: Work<Message>, channel: IoChannel<Message>)
    where Message: Send + Sync + 'static {
        match work.work_type {
            WorkType::Message(message) => {
                work.handler.message(
                    &IoContext::new(channel, work.handler_id),
                    &*message,
                );
            }
            _ => warn!(target: "SocketWorker::do_work", "Unexpected WorkType"),
        }
    }
}
impl Drop for SocketWorker {
    /// Signals the worker thread to stop and blocks until it has exited.
    fn drop(&mut self) {
        trace!(target: "shutdown", "[SocketIoWorker] Closing...");
        // Raise the shutdown flag; the receive loop observes it within one
        // recv-timeout interval at the latest.
        self.deleting.store(true, AtomicOrdering::Release);
        // Join the thread, ignoring a panic result from the worker.
        if let Some(handle) = self.thread.take() {
            let _ = handle.join();
        }
        trace!(target: "shutdown", "[SocketIoWorker] Closed");
    }
}
/// A general IO worker that steals `Work` items from a shared deque and
/// parks on a condvar while the deque is empty.
pub struct Worker {
    /// Join handle of the spawned worker thread; `None` after join on drop.
    thread: Option<JoinHandle<()>>,
    /// Condvar the worker parks on when no work is available.
    wait: Arc<SCondvar>,
    /// Shared shutdown flag checked under `wait_mutex` and between steals.
    deleting: Arc<AtomicBool>,
    /// Mutex paired with `wait`; also serializes shutdown signalling.
    wait_mutex: Arc<SMutex<()>>,
}
impl Worker {
    /// Spawns a worker thread that steals `Work` items from `stealer`.
    /// The thread parks on `wait`/`wait_mutex` when the deque is empty and
    /// is woken either by the IO service when work is pushed or by `Drop`.
    ///
    /// `index` is used only to name the thread for diagnostics.
    pub fn new<Message>(
        index: usize, stealer: Stealer<Work<Message>>,
        channel: IoChannel<Message>, wait: Arc<SCondvar>,
        wait_mutex: Arc<SMutex<()>>,
    ) -> Worker
    where
        Message: Send + Sync + 'static,
    {
        let deleting = Arc::new(AtomicBool::new(false));
        let mut worker = Worker {
            thread: None,
            wait: wait.clone(),
            deleting: deleting.clone(),
            wait_mutex: wait_mutex.clone(),
        };
        worker.thread = Some(
            thread::Builder::new()
                .stack_size(STACK_SIZE)
                .name(format!("IO Worker #{}", index))
                .spawn(move || {
                    LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE));
                    // Every captured value is moved into this closure and
                    // used exactly once, so no clones are needed here.
                    Worker::work_loop(
                        stealer,
                        channel,
                        wait,
                        wait_mutex,
                        deleting,
                    )
                })
                .expect("Error creating worker thread"),
        );
        worker
    }

    /// Park/steal loop. The `deleting` check is performed while holding
    /// `wait_mutex` — the same lock `Drop` holds when it sets the flag and
    /// notifies — which rules out a lost wake-up between the check and the
    /// wait. Spurious condvar wake-ups are harmless: the inner loop just
    /// finds the deque empty and parks again.
    fn work_loop<Message>(
        stealer: Stealer<Work<Message>>, channel: IoChannel<Message>,
        wait: Arc<SCondvar>, wait_mutex: Arc<SMutex<()>>,
        deleting: Arc<AtomicBool>,
    ) where
        Message: Send + Sync + 'static,
    {
        loop {
            {
                let lock = wait_mutex.lock().expect("Poisoned work_loop mutex");
                if deleting.load(AtomicOrdering::Acquire) {
                    return;
                }
                // Park until notified; the returned guard is dropped
                // immediately so work is processed without holding the lock.
                std::mem::drop(wait.wait(lock));
            }
            // Drain the deque until it is empty or shutdown is requested.
            while !deleting.load(AtomicOrdering::Acquire) {
                match stealer.steal() {
                    Steal::Success(work) => {
                        Worker::do_work(work, channel.clone())
                    }
                    // Lost a race with another stealer; try again.
                    Steal::Retry => {}
                    Steal::Empty => break,
                }
            }
        }
    }

    /// Dispatches one unit of work to its handler as either a timeout or a
    /// message callback.
    fn do_work<Message>(work: Work<Message>, channel: IoChannel<Message>)
    where Message: Send + Sync + 'static {
        match work.work_type {
            WorkType::Timeout => {
                work.handler.timeout(
                    &IoContext::new(channel, work.handler_id),
                    work.token,
                );
            }
            WorkType::Message(message) => {
                work.handler.message(
                    &IoContext::new(channel, work.handler_id),
                    &*message,
                );
            }
        }
    }
}
impl Drop for Worker {
    /// Wakes the parked worker thread, tells it to exit, and joins it.
    fn drop(&mut self) {
        trace!(target: "shutdown", "[IoWorker] Closing...");
        // The flag must be set while holding the wait mutex: work_loop
        // re-checks `deleting` under the same lock before parking, so a
        // notification cannot slip in between its check and its wait.
        {
            let _guard =
                self.wait_mutex.lock().expect("Poisoned work_loop mutex");
            self.deleting.store(true, AtomicOrdering::Release);
            self.wait.notify_all();
        }
        // Ignore the join result; a panicked worker should not abort drop.
        if let Some(handle) = self.thread.take() {
            let _ = handle.join();
        }
        trace!(target: "shutdown", "[IoWorker] Closed");
    }
}