cfxcore/pos/state_sync/executor_proxy.rs
1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use crate::pos::state_sync::{
9 counters,
10 error::Error,
11 logging::{LogEntry, LogEvent, LogSchema},
12 shared_components::SyncState,
13};
14use diem_logger::prelude::*;
15use diem_types::{
16 contract_event::ContractEvent,
17 ledger_info::LedgerInfoWithSignatures,
18 move_resource::MoveStorage,
19 on_chain_config::{OnChainConfigPayload, ON_CHAIN_CONFIG_REGISTRY},
20 transaction::TransactionListWithProof,
21};
22use executor_types::{ChunkExecutor, ExecutedTrees};
23use itertools::Itertools;
24use std::{collections::HashSet, sync::Arc};
25use storage_interface::DbReader;
26use subscription_service::ReconfigSubscription;
27
/// Proxies interactions with execution and storage for state synchronization.
///
/// State sync goes through this trait rather than calling the executor and
/// the database directly. `ExecutorProxy` in this file is the implementation
/// backed by a `DbReader` and a `ChunkExecutor`.
pub trait ExecutorProxyTrait: Send {
    /// Sync the local state with the latest in storage.
    ///
    /// Returns the state as a `SyncState`. The `ExecutorProxy`
    /// implementation builds it from the storage startup info and returns
    /// `Error::UnexpectedError` when that info cannot be read or is missing.
    fn get_local_storage_state(&self) -> Result<SyncState, Error>;

    /// Execute and commit a batch of transactions.
    ///
    /// The three arguments are handed to the chunk executor unchanged by the
    /// `ExecutorProxy` implementation. There, a failure to publish the
    /// resulting reconfiguration events is logged and counted but does not
    /// turn the call into an error.
    fn execute_chunk(
        &mut self, txn_list_with_proof: TransactionListWithProof,
        verified_target_li: LedgerInfoWithSignatures,
        intermediate_end_of_epoch_li: Option<LedgerInfoWithSignatures>,
    ) -> Result<(), Error>;

    /// Gets chunk of transactions given the known version, target version and
    /// the max limit.
    ///
    /// The `ExecutorProxy` implementation starts the chunk at
    /// `known_version + 1` and reports `Error::IntegerOverflow` if that
    /// addition overflows. `limit` and `target_version` are forwarded to
    /// storage as given.
    fn get_chunk(
        &self, known_version: u64, limit: u64, target_version: u64,
    ) -> Result<TransactionListWithProof, Error>;

    /// Get the epoch changing ledger info for the given epoch so that we can
    /// move to next epoch.
    ///
    /// The `ExecutorProxy` implementation returns an error when storage has
    /// no such ledger info for `epoch`.
    fn get_epoch_change_ledger_info(
        &self, epoch: u64,
    ) -> Result<LedgerInfoWithSignatures, Error>;

    /// Get ledger info at an epoch boundary version.
    fn get_epoch_ending_ledger_info(
        &self, version: u64,
    ) -> Result<LedgerInfoWithSignatures, Error>;

    /// Returns the ledger's timestamp for the given version in microseconds.
    fn get_version_timestamp(&self, version: u64) -> Result<u64, Error>;

    /// Publishes on-chain config updates to subscribed components.
    ///
    /// `events` are the reconfiguration events to publish. In the
    /// `ExecutorProxy` implementation an empty `events` is a no-op that
    /// returns `Ok(())`, and an error is returned when publishing to at
    /// least one subscription fails.
    fn publish_on_chain_config_updates(
        &mut self, events: Vec<ContractEvent>,
    ) -> Result<(), Error>;
}
65
/// `ExecutorProxyTrait` implementation backed by a storage reader and a chunk
/// executor.
pub(crate) struct ExecutorProxy {
    /// Storage handle used for startup info, transactions, epoch-ending
    /// ledger infos and block timestamps.
    storage: Arc<dyn DbReader>,
    /// Executor that runs and commits transaction chunks in `execute_chunk`.
    executor: Box<dyn ChunkExecutor>,
    /// Subscriptions that are sent on-chain config payloads.
    reconfig_subscriptions: Vec<ReconfigSubscription>,
    /// Local copy of the most recent on-chain config payload; new payloads
    /// are compared against it to find which configs changed.
    on_chain_configs: OnChainConfigPayload,
}
72
73impl ExecutorProxy {
74 pub(crate) fn new(
75 storage: Arc<dyn DbReader>, executor: Box<dyn ChunkExecutor>,
76 mut reconfig_subscriptions: Vec<ReconfigSubscription>,
77 ) -> Self {
78 // TODO(lpl): Double check the `None` case here.
79 let on_chain_configs = if let Ok(Some(startup_info)) =
80 storage.get_startup_info(false)
81 {
82 let epoch_state = startup_info.latest_epoch_state.or(startup_info
83 .latest_ledger_info
84 .ledger_info()
85 .next_epoch_state()
86 .cloned());
87 if let Some(epoch_state) = epoch_state {
88 OnChainConfigPayload::new(
89 epoch_state.epoch,
90 Arc::new(
91 ON_CHAIN_CONFIG_REGISTRY
92 .iter()
93 .cloned()
94 .zip_eq(vec![bcs::to_bytes(&epoch_state).unwrap()])
95 .collect(),
96 ),
97 )
98 } else {
99 Self::fetch_all_configs(&*storage).expect(
100 "[state sync] Failed initial read of on-chain configs",
101 )
102 }
103 } else {
104 Self::fetch_all_configs(&*storage)
105 .expect("[state sync] Failed initial read of on-chain configs")
106 };
107
108 for subscription in reconfig_subscriptions.iter_mut() {
109 subscription.publish(on_chain_configs.clone()).expect(
110 "[state sync] Failed to publish initial on-chain config",
111 );
112 }
113 Self {
114 storage,
115 executor,
116 reconfig_subscriptions,
117 on_chain_configs,
118 }
119 }
120
121 fn fetch_all_configs(
122 storage: &dyn DbReader,
123 ) -> Result<OnChainConfigPayload, Error> {
124 let access_paths = ON_CHAIN_CONFIG_REGISTRY
125 .iter()
126 .map(|config_id| config_id.access_path())
127 .collect();
128 let configs = storage
129 .batch_fetch_resources_by_version(access_paths, 0)
130 .map_err(|error| {
131 Error::UnexpectedError(format!(
132 "Failed batch fetch of resources: {}",
133 error
134 ))
135 })?;
136 // TODO(linxi): get correct epoch
137 // let synced_version =
138 // storage.fetch_synced_version().map_err(|error| {
139 // Error::UnexpectedError(format!(
140 // "Failed to fetch storage synced version: {}",
141 // error
142 // ))
143 // })?;
144 //
145 // let account_state_blob = storage
146 // .get_account_state_with_proof_by_version(
147 // config_address(),
148 // synced_version,
149 // )
150 // .map_err(|error| {
151 // Error::UnexpectedError(format!(
152 // "Failed to fetch account state with proof {}",
153 // error
154 // ))
155 // })?
156 // .0;
157 /*let epoch = account_state_blob
158 .map(|blob| {
159 AccountState::try_from(&blob).and_then(|state| {
160 Ok(state
161 .get_configuration_resource()?
162 .ok_or_else(|| {
163 Error::UnexpectedError(
164 "Configuration resource does not exist".into(),
165 )
166 })?
167 .epoch())
168 })
169 })
170 .ok_or_else(|| {
171 Error::UnexpectedError("Missing account state blob".into())
172 })?
173 .map_err(|error| {
174 Error::UnexpectedError(format!(
175 "Failed to fetch configuration resource: {}",
176 error
177 ))
178 })?;*/
179
180 Ok(OnChainConfigPayload::new(
181 1, /* The epoch number after executing genesis block */
182 Arc::new(
183 ON_CHAIN_CONFIG_REGISTRY
184 .iter()
185 .cloned()
186 .zip_eq(configs)
187 .collect(),
188 ),
189 ))
190 }
191}
192
193impl ExecutorProxyTrait for ExecutorProxy {
194 fn get_local_storage_state(&self) -> Result<SyncState, Error> {
195 let storage_info =
196 self.storage.get_startup_info(false).map_err(|error| {
197 Error::UnexpectedError(format!(
198 "Failed to get startup info from storage: {}",
199 error
200 ))
201 })?;
202 let storage_info = storage_info.ok_or_else(|| {
203 Error::UnexpectedError("Missing startup info from storage".into())
204 })?;
205 let current_epoch_state = storage_info.get_epoch_state().clone();
206
207 let synced_trees =
208 if let Some(synced_tree_state) = storage_info.synced_tree_state {
209 // TODO(lpl): synced_tree_state.pos_state is left unhandled.
210 ExecutedTrees::from(synced_tree_state)
211 } else {
212 ExecutedTrees::new_with_pos_state(
213 storage_info.committed_tree_state,
214 storage_info.committed_pos_state,
215 )
216 };
217
218 Ok(SyncState::new(
219 storage_info.latest_ledger_info,
220 synced_trees,
221 current_epoch_state,
222 ))
223 }
224
225 fn execute_chunk(
226 &mut self, txn_list_with_proof: TransactionListWithProof,
227 verified_target_li: LedgerInfoWithSignatures,
228 intermediate_end_of_epoch_li: Option<LedgerInfoWithSignatures>,
229 ) -> Result<(), Error> {
230 // track chunk execution time
231 let timer = counters::EXECUTE_CHUNK_DURATION.start_timer();
232 let reconfig_events = self
233 .executor
234 .execute_and_commit_chunk(
235 txn_list_with_proof,
236 verified_target_li,
237 intermediate_end_of_epoch_li,
238 )
239 .map_err(|error| {
240 Error::UnexpectedError(format!(
241 "Execute and commit chunk failed: {}",
242 error
243 ))
244 })?;
245 timer.stop_and_record();
246 if let Err(e) = self.publish_on_chain_config_updates(reconfig_events) {
247 diem_error!(
248 LogSchema::event_log(LogEntry::Reconfig, LogEvent::Fail)
249 .error(&e),
250 "Failed to publish reconfig updates in execute_chunk"
251 );
252 counters::RECONFIG_PUBLISH_COUNT
253 .with_label_values(&[counters::FAIL_LABEL])
254 .inc();
255 }
256 Ok(())
257 }
258
259 fn get_chunk(
260 &self, known_version: u64, limit: u64, target_version: u64,
261 ) -> Result<TransactionListWithProof, Error> {
262 let starting_version =
263 known_version.checked_add(1).ok_or_else(|| {
264 Error::IntegerOverflow("Starting version has overflown!".into())
265 })?;
266 self.storage
267 .get_transactions(starting_version, limit, target_version, false)
268 .map_err(|error| {
269 Error::UnexpectedError(format!(
270 "Failed to get transactions from storage {}",
271 error
272 ))
273 })
274 }
275
276 fn get_epoch_change_ledger_info(
277 &self, epoch: u64,
278 ) -> Result<LedgerInfoWithSignatures, Error> {
279 let next_epoch = epoch.checked_add(1).ok_or_else(|| {
280 Error::IntegerOverflow("Next epoch has overflown!".into())
281 })?;
282 let mut epoch_ending_ledger_infos = self
283 .storage
284 .get_epoch_ending_ledger_infos(epoch, next_epoch)
285 .map_err(|error| Error::UnexpectedError(error.to_string()))?;
286
287 epoch_ending_ledger_infos
288 .ledger_info_with_sigs
289 .pop()
290 .ok_or_else(|| {
291 Error::UnexpectedError(format!(
292 "Missing epoch change ledger info for epoch: {:?}",
293 epoch
294 ))
295 })
296 }
297
298 fn get_epoch_ending_ledger_info(
299 &self, version: u64,
300 ) -> Result<LedgerInfoWithSignatures, Error> {
301 self.storage
302 .get_epoch_ending_ledger_info(version)
303 .map_err(|error| Error::UnexpectedError(error.to_string()))
304 }
305
306 fn get_version_timestamp(&self, version: u64) -> Result<u64, Error> {
307 self.storage
308 .get_block_timestamp(version)
309 .map_err(|error| Error::UnexpectedError(error.to_string()))
310 }
311
312 fn publish_on_chain_config_updates(
313 &mut self, events: Vec<ContractEvent>,
314 ) -> Result<(), Error> {
315 if events.is_empty() {
316 return Ok(());
317 }
318 diem_info!(LogSchema::new(LogEntry::Reconfig)
319 .count(events.len())
320 .reconfig_events(events.clone()));
321
322 let event_keys = events
323 .iter()
324 .map(|event| *event.key())
325 .collect::<HashSet<_>>();
326
327 // calculate deltas
328 let new_configs = OnChainConfigPayload::new(
329 1, /* not used */
330 Arc::new(
331 ON_CHAIN_CONFIG_REGISTRY
332 .iter()
333 .cloned()
334 .zip_eq(vec![events[0].event_data().to_vec()])
335 .collect(),
336 ),
337 );
338 diem_debug!("get {} configs", new_configs.configs().len());
339
340 let changed_configs = new_configs
341 .configs()
342 .iter()
343 .filter(|(id, cfg)| {
344 &self.on_chain_configs.configs().get(id).unwrap_or_else(|| {
345 panic!(
346 "Missing on-chain config value in local copy: {}",
347 id
348 )
349 }) != cfg
350 })
351 .map(|(id, _)| *id)
352 .collect::<HashSet<_>>();
353
354 // notify subscribers
355 let mut publish_success = true;
356 for subscription in self.reconfig_subscriptions.iter_mut() {
357 // publish updates if *any* of the subscribed configs changed
358 // or any of the subscribed events were emitted
359 let subscribed_items = subscription.subscribed_items();
360 if !changed_configs.is_disjoint(&subscribed_items.configs)
361 || !event_keys.is_disjoint(&subscribed_items.events)
362 {
363 diem_debug!("publish {} configs", new_configs.configs().len());
364 if let Err(e) = subscription.publish(new_configs.clone()) {
365 publish_success = false;
366 diem_error!(
367 LogSchema::event_log(LogEntry::Reconfig, LogEvent::PublishError)
368 .subscription_name(subscription.name.clone())
369 .error(&Error::UnexpectedError(e.to_string())),
370 "Failed to publish reconfig notification to subscription {}",
371 subscription.name
372 );
373 } else {
374 diem_info!(
375 LogSchema::event_log(LogEntry::Reconfig, LogEvent::Success)
376 .subscription_name(subscription.name.clone()),
377 "Successfully published reconfig notification to subscription {}",
378 subscription.name
379 );
380 }
381 }
382 }
383
384 self.on_chain_configs = new_configs;
385 if publish_success {
386 counters::RECONFIG_PUBLISH_COUNT
387 .with_label_values(&[counters::SUCCESS_LABEL])
388 .inc();
389 Ok(())
390 } else {
391 Err(Error::UnexpectedError(
392 "Failed to publish at least one subscription!".into(),
393 ))
394 }
395 }
396}
397
398/*
399#[cfg(test)]
400mod tests {
401 use super::*;
402 use channel::diem_channel::Receiver;
403 use diem_crypto::{ed25519::*, PrivateKey, Uniform};
404 use diem_transaction_builder::stdlib::{
405 encode_peer_to_peer_with_metadata_script,
406 encode_set_validator_config_and_reconfigure_script,
407 encode_update_diem_version_script,
408 };
409 use diem_types::{
410 account_address::AccountAddress,
411 account_config::{diem_root_address, xus_tag},
412 block_metadata::BlockMetadata,
413 contract_event::ContractEvent,
414 ledger_info::LedgerInfoWithSignatures,
415 on_chain_config::{
416 DiemVersion, OnChainConfig, OnChainConfigPayload, VMConfig,
417 ValidatorSet,
418 },
419 transaction::{Transaction, WriteSetPayload},
420 };
421 use diem_vm::DiemVM;
422 use pos-ledger-db::DiemDB;
423 use executor::Executor;
424 use executor_test_helpers::{
425 bootstrap_genesis, gen_block_id, gen_ledger_info_with_sigs,
426 get_test_signed_transaction,
427 };
428 use executor_types::BlockExecutor;
429 use futures::{future::FutureExt, stream::StreamExt};
430 use storage_interface::DbReaderWriter;
431 use subscription_service::ReconfigSubscription;
432 use vm_genesis::Validator;
433
434 // TODO(joshlind): add unit tests for general executor proxy behaviour!
435 // TODO(joshlind): add unit tests for subscription events.. seems like these
436 // are missing?
437
438 #[test]
439 fn test_pub_sub_different_subscription() {
440 let (subscription, mut reconfig_receiver) =
441 ReconfigSubscription::subscribe_all(
442 "",
443 vec![VMConfig::CONFIG_ID],
444 vec![],
445 );
446 let (validators, mut block_executor, mut executor_proxy) =
447 bootstrap_genesis_and_set_subscription(
448 subscription,
449 &mut reconfig_receiver,
450 );
451
452 // Create a dummy prologue transaction that will bump the timer, and
453 // update the validator set
454 let validator_account = validators[0].owner_address;
455 let dummy_txn = create_dummy_transaction(1, validator_account);
456 let reconfig_txn = create_new_update_diem_version_transaction(1);
457
458 // Execute and commit the block
459 let block = vec![dummy_txn, reconfig_txn];
460 let (reconfig_events, _) =
461 execute_and_commit_block(&mut block_executor, block, 1);
462
463 // Publish the on chain config updates
464 executor_proxy
465 .publish_on_chain_config_updates(reconfig_events)
466 .unwrap();
467
468 // Verify no reconfig notification is sent (we only subscribed to
469 // VMConfig)
470 assert!(reconfig_receiver
471 .select_next_some()
472 .now_or_never()
473 .is_none());
474 }
475
476 #[test]
477 fn test_pub_sub_drop_receiver() {
478 let (subscription, mut reconfig_receiver) =
479 ReconfigSubscription::subscribe_all(
480 "",
481 vec![DiemVersion::CONFIG_ID],
482 vec![],
483 );
484 let (validators, mut block_executor, mut executor_proxy) =
485 bootstrap_genesis_and_set_subscription(
486 subscription,
487 &mut reconfig_receiver,
488 );
489
490 // Create a dummy prologue transaction that will bump the timer, and
491 // update the Diem version
492 let validator_account = validators[0].owner_address;
493 let dummy_txn = create_dummy_transaction(1, validator_account);
494 let reconfig_txn = create_new_update_diem_version_transaction(1);
495
496 // Execute and commit the reconfig block
497 let block = vec![dummy_txn, reconfig_txn];
498 let (reconfig_events, _) =
499 execute_and_commit_block(&mut block_executor, block, 1);
500
501 // Drop the reconfig receiver
502 drop(reconfig_receiver);
503
504 // Verify publishing on-chain config updates fails due to dropped
505 // receiver
506 assert!(executor_proxy
507 .publish_on_chain_config_updates(reconfig_events)
508 .is_err());
509 }
510
511 #[test]
512 fn test_pub_sub_multiple_subscriptions() {
513 let (subscription, mut reconfig_receiver) =
514 ReconfigSubscription::subscribe_all(
515 "",
516 vec![ValidatorSet::CONFIG_ID, DiemVersion::CONFIG_ID],
517 vec![],
518 );
519 let (validators, mut block_executor, mut executor_proxy) =
520 bootstrap_genesis_and_set_subscription(
521 subscription,
522 &mut reconfig_receiver,
523 );
524
525 // Create a dummy prologue transaction that will bump the timer, and
526 // update the Diem version
527 let validator_account = validators[0].owner_address;
528 let dummy_txn = create_dummy_transaction(1, validator_account);
529 let reconfig_txn = create_new_update_diem_version_transaction(1);
530
531 // Give the validator some money so it can send a rotation tx and rotate
532 // the validator's consensus key.
533 let money_txn =
534 create_transfer_to_validator_transaction(validator_account, 2);
535 let rotation_txn =
536 create_consensus_key_rotation_transaction(&validators[0], 0);
537
538 // Execute and commit the reconfig block
539 let block = vec![dummy_txn, reconfig_txn, money_txn, rotation_txn];
540 let (reconfig_events, _) =
541 execute_and_commit_block(&mut block_executor, block, 1);
542
543 // Publish the on chain config updates
544 executor_proxy
545 .publish_on_chain_config_updates(reconfig_events)
546 .unwrap();
547
548 // Verify reconfig notification is sent
549 assert!(reconfig_receiver
550 .select_next_some()
551 .now_or_never()
552 .is_some());
553 }
554
555 #[test]
556 fn test_pub_sub_no_reconfig_events() {
557 let (subscription, mut reconfig_receiver) =
558 ReconfigSubscription::subscribe_all(
559 "",
560 vec![DiemVersion::CONFIG_ID],
561 vec![],
562 );
563 let (_, _, mut executor_proxy) = bootstrap_genesis_and_set_subscription(
564 subscription,
565 &mut reconfig_receiver,
566 );
567
568 // Publish no on chain config updates
569 executor_proxy
570 .publish_on_chain_config_updates(vec![])
571 .unwrap();
572
573 // Verify no reconfig notification is sent
574 assert!(reconfig_receiver
575 .select_next_some()
576 .now_or_never()
577 .is_none());
578 }
579
580 #[test]
581 fn test_pub_sub_no_subscriptions() {
582 let (subscription, mut reconfig_receiver) =
583 ReconfigSubscription::subscribe_all("", vec![], vec![]);
584 let (validators, mut block_executor, mut executor_proxy) =
585 bootstrap_genesis_and_set_subscription(
586 subscription,
587 &mut reconfig_receiver,
588 );
589
590 // Create a dummy prologue transaction that will bump the timer, and
591 // update the Diem version
592 let validator_account = validators[0].owner_address;
593 let dummy_txn = create_dummy_transaction(1, validator_account);
594 let reconfig_txn = create_new_update_diem_version_transaction(1);
595
596 // Execute and commit the reconfig block
597 let block = vec![dummy_txn, reconfig_txn];
598 let (reconfig_events, _) =
599 execute_and_commit_block(&mut block_executor, block, 1);
600
601 // Publish the on chain config updates
602 executor_proxy
603 .publish_on_chain_config_updates(reconfig_events)
604 .unwrap();
605
606 // Verify no reconfig notification is sent
607 assert!(reconfig_receiver
608 .select_next_some()
609 .now_or_never()
610 .is_none());
611 }
612
613 #[test]
614 fn test_pub_sub_diem_version() {
615 let (subscription, mut reconfig_receiver) =
616 ReconfigSubscription::subscribe_all(
617 "",
618 vec![DiemVersion::CONFIG_ID],
619 vec![],
620 );
621 let (validators, mut block_executor, mut executor_proxy) =
622 bootstrap_genesis_and_set_subscription(
623 subscription,
624 &mut reconfig_receiver,
625 );
626
627 // Create a dummy prologue transaction that will bump the timer, and
628 // update the Diem version
629 let validator_account = validators[0].owner_address;
630 let dummy_txn = create_dummy_transaction(1, validator_account);
631 let allowlist_txn = create_new_update_diem_version_transaction(1);
632
633 // Execute and commit the reconfig block
634 let block = vec![dummy_txn, allowlist_txn];
635 let (reconfig_events, _) =
636 execute_and_commit_block(&mut block_executor, block, 1);
637
638 // Publish the on chain config updates
639 executor_proxy
640 .publish_on_chain_config_updates(reconfig_events)
641 .unwrap();
642
643 // Verify the correct reconfig notification is sent
644 let payload =
645 reconfig_receiver.select_next_some().now_or_never().unwrap();
646 let received_config = payload.get::<DiemVersion>().unwrap();
647 assert_eq!(received_config, DiemVersion { major: 7 });
648 }
649
650 #[test]
651 fn test_pub_sub_with_executor_proxy() {
652 let (subscription, mut reconfig_receiver) =
653 ReconfigSubscription::subscribe_all(
654 "",
655 vec![ValidatorSet::CONFIG_ID, DiemVersion::CONFIG_ID],
656 vec![],
657 );
658 let (validators, mut block_executor, mut executor_proxy) =
659 bootstrap_genesis_and_set_subscription(
660 subscription,
661 &mut reconfig_receiver,
662 );
663
664 // Create a dummy prologue transaction that will bump the timer and
665 // update the Diem version
666 let validator_account = validators[0].owner_address;
667 let dummy_txn_1 = create_dummy_transaction(1, validator_account);
668 let reconfig_txn = create_new_update_diem_version_transaction(1);
669
670 // Execute and commit the reconfig block
671 let block = vec![dummy_txn_1.clone(), reconfig_txn.clone()];
672 let (_, ledger_info_epoch_1) =
673 execute_and_commit_block(&mut block_executor, block, 1);
674
675 // Give the validator some money so it can send a rotation tx, create
676 // another dummy prologue to bump the timer and rotate the
677 // validator's consensus key.
678 let money_txn =
679 create_transfer_to_validator_transaction(validator_account, 2);
680 let dummy_txn_2 = create_dummy_transaction(2, validator_account);
681 let rotation_txn =
682 create_consensus_key_rotation_transaction(&validators[0], 0);
683
684 // Execute and commit the reconfig block
685 let block =
686 vec![money_txn.clone(), dummy_txn_2.clone(), rotation_txn.clone()];
687 let (_, ledger_info_epoch_2) =
688 execute_and_commit_block(&mut block_executor, block, 2);
689
690 // Grab the first two executed transactions and verify responses
691 let txns = executor_proxy.get_chunk(0, 2, 2).unwrap();
692 assert_eq!(txns.transactions, vec![dummy_txn_1, reconfig_txn]);
693 assert!(executor_proxy
694 .execute_chunk(txns, ledger_info_epoch_1.clone(), None)
695 .is_ok());
696 assert_eq!(
697 ledger_info_epoch_1,
698 executor_proxy.get_epoch_change_ledger_info(1).unwrap()
699 );
700 assert_eq!(
701 ledger_info_epoch_1,
702 executor_proxy.get_epoch_ending_ledger_info(2).unwrap()
703 );
704
705 // Grab the next two executed transactions (forced by limit) and verify
706 // responses
707 let txns = executor_proxy.get_chunk(2, 2, 5).unwrap();
708 assert_eq!(txns.transactions, vec![money_txn, dummy_txn_2]);
709 executor_proxy.get_epoch_ending_ledger_info(4).unwrap_err();
710
711 // Grab the last transaction and verify responses
712 let txns = executor_proxy.get_chunk(4, 1, 5).unwrap();
713 assert_eq!(txns.transactions, vec![rotation_txn]);
714 assert!(executor_proxy
715 .execute_chunk(txns, ledger_info_epoch_2.clone(), None)
716 .is_ok());
717 assert_eq!(
718 ledger_info_epoch_2,
719 executor_proxy.get_epoch_change_ledger_info(2).unwrap()
720 );
721 assert_eq!(
722 ledger_info_epoch_2,
723 executor_proxy.get_epoch_ending_ledger_info(5).unwrap()
724 );
725 }
726
727 #[test]
728 fn test_pub_sub_with_executor_sync_state() {
729 let (subscription, mut reconfig_receiver) =
730 ReconfigSubscription::subscribe_all(
731 "",
732 vec![ValidatorSet::CONFIG_ID, DiemVersion::CONFIG_ID],
733 vec![],
734 );
735 let (validators, mut block_executor, executor_proxy) =
736 bootstrap_genesis_and_set_subscription(
737 subscription,
738 &mut reconfig_receiver,
739 );
740
741 // Create a dummy prologue transaction that will bump the timer and
742 // update the Diem version
743 let validator_account = validators[0].owner_address;
744 let dummy_txn = create_dummy_transaction(1, validator_account);
745 let reconfig_txn = create_new_update_diem_version_transaction(1);
746
747 // Execute and commit the reconfig block
748 let block = vec![dummy_txn, reconfig_txn];
749 let _ = execute_and_commit_block(&mut block_executor, block, 1);
750
751 // Verify executor proxy sync state
752 let sync_state = executor_proxy.get_local_storage_state().unwrap();
753 assert_eq!(sync_state.trusted_epoch(), 2); // 1 reconfiguration has occurred, trusted = next
754 assert_eq!(sync_state.committed_version(), 2); // 2 transactions have committed
755 assert_eq!(sync_state.synced_version(), 2); // 2 transactions have synced
756
757 // Give the validator some money so it can send a rotation tx, create
758 // another dummy prologue to bump the timer and rotate the
759 // validator's consensus key.
760 let money_txn =
761 create_transfer_to_validator_transaction(validator_account, 2);
762 let dummy_txn = create_dummy_transaction(2, validator_account);
763 let rotation_txn =
764 create_consensus_key_rotation_transaction(&validators[0], 0);
765
766 // Execute and commit the reconfig block
767 let block = vec![money_txn, dummy_txn, rotation_txn];
768 let _ = execute_and_commit_block(&mut block_executor, block, 2);
769
770 // Verify executor proxy sync state
771 let sync_state = executor_proxy.get_local_storage_state().unwrap();
772 assert_eq!(sync_state.trusted_epoch(), 3); // 2 reconfigurations have occurred, trusted = next
773 assert_eq!(sync_state.committed_version(), 5); // 5 transactions have committed
774 assert_eq!(sync_state.synced_version(), 5); // 5 transactions have synced
775 }
776
777 /// Executes a genesis transaction, creates the executor proxy and sets the
778 /// given reconfig subscription.
779 fn bootstrap_genesis_and_set_subscription(
780 subscription: ReconfigSubscription,
781 reconfig_receiver: &mut Receiver<(), OnChainConfigPayload>,
782 ) -> (Vec<Validator>, Box<Executor<DiemVM>>, ExecutorProxy)
783 {
784 // Generate a genesis change set
785 let (genesis, validators) =
786 vm_genesis::test_genesis_change_set_and_validators(Some(1));
787
788 // Create test diem database
789 let db_path = diem_temppath::TempPath::new();
790 db_path.create_as_dir().unwrap();
791 let (db, db_rw) =
792 DbReaderWriter::wrap(DiemDB::new_for_test(db_path.path()));
793
        // Bootstrap the genesis transaction
795 let genesis_txn =
796 Transaction::GenesisTransaction(WriteSetPayload::Direct(genesis));
797 bootstrap_genesis::<DiemVM>(&db_rw, &genesis_txn).unwrap();
798
799 // Create executor proxy with given subscription
800 let block_executor = Box::new(Executor::<DiemVM>::new(db_rw.clone()));
801 let chunk_executor = Box::new(Executor::<DiemVM>::new(db_rw));
802 let executor_proxy =
803 ExecutorProxy::new(db, chunk_executor, vec![subscription]);
804
805 // Verify initial reconfiguration notification is sent
806 assert!(
807 reconfig_receiver
808 .select_next_some()
809 .now_or_never()
810 .is_some(),
811 "Expected an initial reconfig notification on executor proxy creation!",
812 );
813
814 (validators, block_executor, executor_proxy)
815 }
816
817 /// Creates a transaction that rotates the consensus key of the given
818 /// validator account.
819 fn create_consensus_key_rotation_transaction(
820 validator: &Validator, sequence_number: u64,
821 ) -> Transaction {
822 let operator_key = validator.key.clone();
823 let operator_public_key = operator_key.public_key();
824 let operator_account = validator.operator_address;
825 let new_consensus_key =
826 Ed25519PrivateKey::generate_for_testing().public_key();
827
828 get_test_signed_transaction(
829 operator_account,
830 sequence_number,
831 operator_key,
832 operator_public_key,
833 Some(encode_set_validator_config_and_reconfigure_script(
834 validator.owner_address,
835 new_consensus_key.to_bytes().to_vec(),
836 Vec::new(),
837 Vec::new(),
838 )),
839 )
840 }
841
842 /// Creates a dummy transaction (useful for bumping the timer).
843 fn create_dummy_transaction(
844 index: u8, validator_account: AccountAddress,
845 ) -> Transaction {
846 Transaction::BlockMetadata(BlockMetadata::new(
847 gen_block_id(index),
848 index as u64,
849 (index as u64 + 1) * 100000010,
850 vec![],
851 validator_account,
852 ))
853 }
854
855 /// Creates a transaction that creates a reconfiguration event by changing
856 /// the Diem version
857 fn create_new_update_diem_version_transaction(
858 sequence_number: u64,
859 ) -> Transaction {
860 let genesis_key = vm_genesis::GENESIS_KEYPAIR.0.clone();
861 get_test_signed_transaction(
862 diem_root_address(),
863 sequence_number,
864 genesis_key.clone(),
865 genesis_key.public_key(),
866 Some(encode_update_diem_version_script(
867 0, 7, // version
868 )),
869 )
870 }
871
872 /// Creates a transaction that sends funds to the specified validator
873 /// account.
874 fn create_transfer_to_validator_transaction(
875 validator_account: AccountAddress, sequence_number: u64,
876 ) -> Transaction {
877 let genesis_key = vm_genesis::GENESIS_KEYPAIR.0.clone();
878 get_test_signed_transaction(
879 diem_root_address(),
880 sequence_number,
881 genesis_key.clone(),
882 genesis_key.public_key(),
883 Some(encode_peer_to_peer_with_metadata_script(
884 xus_tag(),
885 validator_account,
886 1_000_000,
887 vec![],
888 vec![],
889 )),
890 )
891 }
892
893 /// Executes and commits a given block that will cause a reconfiguration
894 /// event.
895 fn execute_and_commit_block(
896 block_executor: &mut Box<Executor<DiemVM>>, block: Vec<Transaction>,
897 block_id: u8,
898 ) -> (Vec<ContractEvent>, LedgerInfoWithSignatures)
899 {
900 let block_hash = gen_block_id(block_id);
901
902 // Execute block
903 let output = block_executor
904 .execute_block(
905 (block_hash, block),
906 block_executor.committed_block_id(),
907 )
908 .expect("Failed to execute block!");
909 assert!(
910 output.has_reconfiguration(),
911 "Block execution is missing a reconfiguration!"
912 );
913
914 // Commit block
915 let ledger_info_with_sigs = gen_ledger_info_with_sigs(
916 block_id.into(),
917 output,
918 block_hash,
919 vec![],
920 );
921 let (_, reconfig_events) = block_executor
922 .commit_blocks(vec![block_hash], ledger_info_with_sigs.clone())
923 .unwrap();
924 assert!(
925 !reconfig_events.is_empty(),
926 "Expected reconfig events from block commit!"
927 );
928
929 (reconfig_events, ledger_info_with_sigs)
930 }
931}
932*/