1use keccak_hash as hash;
24use log::{debug, trace, warn};
25
26mod traits;
27
28pub use traits::{Error, JobDispatcher, PushWorkHandler, ServiceConfiguration};
29
30use jsonrpc_core::{
31 to_value, Compatibility, IoDelegate, MetaIoHandler, Metadata, Params, Value,
32};
33use jsonrpc_tcp_server::{
34 Dispatcher, MetaExtractor, PushMessageError, RequestContext,
35 Server as JsonRpcServer, ServerBuilder as JsonRpcServerBuilder,
36};
37use std::sync::Arc;
38
39use crate::traits::Error::InvalidSolution;
40use cfx_types::H256;
41use hash::keccak;
42use parking_lot::RwLock;
43use std::{
44 collections::{HashMap, HashSet},
45 net::SocketAddr,
46};
47
48type RpcResult = Result<jsonrpc_core::Value, jsonrpc_core::Error>;
49
/// Starting value of the `mining.notify` request id counter.
///
/// The counter is incremented before it is used, so the first pushed
/// notification carries id `NOTIFY_COUNTER_INITIAL + 1`; the counter is reset
/// to this value when it would overflow.
const NOTIFY_COUNTER_INITIAL: u32 = 16;
51
52pub struct Stratum {
54 rpc_server: Option<JsonRpcServer>,
59 implementation: Arc<StratumImpl>,
63 tcp_dispatcher: Dispatcher,
67}
68
69impl Stratum {
70 pub fn start(
71 addr: &SocketAddr, dispatcher: Arc<dyn JobDispatcher>,
72 secret: Option<H256>,
73 ) -> Result<Arc<Stratum>, Error> {
74 let implementation = Arc::new(StratumImpl {
75 dispatcher,
76 workers: Arc::new(RwLock::default()),
77 secret,
78 notify_counter: RwLock::new(NOTIFY_COUNTER_INITIAL),
79 });
80
81 let mut delegate = IoDelegate::<StratumImpl, SocketMetadata>::new(
82 implementation.clone(),
83 );
84 delegate.add_method_with_meta("mining.subscribe", {
85 let implementation = implementation.clone();
86 move |_, params, meta| {
87 let implementation = implementation.clone();
88 async move { implementation.subscribe(params, meta).await }
89 }
90 });
91
92 delegate.add_method_with_meta("mining.submit", {
93 let implementation = implementation.clone();
94 move |_, params, meta| {
95 let implementation = implementation.clone();
96 async move { implementation.submit(params, meta).await }
97 }
98 });
99 let mut handler = MetaIoHandler::<SocketMetadata>::with_compatibility(
100 Compatibility::Both,
101 );
102 handler.extend_with(delegate);
103
104 let server_builder = JsonRpcServerBuilder::new(handler);
105 let tcp_dispatcher = server_builder.dispatcher();
106 let server_builder = server_builder.session_meta_extractor(
107 PeerMetaExtractor::new(tcp_dispatcher.clone()),
108 );
109 let server = server_builder.start(addr)?;
110
111 let stratum = Arc::new(Stratum {
112 rpc_server: Some(server),
113 implementation,
114 tcp_dispatcher,
115 });
116
117 Ok(stratum)
118 }
119}
120
121impl PushWorkHandler for Stratum {
122 fn push_work_all(&self, payload: String) -> Result<(), Error> {
123 debug!("Pushing job {} to miners", payload);
124
125 self.implementation
126 .push_work_all(payload, &self.tcp_dispatcher)
127 }
128}
129
130impl Drop for Stratum {
131 fn drop(&mut self) {
132 if let Some(server) = self.rpc_server.take() {
134 server.close()
135 }
136 }
137}
138
139struct StratumImpl {
140 dispatcher: Arc<dyn JobDispatcher>,
142 workers: Arc<RwLock<HashMap<SocketAddr, String>>>,
144 secret: Option<H256>,
146 notify_counter: RwLock<u32>,
148}
149
150impl StratumImpl {
151 async fn subscribe(
153 &self, params: Params, meta: SocketMetadata,
154 ) -> RpcResult {
155 params.parse::<(String, String)>().map(|(worker_id, secret)|{
156 if let Some(valid_secret) = self.secret {
157 let hash = keccak(secret);
158 if hash != valid_secret {
159 return to_value(&false);
160 }
161 }
162 debug!(target: "stratum", "New worker #{} registered", worker_id);
163 self.workers.write().insert(meta.addr().clone(), worker_id);
164 to_value(true)
165 }).map(|v| v.expect("Only true/false is returned and it's always serializable; qed"))
166 }
167
168 async fn submit(&self, params: Params, _meta: SocketMetadata) -> RpcResult {
170 Ok(Value::Array(match params {
171 Params::Array(vals) => {
172 match self.dispatcher.submit(
174 vals.iter()
175 .filter_map(|val| match *val {
176 Value::String(ref s) => Some(s.to_owned()),
177 _ => None,
178 })
179 .collect::<Vec<String>>(),
180 ) {
181 Ok(()) => vec![to_value(true).expect("serializable")],
182 Err(InvalidSolution(msg)) => {
183 warn!("Error because of invalid solution: {:?}", msg);
186 vec![
187 to_value(false).expect("serializable"),
188 to_value(msg).expect("serializable"),
189 ]
190 }
191 Err(submit_err) => {
192 warn!("Error while submitting share: {:?}", submit_err);
193 vec![to_value(false).expect("serializable")]
194 }
195 }
196 }
197 _ => {
198 trace!(target: "stratum", "Invalid submit work format {:?}", params);
199 vec![to_value(false).expect("serializable")]
200 }
201 }))
202 }
203
204 fn push_work_all(
205 &self, payload: String, tcp_dispatcher: &Dispatcher,
206 ) -> Result<(), Error> {
207 let hup_peers = {
208 let workers = self.workers.read();
209 let next_request_id = {
210 let mut counter = self.notify_counter.write();
211 if *counter == ::std::u32::MAX {
212 *counter = NOTIFY_COUNTER_INITIAL;
213 } else {
214 *counter += 1
215 }
216 *counter
217 };
218
219 let mut hup_peers = HashSet::with_capacity(0); let workers_msg = format!("{{ \"id\": {}, \"method\": \"mining.notify\", \"params\": {} }}", next_request_id, payload);
221 trace!(target: "stratum", "Pushing work for {} workers (payload: '{}')", workers.len(), &workers_msg);
222 for (ref addr, worker_id) in workers.iter() {
223 trace!(target: "stratum", "Pushing work to {} at addr {}", &worker_id, &addr);
224 match tcp_dispatcher.push_message(addr, workers_msg.clone()) {
225 Err(PushMessageError::NoSuchPeer) => {
226 debug!(target: "stratum", "Worker no longer connected: {} addr {}", &worker_id, &addr);
227 hup_peers.insert(**addr);
228 }
229 Err(PushMessageError::Send(e)) => {
230 warn!(target: "stratum", "Unexpected transport error: {:?}", e);
231 if e.is_disconnected() {
232 hup_peers.insert(**addr);
233 }
234 }
235 Ok(_) => {}
236 }
237 }
238 hup_peers
239 };
240
241 if !hup_peers.is_empty() {
242 let mut workers = self.workers.write();
243 for hup_peer in hup_peers {
244 workers.remove(&hup_peer);
245 }
246 }
247
248 Ok(())
249 }
250}
251
252#[derive(Clone)]
253pub struct SocketMetadata {
254 addr: SocketAddr,
255 #[allow(dead_code)]
259 tcp_dispatcher: Option<Dispatcher>,
260}
261
262impl Default for SocketMetadata {
263 fn default() -> Self {
264 SocketMetadata {
265 addr: "0.0.0.0:0".parse().unwrap(),
266 tcp_dispatcher: None,
267 }
268 }
269}
270
271impl SocketMetadata {
272 pub fn addr(&self) -> &SocketAddr { &self.addr }
273}
274
275impl Metadata for SocketMetadata {}
276
277pub struct PeerMetaExtractor {
278 tcp_dispatcher: Dispatcher,
279}
280
281impl PeerMetaExtractor {
282 fn new(tcp_dispatcher: Dispatcher) -> Self {
283 PeerMetaExtractor { tcp_dispatcher }
284 }
285}
286
287impl MetaExtractor<SocketMetadata> for PeerMetaExtractor {
288 fn extract(&self, context: &RequestContext) -> SocketMetadata {
289 SocketMetadata {
290 addr: context.peer_addr,
291 tcp_dispatcher: Some(self.tcp_dispatcher.clone()),
292 }
293 }
294}
295
// Integration-style tests: every test starts a real server on its own fixed
// localhost port and talks to it over TCP.
#[cfg(test)]
mod tests {
    use super::*;
    use std::{net::SocketAddr, sync::Arc};
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::TcpStream,
        runtime::Runtime,
        time::sleep,
    };

    /// `mining.subscribe` request for worker `miner1` with an empty secret.
    const SUBSCRIBE_REQUEST: &str = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", ""], "id": 1}"#;

    /// Successful reply to [`SUBSCRIBE_REQUEST`], without the line terminator.
    const SUBSCRIBE_OK: &str = r#"{"jsonrpc":"2.0","result":true,"id":1}"#;

    /// Dispatcher that accepts every submission and ignores it.
    pub struct VoidManager;

    impl JobDispatcher for VoidManager {
        fn submit(&self, _payload: Vec<String>) -> Result<(), Error> { Ok(()) }
    }

    /// Opens a fresh connection to `addr`, sends `data` followed by a
    /// newline, shuts down the write half and returns everything the server
    /// sends back until it closes the connection.
    fn dummy_request(addr: &SocketAddr, data: &str) -> Vec<u8> {
        let runtime = Runtime::new()
            .expect("Tokio Runtime should be created with no errors");

        runtime.block_on(async {
            let mut stream = TcpStream::connect(addr)
                .await
                .expect("Should connect to server");

            let request_line = format!("{}\n", data);
            stream
                .write_all(request_line.as_bytes())
                .await
                .expect("Should write data to stream");

            stream.shutdown().await.expect("Should shutdown write half");

            let mut reply = Vec::with_capacity(2048);
            stream
                .read_to_end(&mut reply)
                .await
                .expect("Should read data from stream");

            reply
        })
    }

    #[test]
    fn can_be_started() {
        let addr: SocketAddr = "127.0.0.1:19980".parse().unwrap();
        let stratum = Stratum::start(&addr, Arc::new(VoidManager), None);
        assert!(stratum.is_ok());
    }

    /// Dispatcher carrying a configurable initial payload; submissions are
    /// accepted and ignored.
    struct DummyManager {
        initial_payload: String,
    }

    impl DummyManager {
        /// Creates a manager with the default dummy payload.
        fn build() -> DummyManager {
            DummyManager {
                initial_payload: r#"[ "dummy payload" ]"#.to_owned(),
            }
        }

        /// Replaces the initial payload, builder style.
        fn of_initial(mut self, new_initial: &str) -> DummyManager {
            self.initial_payload = new_initial.to_owned();
            self
        }
    }

    impl JobDispatcher for DummyManager {
        fn submit(&self, _payload: Vec<String>) -> Result<(), Error> { Ok(()) }
    }

    /// Returns `origin` with the newline terminator the server appends to
    /// every message.
    fn terminated_str(origin: &'static str) -> String {
        format!("{}\n", origin)
    }

    #[test]
    fn can_subscribe() {
        let addr: SocketAddr = "127.0.0.1:19970".parse().unwrap();
        let manager = DummyManager::build()
            .of_initial(r#"["dummy autorize payload"]"#);
        let stratum = Stratum::start(&addr, Arc::new(manager), None)
            .expect("There should be no error starting stratum");

        let response =
            String::from_utf8(dummy_request(&addr, SUBSCRIBE_REQUEST))
                .unwrap();

        assert_eq!(terminated_str(SUBSCRIBE_OK), response);
        assert_eq!(1, stratum.implementation.workers.read().len());
    }

    #[test]
    fn can_push_work() {
        let _ = ::env_logger::try_init();

        let addr: SocketAddr = "127.0.0.1:19995".parse().unwrap();
        let manager = DummyManager::build()
            .of_initial(r#"["dummy autorize payload"]"#);
        let stratum = Stratum::start(&addr, Arc::new(manager), None)
            .expect("There should be no error starting stratum");

        let auth_request = format!("{}\n", SUBSCRIBE_REQUEST).into_bytes();
        let auth_response = terminated_str(SUBSCRIBE_OK);

        let runtime = Runtime::new()
            .expect("Tokio Runtime should be created with no errors");

        let pushed = runtime.block_on(async {
            let mut stream = TcpStream::connect(&addr)
                .await
                .expect("Should connect to server");

            // Subscribe on this connection so the server registers it as a
            // worker.
            stream
                .write_all(&auth_request)
                .await
                .expect("Should write auth request");

            // Read exactly the subscription reply, leaving anything that
            // follows in the stream.
            let mut auth_buf = vec![0u8; auth_response.len()];
            stream
                .read_exact(&mut auth_buf)
                .await
                .expect("Should read auth response");

            assert_eq!(String::from_utf8(auth_buf).unwrap(), auth_response);
            trace!(target: "stratum", "Received authorization confirmation");

            sleep(std::time::Duration::from_millis(100)).await;

            trace!(target: "stratum", "Pusing work to peers");
            stratum
                .push_work_all(r#"{ "00040008", "100500" }"#.to_owned())
                .expect("Pushing work should produce no errors");

            sleep(std::time::Duration::from_millis(100)).await;

            trace!(target: "stratum", "Ready to read work from server");
            sleep(std::time::Duration::from_millis(100)).await;

            stream.shutdown().await.expect("Should shutdown write half");

            let mut work_buf = Vec::with_capacity(2048);
            stream
                .read_to_end(&mut work_buf)
                .await
                .expect("Should read work response");

            trace!(target: "stratum", "Received work from server");
            work_buf
        });

        let response =
            String::from_utf8(pushed).expect("Response should be utf-8");

        // First notification id is NOTIFY_COUNTER_INITIAL + 1.
        assert_eq!(
            terminated_str(
                r#"{ "id": 17, "method": "mining.notify", "params": { "00040008", "100500" } }"#
            ),
            response
        );
    }

    #[test]
    fn test_can_subscribe_with_secret() {
        let addr: SocketAddr = "127.0.0.1:19971".parse().unwrap();
        let secret_str = "test_secret";
        let secret_hash = keccak(secret_str);

        let stratum =
            Stratum::start(&addr, Arc::new(VoidManager), Some(secret_hash))
                .expect("Should start stratum with secret");

        let request = format!(
            r#"{{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", "{}"], "id": 1}}"#,
            secret_str
        );

        let response =
            String::from_utf8(dummy_request(&addr, &request)).unwrap();

        assert_eq!(terminated_str(SUBSCRIBE_OK), response);
        assert_eq!(1, stratum.implementation.workers.read().len());
    }

    #[test]
    fn test_can_subscribe_with_invalid_secret() {
        let addr: SocketAddr = "127.0.0.1:19972".parse().unwrap();
        let secret_hash = keccak("test_secret");
        let stratum =
            Stratum::start(&addr, Arc::new(VoidManager), Some(secret_hash))
                .expect("Should start stratum with secret");

        let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", "wrong_secret"], "id": 2}"#;
        let response =
            String::from_utf8(dummy_request(&addr, request)).unwrap();

        assert_eq!(
            terminated_str(r#"{"jsonrpc":"2.0","result":false,"id":2}"#),
            response
        );
        assert_eq!(0, stratum.implementation.workers.read().len());
    }

    #[test]
    fn test_can_submit() {
        let addr: SocketAddr = "127.0.0.1:19973".parse().unwrap();

        /// Dispatcher that records every submitted payload.
        struct TestDispatcher {
            submissions: Arc<RwLock<Vec<Vec<String>>>>,
        }

        impl JobDispatcher for TestDispatcher {
            fn submit(&self, payload: Vec<String>) -> Result<(), Error> {
                self.submissions.write().push(payload);
                Ok(())
            }
        }

        let submissions = Arc::new(RwLock::new(Vec::new()));
        let test_dispatcher = TestDispatcher {
            submissions: submissions.clone(),
        };

        let stratum = Stratum::start(&addr, Arc::new(test_dispatcher), None)
            .expect("Should start stratum");

        let subscribe_response =
            String::from_utf8(dummy_request(&addr, SUBSCRIBE_REQUEST))
                .unwrap();

        assert_eq!(terminated_str(SUBSCRIBE_OK), subscribe_response);

        let submit_request = r#"{"jsonrpc": "2.0", "method": "mining.submit", "params": ["test_miner", "job_id", "0x1", "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"], "id": 2}"#;

        let submit_response =
            String::from_utf8(dummy_request(&addr, submit_request)).unwrap();

        assert_eq!(
            terminated_str(r#"{"jsonrpc":"2.0","result":[true],"id":2}"#),
            submit_response
        );

        assert_eq!(1, submissions.read().len());
        assert_eq!(
            vec![
                "test_miner",
                "job_id",
                "0x1",
                "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"
            ],
            submissions.read()[0]
        );

        // Only the subscribing connection registered a worker.
        assert_eq!(1, stratum.implementation.workers.read().len());
    }
}