cfx_stratum/
lib.rs

1// Copyright 2019-2020 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5// Copyright 2015-2019 Parity Technologies (UK) Ltd.
6// This file is part of Parity Ethereum.
7
8// Parity Ethereum is free software: you can redistribute it and/or modify
9// it under the terms of the GNU General Public License as published by
10// the Free Software Foundation, either version 3 of the License, or
11// (at your option) any later version.
12
13// Parity Ethereum is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16// GNU General Public License for more details.
17
18// You should have received a copy of the GNU General Public License
19// along with Parity Ethereum.  If not, see <http://www.gnu.org/licenses/>.
20
21//! Stratum protocol implementation for Conflux clients
22
23use keccak_hash as hash;
24use log::{debug, trace, warn};
25
26mod traits;
27
28pub use traits::{Error, JobDispatcher, PushWorkHandler, ServiceConfiguration};
29
30use cfx_types::H256;
31use hash::keccak;
32use jsonrpsee::{
33    core::RpcResult,
34    server::RpcModule,
35    types::{ErrorObjectOwned, Id, Params, Request, Response, ResponsePayload},
36};
37use parking_lot::RwLock;
38use serde_json::Value;
39use std::{
40    collections::{HashMap, HashSet},
41    io,
42    net::SocketAddr,
43    sync::Arc,
44};
45use tokio::{
46    io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
47    net::{TcpListener, TcpStream},
48    sync::{mpsc, RwLock as AsyncRwLock},
49};
50
51const NOTIFY_COUNTER_INITIAL: u32 = 16;
52
53/// Container which owns rpc server and stratum implementation
54pub struct Stratum {
55    /// stratum protocol implementation
56    implementation: Arc<StratumImpl>,
57    /// Shutdown sender
58    shutdown_tx: mpsc::Sender<()>,
59}
60
61impl Stratum {
62    pub async fn start(
63        addr: &SocketAddr, dispatcher: Arc<dyn JobDispatcher>,
64        secret: Option<H256>,
65    ) -> Result<Arc<Stratum>, Error> {
66        let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
67
68        let implementation = Arc::new(StratumImpl {
69            dispatcher,
70            secret,
71            notify_counter: RwLock::new(NOTIFY_COUNTER_INITIAL),
72            workers: Arc::new(RwLock::new(HashMap::new())),
73            connections: Arc::new(RwLock::default()),
74        });
75
76        let implementation_for_methods = implementation.clone();
77        let listener = TcpListener::bind(addr)
78            .await
79            .map_err(|e| Error::Io(e.to_string()))?;
80
81        let local_addr = listener
82            .local_addr()
83            .map_err(|e| Error::Io(e.to_string()))?;
84        debug!("Stratum server started on {}", local_addr);
85
86        // Spawn server task
87        tokio::spawn(async move {
88            loop {
89                tokio::select! {
90                    result = listener.accept() => {
91                        match result {
92                            Ok((stream, peer_addr)) => {
93                                debug!("Accepted connection from {}", peer_addr);
94                                let impl_for_methods = implementation_for_methods.clone();
95
96                                tokio::spawn(async move {
97                                    if let Err(e) = handle_connection(stream, peer_addr, impl_for_methods).await {
98                                        debug!("Connection error for {}: {}", peer_addr, e);
99                                    }
100                                });
101                            }
102                            Err(e) => {
103                                warn!("Failed to accept connection: {}", e);
104                            }
105                        }
106                    }
107                    _ = shutdown_rx.recv() => {
108                        debug!("Shutting down stratum server");
109                        break;
110                    }
111                }
112            }
113        });
114
115        let stratum = Arc::new(Stratum {
116            implementation,
117            shutdown_tx,
118        });
119
120        Ok(stratum)
121    }
122
123    pub async fn stop(&self) -> Result<(), Error> {
124        self.shutdown_tx.send(()).await.map_err(|e| {
125            Error::Io(format!("Failed to send shutdown signal: {}", e))
126        })
127    }
128}
129
130impl PushWorkHandler for Stratum {
131    fn push_work_all(&self, payload: String) -> Result<(), Error> {
132        debug!("Pushing job {} to miners", payload);
133        self.implementation.push_work_all(payload)
134    }
135}
136
137async fn handle_connection(
138    stream: TcpStream, peer_addr: SocketAddr,
139    implementation_for_methods: Arc<StratumImpl>,
140) -> io::Result<()> {
141    let (reader, writer) = stream.into_split();
142    let mut reader = BufReader::new(reader);
143    let writer = Arc::new(AsyncRwLock::new(BufWriter::new(writer)));
144
145    let implementation = implementation_for_methods.clone();
146
147    // Create RPC module with peer_addr as context
148    let module =
149        StratumImpl::build_rpc_methods(implementation_for_methods, peer_addr);
150    let methods = Arc::new(module);
151
152    // Register connection for push notifications
153    let (tx, mut rx) = mpsc::channel::<String>(100);
154    implementation.connections.write().insert(peer_addr, tx);
155
156    // Spawn task to handle push notifications
157    let writer_clone = writer.clone();
158    let push_task = tokio::spawn(async move {
159        while let Some(msg) = rx.recv().await {
160            let mut writer = writer_clone.write().await;
161            if let Err(e) = writer.write_all(msg.as_bytes()).await {
162                debug!("Failed to write notification: {}", e);
163                break;
164            }
165            if let Err(e) = writer.write_all(b"\n").await {
166                debug!("Failed to write newline: {}", e);
167                break;
168            }
169            if let Err(e) = writer.flush().await {
170                debug!("Failed to flush: {}", e);
171                break;
172            }
173        }
174    });
175
176    let mut line = String::new();
177
178    loop {
179        line.clear();
180
181        let n = reader.read_line(&mut line).await?;
182
183        if n == 0 {
184            debug!("Connection closed by peer {}", peer_addr);
185            break;
186        }
187
188        let trimmed = line.trim();
189        if trimmed.is_empty() {
190            continue;
191        }
192
193        debug!("Received request from {}: {}", peer_addr, trimmed);
194
195        // Parse the JSON-RPC request
196        let request: Result<Request, _> = serde_json::from_str(trimmed);
197
198        let response = match request {
199            Ok(req) => {
200                let id = req.id.clone();
201                let method = req.method.clone();
202
203                // Call the method
204                match methods.raw_json_request(&trimmed, 1).await {
205                    Ok((response_raw, _rx)) => {
206                        let response_str = response_raw.get();
207                        debug!("Response for {}: {}", method, response_str);
208                        response_str.to_string()
209                    }
210                    Err(e) => {
211                        warn!("Method call failed for {}: {}", method, e);
212                        let error_response: Response<Value> = Response::new(
213                            ResponsePayload::error(ErrorObjectOwned::owned(
214                                -32603,
215                                "Internal error",
216                                Some(e.to_string()),
217                            )),
218                            id,
219                        );
220                        serde_json::to_string(&error_response).unwrap()
221                    }
222                }
223            }
224            Err(e) => {
225                warn!("Failed to parse request: {}", e);
226                let error_response: Response<Value> = Response::new(
227                    ResponsePayload::error(ErrorObjectOwned::owned(
228                        -32700,
229                        "Parse error",
230                        Some(e.to_string()),
231                    )),
232                    Id::Null,
233                );
234                serde_json::to_string(&error_response).unwrap()
235            }
236        };
237
238        // Write response
239        let mut writer = writer.write().await;
240        writer.write_all(response.as_bytes()).await?;
241        writer.write_all(b"\n").await?;
242        writer.flush().await?;
243    }
244
245    // Clean up on disconnect
246    push_task.abort();
247    implementation.connections.write().remove(&peer_addr);
248    // Note: We don't remove from workers map here to allow reconnection with
249    // same worker_id Workers are only removed when push_work_all detects a
250    // failed connection
251
252    Ok(())
253}
254
255struct StratumImpl {
256    /// Payload manager
257    dispatcher: Arc<dyn JobDispatcher>,
258    /// Secret if any
259    secret: Option<H256>,
260    /// Dispatch notify counter
261    notify_counter: RwLock<u32>,
262    // socket addr to worker_id mapping for active workers
263    workers: Arc<RwLock<HashMap<SocketAddr, String>>>,
264    /// Active connections for pushing notifications
265    connections: Arc<RwLock<HashMap<SocketAddr, mpsc::Sender<String>>>>,
266}
267
268impl StratumImpl {
269    /// rpc method `mining.subscribe`
270    async fn subscribe(
271        &self, params: Params<'_>, peer_addr: SocketAddr,
272    ) -> RpcResult<bool> {
273        let params_vec: Vec<String> = params.parse()?;
274
275        if params_vec.len() < 2 {
276            return Err(ErrorObjectOwned::owned(
277                -32602,
278                "Invalid params: expected [worker_id, secret]",
279                None::<()>,
280            ));
281        }
282
283        let worker_id = &params_vec[0];
284        let secret = &params_vec[1];
285
286        if let Some(valid_secret) = self.secret {
287            let hash = keccak(secret);
288            if hash != valid_secret {
289                return Ok(false);
290            }
291        }
292
293        let mut workers = self.workers.write();
294        workers.insert(peer_addr, worker_id.clone());
295
296        Ok(true)
297    }
298
299    /// rpc method `mining.submit`
300    async fn submit(&self, params: Params<'_>) -> RpcResult<Vec<Value>> {
301        let params_vec: Vec<String> = params.parse()?;
302
303        match self.dispatcher.submit(params_vec) {
304            Ok(()) => Ok(vec![Value::Bool(true)]),
305            Err(crate::traits::Error::InvalidSolution(msg)) => {
306                warn!("Error because of invalid solution: {:?}", msg);
307                Ok(vec![Value::Bool(false), Value::String(msg)])
308            }
309            Err(submit_err) => {
310                warn!("Error while submitting share: {:?}", submit_err);
311                Ok(vec![Value::Bool(false)])
312            }
313        }
314    }
315
316    fn push_work_all(&self, payload: String) -> Result<(), Error> {
317        let connections = self.connections.read();
318        let workers = self.workers.read();
319
320        let next_request_id = {
321            let mut counter = self.notify_counter.write();
322            if *counter == ::std::u32::MAX {
323                *counter = NOTIFY_COUNTER_INITIAL;
324            } else {
325                *counter += 1
326            }
327            *counter
328        };
329
330        let mut hup_peers = HashSet::with_capacity(0);
331        let workers_msg = format!(
332            r#"{{"id":{},"method":"mining.notify","params":{}}}"#,
333            next_request_id, payload
334        );
335
336        trace!(target: "stratum", "Pushing work for {} workers (payload: '{}')", workers.len(), &workers_msg);
337
338        for (addr, worker_id) in workers.iter() {
339            trace!(target: "stratum", "Pushing work to addr {} worker_id {}", &addr, &worker_id);
340
341            if let Some(tx) = connections.get(addr) {
342                if let Err(e) = tx.try_send(workers_msg.clone()) {
343                    debug!(target: "stratum", "Worker no longer connected: addr {}, error: {:?}", &addr, e);
344                    hup_peers.insert(*addr);
345                }
346            } else {
347                debug!(target: "stratum", "Worker has no active connection: addr {}", &addr);
348                hup_peers.insert(*addr);
349            }
350        }
351
352        drop(connections);
353        drop(workers);
354
355        if !hup_peers.is_empty() {
356            let mut connections = self.connections.write();
357            let mut workers = self.workers.write();
358            for hup_peer in hup_peers {
359                connections.remove(&hup_peer);
360                workers.remove(&hup_peer);
361            }
362        }
363
364        Ok(())
365    }
366
367    fn build_rpc_methods(
368        implementation_for_methods: Arc<StratumImpl>, peer_addr: SocketAddr,
369    ) -> RpcModule<SocketAddr> {
370        let mut module = RpcModule::new(peer_addr);
371
372        // Register mining.subscribe method
373        {
374            let implementation = implementation_for_methods.clone();
375            module
376                .register_async_method(
377                    "mining.subscribe",
378                    move |params: Params, _ctx: Arc<SocketAddr>, _ext| {
379                        let implementation = implementation.clone();
380                        let peer_addr = *(_ctx.as_ref());
381                        async move {
382                            implementation.subscribe(params, peer_addr).await
383                        }
384                    },
385                )
386                .expect("successfully register mining.subscribe method");
387        }
388
389        // Register mining.submit method
390        {
391            let implementation = implementation_for_methods.clone();
392            module
393                .register_async_method(
394                    "mining.submit",
395                    move |params: Params, _ctx: Arc<SocketAddr>, _ext| {
396                        let implementation = implementation.clone();
397                        async move { implementation.submit(params).await }
398                    },
399                )
400                .expect("successfully register mining.submit method");
401        }
402
403        module
404    }
405}
406
407#[cfg(test)]
408mod tests {
409    use super::*;
410    use std::{net::SocketAddr, sync::Arc, time::Duration};
411    use tokio::{
412        io::{AsyncReadExt, AsyncWriteExt},
413        net::TcpStream,
414        time::sleep,
415    };
416
417    pub struct VoidManager;
418
419    impl JobDispatcher for VoidManager {
420        fn submit(&self, _payload: Vec<String>) -> Result<(), Error> { Ok(()) }
421    }
422
423    async fn dummy_request(addr: &SocketAddr, data: &str) -> Vec<u8> {
424        let mut stream = TcpStream::connect(addr)
425            .await
426            .expect("Should connect to server");
427
428        let mut data_vec = data.as_bytes().to_vec();
429        data_vec.extend(b"\n");
430
431        stream
432            .write_all(&data_vec)
433            .await
434            .expect("Should write data to stream");
435
436        stream.shutdown().await.expect("Should shutdown write half");
437
438        let mut read_buf = Vec::with_capacity(2048);
439        stream
440            .read_to_end(&mut read_buf)
441            .await
442            .expect("Should read data from stream");
443
444        read_buf
445    }
446
447    #[tokio::test]
448    async fn can_be_started() {
449        let stratum = Stratum::start(
450            &"127.0.0.1:19980".parse().unwrap(),
451            Arc::new(VoidManager),
452            None,
453        )
454        .await;
455        assert!(stratum.is_ok());
456        if let Ok(s) = stratum {
457            let _ = s.stop().await;
458        }
459    }
460
461    struct DummyManager {
462        initial_payload: String,
463    }
464
465    impl DummyManager {
466        fn build() -> DummyManager {
467            DummyManager {
468                initial_payload: r#"[ "dummy payload" ]"#.to_owned(),
469            }
470        }
471
472        fn of_initial(mut self, new_initial: &str) -> DummyManager {
473            self.initial_payload = new_initial.to_owned();
474            self
475        }
476    }
477
478    impl JobDispatcher for DummyManager {
479        fn submit(&self, _payload: Vec<String>) -> Result<(), Error> { Ok(()) }
480    }
481
482    #[tokio::test]
483    async fn can_subscribe() {
484        let addr = "127.0.0.1:19970".parse().unwrap();
485        let stratum = Stratum::start(
486            &addr,
487            Arc::new(
488                DummyManager::build()
489                    .of_initial(r#"["dummy autorize payload"]"#),
490            ),
491            None,
492        )
493        .await
494        .expect("There should be no error starting stratum");
495
496        let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", ""], "id": 1}"#;
497        let response =
498            String::from_utf8(dummy_request(&addr, request).await).unwrap();
499
500        // Parse and compare JSON instead of string comparison
501        let response_json: Value =
502            serde_json::from_str(response.trim()).unwrap();
503        let expected_json: Value =
504            serde_json::from_str(r#"{"jsonrpc":"2.0","result":true,"id":1}"#)
505                .unwrap();
506        assert_eq!(expected_json, response_json);
507        assert_eq!(1, stratum.implementation.workers.read().len());
508
509        let _ = stratum.stop().await;
510    }
511
512    #[tokio::test]
513    async fn can_push_work() {
514        let _ = ::env_logger::try_init();
515
516        let addr = "127.0.0.1:19995".parse().unwrap();
517        let stratum = Stratum::start(
518            &addr,
519            Arc::new(
520                DummyManager::build()
521                    .of_initial(r#"["dummy autorize payload"]"#),
522            ),
523            None,
524        )
525        .await
526        .expect("There should be no error starting stratum");
527
528        let mut auth_request =
529            r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", ""], "id": 1}"#
530            .as_bytes()
531            .to_vec();
532        auth_request.extend(b"\n");
533
534        let auth_response = "{\"jsonrpc\":\"2.0\",\"result\":true,\"id\":1}\n";
535
536        let response = {
537            let mut stream = TcpStream::connect(&addr)
538                .await
539                .expect("Should connect to server");
540
541            // Write auth request
542            stream
543                .write_all(&auth_request)
544                .await
545                .expect("Should write auth request");
546
547            // Read auth response
548            let mut read_buf0 = vec![0u8; auth_response.len()];
549            stream
550                .read_exact(&mut read_buf0)
551                .await
552                .expect("Should read auth response");
553
554            // Parse and compare JSON instead of string comparison
555            let response_json: Value = serde_json::from_str(
556                String::from_utf8(read_buf0).unwrap().trim(),
557            )
558            .unwrap();
559            let expected_json: Value =
560                serde_json::from_str(auth_response).unwrap();
561            assert_eq!(expected_json, response_json);
562            trace!(target: "stratum", "Received authorization confirmation");
563
564            // Wait a bit
565            sleep(Duration::from_millis(100)).await;
566
567            // Push work
568            trace!(target: "stratum", "Pushing work to peers");
569            stratum
570                .push_work_all(r#"{ "00040008", "100500" }"#.to_owned())
571                .expect("Pushing work should produce no errors");
572
573            // Wait a bit
574            sleep(Duration::from_millis(100)).await;
575
576            trace!(target: "stratum", "Ready to read work from server");
577            sleep(Duration::from_millis(100)).await;
578
579            stream.shutdown().await.expect("Should shutdown write half");
580
581            // Read work response
582            let mut read_buf1 = Vec::with_capacity(2048);
583            stream
584                .read_to_end(&mut read_buf1)
585                .await
586                .expect("Should read work response");
587
588            trace!(target: "stratum", "Received work from server");
589            read_buf1
590        };
591
592        let response =
593            String::from_utf8(response).expect("Response should be utf-8");
594
595        assert_eq!(
596            r#"{"id":17,"method":"mining.notify","params":{ "00040008", "100500" }}"#.to_owned() + "\n",
597            response
598        );
599
600        let _ = stratum.stop().await;
601    }
602
603    #[tokio::test]
604    async fn test_can_subscribe_with_secret() {
605        let addr = "127.0.0.1:19971".parse().unwrap();
606        let secret_str = "test_secret";
607        let secret_hash = keccak(secret_str);
608
609        let stratum =
610            Stratum::start(&addr, Arc::new(VoidManager), Some(secret_hash))
611                .await
612                .expect("Should start stratum with secret");
613
614        let request = format!(
615            r#"{{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", "{}"], "id": 1}}"#,
616            secret_str
617        );
618
619        let response =
620            String::from_utf8(dummy_request(&addr, &request).await).unwrap();
621
622        // Parse and compare JSON instead of string comparison
623        let response_json: Value =
624            serde_json::from_str(response.trim()).unwrap();
625        let expected_json: Value =
626            serde_json::from_str(r#"{"jsonrpc":"2.0","result":true,"id":1}"#)
627                .unwrap();
628        assert_eq!(expected_json, response_json);
629        assert_eq!(1, stratum.implementation.workers.read().len());
630
631        let _ = stratum.stop().await;
632    }
633
634    #[tokio::test]
635    async fn test_can_subscribe_with_invalid_secret() {
636        let addr = "127.0.0.1:19972".parse().unwrap();
637        let secret_str = "test_secret";
638        let secret_hash = keccak(secret_str);
639        let stratum =
640            Stratum::start(&addr, Arc::new(VoidManager), Some(secret_hash))
641                .await
642                .expect("Should start stratum with secret");
643
644        let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", "wrong_secret"], "id": 2}"#;
645        let response =
646            String::from_utf8(dummy_request(&addr, request).await).unwrap();
647
648        // Parse and compare JSON instead of string comparison
649        let response_json: Value =
650            serde_json::from_str(response.trim()).unwrap();
651        let expected_json: Value =
652            serde_json::from_str(r#"{"jsonrpc":"2.0","result":false,"id":2}"#)
653                .unwrap();
654        assert_eq!(expected_json, response_json);
655        assert_eq!(0, stratum.implementation.workers.read().len());
656
657        let _ = stratum.stop().await;
658    }
659
660    #[tokio::test]
661    async fn test_can_submit() {
662        let addr = "127.0.0.1:19973".parse().unwrap();
663
664        struct TestDispatcher {
665            submissions: Arc<RwLock<Vec<Vec<String>>>>,
666        }
667
668        impl JobDispatcher for TestDispatcher {
669            fn submit(&self, payload: Vec<String>) -> Result<(), Error> {
670                self.submissions.write().push(payload);
671                Ok(())
672            }
673        }
674
675        let test_dispatcher = TestDispatcher {
676            submissions: Arc::new(RwLock::new(Vec::new())),
677        };
678        let submissions = test_dispatcher.submissions.clone();
679
680        let stratum = Stratum::start(&addr, Arc::new(test_dispatcher), None)
681            .await
682            .expect("Should start stratum");
683
684        // subscribe
685        let subscribe_request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", ""], "id": 1}"#;
686        let subscribe_response =
687            String::from_utf8(dummy_request(&addr, subscribe_request).await)
688                .unwrap();
689
690        // Parse and compare JSON instead of string comparison
691        let response_json: Value =
692            serde_json::from_str(subscribe_response.trim()).unwrap();
693        let expected_json: Value =
694            serde_json::from_str(r#"{"jsonrpc":"2.0","result":true,"id":1}"#)
695                .unwrap();
696        assert_eq!(expected_json, response_json);
697
698        // submit
699        let submit_request = r#"{"jsonrpc": "2.0", "method": "mining.submit", "params": ["test_miner", "job_id", "0x1", "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"], "id": 2}"#;
700
701        let submit_response =
702            String::from_utf8(dummy_request(&addr, submit_request).await)
703                .unwrap();
704
705        // Parse and compare JSON instead of string comparison
706        let response_json: Value =
707            serde_json::from_str(submit_response.trim()).unwrap();
708        let expected_json: Value =
709            serde_json::from_str(r#"{"jsonrpc":"2.0","result":[true],"id":2}"#)
710                .unwrap();
711        assert_eq!(expected_json, response_json);
712
713        assert_eq!(1, submissions.read().len());
714        assert_eq!(
715            vec![
716            "test_miner",
717            "job_id",
718            "0x1",
719            "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"
720        ],
721            submissions.read()[0]
722        );
723
724        assert_eq!(1, stratum.implementation.workers.read().len());
725
726        let _ = stratum.stop().await;
727    }
728}