cfx_stratum/
lib.rs

1// Copyright 2019-2020 Conflux Foundation. All rights reserved.
2// Conflux is free software and distributed under GNU General Public License.
3// See http://www.gnu.org/licenses/
4
5// Copyright 2015-2019 Parity Technologies (UK) Ltd.
6// This file is part of Parity Ethereum.
7
8// Parity Ethereum is free software: you can redistribute it and/or modify
9// it under the terms of the GNU General Public License as published by
10// the Free Software Foundation, either version 3 of the License, or
11// (at your option) any later version.
12
13// Parity Ethereum is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16// GNU General Public License for more details.
17
18// You should have received a copy of the GNU General Public License
19// along with Parity Ethereum.  If not, see <http://www.gnu.org/licenses/>.
20
21//! Stratum protocol implementation for Conflux clients
22
23use keccak_hash as hash;
24use log::{debug, trace, warn};
25
26mod traits;
27
28pub use traits::{Error, JobDispatcher, PushWorkHandler, ServiceConfiguration};
29
30use jsonrpc_core::{
31    to_value, Compatibility, IoDelegate, MetaIoHandler, Metadata, Params, Value,
32};
33use jsonrpc_tcp_server::{
34    Dispatcher, MetaExtractor, PushMessageError, RequestContext,
35    Server as JsonRpcServer, ServerBuilder as JsonRpcServerBuilder,
36};
37use std::sync::Arc;
38
39use crate::traits::Error::InvalidSolution;
40use cfx_types::H256;
41use hash::keccak;
42use parking_lot::RwLock;
43use std::{
44    collections::{HashMap, HashSet},
45    net::SocketAddr,
46};
47
48type RpcResult = Result<jsonrpc_core::Value, jsonrpc_core::Error>;
49
50const NOTIFY_COUNTER_INITIAL: u32 = 16;
51
52/// Container which owns rpc server and stratum implementation
53pub struct Stratum {
54    /// RPC server
55    ///
56    /// It is an `Option` so it can be easily closed and released during `drop`
57    /// phase
58    rpc_server: Option<JsonRpcServer>,
59    /// stratum protocol implementation
60    ///
61    /// It is owned by a container and rpc server
62    implementation: Arc<StratumImpl>,
63    /// Message dispatcher (tcp/ip service)
64    ///
65    /// Used to push messages to peers
66    tcp_dispatcher: Dispatcher,
67}
68
69impl Stratum {
70    pub fn start(
71        addr: &SocketAddr, dispatcher: Arc<dyn JobDispatcher>,
72        secret: Option<H256>,
73    ) -> Result<Arc<Stratum>, Error> {
74        let implementation = Arc::new(StratumImpl {
75            dispatcher,
76            workers: Arc::new(RwLock::default()),
77            secret,
78            notify_counter: RwLock::new(NOTIFY_COUNTER_INITIAL),
79        });
80
81        let mut delegate = IoDelegate::<StratumImpl, SocketMetadata>::new(
82            implementation.clone(),
83        );
84        delegate.add_method_with_meta("mining.subscribe", {
85            let implementation = implementation.clone();
86            move |_, params, meta| {
87                let implementation = implementation.clone();
88                async move { implementation.subscribe(params, meta).await }
89            }
90        });
91
92        delegate.add_method_with_meta("mining.submit", {
93            let implementation = implementation.clone();
94            move |_, params, meta| {
95                let implementation = implementation.clone();
96                async move { implementation.submit(params, meta).await }
97            }
98        });
99        let mut handler = MetaIoHandler::<SocketMetadata>::with_compatibility(
100            Compatibility::Both,
101        );
102        handler.extend_with(delegate);
103
104        let server_builder = JsonRpcServerBuilder::new(handler);
105        let tcp_dispatcher = server_builder.dispatcher();
106        let server_builder = server_builder.session_meta_extractor(
107            PeerMetaExtractor::new(tcp_dispatcher.clone()),
108        );
109        let server = server_builder.start(addr)?;
110
111        let stratum = Arc::new(Stratum {
112            rpc_server: Some(server),
113            implementation,
114            tcp_dispatcher,
115        });
116
117        Ok(stratum)
118    }
119}
120
121impl PushWorkHandler for Stratum {
122    fn push_work_all(&self, payload: String) -> Result<(), Error> {
123        debug!("Pushing job {} to miners", payload);
124
125        self.implementation
126            .push_work_all(payload, &self.tcp_dispatcher)
127    }
128}
129
130impl Drop for Stratum {
131    fn drop(&mut self) {
132        // shut down rpc server
133        if let Some(server) = self.rpc_server.take() {
134            server.close()
135        }
136    }
137}
138
139struct StratumImpl {
140    /// Payload manager
141    dispatcher: Arc<dyn JobDispatcher>,
142    /// Authorized workers (socket - worker_id)
143    workers: Arc<RwLock<HashMap<SocketAddr, String>>>,
144    /// Secret if any
145    secret: Option<H256>,
146    /// Dispatch notify couinter
147    notify_counter: RwLock<u32>,
148}
149
150impl StratumImpl {
151    /// rpc method `mining.subscribe`
152    async fn subscribe(
153        &self, params: Params, meta: SocketMetadata,
154    ) -> RpcResult {
155        params.parse::<(String, String)>().map(|(worker_id, secret)|{
156            if let Some(valid_secret) = self.secret {
157                let hash = keccak(secret);
158                if hash != valid_secret {
159                    return to_value(&false);
160                }
161            }
162            debug!(target: "stratum", "New worker #{} registered", worker_id);
163            self.workers.write().insert(meta.addr().clone(), worker_id);
164            to_value(true)
165        }).map(|v| v.expect("Only true/false is returned and it's always serializable; qed"))
166    }
167
168    /// rpc method `mining.submit`
169    async fn submit(&self, params: Params, _meta: SocketMetadata) -> RpcResult {
170        Ok(Value::Array(match params {
171            Params::Array(vals) => {
172                // first two elements are service messages (worker_id & job_id)
173                match self.dispatcher.submit(
174                    vals.iter()
175                        .filter_map(|val| match *val {
176                            Value::String(ref s) => Some(s.to_owned()),
177                            _ => None,
178                        })
179                        .collect::<Vec<String>>(),
180                ) {
181                    Ok(()) => vec![to_value(true).expect("serializable")],
182                    Err(InvalidSolution(msg)) => {
183                        // When we have invalid solution, we propagate the
184                        // reason to the client
185                        warn!("Error because of invalid solution: {:?}", msg);
186                        vec![
187                            to_value(false).expect("serializable"),
188                            to_value(msg).expect("serializable"),
189                        ]
190                    }
191                    Err(submit_err) => {
192                        warn!("Error while submitting share: {:?}", submit_err);
193                        vec![to_value(false).expect("serializable")]
194                    }
195                }
196            }
197            _ => {
198                trace!(target: "stratum", "Invalid submit work format {:?}", params);
199                vec![to_value(false).expect("serializable")]
200            }
201        }))
202    }
203
204    fn push_work_all(
205        &self, payload: String, tcp_dispatcher: &Dispatcher,
206    ) -> Result<(), Error> {
207        let hup_peers = {
208            let workers = self.workers.read();
209            let next_request_id = {
210                let mut counter = self.notify_counter.write();
211                if *counter == ::std::u32::MAX {
212                    *counter = NOTIFY_COUNTER_INITIAL;
213                } else {
214                    *counter += 1
215                }
216                *counter
217            };
218
219            let mut hup_peers = HashSet::with_capacity(0); // most of the cases won't be needed, hence avoid allocation
220            let workers_msg = format!("{{ \"id\": {}, \"method\": \"mining.notify\", \"params\": {} }}", next_request_id, payload);
221            trace!(target: "stratum", "Pushing work for {} workers (payload: '{}')", workers.len(), &workers_msg);
222            for (ref addr, worker_id) in workers.iter() {
223                trace!(target: "stratum", "Pushing work to {} at addr {}", &worker_id, &addr);
224                match tcp_dispatcher.push_message(addr, workers_msg.clone()) {
225                    Err(PushMessageError::NoSuchPeer) => {
226                        debug!(target: "stratum", "Worker no longer connected: {} addr {}", &worker_id, &addr);
227                        hup_peers.insert(**addr);
228                    }
229                    Err(PushMessageError::Send(e)) => {
230                        warn!(target: "stratum", "Unexpected transport error: {:?}", e);
231                        if e.is_disconnected() {
232                            hup_peers.insert(**addr);
233                        }
234                    }
235                    Ok(_) => {}
236                }
237            }
238            hup_peers
239        };
240
241        if !hup_peers.is_empty() {
242            let mut workers = self.workers.write();
243            for hup_peer in hup_peers {
244                workers.remove(&hup_peer);
245            }
246        }
247
248        Ok(())
249    }
250}
251
252#[derive(Clone)]
253pub struct SocketMetadata {
254    addr: SocketAddr,
255    // with the new version of jsonrpc-core, SocketMetadata
256    // won't have to implement default, so this field will not
257    // have to be an Option
258    #[allow(dead_code)]
259    tcp_dispatcher: Option<Dispatcher>,
260}
261
262impl Default for SocketMetadata {
263    fn default() -> Self {
264        SocketMetadata {
265            addr: "0.0.0.0:0".parse().unwrap(),
266            tcp_dispatcher: None,
267        }
268    }
269}
270
271impl SocketMetadata {
272    pub fn addr(&self) -> &SocketAddr { &self.addr }
273}
274
275impl Metadata for SocketMetadata {}
276
277pub struct PeerMetaExtractor {
278    tcp_dispatcher: Dispatcher,
279}
280
281impl PeerMetaExtractor {
282    fn new(tcp_dispatcher: Dispatcher) -> Self {
283        PeerMetaExtractor { tcp_dispatcher }
284    }
285}
286
287impl MetaExtractor<SocketMetadata> for PeerMetaExtractor {
288    fn extract(&self, context: &RequestContext) -> SocketMetadata {
289        SocketMetadata {
290            addr: context.peer_addr,
291            tcp_dispatcher: Some(self.tcp_dispatcher.clone()),
292        }
293    }
294}
295
296#[cfg(test)]
297mod tests {
298    use super::*;
299    use std::{net::SocketAddr, sync::Arc};
300    use tokio::{
301        io::{AsyncReadExt, AsyncWriteExt},
302        net::TcpStream,
303        runtime::Runtime,
304        time::sleep,
305    };
306
307    pub struct VoidManager;
308
309    impl JobDispatcher for VoidManager {
310        fn submit(&self, _payload: Vec<String>) -> Result<(), Error> { Ok(()) }
311    }
312
313    fn dummy_request(addr: &SocketAddr, data: &str) -> Vec<u8> {
314        let runtime = Runtime::new()
315            .expect("Tokio Runtime should be created with no errors");
316
317        runtime.block_on(async {
318            let mut stream = TcpStream::connect(addr)
319                .await
320                .expect("Should connect to server");
321
322            let mut data_vec = data.as_bytes().to_vec();
323            data_vec.extend(b"\n");
324
325            stream
326                .write_all(&data_vec)
327                .await
328                .expect("Should write data to stream");
329
330            stream.shutdown().await.expect("Should shutdown write half");
331
332            let mut read_buf = Vec::with_capacity(2048);
333            stream
334                .read_to_end(&mut read_buf)
335                .await
336                .expect("Should read data from stream");
337
338            read_buf
339        })
340    }
341
342    #[test]
343    fn can_be_started() {
344        let stratum = Stratum::start(
345            &"127.0.0.1:19980".parse().unwrap(),
346            Arc::new(VoidManager),
347            None,
348        );
349        assert!(stratum.is_ok());
350    }
351
352    struct DummyManager {
353        initial_payload: String,
354    }
355
356    impl DummyManager {
357        fn build() -> DummyManager {
358            DummyManager {
359                initial_payload: r#"[ "dummy payload" ]"#.to_owned(),
360            }
361        }
362
363        fn of_initial(mut self, new_initial: &str) -> DummyManager {
364            self.initial_payload = new_initial.to_owned();
365            self
366        }
367    }
368
369    impl JobDispatcher for DummyManager {
370        fn submit(&self, _payload: Vec<String>) -> Result<(), Error> { Ok(()) }
371    }
372
373    fn terminated_str(origin: &'static str) -> String {
374        let mut s = String::new();
375        s.push_str(origin);
376        s.push_str("\n");
377        s
378    }
379
380    #[test]
381    fn can_subscribe() {
382        let addr = "127.0.0.1:19970".parse().unwrap();
383        let stratum = Stratum::start(
384            &addr,
385            Arc::new(
386                DummyManager::build()
387                    .of_initial(r#"["dummy autorize payload"]"#),
388            ),
389            None,
390        )
391        .expect("There should be no error starting stratum");
392
393        let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", ""], "id": 1}"#;
394        let response =
395            String::from_utf8(dummy_request(&addr, request)).unwrap();
396
397        assert_eq!(
398            terminated_str(r#"{"jsonrpc":"2.0","result":true,"id":1}"#),
399            response
400        );
401        assert_eq!(1, stratum.implementation.workers.read().len());
402    }
403
404    #[test]
405    fn can_push_work() {
406        let _ = ::env_logger::try_init();
407
408        let addr = "127.0.0.1:19995".parse().unwrap();
409        let stratum = Stratum::start(
410            &addr,
411            Arc::new(
412                DummyManager::build()
413                    .of_initial(r#"["dummy autorize payload"]"#),
414            ),
415            None,
416        )
417        .expect("There should be no error starting stratum");
418
419        let mut auth_request =
420            r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", ""], "id": 1}"#
421            .as_bytes()
422            .to_vec();
423        auth_request.extend(b"\n");
424
425        let auth_response = "{\"jsonrpc\":\"2.0\",\"result\":true,\"id\":1}\n";
426
427        let runtime = Runtime::new()
428            .expect("Tokio Runtime should be created with no errors");
429
430        let response = runtime.block_on(async {
431            let mut stream = TcpStream::connect(&addr)
432                .await
433                .expect("Should connect to server");
434
435            // Write auth request
436            stream
437                .write_all(&auth_request)
438                .await
439                .expect("Should write auth request");
440
441            // Read auth response
442            let mut read_buf0 = vec![0u8; auth_response.len()];
443            stream
444                .read_exact(&mut read_buf0)
445                .await
446                .expect("Should read auth response");
447
448            assert_eq!(String::from_utf8(read_buf0).unwrap(), auth_response);
449            trace!(target: "stratum", "Received authorization confirmation");
450
451            // Wait a bit
452            sleep(std::time::Duration::from_millis(100)).await;
453
454            // Push work
455            trace!(target: "stratum", "Pusing work to peers");
456            stratum
457                .push_work_all(r#"{ "00040008", "100500" }"#.to_owned())
458                .expect("Pushing work should produce no errors");
459
460            // Wait a bit
461            sleep(std::time::Duration::from_millis(100)).await;
462
463            trace!(target: "stratum", "Ready to read work from server");
464            sleep(std::time::Duration::from_millis(100)).await;
465
466            stream.shutdown().await.expect("Should shutdown write half");
467
468            // Read work response
469            let mut read_buf1 = Vec::with_capacity(2048);
470            stream
471                .read_to_end(&mut read_buf1)
472                .await
473                .expect("Should read work response");
474
475            trace!(target: "stratum", "Received work from server");
476            read_buf1
477        });
478
479        let response =
480            String::from_utf8(response).expect("Response should be utf-8");
481
482        assert_eq!(
483            "{ \"id\": 17, \"method\": \"mining.notify\", \"params\": { \"00040008\", \"100500\" } }\n",
484            response
485        );
486    }
487
488    #[test]
489    fn test_can_subscribe_with_secret() {
490        let addr = "127.0.0.1:19971".parse().unwrap();
491        let secret_str = "test_secret";
492        let secret_hash = keccak(secret_str);
493
494        let stratum =
495            Stratum::start(&addr, Arc::new(VoidManager), Some(secret_hash))
496                .expect("Should start stratum with secret");
497
498        let request = format!(
499            r#"{{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", "{}"], "id": 1}}"#,
500            secret_str
501        );
502
503        let response =
504            String::from_utf8(dummy_request(&addr, &request)).unwrap();
505
506        assert_eq!(
507            terminated_str(r#"{"jsonrpc":"2.0","result":true,"id":1}"#),
508            response
509        );
510        assert_eq!(1, stratum.implementation.workers.read().len());
511    }
512
513    #[test]
514    fn test_can_subscribe_with_invalid_secret() {
515        let addr = "127.0.0.1:19972".parse().unwrap();
516        let secret_str = "test_secret";
517        let secret_hash = keccak(secret_str);
518        let stratum =
519            Stratum::start(&addr, Arc::new(VoidManager), Some(secret_hash))
520                .expect("Should start stratum with secret");
521
522        let request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", "wrong_secret"], "id": 2}"#;
523        let response =
524            String::from_utf8(dummy_request(&addr, request)).unwrap();
525
526        assert_eq!(
527            terminated_str(r#"{"jsonrpc":"2.0","result":false,"id":2}"#),
528            response
529        );
530        assert_eq!(0, stratum.implementation.workers.read().len());
531    }
532
533    #[test]
534    fn test_can_submit() {
535        let addr = "127.0.0.1:19973".parse().unwrap();
536
537        struct TestDispatcher {
538            submissions: Arc<RwLock<Vec<Vec<String>>>>,
539        }
540
541        impl JobDispatcher for TestDispatcher {
542            fn submit(&self, payload: Vec<String>) -> Result<(), Error> {
543                self.submissions.write().push(payload);
544                Ok(())
545            }
546        }
547
548        let test_dispatcher = TestDispatcher {
549            submissions: Arc::new(RwLock::new(Vec::new())),
550        };
551        let submissions = test_dispatcher.submissions.clone();
552
553        let stratum = Stratum::start(&addr, Arc::new(test_dispatcher), None)
554            .expect("Should start stratum");
555
556        // subscribe
557        let subscribe_request = r#"{"jsonrpc": "2.0", "method": "mining.subscribe", "params": ["miner1", ""], "id": 1}"#;
558        let subscribe_response =
559            String::from_utf8(dummy_request(&addr, subscribe_request)).unwrap();
560
561        assert_eq!(
562            terminated_str(r#"{"jsonrpc":"2.0","result":true,"id":1}"#),
563            subscribe_response
564        );
565
566        // submit
567
568        let submit_request = r#"{"jsonrpc": "2.0", "method": "mining.submit", "params": ["test_miner", "job_id", "0x1", "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"], "id": 2}"#;
569
570        let submit_response =
571            String::from_utf8(dummy_request(&addr, submit_request)).unwrap();
572
573        assert_eq!(
574            terminated_str(r#"{"jsonrpc":"2.0","result":[true],"id":2}"#),
575            submit_response
576        );
577
578        assert_eq!(1, submissions.read().len());
579        assert_eq!(
580            vec![
581            "test_miner",
582            "job_id",
583            "0x1",
584            "0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"
585        ],
586            submissions.read()[0]
587        );
588
589        assert_eq!(1, stratum.implementation.workers.read().len());
590    }
591}