1#![forbid(unsafe_code)]
9
10use std::{
11 collections::{hash_map, BTreeMap, HashMap, HashSet},
12 convert::TryFrom,
13 marker::PhantomData,
14 sync::Arc,
15};
16
17use anyhow::{anyhow, bail, ensure, format_err, Result};
18use fail::fail_point;
19
20use cached_pos_ledger_db::CachedPosLedgerDB;
21use cfx_types::H256;
22use consensus_types::db::LedgerBlockRW;
23use diem_crypto::{
24 hash::{
25 CryptoHash, EventAccumulatorHasher, TransactionAccumulatorHasher,
26 PRE_GENESIS_BLOCK_ID,
27 },
28 HashValue,
29};
30use diem_logger::prelude::*;
31use diem_state_view::StateViewId;
32use diem_types::{
33 account_address::{AccountAddress, HashAccountAddress},
34 account_state::AccountState,
35 account_state_blob::AccountStateBlob,
36 block_info::PivotBlockDecision,
37 committed_block::CommittedBlock,
38 contract_event::ContractEvent,
39 epoch_state::EpochState,
40 ledger_info::LedgerInfoWithSignatures,
41 on_chain_config,
42 proof::accumulator::InMemoryAccumulator,
43 reward_distribution_event::{RewardDistributionEventV2, VoteCount},
44 term_state::{
45 ElectionEvent, PosState, RegisterEvent, RetireEvent,
46 UpdateVotingPowerEvent,
47 },
48 transaction::{
49 Transaction, TransactionInfo, TransactionListWithProof,
50 TransactionOutput, TransactionPayload, TransactionStatus,
51 TransactionToCommit, Version,
52 },
53 write_set::{WriteOp, WriteSet},
54};
55use executor_types::{
56 BlockExecutor, ChunkExecutor, Error, ExecutedTrees, ProcessedVMOutput,
57 ProofReader, StateComputeResult, TransactionData, TransactionReplayer,
58};
59use pow_types::PowInterface;
60use storage_interface::state_view::VerifiedStateView;
61
62use crate::{
63 logging::{LogEntry, LogSchema},
64 metrics::{
65 DIEM_EXECUTOR_COMMIT_BLOCKS_SECONDS, DIEM_EXECUTOR_ERRORS,
66 DIEM_EXECUTOR_EXECUTE_AND_COMMIT_CHUNK_SECONDS,
67 DIEM_EXECUTOR_EXECUTE_BLOCK_SECONDS,
68 DIEM_EXECUTOR_SAVE_TRANSACTIONS_SECONDS,
69 DIEM_EXECUTOR_TRANSACTIONS_SAVED,
70 DIEM_EXECUTOR_VM_EXECUTE_BLOCK_SECONDS,
71 },
72 vm::VMExecutor,
73};
74use diem_types::term_state::{
75 pos_state_config::{PosStateConfigTrait, POS_STATE_CONFIG},
76 DisputeEvent,
77};
78
79pub mod db_bootstrapper;
80mod logging;
81mod metrics;
82pub mod vm;
83
84type SparseMerkleProof = diem_types::proof::SparseMerkleProof<AccountStateBlob>;
85
/// Executor that both executes blocks proposed by consensus (`BlockExecutor`)
/// and applies transaction chunks received from state sync (`ChunkExecutor`,
/// `TransactionReplayer`). `V` is the VM type used to execute transactions.
pub struct Executor<V> {
    // PoS ledger DB wrapped with an in-memory cache of speculative block
    // outputs (block tree, committed/synced trees).
    db_with_cache: Arc<CachedPosLedgerDB>,
    // Consensus block store; used to look up ledger blocks when counting
    // votes at epoch boundaries.
    consensus_db: Arc<dyn LedgerBlockRW>,
    // Marks the VM type without storing an instance; `V` is only used
    // statically via `V::execute_block`.
    phantom: PhantomData<V>,
    // Interface to the PoW chain, used for pivot-decision validation and for
    // fetching staking events.
    pow_handler: Arc<dyn PowInterface>,
}
94
95impl<V> Executor<V>
96where V: VMExecutor
97{
    /// Returns the id of the last committed block, as tracked by the cached
    /// ledger DB.
    pub fn committed_block_id(&self) -> HashValue {
        self.db_with_cache.committed_block_id()
    }
101
102 pub fn new(
104 db_with_cache: Arc<CachedPosLedgerDB>,
105 pow_handler: Arc<dyn PowInterface>,
106 consensus_db: Arc<dyn LedgerBlockRW>,
107 ) -> Self {
108 Self {
109 db_with_cache,
110 consensus_db,
111 phantom: PhantomData,
112 pow_handler,
113 }
114 }
115
    /// Picks the `LedgerInfoWithSignatures` to commit together with a synced
    /// chunk, if any.
    ///
    /// Returns `Some(verified_target_li)` when local execution reaches the
    /// target version, `Some(epoch_change_li)` when the chunk ends an epoch,
    /// and `Ok(None)` for an intermediate chunk that carries no ledger info.
    /// Every returned ledger info is cross-checked against the locally
    /// computed `new_output` before being accepted.
    fn find_chunk_li(
        verified_target_li: LedgerInfoWithSignatures,
        epoch_change_li: Option<LedgerInfoWithSignatures>,
        new_output: &ProcessedVMOutput,
    ) -> Result<Option<LedgerInfoWithSignatures>> {
        // Case 1: the chunk reaches the target version — commit with the
        // target LI after checking the accumulator root matches local
        // execution.
        if verified_target_li.ledger_info().version()
            == new_output.version().unwrap_or(0)
        {
            ensure!(
                verified_target_li
                    .ledger_info()
                    .transaction_accumulator_hash()
                    == new_output.accu_root(),
                "Root hash in target ledger info does not match local computation."
            );
            return Ok(Some(verified_target_li));
        }
        // Case 2: an epoch-change LI was supplied — it must agree with local
        // execution on root hash, version, the epoch-ending flag and the
        // next validator set.
        if let Some(epoch_change_li) = epoch_change_li {
            ensure!(
                epoch_change_li.ledger_info().transaction_accumulator_hash()
                    == new_output.accu_root(),
                "Root hash of a given epoch LI does not match local computation."
            );
            ensure!(
                epoch_change_li.ledger_info().version()
                    == new_output.version().unwrap_or(0),
                "Version of a given epoch LI does not match local computation."
            );
            ensure!(
                epoch_change_li.ledger_info().ends_epoch(),
                "Epoch change LI does not carry validator set"
            );
            ensure!(
                epoch_change_li.ledger_info().next_epoch_state()
                    == new_output.epoch_state().as_ref(),
                "New validator set of a given epoch LI does not match local computation"
            );
            return Ok(Some(epoch_change_li));
        }
        // Case 3: no LI to commit. Local execution must not have ended an
        // epoch, otherwise the caller should have supplied an epoch-change
        // LI.
        ensure!(
            new_output.epoch_state().is_none(),
            "End of epoch chunk based on local computation but no EoE LedgerInfo provided."
        );
        Ok(None)
    }
169
    /// Verifies a chunk of transactions received from state sync against
    /// `verified_target_li` and strips the prefix that is already committed
    /// locally.
    ///
    /// Returns the remaining (transactions, transaction_infos) to apply, or
    /// two empty vectors when the whole chunk is stale. Fails when the proof
    /// does not verify, when the chunk starts beyond the local synced
    /// version (a gap), or when the local accumulator diverges from the
    /// proof (a fork).
    fn verify_chunk(
        &self, txn_list_with_proof: TransactionListWithProof,
        verified_target_li: &LedgerInfoWithSignatures,
    ) -> Result<(Vec<Transaction>, Vec<TransactionInfo>)> {
        // 1. Verify the chunk's accumulator proof against the target LI.
        txn_list_with_proof.verify(
            verified_target_li.ledger_info(),
            txn_list_with_proof.first_transaction_version,
        )?;

        // Empty chunk: nothing to apply.
        if txn_list_with_proof.transactions.is_empty() {
            return Ok((Vec::new(), Vec::new()));
        }
        // Non-empty chunk must declare where it starts.
        let first_txn_version =
            match txn_list_with_proof.first_transaction_version {
                Some(tx) => tx as Version,
                None => {
                    bail!(
                        "first_transaction_version doesn't exist in {:?}",
                        txn_list_with_proof
                    );
                }
            };

        let num_committed_txns = self
            .db_with_cache
            .cache
            .lock()
            .synced_trees()
            .txn_accumulator()
            .num_leaves();
        // The chunk must overlap (or touch) what is already committed; a
        // chunk starting past the local version would leave a gap.
        ensure!(
            first_txn_version <= num_committed_txns,
            "Transaction list too new. Expected version: {}. First transaction version: {}.",
            num_committed_txns,
            first_txn_version
        );
        let versions_between_first_and_committed =
            num_committed_txns - first_txn_version;
        if txn_list_with_proof.transactions.len()
            <= versions_between_first_and_committed as usize
        {
            // Every transaction in the chunk is already committed locally.
            return Ok((Vec::new(), Vec::new()));
        }

        // 2. The already-committed prefix is skipped, but it is still
        // checked below to match the local accumulator.
        let num_txns_to_skip = num_committed_txns - first_txn_version;

        diem_debug!(
            LogSchema::new(LogEntry::ChunkExecutor).num(num_txns_to_skip),
            "skipping_chunk_txns"
        );

        let skipped_transaction_infos = &txn_list_with_proof
            .proof
            .transaction_infos()[..num_txns_to_skip as usize];

        // Rebuild an accumulator from the proof's left siblings plus the
        // skipped txn infos; its root must equal the local synced
        // accumulator root, otherwise the local state has forked from the
        // chain the chunk came from.
        let frozen_subtree_roots_from_proof = txn_list_with_proof
            .proof
            .left_siblings()
            .iter()
            .rev()
            .cloned()
            .collect::<Vec<_>>();
        let accu_from_proof =
            InMemoryAccumulator::<TransactionAccumulatorHasher>::new(
                frozen_subtree_roots_from_proof,
                first_txn_version,
            )?
            .append(
                &skipped_transaction_infos
                    .iter()
                    .map(CryptoHash::hash)
                    .collect::<Vec<_>>()[..],
            );
        // NOTE: `state_id()` of the synced trees is the transaction
        // accumulator root here — presumably `ExecutedTrees::state_id`
        // returns the accumulator root hash; confirm against executor-types.
        ensure!(
            self.db_with_cache.cache.lock().synced_trees().state_id() == accu_from_proof.root_hash(),
            "Fork happens because the current synced_trees doesn't match the txn list provided."
        );

        // 3. Drop the already-committed prefix from both the transaction
        // list and the proof's transaction infos.
        let mut txns: Vec<_> = txn_list_with_proof.transactions;
        txns.drain(0..num_txns_to_skip as usize);
        let (_, mut txn_infos) = txn_list_with_proof.proof.unpack();
        txn_infos.drain(0..num_txns_to_skip as usize);

        Ok((txns, txn_infos))
    }
272
273 fn process_vm_outputs(
276 &self, mut account_to_state: HashMap<AccountAddress, AccountState>,
277 account_to_proof: HashMap<HashValue, SparseMerkleProof>,
278 transactions: &[Transaction], vm_outputs: Vec<TransactionOutput>,
279 parent_trees: &ExecutedTrees, parent_block_id: &HashValue,
280 catch_up_mode: bool,
281 ) -> Result<ProcessedVMOutput> {
282 let mut txn_data = vec![];
287 let mut txn_info_hashes = vec![];
291
292 let proof_reader = ProofReader::new(account_to_proof);
293 let pivot_select_event_key =
294 PivotBlockDecision::pivot_select_event_key();
295 let election_event_key = ElectionEvent::event_key();
296 let retire_event_key = RetireEvent::event_key();
297 let register_event_key = RegisterEvent::event_key();
298 let update_voting_power_event_key = UpdateVotingPowerEvent::event_key();
299 let dispute_event_key = DisputeEvent::event_key();
300
301 let mut pivot_decision = None;
303 let mut new_pos_state = parent_trees.pos_state().clone();
304 let parent_pivot_decision = new_pos_state.pivot_decision().clone();
305 for vm_output in vm_outputs.clone().into_iter() {
306 for event in vm_output.events() {
307 if *event.key() == pivot_select_event_key {
309 if pivot_decision.is_some() {
310 bail!("Multiple pivot decisions in one block!");
311 }
312 pivot_decision = Some(PivotBlockDecision::from_bytes(
313 event.event_data(),
314 )?);
315 } else if *event.key() == election_event_key {
316 let election_event =
317 ElectionEvent::from_bytes(event.event_data())?;
318 new_pos_state.new_node_elected(&election_event)?;
319 } else if *event.key() == dispute_event_key {
320 let dispute_event =
321 DisputeEvent::from_bytes(event.event_data())?;
322 new_pos_state.forfeit_node(&dispute_event.node_id)?;
323 }
324 }
325 }
326
327 if *parent_block_id != *PRE_GENESIS_BLOCK_ID {
328 if let Some(pivot_decision) = &pivot_decision {
329 diem_debug!(
330 "process_vm_outputs: parent={:?} parent_pivot={:?}",
331 parent_block_id,
332 parent_pivot_decision
333 );
334
335 if !catch_up_mode {
339 if !self.pow_handler.validate_proposal_pivot_decision(
340 parent_pivot_decision.block_hash,
341 pivot_decision.block_hash,
342 ) {
343 bail!("Invalid pivot decision for block");
344 }
345
346 diem_debug!(
349 "check staking events: parent={:?} me={:?}",
350 parent_pivot_decision,
351 pivot_decision
352 );
353 let staking_events = self.pow_handler.get_staking_events(
354 parent_pivot_decision.height,
355 pivot_decision.height,
356 parent_pivot_decision.block_hash,
357 pivot_decision.block_hash,
358 )?;
359 let mut staking_events_iter = staking_events.iter();
360 for vm_output in vm_outputs.clone().into_iter() {
361 for event in vm_output.events() {
362 if *event.key() == register_event_key {
364 let register_event = RegisterEvent::from_bytes(
365 event.event_data(),
366 )?;
367 match register_event.matches_staking_event(staking_events_iter.next().ok_or(anyhow!("More staking transactions packed than actual pow events"))?) {
368 Ok(true) => {}
369 Ok(false) => bail!("Packed staking transactions unmatch PoW events)"),
370 Err(e) => diem_error!("error decoding pow events: err={:?}", e),
371 }
372 new_pos_state
373 .register_node(register_event.node_id)?;
374 } else if *event.key()
375 == update_voting_power_event_key
376 {
377 let update_voting_power_event =
378 UpdateVotingPowerEvent::from_bytes(
379 event.event_data(),
380 )?;
381 match update_voting_power_event.matches_staking_event(staking_events_iter.next().ok_or(anyhow!("More staking transactions packed than actual pow events"))?) {
382 Ok(true) => {}
383 Ok(false) => bail!("Packed staking transactions unmatch PoW events)"),
384 Err(e) => diem_error!("error decoding pow events: err={:?}", e),
385 }
386 new_pos_state.update_voting_power(
387 &update_voting_power_event.node_address,
388 update_voting_power_event.voting_power,
389 )?;
390 } else if *event.key() == retire_event_key {
391 let retire_event = RetireEvent::from_bytes(
392 event.event_data(),
393 )?;
394 match retire_event.matches_staking_event(staking_events_iter.next().ok_or(anyhow!("More staking transactions packed than actual pow events"))?) {
395 Ok(true) => {}
396 Ok(false) => bail!("Packed staking transactions unmatch PoW events)"),
397 Err(e) => diem_error!("error decoding pow events: err={:?}", e),
398 }
399 new_pos_state.retire_node(
400 &retire_event.node_id,
401 retire_event.votes,
402 )?;
403 }
404 }
405 }
406 ensure!(
407 staking_events_iter.next().is_none(),
408 "Not all PoW staking events are packed"
409 );
410 } else {
411 for vm_output in vm_outputs.clone().into_iter() {
412 for event in vm_output.events() {
413 if *event.key() == register_event_key {
415 let register_event = RegisterEvent::from_bytes(
416 event.event_data(),
417 )?;
418 new_pos_state
419 .register_node(register_event.node_id)?;
420 } else if *event.key()
421 == update_voting_power_event_key
422 {
423 let update_voting_power_event =
424 UpdateVotingPowerEvent::from_bytes(
425 event.event_data(),
426 )?;
427 new_pos_state.update_voting_power(
428 &update_voting_power_event.node_address,
429 update_voting_power_event.voting_power,
430 )?;
431 } else if *event.key() == retire_event_key {
432 let retire_event = RetireEvent::from_bytes(
433 event.event_data(),
434 )?;
435 new_pos_state.retire_node(
436 &retire_event.node_id,
437 retire_event.votes,
438 )?;
439 }
440 }
441 }
442 }
443 } else {
444 if vm_outputs.iter().any(|output| {
447 output.events().iter().any(|event| {
448 *event.key() == retire_event_key
449 || *event.key() == update_voting_power_event_key
450 })
451 }) {
452 bail!("Should not pack staking related transactions");
453 }
454 pivot_decision = Some(parent_pivot_decision);
455 }
456 }
457 if let Some(pivot_decision) = &pivot_decision {
459 new_pos_state.set_pivot_decision(pivot_decision.clone());
460 }
461 let mut next_epoch_state = new_pos_state.next_view()?;
462
463 let txn_blobs =
464 itertools::zip_eq(vm_outputs.iter(), transactions.iter())
465 .map(|(vm_output, txn)| {
466 process_write_set(
467 txn,
468 &mut account_to_state,
469 vm_output.write_set().clone(),
470 )
471 })
472 .collect::<Result<Vec<_>>>()?;
473
474 let (txn_state_roots, current_state_tree) = parent_trees
475 .state_tree()
476 .batch_update(
477 txn_blobs
478 .iter()
479 .map(|m| {
480 m.iter()
481 .map(|(account, value)| (account.hash(), value))
482 .collect::<Vec<_>>()
483 })
484 .collect(),
485 &proof_reader,
486 )
487 .expect("Failed to update state tree.");
488
489 for ((vm_output, txn), (mut state_tree_hash, blobs)) in
490 itertools::zip_eq(
491 itertools::zip_eq(vm_outputs.into_iter(), transactions.iter()),
492 itertools::zip_eq(txn_state_roots, txn_blobs),
493 )
494 {
495 diem_debug!(
497 "process_vm_outputs: {} {:?}",
498 parent_trees.txn_accumulator().version(),
499 state_tree_hash
500 );
501 if parent_trees.txn_accumulator().version() != 0 {
502 state_tree_hash = Default::default();
504 }
505 let event_tree = {
506 let event_hashes: Vec<_> =
507 vm_output.events().iter().map(CryptoHash::hash).collect();
508 InMemoryAccumulator::<EventAccumulatorHasher>::from_leaves(
509 &event_hashes,
510 )
511 };
512
513 let mut txn_info_hash = None;
514 match vm_output.status() {
515 TransactionStatus::Keep(status) => {
516 let txn_info = TransactionInfo::new(
524 txn.hash(),
525 state_tree_hash,
526 event_tree.root_hash(),
527 vm_output.gas_used(),
528 status.clone(),
529 );
530
531 let real_txn_info_hash = txn_info.hash();
532 txn_info_hashes.push(real_txn_info_hash);
533 txn_info_hash = Some(real_txn_info_hash);
534 }
535 TransactionStatus::Discard(status) => {
536 if !vm_output.write_set().is_empty()
537 || !vm_output.events().is_empty()
538 {
539 diem_error!(
540 "Discarded transaction has non-empty write set or events. \
541 Transaction: {:?}. Status: {:?}.",
542 txn, status,
543 );
544 DIEM_EXECUTOR_ERRORS.inc();
545 }
546 }
547 TransactionStatus::Retry => (),
548 }
549
550 txn_data.push(TransactionData::new(
551 blobs,
552 vm_output.events().to_vec(),
553 vm_output.status().clone(),
554 state_tree_hash,
555 Arc::new(event_tree),
556 vm_output.gas_used(),
557 txn_info_hash,
558 ));
559 }
560
561 if next_epoch_state.is_some()
563 && next_epoch_state.as_ref().unwrap().epoch == 1
564 {
565 txn_data.resize(
567 transactions.len(),
568 TransactionData::new(
569 HashMap::new(),
570 vec![],
571 TransactionStatus::Retry,
572 current_state_tree.root_hash(),
573 Arc::new(
574 InMemoryAccumulator::<EventAccumulatorHasher>::default(
575 ),
576 ),
577 0,
578 None,
579 ),
580 );
581
582 let validator_set = account_to_state
583 .get(&on_chain_config::config_address())
584 .map(|state| {
585 state.get_validator_set()?.ok_or_else(|| {
586 format_err!("ValidatorSet does not exist")
587 })
588 })
589 .ok_or_else(|| {
590 format_err!("ValidatorSet account does not exist")
591 })??;
592 next_epoch_state = Some(EpochState::new(
603 1,
607 (&validator_set).into(),
608 pivot_decision
609 .as_ref()
610 .map(|p| p.block_hash.as_bytes().to_vec())
611 .unwrap_or(vec![]),
612 ))
613 };
614
615 let current_transaction_accumulator =
616 parent_trees.txn_accumulator().append(&txn_info_hashes);
617
618 Ok(ProcessedVMOutput::new(
619 txn_data,
620 ExecutedTrees::new_copy(
621 Arc::new(current_state_tree),
622 Arc::new(current_transaction_accumulator),
623 new_pos_state,
624 ),
625 next_epoch_state,
626 pivot_decision,
628 ))
629 }
630
631 fn extract_reconfig_events(
632 events: Vec<ContractEvent>,
633 ) -> Vec<ContractEvent> {
634 let new_epoch_event_key = on_chain_config::new_epoch_event_key();
635 events
636 .into_iter()
637 .filter(|event| *event.key() == new_epoch_event_key)
638 .collect()
639 }
640
641 fn get_executed_trees(
642 &self, block_id: HashValue,
643 ) -> Result<ExecutedTrees, Error> {
644 let executed_trees = if block_id
645 == self.db_with_cache.cache.lock().committed_block_id()
646 {
647 self.db_with_cache.cache.lock().committed_trees().clone()
648 } else {
649 self.db_with_cache
650 .get_block(&block_id)?
651 .lock()
652 .output()
653 .executed_trees()
654 .clone()
655 };
656
657 Ok(executed_trees)
658 }
659
660 fn get_executed_state_view<'a>(
661 &self, id: StateViewId, executed_trees: &'a ExecutedTrees,
662 ) -> VerifiedStateView<'a> {
663 let cache = self.db_with_cache.cache.lock();
664 VerifiedStateView::new(
665 id,
666 Arc::clone(&self.db_with_cache.db.reader),
667 cache.committed_trees().version(),
668 cache.committed_trees().state_root(),
669 executed_trees.state_tree(),
670 executed_trees.pos_state().clone(),
671 )
672 }
673
    /// Re-executes `transactions` (starting at `first_version`) against the
    /// locally synced state and checks each result against the matching
    /// entry of `transaction_infos`.
    ///
    /// Returns the processed VM output, the transactions to commit with
    /// their extracted reconfiguration events, and the `Retry` suffix of
    /// transactions (plus infos) to be replayed later. Fails on any
    /// `Discard` status or on a `TransactionInfo` mismatch.
    fn replay_transactions_impl(
        &self, first_version: u64, transactions: Vec<Transaction>,
        transaction_infos: Vec<TransactionInfo>,
    ) -> Result<(
        ProcessedVMOutput,
        Vec<TransactionToCommit>,
        Vec<ContractEvent>,
        Vec<Transaction>,
        Vec<TransactionInfo>,
    )> {
        // State view over the synced trees; chunk replay does not track PoS
        // state, hence the empty `PosState`.
        let cache = self.db_with_cache.cache.lock();
        let state_view = VerifiedStateView::new(
            StateViewId::ChunkExecution { first_version },
            Arc::clone(&self.db_with_cache.db.reader),
            cache.synced_trees().version(),
            cache.synced_trees().state_root(),
            cache.synced_trees().state_tree(),
            PosState::new_empty(),
        );

        fail_point!("executor::vm_execute_chunk", |_| {
            Err(anyhow::anyhow!("Injected error in execute_chunk"))
        });
        // `true`: execute in catch-up mode (skip PoW-dependent checks).
        let vm_outputs =
            V::execute_block(transactions.clone(), &state_view, true)?;

        // Synced transactions were committed by the network already, so
        // none of them may be discarded when replayed.
        for output in &vm_outputs {
            if let TransactionStatus::Discard(_) = output.status() {
                bail!("Syncing transactions that should be discarded.");
            }
        }

        let (account_to_state, account_to_proof) = state_view.into();

        let output = self.process_vm_outputs(
            account_to_state,
            account_to_proof,
            &transactions,
            vm_outputs,
            cache.synced_trees(),
            // Chunk replay has no real parent block id.
            &HashValue::zero(),
            true,
        )?;

        // Partition the results: committed transactions vs. the `Retry`
        // suffix, verifying each kept txn against its provided info.
        let mut txns_to_commit = vec![];
        let mut reconfig_events = vec![];
        let mut seen_retry = false;
        let mut txns_to_retry = vec![];
        let mut txn_infos_to_retry = vec![];
        for ((txn, txn_data), (i, txn_info)) in itertools::zip_eq(
            itertools::zip_eq(transactions, output.transaction_data()),
            transaction_infos.into_iter().enumerate(),
        ) {
            let recorded_status = match txn_data.status() {
                TransactionStatus::Keep(recorded_status) => recorded_status.clone(),
                status @ TransactionStatus::Discard(_) => bail!(
                    "The transaction at version {}, got the status of 'Discard': {:?}",
                    first_version
                        .checked_add(i as u64)
                        .ok_or_else(|| format_err!("version + i overflows"))?,
                    status
                ),
                TransactionStatus::Retry => {
                    seen_retry = true;
                    txns_to_retry.push(txn);
                    txn_infos_to_retry.push(txn_info);
                    continue;
                }
            };
            // `Retry` statuses must form a suffix: a kept transaction after
            // a retried one violates that invariant.
            assert!(!seen_retry);
            let generated_txn_info = TransactionInfo::new(
                txn.hash(),
                txn_data.state_root_hash(),
                txn_data.event_root_hash(),
                txn_data.gas_used(),
                recorded_status.clone(),
            );
            ensure!(
                txn_info == generated_txn_info,
                "txn_info do not match for {}-th transaction in chunk.\nChunk txn_info: {}\nProof txn_info: {}",
                i, generated_txn_info, txn_info
            );
            txns_to_commit.push(TransactionToCommit::new(
                txn,
                txn_data.account_blobs().clone(),
                txn_data.events().to_vec(),
                txn_data.gas_used(),
                recorded_status,
            ));
            reconfig_events.append(&mut Self::extract_reconfig_events(
                txn_data.events().to_vec(),
            ));
        }

        Ok((
            output,
            txns_to_commit,
            reconfig_events,
            txns_to_retry,
            txn_infos_to_retry,
        ))
    }
783
784 fn execute_chunk(
785 &self, first_version: u64, transactions: Vec<Transaction>,
786 transaction_infos: Vec<TransactionInfo>,
787 ) -> Result<(
788 ProcessedVMOutput,
789 Vec<TransactionToCommit>,
790 Vec<ContractEvent>,
791 )> {
792 let num_txns = transactions.len();
793
794 let (
795 processed_vm_output,
796 txns_to_commit,
797 events,
798 txns_to_retry,
799 _txn_infos_to_retry,
800 ) = self.replay_transactions_impl(
801 first_version,
802 transactions,
803 transaction_infos,
804 )?;
805
806 ensure!(
807 txns_to_retry.is_empty(),
808 "The transaction at version {} got the status of 'Retry'",
809 num_txns
810 .checked_sub(txns_to_retry.len())
811 .ok_or_else(|| format_err!("integer overflow occurred"))?
812 .checked_add(first_version as usize)
813 .ok_or_else(|| format_err!("integer overflow occurred"))?,
814 );
815
816 Ok((processed_vm_output, txns_to_commit, events))
817 }
818}
819
impl<V: VMExecutor> ChunkExecutor for Executor<V> {
    /// Verifies, executes and commits a chunk of transactions received from
    /// state sync.
    ///
    /// The already-committed prefix of the chunk is skipped; the remainder
    /// is replayed, persisted, and the in-memory trees are advanced. The
    /// matching ledger info (target or epoch-change) is committed alongside
    /// when the chunk reaches it. Returns the reconfiguration events
    /// extracted from the committed transactions.
    fn execute_and_commit_chunk(
        &self,
        txn_list_with_proof: TransactionListWithProof,
        // Target LI the chunk proofs are relative to; verified upstream.
        verified_target_li: LedgerInfoWithSignatures,
        epoch_change_li: Option<LedgerInfoWithSignatures>,
    ) -> Result<Vec<ContractEvent>> {
        let _timer =
            DIEM_EXECUTOR_EXECUTE_AND_COMMIT_CHUNK_SECONDS.start_timer();
        diem_info!(
            LogSchema::new(LogEntry::ChunkExecutor)
                .local_synced_version(
                    // NOTE(review): underflows if nothing is synced yet
                    // (num_leaves == 0) — confirm genesis always precedes.
                    self.db_with_cache
                        .cache
                        .lock()
                        .synced_trees()
                        .txn_accumulator()
                        .num_leaves()
                        - 1
                )
                .first_version_in_request(
                    txn_list_with_proof.first_transaction_version
                )
                .num_txns_in_request(txn_list_with_proof.transactions.len()),
            "sync_request_received",
        );

        // 1. Verify the chunk and drop the already-committed prefix.
        let (transactions, transaction_infos) =
            self.verify_chunk(txn_list_with_proof, &verified_target_li)?;

        // 2. Execute the remaining transactions on top of the synced trees.
        let first_version = self
            .db_with_cache
            .cache
            .lock()
            .synced_trees()
            .txn_accumulator()
            .num_leaves();
        let (output, txns_to_commit, reconfig_events) =
            self.execute_chunk(first_version, transactions, transaction_infos)?;

        // 3. Pick the ledger info to commit with this chunk, if any; if
        // there is nothing at all to persist, return early.
        let ledger_info_to_commit =
            Self::find_chunk_li(verified_target_li, epoch_change_li, &output)?;
        if ledger_info_to_commit.is_none() && txns_to_commit.is_empty() {
            return Ok(reconfig_events);
        }
        fail_point!("executor::commit_chunk", |_| {
            Err(anyhow::anyhow!("Injected error in commit_chunk"))
        });
        self.db_with_cache.db.writer.save_transactions(
            &txns_to_commit,
            first_version,
            ledger_info_to_commit.as_ref(),
            None,
            vec![],
            vec![],
        )?;

        // 4. Advance the in-memory trees: with a committed LI the block
        // tree root moves; otherwise only the synced trees advance.
        let output_trees = output.executed_trees().clone();
        if let Some(ledger_info_with_sigs) = &ledger_info_to_commit {
            self.db_with_cache.update_block_tree_root(
                output_trees,
                ledger_info_with_sigs.ledger_info(),
                vec![],
                vec![],
            );
        } else {
            self.db_with_cache.update_synced_trees(output_trees);
        }
        self.db_with_cache.reset();

        diem_info!(
            LogSchema::new(LogEntry::ChunkExecutor)
                .synced_to_version(
                    self.db_with_cache
                        .cache
                        .lock()
                        .synced_trees()
                        .version()
                        .expect("version must exist")
                )
                .committed_with_ledger_info(ledger_info_to_commit.is_some()),
            "sync_finished",
        );

        Ok(reconfig_events)
    }
}
919
impl<V: VMExecutor> TransactionReplayer for Executor<V> {
    /// Replays `txns` (with their expected `txn_infos`), which must start
    /// exactly at the locally synced version. Each round commits the
    /// successfully executed prefix and loops on the `Retry` suffix until
    /// every transaction is persisted.
    fn replay_chunk(
        &self, mut first_version: Version, mut txns: Vec<Transaction>,
        mut txn_infos: Vec<TransactionInfo>,
    ) -> Result<()> {
        ensure!(
            first_version
                == self
                    .db_with_cache
                    .cache
                    .lock()
                    .synced_trees()
                    .txn_accumulator()
                    .num_leaves(),
            "Version not expected. Expected: {}, got: {}",
            self.db_with_cache
                .cache
                .lock()
                .synced_trees()
                .txn_accumulator()
                .num_leaves(),
            first_version,
        );
        while !txns.is_empty() {
            let num_txns = txns.len();

            let (output, txns_to_commit, _, txns_to_retry, txn_infos_to_retry) =
                self.replay_transactions_impl(first_version, txns, txn_infos)?;
            // Progress guarantee: each round must commit at least one
            // transaction, otherwise this loop would never terminate.
            assert!(txns_to_retry.len() < num_txns);

            self.db_with_cache.db.writer.save_transactions(
                &txns_to_commit,
                first_version,
                None,
                None,
                vec![],
                vec![],
            )?;

            self.db_with_cache
                .update_synced_trees(output.executed_trees().clone());

            // Continue with the `Retry` suffix at the advanced version.
            txns = txns_to_retry;
            txn_infos = txn_infos_to_retry;
            first_version += txns_to_commit.len() as u64;
        }
        Ok(())
    }

    /// The next version this replayer expects: one past the synced version,
    /// or 0 when nothing has been synced yet.
    fn expecting_version(&self) -> Version {
        self.db_with_cache
            .cache
            .lock()
            .synced_trees()
            .version()
            .map_or(0, |v| v.checked_add(1).expect("Integer overflow occurred"))
    }
}
978
979impl<V: VMExecutor> BlockExecutor for Executor<V> {
    /// Trait version of the inherent `committed_block_id`; infallible in
    /// practice but wrapped in `Ok` to satisfy the trait signature.
    fn committed_block_id(&self) -> Result<HashValue, Error> {
        Ok(self.committed_block_id())
    }
983
    /// Executes `block` (id + transactions) on top of `parent_block_id` and
    /// stores the speculative result in the block tree.
    ///
    /// If the parent block (other than the committed root) already carries a
    /// reconfiguration, this block is treated as a reconfiguration suffix:
    /// its transactions are dropped and the parent's output is reused.
    /// Otherwise the transactions are run through the VM and the outputs are
    /// post-processed into new executed trees.
    fn execute_block(
        &self, block: (HashValue, Vec<Transaction>),
        parent_block_id: HashValue, catch_up_mode: bool,
    ) -> Result<StateComputeResult, Error> {
        let (block_id, mut transactions) = block;

        // Reconfiguration suffix: descendants of a reconfiguration block in
        // the same epoch are no-ops until the epoch change is committed.
        let (output, state_compute_result) = if parent_block_id
            != self.committed_block_id()
            && self
                .db_with_cache
                .get_block(&parent_block_id)?
                .lock()
                .output()
                .has_reconfiguration()
        {
            let parent = self.db_with_cache.get_block(&parent_block_id)?;
            let parent_block = parent.lock();
            let parent_output = parent_block.output();

            diem_info!(
                LogSchema::new(LogEntry::BlockExecutor).block_id(block_id),
                "reconfig_descendant_block_received"
            );

            // Reuse the parent's trees, epoch state and pivot unchanged.
            let mut output = ProcessedVMOutput::new(
                vec![],
                parent_output.executed_trees().clone(),
                parent_output.epoch_state().clone(),
                parent_output.pivot_block().clone(),
            );
            output.set_pos_state_skipped();

            let parent_accu = parent_output.executed_trees().txn_accumulator();
            let state_compute_result = output.compute_result(
                parent_accu.frozen_subtree_roots().clone(),
                parent_accu.num_leaves(),
            );

            // The block is stored (and later committed) with no txns.
            transactions = vec![];

            (output, state_compute_result)
        } else {
            diem_info!(
                LogSchema::new(LogEntry::BlockExecutor).block_id(block_id),
                "execute_block"
            );

            let _timer = DIEM_EXECUTOR_EXECUTE_BLOCK_SECONDS.start_timer();

            let parent_block_executed_trees =
                self.get_executed_trees(parent_block_id)?;

            let state_view = self.get_executed_state_view(
                StateViewId::BlockExecution { block_id },
                &parent_block_executed_trees,
            );

            // Run the VM over the block's transactions.
            let vm_outputs = {
                let _timer =
                    DIEM_EXECUTOR_VM_EXECUTE_BLOCK_SECONDS.start_timer();
                fail_point!("executor::vm_execute_block", |_| {
                    Err(Error::from(anyhow::anyhow!(
                        "Injected error in vm_execute_block"
                    )))
                });
                V::execute_block(
                    transactions.clone(),
                    &state_view,
                    catch_up_mode,
                )
                .map_err(anyhow::Error::from)?
            };

            let status: Vec<_> = vm_outputs
                .iter()
                .map(TransactionOutput::status)
                .cloned()
                .collect();
            if !status.is_empty() {
                diem_trace!("Execution status: {:?}", status);
            }

            let (account_to_state, account_to_proof) = state_view.into();

            // Turn the raw VM outputs into executed trees / txn data.
            let output = self
                .process_vm_outputs(
                    account_to_state,
                    account_to_proof,
                    &transactions,
                    vm_outputs,
                    &parent_block_executed_trees,
                    &parent_block_id,
                    catch_up_mode,
                )
                .map_err(|err| {
                    format_err!("Failed to execute block: {}", err)
                })?;

            let parent_accu = parent_block_executed_trees.txn_accumulator();

            diem_debug!("parent leaves: {}", parent_accu.num_leaves());
            let state_compute_result = output.compute_result(
                parent_accu.frozen_subtree_roots().clone(),
                parent_accu.num_leaves(),
            );
            (output, state_compute_result)
        };

        // Record the speculative execution result in the block tree.
        self.db_with_cache
            .add_block(parent_block_id, (block_id, transactions, output))?;

        Ok(state_compute_result)
    }
1112
1113 fn commit_blocks(
1114 &self, block_ids: Vec<HashValue>,
1115 ledger_info_with_sigs: LedgerInfoWithSignatures,
1116 ) -> Result<(Vec<Transaction>, Vec<ContractEvent>), Error> {
1117 let _timer = DIEM_EXECUTOR_COMMIT_BLOCKS_SECONDS.start_timer();
1118 let mut pos_state_to_commit = self
1119 .get_executed_trees(
1120 ledger_info_with_sigs.ledger_info().consensus_block_id(),
1121 )?
1122 .pos_state()
1123 .clone();
1124
1125 if ledger_info_with_sigs.ledger_info().ends_epoch()
1128 && ledger_info_with_sigs.ledger_info().epoch() != 0
1129 {
1130 let ending_block =
1131 ledger_info_with_sigs.ledger_info().consensus_block_id();
1132 let mut elected = BTreeMap::new();
1133 let mut voted_block_id = ending_block;
1134 let verifier = self
1137 .db_with_cache
1138 .cache
1139 .lock()
1140 .committed_trees()
1141 .pos_state()
1142 .epoch_state()
1143 .verifier()
1144 .clone();
1146 for committee_member in verifier.address_to_validator_info().keys()
1147 {
1148 elected.insert(*committee_member, VoteCount::default());
1149 }
1150 loop {
1151 let block = self
1152 .consensus_db
1153 .get_ledger_block(&voted_block_id)?
1154 .unwrap();
1155 diem_trace!("count vote for block {:?}", block);
1156 if block.quorum_cert().ledger_info().signatures().len() == 0 {
1157 if let Some(author) = block.author() {
1160 let leader_status =
1161 elected.get_mut(&author).expect("in epoch state");
1162 leader_status.leader_count += 1;
1163 }
1164 break;
1165 }
1166 if let Some(author) = block.author() {
1167 let leader_status =
1168 elected.get_mut(&author).expect("in epoch state");
1169 leader_status.leader_count += 1;
1170 leader_status.included_vote_count += verifier
1171 .extra_vote_count(
1172 block
1173 .quorum_cert()
1174 .ledger_info()
1175 .signatures()
1176 .keys(),
1177 )
1178 .unwrap();
1179 }
1180 for voter in
1181 block.quorum_cert().ledger_info().signatures().keys()
1182 {
1183 elected
1184 .get_mut(&voter)
1185 .expect("in epoch state")
1186 .vote_count +=
1187 verifier.get_voting_power(voter).unwrap();
1188 }
1189 voted_block_id = block.parent_id();
1190 }
1191 let mut force_retired = HashSet::new();
1192
1193 for (node, vote_count) in elected.iter_mut() {
1195 if vote_count.vote_count == 0 {
1196 force_retired.insert(node);
1197 } else {
1198 vote_count.total_votes =
1199 verifier.get_voting_power(node).unwrap_or(0);
1200 if vote_count.total_votes == 0 {
1201 diem_warn!("Node {:?} has voted for epoch {} without voting power.",
1202 node,
1203 pos_state_to_commit.epoch_state().epoch);
1204 }
1205 }
1206 }
1207
1208 if !force_retired.is_empty() {
1209 let end_epoch = ledger_info_with_sigs.ledger_info().epoch();
1211 let start_epoch = end_epoch.saturating_sub(
1212 POS_STATE_CONFIG.force_retire_check_epoch_count(
1213 pos_state_to_commit.current_view(),
1214 ),
1215 ) + 1;
1216 for end_ledger_info in self
1219 .db_with_cache
1220 .db
1221 .reader
1222 .get_epoch_ending_ledger_infos(start_epoch, end_epoch)?
1223 .get_all_ledger_infos()
1224 {
1225 let mut voted_block_id =
1226 end_ledger_info.ledger_info().consensus_block_id();
1227 loop {
1228 let block = self
1229 .consensus_db
1230 .get_ledger_block(&voted_block_id)?
1231 .unwrap();
1232 if block.quorum_cert().ledger_info().signatures().len()
1233 == 0
1234 {
1235 break;
1236 }
1237 for voter in block
1238 .quorum_cert()
1239 .ledger_info()
1240 .signatures()
1241 .keys()
1242 {
1243 force_retired.remove(voter);
1246 }
1247 voted_block_id = block.parent_id();
1248 }
1249 }
1250 for node in force_retired {
1251 pos_state_to_commit.force_retire_node(&node)?;
1252 }
1253 }
1254
1255 let reward_event = RewardDistributionEventV2 {
1256 candidates: pos_state_to_commit.next_evicted_term(),
1257 elected: elected
1258 .into_iter()
1259 .map(|(k, v)| (H256::from_slice(k.as_ref()), v))
1260 .collect(),
1261 view: pos_state_to_commit.current_view(),
1262 };
1263 self.db_with_cache.db.writer.save_reward_event(
1264 ledger_info_with_sigs.ledger_info().epoch(),
1265 &reward_event,
1266 )?;
1267 self.db_with_cache
1268 .get_block(&ending_block)
1269 .expect("latest committed block not pruned")
1270 .lock()
1271 .replace_pos_state(pos_state_to_commit.clone());
1272 }
1273
1274 diem_info!(
1275 LogSchema::new(LogEntry::BlockExecutor).block_id(
1276 ledger_info_with_sigs.ledger_info().consensus_block_id()
1277 ),
1278 "commit_block"
1279 );
1280
1281 let version = ledger_info_with_sigs.ledger_info().version();
1282
1283 let num_txns_in_li = version
1284 .checked_add(1)
1285 .ok_or_else(|| format_err!("version + 1 overflows"))?;
1286 let num_persistent_txns = self
1287 .db_with_cache
1288 .cache
1289 .lock()
1290 .synced_trees()
1291 .txn_accumulator()
1292 .num_leaves();
1293
1294 if num_txns_in_li < num_persistent_txns {
1295 return Err(Error::InternalError {
1296 error: format!(
1297 "Try to commit stale transactions with the last version as {}",
1298 version
1299 ),
1300 });
1301 }
1302
1303 let mut txns_to_keep = vec![];
1308 let arc_blocks = block_ids
1309 .iter()
1310 .map(|id| self.db_with_cache.get_block(id))
1311 .collect::<Result<Vec<_>, Error>>()?;
1312 let blocks = arc_blocks.iter().map(|b| b.lock()).collect::<Vec<_>>();
1313 let mut committed_blocks = Vec::new();
1314 let mut signatures_vec = Vec::new();
1315 if ledger_info_with_sigs.ledger_info().epoch() != 0 {
1316 for (i, b) in blocks.iter().enumerate() {
1317 let ledger_block = self
1318 .consensus_db
1319 .get_ledger_block(&b.id())
1320 .unwrap()
1321 .unwrap();
1322 let view =
1323 b.output().executed_trees().pos_state().current_view();
1324 committed_blocks.push(CommittedBlock {
1325 hash: b.id(),
1326 epoch: ledger_block.epoch(),
1327 miner: ledger_block.author(),
1328 parent_hash: ledger_block.parent_id(),
1329 round: ledger_block.round(),
1330 pivot_decision: b.output().pivot_block().clone().unwrap(),
1331 version: b.output().version().unwrap(),
1332 timestamp: ledger_block.timestamp_usecs(),
1333 view,
1334 is_skipped: b
1335 .output()
1336 .executed_trees()
1337 .pos_state()
1338 .skipped(),
1339 });
1340 if i != 0 {
1342 signatures_vec.push((
1343 ledger_block.quorum_cert().certified_block().id(),
1344 ledger_block.quorum_cert().ledger_info().clone(),
1345 ));
1346 }
1347 }
1348 let last_block = blocks.last().expect("not empty").id();
1349 if let Some(qc) = self.consensus_db.get_qc_for_block(&last_block)? {
1350 signatures_vec.push((last_block, qc.ledger_info().clone()));
1351 } else {
1352 assert!(ledger_info_with_sigs.ledger_info().ends_epoch());
1357 }
1358 } else {
1359 committed_blocks.push(CommittedBlock {
1360 hash: ledger_info_with_sigs.ledger_info().consensus_block_id(),
1361 epoch: 0,
1362 round: 0,
1363 miner: None,
1364 parent_hash: HashValue::default(),
1365 pivot_decision: ledger_info_with_sigs
1366 .ledger_info()
1367 .pivot_decision()
1368 .unwrap()
1369 .clone(),
1370 version: ledger_info_with_sigs.ledger_info().version(),
1371 timestamp: ledger_info_with_sigs
1372 .ledger_info()
1373 .timestamp_usecs(),
1374 view: 1,
1375 is_skipped: false,
1376 });
1377 }
1378 for (txn, txn_data) in blocks.iter().flat_map(|block| {
1379 itertools::zip_eq(
1380 block.transactions(),
1381 block.output().transaction_data(),
1382 )
1383 }) {
1384 if let TransactionStatus::Keep(recorded_status) = txn_data.status()
1385 {
1386 txns_to_keep.push(TransactionToCommit::new(
1387 txn.clone(),
1388 txn_data.account_blobs().clone(),
1389 txn_data.events().to_vec(),
1390 txn_data.gas_used(),
1391 recorded_status.clone(),
1392 ));
1393 }
1394 }
1395
1396 let last_block = blocks
1397 .last()
1398 .ok_or_else(|| format_err!("CommittableBlockBatch is empty"))?;
1399
1400 let num_txns_in_speculative_accumulator = last_block
1403 .output()
1404 .executed_trees()
1405 .txn_accumulator()
1406 .num_leaves();
1407 assert_eq!(
1408 num_txns_in_li, num_txns_in_speculative_accumulator as Version,
1409 "Number of transactions in ledger info ({}) does not match number of transactions \
1410 in accumulator ({}).",
1411 num_txns_in_li, num_txns_in_speculative_accumulator,
1412 );
1413
1414 let num_txns_to_keep = txns_to_keep.len() as u64;
1415
1416 let first_version_to_keep = num_txns_in_li - num_txns_to_keep;
1419 assert!(
1420 first_version_to_keep <= num_persistent_txns,
1421 "first_version {} in the blocks to commit cannot exceed # of committed txns: {}.",
1422 first_version_to_keep,
1423 num_persistent_txns
1424 );
1425
1426 let num_txns_to_skip = num_persistent_txns - first_version_to_keep;
1427 let first_version_to_commit = first_version_to_keep + num_txns_to_skip;
1428
1429 if num_txns_to_skip != 0 {
1430 diem_debug!(
1431 LogSchema::new(LogEntry::BlockExecutor)
1432 .latest_synced_version(num_persistent_txns - 1)
1433 .first_version_to_keep(first_version_to_keep)
1434 .num_txns_to_keep(num_txns_to_keep)
1435 .first_version_to_commit(first_version_to_commit),
1436 "skip_transactions_when_committing"
1437 );
1438 }
1439
1440 let txns_to_commit = &txns_to_keep[num_txns_to_skip as usize..];
1442
1443 let num_txns_to_commit = txns_to_commit.len() as u64;
1444 {
1445 let _timer = DIEM_EXECUTOR_SAVE_TRANSACTIONS_SECONDS.start_timer();
1446 DIEM_EXECUTOR_TRANSACTIONS_SAVED.observe(num_txns_to_commit as f64);
1447
1448 assert_eq!(
1449 first_version_to_commit,
1450 num_txns_in_li - num_txns_to_commit
1451 );
1452 fail_point!("executor::commit_blocks", |_| {
1453 Err(Error::from(anyhow::anyhow!(
1454 "Injected error in commit_blocks"
1455 )))
1456 });
1457 self.db_with_cache.db.writer.save_transactions(
1458 txns_to_commit,
1459 first_version_to_commit,
1460 Some(&ledger_info_with_sigs),
1461 Some(pos_state_to_commit),
1462 committed_blocks,
1463 signatures_vec,
1464 )?;
1465 }
1466
1467 let mut committed_txns = vec![];
1470 let mut reconfig_events = vec![];
1471 for txn in txns_to_commit.iter() {
1472 committed_txns.push(txn.transaction().clone());
1473 reconfig_events.append(&mut Self::extract_reconfig_events(
1474 txn.events().to_vec(),
1475 ));
1476 }
1477
1478 for block in blocks {
1479 block.output().executed_trees().state_tree().prune()
1480 }
1481
1482 let old_committed_block = self.db_with_cache.prune(
1483 ledger_info_with_sigs.ledger_info(),
1484 committed_txns.clone(),
1485 reconfig_events.clone(),
1486 )?;
1487 self.db_with_cache
1488 .db
1489 .writer
1490 .delete_pos_state_by_block(&old_committed_block)?;
1491
1492 Ok((committed_txns, reconfig_events))
1495 }
1496}
1497
1498pub fn process_write_set(
1502 transaction: &Transaction,
1503 account_to_state: &mut HashMap<AccountAddress, AccountState>,
1504 write_set: WriteSet,
1505) -> Result<HashMap<AccountAddress, AccountStateBlob>> {
1506 let mut updated_blobs = HashMap::new();
1507
1508 let mut addrs = HashSet::new();
1511 for (access_path, write_op) in write_set.into_iter() {
1512 let address = access_path.address;
1513 let path = access_path.path;
1514 match account_to_state.entry(address) {
1515 hash_map::Entry::Occupied(mut entry) => {
1516 update_account_state(entry.get_mut(), path, write_op);
1517 }
1518 hash_map::Entry::Vacant(entry) => {
1519 match transaction {
1524 Transaction::GenesisTransaction(_) => (),
1525 Transaction::BlockMetadata(_) => {
1526 }
1529 Transaction::UserTransaction(txn) => match txn.payload() {
1530 TransactionPayload::WriteSet(_) => (),
1531 _ => bail!(
1532 "Write set should be a subset of read set: {:?}.",
1533 txn
1534 ),
1535 },
1536 }
1537
1538 let mut account_state = Default::default();
1539 update_account_state(&mut account_state, path, write_op);
1540 entry.insert(account_state);
1541 }
1542 }
1543 addrs.insert(address);
1544 }
1545
1546 for addr in addrs {
1547 let account_state =
1548 account_to_state.get(&addr).expect("Address should exist.");
1549 let account_blob = AccountStateBlob::try_from(account_state)?;
1550 updated_blobs.insert(addr, account_blob);
1551 }
1552
1553 Ok(updated_blobs)
1554}
1555
1556fn update_account_state(
1557 account_state: &mut AccountState, path: Vec<u8>, write_op: WriteOp,
1558) {
1559 match write_op {
1560 WriteOp::Value(new_value) => account_state.insert(path, new_value),
1561 WriteOp::Deletion => account_state.remove(&path),
1562 };
1563}