blockgen/miner/
stratum.rs

1// Copyright 2019-2020 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5// Copyright 2015-2019 Parity Technologies (UK) Ltd.
6// This file is part of Parity Ethereum.
7
8// Parity Ethereum is free software: you can redistribute it and/or modify
9// it under the terms of the GNU General Public License as published by
10// the Free Software Foundation, either version 3 of the License, or
11// (at your option) any later version.
12
13// Parity Ethereum is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16// GNU General Public License for more details.
17
18// You should have received a copy of the GNU General Public License
19// along with Parity Ethereum.  If not, see <http://www.gnu.org/licenses/>.
20
21// Copyright 2019 Conflux Foundation. All rights reserved.
22// Conflux is free software and distributed under GNU General Public License.
23// See http://www.gnu.org/licenses/
24
25//! Client-side stratum job dispatcher and mining notifier handler
26
27use crate::{BlockGenerator, SolutionReceiver};
28
29use super::MineWorker;
30use cfx_stratum::{
31    Error as StratumServiceError, JobDispatcher, PushWorkHandler,
32    Stratum as StratumService,
33};
34use cfx_types::{H256, U256};
35use cfxcore::pow::{PowComputer, ProofOfWorkProblem, ProofOfWorkSolution};
36use log::{info, trace, warn};
37use parking_lot::Mutex;
38use std::{
39    collections::HashSet,
40    fmt,
41    net::{AddrParseError, SocketAddr},
42    sync::{mpsc, Arc},
43};
44
45/// Configures stratum server options.
46#[derive(Debug, PartialEq, Clone)]
47pub struct Options {
48    /// Network address
49    pub listen_addr: String,
50    /// Port
51    pub port: u16,
52    /// Secret for peers
53    pub secret: Option<H256>,
54}
55
/// Returns `s` without one leading `0x` marker.
///
/// Only a single, lowercase `0x` prefix is removed. Input that does not start
/// with it (including the empty string) is returned unchanged.
fn clean_0x(s: &str) -> &str {
    s.strip_prefix("0x").unwrap_or(s)
}
63
64struct SubmitPayload {
65    worker_id: String,
66    nonce: U256,
67    pow_hash: H256,
68}
69
70impl SubmitPayload {
71    fn from_args(payload: Vec<String>) -> Result<Self, PayloadError> {
72        if payload.len() != 4 {
73            return Err(PayloadError::ArgumentsAmountUnexpected(payload.len()));
74        }
75
76        let worker_id = payload[0].clone();
77
78        let nonce = match clean_0x(&payload[2]).parse::<U256>() {
79            Ok(nonce) => nonce,
80            Err(e) => {
81                warn!(target: "stratum", "submit_work ({}): invalid nonce ({:?})", &payload[0], e);
82                return Err(PayloadError::InvalidNonce(payload[0].clone()));
83            }
84        };
85
86        let pow_hash = match clean_0x(&payload[3]).parse::<H256>() {
87            Ok(pow_hash) => pow_hash,
88            Err(e) => {
89                warn!(target: "stratum", "submit_work ({}): invalid hash ({:?})", &payload[1], e);
90                return Err(PayloadError::InvalidPowHash(payload[1].clone()));
91            }
92        };
93
94        Ok(SubmitPayload {
95            worker_id,
96            nonce,
97            pow_hash,
98        })
99    }
100}
101
/// Reasons a submit request could not be decoded.
#[derive(Debug)]
// NOTE(review): dead_code is presumably silenced because the variant payloads
// are only ever read through the derived `Debug` -- confirm before removing.
#[allow(dead_code)]
pub enum PayloadError {
    /// The request carried this many arguments instead of the expected
    /// number.
    ArgumentsAmountUnexpected(usize),
    /// The nonce argument could not be parsed; the string is diagnostic
    /// context supplied by the decoder.
    InvalidNonce(String),
    /// The PoW hash argument could not be parsed; the string is diagnostic
    /// context supplied by the decoder.
    InvalidPowHash(String),
}
109
110impl fmt::Display for PayloadError {
111    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
112        fmt::Debug::fmt(&self, f)
113    }
114}
115
116/// Job dispatcher for stratum service
117pub struct StratumJobDispatcher {
118    recent_problems: Mutex<Vec<(ProofOfWorkProblem, HashSet<U256>)>>,
119    solution_sender: Mutex<mpsc::Sender<ProofOfWorkSolution>>,
120    pow: Arc<PowComputer>,
121    window_size: usize,
122}
123
124impl JobDispatcher for StratumJobDispatcher {
125    fn submit(&self, payload: Vec<String>) -> Result<(), StratumServiceError> {
126        let payload = SubmitPayload::from_args(payload)
127            .map_err(|e| StratumServiceError::Dispatch(e.to_string()))?;
128
129        trace!(
130            target: "stratum",
131            "submit_work: Decoded: nonce={}, pow_hash={}, worker_id={}",
132            payload.nonce,
133            payload.pow_hash,
134            payload.worker_id,
135        );
136
137        let sol = ProofOfWorkSolution {
138            nonce: payload.nonce,
139        };
140        {
141            let mut probs = self.recent_problems.lock();
142            let mut found = false;
143            for (pow_prob, solved_nonce) in probs.iter_mut() {
144                if pow_prob.block_hash == payload.pow_hash {
145                    if solved_nonce.contains(&sol.nonce) {
146                        return Err(StratumServiceError::InvalidSolution(
147                            format!(
148                                "Problem already solved with nonce = {}! worker_id = {}",
149                                sol.nonce, payload.worker_id
150                            ).into(),
151                        ));
152                    } else if self.pow.validate(pow_prob, &sol) {
153                        solved_nonce.insert(sol.nonce);
154                        info!(
155                            "Stratum worker {} mined a block!",
156                            payload.worker_id
157                        );
158                        found = true;
159                    } else {
160                        return Err(StratumServiceError::InvalidSolution(
161                            format!(
162                                "Incorrect Nonce! worker_id = {}!",
163                                payload.worker_id
164                            )
165                            .into(),
166                        ));
167                    }
168                }
169            }
170            if !found {
171                return Err(StratumServiceError::InvalidSolution(
172                    format!(
173                        "Solution for a stale job! worker_id = {}",
174                        payload.worker_id
175                    )
176                    .into(),
177                ));
178            }
179
180            match self.solution_sender.lock().send(sol) {
181                Ok(_) => {}
182                Err(e) => {
183                    warn!("{}", e);
184                }
185            }
186        }
187
188        Ok(())
189    }
190}
191
192impl StratumJobDispatcher {
193    /// New stratum job dispatcher given the miner and client
194    fn new(
195        solution_sender: mpsc::Sender<ProofOfWorkSolution>,
196        pow: Arc<PowComputer>, pow_window_size: usize,
197    ) -> StratumJobDispatcher {
198        StratumJobDispatcher {
199            recent_problems: Mutex::new(vec![]),
200            solution_sender: Mutex::new(solution_sender),
201            pow,
202            window_size: pow_window_size,
203        }
204    }
205
206    fn notify_new_problem(&self, current_problem: &ProofOfWorkProblem) {
207        let mut probs = self.recent_problems.lock();
208        if probs.len() == self.window_size {
209            probs.remove(0);
210        }
211        probs.push((current_problem.clone(), HashSet::new()));
212    }
213
214    /// Serializes payload for stratum service
215    fn payload(
216        &self, block_height: u64, pow_hash: H256, boundary: U256,
217    ) -> String {
218        // Now we just fill the job_id as pow_hash. This will be more consistent
219        // with the convention.
220        format!(
221            r#"["0x{:x}", "{}", "0x{:x}","0x{:x}"]"#,
222            pow_hash, block_height, pow_hash, boundary
223        )
224    }
225}
226
227/// Wrapper for dedicated stratum service
228pub struct Stratum {
229    dispatcher: Arc<StratumJobDispatcher>,
230    service: Arc<StratumService>,
231}
232
233#[derive(Debug)]
234/// Stratum error
235pub enum Error {
236    #[allow(unused)]
237    /// IPC sockets error
238    Service(StratumServiceError),
239    #[allow(unused)]
240    /// Invalid network address
241    Address(AddrParseError),
242}
243
244impl From<StratumServiceError> for Error {
245    fn from(service_err: StratumServiceError) -> Error {
246        Error::Service(service_err)
247    }
248}
249
250impl From<AddrParseError> for Error {
251    fn from(err: AddrParseError) -> Error { Error::Address(err) }
252}
253
254impl MineWorker for Stratum {
255    fn receive_problem(&self, prob: ProofOfWorkProblem) {
256        trace!(target: "stratum", "Notify work");
257
258        self.dispatcher.notify_new_problem(&prob);
259        self.service.push_work_all(
260            self.dispatcher.payload(prob.block_height, prob.block_hash, prob.boundary)
261        ).unwrap_or_else(
262            |e| warn!(target: "stratum", "Error while pushing work: {:?}", e)
263        );
264    }
265}
266
267impl Stratum {
268    pub fn spawn(bg: &BlockGenerator) -> (Self, SolutionReceiver) {
269        let (solution_sender, solution_receiver) = mpsc::channel();
270        let cfg = Options {
271            listen_addr: bg.pow_config.stratum_listen_addr.clone(),
272            port: bg.pow_config.stratum_port,
273            secret: bg.pow_config.stratum_secret,
274        };
275        let stratum = Stratum::start(
276            &cfg,
277            bg.pow.clone(),
278            bg.pow_config.pow_problem_window_size,
279            solution_sender,
280        )
281        .expect("Failed to start Stratum service.");
282
283        (stratum, solution_receiver)
284    }
285
286    /// New stratum job dispatcher, given the miner, client and dedicated
287    /// stratum service
288    pub fn start(
289        options: &Options, pow: Arc<PowComputer>, pow_window_size: usize,
290        solution_sender: mpsc::Sender<ProofOfWorkSolution>,
291    ) -> Result<Stratum, Error> {
292        use std::net::IpAddr;
293
294        let dispatcher = Arc::new(StratumJobDispatcher::new(
295            solution_sender,
296            pow,
297            pow_window_size,
298        ));
299
300        let stratum_svc = StratumService::start(
301            &SocketAddr::new(
302                options.listen_addr.parse::<IpAddr>()?,
303                options.port,
304            ),
305            dispatcher.clone(),
306            options.secret.clone(),
307        )?;
308
309        Ok(Stratum {
310            dispatcher,
311            service: stratum_svc,
312        })
313    }
314}