1use crate::{
6 report::{report_async, FileReporter, Reportable},
7 report_influxdb::{InfluxdbReportable, InfluxdbReporter},
8 report_prometheus::{PrometheusReportable, PrometheusReporter},
9};
10use cfx_tasks::TaskExecutor;
11use duration_str::deserialize_duration;
12use log::{error, info};
13use serde::{Deserialize, Serialize};
14use std::{
15 sync::atomic::{AtomicBool, Ordering},
16 time::Duration,
17};
18
19pub const ORDER: Ordering = Ordering::Relaxed;
20
21static ENABLED: AtomicBool = AtomicBool::new(false);
22static STOPPED: AtomicBool = AtomicBool::new(false);
23
24pub fn is_enabled() -> bool { ENABLED.load(ORDER) }
25
26pub fn enable() { ENABLED.store(true, ORDER); }
27
28pub fn stop() { STOPPED.store(true, ORDER); }
30
31pub fn is_stopped() -> bool { STOPPED.load(ORDER) }
32
33pub trait Metric:
34 Send + Sync + Reportable + InfluxdbReportable + PrometheusReportable
35{
36 fn get_type(&self) -> &str;
37}
38
39#[derive(Debug, Serialize, Deserialize, Clone)]
40#[serde(default)]
41pub struct MetricsConfiguration {
42 pub enabled: bool,
43
44 #[serde(deserialize_with = "deserialize_duration")]
45 pub report_interval: Duration,
46
47 pub file_report_output: Option<String>,
48
49 pub influxdb_report_host: Option<String>,
50 pub influxdb_report_db: String,
51 pub influxdb_report_username: Option<String>,
52 pub influxdb_report_password: Option<String>,
53 pub influxdb_report_node: Option<String>,
54 pub prometheus_listen_addr: Option<String>,
55}
56
57impl Default for MetricsConfiguration {
58 fn default() -> Self {
59 Self {
60 enabled: false,
61 report_interval: Duration::from_secs(10),
62 file_report_output: None,
63 influxdb_report_host: None,
64 influxdb_report_db: "".into(),
65 influxdb_report_username: None,
66 influxdb_report_password: None,
67 influxdb_report_node: None,
68 prometheus_listen_addr: None,
69 }
70 }
71}
72
73pub fn initialize(config: MetricsConfiguration, executor: TaskExecutor) {
74 info!("Initializing metrics with config: {:?}", config);
75 if !config.enabled {
76 return;
77 }
78
79 enable();
80
81 if let Some(output) = config.file_report_output {
83 let reporter = FileReporter::new(output);
84 report_async(reporter, config.report_interval);
85 }
86
87 if let Some(host) = config.influxdb_report_host {
89 let mut auth = None;
90
91 if let Some(username) = config.influxdb_report_username {
92 if let Some(password) = config.influxdb_report_password {
93 auth = Some((username, password));
94 }
95 }
96
97 let mut reporter = match auth {
98 Some((username, password)) => InfluxdbReporter::with_auth(
99 host,
100 config.influxdb_report_db,
101 username,
102 password,
103 ),
104 None => InfluxdbReporter::new(host, config.influxdb_report_db),
105 };
106
107 if let Some(node) = config.influxdb_report_node {
108 reporter.add_tag("node".into(), node);
109 }
110
111 report_async(reporter, config.report_interval);
112 }
113
114 if let Some(addr) = config.prometheus_listen_addr {
117 match PrometheusReporter::new(&addr, executor) {
118 Ok(reporter) => {
119 info!("Initializing PrometheusReporter to listen on {}", addr);
120 match reporter.start_http_server() {
121 Ok(_) => {
122 info!("PrometheusReporter started successfully");
123 }
124 Err(e) => {
125 error!("Failed to start PrometheusReporter: {}", e);
126 }
127 }
128 }
129 Err(e) => {
130 error!("Failed to initialize PrometheusReporter: {}", e);
131 }
132 }
133 }
134}