1use crate::{BlockGenerator, SolutionReceiver};
28
29use super::MineWorker;
30use cfx_stratum::{
31 Error as StratumServiceError, JobDispatcher, PushWorkHandler,
32 Stratum as StratumService,
33};
34use cfx_types::{H256, U256};
35use cfxcore::pow::{PowComputer, ProofOfWorkProblem, ProofOfWorkSolution};
36use log::{info, trace, warn};
37use parking_lot::Mutex;
38use std::{
39 collections::HashSet,
40 fmt,
41 net::{AddrParseError, SocketAddr},
42 sync::{mpsc, Arc},
43};
44
45#[derive(Debug, PartialEq, Clone)]
47pub struct Options {
48 pub listen_addr: String,
50 pub port: u16,
52 pub secret: Option<H256>,
54}
55
/// Returns `s` with one leading `"0x"` removed.
///
/// The match is case-sensitive and at most one prefix is removed; input that
/// does not start with `"0x"` is returned unchanged.
fn clean_0x(s: &str) -> &str {
    s.strip_prefix("0x").unwrap_or(s)
}
63
64struct SubmitPayload {
65 worker_id: String,
66 nonce: U256,
67 pow_hash: H256,
68}
69
70impl SubmitPayload {
71 fn from_args(payload: Vec<String>) -> Result<Self, PayloadError> {
72 if payload.len() != 4 {
73 return Err(PayloadError::ArgumentsAmountUnexpected(payload.len()));
74 }
75
76 let worker_id = payload[0].clone();
77
78 let nonce = match clean_0x(&payload[2]).parse::<U256>() {
79 Ok(nonce) => nonce,
80 Err(e) => {
81 warn!(target: "stratum", "submit_work ({}): invalid nonce ({:?})", &payload[0], e);
82 return Err(PayloadError::InvalidNonce(payload[0].clone()));
83 }
84 };
85
86 let pow_hash = match clean_0x(&payload[3]).parse::<H256>() {
87 Ok(pow_hash) => pow_hash,
88 Err(e) => {
89 warn!(target: "stratum", "submit_work ({}): invalid hash ({:?})", &payload[1], e);
90 return Err(PayloadError::InvalidPowHash(payload[1].clone()));
91 }
92 };
93
94 Ok(SubmitPayload {
95 worker_id,
96 nonce,
97 pow_hash,
98 })
99 }
100}
101
/// Reasons the arguments of a submission can fail to decode.
// NOTE(review): the variant payloads appear to be read only through `Debug`
// formatting, which is presumably why dead-code warnings are silenced here;
// confirm before removing the attribute.
#[allow(dead_code)]
#[derive(Debug)]
pub enum PayloadError {
    /// The argument list did not have the expected length; carries the
    /// number of arguments actually received.
    ArgumentsAmountUnexpected(usize),
    /// The nonce argument could not be parsed; carries a context string
    /// chosen by `SubmitPayload::from_args`.
    InvalidNonce(String),
    /// The proof-of-work hash argument could not be parsed; carries a
    /// context string chosen by `SubmitPayload::from_args`.
    InvalidPowHash(String),
}
109
110impl fmt::Display for PayloadError {
111 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
112 fmt::Debug::fmt(&self, f)
113 }
114}
115
116pub struct StratumJobDispatcher {
118 recent_problems: Mutex<Vec<(ProofOfWorkProblem, HashSet<U256>)>>,
119 solution_sender: Mutex<mpsc::Sender<ProofOfWorkSolution>>,
120 pow: Arc<PowComputer>,
121 window_size: usize,
122}
123
124impl JobDispatcher for StratumJobDispatcher {
125 fn submit(&self, payload: Vec<String>) -> Result<(), StratumServiceError> {
126 let payload = SubmitPayload::from_args(payload)
127 .map_err(|e| StratumServiceError::Dispatch(e.to_string()))?;
128
129 trace!(
130 target: "stratum",
131 "submit_work: Decoded: nonce={}, pow_hash={}, worker_id={}",
132 payload.nonce,
133 payload.pow_hash,
134 payload.worker_id,
135 );
136
137 let sol = ProofOfWorkSolution {
138 nonce: payload.nonce,
139 };
140 {
141 let mut probs = self.recent_problems.lock();
142 let mut found = false;
143 for (pow_prob, solved_nonce) in probs.iter_mut() {
144 if pow_prob.block_hash == payload.pow_hash {
145 if solved_nonce.contains(&sol.nonce) {
146 return Err(StratumServiceError::InvalidSolution(
147 format!(
148 "Problem already solved with nonce = {}! worker_id = {}",
149 sol.nonce, payload.worker_id
150 ).into(),
151 ));
152 } else if self.pow.validate(pow_prob, &sol) {
153 solved_nonce.insert(sol.nonce);
154 info!(
155 "Stratum worker {} mined a block!",
156 payload.worker_id
157 );
158 found = true;
159 } else {
160 return Err(StratumServiceError::InvalidSolution(
161 format!(
162 "Incorrect Nonce! worker_id = {}!",
163 payload.worker_id
164 )
165 .into(),
166 ));
167 }
168 }
169 }
170 if !found {
171 return Err(StratumServiceError::InvalidSolution(
172 format!(
173 "Solution for a stale job! worker_id = {}",
174 payload.worker_id
175 )
176 .into(),
177 ));
178 }
179
180 match self.solution_sender.lock().send(sol) {
181 Ok(_) => {}
182 Err(e) => {
183 warn!("{}", e);
184 }
185 }
186 }
187
188 Ok(())
189 }
190}
191
192impl StratumJobDispatcher {
193 fn new(
195 solution_sender: mpsc::Sender<ProofOfWorkSolution>,
196 pow: Arc<PowComputer>, pow_window_size: usize,
197 ) -> StratumJobDispatcher {
198 StratumJobDispatcher {
199 recent_problems: Mutex::new(vec![]),
200 solution_sender: Mutex::new(solution_sender),
201 pow,
202 window_size: pow_window_size,
203 }
204 }
205
206 fn notify_new_problem(&self, current_problem: &ProofOfWorkProblem) {
207 let mut probs = self.recent_problems.lock();
208 if probs.len() == self.window_size {
209 probs.remove(0);
210 }
211 probs.push((current_problem.clone(), HashSet::new()));
212 }
213
214 fn payload(
216 &self, block_height: u64, pow_hash: H256, boundary: U256,
217 ) -> String {
218 format!(
221 r#"["0x{:x}", "{}", "0x{:x}","0x{:x}"]"#,
222 pow_hash, block_height, pow_hash, boundary
223 )
224 }
225}
226
227pub struct Stratum {
229 dispatcher: Arc<StratumJobDispatcher>,
230 service: Arc<StratumService>,
231}
232
233#[derive(Debug)]
234pub enum Error {
236 #[allow(unused)]
237 Service(StratumServiceError),
239 #[allow(unused)]
240 Address(AddrParseError),
242}
243
244impl From<StratumServiceError> for Error {
245 fn from(service_err: StratumServiceError) -> Error {
246 Error::Service(service_err)
247 }
248}
249
250impl From<AddrParseError> for Error {
251 fn from(err: AddrParseError) -> Error { Error::Address(err) }
252}
253
254impl MineWorker for Stratum {
255 fn receive_problem(&self, prob: ProofOfWorkProblem) {
256 trace!(target: "stratum", "Notify work");
257
258 self.dispatcher.notify_new_problem(&prob);
259 self.service.push_work_all(
260 self.dispatcher.payload(prob.block_height, prob.block_hash, prob.boundary)
261 ).unwrap_or_else(
262 |e| warn!(target: "stratum", "Error while pushing work: {:?}", e)
263 );
264 }
265}
266
267impl Stratum {
268 pub fn spawn(bg: &BlockGenerator) -> (Self, SolutionReceiver) {
269 let (solution_sender, solution_receiver) = mpsc::channel();
270 let cfg = Options {
271 listen_addr: bg.pow_config.stratum_listen_addr.clone(),
272 port: bg.pow_config.stratum_port,
273 secret: bg.pow_config.stratum_secret,
274 };
275 let stratum = Stratum::start(
276 &cfg,
277 bg.pow.clone(),
278 bg.pow_config.pow_problem_window_size,
279 solution_sender,
280 )
281 .expect("Failed to start Stratum service.");
282
283 (stratum, solution_receiver)
284 }
285
286 pub fn start(
289 options: &Options, pow: Arc<PowComputer>, pow_window_size: usize,
290 solution_sender: mpsc::Sender<ProofOfWorkSolution>,
291 ) -> Result<Stratum, Error> {
292 use std::net::IpAddr;
293
294 let dispatcher = Arc::new(StratumJobDispatcher::new(
295 solution_sender,
296 pow,
297 pow_window_size,
298 ));
299
300 let stratum_svc = StratumService::start(
301 &SocketAddr::new(
302 options.listen_addr.parse::<IpAddr>()?,
303 options.port,
304 ),
305 dispatcher.clone(),
306 options.secret.clone(),
307 )?;
308
309 Ok(Stratum {
310 dispatcher,
311 service: stratum_svc,
312 })
313 }
314}