// io/worker.rs

1// Copyright 2015-2018 Parity Technologies (UK) Ltd.
2// This file is part of Parity.
3
4// Parity is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Parity is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Parity.  If not, see <http://www.gnu.org/licenses/>.
16
17// Copyright 2019 Conflux Foundation. All rights reserved.
18// Conflux is free software and distributed under GNU General Public License.
19// See http://www.gnu.org/licenses/
20
21use crate::{
22    service_mio::{HandlerId, IoChannel, IoContext},
23    IoHandler, LOCAL_STACK_SIZE,
24};
25use crossbeam_channel::{Receiver, RecvTimeoutError};
26use crossbeam_deque::{Steal, Stealer};
27use log::{trace, warn};
28use std::{
29    sync::{
30        atomic::{AtomicBool, Ordering as AtomicOrdering},
31        Arc, Condvar as SCondvar, Mutex as SMutex,
32    },
33    thread::{self, JoinHandle},
34    time::Duration,
35};
36
37const STACK_SIZE: usize = 16 * 1024 * 1024;
38
/// The kind of work a worker thread is asked to perform.
pub enum WorkType<Message> {
    /// A timer fired: run the handler's `timeout` callback.
    Timeout,
    /// Deliver a message to the handler's `message` callback.
    Message(Arc<Message>),
}
43
/// A unit of work dispatched to a worker thread: the payload plus the
/// handler that should process it.
pub struct Work<Message> {
    /// What to do: run a timeout callback or deliver a message.
    pub work_type: WorkType<Message>,
    /// Timer token passed to the `timeout` callback; unused by the
    /// `Message` path.
    pub token: usize,
    /// Id of the registered handler this work targets; used to build the
    /// `IoContext` passed to the callback.
    pub handler_id: HandlerId,
    /// The handler whose callback will be invoked.
    pub handler: Arc<dyn IoHandler<Message>>,
}
50
/// A socket IO worker thread.
///
/// Owns a thread that receives `Work` items from a channel and runs their
/// `message` callbacks. Dropping the worker signals the thread to stop and
/// joins it.
pub struct SocketWorker {
    // Join handle of the spawned thread; `None` only transiently during
    // construction and after `drop` takes it for joining.
    thread: Option<JoinHandle<()>>,
    // Shared stop flag: set by `drop`, polled by the worker loop.
    deleting: Arc<AtomicBool>,
}
56
57impl SocketWorker {
58    /// Creates a socket worker instance
59    pub fn new<Message>(
60        index: usize, rx: Receiver<Work<Message>>, channel: IoChannel<Message>,
61    ) -> SocketWorker
62    where Message: Send + Sync + 'static {
63        let deleting = Arc::new(AtomicBool::new(false));
64        let mut worker = SocketWorker {
65            thread: None,
66            deleting: deleting.clone(),
67        };
68        worker.thread = Some(
69            thread::Builder::new()
70                .stack_size(STACK_SIZE)
71                .name(format!("Socket IO Worker #{}", index))
72                .spawn(move || {
73                    LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE));
74                    SocketWorker::work_loop(rx, channel.clone(), deleting)
75                })
76                .expect("Error creating socket worker thread"),
77        );
78        worker
79    }
80
81    fn work_loop<Message>(
82        rx: Receiver<Work<Message>>, channel: IoChannel<Message>,
83        deleting: Arc<AtomicBool>,
84    ) where
85        Message: Send + Sync + 'static,
86    {
87        while !deleting.load(AtomicOrdering::Acquire) {
88            // Add timeout because if the worker is dropped, we can check
89            // `deleting` without blocking forever.
90            match rx.recv_timeout(Duration::from_millis(500)) {
91                Ok(work) => SocketWorker::do_work(work, channel.clone()),
92                Err(RecvTimeoutError::Timeout) => continue,
93                Err(RecvTimeoutError::Disconnected) => break,
94            }
95        }
96    }
97
98    fn do_work<Message>(work: Work<Message>, channel: IoChannel<Message>)
99    where Message: Send + Sync + 'static {
100        match work.work_type {
101            WorkType::Message(message) => {
102                work.handler.message(
103                    &IoContext::new(channel, work.handler_id),
104                    &*message,
105                );
106            }
107            _ => warn!(target: "SocketWorker::do_work", "Unexpected WorkType"),
108        }
109    }
110}
111
112impl Drop for SocketWorker {
113    fn drop(&mut self) {
114        trace!(target: "shutdown", "[SocketIoWorker] Closing...");
115        self.deleting.store(true, AtomicOrdering::Release);
116        if let Some(thread) = self.thread.take() {
117            thread.join().ok();
118        }
119        trace!(target: "shutdown", "[SocketIoWorker] Closed");
120    }
121}
122
/// An IO worker thread.
///
/// Steals `Work` items from a shared deque and runs their handler
/// callbacks, sleeping on a condition variable while idle. Dropping the
/// worker wakes the thread, signals it to stop, and joins it.
pub struct Worker {
    // Join handle of the spawned thread; `None` only transiently during
    // construction and after `drop` takes it for joining.
    thread: Option<JoinHandle<()>>,
    // Condition variable the thread sleeps on when the deque is empty;
    // shared with the producer that notifies on new work.
    wait: Arc<SCondvar>,
    // Shared stop flag: set by `drop` under `wait_mutex`, checked by the
    // worker loop.
    deleting: Arc<AtomicBool>,
    // Mutex paired with `wait`; also guards the `deleting` flag checks so
    // shutdown notifications cannot be lost.
    wait_mutex: Arc<SMutex<()>>,
}
131
132impl Worker {
133    /// Creates a new worker instance.
134    pub fn new<Message>(
135        index: usize, stealer: Stealer<Work<Message>>,
136        channel: IoChannel<Message>, wait: Arc<SCondvar>,
137        wait_mutex: Arc<SMutex<()>>,
138    ) -> Worker
139    where
140        Message: Send + Sync + 'static,
141    {
142        let deleting = Arc::new(AtomicBool::new(false));
143        let mut worker = Worker {
144            thread: None,
145            wait: wait.clone(),
146            deleting: deleting.clone(),
147            wait_mutex: wait_mutex.clone(),
148        };
149        worker.thread = Some(
150            thread::Builder::new()
151                .stack_size(STACK_SIZE)
152                .name(format!("IO Worker #{}", index))
153                .spawn(move || {
154                    LOCAL_STACK_SIZE.with(|val| val.set(STACK_SIZE));
155                    Worker::work_loop(
156                        stealer,
157                        channel.clone(),
158                        wait,
159                        wait_mutex.clone(),
160                        deleting,
161                    )
162                })
163                .expect("Error creating worker thread"),
164        );
165        worker
166    }
167
168    fn work_loop<Message>(
169        stealer: Stealer<Work<Message>>, channel: IoChannel<Message>,
170        wait: Arc<SCondvar>, wait_mutex: Arc<SMutex<()>>,
171        deleting: Arc<AtomicBool>,
172    ) where
173        Message: Send + Sync + 'static,
174    {
175        loop {
176            {
177                let lock = wait_mutex.lock().expect("Poisoned work_loop mutex");
178                if deleting.load(AtomicOrdering::Acquire) {
179                    return;
180                }
181                std::mem::drop(wait.wait(lock));
182            }
183
184            // TODO: If a `work` is enqueued and notified after the following
185            // loop end but before we start waiting on `wait`, this
186            // work may not be processed in time because all workers are
187            // waiting? This is not an issue so far because we have
188            // many timeout events that always notify workers.
189            while !deleting.load(AtomicOrdering::Acquire) {
190                match stealer.steal() {
191                    Steal::Success(work) => {
192                        Worker::do_work(work, channel.clone())
193                    }
194                    Steal::Retry => {}
195                    Steal::Empty => break,
196                }
197            }
198        }
199    }
200
201    fn do_work<Message>(work: Work<Message>, channel: IoChannel<Message>)
202    where Message: Send + Sync + 'static {
203        match work.work_type {
204            WorkType::Timeout => {
205                work.handler.timeout(
206                    &IoContext::new(channel, work.handler_id),
207                    work.token,
208                );
209            }
210            WorkType::Message(message) => {
211                work.handler.message(
212                    &IoContext::new(channel, work.handler_id),
213                    &*message,
214                );
215            }
216        }
217    }
218}
219
220impl Drop for Worker {
221    fn drop(&mut self) {
222        trace!(target: "shutdown", "[IoWorker] Closing...");
223        {
224            let _lock =
225                self.wait_mutex.lock().expect("Poisoned work_loop mutex");
226            self.deleting.store(true, AtomicOrdering::Release);
227            self.wait.notify_all();
228        }
229        if let Some(thread) = self.thread.take() {
230            thread.join().ok();
231        }
232        trace!(target: "shutdown", "[IoWorker] Closed");
233    }
234}