diem_secure_push_metrics/
lib.rs1#![forbid(unsafe_code)]
9
10pub use diem_metrics_core::{
12 register_histogram, register_histogram_vec, register_int_counter,
13 register_int_counter_vec, register_int_gauge, register_int_gauge_vec,
14 Histogram, HistogramTimer, HistogramVec, IntCounter, IntCounterVec,
15 IntGauge, IntGaugeVec,
16};
17
18use diem_logger::{error as diem_error, info as diem_info};
19use diem_metrics_core::{Encoder, TextEncoder};
20use std::{
21 env,
22 sync::mpsc,
23 thread::{self, JoinHandle},
24 time::Duration,
25};
26
27const DEFAULT_PUSH_FREQUENCY_SECS: u64 = 15;
28
29#[must_use = "Assign the contructed pusher to a variable, \
32 otherwise the worker thread is joined immediately."]
33pub struct MetricsPusher {
34 worker_thread: Option<JoinHandle<()>>,
35 quit_sender: mpsc::Sender<()>,
36}
37
38impl MetricsPusher {
39 fn push(push_metrics_endpoint: &str) {
40 let mut buffer = Vec::new();
41
42 if let Err(e) =
43 TextEncoder::new().encode(&diem_metrics_core::gather(), &mut buffer)
44 {
45 diem_error!("Failed to encode push metrics: {}.", e.to_string());
46 } else {
47 let response = ureq::post(&push_metrics_endpoint)
48 .timeout_connect(10_000)
49 .send_bytes(&buffer);
50 if let Some(error) = response.synthetic_error() {
51 diem_error!(
52 "Failed to push metrics to {}. Error: {}",
53 push_metrics_endpoint,
54 error
55 );
56 }
57 }
58 }
59
60 fn worker(
61 quit_receiver: mpsc::Receiver<()>, push_metrics_endpoint: String,
62 push_metrics_frequency_secs: u64,
63 ) {
64 while quit_receiver
65 .recv_timeout(Duration::from_secs(push_metrics_frequency_secs))
66 .is_err()
67 {
68 Self::push(&push_metrics_endpoint);
70 }
71 Self::push(&push_metrics_endpoint);
73 }
74
75 fn start_worker_thread(
76 quit_receiver: mpsc::Receiver<()>,
77 ) -> Option<JoinHandle<()>> {
78 let push_metrics_endpoint = match env::var("PUSH_METRICS_ENDPOINT") {
80 Ok(s) => s,
81 Err(_) => {
82 diem_info!("PUSH_METRICS_ENDPOINT env var is not set. Skipping sending metrics.");
83 return None;
84 }
85 };
86 let push_metrics_frequency_secs =
87 match env::var("PUSH_METRICS_FREQUENCY_SECS") {
88 Ok(s) => match s.parse::<u64>() {
89 Ok(i) => i,
90 Err(_) => {
91 diem_error!(
92 "Invalid value for PUSH_METRICS_FREQUENCY_SECS: {}",
93 s
94 );
95 return None;
96 }
97 },
98 Err(_) => DEFAULT_PUSH_FREQUENCY_SECS,
99 };
100 diem_info!(
101 "Starting push metrics loop. Sending metrics to {} with a frequency of {} seconds",
102 push_metrics_endpoint, push_metrics_frequency_secs
103 );
104 Some(thread::spawn(move || {
105 Self::worker(
106 quit_receiver,
107 push_metrics_endpoint,
108 push_metrics_frequency_secs,
109 )
110 }))
111 }
112
113 pub fn start() -> Self {
116 let (tx, rx) = mpsc::channel();
117 let worker_thread = Self::start_worker_thread(rx);
118
119 Self {
120 worker_thread,
121 quit_sender: tx,
122 }
123 }
124
125 pub fn join(&mut self) {
126 if let Some(worker_thread) = self.worker_thread.take() {
127 if let Err(e) = self.quit_sender.send(()) {
128 diem_error!(
129 "Failed to send quit signal to metric pushing worker thread: {:?}",
130 e
131 );
132 }
133 if let Err(e) = worker_thread.join() {
134 diem_error!(
135 "Failed to join metric pushing worker thread: {:?}",
136 e
137 );
138 }
139 }
140 }
141}
142
143impl Drop for MetricsPusher {
144 fn drop(&mut self) { self.join() }
145}