executor/
lib.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8#![forbid(unsafe_code)]
9
10use std::{
11    collections::{hash_map, BTreeMap, HashMap, HashSet},
12    convert::TryFrom,
13    marker::PhantomData,
14    sync::Arc,
15};
16
17use anyhow::{anyhow, bail, ensure, format_err, Result};
18use fail::fail_point;
19
20use cached_pos_ledger_db::CachedPosLedgerDB;
21use cfx_types::H256;
22use consensus_types::db::LedgerBlockRW;
23use diem_crypto::{
24    hash::{
25        CryptoHash, EventAccumulatorHasher, TransactionAccumulatorHasher,
26        PRE_GENESIS_BLOCK_ID,
27    },
28    HashValue,
29};
30use diem_logger::prelude::*;
31use diem_state_view::StateViewId;
32use diem_types::{
33    account_address::{AccountAddress, HashAccountAddress},
34    account_state::AccountState,
35    account_state_blob::AccountStateBlob,
36    block_info::PivotBlockDecision,
37    committed_block::CommittedBlock,
38    contract_event::ContractEvent,
39    epoch_state::EpochState,
40    ledger_info::LedgerInfoWithSignatures,
41    on_chain_config,
42    proof::accumulator::InMemoryAccumulator,
43    reward_distribution_event::{RewardDistributionEventV2, VoteCount},
44    term_state::{
45        ElectionEvent, PosState, RegisterEvent, RetireEvent,
46        UpdateVotingPowerEvent,
47    },
48    transaction::{
49        Transaction, TransactionInfo, TransactionListWithProof,
50        TransactionOutput, TransactionPayload, TransactionStatus,
51        TransactionToCommit, Version,
52    },
53    write_set::{WriteOp, WriteSet},
54};
55use executor_types::{
56    BlockExecutor, ChunkExecutor, Error, ExecutedTrees, ProcessedVMOutput,
57    ProofReader, StateComputeResult, TransactionData, TransactionReplayer,
58};
59use pow_types::PowInterface;
60use storage_interface::state_view::VerifiedStateView;
61
62use crate::{
63    logging::{LogEntry, LogSchema},
64    metrics::{
65        DIEM_EXECUTOR_COMMIT_BLOCKS_SECONDS, DIEM_EXECUTOR_ERRORS,
66        DIEM_EXECUTOR_EXECUTE_AND_COMMIT_CHUNK_SECONDS,
67        DIEM_EXECUTOR_EXECUTE_BLOCK_SECONDS,
68        DIEM_EXECUTOR_SAVE_TRANSACTIONS_SECONDS,
69        DIEM_EXECUTOR_TRANSACTIONS_SAVED,
70        DIEM_EXECUTOR_VM_EXECUTE_BLOCK_SECONDS,
71    },
72    vm::VMExecutor,
73};
74use diem_types::term_state::{
75    pos_state_config::{PosStateConfigTrait, POS_STATE_CONFIG},
76    DisputeEvent,
77};
78
79pub mod db_bootstrapper;
80mod logging;
81mod metrics;
82pub mod vm;
83
84type SparseMerkleProof = diem_types::proof::SparseMerkleProof<AccountStateBlob>;
85
86/// `Executor` implements all functionalities the execution module needs to
87/// provide.
88pub struct Executor<V> {
    /// Shared handle to the PoS ledger database together with its in-memory
    /// cache. The methods of this type read the synced/committed trees
    /// through `cache.lock()` and hand `db.reader` to state views.
89    db_with_cache: Arc<CachedPosLedgerDB>,
    /// Consensus-side ledger block storage. It is only stored by `new` in
    /// the code visible here -- presumably read by methods elsewhere in
    /// the file (TODO confirm).
90    consensus_db: Arc<dyn LedgerBlockRW>,
    /// Zero-sized marker binding the executor to its VM type `V`; the VM is
    /// invoked through associated functions such as `V::execute_block`, so
    /// no value of `V` is ever stored.
91    phantom: PhantomData<V>,
    /// Interface to the PoW side, used to validate proposed pivot decisions
    /// and to fetch the staking events between two pivot decisions.
92    pow_handler: Arc<dyn PowInterface>,
93}
94
95impl<V> Executor<V>
96where V: VMExecutor
97{
98    pub fn committed_block_id(&self) -> HashValue {
99        self.db_with_cache.committed_block_id()
100    }
101
102    /// Constructs an `Executor`.
103    pub fn new(
104        db_with_cache: Arc<CachedPosLedgerDB>,
105        pow_handler: Arc<dyn PowInterface>,
106        consensus_db: Arc<dyn LedgerBlockRW>,
107    ) -> Self {
108        Self {
109            db_with_cache,
110            consensus_db,
111            phantom: PhantomData,
112            pow_handler,
113        }
114    }
115
116    /// In case there is a new LI to be added to a LedgerStore, verify and
117    /// return it.
118    fn find_chunk_li(
119        verified_target_li: LedgerInfoWithSignatures,
120        epoch_change_li: Option<LedgerInfoWithSignatures>,
121        new_output: &ProcessedVMOutput,
122    ) -> Result<Option<LedgerInfoWithSignatures>> {
123        // If the chunk corresponds to the target LI, the target LI can be added
124        // to storage.
125        if verified_target_li.ledger_info().version()
126            == new_output.version().unwrap_or(0)
127        {
128            ensure!(
129                verified_target_li
130                    .ledger_info()
131                    .transaction_accumulator_hash()
132                    == new_output.accu_root(),
133                "Root hash in target ledger info does not match local computation."
134            );
135            return Ok(Some(verified_target_li));
136        }
137        // If the epoch change LI is present, it must match the version of the
138        // chunk: verify the version and the root hash.
139        if let Some(epoch_change_li) = epoch_change_li {
140            // Verify that the given ledger info corresponds to the new
141            // accumulator.
142            ensure!(
143                epoch_change_li.ledger_info().transaction_accumulator_hash()
144                    == new_output.accu_root(),
145                "Root hash of a given epoch LI does not match local computation."
146            );
147            ensure!(
148                epoch_change_li.ledger_info().version()
149                    == new_output.version().unwrap_or(0),
150                "Version of a given epoch LI does not match local computation."
151            );
152            ensure!(
153                epoch_change_li.ledger_info().ends_epoch(),
154                "Epoch change LI does not carry validator set"
155            );
156            ensure!(
157                epoch_change_li.ledger_info().next_epoch_state()
158                    == new_output.epoch_state().as_ref(),
159                "New validator set of a given epoch LI does not match local computation"
160            );
161            return Ok(Some(epoch_change_li));
162        }
163        ensure!(
164            new_output.epoch_state().is_none(),
165            "End of epoch chunk based on local computation but no EoE LedgerInfo provided."
166        );
167        Ok(None)
168    }
169
170    /// Verify input chunk and return transactions to be applied, skipping those
171    /// already persisted. Specifically:
172    ///  1. Verify that input transactions belongs to the ledger represented by
173    /// the ledger info.  2. Verify that transactions to skip match what's
174    /// already persisted (no fork).  3. Return Transactions to be applied.
175    fn verify_chunk(
176        &self, txn_list_with_proof: TransactionListWithProof,
177        verified_target_li: &LedgerInfoWithSignatures,
178    ) -> Result<(Vec<Transaction>, Vec<TransactionInfo>)> {
179        // 1. Verify that input transactions belongs to the ledger represented
180        // by the ledger info.
181        txn_list_with_proof.verify(
182            verified_target_li.ledger_info(),
183            txn_list_with_proof.first_transaction_version,
184        )?;
185
186        // Return empty if there's no work to do.
187        if txn_list_with_proof.transactions.is_empty() {
188            return Ok((Vec::new(), Vec::new()));
189        }
190        let first_txn_version =
191            match txn_list_with_proof.first_transaction_version {
192                Some(tx) => tx as Version,
193                None => {
194                    bail!(
195                        "first_transaction_version doesn't exist in {:?}",
196                        txn_list_with_proof
197                    );
198                }
199            };
200
201        let num_committed_txns = self
202            .db_with_cache
203            .cache
204            .lock()
205            .synced_trees()
206            .txn_accumulator()
207            .num_leaves();
208        ensure!(
209            first_txn_version <= num_committed_txns,
210            "Transaction list too new. Expected version: {}. First transaction version: {}.",
211            num_committed_txns,
212            first_txn_version
213        );
214        let versions_between_first_and_committed =
215            num_committed_txns - first_txn_version;
216        if txn_list_with_proof.transactions.len()
217            <= versions_between_first_and_committed as usize
218        {
219            // All already in DB, nothing to do.
220            return Ok((Vec::new(), Vec::new()));
221        }
222
223        // 2. Verify that skipped transactions match what's already persisted
224        // (no fork):
225        let num_txns_to_skip = num_committed_txns - first_txn_version;
226
227        diem_debug!(
228            LogSchema::new(LogEntry::ChunkExecutor).num(num_txns_to_skip),
229            "skipping_chunk_txns"
230        );
231
232        // If the proof is verified, then the length of txn_infos and txns must
233        // be the same.
234        let skipped_transaction_infos = &txn_list_with_proof
235            .proof
236            .transaction_infos()[..num_txns_to_skip as usize];
237
238        // Left side of the proof happens to be the frozen subtree roots of the
239        // accumulator right before the list of txns are applied.
240        let frozen_subtree_roots_from_proof = txn_list_with_proof
241            .proof
242            .left_siblings()
243            .iter()
244            .rev()
245            .cloned()
246            .collect::<Vec<_>>();
247        let accu_from_proof =
248            InMemoryAccumulator::<TransactionAccumulatorHasher>::new(
249                frozen_subtree_roots_from_proof,
250                first_txn_version,
251            )?
252            .append(
253                &skipped_transaction_infos
254                    .iter()
255                    .map(CryptoHash::hash)
256                    .collect::<Vec<_>>()[..],
257            );
258        // The two accumulator root hashes should be identical.
259        ensure!(
260            self.db_with_cache.cache.lock().synced_trees().state_id() == accu_from_proof.root_hash(),
261            "Fork happens because the current synced_trees doesn't match the txn list provided."
262        );
263
264        // 3. Return verified transactions to be applied.
265        let mut txns: Vec<_> = txn_list_with_proof.transactions;
266        txns.drain(0..num_txns_to_skip as usize);
267        let (_, mut txn_infos) = txn_list_with_proof.proof.unpack();
268        txn_infos.drain(0..num_txns_to_skip as usize);
269
270        Ok((txns, txn_infos))
271    }
272
273    /// Post-processing of what the VM outputs. Returns the entire block's
274    /// output.
275    fn process_vm_outputs(
276        &self, mut account_to_state: HashMap<AccountAddress, AccountState>,
277        account_to_proof: HashMap<HashValue, SparseMerkleProof>,
278        transactions: &[Transaction], vm_outputs: Vec<TransactionOutput>,
279        parent_trees: &ExecutedTrees, parent_block_id: &HashValue,
280        catch_up_mode: bool,
281    ) -> Result<ProcessedVMOutput> {
282        // The data of each individual transaction. For convenience purpose,
283        // even for the transactions that will be discarded, we will
284        // compute its in-memory Sparse Merkle Tree (it will be
285        // identical to the previous one).
286        let mut txn_data = vec![];
287        // The hash of each individual TransactionInfo object. This will not
288        // include the transactions that will be discarded, since they
289        // do not go into the transaction accumulator.
290        let mut txn_info_hashes = vec![];
291
292        let proof_reader = ProofReader::new(account_to_proof);
293        let pivot_select_event_key =
294            PivotBlockDecision::pivot_select_event_key();
295        let election_event_key = ElectionEvent::event_key();
296        let retire_event_key = RetireEvent::event_key();
297        let register_event_key = RegisterEvent::event_key();
298        let update_voting_power_event_key = UpdateVotingPowerEvent::event_key();
299        let dispute_event_key = DisputeEvent::event_key();
300
301        // Find the next pivot block.
302        let mut pivot_decision = None;
303        let mut new_pos_state = parent_trees.pos_state().clone();
304        let parent_pivot_decision = new_pos_state.pivot_decision().clone();
305        for vm_output in vm_outputs.clone().into_iter() {
306            for event in vm_output.events() {
307                // check for pivot block selection.
308                if *event.key() == pivot_select_event_key {
309                    if pivot_decision.is_some() {
310                        bail!("Multiple pivot decisions in one block!");
311                    }
312                    pivot_decision = Some(PivotBlockDecision::from_bytes(
313                        event.event_data(),
314                    )?);
315                } else if *event.key() == election_event_key {
316                    let election_event =
317                        ElectionEvent::from_bytes(event.event_data())?;
318                    new_pos_state.new_node_elected(&election_event)?;
319                } else if *event.key() == dispute_event_key {
320                    let dispute_event =
321                        DisputeEvent::from_bytes(event.event_data())?;
322                    new_pos_state.forfeit_node(&dispute_event.node_id)?;
323                }
324            }
325        }
326
327        if *parent_block_id != *PRE_GENESIS_BLOCK_ID {
328            if let Some(pivot_decision) = &pivot_decision {
329                diem_debug!(
330                    "process_vm_outputs: parent={:?} parent_pivot={:?}",
331                    parent_block_id,
332                    parent_pivot_decision
333                );
334
335                // The check and event processing below will be skipped during
336                // PoS catching up, because pow has not processed these pivot
337                // decisions.
338                if !catch_up_mode {
339                    if !self.pow_handler.validate_proposal_pivot_decision(
340                        parent_pivot_decision.block_hash,
341                        pivot_decision.block_hash,
342                    ) {
343                        bail!("Invalid pivot decision for block");
344                    }
345
346                    // Verify if the proposer has packed all staking events as
347                    // expected.
348                    diem_debug!(
349                        "check staking events: parent={:?} me={:?}",
350                        parent_pivot_decision,
351                        pivot_decision
352                    );
353                    let staking_events = self.pow_handler.get_staking_events(
354                        parent_pivot_decision.height,
355                        pivot_decision.height,
356                        parent_pivot_decision.block_hash,
357                        pivot_decision.block_hash,
358                    )?;
359                    let mut staking_events_iter = staking_events.iter();
360                    for vm_output in vm_outputs.clone().into_iter() {
361                        for event in vm_output.events() {
362                            // check for pivot block selection.
363                            if *event.key() == register_event_key {
364                                let register_event = RegisterEvent::from_bytes(
365                                    event.event_data(),
366                                )?;
367                                match register_event.matches_staking_event(staking_events_iter.next().ok_or(anyhow!("More staking transactions packed than actual pow events"))?) {
368                                    Ok(true) => {}
369                                    Ok(false) => bail!("Packed staking transactions unmatch PoW events)"),
370                                    Err(e) => diem_error!("error decoding pow events: err={:?}", e),
371                                }
372                                new_pos_state
373                                    .register_node(register_event.node_id)?;
374                            } else if *event.key()
375                                == update_voting_power_event_key
376                            {
377                                let update_voting_power_event =
378                                    UpdateVotingPowerEvent::from_bytes(
379                                        event.event_data(),
380                                    )?;
381                                match update_voting_power_event.matches_staking_event(staking_events_iter.next().ok_or(anyhow!("More staking transactions packed than actual pow events"))?) {
382                                    Ok(true) => {}
383                                    Ok(false) => bail!("Packed staking transactions unmatch PoW events)"),
384                                    Err(e) => diem_error!("error decoding pow events: err={:?}", e),
385                                }
386                                new_pos_state.update_voting_power(
387                                    &update_voting_power_event.node_address,
388                                    update_voting_power_event.voting_power,
389                                )?;
390                            } else if *event.key() == retire_event_key {
391                                let retire_event = RetireEvent::from_bytes(
392                                    event.event_data(),
393                                )?;
394                                match retire_event.matches_staking_event(staking_events_iter.next().ok_or(anyhow!("More staking transactions packed than actual pow events"))?) {
395                                    Ok(true) => {}
396                                    Ok(false) => bail!("Packed staking transactions unmatch PoW events)"),
397                                    Err(e) => diem_error!("error decoding pow events: err={:?}", e),
398                                }
399                                new_pos_state.retire_node(
400                                    &retire_event.node_id,
401                                    retire_event.votes,
402                                )?;
403                            }
404                        }
405                    }
406                    ensure!(
407                        staking_events_iter.next().is_none(),
408                        "Not all PoW staking events are packed"
409                    );
410                } else {
411                    for vm_output in vm_outputs.clone().into_iter() {
412                        for event in vm_output.events() {
413                            // check for pivot block selection.
414                            if *event.key() == register_event_key {
415                                let register_event = RegisterEvent::from_bytes(
416                                    event.event_data(),
417                                )?;
418                                new_pos_state
419                                    .register_node(register_event.node_id)?;
420                            } else if *event.key()
421                                == update_voting_power_event_key
422                            {
423                                let update_voting_power_event =
424                                    UpdateVotingPowerEvent::from_bytes(
425                                        event.event_data(),
426                                    )?;
427                                new_pos_state.update_voting_power(
428                                    &update_voting_power_event.node_address,
429                                    update_voting_power_event.voting_power,
430                                )?;
431                            } else if *event.key() == retire_event_key {
432                                let retire_event = RetireEvent::from_bytes(
433                                    event.event_data(),
434                                )?;
435                                new_pos_state.retire_node(
436                                    &retire_event.node_id,
437                                    retire_event.votes,
438                                )?;
439                            }
440                        }
441                    }
442                }
443            } else {
444                // No new pivot decision, so there should be no staking-related
445                // transactions.
446                if vm_outputs.iter().any(|output| {
447                    output.events().iter().any(|event| {
448                        *event.key() == retire_event_key
449                            || *event.key() == update_voting_power_event_key
450                    })
451                }) {
452                    bail!("Should not pack staking related transactions");
453                }
454                pivot_decision = Some(parent_pivot_decision);
455            }
456        }
457        // TODO(lpl): This is only for pos-tool
458        if let Some(pivot_decision) = &pivot_decision {
459            new_pos_state.set_pivot_decision(pivot_decision.clone());
460        }
461        let mut next_epoch_state = new_pos_state.next_view()?;
462
463        let txn_blobs =
464            itertools::zip_eq(vm_outputs.iter(), transactions.iter())
465                .map(|(vm_output, txn)| {
466                    process_write_set(
467                        txn,
468                        &mut account_to_state,
469                        vm_output.write_set().clone(),
470                    )
471                })
472                .collect::<Result<Vec<_>>>()?;
473
474        let (txn_state_roots, current_state_tree) = parent_trees
475            .state_tree()
476            .batch_update(
477                txn_blobs
478                    .iter()
479                    .map(|m| {
480                        m.iter()
481                            .map(|(account, value)| (account.hash(), value))
482                            .collect::<Vec<_>>()
483                    })
484                    .collect(),
485                &proof_reader,
486            )
487            .expect("Failed to update state tree.");
488
489        for ((vm_output, txn), (mut state_tree_hash, blobs)) in
490            itertools::zip_eq(
491                itertools::zip_eq(vm_outputs.into_iter(), transactions.iter()),
492                itertools::zip_eq(txn_state_roots, txn_blobs),
493            )
494        {
495            // Not genesis transactions.
496            diem_debug!(
497                "process_vm_outputs: {} {:?}",
498                parent_trees.txn_accumulator().version(),
499                state_tree_hash
500            );
501            if parent_trees.txn_accumulator().version() != 0 {
502                // TODO(lpl): Remove state tree.
503                state_tree_hash = Default::default();
504            }
505            let event_tree = {
506                let event_hashes: Vec<_> =
507                    vm_output.events().iter().map(CryptoHash::hash).collect();
508                InMemoryAccumulator::<EventAccumulatorHasher>::from_leaves(
509                    &event_hashes,
510                )
511            };
512
513            let mut txn_info_hash = None;
514            match vm_output.status() {
515                TransactionStatus::Keep(status) => {
516                    // ensure!(
517                    //     !vm_output.write_set().is_empty(),
518                    //     "Transaction with empty write set should be
519                    // discarded.", );
520                    // Compute hash for the TransactionInfo object. We need the
521                    // hash of the transaction itself, the
522                    // state root hash as well as the event root hash.
523                    let txn_info = TransactionInfo::new(
524                        txn.hash(),
525                        state_tree_hash,
526                        event_tree.root_hash(),
527                        vm_output.gas_used(),
528                        status.clone(),
529                    );
530
531                    let real_txn_info_hash = txn_info.hash();
532                    txn_info_hashes.push(real_txn_info_hash);
533                    txn_info_hash = Some(real_txn_info_hash);
534                }
535                TransactionStatus::Discard(status) => {
536                    if !vm_output.write_set().is_empty()
537                        || !vm_output.events().is_empty()
538                    {
539                        diem_error!(
540                            "Discarded transaction has non-empty write set or events. \
541                             Transaction: {:?}. Status: {:?}.",
542                            txn, status,
543                        );
544                        DIEM_EXECUTOR_ERRORS.inc();
545                    }
546                }
547                TransactionStatus::Retry => (),
548            }
549
550            txn_data.push(TransactionData::new(
551                blobs,
552                vm_output.events().to_vec(),
553                vm_output.status().clone(),
554                state_tree_hash,
555                Arc::new(event_tree),
556                vm_output.gas_used(),
557                txn_info_hash,
558            ));
559        }
560
561        // TODO(lpl): For genesis.
562        if next_epoch_state.is_some()
563            && next_epoch_state.as_ref().unwrap().epoch == 1
564        {
565            // Pad the rest of transactions
566            txn_data.resize(
567                transactions.len(),
568                TransactionData::new(
569                    HashMap::new(),
570                    vec![],
571                    TransactionStatus::Retry,
572                    current_state_tree.root_hash(),
573                    Arc::new(
574                        InMemoryAccumulator::<EventAccumulatorHasher>::default(
575                        ),
576                    ),
577                    0,
578                    None,
579                ),
580            );
581
582            let validator_set = account_to_state
583                .get(&on_chain_config::config_address())
584                .map(|state| {
585                    state.get_validator_set()?.ok_or_else(|| {
586                        format_err!("ValidatorSet does not exist")
587                    })
588                })
589                .ok_or_else(|| {
590                    format_err!("ValidatorSet account does not exist")
591                })??;
592            /*let configuration = account_to_state
593            .get(&on_chain_config::config_address())
594            .map(|state| {
595                state.get_configuration_resource()?.ok_or_else(|| {
596                    format_err!("Configuration does not exist")
597                })
598            })
599            .ok_or_else(|| {
600                format_err!("Association account does not exist")
601            })??;*/
602            next_epoch_state = Some(EpochState::new(
603                // TODO(lpl): This is only used for genesis, and after
604                // executing the genesis block, the epoch
605                // number should be increased from 0 to 1.
606                1,
607                (&validator_set).into(),
608                pivot_decision
609                    .as_ref()
610                    .map(|p| p.block_hash.as_bytes().to_vec())
611                    .unwrap_or(vec![]),
612            ))
613        };
614
615        let current_transaction_accumulator =
616            parent_trees.txn_accumulator().append(&txn_info_hashes);
617
618        Ok(ProcessedVMOutput::new(
619            txn_data,
620            ExecutedTrees::new_copy(
621                Arc::new(current_state_tree),
622                Arc::new(current_transaction_accumulator),
623                new_pos_state,
624            ),
625            next_epoch_state,
626            // TODO(lpl): Check if we need to assert it's Some.
627            pivot_decision,
628        ))
629    }
630
631    fn extract_reconfig_events(
632        events: Vec<ContractEvent>,
633    ) -> Vec<ContractEvent> {
634        let new_epoch_event_key = on_chain_config::new_epoch_event_key();
635        events
636            .into_iter()
637            .filter(|event| *event.key() == new_epoch_event_key)
638            .collect()
639    }
640
641    fn get_executed_trees(
642        &self, block_id: HashValue,
643    ) -> Result<ExecutedTrees, Error> {
644        let executed_trees = if block_id
645            == self.db_with_cache.cache.lock().committed_block_id()
646        {
647            self.db_with_cache.cache.lock().committed_trees().clone()
648        } else {
649            self.db_with_cache
650                .get_block(&block_id)?
651                .lock()
652                .output()
653                .executed_trees()
654                .clone()
655        };
656
657        Ok(executed_trees)
658    }
659
660    fn get_executed_state_view<'a>(
661        &self, id: StateViewId, executed_trees: &'a ExecutedTrees,
662    ) -> VerifiedStateView<'a> {
663        let cache = self.db_with_cache.cache.lock();
664        VerifiedStateView::new(
665            id,
666            Arc::clone(&self.db_with_cache.db.reader),
667            cache.committed_trees().version(),
668            cache.committed_trees().state_root(),
669            executed_trees.state_tree(),
670            executed_trees.pos_state().clone(),
671        )
672    }
673
674    fn replay_transactions_impl(
675        &self, first_version: u64, transactions: Vec<Transaction>,
676        transaction_infos: Vec<TransactionInfo>,
677    ) -> Result<(
678        ProcessedVMOutput,
679        Vec<TransactionToCommit>,
680        Vec<ContractEvent>,
681        Vec<Transaction>,
682        Vec<TransactionInfo>,
683    )> {
684        // Construct a StateView and pass the transactions to VM.
685        let cache = self.db_with_cache.cache.lock();
686        let state_view = VerifiedStateView::new(
687            StateViewId::ChunkExecution { first_version },
688            Arc::clone(&self.db_with_cache.db.reader),
689            cache.synced_trees().version(),
690            cache.synced_trees().state_root(),
691            cache.synced_trees().state_tree(),
692            // TODO(lpl): State sync not used yet.
693            PosState::new_empty(),
694        );
695
696        fail_point!("executor::vm_execute_chunk", |_| {
697            Err(anyhow::anyhow!("Injected error in execute_chunk"))
698        });
699        let vm_outputs =
700            V::execute_block(transactions.clone(), &state_view, true)?;
701
702        // Since other validators have committed these transactions, their
703        // status should all be TransactionStatus::Keep.
704        for output in &vm_outputs {
705            if let TransactionStatus::Discard(_) = output.status() {
706                bail!("Syncing transactions that should be discarded.");
707            }
708        }
709
710        let (account_to_state, account_to_proof) = state_view.into();
711
712        let output = self.process_vm_outputs(
713            account_to_state,
714            account_to_proof,
715            &transactions,
716            vm_outputs,
717            cache.synced_trees(),
718            // TODO(lpl): This function is not used.
719            &HashValue::zero(),
720            true,
721        )?;
722
723        // Since we have verified the proofs, we just need to verify that each
724        // TransactionInfo object matches what we have computed locally.
725        let mut txns_to_commit = vec![];
726        let mut reconfig_events = vec![];
727        let mut seen_retry = false;
728        let mut txns_to_retry = vec![];
729        let mut txn_infos_to_retry = vec![];
730        for ((txn, txn_data), (i, txn_info)) in itertools::zip_eq(
731            itertools::zip_eq(transactions, output.transaction_data()),
732            transaction_infos.into_iter().enumerate(),
733        ) {
734            let recorded_status = match txn_data.status() {
735                TransactionStatus::Keep(recorded_status) => recorded_status.clone(),
736                status @ TransactionStatus::Discard(_) => bail!(
737                    "The transaction at version {}, got the status of 'Discard': {:?}",
738                    first_version
739                        .checked_add(i as u64)
740                        .ok_or_else(|| format_err!("version + i overflows"))?,
741                    status
742                ),
743                TransactionStatus::Retry => {
744                    seen_retry = true;
745                    txns_to_retry.push(txn);
746                    txn_infos_to_retry.push(txn_info);
747                    continue;
748                }
749            };
750            assert!(!seen_retry);
751            let generated_txn_info = TransactionInfo::new(
752                txn.hash(),
753                txn_data.state_root_hash(),
754                txn_data.event_root_hash(),
755                txn_data.gas_used(),
756                recorded_status.clone(),
757            );
758            ensure!(
759                txn_info == generated_txn_info,
760                "txn_info do not match for {}-th transaction in chunk.\nChunk txn_info: {}\nProof txn_info: {}",
761                i, generated_txn_info, txn_info
762            );
763            txns_to_commit.push(TransactionToCommit::new(
764                txn,
765                txn_data.account_blobs().clone(),
766                txn_data.events().to_vec(),
767                txn_data.gas_used(),
768                recorded_status,
769            ));
770            reconfig_events.append(&mut Self::extract_reconfig_events(
771                txn_data.events().to_vec(),
772            ));
773        }
774
775        Ok((
776            output,
777            txns_to_commit,
778            reconfig_events,
779            txns_to_retry,
780            txn_infos_to_retry,
781        ))
782    }
783
784    fn execute_chunk(
785        &self, first_version: u64, transactions: Vec<Transaction>,
786        transaction_infos: Vec<TransactionInfo>,
787    ) -> Result<(
788        ProcessedVMOutput,
789        Vec<TransactionToCommit>,
790        Vec<ContractEvent>,
791    )> {
792        let num_txns = transactions.len();
793
794        let (
795            processed_vm_output,
796            txns_to_commit,
797            events,
798            txns_to_retry,
799            _txn_infos_to_retry,
800        ) = self.replay_transactions_impl(
801            first_version,
802            transactions,
803            transaction_infos,
804        )?;
805
806        ensure!(
807            txns_to_retry.is_empty(),
808            "The transaction at version {} got the status of 'Retry'",
809            num_txns
810                .checked_sub(txns_to_retry.len())
811                .ok_or_else(|| format_err!("integer overflow occurred"))?
812                .checked_add(first_version as usize)
813                .ok_or_else(|| format_err!("integer overflow occurred"))?,
814        );
815
816        Ok((processed_vm_output, txns_to_commit, events))
817    }
818}
819
820impl<V: VMExecutor> ChunkExecutor for Executor<V> {
821    fn execute_and_commit_chunk(
822        &self,
823        txn_list_with_proof: TransactionListWithProof,
824        // Target LI that has been verified independently: the proofs are
825        // relative to this version.
826        verified_target_li: LedgerInfoWithSignatures,
827        // An optional end of epoch LedgerInfo. We do not allow chunks that end
828        // epoch without carrying any epoch change LI.
829        epoch_change_li: Option<LedgerInfoWithSignatures>,
830    ) -> Result<Vec<ContractEvent>> {
831        let _timer =
832            DIEM_EXECUTOR_EXECUTE_AND_COMMIT_CHUNK_SECONDS.start_timer();
833        // 1. Update the cache in executor to be consistent with latest synced
834        // state.
835        // self.reset_cache()?;
836
837        diem_info!(
838            LogSchema::new(LogEntry::ChunkExecutor)
839                .local_synced_version(
840                    self.db_with_cache
841                        .cache
842                        .lock()
843                        .synced_trees()
844                        .txn_accumulator()
845                        .num_leaves()
846                        - 1
847                )
848                .first_version_in_request(
849                    txn_list_with_proof.first_transaction_version
850                )
851                .num_txns_in_request(txn_list_with_proof.transactions.len()),
852            "sync_request_received",
853        );
854
855        // 2. Verify input transaction list.
856        let (transactions, transaction_infos) =
857            self.verify_chunk(txn_list_with_proof, &verified_target_li)?;
858
859        // 3. Execute transactions.
860        let first_version = self
861            .db_with_cache
862            .cache
863            .lock()
864            .synced_trees()
865            .txn_accumulator()
866            .num_leaves();
867        let (output, txns_to_commit, reconfig_events) =
868            self.execute_chunk(first_version, transactions, transaction_infos)?;
869
870        // 4. Commit to DB.
871        let ledger_info_to_commit =
872            Self::find_chunk_li(verified_target_li, epoch_change_li, &output)?;
873        if ledger_info_to_commit.is_none() && txns_to_commit.is_empty() {
874            return Ok(reconfig_events);
875        }
876        fail_point!("executor::commit_chunk", |_| {
877            Err(anyhow::anyhow!("Injected error in commit_chunk"))
878        });
879        self.db_with_cache.db.writer.save_transactions(
880            &txns_to_commit,
881            first_version,
882            ledger_info_to_commit.as_ref(),
883            None,
884            vec![],
885            vec![],
886        )?;
887
888        // 5. Cache maintenance.
889        let output_trees = output.executed_trees().clone();
890        if let Some(ledger_info_with_sigs) = &ledger_info_to_commit {
891            self.db_with_cache.update_block_tree_root(
892                output_trees,
893                ledger_info_with_sigs.ledger_info(),
894                vec![],
895                vec![],
896            );
897        } else {
898            self.db_with_cache.update_synced_trees(output_trees);
899        }
900        self.db_with_cache.reset();
901
902        diem_info!(
903            LogSchema::new(LogEntry::ChunkExecutor)
904                .synced_to_version(
905                    self.db_with_cache
906                        .cache
907                        .lock()
908                        .synced_trees()
909                        .version()
910                        .expect("version must exist")
911                )
912                .committed_with_ledger_info(ledger_info_to_commit.is_some()),
913            "sync_finished",
914        );
915
916        Ok(reconfig_events)
917    }
918}
919
920impl<V: VMExecutor> TransactionReplayer for Executor<V> {
921    fn replay_chunk(
922        &self, mut first_version: Version, mut txns: Vec<Transaction>,
923        mut txn_infos: Vec<TransactionInfo>,
924    ) -> Result<()> {
925        ensure!(
926            first_version
927                == self
928                    .db_with_cache
929                    .cache
930                    .lock()
931                    .synced_trees()
932                    .txn_accumulator()
933                    .num_leaves(),
934            "Version not expected. Expected: {}, got: {}",
935            self.db_with_cache
936                .cache
937                .lock()
938                .synced_trees()
939                .txn_accumulator()
940                .num_leaves(),
941            first_version,
942        );
943        while !txns.is_empty() {
944            let num_txns = txns.len();
945
946            let (output, txns_to_commit, _, txns_to_retry, txn_infos_to_retry) =
947                self.replay_transactions_impl(first_version, txns, txn_infos)?;
948            assert!(txns_to_retry.len() < num_txns);
949
950            self.db_with_cache.db.writer.save_transactions(
951                &txns_to_commit,
952                first_version,
953                None,
954                None,
955                vec![],
956                vec![],
957            )?;
958
959            self.db_with_cache
960                .update_synced_trees(output.executed_trees().clone());
961
962            txns = txns_to_retry;
963            txn_infos = txn_infos_to_retry;
964            first_version += txns_to_commit.len() as u64;
965        }
966        Ok(())
967    }
968
969    fn expecting_version(&self) -> Version {
970        self.db_with_cache
971            .cache
972            .lock()
973            .synced_trees()
974            .version()
975            .map_or(0, |v| v.checked_add(1).expect("Integer overflow occurred"))
976    }
977}
978
979impl<V: VMExecutor> BlockExecutor for Executor<V> {
980    fn committed_block_id(&self) -> Result<HashValue, Error> {
981        Ok(self.committed_block_id())
982    }
983
984    fn execute_block(
985        &self, block: (HashValue, Vec<Transaction>),
986        parent_block_id: HashValue, catch_up_mode: bool,
987    ) -> Result<StateComputeResult, Error> {
988        let (block_id, mut transactions) = block;
989
990        // Reconfiguration rule - if a block is a child of pending
991        // reconfiguration, it needs to be empty So we roll over the
992        // executed state until it's committed and we start new epoch.
993        let (output, state_compute_result) = if parent_block_id
994            != self.committed_block_id()
995            && self
996                .db_with_cache
997                .get_block(&parent_block_id)?
998                .lock()
999                .output()
1000                .has_reconfiguration()
1001        {
1002            let parent = self.db_with_cache.get_block(&parent_block_id)?;
1003            let parent_block = parent.lock();
1004            let parent_output = parent_block.output();
1005
1006            diem_info!(
1007                LogSchema::new(LogEntry::BlockExecutor).block_id(block_id),
1008                "reconfig_descendant_block_received"
1009            );
1010
1011            let mut output = ProcessedVMOutput::new(
1012                vec![],
1013                parent_output.executed_trees().clone(),
1014                parent_output.epoch_state().clone(),
1015                // The block has no pivot decision transaction, so it's the
1016                // same as the parent.
1017                parent_output.pivot_block().clone(),
1018            );
1019            output.set_pos_state_skipped();
1020
1021            let parent_accu = parent_output.executed_trees().txn_accumulator();
1022            let state_compute_result = output.compute_result(
1023                parent_accu.frozen_subtree_roots().clone(),
1024                parent_accu.num_leaves(),
1025            );
1026
1027            // Reset the reconfiguration suffix transactions to empty list.
1028            transactions = vec![];
1029
1030            (output, state_compute_result)
1031        } else {
1032            diem_info!(
1033                LogSchema::new(LogEntry::BlockExecutor).block_id(block_id),
1034                "execute_block"
1035            );
1036
1037            let _timer = DIEM_EXECUTOR_EXECUTE_BLOCK_SECONDS.start_timer();
1038
1039            let parent_block_executed_trees =
1040                self.get_executed_trees(parent_block_id)?;
1041
1042            let state_view = self.get_executed_state_view(
1043                StateViewId::BlockExecution { block_id },
1044                &parent_block_executed_trees,
1045            );
1046
1047            // FIXME(lpl): Check the error processing in `execute_block`,
1048            // `process_vm_outputs`, and transaction packing. We
1049            // need to ensure that there is no packing behavior that
1050            // makes all new proposals invalid during execution.
1051            let vm_outputs = {
1052                // trace_code_block!("executor::execute_block", {"block",
1053                // block_id});
1054                let _timer =
1055                    DIEM_EXECUTOR_VM_EXECUTE_BLOCK_SECONDS.start_timer();
1056                fail_point!("executor::vm_execute_block", |_| {
1057                    Err(Error::from(anyhow::anyhow!(
1058                        "Injected error in vm_execute_block"
1059                    )))
1060                });
1061                V::execute_block(
1062                    transactions.clone(),
1063                    &state_view,
1064                    catch_up_mode,
1065                )
1066                .map_err(anyhow::Error::from)?
1067            };
1068
1069            // trace_code_block!("executor::process_vm_outputs", {"block",
1070            // block_id});
1071            let status: Vec<_> = vm_outputs
1072                .iter()
1073                .map(TransactionOutput::status)
1074                .cloned()
1075                .collect();
1076            if !status.is_empty() {
1077                diem_trace!("Execution status: {:?}", status);
1078            }
1079
1080            let (account_to_state, account_to_proof) = state_view.into();
1081
1082            let output = self
1083                .process_vm_outputs(
1084                    account_to_state,
1085                    account_to_proof,
1086                    &transactions,
1087                    vm_outputs,
1088                    &parent_block_executed_trees,
1089                    &parent_block_id,
1090                    catch_up_mode,
1091                )
1092                .map_err(|err| {
1093                    format_err!("Failed to execute block: {}", err)
1094                })?;
1095
1096            let parent_accu = parent_block_executed_trees.txn_accumulator();
1097
1098            diem_debug!("parent leaves: {}", parent_accu.num_leaves());
1099            let state_compute_result = output.compute_result(
1100                parent_accu.frozen_subtree_roots().clone(),
1101                parent_accu.num_leaves(),
1102            );
1103            (output, state_compute_result)
1104        };
1105
1106        // Add the output to the speculation_output_tree
1107        self.db_with_cache
1108            .add_block(parent_block_id, (block_id, transactions, output))?;
1109
1110        Ok(state_compute_result)
1111    }
1112
1113    fn commit_blocks(
1114        &self, block_ids: Vec<HashValue>,
1115        ledger_info_with_sigs: LedgerInfoWithSignatures,
1116    ) -> Result<(Vec<Transaction>, Vec<ContractEvent>), Error> {
1117        let _timer = DIEM_EXECUTOR_COMMIT_BLOCKS_SECONDS.start_timer();
1118        let mut pos_state_to_commit = self
1119            .get_executed_trees(
1120                ledger_info_with_sigs.ledger_info().consensus_block_id(),
1121            )?
1122            .pos_state()
1123            .clone();
1124
1125        // TODO(lpl): Implement force_retire better?
1126        // Process pos_state to apply force_retire.
1127        if ledger_info_with_sigs.ledger_info().ends_epoch()
1128            && ledger_info_with_sigs.ledger_info().epoch() != 0
1129        {
1130            let ending_block =
1131                ledger_info_with_sigs.ledger_info().consensus_block_id();
1132            let mut elected = BTreeMap::new();
1133            let mut voted_block_id = ending_block;
1134            // `self.cache.committed_trees` should be within this epoch and
1135            // before ending_block.
1136            let verifier = self
1137                .db_with_cache
1138                .cache
1139                .lock()
1140                .committed_trees()
1141                .pos_state()
1142                .epoch_state()
1143                .verifier()
1144                // Clone to avoid possible deadlock.
1145                .clone();
1146            for committee_member in verifier.address_to_validator_info().keys()
1147            {
1148                elected.insert(*committee_member, VoteCount::default());
1149            }
1150            loop {
1151                let block = self
1152                    .consensus_db
1153                    .get_ledger_block(&voted_block_id)?
1154                    .unwrap();
1155                diem_trace!("count vote for block {:?}", block);
1156                if block.quorum_cert().ledger_info().signatures().len() == 0 {
1157                    // parent is round-0 virtual block and has not voters, so we
1158                    // just add `leader_count` and break the loop.
1159                    if let Some(author) = block.author() {
1160                        let leader_status =
1161                            elected.get_mut(&author).expect("in epoch state");
1162                        leader_status.leader_count += 1;
1163                    }
1164                    break;
1165                }
1166                if let Some(author) = block.author() {
1167                    let leader_status =
1168                        elected.get_mut(&author).expect("in epoch state");
1169                    leader_status.leader_count += 1;
1170                    leader_status.included_vote_count += verifier
1171                        .extra_vote_count(
1172                            block
1173                                .quorum_cert()
1174                                .ledger_info()
1175                                .signatures()
1176                                .keys(),
1177                        )
1178                        .unwrap();
1179                }
1180                for voter in
1181                    block.quorum_cert().ledger_info().signatures().keys()
1182                {
1183                    elected
1184                        .get_mut(&voter)
1185                        .expect("in epoch state")
1186                        .vote_count +=
1187                        verifier.get_voting_power(voter).unwrap();
1188                }
1189                voted_block_id = block.parent_id();
1190            }
1191            let mut force_retired = HashSet::new();
1192
1193            // Force retire the nodes that have not voted in this term.
1194            for (node, vote_count) in elected.iter_mut() {
1195                if vote_count.vote_count == 0 {
1196                    force_retired.insert(node);
1197                } else {
1198                    vote_count.total_votes =
1199                        verifier.get_voting_power(node).unwrap_or(0);
1200                    if vote_count.total_votes == 0 {
1201                        diem_warn!("Node {:?} has voted for epoch {} without voting power.",
1202                            node,
1203                            pos_state_to_commit.epoch_state().epoch);
1204                    }
1205                }
1206            }
1207
1208            if !force_retired.is_empty() {
1209                // `end_epoch` has been checked above and is excluded below.
1210                let end_epoch = ledger_info_with_sigs.ledger_info().epoch();
1211                let start_epoch = end_epoch.saturating_sub(
1212                    POS_STATE_CONFIG.force_retire_check_epoch_count(
1213                        pos_state_to_commit.current_view(),
1214                    ),
1215                ) + 1;
1216                // Check more past epochs to see if the nodes in `force_retired`
1217                // have voted.
1218                for end_ledger_info in self
1219                    .db_with_cache
1220                    .db
1221                    .reader
1222                    .get_epoch_ending_ledger_infos(start_epoch, end_epoch)?
1223                    .get_all_ledger_infos()
1224                {
1225                    let mut voted_block_id =
1226                        end_ledger_info.ledger_info().consensus_block_id();
1227                    loop {
1228                        let block = self
1229                            .consensus_db
1230                            .get_ledger_block(&voted_block_id)?
1231                            .unwrap();
1232                        if block.quorum_cert().ledger_info().signatures().len()
1233                            == 0
1234                        {
1235                            break;
1236                        }
1237                        for voter in block
1238                            .quorum_cert()
1239                            .ledger_info()
1240                            .signatures()
1241                            .keys()
1242                        {
1243                            // Find a vote, so the node will not be force
1244                            // retired.
1245                            force_retired.remove(voter);
1246                        }
1247                        voted_block_id = block.parent_id();
1248                    }
1249                }
1250                for node in force_retired {
1251                    pos_state_to_commit.force_retire_node(&node)?;
1252                }
1253            }
1254
1255            let reward_event = RewardDistributionEventV2 {
1256                candidates: pos_state_to_commit.next_evicted_term(),
1257                elected: elected
1258                    .into_iter()
1259                    .map(|(k, v)| (H256::from_slice(k.as_ref()), v))
1260                    .collect(),
1261                view: pos_state_to_commit.current_view(),
1262            };
1263            self.db_with_cache.db.writer.save_reward_event(
1264                ledger_info_with_sigs.ledger_info().epoch(),
1265                &reward_event,
1266            )?;
1267            self.db_with_cache
1268                .get_block(&ending_block)
1269                .expect("latest committed block not pruned")
1270                .lock()
1271                .replace_pos_state(pos_state_to_commit.clone());
1272        }
1273
1274        diem_info!(
1275            LogSchema::new(LogEntry::BlockExecutor).block_id(
1276                ledger_info_with_sigs.ledger_info().consensus_block_id()
1277            ),
1278            "commit_block"
1279        );
1280
1281        let version = ledger_info_with_sigs.ledger_info().version();
1282
1283        let num_txns_in_li = version
1284            .checked_add(1)
1285            .ok_or_else(|| format_err!("version + 1 overflows"))?;
1286        let num_persistent_txns = self
1287            .db_with_cache
1288            .cache
1289            .lock()
1290            .synced_trees()
1291            .txn_accumulator()
1292            .num_leaves();
1293
1294        if num_txns_in_li < num_persistent_txns {
1295            return Err(Error::InternalError {
1296                error: format!(
1297                    "Try to commit stale transactions with the last version as {}",
1298                    version
1299                ),
1300            });
1301        }
1302
1303        // All transactions that need to go to storage. In the above example,
1304        // this means all the transactions in A, B and C whose status ==
1305        // TransactionStatus::Keep. This must be done before calculate
1306        // potential skipping of transactions in idempotent commit.
1307        let mut txns_to_keep = vec![];
1308        let arc_blocks = block_ids
1309            .iter()
1310            .map(|id| self.db_with_cache.get_block(id))
1311            .collect::<Result<Vec<_>, Error>>()?;
1312        let blocks = arc_blocks.iter().map(|b| b.lock()).collect::<Vec<_>>();
1313        let mut committed_blocks = Vec::new();
1314        let mut signatures_vec = Vec::new();
1315        if ledger_info_with_sigs.ledger_info().epoch() != 0 {
1316            for (i, b) in blocks.iter().enumerate() {
1317                let ledger_block = self
1318                    .consensus_db
1319                    .get_ledger_block(&b.id())
1320                    .unwrap()
1321                    .unwrap();
1322                let view =
1323                    b.output().executed_trees().pos_state().current_view();
1324                committed_blocks.push(CommittedBlock {
1325                    hash: b.id(),
1326                    epoch: ledger_block.epoch(),
1327                    miner: ledger_block.author(),
1328                    parent_hash: ledger_block.parent_id(),
1329                    round: ledger_block.round(),
1330                    pivot_decision: b.output().pivot_block().clone().unwrap(),
1331                    version: b.output().version().unwrap(),
1332                    timestamp: ledger_block.timestamp_usecs(),
1333                    view,
1334                    is_skipped: b
1335                        .output()
1336                        .executed_trees()
1337                        .pos_state()
1338                        .skipped(),
1339                });
1340                // The signatures of each block is in the qc of the next block.
1341                if i != 0 {
1342                    signatures_vec.push((
1343                        ledger_block.quorum_cert().certified_block().id(),
1344                        ledger_block.quorum_cert().ledger_info().clone(),
1345                    ));
1346                }
1347            }
1348            let last_block = blocks.last().expect("not empty").id();
1349            if let Some(qc) = self.consensus_db.get_qc_for_block(&last_block)? {
1350                signatures_vec.push((last_block, qc.ledger_info().clone()));
1351            } else {
1352                // If we are catching up, all QCs come from retrieved blocks, so
1353                // we cannot get the QC that votes for the last
1354                // block in an epoch as the QC is within another
1355                // unknown child block.
1356                assert!(ledger_info_with_sigs.ledger_info().ends_epoch());
1357            }
1358        } else {
1359            committed_blocks.push(CommittedBlock {
1360                hash: ledger_info_with_sigs.ledger_info().consensus_block_id(),
1361                epoch: 0,
1362                round: 0,
1363                miner: None,
1364                parent_hash: HashValue::default(),
1365                pivot_decision: ledger_info_with_sigs
1366                    .ledger_info()
1367                    .pivot_decision()
1368                    .unwrap()
1369                    .clone(),
1370                version: ledger_info_with_sigs.ledger_info().version(),
1371                timestamp: ledger_info_with_sigs
1372                    .ledger_info()
1373                    .timestamp_usecs(),
1374                view: 1,
1375                is_skipped: false,
1376            });
1377        }
1378        for (txn, txn_data) in blocks.iter().flat_map(|block| {
1379            itertools::zip_eq(
1380                block.transactions(),
1381                block.output().transaction_data(),
1382            )
1383        }) {
1384            if let TransactionStatus::Keep(recorded_status) = txn_data.status()
1385            {
1386                txns_to_keep.push(TransactionToCommit::new(
1387                    txn.clone(),
1388                    txn_data.account_blobs().clone(),
1389                    txn_data.events().to_vec(),
1390                    txn_data.gas_used(),
1391                    recorded_status.clone(),
1392                ));
1393            }
1394        }
1395
1396        let last_block = blocks
1397            .last()
1398            .ok_or_else(|| format_err!("CommittableBlockBatch is empty"))?;
1399
1400        // Check that the version in ledger info (computed by consensus) matches
1401        // the version computed by us.
1402        let num_txns_in_speculative_accumulator = last_block
1403            .output()
1404            .executed_trees()
1405            .txn_accumulator()
1406            .num_leaves();
1407        assert_eq!(
1408            num_txns_in_li, num_txns_in_speculative_accumulator as Version,
1409            "Number of transactions in ledger info ({}) does not match number of transactions \
1410             in accumulator ({}).",
1411            num_txns_in_li, num_txns_in_speculative_accumulator,
1412        );
1413
1414        let num_txns_to_keep = txns_to_keep.len() as u64;
1415
1416        // Skip txns that are already committed to allow failures in state sync
1417        // process.
1418        let first_version_to_keep = num_txns_in_li - num_txns_to_keep;
1419        assert!(
1420            first_version_to_keep <= num_persistent_txns,
1421            "first_version {} in the blocks to commit cannot exceed # of committed txns: {}.",
1422            first_version_to_keep,
1423            num_persistent_txns
1424        );
1425
1426        let num_txns_to_skip = num_persistent_txns - first_version_to_keep;
1427        let first_version_to_commit = first_version_to_keep + num_txns_to_skip;
1428
1429        if num_txns_to_skip != 0 {
1430            diem_debug!(
1431                LogSchema::new(LogEntry::BlockExecutor)
1432                    .latest_synced_version(num_persistent_txns - 1)
1433                    .first_version_to_keep(first_version_to_keep)
1434                    .num_txns_to_keep(num_txns_to_keep)
1435                    .first_version_to_commit(first_version_to_commit),
1436                "skip_transactions_when_committing"
1437            );
1438        }
1439
1440        // Skip duplicate txns that are already persistent.
1441        let txns_to_commit = &txns_to_keep[num_txns_to_skip as usize..];
1442
1443        let num_txns_to_commit = txns_to_commit.len() as u64;
1444        {
1445            let _timer = DIEM_EXECUTOR_SAVE_TRANSACTIONS_SECONDS.start_timer();
1446            DIEM_EXECUTOR_TRANSACTIONS_SAVED.observe(num_txns_to_commit as f64);
1447
1448            assert_eq!(
1449                first_version_to_commit,
1450                num_txns_in_li - num_txns_to_commit
1451            );
1452            fail_point!("executor::commit_blocks", |_| {
1453                Err(Error::from(anyhow::anyhow!(
1454                    "Injected error in commit_blocks"
1455                )))
1456            });
1457            self.db_with_cache.db.writer.save_transactions(
1458                txns_to_commit,
1459                first_version_to_commit,
1460                Some(&ledger_info_with_sigs),
1461                Some(pos_state_to_commit),
1462                committed_blocks,
1463                signatures_vec,
1464            )?;
1465        }
1466
1467        // Calculate committed transactions and reconfig events now that commit
1468        // has succeeded
1469        let mut committed_txns = vec![];
1470        let mut reconfig_events = vec![];
1471        for txn in txns_to_commit.iter() {
1472            committed_txns.push(txn.transaction().clone());
1473            reconfig_events.append(&mut Self::extract_reconfig_events(
1474                txn.events().to_vec(),
1475            ));
1476        }
1477
1478        for block in blocks {
1479            block.output().executed_trees().state_tree().prune()
1480        }
1481
1482        let old_committed_block = self.db_with_cache.prune(
1483            ledger_info_with_sigs.ledger_info(),
1484            committed_txns.clone(),
1485            reconfig_events.clone(),
1486        )?;
1487        self.db_with_cache
1488            .db
1489            .writer
1490            .delete_pos_state_by_block(&old_committed_block)?;
1491
1492        // Now that the blocks are persisted successfully, we can reply to
1493        // consensus
1494        Ok((committed_txns, reconfig_events))
1495    }
1496}
1497
1498/// For all accounts modified by this transaction, find the previous blob and
1499/// update it based on the write set. Returns the blob value of all these
1500/// accounts.
1501pub fn process_write_set(
1502    transaction: &Transaction,
1503    account_to_state: &mut HashMap<AccountAddress, AccountState>,
1504    write_set: WriteSet,
1505) -> Result<HashMap<AccountAddress, AccountStateBlob>> {
1506    let mut updated_blobs = HashMap::new();
1507
1508    // Find all addresses this transaction touches while processing each write
1509    // op.
1510    let mut addrs = HashSet::new();
1511    for (access_path, write_op) in write_set.into_iter() {
1512        let address = access_path.address;
1513        let path = access_path.path;
1514        match account_to_state.entry(address) {
1515            hash_map::Entry::Occupied(mut entry) => {
1516                update_account_state(entry.get_mut(), path, write_op);
1517            }
1518            hash_map::Entry::Vacant(entry) => {
1519                // Before writing to an account, VM should always read that
1520                // account. So we should not reach this code
1521                // path. The exception is genesis transaction (and
1522                // maybe other writeset transactions).
1523                match transaction {
1524                    Transaction::GenesisTransaction(_) => (),
1525                    Transaction::BlockMetadata(_) => {
1526                        // bail!("BlockMetadata: Write set should be a subset of
1527                        // read set.")
1528                    }
1529                    Transaction::UserTransaction(txn) => match txn.payload() {
1530                        TransactionPayload::WriteSet(_) => (),
1531                        _ => bail!(
1532                            "Write set should be a subset of read set: {:?}.",
1533                            txn
1534                        ),
1535                    },
1536                }
1537
1538                let mut account_state = Default::default();
1539                update_account_state(&mut account_state, path, write_op);
1540                entry.insert(account_state);
1541            }
1542        }
1543        addrs.insert(address);
1544    }
1545
1546    for addr in addrs {
1547        let account_state =
1548            account_to_state.get(&addr).expect("Address should exist.");
1549        let account_blob = AccountStateBlob::try_from(account_state)?;
1550        updated_blobs.insert(addr, account_blob);
1551    }
1552
1553    Ok(updated_blobs)
1554}
1555
1556fn update_account_state(
1557    account_state: &mut AccountState, path: Vec<u8>, write_op: WriteOp,
1558) {
1559    match write_op {
1560        WriteOp::Value(new_value) => account_state.insert(path, new_value),
1561        WriteOp::Deletion => account_state.remove(&path),
1562    };
1563}