cfxcore/consensus/
pos_handler.rs

1use std::sync::{mpsc, Arc, Weak};
2
3use futures::channel::mpsc as futures_mpsc;
4use once_cell::sync::OnceCell;
5
6use cfx_types::{H256, U256, U64};
7use diem_config::{config::NodeConfig, keys::ConfigKey};
8use diem_crypto::HashValue;
9use diem_types::{
10    contract_event::ContractEvent,
11    epoch_state::EpochState,
12    reward_distribution_event::RewardDistributionEventV2,
13    term_state::{DisputeEvent, UnlockEvent},
14    validator_config::{ConsensusPrivateKey, ConsensusVRFPrivateKey},
15};
16use keccak_hash::keccak;
17use primitives::pos::{NodeId, PosBlockId};
18use storage_interface::{DBReaderForPoW, DbReader};
19
20use crate::{
21    genesis_block::GenesisPosState,
22    pos::{
23        consensus::{
24            network::{
25                NetworkReceivers as ConsensusNetworkReceivers,
26                NetworkTask as ConsensusNetworkTask,
27            },
28            ConsensusDB, TestCommand,
29        },
30        mempool::network::{
31            NetworkReceivers as MemPoolNetworkReceivers,
32            NetworkTask as MempoolNetworkTask,
33        },
34        pos::{
35            start_pos_consensus, PosChainParams, PosDropHandle, PosNodeKeys,
36        },
37        protocol::sync_protocol::HotStuffSynchronizationProtocol,
38    },
39    sync::ProtocolConfiguration,
40    ConsensusGraph,
41};
42
43use cached_pos_ledger_db::CachedPosLedgerDB;
44use consensus_types::block::Block;
45use diem_types::{
46    account_address::from_consensus_public_key,
47    block_info::{PivotBlockDecision, Round},
48    chain_id::ChainId,
49    epoch_state::HARDCODED_COMMITTEE_FOR_EPOCH,
50    term_state::pos_state_config::{PosStateConfig, POS_STATE_CONFIG},
51    transaction::TransactionPayload,
52};
53use network::NetworkService;
54use parking_lot::Mutex;
55use pos_ledger_db::PosLedgerDB;
56use std::{fs, io::Read, path::PathBuf};
57
/// Alias of [`PosHandler`]; both names refer to the same type.
pub type PosVerifier = PosHandler;
59
/// This includes the interfaces that the PoW consensus needs from the PoS
/// consensus.
///
/// We assume the PoS service will be always available after `initialize()`
/// returns, so all the other interfaces will panic if the PoS service is not
/// ready.
pub trait PosInterface: Send + Sync {
    /// Wait for initialization.
    fn initialize(&self) -> Result<(), String>;

    /// Get a PoS block by its ID.
    ///
    /// Return `None` if the block does not exist or is not committed.
    fn get_committed_block(&self, h: &PosBlockId) -> Option<PosBlock>;

    /// Return the latest committed PoS block ID.
    /// This will become the PoS reference of the mined PoW block.
    fn latest_block(&self) -> PosBlockId;

    /// Return the contract events associated with the span between the PoS
    /// blocks `from` and `to`.
    ///
    /// NOTE(review): whether the endpoints are inclusive is decided by the
    /// implementation and is not visible from this trait -- confirm.
    fn get_events(
        &self, from: &PosBlockId, to: &PosBlockId,
    ) -> Vec<ContractEvent>;

    /// Return the IDs of the epoch-ending PoS blocks for the epochs selected
    /// by `start_epoch` and `end_epoch`.
    ///
    /// NOTE(review): inclusivity of the two bounds is not visible here --
    /// confirm against the storage implementation.
    fn get_epoch_ending_blocks(
        &self, start_epoch: u64, end_epoch: u64,
    ) -> Vec<PosBlockId>;

    /// Return the reward distribution event of `epoch`, or `None` if it
    /// cannot be provided.
    fn get_reward_event(&self, epoch: u64)
        -> Option<RewardDistributionEventV2>;

    /// Return the epoch state looked up by the PoS block `block_id`.
    fn get_epoch_state(&self, block_id: &PosBlockId) -> EpochState;

    /// Borrow the shared handle to the PoS ledger database.
    fn pos_ledger_db(&self) -> &Arc<PosLedgerDB>;

    /// Borrow the shared handle to the PoS consensus database.
    fn consensus_db(&self) -> &Arc<ConsensusDB>;

    /// Borrow the shared handle to the cached PoS ledger database.
    fn cached_db(&self) -> &Arc<CachedPosLedgerDB>;
}
98
/// The subset of a committed PoS block's data exposed through
/// [`PosInterface::get_committed_block`].
// `allow(unused)`: some fields (e.g. `hash`, `version`) are stored but never
// read back within this file.
#[allow(unused)]
pub struct PosBlock {
    /// ID the block was looked up by.
    hash: PosBlockId,
    /// Epoch of the committed block.
    epoch: u64,
    /// Round of the committed block.
    round: u64,
    /// Block hash taken from the committed block's pivot decision.
    pivot_decision: H256,
    /// Version recorded for the committed block.
    /// NOTE(review): presumably the ledger version -- confirm.
    version: u64,
    /// View recorded for the committed block.
    view: u64,
    /* parent: PosBlockId,
     * author: NodeId,
     * voters: Vec<NodeId>, */
}
111
/// Owns the PoS service on behalf of the PoW consensus: it holds the
/// resources needed to start the service and, once `initialize()` has
/// succeeded, the connection used to query it.
pub struct PosHandler {
    /// The PoS service; empty until `initialize()` stores it.
    pos: OnceCell<Box<dyn PosInterface>>,
    /// Network service given to `new()`; taken by `initialize()` or dropped
    /// by `stop()`.
    network: Mutex<Option<Arc<NetworkService>>>,
    // Keep all tokio Runtime so they will not be dropped directly.
    drop_handle: Mutex<Option<PosDropHandle>>,
    /// Consensus network receivers created in `new()` (only when a network is
    /// provided) and consumed by `initialize()`.
    consensus_network_receiver: Mutex<Option<ConsensusNetworkReceivers>>,
    /// Mempool network receivers created in `new()` (only when a network is
    /// provided) and consumed by `initialize()`.
    mempool_network_receiver: Mutex<Option<MemPoolNetworkReceivers>>,
    /// Sender side of the test-command channel; set by `initialize()`.
    test_command_sender: Mutex<Option<futures_mpsc::Sender<TestCommand>>>,
    /// Height threshold compared against in `is_enabled_at_height()`.
    enable_height: u64,
    /// Protocol handler registered on the network in `new()`; `None` when no
    /// network was provided.
    hsb_protocol_handler: Option<Arc<HotStuffSynchronizationProtocol>>,
    /// Configuration this handler was constructed with.
    pub conf: PosConfiguration,
}
124
125impl PosHandler {
126    pub fn new(
127        network: Option<Arc<NetworkService>>, conf: PosConfiguration,
128        enable_height: u64,
129    ) -> Self {
130        let mut pos = Self {
131            pos: OnceCell::new(),
132            network: Mutex::new(network.clone()),
133            drop_handle: Mutex::new(None),
134            consensus_network_receiver: Mutex::new(None),
135            mempool_network_receiver: Mutex::new(None),
136            test_command_sender: Mutex::new(None),
137            enable_height,
138            hsb_protocol_handler: None,
139            conf,
140        };
141        if let Some(network) = &network {
142            // initialize hotstuff protocol handler
143            let (consensus_network_task, consensus_network_receiver) =
144                ConsensusNetworkTask::new();
145            let (mempool_network_task, mempool_network_receiver) =
146                MempoolNetworkTask::new();
147            let own_node_hash = keccak(
148                network.net_key_pair().expect("Error node key").public(),
149            );
150            let protocol_handler =
151                Arc::new(HotStuffSynchronizationProtocol::new(
152                    own_node_hash,
153                    consensus_network_task,
154                    mempool_network_task,
155                    pos.conf.protocol_conf.clone(),
156                ));
157            protocol_handler.clone().register(network.clone()).unwrap();
158            *pos.consensus_network_receiver.lock() =
159                Some(consensus_network_receiver);
160            *pos.mempool_network_receiver.lock() =
161                Some(mempool_network_receiver);
162            pos.hsb_protocol_handler = Some(protocol_handler);
163        }
164        pos
165    }
166
    /// Start the PoS consensus service and hand it the PoW `consensus` graph.
    ///
    /// If the service has already been stored, this only logs a warning and
    /// returns `Ok(())`.
    ///
    /// # Errors
    /// Returns an error string when no PoS config path is configured, when
    /// either global (`POS_STATE_CONFIG`, `HARDCODED_COMMITTEE_FOR_EPOCH`)
    /// rejects `set`, when the node config or the initial-nodes file cannot be
    /// loaded, or when the service slot turns out to be already filled.
    ///
    /// # Panics
    /// Panics if the network handle, either network receiver, or the protocol
    /// handler is missing (e.g. the handler was built without a network, or
    /// they were already consumed).
    pub fn initialize(
        &self, consensus: Arc<ConsensusGraph>,
    ) -> Result<(), String> {
        if self.pos.get().is_some() {
            warn!("Initializing already-initialized PosHandler!");
            return Ok(());
        }
        let pos_config_path = match self.conf.diem_conf_path.as_ref() {
            Some(path) => PathBuf::from(path),
            None => bail!("No pos config!"),
        };

        // NOTE(review): the globals below look like set-once cells; if so, a
        // retry after a later step fails would be rejected here -- confirm.
        POS_STATE_CONFIG
            .set(self.conf.pos_state_config.clone())
            .map_err(|e| {
                format!("Failed to set pos state config: e={:?}", e)
            })?;
        let mut pos_config = NodeConfig::load(pos_config_path)
            .map_err(|e| format!("Failed to load node config: e={:?}", e))?;
        HARDCODED_COMMITTEE_FOR_EPOCH
            .set(pos_config.consensus.hardcoded_epoch_committee.clone())
            .map_err(|e| {
                format!("Failed to set hardcoded_epoch_committee: e={:?}", e)
            })?;

        // NOTE(review): re-sets the data dir to its current value; presumably
        // `set_data_dir` propagates it to nested configs -- confirm.
        pos_config.set_data_dir(pos_config.data_dir().to_path_buf());
        let pos_genesis = read_initial_nodes_from_file(
            self.conf.pos_initial_nodes_path.as_str(),
        )?;
        // From here on resources are moved out of `self`; they are not put
        // back if a later step fails.
        let network = self.network.lock().take().expect("pos not initialized");
        let (test_command_sender, test_command_receiver) =
            futures_mpsc::channel(1024);

        // The author address is derived from both configured public keys.
        let node_keys = PosNodeKeys {
            author: from_consensus_public_key(
                &self.conf.bls_key.public_key(),
                &self.conf.vrf_key.public_key(),
            ),
            consensus_private_key: self.conf.bls_key.private_key(),
            vrf_private_key: self.conf.vrf_key.private_key(),
        };
        let chain_params = PosChainParams {
            chain_id: ChainId::new(network.network_id()),
            vrf_proposal_threshold: self.conf.vrf_proposal_threshold,
        };

        let pos_drop_handle = start_pos_consensus(
            &pos_config,
            network,
            self.conf.protocol_conf.clone(),
            node_keys,
            chain_params,
            pos_genesis,
            self.consensus_network_receiver
                .lock()
                .take()
                .expect("not initialized"),
            self.mempool_network_receiver
                .lock()
                .take()
                .expect("not initialized"),
            test_command_receiver,
            self.hsb_protocol_handler.clone().expect("set in new"),
        );
        debug!("PoS initialized");
        // The connection shares the database handles owned by the drop handle.
        let pos_connection = PosConnection::new(
            pos_drop_handle.pos_ledger_db.clone(),
            pos_drop_handle.consensus_db.clone(),
            pos_drop_handle.cached_db.clone(),
        );
        pos_drop_handle.pow_handler.initialize(consensus);
        if self.pos.set(Box::new(pos_connection)).is_err() {
            bail!("PoS initialized twice!");
        }
        // Keep the sender for the test commands and the drop handle so the
        // started service stays alive until `stop()`.
        *self.test_command_sender.lock() = Some(test_command_sender);
        *self.drop_handle.lock() = Some(pos_drop_handle);
        Ok(())
    }
245
246    pub fn config(&self) -> &PosConfiguration { &self.conf }
247
248    fn pos(&self) -> &Box<dyn PosInterface> { self.pos.get().unwrap() }
249
250    pub fn pos_option(&self) -> Option<&Box<dyn PosInterface>> {
251        self.pos.get()
252    }
253
254    pub fn is_enabled_at_height(&self, height: u64) -> bool {
255        height >= self.enable_height
256    }
257
258    pub fn enable_height(&self) -> u64 { self.enable_height }
259
260    pub fn is_committed(&self, h: &PosBlockId) -> bool {
261        self.pos().get_committed_block(h).is_some()
262    }
263
264    /// Check if `me` is equal to or extends `preds` (parent and referees).
265    ///
266    /// Since committed PoS blocks form a chain, and no pos block should be
267    /// skipped, we only need to check if the round of `me` is equal to or plus
268    /// one compared with the predecessors' rounds.
269    ///
270    /// Return `false` if `me` or `preds` contains non-existent PoS blocks.
271    pub fn verify_against_predecessors(
272        &self, me: &PosBlockId, preds: &Vec<PosBlockId>,
273    ) -> bool {
274        let me_round = match self.pos().get_committed_block(me) {
275            None => {
276                warn!("No pos block for me={:?}", me);
277                return false;
278            }
279            Some(b) => (b.epoch, b.round),
280        };
281        for p in preds {
282            let p_round = match self.pos().get_committed_block(p) {
283                None => {
284                    warn!("No pos block for pred={:?}", p);
285                    return false;
286                }
287                Some(b) => (b.epoch, b.round),
288            };
289            if me_round < p_round {
290                warn!("Incorrect round: me={:?}, pred={:?}", me_round, p_round);
291                return false;
292            }
293        }
294        true
295    }
296
297    pub fn get_pivot_decision(&self, h: &PosBlockId) -> Option<H256> {
298        // Return None if `pos` has not been initialized
299        self.pos
300            .get()?
301            .get_committed_block(h)
302            .map(|b| b.pivot_decision)
303    }
304
305    pub fn get_latest_pos_reference(&self) -> PosBlockId {
306        self.pos().latest_block()
307    }
308
309    pub fn get_pos_view(&self, h: &PosBlockId) -> Option<u64> {
310        self.pos().get_committed_block(h).map(|b| b.view)
311    }
312
313    pub fn get_unlock_nodes(
314        &self, h: &PosBlockId, parent_pos_ref: &PosBlockId,
315    ) -> Vec<(NodeId, u64)> {
316        let unlock_event_key = UnlockEvent::event_key();
317        let mut unlock_nodes = Vec::new();
318        for event in self.pos().get_events(parent_pos_ref, h) {
319            if *event.key() == unlock_event_key {
320                let unlock_event = UnlockEvent::from_bytes(event.event_data())
321                    .expect("key checked");
322                let node_id = H256::from_slice(unlock_event.node_id.as_ref());
323                let votes = unlock_event.unlocked;
324                unlock_nodes.push((node_id, votes));
325            }
326        }
327        unlock_nodes
328    }
329
330    pub fn get_disputed_nodes(
331        &self, h: &PosBlockId, parent_pos_ref: &PosBlockId,
332    ) -> Vec<NodeId> {
333        let dispute_event_key = DisputeEvent::event_key();
334        let mut disputed_nodes = Vec::new();
335        for event in self.pos().get_events(parent_pos_ref, h) {
336            if *event.key() == dispute_event_key {
337                let dispute_event =
338                    DisputeEvent::from_bytes(event.event_data())
339                        .expect("key checked");
340                disputed_nodes
341                    .push(H256::from_slice(dispute_event.node_id.as_ref()));
342            }
343        }
344        disputed_nodes
345    }
346
347    pub fn get_reward_distribution_event(
348        &self, h: &PosBlockId, parent_pos_ref: &PosBlockId,
349    ) -> Option<Vec<(u64, RewardDistributionEventV2)>> {
350        if h == parent_pos_ref {
351            return None;
352        }
353        let me_block = self.pos().get_committed_block(h)?;
354        let parent_block = self.pos().get_committed_block(parent_pos_ref)?;
355        if me_block.epoch == parent_block.epoch {
356            return None;
357        }
358        let mut events = Vec::new();
359        for epoch in parent_block.epoch..me_block.epoch {
360            events.push((epoch, self.pos().get_reward_event(epoch)?));
361        }
362        Some(events)
363    }
364
365    pub fn pos_ledger_db(&self) -> &Arc<PosLedgerDB> {
366        self.pos().pos_ledger_db()
367    }
368
369    pub fn consensus_db(&self) -> &Arc<ConsensusDB> {
370        self.pos().consensus_db()
371    }
372
373    pub fn cached_db(&self) -> &Arc<CachedPosLedgerDB> {
374        self.pos().cached_db()
375    }
376
377    pub fn stop(&self) -> Option<(Weak<PosLedgerDB>, Weak<ConsensusDB>)> {
378        self.network.lock().take();
379        self.consensus_network_receiver.lock().take();
380        self.mempool_network_receiver.lock().take();
381        self.drop_handle.lock().take().map(|pos_drop_handle| {
382            let pos_ledger_db = pos_drop_handle.pos_ledger_db.clone();
383            let consensus_db = pos_drop_handle.consensus_db.clone();
384            (
385                Arc::downgrade(&pos_ledger_db),
386                Arc::downgrade(&consensus_db),
387            )
388        })
389    }
390}
391
392/// The functions used in tests to construct attack cases
393impl PosHandler {
394    pub fn force_vote_proposal(&self, block_id: H256) -> anyhow::Result<()> {
395        self.test_command_sender
396            .lock()
397            .as_mut()
398            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
399            .try_send(TestCommand::ForceVoteProposal(h256_to_diem_hash(
400                &block_id,
401            )))
402            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))
403    }
404
405    pub fn force_propose(
406        &self, round: U64, parent_block_id: H256,
407        payload: Vec<TransactionPayload>,
408    ) -> anyhow::Result<()> {
409        self.test_command_sender
410            .lock()
411            .as_mut()
412            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
413            .try_send(TestCommand::ForcePropose {
414                round: round.as_u64(),
415                parent_id: h256_to_diem_hash(&parent_block_id),
416                payload,
417            })
418            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))
419    }
420
421    pub fn trigger_timeout(&self, timeout_type: String) -> anyhow::Result<()> {
422        let command = match timeout_type.as_str() {
423            "local" => TestCommand::LocalTimeout,
424            "proposal" => TestCommand::ProposalTimeOut,
425            "new_round" => TestCommand::NewRoundTimeout,
426            _ => anyhow::bail!("Unknown timeout type"),
427        };
428        self.test_command_sender
429            .lock()
430            .as_mut()
431            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
432            .try_send(command)
433            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))
434    }
435
436    pub fn force_sign_pivot_decision(
437        &self, pivot_decision: PivotBlockDecision,
438    ) -> anyhow::Result<()> {
439        self.test_command_sender
440            .lock()
441            .as_mut()
442            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
443            .try_send(TestCommand::BroadcastPivotDecision(pivot_decision))
444            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))
445    }
446
447    pub fn get_chosen_proposal(&self) -> anyhow::Result<Option<Block>> {
448        let (tx, rx) = mpsc::sync_channel(1);
449        self.test_command_sender
450            .lock()
451            .as_mut()
452            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
453            .try_send(TestCommand::GetChosenProposal(tx))
454            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
455        rx.recv().map_err(|e| anyhow::anyhow!("recv: err={:?}", e))
456    }
457
458    pub fn stop_election(&self) -> anyhow::Result<Option<Round>> {
459        let (tx, rx) = mpsc::sync_channel(1);
460        self.test_command_sender
461            .lock()
462            .as_mut()
463            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
464            .try_send(TestCommand::StopElection(tx))
465            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
466        rx.recv().map_err(|e| anyhow::anyhow!("recv: err={:?}", e))
467    }
468
469    pub fn start_voting(&self, initialize: bool) -> anyhow::Result<()> {
470        let (tx, rx) = mpsc::sync_channel(1);
471        self.test_command_sender
472            .lock()
473            .as_mut()
474            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
475            .try_send(TestCommand::StartVoting((initialize, tx)))
476            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
477        rx.recv()?
478    }
479
480    pub fn stop_voting(&self) -> anyhow::Result<()> {
481        let (tx, rx) = mpsc::sync_channel(1);
482        self.test_command_sender
483            .lock()
484            .as_mut()
485            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
486            .try_send(TestCommand::StopVoting(tx))
487            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
488        rx.recv()?
489    }
490
491    pub fn voting_status(&self) -> anyhow::Result<bool> {
492        let (tx, rx) = mpsc::sync_channel(1);
493        self.test_command_sender
494            .lock()
495            .as_mut()
496            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
497            .try_send(TestCommand::GetVotingStatus(tx))
498            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
499        Ok(rx.recv()?)
500    }
501}
502
/// [`PosInterface`] implementation that answers queries from the PoS
/// databases it holds shared handles to.
pub struct PosConnection {
    /// PoS ledger database; the source of all block, event and state lookups
    /// in this file.
    pos_storage: Arc<PosLedgerDB>,
    /// PoS consensus database; only handed out via `consensus_db()`.
    consensus_db: Arc<ConsensusDB>,
    /// Cached PoS ledger database; only handed out via `cached_db()`.
    pos_cache_db: Arc<CachedPosLedgerDB>,
}
508
509impl PosConnection {
510    pub fn new(
511        pos_storage: Arc<PosLedgerDB>, consensus_db: Arc<ConsensusDB>,
512        pos_cache_db: Arc<CachedPosLedgerDB>,
513    ) -> Self {
514        Self {
515            pos_storage,
516            consensus_db,
517            pos_cache_db,
518        }
519    }
520}
521
522impl PosInterface for PosConnection {
523    fn initialize(&self) -> Result<(), String> { Ok(()) }
524
525    fn get_committed_block(&self, h: &PosBlockId) -> Option<PosBlock> {
526        debug!("get_committed_block: {:?}", h);
527        let block_hash = h256_to_diem_hash(h);
528        let committed_block = self
529            .pos_storage
530            .get_committed_block_by_hash(&block_hash)
531            .ok()?;
532
533        /*
534        let parent;
535        let author;
536        if *h == PosBlockId::default() {
537            // genesis has no block, and its parent/author will not be used.
538            parent = PosBlockId::default();
539            author = NodeId::default();
540        } else {
541            let block = self
542                .pos_consensus_db
543                .get_ledger_block(&block_hash)
544                .map_err(|e| {
545                    warn!("get_committed_block: err={:?}", e);
546                    e
547                })
548                .ok()??;
549            debug_assert_eq!(block.id(), block_hash);
550            parent = diem_hash_to_h256(&block.parent_id());
551            // NIL block has no author.
552            author = H256::from_slice(block.author().unwrap_or(Default::default()).as_ref());
553        }
554         */
555        debug!("pos_handler gets committed_block={:?}", committed_block);
556        Some(PosBlock {
557            hash: *h,
558            epoch: committed_block.epoch,
559            round: committed_block.round,
560            pivot_decision: committed_block.pivot_decision.block_hash,
561            view: committed_block.view,
562            /* parent,
563             * author,
564             * voters: ledger_info
565             *     .signatures()
566             *     .keys()
567             *     .map(|author| H256::from_slice(author.as_ref()))
568             *     .collect(), */
569            version: committed_block.version,
570        })
571    }
572
573    fn latest_block(&self) -> PosBlockId {
574        diem_hash_to_h256(
575            &self
576                .pos_storage
577                .get_latest_ledger_info_option()
578                .expect("Initialized")
579                .ledger_info()
580                .consensus_block_id(),
581        )
582    }
583
584    fn get_events(
585        &self, from: &PosBlockId, to: &PosBlockId,
586    ) -> Vec<ContractEvent> {
587        let start_version = self
588            .pos_storage
589            .get_committed_block_by_hash(&h256_to_diem_hash(from))
590            .expect("err reading ledger info for from")
591            .version;
592        let end_version = self
593            .pos_storage
594            .get_committed_block_by_hash(&h256_to_diem_hash(to))
595            .expect("err reading ledger info for to")
596            .version;
597        self.pos_storage
598            .get_events_by_version(start_version, end_version)
599            .expect("err reading events")
600    }
601
602    fn get_epoch_ending_blocks(
603        &self, start_epoch: u64, end_epoch: u64,
604    ) -> Vec<PosBlockId> {
605        self.pos_storage
606            .get_epoch_ending_blocks(start_epoch, end_epoch)
607            .expect("err reading epoch ending blocks")
608            .into_iter()
609            .map(|h| diem_hash_to_h256(&h))
610            .collect()
611    }
612
613    fn get_reward_event(
614        &self, epoch: u64,
615    ) -> Option<RewardDistributionEventV2> {
616        self.pos_storage.get_reward_event(epoch).ok()
617    }
618
619    fn get_epoch_state(&self, block_id: &PosBlockId) -> EpochState {
620        self.pos_storage
621            .get_pos_state(&h256_to_diem_hash(block_id))
622            .expect("parent of an ending_epoch block")
623            .epoch_state()
624            .clone()
625    }
626
627    fn pos_ledger_db(&self) -> &Arc<PosLedgerDB> { &self.pos_storage }
628
629    fn consensus_db(&self) -> &Arc<ConsensusDB> { &self.consensus_db }
630
631    fn cached_db(&self) -> &Arc<CachedPosLedgerDB> { &self.pos_cache_db }
632}
633
/// Settings consumed by [`PosHandler`] when it creates the protocol handler
/// and starts the PoS service.
pub struct PosConfiguration {
    /// Key whose private part becomes the consensus private key and whose
    /// public part contributes to the author address.
    pub bls_key: ConfigKey<ConsensusPrivateKey>,
    /// Key whose private part becomes the VRF private key and whose public
    /// part contributes to the author address.
    pub vrf_key: ConfigKey<ConsensusVRFPrivateKey>,
    /// Path of the node config file loaded by `initialize()`; initialization
    /// fails when this is `None`.
    pub diem_conf_path: Option<String>,
    /// Passed to the HotStuff synchronization protocol and to the PoS
    /// consensus startup.
    pub protocol_conf: ProtocolConfiguration,
    /// Path of the JSON file read by `read_initial_nodes_from_file`.
    pub pos_initial_nodes_path: String,
    /// Forwarded unchanged as the `vrf_proposal_threshold` chain parameter.
    pub vrf_proposal_threshold: U256,
    /// Value installed into the global `POS_STATE_CONFIG` by `initialize()`.
    pub pos_state_config: PosStateConfig,
}
643
644fn diem_hash_to_h256(h: &HashValue) -> PosBlockId { H256::from(h.as_ref()) }
645fn h256_to_diem_hash(h: &PosBlockId) -> HashValue {
646    HashValue::new(h.to_fixed_bytes())
647}
648
649pub fn save_initial_nodes_to_file(path: &str, genesis_nodes: GenesisPosState) {
650    fs::write(path, serde_json::to_string(&genesis_nodes).unwrap()).unwrap();
651}
652
653pub fn read_initial_nodes_from_file(
654    path: &str,
655) -> Result<GenesisPosState, String> {
656    let mut file = fs::File::open(path)
657        .map_err(|e| format!("failed to open initial nodes file: {:?}", e))?;
658
659    let mut nodes_str = String::new();
660    file.read_to_string(&mut nodes_str)
661        .map_err(|e| format!("failed to read initial nodes file: {:?}", e))?;
662
663    serde_json::from_str(nodes_str.as_str())
664        .map_err(|e| format!("failed to parse initial nodes file: {:?}", e))
665}