1use crate::{
22 service_mio::{HandlerId, IoChannel, IoContext},
23 IoHandler, LOCAL_STACK_SIZE,
24};
25use crossbeam_channel::{Receiver, RecvTimeoutError};
26use crossbeam_deque::{Steal, Stealer};
27use log::{trace, warn};
28use std::{
29 sync::{
30 atomic::{AtomicBool, Ordering as AtomicOrdering},
31 Arc, Condvar as SCondvar, Mutex as SMutex,
32 },
33 thread::{self, JoinHandle},
34 time::Duration,
35};
36
37const STACK_SIZE: usize = 16 * 1024 * 1024;
38
39pub enum WorkType<Message> {
40 Timeout,
41 Message(Arc<Message>),
42}
43
44pub struct Work<Message> {
45 pub work_type: WorkType<Message>,
46 pub token: usize,
47 pub handler_id: HandlerId,
48 pub handler: Arc<dyn IoHandler<Message>>,
49}
50
51pub struct SocketWorker {
53 thread: Option<JoinHandle<()>>,
54 deleting: Arc<AtomicBool>,
55}
56
57impl SocketWorker {
58 pub fn new<Message>(
60 index: usize, rx: Receiver<Work<Message>>, channel: IoChannel<Message>,
61 ) -> SocketWorker
62 where Message: Send + Sync + 'static {
63 let deleting = Arc::new(AtomicBool::new(false));
64 let mut worker = SocketWorker {
65 thread: None,
66 deleting: deleting.clone(),
67 };
68 worker.thread = Some(
69 thread::Builder::new()
70 .stack_size(STACK_SIZE)
71 .name(format!("Socket IO Worker #{}", index))
72 .spawn(move || {
73 LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE));
74 SocketWorker::work_loop(rx, channel.clone(), deleting)
75 })
76 .expect("Error creating socket worker thread"),
77 );
78 worker
79 }
80
81 fn work_loop<Message>(
82 rx: Receiver<Work<Message>>, channel: IoChannel<Message>,
83 deleting: Arc<AtomicBool>,
84 ) where
85 Message: Send + Sync + 'static,
86 {
87 while !deleting.load(AtomicOrdering::Acquire) {
88 match rx.recv_timeout(Duration::from_millis(500)) {
91 Ok(work) => SocketWorker::do_work(work, channel.clone()),
92 Err(RecvTimeoutError::Timeout) => continue,
93 Err(RecvTimeoutError::Disconnected) => break,
94 }
95 }
96 }
97
98 fn do_work<Message>(work: Work<Message>, channel: IoChannel<Message>)
99 where Message: Send + Sync + 'static {
100 match work.work_type {
101 WorkType::Message(message) => {
102 work.handler.message(
103 &IoContext::new(channel, work.handler_id),
104 &*message,
105 );
106 }
107 _ => warn!(target: "SocketWorker::do_work", "Unexpected WorkType"),
108 }
109 }
110}
111
112impl Drop for SocketWorker {
113 fn drop(&mut self) {
114 trace!(target: "shutdown", "[SocketIoWorker] Closing...");
115 self.deleting.store(true, AtomicOrdering::Release);
116 if let Some(thread) = self.thread.take() {
117 thread.join().ok();
118 }
119 trace!(target: "shutdown", "[SocketIoWorker] Closed");
120 }
121}
122
123pub struct Worker {
126 thread: Option<JoinHandle<()>>,
127 wait: Arc<SCondvar>,
128 deleting: Arc<AtomicBool>,
129 wait_mutex: Arc<SMutex<()>>,
130}
131
132impl Worker {
133 pub fn new<Message>(
135 index: usize, stealer: Stealer<Work<Message>>,
136 channel: IoChannel<Message>, wait: Arc<SCondvar>,
137 wait_mutex: Arc<SMutex<()>>,
138 ) -> Worker
139 where
140 Message: Send + Sync + 'static,
141 {
142 let deleting = Arc::new(AtomicBool::new(false));
143 let mut worker = Worker {
144 thread: None,
145 wait: wait.clone(),
146 deleting: deleting.clone(),
147 wait_mutex: wait_mutex.clone(),
148 };
149 worker.thread = Some(
150 thread::Builder::new()
151 .stack_size(STACK_SIZE)
152 .name(format!("IO Worker #{}", index))
153 .spawn(move || {
154 LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE));
155 Worker::work_loop(
156 stealer,
157 channel.clone(),
158 wait,
159 wait_mutex.clone(),
160 deleting,
161 )
162 })
163 .expect("Error creating worker thread"),
164 );
165 worker
166 }
167
168 fn work_loop<Message>(
169 stealer: Stealer<Work<Message>>, channel: IoChannel<Message>,
170 wait: Arc<SCondvar>, wait_mutex: Arc<SMutex<()>>,
171 deleting: Arc<AtomicBool>,
172 ) where
173 Message: Send + Sync + 'static,
174 {
175 loop {
176 {
177 let lock = wait_mutex.lock().expect("Poisoned work_loop mutex");
178 if deleting.load(AtomicOrdering::Acquire) {
179 return;
180 }
181 std::mem::drop(wait.wait(lock));
182 }
183
184 while !deleting.load(AtomicOrdering::Acquire) {
190 match stealer.steal() {
191 Steal::Success(work) => {
192 Worker::do_work(work, channel.clone())
193 }
194 Steal::Retry => {}
195 Steal::Empty => break,
196 }
197 }
198 }
199 }
200
201 fn do_work<Message>(work: Work<Message>, channel: IoChannel<Message>)
202 where Message: Send + Sync + 'static {
203 match work.work_type {
204 WorkType::Timeout => {
205 work.handler.timeout(
206 &IoContext::new(channel, work.handler_id),
207 work.token,
208 );
209 }
210 WorkType::Message(message) => {
211 work.handler.message(
212 &IoContext::new(channel, work.handler_id),
213 &*message,
214 );
215 }
216 }
217 }
218}
219
220impl Drop for Worker {
221 fn drop(&mut self) {
222 trace!(target: "shutdown", "[IoWorker] Closing...");
223 {
224 let _lock =
225 self.wait_mutex.lock().expect("Poisoned work_loop mutex");
226 self.deleting.store(true, AtomicOrdering::Release);
227 self.wait.notify_all();
228 }
229 if let Some(thread) = self.thread.take() {
230 thread.join().ok();
231 }
232 trace!(target: "shutdown", "[IoWorker] Closed");
233 }
234}