1use keccak_hash as hash;
24use log::{debug, trace, warn};
25
26mod traits;
27
28pub use traits::{Error, JobDispatcher, PushWorkHandler, ServiceConfiguration};
29
30use cfx_types::H256;
31use hash::keccak;
32use jsonrpsee::{
33 core::RpcResult,
34 server::RpcModule,
35 types::{ErrorObjectOwned, Id, Params, Request, Response, ResponsePayload},
36};
37use parking_lot::RwLock;
38use serde_json::Value;
39use std::{
40 collections::{HashMap, HashSet},
41 io,
42 net::SocketAddr,
43 sync::Arc,
44};
45use tokio::{
46 io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
47 net::{TcpListener, TcpStream},
48 sync::{mpsc, RwLock as AsyncRwLock},
49};
50
/// Starting value of the notification id counter.
///
/// The counter is bumped before each use, so the first `mining.notify`
/// message sent after start-up carries id 17. The counter is reset to this
/// value once it reaches `u32::MAX`.
const NOTIFY_COUNTER_INITIAL: u32 = 16;
52
53pub struct Stratum {
55 implementation: Arc<StratumImpl>,
57 shutdown_tx: mpsc::Sender<()>,
59}
60
61impl Stratum {
62 pub async fn start(
63 addr: &SocketAddr, dispatcher: Arc<dyn JobDispatcher>,
64 secret: Option<H256>,
65 ) -> Result<Arc<Stratum>, Error> {
66 let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
67
68 let implementation = Arc::new(StratumImpl {
69 dispatcher,
70 secret,
71 notify_counter: RwLock::new(NOTIFY_COUNTER_INITIAL),
72 workers: Arc::new(RwLock::new(HashMap::new())),
73 connections: Arc::new(RwLock::default()),
74 });
75
76 let implementation_for_methods = implementation.clone();
77 let listener = TcpListener::bind(addr)
78 .await
79 .map_err(|e| Error::Io(e.to_string()))?;
80
81 let local_addr = listener
82 .local_addr()
83 .map_err(|e| Error::Io(e.to_string()))?;
84 debug!("Stratum server started on {}", local_addr);
85
86 tokio::spawn(async move {
88 loop {
89 tokio::select! {
90 result = listener.accept() => {
91 match result {
92 Ok((stream, peer_addr)) => {
93 debug!("Accepted connection from {}", peer_addr);
94 let impl_for_methods = implementation_for_methods.clone();
95
96 tokio::spawn(async move {
97 if let Err(e) = handle_connection(stream, peer_addr, impl_for_methods).await {
98 debug!("Connection error for {}: {}", peer_addr, e);
99 }
100 });
101 }
102 Err(e) => {
103 warn!("Failed to accept connection: {}", e);
104 }
105 }
106 }
107 _ = shutdown_rx.recv() => {
108 debug!("Shutting down stratum server");
109 break;
110 }
111 }
112 }
113 });
114
115 let stratum = Arc::new(Stratum {
116 implementation,
117 shutdown_tx,
118 });
119
120 Ok(stratum)
121 }
122
123 pub async fn stop(&self) -> Result<(), Error> {
124 self.shutdown_tx.send(()).await.map_err(|e| {
125 Error::Io(format!("Failed to send shutdown signal: {}", e))
126 })
127 }
128}
129
130impl PushWorkHandler for Stratum {
131 fn push_work_all(&self, payload: String) -> Result<(), Error> {
132 debug!("Pushing job {} to miners", payload);
133 self.implementation.push_work_all(payload)
134 }
135}
136
137async fn handle_connection(
138 stream: TcpStream, peer_addr: SocketAddr,
139 implementation_for_methods: Arc<StratumImpl>,
140) -> io::Result<()> {
141 let (reader, writer) = stream.into_split();
142 let mut reader = BufReader::new(reader);
143 let writer = Arc::new(AsyncRwLock::new(BufWriter::new(writer)));
144
145 let implementation = implementation_for_methods.clone();
146
147 let module =
149 StratumImpl::build_rpc_methods(implementation_for_methods, peer_addr);
150 let methods = Arc::new(module);
151
152 let (tx, mut rx) = mpsc::channel::<String>(100);
154 implementation.connections.write().insert(peer_addr, tx);
155
156 let writer_clone = writer.clone();
158 let push_task = tokio::spawn(async move {
159 while let Some(msg) = rx.recv().await {
160 let mut writer = writer_clone.write().await;
161 if let Err(e) = writer.write_all(msg.as_bytes()).await {
162 debug!("Failed to write notification: {}", e);
163 break;
164 }
165 if let Err(e) = writer.write_all(b"\n").await {
166 debug!("Failed to write newline: {}", e);
167 break;
168 }
169 if let Err(e) = writer.flush().await {
170 debug!("Failed to flush: {}", e);
171 break;
172 }
173 }
174 });
175
176 let mut line = String::new();
177
178 loop {
179 line.clear();
180
181 let n = reader.read_line(&mut line).await?;
182
183 if n == 0 {
184 debug!("Connection closed by peer {}", peer_addr);
185 break;
186 }
187
188 let trimmed = line.trim();
189 if trimmed.is_empty() {
190 continue;
191 }
192
193 debug!("Received request from {}: {}", peer_addr, trimmed);
194
195 let request: Result<Request, _> = serde_json::from_str(trimmed);
197
198 let response = match request {
199 Ok(req) => {
200 let id = req.id.clone();
201 let method = req.method.clone();
202
203 match methods.raw_json_request(&trimmed, 1).await {
205 Ok((response_raw, _rx)) => {
206 let response_str = response_raw.get();
207 debug!("Response for {}: {}", method, response_str);
208 response_str.to_string()
209 }
210 Err(e) => {
211 warn!("Method call failed for {}: {}", method, e);
212 let error_response: Response<Value> = Response::new(
213 ResponsePayload::error(ErrorObjectOwned::owned(
214 -32603,
215 "Internal error",
216 Some(e.to_string()),
217 )),
218 id,
219 );
220 serde_json::to_string(&error_response).unwrap()
221 }
222 }
223 }
224 Err(e) => {
225 warn!("Failed to parse request: {}", e);
226 let error_response: Response<Value> = Response::new(
227 ResponsePayload::error(ErrorObjectOwned::owned(
228 -32700,
229 "Parse error",
230 Some(e.to_string()),
231 )),
232 Id::Null,
233 );
234 serde_json::to_string(&error_response).unwrap()
235 }
236 };
237
238 let mut writer = writer.write().await;
240 writer.write_all(response.as_bytes()).await?;
241 writer.write_all(b"\n").await?;
242 writer.flush().await?;
243 }
244
245 push_task.abort();
247 implementation.connections.write().remove(&peer_addr);
248 Ok(())
253}
254
255struct StratumImpl {
256 dispatcher: Arc<dyn JobDispatcher>,
258 secret: Option<H256>,
260 notify_counter: RwLock<u32>,
262 workers: Arc<RwLock<HashMap<SocketAddr, String>>>,
264 connections: Arc<RwLock<HashMap<SocketAddr, mpsc::Sender<String>>>>,
266}
267
268impl StratumImpl {
269 async fn subscribe(
271 &self, params: Params<'_>, peer_addr: SocketAddr,
272 ) -> RpcResult<bool> {
273 let params_vec: Vec<String> = params.parse()?;
274
275 if params_vec.len() < 2 {
276 return Err(ErrorObjectOwned::owned(
277 -32602,
278 "Invalid params: expected [worker_id, secret]",
279 None::<()>,
280 ));
281 }
282
283 let worker_id = ¶ms_vec[0];
284 let secret = ¶ms_vec[1];
285
286 if let Some(valid_secret) = self.secret {
287 let hash = keccak(secret);
288 if hash != valid_secret {
289 return Ok(false);
290 }
291 }
292
293 let mut workers = self.workers.write();
294 workers.insert(peer_addr, worker_id.clone());
295
296 Ok(true)
297 }
298
299 async fn submit(&self, params: Params<'_>) -> RpcResult<Vec<Value>> {
301 let params_vec: Vec<String> = params.parse()?;
302
303 match self.dispatcher.submit(params_vec) {
304 Ok(()) => Ok(vec![Value::Bool(true)]),
305 Err(crate::traits::Error::InvalidSolution(msg)) => {
306 warn!("Error because of invalid solution: {:?}", msg);
307 Ok(vec![Value::Bool(false), Value::String(msg)])
308 }
309 Err(submit_err) => {
310 warn!("Error while submitting share: {:?}", submit_err);
311 Ok(vec![Value::Bool(false)])
312 }
313 }
314 }
315
316 fn push_work_all(&self, payload: String) -> Result<(), Error> {
317 let connections = self.connections.read();
318 let workers = self.workers.read();
319
320 let next_request_id = {
321 let mut counter = self.notify_counter.write();
322 if *counter == ::std::u32::MAX {
323 *counter = NOTIFY_COUNTER_INITIAL;
324 } else {
325 *counter += 1
326 }
327 *counter
328 };
329
330 let mut hup_peers = HashSet::with_capacity(0);
331 let workers_msg = format!(
332 r#"{{"id":{},"method":"mining.notify","params":{}}}"#,
333 next_request_id, payload
334 );
335
336 trace!(target: "stratum", "Pushing work for {} workers (payload: '{}')", workers.len(), &workers_msg);
337
338 for (addr, worker_id) in workers.iter() {
339 trace!(target: "stratum", "Pushing work to addr {} worker_id {}", &addr, &worker_id);
340
341 if let Some(tx) = connections.get(addr) {
342 if let Err(e) = tx.try_send(workers_msg.clone()) {
343 debug!(target: "stratum", "Worker no longer connected: addr {}, error: {:?}", &addr, e);
344 hup_peers.insert(*addr);
345 }
346 } else {
347 debug!(target: "stratum", "Worker has no active connection: addr {}", &addr);
348 hup_peers.insert(*addr);
349 }
350 }
351
352 drop(connections);
353 drop(workers);
354
355 if !hup_peers.is_empty() {
356 let mut connections = self.connections.write();
357 let mut workers = self.workers.write();
358 for hup_peer in hup_peers {
359 connections.remove(&hup_peer);
360 workers.remove(&hup_peer);
361 }
362 }
363
364 Ok(())
365 }
366
367 fn build_rpc_methods(
368 implementation_for_methods: Arc<StratumImpl>, peer_addr: SocketAddr,
369 ) -> RpcModule<SocketAddr> {
370 let mut module = RpcModule::new(peer_addr);
371
372 {
374 let implementation = implementation_for_methods.clone();
375 module
376 .register_async_method(
377 "mining.subscribe",
378 move |params: Params, _ctx: Arc<SocketAddr>, _ext| {
379 let implementation = implementation.clone();
380 let peer_addr = *(_ctx.as_ref());
381 async move {
382 implementation.subscribe(params, peer_addr).await
383 }
384 },
385 )
386 .expect("successfully register mining.subscribe method");
387 }
388
389 {
391 let implementation = implementation_for_methods.clone();
392 module
393 .register_async_method(
394 "mining.submit",
395 move |params: Params, _ctx: Arc<SocketAddr>, _ext| {
396 let implementation = implementation.clone();
397 async move { implementation.submit(params).await }
398 },
399 )
400 .expect("successfully register mining.submit method");
401 }
402
403 module
404 }
405}
406
#[cfg(test)]
mod tests {
    use super::*;
    use std::{net::SocketAddr, sync::Arc, time::Duration};
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::TcpStream,
        time::sleep,
    };

    /// Dispatcher that accepts every submission and keeps nothing.
    pub struct VoidManager;

    impl JobDispatcher for VoidManager {
        fn submit(&self, _payload: Vec<String>) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Sends `data` followed by a newline over a fresh connection, closes
    /// the write half and returns everything the server sends back.
    async fn dummy_request(addr: &SocketAddr, data: &str) -> Vec<u8> {
        let mut stream = TcpStream::connect(addr)
            .await
            .expect("Should connect to server");

        let mut outgoing = Vec::with_capacity(data.len() + 1);
        outgoing.extend_from_slice(data.as_bytes());
        outgoing.push(b'\n');

        stream
            .write_all(&outgoing)
            .await
            .expect("Should write data to stream");

        stream.shutdown().await.expect("Should shutdown write half");

        let mut incoming = Vec::with_capacity(2048);
        stream
            .read_to_end(&mut incoming)
            .await
            .expect("Should read data from stream");

        incoming
    }

    /// Parses `text` as JSON, panicking when it is malformed.
    fn parse_json(text: &str) -> Value {
        serde_json::from_str(text).unwrap()
    }

    /// Performs a [`dummy_request`] and parses the trimmed reply as JSON.
    async fn request_json(addr: &SocketAddr, request: &str) -> Value {
        let reply =
            String::from_utf8(dummy_request(addr, request).await).unwrap();
        parse_json(reply.trim())
    }

    #[tokio::test]
    async fn can_be_started() {
        let addr: SocketAddr = "127.0.0.1:19980".parse().unwrap();
        let stratum =
            Stratum::start(&addr, Arc::new(VoidManager), None).await;
        assert!(stratum.is_ok());
        if let Ok(server) = stratum {
            let _ = server.stop().await;
        }
    }

    /// Dispatcher that accepts every submission and carries a configurable
    /// initial payload.
    struct DummyManager {
        initial_payload: String,
    }

    impl DummyManager {
        fn build() -> DummyManager {
            let initial_payload = r#"[ "dummy payload" ]"#.to_owned();
            DummyManager { initial_payload }
        }

        fn of_initial(self, new_initial: &str) -> DummyManager {
            DummyManager {
                initial_payload: new_initial.to_owned(),
            }
        }
    }

    impl JobDispatcher for DummyManager {
        fn submit(&self, _payload: Vec<String>) -> Result<(), Error> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn can_subscribe() {
        let addr: SocketAddr = "127.0.0.1:19970".parse().unwrap();
        let dispatcher = Arc::new(
            DummyManager::build().of_initial(r#"["dummy autorize payload"]"#),
        );
        let stratum = Stratum::start(&addr, dispatcher, None)
            .await
            .expect("There should be no error starting stratum");

        let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", ""], "id": 1}"#;
        let actual = request_json(&addr, request).await;
        let expected = parse_json(r#"{"jsonrpc":"2.0","result":true,"id":1}"#);

        assert_eq!(expected, actual);
        assert_eq!(1, stratum.implementation.workers.read().len());

        let _ = stratum.stop().await;
    }

    #[tokio::test]
    async fn can_push_work() {
        let _ = ::env_logger::try_init();

        let addr: SocketAddr = "127.0.0.1:19995".parse().unwrap();
        let dispatcher = Arc::new(
            DummyManager::build().of_initial(r#"["dummy autorize payload"]"#),
        );
        let stratum = Stratum::start(&addr, dispatcher, None)
            .await
            .expect("There should be no error starting stratum");

        let auth_request = format!(
            "{}\n",
            r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", ""], "id": 1}"#
        )
        .into_bytes();

        let auth_response = "{\"jsonrpc\":\"2.0\",\"result\":true,\"id\":1}\n";

        let mut stream = TcpStream::connect(&addr)
            .await
            .expect("Should connect to server");

        stream
            .write_all(&auth_request)
            .await
            .expect("Should write auth request");

        // Read exactly the subscription reply so the pushed work that
        // follows stays in the socket for the final read.
        let mut auth_reply = vec![0u8; auth_response.len()];
        stream
            .read_exact(&mut auth_reply)
            .await
            .expect("Should read auth response");

        let actual_auth = parse_json(String::from_utf8(auth_reply).unwrap().trim());
        let expected_auth = parse_json(auth_response);
        assert_eq!(expected_auth, actual_auth);
        trace!(target: "stratum", "Received authorization confirmation");

        sleep(Duration::from_millis(100)).await;

        trace!(target: "stratum", "Pushing work to peers");
        stratum
            .push_work_all(r#"{ "00040008", "100500" }"#.to_owned())
            .expect("Pushing work should produce no errors");

        sleep(Duration::from_millis(100)).await;

        trace!(target: "stratum", "Ready to read work from server");
        sleep(Duration::from_millis(100)).await;

        stream.shutdown().await.expect("Should shutdown write half");

        let mut pushed = Vec::with_capacity(2048);
        stream
            .read_to_end(&mut pushed)
            .await
            .expect("Should read work response");

        trace!(target: "stratum", "Received work from server");

        let response =
            String::from_utf8(pushed).expect("Response should be utf-8");
        let expected = format!(
            "{}\n",
            r#"{"id":17,"method":"mining.notify","params":{ "00040008", "100500" }}"#
        );

        assert_eq!(expected, response);

        let _ = stratum.stop().await;
    }

    #[tokio::test]
    async fn test_can_subscribe_with_secret() {
        let addr: SocketAddr = "127.0.0.1:19971".parse().unwrap();
        let secret_str = "test_secret";
        let secret_hash = keccak(secret_str);

        let stratum =
            Stratum::start(&addr, Arc::new(VoidManager), Some(secret_hash))
                .await
                .expect("Should start stratum with secret");

        let request = format!(
            r#"{{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", "{}"], "id": 1}}"#,
            secret_str
        );

        let actual = request_json(&addr, &request).await;
        let expected = parse_json(r#"{"jsonrpc":"2.0","result":true,"id":1}"#);

        assert_eq!(expected, actual);
        assert_eq!(1, stratum.implementation.workers.read().len());

        let _ = stratum.stop().await;
    }

    #[tokio::test]
    async fn test_can_subscribe_with_invalid_secret() {
        let addr: SocketAddr = "127.0.0.1:19972".parse().unwrap();
        let secret_str = "test_secret";
        let secret_hash = keccak(secret_str);
        let stratum =
            Stratum::start(&addr, Arc::new(VoidManager), Some(secret_hash))
                .await
                .expect("Should start stratum with secret");

        let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", "wrong_secret"], "id": 2}"#;
        let actual = request_json(&addr, request).await;
        let expected =
            parse_json(r#"{"jsonrpc":"2.0","result":false,"id":2}"#);

        assert_eq!(expected, actual);
        assert_eq!(0, stratum.implementation.workers.read().len());

        let _ = stratum.stop().await;
    }

    #[tokio::test]
    async fn test_can_submit() {
        let addr: SocketAddr = "127.0.0.1:19973".parse().unwrap();

        /// Dispatcher that records every submitted payload.
        struct TestDispatcher {
            submissions: Arc<RwLock<Vec<Vec<String>>>>,
        }

        impl JobDispatcher for TestDispatcher {
            fn submit(&self, payload: Vec<String>) -> Result<(), Error> {
                self.submissions.write().push(payload);
                Ok(())
            }
        }

        let submissions = Arc::new(RwLock::new(Vec::new()));
        let test_dispatcher = TestDispatcher {
            submissions: submissions.clone(),
        };

        let stratum = Stratum::start(&addr, Arc::new(test_dispatcher), None)
            .await
            .expect("Should start stratum");

        let subscribe_request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", ""], "id": 1}"#;
        let subscribe_reply = request_json(&addr, subscribe_request).await;
        let expected_subscribe =
            parse_json(r#"{"jsonrpc":"2.0","result":true,"id":1}"#);
        assert_eq!(expected_subscribe, subscribe_reply);

        let submit_request = r#"{"jsonrpc": "2.0", "method": "mining.submit", "params": ["test_miner", "job_id", "0x1", "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"], "id": 2}"#;
        let submit_reply = request_json(&addr, submit_request).await;
        let expected_submit =
            parse_json(r#"{"jsonrpc":"2.0","result":[true],"id":2}"#);
        assert_eq!(expected_submit, submit_reply);

        assert_eq!(1, submissions.read().len());
        let expected_payload = vec![
            "test_miner",
            "job_id",
            "0x1",
            "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789",
        ];
        assert_eq!(expected_payload, submissions.read()[0]);

        assert_eq!(1, stratum.implementation.workers.read().len());

        let _ = stratum.stop().await;
    }
}