safety_rules/
safety_rules_manager.rs1use crate::{
9 local_client::LocalClient,
10 persistent_safety_storage::PersistentSafetyStorage,
11 process::ProcessService,
12 remote_service::RemoteService,
13 serializer::{SerializerClient, SerializerService},
14 thread::ThreadService,
15 SafetyRules, TSafetyRules,
16};
17use diem_config::config::{SafetyRulesConfig, SafetyRulesService};
18use diem_infallible::RwLock;
19use diem_logger::prelude::*;
20use diem_secure_storage::{KVStorage, Storage};
21use diem_types::{
22 account_address::AccountAddress, validator_config::ConsensusVRFPrivateKey,
23};
24use std::{convert::TryInto, net::SocketAddr, sync::Arc};
25
26pub fn storage(config: &SafetyRulesConfig) -> PersistentSafetyStorage {
27 let backend = &config.backend;
28 let internal_storage: Storage =
29 backend.try_into().expect("Unable to initialize storage");
30 if let Err(error) = internal_storage.available() {
31 panic!("Storage is not available: {:?}", error);
32 }
33
34 if let Some(test_config) = &config.test {
35 let author = test_config.author;
36 let consensus_private_key = test_config
37 .consensus_key
38 .as_ref()
39 .expect("Missing consensus key in test config")
40 .private_key();
41 let waypoint = test_config.waypoint.expect("No waypoint in config");
42 diem_debug!(
43 "safety_rules_manager: backed={:?}, waypoint={:?}",
44 config.backend,
45 waypoint
46 );
47
48 PersistentSafetyStorage::initialize(
49 internal_storage,
50 author,
51 consensus_private_key,
52 waypoint,
53 config.enable_cached_safety_data,
54 )
55 } else {
56 panic!("Remote consensus key storage not supported!")
57 }
58}
59
60enum SafetyRulesWrapper {
61 Local(Arc<RwLock<SafetyRules>>),
62 Process(ProcessService),
63 Serializer(Arc<RwLock<SerializerService>>),
64 Thread(ThreadService),
65}
66
67pub struct SafetyRulesManager {
68 internal_safety_rules: SafetyRulesWrapper,
69}
70
71impl SafetyRulesManager {
72 pub fn new(config: &SafetyRulesConfig) -> Self {
73 if let SafetyRulesService::Process(conf) = &config.service {
74 return Self::new_process(
75 conf.server_address(),
76 config.network_timeout_ms,
77 );
78 }
79
80 let storage = storage(config);
81 let verify_vote_proposal_signature =
82 config.verify_vote_proposal_signature;
83 let export_consensus_key = config.export_consensus_key;
84 let author = config.test.as_ref().map(|c| c.author).unwrap_or_default();
85 match config.service {
86 SafetyRulesService::Local => Self::new_local(
87 storage,
88 verify_vote_proposal_signature,
89 export_consensus_key,
90 config.vrf_private_key.as_ref().map(|key| key.private_key()),
91 author,
92 ),
93 SafetyRulesService::Serializer => Self::new_serializer(
94 storage,
95 verify_vote_proposal_signature,
96 export_consensus_key,
97 author,
98 ),
99 SafetyRulesService::Thread => Self::new_thread(
100 storage,
101 verify_vote_proposal_signature,
102 export_consensus_key,
103 config.network_timeout_ms,
104 ),
105 _ => {
106 panic!("Unimplemented SafetyRulesService: {:?}", config.service)
107 }
108 }
109 }
110
111 pub fn new_local(
112 storage: PersistentSafetyStorage, verify_vote_proposal_signature: bool,
113 export_consensus_key: bool,
114 vrf_private_key: Option<ConsensusVRFPrivateKey>,
115 author: AccountAddress,
116 ) -> Self {
117 let safety_rules = SafetyRules::new(
118 storage,
119 verify_vote_proposal_signature,
120 export_consensus_key,
121 vrf_private_key,
122 author,
123 );
124 Self {
125 internal_safety_rules: SafetyRulesWrapper::Local(Arc::new(
126 RwLock::new(safety_rules),
127 )),
128 }
129 }
130
131 pub fn new_process(server_addr: SocketAddr, timeout_ms: u64) -> Self {
132 let process_service = ProcessService::new(server_addr, timeout_ms);
133 Self {
134 internal_safety_rules: SafetyRulesWrapper::Process(process_service),
135 }
136 }
137
138 pub fn new_serializer(
139 storage: PersistentSafetyStorage, verify_vote_proposal_signature: bool,
140 export_consensus_key: bool, author: AccountAddress,
141 ) -> Self {
142 let safety_rules = SafetyRules::new(
143 storage,
144 verify_vote_proposal_signature,
145 export_consensus_key,
146 None,
148 author,
149 );
150 let serializer_service = SerializerService::new(safety_rules);
151 Self {
152 internal_safety_rules: SafetyRulesWrapper::Serializer(Arc::new(
153 RwLock::new(serializer_service),
154 )),
155 }
156 }
157
158 pub fn new_thread(
159 storage: PersistentSafetyStorage, verify_vote_proposal_signature: bool,
160 export_consensus_key: bool, timeout_ms: u64,
161 ) -> Self {
162 let thread = ThreadService::new(
163 storage,
164 verify_vote_proposal_signature,
165 export_consensus_key,
166 timeout_ms,
167 );
168 Self {
169 internal_safety_rules: SafetyRulesWrapper::Thread(thread),
170 }
171 }
172
173 pub fn client(&self) -> Box<dyn TSafetyRules + Send + Sync> {
174 match &self.internal_safety_rules {
175 SafetyRulesWrapper::Local(safety_rules) => {
176 Box::new(LocalClient::new(safety_rules.clone()))
177 }
178 SafetyRulesWrapper::Process(process) => Box::new(process.client()),
179 SafetyRulesWrapper::Serializer(serializer_service) => {
180 Box::new(SerializerClient::new(serializer_service.clone()))
181 }
182 SafetyRulesWrapper::Thread(thread) => Box::new(thread.client()),
183 }
184 }
185}