cfxcore/consensus/
pos_handler.rs

1use std::sync::{mpsc, Arc, Weak};
2
3use once_cell::sync::OnceCell;
4
5use cfx_types::{H256, U256, U64};
6use diem_config::{config::NodeConfig, keys::ConfigKey};
7use diem_crypto::HashValue;
8use diem_types::{
9    contract_event::ContractEvent,
10    epoch_state::EpochState,
11    reward_distribution_event::RewardDistributionEventV2,
12    term_state::{DisputeEvent, UnlockEvent},
13    validator_config::{ConsensusPrivateKey, ConsensusVRFPrivateKey},
14};
15use keccak_hash::keccak;
16use primitives::pos::{NodeId, PosBlockId};
17use storage_interface::{DBReaderForPoW, DbReader};
18
19use crate::{
20    genesis_block::GenesisPosState,
21    pos::{
22        consensus::{
23            network::{
24                NetworkReceivers as ConsensusNetworkReceivers,
25                NetworkTask as ConsensusNetworkTask,
26            },
27            ConsensusDB, TestCommand,
28        },
29        mempool::network::{
30            NetworkReceivers as MemPoolNetworkReceivers,
31            NetworkTask as MempoolNetworkTask,
32        },
33        pos::{start_pos_consensus, PosDropHandle},
34        protocol::sync_protocol::HotStuffSynchronizationProtocol,
35    },
36    sync::ProtocolConfiguration,
37    ConsensusGraph,
38};
39
40use cached_pos_ledger_db::CachedPosLedgerDB;
41use consensus_types::block::Block;
42use diem_config::config::SafetyRulesTestConfig;
43use diem_types::{
44    account_address::from_consensus_public_key,
45    block_info::{PivotBlockDecision, Round},
46    chain_id::ChainId,
47    epoch_state::HARDCODED_COMMITTEE_FOR_EPOCH,
48    term_state::pos_state_config::{PosStateConfig, POS_STATE_CONFIG},
49    transaction::TransactionPayload,
50};
51use network::NetworkService;
52use parking_lot::Mutex;
53use pos_ledger_db::PosLedgerDB;
54use std::{fs, io::Read, path::PathBuf};
55
56pub type PosVerifier = PosHandler;
57
58/// This includes the interfaces that the PoW consensus needs from the PoS
59/// consensus.
60///
61/// We assume the PoS service will be always available after `initialize()`
62/// returns, so all the other interfaces will panic if the PoS service is not
63/// ready.
64pub trait PosInterface: Send + Sync {
65    /// Wait for initialization.
66    fn initialize(&self) -> Result<(), String>;
67
68    /// Get a PoS block by its ID.
69    ///
70    /// Return `None` if the block does not exist or is not committed.
71    fn get_committed_block(&self, h: &PosBlockId) -> Option<PosBlock>;
72
73    /// Return the latest committed PoS block ID.
74    /// This will become the PoS reference of the mined PoW block.
75    fn latest_block(&self) -> PosBlockId;
76
77    fn get_events(
78        &self, from: &PosBlockId, to: &PosBlockId,
79    ) -> Vec<ContractEvent>;
80
81    fn get_epoch_ending_blocks(
82        &self, start_epoch: u64, end_epoch: u64,
83    ) -> Vec<PosBlockId>;
84
85    fn get_reward_event(&self, epoch: u64)
86        -> Option<RewardDistributionEventV2>;
87
88    fn get_epoch_state(&self, block_id: &PosBlockId) -> EpochState;
89
90    fn pos_ledger_db(&self) -> &Arc<PosLedgerDB>;
91
92    fn consensus_db(&self) -> &Arc<ConsensusDB>;
93
94    fn cached_db(&self) -> &Arc<CachedPosLedgerDB>;
95}
96
97#[allow(unused)]
98pub struct PosBlock {
99    hash: PosBlockId,
100    epoch: u64,
101    round: u64,
102    pivot_decision: H256,
103    version: u64,
104    view: u64,
105    /* parent: PosBlockId,
106     * author: NodeId,
107     * voters: Vec<NodeId>, */
108}
109
110pub struct PosHandler {
111    pos: OnceCell<Box<dyn PosInterface>>,
112    network: Mutex<Option<Arc<NetworkService>>>,
113    // Keep all tokio Runtime so they will not be dropped directly.
114    drop_handle: Mutex<Option<PosDropHandle>>,
115    consensus_network_receiver: Mutex<Option<ConsensusNetworkReceivers>>,
116    mempool_network_receiver: Mutex<Option<MemPoolNetworkReceivers>>,
117    test_command_sender: Mutex<Option<channel::Sender<TestCommand>>>,
118    enable_height: u64,
119    hsb_protocol_handler: Option<Arc<HotStuffSynchronizationProtocol>>,
120    pub conf: PosConfiguration,
121}
122
123impl PosHandler {
124    pub fn new(
125        network: Option<Arc<NetworkService>>, conf: PosConfiguration,
126        enable_height: u64,
127    ) -> Self {
128        let mut pos = Self {
129            pos: OnceCell::new(),
130            network: Mutex::new(network.clone()),
131            drop_handle: Mutex::new(None),
132            consensus_network_receiver: Mutex::new(None),
133            mempool_network_receiver: Mutex::new(None),
134            test_command_sender: Mutex::new(None),
135            enable_height,
136            hsb_protocol_handler: None,
137            conf,
138        };
139        if let Some(network) = &network {
140            // initialize hotstuff protocol handler
141            let (consensus_network_task, consensus_network_receiver) =
142                ConsensusNetworkTask::new();
143            let (mempool_network_task, mempool_network_receiver) =
144                MempoolNetworkTask::new();
145            let own_node_hash = keccak(
146                network.net_key_pair().expect("Error node key").public(),
147            );
148            let protocol_handler =
149                Arc::new(HotStuffSynchronizationProtocol::new(
150                    own_node_hash,
151                    consensus_network_task,
152                    mempool_network_task,
153                    pos.conf.protocol_conf.clone(),
154                ));
155            protocol_handler.clone().register(network.clone()).unwrap();
156            *pos.consensus_network_receiver.lock() =
157                Some(consensus_network_receiver);
158            *pos.mempool_network_receiver.lock() =
159                Some(mempool_network_receiver);
160            pos.hsb_protocol_handler = Some(protocol_handler);
161        }
162        pos
163    }
164
165    pub fn initialize(
166        &self, consensus: Arc<ConsensusGraph>,
167    ) -> Result<(), String> {
168        if self.pos.get().is_some() {
169            warn!("Initializing already-initialized PosHandler!");
170            return Ok(());
171        }
172        let pos_config_path = match self.conf.diem_conf_path.as_ref() {
173            Some(path) => PathBuf::from(path),
174            None => bail!("No pos config!"),
175        };
176
177        POS_STATE_CONFIG
178            .set(self.conf.pos_state_config.clone())
179            .map_err(|e| {
180                format!("Failed to set pos state config: e={:?}", e)
181            })?;
182        let mut pos_config = NodeConfig::load(pos_config_path)
183            .map_err(|e| format!("Failed to load node config: e={:?}", e))?;
184        HARDCODED_COMMITTEE_FOR_EPOCH
185            .set(pos_config.consensus.hardcoded_epoch_committee.clone())
186            .map_err(|e| {
187                format!("Failed to set hardcoded_epoch_committee: e={:?}", e)
188            })?;
189
190        pos_config.set_data_dir(pos_config.data_dir().to_path_buf());
191        let pos_genesis = read_initial_nodes_from_file(
192            self.conf.pos_initial_nodes_path.as_str(),
193        )?;
194        let network = self.network.lock().take().expect("pos not initialized");
195        let (test_command_sender, test_command_receiver) =
196            channel::new_test(1024);
197
198        pos_config.consensus.safety_rules.test = Some(SafetyRulesTestConfig {
199            author: from_consensus_public_key(
200                &self.conf.bls_key.public_key(),
201                &self.conf.vrf_key.public_key(),
202            ),
203            consensus_key: Some(self.conf.bls_key.clone()),
204            execution_key: Some(self.conf.bls_key.clone()),
205            waypoint: Some(pos_config.base.waypoint.waypoint()),
206        });
207        pos_config.consensus.safety_rules.vrf_private_key =
208            Some(self.conf.vrf_key.clone());
209        pos_config.consensus.safety_rules.export_consensus_key = true;
210        pos_config.consensus.safety_rules.vrf_proposal_threshold =
211            self.conf.vrf_proposal_threshold;
212        pos_config.consensus.chain_id = ChainId::new(network.network_id());
213
214        let pos_drop_handle = start_pos_consensus(
215            &pos_config,
216            network,
217            self.conf.protocol_conf.clone(),
218            Some((
219                self.conf.bls_key.public_key(),
220                self.conf.vrf_key.public_key(),
221            )),
222            pos_genesis,
223            self.consensus_network_receiver
224                .lock()
225                .take()
226                .expect("not initialized"),
227            self.mempool_network_receiver
228                .lock()
229                .take()
230                .expect("not initialized"),
231            test_command_receiver,
232            self.hsb_protocol_handler.clone().expect("set in new"),
233        );
234        debug!("PoS initialized");
235        let pos_connection = PosConnection::new(
236            pos_drop_handle.pos_ledger_db.clone(),
237            pos_drop_handle.consensus_db.clone(),
238            pos_drop_handle.cached_db.clone(),
239        );
240        pos_drop_handle.pow_handler.initialize(consensus);
241        if self.pos.set(Box::new(pos_connection)).is_err() {
242            bail!("PoS initialized twice!");
243        }
244        *self.test_command_sender.lock() = Some(test_command_sender);
245        *self.drop_handle.lock() = Some(pos_drop_handle);
246        Ok(())
247    }
248
249    pub fn config(&self) -> &PosConfiguration { &self.conf }
250
251    fn pos(&self) -> &Box<dyn PosInterface> { self.pos.get().unwrap() }
252
253    pub fn pos_option(&self) -> Option<&Box<dyn PosInterface>> {
254        self.pos.get()
255    }
256
257    pub fn is_enabled_at_height(&self, height: u64) -> bool {
258        height >= self.enable_height
259    }
260
261    pub fn enable_height(&self) -> u64 { self.enable_height }
262
263    pub fn is_committed(&self, h: &PosBlockId) -> bool {
264        self.pos().get_committed_block(h).is_some()
265    }
266
267    /// Check if `me` is equal to or extends `preds` (parent and referees).
268    ///
269    /// Since committed PoS blocks form a chain, and no pos block should be
270    /// skipped, we only need to check if the round of `me` is equal to or plus
271    /// one compared with the predecessors' rounds.
272    ///
273    /// Return `false` if `me` or `preds` contains non-existent PoS blocks.
274    pub fn verify_against_predecessors(
275        &self, me: &PosBlockId, preds: &Vec<PosBlockId>,
276    ) -> bool {
277        let me_round = match self.pos().get_committed_block(me) {
278            None => {
279                warn!("No pos block for me={:?}", me);
280                return false;
281            }
282            Some(b) => (b.epoch, b.round),
283        };
284        for p in preds {
285            let p_round = match self.pos().get_committed_block(p) {
286                None => {
287                    warn!("No pos block for pred={:?}", p);
288                    return false;
289                }
290                Some(b) => (b.epoch, b.round),
291            };
292            if me_round < p_round {
293                warn!("Incorrect round: me={:?}, pred={:?}", me_round, p_round);
294                return false;
295            }
296        }
297        true
298    }
299
300    pub fn get_pivot_decision(&self, h: &PosBlockId) -> Option<H256> {
301        // Return None if `pos` has not been initialized
302        self.pos
303            .get()?
304            .get_committed_block(h)
305            .map(|b| b.pivot_decision)
306    }
307
308    pub fn get_latest_pos_reference(&self) -> PosBlockId {
309        self.pos().latest_block()
310    }
311
312    pub fn get_pos_view(&self, h: &PosBlockId) -> Option<u64> {
313        self.pos().get_committed_block(h).map(|b| b.view)
314    }
315
316    pub fn get_unlock_nodes(
317        &self, h: &PosBlockId, parent_pos_ref: &PosBlockId,
318    ) -> Vec<(NodeId, u64)> {
319        let unlock_event_key = UnlockEvent::event_key();
320        let mut unlock_nodes = Vec::new();
321        for event in self.pos().get_events(parent_pos_ref, h) {
322            if *event.key() == unlock_event_key {
323                let unlock_event = UnlockEvent::from_bytes(event.event_data())
324                    .expect("key checked");
325                let node_id = H256::from_slice(unlock_event.node_id.as_ref());
326                let votes = unlock_event.unlocked;
327                unlock_nodes.push((node_id, votes));
328            }
329        }
330        unlock_nodes
331    }
332
333    pub fn get_disputed_nodes(
334        &self, h: &PosBlockId, parent_pos_ref: &PosBlockId,
335    ) -> Vec<NodeId> {
336        let dispute_event_key = DisputeEvent::event_key();
337        let mut disputed_nodes = Vec::new();
338        for event in self.pos().get_events(parent_pos_ref, h) {
339            if *event.key() == dispute_event_key {
340                let dispute_event =
341                    DisputeEvent::from_bytes(event.event_data())
342                        .expect("key checked");
343                disputed_nodes
344                    .push(H256::from_slice(dispute_event.node_id.as_ref()));
345            }
346        }
347        disputed_nodes
348    }
349
350    pub fn get_reward_distribution_event(
351        &self, h: &PosBlockId, parent_pos_ref: &PosBlockId,
352    ) -> Option<Vec<(u64, RewardDistributionEventV2)>> {
353        if h == parent_pos_ref {
354            return None;
355        }
356        let me_block = self.pos().get_committed_block(h)?;
357        let parent_block = self.pos().get_committed_block(parent_pos_ref)?;
358        if me_block.epoch == parent_block.epoch {
359            return None;
360        }
361        let mut events = Vec::new();
362        for epoch in parent_block.epoch..me_block.epoch {
363            events.push((epoch, self.pos().get_reward_event(epoch)?));
364        }
365        Some(events)
366    }
367
368    pub fn pos_ledger_db(&self) -> &Arc<PosLedgerDB> {
369        self.pos().pos_ledger_db()
370    }
371
372    pub fn consensus_db(&self) -> &Arc<ConsensusDB> {
373        self.pos().consensus_db()
374    }
375
376    pub fn cached_db(&self) -> &Arc<CachedPosLedgerDB> {
377        self.pos().cached_db()
378    }
379
380    pub fn stop(&self) -> Option<(Weak<PosLedgerDB>, Weak<ConsensusDB>)> {
381        self.network.lock().take();
382        self.consensus_network_receiver.lock().take();
383        self.mempool_network_receiver.lock().take();
384        self.drop_handle.lock().take().map(|pos_drop_handle| {
385            let pos_ledger_db = pos_drop_handle.pos_ledger_db.clone();
386            let consensus_db = pos_drop_handle.consensus_db.clone();
387            (
388                Arc::downgrade(&pos_ledger_db),
389                Arc::downgrade(&consensus_db),
390            )
391        })
392    }
393}
394
395/// The functions used in tests to construct attack cases
396impl PosHandler {
397    pub fn force_vote_proposal(&self, block_id: H256) -> anyhow::Result<()> {
398        self.test_command_sender
399            .lock()
400            .as_mut()
401            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
402            .try_send(TestCommand::ForceVoteProposal(h256_to_diem_hash(
403                &block_id,
404            )))
405            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))
406    }
407
408    pub fn force_propose(
409        &self, round: U64, parent_block_id: H256,
410        payload: Vec<TransactionPayload>,
411    ) -> anyhow::Result<()> {
412        self.test_command_sender
413            .lock()
414            .as_mut()
415            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
416            .try_send(TestCommand::ForcePropose {
417                round: round.as_u64(),
418                parent_id: h256_to_diem_hash(&parent_block_id),
419                payload,
420            })
421            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))
422    }
423
424    pub fn trigger_timeout(&self, timeout_type: String) -> anyhow::Result<()> {
425        let command = match timeout_type.as_str() {
426            "local" => TestCommand::LocalTimeout,
427            "proposal" => TestCommand::ProposalTimeOut,
428            "new_round" => TestCommand::NewRoundTimeout,
429            _ => anyhow::bail!("Unknown timeout type"),
430        };
431        self.test_command_sender
432            .lock()
433            .as_mut()
434            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
435            .try_send(command)
436            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))
437    }
438
439    pub fn force_sign_pivot_decision(
440        &self, pivot_decision: PivotBlockDecision,
441    ) -> anyhow::Result<()> {
442        self.test_command_sender
443            .lock()
444            .as_mut()
445            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
446            .try_send(TestCommand::BroadcastPivotDecision(pivot_decision))
447            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))
448    }
449
450    pub fn get_chosen_proposal(&self) -> anyhow::Result<Option<Block>> {
451        let (tx, rx) = mpsc::sync_channel(1);
452        self.test_command_sender
453            .lock()
454            .as_mut()
455            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
456            .try_send(TestCommand::GetChosenProposal(tx))
457            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
458        rx.recv().map_err(|e| anyhow::anyhow!("recv: err={:?}", e))
459    }
460
461    pub fn stop_election(&self) -> anyhow::Result<Option<Round>> {
462        let (tx, rx) = mpsc::sync_channel(1);
463        self.test_command_sender
464            .lock()
465            .as_mut()
466            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
467            .try_send(TestCommand::StopElection(tx))
468            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
469        rx.recv().map_err(|e| anyhow::anyhow!("recv: err={:?}", e))
470    }
471
472    pub fn start_voting(&self, initialize: bool) -> anyhow::Result<()> {
473        let (tx, rx) = mpsc::sync_channel(1);
474        self.test_command_sender
475            .lock()
476            .as_mut()
477            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
478            .try_send(TestCommand::StartVoting((initialize, tx)))
479            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
480        rx.recv()?
481    }
482
483    pub fn stop_voting(&self) -> anyhow::Result<()> {
484        let (tx, rx) = mpsc::sync_channel(1);
485        self.test_command_sender
486            .lock()
487            .as_mut()
488            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
489            .try_send(TestCommand::StopVoting(tx))
490            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
491        rx.recv()?
492    }
493
494    pub fn voting_status(&self) -> anyhow::Result<bool> {
495        let (tx, rx) = mpsc::sync_channel(1);
496        self.test_command_sender
497            .lock()
498            .as_mut()
499            .ok_or(anyhow::anyhow!("Pos not initialized!"))?
500            .try_send(TestCommand::GetVotingStatus(tx))
501            .map_err(|e| anyhow::anyhow!("try_send: err={:?}", e))?;
502        Ok(rx.recv()?)
503    }
504}
505
506pub struct PosConnection {
507    pos_storage: Arc<PosLedgerDB>,
508    consensus_db: Arc<ConsensusDB>,
509    pos_cache_db: Arc<CachedPosLedgerDB>,
510}
511
512impl PosConnection {
513    pub fn new(
514        pos_storage: Arc<PosLedgerDB>, consensus_db: Arc<ConsensusDB>,
515        pos_cache_db: Arc<CachedPosLedgerDB>,
516    ) -> Self {
517        Self {
518            pos_storage,
519            consensus_db,
520            pos_cache_db,
521        }
522    }
523}
524
525impl PosInterface for PosConnection {
526    fn initialize(&self) -> Result<(), String> { Ok(()) }
527
528    fn get_committed_block(&self, h: &PosBlockId) -> Option<PosBlock> {
529        debug!("get_committed_block: {:?}", h);
530        let block_hash = h256_to_diem_hash(h);
531        let committed_block = self
532            .pos_storage
533            .get_committed_block_by_hash(&block_hash)
534            .ok()?;
535
536        /*
537        let parent;
538        let author;
539        if *h == PosBlockId::default() {
540            // genesis has no block, and its parent/author will not be used.
541            parent = PosBlockId::default();
542            author = NodeId::default();
543        } else {
544            let block = self
545                .pos_consensus_db
546                .get_ledger_block(&block_hash)
547                .map_err(|e| {
548                    warn!("get_committed_block: err={:?}", e);
549                    e
550                })
551                .ok()??;
552            debug_assert_eq!(block.id(), block_hash);
553            parent = diem_hash_to_h256(&block.parent_id());
554            // NIL block has no author.
555            author = H256::from_slice(block.author().unwrap_or(Default::default()).as_ref());
556        }
557         */
558        debug!("pos_handler gets committed_block={:?}", committed_block);
559        Some(PosBlock {
560            hash: *h,
561            epoch: committed_block.epoch,
562            round: committed_block.round,
563            pivot_decision: committed_block.pivot_decision.block_hash,
564            view: committed_block.view,
565            /* parent,
566             * author,
567             * voters: ledger_info
568             *     .signatures()
569             *     .keys()
570             *     .map(|author| H256::from_slice(author.as_ref()))
571             *     .collect(), */
572            version: committed_block.version,
573        })
574    }
575
576    fn latest_block(&self) -> PosBlockId {
577        diem_hash_to_h256(
578            &self
579                .pos_storage
580                .get_latest_ledger_info_option()
581                .expect("Initialized")
582                .ledger_info()
583                .consensus_block_id(),
584        )
585    }
586
587    fn get_events(
588        &self, from: &PosBlockId, to: &PosBlockId,
589    ) -> Vec<ContractEvent> {
590        let start_version = self
591            .pos_storage
592            .get_committed_block_by_hash(&h256_to_diem_hash(from))
593            .expect("err reading ledger info for from")
594            .version;
595        let end_version = self
596            .pos_storage
597            .get_committed_block_by_hash(&h256_to_diem_hash(to))
598            .expect("err reading ledger info for to")
599            .version;
600        self.pos_storage
601            .get_events_by_version(start_version, end_version)
602            .expect("err reading events")
603    }
604
605    fn get_epoch_ending_blocks(
606        &self, start_epoch: u64, end_epoch: u64,
607    ) -> Vec<PosBlockId> {
608        self.pos_storage
609            .get_epoch_ending_blocks(start_epoch, end_epoch)
610            .expect("err reading epoch ending blocks")
611            .into_iter()
612            .map(|h| diem_hash_to_h256(&h))
613            .collect()
614    }
615
616    fn get_reward_event(
617        &self, epoch: u64,
618    ) -> Option<RewardDistributionEventV2> {
619        self.pos_storage.get_reward_event(epoch).ok()
620    }
621
622    fn get_epoch_state(&self, block_id: &PosBlockId) -> EpochState {
623        self.pos_storage
624            .get_pos_state(&h256_to_diem_hash(block_id))
625            .expect("parent of an ending_epoch block")
626            .epoch_state()
627            .clone()
628    }
629
630    fn pos_ledger_db(&self) -> &Arc<PosLedgerDB> { &self.pos_storage }
631
632    fn consensus_db(&self) -> &Arc<ConsensusDB> { &self.consensus_db }
633
634    fn cached_db(&self) -> &Arc<CachedPosLedgerDB> { &self.pos_cache_db }
635}
636
637pub struct PosConfiguration {
638    pub bls_key: ConfigKey<ConsensusPrivateKey>,
639    pub vrf_key: ConfigKey<ConsensusVRFPrivateKey>,
640    pub diem_conf_path: Option<String>,
641    pub protocol_conf: ProtocolConfiguration,
642    pub pos_initial_nodes_path: String,
643    pub vrf_proposal_threshold: U256,
644    pub pos_state_config: PosStateConfig,
645}
646
647fn diem_hash_to_h256(h: &HashValue) -> PosBlockId { H256::from(h.as_ref()) }
648fn h256_to_diem_hash(h: &PosBlockId) -> HashValue {
649    HashValue::new(h.to_fixed_bytes())
650}
651
652pub fn save_initial_nodes_to_file(path: &str, genesis_nodes: GenesisPosState) {
653    fs::write(path, serde_json::to_string(&genesis_nodes).unwrap()).unwrap();
654}
655
656pub fn read_initial_nodes_from_file(
657    path: &str,
658) -> Result<GenesisPosState, String> {
659    let mut file = fs::File::open(path)
660        .map_err(|e| format!("failed to open initial nodes file: {:?}", e))?;
661
662    let mut nodes_str = String::new();
663    file.read_to_string(&mut nodes_str)
664        .map_err(|e| format!("failed to read initial nodes file: {:?}", e))?;
665
666    serde_json::from_str(nodes_str.as_str())
667        .map_err(|e| format!("failed to parse initial nodes file: {:?}", e))
668}