diem_secure_push_metrics/
lib.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8#![forbid(unsafe_code)]
9
// Re-export the counter, gauge and histogram types (and their `register_*`
// macros) from the prometheus crate via diem_metrics_core
11pub use diem_metrics_core::{
12    register_histogram, register_histogram_vec, register_int_counter,
13    register_int_counter_vec, register_int_gauge, register_int_gauge_vec,
14    Histogram, HistogramTimer, HistogramVec, IntCounter, IntCounterVec,
15    IntGauge, IntGaugeVec,
16};
17
18use diem_logger::{error as diem_error, info as diem_info};
19use diem_metrics_core::{Encoder, TextEncoder};
20use std::{
21    env,
22    sync::mpsc,
23    thread::{self, JoinHandle},
24    time::Duration,
25};
26
/// Push interval, in seconds, used when the `PUSH_METRICS_FREQUENCY_SECS`
/// environment variable is not set.
const DEFAULT_PUSH_FREQUENCY_SECS: u64 = 15;
28
/// MetricsPusher owns a background worker thread that pushes the process
/// metrics to a configurable pushgateway endpoint.
///
/// Dropping the pusher signals the worker to quit and joins it, so the value
/// must be kept alive for as long as metrics should be pushed.
#[must_use = "Assign the constructed pusher to a variable, \
              otherwise the worker thread is joined immediately."]
pub struct MetricsPusher {
    /// Handle of the worker thread. `None` when no worker was started or once
    /// the worker has been joined.
    worker_thread: Option<JoinHandle<()>>,
    /// Sending half of the channel used to tell the worker to quit.
    quit_sender: mpsc::Sender<()>,
}
37
38impl MetricsPusher {
39    fn push(push_metrics_endpoint: &str) {
40        let mut buffer = Vec::new();
41
42        if let Err(e) =
43            TextEncoder::new().encode(&diem_metrics_core::gather(), &mut buffer)
44        {
45            diem_error!("Failed to encode push metrics: {}.", e.to_string());
46        } else {
47            let response = ureq::post(&push_metrics_endpoint)
48                .timeout_connect(10_000)
49                .send_bytes(&buffer);
50            if let Some(error) = response.synthetic_error() {
51                diem_error!(
52                    "Failed to push metrics to {}. Error: {}",
53                    push_metrics_endpoint,
54                    error
55                );
56            }
57        }
58    }
59
60    fn worker(
61        quit_receiver: mpsc::Receiver<()>, push_metrics_endpoint: String,
62        push_metrics_frequency_secs: u64,
63    ) {
64        while quit_receiver
65            .recv_timeout(Duration::from_secs(push_metrics_frequency_secs))
66            .is_err()
67        {
68            // Timeout, no quit signal received.
69            Self::push(&push_metrics_endpoint);
70        }
71        // final push
72        Self::push(&push_metrics_endpoint);
73    }
74
75    fn start_worker_thread(
76        quit_receiver: mpsc::Receiver<()>,
77    ) -> Option<JoinHandle<()>> {
78        // eg value for PUSH_METRICS_ENDPOINT: "http://pushgateway.server.com:9091/metrics/job/safety_rules"
79        let push_metrics_endpoint = match env::var("PUSH_METRICS_ENDPOINT") {
80            Ok(s) => s,
81            Err(_) => {
82                diem_info!("PUSH_METRICS_ENDPOINT env var is not set. Skipping sending metrics.");
83                return None;
84            }
85        };
86        let push_metrics_frequency_secs =
87            match env::var("PUSH_METRICS_FREQUENCY_SECS") {
88                Ok(s) => match s.parse::<u64>() {
89                    Ok(i) => i,
90                    Err(_) => {
91                        diem_error!(
92                            "Invalid value for PUSH_METRICS_FREQUENCY_SECS: {}",
93                            s
94                        );
95                        return None;
96                    }
97                },
98                Err(_) => DEFAULT_PUSH_FREQUENCY_SECS,
99            };
100        diem_info!(
101            "Starting push metrics loop. Sending metrics to {} with a frequency of {} seconds",
102            push_metrics_endpoint, push_metrics_frequency_secs
103        );
104        Some(thread::spawn(move || {
105            Self::worker(
106                quit_receiver,
107                push_metrics_endpoint,
108                push_metrics_frequency_secs,
109            )
110        }))
111    }
112
113    /// start starts a new thread and periodically pushes the metrics to a
114    /// pushgateway endpoint
115    pub fn start() -> Self {
116        let (tx, rx) = mpsc::channel();
117        let worker_thread = Self::start_worker_thread(rx);
118
119        Self {
120            worker_thread,
121            quit_sender: tx,
122        }
123    }
124
125    pub fn join(&mut self) {
126        if let Some(worker_thread) = self.worker_thread.take() {
127            if let Err(e) = self.quit_sender.send(()) {
128                diem_error!(
129                    "Failed to send quit signal to metric pushing worker thread: {:?}",
130                    e
131                );
132            }
133            if let Err(e) = worker_thread.join() {
134                diem_error!(
135                    "Failed to join metric pushing worker thread: {:?}",
136                    e
137                );
138            }
139        }
140    }
141}
142
143impl Drop for MetricsPusher {
144    fn drop(&mut self) { self.join() }
145}