cfxcore/pos/state_sync/
executor_proxy.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8use crate::pos::state_sync::{
9    counters,
10    error::Error,
11    logging::{LogEntry, LogEvent, LogSchema},
12    shared_components::SyncState,
13};
14use diem_logger::prelude::*;
15use diem_types::{
16    contract_event::ContractEvent,
17    ledger_info::LedgerInfoWithSignatures,
18    move_resource::MoveStorage,
19    on_chain_config::{OnChainConfigPayload, ON_CHAIN_CONFIG_REGISTRY},
20    transaction::TransactionListWithProof,
21};
22use executor_types::{ChunkExecutor, ExecutedTrees};
23use itertools::Itertools;
24use std::{collections::HashSet, sync::Arc};
25use storage_interface::DbReader;
26use subscription_service::ReconfigSubscription;
27
28/// Proxies interactions with execution and storage for state synchronization
29pub trait ExecutorProxyTrait: Send {
30    /// Sync the local state with the latest in storage.
31    fn get_local_storage_state(&self) -> Result<SyncState, Error>;
32
33    /// Execute and commit a batch of transactions
34    fn execute_chunk(
35        &mut self, txn_list_with_proof: TransactionListWithProof,
36        verified_target_li: LedgerInfoWithSignatures,
37        intermediate_end_of_epoch_li: Option<LedgerInfoWithSignatures>,
38    ) -> Result<(), Error>;
39
40    /// Gets chunk of transactions given the known version, target version and
41    /// the max limit.
42    fn get_chunk(
43        &self, known_version: u64, limit: u64, target_version: u64,
44    ) -> Result<TransactionListWithProof, Error>;
45
46    /// Get the epoch changing ledger info for the given epoch so that we can
47    /// move to next epoch.
48    fn get_epoch_change_ledger_info(
49        &self, epoch: u64,
50    ) -> Result<LedgerInfoWithSignatures, Error>;
51
52    /// Get ledger info at an epoch boundary version.
53    fn get_epoch_ending_ledger_info(
54        &self, version: u64,
55    ) -> Result<LedgerInfoWithSignatures, Error>;
56
57    /// Returns the ledger's timestamp for the given version in microseconds
58    fn get_version_timestamp(&self, version: u64) -> Result<u64, Error>;
59
60    /// publishes on-chain config updates to subscribed components
61    fn publish_on_chain_config_updates(
62        &mut self, events: Vec<ContractEvent>,
63    ) -> Result<(), Error>;
64}
65
66pub(crate) struct ExecutorProxy {
67    storage: Arc<dyn DbReader>,
68    executor: Box<dyn ChunkExecutor>,
69    reconfig_subscriptions: Vec<ReconfigSubscription>,
70    on_chain_configs: OnChainConfigPayload,
71}
72
73impl ExecutorProxy {
74    pub(crate) fn new(
75        storage: Arc<dyn DbReader>, executor: Box<dyn ChunkExecutor>,
76        mut reconfig_subscriptions: Vec<ReconfigSubscription>,
77    ) -> Self {
78        // TODO(lpl): Double check the `None` case here.
79        let on_chain_configs = if let Ok(Some(startup_info)) =
80            storage.get_startup_info(false)
81        {
82            let epoch_state = startup_info.latest_epoch_state.or(startup_info
83                .latest_ledger_info
84                .ledger_info()
85                .next_epoch_state()
86                .cloned());
87            if let Some(epoch_state) = epoch_state {
88                OnChainConfigPayload::new(
89                    epoch_state.epoch,
90                    Arc::new(
91                        ON_CHAIN_CONFIG_REGISTRY
92                            .iter()
93                            .cloned()
94                            .zip_eq(vec![bcs::to_bytes(&epoch_state).unwrap()])
95                            .collect(),
96                    ),
97                )
98            } else {
99                Self::fetch_all_configs(&*storage).expect(
100                    "[state sync] Failed initial read of on-chain configs",
101                )
102            }
103        } else {
104            Self::fetch_all_configs(&*storage)
105                .expect("[state sync] Failed initial read of on-chain configs")
106        };
107
108        for subscription in reconfig_subscriptions.iter_mut() {
109            subscription.publish(on_chain_configs.clone()).expect(
110                "[state sync] Failed to publish initial on-chain config",
111            );
112        }
113        Self {
114            storage,
115            executor,
116            reconfig_subscriptions,
117            on_chain_configs,
118        }
119    }
120
121    fn fetch_all_configs(
122        storage: &dyn DbReader,
123    ) -> Result<OnChainConfigPayload, Error> {
124        let access_paths = ON_CHAIN_CONFIG_REGISTRY
125            .iter()
126            .map(|config_id| config_id.access_path())
127            .collect();
128        let configs = storage
129            .batch_fetch_resources_by_version(access_paths, 0)
130            .map_err(|error| {
131                Error::UnexpectedError(format!(
132                    "Failed batch fetch of resources: {}",
133                    error
134                ))
135            })?;
136        // TODO(linxi): get correct epoch
137        // let synced_version =
138        //     storage.fetch_synced_version().map_err(|error| {
139        //         Error::UnexpectedError(format!(
140        //             "Failed to fetch storage synced version: {}",
141        //             error
142        //         ))
143        //     })?;
144        //
145        // let account_state_blob = storage
146        //     .get_account_state_with_proof_by_version(
147        //         config_address(),
148        //         synced_version,
149        //     )
150        //     .map_err(|error| {
151        //         Error::UnexpectedError(format!(
152        //             "Failed to fetch account state with proof {}",
153        //             error
154        //         ))
155        //     })?
156        //     .0;
157        /*let epoch = account_state_blob
158        .map(|blob| {
159            AccountState::try_from(&blob).and_then(|state| {
160                Ok(state
161                    .get_configuration_resource()?
162                    .ok_or_else(|| {
163                        Error::UnexpectedError(
164                            "Configuration resource does not exist".into(),
165                        )
166                    })?
167                    .epoch())
168            })
169        })
170        .ok_or_else(|| {
171            Error::UnexpectedError("Missing account state blob".into())
172        })?
173        .map_err(|error| {
174            Error::UnexpectedError(format!(
175                "Failed to fetch configuration resource: {}",
176                error
177            ))
178        })?;*/
179
180        Ok(OnChainConfigPayload::new(
181            1, /* The epoch number after executing genesis block */
182            Arc::new(
183                ON_CHAIN_CONFIG_REGISTRY
184                    .iter()
185                    .cloned()
186                    .zip_eq(configs)
187                    .collect(),
188            ),
189        ))
190    }
191}
192
193impl ExecutorProxyTrait for ExecutorProxy {
194    fn get_local_storage_state(&self) -> Result<SyncState, Error> {
195        let storage_info =
196            self.storage.get_startup_info(false).map_err(|error| {
197                Error::UnexpectedError(format!(
198                    "Failed to get startup info from storage: {}",
199                    error
200                ))
201            })?;
202        let storage_info = storage_info.ok_or_else(|| {
203            Error::UnexpectedError("Missing startup info from storage".into())
204        })?;
205        let current_epoch_state = storage_info.get_epoch_state().clone();
206
207        let synced_trees =
208            if let Some(synced_tree_state) = storage_info.synced_tree_state {
209                // TODO(lpl): synced_tree_state.pos_state is left unhandled.
210                ExecutedTrees::from(synced_tree_state)
211            } else {
212                ExecutedTrees::new_with_pos_state(
213                    storage_info.committed_tree_state,
214                    storage_info.committed_pos_state,
215                )
216            };
217
218        Ok(SyncState::new(
219            storage_info.latest_ledger_info,
220            synced_trees,
221            current_epoch_state,
222        ))
223    }
224
225    fn execute_chunk(
226        &mut self, txn_list_with_proof: TransactionListWithProof,
227        verified_target_li: LedgerInfoWithSignatures,
228        intermediate_end_of_epoch_li: Option<LedgerInfoWithSignatures>,
229    ) -> Result<(), Error> {
230        // track chunk execution time
231        let timer = counters::EXECUTE_CHUNK_DURATION.start_timer();
232        let reconfig_events = self
233            .executor
234            .execute_and_commit_chunk(
235                txn_list_with_proof,
236                verified_target_li,
237                intermediate_end_of_epoch_li,
238            )
239            .map_err(|error| {
240                Error::UnexpectedError(format!(
241                    "Execute and commit chunk failed: {}",
242                    error
243                ))
244            })?;
245        timer.stop_and_record();
246        if let Err(e) = self.publish_on_chain_config_updates(reconfig_events) {
247            diem_error!(
248                LogSchema::event_log(LogEntry::Reconfig, LogEvent::Fail)
249                    .error(&e),
250                "Failed to publish reconfig updates in execute_chunk"
251            );
252            counters::RECONFIG_PUBLISH_COUNT
253                .with_label_values(&[counters::FAIL_LABEL])
254                .inc();
255        }
256        Ok(())
257    }
258
259    fn get_chunk(
260        &self, known_version: u64, limit: u64, target_version: u64,
261    ) -> Result<TransactionListWithProof, Error> {
262        let starting_version =
263            known_version.checked_add(1).ok_or_else(|| {
264                Error::IntegerOverflow("Starting version has overflown!".into())
265            })?;
266        self.storage
267            .get_transactions(starting_version, limit, target_version, false)
268            .map_err(|error| {
269                Error::UnexpectedError(format!(
270                    "Failed to get transactions from storage {}",
271                    error
272                ))
273            })
274    }
275
276    fn get_epoch_change_ledger_info(
277        &self, epoch: u64,
278    ) -> Result<LedgerInfoWithSignatures, Error> {
279        let next_epoch = epoch.checked_add(1).ok_or_else(|| {
280            Error::IntegerOverflow("Next epoch has overflown!".into())
281        })?;
282        let mut epoch_ending_ledger_infos = self
283            .storage
284            .get_epoch_ending_ledger_infos(epoch, next_epoch)
285            .map_err(|error| Error::UnexpectedError(error.to_string()))?;
286
287        epoch_ending_ledger_infos
288            .ledger_info_with_sigs
289            .pop()
290            .ok_or_else(|| {
291                Error::UnexpectedError(format!(
292                    "Missing epoch change ledger info for epoch: {:?}",
293                    epoch
294                ))
295            })
296    }
297
298    fn get_epoch_ending_ledger_info(
299        &self, version: u64,
300    ) -> Result<LedgerInfoWithSignatures, Error> {
301        self.storage
302            .get_epoch_ending_ledger_info(version)
303            .map_err(|error| Error::UnexpectedError(error.to_string()))
304    }
305
306    fn get_version_timestamp(&self, version: u64) -> Result<u64, Error> {
307        self.storage
308            .get_block_timestamp(version)
309            .map_err(|error| Error::UnexpectedError(error.to_string()))
310    }
311
312    fn publish_on_chain_config_updates(
313        &mut self, events: Vec<ContractEvent>,
314    ) -> Result<(), Error> {
315        if events.is_empty() {
316            return Ok(());
317        }
318        diem_info!(LogSchema::new(LogEntry::Reconfig)
319            .count(events.len())
320            .reconfig_events(events.clone()));
321
322        let event_keys = events
323            .iter()
324            .map(|event| *event.key())
325            .collect::<HashSet<_>>();
326
327        // calculate deltas
328        let new_configs = OnChainConfigPayload::new(
329            1, /* not used */
330            Arc::new(
331                ON_CHAIN_CONFIG_REGISTRY
332                    .iter()
333                    .cloned()
334                    .zip_eq(vec![events[0].event_data().to_vec()])
335                    .collect(),
336            ),
337        );
338        diem_debug!("get {} configs", new_configs.configs().len());
339
340        let changed_configs = new_configs
341            .configs()
342            .iter()
343            .filter(|(id, cfg)| {
344                &self.on_chain_configs.configs().get(id).unwrap_or_else(|| {
345                    panic!(
346                        "Missing on-chain config value in local copy: {}",
347                        id
348                    )
349                }) != cfg
350            })
351            .map(|(id, _)| *id)
352            .collect::<HashSet<_>>();
353
354        // notify subscribers
355        let mut publish_success = true;
356        for subscription in self.reconfig_subscriptions.iter_mut() {
357            // publish updates if *any* of the subscribed configs changed
358            // or any of the subscribed events were emitted
359            let subscribed_items = subscription.subscribed_items();
360            if !changed_configs.is_disjoint(&subscribed_items.configs)
361                || !event_keys.is_disjoint(&subscribed_items.events)
362            {
363                diem_debug!("publish {} configs", new_configs.configs().len());
364                if let Err(e) = subscription.publish(new_configs.clone()) {
365                    publish_success = false;
366                    diem_error!(
367                        LogSchema::event_log(LogEntry::Reconfig, LogEvent::PublishError)
368                            .subscription_name(subscription.name.clone())
369                            .error(&Error::UnexpectedError(e.to_string())),
370                        "Failed to publish reconfig notification to subscription {}",
371                        subscription.name
372                    );
373                } else {
374                    diem_info!(
375                        LogSchema::event_log(LogEntry::Reconfig, LogEvent::Success)
376                            .subscription_name(subscription.name.clone()),
377                        "Successfully published reconfig notification to subscription {}",
378                        subscription.name
379                    );
380                }
381            }
382        }
383
384        self.on_chain_configs = new_configs;
385        if publish_success {
386            counters::RECONFIG_PUBLISH_COUNT
387                .with_label_values(&[counters::SUCCESS_LABEL])
388                .inc();
389            Ok(())
390        } else {
391            Err(Error::UnexpectedError(
392                "Failed to publish at least one subscription!".into(),
393            ))
394        }
395    }
396}
397
398/*
399#[cfg(test)]
400mod tests {
401    use super::*;
402    use channel::diem_channel::Receiver;
403    use diem_crypto::{ed25519::*, PrivateKey, Uniform};
404    use diem_transaction_builder::stdlib::{
405        encode_peer_to_peer_with_metadata_script,
406        encode_set_validator_config_and_reconfigure_script,
407        encode_update_diem_version_script,
408    };
409    use diem_types::{
410        account_address::AccountAddress,
411        account_config::{diem_root_address, xus_tag},
412        block_metadata::BlockMetadata,
413        contract_event::ContractEvent,
414        ledger_info::LedgerInfoWithSignatures,
415        on_chain_config::{
416            DiemVersion, OnChainConfig, OnChainConfigPayload, VMConfig,
417            ValidatorSet,
418        },
419        transaction::{Transaction, WriteSetPayload},
420    };
421    use diem_vm::DiemVM;
422    use pos-ledger-db::DiemDB;
423    use executor::Executor;
424    use executor_test_helpers::{
425        bootstrap_genesis, gen_block_id, gen_ledger_info_with_sigs,
426        get_test_signed_transaction,
427    };
428    use executor_types::BlockExecutor;
429    use futures::{future::FutureExt, stream::StreamExt};
430    use storage_interface::DbReaderWriter;
431    use subscription_service::ReconfigSubscription;
432    use vm_genesis::Validator;
433
434    // TODO(joshlind): add unit tests for general executor proxy behaviour!
435    // TODO(joshlind): add unit tests for subscription events.. seems like these
436    // are missing?
437
438    #[test]
439    fn test_pub_sub_different_subscription() {
440        let (subscription, mut reconfig_receiver) =
441            ReconfigSubscription::subscribe_all(
442                "",
443                vec![VMConfig::CONFIG_ID],
444                vec![],
445            );
446        let (validators, mut block_executor, mut executor_proxy) =
447            bootstrap_genesis_and_set_subscription(
448                subscription,
449                &mut reconfig_receiver,
450            );
451
452        // Create a dummy prologue transaction that will bump the timer, and
453        // update the validator set
454        let validator_account = validators[0].owner_address;
455        let dummy_txn = create_dummy_transaction(1, validator_account);
456        let reconfig_txn = create_new_update_diem_version_transaction(1);
457
458        // Execute and commit the block
459        let block = vec![dummy_txn, reconfig_txn];
460        let (reconfig_events, _) =
461            execute_and_commit_block(&mut block_executor, block, 1);
462
463        // Publish the on chain config updates
464        executor_proxy
465            .publish_on_chain_config_updates(reconfig_events)
466            .unwrap();
467
468        // Verify no reconfig notification is sent (we only subscribed to
469        // VMConfig)
470        assert!(reconfig_receiver
471            .select_next_some()
472            .now_or_never()
473            .is_none());
474    }
475
476    #[test]
477    fn test_pub_sub_drop_receiver() {
478        let (subscription, mut reconfig_receiver) =
479            ReconfigSubscription::subscribe_all(
480                "",
481                vec![DiemVersion::CONFIG_ID],
482                vec![],
483            );
484        let (validators, mut block_executor, mut executor_proxy) =
485            bootstrap_genesis_and_set_subscription(
486                subscription,
487                &mut reconfig_receiver,
488            );
489
490        // Create a dummy prologue transaction that will bump the timer, and
491        // update the Diem version
492        let validator_account = validators[0].owner_address;
493        let dummy_txn = create_dummy_transaction(1, validator_account);
494        let reconfig_txn = create_new_update_diem_version_transaction(1);
495
496        // Execute and commit the reconfig block
497        let block = vec![dummy_txn, reconfig_txn];
498        let (reconfig_events, _) =
499            execute_and_commit_block(&mut block_executor, block, 1);
500
501        // Drop the reconfig receiver
502        drop(reconfig_receiver);
503
504        // Verify publishing on-chain config updates fails due to dropped
505        // receiver
506        assert!(executor_proxy
507            .publish_on_chain_config_updates(reconfig_events)
508            .is_err());
509    }
510
511    #[test]
512    fn test_pub_sub_multiple_subscriptions() {
513        let (subscription, mut reconfig_receiver) =
514            ReconfigSubscription::subscribe_all(
515                "",
516                vec![ValidatorSet::CONFIG_ID, DiemVersion::CONFIG_ID],
517                vec![],
518            );
519        let (validators, mut block_executor, mut executor_proxy) =
520            bootstrap_genesis_and_set_subscription(
521                subscription,
522                &mut reconfig_receiver,
523            );
524
525        // Create a dummy prologue transaction that will bump the timer, and
526        // update the Diem version
527        let validator_account = validators[0].owner_address;
528        let dummy_txn = create_dummy_transaction(1, validator_account);
529        let reconfig_txn = create_new_update_diem_version_transaction(1);
530
531        // Give the validator some money so it can send a rotation tx and rotate
532        // the validator's consensus key.
533        let money_txn =
534            create_transfer_to_validator_transaction(validator_account, 2);
535        let rotation_txn =
536            create_consensus_key_rotation_transaction(&validators[0], 0);
537
538        // Execute and commit the reconfig block
539        let block = vec![dummy_txn, reconfig_txn, money_txn, rotation_txn];
540        let (reconfig_events, _) =
541            execute_and_commit_block(&mut block_executor, block, 1);
542
543        // Publish the on chain config updates
544        executor_proxy
545            .publish_on_chain_config_updates(reconfig_events)
546            .unwrap();
547
548        // Verify reconfig notification is sent
549        assert!(reconfig_receiver
550            .select_next_some()
551            .now_or_never()
552            .is_some());
553    }
554
555    #[test]
556    fn test_pub_sub_no_reconfig_events() {
557        let (subscription, mut reconfig_receiver) =
558            ReconfigSubscription::subscribe_all(
559                "",
560                vec![DiemVersion::CONFIG_ID],
561                vec![],
562            );
563        let (_, _, mut executor_proxy) = bootstrap_genesis_and_set_subscription(
564            subscription,
565            &mut reconfig_receiver,
566        );
567
568        // Publish no on chain config updates
569        executor_proxy
570            .publish_on_chain_config_updates(vec![])
571            .unwrap();
572
573        // Verify no reconfig notification is sent
574        assert!(reconfig_receiver
575            .select_next_some()
576            .now_or_never()
577            .is_none());
578    }
579
580    #[test]
581    fn test_pub_sub_no_subscriptions() {
582        let (subscription, mut reconfig_receiver) =
583            ReconfigSubscription::subscribe_all("", vec![], vec![]);
584        let (validators, mut block_executor, mut executor_proxy) =
585            bootstrap_genesis_and_set_subscription(
586                subscription,
587                &mut reconfig_receiver,
588            );
589
590        // Create a dummy prologue transaction that will bump the timer, and
591        // update the Diem version
592        let validator_account = validators[0].owner_address;
593        let dummy_txn = create_dummy_transaction(1, validator_account);
594        let reconfig_txn = create_new_update_diem_version_transaction(1);
595
596        // Execute and commit the reconfig block
597        let block = vec![dummy_txn, reconfig_txn];
598        let (reconfig_events, _) =
599            execute_and_commit_block(&mut block_executor, block, 1);
600
601        // Publish the on chain config updates
602        executor_proxy
603            .publish_on_chain_config_updates(reconfig_events)
604            .unwrap();
605
606        // Verify no reconfig notification is sent
607        assert!(reconfig_receiver
608            .select_next_some()
609            .now_or_never()
610            .is_none());
611    }
612
613    #[test]
614    fn test_pub_sub_diem_version() {
615        let (subscription, mut reconfig_receiver) =
616            ReconfigSubscription::subscribe_all(
617                "",
618                vec![DiemVersion::CONFIG_ID],
619                vec![],
620            );
621        let (validators, mut block_executor, mut executor_proxy) =
622            bootstrap_genesis_and_set_subscription(
623                subscription,
624                &mut reconfig_receiver,
625            );
626
627        // Create a dummy prologue transaction that will bump the timer, and
628        // update the Diem version
629        let validator_account = validators[0].owner_address;
630        let dummy_txn = create_dummy_transaction(1, validator_account);
631        let allowlist_txn = create_new_update_diem_version_transaction(1);
632
633        // Execute and commit the reconfig block
634        let block = vec![dummy_txn, allowlist_txn];
635        let (reconfig_events, _) =
636            execute_and_commit_block(&mut block_executor, block, 1);
637
638        // Publish the on chain config updates
639        executor_proxy
640            .publish_on_chain_config_updates(reconfig_events)
641            .unwrap();
642
643        // Verify the correct reconfig notification is sent
644        let payload =
645            reconfig_receiver.select_next_some().now_or_never().unwrap();
646        let received_config = payload.get::<DiemVersion>().unwrap();
647        assert_eq!(received_config, DiemVersion { major: 7 });
648    }
649
650    #[test]
651    fn test_pub_sub_with_executor_proxy() {
652        let (subscription, mut reconfig_receiver) =
653            ReconfigSubscription::subscribe_all(
654                "",
655                vec![ValidatorSet::CONFIG_ID, DiemVersion::CONFIG_ID],
656                vec![],
657            );
658        let (validators, mut block_executor, mut executor_proxy) =
659            bootstrap_genesis_and_set_subscription(
660                subscription,
661                &mut reconfig_receiver,
662            );
663
664        // Create a dummy prologue transaction that will bump the timer and
665        // update the Diem version
666        let validator_account = validators[0].owner_address;
667        let dummy_txn_1 = create_dummy_transaction(1, validator_account);
668        let reconfig_txn = create_new_update_diem_version_transaction(1);
669
670        // Execute and commit the reconfig block
671        let block = vec![dummy_txn_1.clone(), reconfig_txn.clone()];
672        let (_, ledger_info_epoch_1) =
673            execute_and_commit_block(&mut block_executor, block, 1);
674
675        // Give the validator some money so it can send a rotation tx, create
676        // another dummy prologue to bump the timer and rotate the
677        // validator's consensus key.
678        let money_txn =
679            create_transfer_to_validator_transaction(validator_account, 2);
680        let dummy_txn_2 = create_dummy_transaction(2, validator_account);
681        let rotation_txn =
682            create_consensus_key_rotation_transaction(&validators[0], 0);
683
684        // Execute and commit the reconfig block
685        let block =
686            vec![money_txn.clone(), dummy_txn_2.clone(), rotation_txn.clone()];
687        let (_, ledger_info_epoch_2) =
688            execute_and_commit_block(&mut block_executor, block, 2);
689
690        // Grab the first two executed transactions and verify responses
691        let txns = executor_proxy.get_chunk(0, 2, 2).unwrap();
692        assert_eq!(txns.transactions, vec![dummy_txn_1, reconfig_txn]);
693        assert!(executor_proxy
694            .execute_chunk(txns, ledger_info_epoch_1.clone(), None)
695            .is_ok());
696        assert_eq!(
697            ledger_info_epoch_1,
698            executor_proxy.get_epoch_change_ledger_info(1).unwrap()
699        );
700        assert_eq!(
701            ledger_info_epoch_1,
702            executor_proxy.get_epoch_ending_ledger_info(2).unwrap()
703        );
704
705        // Grab the next two executed transactions (forced by limit) and verify
706        // responses
707        let txns = executor_proxy.get_chunk(2, 2, 5).unwrap();
708        assert_eq!(txns.transactions, vec![money_txn, dummy_txn_2]);
709        executor_proxy.get_epoch_ending_ledger_info(4).unwrap_err();
710
711        // Grab the last transaction and verify responses
712        let txns = executor_proxy.get_chunk(4, 1, 5).unwrap();
713        assert_eq!(txns.transactions, vec![rotation_txn]);
714        assert!(executor_proxy
715            .execute_chunk(txns, ledger_info_epoch_2.clone(), None)
716            .is_ok());
717        assert_eq!(
718            ledger_info_epoch_2,
719            executor_proxy.get_epoch_change_ledger_info(2).unwrap()
720        );
721        assert_eq!(
722            ledger_info_epoch_2,
723            executor_proxy.get_epoch_ending_ledger_info(5).unwrap()
724        );
725    }
726
727    #[test]
728    fn test_pub_sub_with_executor_sync_state() {
729        let (subscription, mut reconfig_receiver) =
730            ReconfigSubscription::subscribe_all(
731                "",
732                vec![ValidatorSet::CONFIG_ID, DiemVersion::CONFIG_ID],
733                vec![],
734            );
735        let (validators, mut block_executor, executor_proxy) =
736            bootstrap_genesis_and_set_subscription(
737                subscription,
738                &mut reconfig_receiver,
739            );
740
741        // Create a dummy prologue transaction that will bump the timer and
742        // update the Diem version
743        let validator_account = validators[0].owner_address;
744        let dummy_txn = create_dummy_transaction(1, validator_account);
745        let reconfig_txn = create_new_update_diem_version_transaction(1);
746
747        // Execute and commit the reconfig block
748        let block = vec![dummy_txn, reconfig_txn];
749        let _ = execute_and_commit_block(&mut block_executor, block, 1);
750
751        // Verify executor proxy sync state
752        let sync_state = executor_proxy.get_local_storage_state().unwrap();
753        assert_eq!(sync_state.trusted_epoch(), 2); // 1 reconfiguration has occurred, trusted = next
754        assert_eq!(sync_state.committed_version(), 2); // 2 transactions have committed
755        assert_eq!(sync_state.synced_version(), 2); // 2 transactions have synced
756
757        // Give the validator some money so it can send a rotation tx, create
758        // another dummy prologue to bump the timer and rotate the
759        // validator's consensus key.
760        let money_txn =
761            create_transfer_to_validator_transaction(validator_account, 2);
762        let dummy_txn = create_dummy_transaction(2, validator_account);
763        let rotation_txn =
764            create_consensus_key_rotation_transaction(&validators[0], 0);
765
766        // Execute and commit the reconfig block
767        let block = vec![money_txn, dummy_txn, rotation_txn];
768        let _ = execute_and_commit_block(&mut block_executor, block, 2);
769
770        // Verify executor proxy sync state
771        let sync_state = executor_proxy.get_local_storage_state().unwrap();
772        assert_eq!(sync_state.trusted_epoch(), 3); // 2 reconfigurations have occurred, trusted = next
773        assert_eq!(sync_state.committed_version(), 5); // 5 transactions have committed
774        assert_eq!(sync_state.synced_version(), 5); // 5 transactions have synced
775    }
776
777    /// Executes a genesis transaction, creates the executor proxy and sets the
778    /// given reconfig subscription.
779    fn bootstrap_genesis_and_set_subscription(
780        subscription: ReconfigSubscription,
781        reconfig_receiver: &mut Receiver<(), OnChainConfigPayload>,
782    ) -> (Vec<Validator>, Box<Executor<DiemVM>>, ExecutorProxy)
783    {
784        // Generate a genesis change set
785        let (genesis, validators) =
786            vm_genesis::test_genesis_change_set_and_validators(Some(1));
787
788        // Create test diem database
789        let db_path = diem_temppath::TempPath::new();
790        db_path.create_as_dir().unwrap();
791        let (db, db_rw) =
792            DbReaderWriter::wrap(DiemDB::new_for_test(db_path.path()));
793
794        // Boostrap the genesis transaction
795        let genesis_txn =
796            Transaction::GenesisTransaction(WriteSetPayload::Direct(genesis));
797        bootstrap_genesis::<DiemVM>(&db_rw, &genesis_txn).unwrap();
798
799        // Create executor proxy with given subscription
800        let block_executor = Box::new(Executor::<DiemVM>::new(db_rw.clone()));
801        let chunk_executor = Box::new(Executor::<DiemVM>::new(db_rw));
802        let executor_proxy =
803            ExecutorProxy::new(db, chunk_executor, vec![subscription]);
804
805        // Verify initial reconfiguration notification is sent
806        assert!(
807            reconfig_receiver
808                .select_next_some()
809                .now_or_never()
810                .is_some(),
811            "Expected an initial reconfig notification on executor proxy creation!",
812        );
813
814        (validators, block_executor, executor_proxy)
815    }
816
817    /// Creates a transaction that rotates the consensus key of the given
818    /// validator account.
819    fn create_consensus_key_rotation_transaction(
820        validator: &Validator, sequence_number: u64,
821    ) -> Transaction {
822        let operator_key = validator.key.clone();
823        let operator_public_key = operator_key.public_key();
824        let operator_account = validator.operator_address;
825        let new_consensus_key =
826            Ed25519PrivateKey::generate_for_testing().public_key();
827
828        get_test_signed_transaction(
829            operator_account,
830            sequence_number,
831            operator_key,
832            operator_public_key,
833            Some(encode_set_validator_config_and_reconfigure_script(
834                validator.owner_address,
835                new_consensus_key.to_bytes().to_vec(),
836                Vec::new(),
837                Vec::new(),
838            )),
839        )
840    }
841
842    /// Creates a dummy transaction (useful for bumping the timer).
843    fn create_dummy_transaction(
844        index: u8, validator_account: AccountAddress,
845    ) -> Transaction {
846        Transaction::BlockMetadata(BlockMetadata::new(
847            gen_block_id(index),
848            index as u64,
849            (index as u64 + 1) * 100000010,
850            vec![],
851            validator_account,
852        ))
853    }
854
855    /// Creates a transaction that creates a reconfiguration event by changing
856    /// the Diem version
857    fn create_new_update_diem_version_transaction(
858        sequence_number: u64,
859    ) -> Transaction {
860        let genesis_key = vm_genesis::GENESIS_KEYPAIR.0.clone();
861        get_test_signed_transaction(
862            diem_root_address(),
863            sequence_number,
864            genesis_key.clone(),
865            genesis_key.public_key(),
866            Some(encode_update_diem_version_script(
867                0, 7, // version
868            )),
869        )
870    }
871
872    /// Creates a transaction that sends funds to the specified validator
873    /// account.
874    fn create_transfer_to_validator_transaction(
875        validator_account: AccountAddress, sequence_number: u64,
876    ) -> Transaction {
877        let genesis_key = vm_genesis::GENESIS_KEYPAIR.0.clone();
878        get_test_signed_transaction(
879            diem_root_address(),
880            sequence_number,
881            genesis_key.clone(),
882            genesis_key.public_key(),
883            Some(encode_peer_to_peer_with_metadata_script(
884                xus_tag(),
885                validator_account,
886                1_000_000,
887                vec![],
888                vec![],
889            )),
890        )
891    }
892
893    /// Executes and commits a given block that will cause a reconfiguration
894    /// event.
895    fn execute_and_commit_block(
896        block_executor: &mut Box<Executor<DiemVM>>, block: Vec<Transaction>,
897        block_id: u8,
898    ) -> (Vec<ContractEvent>, LedgerInfoWithSignatures)
899    {
900        let block_hash = gen_block_id(block_id);
901
902        // Execute block
903        let output = block_executor
904            .execute_block(
905                (block_hash, block),
906                block_executor.committed_block_id(),
907            )
908            .expect("Failed to execute block!");
909        assert!(
910            output.has_reconfiguration(),
911            "Block execution is missing a reconfiguration!"
912        );
913
914        // Commit block
915        let ledger_info_with_sigs = gen_ledger_info_with_sigs(
916            block_id.into(),
917            output,
918            block_hash,
919            vec![],
920        );
921        let (_, reconfig_events) = block_executor
922            .commit_blocks(vec![block_hash], ledger_info_with_sigs.clone())
923            .unwrap();
924        assert!(
925            !reconfig_events.is_empty(),
926            "Expected reconfig events from block commit!"
927        );
928
929        (reconfig_events, ledger_info_with_sigs)
930    }
931}
932*/