1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

// Copyright 2021 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/

#![forbid(unsafe_code)]

// Re-export counter types from prometheus crate
pub use diem_metrics_core::{
    register_histogram, register_histogram_vec, register_int_counter,
    register_int_counter_vec, register_int_gauge, register_int_gauge_vec,
    Histogram, HistogramTimer, HistogramVec, IntCounter, IntCounterVec,
    IntGauge, IntGaugeVec,
};

use diem_logger::{error as diem_error, info as diem_info};
use diem_metrics_core::{Encoder, TextEncoder};
use std::{
    env,
    sync::mpsc,
    thread::{self, JoinHandle},
    time::Duration,
};

const DEFAULT_PUSH_FREQUENCY_SECS: u64 = 15;

/// MetricsPusher provides a function to push a list of Metrics to a
/// configurable pushgateway endpoint.
#[must_use = "Assign the contructed pusher to a variable, \
              otherwise the worker thread is joined immediately."]
pub struct MetricsPusher {
    worker_thread: Option<JoinHandle<()>>,
    quit_sender: mpsc::Sender<()>,
}

impl MetricsPusher {
    fn push(push_metrics_endpoint: &str) {
        let mut buffer = Vec::new();

        if let Err(e) =
            TextEncoder::new().encode(&diem_metrics_core::gather(), &mut buffer)
        {
            diem_error!("Failed to encode push metrics: {}.", e.to_string());
        } else {
            let response = ureq::post(&push_metrics_endpoint)
                .timeout_connect(10_000)
                .send_bytes(&buffer);
            if let Some(error) = response.synthetic_error() {
                diem_error!(
                    "Failed to push metrics to {}. Error: {}",
                    push_metrics_endpoint,
                    error
                );
            }
        }
    }

    fn worker(
        quit_receiver: mpsc::Receiver<()>, push_metrics_endpoint: String,
        push_metrics_frequency_secs: u64,
    ) {
        while quit_receiver
            .recv_timeout(Duration::from_secs(push_metrics_frequency_secs))
            .is_err()
        {
            // Timeout, no quit signal received.
            Self::push(&push_metrics_endpoint);
        }
        // final push
        Self::push(&push_metrics_endpoint);
    }

    fn start_worker_thread(
        quit_receiver: mpsc::Receiver<()>,
    ) -> Option<JoinHandle<()>> {
        // eg value for PUSH_METRICS_ENDPOINT: "http://pushgateway.server.com:9091/metrics/job/safety_rules"
        let push_metrics_endpoint = match env::var("PUSH_METRICS_ENDPOINT") {
            Ok(s) => s,
            Err(_) => {
                diem_info!("PUSH_METRICS_ENDPOINT env var is not set. Skipping sending metrics.");
                return None;
            }
        };
        let push_metrics_frequency_secs =
            match env::var("PUSH_METRICS_FREQUENCY_SECS") {
                Ok(s) => match s.parse::<u64>() {
                    Ok(i) => i,
                    Err(_) => {
                        diem_error!(
                            "Invalid value for PUSH_METRICS_FREQUENCY_SECS: {}",
                            s
                        );
                        return None;
                    }
                },
                Err(_) => DEFAULT_PUSH_FREQUENCY_SECS,
            };
        diem_info!(
            "Starting push metrics loop. Sending metrics to {} with a frequency of {} seconds",
            push_metrics_endpoint, push_metrics_frequency_secs
        );
        Some(thread::spawn(move || {
            Self::worker(
                quit_receiver,
                push_metrics_endpoint,
                push_metrics_frequency_secs,
            )
        }))
    }

    /// start starts a new thread and periodically pushes the metrics to a
    /// pushgateway endpoint
    pub fn start() -> Self {
        let (tx, rx) = mpsc::channel();
        let worker_thread = Self::start_worker_thread(rx);

        Self {
            worker_thread,
            quit_sender: tx,
        }
    }

    pub fn join(&mut self) {
        if let Some(worker_thread) = self.worker_thread.take() {
            if let Err(e) = self.quit_sender.send(()) {
                diem_error!(
                    "Failed to send quit signal to metric pushing worker thread: {:?}",
                    e
                );
            }
            if let Err(e) = worker_thread.join() {
                diem_error!(
                    "Failed to join metric pushing worker thread: {:?}",
                    e
                );
            }
        }
    }
}

impl Drop for MetricsPusher {
    fn drop(&mut self) { self.join() }
}