1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

// Copyright 2021 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/

use std::{fmt, sync::Arc, time::Duration};

use serde::Serialize;

use consensus_types::{common::Round, sync_info::SyncInfo, vote::Vote};
use diem_logger::{prelude::*, Schema};
use diem_types::validator_verifier::ValidatorVerifier;

use crate::pos::consensus::{
    counters,
    pending_votes::{PendingVotes, VoteReceptionResult},
    util::time_service::{SendTask, TimeService},
};

/// A reason for starting a new round: introduced for monitoring / debug
/// purposes.
#[derive(Serialize, Eq, Debug, PartialEq)]
pub enum NewRoundReason {
    QCReady,
    Timeout,
}

impl fmt::Display for NewRoundReason {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            NewRoundReason::QCReady => write!(f, "QCReady"),
            NewRoundReason::Timeout => write!(f, "TCReady"),
        }
    }
}

/// NewRoundEvents produced by RoundState are guaranteed to be monotonically
/// increasing. NewRoundEvents are consumed by the rest of the system: they can
/// cause sending new proposals or voting for some proposals that wouldn't have
/// been voted otherwise. The duration is populated for debugging and testing
#[derive(Debug, PartialEq, Eq)]
pub struct NewRoundEvent {
    pub round: Round,
    pub reason: NewRoundReason,
    pub timeout: Duration,
}

impl fmt::Display for NewRoundEvent {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "NewRoundEvent: [round: {}, reason: {}, timeout: {:?}]",
            self.round, self.reason, self.timeout
        )
    }
}

/// Determines the maximum round duration based on the round difference between
/// the current round and the committed round
pub trait RoundTimeInterval: Send + Sync + 'static {
    /// Use the index of the round after the highest quorum certificate to
    /// commit a block and return the duration for this round
    ///
    /// Round indices start at 0 (round index = 0 is the first round after the
    /// round that led to the highest committed round).  Given that round r
    /// is the highest round to commit a block, then round index 0 is round
    /// r+1.  Note that for genesis does not follow the 3-chain rule for
    /// commits, so round 1 has round index 0.  For example, if one wants to
    /// calculate the round duration of round 6 and the highest committed round
    /// is 3 (meaning the highest round to commit a block is round 5, then
    /// the round index is 0.
    fn get_round_duration(
        &self, round_index_after_committed_qc: usize,
    ) -> Duration;
}

/// Round durations increase exponentially
/// Basically time interval is base * mul^power
/// Where power=max(rounds_since_qc, max_exponent)
#[derive(Clone)]
pub struct ExponentialTimeInterval {
    // Initial time interval duration after a successful quorum commit.
    base_ms: u64,
    // By how much we increase interval every time
    exponent_base: f64,
    // Maximum time interval won't exceed base * mul^max_pow.
    // Theoretically, setting it means
    // that we rely on synchrony assumptions when the known max messaging delay
    // is max_interval.  Alternatively, we can consider using max_interval
    // to meet partial synchrony assumptions where while delta is unknown,
    // it is <= max_interval.
    max_exponent: usize,
}

impl ExponentialTimeInterval {
    #[cfg(any(test, feature = "fuzzing"))]
    #[allow(unused)]
    pub fn fixed(duration: Duration) -> Self { Self::new(duration, 1.0, 0) }

    pub fn new(
        base: Duration, exponent_base: f64, max_exponent: usize,
    ) -> Self {
        assert!(
            max_exponent < 32,
            "max_exponent for RoundStateTimeInterval should be <32"
        );
        assert!(
            exponent_base.powf(max_exponent as f64).ceil()
                < f64::from(std::u32::MAX),
            "Maximum interval multiplier should be less then u32::Max"
        );
        ExponentialTimeInterval {
            base_ms: base.as_millis() as u64, /* any reasonable ms timeout
                                               * fits u64 perfectly */
            exponent_base,
            max_exponent,
        }
    }
}

impl RoundTimeInterval for ExponentialTimeInterval {
    fn get_round_duration(
        &self, round_index_after_committed_qc: usize,
    ) -> Duration {
        let pow = round_index_after_committed_qc.min(self.max_exponent) as u32;
        let base_multiplier = self.exponent_base.powf(f64::from(pow));
        let duration_ms =
            ((self.base_ms as f64) * base_multiplier).ceil() as u64;
        Duration::from_millis(duration_ms)
    }
}

/// `RoundState` contains information about a specific round and moves forward
/// when receives new certificates.
///
/// A round `r` starts in the following cases:
/// * there is a QuorumCert for round `r-1`,
/// * there is a TimeoutCertificate for round `r-1`.
///
/// Round interval calculation is the responsibility of the
/// RoundStateTimeoutInterval trait. It depends on the delta between the current
/// round and the highest committed round (the intuition is that we want to
/// exponentially grow the interval the further the current round is from the
/// last committed round).
///
/// Whenever a new round starts a local timeout is set following the round
/// interval. This local timeout is going to send the timeout events once in
/// interval until the new round starts.
pub struct RoundState {
    // Determines the time interval for a round given the number of
    // non-committed rounds since last commit.
    time_interval: Box<dyn RoundTimeInterval>,
    // Highest known committed round as reported by the caller. The caller
    // might choose not to inform the RoundState about certain committed
    // rounds (e.g., NIL blocks): in this case the committed round in
    // RoundState might lag behind the committed round of a block tree.
    highest_committed_round: Round,
    // Current round is max{highest_qc, highest_tc} + 1.
    current_round: Round,
    // The deadline for the next local timeout event. It is reset every time a
    // new round start, or a previous deadline expires.
    // Represents as Duration since UNIX_EPOCH.
    current_round_deadline: Duration,
    // Service for timer
    time_service: Arc<dyn TimeService>,
    // To send local timeout events to the subscriber (e.g., SMR)
    timeout_sender: channel::Sender<(u64, Round)>,
    // To send timeout events for proposal selection to the subscriber (e.g.,
    // SMR)
    proposal_timeout_sender: channel::Sender<(u64, Round)>,
    new_round_timeout_sender: channel::Sender<(u64, Round)>,
    new_round_sent: bool,
    // Votes received for the current round.
    pending_votes: PendingVotes,
    // Vote sent locally for the current round.
    vote_sent: Option<Vote>,
}

#[derive(Default, Schema)]
pub struct RoundStateLogSchema<'a> {
    round: Option<Round>,
    committed_round: Option<Round>,
    #[schema(display)]
    pending_votes: Option<&'a PendingVotes>,
    #[schema(display)]
    self_vote: Option<&'a Vote>,
}

impl<'a> RoundStateLogSchema<'a> {
    pub fn new(round_state: Option<&'a RoundState>) -> Self {
        match round_state {
            Some(state) => Self {
                round: Some(state.current_round),
                committed_round: Some(state.highest_committed_round),
                pending_votes: Some(&state.pending_votes),
                self_vote: state.vote_sent.as_ref(),
            },
            None => Self::default(),
        }
    }
}

impl RoundState {
    pub fn new(
        time_interval: Box<dyn RoundTimeInterval>,
        time_service: Arc<dyn TimeService>,
        timeout_sender: channel::Sender<(u64, Round)>,
        proposal_timeout_sender: channel::Sender<(u64, Round)>,
        new_round_timeout_sender: channel::Sender<(u64, Round)>,
    ) -> Self {
        // Our counters are initialized lazily, so they're not going to appear
        // in Prometheus if some conditions never happen. Invoking get()
        // function enforces creation.
        counters::QC_ROUNDS_COUNT.get();
        counters::TIMEOUT_ROUNDS_COUNT.get();
        counters::TIMEOUT_COUNT.get();

        Self {
            time_interval,
            highest_committed_round: 0,
            current_round: 0,
            current_round_deadline: time_service.get_current_timestamp(),
            time_service,
            timeout_sender,
            proposal_timeout_sender,
            new_round_timeout_sender,
            new_round_sent: false,
            pending_votes: PendingVotes::new(),
            vote_sent: None,
        }
    }

    /// Return the current round.
    pub fn current_round(&self) -> Round { self.current_round }

    /// Returns deadline for current round
    pub fn current_round_deadline(&self) -> Duration {
        self.current_round_deadline
    }

    /// In case the local timeout corresponds to the current round, reset the
    /// timeout and return true. Otherwise ignore and return false.
    pub fn process_local_timeout(&mut self, epoch_round: (u64, Round)) -> bool {
        diem_info!(round = epoch_round.1, "Local timeout");
        counters::TIMEOUT_COUNT.inc();
        self.setup_timeout(epoch_round.0);
        true
    }

    /// Notify the RoundState about the potentially new QC, TC, and highest
    /// committed round. Note that some of these values might not be
    /// available by the caller.
    pub fn process_certificates(
        &mut self, sync_info: SyncInfo,
    ) -> Option<NewRoundEvent> {
        if sync_info.highest_commit_round() > self.highest_committed_round {
            self.highest_committed_round = sync_info.highest_commit_round();
        }
        let new_round = sync_info.highest_round() + 1;
        if new_round > self.current_round {
            // Start a new round.
            self.current_round = new_round;
            self.new_round_sent = false;
            self.pending_votes = PendingVotes::new();
            self.vote_sent = None;
            let timeout = self.setup_timeout(sync_info.epoch());
            // The new round reason is QCReady in case both QC and TC are equal
            let new_round_reason =
                if sync_info.highest_timeout_certificate().is_none() {
                    NewRoundReason::QCReady
                } else {
                    NewRoundReason::Timeout
                };
            let new_round_event = NewRoundEvent {
                round: self.current_round,
                reason: new_round_reason,
                timeout,
            };
            diem_debug!(
                round = new_round,
                "Starting new round: {}",
                new_round_event
            );
            return Some(new_round_event);
        }
        None
    }

    pub fn insert_vote(
        &mut self, vote: &Vote, verifier: &ValidatorVerifier,
    ) -> VoteReceptionResult {
        if vote.vote_data().proposed().round() == self.current_round {
            self.pending_votes.insert_vote(vote, verifier)
        } else {
            VoteReceptionResult::UnexpectedRound(
                vote.vote_data().proposed().round(),
                self.current_round,
            )
        }
    }

    pub fn record_vote(&mut self, vote: Vote) {
        if vote.vote_data().proposed().round() == self.current_round {
            self.vote_sent = Some(vote);
        }
    }

    pub fn vote_sent(&self) -> Option<Vote> { self.vote_sent.clone() }

    pub fn get_round_certificate(
        &self, verifier: &ValidatorVerifier,
    ) -> VoteReceptionResult {
        self.pending_votes.get_certificate(verifier)
    }

    /// Setup the timeout task and return the duration of the current timeout
    fn setup_timeout(&mut self, epoch: u64) -> Duration {
        let timeout_sender = self.timeout_sender.clone();
        let timeout = self.setup_deadline();
        diem_trace!(
            "Scheduling timeout of {} ms for round {}",
            timeout.as_millis(),
            self.current_round
        );
        self.time_service.run_after(
            timeout,
            SendTask::make(timeout_sender, (epoch, self.current_round)),
        );
        timeout
    }

    /// Setup the current round deadline and return the duration of the current
    /// round
    fn setup_deadline(&mut self) -> Duration {
        let round_index_after_committed_round =
            self.get_round_index_after_committed_round();
        let timeout = self
            .time_interval
            .get_round_duration(round_index_after_committed_round);
        let now = self.time_service.get_current_timestamp();
        diem_debug!(
            round = self.current_round,
            "{:?} passed since the previous deadline.",
            now.checked_sub(self.current_round_deadline)
                .map_or("0 ms".to_string(), |v| format!("{:?}", v))
        );
        diem_debug!(
            round = self.current_round,
            "Set round deadline to {:?} from now",
            timeout
        );
        self.current_round_deadline = now + timeout;
        timeout
    }

    /// Setup the timeout task and return the duration of the current timeout
    pub fn setup_proposal_timeout(&self, epoch: u64) -> Duration {
        let proposal_timeout_sender = self.proposal_timeout_sender.clone();
        let timeout = self.setup_proposal_deadline();
        diem_trace!(
            "Scheduling proposal selection timeout of {} ms for round {}",
            timeout.as_millis(),
            self.current_round
        );
        self.time_service.run_after(
            timeout,
            SendTask::make(
                proposal_timeout_sender,
                (epoch, self.current_round),
            ),
        );
        timeout
    }

    /// Setup the timeout task and return the duration of the current timeout
    pub fn setup_new_round_timeout(&mut self, epoch: u64) -> Option<Duration> {
        if self.new_round_sent {
            return None;
        }
        let new_round_timeout_sender = self.new_round_timeout_sender.clone();
        let timeout = self.setup_new_round_deadline();
        diem_trace!(
            "Scheduling new round timeout of {} ms for round {}",
            timeout.as_millis(),
            self.current_round
        );
        self.time_service.run_after(
            timeout,
            SendTask::make(
                new_round_timeout_sender,
                (epoch, self.current_round),
            ),
        );
        self.new_round_sent = true;
        Some(timeout)
    }

    /// TODO(lpl): Decide a proper timeout setting.
    /// Currently it's set to half the round timeout.
    fn setup_proposal_deadline(&self) -> Duration {
        let round_index_after_committed_round =
            self.get_round_index_after_committed_round();
        let timeout = self
            .time_interval
            .get_round_duration(round_index_after_committed_round)
            / 2;
        diem_debug!(
            round = self.current_round,
            "Set proposal selection deadline to {:?} from now",
            timeout
        );
        timeout
    }

    fn setup_new_round_deadline(&self) -> Duration {
        let round_index_after_committed_round =
            self.get_round_index_after_committed_round();
        let timeout = self
            .time_interval
            .get_round_duration(round_index_after_committed_round)
            / 2;
        diem_debug!(
            round = self.current_round,
            "Set proposal selection deadline to {:?} from now",
            timeout
        );
        timeout
    }

    fn get_round_index_after_committed_round(&self) -> usize {
        let round_index_after_committed_round = {
            if self.highest_committed_round == 0 {
                // Genesis doesn't require the 3-chain rule for commit, hence
                // start the index at the round after genesis.
                self.current_round - 1
            } else if self.current_round < self.highest_committed_round + 3 {
                0
            } else {
                self.current_round - self.highest_committed_round - 3
            }
        } as usize;
        round_index_after_committed_round
    }

    pub fn vote_received(&self, vote: &Vote) -> bool {
        self.pending_votes.vote_received(vote)
    }
}